[Commits] Rev 3448: MDEV-26. Intermediate commit. in http://bazaar.launchpad.net/~maria-captains/maria/10.0

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Mon Feb 11 17:44:39 EET 2013


At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 3448
revision-id: knielsen at knielsen-hq.org-20130211154438-84vxb1oeso0vvwkr
parent: knielsen at knielsen-hq.org-20130125181930-2852iode30bftxwv
committer: knielsen at knielsen-hq.org
branch nick: work-10.0-mdev26
timestamp: Mon 2013-02-11 16:44:38 +0100
message:
  MDEV-26. Intermediate commit.
  
  Implement binlog_gtid_pos() function. This will be used so that
  the slave can obtain the gtid position automatically from first
  connect with old-style position - then MASTER_GTID_POS=AUTO will
  work the next time. Can also be used by mysqldump --master-data
  to give the current gtid position directly.
=== modified file 'sql/item_create.cc'
--- a/sql/item_create.cc	2012-10-02 22:44:54 +0000
+++ b/sql/item_create.cc	2013-02-11 15:44:38 +0000
@@ -447,6 +447,19 @@ class Create_func_bin : public Create_fu
 };
 
 
+class Create_func_binlog_gtid_pos : public Create_func_arg2
+{
+public:
+  virtual Item *create_2_arg(THD *thd, Item *arg1, Item *arg2);
+
+  static Create_func_binlog_gtid_pos s_singleton;
+
+protected:
+  Create_func_binlog_gtid_pos() {}
+  virtual ~Create_func_binlog_gtid_pos() {}
+};
+
+
 class Create_func_bit_count : public Create_func_arg1
 {
 public:
@@ -3052,6 +3065,16 @@ Create_func_bin::create_1_arg(THD *thd,
 }
 
 
+Create_func_binlog_gtid_pos Create_func_binlog_gtid_pos::s_singleton;
+
+Item*
+Create_func_binlog_gtid_pos::create_2_arg(THD *thd, Item *arg1, Item *arg2)
+{
+  thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
+  return new (thd->mem_root) Item_func_binlog_gtid_pos(arg1, arg2);
+}
+
+
 Create_func_bit_count Create_func_bit_count::s_singleton;
 
 Item*
@@ -5243,6 +5266,7 @@ static Native_func_registry func_array[]
   { { C_STRING_WITH_LEN("ATAN2") }, BUILDER(Create_func_atan)},
   { { C_STRING_WITH_LEN("BENCHMARK") }, BUILDER(Create_func_benchmark)},
   { { C_STRING_WITH_LEN("BIN") }, BUILDER(Create_func_bin)},
+  { { C_STRING_WITH_LEN("BINLOG_GTID_POS") }, BUILDER(Create_func_binlog_gtid_pos)},
   { { C_STRING_WITH_LEN("BIT_COUNT") }, BUILDER(Create_func_bit_count)},
   { { C_STRING_WITH_LEN("BIT_LENGTH") }, BUILDER(Create_func_bit_length)},
   { { C_STRING_WITH_LEN("BUFFER") }, GEOM_BUILDER(Create_func_buffer)},

=== modified file 'sql/item_strfunc.cc'
--- a/sql/item_strfunc.cc	2012-09-08 22:22:06 +0000
+++ b/sql/item_strfunc.cc	2013-02-11 15:44:38 +0000
@@ -57,6 +57,7 @@
 C_MODE_START
 #include "../mysys/my_static.h"                 // For soundex_map
 C_MODE_END
+#include <sql_repl.h>
 
 /**
    @todo Remove this. It is not safe to use a shared String object.
@@ -2717,6 +2718,40 @@ String *Item_func_repeat::val_str(String
 }
 
 
+void Item_func_binlog_gtid_pos::fix_length_and_dec()
+{
+  collation.set(system_charset_info);
+  max_length= MAX_BLOB_WIDTH;
+  maybe_null= 1;
+}
+
+
+String *Item_func_binlog_gtid_pos::val_str(String *str)
+{
+  DBUG_ASSERT(fixed == 1);
+  String name_str, *name;
+  longlong pos;
+
+  if (args[0]->null_value || args[1]->null_value)
+    goto err;
+
+  name= args[0]->val_str(&name_str);
+  pos= args[1]->val_int();
+
+  if (pos < 0 || pos > UINT_MAX32)
+    goto err;
+
+  if (gtid_state_from_binlog_pos(name->c_ptr_safe(), (uint32)pos, str))
+    goto err;
+  null_value= 0;
+  return str;
+
+err:
+  null_value= 1;
+  return NULL;
+}
+
+
 void Item_func_rpad::fix_length_and_dec()
 {
   // Handle character set for args[0] and args[2].

=== modified file 'sql/item_strfunc.h'
--- a/sql/item_strfunc.h	2012-06-19 12:06:45 +0000
+++ b/sql/item_strfunc.h	2013-02-11 15:44:38 +0000
@@ -615,6 +615,17 @@ class Item_func_repeat :public Item_str_
 };
 
 
+class Item_func_binlog_gtid_pos :public Item_str_func
+{
+  String tmp_value;
+public:
+  Item_func_binlog_gtid_pos(Item *arg1,Item *arg2) :Item_str_func(arg1,arg2) {}
+  String *val_str(String *);
+  void fix_length_and_dec();
+  const char *func_name() const { return "binlog_gtid_pos"; }
+};
+
+
 class Item_func_rpad :public Item_str_func
 {
   String tmp_value, rpad_str;

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2013-01-25 14:21:49 +0000
+++ b/sql/log_event.cc	2013-02-11 15:44:38 +0000
@@ -6690,6 +6690,19 @@ slave_connection_state::load(char *slave
 }
 
 
+int
+slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
+{
+  uint32 i;
+
+  my_hash_reset(&hash);
+  for (i= 0; i < count; ++i)
+    if (update(&gtid_list[i]))
+      return 1;
+  return 0;
+}
+
+
 rpl_gtid *
 slave_connection_state::find(uint32 domain_id)
 {
@@ -6697,6 +6710,30 @@ slave_connection_state::find(uint32 doma
 }
 
 
+int
+slave_connection_state::update(const rpl_gtid *in_gtid)
+{
+  rpl_gtid *new_gtid;
+  uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
+  if (rec)
+  {
+    memcpy(rec, in_gtid, sizeof(*in_gtid));
+    return 0;
+  }
+
+  if (!(new_gtid= (rpl_gtid *)my_malloc(sizeof(*new_gtid), MYF(MY_WME))))
+    return 1;
+  memcpy(new_gtid, in_gtid, sizeof(*new_gtid));
+  if (my_hash_insert(&hash, (uchar *)new_gtid))
+  {
+    my_free(new_gtid);
+    return 1;
+  }
+
+  return 0;
+}
+
+
 void
 slave_connection_state::remove(const rpl_gtid *in_gtid)
 {
@@ -6714,6 +6751,30 @@ slave_connection_state::remove(const rpl
 }
 
 
+int
+slave_connection_state::to_string(String *out_str)
+{
+  uint32 i;
+
+  out_str->length(0);
+  for (i= 0; i < hash.records; ++i)
+  {
+    const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i);
+    if (i && out_str->append(","))
+      return 1;
+    if (gtid->domain_id &&
+        (out_str->append_ulonglong(gtid->domain_id) ||
+         out_str->append("-")))
+      return 1;
+    if (out_str->append_ulonglong(gtid->server_id) ||
+        out_str->append("-") ||
+        out_str->append_ulonglong(gtid->seq_no))
+      return 1;
+  }
+  return 0;
+}
+
+
 #endif  /* MYSQL_SERVER */
 
 
@@ -7144,6 +7205,46 @@ Gtid_list_log_event::print(FILE *file, P
 #endif  /* MYSQL_SERVER */
 
 
+/*
+  Used to record gtid_list event while sending binlog to slave, without having to
+  fully contruct the event object.
+*/
+bool
+Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
+                          rpl_gtid **out_gtid_list, uint32 *out_list_len)
+{
+  const char *p;
+  uint32 count_field, count;
+  rpl_gtid *gtid_list;
+
+  if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN)
+    return true;
+  p= event_start + LOG_EVENT_HEADER_LEN;
+  count_field= uint4korr(p);
+  p+= 4;
+  count= count_field & ((1<<28)-1);
+  if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN +
+      16 * count)
+    return true;
+  if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(rpl_gtid)*count, MYF(MY_WME))))
+    return true;
+  *out_gtid_list= gtid_list;
+  *out_list_len= count;
+  while (count--)
+  {
+    gtid_list->domain_id= uint4korr(p);
+    p+= 4;
+    gtid_list->server_id= uint4korr(p);
+    p+= 4;
+    gtid_list->seq_no= uint8korr(p);
+    p+= 8;
+    ++gtid_list;
+  }
+
+  return false;
+}
+
+
 /**************************************************************************
         Intvar_log_event methods
 **************************************************************************/

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2013-01-25 14:21:49 +0000
+++ b/sql/log_event.h	2013-02-11 15:44:38 +0000
@@ -3073,9 +3073,12 @@ struct slave_connection_state
   ~slave_connection_state();
 
   int load(char *slave_request, size_t len);
