[Commits] bf24da9: MDEV-7145: Delayed replication, intermediate commit.

Kristian Nielsen knielsen at knielsen-hq.org
Thu Sep 22 09:35:39 EEST 2016


revision-id: bf24da97876f0dbe7dab05f0ca5690f30b84d562 (mariadb-10.1.17-2-gbf24da9)
parent(s): fb0db9d0c8b3b7b11473193c87bffefc5ef9115d
author: Kristian Nielsen
committer: Kristian Nielsen
timestamp: 2016-09-22 08:26:45 +0200
message:

MDEV-7145: Delayed replication, intermediate commit.

Initial merge of delayed replication from MySQL git.

The code from the initial push into MySQL is merged, and the
associated test case passes. A number of tasks are still pending:

1. Check full test suite run for any regressions or .result file updates.

2. Extend the feature to also work for parallel replication.

3. There are some todo-comments about future refactoring left from
MySQL, these should be located and merged on top.

4. There are some later related MySQL commits, these should be checked
and merged. These include:
    e134b9362ba0b750d6ac1b444780019622d14aa5
    b38f0f7857c073edfcc0a64675b7f7ede04be00f
    fd2b210383358fe7697f201e19ac9779879ba72a
    afc397376ec50e96b2918ee64e48baf4dda0d37d

5. The testcase from MySQL relies heavily on sleep and timing for
testing, and seems likely to sporadically fail on heavily loaded test
servers in buildbot or distro build farms.

---
 .../rpl_tests/delayed_slave_wait_on_query.inc      |  39 +++
 mysql-test/include/check-testcase.test             |   3 +
 mysql-test/include/show_delayed_slave_state.inc    |  28 ++
 mysql-test/include/sync_with_master.inc            |  26 ++
 mysql-test/suite/rpl/r/rpl_delayed_slave.result    | 172 ++++++++++
 mysql-test/suite/rpl/t/rpl_delayed_slave.test      | 348 +++++++++++++++++++++
 sql/lex.h                                          |   1 +
 sql/log.cc                                         |   8 +-
 sql/log_event.cc                                   |   7 +
 sql/rpl_mi.cc                                      |   2 +-
 sql/rpl_rli.cc                                     | 178 ++++++-----
 sql/rpl_rli.h                                      | 115 ++++++-
 sql/slave.cc                                       | 245 +++++++++++----
 sql/slave.h                                        |  18 +-
 sql/sql_binlog.cc                                  | 148 +++++----
 sql/sql_lex.cc                                     |   1 -
 sql/sql_lex.h                                      |   5 +-
 sql/sql_repl.cc                                    |   4 +
 sql/sql_yacc.yy                                    |  13 +
 19 files changed, 1150 insertions(+), 211 deletions(-)

diff --git a/mysql-test/extra/rpl_tests/delayed_slave_wait_on_query.inc b/mysql-test/extra/rpl_tests/delayed_slave_wait_on_query.inc
new file mode 100644
index 0000000..5d04d14e
--- /dev/null
+++ b/mysql-test/extra/rpl_tests/delayed_slave_wait_on_query.inc
@@ -0,0 +1,39 @@
+# ==== Purpose ====
+#
+# Auxiliary file used by rpl_delayed_slave.test.  This assumes that an
+# 'INSERT INTO t1...' query has been executed on the master.  It does
+# this:
+#
+# - After half the delay, check the status. It should be delaying and
+#   the query should not have executed.
+#
+# - After one and a half delay, check the status. It should not be
+#   delaying and the query should be executed.
+#
+# ==== Usage ====
+#
+# --source extra/rpl_tests/delayed_slave_wait_on_query.inc
+
+connection master;
+--echo [on slave]
+--let $slave_timeout= $time1
+
+--source include/sync_slave_io_with_master.inc
+--echo # sleep 1*T
+--sleep $time1
+
+--echo # Expect query not executed and status is 'Waiting until MASTER_DELAY...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+--echo # sleep 1*T
+--sleep $time1
+
+--echo # sync with master (with timeout 1*T)
+--source include/sync_with_master.inc
+
+--echo # Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+--source include/check_slave_is_running.inc
diff --git a/mysql-test/include/check-testcase.test b/mysql-test/include/check-testcase.test
index 083f44c..abac386 100644
--- a/mysql-test/include/check-testcase.test
+++ b/mysql-test/include/check-testcase.test
@@ -67,6 +67,9 @@ if ($tmp)
   --echo Replicate_Do_Domain_Ids	
   --echo Replicate_Ignore_Domain_Ids	
   --echo Parallel_Mode	conservative
