[Commits] Rev 4074: MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Fri Mar 21 11:11:28 EET 2014


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4074
revision-id: knielsen at knielsen-hq.org-20140321091128-8oy3w0qblxjvsq5d
parent: jplindst at mariadb.org-20140321063904-vpvrd0l8488ln0mm
committer: knielsen at knielsen-hq.org
branch nick: tmp-10.0
timestamp: Fri 2014-03-21 10:11:28 +0100
message:
  MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction
  
  When a transaction fails in parallel replication, it should signal the error
  to any following transactions doing wait_for_prior_commit() on it. But the
  code for this was incorrect, and would not correctly remember a prior error
  when sending the signal. This caused corruption when the slave stopped due to
  an error.
  
  Fix by remembering the error code when we first get an error, and passing the
  saved error code to wakeup_subsequent_commits().
  
  Thanks to nanyi607rao who reported this bug on
  maria-developers at lists.launchpad.net and analysed the root cause.
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel.result	2014-03-07 11:02:09 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result	2014-03-21 09:11:28 +0000
@@ -694,6 +694,7 @@ STOP SLAVE IO_THREAD;
 SET GLOBAL debug_dbug=@old_dbug;
 SET GLOBAL slave_parallel_max_queued= @old_max_queued;
 INSERT INTO t3 VALUES (82,0);
+SET binlog_format=@old_format;
 SET debug_sync='RESET';
 include/start_slave.inc
 SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
@@ -726,6 +727,40 @@ SELECT * FROM t3 WHERE a >= 100 ORDER BY
 a       b
 106     #
 107     #
+*** MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction ***
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+include/start_slave.inc
+INSERT INTO t3 VALUES (110, 1);
+SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
+a       b
+110     1
+SET sql_log_bin=0;
+INSERT INTO t3 VALUES (111, 666);
+SET sql_log_bin=1;
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+INSERT INTO t3 VALUES (111, 2);
+SET debug_sync='now WAIT_FOR master_queued1';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+INSERT INTO t3 VALUES (112, 3);
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='now SIGNAL master_cont1';
+SET debug_sync='RESET';
+include/wait_for_slave_sql_error.inc [errno=1062]
+include/wait_for_slave_sql_to_stop.inc
+SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
+a       b
+110     1
+111     666
+SET sql_log_bin=0;
+DELETE FROM t3 WHERE a=111 AND b=666;
+SET sql_log_bin=1;
+START SLAVE SQL_THREAD;
+SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
+a       b
+110     1
+111     2
+112     3
 include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=@old_parallel_threads;
 include/start_slave.inc

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test	2014-03-10 20:02:22 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test	2014-03-21 09:11:28 +0000
@@ -1053,6 +1053,7 @@ SET GLOBAL slave_parallel_max_queued= @o
 
 --connection server_1
 INSERT INTO t3 VALUES (82,0);
+SET binlog_format=@old_format;
 --save_master_pos
 
 --connection server_2
@@ -1113,6 +1114,64 @@ INSERT INTO t3 VALUES (107, rand());
 SELECT * FROM t3 WHERE a >= 100 ORDER BY a;
 
 
