[Commits] Rev 4038: MDEV-5804: If same GTID is received on multiple master connections in multi-source replication, the event is double-executed causing corruption or replication failure in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Wed Mar 12 01:14:50 EET 2014


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4038
revision-id: knielsen at knielsen-hq.org-20140311231449-utdsqb3ik68z0ove
parent: knielsen at knielsen-hq.org-20140309092738-0e3u7beasfsge8p1
committer: knielsen at knielsen-hq.org
branch nick: work-10.0-mdev5804
timestamp: Wed 2014-03-12 00:14:49 +0100
message:
  MDEV-5804: If same GTID is received on multiple master connections in multi-source replication, the event is double-executed causing corruption or replication failure
  
  Some fixes, mainly to make it work in non-parallel replication mode also
  (--slave-parallel-threads=0).
  
  Patch should be fairly complete now.
=== modified file 'mysql-test/r/mysqld--help.result'
--- a/mysql-test/r/mysqld--help.result	2014-03-05 22:20:10 +0000
+++ b/mysql-test/r/mysqld--help.result	2014-03-11 23:14:49 +0000
@@ -212,6 +212,16 @@
  multiple masters), each independent source server must
  use a distinct domain_id. For simple tree-shaped
  replication topologies, it can be left at its default, 0.
+ --gtid-ignore-duplicates 
+ When set, different master connections in multi-source
+ replication are allowed to receive and process event
+ groups with the same GTID (when using GTID mode). Only
+ one will be applied, any others will be ignored. Within a
+ given replication domain, just the sequence number will
+ be used to decide whether a given GTID has been already
+ applied; this means it is the responsibility of the user
+ to ensure that GTID sequence numbers are strictly
+ increasing.
  --gtid-strict-mode  Enforce strict seq_no ordering of events in the binary
  log. Slave stops with an error if it encounters an event
  that would cause it to generate an out-of-order binlog if
@@ -1094,6 +1104,7 @@ gdb FALSE
 general-log FALSE
 group-concat-max-len 1024
 gtid-domain-id 0
+gtid-ignore-duplicates FALSE
 gtid-strict-mode FALSE
 help TRUE
 histogram-size 0

=== modified file 'mysql-test/suite/multi_source/gtid_ignore_duplicates.result'
--- a/mysql-test/suite/multi_source/gtid_ignore_duplicates.result	2014-03-09 09:27:38 +0000
+++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.result	2014-03-11 23:14:49 +0000
@@ -151,38 +151,129 @@ SELECT * FROM t1 WHERE a >= 10 ORDER BY
 10
 11
 12
+*** Test also with not using parallel replication.
+SET default_master_connection = "b2a";
+STOP SLAVE;
+include/wait_for_slave_to_stop.inc
+SET default_master_connection = "c2a";
+STOP SLAVE;
+include/wait_for_slave_to_stop.inc
+SET GLOBAL slave_parallel_threads=0;
+SET default_master_connection = "b2a";
+START SLAVE;
+include/wait_for_slave_to_start.inc
+SET default_master_connection = "c2a";
+START SLAVE;
+include/wait_for_slave_to_start.inc
+SET default_master_connection = "a2b";
+STOP SLAVE;
+include/wait_for_slave_to_stop.inc
+SET default_master_connection = "c2b";
+STOP SLAVE;
+include/wait_for_slave_to_stop.inc
+SET GLOBAL slave_parallel_threads=0;
+SET default_master_connection = "a2b";
+START SLAVE;
+include/wait_for_slave_to_start.inc
+SET default_master_connection = "c2b";
+START SLAVE;
+include/wait_for_slave_to_start.inc
+SET default_master_connection = "a2c";
+STOP SLAVE;
+include/wait_for_slave_to_stop.inc
+SET default_master_connection = "b2c";
+STOP SLAVE;
+include/wait_for_slave_to_stop.inc
+SET GLOBAL slave_parallel_threads=0;
+SET default_master_connection = "a2c";
+START SLAVE;
+include/wait_for_slave_to_start.inc
+SET default_master_connection = "b2c";
+START SLAVE;
+include/wait_for_slave_to_start.inc
+SET default_master_connection = "a2d";
+STOP SLAVE;
+include/wait_for_slave_to_stop.inc
+SET GLOBAL slave_parallel_threads=0;
+SET default_master_connection = "a2d";
+START SLAVE;
+include/wait_for_slave_to_start.inc
+INSERT INTO t1 VALUES (21);
+BEGIN;
+INSERT INTO t1 VALUES (22);
+INSERT INTO t1 VALUES (23);
+COMMIT;
+INSERT INTO t1 VALUES (24), (25);
+INSERT INTO t1 VALUES (26);
+include/save_master_gtid.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+a
+21
+22
+23
+24
+25
+26
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+a
+21
+22
+23
+24
+25
+26
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+a
+21
+22
+23
+24
+25
+26
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+a
+21
+22
+23
+24
+25
+26
 SET GLOBAL gtid_domain_id=0;
 STOP ALL SLAVES;
 Warnings:
 Note    1938    SLAVE 'c2a' stopped
 Note    1938    SLAVE 'b2a' stopped
