[Commits] Rev 4276: MDEV-5262: Missing retry after temp error in parallel replication in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Tue Jul 8 16:39:28 EEST 2014


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4276
revision-id: knielsen at knielsen-hq.org-20140515135208-xzquimf7hgwh5mes
parent: knielsen at knielsen-hq.org-20140513114206-cjvle53zycke12hs
author: knielsen at knielsen-hq.org
committer: Kristian Nielsen <knielsen at knielsen-hq.org>
branch nick: tmp-10.0
timestamp: Thu 2014-05-15 15:52:08 +0200
message:
  MDEV-5262: Missing retry after temp error in parallel replication
  
  Handle retry of event groups that span multiple relay log files.
  
   - If retry reaches the end of one relay log file, move on to the next.
  
   - Handle refcounting of relay log files, and avoid purging relay log
     files until all event groups have completed that might have needed
     them for transaction retry.
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_retry.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_retry.result	2014-05-13 11:42:06 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_retry.result	2014-05-15 13:52:08 +0000
@@ -141,9 +141,56 @@ a	b
 7       1
 8       1
 9       1
+*** Test retry of event group that spans multiple relay log files. ***
+CREATE TABLE t2 (a int PRIMARY KEY, b BLOB) ENGINE=InnoDB;
+INSERT INTO t2 VALUES (1,"Hulubullu");
+include/stop_slave.inc
+SET @old_max= @@GLOBAL.max_relay_log_size;
+SET GLOBAL max_relay_log_size=4096;
+SET gtid_seq_no = 100;
+SET @old_server_id = @@server_id;
+SET server_id = 12;
+BEGIN;
+INSERT INTO t1 VALUES (10, 4);
+COMMIT;
+SET server_id = @old_server_id;
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a       b
+10      4
+SELECT a, LENGTH(b) FROM t2 ORDER BY a;
+a       LENGTH(b)
+1       9
+2       5006
+3       5012
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100";
+include/start_slave.inc
+SET GLOBAL debug_dbug=@old_dbug;
+retries
+1
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a       b
+10      4
+SELECT a, LENGTH(b) FROM t2 ORDER BY a;
+a       LENGTH(b)
+1       9
+2       5006
+3       5012
+INSERT INTO t1 VALUES (11,11);
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a       b
+10      4
+11      11
+SELECT a, LENGTH(b) FROM t2 ORDER BY a;
+a       LENGTH(b)
+1       9
+2       5006
+3       5012
+4       5000
+SET GLOBAL max_relay_log_size=@old_max;
 include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=@old_parallel_threads;
 include/start_slave.inc
-DROP TABLE t1;
+DROP TABLE t1, t2;
 DROP function foo;
 include/rpl_end.inc

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_retry.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_retry.test	2014-05-13 11:42:06 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_retry.test	2014-05-15 13:52:08 +0000
@@ -149,6 +149,64 @@ STOP SLAVE IO_THREAD;
 --sync_with_master
 SELECT * FROM t1 ORDER BY a;
 
+--echo *** Test retry of event group that spans multiple relay log files. ***
+
+--connection server_1
+CREATE TABLE t2 (a int PRIMARY KEY, b BLOB) ENGINE=InnoDB;
+INSERT INTO t2 VALUES (1,"Hulubullu");
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+--source include/stop_slave.inc
+SET @old_max= @@GLOBAL.max_relay_log_size;
+SET GLOBAL max_relay_log_size=4096;
+
+--connection server_1
+--let $big= `SELECT REPEAT("*", 5000)`
+SET gtid_seq_no = 100;
+SET @old_server_id = @@server_id;
+SET server_id = 12;
+BEGIN;
+--disable_query_log
+eval INSERT INTO t2 VALUES (2, CONCAT("Hello ", "$big"));
+eval INSERT INTO t2 VALUES (3, CONCAT("Long data:  ", "$big"));
+--enable_query_log
+INSERT INTO t1 VALUES (10, 4);
+COMMIT;
+SET server_id = @old_server_id;
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+SELECT a, LENGTH(b) FROM t2 ORDER BY a;
+--save_master_pos
+
+--connection server_2
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100";
+let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
+--source include/start_slave.inc
+--sync_with_master
+SET GLOBAL debug_dbug=@old_dbug;
+let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
+--disable_query_log
+eval SELECT $new_retry - $old_retry AS retries;
+--enable_query_log
+
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+SELECT a, LENGTH(b) FROM t2 ORDER BY a;
+
+--connection server_1
+INSERT INTO t1 VALUES (11,11);
+--disable_query_log
+eval INSERT INTO t2 VALUES (4, "$big");
+--enable_query_log
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+SELECT a, LENGTH(b) FROM t2 ORDER BY a;
+SET GLOBAL max_relay_log_size=@old_max;
+
 
 --connection server_2
 --source include/stop_slave.inc
