[Commits] Rev 3442: MDEV-26: Global transaction commit. Intermediate commit. in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Mon Nov 5 16:01:50 EET 2012


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 3442
revision-id: knielsen at knielsen-hq.org-20121105140149-pkwpjslzst8a3ix1
parent: knielsen at knielsen-hq.org-20121023104629-ioukatzhqrhudtxp
committer: knielsen at knielsen-hq.org
branch nick: work-10.0-mdev26
timestamp: Mon 2012-11-05 15:01:49 +0100
message:
  MDEV-26: Global transaction commit. Intermediate commit.
  
  Now slave records GTID in mysql.rpl_slave_state when applying XID log event.
=== modified file 'sql/log.cc'
--- a/sql/log.cc	2012-10-23 10:46:29 +0000
+++ b/sql/log.cc	2012-11-05 14:01:49 +0000
@@ -107,6 +107,8 @@ static SHOW_VAR binlog_status_vars_detai
   {NullS, NullS, SHOW_LONG}
 };
 
+rpl_binlog_state rpl_global_gtid_binlog_state;
+
 
 /**
    purge logs, master and slave sides both, related error code
@@ -5297,7 +5299,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd
 
   /* Update the replication state (last GTID in each replication domain). */
   mysql_mutex_lock(&LOCK_rpl_gtid_state);
-  global_rpl_gtid_state.update(&gtid);
+  rpl_global_gtid_binlog_state.update(&gtid);
   mysql_mutex_unlock(&LOCK_rpl_gtid_state);
   return false;
 }

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2012-10-23 10:46:29 +0000
+++ b/sql/log_event.cc	2012-11-05 14:01:49 +0000
@@ -6044,28 +6044,247 @@ bool Binlog_checkpoint_log_event::write(
         Global transaction ID stuff
 **************************************************************************/
 
-/**
-   Current replication state (hash of last GTID executed, per replication
-   domain).
+rpl_slave_state::rpl_slave_state()
+  : inited(false), loaded(false)
+{
+  my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
+               sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+}
+
+
+rpl_slave_state::~rpl_slave_state()
+{
+}
+
+#ifdef MYSQL_SERVER
+void
+rpl_slave_state::init()
+{
+  DBUG_ASSERT(!inited);
+  mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW);
+  inited= true;
+}
+
+void
+rpl_slave_state::deinit()
+{
+  uint32 i;
+
+  if (!inited)
+    return;
+  for (i= 0; i < hash.records; ++i)
+  {
+    element *e= (element *)my_hash_element(&hash, i);
+    list_element *l= e->list;
+    list_element *next;
+    while (l)
+    {
+      next= l->next;
+      my_free(l);
+      l= next;
+    }
+    /* The element itself is freed by my_hash_free(). */
+  }
+  my_hash_free(&hash);
+  mysql_mutex_destroy(&LOCK_slave_state);
+}
+#endif
+
+
+int
+rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
+                        uint64 seq_no)
+{
+  element *elem= NULL;
+  list_element *list_elem= NULL;
+
+  if (!(elem= get_element(domain_id)))
+    return 1;
+
+  if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
+    return 1;
+  list_elem->server_id= server_id;
+  list_elem->sub_id= sub_id;
+  list_elem->seq_no= seq_no;
+
+  elem->add(list_elem);
+  return 0;
+}
+
+
+struct rpl_slave_state::element *
+rpl_slave_state::get_element(uint32 domain_id)
+{
+  struct element *elem;
+
+  elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
+  if (elem)
+    return elem;
+
+  if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
+    return NULL;
+  elem->list= NULL;
+  elem->last_sub_id= 0;
+  elem->domain_id= domain_id;
+  if (my_hash_insert(&hash, (uchar *)elem))
+  {
+    my_free(elem);
+    return NULL;
+  }
+  return elem;
+}
+
+
+#ifdef MYSQL_SERVER
+#ifdef HAVE_REPLICATION
+/*
+  Write a gtid to the replication slave state table.
+
+  Do it as part of the transaction, to get slave crash safety, or as a separate
+  transaction if !in_transaction (eg. MyISAM or DDL).
+
+    gtid    The global transaction id for this event group.
+    sub_id  Value allocated within the sub_id when the event group was
+            read (sub_id must be consistent with commit order in master binlog).
+
+  Note that caller must later ensure that the new gtid and sub_id is inserted
+  into the appropriate HASH element with rpl_slave_state.add(), so that it can
+  be deleted later. But this must only be done after COMMIT if in transaction.
 */