+  --echo SQL_Delay	0
+  --echo SQL_Remaining_Delay	NULL
+  --echo Slave_SQL_Running_State	
 }
 if (!$tmp) {
   # Note: after WL#5177, fields 13-18 shall not be filtered-out.
diff --git a/mysql-test/include/show_delayed_slave_state.inc b/mysql-test/include/show_delayed_slave_state.inc
new file mode 100644
index 0000000..8eb7232
--- /dev/null
+++ b/mysql-test/include/show_delayed_slave_state.inc
@@ -0,0 +1,28 @@
+# ==== Purpose ====
+#
+# Display the delay state of the SQL thread.
+#
+# ==== Usage ====
+#
+# --let $verbose_delayed_slave_state= [0|1]
+# --source extra/rpl_tests/show_delayed_slave_state.inc
+#
+# By default, the output is normalized so that it does not depend on
+# exact timing or exact binlog positions. If
+# $verbose_delayed_slave_state is set, then it outputs exact times and
+# binlog positions. This can be useful for debugging.
+
+--let $_delayed_slave_status= query_get_value(SHOW SLAVE STATUS, Slave_SQL_Running_State, 1)
+
+--let $_delayed_slave_remaining_delay= query_get_value(SHOW SLAVE STATUS, SQL_Remaining_Delay, 1)
+--let $_delayed_slave_qualitative_delay= `SELECT CASE WHEN "$_delayed_slave_remaining_delay" = "NULL" THEN "NULL" WHEN "$_delayed_slave_remaining_delay" = "0" THEN "0" ELSE "greater than zero" END`
+
+--let $_delayed_slave_io_pos= query_get_value(SHOW SLAVE STATUS, Read_Master_Log_Pos, 1)
+--let $_delayed_slave_sql_pos= query_get_value(SHOW SLAVE STATUS, Exec_Master_Log_Pos, 1)
+--let $_delayed_slave_qualitative_log_pos= `SELECT IF($_delayed_slave_io_pos > $_delayed_slave_sql_pos, "behind", "in sync with")`
+
+--echo Slave_SQL_Running_State='$_delayed_slave_status'; SQL_Remaining_Delay is $_delayed_slave_qualitative_delay; SQL thread is $_delayed_slave_qualitative_log_pos IO thread
+
+if ($verbose_delayed_slave_state) {
+  --echo SQL_Remaining_Delay='$_delayed_slave_remaining_delay'; Read_master_log_pos='$_delayed_slave_io_pos'; Exec_Master_Log_Pos='$_delayed_slave_sql_pos'
+}
diff --git a/mysql-test/include/sync_with_master.inc b/mysql-test/include/sync_with_master.inc
new file mode 100644
index 0000000..dcb995a
--- /dev/null
+++ b/mysql-test/include/sync_with_master.inc
@@ -0,0 +1,26 @@
+# ==== Purpose ====
+#
+# This file does the same as the built-in command sync_with_master,
+# but can be configured to use a custom timeout.  This has the benefit
+# that it accepts the same $slave_timeout and $master_connection
+# parameters as wait_for_slave_param.inc
+#
+#
+# ==== Usage ====
+#
+# --connection master
+# --source include/save_master_pos.inc
+# --connection slave
+# --source include/sync_with_master.inc
+#
+# Parameters to this macro are $slave_timeout and
+# $master_connection. See wait_for_slave_param.inc for
+# descriptions.
+
+--let $slave_param= Relay_Master_Log_File
+--let $slave_param_value= $_master_file
+--source include/wait_for_slave_param.inc
+
+--let $slave_param= Exec_Master_Log_Pos
+--let $slave_param_value= $_master_pos
+--source include/wait_for_slave_param.inc
diff --git a/mysql-test/suite/rpl/r/rpl_delayed_slave.result b/mysql-test/suite/rpl/r/rpl_delayed_slave.result
new file mode 100644
index 0000000..75b263b
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_delayed_slave.result
@@ -0,0 +1,172 @@
+include/master-slave.inc
+[connection master]
+call mtr.add_suppression("Unsafe statement written to the binary log using statement format");
+call mtr.add_suppression("Unsafe statement written to the binary log using statement format");
+[on master]
+CREATE TABLE t1 (a VARCHAR(100), b INT AUTO_INCREMENT PRIMARY KEY);
+==== Normal setup ====
+[on slave]
+include/stop_slave.inc
+# CHANGE MASTER TO MASTER_DELAY = 2*T
+# Checking that delay is what we set it to
+# Expect status to be ''
+SELECT STATE FROM INFORMATION_SCHEMA.PROCESSLIST ORDER BY ID DESC LIMIT 1;
+STATE
+
+include/start_slave.inc
+[on master]
+INSERT INTO t1(a) VALUES ('normal setup');
+[on slave]
+include/sync_slave_io_with_master.inc
+# sleep 1*T
+# Expect query not executed and status is 'Waiting until MASTER_DELAY...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a	b
+Slave_SQL_Running_State='Waiting until MASTER_DELAY seconds after master executed event'; SQL_Remaining_Delay is greater than zero; SQL thread is behind IO thread
+# sleep 1*T
+# sync with master (with timeout 1*T)
+include/wait_for_slave_param.inc [Relay_Master_Log_File]
+include/wait_for_slave_param.inc [Exec_Master_Log_Pos]
+# Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a	b
+normal setup	1
+Slave_SQL_Running_State='Slave has read all relay log; waiting for the slave I/O thread to update it'; SQL_Remaining_Delay is NULL; SQL thread is in sync with IO thread
+include/check_slave_is_running.inc
+==== Slave lags "naturally" after master ====
+[on master]
+# CREATE FUNCTION delay_on_slave(time_units INT) RETURNS INT BEGIN IF @@GLOBAL.server_id = 2 THEN RETURN SLEEP(time_units * T); ELSE RETURN 0; END IF; END
+INSERT INTO t1(a) SELECT delay_on_slave(3);
+Warnings:
+Note	1592	Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system variable that may have a different value on the slave.
+Note	1592	Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system function that may return a different value on the slave.
+INSERT INTO t1(a) VALUES ('slave is already lagging: this statement should execute immediately');
+INSERT INTO t1(a) SELECT delay_on_slave(2);
+Warnings:
+Note	1592	Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system variable that may have a different value on the slave.
+Note	1592	Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system function that may return a different value on the slave.
+[on slave]
+include/sync_slave_io_with_master.inc
+# sleep 1*T
+# Expect no query executed and status is 'Waiting until MASTER_DELAY...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a	b
+normal setup	1
+Slave_SQL_Running_State='Waiting until MASTER_DELAY seconds after master executed event'; SQL_Remaining_Delay is greater than zero; SQL thread is behind IO thread
+# wait for first query to execute
+# sleep 1*T
+# Expect second query executed and status is executing third query (i.e., 'User sleep')
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a	b
+slave is already lagging: this statement should execute immediately	3
+Slave_SQL_Running_State='User sleep'; SQL_Remaining_Delay is NULL; SQL thread is behind IO thread
+# sleep 2*T
+# Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a	b
+0	4
+Slave_SQL_Running_State='Slave has read all relay log; waiting for the slave I/O thread to update it'; SQL_Remaining_Delay is NULL; SQL thread is in sync with IO thread
+==== Seconds_Behind_Master ====
+# Bring slave to sync.
+include/stop_slave.inc
+CHANGE MASTER TO MASTER_DELAY = 0;
+include/start_slave.inc
+INSERT INTO t1(a) VALUES ('Syncing slave');
+include/stop_slave.inc
+# CHANGE MASTER TO MASTER_DELAY = 2*T
+include/start_slave.inc
+INSERT INTO t1(a) VALUES (delay_on_slave(1));
+Warnings:
+Note	1592	Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system variable that may have a different value on the slave.
+Note	1592	Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system function that may return a different value on the slave.
+# sleep 1*T
+# sleep 1*T
+==== STOP SLAVE and START SLAVE ====
+include/stop_slave.inc
+# CHANGE MASTER TO MASTER_DELAY = 3*T
+include/start_slave.inc
+# Checking that delay is what we set it to
+[on master]
+INSERT INTO t1(a) VALUES ('stop slave and start slave');
+[on slave]
+# sleep 1*T
+SET @before_stop_slave= UNIX_TIMESTAMP();
+include/stop_slave.inc
+# STOP SLAVE finished in time.
+# Expect query not executed and status is ''
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a	b
+0	6
+Slave_SQL_Running_State=''; SQL_Remaining_Delay is NULL; SQL thread is behind IO thread
+include/start_slave.inc
+# START SLAVE finished in time.
+[on slave]
+include/sync_slave_io_with_master.inc
+# sleep 1*T
+# Expect query not executed and status is 'Waiting until MASTER_DELAY...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a	b
+0	6
+Slave_SQL_Running_State='Waiting until MASTER_DELAY seconds after master executed event'; SQL_Remaining_Delay is greater than zero; SQL thread is behind IO thread
+# sleep 1*T
+# sync with master (with timeout 1*T)
+include/wait_for_slave_param.inc [Relay_Master_Log_File]
+include/wait_for_slave_param.inc [Exec_Master_Log_Pos]
+# Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a	b
+stop slave and start slave	7
+Slave_SQL_Running_State='Slave has read all relay log; waiting for the slave I/O thread to update it'; SQL_Remaining_Delay is NULL; SQL thread is in sync with IO thread
+include/check_slave_is_running.inc
+==== Change back to no delay ====
+[on slave]
+include/stop_slave.inc
+CHANGE MASTER TO MASTER_DELAY = 0;
+# Expect delay is 0.
+SQL_Delay='0'
+include/start_slave.inc
+[on master]
+INSERT INTO t1(a) VALUES ('change back to no delay');
+[on slave]
+include/sync_slave_io_with_master.inc
+# sleep 1*T
+# Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a	b
+change back to no delay	8
+Slave_SQL_Running_State='Slave has read all relay log; waiting for the slave I/O thread to update it'; SQL_Remaining_Delay is NULL; SQL thread is in sync with IO thread
+==== Reset delay with RESET SLAVE ====
+include/stop_slave.inc
+CHANGE MASTER TO MASTER_DELAY = 71;
+include/start_slave.inc
+# Expect delay is 71
+SQL_Delay='71'
+include/stop_slave.inc
+RESET SLAVE;
+[on master]
+RESET MASTER;
+[on slave]
+include/start_slave.inc
+# Expect delay is 0
+SQL_Delay='0'
+==== Set a bad value for the delay ====
+include/stop_slave.inc
+# Expect error for setting negative delay
+CHANGE MASTER TO MASTER_DELAY = -1;
+ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near '-1' at line 1
+# Expect that it's ok to set delay of 2^31-1
+CHANGE MASTER TO MASTER_DELAY = 2147483647;
+# Expect error for setting delay between 2^31 and 2^32-1
+CHANGE MASTER TO MASTER_DELAY = 2147483648;
+ERROR HY000: The requested value 2147483648 for the master delay exceeds the maximum 2147483647
+# Expect error for setting delay to nonsense
+CHANGE MASTER TO MASTER_DELAY = blah;
+ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near 'blah' at line 1
+CHANGE MASTER TO MASTER_DELAY = 0;
+include/start_slave.inc
+==== Clean up ====
+[on master]
+DROP TABLE t1;
+DROP FUNCTION delay_on_slave;
+[on slave]
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_delayed_slave.test b/mysql-test/suite/rpl/t/rpl_delayed_slave.test
new file mode 100644
index 0000000..2fa5c81
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_delayed_slave.test
@@ -0,0 +1,348 @@
+# ==== Purpose ====
+#
+# Test the time-delayed replication feature, i.e.,
+# CHANGE MASTER TO MASTER_DELAY=X:
+#
+#  - Verify that slave has executed the events after but not before the
+#    delay timeout.
+#
+#  - Verify that delay is correct works when slave is already lagging
+#    due to slow queries.
+#
+#  - Verify that Seconds_Behind_Master is greater than or equal to the
+#    delay if the slave still has unprocessed events in the relay log
+#    and more time than the delay has elapsed since the last event was
+#    executed on the master.
+#
+#  - Verify that STOP SLAVE works instantly even during a delay, and
+#    that it does not cause the waited-for event to be executed too
+#    early on slave.
+#
+#  - Verify that changing back to no delay works.
+#
+#  - Verify that RESET SLAVE sets the delay to 0.
+#
+#  - Verify that setting a bad value for the delay gives an error.
+#
+# ==== Implementation ====
+#
+# We run the slave with 10 seconds lag.
+#
+# In general, to test that a query has not been executed by the slave
+# before this time, we wait until the slave IO thread has received the
+# event, and then 5 seconds more, and check that the table has not
+# been updated.  To test that a query has been executed after this
+# time, we wait 10 seconds more.
+#
+# To simulate that the slave lags due to slow queries, we invoke a
+# stored function that executes SLEEP if @@gloval.server_id==2. This
+# requires that we run with binlog_format=STATEMENT.
+#
+# ==== Related Bugs and Worklogs ====
+#
+# WL#344: Time-delayed replication
+# BUG#28760: Simulating a replication lag
+# [duplicate] BUG#22072: configurable delayed replication
+# [duplicate] BUG#21639: Add Replication Delay parameter
+#
+# ==== Issues with this Test Case ====
+#
+# The test is inherently timing-sensitive (i.e., contains races) and
+# is likely to fail sporadically on a loaded host.
+#
+# The test takes a long time; it sleeps for around 20*10 seconds.
+
+--source include/master-slave.inc
+# Needed so that sleeps get executed in the slave SQL thread.
+--source include/have_binlog_format_statement.inc
+
+
+call mtr.add_suppression("Unsafe statement written to the binary log using statement format");
+--connection slave
+call mtr.add_suppression("Unsafe statement written to the binary log using statement format");
+--connection master
+
+
+# We assume that any simple operation takes zero time, with an error
+# margin of $time1 seconds. Hence, if we run with a delay of $time2
+# seconds, we expect that:
+#  - If we execute a query on master and wait $time1 seconds, then the
+#    query has been copied to slave but not yet executed.
+#  - If we execute a query on master and wait $time3 seconds, then the
+#    query has been executed.
+--let $time1= 10
+if (`SELECT '$max_query_execution_time' > 0`) {
+  --let $time1= $max_query_execution_time
+}
+--let $time2= `SELECT 2 * $time1`
+--let $time3= `SELECT 3 * $time1`
+
+
+--echo [on master]
+CREATE TABLE t1 (a VARCHAR(100), b INT AUTO_INCREMENT PRIMARY KEY);
+
+
+--echo ==== Normal setup ====
+
+--echo [on slave]
+--sync_slave_with_master
+
+--source include/stop_slave.inc
+
+--echo # CHANGE MASTER TO MASTER_DELAY = 2*T
+--disable_query_log
+eval CHANGE MASTER TO MASTER_DELAY = $time2;
+--enable_query_log
+
+--echo # Checking that delay is what we set it to
+--let $delay= query_get_value(SHOW SLAVE STATUS, SQL_Delay, 1)
+if (`SELECT $delay != $time2`) {
+  --echo Delay is wrong! Expected $time2, got $delay
+  --source include/show_rpl_debug_info.inc
+  --die wrong delay
+}
+
+--echo # Expect status to be ''
+SELECT STATE FROM INFORMATION_SCHEMA.PROCESSLIST ORDER BY ID DESC LIMIT 1;
+
+--source include/start_slave.inc
+
+--echo [on master]
+--connection master
+INSERT INTO t1(a) VALUES ('normal setup');
+
+--source extra/rpl_tests/delayed_slave_wait_on_query.inc
+
+
+--echo ==== Slave lags "naturally" after master ====
+
+--echo [on master]
+--connection master
+
+--disable_query_log
+--echo # CREATE FUNCTION delay_on_slave(time_units INT) RETURNS INT BEGIN IF @@GLOBAL.server_id = 2 THEN RETURN SLEEP(time_units * T); ELSE RETURN 0; END IF; END
+--eval CREATE FUNCTION delay_on_slave(time_units INT) RETURNS INT BEGIN IF @@GLOBAL.server_id = 2 THEN RETURN SLEEP(time_units * $time1); ELSE RETURN 0; END IF; END
+--enable_query_log
+
+INSERT INTO t1(a) SELECT delay_on_slave(3);
+
+--save_master_pos
+INSERT INTO t1(a) VALUES ('slave is already lagging: this statement should execute immediately');
+INSERT INTO t1(a) SELECT delay_on_slave(2);
+
+--echo [on slave]
+--source include/sync_slave_io_with_master.inc
+--echo # sleep 1*T
+--sleep $time1
+
+--echo # Expect no query executed and status is 'Waiting until MASTER_DELAY...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+--echo # wait for first query to execute
+--sync_with_master
+
+--echo # sleep 1*T
+--sleep $time1
+
+--echo # Expect second query executed and status is executing third query (i.e., 'User sleep')
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+--echo # sleep 2*T
+--sleep $time2
+
+--echo # Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+
+--echo ==== Seconds_Behind_Master ====
+
+--echo # Bring slave to sync.
+--source include/stop_slave.inc
+eval CHANGE MASTER TO MASTER_DELAY = 0;
+--source include/start_slave.inc
+
+--connection master
+INSERT INTO t1(a) VALUES ('Syncing slave');
+--sync_slave_with_master
+
+--source include/stop_slave.inc
+--echo # CHANGE MASTER TO MASTER_DELAY = 2*T
+--disable_query_log
+eval CHANGE MASTER TO MASTER_DELAY = $time2;
+--enable_query_log
+--source include/start_slave.inc
+
+--connection master
+INSERT INTO t1(a) VALUES (delay_on_slave(1));
+--save_master_pos
+--connection slave
+
+--echo # sleep 1*T
+--sleep $time1
+
+if ($bug_53167_is_fixed) {
+
+--let $seconds_behind_master= query_get_value(SHOW SLAVE STATUS, Seconds_Behind_Master, 1)
+if (`SELECT $seconds_behind_master <= 0 OR $seconds_behind_master >= $time2`) {
+  --echo Seconds_Behind_Master was $seconds_behind_master. Expected that 0 < Seconds_Behind_Master < SQL_Delay = $time2
+  --source include/show_rpl_debug_info.inc
+  --die Seconds_Behind_Master was wrong
+}
+
+}
+
+--echo # sleep 1*T
+--sleep $time1
+
+--let $seconds_behind_master= query_get_value(SHOW SLAVE STATUS, Seconds_Behind_Master, 1)
+if (`SELECT $seconds_behind_master < $time2`) {
+  --echo Seconds_Behind_Master was $seconds_behind_master. Expected it to be >= SQL_Delay = $time2
+  --source include/show_rpl_debug_info.inc
+  --die Seconds_Behind_Master was < SQL_Delay
+}
+
+--sync_with_master
+
+
+--echo ==== STOP SLAVE and START SLAVE ====
+
+# Set up a longer delay.
+--source include/stop_slave.inc
+
+--echo # CHANGE MASTER TO MASTER_DELAY = 3*T
+--disable_query_log
+eval CHANGE MASTER TO MASTER_DELAY = $time3;
+--enable_query_log
+
+--source include/start_slave.inc
+
+--echo # Checking that delay is what we set it to
+--let $delay= query_get_value(SHOW SLAVE STATUS, SQL_Delay, 1)
+if (`SELECT $delay != $time3`) {
+  --echo Delay is wrong! Expected $time2, got $delay
+  --source include/show_rpl_debug_info.inc
+  --die wrong delay
+}
+
+--echo [on master]
+--connection master
+INSERT INTO t1(a) VALUES ('stop slave and start slave');
+
+--echo [on slave]
+--connection slave
+--echo # sleep 1*T
+--sleep $time1
+SET @before_stop_slave= UNIX_TIMESTAMP();
+--source include/stop_slave.inc
+if (`SELECT UNIX_TIMESTAMP() - @before_stop_slave >= $time1`)
+{
+  --source include/show_rpl_debug_info.inc
+  --die STOP SLAVE did not finish in time
+}
+--echo # STOP SLAVE finished in time.
+
+--echo # Expect query not executed and status is ''
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+--source include/start_slave.inc
+if (`SELECT UNIX_TIMESTAMP() - @before_stop_slave >= $time1`)
+{
+  --source include/show_rpl_debug_info.inc
+  --die START SLAVE did not finish in time
+}
+--echo # START SLAVE finished in time.
+
+--source extra/rpl_tests/delayed_slave_wait_on_query.inc
+
+
+--echo ==== Change back to no delay ====
+
+--echo [on slave]
+--connection slave
+--source include/stop_slave.inc
+eval CHANGE MASTER TO MASTER_DELAY = 0;
+
+--echo # Expect delay is 0.
+--let $delay= query_get_value(SHOW SLAVE STATUS, SQL_Delay, 1)
+--echo SQL_Delay='$delay'
+
+--source include/start_slave.inc
+
+--echo [on master]
+--connection master
+INSERT INTO t1(a) VALUES ('change back to no delay');
+
+--echo [on slave]
+--source include/sync_slave_io_with_master.inc
+--echo # sleep 1*T
+--sleep $time1
+
+--echo # Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+
+--echo ==== Reset delay with RESET SLAVE ====
+
+--source include/stop_slave.inc
+CHANGE MASTER TO MASTER_DELAY = 71;
+--source include/start_slave.inc
+
+--echo # Expect delay is 71
+--let $delay= query_get_value(SHOW SLAVE STATUS, SQL_Delay, 1)
+--echo SQL_Delay='$delay'
+
+--source include/stop_slave.inc
+RESET SLAVE;
+--echo [on master]
+--connection master
+RESET MASTER;
+--echo [on slave]
+--connection slave
+--source include/start_slave.inc
+
+--echo # Expect delay is 0
+--let $delay= query_get_value(SHOW SLAVE STATUS, SQL_Delay, 1)
+--echo SQL_Delay='$delay'
+
+
+--echo ==== Set a bad value for the delay ====
+
+--source include/stop_slave.inc
+
+--echo # Expect error for setting negative delay
+--error ER_PARSE_ERROR
+CHANGE MASTER TO MASTER_DELAY = -1;
+
+--echo # Expect that it's ok to set delay of 2^31-1
+CHANGE MASTER TO MASTER_DELAY = 2147483647;
+--echo # Expect error for setting delay between 2^31 and 2^32-1
+--error ER_MASTER_DELAY_VALUE_OUT_OF_RANGE
+CHANGE MASTER TO MASTER_DELAY = 2147483648;
+
+--echo # Expect error for setting delay to nonsense
+--error ER_PARSE_ERROR
+CHANGE MASTER TO MASTER_DELAY = blah;
+
+# todo: CHANGE MASTER TO MASTER_DELAY = 999999999999999999999999999
+# should give error
+
+CHANGE MASTER TO MASTER_DELAY = 0;
+--source include/start_slave.inc
+
+
+--echo ==== Clean up ====
+
+--echo [on master]
+--connection master
+DROP TABLE t1;
+DROP FUNCTION delay_on_slave;
+
+--echo [on slave]
+--sync_slave_with_master
+
+--source include/rpl_end.inc
diff --git a/sql/lex.h b/sql/lex.h
index 85bd20a..430e966 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -339,6 +339,7 @@ static SYMBOL symbols[] = {
   { "LOW_PRIORITY",	SYM(LOW_PRIORITY)},
   { "MASTER",           SYM(MASTER_SYM)},
   { "MASTER_CONNECT_RETRY",           SYM(MASTER_CONNECT_RETRY_SYM)},
+  { "MASTER_DELAY",     SYM(MASTER_DELAY_SYM)},
   { "MASTER_GTID_POS",  SYM(MASTER_GTID_POS_SYM)},
   { "MASTER_HOST",           SYM(MASTER_HOST_SYM)},
   { "MASTER_LOG_FILE",           SYM(MASTER_LOG_FILE_SYM)},
diff --git a/sql/log.cc b/sql/log.cc
index be24bcd..e3d42a45 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -4292,6 +4292,10 @@ void MYSQL_BIN_LOG::wait_for_last_checkpoint_event()
   relay log.
 
   IMPLEMENTATION
+
+  - You must hold rli->data_lock before calling this function, since
+    it writes group_relay_log_pos and similar fields of
+    Relay_log_info.
   - Protects index file with LOCK_index
   - Delete relevant relay log files
   - Copy all file names after these ones to the front of the index file
@@ -4305,7 +4309,7 @@ void MYSQL_BIN_LOG::wait_for_last_checkpoint_event()
                       read by the SQL slave thread are deleted).
 
   @note
-    - This is only called from the slave-execute thread when it has read
+    - This is only called from the slave SQL thread when it has read
     all commands from a relay log and want to switch to a new relay log.
     - When this happens, we can be in an active transaction as
     a transaction can span over two relay logs
@@ -4336,6 +4340,8 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
   DBUG_ASSERT(rli->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT);
   DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
 
+  mysql_mutex_assert_owner(&rli->data_lock);
+
   mysql_mutex_lock(&LOCK_index);
 
   ir= rli->inuse_relaylog_list;
diff --git a/sql/log_event.cc b/sql/log_event.cc
index afa58af..bd8ae98 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -966,6 +966,7 @@ int Log_event::do_update_pos(rpl_group_info *rgi)
   Relay_log_info *rli= rgi->rli;
   DBUG_ENTER("Log_event::do_update_pos");
 
+  DBUG_ASSERT(!rli->belongs_to_client());
   /*
     rli is null when (as far as I (Guilhem) know) the caller is
     Load_log_event::do_apply_event *and* that one is called from
@@ -6400,6 +6401,9 @@ bool Rotate_log_event::write()
   in a A -> B -> A setup.
   The NOTES below is a wrong comment which will disappear when 4.1 is merged.
 
+  This must only be called from the Slave SQL thread, since it calls
+  flush_relay_log_info().
+
   @retval
     0	ok
 */
@@ -8222,6 +8226,9 @@ void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
   were we must do this cleaning is in
   Start_log_event_v3::do_apply_event(), not here. Because if we come
   here, the master was sane.
+
+  This must only be called from the Slave SQL thread, since it calls
+  flush_relay_log_info().
 */
 
 int Stop_log_event::do_update_pos(rpl_group_info *rgi)
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 6048d26..867ebc7 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -18,7 +18,7 @@
 #include "sql_priv.h"
 #include <my_dir.h>
 #include "rpl_mi.h"
-#include "slave.h"                              // SLAVE_MAX_HEARTBEAT_PERIOD
+#include "slave.h"
 #include "strfunc.h"
 #include "sql_repl.h"
 
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 31dd24a..a7d8f84 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -28,6 +28,7 @@
 #include "rpl_utility.h"
 #include "transaction.h"
 #include "sql_parse.h"                          // end_trans, ROLLBACK
+#include "slave.h"
 #include <mysql/plugin.h>
 #include <mysql/service_thd_wait.h>
 
@@ -41,30 +42,24 @@ rpl_slave_state *rpl_global_gtid_slave_state;
 /* Object used for MASTER_GTID_WAIT(). */
 gtid_waiting rpl_global_gtid_waiting;
 
-
-// Defined in slave.cc
-int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
-int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
-			  const char *default_val);
+const char *const Relay_log_info::state_delaying_string = "Waiting until MASTER_DELAY seconds after master executed event";
 
 Relay_log_info::Relay_log_info(bool is_slave_recovery)
   :Slave_reporting_capability("SQL"),
