[Commits] e7f5737dee1: MDEV-7409 On RBR, extend the PROCESSLIST info to include at least the name of the recently used table

sachin.setiya at mariadb.com sachin.setiya at mariadb.com
Wed Jul 17 16:23:17 EEST 2019


revision-id: e7f5737dee1216523fdca37e9683cc3e1cc0dccb (mariadb-10.4.5-24-ge7f5737dee1)
parent(s): e35676f5557d68c7b51ba47aa73dcdf72eafa436
author: Sachin
committer: Sachin
timestamp: 2019-07-17 18:53:09 +0530
message:

MDEV-7409 On RBR, extend the PROCESSLIST info to include at least the name of the recently used table

When RBR is used, add the db name to db Field and table name to Status
Field  of the "SHOW FULL PROCESSLIST" command for SQL thread.

---
 mysql-test/suite/rpl/r/rpl_rbr_monitor.result | 56 +++++++++++++++++++
 mysql-test/suite/rpl/t/rpl_rbr_monitor.test   | 77 +++++++++++++++++++++++++++
 sql/log_event.cc                              | 71 +++++++++++++++++-------
 3 files changed, 184 insertions(+), 20 deletions(-)

diff --git a/mysql-test/suite/rpl/r/rpl_rbr_monitor.result b/mysql-test/suite/rpl/r/rpl_rbr_monitor.result
new file mode 100644
index 00000000000..ab8574f055f
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_rbr_monitor.result
@@ -0,0 +1,56 @@
+include/master-slave.inc
+[connection master]
+connection master;
+create table t1(a int primary key) engine=innodb;
+connection slave;
+connection slave1;
+begin;
+insert into t1(a) values(1);
+connection master;
+select * from t1;
+a
+connection master;
+insert into t1(a) values(1);
+#monitoring write rows
+connection slave;
+SELECT db , state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+WHERE DB = 'test' AND STATE LIKE  "Write_rows_log_event::write_row(%) on table %";
+db	state
+test	Write_rows_log_event::write_row(-1) on table `t1`
+#monitoring update rows
+connection slave1;
+rollback;
+begin;
+select a from t1 for update;
+a
+1
+connection master;
+update t1 set a = a + 1 ;
+connection slave;
+SELECT db, state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+WHERE DB = 'test' AND STATE LIKE  "Update_rows_log_event::find_row(%) on table %";
+db	state
+test	Update_rows_log_event::find_row(-1) on table `t1`
+#monitoring delete rows
+connection slave1;
+rollback;
+begin;
+select * from t1 for update;
+a
+2
+connection master;
+delete from t1;
+connection slave;
+select * from t1;
+a
+2
+SELECT db , state FROM  INFORMATION_SCHEMA.PROCESSLIST
+WHERE DB = 'test' AND STATE LIKE  "Delete_rows_log_event::find_row(%) on table %";
+db	state
+test	Delete_rows_log_event::find_row(-1) on table `t1`
+connection slave1;
+rollback;
+connection master;
+drop table t1;
+connection slave;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_rbr_monitor.test b/mysql-test/suite/rpl/t/rpl_rbr_monitor.test
new file mode 100644
index 00000000000..cbc38c511fc
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_rbr_monitor.test
@@ -0,0 +1,77 @@
+--source include/have_innodb.inc
+--source include/have_binlog_format_row.inc
+--source include/master-slave.inc
+--enable_connect_log
+
+--connection master
+create table t1(a int primary key) engine=innodb;
+
+--sync_slave_with_master
+--connection slave1
+begin;
+insert into t1(a) values(1);
+--connection master
+select * from t1;
+
+--connection master
+insert into t1(a) values(1);
+--save_master_pos
+
+--echo #monitoring write rows
+--connection slave
+
+
+let $wait_condition= SELECT COUNT(*) = 1 FROM  INFORMATION_SCHEMA.PROCESSLIST
+      WHERE DB = 'test' AND STATE LIKE  "Write_rows_log_event::write_row(%) on table %";
+--source include/wait_condition.inc
+SELECT db , state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+      WHERE DB = 'test' AND STATE LIKE  "Write_rows_log_event::write_row(%) on table %";
+
+
+--echo #monitoring update rows
+--connection slave1
+rollback;
+--sync_with_master
+begin;
+select a from t1 for update;
+
+--connection master
+update t1 set a = a + 1 ;
+--save_master_pos
+
+--connection slave
+let $wait_condition= SELECT COUNT(*) = 1 FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Update_rows_log_event::find_row(%) on table %";
+--source include/wait_condition.inc
+SELECT db, state  FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Update_rows_log_event::find_row(%) on table %";
+
+--echo #monitoring delete rows
+--connection slave1
+rollback;
+--sync_with_master
+begin;
+select * from t1 for update;
+
+--connection master
+delete from t1;
+--save_master_pos
+
+--connection slave
+select * from t1;
+let $wait_condition= SELECT COUNT(*) = 1 FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Delete_rows_log_event::find_row(%) on table %";
+--source include/wait_condition.inc
+SELECT db , state FROM  INFORMATION_SCHEMA.PROCESSLIST
+  WHERE DB = 'test' AND STATE LIKE  "Delete_rows_log_event::find_row(%) on table %";
+
+#CleanUp
+--connection slave1
+rollback;
+--sync_with_master
+
+--connection master
+drop table t1;
+--sync_slave_with_master
+
+--source include/rpl_end.inc
diff --git a/sql/log_event.cc b/sql/log_event.cc
index f25ebd56792..3a7f37eae94 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -13693,19 +13693,27 @@ Write_rows_log_event::do_exec_row(rpl_group_info *rgi)
 {
   DBUG_ASSERT(m_table != NULL);
   const char *tmp= thd->get_proc_info();
-  const char *message= "Write_rows_log_event::write_row()";
+  LEX_CSTRING *tmp_db= &thd->db;
+  char *message, msg[128];
+  const char *table_name= m_table->s->table_name.str;
+  char quote_char= get_quote_char_for_identifier(thd, STRING_WITH_LEN(table_name));
+  my_snprintf(msg, sizeof(msg),"Write_rows_log_event::write_row() on table %c%s%c",
+                   quote_char, table_name, quote_char);
+  thd->set_db(&m_table->s->db);
+  message= msg;
   int error;
 
 #ifdef WSREP_PROC_INFO
   my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
-              "Write_rows_log_event::write_row(%lld)",
-              (long long) wsrep_thd_trx_seqno(thd));
+              "Write_rows_log_event::write_row(%lld) on table %c%s%c",
+              (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, quote_char);
   message= thd->wsrep_info;
 #endif /* WSREP_PROC_INFO */
 
   thd_proc_info(thd, message);
   error= write_row(rgi, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
   thd_proc_info(thd, tmp);
+  thd->set_db(tmp_db);
 
   if (unlikely(error) && unlikely(!thd->is_error()))
   {
@@ -14368,15 +14376,22 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
 {
   int error;
   const char *tmp= thd->get_proc_info();
-  const char *message= "Delete_rows_log_event::find_row()";
+  LEX_CSTRING *tmp_db= &thd->db;
+  char *message, msg[128];
+  const char *table_name= m_table->s->table_name.str;
+  char quote_char= get_quote_char_for_identifier(thd, STRING_WITH_LEN(table_name));
+  my_snprintf(msg, sizeof(msg),"Delete_rows_log_event::find_row() on table %c%s%c",
+                   quote_char, table_name, quote_char);
+  thd->set_db(&m_table->s->db);
+  message= msg;
   const bool invoke_triggers=
     slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers;
   DBUG_ASSERT(m_table != NULL);
 
 #ifdef WSREP_PROC_INFO
   my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
-              "Delete_rows_log_event::find_row(%lld)",
-              (long long) wsrep_thd_trx_seqno(thd));
+              "Delete_rows_log_event::find_row(%lld) on table %c%s%c",
+              (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, quote_char);
   message= thd->wsrep_info;
 #endif /* WSREP_PROC_INFO */
 
@@ -14386,11 +14401,13 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
     /*
       Delete the record found, located in record[0]
     */
-    message= "Delete_rows_log_event::ha_delete_row()";
+    my_snprintf(msg, sizeof(msg),"Delete_rows_log_event::ha_delete_row() on table %c%s%c",
+                   quote_char, table_name, quote_char);
+    message= msg;
 #ifdef WSREP_PROC_INFO
     snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
-             "Delete_rows_log_event::ha_delete_row(%lld)",
-             (long long) wsrep_thd_trx_seqno(thd));
+             "Delete_rows_log_event::ha_delete_row(%lld) on table %c%s%c",
+              (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, quote_char);
     message= thd->wsrep_info;
 #endif
     thd_proc_info(thd, message);
@@ -14422,6 +14439,7 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
     m_table->file->ha_index_or_rnd_end();
   }
   thd_proc_info(thd, tmp);
