[Commits] 15e5c91: MDEV-12179: Per-engine mysql.gtid_slave_pos table

Kristian Nielsen knielsen at knielsen-hq.org
Thu Mar 9 16:47:23 EET 2017


revision-id: 15e5c91276ee8f6a0ab3e0543d9292786aba88cb (mariadb-10.1.21-4-g15e5c91)
parent(s): c034ee89e298abdb77d1b8e3a39012b6f5a6683d
author: Kristian Nielsen
committer: Kristian Nielsen
timestamp: 2017-03-09 13:27:27 +0100
message:

MDEV-12179: Per-engine mysql.gtid_slave_pos table

Intermediate commit.

For each GTID recorded in mysq.gtid_slave_pos, keep track of which
engine the update was made in.

This will be later used to know which rows can be deleted in the table
of a given engine.

---
 sql/log_event.cc | 18 +++++++++++-------
 sql/rpl_gtid.cc  | 26 +++++++++++++++++---------
 sql/rpl_gtid.h   | 18 +++++++++++++-----
 sql/rpl_rli.cc   | 18 +++++++++++-------
 4 files changed, 52 insertions(+), 28 deletions(-)

diff --git a/sql/log_event.cc b/sql/log_event.cc
index 21b5de2..f61bee5 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -4227,6 +4227,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
   int expected_error,actual_error= 0;
   Schema_specification_st db_options;
   uint64 sub_id= 0;
+  void *hton= NULL;
   rpl_gtid gtid;
   Relay_log_info const *rli= rgi->rli;
   Rpl_filter *rpl_filter= rli->mi->rpl_filter;
@@ -4397,7 +4398,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
 
           gtid= rgi->current_gtid;
           if (rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id,
-                                                       true, false))
+                                                       true, false, &hton))
           {
             int errcode= thd->get_stmt_da()->sql_errno();
             if (!is_parallel_retry_error(rgi, errcode))
@@ -4616,7 +4617,7 @@ START SLAVE; . Query: '%s'", expected_error, thd->query());
 
 end:
   if (sub_id && !thd->is_slave_error)
-    rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, rgi);
+    rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, hton, rgi);
 
   /*
     Probably we have set thd->query, thd->db, thd->catalog to point to places
@@ -7084,15 +7085,17 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
   int ret;
   if (gl_flags & FLAG_IGN_GTIDS)
   {
+    void *hton= NULL;
     uint32 i;
+
     for (i= 0; i < count; ++i)
     {
       if ((ret= rpl_global_gtid_slave_state->record_gtid(thd, &list[i],
-                                                        sub_id_list[i],
-                                                        false, false)))
+                                                         sub_id_list[i],
+                                                         false, false, &hton)))
         return ret;
       rpl_global_gtid_slave_state->update_state_hash(sub_id_list[i], &list[i],
-                                                    NULL);
+                                                     hton, NULL);
     }
   }
   ret= Log_event::do_apply_event(rgi);
@@ -7573,6 +7576,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
   rpl_gtid gtid;
   uint64 sub_id= 0;
   Relay_log_info const *rli= rgi->rli;
+  void *hton= NULL;
 
   /*
     XID_EVENT works like a COMMIT statement. And it also updates the
@@ -7593,7 +7597,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
 
     gtid= rgi->current_gtid;
     err= rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id, true,
-                                                  false);
+                                                  false, &hton);
     if (err)
     {
       int ec= thd->get_stmt_da()->sql_errno();
@@ -7626,7 +7630,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
   thd->mdl_context.release_transactional_locks();
 
   if (!res && sub_id)
-    rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, rgi);
+    rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, hton, rgi);
 
   /*
     Increment the global status commit count variable
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 87816c9..d3057fd 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -33,7 +33,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name=
 
 
 void
-rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
+rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
                                    rpl_group_info *rgi)
 {
   int err;
@@ -45,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
     it is even committed.
   */
   mysql_mutex_lock(&LOCK_slave_state);
-  err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi);
+  err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi);
   mysql_mutex_unlock(&LOCK_slave_state);
   if (err)
   {
@@ -74,12 +74,14 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
   if (rgi->gtid_pending)
   {
     uint64 sub_id= rgi->gtid_sub_id;
+    void *hton= NULL;
+
     rgi->gtid_pending= false;
     if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
     {
-      if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
+      if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton))
         DBUG_RETURN(1);
-      update_state_hash(sub_id, &rgi->current_gtid, rgi);
+      update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
     }
     rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
   }
@@ -287,11 +289,12 @@ rpl_slave_state::truncate_hash()
 
 int
 rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
