[Commits] Rev 4489: MDEV-6680: Performance of domain_parallel replication is disappointing in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org
Thu Nov 13 14:55:57 EET 2014


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4489
revision-id: knielsen at knielsen-hq.org-20141113092048-dl4gowocj339a7f6
parent: knielsen at knielsen-hq.org-20141113090946-opxrwnvgdxekz6eb
committer: Kristian Nielsen <knielsen at knielsen-hq.org>
branch nick: work-10.0
timestamp: Thu 2014-11-13 10:20:48 +0100
message:
  MDEV-6680: Performance of domain_parallel replication is disappointing
  
  The code that manages the free lists of various objects passed to worker
  threads in parallel replication frees objects in batches, to avoid taking
  and releasing LOCK_rpl_thread too often. However, freeing could be delayed
  to the point where one worker's full queue stalled the SQL driver thread
  while other worker threads sat idle. This could significantly degrade the
  achievable parallelism, and thus performance.
  
  Clean up the batch freeing code so that it is more robust and frees batches
  of objects regularly. Normally the queue will now only run full when the
  SQL driver thread is genuinely far ahead of the worker threads.
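
In outline, the pattern the patch moves to is: each worker thread keeps
thread-local free lists that need no lock, and splices them into the shared,
lock-protected free lists only occasionally, but regularly enough that the
SQL driver thread's queue does not fill up. A minimal standalone sketch of
the idea (Obj, Worker and BATCH_FREE are invented names for illustration;
the server code uses LOCK_rpl_thread and QEV_BATCH_FREE, and splices the
whole list in O(1) via a saved tail pointer, as the rpl_parallel.h diff
below shows):

    #include <mutex>

    struct Obj { Obj *next; };

    struct Worker {
      Obj *loc_free_list= nullptr;        // thread-local, no lock needed
      size_t pending= 0;
      std::mutex lock;                    // stand-in for LOCK_rpl_thread
      Obj *free_list= nullptr;            // shared, protected by lock
      static const size_t BATCH_FREE= 200;

      void loc_free(Obj *o)
      {
        o->next= loc_free_list;
        loc_free_list= o;
        // Flush regularly, so that freed objects become visible to the
        // producer before its queue fills up and it stalls.
        if (++pending >= BATCH_FREE)
        {
          std::lock_guard<std::mutex> guard(lock);
          batch_free();
        }
      }

      void batch_free()                   // caller must hold lock
      {
        while (loc_free_list)             // move local list to shared list
        {
          Obj *next= loc_free_list->next;
          loc_free_list->next= free_list;
          free_list= loc_free_list;
          loc_free_list= next;
        }
        pending= 0;
      }
    };
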
=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2014-11-13 09:09:46 +0000
+++ b/sql/rpl_parallel.cc	2014-11-13 09:20:48 +0000
@@ -8,6 +8,15 @@
   Code for optional parallel execution of replicated events on the slave.
 */
 
+
+/*
+  Maximum number of queued events to accumulate in a local free list, before
+  moving them to the global free list. There is additionally a limit on how
+  much to accumulate, based on opt_slave_parallel_max_queued.
+*/
+#define QEV_BATCH_FREE 200
+
+
 struct rpl_parallel_thread_pool global_rpl_thread_pool;
 
 static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
@@ -510,14 +519,8 @@ handle_rpl_parallel_thread(void *arg)
   rpl_group_info *group_rgi= NULL;
   group_commit_orderer *gco, *tmp_gco;
   uint64 event_gtid_sub_id= 0;
-  rpl_parallel_thread::queued_event *qevs_to_free;
-  rpl_group_info *rgis_to_free;
-  group_commit_orderer *gcos_to_free;
   rpl_sql_thread_info sql_info(NULL);
-  size_t total_event_size;
   int err;
-  inuse_relaylog *last_ir;
-  uint64 accumulated_ir_count;
 
   struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
 
@@ -559,6 +562,8 @@ handle_rpl_parallel_thread(void *arg)
 
   while (!rpt->stop)
   {
+    rpl_parallel_thread::queued_event *qev, *next_qev;
+
     thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
                     &stage_waiting_for_work_from_sql_thread, &old_stage);
     /*
@@ -580,28 +585,21 @@ handle_rpl_parallel_thread(void *arg)
     thd->EXIT_COND(&old_stage);
 
   more_events:
-    qevs_to_free= NULL;
-    rgis_to_free= NULL;
-    gcos_to_free= NULL;
-    total_event_size= 0;
-    while (events)
+    for (qev= events; qev; qev= next_qev)
     {
-      struct rpl_parallel_thread::queued_event *next= events->next;
       Log_event_type event_type;
-      rpl_group_info *rgi= events->rgi;
+      rpl_group_info *rgi= qev->rgi;
       rpl_parallel_entry *entry= rgi->parallel_entry;
       bool end_of_group, group_ending;
 
-      total_event_size+= events->event_size;
-      if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
+      next_qev= qev->next;
+      if (qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
       {
-        handle_queued_pos_update(thd, events);
-        events->next= qevs_to_free;
-        qevs_to_free= events;
-        events= next;
+        handle_queued_pos_update(thd, qev);
+        rpt->loc_free_qev(qev);
         continue;
       }
-      else if (events->typ ==
+      else if (qev->typ ==
                rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART)
       {
         if (in_event_group)
@@ -613,24 +611,21 @@ handle_rpl_parallel_thread(void *arg)
           group_rgi->cleanup_context(thd, 1);
           in_event_group= false;
           finish_event_group(thd, group_rgi->gtid_sub_id,
-                             events->entry_for_queued, group_rgi);
+                             qev->entry_for_queued, group_rgi);
 
-          group_rgi->next= rgis_to_free;
-          rgis_to_free= group_rgi;
+          rpt->loc_free_rgi(group_rgi);
           thd->rgi_slave= group_rgi= NULL;
         }
 
-        events->next= qevs_to_free;
-        qevs_to_free= events;
-        events= next;
+        rpt->loc_free_qev(qev);
         continue;
       }
-      DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
+      DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
 
       thd->rgi_slave= group_rgi= rgi;
       gco= rgi->gco;
       /* Handle a new event group, which will be initiated by a GTID event. */
-      if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
+      if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
       {
         bool did_enter_cond= false;
         PSI_stage_info old_stage;
@@ -643,7 +638,7 @@ handle_rpl_parallel_thread(void *arg)
           similar), without any terminating COMMIT/ROLLBACK/XID.
         */
         group_standalone=
-          (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
+          (0 != (static_cast<Gtid_log_event *>(qev->ev)->flags2 &
                  Gtid_log_event::FL_STANDALONE));
 
         event_gtid_sub_id= rgi->gtid_sub_id;
@@ -704,8 +699,7 @@ handle_rpl_parallel_thread(void *arg)
           */
           DBUG_ASSERT(!tmp_gco->prev_gco);
           gco->prev_gco= NULL;
-          tmp_gco->next_gco= gcos_to_free;
-          gcos_to_free= tmp_gco;
+          rpt->loc_free_gco(tmp_gco);
         }
 
         if (entry->force_abort && wait_count > entry->stop_count)
@@ -766,7 +760,7 @@ handle_rpl_parallel_thread(void *arg)
         }
       }
 
-      group_ending= is_group_ending(events->ev, event_type);
+      group_ending= is_group_ending(qev->ev, event_type);
       if (group_ending && likely(!rgi->worker_error))
       {
         DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
@@ -782,20 +776,20 @@ handle_rpl_parallel_thread(void *arg)
       if (likely(!rgi->worker_error) && !skip_event_group)
       {
         ++rgi->retry_event_count;
-        err= rpt_handle_event(events, rpt);
-        delete_or_keep_event_post_apply(rgi, event_type, events->ev);
+        err= rpt_handle_event(qev, rpt);
+        delete_or_keep_event_post_apply(rgi, event_type, qev->ev);
         DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
                         err= dbug_simulate_tmp_error(rgi, thd););
         if (err)
         {
           convert_kill_to_deadlock_error(rgi);
           if (has_temporary_error(thd) && slave_trans_retries > 0)
-            err= retry_event_group(rgi, rpt, events);
+            err= retry_event_group(rgi, rpt, qev);
         }
       }
       else
       {
-        delete events->ev;
+        delete qev->ev;
         err= thd->wait_for_prior_commit();
       }
 
@@ -804,8 +798,7 @@ handle_rpl_parallel_thread(void *arg)
         ((group_standalone && !Log_event::is_part_of_group(event_type)) ||
          group_ending);
 
-      events->next= qevs_to_free;
-      qevs_to_free= events;
+      rpt->loc_free_qev(qev);
 
       if (unlikely(err))
       {
@@ -820,61 +813,20 @@ handle_rpl_parallel_thread(void *arg)
       {
         in_event_group= false;
         finish_event_group(thd, event_gtid_sub_id, entry, rgi);
-        rgi->next= rgis_to_free;
-        rgis_to_free= rgi;
+        rpt->loc_free_rgi(rgi);
         thd->rgi_slave= group_rgi= rgi= NULL;
         skip_event_group= false;
         DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
       }
-
-      events= next;
     }
 
     mysql_mutex_lock(&rpt->LOCK_rpl_thread);
-    /* Signal that our queue can now accept more events. */
-    rpt->dequeue2(total_event_size);
-    mysql_cond_signal(&rpt->COND_rpl_thread_queue);
-    /* We need to delay the free here, to when we have the lock. */
-    while (gcos_to_free)
-    {
-      group_commit_orderer *next= gcos_to_free->next_gco;
-      rpt->free_gco(gcos_to_free);
-      gcos_to_free= next;
-    }
-    while (rgis_to_free)
-    {
-      rpl_group_info *next= rgis_to_free->next;
-      rpt->free_rgi(rgis_to_free);
-      rgis_to_free= next;
-    }
-    last_ir= NULL;
-    accumulated_ir_count= 0;
-    while (qevs_to_free)
-    {
-      rpl_parallel_thread::queued_event *next= qevs_to_free->next;
-      inuse_relaylog *ir= qevs_to_free->ir;
-      /* Batch up refcount update to reduce use of synchronised operations. */
-      if (last_ir != ir)
-      {
-        if (last_ir)
-        {
-          my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
-          my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
-          my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
-          accumulated_ir_count= 0;
-        }
-        last_ir= ir;
-      }
-      ++accumulated_ir_count;
-      rpt->free_qev(qevs_to_free);
-      qevs_to_free= next;
-    }
-    if (last_ir)
-    {
-      my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
-      my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
-      my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
-    }
+    /*
+      Now that we have the lock, we can move everything from our local free
+      lists to the real free lists that are also accessible from the SQL
+      driver thread.
+    */
+    rpt->batch_free();
 
     if ((events= rpt->event_queue) != NULL)
     {
@@ -887,6 +839,7 @@ handle_rpl_parallel_thread(void *arg)
       mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
       goto more_events;
     }
+    rpt->inuse_relaylog_refcount_update();
 
     if (in_event_group && group_rgi->parallel_entry->force_abort)
     {
@@ -1122,6 +1075,51 @@ rpl_parallel_change_thread_count(rpl_par
 }
 
 
+void
+rpl_parallel_thread::batch_free()
+{
+  mysql_mutex_assert_owner(&LOCK_rpl_thread);
+  if (loc_qev_list)
+  {
+    *loc_qev_last_ptr_ptr= qev_free_list;
+    qev_free_list= loc_qev_list;
+    loc_qev_list= NULL;
+    dequeue2(loc_qev_size);
+    /* Signal that our queue can now accept more events. */
+    mysql_cond_signal(&COND_rpl_thread_queue);
+    loc_qev_size= 0;
+    qev_free_pending= 0;
+  }
+  if (loc_rgi_list)
+  {
+    *loc_rgi_last_ptr_ptr= rgi_free_list;
+    rgi_free_list= loc_rgi_list;
+    loc_rgi_list= NULL;
+  }
+  if (loc_gco_list)
+  {
+    *loc_gco_last_ptr_ptr= gco_free_list;
+    gco_free_list= loc_gco_list;
+    loc_gco_list= NULL;
+  }
+}
+
+
+void
+rpl_parallel_thread::inuse_relaylog_refcount_update()
+{
+  inuse_relaylog *ir= accumulated_ir_last;
+  if (ir)
+  {
+    my_atomic_rwlock_wrlock(&ir->rli->inuse_relaylog_atomic_lock);
+    my_atomic_add64(&ir->dequeued_count, accumulated_ir_count);
+    my_atomic_rwlock_wrunlock(&ir->rli->inuse_relaylog_atomic_lock);
+    accumulated_ir_count= 0;
+    accumulated_ir_last= NULL;
+  }
+}
+
+
 rpl_parallel_thread::queued_event *
 rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
 {
@@ -1176,6 +1174,43 @@ rpl_parallel_thread::retry_get_qev(Log_e
 
 
 void
+rpl_parallel_thread::loc_free_qev(rpl_parallel_thread::queued_event *qev)
+{
+  inuse_relaylog *ir= qev->ir;
+  inuse_relaylog *last_ir= accumulated_ir_last;
+  if (ir != last_ir)
+  {
+    if (last_ir)
+      inuse_relaylog_refcount_update();
+    accumulated_ir_last= ir;
+  }
+  ++accumulated_ir_count;
+  if (!loc_qev_list)
+    loc_qev_last_ptr_ptr= &qev->next;
+  else
+    qev->next= loc_qev_list;
+  loc_qev_list= qev;
+  loc_qev_size+= qev->event_size;
+  /*
+    We want to release to the global free list only occasionally, to avoid
+    having to take the LOCK_rpl_thread mutex too many times.
+
+    However, we do need to release regularly. If we let the unreleased part
+    grow too large, then the SQL driver thread may go to sleep waiting for
+    the queue to drop below opt_slave_parallel_max_queued, and this in turn
+    can leave all other worker threads stalled, waiting for more work to do.
+  */
+  if (++qev_free_pending >= QEV_BATCH_FREE ||
+      loc_qev_size >= opt_slave_parallel_max_queued/3)
+  {
+    mysql_mutex_lock(&LOCK_rpl_thread);
+    batch_free();
+    mysql_mutex_unlock(&LOCK_rpl_thread);
+  }
+}
+
+
+void
 rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
 {
   mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -1224,6 +1259,19 @@ rpl_parallel_thread::get_rgi(Relay_log_i
 
 
 void
+rpl_parallel_thread::loc_free_rgi(rpl_group_info *rgi)
+{
+  DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
+  rgi->free_annotate_event();
+  if (!loc_rgi_list)
+    loc_rgi_last_ptr_ptr= &rgi->next;
+  else
+    rgi->next= loc_rgi_list;
+  loc_rgi_list= rgi;
+}
+
+
+void
 rpl_parallel_thread::free_rgi(rpl_group_info *rgi)
 {
   mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -1257,12 +1305,14 @@ rpl_parallel_thread::get_gco(uint64 wait
 
 
 void
-rpl_parallel_thread::free_gco(group_commit_orderer *gco)
+rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
 {
-  mysql_mutex_assert_owner(&LOCK_rpl_thread);
   DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */);
-  gco->next_gco= gco_free_list;
-  gco_free_list= gco;
+  if (!loc_gco_list)
+    loc_gco_last_ptr_ptr= &gco->next_gco;
+  else
+    gco->next_gco= loc_gco_list;
+  loc_gco_list= gco;
 }
 
 

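
The inuse_relaylog refcount handling above follows the same batching idea:
loc_free_qev() accumulates a plain counter per relay log, and
inuse_relaylog_refcount_update() issues a single atomic add when the relay
log changes or the event queue runs dry. Roughly (a sketch with invented
names RelayLog and Worker; std::atomic stands in for the server's my_atomic
wrappers):

    #include <atomic>
    #include <cstdint>

    struct RelayLog { std::atomic<int64_t> dequeued_count{0}; };

    struct Worker {
      RelayLog *acc_last= nullptr;   // relay log we are accumulating for
      int64_t acc_count= 0;

      // Flush the accumulated count with a single atomic add.
      void refcount_flush()
      {
        if (acc_last)
        {
          acc_last->dequeued_count.fetch_add(acc_count);
          acc_count= 0;
          acc_last= nullptr;
        }
      }

      // Called once per dequeued event; the (relatively expensive) atomic
      // add happens only when the event comes from a different relay log
      // than the previous one.
      void count_dequeued(RelayLog *ir)
      {
        if (ir != acc_last)
        {
          refcount_flush();
          acc_last= ir;
        }
        ++acc_count;
      }
    };
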
=== modified file 'sql/rpl_parallel.h'
--- a/sql/rpl_parallel.h	2014-08-20 08:59:39 +0000
+++ b/sql/rpl_parallel.h	2014-11-13 09:20:48 +0000
@@ -96,9 +96,28 @@ struct rpl_parallel_thread {
     size_t event_size;
   } *event_queue, *last_in_queue;
   uint64 queued_size;
+  /* These free lists are protected by LOCK_rpl_thread. */
   queued_event *qev_free_list;
   rpl_group_info *rgi_free_list;
   group_commit_orderer *gco_free_list;
+  /*
+    These free lists are local to the thread, so need not be protected by any
+    lock. They are moved to the global free lists in batches in the function
+    batch_free(), to reduce LOCK_rpl_thread contention.
+
+    The lists are not NULL-terminated (as we do not need to traverse them).
+    Instead, if they are non-NULL, the loc_XXX_last_ptr_ptr points to the
+    `next' pointer of the last element, which is used to link into the front
+    of the global freelists.
+  */
+  queued_event *loc_qev_list, **loc_qev_last_ptr_ptr;
+  size_t loc_qev_size;
+  uint64 qev_free_pending;
+  rpl_group_info *loc_rgi_list, **loc_rgi_last_ptr_ptr;
+  group_commit_orderer *loc_gco_list, **loc_gco_last_ptr_ptr;
+  /* These keep track of batch update of inuse_relaylog refcounts. */
+  inuse_relaylog *accumulated_ir_last;
+  uint64 accumulated_ir_count;
 
   void enqueue(queued_event *qev)
   {
@@ -127,12 +146,41 @@ struct rpl_parallel_thread {
   queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev,
                               const char *relay_log_name,
                               ulonglong event_pos, ulonglong event_size);
+  /*
+    Put a qev on the local free list, to be later released to the global free
+    list by batch_free().
+  */
+  void loc_free_qev(queued_event *qev);
+  /*
+    Release a qev immediately to the global free list. Requires holding the
+    LOCK_rpl_thread mutex.
+  */
   void free_qev(queued_event *qev);
   rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
                           rpl_parallel_entry *e, ulonglong event_size);
+  /*
+    Put an rgi on the local free list, to be later released to the global free
+    list by batch_free().
+  */
+  void loc_free_rgi(rpl_group_info *rgi);
+  /*
+    Release an rgi immediately to the global free list. Requires holding the
+    LOCK_rpl_thread mutex.
+  */
   void free_rgi(rpl_group_info *rgi);
   group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev);
-  void free_gco(group_commit_orderer *gco);
+  /*
+    Put a gco on the local free list, to be later released to the global free
+    list by batch_free().
+  */
+  void loc_free_gco(group_commit_orderer *gco);
+  /*
+    Move all local free lists to the global ones. Requires holding
+    LOCK_rpl_thread.
+  */
+  void batch_free();
+  /* Update inuse_relaylog refcounts with what we have accumulated so far. */
+  void inuse_relaylog_refcount_update();
 };
 
 

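
The non-NULL-terminated local lists with loc_XXX_last_ptr_ptr described in
the comment above allow the whole local list to be spliced onto a global
list in O(1), without traversing it. In isolation (a sketch; Node and
LocalList are invented names, the real lists live inline in
rpl_parallel_thread):

    struct Node { Node *next; };

    struct LocalList {
      Node *head= nullptr;      // not NULL-terminated while non-empty
      Node **last_ptr_ptr;      // &(last element's next), valid if head

      void push(Node *n)
      {
        if (!head)
          last_ptr_ptr= &n->next;  // remember where the tail's next lives
        else
          n->next= head;           // prepend; the tail's next stays as-is
        head= n;
      }

      // Patch the saved tail pointer to link the local list into the
      // front of the global list, then reset the local list.
      void splice_into(Node *&global_head)
      {
        if (head)
        {
          *last_ptr_ptr= global_head;
          global_head= head;
          head= nullptr;
        }
      }
    };
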
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2014-11-13 09:09:46 +0000
+++ b/sql/rpl_rli.cc	2014-11-13 09:20:48 +0000
@@ -1390,6 +1390,7 @@ Relay_log_info::alloc_inuse_relaylog(con
     my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
     return 1;
   }
+  ir->rli= this;
   strmake_buf(ir->name, name);
 
   if (!inuse_relaylog_list)

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2014-09-10 11:22:44 +0000
+++ b/sql/rpl_rli.h	2014-11-13 09:20:48 +0000
@@ -496,6 +496,7 @@ class Relay_log_info : public Slave_repo
 */
 struct inuse_relaylog {
   inuse_relaylog *next;
+  Relay_log_info *rli;
   /* Number of events in this relay log queued for worker threads. */
   int64 queued_count;
   /* Number of events completed by worker threads. */