+  thd->set_db(tmp_db);
   return error;
 }
 
@@ -14590,17 +14608,25 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
   const bool invoke_triggers=
     slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers;
   const char *tmp= thd->get_proc_info();
-  const char *message= "Update_rows_log_event::find_row()";
+  LEX_CSTRING *tmp_db= &thd->db;
+  char *message, msg[128];
+  const char *table_name= m_table->s->table_name.str;
+  char quote_char= get_quote_char_for_identifier(thd, STRING_WITH_LEN(table_name));
   DBUG_ASSERT(m_table != NULL);
+  my_snprintf(msg, sizeof(msg),"Update_rows_log_event::find_row() on table %c%s%c",
+                   quote_char, table_name, quote_char);
+  thd->set_db(&m_table->s->db);
+  message= msg;
 
 #ifdef WSREP_PROC_INFO
   my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
-              "Update_rows_log_event::find_row(%lld)",
-              (long long) wsrep_thd_trx_seqno(thd));
+              "Update_rows_log_event::find_row(%lld) on table %c%s%c",
+              (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, quote_char);
   message= thd->wsrep_info;
 #endif /* WSREP_PROC_INFO */
 
   thd_proc_info(thd, message);
+
   int error= find_row(rgi); 
   if (unlikely(error))
   {
@@ -14611,6 +14637,7 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
     if ((m_curr_row= m_curr_row_end))
       unpack_current_row(rgi, &m_cols_ai);
     thd_proc_info(thd, tmp);
+    thd->db= *tmp_db;
     return error;
   }
 
