[Commits] ed04c40: MDEV-5289: master server starts slave parallel threads

Kristian Nielsen knielsen at knielsen-hq.org
Wed Mar 11 10:20:15 EET 2015


revision-id: ed04c40b01c122436eda6552c550d62ce8a3920b
parent(s): a7fd11b31d52b62ef7b61783bb83a5e62271307b
committer: Kristian Nielsen
branch nick: server
timestamp: 2015-03-11 09:18:16 +0100
message:

MDEV-5289: master server starts slave parallel threads

Delay spawning parallel replication worker threads until a slave SQL
thread is running, and de-spawn them when the last SQL thread stops.

This is especially useful to avoid needless threads on a master in a
setup where the same my.cnf is used on masters and slaves.

---
 mysql-test/suite/rpl/r/rpl_parallel.result         |   14 +++++
 mysql-test/suite/rpl/t/rpl_parallel.test           |   11 ++++
 .../sys_vars/r/slave_parallel_threads_basic.result |   15 ++++-
 .../sys_vars/t/slave_parallel_threads_basic.cnf    |    5 ++
 .../sys_vars/t/slave_parallel_threads_basic.test   |    9 ++-
 sql/rpl_mi.cc                                      |   28 ++++++++-
 sql/rpl_mi.h                                       |    1 +
 sql/rpl_parallel.cc                                |   63 ++++++++------------
 sql/rpl_parallel.h                                 |    6 +-
 sql/slave.cc                                       |    9 ++-
 sql/sys_vars.cc                                    |    8 +--
 11 files changed, 116 insertions(+), 53 deletions(-)

diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result
index 3c66a54..499ca23 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result
@@ -4,8 +4,22 @@ SET GLOBAL slave_parallel_threads=10;
 ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first
 include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=10;
+SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*)))
+OK
 CHANGE MASTER TO master_use_gtid=slave_pos;
 include/start_slave.inc
+SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*)))
+OK
+include/stop_slave.inc
+SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*)))
+OK
+include/start_slave.inc
+SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*)))
+OK
 *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
 ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
 CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test
index 7397ede..cafdffe 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test
@@ -12,9 +12,20 @@ SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
 SET GLOBAL slave_parallel_threads=10;
 --source include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=10;
+
+# Check that we do not spawn any worker threads when no slave is running.
+SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+
 CHANGE MASTER TO master_use_gtid=slave_pos;
 --source include/start_slave.inc
 
+# Check that worker threads get spawned when slave starts.
+SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+# ... and that worker threads get removed when slave stops.
+--source include/stop_slave.inc
+SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+--source include/start_slave.inc
+SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
 
 --echo *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
 
diff --git a/mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result b/mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result
index 2956d04..56aa597 100644
--- a/mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result
+++ b/mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result
@@ -1,13 +1,22 @@
 SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads;
-SELECT @@GLOBAL.slave_parallel_threads as 'must be zero because of default';
-must be zero because of default
-0
+SELECT IF(COUNT(*) < 20, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+IF(COUNT(*) < 20, "OK", CONCAT("Found too many system user processes: ", COUNT(*)))
+OK
+SELECT @@GLOBAL.slave_parallel_threads as 'must be 20 because of .cnf';
+must be 20 because of .cnf
+20
 SELECT @@SESSION.slave_parallel_threads  as 'no session var';
 ERROR HY000: Variable 'slave_parallel_threads' is a GLOBAL variable
 SET GLOBAL slave_parallel_threads= 0;
 SET GLOBAL slave_parallel_threads= DEFAULT;
+SELECT @@GLOBAL.slave_parallel_threads as 'must be 0 because of default';
+must be 0 because of default
+0
 SET GLOBAL slave_parallel_threads= 10;
 SELECT @@GLOBAL.slave_parallel_threads;
 @@GLOBAL.slave_parallel_threads
 10
+SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*)))
+OK
 SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads;
diff --git a/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.cnf b/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.cnf
new file mode 100644
index 0000000..02bdb44
--- /dev/null
+++ b/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.cnf
@@ -0,0 +1,5 @@
+# Use default setting for mysqld processes
+!include include/default_mysqld.cnf
+
+[mysqld.1]
+slave_parallel_threads=20
diff --git a/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test b/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test
index 8e98748..b567b7f 100644
--- a/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test
+++ b/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test
@@ -2,13 +2,20 @@
 
 SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads;
 
