[Commits] Rev 4056: MDEV-5804: If same GTID is received on multiple master connections in multi-source replication, the event is double-executed causing corruption or replication failure in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Mon Mar 17 09:20:12 EET 2014


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4056
revision-id: knielsen at knielsen-hq.org-20140309092738-8keteor9jnelg1kk
parent: elenst at wheezy-64.home-20140315125635-4mv83huc3dntadur
committer: knielsen at knielsen-hq.org
branch nick: work-10.0-mdev5804
timestamp: Sun 2014-03-09 10:27:38 +0100
message:
  MDEV-5804: If same GTID is received on multiple master connections in multi-source replication, the event is double-executed causing corruption or replication failure
  
  Before, the arrival of same GTID twice in multi-source replication
  would cause double-apply or in gtid strict mode an error.
  
  Keep the behaviour, but add an option --gtid-ignore-duplicates which
  allows to correctly handle duplicates, ignoring all but the first.
  This relies on the user ensuring correct configuration so that
  sequence numbers are strictly increasing within each replication
  domain; then duplicates can be detected simply by comparing the
  sequence numbers against what is already applied.
  
  Only one master connection (but possibly multiple parallel worker
  threads within that connection) is allowed to apply events within
  one replication domain at a time; any other connection that
  receives a GTID in the same domain either discards it (if it is
  already applied) or waits for the other connection to not have
  any events to apply.
  
  Intermediate patch, as proof-of-concept for testing. The main limitation
  is that currently it is only implemented for parallel replication,
  @@slave_parallel_threads > 0.
=== added file 'mysql-test/suite/multi_source/gtid_ignore_duplicates.cnf'
--- a/mysql-test/suite/multi_source/gtid_ignore_duplicates.cnf	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.cnf	2014-03-09 09:27:38 +0000
@@ -0,0 +1,24 @@
+!include my.cnf
+
+[mysqld.1]
+log-slave-updates
+loose-innodb
+
+[mysqld.2]
+log-slave-updates
+loose-innodb
+
+[mysqld.3]
+log-bin=server3-bin
+log-slave-updates
+loose-innodb
+
+[mysqld.4]
+server-id=4
+log-bin=server4-bin
+log-slave-updates
+loose-innodb
+
+[ENV]
+SERVER_MYPORT_4=                @mysqld.4.port
+SERVER_MYSOCK_4=                @mysqld.4.socket

=== added file 'mysql-test/suite/multi_source/gtid_ignore_duplicates.result'
--- a/mysql-test/suite/multi_source/gtid_ignore_duplicates.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.result	2014-03-09 09:27:38 +0000
@@ -0,0 +1,188 @@
+*** Test all-to-all replication with --gtid-ignore-duplicates ***
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 1;
+SET SESSION gtid_domain_id= 1;
+CHANGE MASTER 'b2a' TO master_port=MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+CHANGE MASTER 'c2a' TO master_port=MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'b2a';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = 'c2a';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 2;
+SET SESSION gtid_domain_id= 2;
+CHANGE MASTER 'a2b' TO master_port=MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+CHANGE MASTER 'c2b' TO master_port=MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'a2b';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = 'c2b';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 3;
+SET SESSION gtid_domain_id= 3;
+CHANGE MASTER 'a2c' TO master_port=MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+CHANGE MASTER 'b2c' TO master_port=MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'a2c';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = 'b2c';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 1;
+SET SESSION gtid_domain_id= 1;
+CHANGE MASTER 'a2d' TO master_port=MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'a2d';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1);
+BEGIN;
+INSERT INTO t1 VALUES (2);
+INSERT INTO t1 VALUES (3);
+COMMIT;
+INSERT INTO t1 VALUES (4), (5);
+INSERT INTO t1 VALUES (6);
+include/save_master_gtid.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+3
+4
+5
+6
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+3
+4
+5
+6
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+3
+4
+5
+6
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+3
+4
+5
+6
+INSERT INTO t1 VALUES (10);
+include/save_master_gtid.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a
+10
+STOP SLAVE "c2b";
+SET default_master_connection = "c2b";
+include/wait_for_slave_to_stop.inc
+STOP SLAVE "a2b";
+SET default_master_connection = "a2b";
+include/wait_for_slave_to_stop.inc
+INSERT INTO t1 VALUES (11);
+include/save_master_gtid.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a
+10
+11
+SET default_master_connection = "b2a";
+STOP SLAVE;
+include/wait_for_slave_to_stop.inc
+INSERT INTO t1 VALUES (12);
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a
+10
+12
+include/save_master_gtid.inc
+START SLAVE "b2a";
+SET default_master_connection = "b2a";
+include/wait_for_slave_to_start.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a
+10
+11
+12
+START SLAVE "c2b";
+SET default_master_connection = "c2b";
+include/wait_for_slave_to_start.inc
+START SLAVE "a2b";
+SET default_master_connection = "a2b";
+include/wait_for_slave_to_start.inc
+include/save_master_gtid.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a
+10
+11
+12
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+Warnings:
+Note    1938    SLAVE 'c2a' stopped
+Note    1938    SLAVE 'b2a' stopped
+include/reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+Warnings:
+Note    1938    SLAVE 'a2b' stopped
+Note    1938    SLAVE 'c2b' stopped
+include/reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+Warnings:
+Note    1938    SLAVE 'a2c' stopped
+Note    1938    SLAVE 'b2c' stopped
+include/reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+Warnings:
+Note    1938    SLAVE 'a2d' stopped
+include/reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;

