[Commits] Rev 4513: MDEV-6676: Speculative parallel replication: Intermediate commit. in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Thu Nov 13 16:26:24 EET 2014


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4513
revision-id: knielsen at knielsen-hq.org-20141015140351-m3qii86avbjtiokn
parent: knielsen at knielsen-hq.org-20141015114013-jfatzrzb2ncq07q3
committer: Kristian Nielsen <knielsen at knielsen-hq.org>
branch nick: work-10.0-mdev6676
timestamp: Wed 2014-10-15 16:03:51 +0200
message:
  MDEV-6676: Speculative parallel replication: Intermediate commit.
  
  Preserve the value of @@replicate_allow_parallel from the master in the
  slave's binlog, so that it will be used correctly on deeper-level slaves as
  well.
  
  Also preserve the FL_DDL and FL_WAITED flags from the master's GTID event in
  the slave's binlog.
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_multilevel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_multilevel.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_multilevel.result	2014-10-15 14:03:51 +0000
@@ -0,0 +1,155 @@
+include/rpl_init.inc [topology=1->2->3->4]
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting);
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting);
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting);
+*** MDEV-6676: Test that @@replicate_allow_parallel is preserved in slave binlog ***
+INSERT INTO t1 VALUES(1,1);
+BEGIN;
+INSERT INTO t1 VALUES(2,1);
+INSERT INTO t1 VALUES(3,1);
+COMMIT;
+SET SESSION replicate_allow_parallel=0;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+SET SESSION replicate_allow_parallel=1;
+SELECT * FROM t1 ORDER BY a;
+a       b
+1       1
+2       11
+3       1
+include/save_master_gtid.inc
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a       b
+1       1
+2       11
+3       1
+status
+Ok, no retry
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a       b
+1       1
+2       11
+3       1
+status
+Ok, no retry
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a       b
+1       1
+2       11
+3       1
+status
+Ok, no retry
+*** MDEV-6676: Test that the FL_WAITED flag in GTID is preserved in slave binlog ***
+include/stop_slave.inc
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional);
+include/stop_slave.inc
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional);
+include/stop_slave.inc
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional);
+BEGIN;
+UPDATE t1 SET b=b+1 WHERE a=2;
+SET debug_sync="thd_report_wait_for SIGNAL waiting1";
+UPDATE t1 SET b=b+1 WHERE a=2;
+SET debug_sync="now WAIT_FOR waiting1";
+SET debug_sync="thd_report_wait_for SIGNAL waiting2";
+UPDATE t1 SET b=b+1 WHERE a=2;
+SET debug_sync="now WAIT_FOR waiting2";
+SET debug_sync="thd_report_wait_for SIGNAL waiting3";
+UPDATE t1 SET b=b+1 WHERE a=2;
+SET debug_sync="now WAIT_FOR waiting3";
+SET debug_sync="thd_report_wait_for SIGNAL waiting4";
+UPDATE t1 SET b=b+1 WHERE a=2;
+SET debug_sync="now WAIT_FOR waiting4";
+SET debug_sync="thd_report_wait_for SIGNAL waiting5";
+UPDATE t1 SET b=b+1 WHERE a=2;
+SET debug_sync="now WAIT_FOR waiting5";
+SET debug_sync="thd_report_wait_for SIGNAL waiting6";
+UPDATE t1 SET b=b+1 WHERE a=2;
+SET debug_sync="now WAIT_FOR waiting6";
+SET debug_sync="thd_report_wait_for SIGNAL waiting7";
+UPDATE t1 SET b=b+1 WHERE a=2;
+SET debug_sync="now WAIT_FOR waiting7";
+SET debug_sync="thd_report_wait_for SIGNAL waiting8";
+UPDATE t1 SET b=b+1 WHERE a=2;
+SET debug_sync="now WAIT_FOR waiting8";
+COMMIT;
+SET debug_sync="RESET";
+COMMIT;
+COMMIT;
+COMMIT;
+COMMIT;
+COMMIT;
+COMMIT;
+COMMIT;
+SELECT * FROM t1 ORDER BY a;
+a       b
+1       1
+2       20
+3       1
+include/save_master_gtid.inc
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a       b
+1       1
+2       20
+3       1
+status
+Ok, no retry
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a       b
+1       1
+2       20
+3       1
+status
+Ok, no retry
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a       b
+1       1
+2       20
+3       1
+status
+Ok, no retry
+include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+include/start_slave.inc
+include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+include/start_slave.inc
+include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+include/start_slave.inc
+DROP TABLE t1;
+include/rpl_end.inc

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_multilevel.cnf'
--- a/mysql-test/suite/rpl/t/rpl_parallel_multilevel.cnf	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multilevel.cnf	2014-10-15 14:03:51 +0000
@@ -0,0 +1,24 @@
+!include ../my.cnf
+
+[mysqld.1]
+log-slave-updates
+loose-innodb
+
+[mysqld.2]
+log-slave-updates
+loose-innodb
+
+[mysqld.3]
+log-slave-updates
+loose-innodb
+
+[mysqld.4]
+log-slave-updates
+loose-innodb
+
+[ENV]
+SERVER_MYPORT_3=                @mysqld.3.port
+SERVER_MYSOCK_3=                @mysqld.3.socket
+
+SERVER_MYPORT_4=                @mysqld.4.port
+SERVER_MYSOCK_4=                @mysqld.4.socket

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_multilevel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_multilevel.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multilevel.test	2014-10-15 14:03:51 +0000
@@ -0,0 +1,275 @@
+--source include/have_innodb.inc
+--source include/have_debug_sync.inc
+--let $rpl_topology=1->2->3->4
+--source include/rpl_init.inc
+
+# Test parallel replication with a multi-level replication hierarchy.
+
+--connection server_1
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+--save_master_pos
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting);
+
+
+--connection server_3
+--sync_with_master
+--save_master_pos
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting);
+
+
+--connection server_4
+--sync_with_master
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting);
+
+
+--echo *** MDEV-6676: Test that @@replicate_allow_parallel is preserved in slave binlog ***
+--connection server_1
+
+INSERT INTO t1 VALUES(1,1);
+BEGIN;
+INSERT INTO t1 VALUES(2,1);
+INSERT INTO t1 VALUES(3,1);
+COMMIT;
+# Do a lot of updates on same row in sequence. These would be likely to cause
+# conflicts and rollbacks in optimistic parallel replication, but we disable
+# that by disabling @@replicate_allow_parallel. We can test that the flag is
+# preserved down the replication hierarchy by checking that no slave retries
+# are made.
+SET SESSION replicate_allow_parallel=0;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+UPDATE t1 SET b=b+1 WHERE a=2;
+SET SESSION replicate_allow_parallel=1;
+SELECT * FROM t1 ORDER BY a;
+--source include/save_master_gtid.inc
+
+--connection server_2
+--let $retry1= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+--let $retry2= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
+--disable_query_log
+eval SELECT IF($retry1=$retry2, "Ok, no retry",
+       CONCAT("ERROR: ", $retry2-$retry1, " retries during replication (was ",
+              $retry1, " now ", $retry2, ")")) AS status;
+--enable_query_log
+
+--connection server_3
+--let $retry1= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+--let $retry2= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
+--disable_query_log
+eval SELECT IF($retry1=$retry2, "Ok, no retry",
+       CONCAT("ERROR: ", $retry2-$retry1, " retries during replication (was ",
+              $retry1, " now ", $retry2, ")")) AS status;
+--enable_query_log
+
+--connection server_4
+--let $retry1= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+--let $retry2= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
+--disable_query_log
+eval SELECT IF($retry1=$retry2, "Ok, no retry",
+       CONCAT("ERROR: ", $retry2-$retry1, " retries during replication (was ",
+              $retry1, " now ", $retry2, ")")) AS status;
+--enable_query_log
+
+
+--echo *** MDEV-6676: Test that the FL_WAITED flag in GTID is preserved in slave binlog ***
+
+--connection server_2
+--source include/stop_slave.inc
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional);
+
+
+--connection server_3
+--source include/stop_slave.inc
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional);
+
+
+--connection server_4
+--source include/stop_slave.inc
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional);
+
+--connection server_1
+# Do a lot of updates on same row in sequence. Ensure that all of these but the
+# first have to do a lock wait on the master, setting FL_WAITED in the GTID
+# event. This should cause all slaves to not attempt to run those updates in
+# parallel with prior events, so that no retries are made.
+
+BEGIN;
+UPDATE t1 SET b=b+1 WHERE a=2;
+
+--connect (con_temp1,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync="thd_report_wait_for SIGNAL waiting1";
+send UPDATE t1 SET b=b+1 WHERE a=2;
+--connection server_1
+SET debug_sync="now WAIT_FOR waiting1";
+
+--connect (con_temp2,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync="thd_report_wait_for SIGNAL waiting2";
+send UPDATE t1 SET b=b+1 WHERE a=2;
+--connection server_1
+SET debug_sync="now WAIT_FOR waiting2";
+
+--connect (con_temp3,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync="thd_report_wait_for SIGNAL waiting3";
+send UPDATE t1 SET b=b+1 WHERE a=2;
+--connection server_1
+SET debug_sync="now WAIT_FOR waiting3";
+
+--connect (con_temp4,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync="thd_report_wait_for SIGNAL waiting4";
+send UPDATE t1 SET b=b+1 WHERE a=2;
+--connection server_1
+SET debug_sync="now WAIT_FOR waiting4";
+
+--connect (con_temp5,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync="thd_report_wait_for SIGNAL waiting5";
+send UPDATE t1 SET b=b+1 WHERE a=2;
+--connection server_1
+SET debug_sync="now WAIT_FOR waiting5";
+
+--connect (con_temp6,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync="thd_report_wait_for SIGNAL waiting6";
+send UPDATE t1 SET b=b+1 WHERE a=2;
+--connection server_1
+SET debug_sync="now WAIT_FOR waiting6";
+
+--connect (con_temp7,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync="thd_report_wait_for SIGNAL waiting7";
+send UPDATE t1 SET b=b+1 WHERE a=2;
+--connection server_1
+SET debug_sync="now WAIT_FOR waiting7";
+
+--connect (con_temp8,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync="thd_report_wait_for SIGNAL waiting8";
+send UPDATE t1 SET b=b+1 WHERE a=2;
+--connection server_1
+SET debug_sync="now WAIT_FOR waiting8";
+
+COMMIT;
+SET debug_sync="RESET";
+
+--connection con_temp1
+reap;
+
+COMMIT;
+--connection con_temp2
+reap;
+
+COMMIT;
+--connection con_temp3
+reap;
+
+COMMIT;
+--connection con_temp4
+reap;
+
+COMMIT;
+--connection con_temp5
+reap;
+
+COMMIT;
+--connection con_temp6
+reap;
+
+COMMIT;
+--connection con_temp7
+reap;
+
+COMMIT;
+--connection con_temp8
+reap;
+
+--connection server_1
+SELECT * FROM t1 ORDER BY a;
+--source include/save_master_gtid.inc
+
+--connection server_2
+--let $retry1= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+--let $retry2= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
+--disable_query_log
+eval SELECT IF($retry1=$retry2, "Ok, no retry",
+       CONCAT("ERROR: ", $retry2-$retry1, " retries during replication (was ",
+              $retry1, " now ", $retry2, ")")) AS status;
+--enable_query_log
+
+--connection server_3
+--let $retry1= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+--let $retry2= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
+--disable_query_log
+eval SELECT IF($retry1=$retry2, "Ok, no retry",
+       CONCAT("ERROR: ", $retry2-$retry1, " retries during replication (was ",
+              $retry1, " now ", $retry2, ")")) AS status;
+--enable_query_log
+
+--connection server_4
+--let $retry1= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+--let $retry2= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1)
+--disable_query_log
+eval SELECT IF($retry1=$retry2, "Ok, no retry",
+       CONCAT("ERROR: ", $retry2-$retry1, " retries during replication (was ",
+              $retry1, " now ", $retry2, ")")) AS status;
+--enable_query_log
+
+
+# Clean up
+
+--connection server_2
+--source include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+--source include/start_slave.inc
+
+--connection server_3
+--source include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+--source include/start_slave.inc
+
+--connection server_4
+--source include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+--source include/start_slave.inc
+
+--connection server_1
+DROP TABLE t1;
+
+--source include/rpl_end.inc

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2014-10-08 07:56:52 +0000
+++ b/sql/log_event.cc	2014-10-15 14:03:51 +0000
@@ -6421,6 +6421,9 @@ Gtid_log_event::Gtid_log_event(THD *thd_
     flags2|= FL_TRANSACTIONAL;
   if (thd_arg->variables.option_bits & OPTION_RPL_ALLOW_PARALLEL)
     flags2|= FL_ALLOW_PARALLEL;
+  /* Preserve any DDL or WAITED flag in the slave's binlog. */
+  if (thd_arg->rgi_slave)
+    flags2|= (thd_arg->rgi_slave->gtid_ev_flags2 & (FL_DDL|FL_WAITED));
 }
 
 
@@ -6540,9 +6543,11 @@ static char gtid_begin_string[] = "BEGIN
 int
 Gtid_log_event::do_apply_event(rpl_group_info *rgi)
 {
+  ulonglong bits= thd->variables.option_bits;
   thd->variables.server_id= this->server_id;
   thd->variables.gtid_domain_id= this->domain_id;
   thd->variables.gtid_seq_no= this->seq_no;
+  rgi->gtid_ev_flags2= flags2;
   mysql_reset_thd_for_next_command(thd);
 
   if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates)
@@ -6552,12 +6557,17 @@ Gtid_log_event::do_apply_event(rpl_group
       return 1;
   }
 
-  DBUG_ASSERT((thd->variables.option_bits & OPTION_GTID_BEGIN) == 0);
+  DBUG_ASSERT((bits & OPTION_GTID_BEGIN) == 0);
   if (flags2 & FL_STANDALONE)
     return 0;
 
   /* Execute this like a BEGIN query event. */
-  thd->variables.option_bits|= OPTION_GTID_BEGIN;
+  bits|= OPTION_GTID_BEGIN;
+  if (flags2 & FL_ALLOW_PARALLEL)
+    bits|= (ulonglong)OPTION_RPL_ALLOW_PARALLEL;
+  else
+    bits&= ~(ulonglong)OPTION_RPL_ALLOW_PARALLEL;
+  thd->variables.option_bits= bits;
   DBUG_PRINT("info", ("Set OPTION_GTID_BEGIN"));
   thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1,
                         &my_charset_bin, next_query_id());

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2014-09-19 13:25:37 +0000
+++ b/sql/rpl_rli.cc	2014-10-15 14:03:51 +0000
@@ -1600,6 +1600,7 @@ rpl_group_info::reinit(Relay_log_info *r
   row_stmt_start_timestamp= 0;
   long_find_row_note_printed= false;
   did_mark_start_commit= false;
+  gtid_ev_flags2= 0;
   gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
   speculation= SPECULATE_NO;
   commit_orderer.reinit();

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2014-09-19 13:25:37 +0000
+++ b/sql/rpl_rli.h	2014-10-15 14:03:51 +0000
@@ -621,6 +621,8 @@ struct rpl_group_info
     counting one event group twice.
   */
   bool did_mark_start_commit;
+  /* Copy of flags2 from GTID event. */
+  uchar gtid_ev_flags2;
   enum {
     GTID_DUPLICATE_NULL=0,
     GTID_DUPLICATE_IGNORE=1,



More information about the commits mailing list