[Commits] Rev 3958: MDEV-5363: Make parallel replication waits killable in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Wed Dec 18 17:26:23 EET 2013


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 3958
revision-id: knielsen at knielsen-hq.org-20131218152622-toggfwk1mehk9310
parent: knielsen at knielsen-hq.org-20131217122451-do7u088xsdy6e5fw
committer: knielsen at knielsen-hq.org
branch nick: tmp-10.0-base
timestamp: Wed 2013-12-18 16:26:22 +0100
message:
  MDEV-5363: Make parallel replication waits killable
    
  Add another test case. This one for killing a worker while its transaction is
  waiting to start until the previous transaction has committed.
  
  Fix setting reading_or_writing to 0 in worker threads so SHOW SLAVE STATUS can
  show something more useful than "Reading from net".
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel.result	2013-12-17 12:24:51 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result	2013-12-18 15:26:22 +0000
@@ -295,6 +295,7 @@ a	b
 SET sql_log_bin=0;
 CALL mtr.add_suppression("Query execution was interrupted");
 CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+CALL mtr.add_suppression("Slave: Connection was killed");
 SET sql_log_bin=1;
 SET debug_sync='now WAIT_FOR t2_query';
 SET debug_sync='now SIGNAL t2_cont';
@@ -513,6 +514,120 @@ include/start_slave.inc
 include/stop_slave.inc
 SET GLOBAL binlog_format=@old_format;
 SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=3;