-   no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
+   replicate_same_server_id(::replicate_same_server_id),
    info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
    sync_counter(0), is_relay_log_recovery(is_slave_recovery),
    save_temporary_tables(0), mi(0),
    inuse_relaylog_list(0), last_inuse_relaylog(0),
    cur_log_old_open_count(0), group_relay_log_pos(0), 
    event_relay_log_pos(0),
-#if HAVE_valgrind
-   is_fake(FALSE),
-#endif
    group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
    last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
    abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
    gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
    slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE),
    until_log_pos(0), retried_trans(0), executed_entries(0),
+   sql_delay(0), sql_delay_end(0),
    m_flags(0)
 {
   DBUG_ENTER("Relay_log_info::Relay_log_info");
@@ -115,39 +110,55 @@ Relay_log_info::~Relay_log_info()
 }
 
 
+/**
+  Wrapper around Relay_log_info::init(const char *).
+
+  @todo Remove this and replace all calls to it by calls to
+  Relay_log_info::init(const char *). /SVEN
+*/
 int init_relay_log_info(Relay_log_info* rli,
 			const char* info_fname)
 {
+  return rli->init(info_fname);
+}
+
+
+/**
+  Read the relay_log.info file.
+
+  @param info_fname The name of the file to read from.
+  @retval 0 success
+  @retval 1 failure
+*/
+int Relay_log_info::init(const char* info_fname)
+{
   char fname[FN_REFLEN+128];
-  int info_fd;
   const char* msg = 0;
   int error = 0;
   DBUG_ENTER("init_relay_log_info");
-  DBUG_ASSERT(!rli->no_storage);         // Don't init if there is no storage
 
-  if (rli->inited)                       // Set if this function called
+  if (inited)                       // Set if this function called
     DBUG_RETURN(0);
   fn_format(fname, info_fname, mysql_data_home, "", 4+32);
-  mysql_mutex_lock(&rli->data_lock);
-  info_fd = rli->info_fd;
-  rli->cur_log_fd = -1;
-  rli->slave_skip_counter=0;
-  rli->abort_pos_wait=0;
-  rli->log_space_limit= relay_log_space_limit;
-  rli->log_space_total= 0;
+  mysql_mutex_lock(&data_lock);
+  cur_log_fd = -1;
+  slave_skip_counter=0;
+  abort_pos_wait=0;
+  log_space_limit= relay_log_space_limit;
+  log_space_total= 0;
 
   char pattern[FN_REFLEN];
   (void) my_realpath(pattern, slave_load_tmpdir, 0);
   if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
             MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
   {
-    mysql_mutex_unlock(&rli->data_lock);
+    mysql_mutex_unlock(&data_lock);
     sql_print_error("Unable to use slave's temporary directory %s",
                     slave_load_tmpdir);
     DBUG_RETURN(1);
   }
-  unpack_filename(rli->slave_patternload_file, pattern);
-  rli->slave_patternload_file_size= strlen(rli->slave_patternload_file);
+  unpack_filename(slave_patternload_file, pattern);
+  slave_patternload_file_size= strlen(slave_patternload_file);
 
   /*
     The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
@@ -161,7 +172,7 @@ int init_relay_log_info(Relay_log_info* rli,
     if (opt_relay_logname && 
         opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
     {
-      mysql_mutex_unlock(&rli->data_lock);
+      mysql_mutex_unlock(&data_lock);
       sql_print_error("Path '%s' is a directory name, please specify \
 a file name for --relay-log option", opt_relay_logname);
       DBUG_RETURN(1);
@@ -173,7 +184,7 @@ a file name for --relay-log option", opt_relay_logname);
         opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1] 
         == FN_LIBCHAR)
     {
-      mysql_mutex_unlock(&rli->data_lock);
+      mysql_mutex_unlock(&data_lock);
       sql_print_error("Path '%s' is a directory name, please specify \
 a file name for --relay-log-index option", opt_relaylog_index_name);
       DBUG_RETURN(1);
@@ -182,7 +193,7 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
     char buf[FN_REFLEN];
     const char *ln;
     static bool name_warning_sent= 0;
-    ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
+    ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
                                      1, buf);
     /* We send the warning only at startup, not after every RESET SLAVE */
     if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent &&
@@ -205,7 +216,6 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
     }
 
     /* For multimaster, add connection name to relay log filenames */
