[Commits] Rev 4163: MDEV-6156: Parallel replication incorrectly caches charset between worker threads in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Fri Apr 25 13:58:57 EEST 2014


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4163
revision-id: knielsen at knielsen-hq.org-20140425105831-ax3r2smz9j8b8oc1
parent: bar at mnogosearch.org-20140424125901-nz7nt429ltgr4cms
committer: knielsen at knielsen-hq.org
branch nick: tmp-10.0
timestamp: Fri 2014-04-25 12:58:31 +0200
message:
  MDEV-6156: Parallel replication incorrectly caches charset between worker threads
  
  The previous patch for this bug was unfortunately completely wrong.
  
  The purpose of cached_charset is to remember which character set we
  have installed currently in the THD, so that in the common case where
  charset does not change between queries, we do not need to update it
  in the THD. Thus, it is important that the cached_charset field is
  tightly coupled to the THD for which it handles caching.
  
  Thus the right place to put cached_charset seems to be in the THD.
  This patch introduces a field THD:system_thread_info where such info
  in general can be placed without further inflating the THD with unused
  data for other threads (THD is already far too big as it is). It then
  moves the cached_charset into this slot for the SQL driver thread and
  for the parallel replication worker threads.
  
  The THD::rpl_filter field is also moved inside system_thread_info, to
  keep the size of THD unchanged. Moving further fields in to reduce the
  size of THD is a separate task, filed as MDEV-6164.
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2014-04-23 14:06:06 +0000
+++ b/sql/log_event.cc	2014-04-25 10:58:31 +0000
@@ -4153,7 +4153,8 @@ int Query_log_event::do_apply_event(rpl_
                    (sql_mode & ~(ulong) MODE_NO_DIR_IN_CREATE));
       if (charset_inited)
       {
-        if (rgi->cached_charset_compare(charset))
+        rpl_sql_thread_info *sql_info= thd->system_thread_info.rpl_sql_info;
+        if (sql_info->cached_charset_compare(charset))
         {
           /* Verify that we support the charsets found in the event. */
           if (!(thd->variables.character_set_client=

=== modified file 'sql/rpl_mi.h'
--- a/sql/rpl_mi.h	2013-11-20 11:05:39 +0000
+++ b/sql/rpl_mi.h	2014-04-25 10:58:31 +0000
@@ -216,6 +216,16 @@ class Master_info_index
   bool stop_all_slaves(THD *thd);
 };
 
+
+/*
+  The class rpl_io_thread_info is the THD::system_thread_info for the IO thread.
+*/
+class rpl_io_thread_info
+{
+public:
+};
+
+
 bool check_master_connection_name(LEX_STRING *name);
 void create_logfile_name_with_suffix(char *res_file_name, size_t length,
                              const char *info_file, 

=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc	2014-04-09 12:42:46 +0000
+++ b/sql/rpl_parallel.cc	2014-04-25 10:58:31 +0000
@@ -33,7 +33,7 @@ rpt_handle_event(rpl_parallel_thread::qu
   THD *thd= rgi->thd;
 
   thd->rgi_slave= rgi;
-  thd->rpl_filter = rli->mi->rpl_filter;
+  thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
 
   /* ToDo: Access to thd, and what about rli, split out a parallel part? */
   mysql_mutex_lock(&rli->data_lock);
@@ -212,6 +212,7 @@ handle_rpl_parallel_thread(void *arg)
   rpl_parallel_thread::queued_event *qevs_to_free;
   rpl_group_info *rgis_to_free;
   group_commit_orderer *gcos_to_free;
+  rpl_sql_thread_info sql_info(NULL);
   size_t total_event_size;
   int err;
 
@@ -242,6 +243,7 @@ handle_rpl_parallel_thread(void *arg)
   thd_proc_info(thd, "Waiting for work from main SQL threads");
   thd->set_time();
   thd->variables.lock_wait_timeout= LONG_TIMEOUT;
+  thd->system_thread_info.rpl_sql_info= &sql_info;
   /*
     For now, we need to run the replication parallel worker threads in
     READ COMMITTED. This is needed because gap locks are not symmetric.

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2014-04-23 14:06:06 +0000
+++ b/sql/rpl_rli.cc	2014-04-25 10:58:31 +0000
@@ -1479,7 +1479,6 @@ rpl_group_info::rpl_group_info(Relay_log
     deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
 {
   reinit(rli);
-  cached_charset_invalidate();
   bzero(&current_gtid, sizeof(current_gtid));
   mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
                    MY_MUTEX_INIT_FAST);
@@ -1562,29 +1561,6 @@ delete_or_keep_event_post_apply(rpl_grou
 }
 
 
-void rpl_group_info::cached_charset_invalidate()
-{
-  DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
-
-  /* Full of zeroes means uninitialized. */
-  bzero(cached_charset, sizeof(cached_charset));
-  DBUG_VOID_RETURN;
-}
-
-
-bool rpl_group_info::cached_charset_compare(char *charset) const
-{
-  DBUG_ENTER("rpl_group_info::cached_charset_compare");
-
-  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
-  {
-    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
-    DBUG_RETURN(1);
-  }
-  DBUG_RETURN(0);
-}
-
-
 void rpl_group_info::cleanup_context(THD *thd, bool error)
 {
   DBUG_ENTER("Relay_log_info::cleanup_context");
@@ -1769,4 +1745,33 @@ rpl_group_info::mark_start_commit()
 }
 
 
+rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
+  : rpl_filter(filter)
+{
+  cached_charset_invalidate();
+}
+
+
+void rpl_sql_thread_info::cached_charset_invalidate()
+{
+  DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
+
+  /* Full of zeroes means uninitialized. */
+  bzero(cached_charset, sizeof(cached_charset));
+  DBUG_VOID_RETURN;
+}
+
+
+bool rpl_sql_thread_info::cached_charset_compare(char *charset) const
+{
+  DBUG_ENTER("rpl_group_info::cached_charset_compare");
+
+  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
+  {
+    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
+    DBUG_RETURN(1);
+  }
+  DBUG_RETURN(0);
+}
+
 #endif

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2014-04-23 14:06:06 +0000
+++ b/sql/rpl_rli.h	2014-04-25 10:58:31 +0000
@@ -26,6 +26,7 @@
 
 struct RPL_TABLE_LIST;
 class Master_info;
+class Rpl_filter;
 
 /****************************************************************************
 
@@ -536,8 +537,6 @@ struct rpl_group_info
   mysql_mutex_t sleep_lock;
   mysql_cond_t sleep_cond;
 
-  char cached_charset[6];
-
   /*
     trans_retries varies between 0 to slave_transaction_retries and counts how
     many times the slave has retried the present transaction; gets reset to 0
@@ -671,15 +670,6 @@ struct rpl_group_info
     return false;
   }
 
-  /*
-    Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
-    the thread save 3 get_charset() per Query_log_event if the charset is not
-    changing from event to event (common situation).
-    When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
-  */
-  void cached_charset_invalidate();
-  bool cached_charset_compare(char *charset) const;
-
   void clear_tables_to_lock();
   void cleanup_context(THD *, bool);
   void slave_close_thread_tables(THD *);
@@ -727,6 +717,30 @@ struct rpl_group_info
 };
 
 
+/*
+  The class rpl_sql_thread_info is the THD::system_thread_info for an SQL
+  thread; this is either the driver SQL thread or a worker thread for parallel
+  replication.
+*/
+class rpl_sql_thread_info
+{
+public:
+  char cached_charset[6];
+  Rpl_filter* rpl_filter;
+
+  rpl_sql_thread_info(Rpl_filter *filter);
+
+  /*
+    Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
+    the thread save 3 get_charset() per Query_log_event if the charset is not
+    changing from event to event (common situation).
+    When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
+  */
+  void cached_charset_invalidate();
+  bool cached_charset_compare(char *charset) const;
+};
+
+
 // Defined in rpl_rli.cc
 int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
 

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2014-04-23 14:06:06 +0000
+++ b/sql/slave.cc	2014-04-25 10:58:31 +0000
@@ -2891,7 +2891,7 @@ void set_slave_thread_default_charset(TH
     global_system_variables.collation_server;
   thd->update_charset();
 
-  rgi->cached_charset_invalidate();
+  thd->system_thread_info.rpl_sql_info->cached_charset_invalidate();
   DBUG_VOID_RETURN;
 }
 
@@ -3768,6 +3768,7 @@ pthread_handler_t handle_slave_io(void *
   uint retry_count;
   bool suppress_warnings;
   int ret;
+  rpl_io_thread_info io_info;
 #ifndef DBUG_OFF
   uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
 #endif
@@ -3801,6 +3802,7 @@ pthread_handler_t handle_slave_io(void *
     sql_print_error("Failed during slave I/O thread initialization");
     goto err_during_init;
   }
+  thd->system_thread_info.rpl_io_info= &io_info;
   mysql_mutex_lock(&LOCK_thread_count);
   threads.append(thd);
   mysql_mutex_unlock(&LOCK_thread_count);
@@ -4367,6 +4369,7 @@ pthread_handler_t handle_slave_sql(void
   Relay_log_info* rli = &mi->rli;
   const char *errmsg;
   rpl_group_info *serial_rgi;
+  rpl_sql_thread_info sql_info(mi->rpl_filter);
 
   // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
   my_thread_init();
@@ -4378,7 +4381,7 @@ pthread_handler_t handle_slave_sql(void
   serial_rgi= new rpl_group_info(rli);
   thd = new THD; // note that contructor of THD uses DBUG_ !
   thd->thread_stack = (char*)&thd; // remember where our stack is
-  thd->rpl_filter = mi->rpl_filter;
+  thd->system_thread_info.rpl_sql_info= &sql_info;
 
   DBUG_ASSERT(rli->inited);
   DBUG_ASSERT(rli->mi == mi);
@@ -4676,7 +4679,7 @@ log '%s' at position %s, relay log '%s'
   mysql_cond_broadcast(&rli->data_cond);
   rli->ignore_log_space_limit= 0; /* don't need any lock */
   /* we die so won't remember charset - re-update them on next thread start */
-  serial_rgi->cached_charset_invalidate();
+  thd->system_thread_info.rpl_sql_info->cached_charset_invalidate();
 
   /*
     TODO: see if we can do this conditionally in next_event() instead

=== modified file 'sql/sql_acl.cc'
--- a/sql/sql_acl.cc	2014-03-29 10:33:25 +0000
+++ b/sql/sql_acl.cc	2014-04-25 10:58:31 +0000
@@ -38,6 +38,7 @@
 #include "records.h"              // READ_RECORD, read_record_info,
                                   // init_read_record, end_read_record
 #include "rpl_filter.h"           // rpl_filter
+#include "rpl_rli.h"
 #include <m_ctype.h>
 #include <stdarg.h>
 #include "sp_head.h"
@@ -2558,7 +2559,7 @@ bool change_password(THD *thd, const cha
 {
   TABLE_LIST tables;
   TABLE *table;
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   /* Buffer should be extended when password length is extended. */
   char buff[512];
   ulong query_length;
@@ -2580,7 +2581,8 @@ bool change_password(THD *thd, const cha
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into
@@ -5393,7 +5395,7 @@ int mysql_table_grant(THD *thd, TABLE_LI
   TABLE_LIST tables[3];
   bool create_new_users=0;
   char *db_name, *table_name;
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   DBUG_ENTER("mysql_table_grant");
 
   if (!initialized)
@@ -5483,7 +5485,8 @@ int mysql_table_grant(THD *thd, TABLE_LI
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into
@@ -5670,7 +5673,7 @@ bool mysql_routine_grant(THD *thd, TABLE
   TABLE_LIST tables[2];
   bool create_new_users=0, result=0;
   char *db_name, *table_name;
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   DBUG_ENTER("mysql_routine_grant");
 
   if (!initialized)
@@ -5705,7 +5708,8 @@ bool mysql_routine_grant(THD *thd, TABLE
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into
@@ -6141,7 +6145,7 @@ bool mysql_grant(THD *thd, const char *d
   char tmp_db[SAFE_NAME_LEN+1];
   bool create_new_users=0;
   TABLE_LIST tables[2];
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   DBUG_ENTER("mysql_grant");
 
   if (!initialized)
@@ -6190,7 +6194,8 @@ bool mysql_grant(THD *thd, const char *d
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into
@@ -8223,7 +8228,7 @@ void get_mqh(const char *user, const cha
 #define GRANT_TABLES 7
 static int open_grant_tables(THD *thd, TABLE_LIST *tables)
 {
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   DBUG_ENTER("open_grant_tables");
 
   if (!initialized)
@@ -8267,7 +8272,8 @@ static int open_grant_tables(THD *thd, T
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2014-04-23 06:57:25 +0000
+++ b/sql/sql_class.h	2014-04-25 10:58:31 +0000
@@ -72,6 +72,8 @@ class Parser_state;
 class Rows_log_event;
 class Sroutine_hash_entry;
 class user_var_entry;
+class rpl_io_thread_info;
+class rpl_sql_thread_info;
 
 enum enum_ha_read_modes { RFIRST, RNEXT, RPREV, RLAST, RKEY, RNEXT_SAME };
 enum enum_duplicates { DUP_ERROR, DUP_REPLACE, DUP_UPDATE };
@@ -1810,8 +1812,10 @@ class THD :public Statement,
   /* Slave applier execution context */
   rpl_group_info* rgi_slave;
 
-  /* Used to SLAVE SQL thread */
-  Rpl_filter* rpl_filter;
+  union {
+    rpl_io_thread_info *rpl_io_info;
+    rpl_sql_thread_info *rpl_sql_info;
+  } system_thread_info;
 
   void reset_for_next_command();
   /*

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2014-03-29 10:33:25 +0000
+++ b/sql/sql_parse.cc	2014-04-25 10:58:31 +0000
@@ -171,8 +171,9 @@ const char *xa_state_names[]={
 */
 inline bool all_tables_not_ok(THD *thd, TABLE_LIST *tables)
 {
-  return thd->rpl_filter->is_on() && tables && !thd->spcont &&
-         !thd->rpl_filter->tables_ok(thd->db, tables);
+  Rpl_filter *rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+  return rpl_filter->is_on() && tables && !thd->spcont &&
+         !rpl_filter->tables_ok(thd->db, tables);
 }
 #endif
 
@@ -2233,7 +2234,7 @@ mysql_execute_command(THD *thd)
   /* have table map for update for multi-update statement (BUG#37051) */
   bool have_table_map_for_update= FALSE;
   /* */
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
 #endif
   DBUG_ENTER("mysql_execute_command");
 
@@ -3885,12 +3886,15 @@ mysql_execute_command(THD *thd)
       above was not called. So we have to check rules again here.
     */
 #ifdef HAVE_REPLICATION
-    if (thd->slave_thread && 
-        (!rpl_filter->db_ok(lex->name.str) ||
-         !rpl_filter->db_ok_with_wild_table(lex->name.str)))
+    if (thd->slave_thread)
     {
-      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-      break;
+      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+      if (!rpl_filter->db_ok(lex->name.str) ||
+          !rpl_filter->db_ok_with_wild_table(lex->name.str))
+      {
+        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+        break;
+      }
     }
 #endif
     if (check_access(thd, CREATE_ACL, lex->name.str, NULL, NULL, 1, 0))
@@ -3913,12 +3917,15 @@ mysql_execute_command(THD *thd)
       above was not called. So we have to check rules again here.
     */
 #ifdef HAVE_REPLICATION
-    if (thd->slave_thread && 
-        (!rpl_filter->db_ok(lex->name.str) ||
-         !rpl_filter->db_ok_with_wild_table(lex->name.str)))
+    if (thd->slave_thread)
     {
-      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-      break;
+      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+      if (!rpl_filter->db_ok(lex->name.str) ||
+          !rpl_filter->db_ok_with_wild_table(lex->name.str))
+      {
+        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+        break;
+      }
     }
 #endif
     if (check_access(thd, DROP_ACL, lex->name.str, NULL, NULL, 1, 0))
@@ -3930,13 +3937,16 @@ mysql_execute_command(THD *thd)
   {
     LEX_STRING *db= & lex->name;
 #ifdef HAVE_REPLICATION
-    if (thd->slave_thread && 
-       (!rpl_filter->db_ok(db->str) ||
-        !rpl_filter->db_ok_with_wild_table(db->str)))
+    if (thd->slave_thread)
     {
-      res= 1;
-      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-      break;
+      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+      if (!rpl_filter->db_ok(db->str) ||
+          !rpl_filter->db_ok_with_wild_table(db->str))
+      {
+        res= 1;
+        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+        break;
+      }
     }
 #endif
     if (check_db_name(db))
@@ -3973,12 +3983,15 @@ mysql_execute_command(THD *thd)
       above was not called. So we have to check rules again here.
     */
 #ifdef HAVE_REPLICATION
-    if (thd->slave_thread &&
-        (!rpl_filter->db_ok(db->str) ||
-         !rpl_filter->db_ok_with_wild_table(db->str)))
+    if (thd->slave_thread)
     {
-      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-      break;
+      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+      if (!rpl_filter->db_ok(db->str) ||
+          !rpl_filter->db_ok_with_wild_table(db->str))
+      {
+        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+        break;
+      }
     }
 #endif
     if (check_access(thd, ALTER_ACL, db->str, NULL, NULL, 1, 0))



More information about the commits mailing list