[Commits] Rev 3953: MDEV-5363: Make parallel replication waits killable in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Fri Dec 6 14:28:24 EET 2013


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 3953
revision-id: knielsen at knielsen-hq.org-20131206122823-chprmuvs2u7xkf75
parent: knielsen at knielsen-hq.org-20131205133609-g46y9cu0uk0dd161
committer: knielsen at knielsen-hq.org
branch nick: tmp-10.0-base
timestamp: Fri 2013-12-06 13:28:23 +0100
message:
  MDEV-5363: Make parallel replication waits killable
  
  A couple of more parallel replication waits made killable.
=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2013-12-05 13:36:09 +0000
+++ b/sql/rpl_parallel.cc	2013-12-06 12:28:23 +0000
@@ -285,21 +285,42 @@ handle_rpl_parallel_thread(void *arg)
         wait_start_sub_id= rgi->wait_start_sub_id;
         if (wait_for_sub_id || wait_start_sub_id)
         {
+          bool did_enter_cond= false;
+          const char *old_msg= NULL;
+
           mysql_mutex_lock(&entry->LOCK_parallel_entry);
           if (wait_start_sub_id)
           {
-            while (wait_start_sub_id > entry->last_committed_sub_id)
+            old_msg= thd->enter_cond(&entry->COND_parallel_entry,
+                                     &entry->LOCK_parallel_entry,
+                                     "Waiting for prior transaction to commit "
+                                     "before starting next transaction");
+            did_enter_cond= true;
+            while (wait_start_sub_id > entry->last_committed_sub_id &&
+                   !thd->check_killed())
               mysql_cond_wait(&entry->COND_parallel_entry,
                               &entry->LOCK_parallel_entry);
+            if (wait_start_sub_id > entry->last_committed_sub_id)
+            {
+              /* The thread got a kill signal. */
+              thd->send_kill_message();
+              rgi->is_error= true;
+              slave_output_error_info(rgi->rli, thd);
+              rgi->cleanup_context(thd, true);
+              rgi->rli->abort_slave= true;
+            }
+            rgi->wait_start_sub_id= 0;            /* No need to check again. */
           }
-          rgi->wait_start_sub_id= 0;            /* No need to check again. */
           if (wait_for_sub_id > entry->last_committed_sub_id)
           {
             wait_for_commit *waitee=
               &rgi->wait_commit_group_info->commit_orderer;
             rgi->commit_orderer.register_wait_for_prior_commit(waitee);
           }
-          mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+          if (did_enter_cond)
+            thd->exit_cond(old_msg);
+          else
+            mysql_mutex_unlock(&entry->LOCK_parallel_entry);
         }
 
         if(thd->wait_for_commit_ptr)
@@ -753,6 +774,8 @@ rpl_parallel::do_event(rpl_group_info *s
   Relay_log_info *rli= serial_rgi->rli;
   enum Log_event_type typ;
   bool is_group_event;
+  bool did_enter_cond= false;
+  const char *old_msg= NULL;
 
   /* ToDo: what to do with this lock?!? */
   mysql_mutex_unlock(&rli->data_lock);
@@ -860,6 +883,13 @@ rpl_parallel::do_event(rpl_group_info *s
           }
           else if (cur_thread->queued_size <= opt_slave_parallel_max_queued)
             break;                        // The thread is ready to queue into
+          else if (rli->sql_driver_thd->check_killed())
+          {
+            mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+            my_error(ER_CONNECTION_KILLED, MYF(0));
+            delete rgi;
+            return true;
+          }
           else
           {
             /*
@@ -867,6 +897,13 @@ rpl_parallel::do_event(rpl_group_info *s
               use for queuing events, so wait for the thread to consume some
               of its queue.
             */
+            if (!did_enter_cond)
+            {
+              old_msg= rli->sql_driver_thd->enter_cond
+                (&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread,
+                 "Waiting for room in worker thread event queue");
+              did_enter_cond= true;
+            }
             mysql_cond_wait(&cur_thread->COND_rpl_thread,
                             &cur_thread->LOCK_rpl_thread);
           }
@@ -1016,7 +1053,10 @@ rpl_parallel::do_event(rpl_group_info *s
   */
   rli->event_relay_log_pos= rli->future_event_relay_log_pos;
   cur_thread->enqueue(qev);
-  mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+  if (did_enter_cond)
+    rli->sql_driver_thd->exit_cond(old_msg);
+  else
+    mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
   mysql_cond_signal(&cur_thread->COND_rpl_thread);
 
   return false;



More information about the commits mailing list