[Commits] befd5104a9c: MDEV-7409 On RBR, extend the PROCESSLIST info to include at least the name of the recently used table

sachin.setiya at mariadb.com sachin.setiya at mariadb.com
Tue May 28 11:12:48 EEST 2019


revision-id: befd5104a9cd8a8ffba29a417d5c58f369585fb3 (mariadb-10.4.4-55-gbefd5104a9c)
parent(s): 4c995eb16843e56cd07c20e1f97ae495f0614010
author: Sachin
committer: Sachin
timestamp: 2019-05-28 13:42:09 +0530
message:

MDEV-7409 On RBR, extend the PROCESSLIST info to include at least the name of the recently used table

When RBR is used, add the db name to db Field and table name to Status
Field  of the "SHOW FULL PROCESSLIST" command for SQL thread.

---
 mysql-test/suite/rpl/r/rpl_rbr_monitor.result | 57 +++++++++++++++++
 mysql-test/suite/rpl/t/rpl_rbr_monitor.test   | 86 +++++++++++++++++++++++++
 sql/debug_sync.cc                             |  3 +-
 sql/log_event.cc                              | 90 +++++++++++++++++++++------
 4 files changed, 215 insertions(+), 21 deletions(-)

diff --git a/mysql-test/suite/rpl/r/rpl_rbr_monitor.result b/mysql-test/suite/rpl/r/rpl_rbr_monitor.result
new file mode 100644
index 00000000000..67e98b80de0
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_rbr_monitor.result
@@ -0,0 +1,57 @@
+include/master-slave.inc
+[connection master]
+connection master;
+create table t1(a int primary key);
+connection slave;
+SET GLOBAL debug_dbug="+d,should_wait_for_mdev7409";
+select * from t1;
+a
+connection master;
+insert into t1(a) values(1);
+#monitoring write rows
+connection slave;
+SELECT db , state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+WHERE DB = 'test' AND STATE LIKE  "Write_rows_log_event::write_row(%) on table t1";
+db	state
+test	Write_rows_log_event::write_row(-1) on table t1
+set debug_sync="now signal cont";
+#monitoring update rows
+connection master;
+update t1 set a = a + 4194304 ;
+connection slave;
+SELECT db, state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+WHERE DB = 'test' AND STATE LIKE  "Update_rows_log_event::find_row(%) on table t1";
+db	state
+test	Update_rows_log_event::find_row(-1) on table t1
+set debug_sync="now signal cont1";
+SELECT db, state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+WHERE DB = 'test' AND STATE LIKE  "Update_rows_log_event::unpack_current_row(%) on table t1";
+db	state
+test	Update_rows_log_event::unpack_current_row(-1) on table t1
+set debug_sync="now signal cont2";
+SELECT db, state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+WHERE DB = 'test' AND STATE LIKE  "Update_rows_log_event::ha_update_row(%) on table t1";
+db	state
+test	Update_rows_log_event::ha_update_row(-1) on table t1
+set debug_sync="now signal cont3";
+set debug_sync="RESET";
+#monitoring delete rows
+connection master;
+delete from t1 where a>1;
+connection slave;
+SELECT db , state FROM  INFORMATION_SCHEMA.PROCESSLIST
+WHERE DB = 'test' AND STATE LIKE  "Delete_rows_log_event::find_row(%) on table t1";
+db	state
+test	Delete_rows_log_event::find_row(-1) on table t1
+set debug_sync="now signal cont1";
+SELECT db, state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+WHERE DB = 'test' AND STATE LIKE  "Delete_rows_log_event::ha_delete_row(%) on table t1";
+db	state
+test	Delete_rows_log_event::ha_delete_row(-1) on table t1
+set debug_sync="now signal cont2";
+set debug_sync="RESET";
+SET GLOBAL debug_dbug="";
+connection master;
+drop table t1;
+connection slave;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_rbr_monitor.test b/mysql-test/suite/rpl/t/rpl_rbr_monitor.test
new file mode 100644
index 00000000000..e9eca2c0047
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_rbr_monitor.test
@@ -0,0 +1,86 @@
+--source include/have_innodb.inc
+--source include/have_debug.inc
+--source include/have_binlog_format_row.inc
+--source include/master-slave.inc
+--enable_connect_log
+
+--connection master
+create table t1(a int primary key);
+--save_master_pos
+
+--connection slave
+--sync_with_master
+SET GLOBAL debug_dbug="+d,should_wait_for_mdev7409";
+select * from t1;
+
+--connection master
+insert into t1(a) values(1);
+--save_master_pos
+
+--echo #monitoring write rows
+--connection slave
+
+let $wait_condition= SELECT COUNT(*) = 1 FROM  INFORMATION_SCHEMA.PROCESSLIST
+      WHERE DB = 'test' AND STATE LIKE  "Write_rows_log_event::write_row(%) on table t1";
+--source include/wait_condition.inc
+SELECT db , state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+      WHERE DB = 'test' AND STATE LIKE  "Write_rows_log_event::write_row(%) on table t1";
+set debug_sync="now signal cont";
+--sync_with_master
+
+--echo #monitoring update rows
+--connection master
+update t1 set a = a + 4194304 ;
+
+--connection slave
+let $wait_condition= SELECT COUNT(*) = 1 FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Update_rows_log_event::find_row(%) on table t1";
+--source include/wait_condition.inc
+SELECT db, state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Update_rows_log_event::find_row(%) on table t1";
+set debug_sync="now signal cont1";
+
+let $wait_condition= SELECT COUNT(*) = 1 FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Update_rows_log_event::unpack_current_row(%) on table t1";
+--source include/wait_condition.inc
+SELECT db, state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Update_rows_log_event::unpack_current_row(%) on table t1";
+set debug_sync="now signal cont2";
+
+let $wait_condition= SELECT COUNT(*) = 1 FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Update_rows_log_event::ha_update_row(%) on table t1";
+--source include/wait_condition.inc
+SELECT db, state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Update_rows_log_event::ha_update_row(%) on table t1";
+set debug_sync="now signal cont3";
+set debug_sync="RESET";
+--sync_with_master
+
+--echo #monitoring delete rows
+--connection master
+delete from t1 where a>1;
+
+--connection slave
+let $wait_condition= SELECT COUNT(*) = 1 FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Delete_rows_log_event::find_row(%) on table t1";
+--source include/wait_condition.inc
+SELECT db , state FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Delete_rows_log_event::find_row(%) on table t1";
+set debug_sync="now signal cont1";
+
+let $wait_condition= SELECT COUNT(*) = 1 FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Delete_rows_log_event::ha_delete_row(%) on table t1";
+--source include/wait_condition.inc
+SELECT db, state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Delete_rows_log_event::ha_delete_row(%) on table t1";
+set debug_sync="now signal cont2";
+set debug_sync="RESET";
+--sync_with_master
+SET GLOBAL debug_dbug="";
+
+#CleanUp
+--connection master
+drop table t1;
+--sync_slave_with_master
+
+--source include/rpl_end.inc
diff --git a/sql/debug_sync.cc b/sql/debug_sync.cc
index 357d8f4ce60..f7bc817ab13 100644
--- a/sql/debug_sync.cc
+++ b/sql/debug_sync.cc
@@ -1360,7 +1360,8 @@ static void debug_sync_execute(THD *thd, st_debug_sync_action *action)
       Do this before emitting the signal, so other threads can see it
       if they awake before we enter_cond() below.
     */