-SELECT @@GLOBAL.slave_parallel_threads as 'must be zero because of default';
+# Check that we don't spawn worker threads at server startup, when no
+# slave is configured (MDEV-5289).
+SELECT IF(COUNT(*) < 20, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+
+SELECT @@GLOBAL.slave_parallel_threads as 'must be 20 because of .cnf';
 --error ER_INCORRECT_GLOBAL_LOCAL_VAR
 SELECT @@SESSION.slave_parallel_threads  as 'no session var';
 
 SET GLOBAL slave_parallel_threads= 0;
 SET GLOBAL slave_parallel_threads= DEFAULT;
+SELECT @@GLOBAL.slave_parallel_threads as 'must be 0 because of default';
 SET GLOBAL slave_parallel_threads= 10;
 SELECT @@GLOBAL.slave_parallel_threads;
+# Check that we don't spawn worker threads when no slave is started.
+SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
 
 SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads;
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 081265c..50cd44d 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -1248,7 +1248,7 @@ bool Master_info_index::remove_master_info(LEX_STRING *name)
 
 bool Master_info_index::give_error_if_slave_running()
 {
-  DBUG_ENTER("warn_if_slave_running");
+  DBUG_ENTER("give_error_if_slave_running");
   mysql_mutex_assert_owner(&LOCK_active_mi);
   if (!this) // master_info_index is set to NULL on server shutdown
     return TRUE;
@@ -1269,6 +1269,32 @@ bool Master_info_index::give_error_if_slave_running()
 
 
 /**
+   Master_info_index::any_slave_sql_running()
+
+   The LOCK_active_mi must be held while calling this function.
+
+   @return
+   TRUE  	If some slave SQL thread is running, or on server shutdown
+   FALSE	No slave SQL thread is running
+*/
+
+bool Master_info_index::any_slave_sql_running()
+{
+  DBUG_ENTER("any_slave_sql_running");
+  if (!this) // master_info_index is set to NULL on server shutdown
+    DBUG_RETURN(TRUE); // must pair with DBUG_ENTER above
+
+  for (uint i= 0; i< master_info_hash.records; ++i)
+  {
+    Master_info *mi= (Master_info *)my_hash_element(&master_info_hash, i);
+    if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
+      DBUG_RETURN(TRUE); // one running SQL thread is enough
+  }
+  DBUG_RETURN(FALSE);
+}
+
+
+/**
    Master_info_index::start_all_slaves()
 
    Start all slaves that was not running.
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index ebb1b54..2b0b40f 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -218,6 +218,7 @@ class Master_info_index
   Master_info *get_master_info(LEX_STRING *connection_name,
                                Sql_condition::enum_warning_level warning);
   bool give_error_if_slave_running();
+  bool any_slave_sql_running();
   bool start_all_slaves(THD *thd);
   bool stop_all_slaves(THD *thd);
 };
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index ec17728..2f47ad8 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -944,9 +944,9 @@ static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
 }
 
 
-int
+static int
 rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
-                                 uint32 new_count, bool skip_check)
+                                 uint32 new_count)
 {
   uint32 i;
   rpl_parallel_thread **new_list= NULL;
@@ -991,24 +991,6 @@ static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
     new_free_list= new_list[i];
   }
 
-  if (!skip_check)
-  {
-    mysql_mutex_lock(&LOCK_active_mi);
-    if (master_info_index->give_error_if_slave_running())
-    {
-      mysql_mutex_unlock(&LOCK_active_mi);
-      goto err;
-    }
-    if (pool->changing)
-    {
-      mysql_mutex_unlock(&LOCK_active_mi);
-      my_error(ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE, MYF(0));
-      goto err;
-    }
-    pool->changing= true;
-    mysql_mutex_unlock(&LOCK_active_mi);
-  }
-
   /*
     Grab each old thread in turn, and signal it to stop.
 
@@ -1068,13 +1050,6 @@ static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
     mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
   }
 
-  if (!skip_check)
-  {
-    mysql_mutex_lock(&LOCK_active_mi);
-    pool->changing= false;
-    mysql_mutex_unlock(&LOCK_active_mi);
-  }
-
   mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
   mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
   mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
@@ -1101,16 +1076,26 @@ static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
     }
     my_free(new_list);
   }
-  if (!skip_check)
-  {
-    mysql_mutex_lock(&LOCK_active_mi);
-    pool->changing= false;
-    mysql_mutex_unlock(&LOCK_active_mi);
-  }
   return 1;
 }
 
 
+int
+rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
+{
+  if (pool->count)
+    return 0; /* pool->count is already non-zero: nothing to do */
+  return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads);
+}
+
+
+int
+rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool)
+{
+  return rpl_parallel_change_thread_count(pool, 0); /* new_count= 0 */
+}
+
+
 void
 rpl_parallel_thread::batch_free()
 {
@@ -1354,7 +1339,7 @@ static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
 
 
 rpl_parallel_thread_pool::rpl_parallel_thread_pool()
-  : count(0), threads(0), free_list(0), changing(false), inited(false)
+  : count(0), threads(0), free_list(0), inited(false)
 {
 }
 
@@ -1369,10 +1354,14 @@ static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
   mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
                    MY_MUTEX_INIT_SLOW);
   mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL);
