[Commits] Rev 4606: MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Thu Feb 19 11:41:22 EET 2015


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4606
revision-id: knielsen at knielsen-hq.org-20150219094006-zujymec4z82bktmy
parent: jplindst at mariadb.org-20150218120713-qjg3ejkkh61qvzv7
committer: Kristian Nielsen <knielsen at knielsen-hq.org>
branch nick: work-10.0
timestamp: Thu 2015-02-19 10:40:06 +0100
message:
  MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication
  
  The problem occurs in parallel replication in GTID mode, when we are using
  multiple replication domains. In this case, if the SQL thread stops, the
  slave GTID position may refer to a different point in the relay log for each
  domain.
  
  The bug was that when the SQL thread was stopped and restarted (but the IO
  thread was kept running), the SQL thread would resume applying the relay log
  from the point of the most advanced replication domain, silently skipping all
  earlier events within other domains. This caused replication corruption.
  
  This patch solves the problem by storing, when the SQL thread stops with
  multiple parallel replication domains active, the current GTID
  position. Additionally, the current position in the relay logs is moved back
  to a point known to be earlier than the current position of any replication
  domain. Then when the SQL thread restarts from the earlier position, GTIDs
  encountered are compared against the stored GTID position. Any GTID that was
  already applied before the stop is skipped to avoid duplicate apply.
  
  This patch should have no effect if multi-domain GTID parallel replication is
  not used. Similarly, if both SQL and IO thread are stopped and restarted, the
  patch has no effect, as in this case the existing relay logs are removed and
  re-fetched from the master at the current global @@gtid_slave_pos.
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result	2015-02-19 09:40:06 +0000
@@ -0,0 +1,147 @@
+include/master-slave.inc
+[connection master]
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=current_pos;
+include/start_slave.inc
+*** MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication ***
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
+CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1);
+INSERT INTO t2 VALUES (1);
+SELECT * FROM t1;
+a
+1
+SELECT * FROM t2;
+a
+1
+SET sql_log_bin=0;
+BEGIN;
+INSERT INTO t2 VALUES (5);
+SET gtid_domain_id=0;
+INSERT INTO t1 VALUES (2);
+INSERT INTO t2 VALUES (3);
+FLUSH LOGS;
+INSERT INTO t1 VALUES (4);
+SET gtid_domain_id=1;
+INSERT INTO t2 VALUES (5);
+SET gtid_domain_id=0;
+INSERT INTO t1 VALUES (6);
+INSERT INTO t1 VALUES (7);
+SET gtid_domain_id=2;
+INSERT INTO t2 VALUES (8);
+INSERT INTO t1 VALUES (9);
+FLUSH LOGS;
+SET gtid_domain_id=3;
+INSERT INTO t2 VALUES (10);
+INSERT INTO t1 VALUES (11);
+SET gtid_domain_id=1;
+INSERT INTO t1 VALUES (12);
+INSERT INTO t2 VALUES (13);
+SET gtid_domain_id=0;
+INSERT INTO t2 VALUES (14);
+FLUSH LOGS;
+SET gtid_domain_id=3;
+INSERT INTO t2 VALUES (15);
+SET gtid_domain_id=2;
+INSERT INTO t2 VALUES (16);
+SET gtid_domain_id=0;
+INSERT INTO t1 VALUES (17);
+SET @gtid0 = @@last_gtid;
+SET gtid_domain_id=2;
+INSERT INTO t1 VALUES (18);
+SET @gtid2 = @@last_gtid;
+SET gtid_domain_id=3;
+INSERT INTO t1 VALUES (19);
+SET @gtid3 = @@last_gtid;
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+4
+6
+7
+9
+11
+12
+17
+18
+19
+SELECT * FROM t2 ORDER BY a;
+a
+1
+3
+5
+8
+10
+13
+14
+15
+16
+include/save_master_gtid.inc
+SELECT MASTER_GTID_WAIT('WAIT_POS');
+MASTER_GTID_WAIT('WAIT_POS')
+0
+COMMIT;
+SET sql_log_bin=1;
+include/wait_for_slave_sql_error.inc [errno=1062]
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+4
+6
+7
+9
+11
+17
+18
+19
+SELECT * FROM t2 ORDER BY a;
+a
+1
+3
+5
+8
+10
+14
+15
+16
+SET sql_log_bin=0;
+DELETE FROM t2 WHERE a=5;
+SET sql_log_bin=1;
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+4
+6
+7
+9
+11
+12
+17
+18
+19
+SELECT * FROM t2 ORDER BY a;
+a
+1
+3
+5
+8
+10
+13
+14
+15
+16
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+include/start_slave.inc
+SET DEBUG_SYNC= 'RESET';
+DROP TABLE t1,t2;
+SET DEBUG_SYNC= 'RESET';
+include/rpl_end.inc

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test	2015-02-19 09:40:06 +0000
@@ -0,0 +1,132 @@
+--source include/have_innodb.inc
+--source include/have_debug.inc
+--source include/have_debug_sync.inc
+--source include/master-slave.inc
+
+--connection server_2
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=current_pos;
+--source include/start_slave.inc
+
+
+--echo *** MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication ***
+
+--connection server_1
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
+CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1);
+INSERT INTO t2 VALUES (1);
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+SELECT * FROM t1;
+SELECT * FROM t2;
+
+# Block one domain, which we will later cause to give an error. And let some
+# other domains proceed so we can check that after restart, the slave is able
+# to correctly restart each domain in a separate position.
+
+--connect (con_temp1,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+SET sql_log_bin=0;
+BEGIN;
+INSERT INTO t2 VALUES (5);
+
+--connection server_1
+SET gtid_domain_id=0;
+INSERT INTO t1 VALUES (2);
+INSERT INTO t2 VALUES (3);
+FLUSH LOGS;
+INSERT INTO t1 VALUES (4);
+
+SET gtid_domain_id=1;
+# This query will be blocked on the slave, and later give a duplicate key error.
+INSERT INTO t2 VALUES (5);
+
+SET gtid_domain_id=0;
+INSERT INTO t1 VALUES (6);
+INSERT INTO t1 VALUES (7);
+
+SET gtid_domain_id=2;
+INSERT INTO t2 VALUES (8);
+INSERT INTO t1 VALUES (9);
+FLUSH LOGS;
+
+SET gtid_domain_id=3;
+INSERT INTO t2 VALUES (10);
+INSERT INTO t1 VALUES (11);
+
+# These cannot be replicated before the error, as a prior commit is blocked.
+SET gtid_domain_id=1;
+INSERT INTO t1 VALUES (12);
+INSERT INTO t2 VALUES (13);
+
+SET gtid_domain_id=0;
+INSERT INTO t2 VALUES (14);
+FLUSH LOGS;
+
+SET gtid_domain_id=3;
+INSERT INTO t2 VALUES (15);
+
+SET gtid_domain_id=2;
+INSERT INTO t2 VALUES (16);
+
+SET gtid_domain_id=0;
+INSERT INTO t1 VALUES (17);
+SET @gtid0 = @@last_gtid;
+SET gtid_domain_id=2;
+INSERT INTO t1 VALUES (18);
+SET @gtid2 = @@last_gtid;
+SET gtid_domain_id=3;
+INSERT INTO t1 VALUES (19);
+SET @gtid3 = @@last_gtid;
+--let $wait_pos= `SELECT CONCAT(@gtid0, ",", @gtid2, ",", @gtid3)`
+
+SELECT * FROM t1 ORDER BY a;
+SELECT * FROM t2 ORDER BY a;
+--source include/save_master_gtid.inc
+
+
+--connection server_2
+# First wait for domains 0, 2, and 3 to complete.
+--replace_result $wait_pos WAIT_POS
+eval SELECT MASTER_GTID_WAIT('$wait_pos');
+
+# Then release the row lock, and wait for the domain 1 to fail with
+# duplicate key error.
+--connection con_temp1
+COMMIT;
+SET sql_log_bin=1;
+
+--connection server_2
+--let $slave_sql_errno= 1062
+--source include/wait_for_slave_sql_error.inc
+
+SELECT * FROM t1 ORDER BY a;
+SELECT * FROM t2 ORDER BY a;
+
+SET sql_log_bin=0;
+DELETE FROM t2 WHERE a=5;
+SET sql_log_bin=1;
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+
+SELECT * FROM t1 ORDER BY a;
+SELECT * FROM t2 ORDER BY a;
+
+
+# Clean up.
+--connection server_2
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+--source include/start_slave.inc
+SET DEBUG_SYNC= 'RESET';
+
+--connection server_1
+DROP TABLE t1,t2;
+SET DEBUG_SYNC= 'RESET';
+
+--source include/rpl_end.inc

=== modified file 'sql/log.cc'
--- a/sql/log.cc	2015-01-19 13:32:28 +0000
+++ b/sql/log.cc	2015-02-19 09:40:06 +0000
@@ -4134,8 +4134,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay
       included= 1;
       to_purge_if_included= my_strdup(ir->name, MYF(0));
     }
