[Commits] Rev 4498: MDEV-6676: Speculative parallel replication: Intermediate commit. in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Thu Nov 13 16:09:30 EET 2014


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4498
revision-id: knielsen at knielsen-hq.org-20140919132537-rrjr7nvg0ltz37b5
parent: knielsen at knielsen-hq.org-20140917123739-gqztmf045l6ipp4s
committer: Kristian Nielsen <knielsen at knielsen-hq.org>
branch nick: work-10.0-mdev6676
timestamp: Fri 2014-09-19 15:25:37 +0200
message:
  MDEV-6676: Speculative parallel replication: Intermediate commit.
  
  Implement proper event scheduling that respects the configuration and
  ensures that non-transactional and DDL event groups are replicated safely
  (not speculatively in parallel).
  
  Now speculative mode is off by default.
=== modified file 'mysql-test/r/mysqld--help.result'
--- a/mysql-test/r/mysqld--help.result	2014-09-16 08:07:32 +0000
+++ b/mysql-test/r/mysqld--help.result	2014-09-19 13:25:37 +0000
@@ -1320,7 +1320,7 @@ slave-exec-mode STRICT
 slave-max-allowed-packet 1073741824
 slave-net-timeout 3600
 slave-parallel-max-queued 131072
-slave-parallel-mode domain,groupcommit,transactional
+slave-parallel-mode domain,groupcommit
 slave-parallel-threads 0
 slave-skip-errors (No default value)
 slave-sql-verify-checksum TRUE

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_optimistic.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result	2014-09-11 10:37:33 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result	2014-09-19 13:25:37 +0000
@@ -2,8 +2,10 @@ include/rpl_init.inc [topology=1->2]
 ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
 CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
 SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+SET @old_mode=@@GLOBAL.slave_parallel_mode;
 include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=10;
+SET GLOBAL slave_parallel_mode="domain,transactional";
 CHANGE MASTER TO master_use_gtid=slave_pos;
 INSERT INTO t1 VALUES(1,1);
 BEGIN;
@@ -39,8 +41,89 @@ a	b
 1       2
 2       6
 3       3
+*** Test a bunch of non-transactional/DDL event groups. ***
+include/stop_slave.inc
+INSERT INTO t1 VALUES (4,4);
+INSERT INTO t1 VALUES (5,5);
+CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t2 VALUES (1);
+CREATE TABLE t3 (a INT PRIMARY KEY) ENGINE=MyISAM;
+ALTER TABLE t2 ADD b INT;
+INSERT INTO t2 VALUES (2,2);
+ALTER TABLE t2 DROP b;
+INSERT INTO t2 VALUES (3);
+ALTER TABLE t2 ADD c INT;
+INSERT INTO t2 VALUES (4,5);
+INSERT INTO t2 VALUES (5,5);
+INSERT INTO t3 VALUES (1);
+UPDATE t2 SET c=NULL WHERE a=4;
+ALTER TABLE t2 ADD UNIQUE (c);
+INSERT INTO t2 VALUES (6,6);
+UPDATE t2 SET c=c+100 WHERE a=2;
+INSERT INTO t3(a) VALUES (2);
+DELETE FROM t3 WHERE a=2;
+INSERT INTO t3(a) VALUES (2);
+DELETE FROM t3 WHERE a=2;
+ALTER TABLE t3 CHANGE a c INT NOT NULL;
+INSERT INTO t3(c) VALUES (2);
+DELETE FROM t3 WHERE c=2;
+INSERT INTO t3 SELECT a+200 FROM t2;
+DELETE FROM t3 WHERE c >= 200;
+INSERT INTO t3 SELECT a+200 FROM t2;
+include/save_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a       b
+1       2
+2       6
+3       3
+4       4
+5       5
+SELECT * FROM t2 ORDER BY a;
+a       c
+1       NULL
+2       NULL
+3       NULL
+4       NULL
+5       5
+6       6
+SELECT * FROM t3 ORDER BY c;
+c
+1
+201
+202
+203
+204
+205
+206
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a       b
+1       2
+2       6
+3       3
+4       4
+5       5
+SELECT * FROM t2 ORDER BY a;
+a       c
+1       NULL
+2       NULL
+3       NULL
+4       NULL
+5       5
+6       6
+SELECT * FROM t3 ORDER BY c;
+c
+1
+201
+202
+203
+204
+205
+206
 include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+SET GLOBAL slave_parallel_mode=@old_mode;
 include/start_slave.inc