-    Master_info* mi= rli->mi;
     char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN];
     char *buf_relaylog_index_name= opt_relaylog_index_name;
 
@@ -227,11 +237,11 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
       note, that if open() fails, we'll still have index file open
       but a destructor will take care of that
     */
-    if (rli->relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
-        rli->relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND,
-                            mi->rli.max_relay_log_size, 1, TRUE))
+    if (relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
+        relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND,
+                       max_relay_log_size, 1, TRUE))
     {
-      mysql_mutex_unlock(&rli->data_lock);
+      mysql_mutex_unlock(&data_lock);
       sql_print_error("Failed when trying to open logs for '%s' in init_relay_log_info(). Error: %M", ln, my_errno);
       DBUG_RETURN(1);
     }
@@ -254,7 +264,7 @@ file '%s', errno %d)", fname, my_errno);
       msg= current_thd->get_stmt_da()->message();
       goto err;
     }
-    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
+    if (init_io_cache(&info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
                       MYF(MY_WME)))
     {
       sql_print_error("Failed to create a cache on relay log info file '%s'",
@@ -264,20 +274,19 @@ file '%s', errno %d)", fname, my_errno);
     }
 
     /* Init relay log with first entry in the relay index file */
-    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
+    if (init_relay_log_pos(this,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
                            &msg, 0))
     {
       sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
       goto err;
     }
-    rli->group_master_log_name[0]= 0;
-    rli->group_master_log_pos= 0;
-    rli->info_fd= info_fd;
+    group_master_log_name[0]= 0;
+    group_master_log_pos= 0;
   }
   else // file exists
   {
     if (info_fd >= 0)
-      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
+      reinit_io_cache(&info_file, READ_CACHE, 0L,0,0);
     else
     {
       int error=0;
@@ -289,7 +298,7 @@ Failed to open the existing relay log info file '%s' (errno %d)",
                         fname, my_errno);
         error= 1;
       }