-include/reset_master_slave.inc
 SET GLOBAL slave_parallel_threads= @old_parallel;
 SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
-DROP TABLE t1;
 SET GLOBAL gtid_domain_id=0;
 STOP ALL SLAVES;
 Warnings:
 Note    1938    SLAVE 'a2b' stopped
 Note    1938    SLAVE 'c2b' stopped
-include/reset_master_slave.inc
 SET GLOBAL slave_parallel_threads= @old_parallel;
 SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
-DROP TABLE t1;
 SET GLOBAL gtid_domain_id=0;
 STOP ALL SLAVES;
 Warnings:
 Note    1938    SLAVE 'a2c' stopped
 Note    1938    SLAVE 'b2c' stopped
-include/reset_master_slave.inc
 SET GLOBAL slave_parallel_threads= @old_parallel;
 SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
-DROP TABLE t1;
 SET GLOBAL gtid_domain_id=0;
 STOP ALL SLAVES;
 Warnings:
 Note    1938    SLAVE 'a2d' stopped
-include/reset_master_slave.inc
 SET GLOBAL slave_parallel_threads= @old_parallel;
 SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
 DROP TABLE t1;
+include/reset_master_slave.inc
+DROP TABLE t1;
+include/reset_master_slave.inc
+DROP TABLE t1;
+include/reset_master_slave.inc
+DROP TABLE t1;
+include/reset_master_slave.inc

=== modified file 'mysql-test/suite/multi_source/gtid_ignore_duplicates.test'
--- a/mysql-test/suite/multi_source/gtid_ignore_duplicates.test	2014-03-09 09:27:38 +0000
+++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.test	2014-03-11 23:14:49 +0000
@@ -170,39 +170,135 @@ SET default_master_connection = "a2b";
 SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
 
 
+--echo *** Test also with not using parallel replication.
+
+--connection server_1
+SET default_master_connection = "b2a";
+STOP SLAVE;
+--source include/wait_for_slave_to_stop.inc
+SET default_master_connection = "c2a";
+STOP SLAVE;
+--source include/wait_for_slave_to_stop.inc
+SET GLOBAL slave_parallel_threads=0;
+SET default_master_connection = "b2a";
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+SET default_master_connection = "c2a";
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+
+
+--connection server_2
+SET default_master_connection = "a2b";
+STOP SLAVE;
+--source include/wait_for_slave_to_stop.inc
+SET default_master_connection = "c2b";
+STOP SLAVE;
+--source include/wait_for_slave_to_stop.inc
+SET GLOBAL slave_parallel_threads=0;
+SET default_master_connection = "a2b";
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+SET default_master_connection = "c2b";
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+
+
+--connection server_3
+SET default_master_connection = "a2c";
+STOP SLAVE;
+--source include/wait_for_slave_to_stop.inc
+SET default_master_connection = "b2c";
+STOP SLAVE;
+--source include/wait_for_slave_to_stop.inc
+SET GLOBAL slave_parallel_threads=0;
+SET default_master_connection = "a2c";
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+SET default_master_connection = "b2c";
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+
+
+--connection server_4
+SET default_master_connection = "a2d";
+STOP SLAVE;
+--source include/wait_for_slave_to_stop.inc
+SET GLOBAL slave_parallel_threads=0;
+SET default_master_connection = "a2d";
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+
+
+--connection server_2
+INSERT INTO t1 VALUES (21);
+BEGIN;
+INSERT INTO t1 VALUES (22);
+INSERT INTO t1 VALUES (23);
+COMMIT;
+INSERT INTO t1 VALUES (24), (25);
+INSERT INTO t1 VALUES (26);
+
+--source include/save_master_gtid.inc
+
+--connection server_1
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+
+--connection server_3
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+
+--connection server_4
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+
+--connection server_2
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+
+
 # Clean up.
 --connection server_1
 SET GLOBAL gtid_domain_id=0;
 STOP ALL SLAVES;
