[Commits] Rev 4497: MDEV-6676: Speculative parallel replication: Intermediate commit. in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Thu Nov 13 16:09:27 EET 2014


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4497
revision-id: knielsen at knielsen-hq.org-20140917123739-gqztmf045l6ipp4s
parent: knielsen at knielsen-hq.org-20140916080907-p8tut0vvx0vg2kcl
committer: Kristian Nielsen <knielsen at knielsen-hq.org>
branch nick: work-10.0-mdev6676
timestamp: Wed 2014-09-17 14:37:39 +0200
message:
  MDEV-6676: Speculative parallel replication: Intermediate commit.
  
  Set flags in GTID event in master binlog to mark transactional event groups,
  transactions that had to wait on the master, and transactions marked by the
  user with @@replicate_allow_parallel.
  
  This patch needs cleanup later I think, as the setting and clearing of
  DID_WAIT in m_unsafe_rollback_flags is spread all over the code and easy to
  miss a case or two.
  
  I think there is some code in MySQL 5.6 that was incompletely merged. Probably
  fixing this merge could help, and it could also save a bit of space in THD by
  then moving modified_non_trans_table into a bit in m_unsafe_rollback_flags, as
  I think was intended.
=== modified file 'sql/handler.h'
--- a/sql/handler.h	2014-05-13 09:53:30 +0000
+++ b/sql/handler.h	2014-09-17 12:37:39 +0000
@@ -1427,7 +1427,11 @@ struct THD_TRANS
   */
   bool modified_non_trans_table;
 
-  void reset() { no_2pc= FALSE; modified_non_trans_table= FALSE; }
+  void reset() {
+    no_2pc= FALSE;
+    modified_non_trans_table= FALSE;
+    m_unsafe_rollback_flags= 0;
+  }
   bool is_empty() const { return ha_list == NULL; }
   THD_TRANS() {}                        /* Remove gcc warning */
 
@@ -1439,12 +1443,17 @@ struct THD_TRANS
   static unsigned int const MODIFIED_NON_TRANS_TABLE= 0x01;
   static unsigned int const CREATED_TEMP_TABLE= 0x02;
   static unsigned int const DROPPED_TEMP_TABLE= 0x04;
+  static unsigned int const DID_WAIT= 0x08;
 
   void mark_created_temp_table()
   {
     DBUG_PRINT("debug", ("mark_created_temp_table"));
     m_unsafe_rollback_flags|= CREATED_TEMP_TABLE;
   }
+  void mark_trans_did_wait() { m_unsafe_rollback_flags|= DID_WAIT; }
+  bool trans_did_wait() const {
+    return (m_unsafe_rollback_flags & DID_WAIT) != 0;
+  }
 
 };
 

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2014-09-10 07:46:29 +0000
+++ b/sql/log_event.cc	2014-09-17 12:37:39 +0000
@@ -6412,6 +6412,14 @@ Gtid_log_event::Gtid_log_event(THD *thd_
     flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0))
 {
   cache_type= Log_event::EVENT_NO_CACHE;
+  if (thd_arg->transaction.stmt.trans_did_wait() ||
+      thd_arg->transaction.all.trans_did_wait())
+    flags2|= FL_WAITED;
+  if (is_transactional &&
+      !(sql_command_flags[thd->lex->sql_command] & CF_DISALLOW_IN_RO_TRANS))
+    flags2|= FL_TRANSACTIONAL;
+  if (thd_arg->variables.option_bits & OPTION_RPL_ALLOW_PARALLEL)
+    flags2|= FL_ALLOW_PARALLEL;
 }
 
 
