[Commits] Rev 3444: MDEV-26: Global transaction id: Intermediate commit. in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Thu Nov 15 14:11:36 EET 2012


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 3444
revision-id: knielsen at knielsen-hq.org-20121115121135-62jgt6tru3bh4ltc
parent: knielsen at knielsen-hq.org-20121107131810-bqvn097cjsukjldy
committer: knielsen at knielsen-hq.org
branch nick: work-10.0-mdev26
timestamp: Thu 2012-11-15 13:11:35 +0100
message:
  MDEV-26: Global transaction id: Intermediate commit.
  
  Now slave can connect to master, sending start position as slave state
  rather than old-style binlog name/position.
  
  This enables to switch to a new master by changing just connection
  information, replication slave GTID state ensures that slave starts
  at the correct point in the new master.
=== modified file 'scripts/mysql_system_tables.sql'
--- a/scripts/mysql_system_tables.sql	2012-01-13 14:50:02 +0000
+++ b/scripts/mysql_system_tables.sql	2012-11-15 12:11:35 +0000
@@ -20,6 +20,12 @@
 --
 
 set sql_mode='';
+
+-- We want this to be created with the default storage engine.
+-- This way, if InnoDB is used we get crash safety, and if MyISAM is used
+-- we avoid mixed-engine transactions.
+CREATE TABLE IF NOT EXISTS rpl_slave_state (domain_id INT UNSIGNED NOT NULL, sub_id BIGINT UNSIGNED NOT NULL, server_id INT UNSIGNED NOT NULL, seq_no BIGINT UNSIGNED NOT NULL, PRIMARY KEY (domain_id, sub_id)) comment='Replication slave GTID state';
+
 set storage_engine=myisam;
 
 CREATE TABLE IF NOT EXISTS db (   Host char(60) binary DEFAULT '' NOT NULL, Db char(64) binary DEFAULT '' NOT NULL, User char(16) binary DEFAULT '' NOT NULL, Select_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Insert_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Update_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Delete_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Drop_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Grant_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, References_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Index_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Alter_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_tmp_table_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Lock_tables_priv enum('N','Y')
  COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_view_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Show_view_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_routine_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Alter_routine_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Execute_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Event_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Trigger_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, PRIMARY KEY Host (Host,Db,User), KEY User (User) ) engine=MyISAM CHARACTER SET utf8 COLLATE utf8_bin comment='Database privileges';