+include/start_slave.inc
+*** 4. Test killing thread that is waiting to start transaction until previous transaction commits ***
+SET binlog_format=statement;
+SET gtid_domain_id=2;
+INSERT INTO t3 VALUES (60, foo(60,
+'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
+'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
+SET gtid_domain_id=0;
+SET debug_sync='now WAIT_FOR d2_query';
+SET gtid_domain_id=1;
+BEGIN;
+INSERT INTO t3 VALUES (61, foo(61,
+'rpl_parallel_start_waiting_for_prior SIGNAL t3_waiting',
+'rpl_parallel_start_waiting_for_prior_killed SIGNAL t3_killed'));
+INSERT INTO t3 VALUES (62, foo(62,
+'ha_write_row_end SIGNAL d1_query WAIT_FOR d1_cont2',
+'rpl_parallel_end_of_group SIGNAL d1_done WAIT_FOR d1_cont'));
+COMMIT;
+SET gtid_domain_id=0;
+SET debug_sync='now WAIT_FOR d1_query';
+SET gtid_domain_id=0;
+INSERT INTO t3 VALUES (63, foo(63,
+'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2',
+'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont'));
+SET debug_sync='now WAIT_FOR d0_query';
+SET debug_sync='now SIGNAL d2_cont2';
+SET debug_sync='now WAIT_FOR d2_done';
+SET debug_sync='now SIGNAL d1_cont2';
+SET debug_sync='now WAIT_FOR d1_done';
+SET debug_sync='now SIGNAL d0_cont2';
+SET debug_sync='now WAIT_FOR d0_done';
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (64, foo(64,
+'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
+INSERT INTO t3 VALUES (65, foo(65, '', ''));
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+INSERT INTO t3 VALUES (66, foo(66, '', ''));
+SET debug_sync='now WAIT_FOR master_queued3';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued4';
+INSERT INTO t3 VALUES (67, foo(67, '', ''));
+SET debug_sync='now WAIT_FOR master_queued4';
+SET debug_sync='now SIGNAL master_cont2';
+SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+a       b
+60      60
+61      61
+62      62
+63      63
+64      64
+65      65
+66      66
+67      67
+SET debug_sync='now SIGNAL d0_cont';
+SET debug_sync='now WAIT_FOR t1_waiting';
+SET debug_sync='now SIGNAL d1_cont';
+SET debug_sync='now WAIT_FOR t3_waiting';
+SET debug_sync='now SIGNAL d2_cont';
+KILL THD_ID;
+SET debug_sync='now WAIT_FOR t3_killed';
+SET debug_sync='now SIGNAL t1_cont';
+include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
+STOP SLAVE IO_THREAD;
+SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+a       b
+60      60
+61      61
+62      62
+63      63
+64      64
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+INSERT INTO t3 VALUES (69,0);
+include/start_slave.inc
+SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+a       b
+60      60
+61      61
+62      62
+63      63
+64      64
+65      65
+66      66
+67      67
+69      0
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+IF d1 != '' THEN
+SET debug_sync = d1;
+END IF;
+IF d2 != '' THEN
+SET debug_sync = d2;
+END IF;
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
 SET GLOBAL slave_parallel_threads=10;
 include/start_slave.inc
 include/stop_slave.inc

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test	2013-12-17 12:24:51 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test	2013-12-18 15:26:22 +0000
@@ -402,6 +402,7 @@ SELECT * FROM t3 WHERE a >= 30 ORDER BY
 SET sql_log_bin=0;
 CALL mtr.add_suppression("Query execution was interrupted");
 CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+CALL mtr.add_suppression("Slave: Connection was killed");
 SET sql_log_bin=1;
 # Wait until T2 is inside executing its insert of 32, then find it in SHOW
 # PROCESSLIST to know its thread id for KILL later.
@@ -745,6 +746,201 @@ SET sql_log_bin=1;
 --source include/stop_slave.inc
 CHANGE MASTER TO master_use_gtid=slave_pos;
 --source include/start_slave.inc
+
+--connection server_2
+# Respawn all worker threads to clear any left-over debug_sync or other stuff.
+--source include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=3;
+--source include/start_slave.inc
+
+
+--echo *** 4. Test killing thread that is waiting to start transaction until previous transaction commits ***
+
+# We set up four transactions T1, T2, T3, and T4 on the master. T2, T3, and T4
+# can run in parallel with each other (same group commit and commit id),
+# but not in parallel with T1.
+#
+# We use three worker threads. T1 and T2 will be queued on the first, T3 on
+# the second, and T4 on the third. We will delay T1 commit, T3 will wait for
+# T1 to commit before it can start. We will kill T3 during this wait, and
+# check that everything works correctly.
+#
+# It is rather tricky to get the correct thread id of the worker to kill.
+# We start by injecting three dummy transactions in a debug_sync-controlled
+# manner to be able to get known thread ids for the workers in a pool with
+# just 3 worker threads. Then we let in each of the real test transactions
+# T1-T4 one at a time in a way which allows us to know which transaction
+# ends up with which thread id.
+
+--connection server_1
+SET binlog_format=statement;
+SET gtid_domain_id=2;
+INSERT INTO t3 VALUES (60, foo(60,
+       'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
+       'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
+SET gtid_domain_id=0;
+
+--connection server_2
+SET debug_sync='now WAIT_FOR d2_query';
+--let $d2_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(60%' AND INFO NOT LIKE '%LIKE%'`
+
+--connection server_1
+SET gtid_domain_id=1;
+BEGIN;
+# These debug_sync's will linger on and be used to control T3 later.
+INSERT INTO t3 VALUES (61, foo(61,
+       'rpl_parallel_start_waiting_for_prior SIGNAL t3_waiting',
+       'rpl_parallel_start_waiting_for_prior_killed SIGNAL t3_killed'));
+INSERT INTO t3 VALUES (62, foo(62,
+       'ha_write_row_end SIGNAL d1_query WAIT_FOR d1_cont2',
+       'rpl_parallel_end_of_group SIGNAL d1_done WAIT_FOR d1_cont'));
+COMMIT;
+SET gtid_domain_id=0;
+
+--connection server_2
+SET debug_sync='now WAIT_FOR d1_query';
+--let $d1_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(62%' AND INFO NOT LIKE '%LIKE%'`
+
+--connection server_1
+SET gtid_domain_id=0;
+INSERT INTO t3 VALUES (63, foo(63,
+       'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2',
+       'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont'));
+
+--connection server_2
+SET debug_sync='now WAIT_FOR d0_query';
+--let $d0_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(63%' AND INFO NOT LIKE '%LIKE%'`
+
+SET debug_sync='now SIGNAL d2_cont2';
+SET debug_sync='now WAIT_FOR d2_done';
+SET debug_sync='now SIGNAL d1_cont2';
+SET debug_sync='now WAIT_FOR d1_done';
+SET debug_sync='now SIGNAL d0_cont2';
+SET debug_sync='now WAIT_FOR d0_done';
+
+# Now prepare the real transactions T1, T2, T3, T4 on the master.
+
+--connection con_temp3
+# Create transaction T1.
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (64, foo(64,
+       'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
+
+# Create transaction T2, as a group commit leader on the master.
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
+send INSERT INTO t3 VALUES (65, foo(65, '', ''));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued2';
+
+--connection con_temp4
+# Create transaction T3, participating in T2's group commit.
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+send INSERT INTO t3 VALUES (66, foo(66, '', ''));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued3';
+
+--connection con_temp5
+# Create transaction T4, participating in group commit with T2 and T3.
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued4';
+send INSERT INTO t3 VALUES (67, foo(67, '', ''));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued4';
+SET debug_sync='now SIGNAL master_cont2';
+
+--connection con_temp3
+REAP;
+--connection con_temp4
+REAP;
+--connection con_temp5
+REAP;
+
+--connection server_1
+SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+
+--connection server_2
+# Now we have the four transactions pending for replication on the slave.
+# Let them be queued for our three worker threads in a controlled fashion.
+# We put them at a stage where T1 is delayed and T3 is waiting for T1 to
+# commit before T3 can start. Then we kill T3.
+
+# Make the worker D0 free, and wait for T1 to be queued in it.
+SET debug_sync='now SIGNAL d0_cont';
+SET debug_sync='now WAIT_FOR t1_waiting';
+
+# T2 will be queued on the same worker D0 as T1.
+# Now release worker D1, and wait for T3 to be queued in it.
+# T3 will wait for T1 to commit before it can start.
+SET debug_sync='now SIGNAL d1_cont';
+SET debug_sync='now WAIT_FOR t3_waiting';
+
+# Release worker D2. T4 may or may not have time to be queued on it, but
+# it will not be able to complete due to T3 being killed.
+SET debug_sync='now SIGNAL d2_cont';
+
+# Now we kill the waiting transaction T3 in worker D1.
+--replace_result $d1_thd_id THD_ID
+eval KILL $d1_thd_id;
+
+# Wait until T3 has reacted on the kill.
+SET debug_sync='now WAIT_FOR t3_killed';
+
+# Now we can allow T1 to proceed.
+SET debug_sync='now SIGNAL t1_cont';
+
+--let $slave_sql_errno= 1317,1927,1963
+--source include/wait_for_slave_sql_error.inc
+STOP SLAVE IO_THREAD;
+SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+
+# Now we have to disable the debug_sync statements, so they do not trigger
+# when the events are retried.
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+  RETURNS INT DETERMINISTIC
+  BEGIN
+    RETURN x;
+  END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+--connection server_1
+INSERT INTO t3 VALUES (69,0);
+--save_master_pos
+
+--connection server_2
+--source include/start_slave.inc
+--sync_with_master
+SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+# Restore the foo() function.
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+  RETURNS INT DETERMINISTIC
+  BEGIN
+    IF d1 != '' THEN
+      SET debug_sync = d1;
+    END IF;
+    IF d2 != '' THEN
+      SET debug_sync = d2;
+    END IF;
+    RETURN x;
+  END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+
 --connection server_2
 --source include/stop_slave.inc
 SET GLOBAL binlog_format=@old_format;

=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2013-12-13 13:26:51 +0000
+++ b/sql/rpl_parallel.cc	2013-12-18 15:26:22 +0000
@@ -2,6 +2,7 @@
 #include "rpl_parallel.h"
 #include "slave.h"
 #include "rpl_mi.h"
+#include "debug_sync.h"
 
 
 /*
@@ -219,6 +220,7 @@ handle_rpl_parallel_thread(void *arg)
   thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
   set_slave_thread_options(thd);
   thd->client_capabilities = CLIENT_LOCAL_FILES;
+  thd->net.reading_or_writing= 0;
   thd_proc_info(thd, "Waiting for work from main SQL threads");
   thd->set_time();
   thd->variables.lock_wait_timeout= LONG_TIMEOUT;
@@ -308,6 +310,7 @@ handle_rpl_parallel_thread(void *arg)
                                      "Waiting for prior transaction to commit "
                                      "before starting next transaction");
             did_enter_cond= true;
+            DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
             while (wait_start_sub_id > entry->last_committed_sub_id &&
                    !thd->check_killed())
               mysql_cond_wait(&entry->COND_parallel_entry,
@@ -315,6 +318,7 @@ handle_rpl_parallel_thread(void *arg)
             if (wait_start_sub_id > entry->last_committed_sub_id)
             {
               /* The thread got a kill signal. */
+              DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
               thd->send_kill_message();
               slave_output_error_info(rgi->rli, thd);
               signal_error_to_sql_driver_thread(thd, rgi);
@@ -383,6 +387,7 @@ handle_rpl_parallel_thread(void *arg)
                            &rgi->commit_orderer);
         delete rgi;
         group_rgi= rgi= NULL;
+        DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
       }
 
       events= next;
@@ -843,6 +848,10 @@ rpl_parallel::do_event(rpl_group_info *s
         However, the commit of this event must wait for the commit of the prior
         event, to preserve binlog commit order and visibility across all
         servers in the replication hierarchy.
+
+        In addition, we must not start executing this event until we have
+        finished the previous collection of event groups that group-committed
+        together; we use rgi->wait_start_sub_id to control this.
       */
       rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
       rgi->wait_commit_sub_id= e->current_sub_id;



More information about the commits mailing list