-rpl_state global_rpl_gtid_state;
+int
+rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
+                             bool in_transaction)
+{
+  TABLE_LIST tlist;
+  int err= 0;
+  bool table_opened= false;
+  TABLE *table;
+  list_element *elist= 0, *next;
+  element *elem;
+
+  DBUG_ASSERT(in_transaction /* ToDo: new transaction for DDL etc. */);
+
+  mysql_reset_thd_for_next_command(thd, 0);
+
+  tlist.init_one_table(STRING_WITH_LEN("mysql"),
+                       rpl_gtid_slave_state_table_name.str,
+                       rpl_gtid_slave_state_table_name.length,
+                       NULL, TL_WRITE);
+  if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+    goto end;
+  table_opened= true;
+  table= tlist.table;
+
+  /*
+    ToDo: Check the table definition, error if not as expected.
+    We need the correct first 4 columns with correct type, and the primary key.
+  */
+  bitmap_set_bit(table->write_set, table->field[0]->field_index);
+  bitmap_set_bit(table->write_set, table->field[1]->field_index);
+  bitmap_set_bit(table->write_set, table->field[2]->field_index);
+  bitmap_set_bit(table->write_set, table->field[3]->field_index);
+
+  table->field[0]->store((ulonglong)gtid->domain_id, true);
+  table->field[1]->store(sub_id, true);
+  table->field[2]->store((ulonglong)gtid->server_id, true);
+  table->field[3]->store(gtid->seq_no, true);
+  if ((err= table->file->ha_write_row(table->record[0])))
+      goto end;
+
+  lock();
+  if ((elem= get_element(gtid->domain_id)) == NULL)
+  {
+    unlock();
+    err= 1;
+    goto end;
+  }
+  elist= elem->grab_list();
+  unlock();
+
+  if (!elist)
+    goto end;
+
+  /* Now delete any already committed rows. */
+  DBUG_ASSERT
+    ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
+     table->s->primary_key < MAX_KEY /* ToDo support all storage engines */);
+
+  bitmap_set_bit(table->read_set, table->field[0]->field_index);
+  bitmap_set_bit(table->read_set, table->field[1]->field_index);
+  while (elist)
+  {
+    next= elist->next;
+
+    table->field[1]->store(elist->sub_id, true);
+    /* domain_id is already set in table->record[0] from write_row() above. */
+    if ((err= table->file->ha_rnd_pos_by_record(table->record[0])) ||
+        (err= table->file->ha_delete_row(table->record[0])))
+      goto end;
+    my_free(elist);
+    elist= next;
+  }
+
+end:
+
+  if (table_opened)
+  {
+    if (err)
+    {
+      /*
+        ToDo: If error, we need to put any remaining elist back into the HASH so
+        we can do another delete attempt later.
+      */
+      ha_rollback_trans(thd, FALSE);
+      close_thread_tables(thd);
+      if (in_transaction)
+        ha_rollback_trans(thd, TRUE);
+    }
+    else
+    {
+      ha_commit_trans(thd, FALSE);
+      close_thread_tables(thd);
+      if (in_transaction)
+        ha_commit_trans(thd, TRUE);
+    }
+  }
+  return err;
+}
+
+
+uint64
+rpl_slave_state::next_subid(uint32 domain_id)
+{
+  uint32 sub_id= 0;
+  element *elem;
+
+  lock();
+  elem= get_element(domain_id);
+  if (elem)
+    sub_id= ++elem->last_sub_id;
+  unlock();
 
+  return sub_id;
+}
+#endif
 