=== modified file 'sql/log.cc'
--- a/sql/log.cc	2012-11-07 13:18:10 +0000
+++ b/sql/log.cc	2012-11-15 12:11:35 +0000
@@ -3233,9 +3233,12 @@ bool MYSQL_BIN_LOG::open(const char *log
           there had been an entry (domain_id, server_id, 0).
         */
 
-        Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state);
-        if (gl_ev.write(&log_file))
-          goto err;
+        if (rpl_global_gtid_binlog_state.count())
+        {
+          Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state);
+          if (gl_ev.write(&log_file))
+            goto err;
+        }
 
         /* Output a binlog checkpoint event at the start of the binlog file. */
 

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2012-11-07 13:18:10 +0000
+++ b/sql/log_event.cc	2012-11-15 12:11:35 +0000
@@ -6268,6 +6268,67 @@ rpl_slave_state::next_subid(uint32 domai
 #endif
 
 
+/*
+  Prepare the current slave state as a string, suitable for sending to the
+  master to request to receive binlog events starting from that GTID state.
+
+  The state consists of the most recently applied GTID for each domain_id,
+  ie. the one with the highest sub_id within each domain_id.
+*/
+
+int
+rpl_slave_state::tostring(String *dest)
+{
+  bool first= true;
+  uint32 i;
+    int err= 1;
+
+  lock();
+
+  for (i= 0; i < hash.records; ++i)
+  {
+    uint64 best_sub_id;
+    rpl_gtid best_gtid;
+    element *e= (element *)my_hash_element(&hash, i);
+    list_element *l= e->list;
+
+    DBUG_ASSERT(l /* We should never have empty list in element. */);
+    if (!l)
+      goto err;
+
+    best_gtid.domain_id= e->domain_id;
+    best_gtid.server_id= l->server_id;
+    best_gtid.seq_no= l->seq_no;
+    best_sub_id= l->sub_id;
+    while ((l= l->next))
+    {
+      if (l->sub_id > best_sub_id)
+      {
+        best_sub_id= l->sub_id;
+        best_gtid.server_id= l->server_id;
+        best_gtid.seq_no= l->seq_no;
+      }
+    }
+
+    if (first)
+      first= false;
+    else
+      dest->append("-",1);
+    dest->append_ulonglong(best_gtid.domain_id);
+    dest->append("-",1);
+    dest->append_ulonglong(best_gtid.server_id);
+    dest->append("-",1);
+    dest->append_ulonglong(best_gtid.seq_no);
+ }
+
+  err= 0;
+
+err:
+  unlock();
+  return err;
+}
+
+
 rpl_binlog_state::rpl_binlog_state()
 {
   my_hash_init(&hash, &my_charset_bin, 32,
@@ -6399,6 +6460,119 @@ rpl_binlog_state::read_from_iocache(IO_C
   }
   return 0;
 }
+
+
+slave_connection_state::slave_connection_state()
+{
+  my_hash_init(&hash, &my_charset_bin, 32,
+               offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, my_free,
+               HASH_UNIQUE);
+}
+
+
+slave_connection_state::~slave_connection_state()
+{
+  my_hash_free(&hash);
+}
+
+
+/*
+  Create a hash from the slave GTID state that is sent to master when slave
+  connects to start replication.
+
+  The state is sent as <GTID>,<GTID>,...,<GTID>, for example:
+
+     0-2-112,1-4-1022
+
+  The state gives for each domain_id the GTID to start replication from for
+  the corresponding replication stream. So domain_id must be unique.
+
+  Returns 0 if ok, non-zero if error due to malformed input.
+
+  Note that input string is built by slave server, so it will not be incorrect
+  unless bug/corruption/malicious server. So we just need basic sanity check,
+  not fancy user-friendly error message.
+*/
+
+int
+slave_connection_state::load(char *slave_request, size_t len)
+{
+  char *p, *q, *end;
+  uint64 v;
+  uint32 domain_id, server_id;
+  uint64 seq_no;
+  uchar *rec;
+  rpl_gtid *gtid;
+  int err= 0;
+
+  my_hash_reset(&hash);
+  p= slave_request;
+  end= slave_request + len;
+  for (;;)
+  {
+    q= end;
+    v= (uint64)my_strtoll10(p, &q, &err);
+    if (err != 0 || v > (uint32)0xffffffff || *q != '-')
+      return 1;
+    domain_id= (uint32)v;
+    p= q+1;
+    q= end;
+    v= (uint64)my_strtoll10(p, &q, &err);
+    if (err != 0 || v > (uint32)0xffffffff || *q != '-')
+      return 1;
+    server_id= (uint32)v;
+    p= q+1;
+    q= end;
+    seq_no= (uint64)my_strtoll10(p, &q, &err);
+    if (err != 0)
+      return 1;
+
+    if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME))))
+      return 1;
+    gtid= (rpl_gtid *)rec;
+    gtid->domain_id= domain_id;
+    gtid->server_id= server_id;
+    gtid->seq_no= seq_no;
+    if (my_hash_insert(&hash, rec))
+    {
+      my_free(rec);
+      return 1;
+    }
+    if (q == end)
+      break;                                         /* Finished. */
+    if (*q != ',')
+      return 1;
+    p= q+1;
+  }
+
+  return 0;
+}
+
+
+rpl_gtid *
+slave_connection_state::find(uint32 domain_id)
+{
+  return (rpl_gtid *) my_hash_search(&hash, (const uchar *)(&domain_id), 0);
+}
+
+
+void
+slave_connection_state::remove(const rpl_gtid *in_gtid)
+{
+  bool err;
+  uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
+#ifndef DBUG_OFF
+  rpl_gtid *slave_gtid= (rpl_gtid *)rec;
+  DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */);
+  DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
+  DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
+#endif
+
+  err= my_hash_delete(&hash, rec);
+  DBUG_ASSERT(!err);
+}
+
+
 #endif  /* MYSQL_SERVER */
 
 
