[Commits] Rev 3009: MWL#234: @@do_not_replicate: stage 2: master-side filtering. in http://bazaar.launchpad.net/~maria-captains/maria/5.2

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Mon Aug 8 14:41:27 EEST 2011


At http://bazaar.launchpad.net/~maria-captains/maria/5.2

------------------------------------------------------------
revno: 3009
revision-id: knielsen at knielsen-hq.org-20110808114127-1clb5yawa0gswxp4
parent: knielsen at knielsen-hq.org-20110804141000-hahpy5hq8lfsy4yz
committer: knielsen at knielsen-hq.org
branch nick: work-5.2-mwl234
timestamp: Mon 2011-08-08 13:41:27 +0200
message:
  MWL#234: @@do_not_replicate: stage 2: master-side filtering.
  
   - If --replicate-ignore-do-not-replicate, slave requests master to filter
     events with the @@do_not_replicate flag set.
  
   - Master filters events as requested by slave
  
   - Slave notices "holes" in binlog positions left by master-side filtering,
     and adjusts its master_log_pos accordingly.
=== modified file 'mysql-test/suite/rpl/r/rpl_do_not_replicate.result'
--- a/mysql-test/suite/rpl/r/rpl_do_not_replicate.result	2011-08-04 14:10:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_do_not_replicate.result	2011-08-08 11:41:27 +0000
@@ -14,6 +14,7 @@ SET do_not_replicate=1;
 CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=myisam;
 INSERT INTO t1(a) VALUES (2);
 INSERT INTO t2(a) VALUES (2);
+FLUSH NO_WRITE_TO_BINLOG LOGS;
 SHOW TABLES;
 Tables_in_test
 t1
@@ -25,6 +26,7 @@ SELECT * FROM t2;
 a       b
 1       NULL
 DROP TABLE t3;
+FLUSH NO_WRITE_TO_BINLOG LOGS;
 STOP SLAVE;
 SET GLOBAL replicate_ignore_do_not_replicate=0;
 START SLAVE;

=== modified file 'mysql-test/suite/rpl/t/rpl_do_not_replicate.test'
--- a/mysql-test/suite/rpl/t/rpl_do_not_replicate.test	2011-08-04 14:10:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_do_not_replicate.test	2011-08-08 11:41:27 +0000
@@ -22,6 +22,11 @@ CREATE TABLE t3 (a INT PRIMARY KEY, b IN
 INSERT INTO t1(a) VALUES (2);
 INSERT INTO t2(a) VALUES (2);
 
+# Inject a rotate event in the binlog stream sent to slave (otherwise we will
+# fail sync_slave_with_master as the last event on the master is not present
+# on the slave).
+FLUSH NO_WRITE_TO_BINLOG LOGS;
+
 sync_slave_with_master;
 connection slave;
 SHOW TABLES;
@@ -31,6 +36,7 @@ SELECT * FROM t2;
 connection master;
 DROP TABLE t3;
 
+FLUSH NO_WRITE_TO_BINLOG LOGS;
 sync_slave_with_master;
 connection slave;
 STOP SLAVE;

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2011-08-04 14:10:00 +0000
+++ b/sql/slave.cc	2011-08-08 11:41:27 +0000
@@ -1176,6 +1176,38 @@ when it try to get the value of TIME_ZON
     }
   }
 