---source reset_master_slave.inc
 SET GLOBAL slave_parallel_threads= @old_parallel;
 SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
-DROP TABLE t1;
---disconnect server_1
 
 --connection server_2
 SET GLOBAL gtid_domain_id=0;
 STOP ALL SLAVES;
---source reset_master_slave.inc
 SET GLOBAL slave_parallel_threads= @old_parallel;
 SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
-DROP TABLE t1;
---disconnect server_2
 
 --connection server_3
 SET GLOBAL gtid_domain_id=0;
 STOP ALL SLAVES;
---source reset_master_slave.inc
 SET GLOBAL slave_parallel_threads= @old_parallel;
 SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
-DROP TABLE t1;
---disconnect server_3
 
 --connection server_4
 SET GLOBAL gtid_domain_id=0;
 STOP ALL SLAVES;
---source reset_master_slave.inc
 SET GLOBAL slave_parallel_threads= @old_parallel;
 SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+
+--connection server_1
+DROP TABLE t1;
+--source reset_master_slave.inc
+--disconnect server_1
+
+--connection server_2
+DROP TABLE t1;
+--source reset_master_slave.inc
+--disconnect server_2
+
+--connection server_3
 DROP TABLE t1;
+--source reset_master_slave.inc
+--disconnect server_3
+
+--connection server_4
+DROP TABLE t1;
+--source reset_master_slave.inc
 --disconnect server_4

=== modified file 'mysql-test/suite/perfschema/r/dml_setup_instruments.result'
--- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result	2014-02-26 15:38:42 +0000
+++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result	2014-03-11 23:14:49 +0000
@@ -38,6 +38,7 @@ order by name limit 10;
 NAME    ENABLED TIMED
 wait/synch/cond/sql/COND_flush_thread_cache     YES     YES
 wait/synch/cond/sql/COND_group_commit_orderer   YES     YES
+wait/synch/cond/sql/COND_gtid_ignore_duplicates YES     YES
 wait/synch/cond/sql/COND_manager        YES     YES
 wait/synch/cond/sql/COND_parallel_entry YES     YES
 wait/synch/cond/sql/COND_prepare_ordered        YES     YES
@@ -45,7 +46,6 @@ wait/synch/cond/sql/COND_queue_state	YES
 wait/synch/cond/sql/COND_rpl_thread     YES     YES
 wait/synch/cond/sql/COND_rpl_thread_pool        YES     YES
 wait/synch/cond/sql/COND_rpl_thread_queue       YES     YES
-wait/synch/cond/sql/COND_server_started YES     YES
 select * from performance_schema.setup_instruments
 where name='Wait';
 select * from performance_schema.setup_instruments

=== added file 'mysql-test/suite/sys_vars/r/gtid_ignore_duplicates_basic.result'
--- a/mysql-test/suite/sys_vars/r/gtid_ignore_duplicates_basic.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/sys_vars/r/gtid_ignore_duplicates_basic.result	2014-03-11 23:14:49 +0000
@@ -0,0 +1,13 @@
+SET @save_gtid_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SELECT @@GLOBAL.gtid_ignore_duplicates as 'must be zero because of default';
+must be zero because of default
+0
+SELECT @@SESSION.gtid_ignore_duplicates  as 'no session var';
+ERROR HY000: Variable 'gtid_ignore_duplicates' is a GLOBAL variable
+SET GLOBAL gtid_ignore_duplicates= FALSE;
+SET GLOBAL gtid_ignore_duplicates= DEFAULT;
+SET GLOBAL gtid_ignore_duplicates= TRUE;
+SELECT @@GLOBAL.gtid_ignore_duplicates;
+@@GLOBAL.gtid_ignore_duplicates
+1
+SET GLOBAL gtid_ignore_duplicates = @save_gtid_ignore_duplicates;