=== added file 'mysql-test/suite/multi_source/gtid_ignore_duplicates.test'
--- a/mysql-test/suite/multi_source/gtid_ignore_duplicates.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.test	2014-03-09 09:27:38 +0000
@@ -0,0 +1,208 @@
+--source include/not_embedded.inc
+--source include/have_innodb.inc
+
+--echo *** Test all-to-all replication with --gtid-ignore-duplicates ***
+
+--connect (server_1,127.0.0.1,root,,,$SERVER_MYPORT_1)
+--connect (server_2,127.0.0.1,root,,,$SERVER_MYPORT_2)
+--connect (server_3,127.0.0.1,root,,,$SERVER_MYPORT_3)
+--connect (server_4,127.0.0.1,root,,,$SERVER_MYPORT_4)
+
+# Setup A <-> B, B <-> C, C <-> A, and A -> D.
+
+--connection server_1
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 1;
+SET SESSION gtid_domain_id= 1;
+--replace_result $SERVER_MYPORT_2 MYPORT_2
+eval CHANGE MASTER 'b2a' TO master_port=$SERVER_MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+--replace_result $SERVER_MYPORT_3 MYPORT_3
+eval CHANGE MASTER 'c2a' TO master_port=$SERVER_MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'b2a';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = 'c2a';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+
+--connection server_2
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 2;
+SET SESSION gtid_domain_id= 2;
+--replace_result $SERVER_MYPORT_1 MYPORT_1
+eval CHANGE MASTER 'a2b' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+--replace_result $SERVER_MYPORT_3 MYPORT_3
+eval CHANGE MASTER 'c2b' TO master_port=$SERVER_MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'a2b';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = 'c2b';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+
+--connection server_3
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 3;
+SET SESSION gtid_domain_id= 3;
+--replace_result $SERVER_MYPORT_1 MYPORT_1
+eval CHANGE MASTER 'a2c' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+--replace_result $SERVER_MYPORT_2 MYPORT_2
+eval CHANGE MASTER 'b2c' TO master_port=$SERVER_MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'a2c';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = 'b2c';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+
+--connection server_4
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 1;
+SET SESSION gtid_domain_id= 1;
+--replace_result $SERVER_MYPORT_1 MYPORT_1
+eval CHANGE MASTER 'a2d' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'a2d';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+
+
+--connection server_1
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1);
+BEGIN;
+INSERT INTO t1 VALUES (2);
+INSERT INTO t1 VALUES (3);
+COMMIT;
+INSERT INTO t1 VALUES (4), (5);
+INSERT INTO t1 VALUES (6);
+
+--source include/save_master_gtid.inc
+
+--connection server_2
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection server_3
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection server_4
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection server_1
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+# Test that we can connect at a GTID position that has not yet reached
+# that master server.
+# We stop the connections C->B and A->B, create an event on C, Check that
+# the event has reached A (but not B). Then let A stop and re-connect to
+# B, which will connect at the new event, which is in the future for B.
+
+--connection server_3
+INSERT INTO t1 VALUES (10);
+--source include/save_master_gtid.inc
+
+--connection server_2
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+STOP SLAVE "c2b";
+SET default_master_connection = "c2b";
+--source include/wait_for_slave_to_stop.inc
+STOP SLAVE "a2b";
+SET default_master_connection = "a2b";
+--source include/wait_for_slave_to_stop.inc
+
+--connection server_3
+INSERT INTO t1 VALUES (11);
+--source include/save_master_gtid.inc
+
+--connection server_1
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+SET default_master_connection = "b2a";
+STOP SLAVE;
+--source include/wait_for_slave_to_stop.inc
+
+--connection server_2
+INSERT INTO t1 VALUES (12);
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+--source include/save_master_gtid.inc
+
+--connection server_1
+START SLAVE "b2a";
+SET default_master_connection = "b2a";
+--source include/wait_for_slave_to_start.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+
+--connection server_2
+START SLAVE "c2b";
+SET default_master_connection = "c2b";
+--source include/wait_for_slave_to_start.inc
+START SLAVE "a2b";
+SET default_master_connection = "a2b";
+--source include/wait_for_slave_to_start.inc
+
+--connection server_1
+--source include/save_master_gtid.inc
+
+--connection server_2
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+
+
+# Clean up.
+--connection server_1
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+--source reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+--disconnect server_1
+
+--connection server_2
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+--source reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+--disconnect server_2
+
+--connection server_3
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+--source reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+--disconnect server_3
+
+--connection server_4
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+--source reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+--disconnect server_4

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2014-03-05 22:20:10 +0000
+++ b/sql/log_event.cc	2014-03-09 09:27:38 +0000
@@ -4440,7 +4440,7 @@ Default database: '%s'. Query: '%s'",
 
 end:
   if (sub_id && !thd->is_slave_error)
-    rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
+    rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli);
 
   /*
     Probably we have set thd->query, thd->db, thd->catalog to point to places
@@ -6806,7 +6806,8 @@ Gtid_list_log_event::do_apply_event(rpl_
                                                         sub_id_list[i],
                                                         false, false)))
         return ret;
-      rpl_global_gtid_slave_state.update_state_hash(sub_id_list[i], &list[i]);
+      rpl_global_gtid_slave_state.update_state_hash(sub_id_list[i], &list[i],
+                                                    NULL);
     }
   }
   ret= Log_event::do_apply_event(rgi);
@@ -7326,7 +7327,7 @@ int Xid_log_event::do_apply_event(rpl_gr
   thd->mdl_context.release_transactional_locks();
 
   if (!res && sub_id)
-    rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
+    rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli);
 
   /*
     Increment the global status commit count variable

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2014-03-05 22:20:10 +0000
+++ b/sql/mysqld.cc	2014-03-09 09:27:38 +0000
@@ -553,6 +553,7 @@ ulong opt_slave_domain_parallel_threads=
 ulong opt_binlog_commit_wait_count= 0;
 ulong opt_binlog_commit_wait_usec= 0;
 ulong opt_slave_parallel_max_queued= 131072;
+my_bool opt_gtid_ignore_duplicates= FALSE;
 
 const double log_10[] = {
   1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009,
@@ -987,7 +988,7 @@ PSI_cond_key key_COND_rpl_thread_queue,
   key_COND_rpl_thread_pool,
   key_COND_parallel_entry, key_COND_group_commit_orderer,
   key_COND_prepare_ordered;
-PSI_cond_key key_COND_wait_gtid;
+PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
 
 static PSI_cond_info all_server_conds[]=
 {
@@ -1035,7 +1036,8 @@ static PSI_cond_info all_server_conds[]=
   { &key_COND_parallel_entry, "COND_parallel_entry", 0},
   { &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
   { &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
-  { &key_COND_wait_gtid, "COND_wait_gtid", 0}
+  { &key_COND_wait_gtid, "COND_wait_gtid", 0},
+  { &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}
 };
 
 PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2014-03-05 22:20:10 +0000
+++ b/sql/mysqld.h	2014-03-09 09:27:38 +0000
@@ -184,6 +184,7 @@ extern ulong opt_slave_domain_parallel_t
 extern ulong opt_slave_parallel_max_queued;
 extern ulong opt_binlog_commit_wait_count;
 extern ulong opt_binlog_commit_wait_usec;
+extern my_bool opt_gtid_ignore_duplicates;
 extern ulong back_log;
 extern ulong executed_events;
 extern char language[FN_REFLEN];
@@ -299,7 +300,7 @@ extern PSI_cond_key key_TC_LOG_MMAP_COND
 extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue,
   key_COND_rpl_thread_pool,
   key_COND_parallel_entry, key_COND_group_commit_orderer;
-extern PSI_cond_key key_COND_wait_gtid;
+extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
 
 extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
   key_thread_handle_manager, key_thread_kill_server, key_thread_main,

=== modified file 'sql/rpl_gtid.cc'
--- a/sql/rpl_gtid.cc	2014-02-10 14:12:17 +0000
+++ b/sql/rpl_gtid.cc	2014-03-09 09:27:38 +0000
@@ -33,7 +33,8 @@ const LEX_STRING rpl_gtid_slave_state_ta
 
 
 void
-rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
+rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
+                                   const Relay_log_info *rli)
 {
   int err;
   /*
@@ -44,7 +45,7 @@ rpl_slave_state::update_state_hash(uint6
     it is even committed.
   */
   mysql_mutex_lock(&LOCK_slave_state);
-  err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
+  err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rli);
   mysql_mutex_unlock(&LOCK_slave_state);
   if (err)
   {
@@ -76,17 +77,102 @@ rpl_slave_state::record_and_update_gtid(
     rgi->gtid_sub_id= 0;
     if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
       DBUG_RETURN(1);
-    update_state_hash(sub_id, &rgi->current_gtid);
+    update_state_hash(sub_id, &rgi->current_gtid, rgi->rli);
   }
   DBUG_RETURN(0);
 }
 
 
