[Commits] Rev 3952: MDEV-5363: Make parallel replication waits killable in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Thu Dec 5 15:36:09 EET 2013


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 3952
revision-id: knielsen at knielsen-hq.org-20131205133609-g46y9cu0uk0dd161
parent: knielsen at knielsen-hq.org-20131129144609-00rjbrdr24zmcecm
committer: knielsen at knielsen-hq.org
branch nick: tmp-10.0-base
timestamp: Thu 2013-12-05 14:36:09 +0100
message:
  MDEV-5363: Make parallel replication waits killable
  
  Make wait_for_prior_commit killable, and handle the error if
  killed.
=== modified file 'sql/log.cc'
--- a/sql/log.cc	2013-11-22 23:50:54 +0000
+++ b/sql/log.cc	2013-12-05 13:36:09 +0000
@@ -6612,12 +6612,13 @@ MYSQL_BIN_LOG::write_transaction_to_binl
   to commit. If so, we add those to the queue as well, transitively for all
   waiters.
 
-  @retval  TRUE   If queued as the first entry in the queue (meaning this
-                  is the leader)
-  @retval FALSE   Otherwise                  
+  @retval < 0   Error
+  @retval > 0   If queued as the first entry in the queue (meaning this
+                is the leader)
+  @retval   0   Otherwise (queued as participant, leader handles the commit)
 */
 
-bool
+int
 MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
 {
   group_commit_entry *entry, *orig_queue;
@@ -6641,6 +6642,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(gr
     /* Do an extra check here, this time safely under lock. */
     if (wfc->waiting_for_commit)
     {
+      const char *old_msg;
       /*
         By setting wfc->opaque_pointer to our own entry, we mark that we are
         ready to commit, but waiting for another transaction to commit before
@@ -6651,16 +6653,36 @@ MYSQL_BIN_LOG::queue_for_group_commit(gr
         queued_by_other flag is set.
       */
       wfc->opaque_pointer= orig_entry;
+      old_msg=
+        orig_entry->thd->enter_cond(&wfc->COND_wait_commit,
+                                    &wfc->LOCK_wait_commit,
+                                    "Waiting for prior transaction to commit");
       DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
-      do
-      {
+      while (wfc->waiting_for_commit && !orig_entry->thd->check_killed())
         mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
-      } while (wfc->waiting_for_commit);
       wfc->opaque_pointer= NULL;
       DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
                  orig_entry->queued_by_other));
+      orig_entry->thd->exit_cond(old_msg);
+
+      if (wfc->waiting_for_commit)
+      {
+        /* Interrupted by kill. */
+        wfc->wakeup_error= orig_entry->thd->killed_errno();
+        if (wfc->wakeup_error)
+          wfc->wakeup_error= ER_QUERY_INTERRUPTED;
+        my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0));
+        DBUG_RETURN(-1);
+      }
+    }
+    else
+      mysql_mutex_unlock(&wfc->LOCK_wait_commit);
+
+    if (wfc->wakeup_error)
+    {
+      my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+      DBUG_RETURN(-1);
     }
-    mysql_mutex_unlock(&wfc->LOCK_wait_commit);
   }
 
   /*
@@ -6669,7 +6691,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(gr
     then there is nothing else to do.
   */
   if (orig_entry->queued_by_other)
-    DBUG_RETURN(false);
+    DBUG_RETURN(0);
 
   /* Now enqueue ourselves in the group commit queue. */
   DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue");