-rpl_state::rpl_state()
+
+rpl_binlog_state::rpl_binlog_state()
 {
   my_hash_init(&hash, &my_charset_bin, 32,
-                offsetof(rpl_gtid, domain_id), sizeof(uint32),
-                NULL, my_free, HASH_UNIQUE);
+               offsetof(rpl_gtid, domain_id), 2*sizeof(uint32), NULL, my_free,
+               HASH_UNIQUE);
+  mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
+                   MY_MUTEX_INIT_SLOW);
 }
 
 
-rpl_state::~rpl_state()
+rpl_binlog_state::~rpl_binlog_state()
 {
+  mysql_mutex_destroy(&LOCK_binlog_state);
   my_hash_free(&hash);
 }
 
 
-#ifdef MYSQL_SERVER
 /*
   Update replication state with a new GTID.
 
@@ -6075,7 +6294,7 @@ rpl_state::~rpl_state()
   Returns 0 for ok, 1 for error.
 */
 int
-rpl_state::update(const struct rpl_gtid *gtid)
+rpl_binlog_state::update(const struct rpl_gtid *gtid)
 {
   uchar *rec;
 
@@ -6195,20 +6414,20 @@ Gtid_log_event::pack_info(THD *thd, Prot
   protocol->store(buf, p-buf, &my_charset_bin);
 }
 
-static char gtid_begin_string[5] = {'B','E','G','I','N'};
+static char gtid_begin_string[] = "BEGIN";
 
 int
 Gtid_log_event::do_apply_event(Relay_log_info const *rli)
 {
-  const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
-
-  /* ToDo: record the new GTID. */
+  thd->variables.server_id= this->server_id;
+  thd->variables.gtid_domain_id= this->domain_id;
+  thd->variables.gtid_seq_no= this->seq_no;
 
   if (flags2 & FL_STANDALONE)
     return 0;
 
   /* Execute this like a BEGIN query event. */
-  thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string),
+  thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1,
                         &my_charset_bin, next_query_id());
   Parser_state parser_state;
   if (!parser_state.init(thd, thd->query(), thd->query_length()))
@@ -6339,7 +6558,7 @@ Gtid_list_log_event::Gtid_list_log_event
 
 #ifdef MYSQL_SERVER
 
