[Commits] Rev 4274: MDEV-5262: Missing retry after temp error in parallel replication in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Tue Jul 8 16:39:22 EEST 2014


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4274
revision-id: knielsen at knielsen-hq.org-20140508122018-cdwr0phcaphkkj27
parent: knielsen at knielsen-hq.org-20140707091705-nf0xh4cgrlggyiti
author: knielsen at knielsen-hq.org
committer: Kristian Nielsen <knielsen at knielsen-hq.org>
branch nick: tmp-10.0
timestamp: Thu 2014-05-08 14:20:18 +0200
message:
  MDEV-5262: Missing retry after temp error in parallel replication
  
  Start implementing that an event group can be re-tried in parallel replication
  if it fails with a temporary error (like deadlock).
  
  Patch is very incomplete, just some very basic retry works.
  
  Stuff still missing (not complete list):
  
   - Handle moving to the next relay log file, if event group to be retried
     spans multiple relay log files.
  
   - Handle refcounting of relay log files, to ensure that we do not purge a
     relay log file and then later attempt to re-execute events out of it.
  
   - Handle description_event_for_exec - we need to save this somehow for the
     possible retry - and use the correct one in case it differs between relay
     logs.
  
   - Do another retry attempt in case the first retry also fails.
  
   - Limit the max number of retries.
  
   - Lots of testing will be needed for the various edge cases.
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_retry.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_retry.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_retry.result	2014-05-08 12:20:18 +0000
@@ -0,0 +1,62 @@
+include/rpl_init.inc [topology=1->2]
+*** Test retry of transactions that fail to replicate due to deadlock or similar temporary error. ***
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1,1);
+SET sql_log_bin=0;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=5;
+include/start_slave.inc
+SET sql_log_bin=0;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+IF d1 != '' THEN
+SET debug_sync = d1;
+END IF;
+IF d2 != '' THEN
+SET debug_sync = d2;
+END IF;
+RETURN x;
+END
+||
+include/stop_slave.inc
+SET @old_format= @@SESSION.binlog_format;
+SET binlog_format='statement';
+SET gtid_seq_no = 100;
+BEGIN;
+INSERT INTO t1 VALUES (2,1);
+UPDATE t1 SET b=b+1 WHERE a=1;
+INSERT INTO t1 VALUES (3,1);
+COMMIT;
+SET binlog_format=@old_format;
+SELECT * FROM t1 ORDER BY a;
+a       b
+1       2
+2       1
+3       1
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_1_100";
+include/start_slave.inc
+SET GLOBAL debug_dbug=@old_dbug;
+retries
+1
+SELECT * FROM t1 ORDER BY a;
+a       b
+1       2
+2       1
+3       1
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+include/start_slave.inc
+DROP TABLE t1;
+DROP function foo;
+include/rpl_end.inc

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_retry.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_retry.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_retry.test	2014-05-08 12:20:18 +0000
@@ -0,0 +1,91 @@
+--source include/have_innodb.inc
+--source include/have_debug.inc
+--source include/have_debug_sync.inc
+--let $rpl_topology=1->2
+--source include/rpl_init.inc
+
+--echo *** Test retry of transactions that fail to replicate due to deadlock or similar temporary error. ***
+
+--connection server_1
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1,1);
+--save_master_pos
+
+# Use a stored function to inject a debug_sync into the appropriate THD.
+# The function does nothing on the master, and on the slave it injects the
+# desired debug_sync action(s).
+SET sql_log_bin=0;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+  RETURNS INT DETERMINISTIC
+  BEGIN
+    RETURN x;
+  END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+--connection server_2
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=5;
+--source include/start_slave.inc
+--sync_with_master
+SET sql_log_bin=0;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+  RETURNS INT DETERMINISTIC
+  BEGIN
+    IF d1 != '' THEN
+      SET debug_sync = d1;
+    END IF;
+    IF d2 != '' THEN
+      SET debug_sync = d2;
+    END IF;
+    RETURN x;
+  END
+||
+--delimiter ;
+--source include/stop_slave.inc
+
+--connection server_1
+SET @old_format= @@SESSION.binlog_format;
+SET binlog_format='statement';
+SET gtid_seq_no = 100;
+BEGIN;
+INSERT INTO t1 VALUES (2,1);
+UPDATE t1 SET b=b+1 WHERE a=1;
+#INSERT INTO t1 VALUES (3,foo(1,
+#    "ha_write_row_end SIGNAL q1_ready WAIT_FOR q1_cont",
+#    ""));
+INSERT INTO t1 VALUES (3,1);
+COMMIT;
+SET binlog_format=@old_format;
+SELECT * FROM t1 ORDER BY a;
+--save_master_pos
+
+--connection server_2
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_1_100";
+let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
+--source include/start_slave.inc
+--sync_with_master
+SET GLOBAL debug_dbug=@old_dbug;
+let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
+--disable_query_log
+eval SELECT $new_retry - $old_retry AS retries;
+--enable_query_log
+
+SELECT * FROM t1 ORDER BY a;
+
+--connection server_2
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+--source include/start_slave.inc
+
+--connection server_1
+DROP TABLE t1;
+DROP function foo;
+
+--source include/rpl_end.inc