@@ -14628,11 +14655,13 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
   store_record(m_table,record[1]);
 
   m_curr_row= m_curr_row_end;
-  message= "Update_rows_log_event::unpack_current_row()";
+  my_snprintf(msg, sizeof(msg),"Update_rows_log_event::unpack_current_row() on table %c%s%c",
+                   quote_char, table_name, quote_char);
+  message= msg;
 #ifdef WSREP_PROC_INFO
   my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
-              "Update_rows_log_event::unpack_current_row(%lld)",
-              (long long) wsrep_thd_trx_seqno(thd));
+              "Update_rows_log_event::unpack_current_row(%lld) on table %c%s%c",
+              (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, quote_char);
   message= thd->wsrep_info;
 #endif /* WSREP_PROC_INFO */
 
@@ -14655,11 +14684,13 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
   DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
 #endif
 
-  message= "Update_rows_log_event::ha_update_row()";
+  my_snprintf(msg, sizeof(msg),"Update_rows_log_event::ha_update_row() on table %c%s%c",
+              quote_char, table_name, quote_char);
+  message= msg;
 #ifdef WSREP_PROC_INFO
   my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
-              "Update_rows_log_event::ha_update_row(%lld)",
-              (long long) wsrep_thd_trx_seqno(thd));
+              "Update_rows_log_event::ha_update_row(%lld) on table %c%s%c",
+              (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, quote_char);
   message= thd->wsrep_info;
 #endif /* WSREP_PROC_INFO */
 
@@ -14693,9 +14724,9 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
       unlikely(process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_AFTER, TRUE)))
     error= HA_ERR_GENERIC; // in case if error is not set yet
 
-  thd_proc_info(thd, tmp);
-
 err:
+  thd_proc_info(thd, tmp);
+  thd->set_db(tmp_db);
   m_table->file->ha_index_or_rnd_end();
   return error;
 }


More information about the commits mailing list