-      else if (init_io_cache(&rli->info_file, info_fd,
+      else if (init_io_cache(&info_file, info_fd,
                              IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
       {
         sql_print_error("Failed to create a cache on relay log info file '%s'",
@@ -300,24 +309,15 @@ Failed to open the existing relay log info file '%s' (errno %d)",
       {
         if (info_fd >= 0)
           mysql_file_close(info_fd, MYF(0));
-        rli->info_fd= -1;
-        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
-        mysql_mutex_unlock(&rli->data_lock);
+        info_fd= -1;
+        relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
+        mysql_mutex_unlock(&data_lock);
         DBUG_RETURN(1);
       }
     }
 
-    rli->info_fd = info_fd;
     int relay_log_pos, master_log_pos, lines;
     char *first_non_digit;
-    /*
-      In MySQL 5.6, there is a MASTER_DELAY option to CHANGE MASTER. This is
-      not yet merged into MariaDB (as of 10.0.13). However, we detect the
-      presense of the new option in relay-log.info, as a placeholder for
-      possible later merge of the feature, and to maintain file format
-      compatibility with MySQL 5.6+.
-    */
-    int dummy_sql_delay;
 
     /*
       Starting from MySQL 5.6.x, relay-log.info has a new format.
@@ -342,25 +342,25 @@ Failed to open the existing relay log info file '%s' (errno %d)",
       it is line count and not binlog name (new format) it will be
       overwritten by the second row later.
     */
-    if (init_strvar_from_file(rli->group_relay_log_name,
-                              sizeof(rli->group_relay_log_name),
-                              &rli->info_file, ""))
+    if (init_strvar_from_file(group_relay_log_name,
+                              sizeof(group_relay_log_name),
+                              &info_file, ""))
     {
       msg="Error reading slave log configuration";
       goto err;
     }
 
-    lines= strtoul(rli->group_relay_log_name, &first_non_digit, 10);
+    lines= strtoul(group_relay_log_name, &first_non_digit, 10);
 
-    if (rli->group_relay_log_name[0] != '\0' &&
+    if (group_relay_log_name[0] != '\0' &&
         *first_non_digit == '\0' &&
         lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
     {
       DBUG_PRINT("info", ("relay_log_info file is in new format."));
       /* Seems to be new format => read relay log name from next line */
-      if (init_strvar_from_file(rli->group_relay_log_name,
-                                sizeof(rli->group_relay_log_name),
-                                &rli->info_file, ""))
+      if (init_strvar_from_file(group_relay_log_name,
+                                sizeof(group_relay_log_name),
+                                &info_file, ""))
       {
         msg="Error reading slave log configuration";
         goto err;
@@ -370,70 +370,70 @@ Failed to open the existing relay log info file '%s' (errno %d)",
       DBUG_PRINT("info", ("relay_log_info file is in old format."));
 
     if (init_intvar_from_file(&relay_log_pos,
-                              &rli->info_file, BIN_LOG_HEADER_SIZE) ||
-        init_strvar_from_file(rli->group_master_log_name,
-                              sizeof(rli->group_master_log_name),
-                              &rli->info_file, "") ||
-        init_intvar_from_file(&master_log_pos, &rli->info_file, 0) ||
+                              &info_file, BIN_LOG_HEADER_SIZE) ||
+        init_strvar_from_file(group_master_log_name,
+                              sizeof(group_master_log_name),
+                              &info_file, "") ||
+        init_intvar_from_file(&master_log_pos, &info_file, 0) ||
         (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY &&
-         init_intvar_from_file(&dummy_sql_delay, &rli->info_file, 0)))
+         init_intvar_from_file(&sql_delay, &info_file, 0)))
     {
       msg="Error reading slave log configuration";
       goto err;
     }
 
-    strmake_buf(rli->event_relay_log_name,rli->group_relay_log_name);
-    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
-    rli->group_master_log_pos= master_log_pos;
+    strmake_buf(event_relay_log_name,group_relay_log_name);
+    group_relay_log_pos= event_relay_log_pos= relay_log_pos;
+    group_master_log_pos= master_log_pos;
 
-    if (rli->is_relay_log_recovery && init_recovery(rli->mi, &msg))
+    if (is_relay_log_recovery && init_recovery(mi, &msg))
       goto err;
 
-    rli->relay_log_state.load(rpl_global_gtid_slave_state);
-    if (init_relay_log_pos(rli,
-                           rli->group_relay_log_name,
-                           rli->group_relay_log_pos,
+    relay_log_state.load(rpl_global_gtid_slave_state);
+    if (init_relay_log_pos(this,
+                           group_relay_log_name,
+                           group_relay_log_pos,
                            0 /* no data lock*/,
                            &msg, 0))
     {
       sql_print_error("Failed to open the relay log '%s' (relay_log_pos %llu)",
-                      rli->group_relay_log_name, rli->group_relay_log_pos);
+                      group_relay_log_name, group_relay_log_pos);
       goto err;
     }
   }
 
-  DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%llu rli->event_relay_log_pos=%llu",
-                      my_b_tell(rli->cur_log), rli->event_relay_log_pos));
-  DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
-  DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
+  DBUG_PRINT("info", ("my_b_tell(cur_log)=%llu event_relay_log_pos=%llu",
+                      my_b_tell(cur_log), event_relay_log_pos));
+  DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
+  DBUG_ASSERT(my_b_tell(cur_log) == event_relay_log_pos);
 
   /*
     Now change the cache from READ to WRITE - must do this
     before flush_relay_log_info
   */
-  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
-  if ((error= flush_relay_log_info(rli)))
+  reinit_io_cache(&info_file, WRITE_CACHE,0L,0,1);
+  if ((error= flush_relay_log_info(this)))
   {
     msg= "Failed to flush relay log info file";
     goto err;
   }
-  if (count_relay_log_space(rli))
+  if (count_relay_log_space(this))
   {
     msg="Error counting relay log space";
     goto err;
   }
-  rli->inited= 1;
-  mysql_mutex_unlock(&rli->data_lock);
+  inited= 1;
+  mysql_mutex_unlock(&data_lock);
   DBUG_RETURN(error);
 
 err:
   sql_print_error("%s", msg);
-  end_io_cache(&rli->info_file);
+  end_io_cache(&info_file);
   if (info_fd >= 0)
     mysql_file_close(info_fd, MYF(0));
-  rli->info_fd= -1;
-  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
-  mysql_mutex_unlock(&rli->data_lock);
+  info_fd= -1;
+  relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
+  mysql_mutex_unlock(&data_lock);
   DBUG_RETURN(1);
 }
 
@@ -750,6 +750,8 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,
   if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
     *errmsg= "Invalid Format_description log event; could be out of memory";
 
+  DBUG_PRINT("info", ("Returning %d from init_relay_log_pos", (*errmsg)?1:0));
+
   DBUG_RETURN ((*errmsg) ? 1 : 0);
 }
 
@@ -967,8 +969,11 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
 {
   DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
 
-  if (!skip_lock)
+  if (skip_lock)
+    mysql_mutex_assert_owner(&data_lock);
+  else
     mysql_mutex_lock(&data_lock);
+
   rgi->inc_event_relay_log_pos();
   DBUG_PRINT("info", ("log_pos: %lu  group_master_log_pos: %lu",
                       (long) log_pos, (long) group_master_log_pos));
@@ -1294,6 +1299,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd,
 {
   DBUG_ENTER("Relay_log_info::stmt_done");
 
+  DBUG_ASSERT(!belongs_to_client());
   DBUG_ASSERT(rgi->rli == this);
   /*
     If in a transaction, and if the slave supports transactions, just
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index e896c18..621abba 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -29,11 +29,6 @@ class Master_info;
 class Rpl_filter;
 
 
-enum {
-  LINES_IN_RELAY_LOG_INFO_WITH_DELAY= 5
-};
-
-
 /****************************************************************************
 
   Replication SQL Thread
@@ -78,11 +73,17 @@ class Relay_log_info : public Slave_reporting_capability
   };
 
   /*
-    If flag set, then rli does not store its state in any info file.
-    This is the case only when we execute BINLOG SQL commands inside
-    a client, non-replication thread.
+    The SQL thread owns one Relay_log_info, and each client that has
+    executed a BINLOG statement owns one Relay_log_info. This function
+    returns zero for the Relay_log_info object that belongs to the SQL
+    thread and nonzero for Relay_log_info objects that belong to
+    clients.
   */
-  bool no_storage;
+  inline bool belongs_to_client()
+  {
+    DBUG_ASSERT(sql_driver_thd);
+    return !sql_driver_thd->slave_thread;
+  }
 
   /*
     If true, events with the same server id should be replicated. This
@@ -194,6 +195,11 @@ class Relay_log_info : public Slave_reporting_capability
     relay log and finishing (commiting) on another relay log. Case which can
     happen when, for example, the relay log gets rotated because of
     max_binlog_size.
+
+    Note: group_relay_log_name, group_relay_log_pos must only be
+    written from the thread owning the Relay_log_info (SQL thread if
+    !belongs_to_client(); client thread executing BINLOG statement if
+    belongs_to_client()).
   */
   char group_relay_log_name[FN_REFLEN];
   ulonglong group_relay_log_pos;
@@ -205,16 +211,17 @@ class Relay_log_info : public Slave_reporting_capability
   */
   char future_event_master_log_name[FN_REFLEN];
 
-#ifdef HAVE_valgrind
-  bool is_fake; /* Mark that this is a fake relay log info structure */
-#endif
-
   /* 
      Original log name and position of the group we're currently executing
      (whose coordinates are group_relay_log_name/pos in the relay log)
      in the master's binlog. These concern the *group*, because in the master's
      binlog the log_pos that comes with each event is the position of the
      beginning of the group.
+
+    Note: group_master_log_name, group_master_log_pos must only be
+    written from the thread owning the Relay_log_info (SQL thread if
+    !belongs_to_client(); client thread executing BINLOG statement if
+    belongs_to_client()).
   */
   char group_master_log_name[FN_REFLEN];
   volatile my_off_t group_master_log_pos;
@@ -244,6 +251,15 @@ class Relay_log_info : public Slave_reporting_capability
   bool sql_thread_caught_up;
 
   void clear_until_condition();
+  /**
+    Reset the delay.
+    This is used by RESET SLAVE to clear the delay.
+  */
+  void clear_sql_delay()
+  {
+    sql_delay= 0;
+  }
+
 
   /*
     Needed for problems when slave stops and we want to restart it
@@ -474,8 +490,72 @@ class Relay_log_info : public Slave_reporting_capability
     m_flags&= ~flag;
   }
 
+  /**
+    Text used in THD::proc_info when the slave SQL thread is delaying.
+  */
+  static const char *const state_delaying_string;
+
+  bool flush();
+
+  /**
+    Reads the relay_log.info file.
+  */
+  int init(const char* info_filename);
+
+  /**
+    Indicate that a delay starts.
+
+    This does not actually sleep; it only sets the state of this
+    Relay_log_info object to delaying so that the correct state can be
+    reported by SHOW SLAVE STATUS and SHOW PROCESSLIST.
+
+    Requires rli->data_lock.
+
+    @param delay_end The time when the delay shall end.
+  */
+  void start_sql_delay(time_t delay_end)
+  {
+    mysql_mutex_assert_owner(&data_lock);
+    sql_delay_end= delay_end;
+    thd_proc_info(sql_driver_thd, state_delaying_string);
+  }
+
+  int32 get_sql_delay() { return sql_delay; }
+  void set_sql_delay(time_t _sql_delay) { sql_delay= _sql_delay; }
+  time_t get_sql_delay_end() { return sql_delay_end; }
+
 private:
 
+
+  /**
+    Delay slave SQL thread by this amount, compared to master (in
+    seconds). This is set with CHANGE MASTER TO MASTER_DELAY=X.
+
+    Guarded by data_lock.  Initialized by the client thread executing
+    START SLAVE.  Written by client threads executing CHANGE MASTER TO
+    MASTER_DELAY=X.  Read by SQL thread and by client threads
+    executing SHOW SLAVE STATUS.  Note: must not be written while the
+    slave SQL thread is running, since the SQL thread reads it without
+    a lock when executing flush_relay_log_info().
+  */
+  int sql_delay;
+
+  /**
+    During a delay, specifies the point in time when the delay ends.
+
+    This is used for the SQL_Remaining_Delay column in SHOW SLAVE STATUS.
+
+    Guarded by data_lock. Written by the sql thread.  Read by client
+    threads executing SHOW SLAVE STATUS.
+  */
+  time_t sql_delay_end;
+
+  /*
+    Before the MASTER_DELAY parameter was added (WL#344),
+    relay_log.info had 4 lines. Now it has 5 lines.
+  */
+  static const int LINES_IN_RELAY_LOG_INFO_WITH_DELAY= 5;
+
   /*
     Holds the state of the data in the relay log.
     We need this to ensure that we are not in the middle of a
@@ -869,7 +949,14 @@ class rpl_sql_thread_info
 };
 
 
-// Defined in rpl_rli.cc
+/**
+  Reads the relay_log.info file.
+
+  @todo This is a wrapper around Relay_log_info::init(). It's only
+  kept for historical reasons. It would be good if we removed this
+  function and replaced all calls to it by calls to
+  Relay_log_info::init(). /SVEN
+*/
 int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
 
 
diff --git a/sql/slave.cc b/sql/slave.cc
index 3cf29d6..475feca 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1535,8 +1535,10 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
       (master_res= mysql_store_result(mysql)) &&
       (master_row= mysql_fetch_row(master_res)))
   {
+    mysql_mutex_lock(&mi->data_lock);
     mi->clock_diff_with_master=
       (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
+    mysql_mutex_unlock(&mi->data_lock);
   }
   else if (check_io_slave_killed(mi, NULL))
     goto slave_killed_err;
@@ -1548,7 +1550,9 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
   }
   else 
   {
+    mysql_mutex_lock(&mi->data_lock);
     mi->clock_diff_with_master= 0; /* The "most sensible" value */
+    mysql_mutex_unlock(&mi->data_lock);
     sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
                       "do not trust column Seconds_Behind_Master of SHOW "
                       "SLAVE STATUS. Error: %s (%d)",
@@ -2698,6 +2702,15 @@ void show_master_info_get_fields(THD *thd, List<Item> *field_list,
                         Item_empty_string(thd, "Parallel_Mode",
                                           sizeof("conservative")-1),
                         mem_root);
+  field_list->push_back(new (mem_root)
+                        Item_return_int(thd, "SQL_Delay", 10,
+                                        MYSQL_TYPE_LONG));
+  field_list->push_back(new (mem_root)
+                        Item_return_int(thd, "SQL_Remaining_Delay", 8,
+                                        MYSQL_TYPE_LONG));
+  field_list->push_back(new (mem_root)
+                        Item_empty_string(thd, "Slave_SQL_Running_State",
+                                          20));
   if (full)
   {
     field_list->push_back(new (mem_root)
@@ -2887,6 +2900,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
     prot_store_ids(thd, &mi->ignore_server_ids);
     // Master_Server_id
     protocol->store((uint32) mi->master_id);
+    // SQL_Delay
     // Master_Ssl_Crl
     protocol->store(mi->ssl_ca, &my_charset_bin);
     // Master_Ssl_Crlpath
@@ -2909,6 +2923,22 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
       protocol->store(mode_name, strlen(mode_name), &my_charset_bin);
     }
 
+    protocol->store((uint32) mi->rli.get_sql_delay());
+    // SQL_Remaining_Delay
+    // THD::proc_info is not protected by any lock, so we read it once
+    // to ensure that we use the same value throughout this function.
+    const char *slave_sql_running_state=
+      mi->rli.sql_driver_thd ? mi->rli.sql_driver_thd->proc_info : "";
+    if (slave_sql_running_state == Relay_log_info::state_delaying_string)
+    {
+      time_t t= my_time(0), sql_delay_end= mi->rli.get_sql_delay_end();
+      protocol->store((uint32)(t < sql_delay_end ? sql_delay_end - t : 0));
+    }
+    else
+      protocol->store_null();
+    // Slave_SQL_Running_State
+    protocol->store(slave_sql_running_state, &my_charset_bin);
+
     if (full)
     {
       protocol->store((uint32)    mi->rli.retried_trans);
@@ -3278,6 +3308,69 @@ has_temporary_error(THD *thd)
 }
 
 
+/**
+  If this is a lagging slave (specified with CHANGE MASTER TO MASTER_DELAY = X), delays accordingly. Also unlocks rli->data_lock.
+
+  Design note: this is the place to unlock rli->data_lock here since
+  it should be held when reading delay info from rli, but it should
+  not be held while sleeping.
+
+  @param ev Event that is about to be executed.
+
+  @param thd The sql thread's THD object.
+
+  @param rli The sql thread's Relay_log_info structure.
+*/
+static void sql_delay_event(Log_event *ev, THD *thd, rpl_group_info *rgi)
+{
+  Relay_log_info* rli= rgi->rli;
+  long sql_delay= rli->get_sql_delay();
+
+  DBUG_ENTER("sql_delay_event");
+  mysql_mutex_assert_owner(&rli->data_lock);
+  DBUG_ASSERT(!rli->belongs_to_client());
+
+  int type= ev->get_type_code();
+  if (sql_delay && type != ROTATE_EVENT &&
+      type != FORMAT_DESCRIPTION_EVENT && type != START_EVENT_V3)
+  {
+    // The time when we should execute the event.
+    time_t sql_delay_end=
+      ev->when + rli->mi->clock_diff_with_master + sql_delay;
+    // The current time.
+    time_t now= my_time(0);
+    // The time we will have to sleep before executing the event.
+    unsigned long nap_time= 0;
+    if (sql_delay_end > now)
+      nap_time= sql_delay_end - now;
+
+    DBUG_PRINT("info", ("sql_delay= %lu "
+                        "ev->when= %lu "
+                        "rli->mi->clock_diff_with_master= %lu "
+                        "now= %ld "
+                        "sql_delay_end= %lu "
+                        "nap_time= %ld",
+                        sql_delay, (long)ev->when,
+                        rli->mi->clock_diff_with_master,
+                        (long)now, sql_delay_end, (long)nap_time));
+
+    if (sql_delay_end > now)
+    {
+      DBUG_PRINT("info", ("delaying replication event %lu secs",
+                          nap_time));
+      rli->start_sql_delay(sql_delay_end);
+      mysql_mutex_unlock(&rli->data_lock);
+      slave_sleep(thd, nap_time, sql_slave_killed, rgi);
+      DBUG_VOID_RETURN;
+    }
+  }
+
+  mysql_mutex_unlock(&rli->data_lock);
+
+  DBUG_VOID_RETURN;
+}
+
+
 /*
   First half of apply_event_and_update_pos(), see below.
   Split out so that it can run with rli->data_lock held in non-parallel
@@ -3395,16 +3488,16 @@ apply_event_and_update_pos_part2(Log_event* ev, THD* thd, rpl_group_info *rgi,
   if (exec_res == 0)
   {
     int error= ev->update_pos(rgi);
-#ifdef HAVE_valgrind
-    if (!rli->is_fake)
-#endif
+ #ifndef DBUG_OFF
+    DBUG_PRINT("info", ("update_pos error = %d", error));
+    if (!rli->belongs_to_client())
     {
-      DBUG_PRINT("info", ("update_pos error = %d", error));
       DBUG_PRINT("info", ("group %llu %s", rli->group_relay_log_pos,
                           rli->group_relay_log_name));
       DBUG_PRINT("info", ("event %llu %s", rli->event_relay_log_pos,
                           rli->event_relay_log_name));
     }
+#endif
     /*
       The update should not fail, so print an error message and
       return an error code.
@@ -3439,21 +3532,39 @@ apply_event_and_update_pos_part2(Log_event* ev, THD* thd, rpl_group_info *rgi,
 /**
   Applies the given event and advances the relay log position.
 
-  In essence, this function does:
+  This is needed by the sql thread to execute events from the binlog,
+  and by clients executing BINLOG statements.  Conceptually, this
+  function does:
 
   @code
     ev->apply_event(rli);
     ev->update_pos(rli);
   @endcode
 
-  But it also does some maintainance, such as skipping events if
-  needed and reporting errors.
+  It also does the following maintainance:
 
-  If the @c skip flag is set, then it is tested whether the event
-  should be skipped, by looking at the slave_skip_counter and the
-  server id.  The skip flag should be set when calling this from a
-  replication thread but not set when executing an explicit BINLOG
-  statement.
+   - Initializes the thread's server_id and time; and the event's
+     thread.
+
+   - If !rli->belongs_to_client() (i.e., if it belongs to the slave
+     sql thread instead of being used for executing BINLOG
+     statements), it does the following things: (1) skips events if it
+     is needed according to the server id or slave_skip_counter; (2)
+     unlocks rli->data_lock; (3) sleeps if required by 'CHANGE MASTER
+     TO MASTER_DELAY=X'; (4) maintains the running state of the sql
+     thread (rli->thread_state).
+
+   - Reports errors as needed.
+
+  @param ev The event to apply.
+
+  @param thd The client thread that executes the event (i.e., the
+  slave sql thread if called from a replication slave, or the client
+  thread if called to execute a BINLOG statement).
+
+  @param rli The relay log info (i.e., the slave's rli if called from
+  a replication slave, or the client's thd->rli_fake if called to
+  execute a BINLOG statement).
 
   @retval 0 OK.
 
@@ -3476,7 +3587,15 @@ apply_event_and_update_pos(Log_event* ev, THD* thd, rpl_group_info *rgi)
     DBUG_ASSERT(rli->slave_skip_counter > 0);
     rli->slave_skip_counter--;
   }
-  mysql_mutex_unlock(&rli->data_lock);
+
+  if (reason == Log_event::EVENT_SKIP_NOT)
+  {
+    // Sleeps if needed, and unlocks rli->data_lock.
+    sql_delay_event(ev, thd, rgi);
+  }
+  else
+    mysql_mutex_unlock(&rli->data_lock);
+
   return apply_event_and_update_pos_part2(ev, thd, rgi, reason);
 }
 
@@ -3498,6 +3617,10 @@ apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
     driver thread, so 23 should never see EVENT_SKIP_COUNT here.
   */
   DBUG_ASSERT(reason != Log_event::EVENT_SKIP_COUNT);
+  /*
+    Calling sql_delay_event() was handled in the SQL driver thread when
+    doing parallel replication.
+  */
   return apply_event_and_update_pos_part2(ev, thd, rgi, reason);
 }
 
@@ -3577,7 +3700,8 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev)
 
 
 /**
-  Top-level function for executing the next event from the relay log.
+  Top-level function for executing the next event in the relay log.
+  This is called from the SQL thread.
 
   This function reads the event from the relay log, executes it, and
   advances the relay log position.  It also handles errors, etc.
@@ -4124,8 +4248,10 @@ pthread_handler_t handle_slave_io(void *arg)
                     };);
 #endif
 
-  // TODO: the assignment below should be under mutex (5.0)
+  mysql_mutex_lock(&mi->run_lock);
   mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
+  mysql_mutex_unlock(&mi->run_lock);
+
   thd->slave_net = &mysql->net;
   THD_STAGE_INFO(thd, stage_checking_master_version);
   ret= get_master_version_and_clock(mysql, mi);
@@ -6477,67 +6603,80 @@ MYSQL *rpl_connect_master(MYSQL *mysql)
 }
 #endif
 
-/*
-  Store the file and position where the execute-slave thread are in the
+/**
+  Store the file and position where the slave's SQL thread are in the
   relay log.
 
-  SYNOPSIS
-    flush_relay_log_info()
-    rli                 Relay log information
+  Notes:
 
-  NOTES
-    - As this is only called by the slave thread or on STOP SLAVE, with the
-      log_lock grabbed and the slave thread stopped, we don't need to have 
-      a lock here.
-    - If there is an active transaction, then we don't update the position
-      in the relay log.  This is to ensure that we re-execute statements
-      if we die in the middle of an transaction that was rolled back.
-    - As a transaction never spans binary logs, we don't have to handle the
-      case where we do a relay-log-rotation in the middle of the transaction.
-      If this would not be the case, we would have to ensure that we
-      don't delete the relay log file where the transaction started when
-      we switch to a new relay log file.
-
-  TODO
-    - Change the log file information to a binary format to avoid calling
-      longlong2str.
+  - This function should be called either from the slave SQL thread,
+    or when the slave thread is not running.  (It reads the
+    group_{relay|master}_log_{pos|name} and delay fields in the rli
+    object.  These may only be modified by the slave SQL thread or by
+    a client thread when the slave SQL thread is not running.)
 
-  RETURN VALUES
-    0   ok
-    1   write error
-*/
+  - If there is an active transaction, then we do not update the
+    position in the relay log.  This is to ensure that we re-execute
+    statements if we die in the middle of an transaction that was
+    rolled back.
+
+  - As a transaction never spans binary logs, we don't have to handle
+    the case where we do a relay-log-rotation in the middle of the
+    transaction.  If transactions could span several binlogs, we would
+    have to ensure that we do not delete the relay log file where the
+    transaction started before switching to a new relay log file.
+
+  - Error can happen if writing to file fails or if flushing the file
+    fails.
+
+  @param rli The object representing the Relay_log_info.
 
+  @todo Change the log file information to a binary format to avoid
+  calling longlong2str.
+
+  @todo Move the member function into rpl_rli.cc and get rid of the
+  global function. /SVEN
+
+  @return 0 on success, 1 on error.
+*/
 bool flush_relay_log_info(Relay_log_info* rli)
 {
-  bool error=0;
-  DBUG_ENTER("flush_relay_log_info");
+  return rli->flush();
+}
 
-  if (unlikely(rli->no_storage))
-    DBUG_RETURN(0);
+bool Relay_log_info::flush()
+{
+  bool error=0;
 
-  IO_CACHE *file = &rli->info_file;
-  char buff[FN_REFLEN*2+22*2+4], *pos;
+  DBUG_ENTER("Relay_log_info::flush()");
 
+  IO_CACHE *file = &info_file;
+  // 2*file name, 2*long long, 2*unsigned long, 6*'\n'
+  char buff[FN_REFLEN * 2 + 22 * 2 + 10 * 2 + 6], *pos;
   my_b_seek(file, 0L);
-  pos=strmov(buff, rli->group_relay_log_name);
+  pos= longlong10_to_str(LINES_IN_RELAY_LOG_INFO_WITH_DELAY, buff, 10);
+  *pos++='\n';
+  pos=strmov(pos, group_relay_log_name);
   *pos++='\n';
-  pos= longlong10_to_str(rli->group_relay_log_pos, pos, 10);
+  pos=longlong10_to_str(group_relay_log_pos, pos, 10);
   *pos++='\n';
-  pos=strmov(pos, rli->group_master_log_name);
+  pos=strmov(pos, group_master_log_name);
   *pos++='\n';
-  pos=longlong10_to_str(rli->group_master_log_pos, pos, 10);
+  pos=longlong10_to_str(group_master_log_pos, pos, 10);
   *pos='\n';
+  pos= longlong10_to_str(sql_delay, pos, 10);
+  *pos= '\n';
   if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
     error=1;
   if (flush_io_cache(file))
     error=1;
   if (sync_relayloginfo_period &&
       !error &&
-      ++(rli->sync_counter) >= sync_relayloginfo_period)
+      ++sync_counter >= sync_relayloginfo_period)
   {
-    if (my_sync(rli->info_fd, MYF(MY_WME)))
+    if (my_sync(info_fd, MYF(MY_WME)))
       error=1;
-    rli->sync_counter= 0;
+    sync_counter= 0;
   }
   /* 
     Flushing the relay log is done by the slave I/O thread 
diff --git a/sql/slave.h b/sql/slave.h
index 269d95c..1bf117c 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -18,6 +18,14 @@
 #define SLAVE_H
 
 /**
+  MASTER_DELAY can be at most (1 << 31) - 1.
+*/
+#define MASTER_DELAY_MAX (0x7FFFFFFF)
+#if INT_MAX < 0x7FFFFFFF
+#error "don't support platforms where INT_MAX < 0x7FFFFFFF"
+#endif
+
+/**
   @defgroup Replication Replication
   @{
 
@@ -102,12 +110,14 @@ int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f);
 
   In Master_info: run_lock, data_lock
   run_lock protects all information about the run state: slave_running, thd
-  and the existence of the I/O thread to stop/start it, you need this mutex).
+  and the existence of the I/O thread (to stop/start it, you need this mutex).
   data_lock protects some moving members of the struct: counters (log name,
   position) and relay log (MYSQL_BIN_LOG object).
 
   In Relay_log_info: run_lock, data_lock
   see Master_info
+  However, note that run_lock does not protect
+  Relay_log_info.run_state; that is protected by data_lock.
   
   Order of acquisition: if you want to have LOCK_active_mi and a run_lock, you
   must acquire LOCK_active_mi first.
@@ -247,6 +257,12 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
 int apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
                                             struct rpl_group_info *rgi);
 
+int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
+int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val);
+int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
+                          const char *default_val);
+int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f);
+
 pthread_handler_t handle_slave_io(void *arg);
 void slave_output_error_info(rpl_group_info *rgi, THD *thd);
 pthread_handler_t handle_slave_sql(void *arg);
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index f0465cd..a7beb42 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -17,18 +17,96 @@
 #include <my_global.h>
 #include "sql_priv.h"
 #include "sql_binlog.h"
-#include "sql_parse.h"                          // check_global_access
-#include "sql_acl.h"                            // *_ACL
+#include "sql_parse.h"
+#include "sql_acl.h"
 #include "rpl_rli.h"
 #include "base64.h"
-#include "slave.h"                              // apply_event_and_update_pos
-#include "log_event.h"                          // Format_description_log_event,
-                                                // EVENT_LEN_OFFSET,
-                                                // EVENT_TYPE_OFFSET,
-                                                // FORMAT_DESCRIPTION_LOG_EVENT,
-                                                // START_EVENT_V3,
-                                                // Log_event_type,
-                                                // Log_event
+#include "slave.h"
+#include "log_event.h"
+
+
+/**
+  Check if the event type is allowed in a BINLOG statement.
+
+  @retval 0 if the event type is ok.
+  @retval 1 if the event type is not ok.
+*/
+static int check_event_type(int type, Relay_log_info *rli)
+{
+  Format_description_log_event *fd_event=
+    rli->relay_log.description_event_for_exec;
+
+  /*
+    Convert event type id of certain old versions (see comment in
+    Format_description_log_event::Format_description_log_event(char*,...)).
+  */
+  if (fd_event && fd_event->event_type_permutation)
+  {
+    IF_DBUG({
+        int new_type= fd_event->event_type_permutation[type];
+        DBUG_PRINT("info",
+                   ("converting event type %d to %d (%s)",
+                    type, new_type,
+                    Log_event::get_type_str((Log_event_type)new_type)));
+      },
+      (void)0);
+    type= fd_event->event_type_permutation[type];
+  }
+
+  switch (type)
+  {
+  case START_EVENT_V3:
+  case FORMAT_DESCRIPTION_EVENT:
+    /*
+      We need a preliminary FD event in order to parse the FD event,
+      if we don't already have one.
+    */
+    if (!fd_event)
+      if (!(rli->relay_log.description_event_for_exec=
+            new Format_description_log_event(4)))
+      {
+        my_error(ER_OUTOFMEMORY, MYF(0), 1);
+        return 1;
+      }
+
+    /* It is always allowed to execute FD events. */
+    return 0;
+    
+  case TABLE_MAP_EVENT:
+  case WRITE_ROWS_EVENT:
+  case UPDATE_ROWS_EVENT:
+  case DELETE_ROWS_EVENT:
+  case PRE_GA_WRITE_ROWS_EVENT:
+  case PRE_GA_UPDATE_ROWS_EVENT:
+  case PRE_GA_DELETE_ROWS_EVENT:
+    /*
+      Row events are only allowed if a Format_description_event has
+      already been seen.
+    */
+    if (fd_event)
+      return 0;
+    else
+    {
+      my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT,
+               MYF(0), Log_event::get_type_str((Log_event_type)type));
+      return 1;
+    }
+    break;
+
+  default:
+    /*
+      It is not meaningful to execute other events than row-events and
+      FD events. It would even be dangerous to execute Stop_log_event
+      and Rotate_log_event since they call flush_relay_log_info, which
+      is not allowed to call by other threads than the slave SQL
+      thread when the slave SQL thread is running.
+    */
+    my_error(ER_ONLY_FD_AND_RBR_EVENTS_ALLOWED_IN_BINLOG_STATEMENT,
+             MYF(0), Log_event::get_type_str((Log_event_type)type));
+    return 1;
+  }
+}
+
 /**
   Execute a BINLOG statement.
 
@@ -73,31 +151,13 @@ void mysql_client_binlog_statement(THD* thd)
     Allocation
   */
 
-  /*
-    If we do not have a Format_description_event, we create a dummy
-    one here.  In this case, the first event we read must be a
-    Format_description_event.
-  */
-  my_bool have_fd_event= TRUE;
   int err;
   Relay_log_info *rli;
   rpl_group_info *rgi;
 
   rli= thd->rli_fake;
-  if (!rli)
-  {
-    rli= thd->rli_fake= new Relay_log_info(FALSE);
-#ifdef HAVE_valgrind
-    rli->is_fake= TRUE;
-#endif
-    have_fd_event= FALSE;
-  }
-  if (rli && !rli->relay_log.description_event_for_exec)
-  {
-    rli->relay_log.description_event_for_exec=
-      new Format_description_log_event(4);
-    have_fd_event= FALSE;
-  }
+  if (!rli && (rli= thd->rli_fake= new Relay_log_info(FALSE)))
+    rli->sql_driver_thd= thd;
   if (!(rgi= thd->rgi_fake))
     rgi= thd->rgi_fake= new rpl_group_info(rli);
   rgi->thd= thd;
@@ -109,16 +169,13 @@ void mysql_client_binlog_statement(THD* thd)
   /*
     Out of memory check
   */
-  if (!(rli &&
-        rli->relay_log.description_event_for_exec &&
-        buf))
+  if (!(rli && buf))
   {
     my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);  /* needed 1 bytes */
     goto end;
   }
 
-  rli->sql_driver_thd= thd;
-  rli->no_storage= TRUE;
+  DBUG_ASSERT(rli->belongs_to_client());
 
   for (char const *strptr= thd->lex->comment.str ;
        strptr < thd->lex->comment.str + thd->lex->comment.length ; )
@@ -185,23 +242,8 @@ void mysql_client_binlog_statement(THD* thd)
       DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d",
                           event_len, bytes_decoded));
 