-                        uint64 seq_no, rpl_group_info *rgi)
+                        uint64 seq_no, void *hton, rpl_group_info *rgi)
 {
   element *elem= NULL;
   list_element *list_elem= NULL;
 
+  DBUG_ASSERT(hton);
   if (!(elem= get_element(domain_id)))
     return 1;
 
@@ -336,6 +339,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
   list_elem->server_id= server_id;
   list_elem->sub_id= sub_id;
   list_elem->seq_no= seq_no;
+  list_elem->hton= hton;
 
   elem->add(list_elem);
   if (last_sub_id < sub_id)
@@ -482,7 +486,8 @@ gtid_check_rpl_slave_state_table(TABLE *table)
 */
 int
 rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
-                             bool in_transaction, bool in_statement)
+                             bool in_transaction, bool in_statement,
+                             void **out_hton)
 {
   TABLE_LIST tlist;
   int err= 0;
@@ -495,6 +500,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
   wait_for_commit* suspended_wfc;
   DBUG_ENTER("record_gtid");
 
+  *out_hton= NULL;
   if (unlikely(!loaded))
   {
     /*
@@ -582,6 +588,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
     table->file->print_error(err, MYF(0));
     goto end;
   }
+  *out_hton= table->s->db_type();
 
   if(opt_bin_log &&
      (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
@@ -1078,11 +1085,12 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
   {
     rpl_gtid gtid;
     uint64 sub_id;
+    void *hton= NULL;
 
     if (gtid_parser_helper(&state_from_master, end, &gtid) ||
         !(sub_id= next_sub_id(gtid.domain_id)) ||
-        record_gtid(thd, &gtid, sub_id, false, in_statement) ||
-        update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL))
+        record_gtid(thd, &gtid, sub_id, false, in_statement, &hton) ||
+        update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
       return 1;
     if (state_from_master == end)
       break;
@@ -1144,7 +1152,7 @@ rpl_slave_state::set_gtid_pos_tables_list(struct rpl_slave_state::gtid_pos_table
 
 
 struct rpl_slave_state::gtid_pos_table *
-rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, handlerton *hton)
+rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, void *hton)
 {
   struct gtid_pos_table *p;
   char *allocated_str;
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 2e28d66..4974415 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -112,6 +112,7 @@ struct rpl_slave_state
     uint64 sub_id;
     uint64 seq_no;
     uint32 server_id;
+    void *hton;
   };
 
   /* Elements in the HASH that hold the state for one domain_id. */
@@ -158,7 +159,13 @@ struct rpl_slave_state
   /* Descriptor for mysql.gtid_slave_posXXX table in specific engine. */
   struct gtid_pos_table {
     struct gtid_pos_table *next;
-    handlerton *table_hton;
+    /*
+      Use a void * here, rather than handlerton *, to make explicit that we
+      are not using the value to access any functionality in the engine. It
+      is just used as an opaque value to identify which engine we are using
+      for each GTID row.
+    */
+    void *table_hton;
     LEX_STRING table_name;
   };
 
@@ -179,10 +186,10 @@ struct rpl_slave_state
   void truncate_hash();
   ulong count() const { return hash.records; }
   int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
-             uint64 seq_no, rpl_group_info *rgi);
+             uint64 seq_no, void *hton, rpl_group_info *rgi);
   int truncate_state_table(THD *thd);
   int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
-                  bool in_transaction, bool in_statement);
+                  bool in_transaction, bool in_statement, void **out_hton);
   uint64 next_sub_id(uint32 domain_id);
   int iterate(int (*cb)(rpl_gtid *, void *), void *data,
               rpl_gtid *extra_gtids, uint32 num_extra,
@@ -196,12 +203,13 @@ struct rpl_slave_state
   element *get_element(uint32 domain_id);
   int put_back_list(uint32 domain_id, list_element *list);
 
-  void update_state_hash(uint64 sub_id, rpl_gtid *gtid, rpl_group_info *rgi);
+  void update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
+                         rpl_group_info *rgi);
   int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
   int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi);
   void release_domain_owner(rpl_group_info *rgi);
   void set_gtid_pos_tables_list(struct gtid_pos_table *new_list);
-  struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, handlerton *hton);
+  struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, void *hton);
   void free_gtid_pos_tables(struct gtid_pos_table *list);
 };
 
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 4d00535..91380cf 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1437,11 +1437,11 @@ Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
 
 
 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; };
+struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; };
 
 static int
 scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
-                              LEX_STRING *tablename, handlerton **out_hton)
+                              LEX_STRING *tablename, void **out_hton)
 {
   TABLE_LIST tlist;
   TABLE *table;
@@ -1498,6 +1498,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
     tmp_entry.gtid.domain_id= domain_id;
     tmp_entry.gtid.server_id= server_id;
     tmp_entry.gtid.seq_no= seq_no;
+    tmp_entry.hton= table->s->db_type();
     if ((err= insert_dynamic(array, (uchar *)&tmp_entry)))
     {
       my_error(ER_OUT_OF_RESOURCES, MYF(0));
@@ -1513,6 +1514,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
       DBUG_ASSERT(entry->gtid.domain_id == domain_id);
       entry->gtid.server_id= server_id;
       entry->gtid.seq_no= seq_no;
+      entry->hton= table->s->db_type();
     }
     else
     {
@@ -1527,6 +1529,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
       entry->gtid.domain_id= domain_id;
       entry->gtid.server_id= server_id;
       entry->gtid.seq_no= seq_no;
+      entry->hton= table->s->db_type();
       if ((err= my_hash_insert(hash, (uchar *)entry)))
       {
         my_free(entry);
@@ -1621,7 +1624,7 @@ load_gtid_state_cb(THD *thd, LEX_STRING *table_name, void *arg)
   int err;
   load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
   struct rpl_slave_state::gtid_pos_table *p;
-  handlerton *hton;
+  void *hton;
 
   if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
                                           table_name, &hton)))
@@ -1676,10 +1679,11 @@ rpl_load_gtid_slave_state(THD *thd)
   {
     get_dynamic(&array, (uchar *)&tmp_entry, i);
     if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id,
-                                                 tmp_entry.gtid.server_id,
-                                                 tmp_entry.sub_id,
-                                                 tmp_entry.gtid.seq_no,
-                                                 NULL)))
+                                                  tmp_entry.gtid.server_id,
+                                                  tmp_entry.sub_id,
+                                                  tmp_entry.gtid.seq_no,
+                                                  tmp_entry.hton,
+                                                  NULL)))
     {
       mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
       my_error(ER_OUT_OF_RESOURCES, MYF(0));


More information about the commits mailing list