+--echo *** MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction ***
+
+--connection server_2
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+--source include/start_slave.inc
+
+--connection server_1
+INSERT INTO t3 VALUES (110, 1);
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
+# Inject a duplicate key error.
+SET sql_log_bin=0;
+INSERT INTO t3 VALUES (111, 666);
+SET sql_log_bin=1;
+
+--connection server_1
+
+# Create a group commit with two inserts, the first one conflicts with a row on the slave
+--connect (con1,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+send INSERT INTO t3 VALUES (111, 2);
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued1';
+
+--connect (con2,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+send INSERT INTO t3 VALUES (112, 3);
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='now SIGNAL master_cont1';
+
+--connection con1
+REAP;
+--connection con2
+REAP;
+SET debug_sync='RESET';
+--save_master_pos
+
+--connection server_2
+--let $slave_sql_errno= 1062
+--source include/wait_for_slave_sql_error.inc
+--source include/wait_for_slave_sql_to_stop.inc
+# We should not see the row (112,3) here, it should be rolled back due to
+# error signal from the prior transaction.
+SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
+SET sql_log_bin=0;
+DELETE FROM t3 WHERE a=111 AND b=666;
+SET sql_log_bin=1;
+START SLAVE SQL_THREAD;
+--sync_with_master
+SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
+
+
 --connection server_2
 --source include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=@old_parallel_threads;

=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2014-03-11 23:14:49 +0000
+++ b/sql/rpl_parallel.cc	2014-03-21 09:11:28 +0000
@@ -20,6 +20,8 @@
 
 struct rpl_parallel_thread_pool global_rpl_thread_pool;
 
+static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
+                                              int err);
 
 static int
 rpt_handle_event(rpl_parallel_thread::queued_event *qev,
@@ -94,10 +96,11 @@ handle_queued_pos_update(THD *thd, rpl_p
 
 
 static void
-finish_event_group(THD *thd, int err, uint64 sub_id,
-                   rpl_parallel_entry *entry, rpl_group_info *rgi)
+finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
+                   rpl_group_info *rgi)
 {
   wait_for_commit *wfc= &rgi->commit_orderer;
+  int err;
 
   /*
     Remove any left-over registration to wait for a prior commit to
@@ -120,10 +123,10 @@ finish_event_group(THD *thd, int err, ui
     waiting for us will in any case receive the error back from their
     wait_for_prior_commit() call.
   */
-  if (err)
+  if (rgi->worker_error)
     wfc->unregister_wait_for_prior_commit();
-  else
-    err= wfc->wait_for_prior_commit(thd);
+  else if ((err= wfc->wait_for_prior_commit(thd)))
+    signal_error_to_sql_driver_thread(thd, rgi, err);
   thd->wait_for_commit_ptr= NULL;
 
   /*
@@ -150,7 +153,7 @@ finish_event_group(THD *thd, int err, ui
     not yet started should just skip their group, preparing for stop of the
     SQL driver thread.
   */
-  if (unlikely(rgi->is_error) &&
+  if (unlikely(rgi->worker_error) &&
       entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
     entry->stop_on_error_sub_id= sub_id;
   /*
@@ -163,14 +166,14 @@ finish_event_group(THD *thd, int err, ui
 
   thd->clear_error();
   thd->get_stmt_da()->reset_diagnostics_area();
-  wfc->wakeup_subsequent_commits(err);
+  wfc->wakeup_subsequent_commits(rgi->worker_error);
 }
 
 
 static void
-signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
+signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
 {
-  rgi->is_error= true;
+  rgi->worker_error= err;
   rgi->cleanup_context(thd, true);
   rgi->rli->abort_slave= true;
   rgi->rli->stop_for_until= false;
@@ -294,7 +297,6 @@ handle_rpl_parallel_thread(void *arg)
         continue;
       }
 
-      err= 0;
       group_rgi= rgi;
       gco= rgi->gco;
       /* Handle a new event group, which will be initiated by a GTID event. */
@@ -346,12 +348,12 @@ handle_rpl_parallel_thread(void *arg)
           did_enter_cond= true;
           do
           {
-            if (thd->check_killed() && !rgi->is_error)
+            if (thd->check_killed() && !rgi->worker_error)
             {
               DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
               thd->send_kill_message();
               slave_output_error_info(rgi->rli, thd);
-              signal_error_to_sql_driver_thread(thd, rgi);
+              signal_error_to_sql_driver_thread(thd, rgi, 1);
               /*
                 Even though we were killed, we need to continue waiting for the
                 prior event groups to signal that we can continue. Otherwise we
@@ -417,7 +419,7 @@ handle_rpl_parallel_thread(void *arg)
           */
           rgi->cleanup_context(thd, true);
           thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
-          thd->wait_for_commit_ptr->wakeup_subsequent_commits(err);
+          thd->wait_for_commit_ptr->wakeup_subsequent_commits(rgi->worker_error);
         }
         thd->wait_for_commit_ptr= &rgi->commit_orderer;
 
@@ -430,7 +432,7 @@ handle_rpl_parallel_thread(void *arg)
           {
             /* Error. */
             slave_output_error_info(rgi->rli, thd);
-            signal_error_to_sql_driver_thread(thd, rgi);
+            signal_error_to_sql_driver_thread(thd, rgi, 1);
           }
           else if (!res)
           {
@@ -460,7 +462,7 @@ handle_rpl_parallel_thread(void *arg)
         processing between the event groups as a simple way to ensure that
         everything is stopped and cleaned up correctly.
       */
-      if (!rgi->is_error && !skip_event_group)
+      if (!rgi->worker_error && !skip_event_group)
         err= rpt_handle_event(events, rpt);
       else
         err= thd->wait_for_prior_commit();
@@ -474,15 +476,15 @@ handle_rpl_parallel_thread(void *arg)
       events->next= qevs_to_free;
       qevs_to_free= events;
 
-      if (err)
+      if (unlikely(err) && !rgi->worker_error)
       {
         slave_output_error_info(rgi->rli, thd);
-        signal_error_to_sql_driver_thread(thd, rgi);
+        signal_error_to_sql_driver_thread(thd, rgi, err);
       }
       if (end_of_group)
       {
         in_event_group= false;
-        finish_event_group(thd, err, event_gtid_sub_id, entry, rgi);
+        finish_event_group(thd, event_gtid_sub_id, entry, rgi);
         rgi->next= rgis_to_free;
         rgis_to_free= rgi;
         group_rgi= rgi= NULL;
@@ -541,9 +543,9 @@ handle_rpl_parallel_thread(void *arg)
       */
       mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
       thd->wait_for_prior_commit();
-      finish_event_group(thd, 1, group_rgi->gtid_sub_id,
+      signal_error_to_sql_driver_thread(thd, group_rgi, 1);
+      finish_event_group(thd, group_rgi->gtid_sub_id,
                          group_rgi->parallel_entry, group_rgi);
-      signal_error_to_sql_driver_thread(thd, group_rgi);
       in_event_group= false;
       mysql_mutex_lock(&rpt->LOCK_rpl_thread);
       rpt->free_rgi(group_rgi);

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2014-03-11 23:14:49 +0000
+++ b/sql/rpl_rli.cc	2014-03-21 09:11:28 +0000
@@ -1489,7 +1489,7 @@ rpl_group_info::reinit(Relay_log_info *r
   tables_to_lock_count= 0;
   trans_retries= 0;
   last_event_start_time= 0;
-  is_error= false;
+  worker_error= 0;
   row_stmt_start_timestamp= 0;
   long_find_row_note_printed= false;
   did_mark_start_commit= false;

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2014-03-11 23:14:49 +0000
+++ b/sql/rpl_rli.h	2014-03-21 09:11:28 +0000
@@ -569,7 +569,7 @@ struct rpl_group_info
   */
   char future_event_master_log_name[FN_REFLEN];
   bool is_parallel_exec;
-  bool is_error;
+  int worker_error;
   /*
     Set true when we signalled that we reach the commit phase. Used to avoid
     counting one event group twice.



More information about the commits mailing list