@@ -6841,13 +6863,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(gr
 bool
 MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
 {
-  bool is_leader= queue_for_group_commit(entry);
+  int is_leader= queue_for_group_commit(entry);
 
   /*
     The first in the queue handles group commit for all; the others just wait
     to be signalled when group commit is done.
   */
-  if (is_leader)
+  if (is_leader < 0)
+    return true;                                /* Error */
+  else if (is_leader)
     trx_group_commit_leader(entry);
   else if (!entry->queued_by_other)
     entry->thd->wait_for_wakeup_ready();

=== modified file 'sql/log.h'
--- a/sql/log.h	2013-11-21 13:42:25 +0000
+++ b/sql/log.h	2013-12-05 13:36:09 +0000
@@ -540,7 +540,7 @@ class MYSQL_BIN_LOG: public TC_LOG, priv
   void do_checkpoint_request(ulong binlog_id);
   void purge();
   int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id);
-  bool queue_for_group_commit(group_commit_entry *entry);
+  int queue_for_group_commit(group_commit_entry *entry);
   bool write_transaction_to_binlog_events(group_commit_entry *entry);
   void trx_group_commit_leader(group_commit_entry *leader);
   bool is_xidlist_idle_nolock();

=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2013-11-08 10:41:13 +0000
+++ b/sql/rpl_parallel.cc	2013-12-05 13:36:09 +0000
@@ -142,7 +142,7 @@ finish_event_group(THD *thd, int err, ui
   if (err)
     wfc->unregister_wait_for_prior_commit();
   else
-    wfc->wait_for_prior_commit();
+    err= wfc->wait_for_prior_commit(thd);
   thd->wait_for_commit_ptr= NULL;
 
   /*

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2013-11-22 23:50:54 +0000
+++ b/sql/sql_class.cc	2013-12-05 13:36:09 +0000
@@ -5783,13 +5783,28 @@ wait_for_commit::register_wait_for_prior
   returns immediately.
 */
 int
-wait_for_commit::wait_for_prior_commit2()
+wait_for_commit::wait_for_prior_commit2(THD *thd)
 {
+  const char *old_msg;
+
   mysql_mutex_lock(&LOCK_wait_commit);
-  while (waiting_for_commit)
+  old_msg= thd->enter_cond(&COND_wait_commit, &LOCK_wait_commit,
+                           "Waiting for prior transaction to commit");
+  while (waiting_for_commit && !thd->check_killed())
     mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
-  mysql_mutex_unlock(&LOCK_wait_commit);
+  thd->exit_cond(old_msg);
   waitee= NULL;
+  if (!waiting_for_commit)
+  {
+    if (wakeup_error)
+      my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+    return wakeup_error;
+  }
+  /* Wait was interrupted by kill, so give the error. */
+  wakeup_error= thd->killed_errno();
+  if (!wakeup_error)
+    wakeup_error= ER_QUERY_INTERRUPTED;
+  my_message(wakeup_error, ER(wakeup_error), MYF(0));
   return wakeup_error;
 }
 

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2013-11-22 23:50:54 +0000
+++ b/sql/sql_class.h	2013-12-05 13:36:09 +0000
@@ -1627,14 +1627,14 @@ struct wait_for_commit
   bool wakeup_subsequent_commits_running;
 
   void register_wait_for_prior_commit(wait_for_commit *waitee);
-  int wait_for_prior_commit()
+  int wait_for_prior_commit(THD *thd)
   {
     /*
       Quick inline check, to avoid function call and locking in the common case
       where no wakeup is registered, or a registered wait was already signalled.
     */
     if (waiting_for_commit)
-      return wait_for_prior_commit2();
+      return wait_for_prior_commit2(thd);
     else
       return wakeup_error;
   }
@@ -1663,7 +1663,7 @@ struct wait_for_commit
 
   void wakeup(int wakeup_error);
 
-  int wait_for_prior_commit2();
+  int wait_for_prior_commit2(THD *thd);
   void wakeup_subsequent_commits2(int wakeup_error);
   void unregister_wait_for_prior_commit2();
 
@@ -3334,12 +3334,7 @@ class THD :public Statement,
   int wait_for_prior_commit()
   {
     if (wait_for_commit_ptr)
-    {
-      int err= wait_for_commit_ptr->wait_for_prior_commit();
-      if (err)
-        my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
-      return err;
-    }
+      return wait_for_commit_ptr->wait_for_prior_commit(this);
     return 0;
   }
   void wakeup_subsequent_commits(int wakeup_error)



More information about the commits mailing list