[Commits] Rev 4490: MDEV-6775: Wrong binlog order in parallel replication: Intermediate commit in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Thu Nov 13 14:55:58 EET 2014


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4490
revision-id: knielsen at knielsen-hq.org-20141113093120-gpbnnsapi9lbercf
parent: knielsen at knielsen-hq.org-20141113092048-dl4gowocj339a7f6
committer: Kristian Nielsen <knielsen at knielsen-hq.org>
branch nick: work-10.0
timestamp: Thu 2014-11-13 10:31:20 +0100
message:
  MDEV-6775: Wrong binlog order in parallel replication: Intermediate commit
  
  The code in binlog group commit around wait_for_commit that controls commit
  order, did the wakeup of subsequent commits early, as soon as a following
  transaction is put into the group commit queue, but before any such commit has
  actually taken place. This causes problems with too early wakeup of
  transactions that need to wait for prior to commit, but do not take part in
  the binlog group commit for one reason or the other.
  
  This patch solves the problem, by moving the wakeup to happen only after the
  binlog group commit is completed.
  
  This requires a new solution to ensure that transactions that arrive later
  than the leader are still able to participate in group commit. This patch
  introduces a flag wait_for_commit::commit_started. When this is set, a waiter
  can queue up itself in the group commit queue.
  
  This way, effectively the wait_for_prior_commit() is skipped only for
  transactions that participate in group commit, so that skipping the wait is
  safe. Other transactions still wait as needed for correctness.
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel.result	2014-08-15 09:31:13 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result	2014-11-13 09:31:20 +0000
@@ -923,6 +923,55 @@ SELECT * FROM t2 WHERE a >= 30 ORDER BY
 32
 33
 34
+*** MDEV-6775: Wrong binlog order in parallel replication ***
+DELETE FROM t4;
+INSERT INTO t4 VALUES (1,NULL), (3,NULL), (4,4), (5, NULL), (6, 6);
+include/save_master_gtid.inc
+include/sync_with_master_gtid.inc
+include/stop_slave.inc
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,inject_binlog_commit_before_get_LOCK_log";
+SET @old_format=@@GLOBAL.binlog_format;
+SET GLOBAL binlog_format=ROW;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET @old_format= @@binlog_format;
+SET binlog_format= statement;
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+UPDATE t4 SET b=NULL WHERE a=6;
+SET debug_sync='now WAIT_FOR master_queued1';
+SET @old_format= @@binlog_format;
+SET binlog_format= statement;
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+DELETE FROM t4 WHERE b <= 3;
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='now SIGNAL master_cont1';
+SET binlog_format= @old_format;
+SET binlog_format= @old_format;
+SET debug_sync='RESET';
+SELECT * FROM t4 ORDER BY a;
+a       b
+1       NULL
+3       NULL
+4       4
+5       NULL
+6       NULL
+include/start_slave.inc
+SET debug_sync= 'now WAIT_FOR waiting';
+SELECT * FROM t4 ORDER BY a;
+a       b
+1       NULL
+3       NULL
+4       4
+5       NULL
+6       NULL
+SET debug_sync= 'now SIGNAL cont';
+include/stop_slave.inc
+SET GLOBAL debug_dbug=@old_dbug;
+SET GLOBAL binlog_format= @old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+include/start_slave.inc
 include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=@old_parallel_threads;
 include/start_slave.inc

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test	2014-08-15 09:31:13 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test	2014-11-13 09:31:20 +0000
@@ -1466,6 +1466,75 @@ SET sql_slave_skip_counter= 1;
 SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
 
 