-    my_atomic_rwlock_destroy(&ir->inuse_relaylog_atomic_lock);
-    my_free(ir);
+    rli->free_inuse_relaylog(ir);
     ir= next;
   }
   rli->inuse_relaylog_list= ir;

=== modified file 'sql/rpl_gtid.cc'
--- a/sql/rpl_gtid.cc	2014-09-30 17:31:14 +0000
+++ b/sql/rpl_gtid.cc	2015-02-19 09:40:06 +0000
@@ -1089,6 +1089,27 @@ rpl_binlog_state::load(struct rpl_gtid *
 }
 
 
+static int rpl_binlog_state_load_cb(rpl_gtid *gtid, void *data)
+{
+  rpl_binlog_state *self= (rpl_binlog_state *)data;
+  return self->update_nolock(gtid, false);
+}
+
+
+bool
+rpl_binlog_state::load(rpl_slave_state *slave_pos)
+{
+  bool res= false;
+
+  mysql_mutex_lock(&LOCK_binlog_state);
+  reset_nolock();
+  if (slave_pos->iterate(rpl_binlog_state_load_cb, this, NULL, 0))
+    res= true;
+  mysql_mutex_unlock(&LOCK_binlog_state);
+  return res;
+}
+
+
 rpl_binlog_state::~rpl_binlog_state()
 {
   free();

=== modified file 'sql/rpl_gtid.h'
--- a/sql/rpl_gtid.h	2014-03-11 23:14:49 +0000
+++ b/sql/rpl_gtid.h	2015-02-19 09:40:06 +0000
@@ -235,6 +235,7 @@ struct rpl_binlog_state
   void reset();
   void free();
   bool load(struct rpl_gtid *list, uint32 count);
+  bool load(rpl_slave_state *slave_pos);
   int update_nolock(const struct rpl_gtid *gtid, bool strict);
   int update(const struct rpl_gtid *gtid, bool strict);
   int update_with_next_gtid(uint32 domain_id, uint32 server_id,

=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2015-01-07 13:45:39 +0000
+++ b/sql/rpl_parallel.cc	2015-02-19 09:40:06 +0000
@@ -1830,6 +1830,41 @@ rpl_parallel::wait_for_workers_idle(THD
 
 
 /*
+  Handle seeing a GTID during slave restart in GTID mode. If we stopped with
+  different replication domains having reached different positions in the relay
+  log, we need to skip event groups in domains that are further progressed.
+
+  Updates the state with the seen GTID, and returns true if this GTID should
+  be skipped, false otherwise.
+*/
+bool
+process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid)
+{
+  slave_connection_state::entry *gtid_entry;
+  slave_connection_state *state= &rli->restart_gtid_pos;
+
+  if (likely(state->count() == 0) ||
+      !(gtid_entry= state->find_entry(gtid->domain_id)))
+    return false;
+  if (gtid->server_id == gtid_entry->gtid.server_id)
+  {
+    uint64 seq_no= gtid_entry->gtid.seq_no;
+    if (gtid->seq_no >= seq_no)
+    {
+      /*
+        This domain has reached its start position. So remove it, so that
+        further events will be processed normally.
+      */
+      state->remove(&gtid_entry->gtid);
+    }
+    return gtid->seq_no <= seq_no;
+  }
+  else
+    return true;
+}
+
+
+/*
   This is used when we get an error during processing in do_event();
   We will not queue any event to the thread, but we still need to wake it up
   to be sure that it will be returned to the pool.
@@ -1890,13 +1925,15 @@ rpl_parallel::do_event(rpl_group_info *s
     return -1;
 
   /* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */
-  if (unlikely(!current) && typ != GTID_EVENT)
+  is_group_event= Log_event::is_group_event(typ);
+  if (unlikely(!current) && typ != GTID_EVENT &&
+      !(unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event))
     return -1;
 
   /* ToDo: what to do with this lock?!? */
   mysql_mutex_unlock(&rli->data_lock);
 
-  if (typ == FORMAT_DESCRIPTION_EVENT)
+  if (unlikely(typ == FORMAT_DESCRIPTION_EVENT))
   {
     Format_description_log_event *fdev=
       static_cast<Format_description_log_event *>(ev);
@@ -1922,6 +1959,19 @@ rpl_parallel::do_event(rpl_group_info *s
       }
     }
   }
+  else if (unlikely(typ == GTID_LIST_EVENT))
+  {
+    Gtid_list_log_event *glev= static_cast<Gtid_list_log_event *>(ev);
+    rpl_gtid *list= glev->list;
+    uint32 count= glev->count;
+    rli->update_relay_log_state(list, count);
+    while (count)
+    {
+      process_gtid_for_restart_pos(rli, list);
+      ++list;
+      --count;
+    }
+  }
 
   /*
     Stop queueing additional event groups once the SQL thread is requested to
@@ -1931,7 +1981,6 @@ rpl_parallel::do_event(rpl_group_info *s
     been partially queued, but after that we will just ignore any further
     events the SQL driver thread may try to queue, and eventually it will stop.
   */
-  is_group_event= Log_event::is_group_event(typ);
   if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave)
     sql_thread_stopping= true;
   if (sql_thread_stopping)