=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2014-06-27 11:34:29 +0000
+++ b/sql/rpl_parallel.cc	2014-05-08 12:20:18 +0000
@@ -7,15 +7,6 @@
 
 /*
   Code for optional parallel execution of replicated events on the slave.
-
-  ToDo list:
-
-   - Retry of failed transactions is not yet implemented for the parallel case.
-
-   - All the waits (eg. in struct wait_for_commit and in
-     rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill,
-     everything needs to be correctly rolled back and stopped in all threads,
-     to ensure a consistent slave replication state.
 */
 
 struct rpl_parallel_thread_pool global_rpl_thread_pool;
@@ -197,6 +188,105 @@ unlock_or_exit_cond(THD *thd, mysql_mute
 }
 
 
+static int
+retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog)
+{
+  /* ToDo */
+  return 0;
+}
+
+
+static int
+retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
+                  rpl_parallel_thread::queued_event *orig_qev)
+{
+  IO_CACHE rlog;
+  File fd;
+  const char *errmsg= NULL;
+  inuse_relaylog *ir= rgi->relay_log;
+  uint64 event_count= 0;
+  uint64 events_to_execute= rgi->retry_event_count;
+  Relay_log_info *rli= rgi->rli;
+  int err= 0;
+  ulonglong cur_offset, old_offset;
+  char log_name[FN_REFLEN];
+  THD *thd= rgi->thd;
+
+do_retry:
+  rgi->cleanup_context(thd, 1);
+
+  mysql_mutex_lock(&rli->data_lock);
+  ++rli->retried_trans;
+  statistic_increment(slave_retried_transactions, LOCK_status);
+  mysql_mutex_unlock(&rli->data_lock);
+
+  strcpy(log_name, ir->name);
+  if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
+    return 1;
+  cur_offset= rgi->retry_start_offset;
+  my_b_seek(&rlog, cur_offset);
+
+  do
+  {
+    Log_event_type event_type;
+    Log_event *ev;
+
+    old_offset= cur_offset;
+    ev= Log_event::read_log_event(&rlog, 0,
+                                  rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
+                                  opt_slave_sql_verify_checksum);
+    cur_offset= my_b_tell(&rlog);
+
+    if (!ev)
+    {
+      err= 1;
+      goto err;
+    }
+    ev->thd= thd;
+    event_type= ev->get_type_code();
+    if (Log_event::is_group_event(event_type))
+    {
+      rpl_parallel_thread::queued_event *qev;
+
+      mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+      qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
+                              cur_offset - old_offset);
+      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+      if (!qev)
+      {
+        delete ev;
+        my_error(ER_OUT_OF_RESOURCES, MYF(0));
+        err= 1;
+        goto err;
+      }
+      err= rpt_handle_event(qev, rpt);
+      ++event_count;
+      mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+      rpt->free_qev(qev);
+      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+    }
+    else
+      err= retry_handle_relay_log_rotate(ev, &rlog);
+    delete_or_keep_event_post_apply(rgi, event_type, ev);
+
+    if (err)
+    {
+      /* ToDo: Need to here also handle second retry. */
+      goto err;
+    }
+
+    // ToDo: handle too many retries.
+
+  } while (event_count < events_to_execute);
+
+err:
+
+  end_io_cache(&rlog);
+  mysql_file_close(fd, MYF(MY_WME));
+  return err;
+}
+
+
 pthread_handler_t
 handle_rpl_parallel_thread(void *arg)
 {
@@ -499,7 +589,23 @@ handle_rpl_parallel_thread(void *arg)
         everything is stopped and cleaned up correctly.
       */
       if (likely(!rgi->worker_error) && !skip_event_group)
+      {
+        ++rgi->retry_event_count;
         err= rpt_handle_event(events, rpt);
+        DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_1_100",
+          if (rgi->current_gtid.domain_id == 0 &&
+              rgi->current_gtid.server_id == 1 &&
+              rgi->current_gtid.seq_no == 100 &&
+              rgi->retry_event_count == 4)
+          {
+            thd->clear_error();
+            thd->get_stmt_da()->reset_diagnostics_area();
+            my_error(ER_LOCK_DEADLOCK, MYF(0));
+            err= 1;
+          };);
+        if (err && has_temporary_error(thd))
+          err= retry_event_group(rgi, rpt, events);
+      }
       else
         err= thd->wait_for_prior_commit();
 