-DROP TABLE t1;
+DROP TABLE t1, t2, t3;
 include/rpl_end.inc

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_optimistic.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test	2014-09-11 10:37:33 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test	2014-09-19 13:25:37 +0000
@@ -10,8 +10,10 @@ CREATE TABLE t1 (a int PRIMARY KEY, b IN
 --connection server_2
 --sync_with_master
 SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+SET @old_mode=@@GLOBAL.slave_parallel_mode;
 --source include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=10;
+SET GLOBAL slave_parallel_mode="domain,transactional";
 CHANGE MASTER TO master_use_gtid=slave_pos;
 
 
@@ -54,12 +56,62 @@ SELECT * FROM t1 ORDER BY a;
 SELECT * FROM t1 ORDER BY a;
 #SHOW STATUS LIKE 'Slave_retried_transactions';
 
+
+--echo *** Test a bunch of non-transactional/DDL event groups. ***
+
+--connection server_2
+--source include/stop_slave.inc
+
+--connection server_1
+
+INSERT INTO t1 VALUES (4,4);
+INSERT INTO t1 VALUES (5,5);
+CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t2 VALUES (1);
+CREATE TABLE t3 (a INT PRIMARY KEY) ENGINE=MyISAM;
+ALTER TABLE t2 ADD b INT;
+INSERT INTO t2 VALUES (2,2);
+ALTER TABLE t2 DROP b;
+INSERT INTO t2 VALUES (3);
+ALTER TABLE t2 ADD c INT;
+INSERT INTO t2 VALUES (4,5);
+INSERT INTO t2 VALUES (5,5);
+INSERT INTO t3 VALUES (1);
+UPDATE t2 SET c=NULL WHERE a=4;
+ALTER TABLE t2 ADD UNIQUE (c);
+INSERT INTO t2 VALUES (6,6);
+UPDATE t2 SET c=c+100 WHERE a=2;
+INSERT INTO t3(a) VALUES (2);
+DELETE FROM t3 WHERE a=2;
+INSERT INTO t3(a) VALUES (2);
+DELETE FROM t3 WHERE a=2;
+ALTER TABLE t3 CHANGE a c INT NOT NULL;
+INSERT INTO t3(c) VALUES (2);
+DELETE FROM t3 WHERE c=2;
+INSERT INTO t3 SELECT a+200 FROM t2;
+DELETE FROM t3 WHERE c >= 200;
+INSERT INTO t3 SELECT a+200 FROM t2;
+--source include/save_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+SELECT * FROM t2 ORDER BY a;
+SELECT * FROM t3 ORDER BY c;
+
+--connection server_2
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+SELECT * FROM t2 ORDER BY a;
+SELECT * FROM t3 ORDER BY c;
+#SHOW STATUS LIKE 'Slave_retried_transactions';
+
+
 --connection server_2
 --source include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+SET GLOBAL slave_parallel_mode=@old_mode;
 --source include/start_slave.inc
 
 --connection server_1
-DROP TABLE t1;
+DROP TABLE t1, t2, t3;
 
 --source include/rpl_end.inc

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2014-09-17 12:37:39 +0000
+++ b/sql/log_event.cc	2014-09-19 13:25:37 +0000
@@ -187,7 +187,7 @@ is_parallel_retry_error(rpl_group_info *
 {
   if (!rgi->is_parallel_exec)
     return false;
-  if (rgi->speculative)
+  if (rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC)
     return true;
   if (rgi->killed_for_retry &&
       (err == ER_QUERY_INTERRUPTED || err == ER_CONNECTION_KILLED))

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2014-09-16 08:07:32 +0000
+++ b/sql/mysqld.cc	2014-09-19 13:25:37 +0000
@@ -562,7 +562,7 @@ ulong stored_program_cache_size= 0;
 ulong opt_slave_parallel_threads= 0;
 ulong opt_slave_domain_parallel_threads= 0;
 ulonglong opt_slave_parallel_mode=
-  SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_GROUPCOMMIT | SLAVE_PARALLEL_TRX;
+  SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_GROUPCOMMIT;
 ulong opt_binlog_commit_wait_count= 0;
 ulong opt_binlog_commit_wait_usec= 0;
 ulong opt_slave_parallel_max_queued= 131072;

=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2014-09-11 10:37:33 +0000
+++ b/sql/rpl_parallel.cc	2014-09-19 13:25:37 +0000
@@ -266,7 +266,8 @@ convert_kill_to_deadlock_error(rpl_group
   if (!thd->get_stmt_da()->is_error())
     return;
   err_code= thd->get_stmt_da()->sql_errno();
-  if ((rgi->speculative && err_code != ER_PRIOR_COMMIT_FAILED) ||
+  if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
+       err_code != ER_PRIOR_COMMIT_FAILED) ||
       ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
        rgi->killed_for_retry))
   {
@@ -367,7 +368,7 @@ retry_event_group(rpl_group_info *rgi, r
     register_wait_for_prior_event_group_commit(rgi, entry);
     mysql_mutex_unlock(&entry->LOCK_parallel_entry);
 
-    if (!rgi->speculative)
+    if (rgi->speculation != rpl_group_info::SPECULATE_OPTIMISTIC)
       break;
 
     /*
@@ -379,7 +380,10 @@ retry_event_group(rpl_group_info *rgi, r
       transaction that conflicts with a prior long-running one.
     */
     if (!(err= thd->wait_for_prior_commit()))
+    {
+      rgi->speculation = rpl_group_info::SPECULATE_WAIT;
       break;
+    }
 
     convert_kill_to_deadlock_error(rgi);
     if (!has_temporary_error(thd))
@@ -395,7 +399,6 @@ retry_event_group(rpl_group_info *rgi, r
     if(thd->wait_for_commit_ptr)
       thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
   }
-  rgi->speculative= false;
 
   strmake_buf(log_name, ir->name);
   if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
@@ -693,11 +696,11 @@ handle_rpl_parallel_thread(void *arg)
           in parallel with.
         */
         mysql_mutex_lock(&entry->LOCK_parallel_entry);
-        if (!gco->installed)
+        if (!(gco->flags & group_commit_orderer::INSTALLED))
         {
           if (gco->prev_gco)
             gco->prev_gco->next_gco= gco;
-          gco->installed= true;
+          gco->flags|= group_commit_orderer::INSTALLED;
         }
         wait_count= gco->wait_count;
         if (wait_count > entry->count_committing_event_groups)
@@ -797,6 +800,18 @@ handle_rpl_parallel_thread(void *arg)
             /* We have to apply the event. */
           }
         }
+        /*
+          If we are optimistically running transactions in parallel, but this
+          particular event group should not run in parallel with what came
+          before, then wait now for the prior transaction to complete its
+          commit.
+        */
+        if (rgi->speculation == rpl_group_info::SPECULATE_WAIT &&
+            (err= thd->wait_for_prior_commit()))
+        {
+          slave_output_error_info(rgi, thd);
+          signal_error_to_sql_driver_thread(thd, rgi, 1);
+        }
       }
 
       group_ending= is_group_ending(qev->ev, event_type);
@@ -1350,7 +1365,7 @@ rpl_parallel_thread::get_gco(uint64 wait
   gco->wait_count= wait_count;
   gco->prev_gco= prev;
   gco->next_gco= NULL;
-  gco->installed= false;
+  gco->flags= 0;
   return gco;
 }
 
@@ -1999,7 +2014,12 @@ rpl_parallel::do_event(rpl_group_info *s
   if (typ == GTID_EVENT)
   {
     Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
-    bool do_parallel;
+    bool new_gco;
+    ulonglong mode= opt_slave_parallel_mode;
+    uchar gtid_flags= gtid_ev->flags2;
+    group_commit_orderer *gco;
+    uint8 force_switch_flag;
+    enum rpl_group_info::enum_speculation speculation;
 
     if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
     {
@@ -2026,38 +2046,75 @@ rpl_parallel::do_event(rpl_group_info *s
     rgi->wait_commit_sub_id= e->current_sub_id;
     rgi->wait_commit_group_info= e->current_group_info;
 
-    if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
-        e->last_commit_id == gtid_ev->commit_id)
-    {
-      /* A new batch of transactions that group-committed together on the master. */
-      do_parallel= true;
-    }
-    else
-    {
-      /*
-        ToDo: Test for flags, avoid speculative parallelisation of DDL or
-        MyISAM, or stuff that waited on the master, or user-marked risky
-        transactions, or anything if speculation is disabled.
-
-        ToDo: Also, we need to start a new batch not just at DDL/MyISAM, but
-        also at the end of the commit_id group of that DDL/MyISAM, so we do
-        not try to eg. run a bunch of inserts in parallel with a long-running
-        ALTER TABLE that they depend on.
-      */
+    speculation= rpl_group_info::SPECULATE_NO;
+    new_gco= true;
+    force_switch_flag= 0;
+    gco= e->current_gco;
+    if (likely(gco))
+    {
+      uint8 flags= gco->flags;
+
+      if (!(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) ||
+          e->last_commit_id != gtid_ev->commit_id)
+        flags|= group_commit_orderer::MULTI_BATCH;
+      /* Make sure we do not attempt to run DDL in parallel speculatively. */
+      if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL)/* ToDo: use a new flag "FL_DDL" here. */)
+        flags|= (force_switch_flag= group_commit_orderer::FORCE_SWITCH);
 
-      /*
-        We do not know for sure that this is safe to run in parallel with what
-        came before. But we can try anyway; there should be a good chance that
-        it will work. If it does _not_ work, we will catch the problem as a
-        deadlock or other error, and we can rollback and retry the
-        transaction.
-      */
-      rgi->speculative= true;
-      do_parallel= true;
+      if (!(flags & group_commit_orderer::MULTI_BATCH))
+      {
+        /*
+          Still the same batch of event groups that group-committed together
+          on the master, so we can run in parallel.
+        */
+        new_gco= false;
+      }
+      else if ((mode & SLAVE_PARALLEL_TRX) &&
+               !(flags & group_commit_orderer::FORCE_SWITCH))
+      {
+        /*
+          In transactional parallel mode, we optimistically attempt to run
+          non-DDL in parallel. In case of conflicts, we catch the conflict as
+          a deadlock or other error, roll back and retry serially.
+
+          The assumption is that only a few event groups will be
+          non-transactional or otherwise unsuitable for parallel apply. Those
+          transactions are still scheduled in parallel, but we set a flag that
+          will make the worker thread wait for everything before it to
+          complete before starting.
+        */
+        new_gco= false;
+        if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL) ||
+            !(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) ||
+            ((gtid_flags & Gtid_log_event::FL_WAITED) &&
+             !(mode & SLAVE_PARALLEL_WAITING)))
+        {
+          /*
+            This transaction should not be speculatively run in parallel with
+            what came before, either because it cannot safely be rolled back in
+            case of a conflict, or because it was marked as likely to conflict
+            and require expensive rollback and retry.
+
+            Here we mark it as such, and then the worker thread will do a
+            wait_for_prior_commit() before starting it. We do not introduce a
+            new group_commit_orderer, since we still want following transactions
+            to run in parallel with transactions prior to this one.
+          */
+          speculation= rpl_group_info::SPECULATE_WAIT;
+        }
+        else
+          speculation= rpl_group_info::SPECULATE_OPTIMISTIC;
+      }
+      gco->flags= flags;
     }
-    if (do_parallel && e->current_gco)
-      rgi->gco= e->current_gco;
+    rgi->speculation= speculation;
+
+    if (gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID)
+      e->last_commit_id= gtid_ev->commit_id;
     else
+      e->last_commit_id= 0;
+
+    if (new_gco)
     {
       /*
         Do not run this event group in parallel with what came before; instead
@@ -2068,9 +2125,8 @@ rpl_parallel::do_event(rpl_group_info *s
         groups that run in parallel, and allocate a new gco.
       */
       uint64 count= e->count_queued_event_groups;
-      group_commit_orderer *gco;
 
-      if (!(gco= cur_thread->get_gco(count, e->current_gco)))
+      if (!(gco= cur_thread->get_gco(count, gco)))
       {
         cur_thread->free_rgi(rgi);
         cur_thread->free_qev(qev);
@@ -2079,12 +2135,11 @@ rpl_parallel::do_event(rpl_group_info *s
         delete ev;
         return 1;
       }
-      e->current_gco= rgi->gco= gco;
+      gco->flags|= force_switch_flag;
+      e->current_gco= gco;
     }
-    if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID)
-      e->last_commit_id= gtid_ev->commit_id;
-    else
-      e->last_commit_id= 0;
+    rgi->gco= gco;
+
     qev->rgi= e->current_group_info= rgi;
     e->current_sub_id= rgi->gtid_sub_id;
     ++e->count_queued_event_groups;

=== modified file 'sql/rpl_parallel.h'
--- a/sql/rpl_parallel.h	2014-09-16 08:07:32 +0000
+++ b/sql/rpl_parallel.h	2014-09-19 13:25:37 +0000
@@ -56,7 +56,29 @@ struct group_commit_orderer {
   uint64 wait_count;
   group_commit_orderer *prev_gco;
   group_commit_orderer *next_gco;
-  bool installed;
+  /*
+    This flag is set when this GCO has been installed into the next_gco pointer
+    of the previous GCO.
+  */
+  static const uint8 INSTALLED = 0x01;
+  /*
+    This flag is set for a GCO in which we have event groups with multiple
+    different commit_id values from the master. This happens when we
+    optimistically try to execute in parallel transactions not known to be
+    conflict-free.
+
+    When this flag is set, in case of DDL we need to start a new GCO regardless
+    of current commit_id, as DDL is not safe to speculatively apply in parallel
+    with prior event groups.
+  */
+  static const uint8 MULTI_BATCH = 0x02;
+  /*
+    This flag is set for a GCO that contains DDL. If set, it forces a switch to
+    a new GCO upon seeing a new commit_id, as DDL is not safe to speculatively
+    replicate in parallel with subsequent transactions.
+  */
+  static const uint8 FORCE_SWITCH = 0x04;
+  uint8 flags;
 };
 
 

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2014-09-10 07:46:29 +0000
+++ b/sql/rpl_rli.cc	2014-09-19 13:25:37 +0000
@@ -1601,7 +1601,7 @@ rpl_group_info::reinit(Relay_log_info *r
   long_find_row_note_printed= false;
   did_mark_start_commit= false;
   gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
-  speculative= false;
+  speculation= SPECULATE_NO;
   commit_orderer.reinit();
 }
 

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2014-09-10 07:46:29 +0000
+++ b/sql/rpl_rli.h	2014-09-19 13:25:37 +0000
@@ -652,17 +652,37 @@ struct rpl_group_info
   inuse_relaylog *relay_log;
   uint64 retry_start_offset;
   uint64 retry_event_count;
-  bool killed_for_retry;
   /*
-    If `speculative' is true, then we are optimistically running this
-    transaction in parallel, even though it might not be safe (there may be a
-    conflict with a prior event group).
+    If `speculation' is SPECULATE_OPTIMISTIC, then we are optimistically
+    running this transaction in parallel, even though it might not be safe
+    (there may be a conflict with a prior event group).
 
     In this case, a conflict can cause other errors than deadlocks (like
     duplicate key for example). So in case of _any_ error, we need to roll
     back and retry the event group.
   */
-  bool speculative;
+  enum enum_speculation {
+    /*
+      This transaction was group-committed together on the master with the
+      other transactions with which it is replicated in parallel.
+    */
+    SPECULATE_NO,
+    /*
+      We will optimistically try to run this transaction in parallel with
+      other transactions, even though it is not known to be conflict free.
+      If we get a conflict, we will detect it as a deadlock, roll back and
+      retry.
+    */
+    SPECULATE_OPTIMISTIC,
+    /*
+      This transaction got a conflict during speculative parallel apply, or
+      it was marked on the master as likely to cause a conflict or unsafe to
+      speculate. So it will wait for the prior transaction to commit before
+      starting to replicate.
+    */
+    SPECULATE_WAIT
+  } speculation;
+  bool killed_for_retry;
 
   rpl_group_info(Relay_log_info *rli_);
   ~rpl_group_info();

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2014-09-17 12:37:39 +0000
+++ b/sql/sys_vars.cc	2014-09-19 13:25:37 +0000
@@ -1851,8 +1851,7 @@ static Sys_var_set Sys_slave_parallel_mo
        GLOBAL_VAR(opt_slave_parallel_mode), CMD_LINE(REQUIRED_ARG),
        slave_parallel_mode_names,
        DEFAULT(SLAVE_PARALLEL_DOMAIN |
-               SLAVE_PARALLEL_GROUPCOMMIT |
-               SLAVE_PARALLEL_TRX));
+               SLAVE_PARALLEL_GROUPCOMMIT));
 
 
 static Sys_var_bit Sys_replicate_allow_parallel(



More information about the commits mailing list