@@ -156,7 +214,7 @@ SET GLOBAL slave_parallel_threads=@old_p
 --source include/start_slave.inc
 
 --connection server_1
-DROP TABLE t1;
+DROP TABLE t1, t2;
 DROP function foo;
 
 --source include/rpl_end.inc

=== modified file 'sql/log.cc'
--- a/sql/log.cc	2014-06-25 11:08:30 +0000
+++ b/sql/log.cc	2014-05-15 13:52:08 +0000
@@ -4097,6 +4097,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay
 {
   int error;
   char *to_purge_if_included= NULL;
+  inuse_relaylog *ir;
   DBUG_ENTER("purge_first_log");
 
   DBUG_ASSERT(is_open());
@@ -4104,7 +4105,30 @@ int MYSQL_BIN_LOG::purge_first_log(Relay
   DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
 
   mysql_mutex_lock(&LOCK_index);
-  to_purge_if_included= my_strdup(rli->group_relay_log_name, MYF(0));
+
+  ir= rli->inuse_relaylog_list;
+  while (ir)
+  {
+    inuse_relaylog *next= ir->next;
+    if (!ir->completed || ir->dequeued_count < ir->queued_count)
+    {
+      included= false;
+      break;
+    }
+    if (!included && 0 == strcmp(ir->name, rli->group_relay_log_name))
+      break;
+    if (!next)
+    {
+      rli->last_inuse_relaylog= NULL;
+      included= 1;
+      to_purge_if_included= my_strdup(ir->name, MYF(0));
+    }
+    my_free(ir);
+    ir= next;
+  }
+  rli->inuse_relaylog_list= ir;
+  if (ir)
+    to_purge_if_included= my_strdup(ir->name, MYF(0));
 
   /*
     Read the next log file name from the index file and pass it back to

=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2014-05-13 11:42:06 +0000
+++ b/sql/rpl_parallel.cc	2014-05-15 13:52:08 +0000
@@ -204,20 +204,14 @@ dbug_simulate_tmp_error(rpl_group_info *
 }
 #endif
 
-static int
-retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog)
-{
-  /* ToDo */
-  return 0;
-}
-
 
 static int
 retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
                   rpl_parallel_thread::queued_event *orig_qev)
 {
   IO_CACHE rlog;
-  File fd;
+  LOG_INFO linfo;
+  File fd= (File)-1;
   const char *errmsg= NULL;
   inuse_relaylog *ir= rgi->relay_log;
   uint64 event_count;
@@ -241,7 +235,10 @@ retry_event_group(rpl_group_info *rgi, r
 
   strcpy(log_name, ir->name);
   if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
-    return 1;
+  {
+    err= 1;
+    goto err;
+  }
   cur_offset= rgi->retry_start_offset;
   my_b_seek(&rlog, cur_offset);
 
@@ -249,43 +246,85 @@ retry_event_group(rpl_group_info *rgi, r
   {
     Log_event_type event_type;
     Log_event *ev;
+    rpl_parallel_thread::queued_event *qev;
 
-    old_offset= cur_offset;
-    ev= Log_event::read_log_event(&rlog, 0,
-                                  rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
-                                  opt_slave_sql_verify_checksum);
-    cur_offset= my_b_tell(&rlog);
-
-    if (!ev)
-    {
-      err= 1;
-      goto err;
-    }
-    ev->thd= thd;
-    event_type= ev->get_type_code();
-    if (Log_event::is_group_event(event_type))
+    /* Loop so that on EOF we can retry with the next relay log file. */
+    for (;;)
     {
-      rpl_parallel_thread::queued_event *qev;
+      old_offset= cur_offset;
+      ev= Log_event::read_log_event(&rlog, 0,
+                                    rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
+                                    opt_slave_sql_verify_checksum);
+      cur_offset= my_b_tell(&rlog);
 
-      mysql_mutex_lock(&rpt->LOCK_rpl_thread);
-      qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
-                              cur_offset - old_offset);
-      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
-      if (!qev)
+      if (ev)
+        break;
+      if (rlog.error < 0)
       {
-        delete ev;
-        my_error(ER_OUT_OF_RESOURCES, MYF(0));
+        errmsg= "slave SQL thread aborted because of I/O error";
         err= 1;
         goto err;
       }
-      err= rpt_handle_event(qev, rpt);
-      ++event_count;
-      mysql_mutex_lock(&rpt->LOCK_rpl_thread);
-      rpt->free_qev(qev);
-      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+      if (rlog.error > 0)
+      {
+        sql_print_error("Slave SQL thread: I/O error reading "
+                        "event(errno: %d  cur_log->error: %d)",
+                        my_errno, rlog.error);
+        errmsg= "Aborting slave SQL thread because of partial event read";
+        err= 1;
+        goto err;
+      }
+      /* EOF. Move to the next relay log. */
+      end_io_cache(&rlog);
+      mysql_file_close(fd, MYF(MY_WME));
+      fd= (File)-1;
+
+      /* Find the next relay log file. */
+      if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) ||
+         (err= rli->relay_log.find_next_log(&linfo, 1)))
+      {
+        char buff[22];
+        sql_print_error("next log error: %d  offset: %s  log: %s",
+                        err,
+                        llstr(linfo.index_file_offset, buff),
+                        log_name);
+        goto err;
+      }
+      strmake_buf(log_name ,linfo.log_file_name);
+
+      if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
+      {
+        err= 1;
+        goto err;
+      }
+      /* Loop to try again on the new log file. */
     }
-    else
-      err= retry_handle_relay_log_rotate(ev, &rlog);
+
+    event_type= ev->get_type_code();
+    if (!Log_event::is_group_event(event_type))
+    {
+      delete ev;
+      continue;
+    }
+    ev->thd= thd;
+
+    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+    qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
+                            cur_offset - old_offset);
+    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+    if (!qev)
+    {
+      delete ev;
+      my_error(ER_OUT_OF_RESOURCES, MYF(0));
+      err= 1;
+      goto err;
+    }
+    err= rpt_handle_event(qev, rpt);
+    ++event_count;
+    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+    rpt->free_qev(qev);
+    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+
     delete_or_keep_event_post_apply(rgi, event_type, ev);
     DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
                     if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