=== added file 'mysql-test/suite/sys_vars/t/gtid_ignore_duplicates_basic.test'
--- a/mysql-test/suite/sys_vars/t/gtid_ignore_duplicates_basic.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/sys_vars/t/gtid_ignore_duplicates_basic.test	2014-03-11 23:14:49 +0000
@@ -0,0 +1,14 @@
+--source include/not_embedded.inc
+
+SET @save_gtid_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+
+SELECT @@GLOBAL.gtid_ignore_duplicates as 'must be zero because of default';
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT @@SESSION.gtid_ignore_duplicates  as 'no session var';
+
+SET GLOBAL gtid_ignore_duplicates= FALSE;
+SET GLOBAL gtid_ignore_duplicates= DEFAULT;
+SET GLOBAL gtid_ignore_duplicates= TRUE;
+SELECT @@GLOBAL.gtid_ignore_duplicates;
+
+SET GLOBAL gtid_ignore_duplicates = @save_gtid_ignore_duplicates;

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2014-03-09 09:27:38 +0000
+++ b/sql/log_event.cc	2014-03-11 23:14:49 +0000
@@ -4440,7 +4440,7 @@ Default database: '%s'. Query: '%s'",
 
 end:
   if (sub_id && !thd->is_slave_error)
-    rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli);
+    rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rgi);
 
   /*
     Probably we have set thd->query, thd->db, thd->catalog to point to places
@@ -7327,7 +7327,7 @@ int Xid_log_event::do_apply_event(rpl_gr
   thd->mdl_context.release_transactional_locks();
 
   if (!res && sub_id)
-    rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli);
+    rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rgi);
 
   /*
     Increment the global status commit count variable

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2014-03-09 09:27:38 +0000
+++ b/sql/mysqld.cc	2014-03-11 23:14:49 +0000
@@ -9447,6 +9447,7 @@ PSI_stage_info stage_waiting_for_prior_t
 PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0};
 PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
 PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
+PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
 
 #ifdef HAVE_PSI_INTERFACE
 
@@ -9565,7 +9566,8 @@ PSI_stage_info *all_server_stages[]=
   & stage_waiting_to_finalize_termination,
   & stage_waiting_to_get_readlock,
   & stage_master_gtid_wait_primary,
-  & stage_master_gtid_wait
+  & stage_master_gtid_wait,
+  & stage_gtid_wait_other_connection
 };
 
 PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection;

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2014-03-09 09:27:38 +0000
+++ b/sql/mysqld.h	2014-03-11 23:14:49 +0000
@@ -441,6 +441,7 @@ extern PSI_stage_info stage_waiting_for_
 extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
 extern PSI_stage_info stage_master_gtid_wait_primary;
 extern PSI_stage_info stage_master_gtid_wait;
+extern PSI_stage_info stage_gtid_wait_other_connection;
 
 #ifdef HAVE_PSI_STATEMENT_INTERFACE
 /**

=== modified file 'sql/rpl_gtid.cc'
--- a/sql/rpl_gtid.cc	2014-03-09 09:27:38 +0000
+++ b/sql/rpl_gtid.cc	2014-03-11 23:14:49 +0000
@@ -34,7 +34,7 @@ const LEX_STRING rpl_gtid_slave_state_ta
 
 void
 rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
-                                   const Relay_log_info *rli)
+                                   rpl_group_info *rgi)
 {
   int err;
   /*
@@ -45,7 +45,7 @@ rpl_slave_state::update_state_hash(uint6
     it is even committed.
   */
   mysql_mutex_lock(&LOCK_slave_state);