-Gtid_list_log_event::Gtid_list_log_event(rpl_state *gtid_set)
+Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
   : count(gtid_set->count()), list(0)
 {
   DBUG_ASSERT(count != 0);
@@ -6793,12 +7012,73 @@ void Xid_log_event::print(FILE* file, PR
 int Xid_log_event::do_apply_event(Relay_log_info const *rli)
 {
   bool res;
+  int err;
+  rpl_gtid gtid;
+  uint64 sub_id;
+
+  /*
+    Record any GTID in the same transaction, so slave state is transactionally
+    consistent.
+  */
+  if ((sub_id= rli->gtid_sub_id))
+  {
+    /* Clear the GTID from the RLI so we don't accidentally reuse it. */
+    const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0;
+
+    gtid= rli->current_gtid;
+    err= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true);
+    if (err)
+    {
+      trans_rollback(thd);
+      return err;
+    }
+  }
+
   /* For a slave Xid_log_event is COMMIT */
   general_log_print(thd, COM_QUERY,
                     "COMMIT /* implicit, from Xid_log_event */");
   res= trans_commit(thd); /* Automatically rolls back on error. */
   thd->mdl_context.release_transactional_locks();
 
+  if (sub_id)
+  {
+    /*
+      Add the gtid to the HASH in the replication slave state.
+
+      We must do this only here _after_ commit, so that for parallel
+      replication, there will not be an attempt to delete the corresponding
+      table row before it is even committed.
+
+      Even if commit fails, we still add the entry - in case the table
+      mysql.rpl_slave_state is non-transactional and the row is not removed
+      by rollback.
+    */
+    rpl_slave_state::element *elem=
+      rpl_global_gtid_slave_state.get_element(gtid.domain_id);
+    rpl_slave_state::list_element *lelem=
+      (rpl_slave_state::list_element *)my_malloc(sizeof(*lelem), MYF(MY_WME));
+    if (elem && lelem)
+    {
+      lelem->sub_id= sub_id;
+      lelem->server_id= gtid.server_id;
+      lelem->seq_no= gtid.seq_no;
+      elem->add(lelem);
+    }
+    else
+    {
+      if (lelem)
+        my_free(lelem);
+      sql_print_warning("Slave: Out of memory during slave state maintenance. "
+                        "Some no longer necessary rows in table "
+                        "mysql.rpl_slave_state may be left undeleted.");
+    }
+    /*
+      Such failure is not fatal. We will fail to delete the row for this GTID,
+      but it will do no harm and will be removed automatically on next server
+      restart.
+    */
+  }
+
   /*
     Increment the global status commit count variable
   */

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2012-10-23 10:46:29 +0000
+++ b/sql/log_event.h	2012-11-05 14:01:49 +0000
@@ -2953,18 +2953,92 @@ struct rpl_gtid
 };
 
 
-struct rpl_state
+/*
+  Replication slave state.
+
+  For every independent replication stream (identified by domain_id), this
+  remembers the last gtid applied on the slave within this domain.
+
+  Since events are always committed in-order within a single domain, this is
+  sufficient to maintain the state of the replication slave.
+*/
+struct rpl_slave_state
 {
+  /* Elements in the list of GTIDs kept for each domain_id. */
+  struct list_element
+  {
+    struct list_element *next;
+    uint64 sub_id;
+    uint64 seq_no;
+    uint32 server_id;
+  };
+
+  /* Elements in the HASH that hold the state for one domain_id. */
+  struct element
+  {
+    struct list_element *list;
+    uint64 last_sub_id;
+    uint32 domain_id;
+
+    list_element *grab_list() { list_element *l= list; list= NULL; return l; }
+    void add (list_element *l)
+    {
+      l->next= list;
+      list= l;
+      if (last_sub_id < l->sub_id)
+        last_sub_id= l->sub_id;
+    }
+  };
+
+  /* Mapping from domain_id to its element. */
   HASH hash;
+  /* Mutex protecting access to the state. */
+  mysql_mutex_t LOCK_slave_state;
+
+  bool inited;
+  bool loaded;
 
-  rpl_state();
-  ~rpl_state();
+  rpl_slave_state();
+  ~rpl_slave_state();
 
+  void init();
+  void deinit();
   ulong count() const { return hash.records; }
-  int update(const struct rpl_gtid *gtid);
+  int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no);
+  int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
+                      bool in_transaction);
+  uint64 next_subid(uint32 domain_id);
+
+  void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
+  void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
+
+  element *get_element(uint32 domain_id);
 };
 