@@ -1944,8 +1993,34 @@ rpl_parallel::do_event(rpl_group_info *s
     return 0;
   }
 
+  if (unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event)
+  {
+    if (typ == GTID_EVENT)
+      rli->gtid_skip_flag= GTID_SKIP_NOT;
+    else
+    {
+      if (rli->gtid_skip_flag == GTID_SKIP_STANDALONE)
+      {
+        if (!Log_event::is_part_of_group(typ))
+          rli->gtid_skip_flag= GTID_SKIP_NOT;
+      }
+      else
+      {
+        DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION);
+        if (typ == XID_EVENT ||
+            (typ == QUERY_EVENT &&
+             (((Query_log_event *)ev)->is_commit() ||
+              ((Query_log_event *)ev)->is_rollback())))
+          rli->gtid_skip_flag= GTID_SKIP_NOT;
+      }
+      delete_or_keep_event_post_apply(serial_rgi, typ, ev);
+      return 0;
+    }
+  }
+
   if (typ == GTID_EVENT)
   {
+    rpl_gtid gtid;
     Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
     uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
                        0 : gtid_ev->domain_id);
@@ -1956,6 +2031,23 @@ rpl_parallel::do_event(rpl_group_info *s
       return 1;
     }
     current= e;
+
+    gtid.domain_id= gtid_ev->domain_id;
+    gtid.server_id= gtid_ev->server_id;
+    gtid.seq_no= gtid_ev->seq_no;
+    rli->update_relay_log_state(&gtid, 1);
+    if (process_gtid_for_restart_pos(rli, &gtid))
+    {
+      /*
+        This domain has progressed further into the relay log before the last
+        SQL thread restart. So we need to skip this event group to not doubly
+        apply it.
+      */
+      rli->gtid_skip_flag= ((gtid_ev->flags2 & Gtid_log_event::FL_STANDALONE) ?
+                            GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+      delete_or_keep_event_post_apply(serial_rgi, typ, ev);
+      return 0;
+    }
   }
   else
     e= current;