-  err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rli);
+  err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi);
   mysql_mutex_unlock(&LOCK_slave_state);
   if (err)
   {
@@ -75,9 +75,13 @@ rpl_slave_state::record_and_update_gtid(
   if ((sub_id= rgi->gtid_sub_id))
   {
     rgi->gtid_sub_id= 0;
-    if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
-      DBUG_RETURN(1);
-    update_state_hash(sub_id, &rgi->current_gtid, rgi->rli);
+    if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
+    {
+      if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
+        DBUG_RETURN(1);
+      update_state_hash(sub_id, &rgi->current_gtid, rgi);
+    }
+    rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
   }
   DBUG_RETURN(0);
 }
@@ -110,16 +114,21 @@ rpl_slave_state::record_and_update_gtid(
    -1  Error (out of memory to allocate a new element for the domain).
 */
 int
-rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
+rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi)
 {
   uint32 domain_id= gtid->domain_id;
   uint32 seq_no= gtid->seq_no;
   rpl_slave_state::element *elem;
   int res;
+  bool did_enter_cond;
+  PSI_stage_info old_stage;
+  THD *thd;
+  Relay_log_info *rli= rgi->rli;
 
   mysql_mutex_lock(&LOCK_slave_state);
   if (!(elem= get_element(domain_id)))
   {
+    my_error(ER_OUT_OF_RESOURCES, MYF(0));
     res= -1;
     goto err;
   }
@@ -129,13 +138,14 @@ rpl_slave_state::check_duplicate_gtid(rp
     each lock release and re-take.
   */
 
-  /* ToDo: Make this wait killable. */
+  did_enter_cond= false;
   for (;;)
   {
     if (elem->highest_seq_no >= seq_no)
     {
       /* This sequence number is already applied, ignore it. */
       res= 0;
+      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE;
       break;
     }
     if (!elem->owner_rli)
@@ -143,6 +153,7 @@ rpl_slave_state::check_duplicate_gtid(rp
       /* The domain became free, grab it and apply the event. */
       elem->owner_rli= rli;
       elem->owner_count= 1;
+      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
       res= 1;
       break;
     }
@@ -150,23 +161,78 @@ rpl_slave_state::check_duplicate_gtid(rp
     {
       /* Already own this domain, increment reference count and apply event. */
       ++elem->owner_count;
+      rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
       res= 1;
       break;
     }
+    thd= rgi->thd;
+    if (thd->check_killed())
+    {
+      thd->send_kill_message();
+      res= -1;
+      break;
+    }
     /*
       Someone else is currently processing this GTID (or an earlier one).
       Wait for them to complete (or fail), and then check again.
     */
+    if (!did_enter_cond)
+    {
+      thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state,
+                      &stage_gtid_wait_other_connection, &old_stage);
+      did_enter_cond= true;
+    }
     mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
                     &LOCK_slave_state);
   }
 
 err:
-  mysql_mutex_unlock(&LOCK_slave_state);
+  if (did_enter_cond)
+    thd->EXIT_COND(&old_stage);
+  else
+    mysql_mutex_unlock(&LOCK_slave_state);
   return res;
 }
 
 
+void
+rpl_slave_state::release_domain_owner(rpl_group_info *rgi)
+{
+  element *elem= NULL;
+
+  mysql_mutex_lock(&LOCK_slave_state);
+  if (!(elem= get_element(rgi->current_gtid.domain_id)))
+  {
+    /*
+      We cannot really deal with error here, as we are already called in an
+      error handling case (transaction failure and rollback).
+
+      However, get_element() only fails if the element did not exist already
+      and could not be allocated due to out-of-memory - and if it did not
+      exist, then we would not get here in the first place.
+    */
+    mysql_mutex_unlock(&LOCK_slave_state);
+    return;
+  }
+
+  if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER)
+  {
+    uint32 count= elem->owner_count;
+    DBUG_ASSERT(count > 0);
+    DBUG_ASSERT(elem->owner_rli == rgi->rli);
+    --count;
+    elem->owner_count= count;
+    if (count == 0)
+    {
+      elem->owner_rli= NULL;
+      mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
+    }
+  }
+  rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
+  mysql_mutex_unlock(&LOCK_slave_state);
+}
+
+
 static void
 rpl_slave_state_free_element(void *arg)
 {
@@ -233,7 +299,7 @@ rpl_slave_state::deinit()
 
 int
 rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
-                        uint64 seq_no, const Relay_log_info *rli)
+                        uint64 seq_no, rpl_group_info *rgi)
 {
   element *elem= NULL;
   list_element *list_elem= NULL;
@@ -256,18 +322,23 @@ rpl_slave_state::update(uint32 domain_id
     mysql_cond_broadcast(&elem->COND_wait_gtid);
   }
 
-  if (opt_gtid_ignore_duplicates && rli)
+  if (rgi)
   {
-    uint32 count= elem->owner_count;
-    DBUG_ASSERT(count > 0);
-    DBUG_ASSERT(elem->owner_rli == rli);
-    --count;
-    elem->owner_count= count;
-    if (count == 0)
+    if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER)
     {
-      elem->owner_rli= NULL;
-      mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
+      Relay_log_info *rli= rgi->rli;
+      uint32 count= elem->owner_count;
+      DBUG_ASSERT(count > 0);
+      DBUG_ASSERT(elem->owner_rli == rli);
+      --count;
+      elem->owner_count= count;
+      if (count == 0)
+      {
+        elem->owner_rli= NULL;
+        mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
+      }
     }
+    rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
   }
 
   if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))