+--echo *** MDEV-6775: Wrong binlog order in parallel replication ***
+--connection server_1
+# A bit tricky bug to reproduce. On the master, we binlog in statement-mode
+# two transactions, an UPDATE followed by a DELETE. On the slave, we replicate
+# with binlog-mode set to ROW, which means the DELETE, which modifies no rows,
+# is not binlogged. Then we inject a wait in the group commit code on the
+# slave, shortly before the actual commit of the UPDATE. The bug was that the
+# DELETE could wake up from wait_for_prior_commit() before the commit of the
+# UPDATE. So the test could see the slave position updated to after DELETE,
+# while the UPDATE was still not visible.
+DELETE FROM t4;
+INSERT INTO t4 VALUES (1,NULL), (3,NULL), (4,4), (5, NULL), (6, 6);
+--source include/save_master_gtid.inc
+
+--connection server_2
+--source include/sync_with_master_gtid.inc
+--source include/stop_slave.inc
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,inject_binlog_commit_before_get_LOCK_log";
+SET @old_format=@@GLOBAL.binlog_format;
+SET GLOBAL binlog_format=ROW;
+# Re-spawn the worker threads to be sure they pick up the new binlog format
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+
+--connection con1
+SET @old_format= @@binlog_format;
+SET binlog_format= statement;
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+send UPDATE t4 SET b=NULL WHERE a=6;
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued1';
+
+--connection con2
+SET @old_format= @@binlog_format;
+SET binlog_format= statement;
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+send DELETE FROM t4 WHERE b <= 3;
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='now SIGNAL master_cont1';
+
+--connection con1
+REAP;
+SET binlog_format= @old_format;
+--connection con2
+REAP;
+SET binlog_format= @old_format;
+SET debug_sync='RESET';
+--save_master_pos
+SELECT * FROM t4 ORDER BY a;
+
+--connection server_2
+--source include/start_slave.inc
+SET debug_sync= 'now WAIT_FOR waiting';
+--sync_with_master
+SELECT * FROM t4 ORDER BY a;
+SET debug_sync= 'now SIGNAL cont';
+
+# Re-spawn the worker threads to remove any DBUG injections or DEBUG_SYNC.
+--source include/stop_slave.inc
+SET GLOBAL debug_dbug=@old_dbug;
+SET GLOBAL binlog_format= @old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+--source include/start_slave.inc
+
+
 --connection server_2
 --source include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=@old_parallel_threads;

=== modified file 'sql/log.cc'
--- a/sql/log.cc	2014-11-13 08:49:07 +0000
+++ b/sql/log.cc	2014-11-13 09:31:20 +0000
@@ -6771,6 +6771,10 @@ MYSQL_BIN_LOG::write_transaction_to_binl
   to commit. If so, we add those to the queue as well, transitively for all
   waiters.
 
+  And if a transaction is marked to wait for a prior transaction, but that
+  prior transaction is already queued for group commit, then we can queue the
+  new transaction directly to participate in the group commit.
+
   @retval < 0   Error
   @retval > 0   If queued as the first entry in the queue (meaning this
                 is the leader)