@@ -802,8 +908,7 @@ rpl_parallel_change_thread_count(rpl_par
 
 
 rpl_parallel_thread::queued_event *
-rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
-                             Relay_log_info *rli)
+rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
 {
   queued_event *qev;
   mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -817,6 +922,17 @@ rpl_parallel_thread::get_qev(Log_event *
   qev->ev= ev;
   qev->event_size= event_size;
   qev->next= NULL;
+  return qev;
+}
+
+
+rpl_parallel_thread::queued_event *
+rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
+                             Relay_log_info *rli)
+{
+  queued_event *qev= get_qev_common(ev, event_size);
+  if (!qev)
+    return NULL;
   strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
   qev->event_relay_log_pos= rli->event_relay_log_pos;
   qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
@@ -825,6 +941,24 @@ rpl_parallel_thread::get_qev(Log_event *
 }
 
 
+rpl_parallel_thread::queued_event *
+rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
+                                   const char *relay_log_name,
+                                   ulonglong event_pos, ulonglong event_size)
+{
+  queued_event *qev= get_qev_common(ev, event_size);
+  if (!qev)
+    return NULL;
+  qev->rgi= orig_qev->rgi;
+  strcpy(qev->event_relay_log_name, relay_log_name);
+  qev->event_relay_log_pos= event_pos;
+  qev->future_event_relay_log_pos= event_pos+event_size;
+  strcpy(qev->future_event_master_log_name,
+         orig_qev->future_event_master_log_name);
+  return qev;
+}
+
+
 void
 rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
 {
@@ -836,7 +970,7 @@ rpl_parallel_thread::free_qev(rpl_parall
 
 rpl_group_info*
 rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
-                             rpl_parallel_entry *e)
+                             rpl_parallel_entry *e, ulonglong event_size)
 {
   rpl_group_info *rgi;
   mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -864,6 +998,9 @@ rpl_parallel_thread::get_rgi(Relay_log_i
     return NULL;
   }
   rgi->parallel_entry= e;
+  rgi->relay_log= rli->last_inuse_relaylog;
+  rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
+  rgi->retry_event_count= 0;
 
   return rgi;
 }