+  /*
+    Request the master to filter away events with the @@do_not_replicate flag
+    set, if we are running with --replicate-ignore-do_not_replicate=1.
+  */
+  if (opt_replicate_ignore_do_not_replicate)
+  {
+    if (!mysql_real_query(mysql, STRING_WITH_LEN("SET do_not_replicate=1")))
+    {
+      err_code= mysql_errno(mysql);
+      if (is_network_error(err_code))
+      {
+        mi->report(ERROR_LEVEL, err_code,
+                   "Setting master-side filtering of @@do_not_replicate failed "
+                   "with error: %s", mysql_error(mysql));
+        goto network_err;
+      }
+      else if (err_code == ER_UNKNOWN_SYSTEM_VARIABLE)
+      {
+        /*
+          The master is older than the slave and does not support the
+          @@do_not_replicate feature.
+          This is not a problem, as such master will not generate events with
+          the @@do_not_replicate flag set in the first place. We will still
+          do slave-side filtering of such events though, to handle the (rare)
+          case of downgrading a master and receiving old events generated from
+          before the downgrade with the @@do_not_replicate flag set.
+        */
+        DBUG_PRINT("info", ("Old master does not support master-side filtering "
+                            "of @@do_not_replicate events."));
+      }
+    }
+  }
 err:
   if (errmsg)
   {
@@ -3584,6 +3616,7 @@ static int queue_event(Master_info* mi,c
 {
   int error= 0;
   ulong inc_pos;
+  ulong event_pos;
   Relay_log_info *rli= &mi->rli;
   pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
   DBUG_ENTER("queue_event");
@@ -3669,6 +3702,23 @@ static int queue_event(Master_info* mi,c
   }
 
   /*
+    If we filter events master-side (eg. @@do_not_replicate), we will see holes
+    in the event positions from the master. If we see such a hole, adjust
+    mi->master_log_pos accordingly so we maintain the correct position (for
+    reconnect, MASTER_POS_WAIT(), etc.)
+  */
+  if (inc_pos > 0 &&
+      event_len >= LOG_POS_OFFSET+4 &&
+      (event_pos= uint4korr(buf+LOG_POS_OFFSET)) > mi->master_log_pos + inc_pos)
+  {
+    inc_pos= event_pos - mi->master_log_pos;
+    DBUG_PRINT("info", ("Adjust master_log_pos %lu->%lu to account for "
+                        "master-side filtering",
+                        (unsigned long)(mi->master_log_pos + inc_pos),
+                        event_pos));
+  }
+
+  /*
      If this event is originating from this server, don't queue it.
      We don't check this for 3.23 events because it's simpler like this; 3.23
      will be filtered anyway by the SQL slave thread which also tests the

=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc	2011-05-09 13:06:16 +0000
+++ b/sql/sql_repl.cc	2011-08-08 11:41:27 +0000
@@ -338,6 +338,41 @@ Increase max_allowed_packet on master";
 
 
 /*
+  Helper function for mysql_binlog_send() to write an event down the slave
+  connection.
+
+  Returns NULL on success, error message string on error.
+*/
+static const char *
+send_event_to_slave(THD *thd, NET *net, String* const packet)
+{
+  thd_proc_info(thd, "Sending binlog event to slave");
+
+  /*
+    Skip events with the @@do_not_replicate flag set, if slave requested
+    skipping of such events.
+  */
+  if (thd->options & OPTION_DO_NOT_REPLICATE)
+  {
+    uint16 flags= uint2korr(&((*packet)[FLAGS_OFFSET+1]));
+    if (flags & LOG_EVENT_DO_NOT_REPLICATE_F)
+      return NULL;
+  }
+
+  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+    return "Failed on my_net_write()";
+
+  DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
+  if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+  {
+    if (send_file(thd))
+      return "failed in send_file()";
+  }
+
+  return NULL;    /* Success */
+}
+
+/*
   TODO: Clean up loop to only have one call to send_file()
 */
 
@@ -349,9 +384,9 @@ void mysql_binlog_send(THD* thd, char* l
   char search_file_name[FN_REFLEN], *name;
   IO_CACHE log;
   File file = -1;
-  String* packet = &thd->packet;
+  String* const packet = &thd->packet;
   int error;
-  const char *errmsg = "Unknown error";
+  const char *errmsg = "Unknown error", *tmp_msg;
   NET* net = &thd->net;
   pthread_mutex_t *log_lock;
   bool binlog_can_be_corrupted= FALSE;
@@ -588,9 +623,9 @@ impossible position";
       else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
         binlog_can_be_corrupted= FALSE;
 
-      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+      if ((tmp_msg= send_event_to_slave(thd, net, packet)))
       {
-        errmsg = "Failed on my_net_write()";
+        errmsg = tmp_msg;
         my_errno= ER_UNKNOWN_ERROR;
         goto err;
       }
@@ -603,17 +638,6 @@ impossible position";
                         }
                       });
 
-      DBUG_PRINT("info", ("log event code %d",
-                          (*packet)[LOG_EVENT_OFFSET+1] ));
-      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
-      {
-        if (send_file(thd))
-        {
-          errmsg = "failed in send_file()";
-          my_errno= ER_UNKNOWN_ERROR;
-          goto err;
-        }
-      }
       packet->set("\0", 1, &my_charset_bin);
     }
 
@@ -713,23 +737,12 @@ impossible position";
 
         if (read_packet)
         {
-          thd_proc_info(thd, "Sending binlog event to slave");
-          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
-          {
-            errmsg = "Failed on my_net_write()";
-            my_errno= ER_UNKNOWN_ERROR;
-            goto err;
-          }
-
-          if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
-          {
-            if (send_file(thd))
-            {
-              errmsg = "failed in send_file()";
-              my_errno= ER_UNKNOWN_ERROR;
-              goto err;
-            }
-          }
+          if ((tmp_msg= send_event_to_slave(thd, net, packet)))
+          {
+            errmsg = tmp_msg;
+            my_errno= ER_UNKNOWN_ERROR;
+            goto err;
+          }
           packet->set("\0", 1, &my_charset_bin);
           /*
             No need to net_flush because we will get to flush later when



More information about the commits mailing list