@@ -6780,8 +6784,8 @@ MYSQL_BIN_LOG::write_transaction_to_binl
 int
 MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
 {
-  group_commit_entry *entry, *orig_queue;
-  wait_for_commit *cur, *last;
+  group_commit_entry *entry, *orig_queue, *last;
+  wait_for_commit *cur;
   wait_for_commit *wfc;
   DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit");
 
@@ -6798,8 +6802,17 @@ MYSQL_BIN_LOG::queue_for_group_commit(gr
   if (wfc && wfc->waitee)
   {
     mysql_mutex_lock(&wfc->LOCK_wait_commit);
-    /* Do an extra check here, this time safely under lock. */
-    if (wfc->waitee)
+    /*
+      Do an extra check here, this time safely under lock.
+
+      If waitee->commit_started is set, it means that the transaction we need
+      to wait for has already queued up for group commit. In this case it is
+      safe for us to queue up immediately as well, increasing the opprtunities
+      for group commit. Because waitee has taken the LOCK_prepare_ordered
+      before setting the flag, so there is no risk that we can queue ahead of
+      it.
+    */
+    if (wfc->waitee && !wfc->waitee->commit_started)
     {
       PSI_stage_info old_stage;
       wait_for_commit *loc_waitee;
@@ -6812,6 +6825,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(gr
         This other transaction may then take over the commit process for us to
         get us included in its own group commit. If this happens, the
         queued_by_other flag is set.
+
+        Setting this flag may or may not be seen by the other thread, but we
+        are safe in any case: The other thread will set queued_by_other under
+        its LOCK_wait_commit, and we will not check queued_by_other only after
+        we have been woken up.
       */
       wfc->opaque_pointer= orig_entry;
       DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
@@ -6884,41 +6902,41 @@ MYSQL_BIN_LOG::queue_for_group_commit(gr
   /*
     Iteratively process everything added to the queue, looking for waiters,
     and their waiters, and so on. If a waiter is ready to commit, we
-    immediately add it to the queue; if not we just wake it up.
+    immediately add it to the queue, and mark it as queued_by_other.
 
     This would be natural to do with recursion, but we want to avoid
     potentially unbounded recursion blowing the C stack, so we use the list
     approach instead.
 
-    We keep a list of all the waiters that need to be processed in `list',
-    linked through the next_subsequent_commit pointer. Initially this list
-    contains only the entry passed into this function.
+    We keep a list of the group_commit_entry of all the waiters that need to
+    be processed. Initially this list contains only the entry passed into this
+    function.
 
     We process entries in the list one by one. The element currently being
-    processed is pointed to by `cur`, and the element at the end of the list
+    processed is pointed to by `entry`, and the element at the end of the list
     is pointed to by `last` (we do not use NULL to terminate the list).
 
-    As we process an element, it is first added to the group_commit_queue.
-    Then any waiters for that element are added at the end of the list, to
-    be processed in subsequent iterations. This continues until the list
-    is exhausted, with all elements ever added eventually processed.
+    As we process an entry, any waiters for that entry are added at the end of
+    the list, to be processed in subsequent iterations. The the entry is added
+    to the group_commit_queue.  This continues until the list is exhausted,
+    with all entries ever added eventually processed.
 
     The end result is a breath-first traversal of the tree of waiters,
-    re-using the next_subsequent_commit pointers in place of extra stack
-    space in a recursive traversal.
+    re-using the `next' pointers of the group_commit_entry objects in place of
+    extra stack space in a recursive traversal.
 
-    The temporary list created in next_subsequent_commit is not
-    used by the caller or any other function.
+    The temporary list linked through these `next' pointers is not used by the
+    caller or any other function; it only exists while doing the iterative
+    tree traversal. After, all the processed entries are linked into the
+    group_commit_queue.
   */
 
   cur= wfc;
-  last= wfc;
+  last= orig_entry;
   entry= orig_entry;
   for (;;)
   {
-    /* Add the entry to the group commit queue. */
-    entry->next= group_commit_queue;
-    group_commit_queue= entry;
+    group_commit_entry *next_entry;
 
     if (entry->cache_mngr->using_xa)
     {
@@ -6927,135 +6945,95 @@ MYSQL_BIN_LOG::queue_for_group_commit(gr
       DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
     }
 
-    if (!cur)
-      break;             // Can happen if initial entry has no wait_for_commit
-
-    /*
-      Check if this transaction has other transaction waiting for it to commit.
-
-      If so, process the waiting transactions, and their waiters and so on,
-      transitively.
-    */
-    if (cur->subsequent_commits_list)
+    if (cur)
     {
-      wait_for_commit *waiter;
-      wait_for_commit *wakeup_list= NULL;
-      wait_for_commit **wakeup_next_ptr= &wakeup_list;
+      /*
+        Now that we have taken LOCK_prepare_ordered and will queue up in the
+        group commit queue, it is safe for following transactions to queue
+        themselves. We will grab here any transaction that is now ready to
+        queue up, but after that, more transactions may become ready while the
+        leader is waiting to start the group commit. So set the flag
+        `commit_started', so that later transactions can still participate in
+        the group commit..
+      */
+      cur->commit_started= true;
 
-      mysql_mutex_lock(&cur->LOCK_wait_commit);
       /*
-        Grab the list, now safely under lock, and process it if still
-        non-empty.
+        Check if this transaction has other transaction waiting for it to
+        commit.
+
+        If so, process the waiting transactions, and their waiters and so on,
+        transitively.
       */
-      waiter= cur->subsequent_commits_list;
-      cur->subsequent_commits_list= NULL;
-      while (waiter)
+      if (cur->subsequent_commits_list)
       {
-        wait_for_commit *next= waiter->next_subsequent_commit;
-        group_commit_entry *entry2=
-          (group_commit_entry *)waiter->opaque_pointer;
-        if (entry2)
-        {
-          /*
-            This is another transaction ready to be written to the binary
-            log. We can put it into the queue directly, without needing a
-            separate context switch to the other thread. We just set a flag
-            so that the other thread will know when it wakes up that it was
-            already processed.
-
-            So put it at the end of the list to be processed in a subsequent
-            iteration of the outer loop.
-          */
-          entry2->queued_by_other= true;
-          last->next_subsequent_commit= waiter;
-          last= waiter;
-          /*
-            As a small optimisation, we do not actually need to set
-            waiter->next_subsequent_commit to NULL, as we can use the
-            pointer `last' to check for end-of-list.
-          */
-        }
-        else
-        {
-          /*
-            Wake up the waiting transaction.
+        wait_for_commit *waiter, **waiter_ptr;
 
-            For this, we need to set the "wakeup running" flag and release
-            the waitee lock to avoid a deadlock, see comments on
-            THD::wakeup_subsequent_commits2() for details.
-
-            So we need to put these on a list and delay the wakeup until we
-            have released the lock.
-          */
-          *wakeup_next_ptr= waiter;
-          wakeup_next_ptr= &waiter->next_subsequent_commit;
-        }
-        waiter= next;
-      }
-      if (wakeup_list)
-      {
-        /* Now release our lock and do the wakeups that were delayed above. */
-        cur->wakeup_subsequent_commits_running= true;
-        mysql_mutex_unlock(&cur->LOCK_wait_commit);
-        for (;;)
+        mysql_mutex_lock(&cur->LOCK_wait_commit);
+        /*
+          Grab the list, now safely under lock, and process it if still
+          non-empty.
+        */
+        waiter= cur->subsequent_commits_list;
+        waiter_ptr= &cur->subsequent_commits_list;
+        while (waiter)
         {
-          wait_for_commit *next;
-
-          /*
-            ToDo: We wakeup the waiter here, so that it can have the chance to
-            reach its own commit state and queue up for this same group commit,
-            if it is still pending.
-
-            One problem with this is that if the waiter does not reach its own
-            commit state before this group commit starts, and then the group
-            commit fails (binlog write failure), we do not get to propagate
-            the error to the waiter.
-
-            A solution for this could be to delay the wakeup until commit is
-            successful. But then we need to set a flag in the waitee that it is
-            already queued for group commit, so that the waiter can check this
-            flag and queue itself if it _does_ reach the commit state in time.
-
-            (But error handling in case of binlog write failure is currently
-            broken in other ways, as well).
-          */
-          if (&wakeup_list->next_subsequent_commit == wakeup_next_ptr)
+          wait_for_commit *next_waiter= waiter->next_subsequent_commit;
+          group_commit_entry *entry2=
+            (group_commit_entry *)waiter->opaque_pointer;
+          if (entry2)
           {
-            /* The last one in the list. */
-            wakeup_list->wakeup(0);
-            break;
+            /*
+              This is another transaction ready to be written to the binary
+              log. We can put it into the queue directly, without needing a
+              separate context switch to the other thread. We just set a flag
+              so that the other thread will know when it wakes up that it was
+              already processed.
+
+              So remove it from the list of our waiters, and instead put it at
+              the end of the list to be processed in a subsequent iteration of
+              the outer loop.
+            */
+            *waiter_ptr= next_waiter;
+            entry2->queued_by_other= true;
+            last->next= entry2;
+            last= entry2;
+            /*
+              As a small optimisation, we do not actually need to set
+              entry2->next to NULL, as we can use the pointer `last' to check
+              for end-of-list.
+            */
           }
-          /*
-            Important: don't access wakeup_list->next after the wakeup() call,
-            it may be invalidated by the other thread.
-          */
-          next= wakeup_list->next_subsequent_commit;
-          wakeup_list->wakeup(0);
-          wakeup_list= next;
+          else
+          {
+            /*
+              This transaction is not ready to participate in the group commit
+              yet, so leave it in the waiter list. It might join the group
+              commit later, if it completes soon enough to do so (it will see
+              our wfc->commit_started flag set), or it might commit later in a
+              later group commit.
+            */
+            waiter_ptr= &waiter->next_subsequent_commit;
+          }
+          waiter= next_waiter;
         }
-        /*
-          We need a full memory barrier between walking the list and clearing
-          the flag wakeup_subsequent_commits_running. This barrier is needed
-          to ensure that no other thread will start to modify the list
-          pointers before we are done traversing the list.
-
-          But wait_for_commit::wakeup(), which was called above, does a full
-          memory barrier already (it locks a mutex).
-        */
-        cur->wakeup_subsequent_commits_running= false;
-      }
-      else
         mysql_mutex_unlock(&cur->LOCK_wait_commit);
+      }
     }
-    if (cur == last)
+
+    /* Add the entry to the group commit queue. */
+    next_entry= entry->next;
+    entry->next= group_commit_queue;
+    group_commit_queue= entry;
+    if (entry == last)
       break;
     /*
       Move to the next entry in the flattened list of waiting transactions
       that still need to be processed transitively.
     */
-    cur= cur->next_subsequent_commit;
-    entry= (group_commit_entry *)cur->opaque_pointer;
+    entry= next_entry;
     DBUG_ASSERT(entry != NULL);
+    cur= entry->thd->wait_for_commit_ptr;
   }
 
   if (opt_binlog_commit_wait_count > 0)
@@ -7111,6 +7089,7 @@ MYSQL_BIN_LOG::write_transaction_to_binl
       DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered");
     }
     mysql_mutex_unlock(&LOCK_commit_ordered);
+    entry->thd->wakeup_subsequent_commits(entry->error);
 
     if (next)
     {
@@ -7144,7 +7123,7 @@ MYSQL_BIN_LOG::write_transaction_to_binl
   }
 
   if (likely(!entry->error))
-    return 0;
+    return entry->thd->wait_for_prior_commit();
 
   switch (entry->error)
   {
@@ -7202,10 +7181,15 @@ MYSQL_BIN_LOG::trx_group_commit_leader(g
   LINT_INIT(binlog_id);
 
   {
+    DBUG_EXECUTE_IF("inject_binlog_commit_before_get_LOCK_log",
+      DBUG_ASSERT(!debug_sync_set_action(leader->thd, STRING_WITH_LEN
+        ("commit_before_get_LOCK_log SIGNAL waiting WAIT_FOR cont TIMEOUT 1")));
+    );
     /*
       Lock the LOCK_log(), and once we get it, collect any additional writes
       that queued up while we were waiting.
     */
+    DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_log");
     mysql_mutex_lock(&LOCK_log);
     DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
 
@@ -7412,6 +7396,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(g
     if (current->cache_mngr->using_xa && !current->error &&
         DBUG_EVALUATE_IF("skip_commit_ordered", 0, 1))
       run_commit_ordered(current->thd, current->all);
+    current->thd->wakeup_subsequent_commits(current->error);
 
     /*
       Careful not to access current->next after waking up the other thread! As

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2014-11-13 08:19:12 +0000
+++ b/sql/sql_class.cc	2014-11-13 09:31:20 +0000
@@ -6342,6 +6342,7 @@ wait_for_commit::reinit()
   opaque_pointer= NULL;
   wakeup_error= 0;
   wakeup_subsequent_commits_running= false;
+  commit_started= false;
 #ifdef SAFE_MUTEX
   /*
     When using SAFE_MUTEX, the ordering between taking the LOCK_wait_commit

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2014-09-10 11:22:44 +0000
+++ b/sql/sql_class.h	2014-11-13 09:31:20 +0000
@@ -1713,6 +1713,16 @@ struct wait_for_commit
     on that function for details.
   */
   bool wakeup_subsequent_commits_running;
+  /*
+    This flag can be set when a commit starts, but has not completed yet.
+    It is used by binlog group commit to allow a waiting transaction T2 to
+    join the group commit of an earlier transaction T1. When T1 has queued
+    itself for group commit, it will set the commit_started flag. Then when
+    T2 becomes ready to commit and needs to wait for T1 to commit first, T2
+    can queue itself before waiting, and thereby participate in the same
+    group commit as T1.
+  */
+  bool commit_started;
 
   void register_wait_for_prior_commit(wait_for_commit *waitee);
   int wait_for_prior_commit(THD *thd)



More information about the commits mailing list