-      /*
-        If we have not seen any Format_description_event, then we must
-        see one; it is the only statement that can be read in base64
-        without a prior Format_description_event.
-      */
-      if (!have_fd_event)
-      {
-        int type = (uchar)bufptr[EVENT_TYPE_OFFSET];
-        if (type == FORMAT_DESCRIPTION_EVENT || type == START_EVENT_V3)
-          have_fd_event= TRUE;
-        else
-        {
-          my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT,
-                   MYF(0), Log_event::get_type_str((Log_event_type)type));
-          goto end;
-        }
-      }
+      if (check_event_type(bufptr[EVENT_TYPE_OFFSET], rli))
+        goto end;
 
       ev= Log_event::read_log_event(bufptr, event_len, &error,
                                     rli->relay_log.description_event_for_exec,
@@ -212,7 +254,7 @@ void mysql_client_binlog_statement(THD* thd)
       {
         /*
           This could actually be an out-of-memory, but it is more likely
-          causes by a bad statement
+          caused by a bad statement
         */
         my_error(ER_SYNTAX_ERROR, MYF(0));
         goto end;
diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc
index b62838f..15cf9a3 100644
--- a/sql/sql_lex.cc
+++ b/sql/sql_lex.cc
@@ -4831,4 +4831,3 @@ void binlog_unsafe_map_init()
      BINLOG_DIRECT_OFF & TRX_CACHE_NOT_EMPTY);
 }
 #endif