=== modified file 'sql/rpl_gtid.h'
--- a/sql/rpl_gtid.h	2014-03-09 09:27:38 +0000
+++ b/sql/rpl_gtid.h	2014-03-11 23:14:49 +0000
@@ -92,6 +92,7 @@ struct gtid_waiting {
 
 
 class Relay_log_info;
+struct rpl_group_info;
 
 /*
   Replication slave state.
@@ -171,7 +172,7 @@ struct rpl_slave_state
   void truncate_hash();
   ulong count() const { return hash.records; }
   int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
-             uint64 seq_no, const Relay_log_info *rli);
+             uint64 seq_no, rpl_group_info *rgi);
   int truncate_state_table(THD *thd);
   int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
                   bool in_transaction, bool in_statement);
@@ -187,10 +188,10 @@ struct rpl_slave_state
   element *get_element(uint32 domain_id);
   int put_back_list(uint32 domain_id, list_element *list);
 
-  void update_state_hash(uint64 sub_id, rpl_gtid *gtid,
-                         const Relay_log_info *rli);
+  void update_state_hash(uint64 sub_id, rpl_gtid *gtid, rpl_group_info *rgi);
   int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
-  int check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli);
+  int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi);
+  void release_domain_owner(rpl_group_info *rgi);
 };
 
 

=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2014-03-09 09:27:38 +0000
+++ b/sql/rpl_parallel.cc	2014-03-11 23:14:49 +0000
@@ -425,10 +425,22 @@ handle_rpl_parallel_thread(void *arg)
         {
           int res=
             rpl_global_gtid_slave_state.check_duplicate_gtid(&rgi->current_gtid,
-                                                             rgi->rli);
-          /* ToDo: Handle res==-1 error. */
-          if (!res)
+                                                             rgi);
+          if (res < 0)
+          {
+            /* Error. */
+            slave_output_error_info(rgi->rli, thd);
+            signal_error_to_sql_driver_thread(thd, rgi);
+          }
+          else if (!res)
+          {
+            /* GTID already applied by another master connection, skip. */
             skip_event_group= true;
+          }
+          else
+          {
+            /* We have to apply the event. */
+          }
         }
       }
 

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2014-03-09 09:27:38 +0000
+++ b/sql/rpl_rli.cc	2014-03-11 23:14:49 +0000
@@ -1493,6 +1493,7 @@ rpl_group_info::reinit(Relay_log_info *r
   row_stmt_start_timestamp= 0;
   long_find_row_note_printed= false;
   did_mark_start_commit= false;
+  gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
   commit_orderer.reinit();
 }
 
@@ -1632,6 +1633,13 @@ void rpl_group_info::cleanup_context(THD
   thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
 
   /*
+    Ensure we always release the domain for others to process, when using
+    --gtid-ignore-duplicates.
+  */
+  if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL)
+    rpl_global_gtid_slave_state.release_domain_owner(this);
+
+  /*
     Reset state related to long_find_row notes in the error log:
     - timestamp
     - flag that decides whether the slave prints or not

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2014-03-04 13:32:42 +0000
+++ b/sql/rpl_rli.h	2014-03-11 23:14:49 +0000
@@ -575,6 +575,20 @@ struct rpl_group_info
     counting one event group twice.
   */
   bool did_mark_start_commit;