=== modified file 'sql/rpl_parallel.h'
--- a/sql/rpl_parallel.h	2015-01-07 13:45:39 +0000
+++ b/sql/rpl_parallel.h	2015-02-19 09:40:06 +0000
@@ -317,5 +317,6 @@ extern struct rpl_parallel_thread_pool g
 extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
                                             uint32 new_count,
                                             bool skip_check= false);
+extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
 
 #endif  /* RPL_PARALLEL_H */

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2015-01-07 13:45:39 +0000
+++ b/sql/rpl_rli.cc	2015-02-19 09:40:06 +0000
@@ -62,7 +62,7 @@ Relay_log_info::Relay_log_info(bool is_s
    group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
    last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
    abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
-   inited(0), abort_slave(0), stop_for_until(0),
+   gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
    slave_running(0), until_condition(UNTIL_NONE),
    until_log_pos(0), retried_trans(0), executed_entries(0),
    m_flags(0)
@@ -100,18 +100,9 @@ Relay_log_info::Relay_log_info(bool is_s
 
 Relay_log_info::~Relay_log_info()
 {
-  inuse_relaylog *cur;
   DBUG_ENTER("Relay_log_info::~Relay_log_info");
 
-  cur= inuse_relaylog_list;
-  while (cur)
-  {
-    DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
-    inuse_relaylog *next= cur->next;
-    my_atomic_rwlock_destroy(&cur->inuse_relaylog_atomic_lock);
-    my_free(cur);
-    cur= next;
-  }
+  reset_inuse_relaylog();
   mysql_mutex_destroy(&run_lock);
   mysql_mutex_destroy(&data_lock);
   mysql_mutex_destroy(&log_space_lock);
@@ -1384,14 +1375,34 @@ int
 Relay_log_info::alloc_inuse_relaylog(const char *name)
 {
   inuse_relaylog *ir;
+  uint32 gtid_count;
+  rpl_gtid *gtid_list;
 
   if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
   {
     my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
     return 1;
   }
+  gtid_count= relay_log_state.count();
+  if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count,
+                                         MYF(MY_WME))))
+  {
+    my_free(ir);
+    my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count);
+    return 1;
+  }
+  if (relay_log_state.get_gtid_list(gtid_list, gtid_count))
+  {
+    my_free(gtid_list);
+    my_free(ir);
+    DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */);
+    my_error(ER_OUT_OF_RESOURCES, MYF(0));
+    return 1;
+  }
   ir->rli= this;
   strmake_buf(ir->name, name);