-
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 6ec112c..626beaa 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -240,11 +240,12 @@ struct LEX_MASTER_INFO
   ulong server_id;
   uint port, connect_retry;
   float heartbeat_period;
+  int sql_delay;
   /*
     Enum is used for making it possible to detect if the user
     changed variable or if it should be left at old value
    */
-  enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE}
+  enum {LEX_MI_UNCHANGED= 0, LEX_MI_DISABLE, LEX_MI_ENABLE}
     ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt,
     repl_do_domain_ids_opt, repl_ignore_domain_ids_opt;
   enum {
@@ -260,6 +261,7 @@ struct LEX_MASTER_INFO
                           sizeof(ulong), 0, 16, MYF(0));
     my_init_dynamic_array(&repl_ignore_domain_ids,
                           sizeof(ulong), 0, 16, MYF(0));
+    sql_delay= -1;
   }
   void reset(bool is_change_master)
   {
@@ -280,6 +282,7 @@ struct LEX_MASTER_INFO
       repl_ignore_domain_ids_opt= LEX_MI_UNCHANGED;
     gtid_pos_str= null_lex_str;
     use_gtid_opt= LEX_GTID_UNCHANGED;
+    sql_delay= -1;
   }
 };
 
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 572d083..8bea280 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -3319,6 +3319,7 @@ int reset_slave(THD *thd, Master_info* mi)
   mi->clear_error();
   mi->rli.clear_error();
   mi->rli.clear_until_condition();