@@ -1439,7 +1576,7 @@ rpl_parallel::do_event(rpl_group_info *s
   {
     Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
 
-    if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e)))
+    if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
     {
       cur_thread->free_qev(qev);
       abandon_worker_thread(rli->sql_driver_thd, cur_thread,

=== modified file 'sql/rpl_parallel.h'
--- a/sql/rpl_parallel.h	2014-06-25 13:17:03 +0000
+++ b/sql/rpl_parallel.h	2014-05-08 12:20:18 +0000
@@ -106,11 +106,15 @@ struct rpl_parallel_thread {
     queued_size-= dequeue_size;
   }
 
+  queued_event *get_qev_common(Log_event *ev, ulonglong event_size);
   queued_event *get_qev(Log_event *ev, ulonglong event_size,
                         Relay_log_info *rli);
+  queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev,
+                              const char *relay_log_name,
+                              ulonglong event_pos, ulonglong event_size);
   void free_qev(queued_event *qev);
   rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
-                          rpl_parallel_entry *e);
+                          rpl_parallel_entry *e, ulonglong event_size);
   void free_rgi(rpl_group_info *rgi);
   group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev);
   void free_gco(group_commit_orderer *gco);

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2014-06-25 13:17:03 +0000
+++ b/sql/rpl_rli.cc	2014-05-08 12:20:18 +0000
@@ -52,6 +52,7 @@ Relay_log_info::Relay_log_info(bool is_s
    info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
    sync_counter(0), is_relay_log_recovery(is_slave_recovery),
    save_temporary_tables(0), mi(0),
+   inuse_relaylog_list(0), last_inuse_relaylog(0),
    cur_log_old_open_count(0), group_relay_log_pos(0), 
    event_relay_log_pos(0),
 #if HAVE_valgrind
@@ -98,8 +99,17 @@ Relay_log_info::Relay_log_info(bool is_s
 
 Relay_log_info::~Relay_log_info()
 {
+  inuse_relaylog *cur;
   DBUG_ENTER("Relay_log_info::~Relay_log_info");
 
+  cur= inuse_relaylog_list;
+  while (cur)
+  {
+    DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
+    inuse_relaylog *next= cur->next;
+    my_free(cur);
+    cur= next;
+  }
   mysql_mutex_destroy(&run_lock);
   mysql_mutex_destroy(&data_lock);
   mysql_mutex_destroy(&log_space_lock);
@@ -1339,6 +1349,29 @@ void Relay_log_info::stmt_done(my_off_t
   DBUG_VOID_RETURN;
 }
 
+
+int
+Relay_log_info::alloc_inuse_relaylog(const char *name)
+{
+  inuse_relaylog *ir;
+
+  if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
+  {
+    my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
+    return 1;
+  }
+  strcpy(ir->name, name);
+
+  if (!inuse_relaylog_list)
+    inuse_relaylog_list= ir;
+  else
+    last_inuse_relaylog->next= ir;
+  last_inuse_relaylog= ir;
+
+  return 0;
+}
+
+
 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 int
 rpl_load_gtid_slave_state(THD *thd)
@@ -1623,7 +1656,7 @@ delete_or_keep_event_post_apply(rpl_grou
 
 void rpl_group_info::cleanup_context(THD *thd, bool error)
 {
-  DBUG_ENTER("Relay_log_info::cleanup_context");
+  DBUG_ENTER("rpl_group_info::cleanup_context");
   DBUG_PRINT("enter", ("error: %d", (int) error));
   
   DBUG_ASSERT(this->thd == thd);
@@ -1689,7 +1722,7 @@ void rpl_group_info::cleanup_context(THD
 
 void rpl_group_info::clear_tables_to_lock()
 {
-  DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
+  DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
 #ifndef DBUG_OFF
   /**
     When replicating in RBR and MyISAM Merge tables are involved
@@ -1736,7 +1769,7 @@ void rpl_group_info::clear_tables_to_loc
 
 void rpl_group_info::slave_close_thread_tables(THD *thd)
 {
-  DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
+  DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");
   thd->get_stmt_da()->set_overwrite_status(true);
   thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
   thd->get_stmt_da()->set_overwrite_status(false);

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2014-06-25 13:17:03 +0000
+++ b/sql/rpl_rli.h	2014-05-08 12:20:18 +0000
@@ -61,6 +61,7 @@ enum {
 *****************************************************************************/
 
 struct rpl_group_info;
+struct inuse_relaylog;
 
 class Relay_log_info : public Slave_reporting_capability
 {
@@ -164,6 +165,13 @@ class Relay_log_info : public Slave_repo
   Master_info *mi;
 
   /*
+    List of active relay log files.
+    (This can be more than one in case of parallel replication).
+  */
+  inuse_relaylog *inuse_relaylog_list;
+  inuse_relaylog *last_inuse_relaylog;
+
+  /*
     Needed to deal properly with cur_log getting closed and re-opened with
     a different log under our feet
   */
@@ -398,6 +406,7 @@ class Relay_log_info : public Slave_repo
   void stmt_done(my_off_t event_log_pos,
                  time_t event_creation_time, THD *thd,
                  rpl_group_info *rgi);
+  int alloc_inuse_relaylog(const char *name);
 
   /**
      Is the replication inside a group?
@@ -464,6 +473,25 @@ class Relay_log_info : public Slave_repo
 
 
 /*
+  In parallel replication, if we need to re-try a transaction due to a
+  deadlock or other temporary error, we may need to go back and re-read events
+  out of an earlier relay log.
+
+  This structure keeps track of the relaylogs that are potentially in use.
+  Each rpl_group_info has a pointer to one of those, corresponding to the
+  first GTID event.
+
+  A reference count keeps track of how long a relay log is potentially in use.
+*/
+struct inuse_relaylog {
+  inuse_relaylog *next;
+  uint64 queued_count;
+  uint64 dequeued_count;
+  char name[FN_REFLEN];
+};
+
+
+/*
   This is data for various state needed to be kept for the processing of
   one event group (transaction) during replication.
 
@@ -596,6 +624,14 @@ struct rpl_group_info
   /* Needs room for "Gtid D-S-N\x00". */
   char gtid_info_buf[5+10+1+10+1+20+1];
 
+  /*
+    Information to be able to re-try an event group in case of a deadlock or
+    other temporary error.
+  */
+  inuse_relaylog *relay_log;
+  uint64 retry_start_offset;
+  uint64 retry_event_count;
+
   rpl_group_info(Relay_log_info *rli_);
   ~rpl_group_info();
   void reinit(Relay_log_info *rli);

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2014-06-25 13:17:03 +0000
+++ b/sql/slave.cc	2014-05-08 12:20:18 +0000
@@ -3094,7 +3094,8 @@ static ulong read_event(MYSQL* mysql, Ma
   that the error is temporary by pushing a warning with the error code
   ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
 */
-static int has_temporary_error(THD *thd)
+int
+has_temporary_error(THD *thd)
 {
   DBUG_ENTER("has_temporary_error");
 
@@ -4478,6 +4479,9 @@ pthread_handler_t handle_slave_sql(void
                 "Error initializing relay log position: %s", errmsg);
     goto err;
   }
+  if (rli->alloc_inuse_relaylog(rli->group_relay_log_name))
+    goto err;
+
   strcpy(rli->future_event_master_log_name, rli->group_master_log_name);
   THD_CHECK_SENTRY(thd);
 #ifndef DBUG_OFF
@@ -6521,6 +6525,12 @@ static Log_event* next_event(rpl_group_i
             mysql_mutex_unlock(log_lock);
           goto err;
         }
+        if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name))
+        {
+          if (!hot_log)
+            mysql_mutex_unlock(log_lock);
+          goto err;
+        }
         if (!hot_log)
           mysql_mutex_unlock(log_lock);
         continue;
@@ -6536,6 +6546,8 @@ static Log_event* next_event(rpl_group_i
       if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
                                        &errmsg)) <0)
         goto err;
+      if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name))
+        goto err;
     }
     else
     {

=== modified file 'sql/slave.h'
--- a/sql/slave.h	2014-06-25 13:17:03 +0000
+++ b/sql/slave.h	2014-05-08 12:20:18 +0000
@@ -229,6 +229,7 @@ int purge_relay_logs(Relay_log_info* rli
 void set_slave_thread_options(THD* thd);
 void set_slave_thread_default_charset(THD *thd, rpl_group_info *rgi);
 int rotate_relay_log(Master_info* mi);
+int has_temporary_error(THD *thd);
 int apply_event_and_update_pos(Log_event* ev, THD* thd,
                                struct rpl_group_info *rgi,
                                rpl_parallel_thread *rpt);



More information about the commits mailing list