-    if (action->wait_for.length())
+    if (action->wait_for.length() &&
+            DBUG_EVALUATE_IF("should_wait_for_mdev7409", 0, 1))
     {
       st_debug_sync_control *ds_control= thd->debug_sync_control;
       strxnmov(ds_control->ds_proc_info, sizeof(ds_control->ds_proc_info)-1,
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 70f0e6c2623..8b1d7dd4b69 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -53,6 +53,7 @@
 #include "rpl_constants.h"
 #include "sql_digest.h"
 #include "zlib.h"
+#include "debug_sync.h"
 
 #define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1))
 
@@ -13670,19 +13671,29 @@ Write_rows_log_event::do_exec_row(rpl_group_info *rgi)
 {
   DBUG_ASSERT(m_table != NULL);
   const char *tmp= thd->get_proc_info();
-  const char *message= "Write_rows_log_event::write_row()";
+  LEX_CSTRING *tmp_db= &thd->db;
+  char *message, msg[128];
+  my_snprintf(msg, sizeof(msg),"Write_rows_log_event::write_row() on table %s",
+                   m_table->s->table_name.str);
+  thd->db= m_table->s->db;
+  message= msg;
   int error;
 
 #ifdef WSREP_PROC_INFO
   my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
-              "Write_rows_log_event::write_row(%lld)",
-              (long long) wsrep_thd_trx_seqno(thd));
+              "Write_rows_log_event::write_row(%lld) on table %s",
+              (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str);
   message= thd->wsrep_info;
 #endif /* WSREP_PROC_INFO */
 
   thd_proc_info(thd, message);
+  DBUG_EXECUTE_IF("should_wait_for_mdev7409",{
+                    debug_sync_set_action
+                    (thd, STRING_WITH_LEN("now WAIT_FOR cont"));
+                  };);
   error= write_row(rgi, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
   thd_proc_info(thd, tmp);
+  thd->db= *tmp_db;
 
   if (unlikely(error) && unlikely(!thd->is_error()))
   {
@@ -14345,32 +14356,47 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
 {
   int error;
   const char *tmp= thd->get_proc_info();
-  const char *message= "Delete_rows_log_event::find_row()";
+  LEX_CSTRING *tmp_db= &thd->db;
+  char *message, msg[128];
+  my_snprintf(msg, sizeof(msg),"Delete_rows_log_event::find_row() on table %s",
+                   m_table->s->table_name.str);
+  thd->db= m_table->s->db;
+  message= msg;
   const bool invoke_triggers=
     slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers;
   DBUG_ASSERT(m_table != NULL);
 
 #ifdef WSREP_PROC_INFO
   my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
-              "Delete_rows_log_event::find_row(%lld)",
-              (long long) wsrep_thd_trx_seqno(thd));
+              "Delete_rows_log_event::find_row(%lld) on table %s",
+              (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ;
   message= thd->wsrep_info;
 #endif /* WSREP_PROC_INFO */
 
   thd_proc_info(thd, message);
+  DBUG_EXECUTE_IF("should_wait_for_mdev7409",{
+                    debug_sync_set_action
+                    (thd, STRING_WITH_LEN("now WAIT_FOR cont1"));
+                  };);
   if (likely(!(error= find_row(rgi))))
   { 
     /*
       Delete the record found, located in record[0]
     */
-    message= "Delete_rows_log_event::ha_delete_row()";
+    my_snprintf(msg, sizeof(msg),"Delete_rows_log_event::ha_delete_row() on table %s",
+                   m_table->s->table_name.str);
+    message= msg;
 #ifdef WSREP_PROC_INFO
     snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
-             "Delete_rows_log_event::ha_delete_row(%lld)",
-             (long long) wsrep_thd_trx_seqno(thd));
+             "Delete_rows_log_event::ha_delete_row(%lld) on table %s",
+              (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ;
     message= thd->wsrep_info;
 #endif
     thd_proc_info(thd, message);
+    DBUG_EXECUTE_IF("should_wait_for_mdev7409",{
+                    debug_sync_set_action
+                    (thd, STRING_WITH_LEN("now WAIT_FOR cont2"));
+                  };);
 
     if (invoke_triggers &&
         unlikely(process_triggers(TRG_EVENT_DELETE, TRG_ACTION_BEFORE, FALSE)))
@@ -14399,6 +14425,7 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
     m_table->file->ha_index_or_rnd_end();
   }
   thd_proc_info(thd, tmp);
+  thd->db= *tmp_db;
   return error;
 }
 
@@ -14567,17 +14594,27 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
   const bool invoke_triggers=
     slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers;
   const char *tmp= thd->get_proc_info();
-  const char *message= "Update_rows_log_event::find_row()";
+  LEX_CSTRING *tmp_db= &thd->db;
+  char *message, msg[128];
   DBUG_ASSERT(m_table != NULL);
+  my_snprintf(msg, sizeof(msg),"Update_rows_log_event::find_row() on table %s",
+                   m_table->s->table_name.str);
+  thd->db= m_table->s->db;
+  message= msg;
 
 #ifdef WSREP_PROC_INFO
   my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
-              "Update_rows_log_event::find_row(%lld)",
-              (long long) wsrep_thd_trx_seqno(thd));
+              "Update_rows_log_event::find_row(%lld) on table %s",
+              (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ;
   message= thd->wsrep_info;
 #endif /* WSREP_PROC_INFO */
 
   thd_proc_info(thd, message);
+  DBUG_EXECUTE_IF("should_wait_for_mdev7409",{
+                    debug_sync_set_action
+                    (thd, STRING_WITH_LEN("now WAIT_FOR cont1"));
+                  };);
+
   int error= find_row(rgi); 
   if (unlikely(error))
   {
@@ -14588,6 +14625,7 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
     if ((m_curr_row= m_curr_row_end))
       unpack_current_row(rgi, &m_cols_ai);
     thd_proc_info(thd, tmp);
+    thd->db= *tmp_db;
     return error;
   }
 
@@ -14605,16 +14643,22 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
   store_record(m_table,record[1]);
 
   m_curr_row= m_curr_row_end;
-  message= "Update_rows_log_event::unpack_current_row()";
+  my_snprintf(msg, sizeof(msg),"Update_rows_log_event::unpack_current_row() on table %s",
+                   m_table->s->table_name.str);
+  message= msg;
 #ifdef WSREP_PROC_INFO
   my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
-              "Update_rows_log_event::unpack_current_row(%lld)",
-              (long long) wsrep_thd_trx_seqno(thd));
+              "Update_rows_log_event::unpack_current_row(%lld) on table %s",
+              (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ;
   message= thd->wsrep_info;
 #endif /* WSREP_PROC_INFO */
 
   /* this also updates m_curr_row_end */
   thd_proc_info(thd, message);
+  DBUG_EXECUTE_IF("should_wait_for_mdev7409",{
+                    debug_sync_set_action
+                    (thd, STRING_WITH_LEN("now WAIT_FOR cont2"));
+                  };);
   if (unlikely((error= unpack_current_row(rgi, &m_cols_ai))))
     goto err;
 
@@ -14632,15 +14676,21 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
   DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
 #endif
 
-  message= "Update_rows_log_event::ha_update_row()";
+  my_snprintf(msg, sizeof(msg),"Update_rows_log_event::ha_update_row() on table %s",
+             m_table->s->table_name.str);
+  message= msg;
 #ifdef WSREP_PROC_INFO
   my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
-              "Update_rows_log_event::ha_update_row(%lld)",
-              (long long) wsrep_thd_trx_seqno(thd));
+              "Update_rows_log_event::ha_update_row(%lld) on table %s",
+              (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ;
   message= thd->wsrep_info;
 #endif /* WSREP_PROC_INFO */
 
   thd_proc_info(thd, message);
+  DBUG_EXECUTE_IF("should_wait_for_mdev7409",{
+                    debug_sync_set_action
+                    (thd, STRING_WITH_LEN("now WAIT_FOR cont3"));
+                  };);
   if (invoke_triggers &&
       unlikely(process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_BEFORE, TRUE)))
   {
@@ -14670,9 +14720,9 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
       unlikely(process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_AFTER, TRUE)))
     error= HA_ERR_GENERIC; // in case if error is not set yet
 
-  thd_proc_info(thd, tmp);
-
 err:
+  thd_proc_info(thd, tmp);
+  thd->db= *tmp_db;
   m_table->file->ha_index_or_rnd_end();
   return error;
 }


More information about the commits mailing list