+  mi->rli.clear_sql_delay();
   mi->rli.slave_skip_counter= 0;
 
   // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
@@ -3630,6 +3631,9 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
   if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
     mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
+  if (lex_mi->sql_delay != -1)
+    mi->rli.set_sql_delay(lex_mi->sql_delay);
+
   if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
     mi->ssl_verify_server_cert=
       (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 9131b99..3b9a06d 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -1341,6 +1341,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
 %token  LOOP_SYM
 %token  LOW_PRIORITY
 %token  MASTER_CONNECT_RETRY_SYM
+%token  MASTER_DELAY_SYM
 %token  MASTER_GTID_POS_SYM
 %token  MASTER_HOST_SYM
 %token  MASTER_LOG_FILE_SYM
@@ -2255,6 +2256,16 @@ master_def:
           {
             Lex->mi.connect_retry = $3;
           }
+        | MASTER_DELAY_SYM '=' ulong_num
+          {
+            if ($3 > MASTER_DELAY_MAX)
+            {
+              my_error(ER_MASTER_DELAY_VALUE_OUT_OF_RANGE, MYF(0),
+                       $3, MASTER_DELAY_MAX);
+            }
+            else
+              Lex->mi.sql_delay = $3;
+          }
         | MASTER_SSL_SYM '=' ulong_num
           {
             Lex->mi.ssl= $3 ? 
@@ -7660,6 +7671,7 @@ slave:
             LEX *lex=Lex;
             lex->sql_command = SQLCOM_SLAVE_ALL_START;
             lex->type = 0;
+            /* If you change this code don't forget to update STOP SLAVE too */
           }
           {}
         | STOP_SYM SLAVE optional_connection_name slave_thread_opts
@@ -14099,6 +14111,7 @@ keyword_sp:
         | MASTER_PASSWORD_SYM      {}
         | MASTER_SERVER_ID_SYM     {}
         | MASTER_CONNECT_RETRY_SYM {}
+        | MASTER_DELAY_SYM         {}
         | MASTER_SSL_SYM           {}
         | MASTER_SSL_CA_SYM        {}
         | MASTER_SSL_CAPATH_SYM    {}


More information about the commits mailing list