+/*
+  Check GTID event execution when --gtid-ignore-duplicates.
+
+  The idea with --gtid-ignore-duplicates is that we allow multiple master
+  connections (in multi-source replication) to all receive the same GTIDs and
+  event groups. Only one instance of each is applied; we use the sequence
+  number in the GTID to decide whether a GTID has already been applied.
+
+  So if the seq_no of a GTID (or a higher sequence number) has already been
+  applied, then the event should be skipped. If not then the event should be
+  applied.
+
+  To avoid two master connections tring to apply the same event
+  simultaneously, only one is allowed to work in any given domain at any point
+  in time. The associated Relay_log_info object is called the owner of the
+  domain (and there can be multiple parallel worker threads working in that
+  domain for that Relay_log_info). Any other Relay_log_info/master connection
+  must wait for the domain to become free, or for their GTID to have been
+  applied, before being allowed to proceed.
+
+  Returns:
+    0  This GTID is already applied, it should be skipped.
+    1  The GTID is not yet applied; this rli is now the owner, and must apply
+       the event and release the domain afterwards.
+   -1  Error (out of memory to allocate a new element for the domain).
+*/
+int
+rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
+{
+  uint32 domain_id= gtid->domain_id;
+  uint32 seq_no= gtid->seq_no;
+  rpl_slave_state::element *elem;
+  int res;
+
+  mysql_mutex_lock(&LOCK_slave_state);
+  if (!(elem= get_element(domain_id)))
+  {
+    res= -1;
+    goto err;
+  }
+  /*
+    Note that the elem pointer does not change once inserted in the hash. So
+    we can re-use the pointer without looking it up again in the hash after
+    each lock release and re-take.
+  */
+
+  /* ToDo: Make this wait killable. */
+  for (;;)
+  {
+    if (elem->highest_seq_no >= seq_no)
+    {
+      /* This sequence number is already applied, ignore it. */
+      res= 0;
+      break;
+    }
+    if (!elem->owner_rli)
+    {
+      /* The domain became free, grab it and apply the event. */
+      elem->owner_rli= rli;
+      elem->owner_count= 1;
+      res= 1;
+      break;
+    }
+    if (elem->owner_rli == rli)
+    {
+      /* Already own this domain, increment reference count and apply event. */
+      ++elem->owner_count;
+      res= 1;
+      break;
+    }
+    /*
+      Someone else is currently processing this GTID (or an earlier one).
+      Wait for them to complete (or fail), and then check again.
+    */
+    mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
+                    &LOCK_slave_state);
+  }
+
+err:
+  mysql_mutex_unlock(&LOCK_slave_state);
+  return res;
+}
+
+
 static void
 rpl_slave_state_free_element(void *arg)
 {
   struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
   mysql_cond_destroy(&elem->COND_wait_gtid);
+  mysql_cond_destroy(&elem->COND_gtid_ignore_duplicates);
   my_free(elem);
 }
 
@@ -147,7 +233,7 @@ rpl_slave_state::deinit()
 
 int
 rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
-                        uint64 seq_no)
+                        uint64 seq_no, const Relay_log_info *rli)
 {
   element *elem= NULL;
   list_element *list_elem= NULL;
@@ -170,6 +256,20 @@ rpl_slave_state::update(uint32 domain_id
     mysql_cond_broadcast(&elem->COND_wait_gtid);
   }
 
+  if (opt_gtid_ignore_duplicates && rli)
+  {
+    uint32 count= elem->owner_count;
+    DBUG_ASSERT(count > 0);
+    DBUG_ASSERT(elem->owner_rli == rli);
+    --count;
+    elem->owner_count= count;
+    if (count == 0)
+    {
+      elem->owner_rli= NULL;
+      mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
+    }
+  }
+
   if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
     return 1;
   list_elem->server_id= server_id;
@@ -199,7 +299,11 @@ rpl_slave_state::get_element(uint32 doma
   elem->domain_id= domain_id;
   elem->highest_seq_no= 0;
   elem->gtid_waiter= NULL;
+  elem->owner_rli= NULL;
+  elem->owner_count= 0;
   mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
+  mysql_cond_init(key_COND_gtid_ignore_duplicates,
+                  &elem->COND_gtid_ignore_duplicates, 0);
   if (my_hash_insert(&hash, (uchar *)elem))
   {
     my_free(elem);
@@ -821,7 +925,7 @@ rpl_slave_state::load(THD *thd, char *st
     if (gtid_parser_helper(&state_from_master, end, &gtid) ||
         !(sub_id= next_sub_id(gtid.domain_id)) ||
         record_gtid(thd, &gtid, sub_id, false, in_statement) ||
-        update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no))
+        update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL))
       return 1;
     if (state_from_master == end)
       break;

=== modified file 'sql/rpl_gtid.h'
--- a/sql/rpl_gtid.h	2014-02-08 21:28:41 +0000
+++ b/sql/rpl_gtid.h	2014-03-09 09:27:38 +0000
@@ -91,6 +91,8 @@ struct gtid_waiting {
 };
 
 
+class Relay_log_info;
+
 /*
   Replication slave state.
 
@@ -131,6 +133,19 @@ struct rpl_slave_state
     uint64 min_wait_seq_no;
     mysql_cond_t COND_wait_gtid;
 
+    /*
+      For --gtid-ignore-duplicates. The Relay_log_info that currently owns
+      this domain, and the number of worker threads that are active in it.
+
+      The idea is that only one of multiple master connections is allowed to
+      actively apply events for a given domain. Other connections must either
+      discard the events (if the seq_no in GTID shows they have already been
+      applied), or wait to see if the current owner will apply it.
+    */
+    const Relay_log_info *owner_rli;
+    uint32 owner_count;
+    mysql_cond_t COND_gtid_ignore_duplicates;
+
     list_element *grab_list() { list_element *l= list; list= NULL; return l; }
     void add(list_element *l)
     {
@@ -155,7 +170,8 @@ struct rpl_slave_state
   void deinit();
   void truncate_hash();
   ulong count() const { return hash.records; }
-  int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no);
+  int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
+             uint64 seq_no, const Relay_log_info *rli);
   int truncate_state_table(THD *thd);
   int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
                   bool in_transaction, bool in_statement);
@@ -171,8 +187,10 @@ struct rpl_slave_state
   element *get_element(uint32 domain_id);
   int put_back_list(uint32 domain_id, list_element *list);
 
-  void update_state_hash(uint64 sub_id, rpl_gtid *gtid);
+  void update_state_hash(uint64 sub_id, rpl_gtid *gtid,
+                         const Relay_log_info *rli);
   int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
+  int check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli);
 };
 
 

=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2014-03-07 11:08:38 +0000
+++ b/sql/rpl_parallel.cc	2014-03-09 09:27:38 +0000
@@ -202,7 +202,7 @@ handle_rpl_parallel_thread(void *arg)
   struct rpl_parallel_thread::queued_event *events;
   bool group_standalone= true;
   bool in_event_group= false;
-  bool group_skip_for_stop= false;
+  bool skip_event_group= false;
   rpl_group_info *group_rgi= NULL;
   group_commit_orderer *gco, *tmp_gco;
   uint64 event_gtid_sub_id= 0;
@@ -385,13 +385,13 @@ handle_rpl_parallel_thread(void *arg)
             point where we can safely stop. So set a flag that will cause us
             to skip, rather than execute, the following events.
           */
-          group_skip_for_stop= true;
+          skip_event_group= true;
         }
         else
-          group_skip_for_stop= false;
+          skip_event_group= false;
 
         if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