-  changing= false;
   inited= true;
 
-  return rpl_parallel_change_thread_count(this, size, true);
+  /*
+    The pool is initially empty. Threads will be spawned when a slave SQL
+    thread is started.
+  */
+
+  return 0;
 }
 
 
@@ -1381,7 +1370,7 @@ static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
 {
   if (!inited)
     return;
-  rpl_parallel_change_thread_count(this, 0, true);
+  rpl_parallel_change_thread_count(this, 0);
   mysql_mutex_destroy(&LOCK_rpl_thread_pool);
   mysql_cond_destroy(&COND_rpl_thread_pool);
   inited= false;
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index d60b220..00421d6 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -204,7 +204,6 @@ struct rpl_parallel_thread_pool {
   struct rpl_parallel_thread *free_list;
   mysql_mutex_t LOCK_rpl_thread_pool;
   mysql_cond_t COND_rpl_thread_pool;
-  bool changing;
   bool inited;
 
   rpl_parallel_thread_pool();
@@ -314,9 +313,8 @@ struct rpl_parallel {
 extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
 
 
-extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
-                                            uint32 new_count,
-                                            bool skip_check= false);
+extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
+extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
 extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
 
 #endif  /* RPL_PARALLEL_H */
diff --git a/sql/slave.cc b/sql/slave.cc
index 1e1edcb..703338c 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -652,6 +652,10 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
       DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
 
     mysql_mutex_unlock(log_lock);
+
+    if (opt_slave_parallel_threads > 0 &&
+        !master_info_index->any_slave_sql_running())
+      rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
   }
   if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL))
   {
@@ -958,7 +962,10 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
                               mi);
   if (!error && (thread_mask & SLAVE_SQL))
   {
-    error= start_slave_thread(
+    if (opt_slave_parallel_threads > 0)
+      error= rpl_parallel_activate_pool(&global_rpl_thread_pool);
+    if (!error)
+      error= start_slave_thread(
 #ifdef HAVE_PSI_INTERFACE
                               key_thread_slave_sql,
 #endif
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index f1942d0..31bd712 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1752,16 +1752,12 @@ static Sys_var_last_gtid Sys_last_gtid(
 static bool
 fix_slave_parallel_threads(sys_var *self, THD *thd, enum_var_type type)
 {
-  bool running;
-  bool err= false;
+  bool err;
 
   mysql_mutex_unlock(&LOCK_global_system_variables);
   mysql_mutex_lock(&LOCK_active_mi);
-  running= master_info_index->give_error_if_slave_running();
+  err= master_info_index->give_error_if_slave_running();
   mysql_mutex_unlock(&LOCK_active_mi);
-  if (running || rpl_parallel_change_thread_count(&global_rpl_thread_pool,
-                                                  opt_slave_parallel_threads))
-    err= true;
   mysql_mutex_lock(&LOCK_global_system_variables);
 
   return err;


More information about the commits mailing list