-extern rpl_state global_rpl_gtid_state;
+
+/*
+  Binlog state.
+  This keeps the last GTID written to the binlog for every distinct
+  (domain_id, server_id) pair.
+  This will be logged at the start of the next binlog file as a
+  Gtid_list_log_event; this way, it is easy to find the binlog file
+  containing a gigen GTID, by simply scanning backwards from the newest
+  one until a lower seq_no is found in the Gtid_list_log_event at the
+  start of a binlog for the given domain_id and server_id.
+*/
+struct rpl_binlog_state
+{
+  /* Mapping from (domain_id,server_id) to its GTID. */
+  HASH hash;
+  /* Mutex protecting access to the state. */
+  mysql_mutex_t LOCK_binlog_state;
+
+  rpl_binlog_state();
+  ~rpl_binlog_state();
+
+  ulong count() const { return hash.records; }
+  int update(const struct rpl_gtid *gtid);
+};
 
 /**
   @class Gtid_log_event
@@ -3129,7 +3203,7 @@ class Gtid_list_log_event: public Log_ev
   static const uint element_size= 4+4+8;
 
 #ifdef MYSQL_SERVER
-  Gtid_list_log_event(rpl_state *gtid_set);
+  Gtid_list_log_event(rpl_binlog_state *gtid_set);
 #ifdef HAVE_REPLICATION
   void pack_info(THD *thd, Protocol *protocol);
 #endif

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2012-10-23 10:46:29 +0000
+++ b/sql/mysqld.cc	2012-11-05 14:01:49 +0000
@@ -746,6 +746,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key
   key_LOCK_error_messages, key_LOG_INFO_lock, key_LOCK_thread_count,
   key_PARTITION_LOCK_auto_inc;
 PSI_mutex_key key_RELAYLOG_LOCK_index;
+PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
 
 PSI_mutex_key key_LOCK_stats,
   key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
@@ -817,7 +818,9 @@ static PSI_mutex_info all_server_mutexes
   { &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
   { &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
   { &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
-  { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0}
+  { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0},
+  { &key_LOCK_slave_state, "key_LOCK_slave_state", 0},
+  { &key_LOCK_binlog_state, "key_LOCK_binlog_state", 0}
 };
 
 PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
@@ -1757,6 +1760,7 @@ static void mysqld_exit(int exit_code)
     but if a kill -15 signal was sent, the signal thread did
     spawn the kill_server_thread thread, which is running concurrently.
   */
+  rpl_deinit_gtid_slave_state();
   wait_for_signal_thread_to_end();
   mysql_audit_finalize();
   clean_up_mutexes();
@@ -3960,6 +3964,11 @@ static int init_thread_environment()
     sql_print_error("Can't create thread-keys");
     return 1;
   }
+
+#ifdef HAVE_REPLICATION
+  rpl_init_gtid_slave_state();
+#endif
+
   return 0;
 }
 

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2012-10-23 09:19:42 +0000
+++ b/sql/mysqld.h	2012-11-05 14:01:49 +0000
@@ -245,6 +245,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_ind
   key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
   key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
 extern PSI_mutex_key key_RELAYLOG_LOCK_index;
+extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
 
 extern PSI_mutex_key key_LOCK_stats,
   key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2012-10-23 10:46:29 +0000
+++ b/sql/rpl_rli.cc	2012-11-05 14:01:49 +0000
@@ -31,6 +31,16 @@
 
 static int count_relay_log_space(Relay_log_info* rli);
 
+/**
+   Current replication state (hash of last GTID executed, per replication
+   domain).
+*/
+rpl_slave_state rpl_global_gtid_slave_state;
+
+const LEX_STRING rpl_gtid_slave_state_table_name=
+  { STRING_WITH_LEN("rpl_slave_state") };
+
+
 // Defined in slave.cc
 int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
 int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
@@ -51,7 +61,7 @@ Relay_log_info::Relay_log_info(bool is_s
    abort_pos_wait(0), slave_run_id(0), sql_thd(0),
    inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
    until_log_pos(0), retried_trans(0), executed_entries(0),
-   tables_to_lock(0), tables_to_lock_count(0),
+   gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0),
    last_event_start_time(0), deferred_events(NULL),m_flags(0),
    row_stmt_start_timestamp(0), long_find_row_note_printed(false),
    m_annotate_event(0)

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2012-10-02 22:44:54 +0000
+++ b/sql/rpl_rli.h	2012-11-05 14:01:49 +0000
@@ -307,6 +307,14 @@ class Relay_log_info : public Slave_repo
   char slave_patternload_file[FN_REFLEN]; 
   size_t slave_patternload_file_size;  
 
+  /*
+    Current GTID being processed.
+    The sub_id gives the binlog order within one domain_id. A zero sub_id
+    means that there is no active GTID.
+  */
+  uint64 gtid_sub_id;
+  rpl_gtid current_gtid;
+
   Relay_log_info(bool is_slave_recovery);
   ~Relay_log_info();
 
