[Commits] Rev 3971: MDEV-5764: START SLAVE UNTIL does not work with parallel replication in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Mon Mar 3 13:13:56 EET 2014


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 3971
revision-id: knielsen at knielsen-hq.org-20140303111355-jdaivy9wr9xlzhnm
parent: knielsen at knielsen-hq.org-20140226140209-1v7udjp529paplen
committer: knielsen at knielsen-hq.org
branch nick: tmp-10.0-base
timestamp: Mon 2014-03-03 12:13:55 +0100
message:
  MDEV-5764: START SLAVE UNTIL does not work with parallel replication
  
  With parallel replication, there can be any number of events queued on
  in-memory lists in the worker threads.
  
  For normal STOP SLAVE, we want to skip executing any remaining events on those
  lists and stop as quickly as possible.
  
  However, for START SLAVE UNTIL, when the UNTIL position is reached in the SQL
  driver thread, we must _not_ stop until all already queued events for the
  workers have been executed - otherwise we would stop too early, before the
  actual UNTIL position had been completely reached.
  
  The code did not handle UNTIL correctly, stopping too early due to not
  executing the queued events to completion. Fix this, and also implement that
  an explicit STOP SLAVE in the middle (when the SQL driver thread has reached
  the UNTIL position but the workers have not) _will_ cause an immediate stop.
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2014-02-11 13:06:03 +0000
+++ b/sql/log_event.cc	2014-03-03 11:13:55 +0000
@@ -6649,7 +6649,7 @@ Gtid_list_log_event::write(IO_CACHE *fil
 int
 Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
 {
-  Relay_log_info const *rli= rgi->rli;
+  Relay_log_info *rli= const_cast<Relay_log_info*>(rgi->rli);
   int ret;
   if (gl_flags & FLAG_IGN_GTIDS)
   {
@@ -6669,10 +6669,11 @@ Gtid_list_log_event::do_apply_event(rpl_
   {
     char str_buf[128];
     String str(str_buf, sizeof(str_buf), system_charset_info);
-    const_cast<Relay_log_info*>(rli)->until_gtid_pos.to_string(&str);
+    rli->until_gtid_pos.to_string(&str);
     sql_print_information("Slave SQL thread stops because it reached its"
                           " UNTIL master_gtid_pos %s", str.c_ptr_safe());
-    const_cast<Relay_log_info*>(rli)->abort_slave= true;
+    rli->abort_slave= true;
+    rli->stop_for_until= true;
   }
   return ret;
 }

=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2014-02-26 14:02:09 +0000
+++ b/sql/rpl_parallel.cc	2014-03-03 11:13:55 +0000
@@ -173,6 +173,7 @@ signal_error_to_sql_driver_thread(THD *t
   rgi->is_error= true;
   rgi->cleanup_context(thd, true);
   rgi->rli->abort_slave= true;
+  rgi->rli->stop_for_until= false;
   mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
   mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
   rgi->rli->relay_log.signal_update();
@@ -1122,7 +1123,7 @@ rpl_parallel::find(uint32 domain_id)
 
 
 void
-rpl_parallel::wait_for_done(THD *thd)
+rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
 {
   struct rpl_parallel_entry *e;
   rpl_parallel_thread *rpt;
@@ -1152,9 +1153,13 @@ rpl_parallel::wait_for_done(THD *thd)
       started executing yet. So we set e->stop_count here and use it to
       decide in the worker threads whether to continue executing an event
       group or whether to skip it, when force_abort is set.
+
+      If we stop due to reaching the START SLAVE UNTIL condition, then we
+      need to continue executing any queued events up to that point.
     */
     e->force_abort= true;
-    e->stop_count= e->count_committing_event_groups;
+    e->stop_count= rli->stop_for_until ?
+      e->count_queued_event_groups : e->count_committing_event_groups;
     mysql_mutex_unlock(&e->LOCK_parallel_entry);
     for (j= 0; j < e->rpl_thread_max; ++j)
     {
@@ -1189,6 +1194,30 @@ rpl_parallel::wait_for_done(THD *thd)
   }
 }
 
+
+/*
+  This function handles the case where the SQL driver thread reached the
+  START SLAVE UNTIL position; we stop queueing more events but continue
+  processing remaining, already queued events; then use executes manual
+  STOP SLAVE; then this function signals to worker threads that they
+  should stop the processing of any remaining queued events.
+*/
+void
+rpl_parallel::stop_during_until()
+{
+  struct rpl_parallel_entry *e;
+  uint32 i;
+
+  for (i= 0; i < domain_hash.records; ++i)
+  {
+    e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+    mysql_mutex_lock(&e->LOCK_parallel_entry);
+    if (e->force_abort)
+      e->stop_count= e->count_committing_event_groups;
+    mysql_mutex_unlock(&e->LOCK_parallel_entry);
+  }
+}
+
 
 bool
 rpl_parallel::workers_idle()

=== modified file 'sql/rpl_parallel.h'
--- a/sql/rpl_parallel.h	2014-02-26 14:02:09 +0000
+++ b/sql/rpl_parallel.h	2014-03-03 11:13:55 +0000
@@ -222,7 +222,8 @@ struct rpl_parallel {
   ~rpl_parallel();
   void reset();
   rpl_parallel_entry *find(uint32 domain_id);
-  void wait_for_done(THD *thd);
+  void wait_for_done(THD *thd, Relay_log_info *rli);
+  void stop_during_until();
   bool workers_idle();
   bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
                 ulonglong event_size);

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2014-02-26 14:02:09 +0000
+++ b/sql/rpl_rli.cc	2014-03-03 11:13:55 +0000
@@ -60,7 +60,8 @@ Relay_log_info::Relay_log_info(bool is_s
    group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
    last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
    abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
-   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
+   inited(0), abort_slave(0), stop_for_until(0),
+   slave_running(0), until_condition(UNTIL_NONE),
    until_log_pos(0), retried_trans(0), executed_entries(0),
    m_flags(0)
 {

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2014-02-26 14:02:09 +0000
+++ b/sql/rpl_rli.h	2014-03-03 11:13:55 +0000
@@ -262,6 +262,7 @@ class Relay_log_info : public Slave_repo
   */
   volatile bool inited;
   volatile bool abort_slave;
+  volatile bool stop_for_until;
   volatile uint slave_running;
 
   /* 

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2014-02-26 14:02:09 +0000
+++ b/sql/slave.cc	2014-03-03 11:13:55 +0000
@@ -615,7 +615,14 @@ int terminate_slave_threads(Master_info*
   if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
   {
     DBUG_PRINT("info",("Terminating SQL thread"));
-    mi->rli.abort_slave=1;
+    if (opt_slave_parallel_threads > 0 &&
+        mi->rli.abort_slave && mi->rli.stop_for_until)
+    {
+      mi->rli.stop_for_until= false;
+      mi->rli.parallel.stop_during_until();
+    }
+    else
+      mi->rli.abort_slave=1;
     if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock,
                                       &mi->rli.stop_cond,
                                       &mi->rli.slave_running,
@@ -3414,6 +3421,7 @@ static int exec_relay_log_event(THD* thd
         message about error in query execution to be printed.
       */
       rli->abort_slave= 1;
+      rli->stop_for_until= true;
       mysql_mutex_unlock(&rli->data_lock);
       delete ev;
       DBUG_RETURN(1);
@@ -4356,6 +4364,7 @@ pthread_handler_t handle_slave_sql(void
     Seconds_Behind_Master grows. No big deal.
   */
   rli->abort_slave = 0;
+  rli->stop_for_until= false;
   mysql_mutex_unlock(&rli->run_lock);
   mysql_cond_broadcast(&rli->start_cond);
 
@@ -4526,7 +4535,7 @@ log '%s' at position %s, relay log '%s'
   }
 
   if (opt_slave_parallel_threads > 0)
-    rli->parallel.wait_for_done(thd);
+    rli->parallel.wait_for_done(thd, rli);
 
   /* Thread stopped. Print the current replication position to the log */
   {
@@ -4552,7 +4561,7 @@ log '%s' at position %s, relay log '%s'
     get the correct position printed.)
   */
   if (opt_slave_parallel_threads > 0)
-    rli->parallel.wait_for_done(thd);
+    rli->parallel.wait_for_done(thd, rli);
 
   /*
     Some events set some playgrounds, which won't be cleared because thread



More information about the commits mailing list