@@ -6432,6 +6606,28 @@ Gtid_log_event::Gtid_log_event(THD *thd_
 {
 }
 
+
+/*
+  Used to record GTID while sending binlog to slave, without having to
+  fully contruct every Gtid_log_event() needlessly.
+*/
+bool
+Gtid_log_event::peek(const char *event_start, size_t event_len,
+                     uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
+                     uchar *flags2)
+{
+  const char *p;
+  if (event_len < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+    return true;
+  *server_id= uint4korr(event_start + SERVER_ID_OFFSET);
+  p= event_start + LOG_EVENT_HEADER_LEN;
+  *seq_no= uint8korr(p);
+  p+= 8;
+  *domain_id= uint4korr(p);
+  return false;
+}
+
+
 bool
 Gtid_log_event::write(IO_CACHE *file)
 {

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2012-11-07 13:18:10 +0000
+++ b/sql/log_event.h	2012-11-15 12:11:35 +0000
@@ -3008,6 +3008,7 @@ struct rpl_slave_state
   int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
                       bool in_transaction);
   uint64 next_subid(uint32 domain_id);
+  int tostring(String *dest);
 
   void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
   void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
@@ -3044,6 +3045,26 @@ struct rpl_binlog_state
   int read_from_iocache(IO_CACHE *src);
 };
 
+
+/*
+  Represent the GTID state that a slave connection to a master requests
+  the master to start sending binlog events from.
+*/
+struct slave_connection_state
+{
+  /* Mapping from domain_id to the GTID requested for that domain. */
+  HASH hash;
+
+  slave_connection_state();
+  ~slave_connection_state();
+
+  int load(char *slave_request, size_t len);
+  rpl_gtid *find(uint32 domain_id);
+  void remove(const rpl_gtid *gtid);
+  ulong count() const { return hash.records; }
+};
+
+
 /**
   @class Gtid_log_event
 
@@ -3134,6 +3155,9 @@ class Gtid_log_event: public Log_event
   bool write(IO_CACHE *file);
   static int make_compatible_event(String *packet, bool *need_dummy_event,
                                     ulong ev_offset, uint8 checksum_alg);
+  static bool peek(const char *event_start, size_t event_len,
+                   uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
+                   uchar *flags2);
 #endif
 };
 

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2012-11-05 14:01:49 +0000
+++ b/sql/slave.cc	2012-11-15 12:11:35 +0000
@@ -1782,6 +1782,42 @@ when it try to get the value of TIME_ZON
   }
 after_set_capability:
 
+  /* Request dump start from slave replication GTID state. */
+
+  /* ToDo: This needs to be configurable somehow in a useful way ... */
+  if (rpl_global_gtid_slave_state.count())
+  {
+    int rc;
+    char str_buf[256];
+    String connect_state(str_buf, sizeof(str_buf), system_charset_info);
+    connect_state.length(0);
+
+    connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"),
+                         system_charset_info);
+    rpl_global_gtid_slave_state.tostring(&connect_state);
+    connect_state.append(STRING_WITH_LEN("'"), system_charset_info);
+    rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length());
+    if (rc)
+    {
+      err_code= mysql_errno(mysql);
+      if (is_network_error(err_code))
+      {
+        mi->report(ERROR_LEVEL, err_code,
+                   "Setting @slave_connect_state failed with error: %s",
+                   mysql_error(mysql));
+        goto network_err;
+      }
+      else
+      {
+        /* Fatal error */
+        errmsg= "The slave I/O thread stops because a fatal error is "
+          "encountered when it tries to set @slave_connect_state.";
+        sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+        goto err;
+      }
+    }
+  }
+
 err:
   if (errmsg)
   {

=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc	2012-11-05 14:01:49 +0000
+++ b/sql/sql_repl.cc	2012-11-15 12:11:35 +0000
@@ -507,6 +507,26 @@ get_mariadb_slave_capability(THD *thd)
 
 
 /*
+  Get the value of the @slave_connect_state user variable into the supplied
+  String (this is the GTID connect state requested by the connecting slave).
+
+  Returns false if error (ie. slave did not set the variable and does not
+  want to use GTID to set start position), true if success.
+*/
+static bool
+get_slave_connect_state(THD *thd, String *out_str)
+{
+  bool null_value;
+
+  const LEX_STRING name= { C_STRING_WITH_LEN("slave_connect_state") };
+  user_var_entry *entry=
+    (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+                                  name.length);
+  return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
+}
+
+
+/*
   Function prepares and sends repliation heartbeat event.
 
   @param net                net object of THD
@@ -569,6 +589,216 @@ static int send_heartbeat_event(NET* net
 }
 
 
+struct binlog_file_entry
+{
+  binlog_file_entry *next;
+  char *name;
+};
+
+static binlog_file_entry *
+get_binlog_list(MEM_ROOT *memroot)
+{
+  IO_CACHE *index_file;
+  char fname[FN_REFLEN];
+  size_t length;
+  binlog_file_entry *current_list= NULL, *e;
+  DBUG_ENTER("get_binlog_list");
+
+  if (!mysql_bin_log.is_open())
+  {
+    my_error(ER_NO_BINARY_LOGGING, MYF(0));
+    DBUG_RETURN(NULL);
+  }
+
+  mysql_bin_log.lock_index();
+  index_file=mysql_bin_log.get_index_file();
+  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
+
+  /* The file ends with EOF or empty line */
+  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
+  {
+    --length;                                   /* Remove the newline */
+    if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) ||
+        !(e->name= strmake_root(memroot, fname, length)))
+    {
+      mysql_bin_log.unlock_index();
+      my_error(ER_OUTOFMEMORY, MYF(0), length + 1 + sizeof(*e));
+      DBUG_RETURN(NULL);
+    }
+    e->next= current_list;
+    current_list= e;
+  }
+  mysql_bin_log.unlock_index();
+
+  DBUG_RETURN(current_list);
+}
+
+/*
+  Find the Gtid_list_log_event at the start of a binlog.
+
+  NULL for ok, non-NULL error message for error.
+
+  If ok, then the event is returned in *out_gtid_list. This can be NULL if we
+  get back to binlogs written by old server version without GTID support. If
+  so, it means we have reached the point to start from, as no GTID events can
+  exist in earlier binlogs.
+*/
+static const char *
+get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
+{
+  Format_description_log_event init_fdle(BINLOG_VERSION);
+  Format_description_log_event *fdle;
+  Log_event *ev;
+  const char *errormsg = NULL;
+
+  *out_gtid_list= NULL;
+
+  if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
+                                      opt_master_verify_checksum)) ||
+      ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+    return "Could not read format description log event while looking for "
+      "GTID position in binlog";
+
+  fdle= static_cast<Format_description_log_event *>(ev);
+
+  for (;;)
+  {
+    Log_event_type typ;
+
+    ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum);
+    if (!ev)
+    {
+      errormsg= "Could not read GTID list event while looking for GTID "
+        "position in binlog";
+      break;
+    }
+    typ= ev->get_type_code();
+    if (typ == GTID_LIST_EVENT)
+      break;                                    /* Done, found it */
+    if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
+        typ == FORMAT_DESCRIPTION_EVENT)
+      continue;                                 /* Continue looking */
+
+    /* We did not find any Gtid_list_log_event, must be old binlog. */
+    ev= NULL;
+    break;
+  }
+
+  delete fdle;
+  *out_gtid_list= static_cast<Gtid_list_log_event *>(ev);
+  return errormsg;
+}
+
+
+/*
+  Check if every GTID requested by the slave is contained in this (or a later)
+  binlog file. Return true if so, false if not.
+*/
+static bool
+contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
+{
+  uint32 i;
+
+  for (i= 0; i < glev->count; ++i)
+  {
+    const rpl_gtid *gtid= st->find(glev->list[i].domain_id);
+    if (gtid != NULL &&
+        gtid->server_id == glev->list[i].server_id &&
+        gtid->seq_no <= glev->list[i].seq_no)
+    {
+      /*
+        The slave needs to receive gtid, but it is contained in an earlier
+        binlog file. So we need to serch back further.
+      */
+      return false;
+    }
+  }
+  return true;
+}
+
+
+/*
+  Find the name of the binlog file to start reading for a slave that connects
+  using GTID state.
+
+  Returns the file name in out_name, which must be of size at least FN_REFLEN.
+
+  Returns NULL on ok, error message on error.
+*/
+static const char *
+gtid_find_binlog_file(slave_connection_state *state, char *out_name)
+{
+  MEM_ROOT memroot;
+  binlog_file_entry *list;
+  Gtid_list_log_event *glev;
+  const char *errormsg= NULL;
+  IO_CACHE cache;
+  File file = (File)-1;
+  char buf[FN_REFLEN];
+
+  bzero((char*) &cache, sizeof(cache));
+  init_alloc_root(&memroot, 10*(FN_REFLEN+sizeof(binlog_file_entry)), 0);
+  if (!(list= get_binlog_list(&memroot)))
+  {
+    errormsg= "Out of memory while looking for GTID position in binlog";
+    goto end;
+  }
+
+  while (list)
+  {
+    if (!list->next)
+    {
+      /*
+        It should be safe to read the currently used binlog, as we will only
+        read the header part that is already written.
+
+        But if that does not work on windows, then we will need to cache the
+        event somewhere in memory I suppose - that could work too.
+      */
+    }
+    /*
+      Read the Gtid_list_log_event at the start of the binlog file to
+      get the binlog state.
+    */
+    if (normalize_binlog_name(buf, list->name, false))
+    {
+      errormsg= "Failed to determine binlog file name while looking for "
+        "GTID position in binlog";
+      goto end;
+    }
+    if ((file= open_binlog(&cache, buf, &errormsg)) == (File)-1 ||
+        (errormsg= get_gtid_list_event(&cache, &glev)))
+      goto end;
+
+    if (!glev || contains_all_slave_gtid(state, glev))
+    {
+      strmake(out_name, buf, FN_REFLEN);
+      goto end;
+    }
+    list= list->next;
+  }
+
+  /* We reached the end without finding anything. */
+  errormsg= "Could not find GTID state requested by slave in any binlog "
+    "files. Probably the slave state is too old and required binlog files "
+    "have been purged.";
+
+end:
+  if (file != (File)-1)
+  {
+    end_io_cache(&cache);
+    mysql_file_close(file, MYF(MY_WME));
+  }
+
+  free_root(&memroot, MYF(0));
+  return errormsg;
+}
+
+
+enum enum_gtid_skip_type {
+  GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION
+};
+
 /*
   Helper function for mysql_binlog_send() to write an event down the slave
   connection.
@@ -579,10 +809,65 @@ static const char *
 send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
                     Log_event_type event_type, char *log_file_name,
                     IO_CACHE *log, int mariadb_slave_capability,
-                    ulong ev_offset, uint8 current_checksum_alg)
+                    ulong ev_offset, uint8 current_checksum_alg,
+                    bool using_gtid_state, slave_connection_state *gtid_state,
+                    enum_gtid_skip_type *gtid_skip_group)
 {
   my_off_t pos;
 
+  /* Skip GTID event groups until we reach slave position within a domain_id. */
+  if (event_type == GTID_EVENT && using_gtid_state && gtid_state->count() > 0)
+  {
+    uint32 server_id, domain_id;
+    uint64 seq_no;
+    uchar flags2;
+    rpl_gtid *gtid;
+    size_t len= packet->length();
+
+    if (ev_offset > len ||
+        Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
+                             &domain_id, &server_id, &seq_no, &flags2))
+      return "Failed to read Gtid_log_event: corrupt binlog";
+    gtid= gtid_state->find(domain_id);
+    if (gtid != NULL)
+    {
+      /* Skip this event group if we have not yet reached slave start pos. */
+      if (server_id != gtid->server_id || seq_no <= gtid->seq_no)
+        *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+                            GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+      /*
+        Delete this entry if we have reached slave start position (so we will
+        not skip subsequent events and won't have to look them up and check).
+      */
+      if (server_id == gtid->server_id && seq_no >= gtid->seq_no)
+        gtid_state->remove(gtid);
+    }
+  }
+
+  /*
+    Skip event group if we have not yet reached the correct slave GTID position.
+
+    Note that slave that understands GTID can also tolerate holes, so there is
+    no need to supply dummy event.
+  */
+  switch (*gtid_skip_group)
+  {
+  case GTID_SKIP_STANDALONE:
+    if (event_type != INTVAR_EVENT &&
+        event_type != RAND_EVENT &&
+        event_type != USER_VAR_EVENT &&
+        event_type != TABLE_MAP_EVENT &&
+        event_type != ANNOTATE_ROWS_EVENT)
+      *gtid_skip_group= GTID_SKIP_NOT;
+    return NULL;
+  case GTID_SKIP_TRANSACTION:
+    if (event_type == XID_EVENT /* ToDo || is_COMMIT_query_event() */)
+      *gtid_skip_group= GTID_SKIP_NOT;
+    return NULL;
+  case GTID_SKIP_NOT:
+    break;
+  }
+
   /* Do not send annotate_rows events unless slave requested it. */
   if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
   {
@@ -722,6 +1007,11 @@ void mysql_binlog_send(THD* thd, char* l
   mysql_mutex_t *log_lock;
   mysql_cond_t *log_cond;
   int mariadb_slave_capability;
+  char str_buf[256];
+  String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
+  bool using_gtid_state;
+  slave_connection_state gtid_state;
+  enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
 
   uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
   int old_max_allowed_packet= thd->variables.max_allowed_packet;
@@ -748,6 +1038,10 @@ void mysql_binlog_send(THD* thd, char* l
     set_timespec_nsec(*heartbeat_ts, 0);
   }
   mariadb_slave_capability= get_mariadb_slave_capability(thd);
+
+  connect_gtid_state.length(0);
+  using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+
   if (global_system_variables.log_warnings > 1)
     sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
                           (int)thd->variables.server_id, log_ident, (ulong)pos);
@@ -781,10 +1075,30 @@ void mysql_binlog_send(THD* thd, char* l
   }
 
   name=search_file_name;
-  if (log_ident[0])
-    mysql_bin_log.make_log_name(search_file_name, log_ident);
+  if (using_gtid_state)
+  {
+    if (gtid_state.load(connect_gtid_state.c_ptr_quick(),
+                        connect_gtid_state.length()))
+    {
+      errmsg= "Out of memory or malformed slave request when obtaining start "
+        "position from GTID state";
+      my_errno= ER_UNKNOWN_ERROR;
+      goto err;
+    }
+    if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name)))
+    {
+      my_errno= ER_UNKNOWN_ERROR;
+      goto err;
+    }
+    pos= 4;
+  }
   else
-    name=0;                                     // Find first log
+  {
+    if (log_ident[0])
+      mysql_bin_log.make_log_name(search_file_name, log_ident);
+    else
+      name=0;                                   // Find first log
+  }
 
   linfo.index_file_offset = 0;
 
@@ -1036,7 +1350,8 @@ impossible position";
       if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
                                         log_file_name, &log,
                                         mariadb_slave_capability, ev_offset,
-                                        current_checksum_alg)))
+                                        current_checksum_alg, using_gtid_state,
+                                        &gtid_state, &gtid_skip_group)))
       {
         errmsg= tmp_msg;
         my_errno= ER_UNKNOWN_ERROR;
@@ -1197,7 +1512,9 @@ impossible position";
             (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
                                           log_file_name, &log,
                                           mariadb_slave_capability, ev_offset,
-                                          current_checksum_alg)))
+                                          current_checksum_alg,
+                                          using_gtid_state, &gtid_state,
+                                          &gtid_skip_group)))
         {
           errmsg= tmp_msg;
           my_errno= ER_UNKNOWN_ERROR;



More information about the commits mailing list