@@ -584,4 +592,8 @@ class Relay_log_info : public Slave_repo
 int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
 
 
+extern const LEX_STRING rpl_gtid_slave_state_table_name;
+extern struct rpl_slave_state rpl_global_gtid_slave_state;
+
+
 #endif /* RPL_RLI_H */

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2012-10-23 10:46:29 +0000
+++ b/sql/slave.cc	2012-11-05 14:01:49 +0000
@@ -3725,6 +3725,15 @@ log '%s' at position %s, relay log '%s'
     goto err;
   }
 
+  /* Load the set of seen GTIDs, if we did not already. */
+  if (rpl_load_gtid_slave_state(thd))
+  {
+    rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), 
+                "Unable to load replication GTID slave state from mysql.%s: %s",
+                rpl_gtid_slave_state_table_name.str, thd->stmt_da->message());
+    goto err;
+  }
+
   /* execute init_slave variable */
   if (opt_init_slave.length)
   {
@@ -5192,6 +5201,27 @@ static Log_event* next_event(Relay_log_i
         inc_event_relay_log_pos()
       */
       rli->future_event_relay_log_pos= my_b_tell(cur_log);
+      /*
+        For GTID, allocate a new sub_id for the given domain_id.
+        The sub_id must be allocated in increasing order of binlog order.
+      */
+      if (ev->get_type_code() == GTID_EVENT)
+      {
+        Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev);
+        uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
+        if (!sub_id)
+        {
+          errmsg = "slave SQL thread aborted because of out-of-memory error";
+          if (hot_log)
+            mysql_mutex_unlock(log_lock);
+          goto err;
+        }
+        rli->gtid_sub_id= sub_id;
+        rli->current_gtid.server_id= gev->server_id;
+        rli->current_gtid.domain_id= gev->domain_id;
+        rli->current_gtid.seq_no= gev->seq_no;
+      }
+
       if (hot_log)
         mysql_mutex_unlock(log_lock);
       DBUG_RETURN(ev);

=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc	2012-10-23 10:46:29 +0000
+++ b/sql/sql_repl.cc	2012-11-05 14:01:49 +0000
@@ -16,10 +16,12 @@
 
 #include "sql_priv.h"
 #include "unireg.h"
+#include "sql_base.h"
 #include "sql_parse.h"                          // check_access
 #ifdef HAVE_REPLICATION
 
 #include "rpl_mi.h"
+#include "rpl_rli.h"
 #include "sql_repl.h"
 #include "sql_acl.h"                            // SUPER_ACL
 #include "log_event.h"
@@ -748,7 +750,7 @@ void mysql_binlog_send(THD* thd, char* l
   mariadb_slave_capability= get_mariadb_slave_capability(thd);
   if (global_system_variables.log_warnings > 1)
     sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
-                        thd->variables.server_id, log_ident, (ulong)pos);
+                          (int)thd->variables.server_id, log_ident, (ulong)pos);
   if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
   {
     errmsg= "Failed to run hook 'transmit_start'";
@@ -2406,4 +2408,118 @@ int log_loaded_block(IO_CACHE* file)
   DBUG_RETURN(0);
 }
 