@@ -300,6 +339,7 @@ retry_event_group(rpl_group_info *rgi, r
         {
           end_io_cache(&rlog);
           mysql_file_close(fd, MYF(MY_WME));
+          fd= (File)-1;
           goto do_retry;
         }
         sql_print_error("Slave worker thread retried transaction %lu time(s) "
@@ -309,15 +349,17 @@ retry_event_group(rpl_group_info *rgi, r
       }
       goto err;
     }
-
-    // ToDo: handle too many retries.
-
   } while (event_count < events_to_execute);
 
 err:
 
-  end_io_cache(&rlog);
-  mysql_file_close(fd, MYF(MY_WME));
+  if (fd >= 0)
+  {
+    end_io_cache(&rlog);
+    mysql_file_close(fd, MYF(MY_WME));
+  }
+  if (errmsg)
+    sql_print_error("Error reading relay log event: %s", errmsg);
   return err;
 }
 
@@ -340,6 +382,8 @@ handle_rpl_parallel_thread(void *arg)
   rpl_sql_thread_info sql_info(NULL);
   size_t total_event_size;
   int err;
+  inuse_relaylog *last_ir;
+  uint64 accumulated_ir_count;
 
   struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
 
@@ -683,12 +727,34 @@ handle_rpl_parallel_thread(void *arg)
       rpt->free_rgi(rgis_to_free);
       rgis_to_free= next;
     }
+    last_ir= NULL;
+    accumulated_ir_count= 0;
     while (qevs_to_free)
     {
       rpl_parallel_thread::queued_event *next= qevs_to_free->next;
+      inuse_relaylog *ir= qevs_to_free->ir;
+      /* Batch up refcount update to reduce use of synchronised operations. */
+      if (last_ir != ir)
+      {
+        if (last_ir)
+        {
+          my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock);
+          my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
+          my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock);
+          accumulated_ir_count= 0;
+        }
+        last_ir= ir;
+      }
+      ++accumulated_ir_count;
       rpt->free_qev(qevs_to_free);
       qevs_to_free= next;
     }