+  int load(const rpl_gtid *gtid_list, uint32 count);
   rpl_gtid *find(uint32 domain_id);
+  int update(const rpl_gtid *in_gtid);
   void remove(const rpl_gtid *gtid);
   ulong count() const { return hash.records; }
+  int to_string(String *out_str);
 };
 
 
@@ -3261,6 +3264,8 @@ class Gtid_list_log_event: public Log_ev
 #ifdef MYSQL_SERVER
   bool write(IO_CACHE *file);
 #endif
+  static bool peek(const char *event_start, uint32 event_len,
+                   rpl_gtid **out_gtid_list, uint32 *out_list_len);
 };
 
 

=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc	2013-01-22 14:18:36 +0000
+++ b/sql/sql_repl.cc	2013-02-11 15:44:38 +0000
@@ -656,8 +656,12 @@ get_gtid_list_event(IO_CACHE *cache, Gti
   if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
                                       opt_master_verify_checksum)) ||
       ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+  {
+    if (ev)
+      delete ev;
     return "Could not read format description log event while looking for "
       "GTID position in binlog";
+  }
 
   fdle= static_cast<Format_description_log_event *>(ev);
 
@@ -795,6 +799,177 @@ gtid_find_binlog_file(slave_connection_s
 }
 
 
+/*
+  Given an old-style binlog position with file name and file offset, find the
+  corresponding gtid position. If the offset is not at an event boundary, give
+  an error.
+
+  Return NULL on ok, error message string on error.
+
+  ToDo: Improve the performance of this by using binlog index files.
+*/
+static const char *
+gtid_state_from_pos(const char *name, uint32 offset,
+                    slave_connection_state *gtid_state)
+{
+  IO_CACHE cache;
+  File file;
+  const char *errormsg= NULL;
+  bool found_gtid_list_event= false;
+  bool found_format_description_event= false;
+  bool valid_pos= false;
+  uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
+  int err;
+  String packet;
+
+  if (gtid_state->load((const rpl_gtid *)NULL, 0))
+  {
+    errormsg= "Internal error (out of memory?) initializing slave state "
+      "while scanning binlog to find start position";
+    return errormsg;
+  }
+
+  if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1)
+    return errormsg;
+
+  /*
+    First we need to find the initial GTID_LIST_EVENT. We need this even
+    if the offset is at the very start of the binlog file.
+
+    But if we do not find any GTID_LIST_EVENT, then this is an old binlog
+    with no GTID information, so we return empty GTID state.
+  */
+  for (;;)
+  {
+    Log_event_type typ;
+    uint32 cur_pos;
+
+    cur_pos= (uint32)my_b_tell(&cache);
+    if (cur_pos == offset)
+      valid_pos= true;
+    if (found_format_description_event && found_gtid_list_event &&
+        cur_pos >= offset)
+      break;
+
+    packet.length(0);
+    err= Log_event::read_log_event(&cache, &packet, NULL,
+                                   current_checksum_alg);
+    if (err)
+    {
+      errormsg= "Could not read binlog while searching for slave start "
+        "position on master";
+      goto end;
+    }
+    /*
+      The cast to uchar is needed to avoid a signed char being converted to a
+      negative number.
+    */
+    typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET];
+    if (typ == FORMAT_DESCRIPTION_EVENT)
+    {
+      if (found_format_description_event)
+      {
+        errormsg= "Duplicate format description log event found while "
+          "searching for old-style position in binlog";
+        goto end;
+      }
+
+      current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length());
+      found_format_description_event= true;
+    }
+    else if (typ != FORMAT_DESCRIPTION_EVENT && !found_format_description_event)
+    {
+      errormsg= "Did not find format description log event while searching "
+        "for old-style position in binlog";
+      goto end;
+    }
+    else if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
+             typ == BINLOG_CHECKPOINT_EVENT)
+      continue;                                 /* Continue looking */
+    else if (typ == GTID_LIST_EVENT)
+    {
+      rpl_gtid *gtid_list;
+      bool status;
+      uint32 list_len;
+
+      if (found_gtid_list_event)
+      {
+        errormsg= "Found duplicate Gtid_list_log_event while scanning binlog "
+          "to find slave start position";
+        goto end;
+      }
+      status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
+                                        &gtid_list, &list_len);
+      if (status)
+      {
+        errormsg= "Error reading Gtid_list_log_event while searching "
+          "for old-style position in binlog";
+        goto end;
+      }
+      err= gtid_state->load(gtid_list, list_len);
+      my_free(gtid_list);
+      if (err)
+      {
+        errormsg= "Internal error (out of memory?) initialising slave state "
+          "while scanning binlog to find start position";
+        goto end;
+      }
+      found_gtid_list_event= true;
+    }
+    else if (!found_gtid_list_event)
+    {
+      /* We did not find any Gtid_list_log_event, must be old binlog. */
+      goto end;
+    }
+    else if (typ == GTID_EVENT)
+    {
+      rpl_gtid gtid;
+      uchar flags2;
+      if (Gtid_log_event::peek(packet.ptr(), packet.length(), &gtid.domain_id,
+                               &gtid.server_id, &gtid.seq_no, &flags2))
+      {
+        errormsg= "Corrupt gtid_log_event found while scanning binlog to find "
+          "initial slave position";
+        goto end;
+      }
+      if (gtid_state->update(&gtid))
+      {
+        errormsg= "Internal error (out of memory?) updating slave state while "
+          "scanning binlog to find start position";
+        goto end;
+      }
+    }
+  }
+
+  if (!valid_pos)
+  {
+    errormsg= "Slave requested incorrect position in master binlog. "
+      "Requested position %u in file '%s', but this position does not "
+      "correspond to the location of any binlog event.";
+  }
+
+end:
+  end_io_cache(&cache);
+  mysql_file_close(file, MYF(MY_WME));
+
+  return errormsg;
+}
+
+
+int
+gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str)
+{
+  slave_connection_state gtid_state;
+
+  if (pos < 4)
+    pos= 4;
+  if (gtid_state_from_pos(name, pos, &gtid_state) ||
+      gtid_state.to_string(out_str))
+    return 1;
+  return 0;
+}
+
+
 enum enum_gtid_skip_type {
   GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION
 };
@@ -945,8 +1120,8 @@ send_event_to_slave(THD *thd, NET *net,
         binlog positions.
       */
       if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
-        return "Failed to replace binlog checkpoint event with dummy: "
-               "too small event.";
+        return "Failed to replace binlog checkpoint or gtid list event with "
+               "dummy: too small event.";
     }
   }
 
@@ -1010,7 +1185,7 @@ void mysql_binlog_send(THD* thd, char* l
   char str_buf[256];
   String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
   bool using_gtid_state;
-  slave_connection_state gtid_state;
+  slave_connection_state gtid_state, return_gtid_state;
   enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
 
   uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;

=== modified file 'sql/sql_repl.h'
--- a/sql/sql_repl.h	2012-11-05 14:01:49 +0000
+++ b/sql/sql_repl.h	2013-02-11 15:44:38 +0000
@@ -69,6 +69,7 @@ extern PSI_mutex_key key_LOCK_slave_stat
 void rpl_init_gtid_slave_state();
 void rpl_deinit_gtid_slave_state();
 int rpl_load_gtid_slave_state(THD *thd);
+int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
 
 #endif /* HAVE_REPLICATION */
 



More information about the commits mailing list