+
+/**
+   Initialise the slave replication state from the mysql.rpl_slave_state table.
+
+   This is called each time an SQL thread starts, but the data is only actually
+   loaded on the first call.
+
+   The slave state is the last GTID applied on the slave within each
+   replication domain.
+
+   To avoid row lock contention, there are multiple rows for each domain_id.
+   The one containing the current slave state is the one with the maximal
+   sub_id value, within each domain_id.
+
+    CREATE TABLE mysql.rpl_slave_state (
+      domain_id INT UNSIGNED NOT NULL,
+      sub_id BIGINT UNSIGNED NOT NULL,
+      server_id INT UNSIGNED NOT NULL,
+      seq_no BIGINT UNSIGNED NOT NULL,
+      PRIMARY KEY (domain_id, sub_id))
+*/
+
+void
+rpl_init_gtid_slave_state()
+{
+  rpl_global_gtid_slave_state.init();
+}
+
+
+void
+rpl_deinit_gtid_slave_state()
+{
+  rpl_global_gtid_slave_state.deinit();
+}
+
+
+int
+rpl_load_gtid_slave_state(THD *thd)
+{
+  TABLE_LIST tlist;
+  TABLE *table;
+  bool table_opened= false;
+  bool table_scanned= false;
+  DBUG_ENTER("rpl_load_gtid_slave_state");
+
+  int err= 0;
+  rpl_global_gtid_slave_state.lock();
+  if (rpl_global_gtid_slave_state.loaded)
+    goto end;
+
+  mysql_reset_thd_for_next_command(thd, 0);
+
+  tlist.init_one_table(STRING_WITH_LEN("mysql"),
+                       rpl_gtid_slave_state_table_name.str,
+                       rpl_gtid_slave_state_table_name.length,
+                       NULL, TL_READ);
+  if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+    goto end;
+  table_opened= true;
+  table= tlist.table;
+
+  /*
+    ToDo: Check the table definition, error if not as expected.
+    We need the correct first 4 columns with correct type, and the primary key.
+  */
+
+  bitmap_set_bit(table->read_set, table->field[0]->field_index);
+  bitmap_set_bit(table->read_set, table->field[1]->field_index);
+  bitmap_set_bit(table->read_set, table->field[2]->field_index);
+  bitmap_set_bit(table->read_set, table->field[3]->field_index);
+  if ((err= table->file->ha_rnd_init_with_error(1)))
+    goto end;
+  table_scanned= true;
+  for (;;)
+  {
+    uint32 domain_id, server_id;
+    uint64 sub_id, seq_no;
+    if ((err= table->file->ha_rnd_next(table->record[0])))
+    {
+      if (err == HA_ERR_RECORD_DELETED)
+        continue;
+      else if (err == HA_ERR_END_OF_FILE)
+        break;
+      else
+        goto end;
+    }
+    domain_id= (ulonglong)table->field[0]->val_int();
+    sub_id= (ulonglong)table->field[1]->val_int();
+    server_id= (ulonglong)table->field[2]->val_int();
+    seq_no= (ulonglong)table->field[3]->val_int();
+    DBUG_PRINT("info", ("Read slave state row: %u:%u-%lu sub_id=%lu\n",
+                        (unsigned)domain_id, (unsigned)server_id,
+                        (ulong)seq_no, (ulong)sub_id));
+    if ((err= rpl_global_gtid_slave_state.update(domain_id, server_id,
+                                                 sub_id, seq_no)))
+      goto end;
+  }
+  err= 0;                                       /* Clear HA_ERR_END_OF_FILE */
+
+  rpl_global_gtid_slave_state.loaded= true;
+
+end:
+  if (table_scanned)
+  {
+    table->file->ha_index_or_rnd_end();
+    ha_commit_trans(thd, FALSE);
+    ha_commit_trans(thd, TRUE);
+  }
+  if (table_opened)
+    close_thread_tables(thd);
+  rpl_global_gtid_slave_state.unlock();
+  DBUG_RETURN(err);
+}
+
 #endif /* HAVE_REPLICATION */

=== modified file 'sql/sql_repl.h'
--- a/sql/sql_repl.h	2011-06-30 15:46:53 +0000
+++ b/sql/sql_repl.h	2012-11-05 14:01:49 +0000
@@ -65,6 +65,11 @@ int log_loaded_block(IO_CACHE* file);
 int init_replication_sys_vars();
 void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
 
+extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
+void rpl_init_gtid_slave_state();
+void rpl_deinit_gtid_slave_state();
+int rpl_load_gtid_slave_state(THD *thd);
+
 #endif /* HAVE_REPLICATION */
 
 #endif /* SQL_REPL_INCLUDED */



More information about the commits mailing list