+  ir->relay_log_state= gtid_list;
+  ir->relay_log_state_count= gtid_count;
 
   if (!inuse_relaylog_list)
     inuse_relaylog_list= ir;
@@ -1407,6 +1418,45 @@ Relay_log_info::alloc_inuse_relaylog(con
 }
 
 
+void
+Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir)
+{
+  my_free(ir->relay_log_state);
+  my_atomic_rwlock_destroy(&ir->inuse_relaylog_atomic_lock);
+  my_free(ir);
+}
+
+
+void
+Relay_log_info::reset_inuse_relaylog()
+{
+  inuse_relaylog *cur= inuse_relaylog_list;
+  while (cur)
+  {
+    DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
+    inuse_relaylog *next= cur->next;
+    free_inuse_relaylog(cur);
+    cur= next;
+  }
+  inuse_relaylog_list= last_inuse_relaylog= NULL;
+}
+
+
+int
+Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
+{
+  int res= 0;
+  while (count)
+  {
+    if (relay_log_state.update_nolock(gtid_list, false))
+      res= 1;
+    ++gtid_list;
+    --count;
+  }
+  return res;
+}
+
+
 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 int
 rpl_load_gtid_slave_state(THD *thd)

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2015-01-07 13:45:39 +0000
+++ b/sql/rpl_rli.h	2015-02-19 09:40:06 +0000
@@ -269,6 +269,8 @@ class Relay_log_info : public Slave_repo
   int events_till_abort;
 #endif  
 
+  enum_gtid_skip_type gtid_skip_flag;
+
   /*
     inited changes its value within LOCK_active_mi-guarded critical
     sections  at times of start_slave_threads() (0->1) and end_slave() (1->0).
@@ -344,6 +346,21 @@ class Relay_log_info : public Slave_repo
   size_t slave_patternload_file_size;  
 
   rpl_parallel parallel;
+  /*
+    The relay_log_state keeps track of the current binlog state of the execution
+    of the relay log. This is used to know where to resume current GTID position
+    if the slave thread is stopped and restarted.
+    It is only accessed from the SQL thread, so it does not need any locking.
+  */
+  rpl_binlog_state relay_log_state;
+  /*
+    The restart_gtid_state is used when the SQL thread restarts on a relay log
+    in GTID mode. In multi-domain parallel replication, each domain may have a
+    separat position, so some events in more progressed domains may need to be
+    skipped. This keeps track of the domains that have not yet reached their
+    starting event.
+  */
+  slave_connection_state restart_gtid_pos;
 
   Relay_log_info(bool is_slave_recovery);
   ~Relay_log_info();
@@ -408,6 +425,9 @@ class Relay_log_info : public Slave_repo
                  time_t event_creation_time, THD *thd,
                  rpl_group_info *rgi);
   int alloc_inuse_relaylog(const char *name);