-          group_skip_for_stop= true;
+          skip_event_group= true;
         else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
         {
           /*
@@ -420,6 +420,16 @@ handle_rpl_parallel_thread(void *arg)
           thd->wait_for_commit_ptr->wakeup_subsequent_commits(err);
         }
         thd->wait_for_commit_ptr= &rgi->commit_orderer;
+
+        if (opt_gtid_ignore_duplicates)
+        {
+          int res=
+            rpl_global_gtid_slave_state.check_duplicate_gtid(&rgi->current_gtid,
+                                                             rgi->rli);
+          /* ToDo: Handle res==-1 error. */
+          if (!res)
+            skip_event_group= true;
+        }
       }
 
       group_ending= event_type == XID_EVENT ||
@@ -438,7 +448,7 @@ handle_rpl_parallel_thread(void *arg)
         processing between the event groups as a simple way to ensure that
         everything is stopped and cleaned up correctly.
       */
-      if (!rgi->is_error && !group_skip_for_stop)
+      if (!rgi->is_error && !skip_event_group)
         err= rpt_handle_event(events, rpt);
       else
         err= thd->wait_for_prior_commit();
@@ -464,7 +474,7 @@ handle_rpl_parallel_thread(void *arg)
         rgi->next= rgis_to_free;
         rgis_to_free= rgi;
         group_rgi= rgi= NULL;
-        group_skip_for_stop= false;
+        skip_event_group= false;
         DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
       }
 
@@ -526,7 +536,7 @@ handle_rpl_parallel_thread(void *arg)
       mysql_mutex_lock(&rpt->LOCK_rpl_thread);
       rpt->free_rgi(group_rgi);
       group_rgi= NULL;
-      group_skip_for_stop= false;
+      skip_event_group= false;
     }
     if (!in_event_group)
     {

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2014-03-04 13:32:42 +0000
+++ b/sql/rpl_rli.cc	2014-03-09 09:27:38 +0000
@@ -1435,7 +1435,8 @@ rpl_load_gtid_slave_state(THD *thd)
     if ((err= rpl_global_gtid_slave_state.update(tmp_entry.gtid.domain_id,
                                                  tmp_entry.gtid.server_id,
                                                  tmp_entry.sub_id,
-                                                 tmp_entry.gtid.seq_no)))
+                                                 tmp_entry.gtid.seq_no,
+                                                 NULL)))
     {
       mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
       my_error(ER_OUT_OF_RESOURCES, MYF(0));

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2014-03-13 14:43:11 +0000
+++ b/sql/slave.cc	2014-03-09 09:27:38 +0000
@@ -2047,6 +2047,39 @@ when it try to get the value of TIME_ZON
       }
     }
 
+    query_str.length(0);
+    if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_ignore_duplicates="),
+                         system_charset_info) ||
+        query_str.append_ulonglong(opt_gtid_ignore_duplicates != false))
+    {
+      err_code= ER_OUTOFMEMORY;
+      errmsg= "The slave I/O thread stops because a fatal out-of-memory error "
+        "is encountered when it tries to set @slave_gtid_ignore_duplicates.";
+      sprintf(err_buff, "%s Error: Out of memory", errmsg);
+      goto err;
+    }
+
+    rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
+    if (rc)
+    {
+      err_code= mysql_errno(mysql);
+      if (is_network_error(err_code))
+      {
+        mi->report(ERROR_LEVEL, err_code,
+                   "Setting @slave_gtid_ignore_duplicates failed with "
+                   "error: %s", mysql_error(mysql));
+        goto network_err;
+      }
+      else
+      {
+        /* Fatal error */
+        errmsg= "The slave I/O thread stops because a fatal error is "
+          "encountered when it tries to set @slave_gtid_ignore_duplicates.";
+        sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+        goto err;
+      }
+    }
+
     if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID)
     {
       query_str.length(0);

=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc	2014-02-26 14:28:07 +0000
+++ b/sql/sql_repl.cc	2014-03-09 09:27:38 +0000
@@ -115,6 +115,39 @@ fake_event_write(NET *net, String *packe
 
 
 /*
+  Helper structure, used to pass miscellaneous info from mysql_binlog_send()
+  into the helper functions that it calls.
+*/
+struct binlog_send_info {
+  rpl_binlog_state until_binlog_state;
+  slave_connection_state gtid_state;
+  THD *thd;
+  NET *net;
+  String *packet;
+  char *log_file_name;
+  slave_connection_state *until_gtid_state;
+  Format_description_log_event *fdev;
+  int mariadb_slave_capability;
+  enum_gtid_skip_type gtid_skip_group;
+  enum_gtid_until_state gtid_until_group;
+  ushort flags;
+  uint8 current_checksum_alg;
+  bool slave_gtid_strict_mode;
+  bool send_fake_gtid_list;
+  bool slave_gtid_ignore_duplicates;
+  bool using_gtid_state;
+
+  binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn)
+    : thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
+      log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
+      gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
+      flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
+      slave_gtid_strict_mode(false), send_fake_gtid_list(false),
+      slave_gtid_ignore_duplicates(false)
+  { }
+};
+
+/*
     fake_rotate_event() builds a fake (=which does not exist physically in any
     binlog) Rotate event, which contains the name of the binlog we are going to
     send to the slave (because the slave may not know it if it just asked for
@@ -132,16 +165,16 @@ fake_event_write(NET *net, String *packe
     part.
 */
 
-static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
-                             ulonglong position, const char** errmsg,
-                             uint8 checksum_alg_arg)
+static int fake_rotate_event(binlog_send_info *info, ulonglong position,
+                             const char** errmsg, uint8 checksum_alg_arg)
 {
   DBUG_ENTER("fake_rotate_event");
   char buf[ROTATE_HEADER_LEN+100];
   my_bool do_checksum;
   int err;
-  char* p = log_file_name+dirname_length(log_file_name);
+  char* p = info->log_file_name+dirname_length(info->log_file_name);
   uint ident_len = (uint) strlen(p);
+  String *packet= info->packet;
   ha_checksum crc;
 
   if ((err= fake_event_header(packet, ROTATE_EVENT,
@@ -160,22 +193,23 @@ static int fake_rotate_event(NET* net, S
   }
 
   if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
-      (err= fake_event_write(net, packet, errmsg)))
+      (err= fake_event_write(info->net, packet, errmsg)))
     DBUG_RETURN(err);
 
   DBUG_RETURN(0);
 }
 
 
-static int fake_gtid_list_event(NET* net, String* packet,
+static int fake_gtid_list_event(binlog_send_info *info,
                                 Gtid_list_log_event *glev, const char** errmsg,
-                                uint8 checksum_alg_arg, uint32 current_pos)
+                                uint32 current_pos)
 {
   my_bool do_checksum;
   int err;
   ha_checksum crc;
   char buf[128];
   String str(buf, sizeof(buf), system_charset_info);
+  String* packet= info->packet;
 
   str.length(0);
   if (glev->to_packet(&str))
@@ -185,7 +219,7 @@ static int fake_gtid_list_event(NET* net
   }
   if ((err= fake_event_header(packet, GTID_LIST_EVENT,
                               str.length(), &do_checksum, &crc,
-                              errmsg, checksum_alg_arg, current_pos)))
+                              errmsg, info->current_checksum_alg, current_pos)))
     return err;
 
   packet->append(str);
@@ -195,7 +229,7 @@ static int fake_gtid_list_event(NET* net
   }
 
   if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
-      (err= fake_event_write(net, packet, errmsg)))
+      (err= fake_event_write(info->net, packet, errmsg)))
     return err;
 
   return 0;
@@ -627,6 +661,19 @@ get_slave_gtid_strict_mode(THD *thd)
 }
 
 