+    if (last_ir)
+    {
+      my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock);
+      my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
+      my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock);
+    }
 
     if ((events= rpt->event_queue) != NULL)
     {
@@ -1711,6 +1777,8 @@ rpl_parallel::do_event(rpl_group_info *s
     Queue the event for processing.
   */
   rli->event_relay_log_pos= rli->future_event_relay_log_pos;
+  qev->ir= rli->last_inuse_relaylog;
+  ++qev->ir->queued_count;
   cur_thread->enqueue(qev);
   unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
                       &did_enter_cond, &old_stage);

=== modified file 'sql/rpl_parallel.h'
--- a/sql/rpl_parallel.h	2014-05-08 12:20:18 +0000
+++ b/sql/rpl_parallel.h	2014-05-15 13:52:08 +0000
@@ -9,6 +9,7 @@ struct rpl_parallel_entry;
 struct rpl_parallel_thread_pool;
 
 class Relay_log_info;
+struct inuse_relaylog;
 
 
 /*
@@ -73,6 +74,7 @@ struct rpl_parallel_thread {
     queued_event *next;
     Log_event *ev;
     rpl_group_info *rgi;
+    inuse_relaylog *ir;
     ulonglong future_event_relay_log_pos;
     char event_relay_log_name[FN_REFLEN];
     char future_event_master_log_name[FN_REFLEN];

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2014-05-08 12:20:18 +0000
+++ b/sql/rpl_rli.cc	2014-05-15 13:52:08 +0000
@@ -92,6 +92,7 @@ Relay_log_info::Relay_log_info(bool is_s
   mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
   mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
   mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
+  my_atomic_rwlock_init(&inuse_relaylog_atomic_lock);
   relay_log.init_pthread_objects();
   DBUG_VOID_RETURN;
 }
@@ -117,6 +118,7 @@ Relay_log_info::~Relay_log_info()
   mysql_cond_destroy(&start_cond);
   mysql_cond_destroy(&stop_cond);
   mysql_cond_destroy(&log_space_cond);
+  my_atomic_rwlock_destroy(&inuse_relaylog_atomic_lock);
   relay_log.cleanup();
   DBUG_VOID_RETURN;
 }
@@ -1365,7 +1367,10 @@ Relay_log_info::alloc_inuse_relaylog(con
   if (!inuse_relaylog_list)
     inuse_relaylog_list= ir;
   else
+  {
+    last_inuse_relaylog->completed= true;
     last_inuse_relaylog->next= ir;
+  }
   last_inuse_relaylog= ir;
 
   return 0;

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2014-05-08 12:20:18 +0000
+++ b/sql/rpl_rli.h	2014-05-15 13:52:08 +0000
@@ -170,6 +170,7 @@ class Relay_log_info : public Slave_repo
   */
   inuse_relaylog *inuse_relaylog_list;
   inuse_relaylog *last_inuse_relaylog;
+  my_atomic_rwlock_t inuse_relaylog_atomic_lock;
 
   /*
     Needed to deal properly with cur_log getting closed and re-opened with
@@ -481,12 +482,26 @@ class Relay_log_info : public Slave_repo
   Each rpl_group_info has a pointer to one of those, corresponding to the
   first GTID event.
 
-  A reference count keeps track of how long a relay log is potentially in use.
+  A pair of reference counts keeps track of how long a relay log is potentially
+  in use. When the `completed' flag is set, all events have been read out of
+  the relay log, but the log might still be needed for retry in worker
+  threads.  As worker threads complete an event group, they atomically
+  increment `dequeued_count' by the number of events queued. Thus, when
+  completed is set and dequeued_count equals queued_count, the relay log file
+  is finally done with and can be purged.
+
+  By separating the queued and dequeued count, only the dequeued_count needs
+  multi-thread synchronisation; the completed flag and queued_count fields
+  are only accessed by the SQL driver thread and need no synchronisation.
 */
 struct inuse_relaylog {
   inuse_relaylog *next;
-  uint64 queued_count;
-  uint64 dequeued_count;
+  /* Number of events in this relay log queued for worker threads. */
+  int64 queued_count;
+  /* Number of events completed by worker threads. */
+  volatile int64 dequeued_count;
+  /* Set when all events have been read from a relaylog. */
+  bool completed;
   char name[FN_REFLEN];
 };
 

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2014-05-08 12:20:18 +0000
+++ b/sql/slave.cc	2014-05-15 13:52:08 +0000
@@ -6397,6 +6397,7 @@ static Log_event* next_event(rpl_group_i
       DBUG_ASSERT(rli->cur_log_fd >= 0);
       mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
       rli->cur_log_fd = -1;
+      rli->last_inuse_relaylog->completed= true;
 
       if (relay_log_purge)
       {



More information about the commits mailing list