+  void free_inuse_relaylog(inuse_relaylog *ir);
+  void reset_inuse_relaylog();
+  int update_relay_log_state(rpl_gtid *gtid_list, uint32 count);
 
   /**
      Is the replication inside a group?
@@ -497,6 +517,12 @@ class Relay_log_info : public Slave_repo
 struct inuse_relaylog {
   inuse_relaylog *next;
   Relay_log_info *rli;
+  /*
+    relay_log_state holds the binlog state corresponding to the start of this
+    relay log file. It is an array with relay_log_state_count elements.
+  */
+  rpl_gtid *relay_log_state;
+  uint32 relay_log_state_count;
   /* Number of events in this relay log queued for worker threads. */
   int64 queued_count;
   /* Number of events completed by worker threads. */

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2014-12-02 11:11:07 +0000
+++ b/sql/slave.cc	2015-02-19 09:40:06 +0000
@@ -943,6 +943,8 @@ int start_slave_threads(bool need_slave_
                                              Master_info::USE_GTID_CURRENT_POS);
     mi->events_queued_since_last_gtid= 0;
     mi->gtid_reconnect_event_skip_count= 0;
+
+    mi->rli.restart_gtid_pos.reset();
   }
 
   if (!error && (thread_mask & SLAVE_IO))
@@ -4504,6 +4506,16 @@ pthread_handler_t handle_slave_sql(void
 
   serial_rgi->gtid_sub_id= 0;
   serial_rgi->gtid_pending= false;
+  if (mi->using_gtid != Master_info::USE_GTID_NO)
+  {
+    /*
+      We initialize the relay log state from the know starting position.
+      It will then be updated as required by GTID and GTID_LIST events found
+      while applying events read from relay logs.
+    */
+    rli->relay_log_state.load(&rpl_global_gtid_slave_state);
+  }
+  rli->gtid_skip_flag = GTID_SKIP_NOT;
   if (init_relay_log_pos(rli,
                          rli->group_relay_log_name,
                          rli->group_relay_log_pos,
@@ -4514,6 +4526,7 @@ pthread_handler_t handle_slave_sql(void
                 "Error initializing relay log position: %s", errmsg);
     goto err;
   }
+  rli->reset_inuse_relaylog();
   if (rli->alloc_inuse_relaylog(rli->group_relay_log_name))
     goto err;
 
@@ -4718,7 +4731,49 @@ log '%s' at position %s, relay log '%s'
   thd->reset_query();
   thd->reset_db(NULL, 0);
   if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
+  {
+    ulong domain_count;
+
     flush_relay_log_info(rli);
+    if (opt_slave_parallel_threads > 0)
+    {
+      /*
+        In parallel replication GTID mode, we may stop with different domains
+        at different positions in the relay log.
+
+        To handle this when we restart the SQL thread, mark the current
+        per-domain position in the Relay_log_info.
+      */
+      mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+      domain_count= rpl_global_gtid_slave_state.count();
+      mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+      if (domain_count > 1)
+      {
+        inuse_relaylog *ir;
+
+        /*
+          Load the starting GTID position, so that we can skip already applied
+          GTIDs when we restart the SQL thread. And set the start position in
+          the relay log back to a known safe place to start (prior to any not
+          yet applied transaction in any domain).
+        */
+        rli->restart_gtid_pos.load(&rpl_global_gtid_slave_state, NULL, 0);
+        if ((ir= rli->inuse_relaylog_list))
+        {
+          rpl_gtid *gtid= ir->relay_log_state;
+          uint32 count= ir->relay_log_state_count;
+          while (count > 0)
+          {
+            process_gtid_for_restart_pos(rli, gtid);
+            ++gtid;
+            --count;
+          }
+          strmake_buf(rli->group_relay_log_name, ir->name);
+          rli->group_relay_log_pos= BIN_LOG_HEADER_SIZE;
+        }
+      }
+    }
+  }
   THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
   thd->add_status_to_global();
   mysql_mutex_lock(&rli->run_lock);
@@ -4731,6 +4786,7 @@ log '%s' at position %s, relay log '%s'
   /* Forget the relay log's format */
   delete rli->relay_log.description_event_for_exec;
   rli->relay_log.description_event_for_exec= 0;
+  rli->reset_inuse_relaylog();
   /* Wake up master_pos_wait() */
   mysql_mutex_unlock(&rli->data_lock);
   DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));



More information about the commits mailing list