+static bool
+get_slave_gtid_ignore_duplicates(THD *thd)
+{
+  bool null_value;
+
+  const LEX_STRING name= { C_STRING_WITH_LEN("slave_gtid_ignore_duplicates") };
+  user_var_entry *entry=
+    (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+                                     name.length);
+  return entry && entry->val_int(&null_value) && !null_value;
+}
+
+
 /*
   Get the value of the @slave_until_gtid user variable into the supplied
   String (this is the GTID position specified for START SLAVE UNTIL
@@ -914,16 +961,16 @@ give_error_start_pos_missing_in_binlog(i
 */
 
 static int
-check_slave_start_position(THD *thd, slave_connection_state *st,
-                           const char **errormsg, rpl_gtid *error_gtid,
-                           slave_connection_state *until_gtid_state)
+check_slave_start_position(binlog_send_info *info, const char **errormsg,
+                           rpl_gtid *error_gtid)
 {
   uint32 i;
   int err;
   slave_connection_state::entry **delete_list= NULL;
   uint32 delete_idx= 0;
+  slave_connection_state *st= &info->gtid_state;
 
-  if (rpl_load_gtid_slave_state(thd))
+  if (rpl_load_gtid_slave_state(info->thd))
   {
     *errormsg= "Failed to load replication slave GTID state";
     err= ER_CANNOT_LOAD_SLAVE_GTID_STATE;
@@ -963,6 +1010,7 @@ check_slave_start_position(THD *thd, sla
     if (!start_at_own_slave_pos)
     {
       rpl_gtid domain_gtid;
+      slave_connection_state *until_gtid_state= info->until_gtid_state;
       rpl_gtid *until_gtid;
 
       if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
@@ -981,6 +1029,17 @@ check_slave_start_position(THD *thd, sla
         continue;
       }
 
+      if (info->slave_gtid_ignore_duplicates &&
+          domain_gtid.seq_no < slave_gtid->seq_no)
+      {
+        /*
+          When --gtid-ignore-duplicates, it is ok for the slave to request
+          something that we do not have (yet) - they might already have gotten
+          it through another path in a multi-path replication hierarchy.
+        */
+        continue;
+      }
+
       if (until_gtid_state &&
           ( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) ||
             (mysql_bin_log.find_in_binlog_state(until_gtid->domain_id,
@@ -1462,13 +1521,11 @@ gtid_state_from_binlog_pos(const char *i
 
 
 static bool
-is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
-                 enum_gtid_until_state gtid_until_group,
-                 Log_event_type event_type, uint8 current_checksum_alg,
-                 ushort flags, const char **errmsg,
-                 rpl_binlog_state *until_binlog_state, uint32 current_pos)
+is_until_reached(binlog_send_info *info, ulong *ev_offset,
+                 Log_event_type event_type, const char **errmsg,
+                 uint32 current_pos)
 {
-  switch (gtid_until_group)
+  switch (info->gtid_until_group)
   {
   case GTID_UNTIL_NOT_DONE:
     return false;
@@ -1479,9 +1536,10 @@ is_until_reached(THD *thd, NET *net, Str
   case GTID_UNTIL_STOP_AFTER_TRANSACTION:
     if (event_type != XID_EVENT &&
         (event_type != QUERY_EVENT ||
-         !Query_log_event::peek_is_commit_rollback(packet->ptr()+*ev_offset,
-                                                   packet->length()-*ev_offset,
-                                                   current_checksum_alg)))
+         !Query_log_event::peek_is_commit_rollback
+               (info->packet->ptr()+*ev_offset,
+                info->packet->length()-*ev_offset,
+                info->current_checksum_alg)))
       return false;
     break;
   }
@@ -1493,12 +1551,11 @@ is_until_reached(THD *thd, NET *net, Str
     Send a last fake Gtid_list_log_event with a flag set to mark that we
     stop due to UNTIL condition.
   */
-  if (reset_transmit_packet(thd, flags, ev_offset, errmsg))
+  if (reset_transmit_packet(info->thd, info->flags, ev_offset, errmsg))
     return true;
-  Gtid_list_log_event glev(until_binlog_state,
+  Gtid_list_log_event glev(&info->until_binlog_state,
                            Gtid_list_log_event::FLAG_UNTIL_REACHED);
-  if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg,
-                           current_pos))
+  if (fake_gtid_list_event(info, &glev, errmsg, current_pos))
     return true;
   *errmsg= NULL;
   return true;
@@ -1512,23 +1569,19 @@ is_until_reached(THD *thd, NET *net, Str
   Returns NULL on success, error message string on error.
 */
 static const char *
-send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
-                    Log_event_type event_type, char *log_file_name,
-                    IO_CACHE *log, int mariadb_slave_capability,
-                    ulong ev_offset, uint8 current_checksum_alg,
-                    bool using_gtid_state, slave_connection_state *gtid_state,
-                    enum_gtid_skip_type *gtid_skip_group,
-                    slave_connection_state *until_gtid_state,
-                    enum_gtid_until_state *gtid_until_group,
-                    rpl_binlog_state *until_binlog_state,
-                    bool slave_gtid_strict_mode, rpl_gtid *error_gtid,
-                    bool *send_fake_gtid_list,
-                    Format_description_log_event *fdev)
+send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
+                    IO_CACHE *log, ulong ev_offset, rpl_gtid *error_gtid)
 {
   my_off_t pos;
+  String* const packet= info->packet;
   size_t len= packet->length();
+  int mariadb_slave_capability= info->mariadb_slave_capability;
+  uint8 current_checksum_alg= info->current_checksum_alg;
+  slave_connection_state *gtid_state= &info->gtid_state;
+  slave_connection_state *until_gtid_state= info->until_gtid_state;
 
-  if (event_type == GTID_LIST_EVENT && using_gtid_state && until_gtid_state)
+  if (event_type == GTID_LIST_EVENT &&
+      info->using_gtid_state && until_gtid_state)
   {
     rpl_gtid *gtid_list;
     uint32 list_len;
@@ -1537,12 +1590,12 @@ send_event_to_slave(THD *thd, NET *net,
     if (ev_offset > len ||
         Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
                                   current_checksum_alg,
-                                  &gtid_list, &list_len, fdev))
+                                  &gtid_list, &list_len, info->fdev))
     {
       my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
       return "Failed to read Gtid_list_log_event: corrupt binlog";
     }
-    err= until_binlog_state->load(gtid_list, list_len);
+    err= info->until_binlog_state.load(gtid_list, list_len);
     my_free(gtid_list);
     if (err)
     {
@@ -1552,7 +1605,7 @@ send_event_to_slave(THD *thd, NET *net,
   }
 
   /* Skip GTID event groups until we reach slave position within a domain_id. */
-  if (event_type == GTID_EVENT && using_gtid_state)
+  if (event_type == GTID_EVENT && info->using_gtid_state)
   {
     uchar flags2;
     slave_connection_state::entry *gtid_entry;
@@ -1566,7 +1619,7 @@ send_event_to_slave(THD *thd, NET *net,
           Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
                                current_checksum_alg,
                                &event_gtid.domain_id, &event_gtid.server_id,
-                               &event_gtid.seq_no, &flags2, fdev))
+                               &event_gtid.seq_no, &flags2, info->fdev))
       {
         my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
         return "Failed to read Gtid_log_event: corrupt binlog";
@@ -1575,7 +1628,7 @@ send_event_to_slave(THD *thd, NET *net,
       DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100",
         {
           rpl_gtid *dbug_gtid;
-          if ((dbug_gtid= until_binlog_state->find_nolock(10,1)) &&
+          if ((dbug_gtid= info->until_binlog_state.find_nolock(10,1)) &&
               dbug_gtid->seq_no == 100)
           {
             DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
@@ -1585,7 +1638,7 @@ send_event_to_slave(THD *thd, NET *net,
           }
         });
 
-      if (until_binlog_state->update_nolock(&event_gtid, false))
+      if (info->until_binlog_state.update_nolock(&event_gtid, false))
       {
         my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
         return "Failed in internal GTID book-keeping: Out of memory";
@@ -1618,12 +1671,13 @@ send_event_to_slave(THD *thd, NET *net,
           /* Skip this event group if we have not yet reached slave start pos. */
           if (event_gtid.server_id != gtid->server_id ||
               event_gtid.seq_no <= gtid->seq_no)
-            *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+            info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
                                 GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
           if (event_gtid.server_id == gtid->server_id &&
               event_gtid.seq_no >= gtid->seq_no)
           {
-            if (slave_gtid_strict_mode && event_gtid.seq_no > gtid->seq_no &&
+            if (info->slave_gtid_strict_mode &&
+                event_gtid.seq_no > gtid->seq_no &&
                 !(gtid_entry->flags & slave_connection_state::START_OWN_SLAVE_POS))
             {
               /*
@@ -1645,7 +1699,7 @@ send_event_to_slave(THD *thd, NET *net,
               so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work.
               The fake event will be sent at the end of this event group.
             */
-            *send_fake_gtid_list= true;
+            info->send_fake_gtid_list= true;
 
             /*
               Delete this entry if we have reached slave start position (so we
@@ -1666,7 +1720,7 @@ send_event_to_slave(THD *thd, NET *net,
             This domain already reached the START SLAVE UNTIL stop condition,
             so skip this event group.
           */
-          *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+          info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
                               GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
         }
         else if (event_gtid.server_id == gtid->server_id &&
@@ -1681,9 +1735,9 @@ send_event_to_slave(THD *thd, NET *net,
           uint64 until_seq_no= gtid->seq_no;
           until_gtid_state->remove(gtid);
           if (until_gtid_state->count() == 0)
-            *gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
-                                GTID_UNTIL_STOP_AFTER_STANDALONE :
-                                GTID_UNTIL_STOP_AFTER_TRANSACTION);
+            info->gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
+                                     GTID_UNTIL_STOP_AFTER_STANDALONE :
+                                     GTID_UNTIL_STOP_AFTER_TRANSACTION);
           if (event_gtid.seq_no > until_seq_no)
           {
             /*
@@ -1693,7 +1747,7 @@ send_event_to_slave(THD *thd, NET *net,
               should be in, we can just stop now. And we also need to skip this
               event group (as it is beyond the UNTIL condition).
             */
-            *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+            info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
                                 GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
           }
         }
@@ -1707,11 +1761,11 @@ send_event_to_slave(THD *thd, NET *net,
     Note that slave that understands GTID can also tolerate holes, so there is
     no need to supply dummy event.
   */
-  switch (*gtid_skip_group)
+  switch (info->gtid_skip_group)
   {
   case GTID_SKIP_STANDALONE:
     if (!Log_event::is_part_of_group(event_type))
-      *gtid_skip_group= GTID_SKIP_NOT;
+      info->gtid_skip_group= GTID_SKIP_NOT;
     return NULL;
   case GTID_SKIP_TRANSACTION:
     if (event_type == XID_EVENT ||
@@ -1719,14 +1773,15 @@ send_event_to_slave(THD *thd, NET *net,
          Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset,
                                                   len - ev_offset,
                                                   current_checksum_alg)))
-      *gtid_skip_group= GTID_SKIP_NOT;
+      info->gtid_skip_group= GTID_SKIP_NOT;
     return NULL;
   case GTID_SKIP_NOT:
     break;
   }
 
   /* Do not send annotate_rows events unless slave requested it. */
-  if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
+  if (event_type == ANNOTATE_ROWS_EVENT &&
+      !(info->flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
   {
     if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
     {
@@ -1820,7 +1875,7 @@ send_event_to_slave(THD *thd, NET *net,
     Skip events with the @@skip_replication flag set, if slave requested
     skipping of such events.
   */
-  if (thd->variables.option_bits & OPTION_SKIP_REPLICATION)
+  if (info->thd->variables.option_bits & OPTION_SKIP_REPLICATION)
   {
     /*
       The first byte of the packet is a '\0' to distinguish it from an error
@@ -1831,17 +1886,17 @@ send_event_to_slave(THD *thd, NET *net,
       return NULL;
   }
 
-  THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
+  THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave);
 
   pos= my_b_tell(log);
   if (RUN_HOOK(binlog_transmit, before_send_event,
-               (thd, flags, packet, log_file_name, pos)))
+               (info->thd, info->flags, packet, info->log_file_name, pos)))
   {
     my_errno= ER_UNKNOWN_ERROR;
     return "run 'before_send_event' hook failed";
   }
 
-  if (my_net_write(net, (uchar*) packet->ptr(), len))
+  if (my_net_write(info->net, (uchar*) packet->ptr(), len))
   {
     my_errno= ER_UNKNOWN_ERROR;
     return "Failed on my_net_write()";
@@ -1850,14 +1905,15 @@ send_event_to_slave(THD *thd, NET *net,
   DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
   if (event_type == LOAD_EVENT)
   {
-    if (send_file(thd))
+    if (send_file(info->thd))
     {
       my_errno= ER_UNKNOWN_ERROR;
       return "failed in send_file()";
     }
   }
 
-  if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+  if (RUN_HOOK(binlog_transmit, after_send_event,
+               (info->thd, info->flags, packet)))
   {
     my_errno= ER_UNKNOWN_ERROR;
     return "Failed to run hook 'after_send_event'";
@@ -1878,31 +1934,21 @@ void mysql_binlog_send(THD* thd, char* l
 
   IO_CACHE log;
   File file = -1;
-  String* const packet = &thd->packet;
+  String* const packet= &thd->packet;
   int error;
   const char *errmsg = "Unknown error", *tmp_msg;
   char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message()
-  NET* net = &thd->net;
   mysql_mutex_t *log_lock;
   mysql_cond_t *log_cond;
-  int mariadb_slave_capability;
   char str_buf[128];
   String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
-  bool using_gtid_state;
   char str_buf2[128];
   String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
-  slave_connection_state gtid_state, until_gtid_state_obj;
-  slave_connection_state *until_gtid_state= NULL;
+  slave_connection_state until_gtid_state_obj;
   rpl_gtid error_gtid;
-  enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
-  enum_gtid_until_state gtid_until_group= GTID_UNTIL_NOT_DONE;
-  rpl_binlog_state until_binlog_state;
-  bool slave_gtid_strict_mode= false;
-  bool send_fake_gtid_list= false;
+  binlog_send_info info(thd, packet, flags, log_file_name);
 
-  uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
   int old_max_allowed_packet= thd->variables.max_allowed_packet;
-  Format_description_log_event *fdev= NULL;
 
 #ifndef DBUG_OFF
   int left_events = max_binlog_dump_events;
@@ -1928,16 +1974,17 @@ void mysql_binlog_send(THD* thd, char* l
     heartbeat_ts= &heartbeat_buf;
     set_timespec_nsec(*heartbeat_ts, 0);
   }
-  mariadb_slave_capability= get_mariadb_slave_capability(thd);
+  info.mariadb_slave_capability= get_mariadb_slave_capability(thd);
 
   connect_gtid_state.length(0);
-  using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
-  DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;);
-  if (using_gtid_state)
+  info.using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+  DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info.using_gtid_state= false;);
+  if (info.using_gtid_state)
   {
-    slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
+    info.slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
+    info.slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
     if(get_slave_until_gtid(thd, &slave_until_gtid_str))
-      until_gtid_state= &until_gtid_state_obj;
+      info.until_gtid_state= &until_gtid_state_obj;
   }
 
   DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
@@ -1978,7 +2025,7 @@ void mysql_binlog_send(THD* thd, char* l
   }
 #endif
 
-  if (!(fdev= new Format_description_log_event(3)))
+  if (!(info.fdev= new Format_description_log_event(3)))
   {
     errmsg= "Out of memory initializing format_description event";
     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
@@ -1999,33 +2046,32 @@ void mysql_binlog_send(THD* thd, char* l
   }
 
   name=search_file_name;
-  if (using_gtid_state)
+  if (info.using_gtid_state)
   {
-    if (gtid_state.load(connect_gtid_state.c_ptr_quick(),
-                        connect_gtid_state.length()))
+    if (info.gtid_state.load(connect_gtid_state.c_ptr_quick(),
+                             connect_gtid_state.length()))
     {
       errmsg= "Out of memory or malformed slave request when obtaining start "
         "position from GTID state";
       my_errno= ER_UNKNOWN_ERROR;
       goto err;
     }
-    if (until_gtid_state &&
-        until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
-                               slave_until_gtid_str.length()))
+    if (info.until_gtid_state &&
+        info.until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
+                                    slave_until_gtid_str.length()))
     {
       errmsg= "Out of memory or malformed slave request when obtaining UNTIL "
         "position sent from slave";
       my_errno= ER_UNKNOWN_ERROR;
       goto err;
     }
-    if ((error= check_slave_start_position(thd, &gtid_state, &errmsg,
-                                           &error_gtid, until_gtid_state)))
+    if ((error= check_slave_start_position(&info, &errmsg, &error_gtid)))
     {
       my_errno= error;
       goto err;
     }
-    if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name,
-                                       until_gtid_state)))
+    if ((errmsg= gtid_find_binlog_file(&info.gtid_state, search_file_name,
+                                       info.until_gtid_state)))
     {
       my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
       goto err;
@@ -2098,7 +2144,7 @@ impossible position";
     given that we want minimum modification of 4.0, we send the normal
     and fake Rotates.
   */
-  if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
+  if (fake_rotate_event(&info, pos, &errmsg,
                         get_binlog_checksum_value_at_connect(thd)))
   {
     /*
@@ -2150,14 +2196,14 @@ impossible position";
        {
          Format_description_log_event *tmp;
 
-         current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
-                                                packet->length() - ev_offset);
-         DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
-                     current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
-                     current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+         info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
+                                                     packet->length() - ev_offset);
+         DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+                     info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+                     info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
          if (!is_slave_checksum_aware(thd) &&
-             current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
-             current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+             info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+             info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
          {
            my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
            errmsg= "Slave can not handle replication events with the checksum "
@@ -2170,14 +2216,14 @@ impossible position";
 
          if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
                                                      packet->length()-ev_offset,
-                                                     fdev)))
+                                                     info.fdev)))
          {
            my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
            errmsg= "Corrupt Format_description event found or out-of-memory";
            goto err;
          }
-         delete fdev;
-         fdev= tmp;
+         delete info.fdev;
+         info.fdev= tmp;
 
          (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
          /*
@@ -2194,12 +2240,12 @@ impossible position";
                    ST_CREATED_OFFSET+ev_offset, (ulong) 0);
 
          /* fix the checksum due to latest changes in header */
-         if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
-             current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+         if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+             info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
            fix_checksum(packet, ev_offset);
 
          /* send it */
-         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+         if (my_net_write(info.net, (uchar*) packet->ptr(), packet->length()))
          {
            errmsg = "Failed on my_net_write()";
            my_errno= ER_UNKNOWN_ERROR;
@@ -2235,13 +2281,13 @@ impossible position";
 
     We will send one event, the format_description, and then stop.
   */
-  if (until_gtid_state && until_gtid_state->count() == 0)
-    gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
+  if (info.until_gtid_state && info.until_gtid_state->count() == 0)
+    info.gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
 
   /* seek to the requested position, to start the requested dump */
   my_b_seek(&log, pos);                 // Seek will done on next read
 
-  while (!net->error && net->vio != 0 && !thd->killed)
+  while (!info.net->error && info.net->vio != 0 && !thd->killed)
   {
     Log_event_type event_type= UNKNOWN_EVENT;
     killed_state killed;
@@ -2254,14 +2300,14 @@ impossible position";
     bool is_active_binlog= false;
     while (!(killed= thd->killed) &&
            !(error = Log_event::read_log_event(&log, packet, log_lock,
-                                              current_checksum_alg,
+                                              info.current_checksum_alg,
                                               log_file_name,
                                               &is_active_binlog)))
     {
 #ifndef DBUG_OFF
       if (max_binlog_dump_events && !left_events--)
       {
-        net_flush(net);
+        net_flush(info.net);
         errmsg = "Debugging binlog dump abort";
         my_errno= ER_UNKNOWN_ERROR;
         goto err;
@@ -2279,7 +2325,7 @@ impossible position";
                       {
                         if (event_type == XID_EVENT)
                         {
-                          net_flush(net);
+                          net_flush(info.net);
                           const char act[]=
                             "now "
                             "wait_for signal.continue";
@@ -2298,14 +2344,14 @@ impossible position";
       {
         Format_description_log_event *tmp;
 
-        current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
+        info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
                                                packet->length() - ev_offset);
-        DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
-                    current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
-                    current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+        DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+                    info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+                    info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
         if (!is_slave_checksum_aware(thd) &&
-            current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
-            current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+            info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+            info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
         {
           my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
           errmsg= "Slave can not handle replication events with the checksum "
@@ -2318,14 +2364,14 @@ impossible position";
 
         if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
                                                     packet->length()-ev_offset,
-                                                    fdev)))
+                                                    info.fdev)))
         {
           my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
           errmsg= "Corrupt Format_description event found or out-of-memory";
           goto err;
         }
-        delete fdev;
-        fdev= tmp;
+        delete info.fdev;
+        info.fdev= tmp;
 
         (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
       }
@@ -2343,36 +2389,28 @@ impossible position";
       }
 #endif
 
-      if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
-                                        log_file_name, &log,
-                                        mariadb_slave_capability, ev_offset,
-                                        current_checksum_alg, using_gtid_state,
-                                        &gtid_state, &gtid_skip_group,
-                                        until_gtid_state, &gtid_until_group,
-                                        &until_binlog_state,
-                                        slave_gtid_strict_mode, &error_gtid,
-                                        &send_fake_gtid_list, fdev)))
+      if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
+                                        ev_offset, &error_gtid)))
       {
         errmsg= tmp_msg;
         goto err;
       }
-      if (unlikely(send_fake_gtid_list) && gtid_skip_group == GTID_SKIP_NOT)
+      if (unlikely(info.send_fake_gtid_list) &&
+          info.gtid_skip_group == GTID_SKIP_NOT)
       {
-        Gtid_list_log_event glev(&until_binlog_state, 0);
+        Gtid_list_log_event glev(&info.until_binlog_state, 0);
 
         if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
-            fake_gtid_list_event(net, packet, &glev, &errmsg,
-                                 current_checksum_alg, my_b_tell(&log)))
+            fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
         {
           my_errno= ER_UNKNOWN_ERROR;
           goto err;
         }
-        send_fake_gtid_list= false;
+        info.send_fake_gtid_list= false;
       }
-      if (until_gtid_state &&
-          is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
-                           event_type, current_checksum_alg, flags, &errmsg,
-                           &until_binlog_state, my_b_tell(&log)))
+      if (info.until_gtid_state &&
+          is_until_reached(&info, &ev_offset, event_type, &errmsg,
+                           my_b_tell(&log)))
       {
         if (errmsg)
         {
@@ -2386,7 +2424,7 @@ impossible position";
                       {
                         if (event_type == XID_EVENT)
                         {
-                          net_flush(net);
+                          net_flush(info.net);
                         }
                       });
 
@@ -2423,7 +2461,7 @@ impossible position";
       /*
         Block until there is more data in the log
       */
-      if (net_flush(net))
+      if (net_flush(info.net))
       {
         errmsg = "failed on net_flush()";
         my_errno= ER_UNKNOWN_ERROR;
@@ -2466,7 +2504,7 @@ impossible position";
 
         mysql_mutex_lock(log_lock);
         switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0,
-                                                 current_checksum_alg)) {
+                                                 info.current_checksum_alg)) {
         case 0:
           /* we read successfully, so we'll need to send it to the slave */
           mysql_mutex_unlock(log_lock);
@@ -2524,7 +2562,8 @@ impossible position";
                 thd->EXIT_COND(&old_stage);
                 goto err;
               }
-              if (send_heartbeat_event(net, packet, p_coord, current_checksum_alg))
+              if (send_heartbeat_event(info.net, packet, p_coord,
+                                       info.current_checksum_alg))
               {
                 errmsg = "Failed on my_net_write()";
                 my_errno= ER_UNKNOWN_ERROR;
@@ -2549,36 +2588,28 @@ impossible position";
 
         if (read_packet)
         {
-          if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
-                                            log_file_name, &log,
-                                            mariadb_slave_capability, ev_offset,
-                                            current_checksum_alg,
-                                            using_gtid_state, &gtid_state,
-                                            &gtid_skip_group, until_gtid_state,
-                                            &gtid_until_group, &until_binlog_state,
-                                            slave_gtid_strict_mode, &error_gtid,
-                                            &send_fake_gtid_list, fdev)))
+          if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
+                                            ev_offset, &error_gtid)))
           {
             errmsg= tmp_msg;
             goto err;
           }
-          if (unlikely(send_fake_gtid_list) && gtid_skip_group == GTID_SKIP_NOT)
+          if (unlikely(info.send_fake_gtid_list)
+              && info.gtid_skip_group == GTID_SKIP_NOT)
           {
-            Gtid_list_log_event glev(&until_binlog_state, 0);
+            Gtid_list_log_event glev(&info.until_binlog_state, 0);
 
             if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
-                fake_gtid_list_event(net, packet, &glev, &errmsg,
-                                     current_checksum_alg, my_b_tell(&log)))
+                fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
             {
               my_errno= ER_UNKNOWN_ERROR;
               goto err;
             }
-            send_fake_gtid_list= false;
+            info.send_fake_gtid_list= false;
           }
-          if (until_gtid_state &&
-              is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
-                               event_type, current_checksum_alg, flags, &errmsg,
-                               &until_binlog_state, my_b_tell(&log)))
+          if (info.until_gtid_state &&
+              is_until_reached(&info, &ev_offset, event_type, &errmsg,
+                               my_b_tell(&log)))
           {
             if (errmsg)
             {
@@ -2633,8 +2664,8 @@ impossible position";
         read and send is Format_description_log_event.
       */
       if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
-          fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
-                            &errmsg, current_checksum_alg))
+          fake_rotate_event(&info, BIN_LOG_HEADER_SIZE, &errmsg,
+                            info.current_checksum_alg))
       {
         my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
         goto err;
@@ -2655,7 +2686,7 @@ impossible position";
   thd->current_linfo = 0;
   mysql_mutex_unlock(&LOCK_thread_count);
   thd->variables.max_allowed_packet= old_max_allowed_packet;
-  delete fdev;
+  delete info.fdev;
   DBUG_VOID_RETURN;
 
 err:
@@ -2731,7 +2762,7 @@ impossible position";
   if (file >= 0)
     mysql_file_close(file, MYF(MY_WME));
   thd->variables.max_allowed_packet= old_max_allowed_packet;
-  delete fdev;
+  delete info.fdev;
 
   my_message(my_errno, error_text, MYF(0));
   DBUG_VOID_RETURN;

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2014-03-12 11:34:36 +0000
+++ b/sql/sys_vars.cc	2014-03-09 09:27:38 +0000
@@ -1819,6 +1819,54 @@ static Sys_var_ulong Sys_slave_parallel_
        "--slave-parallel-threads > 0.",
        GLOBAL_VAR(opt_slave_parallel_max_queued), CMD_LINE(REQUIRED_ARG),
        VALID_RANGE(0,2147483647), DEFAULT(131072), BLOCK_SIZE(1));
+
+
+static bool
+check_gtid_ignore_duplicates(sys_var *self, THD *thd, set_var *var)
+{
+  bool running;
+
+  mysql_mutex_lock(&LOCK_active_mi);
+  running= master_info_index->give_error_if_slave_running();
+  mysql_mutex_unlock(&LOCK_active_mi);
+  if (running)
+    return true;
+
+  return false;
+}
+
+static bool
+fix_gtid_ignore_duplicates(sys_var *self, THD *thd, enum_var_type type)
+{
+  bool running;
+  bool err= false;
+
+  mysql_mutex_unlock(&LOCK_global_system_variables);
+  mysql_mutex_lock(&LOCK_active_mi);
+  running= master_info_index->give_error_if_slave_running();
+  mysql_mutex_unlock(&LOCK_active_mi);
+  if (running)
+    err= true;
+  mysql_mutex_lock(&LOCK_global_system_variables);
+
+  /* ToDo: Isn't there a race here? I need to change the variable only under the LOCK_active_mi, and only if running is false. */
+  return err;
+}
+
+
+static Sys_var_mybool Sys_gtid_ignore_duplicates(
+       "gtid_ignore_duplicates",
+       "When set, different master connections in multi-source replication are "
+       "allowed to receive and process event groups with the same GTID (when "
+       "using GTID mode). Only one will be applied, any others will be "
+       "ignored. Within a given replication domain, just the sequence number "
+       "will be used to decide whether a given GTID has been already applied; "
+       "this means it is the responsibility of the user to ensure that GTID "
+       "sequence numbers are strictly increasing.",
+       GLOBAL_VAR(opt_gtid_ignore_duplicates), CMD_LINE(OPT_ARG),
+       DEFAULT(FALSE), NO_MUTEX_GUARD,
+       NOT_IN_BINLOG, ON_CHECK(check_gtid_ignore_duplicates),
+       ON_UPDATE(fix_gtid_ignore_duplicates));
 #endif
 
 



More information about the commits mailing list