@@ -6620,14 +6628,28 @@ Gtid_log_event::print(FILE *file, PRINT_
   {
     print_header(&cache, print_event_info, FALSE);
     longlong10_to_str(seq_no, buf, 10);
+    my_b_printf(&cache, "\tGTID %u-%u-%s", domain_id, server_id, buf);
     if (flags2 & FL_GROUP_COMMIT_ID)
     {
       longlong10_to_str(commit_id, buf2, 10);
-      my_b_printf(&cache, "\tGTID %u-%u-%s cid=%s\n",
+      my_b_printf(&cache, " cid=%s\n",
                   domain_id, server_id, buf, buf2);
     }
-    else
-      my_b_printf(&cache, "\tGTID %u-%u-%s\n", domain_id, server_id, buf);
+    if (flags2 & FL_TRANSACTIONAL)
+      my_b_write_string(&cache, " trans");
+    if (flags2 & FL_WAITED)
+      my_b_write_string(&cache, " waited");
+    my_b_printf(&cache, "\n");
+
+    if (!print_event_info->allow_parallel_printed ||
+        print_event_info->allow_parallel != !!(flags2 & FL_ALLOW_PARALLEL))
+    {
+      my_b_printf(&cache,
+                  "/*!100101 SET @@session.replicate_allow_parallel=%u*/%s\n",
+                  !!(flags2 & FL_ALLOW_PARALLEL), print_event_info->delimiter);
+      print_event_info->allow_parallel= !!(flags2 & FL_ALLOW_PARALLEL);
+      print_event_info->allow_parallel_printed= true;
+    }
 
     if (!print_event_info->domain_id_printed ||
         print_event_info->domain_id != domain_id)
@@ -9617,6 +9639,7 @@ int Rows_log_event::do_apply_event(rpl_g
       by mysql_reset_thd_for_next_command.
     */
     thd->transaction.stmt.modified_non_trans_table= FALSE;
+    thd->transaction.stmt.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
     /*
       This is a row injection, so we flag the "statement" as
       such. Note that this code is called both when the slave does row
@@ -10063,7 +10086,10 @@ static int rows_event_stmt_cleanup(rpl_g
       rows_log_event::do_apply_event()
     */
     if (!thd->in_multi_stmt_transaction_mode())
+    {
       thd->transaction.all.modified_non_trans_table= 0;
+      thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
+    }
 
     rgi->cleanup_context(thd, 0);
   }
@@ -12637,7 +12663,7 @@ st_print_event_info::st_print_event_info
    charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER),
    thread_id(0), thread_id_printed(false), server_id(0),
    server_id_printed(false), domain_id(0), domain_id_printed(false),
-   skip_replication(0),
+   allow_parallel(true), allow_parallel_printed(false), skip_replication(0),
    base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE)
 {
   /*

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2014-06-25 11:08:30 +0000
+++ b/sql/log_event.h	2014-09-17 12:37:39 +0000
@@ -804,6 +804,8 @@ typedef struct st_print_event_info
   bool server_id_printed;
   uint32 domain_id;
   bool domain_id_printed;
+  bool allow_parallel;
+  bool allow_parallel_printed;
 
   /*
     Track when @@skip_replication changes so we need to output a SET
@@ -3127,6 +3129,12 @@ class Binlog_checkpoint_log_event: publi
     <td>1 byte bitfield</td>
     <td>Bit 0 set indicates stand-alone event (no terminating COMMIT)</td>
     <td>Bit 1 set indicates group commit, and that commit id exists</td>
+    <td>Bit 2 set indicates a transactional event group (can be safely rolled
+        back).</td>
+    <td>Bit 3 set indicates that user allowed optimistic parallel apply (the
+        @@SESSION.replicate_allow_parallel value was true at commit).</td>
+    <td>Bit 4 set indicates that this transaction encountered a row (or other)
+        lock wait during execution.</td>
   </tr>
 
   <tr>
@@ -3159,6 +3167,21 @@ class Gtid_log_event: public Log_event
     master. Groups with same commit_id are part of the same group commit.
   */
   static const uchar FL_GROUP_COMMIT_ID= 2;
+  /*
+    FL_TRANSACTIONAL is set for an event group that can be safely rolled back
+    (no DDL or MyISAM, eg.).
+  */
+  static const uchar FL_TRANSACTIONAL= 4;
+  /*
+    FL_ALLOW_PARALLEL reflects the value of @@SESSION.replicate_allow_parallel
+    at the time of commit.
+  */
+  static const uchar FL_ALLOW_PARALLEL= 8;
+  /*
+    FL_WAITED is set if a row lock wait (or other wait) is detected during the
+    execution of the transaction.
+  */
+  static const uchar FL_WAITED= 16;
 
 #ifdef MYSQL_SERVER
   Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2014-11-13 10:01:31 +0000
+++ b/sql/sql_class.cc	2014-09-17 12:37:39 +0000
@@ -1373,6 +1373,9 @@ void THD::init(void)
 
   transaction.all.modified_non_trans_table=
     transaction.stmt.modified_non_trans_table= FALSE;
+  transaction.all.m_unsafe_rollback_flags=
+    transaction.stmt.m_unsafe_rollback_flags= 0;
+
   open_options=ha_open_options;
   update_lock_default= (variables.low_priority_updates ?
                         TL_WRITE_LOW_PRIORITY :
@@ -4233,6 +4236,8 @@ thd_need_wait_for(const MYSQL_THD thd)
 {
   rpl_group_info *rgi;
 
+  if (mysql_bin_log.is_open())
+    return true;
   if (!thd)
     return false;
   rgi= thd->rgi_slave;
@@ -4267,12 +4272,15 @@ thd_need_wait_for(const MYSQL_THD thd)
   not harmful, but could lead to unnecessary kill and retry, so best avoided).
 */
 extern "C" void
-thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd)
+thd_report_wait_for(MYSQL_THD thd, MYSQL_THD other_thd)
 {
   rpl_group_info *rgi;
   rpl_group_info *other_rgi;
 
-  if (!thd || !other_thd)
+  if (!thd)
+    return;
+  thd->transaction.stmt.mark_trans_did_wait();
+  if (!other_thd)
     return;
   rgi= thd->rgi_slave;
   other_rgi= other_thd->rgi_slave;

=== modified file 'sql/sql_delete.cc'
--- a/sql/sql_delete.cc	2014-09-30 17:31:14 +0000
+++ b/sql/sql_delete.cc	2014-09-17 12:37:39 +0000
@@ -1064,6 +1064,8 @@ void multi_delete::abort_result_set()
 
   if (thd->transaction.stmt.modified_non_trans_table)
     thd->transaction.all.modified_non_trans_table= TRUE;
+  thd->transaction.all.m_unsafe_rollback_flags|=
+    (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
 
   /*
     If rows from the first table only has been deleted and it is
@@ -1255,6 +1257,8 @@ bool multi_delete::send_eof()
 
   if (thd->transaction.stmt.modified_non_trans_table)
     thd->transaction.all.modified_non_trans_table= TRUE;
+  thd->transaction.all.m_unsafe_rollback_flags|=
+    (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
 
   /*
     We must invalidate the query cache before binlog writing and

=== modified file 'sql/sql_insert.cc'
--- a/sql/sql_insert.cc	2014-09-30 17:31:14 +0000
+++ b/sql/sql_insert.cc	2014-09-17 12:37:39 +0000
@@ -1009,6 +1009,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *t
 
     if (thd->transaction.stmt.modified_non_trans_table)
       thd->transaction.all.modified_non_trans_table= TRUE;
+    thd->transaction.all.m_unsafe_rollback_flags|=
+      (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
 
     if (error <= 0 ||
         thd->transaction.stmt.modified_non_trans_table ||
@@ -3666,6 +3668,8 @@ bool select_insert::send_eof()
 
   if (thd->transaction.stmt.modified_non_trans_table)
     thd->transaction.all.modified_non_trans_table= TRUE;
+  thd->transaction.all.m_unsafe_rollback_flags|=
+    (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
 
   DBUG_ASSERT(trans_table || !changed || 
               thd->transaction.stmt.modified_non_trans_table);

=== modified file 'sql/sql_load.cc'
--- a/sql/sql_load.cc	2014-09-30 17:31:14 +0000
+++ b/sql/sql_load.cc	2014-09-17 12:37:39 +0000
@@ -607,6 +607,8 @@ int mysql_load(THD *thd,sql_exchange *ex
 
   if (thd->transaction.stmt.modified_non_trans_table)
     thd->transaction.all.modified_non_trans_table= TRUE;
+  thd->transaction.all.m_unsafe_rollback_flags|=
+    (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
 #ifndef EMBEDDED_LIBRARY
   if (mysql_bin_log.is_open())
   {

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2014-11-03 16:47:37 +0000
+++ b/sql/sql_parse.cc	2014-09-17 12:37:39 +0000
@@ -6125,6 +6125,7 @@ void THD::reset_for_next_command()
   {
     thd->variables.option_bits&= ~OPTION_KEEP_LOG;
     thd->transaction.all.modified_non_trans_table= FALSE;
+    thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
   }
   DBUG_ASSERT(thd->security_ctx== &thd->main_security_ctx);
   thd->thread_specific_used= FALSE;

=== modified file 'sql/sql_update.cc'
--- a/sql/sql_update.cc	2014-09-30 17:31:14 +0000
+++ b/sql/sql_update.cc	2014-09-17 12:37:39 +0000
@@ -958,6 +958,8 @@ int mysql_update(THD *thd,
   
   if (thd->transaction.stmt.modified_non_trans_table)
       thd->transaction.all.modified_non_trans_table= TRUE;
+  thd->transaction.all.m_unsafe_rollback_flags|=
+    (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
 
   /*
     error < 0 means really no error at all: we processed all rows until the
@@ -2223,6 +2225,8 @@ void multi_update::abort_result_set()
     }
     thd->transaction.all.modified_non_trans_table= TRUE;
   }
+  thd->transaction.all.m_unsafe_rollback_flags|=
+    (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
   DBUG_ASSERT(trans_safe || !updated || thd->transaction.stmt.modified_non_trans_table);
 }
 
@@ -2474,6 +2478,8 @@ bool multi_update::send_eof()
 
   if (thd->transaction.stmt.modified_non_trans_table)
     thd->transaction.all.modified_non_trans_table= TRUE;
+  thd->transaction.all.m_unsafe_rollback_flags|=
+    (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
 
   if (local_error == 0 || thd->transaction.stmt.modified_non_trans_table)
   {

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2014-09-16 08:09:07 +0000
+++ b/sql/sys_vars.cc	2014-09-17 12:37:39 +0000
@@ -3411,6 +3411,7 @@ static bool fix_autocommit(sys_var *self
                  ~(OPTION_BEGIN | OPTION_KEEP_LOG | OPTION_NOT_AUTOCOMMIT |
                    OPTION_GTID_BEGIN);
     thd->transaction.all.modified_non_trans_table= false;
+    thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
     thd->server_status|= SERVER_STATUS_AUTOCOMMIT;
     return false;
   }
@@ -3420,6 +3421,7 @@ static bool fix_autocommit(sys_var *self
   {
     // disabling autocommit
     thd->transaction.all.modified_non_trans_table= false;
+    thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
     thd->server_status&= ~SERVER_STATUS_AUTOCOMMIT;
     thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT;
     return false;

=== modified file 'sql/transaction.cc'
--- a/sql/transaction.cc	2014-09-30 17:31:14 +0000
+++ b/sql/transaction.cc	2014-09-17 12:37:39 +0000
@@ -149,6 +149,7 @@ bool trans_begin(THD *thd, uint flags)
     when we come here.  We should at some point change this to an assert.
   */
   thd->transaction.all.modified_non_trans_table= FALSE;
+  thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
 
   if (res)
     DBUG_RETURN(TRUE);
@@ -227,6 +228,7 @@ bool trans_commit(THD *thd)
     (void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
   thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
   thd->transaction.all.modified_non_trans_table= FALSE;
+  thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
   thd->lex->start_transaction_opt= 0;
 
   DBUG_RETURN(MY_TEST(res));
@@ -270,6 +272,7 @@ bool trans_commit_implicit(THD *thd)
 
   thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
   thd->transaction.all.modified_non_trans_table= FALSE;
+  thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
 
   /*
     Upon implicit commit, reset the current transaction
@@ -310,6 +313,7 @@ bool trans_rollback(THD *thd)
   /* Reset the binlog transaction marker */
   thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
   thd->transaction.all.modified_non_trans_table= FALSE;
+  thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
   thd->lex->start_transaction_opt= 0;
 
   DBUG_RETURN(MY_TEST(res));
@@ -354,6 +358,7 @@ bool trans_rollback_implicit(THD *thd)
   */
   thd->variables.option_bits&= ~(OPTION_KEEP_LOG);
   thd->transaction.all.modified_non_trans_table= false;
+  thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
 
   /* Rollback should clear transaction_rollback_request flag. */
   DBUG_ASSERT(! thd->transaction_rollback_request);
@@ -854,6 +859,7 @@ bool trans_xa_commit(THD *thd)
 
   thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
   thd->transaction.all.modified_non_trans_table= FALSE;
+  thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
   thd->server_status&=
     ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
   DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
@@ -903,6 +909,7 @@ bool trans_xa_rollback(THD *thd)
 
   thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
   thd->transaction.all.modified_non_trans_table= FALSE;
+  thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
   thd->server_status&=
     ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
   DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));

=== modified file 'storage/innobase/lock/lock0lock.cc'
--- a/storage/innobase/lock/lock0lock.cc	2014-07-10 12:24:53 +0000
+++ b/storage/innobase/lock/lock0lock.cc	2014-09-17 12:37:39 +0000
@@ -374,7 +374,7 @@ struct lock_stack_t {
         ulint           heap_no;                /*!< heap number if rec lock */
 };
 
-extern "C" void thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd);
+extern "C" void thd_report_wait_for(MYSQL_THD thd, MYSQL_THD other_thd);
 extern "C" int thd_need_wait_for(const MYSQL_THD thd);
 extern "C"
 int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);

=== modified file 'storage/xtradb/lock/lock0lock.cc'
--- a/storage/xtradb/lock/lock0lock.cc	2014-07-10 12:24:53 +0000
+++ b/storage/xtradb/lock/lock0lock.cc	2014-09-17 12:37:39 +0000
@@ -374,7 +374,7 @@ struct lock_stack_t {
         ulint           heap_no;                /*!< heap number if rec lock */
 };
 
-extern "C" void thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd);
+extern "C" void thd_report_wait_for(MYSQL_THD thd, MYSQL_THD other_thd);
 extern "C" int thd_need_wait_for(const MYSQL_THD thd);
 extern "C"
 int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);



More information about the commits mailing list