+  enum {
+    GTID_DUPLICATE_NULL=0,
+    GTID_DUPLICATE_IGNORE=1,
+    GTID_DUPLICATE_OWNER=2
+  };
+  /*
+    When --gtid-ignore-duplicates, this is set to one of the above three
+    values:
+    GTID_DUPLICATE_NULL    - Not using --gtid-ignore-duplicates.
+    GTID_DUPLICATE_IGNORE  - This gtid already applied, skip the event group.
+    GTID_DUPLICATE_OWNER   - We are the current owner of the domain, and must
+                             apply the event group and then release the domain.
+  */
+  uint8 gtid_ignore_duplicate_state;
 
   /*
     Runtime state for printing a note when slave is taking

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2014-03-09 09:27:38 +0000
+++ b/sql/slave.cc	2014-03-11 23:14:49 +0000
@@ -3508,18 +3508,46 @@ static int exec_relay_log_event(THD* thd
       */
     }
 
-    /*
-      For GTID, allocate a new sub_id for the given domain_id.
-      The sub_id must be allocated in increasing order of binlog order.
-    */
-    if (typ == GTID_EVENT &&
-        event_group_new_gtid(serial_rgi, static_cast<Gtid_log_event *>(ev)))
+    if (typ == GTID_EVENT)
     {
-      sql_print_error("Error reading relay log event: %s",
-                      "slave SQL thread aborted because of out-of-memory error");
-      mysql_mutex_unlock(&rli->data_lock);
-      delete ev;
-      DBUG_RETURN(1);
+      Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev);
+
+      /*
+        For GTID, allocate a new sub_id for the given domain_id.
+        The sub_id must be allocated in increasing order of binlog order.
+      */
+      if (event_group_new_gtid(serial_rgi, gev))
+      {
+        sql_print_error("Error reading relay log event: %s", "slave SQL thread "
+                        "aborted because of out-of-memory error");
+        mysql_mutex_unlock(&rli->data_lock);
+        delete ev;
+        DBUG_RETURN(1);
+      }
+
+      if (opt_gtid_ignore_duplicates)
+      {
+        serial_rgi->current_gtid.domain_id= gev->domain_id;
+        serial_rgi->current_gtid.server_id= gev->server_id;
+        serial_rgi->current_gtid.seq_no= gev->seq_no;
+        int res= rpl_global_gtid_slave_state.check_duplicate_gtid
+          (&serial_rgi->current_gtid, serial_rgi);
+        if (res < 0)
+        {
+          sql_print_error("Error processing GTID event: %s", "slave SQL "
+                          "thread aborted because of out-of-memory error");
+          mysql_mutex_unlock(&rli->data_lock);
+          delete ev;
+          DBUG_RETURN(1);
+        }
+        /*
+          If we need to skip this event group (because the GTID was already
+          applied), then do it using the code for slave_skip_counter, which
+          is able to handle skipping until the end of the event group.
+        */
+        if (!res)
+          rli->slave_skip_counter= 1;
+      }
     }
 
     serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos;

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2014-03-09 09:27:38 +0000
+++ b/sql/sys_vars.cc	2014-03-11 23:14:49 +0000
@@ -1839,18 +1839,14 @@ static bool
 fix_gtid_ignore_duplicates(sys_var *self, THD *thd, enum_var_type type)
 {
   bool running;
-  bool err= false;
 
   mysql_mutex_unlock(&LOCK_global_system_variables);
   mysql_mutex_lock(&LOCK_active_mi);
   running= master_info_index->give_error_if_slave_running();
   mysql_mutex_unlock(&LOCK_active_mi);
-  if (running)
-    err= true;
   mysql_mutex_lock(&LOCK_global_system_variables);
 
-  /* ToDo: Isn't there a race here? I need to change the variable only under the LOCK_active_mi, and only if running is false. */
-  return err;
+  return running ? true : false;
 }
 
 



More information about the commits mailing list