[Commits] cabbeac: Cherry-picked from MySQL 5.7.

Jan Lindström jan.lindstrom at mariadb.com
Wed Mar 25 07:34:06 EET 2015


revision-id: cabbeac1c3e70568cd4dd549af4c7f7f651a13f4
parent(s): ec68494beb151bc01ff6885476d2d4aeab3fe345
committer: Jan Lindström
branch nick: 10.1-innodb-merge
timestamp: 2015-03-25 07:33:18 +0200
message:

Cherry-picked from MySQL 5.7.

InnoDB/XtraDB internally uses an rw-lock implementation to keep its internal
resources consistent. The rw-lock has two basic types: S-lock (shared) and
X-lock (exclusive). This fix adds a new type, SX-lock (shared exclusive),
leaving room to optimize concurrency and improve scalability further.

S-lock and X-lock behave the same as before and remain compatible with the
current code, so this fix on its own changes nothing (no functional or
performance changes for users).

The new SX-lock state will be used by future work (e.g. WL#6326: InnoDB:
fix index->lock contention).

WL#6363 : implement SX-lock for rw_lock

Author: Yasufumi Kinoshita
commit: https://github.com/mysql/mysql-server/commit/b6e5e1e5eed791822b965d3530038e74c485e1f6
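
For context, the patch documents the semantics of the new mode with a
LOCK COMPATIBILITY MATRIX in a comment added to sync0rw.cc (see the diff
below). The following minimal sketch, plain standalone C++ rather than
InnoDB code (the enum, helper and main() are invented here purely as an
illustration), restates that matrix:

#include <cassert>

/* Lock modes of the patched rw-lock: S (shared), SX (shared exclusive)
and X (exclusive). */
enum lock_mode { LOCK_S = 0, LOCK_SX = 1, LOCK_X = 2 };

/* Restates the LOCK COMPATIBILITY MATRIX from the new sync0rw.cc comment:
S is compatible with S and SX, SX is compatible with S only, and X is
compatible with nothing. The matrix is symmetric. */
static bool
lock_modes_compatible(lock_mode held, lock_mode requested)
{
	static const bool compat[3][3] = {
		/* held \ requested: S      SX     X */
		/* S  */           { true,  true,  false },
		/* SX */           { true,  false, false },
		/* X  */           { false, false, false },
	};
	return(compat[held][requested]);
}

int main()
{
	assert(lock_modes_compatible(LOCK_S, LOCK_SX));   /* S coexists with one SX holder */
	assert(!lock_modes_compatible(LOCK_SX, LOCK_SX)); /* at most one SX holder at a time */
	assert(!lock_modes_compatible(LOCK_SX, LOCK_X));  /* SX blocks X requests */
	return(0);
}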

---
 mysql-test/suite/innodb/r/innodb_monitor.result    |   6 +
 .../sys_vars/r/innodb_monitor_disable_basic.result |   6 +
 .../sys_vars/r/innodb_monitor_enable_basic.result  |   6 +
 .../r/innodb_monitor_reset_all_basic.result        |   6 +
 .../sys_vars/r/innodb_monitor_reset_basic.result   |   6 +
 storage/innobase/buf/buf0buf.cc                    |   5 +-
 storage/innobase/include/mtr0log.ic                |   4 +-
 storage/innobase/include/mtr0mtr.h                 |  79 +++-
 storage/innobase/include/mtr0mtr.ic                |  72 +++-
 storage/innobase/include/srv0mon.h                 |   3 +
 storage/innobase/include/sync0rw.h                 | 128 ++++++-
 storage/innobase/include/sync0rw.ic                | 217 +++++++++--
 storage/innobase/include/sync0sync.h               |  13 +-
 storage/innobase/mtr/mtr0mtr.cc                    | 100 ++++-
 storage/innobase/srv/srv0mon.cc                    |  30 ++
 storage/innobase/sync/sync0arr.cc                  | 104 +++--
 storage/innobase/sync/sync0rw.cc                   | 415 +++++++++++++++++---
 storage/innobase/sync/sync0sync.cc                 |  26 +-
 storage/xtradb/include/mtr0log.ic                  |   4 +-
 storage/xtradb/include/mtr0mtr.h                   |  79 +++-
 storage/xtradb/include/mtr0mtr.ic                  |  72 +++-
 storage/xtradb/include/srv0mon.h                   |   3 +
 storage/xtradb/include/sync0rw.h                   | 122 +++++-
 storage/xtradb/include/sync0rw.ic                  | 230 +++++++++--
 storage/xtradb/include/sync0sync.h                 |  19 +-
 storage/xtradb/mtr/mtr0mtr.cc                      | 100 ++++-
 storage/xtradb/srv/srv0mon.cc                      |  30 ++
 storage/xtradb/sync/sync0arr.cc                    | 111 ++++--
 storage/xtradb/sync/sync0rw.cc                     | 425 +++++++++++++++++----
 storage/xtradb/sync/sync0sync.cc                   |  26 +-
 30 files changed, 2083 insertions(+), 364 deletions(-)
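
Before the per-file diffs, one more illustrative sketch. The patch
re-encodes lock_word so that an SX holder subtracts X_LOCK_HALF_DECR
(0x10000000) while an X holder subtracts X_LOCK_DECR (0x20000000); the
full state table is in the new sync0rw.cc comment below. This standalone
C++ snippet, again with invented names and not InnoDB code, mirrors the
decision ladder of the patched rw_lock_get_writer() in sync0rw.ic:

#include <cstdio>

/* Constants as defined by the patch in include/sync0rw.h. */
static const long X_LOCK_DECR      = 0x20000000; /* start value; one X lock */
static const long X_LOCK_HALF_DECR = 0x10000000; /* decrement for one SX lock */

/* Illustrative stand-ins for the RW_LOCK_* writer states. */
enum writer_state { NOT_LOCKED = 0, SX = 1, EX = 2, WAIT_EX = 3 };

/* Mirrors the decision ladder of the patched rw_lock_get_writer(). */
static writer_state
get_writer(long lock_word)
{
	if (lock_word > X_LOCK_HALF_DECR) {
		return(NOT_LOCKED);	/* unlocked, or S locks only */
	} else if (lock_word > 0) {
		return(SX);		/* SX locked, maybe S locked, no X lock */
	} else if (lock_word == 0
		   || lock_word == -X_LOCK_HALF_DECR
		   || lock_word <= -X_LOCK_DECR) {
		return(EX);		/* X locked (an SX lock may be held too) */
	} else {
		return(WAIT_EX);	/* writer waiting for S locks to drain */
	}
}

int main()
{
	printf("%d\n", get_writer(X_LOCK_DECR));      /* 0: NOT_LOCKED */
	printf("%d\n", get_writer(X_LOCK_HALF_DECR)); /* 1: SX */
	printf("%d\n", get_writer(0));                /* 2: EX */
	printf("%d\n", get_writer(-1));               /* 3: WAIT_EX */
	return(0);
}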

diff --git a/mysql-test/suite/innodb/r/innodb_monitor.result b/mysql-test/suite/innodb/r/innodb_monitor.result
index 02e72ae..42e31d3 100644
--- a/mysql-test/suite/innodb/r/innodb_monitor.result
+++ b/mysql-test/suite/innodb/r/innodb_monitor.result
@@ -222,10 +222,13 @@ innodb_dblwr_pages_written	disabled
 innodb_page_size	disabled
 innodb_rwlock_s_spin_waits	disabled
 innodb_rwlock_x_spin_waits	disabled
+innodb_rwlock_sx_spin_waits	disabled
 innodb_rwlock_s_spin_rounds	disabled
 innodb_rwlock_x_spin_rounds	disabled
+innodb_rwlock_sx_spin_rounds	disabled
 innodb_rwlock_s_os_waits	disabled
 innodb_rwlock_x_os_waits	disabled
+innodb_rwlock_sx_os_waits	disabled
 dml_reads	disabled
 dml_inserts	disabled
 dml_deletes	disabled
@@ -279,10 +282,13 @@ lock_row_lock_waits	disabled
 lock_row_lock_time_avg	disabled
 innodb_rwlock_s_spin_waits	disabled
 innodb_rwlock_x_spin_waits	disabled
+innodb_rwlock_sx_spin_waits	disabled
 innodb_rwlock_s_spin_rounds	disabled
 innodb_rwlock_x_spin_rounds	disabled
+innodb_rwlock_sx_spin_rounds	disabled
 innodb_rwlock_s_os_waits	disabled
 innodb_rwlock_x_os_waits	disabled
+innodb_rwlock_sx_os_waits	disabled
 set global innodb_monitor_enable = "%lock*";
 ERROR 42000: Variable 'innodb_monitor_enable' can't be set to the value of '%lock*'
 set global innodb_monitor_enable="%%%%%%%%%%%%%%%%%%%%%%%%%%%";
diff --git a/mysql-test/suite/sys_vars/r/innodb_monitor_disable_basic.result b/mysql-test/suite/sys_vars/r/innodb_monitor_disable_basic.result
index 85fe22d..4ea9e76 100644
--- a/mysql-test/suite/sys_vars/r/innodb_monitor_disable_basic.result
+++ b/mysql-test/suite/sys_vars/r/innodb_monitor_disable_basic.result
@@ -222,10 +222,13 @@ innodb_dblwr_pages_written	disabled
 innodb_page_size	disabled
 innodb_rwlock_s_spin_waits	disabled
 innodb_rwlock_x_spin_waits	disabled
+innodb_rwlock_sx_spin_waits	disabled
 innodb_rwlock_s_spin_rounds	disabled
 innodb_rwlock_x_spin_rounds	disabled
+innodb_rwlock_sx_spin_rounds	disabled
 innodb_rwlock_s_os_waits	disabled
 innodb_rwlock_x_os_waits	disabled
+innodb_rwlock_sx_os_waits	disabled
 dml_reads	disabled
 dml_inserts	disabled
 dml_deletes	disabled
@@ -279,10 +282,13 @@ lock_row_lock_waits	disabled
 lock_row_lock_time_avg	disabled
 innodb_rwlock_s_spin_waits	disabled
 innodb_rwlock_x_spin_waits	disabled
+innodb_rwlock_sx_spin_waits	disabled
 innodb_rwlock_s_spin_rounds	disabled
 innodb_rwlock_x_spin_rounds	disabled
+innodb_rwlock_sx_spin_rounds	disabled
 innodb_rwlock_s_os_waits	disabled
 innodb_rwlock_x_os_waits	disabled
+innodb_rwlock_sx_os_waits	disabled
 set global innodb_monitor_enable = "%lock*";
 ERROR 42000: Variable 'innodb_monitor_enable' can't be set to the value of '%lock*'
 set global innodb_monitor_enable="%%%%%%%%%%%%%%%%%%%%%%%%%%%";
diff --git a/mysql-test/suite/sys_vars/r/innodb_monitor_enable_basic.result b/mysql-test/suite/sys_vars/r/innodb_monitor_enable_basic.result
index 85fe22d..4ea9e76 100644
--- a/mysql-test/suite/sys_vars/r/innodb_monitor_enable_basic.result
+++ b/mysql-test/suite/sys_vars/r/innodb_monitor_enable_basic.result
@@ -222,10 +222,13 @@ innodb_dblwr_pages_written	disabled
 innodb_page_size	disabled
 innodb_rwlock_s_spin_waits	disabled
 innodb_rwlock_x_spin_waits	disabled
+innodb_rwlock_sx_spin_waits	disabled
 innodb_rwlock_s_spin_rounds	disabled
 innodb_rwlock_x_spin_rounds	disabled
+innodb_rwlock_sx_spin_rounds	disabled
 innodb_rwlock_s_os_waits	disabled
 innodb_rwlock_x_os_waits	disabled
+innodb_rwlock_sx_os_waits	disabled
 dml_reads	disabled
 dml_inserts	disabled
 dml_deletes	disabled
@@ -279,10 +282,13 @@ lock_row_lock_waits	disabled
 lock_row_lock_time_avg	disabled
 innodb_rwlock_s_spin_waits	disabled
 innodb_rwlock_x_spin_waits	disabled
+innodb_rwlock_sx_spin_waits	disabled
 innodb_rwlock_s_spin_rounds	disabled
 innodb_rwlock_x_spin_rounds	disabled
+innodb_rwlock_sx_spin_rounds	disabled
 innodb_rwlock_s_os_waits	disabled
 innodb_rwlock_x_os_waits	disabled
+innodb_rwlock_sx_os_waits	disabled
 set global innodb_monitor_enable = "%lock*";
 ERROR 42000: Variable 'innodb_monitor_enable' can't be set to the value of '%lock*'
 set global innodb_monitor_enable="%%%%%%%%%%%%%%%%%%%%%%%%%%%";
diff --git a/mysql-test/suite/sys_vars/r/innodb_monitor_reset_all_basic.result b/mysql-test/suite/sys_vars/r/innodb_monitor_reset_all_basic.result
index 85fe22d..4ea9e76 100644
--- a/mysql-test/suite/sys_vars/r/innodb_monitor_reset_all_basic.result
+++ b/mysql-test/suite/sys_vars/r/innodb_monitor_reset_all_basic.result
@@ -222,10 +222,13 @@ innodb_dblwr_pages_written	disabled
 innodb_page_size	disabled
 innodb_rwlock_s_spin_waits	disabled
 innodb_rwlock_x_spin_waits	disabled
+innodb_rwlock_sx_spin_waits	disabled
 innodb_rwlock_s_spin_rounds	disabled
 innodb_rwlock_x_spin_rounds	disabled
+innodb_rwlock_sx_spin_rounds	disabled
 innodb_rwlock_s_os_waits	disabled
 innodb_rwlock_x_os_waits	disabled
+innodb_rwlock_sx_os_waits	disabled
 dml_reads	disabled
 dml_inserts	disabled
 dml_deletes	disabled
@@ -279,10 +282,13 @@ lock_row_lock_waits	disabled
 lock_row_lock_time_avg	disabled
 innodb_rwlock_s_spin_waits	disabled
 innodb_rwlock_x_spin_waits	disabled
+innodb_rwlock_sx_spin_waits	disabled
 innodb_rwlock_s_spin_rounds	disabled
 innodb_rwlock_x_spin_rounds	disabled
+innodb_rwlock_sx_spin_rounds	disabled
 innodb_rwlock_s_os_waits	disabled
 innodb_rwlock_x_os_waits	disabled
+innodb_rwlock_sx_os_waits	disabled
 set global innodb_monitor_enable = "%lock*";
 ERROR 42000: Variable 'innodb_monitor_enable' can't be set to the value of '%lock*'
 set global innodb_monitor_enable="%%%%%%%%%%%%%%%%%%%%%%%%%%%";
diff --git a/mysql-test/suite/sys_vars/r/innodb_monitor_reset_basic.result b/mysql-test/suite/sys_vars/r/innodb_monitor_reset_basic.result
index 85fe22d..4ea9e76 100644
--- a/mysql-test/suite/sys_vars/r/innodb_monitor_reset_basic.result
+++ b/mysql-test/suite/sys_vars/r/innodb_monitor_reset_basic.result
@@ -222,10 +222,13 @@ innodb_dblwr_pages_written	disabled
 innodb_page_size	disabled
 innodb_rwlock_s_spin_waits	disabled
 innodb_rwlock_x_spin_waits	disabled
+innodb_rwlock_sx_spin_waits	disabled
 innodb_rwlock_s_spin_rounds	disabled
 innodb_rwlock_x_spin_rounds	disabled
+innodb_rwlock_sx_spin_rounds	disabled
 innodb_rwlock_s_os_waits	disabled
 innodb_rwlock_x_os_waits	disabled
+innodb_rwlock_sx_os_waits	disabled
 dml_reads	disabled
 dml_inserts	disabled
 dml_deletes	disabled
@@ -279,10 +282,13 @@ lock_row_lock_waits	disabled
 lock_row_lock_time_avg	disabled
 innodb_rwlock_s_spin_waits	disabled
 innodb_rwlock_x_spin_waits	disabled
+innodb_rwlock_sx_spin_waits	disabled
 innodb_rwlock_s_spin_rounds	disabled
 innodb_rwlock_x_spin_rounds	disabled
+innodb_rwlock_sx_spin_rounds	disabled
 innodb_rwlock_s_os_waits	disabled
 innodb_rwlock_x_os_waits	disabled
+innodb_rwlock_sx_os_waits	disabled
 set global innodb_monitor_enable = "%lock*";
 ERROR 42000: Variable 'innodb_monitor_enable' can't be set to the value of '%lock*'
 set global innodb_monitor_enable="%%%%%%%%%%%%%%%%%%%%%%%%%%%";
diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc
index 59cbe63..0fe7fae 100644
--- a/storage/innobase/buf/buf0buf.cc
+++ b/storage/innobase/buf/buf0buf.cc
@@ -4650,7 +4650,10 @@ buf_pool_validate_instance(
 assert_s_latched:
 						ut_a(rw_lock_is_locked(
 							     &block->lock,
-								     RW_LOCK_SHARED));
+							     RW_LOCK_SHARED)
+						     || rw_lock_is_locked(
+								&block->lock,
+								RW_LOCK_SX));
 						break;
 					case BUF_FLUSH_LIST:
 						n_list_flush++;
diff --git a/storage/innobase/include/mtr0log.ic b/storage/innobase/include/mtr0log.ic
index 6457e02..c7269b9 100644
--- a/storage/innobase/include/mtr0log.ic
+++ b/storage/innobase/include/mtr0log.ic
@@ -190,7 +190,9 @@ mlog_write_initial_log_record_fast(
 	ulint		space;
 	ulint		offset;
 
-	ut_ad(mtr_memo_contains_page(mtr, ptr, MTR_MEMO_PAGE_X_FIX));
+	ut_ad(mtr_memo_contains_page_flagged(mtr, ptr,
+					     MTR_MEMO_PAGE_X_FIX
+					     | MTR_MEMO_PAGE_SX_FIX));
 	ut_ad(type <= MLOG_BIGGEST_TYPE || EXTRA_CHECK_MLOG_NUMBER(type));
 	ut_ad(ptr && log_ptr);
 
diff --git a/storage/innobase/include/mtr0mtr.h b/storage/innobase/include/mtr0mtr.h
index eae981f..3e196d1 100644
--- a/storage/innobase/include/mtr0mtr.h
+++ b/storage/innobase/include/mtr0mtr.h
@@ -49,15 +49,18 @@ Created 11/26/1995 Heikki Tuuri
 					form */
 
 /* Types for the mlock objects to store in the mtr memo; NOTE that the
-first 3 values must be RW_S_LATCH, RW_X_LATCH, RW_NO_LATCH */
+first 4 values must be RW_S_LATCH, RW_X_LATCH, RW_SX_LATCH, RW_NO_LATCH
+and they should be 2pow value to be used also as ORed combination of flag. */
 #define	MTR_MEMO_PAGE_S_FIX	RW_S_LATCH
 #define	MTR_MEMO_PAGE_X_FIX	RW_X_LATCH
+#define	MTR_MEMO_PAGE_SX_FIX	RW_SX_LATCH
 #define	MTR_MEMO_BUF_FIX	RW_NO_LATCH
 #ifdef UNIV_DEBUG
-# define MTR_MEMO_MODIFY	54
+# define MTR_MEMO_MODIFY	32
 #endif /* UNIV_DEBUG */
-#define	MTR_MEMO_S_LOCK		55
-#define	MTR_MEMO_X_LOCK		56
+#define	MTR_MEMO_S_LOCK		64
+#define	MTR_MEMO_X_LOCK		128
+#define	MTR_MEMO_SX_LOCK	256
 
 /** @name Log item types
 The log items are declared 'byte' so that the compiler can warn if val
@@ -256,8 +259,29 @@ mtr_release_s_latch_at_savepoint(
 	mtr_t*		mtr,		/*!< in: mtr */
 	ulint		savepoint,	/*!< in: savepoint */
 	rw_lock_t*	lock);		/*!< in: latch to release */
+/**********************************************************//**
+Releases the block in an mtr memo after a savepoint. */
+
+void
+mtr_release_block_at_savepoint(
+/*===========================*/
+	mtr_t*		mtr,		/*!< in: mtr */
+	ulint		savepoint,	/*!< in: savepoint */
+	buf_block_t*	block);		/*!< in: block to release */
+/**********************************************************//**
+Relax the block latch in an mtr memo after a savepoint
+from X to SX. */
+
+void
+mtr_block_x_to_sx_at_savepoint(
+/*===========================*/
+	mtr_t*		mtr,		/*!< in: mtr */
+	ulint		savepoint,	/*!< in: savepoint */
+	buf_block_t*	block);		/*!< in: block to relax latch */
 #else /* !UNIV_HOTBACKUP */
 # define mtr_release_s_latch_at_savepoint(mtr,savepoint,lock) ((void) 0)
+# define mtr_release_block_at_savepoint(mtr,savepoint,lock) ((void) 0)
+# define mtr_block_x_to_sx_at_savepoint(mtr,savepoint,lock) ((void) 0)
 #endif /* !UNIV_HOTBACKUP */
 
 /**********************************************************//**
@@ -308,6 +332,10 @@ This macro locks an rw-lock in x-mode. */
 #define mtr_x_lock(B, MTR)	mtr_x_lock_func((B), __FILE__, __LINE__,\
 						(MTR))
 /*********************************************************************//**
+This macro locks an rw-lock in sx-mode. */
+#define mtr_sx_lock(B, MTR)	mtr_sx_lock_func((B), __FILE__, __LINE__,\
+						(MTR))
+/*********************************************************************//**
 NOTE! Use the macro above!
 Locks a lock in s-mode. */
 UNIV_INLINE
@@ -329,6 +357,17 @@ mtr_x_lock_func(
 	const char*	file,	/*!< in: file name */
 	ulint		line,	/*!< in: line number */
 	mtr_t*		mtr);	/*!< in: mtr */
+/*********************************************************************//**
+NOTE! Use the macro mtr_sx_lock()!
+Locks a lock in sx-mode. */
+UNIV_INLINE
+void
+mtr_sx_lock_func(
+/*=============*/
+	rw_lock_t*	lock,	/*!< in/out: rw-lock */
+	const char*	file,	/*!< in: file name */
+	ulint		line,	/*!< in: line number */
+	mtr_t*		mtr);	/*!< in/out: mtr */
 #endif /* !UNIV_HOTBACKUP */
 
 /***************************************************//**
@@ -346,17 +385,30 @@ mtr_memo_release(
 # ifndef UNIV_HOTBACKUP
 /**********************************************************//**
 Checks if memo contains the given item.
-@return	TRUE if contains */
+@return true if contains */
 UNIV_INLINE
 bool
 mtr_memo_contains(
 /*==============*/
-	mtr_t*		mtr,	/*!< in: mtr */
+	const mtr_t*	mtr,	/*!< in: mtr */
 	const void*	object,	/*!< in: object to search */
 	ulint		type)	/*!< in: type of object */
 	__attribute__((warn_unused_result, nonnull));
 
 /**********************************************************//**
+Checks if memo contains the given item.
+@return true if contains */
+UNIV_INLINE
+bool
+mtr_memo_contains_flagged(
+/*======================*/
+	const mtr_t*	mtr,	/*!< in: mtr */
+	const void*	object,	/*!< in: object to search */
+	ulint		flags)	/*!< in: specify types of object with
+				OR of MTR_MEMO_PAGE_S_FIX... values */
+	__attribute__((warn_unused_result, nonnull));
+
+/**********************************************************//**
 Checks if memo contains the given page.
 @return	TRUE if contains */
 UNIV_INTERN
@@ -365,7 +417,20 @@ mtr_memo_contains_page(
 /*===================*/
 	mtr_t*		mtr,	/*!< in: mtr */
 	const byte*	ptr,	/*!< in: pointer to buffer frame */
-	ulint		type);	/*!< in: type of object */
+	ulint		type)	/*!< in: type of object */
+	__attribute__((warn_unused_result, nonnull));
+/**********************************************************//**
+Checks if memo contains the given page.
+@return true if contains */
+
+bool
+mtr_memo_contains_page_flagged(
+/*===========================*/
+	const mtr_t*	mtr,	/*!< in: mtr */
+	const byte*	ptr,	/*!< in: pointer to buffer frame */
+	ulint		flags)	/*!< in: specify types of object with
+				OR of MTR_MEMO_PAGE_S_FIX... values */
+	__attribute__((warn_unused_result, nonnull));
 /*********************************************************//**
 Prints info of an mtr handle. */
 UNIV_INTERN
diff --git a/storage/innobase/include/mtr0mtr.ic b/storage/innobase/include/mtr0mtr.ic
index 44d548e..5503173 100644
--- a/storage/innobase/include/mtr0mtr.ic
+++ b/storage/innobase/include/mtr0mtr.ic
@@ -80,7 +80,8 @@ mtr_memo_push(
 
 	ut_ad(object);
 	ut_ad(type >= MTR_MEMO_PAGE_S_FIX);
-	ut_ad(type <= MTR_MEMO_X_LOCK);
+	ut_ad(type <= MTR_MEMO_SX_LOCK);
+	ut_ad(ut_is_2pow(type));
 	ut_ad(mtr);
 	ut_ad(mtr->magic_n == MTR_MAGIC_N);
 	ut_ad(mtr->state == MTR_ACTIVE);
@@ -89,7 +90,8 @@ mtr_memo_push(
 	the made_dirty flag. This tells us if we need to
 	grab log_flush_order_mutex at mtr_commit so that we
 	can insert the dirtied page to the flush list. */
-	if (type == MTR_MEMO_PAGE_X_FIX && !mtr->made_dirty) {
+	if ((type == MTR_MEMO_PAGE_X_FIX || type == MTR_MEMO_PAGE_SX_FIX)
+	    && !mtr->made_dirty) {
 		mtr->made_dirty =
 			mtr_block_dirtied((const buf_block_t*) object);
 	}
@@ -158,12 +160,12 @@ mtr_release_s_latch_at_savepoint(
 # ifdef UNIV_DEBUG
 /**********************************************************//**
 Checks if memo contains the given item.
-@return	TRUE if contains */
+@return true if contains */
 UNIV_INLINE
 bool
 mtr_memo_contains(
 /*==============*/
-	mtr_t*		mtr,	/*!< in: mtr */
+	const mtr_t*	mtr,	/*!< in: mtr */
 	const void*	object,	/*!< in: object to search */
 	ulint		type)	/*!< in: type of object */
 {
@@ -193,6 +195,49 @@ mtr_memo_contains(
 
 	return(false);
 }
+
+/**********************************************************//**
+Checks if memo contains the given item.
+@return true if contains */
+UNIV_INLINE
+bool
+mtr_memo_contains_flagged(
+/*======================*/
+	const mtr_t*	mtr,	/*!< in: mtr */
+	const void*	object,	/*!< in: object to search */
+	ulint		flags)	/*!< in: specify types of object with
+				OR of MTR_MEMO_PAGE_S_FIX... values */
+{
+	ut_ad(mtr);
+	ut_ad(mtr->magic_n == MTR_MAGIC_N);
+	ut_ad(mtr->state == MTR_ACTIVE || mtr->state == MTR_COMMITTING);
+
+	for (const dyn_block_t* block = dyn_array_get_last_block(&mtr->memo);
+	     block;
+	     block = dyn_array_get_prev_block(&mtr->memo, block)) {
+		const mtr_memo_slot_t*	start
+			= reinterpret_cast<mtr_memo_slot_t*>(
+				dyn_block_get_data(block));
+		mtr_memo_slot_t*	slot
+			= reinterpret_cast<mtr_memo_slot_t*>(
+				dyn_block_get_data(block)
+				+ dyn_block_get_used(block));
+
+		ut_ad(!(dyn_block_get_used(block) % sizeof(mtr_memo_slot_t)));
+
+		while (slot-- != start) {
+			if (object != slot->object) {
+				continue;
+			}
+
+			if (flags & slot->type) {
+				return(true);
+			}
+		}
+	}
+
+	return(false);
+}
 # endif /* UNIV_DEBUG */
 #endif /* !UNIV_HOTBACKUP */
 
@@ -295,4 +340,23 @@ mtr_x_lock_func(
 
 	mtr_memo_push(mtr, lock, MTR_MEMO_X_LOCK);
 }
+
+/*********************************************************************//**
+Locks a lock in sx-mode. */
+UNIV_INLINE
+void
+mtr_sx_lock_func(
+/*=============*/
+	rw_lock_t*	lock,	/*!< in/out: rw-lock */
+	const char*	file,	/*!< in: file name */
+	ulint		line,	/*!< in: line number */
+	mtr_t*		mtr)	/*!< in/out: mtr */
+{
+	ut_ad(mtr);
+	ut_ad(lock);
+
+	rw_lock_sx_lock_func(lock, 0, file, line);
+
+	mtr_memo_push(mtr, lock, MTR_MEMO_SX_LOCK);
+}
 #endif /* !UNIV_HOTBACKUP */
diff --git a/storage/innobase/include/srv0mon.h b/storage/innobase/include/srv0mon.h
index d1d902e..0746eea 100644
--- a/storage/innobase/include/srv0mon.h
+++ b/storage/innobase/include/srv0mon.h
@@ -385,10 +385,13 @@ enum monitor_id_t {
 	MONITOR_OVLD_SRV_PAGE_SIZE,
 	MONITOR_OVLD_RWLOCK_S_SPIN_WAITS,
 	MONITOR_OVLD_RWLOCK_X_SPIN_WAITS,
+	MONITOR_OVLD_RWLOCK_SX_SPIN_WAITS,
 	MONITOR_OVLD_RWLOCK_S_SPIN_ROUNDS,
 	MONITOR_OVLD_RWLOCK_X_SPIN_ROUNDS,
+	MONITOR_OVLD_RWLOCK_SX_SPIN_ROUNDS,
 	MONITOR_OVLD_RWLOCK_S_OS_WAITS,
 	MONITOR_OVLD_RWLOCK_X_OS_WAITS,
+	MONITOR_OVLD_RWLOCK_SX_OS_WAITS,
 
 	/* Data DML related counters */
 	MONITOR_MODULE_DML_STATS,
diff --git a/storage/innobase/include/sync0rw.h b/storage/innobase/include/sync0rw.h
index cfd9776..a4259df 100644
--- a/storage/innobase/include/sync0rw.h
+++ b/storage/innobase/include/sync0rw.h
@@ -83,20 +83,40 @@ struct rw_lock_stats_t {
 	/** number of unlocks (that unlock exclusive locks),
 	set only when UNIV_SYNC_PERF_STAT is defined */
 	ib_int64_counter_t	rw_x_exit_count;
+
+	/** number of spin waits on rw-latches,
+	resulted during sx locks */
+	ib_int64_counter_t	rw_sx_spin_wait_count;
+
+	/** number of spin loop rounds on rw-latches,
+	resulted during sx locks */
+	ib_int64_counter_t	rw_sx_spin_round_count;
+
+	/** number of OS waits on rw-latches,
+	resulted during sx locks */
+	ib_int64_counter_t	rw_sx_os_wait_count;
+
+	/** number of unlocks (that unlock sx locks),
+	set only when UNIV_SYNC_PERF_STAT is defined */
+	ib_int64_counter_t	rw_sx_exit_count;
 };
 
-/* Latch types; these are used also in btr0btr.h: keep the numerical values
-smaller than 30 and the order of the numerical values like below! */
+/* Latch types; these are used also in btr0btr.h and mtr0mtr.h: keep the
+numerical values smaller than 30 (smaller than BTR_MODIFY_TREE and
+MTR_MEMO_MODIFY) and the order of the numerical values like below! and they
+should be 2pow value to be used also as ORed combination of flag. */
 #define RW_S_LATCH	1
 #define	RW_X_LATCH	2
-#define	RW_NO_LATCH	3
+#define	RW_SX_LATCH	4
+#define	RW_NO_LATCH	8
 
 #ifndef UNIV_HOTBACKUP
-/* We decrement lock_word by this amount for each x_lock. It is also the
+/* We decrement lock_word by X_LOCK_DECR for each x_lock. It is also the
 start value for the lock_word, meaning that it limits the maximum number
-of concurrent read locks before the rw_lock breaks. The current value of
-0x00100000 allows 1,048,575 concurrent readers and 2047 recursive writers.*/
-#define X_LOCK_DECR		0x00100000
+of concurrent read locks before the rw_lock breaks. */
+/* We decrement lock_word by X_LOCK_HALF_DECR for sx_lock. */
+#define X_LOCK_DECR		0x20000000
+#define X_LOCK_HALF_DECR	0x10000000
 
 struct rw_lock_t;
 #ifdef UNIV_SYNC_DEBUG
@@ -283,6 +303,21 @@ unlocking, not the corresponding function. */
 #define rw_lock_s_unlock(L)		rw_lock_s_unlock_gen(L, 0)
 #define rw_lock_x_unlock(L)		rw_lock_x_unlock_gen(L, 0)
 
+/* TODO: PFS doesn't treat the new lock state for now. */
+#define rw_lock_sx_lock(L)					\
+	rw_lock_sx_lock_func((L), 0, __FILE__, __LINE__)
+#define rw_lock_sx_lock_inline(M, P, F, L)			\
+	rw_lock_sx_lock_func((M), (P), (F), (L))
+#define rw_lock_sx_lock_gen(M, P)				\
+	rw_lock_sx_lock_func((M), (P), __FILE__, __LINE__)
+#ifdef UNIV_SYNC_DEBUG
+# define rw_lock_sx_unlock(L)		rw_lock_sx_unlock_func(0, L)
+# define rw_lock_sx_unlock_gen(L, P)	rw_lock_sx_unlock_func(P, L)
+#else /* UNIV_SYNC_DEBUG */
+# define rw_lock_sx_unlock(L)		rw_lock_sx_unlock_func(L)
+# define rw_lock_sx_unlock_gen(L, P)	rw_lock_sx_unlock_func(L)
+#endif /* UNIV_SYNC_DEBUG */
+
 /******************************************************************//**
 Creates, or rather, initializes an rw-lock object in a specified memory
 location (which must be appropriately aligned). The rw-lock is initialized
@@ -319,7 +354,7 @@ UNIV_INTERN
 ibool
 rw_lock_validate(
 /*=============*/
-	rw_lock_t*	lock);	/*!< in: rw-lock */
+	const rw_lock_t*	lock);	/*!< in: rw-lock */
 #endif /* UNIV_DEBUG */
 /******************************************************************//**
 Low-level function which tries to lock an rw-lock in s-mode. Performs no
@@ -394,6 +429,24 @@ rw_lock_x_lock_func(
 	const char*	file_name,/*!< in: file name where lock requested */
 	ulint		line);	/*!< in: line where requested */
 /******************************************************************//**
+NOTE! Use the corresponding macro, not directly this function! Lock an
+rw-lock in SX mode for the current thread. If the rw-lock is locked
+in exclusive mode, or there is an exclusive lock request waiting,
+the function spins a preset time (controlled by SYNC_SPIN_ROUNDS), waiting
+for the lock, before suspending the thread. If the same thread has an x-lock
+on the rw-lock, locking succeed, with the following exception: if pass != 0,
+only a single sx-lock may be taken on the lock. NOTE: If the same thread has
+an s-lock, locking does not succeed! */
+
+void
+rw_lock_sx_lock_func(
+/*=================*/
+	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
+	ulint		pass,	/*!< in: pass value; != 0, if the lock will
+				be passed to another thread to unlock */
+	const char*	file_name,/*!< in: file name where lock requested */
+	ulint		line);	/*!< in: line where requested */
+/******************************************************************//**
 Releases an exclusive mode lock. */
 UNIV_INLINE
 void
@@ -404,6 +457,19 @@ rw_lock_x_unlock_func(
 				been passed to another thread to unlock */
 #endif
 	rw_lock_t*	lock);	/*!< in/out: rw-lock */
+
+/******************************************************************//**
+Releases an sx mode lock. */
+UNIV_INLINE
+void
+rw_lock_sx_unlock_func(
+/*===================*/
+#ifdef UNIV_SYNC_DEBUG
+	ulint		pass,	/*!< in: pass value; != 0, if the lock may have
+				been passed to another thread to unlock */
+#endif /* UNIV_SYNC_DEBUG */
+	rw_lock_t*	lock);	/*!< in/out: rw-lock */
+
 /******************************************************************//**
 This function is used in the insert buffer to move the ownership of an
 x-latch on a buffer frame to the current thread. The x-latch was set by
@@ -427,6 +493,15 @@ ulint
 rw_lock_get_x_lock_count(
 /*=====================*/
 	const rw_lock_t*	lock);	/*!< in: rw-lock */
+/******************************************************************//**
+Returns the number of sx-lock for the lock. Does not reserve the lock
+mutex, so the caller must be sure it is not changed during the call.
+@return value of writer_count */
+UNIV_INLINE
+ulint
+rw_lock_get_sx_lock_count(
+/*======================*/
+	const rw_lock_t*	lock);	/*!< in: rw-lock */
 /********************************************************************//**
 Check if there are threads waiting for the rw-lock.
 @return	1 if waiters, 0 otherwise */
@@ -438,15 +513,15 @@ rw_lock_get_waiters(
 /******************************************************************//**
 Returns the write-status of the lock - this function made more sense
 with the old rw_lock implementation.
-@return	RW_LOCK_NOT_LOCKED, RW_LOCK_EX, RW_LOCK_WAIT_EX */
+@return RW_LOCK_NOT_LOCKED, RW_LOCK_EX, RW_LOCK_WAIT_EX, RW_LOCK_SX */
 UNIV_INLINE
 ulint
 rw_lock_get_writer(
 /*===============*/
 	const rw_lock_t*	lock);	/*!< in: rw-lock */
 /******************************************************************//**
-Returns the number of readers.
-@return	number of readers */
+Returns the number of readers (s-locks).
+@return number of readers */
 UNIV_INLINE
 ulint
 rw_lock_get_reader_count(
@@ -455,13 +530,14 @@ rw_lock_get_reader_count(
 /******************************************************************//**
 Decrements lock_word the specified amount if it is greater than 0.
 This is used by both s_lock and x_lock operations.
-@return	TRUE if decr occurs */
+@return true if decr occurs */
 UNIV_INLINE
-ibool
+bool
 rw_lock_lock_word_decr(
 /*===================*/
 	rw_lock_t*	lock,		/*!< in/out: rw-lock */
-	ulint		amount);	/*!< in: amount to decrement */
+	ulint		amount,		/*!< in: amount to decrement */
+	lint		threshold);	/*!< in: threshold of judgement */
 /******************************************************************//**
 Increments lock_word the specified amount and returns new value.
 @return	lock->lock_word after increment */
@@ -485,7 +561,7 @@ void
 rw_lock_set_writer_id_and_recursion_flag(
 /*=====================================*/
 	rw_lock_t*	lock,		/*!< in/out: lock to work on */
-	ibool		recursive);	/*!< in: TRUE if recursion
+	bool		recursive);	/*!< in: true if recursion
 					allowed */
 #ifdef UNIV_SYNC_DEBUG
 /******************************************************************//**
@@ -499,16 +575,29 @@ rw_lock_own(
 	ulint		lock_type)	/*!< in: lock type: RW_LOCK_SHARED,
 					RW_LOCK_EX */
 	__attribute__((warn_unused_result));
+
+/******************************************************************//**
+Checks if the thread has locked the rw-lock in the specified mode, with
+the pass value == 0. */
+
+bool
+rw_lock_own_flagged(
+/*================*/
+	const rw_lock_t*	lock,	/*!< in: rw-lock */
+	rw_lock_flags_t		flags)	/*!< in: specify lock types with
+					OR of the rw_lock_flag_t values */
+	__attribute__((warn_unused_result));
 #endif /* UNIV_SYNC_DEBUG */
 /******************************************************************//**
-Checks if somebody has locked the rw-lock in the specified mode. */
-UNIV_INTERN
-ibool
+Checks if somebody has locked the rw-lock in the specified mode.
+ at return true if locked */
+
+bool
 rw_lock_is_locked(
 /*==============*/
 	rw_lock_t*	lock,		/*!< in: rw-lock */
 	ulint		lock_type);	/*!< in: lock type: RW_LOCK_SHARED,
-					RW_LOCK_EX */
+					RW_LOCK_EX or RW_LOCK_SX */
 #ifdef UNIV_SYNC_DEBUG
 /***************************************************************//**
 Prints debug info of an rw-lock. */
@@ -587,6 +676,7 @@ struct rw_lock_t {
 				id of the current x-holder or wait-x thread.
 				This flag must be reset in x_unlock
 				functions before incrementing the lock_word */
+	volatile ulint	sx_recursive;/*!< number of granted SX locks. */
 	volatile os_thread_id_t	writer_thread;
 				/*!< Thread id of writer thread. Is only
 				guaranteed to have sane and non-stale
diff --git a/storage/innobase/include/sync0rw.ic b/storage/innobase/include/sync0rw.ic
index 8c4e938..0a7401a 100644
--- a/storage/innobase/include/sync0rw.ic
+++ b/storage/innobase/include/sync0rw.ic
@@ -118,7 +118,7 @@ rw_lock_reset_waiter_flag(
 /******************************************************************//**
 Returns the write-status of the lock - this function made more sense
 with the old rw_lock implementation.
-@return	RW_LOCK_NOT_LOCKED, RW_LOCK_EX, RW_LOCK_WAIT_EX */
+@return RW_LOCK_NOT_LOCKED, RW_LOCK_EX, RW_LOCK_WAIT_EX, RW_LOCK_SX */
 UNIV_INLINE
 ulint
 rw_lock_get_writer(
@@ -126,21 +126,31 @@ rw_lock_get_writer(
 	const rw_lock_t*	lock)	/*!< in: rw-lock */
 {
 	lint lock_word = lock->lock_word;
-	if (lock_word > 0) {
+
+	ut_ad(lock_word <= X_LOCK_DECR);
+	if (lock_word > X_LOCK_HALF_DECR) {
 		/* return NOT_LOCKED in s-lock state, like the writer
 		member of the old lock implementation. */
 		return(RW_LOCK_NOT_LOCKED);
-	} else if ((lock_word == 0) || (lock_word <= -X_LOCK_DECR)) {
+	} else if (lock_word > 0) {
+		/* sx-locked, no x-locks */
+		return(RW_LOCK_SX);
+	} else if ((lock_word == 0)
+		   || (lock_word == -X_LOCK_HALF_DECR)
+		   || (lock_word <= -X_LOCK_DECR)) {
+		/* x-lock with sx-lock is also treated as RW_LOCK_EX */
 		return(RW_LOCK_EX);
 	} else {
-		ut_ad(lock_word > -X_LOCK_DECR);
+		/* x-waiter with sx-lock is also treated as RW_LOCK_WAIT_EX
+		e.g. -X_LOCK_HALF_DECR < lock_word < 0 : without sx
+		     -X_LOCK_DECR < lock_word < -X_LOCK_HALF_DECR : with sx */
 		return(RW_LOCK_WAIT_EX);
 	}
 }
 
 /******************************************************************//**
-Returns the number of readers.
-@return	number of readers */
+Returns the number of readers (s-locks).
+@return number of readers */
 UNIV_INLINE
 ulint
 rw_lock_get_reader_count(
@@ -148,13 +158,28 @@ rw_lock_get_reader_count(
 	const rw_lock_t*	lock)	/*!< in: rw-lock */
 {
 	lint lock_word = lock->lock_word;
-	if (lock_word > 0) {
-		/* s-locked, no x-waiters */
+	ut_ad(lock_word <= X_LOCK_DECR);
+
+	if (lock_word > X_LOCK_HALF_DECR) {
+		/* s-locked, no x-waiter */
 		return(X_LOCK_DECR - lock_word);
-	} else if (lock_word < 0 && lock_word > -X_LOCK_DECR) {
-		/* s-locked, with x-waiters */
+	} else if (lock_word > 0) {
+		/* s-locked, with sx-locks only */
+		return(X_LOCK_HALF_DECR - lock_word);
+	} else if (lock_word == 0) {
+		/* x-locked */
+		return(0);
+	} else if (lock_word > -X_LOCK_HALF_DECR) {
+		/* s-locked, with x-waiter */
 		return((ulint)(-lock_word));
+	} else if (lock_word == -X_LOCK_HALF_DECR) {
+		/* x-locked with sx-locks */
+		return(0);
+	} else if (lock_word > -X_LOCK_DECR) {
+		/* s-locked, with x-waiter and sx-lock */
+		return((ulint)(-(lock_word + X_LOCK_HALF_DECR)));
 	}
+	/* no s-locks */
 	return(0);
 }
 
@@ -180,10 +205,55 @@ rw_lock_get_x_lock_count(
 	const rw_lock_t*	lock)	/*!< in: rw-lock */
 {
 	lint lock_copy = lock->lock_word;
-	if ((lock_copy != 0) && (lock_copy > -X_LOCK_DECR)) {
+	ut_ad(lock_copy <= X_LOCK_DECR);
+
+	if (lock_copy == 0 || lock_copy == -X_LOCK_HALF_DECR) {
+		/* "1 x-lock" or "1 x-lock + sx-locks" */
+		return(1);
+	} else if (lock_copy > -X_LOCK_DECR) {
+		/* s-locks, one or more sx-locks if > 0, or x-waiter if < 0 */
 		return(0);
+	} else if (lock_copy > -(X_LOCK_DECR + X_LOCK_HALF_DECR)) {
+		/* no s-lock, no sx-lock, 2 or more x-locks.
+		First 2 x-locks are set with -X_LOCK_DECR,
+		all other recursive x-locks are set with -1 */
+		return(2 - (lock_copy + X_LOCK_DECR));
+	} else {
+		/* no s-lock, 1 or more sx-lock, 2 or more x-locks.
+		First 2 x-locks are set with -(X_LOCK_DECR + X_LOCK_HALF_DECR),
+		all other recursive x-locks are set with -1 */
+		return(2 - (lock_copy + X_LOCK_DECR + X_LOCK_HALF_DECR));
 	}
-	return((lock_copy == 0) ? 1 : (2 - (lock_copy + X_LOCK_DECR)));
+}
+
+/******************************************************************//**
+Returns the number of sx-lock for the lock. Does not reserve the lock
+mutex, so the caller must be sure it is not changed during the call.
+@return value of sx-lock count */
+UNIV_INLINE
+ulint
+rw_lock_get_sx_lock_count(
+/*======================*/
+	const rw_lock_t*	lock)	/*!< in: rw-lock */
+{
+#ifdef UNIV_DEBUG
+	lint lock_copy = lock->lock_word;
+
+	ut_ad(lock_copy <= X_LOCK_DECR);
+
+	while (lock_copy < 0) {
+		lock_copy += X_LOCK_DECR;
+	}
+
+	if (lock_copy > 0
+	    && lock_copy <= X_LOCK_HALF_DECR) {
+		return(lock->sx_recursive);
+	}
+
+	return(0);
+#else /* UNIV_DEBUG */
+	return(lock->sx_recursive);
+#endif /* UNIV_DEBUG */
 }
 
 /******************************************************************//**
@@ -192,34 +262,35 @@ one for systems supporting atomic operations, one for others. This does
 does not support recusive x-locks: they should be handled by the caller and
 need not be atomic since they are performed by the current lock holder.
 Returns true if the decrement was made, false if not.
-@return	TRUE if decr occurs */
+@return true if decr occurs */
 UNIV_INLINE
-ibool
+bool
 rw_lock_lock_word_decr(
 /*===================*/
 	rw_lock_t*	lock,		/*!< in/out: rw-lock */
-	ulint		amount)		/*!< in: amount to decrement */
+	ulint		amount,		/*!< in: amount to decrement */
+	lint		threshold)	/*!< in: threshold of judgement */
 {
 #ifdef INNODB_RW_LOCKS_USE_ATOMICS
 	lint local_lock_word;
 
 	os_rmb;
 	local_lock_word = lock->lock_word;
-	while (local_lock_word > 0) {
+	while (local_lock_word > threshold) {
 		if (os_compare_and_swap_lint(&lock->lock_word,
 					     local_lock_word,
 					     local_lock_word - amount)) {
-			return(TRUE);
+			return(true);
 		}
 		local_lock_word = lock->lock_word;
 	}
-	return(FALSE);
+	return(false);
 #else /* INNODB_RW_LOCKS_USE_ATOMICS */
-	ibool success = FALSE;
+	bool success = false;
 	mutex_enter(&(lock->mutex));
-	if (lock->lock_word > 0) {
+	if (lock->lock_word > threshold) {
 		lock->lock_word -= amount;
-		success = TRUE;
+		success = true;
 	}
 	mutex_exit(&(lock->mutex));
 	return(success);
@@ -266,7 +337,7 @@ void
 rw_lock_set_writer_id_and_recursion_flag(
 /*=====================================*/
 	rw_lock_t*	lock,		/*!< in/out: lock to work on */
-	ibool		recursive)	/*!< in: TRUE if recursion
+	bool		recursive)	/*!< in: true if recursion
 					allowed */
 {
 	os_thread_id_t	curr_thread	= os_thread_get_curr_id();
@@ -312,7 +383,7 @@ rw_lock_s_lock_low(
 	const char*	file_name, /*!< in: file name where lock requested */
 	ulint		line)	/*!< in: line where requested */
 {
-	if (!rw_lock_lock_word_decr(lock, 1)) {
+	if (!rw_lock_lock_word_decr(lock, 1, 0)) {
 		/* Locking did not succeed */
 		return(FALSE);
 	}
@@ -412,7 +483,7 @@ rw_lock_x_lock_func_nowait(
 	To achieve this we load it before os_compare_and_swap_lint(),
 	which implies full memory barrier in current implementation. */
 	if (success) {
-		rw_lock_set_writer_id_and_recursion_flag(lock, TRUE);
+		rw_lock_set_writer_id_and_recursion_flag(lock, true);
 
 	} else if (local_recursive
 		   && os_thread_eq(lock->writer_thread,
@@ -420,9 +491,12 @@ rw_lock_x_lock_func_nowait(
 		/* Relock: this lock_word modification is safe since no other
 		threads can modify (lock, unlock, or reserve) lock_word while
 		there is an exclusive writer and this is the writer thread. */
-		if (lock->lock_word == 0) {
-			lock->lock_word = -X_LOCK_DECR;
+		if (lock->lock_word == 0 || lock->lock_word == -X_LOCK_HALF_DECR) {
+			/* There are 1 x-locks */
+			lock->lock_word -= X_LOCK_DECR;
 		} else {
+			ut_ad(lock->lock_word <= -X_LOCK_DECR);
+			/* There are 2 or more x-locks */
 			lock->lock_word--;
 		}
 
@@ -472,7 +546,8 @@ rw_lock_s_unlock_func(
 #endif
 
 	/* Increment lock_word to indicate 1 less reader */
-	if (rw_lock_lock_word_incr(lock, 1) == 0) {
+	lint	lock_word = rw_lock_lock_word_incr(lock, 1);
+	if (lock_word == 0 || lock_word == -X_LOCK_HALF_DECR) {
 
 		/* wait_ex waiter exists. It may not be asleep, but we signal
 		anyway. We do not wake other waiters, because they can't
@@ -501,7 +576,8 @@ rw_lock_x_unlock_func(
 #endif
 	rw_lock_t*	lock)	/*!< in/out: rw-lock */
 {
-	ut_ad(lock->lock_word == 0 || lock->lock_word <= -X_LOCK_DECR);
+	ut_ad(lock->lock_word == 0 || lock->lock_word == -X_LOCK_HALF_DECR
+	      || lock->lock_word <= -X_LOCK_DECR);
 
 	/* lock->recursive flag also indicates if lock->writer_thread is
 	valid or stale. If we are the last of the recursive callers
@@ -518,18 +594,16 @@ rw_lock_x_unlock_func(
 	rw_lock_remove_debug_info(lock, pass, RW_LOCK_EX);
 #endif
 
-	ulint x_lock_incr;
-	if (lock->lock_word == 0) {
-		x_lock_incr = X_LOCK_DECR;
-	} else if (lock->lock_word == -X_LOCK_DECR) {
-		x_lock_incr = X_LOCK_DECR;
-	} else {
-		ut_ad(lock->lock_word < -X_LOCK_DECR);
-		x_lock_incr = 1;
-	}
+	if (lock->lock_word == 0 || lock->lock_word == -X_LOCK_HALF_DECR) {
+		/* There is 1 x-lock */
+		/* atomic increment is needed, because it is last */
+		if (rw_lock_lock_word_incr(lock, X_LOCK_DECR) <= 0) {
+			ut_error;
+		}
 
-	if (rw_lock_lock_word_incr(lock, x_lock_incr) == X_LOCK_DECR) {
-		/* Lock is now free. May have to signal read/write waiters.
+		/* This no longer has an X-lock but it may still have
+		an SX-lock. So it is now free for S-locks by other threads.
+		We need to signal read/write waiters.
 		We do not need to signal wait_ex waiters, since they cannot
 		exist when there is a writer. */
 		if (lock->waiters) {
@@ -537,6 +611,14 @@ rw_lock_x_unlock_func(
 			os_event_set(lock->event);
 			sync_array_object_signalled();
 		}
+	} else if (lock->lock_word == -X_LOCK_DECR
+		   || lock->lock_word == -(X_LOCK_DECR + X_LOCK_HALF_DECR)) {
+		/* There are 2 x-locks */
+		lock->lock_word += X_LOCK_DECR;
+	} else {
+		/* There are more than 2 x-locks. */
+		ut_ad(lock->lock_word < -X_LOCK_DECR);
+		lock->lock_word += 1;
 	}
 
 	ut_ad(rw_lock_validate(lock));
@@ -546,6 +628,63 @@ rw_lock_x_unlock_func(
 #endif
 }
 
+/******************************************************************//**
+Releases a sx mode lock. */
+UNIV_INLINE
+void
+rw_lock_sx_unlock_func(
+/*===================*/
+#ifdef UNIV_SYNC_DEBUG
+	ulint		pass,	/*!< in: pass value; != 0, if the lock may have
+				been passed to another thread to unlock */
+#endif /* UNIV_SYNC_DEBUG */
+	rw_lock_t*	lock)	/*!< in/out: rw-lock */
+{
+	ut_ad(rw_lock_get_sx_lock_count(lock));
+	ut_a(lock->sx_recursive > 0);
+
+	lock->sx_recursive -= 1;
+
+#ifdef UNIV_SYNC_DEBUG
+	rw_lock_remove_debug_info(lock, pass, RW_LOCK_SX);
+#endif /* UNIV_SYNC_DEBUG */
+
+	if (lock->sx_recursive == 0) {
+		/* Last caller in a possible recursive chain. */
+		if (lock->lock_word > 0) {
+			lock->recursive = FALSE;
+			UNIV_MEM_INVALID(&lock->writer_thread,
+					 sizeof lock->writer_thread);
+
+			if (rw_lock_lock_word_incr(lock, X_LOCK_HALF_DECR)
+			    <= X_LOCK_HALF_DECR) {
+				ut_error;
+			}
+			/* Lock is now free. May have to signal read/write
+			waiters. We do not need to signal wait_ex waiters,
+			since they cannot exist when there is an sx-lock
+			holder. */
+			if (lock->waiters) {
+				rw_lock_reset_waiter_flag(lock);
+				os_event_set(lock->event);
+				sync_array_object_signalled();
+			}
+		} else {
+			/* still has x-lock */
+			ut_ad(lock->lock_word == -X_LOCK_HALF_DECR
+			      || lock->lock_word <= -(X_LOCK_DECR
+						      + X_LOCK_HALF_DECR));
+			lock->lock_word += X_LOCK_HALF_DECR;
+		}
+	}
+
+	ut_ad(rw_lock_validate(lock));
+
+#ifdef UNIV_SYNC_PERF_STAT
+	rw_sx_exit_count++;
+#endif /* UNIV_SYNC_PERF_STAT */
+}
+
 #ifdef UNIV_PFS_RWLOCK
 
 /******************************************************************//**
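
One more hedged sketch before the remaining files: counting recursive
X locks under the new encoding. The helper below is invented for
illustration, not InnoDB code; it mirrors the arithmetic of the patched
rw_lock_get_x_lock_count() in the sync0rw.ic diff above:

#include <cassert>

static const long X_LOCK_DECR      = 0x20000000;
static const long X_LOCK_HALF_DECR = 0x10000000;

/* Mirrors the arithmetic of the patched rw_lock_get_x_lock_count(). */
static long
x_lock_count(long lock_word)
{
	if (lock_word == 0 || lock_word == -X_LOCK_HALF_DECR) {
		return(1);	/* one X lock, possibly plus SX locks */
	} else if (lock_word > -X_LOCK_DECR) {
		return(0);	/* S/SX locked, or an X waiter; no X lock */
	} else if (lock_word > -(X_LOCK_DECR + X_LOCK_HALF_DECR)) {
		/* 2 or more X locks, no SX lock */
		return(2 - (lock_word + X_LOCK_DECR));
	} else {
		/* 2 or more X locks plus SX locks */
		return(2 - (lock_word + X_LOCK_DECR + X_LOCK_HALF_DECR));
	}
}

int main()
{
	assert(x_lock_count(X_LOCK_DECR) == 0);		/* unlocked */
	assert(x_lock_count(0) == 1);			/* single X lock */
	assert(x_lock_count(-X_LOCK_DECR) == 2);	/* recursive X lock */
	assert(x_lock_count(-X_LOCK_DECR - 1) == 3);	/* third recursion is -1 */
	return(0);
}
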
diff --git a/storage/innobase/include/sync0sync.h b/storage/innobase/include/sync0sync.h
index 2ffa14f..e86fd2d 100644
--- a/storage/innobase/include/sync0sync.h
+++ b/storage/innobase/include/sync0sync.h
@@ -741,7 +741,18 @@ or row lock! */
 #define RW_LOCK_EXCLUSIVE	351
 #define RW_LOCK_SHARED		352
 #define RW_LOCK_WAIT_EX		353
-#define SYNC_MUTEX		354
+#define RW_LOCK_SX		354
+#define SYNC_MUTEX		355
+
+#ifdef UNIV_SYNC_DEBUG
+/* Flags to specify lock types for rw_lock_own_flagged() */
+enum rw_lock_flag_t {
+	RW_LOCK_FLAG_S  = 1 << 0,
+	RW_LOCK_FLAG_X  = 1 << 1,
+	RW_LOCK_FLAG_SX = 1 << 2
+};
+typedef ulint rw_lock_flags_t;
+#endif /* UNIV_SYNC_DEBUG */
 
 /* NOTE! The structure appears here only for the compiler to know its size.
 Do not use its fields directly! The structure used in the spin lock
diff --git a/storage/innobase/mtr/mtr0mtr.cc b/storage/innobase/mtr/mtr0mtr.cc
index 400aa9b..ef6fe09 100644
--- a/storage/innobase/mtr/mtr0mtr.cc
+++ b/storage/innobase/mtr/mtr0mtr.cc
@@ -78,6 +78,7 @@ mtr_memo_slot_release_func(
 	switch (slot->type) {
 	case MTR_MEMO_PAGE_S_FIX:
 	case MTR_MEMO_PAGE_X_FIX:
+	case MTR_MEMO_PAGE_SX_FIX:
 	case MTR_MEMO_BUF_FIX:
 		buf_page_release((buf_block_t*) object, slot->type);
 		break;
@@ -87,10 +88,15 @@ mtr_memo_slot_release_func(
 	case MTR_MEMO_X_LOCK:
 		rw_lock_x_unlock((rw_lock_t*) object);
 		break;
+	case MTR_MEMO_SX_LOCK:
+		rw_lock_sx_unlock((rw_lock_t*) object);
+		break;
 #ifdef UNIV_DEBUG
 	default:
 		ut_ad(slot->type == MTR_MEMO_MODIFY);
-		ut_ad(mtr_memo_contains(mtr, object, MTR_MEMO_PAGE_X_FIX));
+		ut_ad(mtr_memo_contains_flagged(mtr, object,
+						MTR_MEMO_PAGE_X_FIX
+						| MTR_MEMO_PAGE_SX_FIX));
 #endif /* UNIV_DEBUG */
 	}
 }
@@ -149,11 +155,13 @@ mtr_memo_slot_note_modification(
 	ut_ad(!srv_read_only_mode);
 	ut_ad(mtr->magic_n == MTR_MAGIC_N);
 
-	if (slot->object != NULL && slot->type == MTR_MEMO_PAGE_X_FIX) {
-		buf_block_t*	block = (buf_block_t*) slot->object;
-
-		ut_ad(!mtr->made_dirty || log_flush_order_mutex_own());
-		buf_flush_note_modification(block, mtr);
+	if (buf_block_t* block = static_cast<buf_block_t*>(slot->object)) {
+		switch (slot->type) {
+		case MTR_MEMO_PAGE_X_FIX:
+		case MTR_MEMO_PAGE_SX_FIX:
+			ut_ad(!mtr->made_dirty || log_flush_order_mutex_own());
+			buf_flush_note_modification(block, mtr);
+		}
 	}
 }
 
@@ -384,6 +392,65 @@ mtr_memo_release(
 
 	return(false);
 }
+
+/**********************************************************//**
+Releases the block in an mtr memo after a savepoint. */
+
+void
+mtr_release_block_at_savepoint(
+/*===========================*/
+	mtr_t*		mtr,		/*!< in: mtr */
+	ulint		savepoint,	/*!< in: savepoint */
+	buf_block_t*	block)		/*!< in: block to release */
+{
+	mtr_memo_slot_t* slot;
+	dyn_array_t*	memo;
+
+	ut_ad(mtr);
+	ut_ad(mtr->magic_n == MTR_MAGIC_N);
+	ut_ad(mtr->state == MTR_ACTIVE);
+
+	memo = &mtr->memo;
+
+	slot = static_cast<mtr_memo_slot_t*>(
+		dyn_array_get_element(memo, savepoint));
+
+	ut_a(slot->object == block);
+
+	buf_page_release(block, slot->type);
+
+	slot->object = NULL;
+}
+/**********************************************************//**
+Relax the block latch in an mtr memo after a savepoint
+from X to SX. */
+
+void
+mtr_block_x_to_sx_at_savepoint(
+/*===========================*/
+	mtr_t*		mtr,		/*!< in: mtr */
+	ulint		savepoint,	/*!< in: savepoint */
+	buf_block_t*	block)		/*!< in: block to relax latch */
+{
+	mtr_memo_slot_t* slot;
+	dyn_array_t*	memo;
+
+	ut_ad(mtr);
+	ut_ad(mtr->magic_n == MTR_MAGIC_N);
+	ut_ad(mtr->state == MTR_ACTIVE);
+
+	memo = &mtr->memo;
+
+	slot = static_cast<mtr_memo_slot_t*>(
+		dyn_array_get_element(memo, savepoint));
+
+	ut_a(slot->object == block);
+	ut_a(slot->type == MTR_MEMO_PAGE_X_FIX);
+
+	rw_lock_sx_lock(&block->lock);
+	rw_lock_x_unlock(&block->lock);
+	slot->type = MTR_MEMO_PAGE_SX_FIX;
+}
 #endif /* !UNIV_HOTBACKUP */
 
 /********************************************************//**
@@ -399,8 +466,10 @@ mtr_read_ulint(
 				/*!< in: mini-transaction handle */
 {
 	ut_ad(mtr->state == MTR_ACTIVE);
-	ut_ad(mtr_memo_contains_page(mtr, ptr, MTR_MEMO_PAGE_S_FIX)
-	      || mtr_memo_contains_page(mtr, ptr, MTR_MEMO_PAGE_X_FIX));
+	ut_ad(mtr_memo_contains_page_flagged(mtr, ptr,
+					     MTR_MEMO_PAGE_S_FIX
+					     | MTR_MEMO_PAGE_X_FIX
+					     | MTR_MEMO_PAGE_SX_FIX));
 
 	return(mach_read_ulint(ptr, type));
 }
@@ -421,6 +490,21 @@ mtr_memo_contains_page(
 	return(mtr_memo_contains(mtr, buf_block_align(ptr), type));
 }
 
+/**********************************************************//**
+Checks if memo contains the given page.
+@return true if contains */
+
+bool
+mtr_memo_contains_page_flagged(
+/*===========================*/
+	const mtr_t*	mtr,	/*!< in: mtr */
+	const byte*	ptr,	/*!< in: pointer to buffer frame */
+	ulint		flags)	/*!< in: specify types of object with
+				OR of MTR_MEMO_PAGE_S_FIX... values */
+{
+	return(mtr_memo_contains_flagged(mtr, buf_block_align(ptr), flags));
+}
+
 /*********************************************************//**
 Prints info of an mtr handle. */
 UNIV_INTERN
diff --git a/storage/innobase/srv/srv0mon.cc b/storage/innobase/srv/srv0mon.cc
index 39481a9..93251ca 100644
--- a/storage/innobase/srv/srv0mon.cc
+++ b/storage/innobase/srv/srv0mon.cc
@@ -1238,6 +1238,12 @@ static monitor_info_t	innodb_counter_info[] =
 	 MONITOR_EXISTING | MONITOR_DEFAULT_ON),
 	 MONITOR_DEFAULT_START, MONITOR_OVLD_RWLOCK_X_SPIN_WAITS},
 
+	{"innodb_rwlock_sx_spin_waits", "server",
+	 "Number of rwlock spin waits due to sx latch request",
+	 static_cast<monitor_type_t>(
+	 MONITOR_EXISTING | MONITOR_DEFAULT_ON),
+	 MONITOR_DEFAULT_START, MONITOR_OVLD_RWLOCK_SX_SPIN_WAITS},
+
 	{"innodb_rwlock_s_spin_rounds", "server",
 	 "Number of rwlock spin loop rounds due to shared latch request",
 	 static_cast<monitor_type_t>(
@@ -1250,6 +1256,12 @@ static monitor_info_t	innodb_counter_info[] =
 	 MONITOR_EXISTING | MONITOR_DEFAULT_ON),
 	 MONITOR_DEFAULT_START, MONITOR_OVLD_RWLOCK_X_SPIN_ROUNDS},
 
+	{"innodb_rwlock_sx_spin_rounds", "server",
+	 "Number of rwlock spin loop rounds due to sx latch request",
+	 static_cast<monitor_type_t>(
+	 MONITOR_EXISTING | MONITOR_DEFAULT_ON),
+	 MONITOR_DEFAULT_START, MONITOR_OVLD_RWLOCK_SX_SPIN_ROUNDS},
+
 	{"innodb_rwlock_s_os_waits", "server",
 	 "Number of OS waits due to shared latch request",
 	 static_cast<monitor_type_t>(
@@ -1262,6 +1274,12 @@ static monitor_info_t	innodb_counter_info[] =
 	 MONITOR_EXISTING | MONITOR_DEFAULT_ON),
 	 MONITOR_DEFAULT_START, MONITOR_OVLD_RWLOCK_X_OS_WAITS},
 
+	{"innodb_rwlock_sx_os_waits", "server",
+	 "Number of OS waits due to sx latch request",
+	 static_cast<monitor_type_t>(
+	 MONITOR_EXISTING | MONITOR_DEFAULT_ON),
+	 MONITOR_DEFAULT_START, MONITOR_OVLD_RWLOCK_SX_OS_WAITS},
+
 	/* ========== Counters for DML operations ========== */
 	{"module_dml", "dml", "Statistics for DMLs",
 	 MONITOR_MODULE,
@@ -1805,6 +1823,10 @@ srv_mon_process_existing_counter(
 		value = rw_lock_stats.rw_x_spin_wait_count;
 		break;
 
+	case MONITOR_OVLD_RWLOCK_SX_SPIN_WAITS:
+		value = rw_lock_stats.rw_sx_spin_wait_count;
+		break;
+
 	case MONITOR_OVLD_RWLOCK_S_SPIN_ROUNDS:
 		value = rw_lock_stats.rw_s_spin_round_count;
 		break;
@@ -1813,6 +1835,10 @@ srv_mon_process_existing_counter(
 		value = rw_lock_stats.rw_x_spin_round_count;
 		break;
 
+	case MONITOR_OVLD_RWLOCK_SX_SPIN_ROUNDS:
+		value = rw_lock_stats.rw_sx_spin_round_count;
+		break;
+
 	case MONITOR_OVLD_RWLOCK_S_OS_WAITS:
 		value = rw_lock_stats.rw_s_os_wait_count;
 		break;
@@ -1821,6 +1847,10 @@ srv_mon_process_existing_counter(
 		value = rw_lock_stats.rw_x_os_wait_count;
 		break;
 
+	case MONITOR_OVLD_RWLOCK_SX_OS_WAITS:
+		value = rw_lock_stats.rw_sx_os_wait_count;
+		break;
+
 	case MONITOR_OVLD_BUFFER_POOL_SIZE:
 		value = srv_buf_pool_size;
 		break;
diff --git a/storage/innobase/sync/sync0arr.cc b/storage/innobase/sync/sync0arr.cc
index aa0cdfe..566f9c0 100644
--- a/storage/innobase/sync/sync0arr.cc
+++ b/storage/innobase/sync/sync0arr.cc
@@ -512,10 +512,12 @@ sync_array_cell_print(
 
 	} else if (type == RW_LOCK_EX
 		   || type == RW_LOCK_WAIT_EX
+		   || type == RW_LOCK_SX
 		   || type == RW_LOCK_SHARED) {
 
 		fputs(type == RW_LOCK_EX ? "X-lock on"
 		      : type == RW_LOCK_WAIT_EX ? "X-lock (wait_ex) on"
+		      : type == RW_LOCK_SX ? "SX-lock on"
 		      : "S-lock on", file);
 
 		rwlock = (rw_lock_t*)cell->old_wait_rw_lock;
@@ -530,13 +532,12 @@ sync_array_cell_print(
 
 			if (writer && writer != RW_LOCK_NOT_LOCKED) {
 				fprintf(file,
-					"a writer (thread id %lu) has"
-					" reserved it in mode %s",
-					(ulong) os_thread_pf(rwlock->writer_thread),
-					writer == RW_LOCK_EX
-					? " exclusive\n"
-					: " wait exclusive\n");
-				*reserver = rwlock->writer_thread;
+				"a writer (thread id %lu) has"
+				" reserved it in mode %s",
+				(ulong) os_thread_pf(rwlock->writer_thread),
+				writer == RW_LOCK_EX ? " exclusive\n"
+				: writer == RW_LOCK_SX ? " SX\n"
+				: " wait exclusive\n");
 			}
 
 			fprintf(file,
@@ -555,7 +556,6 @@ sync_array_cell_print(
 			fprintf(file,
 				"Holder thread %lu file %s line %lu\n",
 				rwlock->thread_id, rwlock->file_name, rwlock->line);
-
 		}
 	} else {
 		ut_error;
@@ -644,7 +644,8 @@ sync_array_detect_deadlock(
 		return(FALSE); /* No deadlock here */
 	}
 
-	if (cell->request_type == SYNC_MUTEX) {
+	switch (cell->request_type) {
+	case SYNC_MUTEX:
 
 		mutex = static_cast<ib_mutex_t*>(cell->wait_object);
 
@@ -675,8 +676,8 @@ sync_array_detect_deadlock(
 
 		return(FALSE); /* No deadlock */
 
-	} else if (cell->request_type == RW_LOCK_EX
-		   || cell->request_type == RW_LOCK_WAIT_EX) {
+	case RW_LOCK_EX:
+	case RW_LOCK_WAIT_EX:
 
 		lock = static_cast<rw_lock_t*>(cell->wait_object);
 
@@ -686,17 +687,21 @@ sync_array_detect_deadlock(
 
 			thread = debug->thread_id;
 
-			if (((debug->lock_type == RW_LOCK_EX)
-			     && !os_thread_eq(thread, cell->thread))
-			    || ((debug->lock_type == RW_LOCK_WAIT_EX)
-				&& !os_thread_eq(thread, cell->thread))
-			    || (debug->lock_type == RW_LOCK_SHARED)) {
+			switch (debug->lock_type) {
+			case RW_LOCK_EX:
+			case RW_LOCK_WAIT_EX:
+			case RW_LOCK_SX:
+				if (os_thread_eq(thread, cell->thread)) {
+					break;
+				}
+				/* fall through */
+			case RW_LOCK_SHARED:
 
 				/* The (wait) x-lock request can block
 				infinitely only if someone (can be also cell
 				thread) is holding s-lock, or someone
-				(cannot be cell thread) (wait) x-lock, and
-				he is blocked by start thread */
+				(cannot be cell thread) (wait) x-lock or
+				sx-lock, and he is blocked by start thread */
 
 				ret = sync_array_deadlock_step(
 					arr, start, thread, debug->pass,
@@ -714,7 +719,41 @@ sync_array_detect_deadlock(
 
 		return(FALSE);
 
-	} else if (cell->request_type == RW_LOCK_SHARED) {
+	case RW_LOCK_SX:
+
+		lock = static_cast<rw_lock_t*>(cell->wait_object);
+
+		for (debug = UT_LIST_GET_FIRST(lock->debug_list);
+		     debug != 0;
+		     debug = UT_LIST_GET_NEXT(list, debug)) {
+
+			thread = debug->thread_id;
+
+			switch (debug->lock_type) {
+			case RW_LOCK_EX:
+			case RW_LOCK_WAIT_EX:
+			case RW_LOCK_SX:
+				if (os_thread_eq(thread, cell->thread)) {
+					break;
+				}
+
+				/* The sx-lock request can block infinitely
+				only if someone (can be also cell thread) is
+				holding (wait) x-lock or sx-lock, and he is
+				blocked by start thread */
+
+				ret = sync_array_deadlock_step(
+					arr, start, thread, debug->pass,
+					depth);
+				if (ret) {
+					goto print;
+				}
+			}
+		}
+
+		return(FALSE);
+
+	case RW_LOCK_SHARED:
 
 		lock = static_cast<rw_lock_t*>(cell->wait_object);
 
@@ -743,12 +782,9 @@ sync_array_detect_deadlock(
 
 		return(FALSE);
 
-	} else {
+	default:
 		ut_error;
 	}
-
-	return(TRUE);	/* Execution never reaches this line: for compiler
-			fooling only */
 }
 #endif /* UNIV_SYNC_DEBUG */
 
@@ -763,7 +799,8 @@ sync_arr_cell_can_wake_up(
 	ib_mutex_t*	mutex;
 	rw_lock_t*	lock;
 
-	if (cell->request_type == SYNC_MUTEX) {
+	switch (cell->request_type) {
+	case SYNC_MUTEX:
 
 		mutex = static_cast<ib_mutex_t*>(cell->wait_object);
 
@@ -773,28 +810,37 @@ sync_arr_cell_can_wake_up(
 			return(TRUE);
 		}
 
-	} else if (cell->request_type == RW_LOCK_EX) {
+		break;
+
+	case RW_LOCK_EX:
+	case RW_LOCK_SX:
 
 		lock = static_cast<rw_lock_t*>(cell->wait_object);
 
 		os_rmb;
-		if (lock->lock_word > 0) {
+		if (lock->lock_word > X_LOCK_HALF_DECR) {
 		/* Either unlocked or only read locked. */
 
 			return(TRUE);
 		}
 
-        } else if (cell->request_type == RW_LOCK_WAIT_EX) {
+		break;
+
+	case RW_LOCK_WAIT_EX:
 
 		lock = static_cast<rw_lock_t*>(cell->wait_object);
 
-                /* lock_word == 0 means all readers have left */
 		os_rmb;
+                /* lock_word == 0 means all readers or sx have left */
 		if (lock->lock_word == 0) {
 
 			return(TRUE);
 		}
-	} else if (cell->request_type == RW_LOCK_SHARED) {
+
+		break;
+
+	case RW_LOCK_SHARED:
+
 		lock = static_cast<rw_lock_t*>(cell->wait_object);
 
                 /* lock_word > 0 means no writer or reserved writer */
diff --git a/storage/innobase/sync/sync0rw.cc b/storage/innobase/sync/sync0rw.cc
index 8919716..651375b 100644
--- a/storage/innobase/sync/sync0rw.cc
+++ b/storage/innobase/sync/sync0rw.cc
@@ -48,22 +48,45 @@ Created 9/11/1995 Heikki Tuuri
 	=============================
 The status of a rw_lock is held in lock_word. The initial value of lock_word is
 X_LOCK_DECR. lock_word is decremented by 1 for each s-lock and by X_LOCK_DECR
-for each x-lock. This describes the lock state for each value of lock_word:
-
-lock_word == X_LOCK_DECR:      Unlocked.
-0 < lock_word < X_LOCK_DECR:   Read locked, no waiting writers.
-			       (X_LOCK_DECR - lock_word) is the
-			       number of readers that hold the lock.
-lock_word == 0:		       Write locked
--X_LOCK_DECR < lock_word < 0:  Read locked, with a waiting writer.
-			       (-lock_word) is the number of readers
-			       that hold the lock.
-lock_word <= -X_LOCK_DECR:     Recursively write locked. lock_word has been
-			       decremented by X_LOCK_DECR for the first lock
-			       and the first recursive lock, then by 1 for
-			       each recursive lock thereafter.
-			       So the number of locks is:
-			       (lock_copy == 0) ? 1 : 2 - (lock_copy + X_LOCK_DECR)
+or 1 for each x-lock. This describes the lock state for each value of lock_word:
+
+lock_word == X_LOCK_DECR:	Unlocked.
+X_LOCK_HALF_DECR < lock_word < X_LOCK_DECR:
+				S locked, no waiting writers.
+				(X_LOCK_DECR - lock_word) is the number
+				of S locks.
+lock_word == X_LOCK_HALF_DECR:	SX locked, no waiting writers.
+0 < lock_word < X_LOCK_HALF_DECR:
+				SX locked AND S locked, no waiting writers.
+				(X_LOCK_HALF_DECR - lock_word) is the number
+				of S locks.
+lock_word == 0:			X locked, no waiting writers.
+-X_LOCK_HALF_DECR < lock_word < 0:
+				S locked, with a waiting writer.
+				(-lock_word) is the number of S locks.
+lock_word == -X_LOCK_HALF_DECR:	X locked and SX locked, no waiting writers.
+-X_LOCK_DECR < lock_word < -X_LOCK_HALF_DECR:
+				S locked, with a waiting writer
+				which has SX lock.
+				-(lock_word + X_LOCK_HALF_DECR) is the number
+				of S locks.
+lock_word == -X_LOCK_DECR:	X locked with recursive X lock (2 X locks).
+-(X_LOCK_DECR + X_LOCK_HALF_DECR) < lock_word < -X_LOCK_DECR:
+				X locked. The number of the X locks is:
+				2 - (lock_word + X_LOCK_DECR)
+lock_word == -(X_LOCK_DECR + X_LOCK_HALF_DECR):
+				X locked with recursive X lock (2 X locks)
+				and SX locked.
+lock_word < -(X_LOCK_DECR + X_LOCK_HALF_DECR):
+				X locked and SX locked.
+				The number of the X locks is:
+				2 - (lock_word + X_LOCK_DECR + X_LOCK_HALF_DECR)
+
+ LOCK COMPATIBILITY MATRIX
+    S SX  X
+ S  +  +  -
+ SX +  -  -
+ X  -  -  -
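
For illustration only (not part of the patch), a minimal, self-contained decoder for
the lock_word states listed above, using the same X_LOCK_DECR/X_LOCK_HALF_DECR
values that this commit defines:

	#include <stdio.h>

	#define X_LOCK_DECR		0x20000000
	#define X_LOCK_HALF_DECR	0x10000000

	/* Maps a lock_word value to the state names used in the table above. */
	static const char*
	lock_word_state(long w)
	{
		if (w == X_LOCK_DECR)		return("unlocked");
		if (w > X_LOCK_HALF_DECR)	return("S locked");
		if (w == X_LOCK_HALF_DECR)	return("SX locked");
		if (w > 0)			return("SX + S locked");
		if (w == 0)			return("X locked");
		if (w > -X_LOCK_HALF_DECR)	return("S locked, X waiting");
		if (w == -X_LOCK_HALF_DECR)	return("X + SX locked");
		if (w > -X_LOCK_DECR)		return("S locked, SX-holding X waiting");
		if (w > -(X_LOCK_DECR + X_LOCK_HALF_DECR))
			return("X locked recursively");
		return("X locked recursively + SX locked");
	}

	int
	main(void)
	{
		printf("%s\n", lock_word_state(X_LOCK_DECR - 2));	/* 2 S locks */
		printf("%s\n", lock_word_state(X_LOCK_HALF_DECR - 1));	/* 1 SX + 1 S */
		printf("%s\n", lock_word_state(-X_LOCK_HALF_DECR));	/* 1 X + SX */
		return(0);
	}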
 
 The lock_word is always read and updated atomically and consistently, so that
 it always represents the state of the lock, and the state of the lock changes
@@ -71,12 +94,13 @@ with a single atomic operation. This lock_word holds all of the information
 that a thread needs in order to determine if it is eligible to gain the lock
 or if it must spin or sleep. The one exception to this is that writer_thread
 must be verified before recursive write locks: to solve this scenario, we make
-writer_thread readable by all threads, but only writeable by the x-lock holder.
+writer_thread readable by all threads, but only writeable by the x-lock or
+sx-lock holder.
 
 The other members of the lock obey the following rules to remain consistent:
 
 recursive:	This and the writer_thread field together control the
-		behaviour of recursive x-locking.
+		behaviour of recursive x-locking or sx-locking.
 		lock->recursive must be FALSE in following states:
 			1) The writer_thread contains garbage i.e.: the
 			lock has just been initialized.
@@ -238,6 +262,7 @@ rw_lock_create_func(
 	contains garbage at initialization and cannot be used for
 	recursive x-locking. */
 	lock->recursive = FALSE;
+	lock->sx_recursive = 0;
 	/* Silence Valgrind when UNIV_DEBUG_VALGRIND is not enabled. */
 	memset((void*) &lock->writer_thread, 0, sizeof lock->writer_thread);
 	UNIV_MEM_INVALID(&lock->writer_thread, sizeof lock->writer_thread);
@@ -328,7 +353,7 @@ UNIV_INTERN
 ibool
 rw_lock_validate(
 /*=============*/
-	rw_lock_t*	lock)	/*!< in: rw-lock */
+	const rw_lock_t*	lock)	/*!< in: rw-lock */
 {
 	ulint	waiters;
 	lint	lock_word;
@@ -449,7 +474,7 @@ rw_lock_x_lock_move_ownership(
 {
 	ut_ad(rw_lock_is_locked(lock, RW_LOCK_EX));
 
-	rw_lock_set_writer_id_and_recursion_flag(lock, TRUE);
+	rw_lock_set_writer_id_and_recursion_flag(lock, true);
 }
 
 /******************************************************************//**
@@ -457,13 +482,14 @@ Function for the next writer to call. Waits for readers to exit.
 The caller must have already decremented lock_word by X_LOCK_DECR. */
 UNIV_INLINE
 void
-rw_lock_x_lock_wait(
-/*================*/
+rw_lock_x_lock_wait_func(
+/*=====================*/
 	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
 #ifdef UNIV_SYNC_DEBUG
 	ulint		pass,	/*!< in: pass value; != 0, if the lock will
 				be passed to another thread to unlock */
 #endif
+	lint		threshold,/*!< in: threshold to wait for */
 	const char*	file_name,/*!< in: file name where lock requested */
 	ulint		line)	/*!< in: line where requested */
 {
@@ -478,10 +504,10 @@ rw_lock_x_lock_wait(
 	counter_index = (size_t) os_thread_get_curr_id();
 
 	os_rmb;
-	ut_ad(lock->lock_word <= 0);
+	ut_ad(lock->lock_word <= threshold);
 
         HMT_low();
-	while (lock->lock_word < 0) {
+	while (lock->lock_word < threshold) {
 		if (srv_spin_wait_delay) {
 			ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
 		}
@@ -503,7 +529,7 @@ rw_lock_x_lock_wait(
 		i = 0;
 
 		/* Check lock_word to ensure wake-up isn't missed.*/
-		if (lock->lock_word < 0) {
+		if (lock->lock_word < threshold) {
 
 			/* these stats may not be accurate */
 			lock->count_os_wait++;
@@ -538,6 +564,13 @@ rw_lock_x_lock_wait(
 	HMT_medium();
 	rw_lock_stats.rw_x_spin_round_count.add(counter_index, i);
 }
+#ifdef UNIV_SYNC_DEBUG
+# define rw_lock_x_lock_wait(L, P, T, F, O)		\
+	rw_lock_x_lock_wait_func(L, P, T, F, O)
+#else
+# define rw_lock_x_lock_wait(L, P, T, F, O)		\
+	rw_lock_x_lock_wait_func(L, T, F, O)
+#endif
 
 /******************************************************************//**
 Low-level function for acquiring an exclusive lock.
@@ -554,7 +587,7 @@ rw_lock_x_lock_low(
 {
 	ibool local_recursive= lock->recursive;
 
-	if (rw_lock_lock_word_decr(lock, X_LOCK_DECR)) {
+	if (rw_lock_lock_word_decr(lock, X_LOCK_DECR, X_LOCK_HALF_DECR)) {
 
 		/* lock->recursive also tells us if the writer_thread
 		field is stale or active. As we are going to write
@@ -564,29 +597,46 @@ rw_lock_x_lock_low(
 
 		/* Decrement occurred: we are writer or next-writer. */
 		rw_lock_set_writer_id_and_recursion_flag(
-			lock, pass ? FALSE : TRUE);
+			lock, !pass);
 
-		rw_lock_x_lock_wait(lock,
-#ifdef UNIV_SYNC_DEBUG
-				    pass,
-#endif
-				    file_name, line);
+		rw_lock_x_lock_wait(lock, pass,
+				    0, file_name, line);
 
 	} else {
 		os_thread_id_t	thread_id = os_thread_get_curr_id();
 
-		/* Decrement failed: relock or failed lock
+		/* Decrement failed: An X or SX lock is held by either
+		this thread or another. Try to relock.
 		Note: recursive must be loaded before writer_thread see
 		comment for rw_lock_set_writer_id_and_recursion_flag().
 		To achieve this we load it before rw_lock_lock_word_decr(),
 		which implies full memory barrier in current implementation. */
 		if (!pass && local_recursive
 		    && os_thread_eq(lock->writer_thread, thread_id)) {
-			/* Relock */
-			if (lock->lock_word == 0) {
-				lock->lock_word -= X_LOCK_DECR;
+			/* Other s-locks can still be granted. If x is
+			requested recursively while an sx lock is held,
+			this x lock must respect the latching order. */
+
+			/* The existing X or SX lock is from this thread */
+			if (rw_lock_lock_word_decr(lock, X_LOCK_DECR, 0)) {
+				/* There is at least one SX-lock from this
+				thread, but no X-lock. */
+
+				/* Wait for any other S-locks to be
+				released. */
+				rw_lock_x_lock_wait(lock, pass,
+						    -X_LOCK_HALF_DECR,
+						    file_name, line);
 			} else {
-				--lock->lock_word;
+				/* At least one X lock by this thread already
+				exists. Add another. */
+				if (lock->lock_word == 0
+				    || lock->lock_word == -X_LOCK_HALF_DECR) {
+					lock->lock_word -= X_LOCK_DECR;
+				} else {
+					ut_ad(lock->lock_word <= -X_LOCK_DECR);
+					--lock->lock_word;
+				}
 			}
 
 		} else {
@@ -611,6 +661,83 @@ rw_lock_x_lock_low(
 }
 
 /******************************************************************//**
+Low-level function for acquiring an sx lock.
+@return FALSE if did not succeed, TRUE if success. */
+UNIV_INLINE
+ibool
+rw_lock_sx_lock_low(
+/*================*/
+	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
+	ulint		pass,	/*!< in: pass value; != 0, if the lock will
+				be passed to another thread to unlock */
+	const char*	file_name,/*!< in: file name where lock requested */
+	ulint		line)	/*!< in: line where requested */
+{
+	if (rw_lock_lock_word_decr(lock, X_LOCK_HALF_DECR, X_LOCK_HALF_DECR)) {
+
+		/* lock->recursive also tells us if the writer_thread
+		field is stale or active. As we are going to write
+		our own thread id in that field it must be that the
+		current writer_thread value is not active. */
+		ut_a(!lock->recursive);
+
+		/* Decrement occurred: we are the SX lock owner. */
+		rw_lock_set_writer_id_and_recursion_flag(
+			lock, !pass);
+
+		lock->sx_recursive = 1;
+
+	} else {
+		os_thread_id_t	thread_id = os_thread_get_curr_id();
+
+		/* Decrement failed: the lock is already X- or SX-locked,
+		either by this thread or by another. If it is this thread,
+		relock; otherwise fail. */
+		if (!pass && lock->recursive
+		    && os_thread_eq(lock->writer_thread, thread_id)) {
+			/* This thread owns an X or SX lock */
+			if (lock->sx_recursive++ == 0) {
+				/* This thread is making first SX-lock request
+				and it must be holding at least one X-lock here
+				because:
+
+				* There can't be a WAIT_EX thread because we are
+				  the thread which has it's thread_id written in
+				  the writer_thread field and we are not waiting.
+
+				* Any other X-lock thread cannot exist because
+				  it must update recursive flag only after
+				  updating the thread_id. Had there been
+				  a concurrent X-locking thread which succeeded
+				  in decrementing the lock_word it must have
+				  written its thread_id before setting the
+				  recursive flag. As we cleared the if()
+				  condition above therefore we must be the only
+				  thread working on this lock and it is safe to
+				  read and write to the lock_word. */
+
+				ut_ad((lock->lock_word == 0)
+				      || ((lock->lock_word <= -X_LOCK_DECR)
+					  && (lock->lock_word
+					      > -(X_LOCK_DECR
+						  + X_LOCK_HALF_DECR))));
+				lock->lock_word -= X_LOCK_HALF_DECR;
+			}
+		} else {
+			/* Another thread locked before us */
+			return(FALSE);
+		}
+	}
+#ifdef UNIV_SYNC_DEBUG
+	rw_lock_add_debug_info(lock, pass, RW_LOCK_SX, file_name, line);
+#endif /* UNIV_SYNC_DEBUG */
+	lock->last_x_file_name = file_name;
+	lock->last_x_line = (unsigned int) line;
+
+	return(TRUE);
+}
+
+/******************************************************************//**
 NOTE! Use the corresponding macro, not directly this function! Lock an
 rw-lock in exclusive mode for the current thread. If the rw-lock is locked
 in shared or exclusive mode, or there is an exclusive lock request waiting,
@@ -632,7 +759,7 @@ rw_lock_x_lock_func(
 	ulint		i;	/*!< spin round count */
 	ulint		index;	/*!< index of the reserved wait cell */
 	sync_array_t*	sync_arr;
-	ibool		spinning = FALSE;
+	bool		spinning = false;
 	size_t		counter_index;
 
 	/* We reuse the thread id to index into the counter, cache
@@ -657,7 +784,7 @@ rw_lock_x_lock_func(
 	} else {
 
 		if (!spinning) {
-			spinning = TRUE;
+			spinning = true;
 
 			rw_lock_stats.rw_x_spin_wait_count.add(
 				counter_index, 1);
@@ -667,7 +794,7 @@ rw_lock_x_lock_func(
 		os_rmb;
 		HMT_low();
 		while (i < SYNC_SPIN_ROUNDS
-		       && lock->lock_word <= 0) {
+		       && lock->lock_word <= X_LOCK_HALF_DECR) {
 			if (srv_spin_wait_delay) {
 				ut_delay(ut_rnd_interval(0,
 							 srv_spin_wait_delay));
@@ -708,6 +835,103 @@ rw_lock_x_lock_func(
 	goto lock_loop;
 }
 
+/******************************************************************//**
+NOTE! Use the corresponding macro, not directly this function! Lock an
+rw-lock in SX mode for the current thread. If the rw-lock is locked
+in exclusive mode, or there is an exclusive lock request waiting,
+the function spins a preset time (controlled by SYNC_SPIN_ROUNDS), waiting
+for the lock, before suspending the thread. If the same thread has an x-lock
+on the rw-lock, locking succeeds, with the following exception: if pass != 0,
+only a single sx-lock may be taken on the lock. NOTE: If the same thread has
+an s-lock, locking does not succeed! */
+UNIV_INTERN
+void
+rw_lock_sx_lock_func(
+/*=================*/
+	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
+	ulint		pass,	/*!< in: pass value; != 0, if the lock will
+				be passed to another thread to unlock */
+	const char*	file_name,/*!< in: file name where lock requested */
+	ulint		line)	/*!< in: line where requested */
+
+{
+	ulint		i;	/*!< spin round count */
+	ulint		index;	/*!< index of the reserved wait cell */
+	sync_array_t*	sync_arr;
+	bool		spinning = false;
+	size_t		counter_index;
+
+	/* We reuse the thread id to index into the counter, cache
+	it here for efficiency. */
+
+	counter_index = (size_t) os_thread_get_curr_id();
+
+	ut_ad(rw_lock_validate(lock));
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(!rw_lock_own(lock, RW_LOCK_SHARED));
+#endif /* UNIV_SYNC_DEBUG */
+
+	i = 0;
+
+lock_loop:
+
+	if (rw_lock_sx_lock_low(lock, pass, file_name, line)) {
+		rw_lock_stats.rw_sx_spin_round_count.add(counter_index, i);
+
+		return;	/* Locking succeeded */
+
+	} else {
+
+		if (!spinning) {
+			spinning = true;
+
+			rw_lock_stats.rw_sx_spin_wait_count.add(
+				counter_index, 1);
+		}
+
+		/* Spin waiting for the lock_word to become free */
+		while (i < SYNC_SPIN_ROUNDS
+		       && lock->lock_word <= X_LOCK_HALF_DECR) {
+			if (srv_spin_wait_delay) {
+				ut_delay(ut_rnd_interval(0,
+							 srv_spin_wait_delay));
+			}
+
+			i++;
+		}
+		if (i == SYNC_SPIN_ROUNDS) {
+			os_thread_yield();
+		} else {
+			goto lock_loop;
+		}
+	}
+
+	rw_lock_stats.rw_sx_spin_round_count.add(counter_index, i);
+
+	sync_arr = sync_array_get();
+
+	sync_array_reserve_cell(
+		sync_arr, lock, RW_LOCK_SX, file_name, line, &index);
+
+	/* Waiters must be set before checking lock_word, to ensure signal
+	is sent. This could lead to a few unnecessary wake-up signals. */
+	rw_lock_set_waiter_flag(lock);
+
+	if (rw_lock_sx_lock_low(lock, pass, file_name, line)) {
+		sync_array_free_cell(sync_arr, index);
+		return; /* Locking succeeded */
+	}
+
+	/* these stats may not be accurate */
+	lock->count_os_wait++;
+	rw_lock_stats.rw_sx_os_wait_count.add(counter_index, 1);
+
+	sync_array_wait_event(sync_arr, index);
+
+	i = 0;
+	goto lock_loop;
+}
+
 #ifdef UNIV_SYNC_DEBUG
 /******************************************************************//**
 Acquires the debug mutex. We cannot use the mutex defined in sync0sync,
@@ -765,9 +989,16 @@ rw_lock_add_debug_info(
 	rw_lock_debug_mutex_exit();
 
 	if ((pass == 0) && (lock_type != RW_LOCK_WAIT_EX)) {
+		/* recursive x while holding sx
+		(lock_type == RW_LOCK_EX && lock_word == -X_LOCK_HALF_DECR)
+		is treated as not-relock (new lock). */
+		lint	lock_word = lock->lock_word;
 		sync_thread_add_level(lock, lock->level,
-				      lock_type == RW_LOCK_EX
-				      && lock->lock_word < 0);
+				      (lock_type == RW_LOCK_EX
+				       && lock_word < -X_LOCK_HALF_DECR)
+				      || (lock_type == RW_LOCK_SX
+					  && (lock_word < 0
+					      || lock->sx_recursive > 1)));
 	}
 }
 
@@ -856,37 +1087,88 @@ rw_lock_own(
 
 	return(FALSE);
 }
+
+/******************************************************************//**
+Checks if the thread has locked the rw-lock in the specified mode, with
+the pass value == 0.
+@return true if locked */
+UNIV_INTERN
+bool
+rw_lock_own_flagged(
+/*================*/
+	const rw_lock_t*	lock,	/*!< in: rw-lock */
+	rw_lock_flags_t		flags)	/*!< in: specify lock types with
+					OR of the rw_lock_flag_t values */
+{
+	rw_lock_debug_t*	info;
+
+	ut_ad(lock);
+	ut_ad(rw_lock_validate(lock));
+
+	rw_lock_debug_mutex_enter();
+
+	info = UT_LIST_GET_FIRST(lock->debug_list);
+
+	while (info != NULL) {
+
+		if (os_thread_eq(info->thread_id, os_thread_get_curr_id())
+		    && (info->pass == 0)) {
+			ulint	flag = 0;
+
+			switch (info->lock_type) {
+			case RW_LOCK_SHARED:
+				flag = RW_LOCK_FLAG_S;
+				break;
+			case RW_LOCK_EX:
+				flag = RW_LOCK_FLAG_X;
+				break;
+			case RW_LOCK_SX:
+				flag = RW_LOCK_FLAG_SX;
+			}
+
+			if (flags & flag) {
+				rw_lock_debug_mutex_exit();
+				/* Found! */
+
+				return(true);
+			}
+		}
+
+		info = UT_LIST_GET_NEXT(list, info);
+	}
+	rw_lock_debug_mutex_exit();
+
+	return(false);
+}
 #endif /* UNIV_SYNC_DEBUG */
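
A typical assertion that rw_lock_own_flagged() enables (a sketch; `block` is a
hypothetical buf_block_t* whose `lock` member is a plain rw_lock_t):

	#ifdef UNIV_SYNC_DEBUG
		/* Assert that this thread holds block->lock in S, SX or X mode. */
		ut_ad(rw_lock_own_flagged(&block->lock,
					  RW_LOCK_FLAG_S
					  | RW_LOCK_FLAG_SX
					  | RW_LOCK_FLAG_X));
	#endif /* UNIV_SYNC_DEBUG */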
 
 /******************************************************************//**
 Checks if somebody has locked the rw-lock in the specified mode.
-@return	TRUE if locked */
+@return true if locked */
 UNIV_INTERN
-ibool
+bool
 rw_lock_is_locked(
 /*==============*/
 	rw_lock_t*	lock,		/*!< in: rw-lock */
 	ulint		lock_type)	/*!< in: lock type: RW_LOCK_SHARED,
-					RW_LOCK_EX */
+					RW_LOCK_EX or RW_LOCK_SX */
 {
-	ibool	ret	= FALSE;
-
 	ut_ad(lock);
 	ut_ad(rw_lock_validate(lock));
 
-	if (lock_type == RW_LOCK_SHARED) {
-		if (rw_lock_get_reader_count(lock) > 0) {
-			ret = TRUE;
-		}
-	} else if (lock_type == RW_LOCK_EX) {
-		if (rw_lock_get_writer(lock) == RW_LOCK_EX) {
-			ret = TRUE;
-		}
-	} else {
+	switch (lock_type) {
+	case RW_LOCK_SHARED:
+		return(rw_lock_get_reader_count(lock) > 0);
+
+	case RW_LOCK_EX:
+		return(rw_lock_get_writer(lock) == RW_LOCK_EX);
+
+	case RW_LOCK_SX:
+		return(rw_lock_get_sx_lock_count(lock) > 0);
+
+	default:
 		ut_error;
 	}
-
-	return(ret);
 }
 
 #ifdef UNIV_SYNC_DEBUG
@@ -1003,15 +1285,24 @@ rw_lock_debug_print(
 	fprintf(f, "Locked: thread %lu file %s line %lu  ",
 		(ulong) os_thread_pf(info->thread_id), info->file_name,
 		(ulong) info->line);
-	if (rwt == RW_LOCK_SHARED) {
+
+	switch (rwt) {
+	case RW_LOCK_SHARED:
 		fputs("S-LOCK", f);
-	} else if (rwt == RW_LOCK_EX) {
+		break;
+	case RW_LOCK_EX:
 		fputs("X-LOCK", f);
-	} else if (rwt == RW_LOCK_WAIT_EX) {
+		break;
+	case RW_LOCK_SX:
+		fputs("SX-LOCK", f);
+		break;
+	case RW_LOCK_WAIT_EX:
 		fputs("WAIT X-LOCK", f);
-	} else {
+		break;
+	default:
 		ut_error;
 	}
+
 	if (info->pass != 0) {
 		fprintf(f, " pass value %lu", (ulong) info->pass);
 	}
diff --git a/storage/innobase/sync/sync0sync.cc b/storage/innobase/sync/sync0sync.cc
index 44922de..5075263 100644
--- a/storage/innobase/sync/sync0sync.cc
+++ b/storage/innobase/sync/sync0sync.cc
@@ -1579,12 +1579,14 @@ sync_print_wait_info(
 	FILE*	file)		/*!< in: file where to print */
 {
 	fprintf(file,
-		"Mutex spin waits " UINT64PF ", rounds " UINT64PF ", "
-		"OS waits " UINT64PF "\n"
-		"RW-shared spins " UINT64PF ", rounds " UINT64PF ", "
-		"OS waits " UINT64PF "\n"
-		"RW-excl spins " UINT64PF ", rounds " UINT64PF ", "
-		"OS waits " UINT64PF "\n",
+		"Mutex spin waits "UINT64PF", rounds "UINT64PF", "
+		"OS waits "UINT64PF"\n"
+		"RW-shared spins "UINT64PF", rounds "UINT64PF", "
+		"OS waits "UINT64PF"\n"
+		"RW-excl spins "UINT64PF", rounds "UINT64PF", "
+		"OS waits "UINT64PF"\n"
+		"RW-sx spins "UINT64PF", rounds "UINT64PF", "
+		"OS waits "UINT64PF"\n",
 		(ib_uint64_t) mutex_spin_wait_count,
 		(ib_uint64_t) mutex_spin_round_count,
 		(ib_uint64_t) mutex_os_wait_count,
@@ -1593,11 +1595,14 @@ sync_print_wait_info(
 		(ib_uint64_t) rw_lock_stats.rw_s_os_wait_count,
 		(ib_uint64_t) rw_lock_stats.rw_x_spin_wait_count,
 		(ib_uint64_t) rw_lock_stats.rw_x_spin_round_count,
-		(ib_uint64_t) rw_lock_stats.rw_x_os_wait_count);
+		(ib_uint64_t) rw_lock_stats.rw_x_os_wait_count,
+		(ib_uint64_t) rw_lock_stats.rw_sx_spin_wait_count,
+		(ib_uint64_t) rw_lock_stats.rw_sx_spin_round_count,
+		(ib_uint64_t) rw_lock_stats.rw_sx_os_wait_count);
 
 	fprintf(file,
 		"Spin rounds per wait: %.2f mutex, %.2f RW-shared, "
-		"%.2f RW-excl\n",
+		"%.2f RW-excl, %.2f RW-sx\n",
 		(double) mutex_spin_round_count /
 		(mutex_spin_wait_count ? mutex_spin_wait_count : 1),
 		(double) rw_lock_stats.rw_s_spin_round_count /
@@ -1605,7 +1610,10 @@ sync_print_wait_info(
 		 ? rw_lock_stats.rw_s_spin_wait_count : 1),
 		(double) rw_lock_stats.rw_x_spin_round_count /
 		(rw_lock_stats.rw_x_spin_wait_count
-		 ? rw_lock_stats.rw_x_spin_wait_count : 1));
+		 ? rw_lock_stats.rw_x_spin_wait_count : 1),
+		(double) rw_lock_stats.rw_sx_spin_round_count /
+		(rw_lock_stats.rw_sx_spin_wait_count
+		 ? rw_lock_stats.rw_sx_spin_wait_count : 1));
 }
 
 /*******************************************************************//**
diff --git a/storage/xtradb/include/mtr0log.ic b/storage/xtradb/include/mtr0log.ic
index d508d30..13a81f3 100644
--- a/storage/xtradb/include/mtr0log.ic
+++ b/storage/xtradb/include/mtr0log.ic
@@ -191,7 +191,9 @@ mlog_write_initial_log_record_fast(
 	ulint		space;
 	ulint		offset;
 
-	ut_ad(mtr_memo_contains_page(mtr, ptr, MTR_MEMO_PAGE_X_FIX));
+	ut_ad(mtr_memo_contains_page_flagged(mtr, ptr,
+					     MTR_MEMO_PAGE_X_FIX
+					     | MTR_MEMO_PAGE_SX_FIX));
 	ut_ad(type <= MLOG_BIGGEST_TYPE || EXTRA_CHECK_MLOG_NUMBER(type));
 	ut_ad(ptr && log_ptr);
 
diff --git a/storage/xtradb/include/mtr0mtr.h b/storage/xtradb/include/mtr0mtr.h
index c919915..2ddf9f6 100644
--- a/storage/xtradb/include/mtr0mtr.h
+++ b/storage/xtradb/include/mtr0mtr.h
@@ -49,15 +49,18 @@ Created 11/26/1995 Heikki Tuuri
 					form */
 
 /* Types for the mlock objects to store in the mtr memo; NOTE that the
-first 3 values must be RW_S_LATCH, RW_X_LATCH, RW_NO_LATCH */
+first 4 values must be RW_S_LATCH, RW_X_LATCH, RW_SX_LATCH, RW_NO_LATCH
+and they must be powers of two so they can also be ORed together as flags. */
 #define	MTR_MEMO_PAGE_S_FIX	RW_S_LATCH
 #define	MTR_MEMO_PAGE_X_FIX	RW_X_LATCH
+#define	MTR_MEMO_PAGE_SX_FIX	RW_SX_LATCH
 #define	MTR_MEMO_BUF_FIX	RW_NO_LATCH
 #ifdef UNIV_DEBUG
-# define MTR_MEMO_MODIFY	54
+# define MTR_MEMO_MODIFY	32
 #endif /* UNIV_DEBUG */
-#define	MTR_MEMO_S_LOCK		55
-#define	MTR_MEMO_X_LOCK		56
+#define	MTR_MEMO_S_LOCK		64
+#define	MTR_MEMO_X_LOCK		128
+#define	MTR_MEMO_SX_LOCK	256
 
 /** @name Log item types
 The log items are declared 'byte' so that the compiler can warn if val
@@ -256,8 +259,29 @@ mtr_release_s_latch_at_savepoint(
 	mtr_t*		mtr,		/*!< in: mtr */
 	ulint		savepoint,	/*!< in: savepoint */
 	prio_rw_lock_t*	lock);		/*!< in: latch to release */
+/**********************************************************//**
+Releases the block in an mtr memo after a savepoint. */
+
+void
+mtr_release_block_at_savepoint(
+/*===========================*/
+	mtr_t*		mtr,		/*!< in: mtr */
+	ulint		savepoint,	/*!< in: savepoint */
+	buf_block_t*	block);		/*!< in: block to release */
+/**********************************************************//**
+Relaxes the block latch in an mtr memo after a savepoint
+from X to SX. */
+
+void
+mtr_block_x_to_sx_at_savepoint(
+/*===========================*/
+	mtr_t*		mtr,		/*!< in: mtr */
+	ulint		savepoint,	/*!< in: savepoint */
+	buf_block_t*	block);		/*!< in: block to relax latch */
 #else /* !UNIV_HOTBACKUP */
 # define mtr_release_s_latch_at_savepoint(mtr,savepoint,lock) ((void) 0)
+# define mtr_release_block_at_savepoint(mtr,savepoint,block) ((void) 0)
+# define mtr_block_x_to_sx_at_savepoint(mtr,savepoint,block) ((void) 0)
 #endif /* !UNIV_HOTBACKUP */
 
 /**********************************************************//**
@@ -308,6 +332,10 @@ This macro locks an rw-lock in x-mode. */
 #define mtr_x_lock(B, MTR)	mtr_x_lock_func((B), __FILE__, __LINE__,\
 						(MTR))
 /*********************************************************************//**
+This macro locks an rw-lock in sx-mode. */
+#define mtr_sx_lock(B, MTR)	mtr_sx_lock_func((B), __FILE__, __LINE__,\
+						(MTR))
+/*********************************************************************//**
 NOTE! Use the macro above!
 Locks a lock in s-mode. */
 UNIV_INLINE
@@ -329,6 +357,17 @@ mtr_x_lock_func(
 	const char*	file,	/*!< in: file name */
 	ulint		line,	/*!< in: line number */
 	mtr_t*		mtr);	/*!< in: mtr */
+/*********************************************************************//**
+NOTE! Use the macro mtr_sx_lock()!
+Locks a lock in sx-mode. */
+UNIV_INLINE
+void
+mtr_sx_lock_func(
+/*=============*/
+	rw_lock_t*	lock,	/*!< in/out: rw-lock */
+	const char*	file,	/*!< in: file name */
+	ulint		line,	/*!< in: line number */
+	mtr_t*		mtr);	/*!< in/out: mtr */
 #endif /* !UNIV_HOTBACKUP */
 
 /***************************************************//**
@@ -346,17 +385,30 @@ mtr_memo_release(
 # ifndef UNIV_HOTBACKUP
 /**********************************************************//**
 Checks if memo contains the given item.
-@return	TRUE if contains */
+@return true if contains */
 UNIV_INLINE
 bool
 mtr_memo_contains(
 /*==============*/
-	mtr_t*		mtr,	/*!< in: mtr */
+	const mtr_t*	mtr,	/*!< in: mtr */
 	const void*	object,	/*!< in: object to search */
 	ulint		type)	/*!< in: type of object */
 	__attribute__((warn_unused_result, nonnull));
 
 /**********************************************************//**
+Checks if memo contains the given item.
+@return true if contains */
+UNIV_INLINE
+bool
+mtr_memo_contains_flagged(
+/*======================*/
+	const mtr_t*	mtr,	/*!< in: mtr */
+	const void*	object,	/*!< in: object to search */
+	ulint		flags)	/*!< in: specify types of object with
+				OR of MTR_MEMO_PAGE_S_FIX... values */
+	__attribute__((warn_unused_result, nonnull));
+
+/**********************************************************//**
 Checks if memo contains the given page.
 @return	TRUE if contains */
 UNIV_INTERN
@@ -365,7 +417,20 @@ mtr_memo_contains_page(
 /*===================*/
 	mtr_t*		mtr,	/*!< in: mtr */
 	const byte*	ptr,	/*!< in: pointer to buffer frame */
-	ulint		type);	/*!< in: type of object */
+	ulint		type)	/*!< in: type of object */
+	__attribute__((warn_unused_result, nonnull));
+/**********************************************************//**
+Checks if memo contains the given page.
+@return true if contains */
+
+bool
+mtr_memo_contains_page_flagged(
+/*===========================*/
+	const mtr_t*	mtr,	/*!< in: mtr */
+	const byte*	ptr,	/*!< in: pointer to buffer frame */
+	ulint		flags)	/*!< in: specify types of object with
+				OR of MTR_MEMO_PAGE_S_FIX... values */
+	__attribute__((warn_unused_result, nonnull));
 /*********************************************************//**
 Prints info of an mtr handle. */
 UNIV_INTERN
diff --git a/storage/xtradb/include/mtr0mtr.ic b/storage/xtradb/include/mtr0mtr.ic
index aade8c3..5341e69 100644
--- a/storage/xtradb/include/mtr0mtr.ic
+++ b/storage/xtradb/include/mtr0mtr.ic
@@ -80,7 +80,8 @@ mtr_memo_push(
 
 	ut_ad(object);
 	ut_ad(type >= MTR_MEMO_PAGE_S_FIX);
-	ut_ad(type <= MTR_MEMO_X_LOCK);
+	ut_ad(type <= MTR_MEMO_SX_LOCK);
+	ut_ad(ut_is_2pow(type));
 	ut_ad(mtr);
 	ut_ad(mtr->magic_n == MTR_MAGIC_N);
 	ut_ad(mtr->state == MTR_ACTIVE);
@@ -89,7 +90,8 @@ mtr_memo_push(
 	the made_dirty flag. This tells us if we need to
 	grab log_flush_order_mutex at mtr_commit so that we
 	can insert the dirtied page to the flush list. */
-	if (type == MTR_MEMO_PAGE_X_FIX && !mtr->made_dirty) {
+	if ((type == MTR_MEMO_PAGE_X_FIX || type == MTR_MEMO_PAGE_SX_FIX)
+	    && !mtr->made_dirty) {
 		mtr->made_dirty =
 			mtr_block_dirtied((const buf_block_t*) object);
 	}
@@ -158,12 +160,12 @@ mtr_release_s_latch_at_savepoint(
 # ifdef UNIV_DEBUG
 /**********************************************************//**
 Checks if memo contains the given item.
-@return	TRUE if contains */
+@return true if contains */
 UNIV_INLINE
 bool
 mtr_memo_contains(
 /*==============*/
-	mtr_t*		mtr,	/*!< in: mtr */
+	const mtr_t*	mtr,	/*!< in: mtr */
 	const void*	object,	/*!< in: object to search */
 	ulint		type)	/*!< in: type of object */
 {
@@ -193,6 +195,49 @@ mtr_memo_contains(
 
 	return(false);
 }
+
+/**********************************************************//**
+Checks if memo contains the given item.
+@return true if contains */
+UNIV_INLINE
+bool
+mtr_memo_contains_flagged(
+/*======================*/
+	const mtr_t*	mtr,	/*!< in: mtr */
+	const void*	object,	/*!< in: object to search */
+	ulint		flags)	/*!< in: specify types of object with
+				OR of MTR_MEMO_PAGE_S_FIX... values */
+{
+	ut_ad(mtr);
+	ut_ad(mtr->magic_n == MTR_MAGIC_N);
+	ut_ad(mtr->state == MTR_ACTIVE || mtr->state == MTR_COMMITTING);
+
+	for (const dyn_block_t* block = dyn_array_get_last_block(&mtr->memo);
+	     block;
+	     block = dyn_array_get_prev_block(&mtr->memo, block)) {
+		const mtr_memo_slot_t*	start
+			= reinterpret_cast<mtr_memo_slot_t*>(
+				dyn_block_get_data(block));
+		mtr_memo_slot_t*	slot
+			= reinterpret_cast<mtr_memo_slot_t*>(
+				dyn_block_get_data(block)
+				+ dyn_block_get_used(block));
+
+		ut_ad(!(dyn_block_get_used(block) % sizeof(mtr_memo_slot_t)));
+
+		while (slot-- != start) {
+			if (object != slot->object) {
+				continue;
+			}
+
+			if (flags & slot->type) {
+				return(true);
+			}
+		}
+	}
+
+	return(false);
+}
 # endif /* UNIV_DEBUG */
 #endif /* !UNIV_HOTBACKUP */
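
For example, call sites in this patch use the flagged variant for assertions
such as (sketch; `mtr` and `block` are placeholders):

	ut_ad(mtr_memo_contains_flagged(mtr, block,
					MTR_MEMO_PAGE_X_FIX
					| MTR_MEMO_PAGE_SX_FIX));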
 
@@ -295,4 +340,23 @@ mtr_x_lock_func(
 
 	mtr_memo_push(mtr, lock, MTR_MEMO_X_LOCK);
 }
+
+/*********************************************************************//**
+Locks a lock in sx-mode. */
+UNIV_INLINE
+void
+mtr_sx_lock_func(
+/*=============*/
+	rw_lock_t*	lock,	/*!< in/out: rw-lock */
+	const char*	file,	/*!< in: file name */
+	ulint		line,	/*!< in: line number */
+	mtr_t*		mtr)	/*!< in/out: mtr */
+{
+	ut_ad(mtr);
+	ut_ad(lock);
+
+	rw_lock_sx_lock_func(lock, 0, file, line);
+
+	mtr_memo_push(mtr, lock, MTR_MEMO_SX_LOCK);
+}
 #endif /* !UNIV_HOTBACKUP */
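
A sketch of how a mini-transaction would take the new latch mode (the
`tree_latch` variable is a placeholder for any rw_lock_t, e.g. an index tree
latch as in the WL#6326 use case):

	extern rw_lock_t	tree_latch;	/* placeholder latch */
	mtr_t			mtr;

	mtr_start(&mtr);

	/* Take the latch in SX mode: concurrent S readers may proceed,
	other SX/X requests must wait. The latch is registered in the
	memo and sx-unlocked by mtr_commit(). */
	mtr_sx_lock(&tree_latch, &mtr);

	/* ... page reads and modifications ... */

	mtr_commit(&mtr);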
diff --git a/storage/xtradb/include/srv0mon.h b/storage/xtradb/include/srv0mon.h
index 7086846..c7914d8 100644
--- a/storage/xtradb/include/srv0mon.h
+++ b/storage/xtradb/include/srv0mon.h
@@ -386,10 +386,13 @@ enum monitor_id_t {
 	MONITOR_OVLD_SRV_PAGE_SIZE,
 	MONITOR_OVLD_RWLOCK_S_SPIN_WAITS,
 	MONITOR_OVLD_RWLOCK_X_SPIN_WAITS,
+	MONITOR_OVLD_RWLOCK_SX_SPIN_WAITS,
 	MONITOR_OVLD_RWLOCK_S_SPIN_ROUNDS,
 	MONITOR_OVLD_RWLOCK_X_SPIN_ROUNDS,
+	MONITOR_OVLD_RWLOCK_SX_SPIN_ROUNDS,
 	MONITOR_OVLD_RWLOCK_S_OS_WAITS,
 	MONITOR_OVLD_RWLOCK_X_OS_WAITS,
+	MONITOR_OVLD_RWLOCK_SX_OS_WAITS,
 
 	/* Data DML related counters */
 	MONITOR_MODULE_DML_STATS,
diff --git a/storage/xtradb/include/sync0rw.h b/storage/xtradb/include/sync0rw.h
index 1df6a79..4fce9f9 100644
--- a/storage/xtradb/include/sync0rw.h
+++ b/storage/xtradb/include/sync0rw.h
@@ -83,20 +83,40 @@ struct rw_lock_stats_t {
 	/** number of unlocks (that unlock exclusive locks),
 	set only when UNIV_SYNC_PERF_STAT is defined */
 	ib_int64_counter_t	rw_x_exit_count;
+
+	/** number of spin waits on rw-latches,
+	resulted during sx locks */
+	ib_int64_counter_t	rw_sx_spin_wait_count;
+
+	/** number of spin loop rounds on rw-latches,
+	resulted during sx locks */
+	ib_int64_counter_t	rw_sx_spin_round_count;
+
+	/** number of OS waits on rw-latches,
+	resulted during sx locks */
+	ib_int64_counter_t	rw_sx_os_wait_count;
+
+	/** number of unlocks (that unlock sx locks),
+	set only when UNIV_SYNC_PERF_STAT is defined */
+	ib_int64_counter_t	rw_sx_exit_count;
 };
 
-/* Latch types; these are used also in btr0btr.h: keep the numerical values
-smaller than 30 and the order of the numerical values like below! */
+/* Latch types; these are used also in btr0btr.h and mtr0mtr.h: keep the
+numerical values smaller than 30 (smaller than BTR_MODIFY_TREE and
+MTR_MEMO_MODIFY), keep the order of the numerical values as below, and make
+them powers of two so they can also be ORed together as flags. */
 #define RW_S_LATCH	1
 #define	RW_X_LATCH	2
-#define	RW_NO_LATCH	3
+#define	RW_SX_LATCH	4
+#define	RW_NO_LATCH	8
 
 #ifndef UNIV_HOTBACKUP
-/* We decrement lock_word by this amount for each x_lock. It is also the
+/* We decrement lock_word by X_LOCK_DECR for each x_lock. It is also the
 start value for the lock_word, meaning that it limits the maximum number
-of concurrent read locks before the rw_lock breaks. The current value of
-0x00100000 allows 1,048,575 concurrent readers and 2047 recursive writers.*/
-#define X_LOCK_DECR		0x00100000
+of concurrent read locks before the rw_lock breaks. */
+/* We decrement lock_word by X_LOCK_HALF_DECR for sx_lock. */
+#define X_LOCK_DECR		0x20000000
+#define X_LOCK_HALF_DECR	0x10000000
 
 struct rw_lock_t;
 struct prio_rw_lock_t;
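
To make the arithmetic concrete, a worked sequence under these constants
(purely illustrative):

	/* initial:      lock_word = X_LOCK_DECR              = 0x20000000 */
	/* one sx_lock:  lock_word = 0x20000000 - 0x10000000  = 0x10000000 */
	/* two s_locks:  lock_word = 0x10000000 - 2           = 0x0ffffffe */
	/* the SX holder now requests X: the word drops by X_LOCK_DECR to
	   -0x10000002, and the requester waits until both readers leave
	   and lock_word reaches -X_LOCK_HALF_DECR (-0x10000000), i.e. the
	   "X locked and SX locked" state. */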
@@ -284,6 +304,21 @@ unlocking, not the corresponding function. */
 #define rw_lock_s_unlock(L)		rw_lock_s_unlock_gen(L, 0)
 #define rw_lock_x_unlock(L)		rw_lock_x_unlock_gen(L, 0)
 
+/* TODO: PFS does not instrument the new SX lock state yet. */
+#define rw_lock_sx_lock(L)					\
+	rw_lock_sx_lock_func((L), 0, __FILE__, __LINE__)
+#define rw_lock_sx_lock_inline(M, P, F, L)			\
+	rw_lock_sx_lock_func((M), (P), (F), (L))
+#define rw_lock_sx_lock_gen(M, P)				\
+	rw_lock_sx_lock_func((M), (P), __FILE__, __LINE__)
+#ifdef UNIV_SYNC_DEBUG
+# define rw_lock_sx_unlock(L)		rw_lock_sx_unlock_func(0, L)
+# define rw_lock_sx_unlock_gen(L, P)	rw_lock_sx_unlock_func(P, L)
+#else /* UNIV_SYNC_DEBUG */
+# define rw_lock_sx_unlock(L)		rw_lock_sx_unlock_func(L)
+# define rw_lock_sx_unlock_gen(L, P)	rw_lock_sx_unlock_func(L)
+#endif /* UNIV_SYNC_DEBUG */
+
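A minimal usage sketch for these macros (hypothetical caller;
`example_rw_lock_key` is a placeholder PFS key):

	rw_lock_t	latch;

	rw_lock_create(example_rw_lock_key, &latch, SYNC_INDEX_TREE);

	rw_lock_sx_lock(&latch);	/* blocks other SX and X; S still allowed */
	/* ... modify parts of the structure that concurrent S readers
	may safely observe ... */
	rw_lock_sx_unlock(&latch);

	rw_lock_free(&latch);
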
 /******************************************************************//**
 Creates, or rather, initializes an rw-lock object in a specified memory
 location (which must be appropriately aligned). The rw-lock is initialized
@@ -347,7 +382,7 @@ UNIV_INTERN
 ibool
 rw_lock_validate(
 /*=============*/
-	rw_lock_t*	lock);	/*!< in: rw-lock */
+	const rw_lock_t*	lock);	/*!< in: rw-lock */
 /******************************************************************//**
 Checks that the priority rw-lock has been initialized and that there are no
 simultaneous shared and exclusive locks.
@@ -481,6 +516,24 @@ rw_lock_x_lock_func(
 	const char*	file_name,/*!< in: file name where lock requested */
 	ulint		line);	/*!< in: line where requested */
 /******************************************************************//**
+NOTE! Use the corresponding macro, not directly this function! Lock an
+rw-lock in SX mode for the current thread. If the rw-lock is locked
+in exclusive mode, or there is an exclusive lock request waiting,
+the function spins a preset time (controlled by SYNC_SPIN_ROUNDS), waiting
+for the lock, before suspending the thread. If the same thread has an x-lock
+on the rw-lock, locking succeeds, with the following exception: if pass != 0,
+only a single sx-lock may be taken on the lock. NOTE: If the same thread has
+an s-lock, locking does not succeed! */
+
+void
+rw_lock_sx_lock_func(
+/*=================*/
+	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
+	ulint		pass,	/*!< in: pass value; != 0, if the lock will
+				be passed to another thread to unlock */
+	const char*	file_name,/*!< in: file name where lock requested */
+	ulint		line);	/*!< in: line where requested */
+/******************************************************************//**
 Releases an exclusive mode lock. */
 UNIV_INLINE
 void
@@ -491,6 +544,19 @@ rw_lock_x_unlock_func(
 				been passed to another thread to unlock */
 #endif
 	rw_lock_t*	lock);	/*!< in/out: rw-lock */
+
+/******************************************************************//**
+Releases an sx mode lock. */
+UNIV_INLINE
+void
+rw_lock_sx_unlock_func(
+/*===================*/
+#ifdef UNIV_SYNC_DEBUG
+	ulint		pass,	/*!< in: pass value; != 0, if the lock may have
+				been passed to another thread to unlock */
+#endif /* UNIV_SYNC_DEBUG */
+	rw_lock_t*	lock);	/*!< in/out: rw-lock */
+
 /******************************************************************//**
 Releases an exclusive mode priority lock. */
 UNIV_INLINE
@@ -542,6 +608,15 @@ ulint
 rw_lock_get_waiters(
 /*================*/
 	const rw_lock_t*	lock);	/*!< in: rw-lock */
+/******************************************************************//**
+Returns the number of sx-locks held on the lock. Does not reserve the lock
+mutex, so the caller must be sure it is not changed during the call.
+@return value of sx-lock count */
+UNIV_INLINE
+ulint
+rw_lock_get_sx_lock_count(
+/*======================*/
+	const rw_lock_t*	lock);	/*!< in: rw-lock */
 /********************************************************************//**
 Check if there are threads waiting for the priority rw-lock.
 @return	1 if waiters, 0 otherwise */
@@ -587,13 +662,14 @@ rw_lock_get_reader_count(
 /******************************************************************//**
 Decrements lock_word the specified amount if it is greater than 0.
 This is used by both s_lock and x_lock operations.
-@return	TRUE if decr occurs */
+@return true if decr occurs */
 UNIV_INLINE
-ibool
+bool
 rw_lock_lock_word_decr(
 /*===================*/
 	rw_lock_t*	lock,		/*!< in/out: rw-lock */
-	ulint		amount);	/*!< in: amount to decrement */
+	ulint		amount,		/*!< in: amount to decrement */
+	lint		threshold);	/*!< in: threshold; decrement only
+					while lock_word exceeds it */
 /******************************************************************//**
 Increments lock_word the specified amount and returns new value.
 @return	lock->lock_word after increment */
@@ -617,7 +693,7 @@ void
 rw_lock_set_writer_id_and_recursion_flag(
 /*=====================================*/
 	rw_lock_t*	lock,		/*!< in/out: lock to work on */
-	ibool		recursive);	/*!< in: TRUE if recursion
+	bool		recursive);	/*!< in: true if recursion
 					allowed */
 #ifdef UNIV_SYNC_DEBUG
 /******************************************************************//**
@@ -642,16 +718,29 @@ rw_lock_own(
 	ulint		lock_type)	/*!< in: lock type: RW_LOCK_SHARED,
 					RW_LOCK_EX */
 	__attribute__((warn_unused_result));
+
+/******************************************************************//**
+Checks if the thread has locked the rw-lock in the specified mode, with
+the pass value == 0. */
+
+bool
+rw_lock_own_flagged(
+/*================*/
+	const rw_lock_t*	lock,	/*!< in: rw-lock */
+	rw_lock_flags_t		flags)	/*!< in: specify lock types with
+					OR of the rw_lock_flag_t values */
+	__attribute__((warn_unused_result));
 #endif /* UNIV_SYNC_DEBUG */
 /******************************************************************//**
-Checks if somebody has locked the rw-lock in the specified mode. */
-UNIV_INTERN
-ibool
+Checks if somebody has locked the rw-lock in the specified mode.
+@return true if locked */
+
+bool
 rw_lock_is_locked(
 /*==============*/
 	rw_lock_t*	lock,		/*!< in: rw-lock */
 	ulint		lock_type);	/*!< in: lock type: RW_LOCK_SHARED,
-					RW_LOCK_EX */
+					RW_LOCK_EX or RW_LOCK_SX */
 #ifdef UNIV_SYNC_DEBUG
 /***************************************************************//**
 Prints debug info of an rw-lock. */
@@ -730,6 +819,7 @@ struct rw_lock_t {
 				id of the current x-holder or wait-x thread.
 				This flag must be reset in x_unlock
 				functions before incrementing the lock_word */
+	volatile ulint	sx_recursive;/*!< number of granted SX locks. */
 	volatile os_thread_id_t	writer_thread;
 				/*!< Thread id of writer thread. Is only
 				guaranteed to have sane and non-stale
diff --git a/storage/xtradb/include/sync0rw.ic b/storage/xtradb/include/sync0rw.ic
index f66d435..bf2c294 100644
--- a/storage/xtradb/include/sync0rw.ic
+++ b/storage/xtradb/include/sync0rw.ic
@@ -137,7 +137,7 @@ rw_lock_reset_waiter_flag(
 /******************************************************************//**
 Returns the write-status of the lock - this function made more sense
 with the old rw_lock implementation.
-@return	RW_LOCK_NOT_LOCKED, RW_LOCK_EX, RW_LOCK_WAIT_EX */
+@return RW_LOCK_NOT_LOCKED, RW_LOCK_EX, RW_LOCK_WAIT_EX, RW_LOCK_SX */
 UNIV_INLINE
 ulint
 rw_lock_get_writer(
@@ -145,14 +145,24 @@ rw_lock_get_writer(
 	const rw_lock_t*	lock)	/*!< in: rw-lock */
 {
 	lint lock_word = lock->lock_word;
-	if (lock_word > 0) {
+
+	ut_ad(lock_word <= X_LOCK_DECR);
+	if (lock_word > X_LOCK_HALF_DECR) {
 		/* return NOT_LOCKED in s-lock state, like the writer
 		member of the old lock implementation. */
 		return(RW_LOCK_NOT_LOCKED);
-	} else if ((lock_word == 0) || (lock_word <= -X_LOCK_DECR)) {
+	} else if (lock_word > 0) {
+		/* sx-locked, no x-locks */
+		return(RW_LOCK_SX);
+	} else if ((lock_word == 0)
+		   || (lock_word == -X_LOCK_HALF_DECR)
+		   || (lock_word <= -X_LOCK_DECR)) {
+		/* x-lock with sx-lock is also treated as RW_LOCK_EX */
 		return(RW_LOCK_EX);
 	} else {
-		ut_ad(lock_word > -X_LOCK_DECR);
+		/* x-waiter with sx-lock is also treated as RW_LOCK_WAIT_EX
+		e.g. -X_LOCK_HALF_DECR < lock_word < 0 : without sx
+		     -X_LOCK_DECR < lock_word < -X_LOCK_HALF_DECR : with sx */
 		return(RW_LOCK_WAIT_EX);
 	}
 }
@@ -171,7 +181,7 @@ rw_lock_get_writer(
 }
 
 /******************************************************************//**
-Returns the number of readers.
+Returns the number of readers (s-locks).
 @return	number of readers */
 UNIV_INLINE
 ulint
@@ -180,13 +190,28 @@ rw_lock_get_reader_count(
 	const rw_lock_t*	lock)	/*!< in: rw-lock */
 {
 	lint lock_word = lock->lock_word;
-	if (lock_word > 0) {
-		/* s-locked, no x-waiters */
+	ut_ad(lock_word <= X_LOCK_DECR);
+
+	if (lock_word > X_LOCK_HALF_DECR) {
+		/* s-locked, no x-waiter */
 		return(X_LOCK_DECR - lock_word);
-	} else if (lock_word < 0 && lock_word > -X_LOCK_DECR) {
-		/* s-locked, with x-waiters */
+	} else if (lock_word > 0) {
+		/* s-locked, with sx-locks only */
+		return(X_LOCK_HALF_DECR - lock_word);
+	} else if (lock_word == 0) {
+		/* x-locked */
+		return(0);
+	} else if (lock_word > -X_LOCK_HALF_DECR) {
+		/* s-locked, with x-waiter */
 		return((ulint)(-lock_word));
+	} else if (lock_word == -X_LOCK_HALF_DECR) {
+		/* x-locked with sx-locks */
+		return(0);
+	} else if (lock_word > -X_LOCK_DECR) {
+		/* s-locked, with x-waiter and sx-lock */
+		return((ulint)(-(lock_word + X_LOCK_HALF_DECR)));
 	}
+	/* no s-locks */
 	return(0);
 }
 
@@ -224,10 +249,55 @@ rw_lock_get_x_lock_count(
 	const rw_lock_t*	lock)	/*!< in: rw-lock */
 {
 	lint lock_copy = lock->lock_word;
-	if ((lock_copy != 0) && (lock_copy > -X_LOCK_DECR)) {
+	ut_ad(lock_copy <= X_LOCK_DECR);
+
+	if (lock_copy == 0 || lock_copy == -X_LOCK_HALF_DECR) {
+		/* "1 x-lock" or "1 x-lock + sx-locks" */
+		return(1);
+	} else if (lock_copy > -X_LOCK_DECR) {
+		/* s-locks, one or more sx-locks if > 0, or x-waiter if < 0 */
 		return(0);
+	} else if (lock_copy > -(X_LOCK_DECR + X_LOCK_HALF_DECR)) {
+		/* no s-lock, no sx-lock, 2 or more x-locks.
+		First 2 x-locks are set with -X_LOCK_DECR,
+		all other recursive x-locks are set with -1 */
+		return(2 - (lock_copy + X_LOCK_DECR));
+	} else {
+		/* no s-lock, 1 or more sx-lock, 2 or more x-locks.
+		First 2 x-locks are set with -(X_LOCK_DECR + X_LOCK_HALF_DECR),
+		all other recursive x-locks are set with -1 */
+		return(2 - (lock_copy + X_LOCK_DECR + X_LOCK_HALF_DECR));
 	}
-	return((lock_copy == 0) ? 1 : (2 - (lock_copy + X_LOCK_DECR)));
+}
+
+/******************************************************************//**
+Returns the number of sx-locks held on the lock. Does not reserve the lock
+mutex, so the caller must be sure it is not changed during the call.
+@return value of sx-lock count */
+UNIV_INLINE
+ulint
+rw_lock_get_sx_lock_count(
+/*======================*/
+	const rw_lock_t*	lock)	/*!< in: rw-lock */
+{
+#ifdef UNIV_DEBUG
+	lint lock_copy = lock->lock_word;
+
+	ut_ad(lock_copy <= X_LOCK_DECR);
+
+	while (lock_copy < 0) {
+		lock_copy += X_LOCK_DECR;
+	}
+
+	if (lock_copy > 0
+	    && lock_copy <= X_LOCK_HALF_DECR) {
+		return(lock->sx_recursive);
+	}
+
+	return(0);
+#else /* UNIV_DEBUG */
+	return(lock->sx_recursive);
+#endif /* UNIV_DEBUG */
 }
 
 /******************************************************************//**
@@ -249,34 +319,35 @@ one for systems supporting atomic operations, one for others. This does
 not support recursive x-locks: they should be handled by the caller and
 need not be atomic since they are performed by the current lock holder.
 Returns true if the decrement was made, false if not.
-@return	TRUE if decr occurs */
+@return true if decr occurs */
 UNIV_INLINE
-ibool
+bool
 rw_lock_lock_word_decr(
 /*===================*/
 	rw_lock_t*	lock,		/*!< in/out: rw-lock */
-	ulint		amount)		/*!< in: amount to decrement */
+	ulint		amount,		/*!< in: amount to decrement */
+	lint		threshold)	/*!< in: threshold; decrement only
+					while lock_word exceeds it */
 {
 #ifdef INNODB_RW_LOCKS_USE_ATOMICS
 	lint local_lock_word;
 
 	os_rmb;
 	local_lock_word = lock->lock_word;
-	while (local_lock_word > 0) {
+	while (local_lock_word > threshold) {
 		if (os_compare_and_swap_lint(&lock->lock_word,
 					     local_lock_word,
 					     local_lock_word - amount)) {
-			return(TRUE);
+			return(true);
 		}
 		local_lock_word = lock->lock_word;
 	}
-	return(FALSE);
+	return(false);
 #else /* INNODB_RW_LOCKS_USE_ATOMICS */
-	ibool success = FALSE;
+	bool success = false;
 	mutex_enter(&(lock->mutex));
-	if (lock->lock_word > 0) {
+	if (lock->lock_word > threshold) {
 		lock->lock_word -= amount;
-		success = TRUE;
+		success = true;
 	}
 	mutex_exit(&(lock->mutex));
 	return(success);
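
For reference, the (amount, threshold) pairs that the first-acquisition call
sites in this patch pass to this primitive:

	/* S-lock:   rw_lock_lock_word_decr(lock, 1, 0)
	             succeeds while lock_word > 0, i.e. no X holder and no
	             waiting writer; an SX holder does not block it.
	   SX-lock:  rw_lock_lock_word_decr(lock, X_LOCK_HALF_DECR,
	                                    X_LOCK_HALF_DECR)
	             succeeds only while lock_word > X_LOCK_HALF_DECR,
	             i.e. no other SX or X holder.
	   X-lock:   rw_lock_lock_word_decr(lock, X_LOCK_DECR,
	                                    X_LOCK_HALF_DECR)
	             same admission condition as SX, but reserves the full
	             X amount and then waits for the readers to leave. */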
@@ -323,7 +394,7 @@ void
 rw_lock_set_writer_id_and_recursion_flag(
 /*=====================================*/
 	rw_lock_t*	lock,		/*!< in/out: lock to work on */
-	ibool		recursive)	/*!< in: TRUE if recursion
+	bool		recursive)	/*!< in: true if recursion
 					allowed */
 {
 	os_thread_id_t	curr_thread	= os_thread_get_curr_id();
@@ -369,7 +440,7 @@ rw_lock_s_lock_low(
 	const char*	file_name, /*!< in: file name where lock requested */
 	ulint		line)	/*!< in: line where requested */
 {
-	if (!rw_lock_lock_word_decr(lock, 1)) {
+	if (!rw_lock_lock_word_decr(lock, 1, 0)) {
 		/* Locking did not succeed */
 		return(FALSE);
 	}
@@ -537,7 +608,7 @@ rw_lock_x_lock_func_nowait(
 	To achieve this we load it before os_compare_and_swap_lint(),
 	which implies full memory barrier in current implementation. */
 	if (success) {
-		rw_lock_set_writer_id_and_recursion_flag(lock, TRUE);
+		rw_lock_set_writer_id_and_recursion_flag(lock, true);
 
 	} else if (local_recursive
 		   && os_thread_eq(lock->writer_thread,
@@ -545,9 +616,12 @@ rw_lock_x_lock_func_nowait(
 		/* Relock: this lock_word modification is safe since no other
 		threads can modify (lock, unlock, or reserve) lock_word while
 		there is an exclusive writer and this is the writer thread. */
-		if (lock->lock_word == 0) {
-			lock->lock_word = -X_LOCK_DECR;
+		if (lock->lock_word == 0 || lock->lock_word == -X_LOCK_HALF_DECR) {
+			/* There is 1 x-lock */
+			lock->lock_word -= X_LOCK_DECR;
 		} else {
+			ut_ad(lock->lock_word <= -X_LOCK_DECR);
+			/* There are 2 or more x-locks */
 			lock->lock_word--;
 		}
 
@@ -571,7 +645,7 @@ rw_lock_x_lock_func_nowait(
 	lock->last_x_file_name = file_name;
 	lock->last_x_line = line;
 
-	ut_ad(rw_lock_validate(lock));
+	ut_ad(rw_lock_validate((const rw_lock_t*)lock));
 
 	return(TRUE);
 }
@@ -597,7 +671,8 @@ rw_lock_s_unlock_func(
 #endif
 
 	/* Increment lock_word to indicate 1 less reader */
-	if (rw_lock_lock_word_incr(lock, 1) == 0) {
+	lint	lock_word = rw_lock_lock_word_incr(lock, 1);
+	if (lock_word == 0 || lock_word == -X_LOCK_HALF_DECR) {
 
 		/* wait_ex waiter exists. It may not be asleep, but we signal
 		anyway. We do not wake other waiters, because they can't
@@ -607,7 +682,7 @@ rw_lock_s_unlock_func(
 
 	}
 
-	ut_ad(rw_lock_validate(lock));
+	ut_ad(rw_lock_validate((const rw_lock_t*)lock));
 
 #ifdef UNIV_SYNC_PERF_STAT
 	rw_s_exit_count++;
@@ -680,7 +755,8 @@ rw_lock_x_prepare_unlock(
 #endif
 	rw_lock_t*	lock)	/*!< in/out: rw-lock */
 {
-	ut_ad(lock->lock_word == 0 || lock->lock_word <= -X_LOCK_DECR);
+	ut_ad(lock->lock_word == 0 || lock->lock_word == -X_LOCK_HALF_DECR
+	      || lock->lock_word <= -X_LOCK_DECR);
 
 	/* lock->recursive flag also indicates if lock->writer_thread is
 	valid or stale. If we are the last of the recursive callers
@@ -722,25 +798,52 @@ rw_lock_x_unlock_func(
 #endif
 	rw_lock_t*	lock)	/*!< in/out: rw-lock */
 {
-	ulint x_lock_incr = rw_lock_x_prepare_unlock(
+	ut_ad(lock->lock_word == 0 || lock->lock_word == -X_LOCK_HALF_DECR
+	      || lock->lock_word <= -X_LOCK_DECR);
+
+	/* lock->recursive flag also indicates if lock->writer_thread is
+	valid or stale. If we are the last of the recursive callers
+	then we must unset lock->recursive flag to indicate that the
+	lock->writer_thread is now stale.
+	Note that since we still hold the x-lock we can safely read the
+	lock_word. */
+	if (lock->lock_word == 0) {
+		/* Last caller in a possible recursive chain. */
+		lock->recursive = FALSE;
+	}
+
 #ifdef UNIV_SYNC_DEBUG
-						     pass,
+	rw_lock_remove_debug_info(lock, pass, RW_LOCK_EX);
 #endif
-						     lock);
 
-	if (rw_lock_lock_word_incr(lock, x_lock_incr) == X_LOCK_DECR) {
-		/* Lock is now free. May have to signal read/write waiters.
+	if (lock->lock_word == 0 || lock->lock_word == -X_LOCK_HALF_DECR) {
+		/* There is 1 x-lock */
+		/* atomic increment is needed, because it is last */
+		if (rw_lock_lock_word_incr(lock, X_LOCK_DECR) <= 0) {
+			ut_error;
+		}
+
+		/* This no longer has an X-lock but it may still have
+		an SX-lock. So it is now free for S-locks by other threads.
+		We need to signal read/write waiters.
 		We do not need to signal wait_ex waiters, since they cannot
 		exist when there is a writer. */
-
 		if (lock->waiters) {
 			rw_lock_reset_waiter_flag(lock);
 			os_event_set(lock->event);
 			sync_array_object_signalled();
 		}
+	} else if (lock->lock_word == -X_LOCK_DECR
+		   || lock->lock_word == -(X_LOCK_DECR + X_LOCK_HALF_DECR)) {
+		/* There are 2 x-locks */
+		lock->lock_word += X_LOCK_DECR;
+	} else {
+		/* There are more than 2 x-locks. */
+		ut_ad(lock->lock_word < -X_LOCK_DECR);
+		lock->lock_word += 1;
 	}
 
-	ut_ad(rw_lock_validate(lock));
+	ut_ad(rw_lock_validate((const rw_lock_t*)lock));
 
 #ifdef UNIV_SYNC_PERF_STAT
 	rw_x_exit_count++;
@@ -799,6 +902,61 @@ rw_lock_x_unlock_func(
 #endif
 }
 
+/******************************************************************//**
+Releases an sx mode lock. */
+UNIV_INLINE
+void
+rw_lock_sx_unlock_func(
+/*===================*/
+#ifdef UNIV_SYNC_DEBUG
+	ulint		pass,	/*!< in: pass value; != 0, if the lock may have
+				been passed to another thread to unlock */
+#endif /* UNIV_SYNC_DEBUG */
+	rw_lock_t*	lock)	/*!< in/out: rw-lock */
+{
+	ut_ad(rw_lock_get_sx_lock_count(lock));
+	ut_a(lock->sx_recursive > 0);
+
+	lock->sx_recursive -= 1;
+
+#ifdef UNIV_SYNC_DEBUG
+	rw_lock_remove_debug_info(lock, pass, RW_LOCK_SX);
+#endif /* UNIV_SYNC_DEBUG */
+
+	if (lock->sx_recursive == 0) {
+		/* Last caller in a possible recursive chain. */
+		if (lock->lock_word > 0) {
+			lock->recursive = FALSE;
+			UNIV_MEM_INVALID(&lock->writer_thread,
+					 sizeof lock->writer_thread);
+
+			if (rw_lock_lock_word_incr(lock, X_LOCK_HALF_DECR)
+			    <= X_LOCK_HALF_DECR) {
+				ut_error;
+			}
+			/* Lock is now free. May have to signal read/write
+			waiters. We do not need to signal wait_ex waiters,
+			since they cannot exist when there is an sx-lock
+			holder. */
+			if (lock->waiters) {
+				rw_lock_reset_waiter_flag(lock);
+				os_event_set(lock->event);
+				sync_array_object_signalled();
+			}
+		} else {
+			/* still has x-lock */
+			ut_ad(lock->lock_word == -X_LOCK_HALF_DECR
+			      || lock->lock_word <= -(X_LOCK_DECR
+						      + X_LOCK_HALF_DECR));
+			lock->lock_word += X_LOCK_HALF_DECR;
+		}
+	}
+
+#ifdef UNIV_SYNC_PERF_STAT
+	rw_sx_exit_count++;
+#endif /* UNIV_SYNC_PERF_STAT */
+}
+
 #ifdef UNIV_PFS_RWLOCK
 
 /******************************************************************//**
diff --git a/storage/xtradb/include/sync0sync.h b/storage/xtradb/include/sync0sync.h
index 0d36a8e..669102f 100644
--- a/storage/xtradb/include/sync0sync.h
+++ b/storage/xtradb/include/sync0sync.h
@@ -922,10 +922,21 @@ or row lock! */
 #define RW_LOCK_EXCLUSIVE	351
 #define RW_LOCK_SHARED		352
 #define RW_LOCK_WAIT_EX		353
-#define SYNC_MUTEX		354
-#define SYNC_PRIO_MUTEX		355
-#define PRIO_RW_LOCK_EX		356
-#define PRIO_RW_LOCK_SHARED	357
+#define RW_LOCK_SX		354
+#define SYNC_MUTEX		355
+#define SYNC_PRIO_MUTEX		356
+#define PRIO_RW_LOCK_EX		357
+#define PRIO_RW_LOCK_SHARED	358
+
+#ifdef UNIV_SYNC_DEBUG
+/* Flags to specify lock types for rw_lock_own_flagged() */
+enum rw_lock_flag_t {
+	RW_LOCK_FLAG_S  = 1 << 0,
+	RW_LOCK_FLAG_X  = 1 << 1,
+	RW_LOCK_FLAG_SX = 1 << 2
+};
+typedef ulint rw_lock_flags_t;
+#endif /* UNIV_SYNC_DEBUG */
 
 /* NOTE! The structure appears here only for the compiler to know its size.
 Do not use its fields directly! The structure used in the spin lock
diff --git a/storage/xtradb/mtr/mtr0mtr.cc b/storage/xtradb/mtr/mtr0mtr.cc
index 4010fcc..b9c2bf4 100644
--- a/storage/xtradb/mtr/mtr0mtr.cc
+++ b/storage/xtradb/mtr/mtr0mtr.cc
@@ -79,6 +79,7 @@ mtr_memo_slot_release_func(
 	switch (slot->type) {
 	case MTR_MEMO_PAGE_S_FIX:
 	case MTR_MEMO_PAGE_X_FIX:
+	case MTR_MEMO_PAGE_SX_FIX:
 	case MTR_MEMO_BUF_FIX:
 		buf_page_release((buf_block_t*) object, slot->type);
 		break;
@@ -88,10 +89,15 @@ mtr_memo_slot_release_func(
 	case MTR_MEMO_X_LOCK:
 		rw_lock_x_unlock((prio_rw_lock_t*) object);
 		break;
+	case MTR_MEMO_SX_LOCK:
+		rw_lock_sx_unlock((rw_lock_t*) object);
+		break;
 #ifdef UNIV_DEBUG
 	default:
 		ut_ad(slot->type == MTR_MEMO_MODIFY);
-		ut_ad(mtr_memo_contains(mtr, object, MTR_MEMO_PAGE_X_FIX));
+		ut_ad(mtr_memo_contains_flagged(mtr, object,
+						MTR_MEMO_PAGE_X_FIX
+						| MTR_MEMO_PAGE_SX_FIX));
 #endif /* UNIV_DEBUG */
 	}
 }
@@ -150,11 +156,13 @@ mtr_memo_slot_note_modification(
 	ut_ad(!srv_read_only_mode);
 	ut_ad(mtr->magic_n == MTR_MAGIC_N);
 
-	if (slot->object != NULL && slot->type == MTR_MEMO_PAGE_X_FIX) {
-		buf_block_t*	block = (buf_block_t*) slot->object;
-
-		ut_ad(!mtr->made_dirty || log_flush_order_mutex_own());
-		buf_flush_note_modification(block, mtr);
+	if (buf_block_t* block = static_cast<buf_block_t*>(slot->object)) {
+		switch (slot->type) {
+		case MTR_MEMO_PAGE_X_FIX:
+		case MTR_MEMO_PAGE_SX_FIX:
+			ut_ad(!mtr->made_dirty || log_flush_order_mutex_own());
+			buf_flush_note_modification(block, mtr);
+		}
 	}
 }
 
@@ -387,6 +395,65 @@ mtr_memo_release(
 
 	return(false);
 }
+
+/**********************************************************//**
+Releases the block in an mtr memo after a savepoint. */
+
+void
+mtr_release_block_at_savepoint(
+/*===========================*/
+	mtr_t*		mtr,		/*!< in: mtr */
+	ulint		savepoint,	/*!< in: savepoint */
+	buf_block_t*	block)		/*!< in: block to release */
+{
+	mtr_memo_slot_t* slot;
+	dyn_array_t*	memo;
+
+	ut_ad(mtr);
+	ut_ad(mtr->magic_n == MTR_MAGIC_N);
+	ut_ad(mtr->state == MTR_ACTIVE);
+
+	memo = &mtr->memo;
+
+	slot = static_cast<mtr_memo_slot_t*>(
+		dyn_array_get_element(memo, savepoint));
+
+	ut_a(slot->object == block);
+
+	buf_page_release(block, slot->type);
+
+	slot->object = NULL;
+}
+/**********************************************************//**
+Relaxes the block latch in an mtr memo after a savepoint
+from X to SX. */
+
+void
+mtr_block_x_to_sx_at_savepoint(
+/*===========================*/
+	mtr_t*		mtr,		/*!< in: mtr */
+	ulint		savepoint,	/*!< in: savepoint */
+	buf_block_t*	block)		/*!< in: block to relax latch */
+{
+	mtr_memo_slot_t* slot;
+	dyn_array_t*	memo;
+
+	ut_ad(mtr);
+	ut_ad(mtr->magic_n == MTR_MAGIC_N);
+	ut_ad(mtr->state == MTR_ACTIVE);
+
+	memo = &mtr->memo;
+
+	slot = static_cast<mtr_memo_slot_t*>(
+		dyn_array_get_element(memo, savepoint));
+
+	ut_a(slot->object == block);
+	ut_a(slot->type == MTR_MEMO_PAGE_X_FIX);
+
+	rw_lock_sx_lock(&block->lock);
+	rw_lock_x_unlock(&block->lock);
+	slot->type = MTR_MEMO_PAGE_SX_FIX;
+}
 #endif /* !UNIV_HOTBACKUP */
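
A sketch of the intended savepoint pattern (`space`, `zip_size` and `page_no`
are placeholders; `mtr` is already started):

	ulint		savepoint;
	buf_block_t*	block;

	savepoint = mtr_set_savepoint(&mtr);

	block = buf_page_get_gen(space, zip_size, page_no, RW_X_LATCH,
				 NULL, BUF_GET, __FILE__, __LINE__, &mtr);

	/* Once no further X-protected modification of the page is needed,
	downgrade the memo entry so that S readers can proceed: */
	mtr_block_x_to_sx_at_savepoint(&mtr, savepoint, block);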
 
 /********************************************************//**
@@ -402,8 +469,10 @@ mtr_read_ulint(
 				/*!< in: mini-transaction handle */
 {
 	ut_ad(mtr->state == MTR_ACTIVE);
-	ut_ad(mtr_memo_contains_page(mtr, ptr, MTR_MEMO_PAGE_S_FIX)
-	      || mtr_memo_contains_page(mtr, ptr, MTR_MEMO_PAGE_X_FIX));
+	ut_ad(mtr_memo_contains_page_flagged(mtr, ptr,
+					     MTR_MEMO_PAGE_S_FIX
+					     | MTR_MEMO_PAGE_X_FIX
+					     | MTR_MEMO_PAGE_SX_FIX));
 
 	return(mach_read_ulint(ptr, type));
 }
@@ -424,6 +493,21 @@ mtr_memo_contains_page(
 	return(mtr_memo_contains(mtr, buf_block_align(ptr), type));
 }
 
+/**********************************************************//**
+Checks if memo contains the given page.
+ at return true if contains */
+
+bool
+mtr_memo_contains_page_flagged(
+/*===========================*/
+	const mtr_t*	mtr,	/*!< in: mtr */
+	const byte*	ptr,	/*!< in: pointer to buffer frame */
+	ulint		flags)	/*!< in: specify types of object with
+				OR of MTR_MEMO_PAGE_S_FIX... values */
+{
+	return(mtr_memo_contains_flagged(mtr, buf_block_align(ptr), flags));
+}
+
 /*********************************************************//**
 Prints info of an mtr handle. */
 UNIV_INTERN
diff --git a/storage/xtradb/srv/srv0mon.cc b/storage/xtradb/srv/srv0mon.cc
index 5bac621..e49279a 100644
--- a/storage/xtradb/srv/srv0mon.cc
+++ b/storage/xtradb/srv/srv0mon.cc
@@ -1237,6 +1237,12 @@ static monitor_info_t	innodb_counter_info[] =
 	 MONITOR_EXISTING | MONITOR_DEFAULT_ON),
 	 MONITOR_DEFAULT_START, MONITOR_OVLD_RWLOCK_X_SPIN_WAITS},
 
+	{"innodb_rwlock_sx_spin_waits", "server",
+	 "Number of rwlock spin waits due to sx latch request",
+	 static_cast<monitor_type_t>(
+	 MONITOR_EXISTING | MONITOR_DEFAULT_ON),
+	 MONITOR_DEFAULT_START, MONITOR_OVLD_RWLOCK_SX_SPIN_WAITS},
+
 	{"innodb_rwlock_s_spin_rounds", "server",
 	 "Number of rwlock spin loop rounds due to shared latch request",
 	 static_cast<monitor_type_t>(
@@ -1249,6 +1255,12 @@ static monitor_info_t	innodb_counter_info[] =
 	 MONITOR_EXISTING | MONITOR_DEFAULT_ON),
 	 MONITOR_DEFAULT_START, MONITOR_OVLD_RWLOCK_X_SPIN_ROUNDS},
 
+	{"innodb_rwlock_sx_spin_rounds", "server",
+	 "Number of rwlock spin loop rounds due to sx latch request",
+	 static_cast<monitor_type_t>(
+	 MONITOR_EXISTING | MONITOR_DEFAULT_ON),
+	 MONITOR_DEFAULT_START, MONITOR_OVLD_RWLOCK_SX_SPIN_ROUNDS},
+
 	{"innodb_rwlock_s_os_waits", "server",
 	 "Number of OS waits due to shared latch request",
 	 static_cast<monitor_type_t>(
@@ -1261,6 +1273,12 @@ static monitor_info_t	innodb_counter_info[] =
 	 MONITOR_EXISTING | MONITOR_DEFAULT_ON),
 	 MONITOR_DEFAULT_START, MONITOR_OVLD_RWLOCK_X_OS_WAITS},
 
+	{"innodb_rwlock_sx_os_waits", "server",
+	 "Number of OS waits due to sx latch request",
+	 static_cast<monitor_type_t>(
+	 MONITOR_EXISTING | MONITOR_DEFAULT_ON),
+	 MONITOR_DEFAULT_START, MONITOR_OVLD_RWLOCK_SX_OS_WAITS},
+
 	/* ========== Counters for DML operations ========== */
 	{"module_dml", "dml", "Statistics for DMLs",
 	 MONITOR_MODULE,
@@ -1804,6 +1822,10 @@ srv_mon_process_existing_counter(
 		value = rw_lock_stats.rw_x_spin_wait_count;
 		break;
 
+	case MONITOR_OVLD_RWLOCK_SX_SPIN_WAITS:
+		value = rw_lock_stats.rw_sx_spin_wait_count;
+		break;
+
 	case MONITOR_OVLD_RWLOCK_S_SPIN_ROUNDS:
 		value = rw_lock_stats.rw_s_spin_round_count;
 		break;
@@ -1812,6 +1834,10 @@ srv_mon_process_existing_counter(
 		value = rw_lock_stats.rw_x_spin_round_count;
 		break;
 
+	case MONITOR_OVLD_RWLOCK_SX_SPIN_ROUNDS:
+		value = rw_lock_stats.rw_sx_spin_round_count;
+		break;
+
 	case MONITOR_OVLD_RWLOCK_S_OS_WAITS:
 		value = rw_lock_stats.rw_s_os_wait_count;
 		break;
@@ -1820,6 +1846,10 @@ srv_mon_process_existing_counter(
 		value = rw_lock_stats.rw_x_os_wait_count;
 		break;
 
+	case MONITOR_OVLD_RWLOCK_SX_OS_WAITS:
+		value = rw_lock_stats.rw_sx_os_wait_count;
+		break;
+
 	case MONITOR_OVLD_BUFFER_POOL_SIZE:
 		value = srv_buf_pool_size;
 		break;
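
The SX counters above follow the sharded-counter pattern the S and X
statistics already use; a minimal sketch, assuming the ib_counter_t
interface from ut0counter.h (a per-slot add() on the hot path and a
summing read on the monitor path, exactly as the surrounding code does):

	/* Hot path: bump only this thread's slot, avoiding cache-line
	contention between CPUs. */
	size_t	idx = (size_t) os_thread_get_curr_id();
	rw_lock_stats.rw_sx_spin_wait_count.add(idx, 1);

	/* Monitor path: reading the counter sums over all slots. */
	mon_type_t	value = rw_lock_stats.rw_sx_spin_wait_count;
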
diff --git a/storage/xtradb/sync/sync0arr.cc b/storage/xtradb/sync/sync0arr.cc
index 01cae70..29b0da7 100644
--- a/storage/xtradb/sync/sync0arr.cc
+++ b/storage/xtradb/sync/sync0arr.cc
@@ -550,12 +550,14 @@ sync_array_cell_print(
 	} else if (type == RW_LOCK_EX
 		   || type == RW_LOCK_WAIT_EX
 		   || type == RW_LOCK_SHARED
+		   || type == RW_LOCK_SX
 		   || type == PRIO_RW_LOCK_SHARED
 		   || type == PRIO_RW_LOCK_EX) {
 
 		fputs((type == RW_LOCK_EX || type == PRIO_RW_LOCK_EX)
 		      ? "X-lock on"
 		      : type == RW_LOCK_WAIT_EX ? "X-lock (wait_ex) on"
+		      : type == RW_LOCK_SX ? "SX-lock on"
 		      : "S-lock on", file);
 
 		/* Currently we are unable to tell high priority
@@ -585,8 +587,8 @@ sync_array_cell_print(
 					"a writer (thread id %lu) has"
 					" reserved it in mode %s",
 					(ulong) os_thread_pf(rwlock->writer_thread),
-					writer == RW_LOCK_EX
-					? " exclusive\n"
+					writer == RW_LOCK_EX ? " exclusive\n"
+					: writer == RW_LOCK_SX ? " SX\n"
 					: " wait exclusive\n");
 
 				*reserver = rwlock->writer_thread;
@@ -716,9 +718,9 @@ sync_array_detect_deadlock(
 		return(FALSE); /* No deadlock here */
 	}
 
-	if (cell->request_type == SYNC_MUTEX
-	    || cell->request_type == SYNC_PRIO_MUTEX) {
-
+	switch (cell->request_type) {
+	case SYNC_MUTEX:
+	case SYNC_PRIO_MUTEX:
 		if (cell->request_type == SYNC_MUTEX) {
 			mutex = static_cast<ib_mutex_t*>(cell->wait_object);
 		} else {
@@ -753,9 +755,9 @@ sync_array_detect_deadlock(
 
 		return(FALSE); /* No deadlock */
 
-	} else if (cell->request_type == RW_LOCK_EX
-		   || cell->request_type == PRIO_RW_LOCK_EX
-		   || cell->request_type == RW_LOCK_WAIT_EX) {
+	case RW_LOCK_EX:
+	case RW_LOCK_WAIT_EX:
+	case PRIO_RW_LOCK_EX:
 
 		lock = static_cast<rw_lock_t*>(cell->wait_object);
 
@@ -765,17 +767,21 @@ sync_array_detect_deadlock(
 
 			thread = debug->thread_id;
 
-			if (((debug->lock_type == RW_LOCK_EX)
-			     && !os_thread_eq(thread, cell->thread))
-			    || ((debug->lock_type == RW_LOCK_WAIT_EX)
-				&& !os_thread_eq(thread, cell->thread))
-			    || (debug->lock_type == RW_LOCK_SHARED)) {
+			switch (debug->lock_type) {
+			case RW_LOCK_EX:
+			case RW_LOCK_WAIT_EX:
+			case RW_LOCK_SX:
+				if (os_thread_eq(thread, cell->thread)) {
+					break;
+				}
+				/* fall through */
+			case RW_LOCK_SHARED:
 
 				/* The (wait) x-lock request can block
 				infinitely only if someone (can be also cell
 				thread) is holding s-lock, or someone
-				(cannot be cell thread) (wait) x-lock, and
-				he is blocked by start thread */
+				(cannot be cell thread) (wait) x-lock or
+				sx-lock, and is blocked by the start thread */
 
 				ret = sync_array_deadlock_step(
 					arr, start, thread, debug->pass,
@@ -793,8 +799,43 @@ sync_array_detect_deadlock(
 
 		return(FALSE);
 
-	} else if (cell->request_type == RW_LOCK_SHARED
-		   || cell->request_type == PRIO_RW_LOCK_SHARED) {
+	case RW_LOCK_SX:
+
+		lock = static_cast<rw_lock_t*>(cell->wait_object);
+
+		for (debug = UT_LIST_GET_FIRST(lock->debug_list);
+		     debug != 0;
+		     debug = UT_LIST_GET_NEXT(list, debug)) {
+
+			thread = debug->thread_id;
+
+			switch (debug->lock_type) {
+			case RW_LOCK_EX:
+			case RW_LOCK_WAIT_EX:
+			case RW_LOCK_SX:
+			case PRIO_RW_LOCK_EX:
+				if (os_thread_eq(thread, cell->thread)) {
+					break;
+				}
+
+				/* The sx-lock request can block infinitely
+				only if someone (possibly the cell thread
+				itself) holds a (wait) x-lock or sx-lock and
+				is blocked by the start thread */
+
+				ret = sync_array_deadlock_step(
+					arr, start, thread, debug->pass,
+					depth);
+				if (ret) {
+					goto print;
+				}
+			}
+		}
+
+		return(FALSE);
+
+	case RW_LOCK_SHARED:
+	case PRIO_RW_LOCK_SHARED:
 
 		lock = static_cast<rw_lock_t*>(cell->wait_object);
 
@@ -823,12 +864,9 @@ sync_array_detect_deadlock(
 
 		return(FALSE);
 
-	} else {
+	default:
 		ut_error;
 	}
-
-	return(TRUE);	/* Execution never reaches this line: for compiler
-			fooling only */
 }
 #endif /* UNIV_SYNC_DEBUG */
 
@@ -843,8 +881,9 @@ sync_arr_cell_can_wake_up(
 	ib_mutex_t*	mutex;
 	rw_lock_t*	lock;
 
-	if (cell->request_type == SYNC_MUTEX
-	    || cell->request_type == SYNC_PRIO_MUTEX) {
+	switch (cell->request_type) {
+	case SYNC_MUTEX:
+	case SYNC_PRIO_MUTEX:
 
 		if (cell->request_type == SYNC_MUTEX) {
 			mutex = static_cast<ib_mutex_t*>(cell->wait_object);
@@ -859,30 +898,38 @@ sync_arr_cell_can_wake_up(
 			return(TRUE);
 		}
 
-	} else if (cell->request_type == RW_LOCK_EX
-		   || cell->request_type == PRIO_RW_LOCK_EX) {
+		break;
+
+	case RW_LOCK_EX:
+	case RW_LOCK_SX:
+	case PRIO_RW_LOCK_EX:
 
 		lock = static_cast<rw_lock_t*>(cell->wait_object);
 
 		os_rmb;
-		if (lock->lock_word > 0) {
+		if (lock->lock_word > X_LOCK_HALF_DECR) {
 		/* Either unlocked or only read locked. */
 
 			return(TRUE);
 		}
 
-        } else if (cell->request_type == RW_LOCK_WAIT_EX) {
+		break;
+
+	case RW_LOCK_WAIT_EX:
 
 		lock = static_cast<rw_lock_t*>(cell->wait_object);
 
-                /* lock_word == 0 means all readers have left */
+                /* lock_word == 0 means all readers and any sx holder have left */
 		os_rmb;
 		if (lock->lock_word == 0) {
 
 			return(TRUE);
 		}
-	} else if (cell->request_type == RW_LOCK_SHARED
-		   || cell->request_type == PRIO_RW_LOCK_SHARED) {
+
+		break;
+	case RW_LOCK_SHARED:
+	case PRIO_RW_LOCK_SHARED:
+
 		lock = static_cast<rw_lock_t*>(cell->wait_object);
 
                 /* lock_word > 0 means no writer or reserved writer */
@@ -891,8 +938,8 @@ sync_arr_cell_can_wake_up(
 
 			return(TRUE);
 		}
-	} else {
-
+		break;
+	default:
 		ut_error;
 	}
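
For reference, the wake-up predicates above collapse to a small helper;
a sketch only (hypothetical function, ignoring mutexes and the PRIO_*
variants; the thresholds follow the lock_word encoding documented in
sync0rw.cc below):

	static bool
	cell_can_wake_up(ulint request_type, lint lock_word)
	{
		switch (request_type) {
		case RW_LOCK_EX:
		case RW_LOCK_SX:
			/* Unlocked or only S-locked: worth retrying. */
			return(lock_word > X_LOCK_HALF_DECR);
		case RW_LOCK_WAIT_EX:
			/* All readers (and any SX holder) have left. */
			return(lock_word == 0);
		case RW_LOCK_SHARED:
			/* No active or reserved writer. */
			return(lock_word > 0);
		}
		return(false);
	}
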
 
diff --git a/storage/xtradb/sync/sync0rw.cc b/storage/xtradb/sync/sync0rw.cc
index 00fb5e5..c3090e4 100644
--- a/storage/xtradb/sync/sync0rw.cc
+++ b/storage/xtradb/sync/sync0rw.cc
@@ -48,22 +48,45 @@ Created 9/11/1995 Heikki Tuuri
 	=============================
 The status of a rw_lock is held in lock_word. The initial value of lock_word is
 X_LOCK_DECR. lock_word is decremented by 1 for each s-lock and by X_LOCK_DECR
-for each x-lock. This describes the lock state for each value of lock_word:
-
-lock_word == X_LOCK_DECR:      Unlocked.
-0 < lock_word < X_LOCK_DECR:   Read locked, no waiting writers.
-			       (X_LOCK_DECR - lock_word) is the
-			       number of readers that hold the lock.
-lock_word == 0:		       Write locked
--X_LOCK_DECR < lock_word < 0:  Read locked, with a waiting writer.
-			       (-lock_word) is the number of readers
-			       that hold the lock.
-lock_word <= -X_LOCK_DECR:     Recursively write locked. lock_word has been
-			       decremented by X_LOCK_DECR for the first lock
-			       and the first recursive lock, then by 1 for
-			       each recursive lock thereafter.
-			       So the number of locks is:
-			       (lock_copy == 0) ? 1 : 2 - (lock_copy + X_LOCK_DECR)
+or 1 for each x-lock. This describes the lock state for each value of lock_word:
+
+lock_word == X_LOCK_DECR:	Unlocked.
+X_LOCK_HALF_DECR < lock_word < X_LOCK_DECR:
+				S locked, no waiting writers.
+				(X_LOCK_DECR - lock_word) is the number
+				of S locks.
+lock_word == X_LOCK_HALF_DECR:	SX locked, no waiting writers.
+0 < lock_word < X_LOCK_HALF_DECR:
+				SX locked AND S locked, no waiting writers.
+				(X_LOCK_HALF_DECR - lock_word) is the number
+				of S locks.
+lock_word == 0:			X locked, no waiting writers.
+-X_LOCK_HALF_DECR < lock_word < 0:
+				S locked, with a waiting writer.
+				(-lock_word) is the number of S locks.
+lock_word == -X_LOCK_HALF_DECR:	X locked and SX locked, no waiting writers.
+-X_LOCK_DECR < lock_word < -X_LOCK_HALF_DECR:
+				S locked, with a waiting writer
+				which has SX lock.
+				-(lock_word + X_LOCK_HALF_DECR) is the number
+				of S locks.
+lock_word == -X_LOCK_DECR:	X locked with recursive X lock (2 X locks).
+-(X_LOCK_DECR + X_LOCK_HALF_DECR) < lock_word < -X_LOCK_DECR:
+				X locked. The number of the X locks is:
+				2 - (lock_word + X_LOCK_DECR)
+lock_word == -(X_LOCK_DECR + X_LOCK_HALF_DECR):
+				X locked with recursive X lock (2 X locks)
+				and SX locked.
+lock_word < -(X_LOCK_DECR + X_LOCK_HALF_DECR):
+				X locked and SX locked.
+				The number of the X locks is:
+				2 - (lock_word + X_LOCK_DECR + X_LOCK_HALF_DECR)
+
+ LOCK COMPATIBILITY MATRIX
+    S SX  X
+ S  +  +  -
+ SX +  -  -
+ X  -  -  -
 
 The lock_word is always read and updated atomically and consistently, so that
 it always represents the state of the lock, and the state of the lock changes
@@ -71,12 +94,13 @@ with a single atomic operation. This lock_word holds all of the information
 that a thread needs in order to determine if it is eligible to gain the lock
 or if it must spin or sleep. The one exception to this is that writer_thread
 must be verified before recursive write locks: to solve this scenario, we make
-writer_thread readable by all threads, but only writeable by the x-lock holder.
+writer_thread readable by all threads, but only writeable by the x-lock or
+sx-lock holder.
 
 The other members of the lock obey the following rules to remain consistent:
 
 recursive:	This and the writer_thread field together control the
-		behaviour of recursive x-locking.
+		behaviour of recursive x-locking or sx-locking.
 		lock->recursive must be FALSE in following states:
 			1) The writer_thread contains garbage i.e.: the
 			lock has just been initialized.
@@ -239,6 +263,7 @@ rw_lock_create_func(
 	contains garbage at initialization and cannot be used for
 	recursive x-locking. */
 	lock->recursive = FALSE;
+	lock->sx_recursive = 0;
 	/* Silence Valgrind when UNIV_DEBUG_VALGRIND is not enabled. */
 	memset((void*) &lock->writer_thread, 0, sizeof lock->writer_thread);
 	UNIV_MEM_INVALID(&lock->writer_thread, sizeof lock->writer_thread);
@@ -325,7 +350,7 @@ rw_lock_free_func(
 #endif /* !INNODB_RW_LOCKS_USE_ATOMICS */
 
 	os_rmb;
-	ut_ad(rw_lock_validate(lock));
+	ut_ad(rw_lock_validate((const rw_lock_t *)lock));
 	ut_a(lock->lock_word == X_LOCK_DECR);
 
 	mutex_enter(&rw_lock_list_mutex);
@@ -380,7 +405,7 @@ UNIV_INTERN
 ibool
 rw_lock_validate(
 /*=============*/
-	rw_lock_t*	lock)	/*!< in: rw-lock */
+	const rw_lock_t*	lock)	/*!< in: rw-lock */
 {
 	ulint	waiters;
 	lint	lock_word;
@@ -408,7 +433,7 @@ rw_lock_validate(
 /*=============*/
 	prio_rw_lock_t*	lock)	/*!< in: rw-lock */
 {
-	return(rw_lock_validate(&lock->base_lock));
+	return(rw_lock_validate((const rw_lock_t*)&lock->base_lock));
 }
 
 #endif /* UNIV_DEBUG */
@@ -444,7 +469,7 @@ rw_lock_s_lock_spin(
 
 	counter_index = (size_t) os_thread_get_curr_id();
 
-	ut_ad(rw_lock_validate(lock));
+	ut_ad(rw_lock_validate((const rw_lock_t*)lock));
 
 	rw_lock_stats.rw_s_spin_wait_count.add(counter_index, 1);
 lock_loop:
@@ -579,7 +604,7 @@ rw_lock_x_lock_move_ownership(
 {
 	ut_ad(rw_lock_is_locked(lock, RW_LOCK_EX));
 
-	rw_lock_set_writer_id_and_recursion_flag(lock, TRUE);
+	rw_lock_set_writer_id_and_recursion_flag(lock, true);
 }
 
 /******************************************************************//**
@@ -587,8 +612,8 @@ Function for the next writer to call. Waits for readers to exit.
 The caller must have already decremented lock_word by X_LOCK_DECR. */
 UNIV_INLINE
 void
-rw_lock_x_lock_wait(
-/*================*/
+rw_lock_x_lock_wait_func(
+/*=====================*/
 	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
 	bool		high_priority,
 				/*!< in: if true, the rw lock is a priority
@@ -598,6 +623,7 @@ rw_lock_x_lock_wait(
 	ulint		pass,	/*!< in: pass value; != 0, if the lock will
 				be passed to another thread to unlock */
 #endif
+	lint		threshold,/*!< in: threshold to wait for */
 	const char*	file_name,/*!< in: file name where lock requested */
 	ulint		line)	/*!< in: line where requested */
 {
@@ -613,7 +639,7 @@ rw_lock_x_lock_wait(
 	counter_index = (size_t) os_thread_get_curr_id();
 
 	os_rmb;
-	ut_ad(lock->lock_word <= 0);
+	ut_ad(lock->lock_word <= threshold);
 
         HMT_low();
 	if (high_priority) {
@@ -622,7 +648,7 @@ rw_lock_x_lock_wait(
 		prio_rw_lock->high_priority_wait_ex_waiter = 1;
 	}
 
-	while (lock->lock_word < 0) {
+	while (lock->lock_word < threshold) {
 		if (srv_spin_wait_delay) {
 			ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
 		}
@@ -644,7 +670,7 @@ rw_lock_x_lock_wait(
 		i = 0;
 
 		/* Check lock_word to ensure wake-up isn't missed.*/
-		if (lock->lock_word < 0) {
+		if (lock->lock_word < threshold) {
 
 			/* these stats may not be accurate */
 			lock->count_os_wait++;
@@ -685,6 +711,13 @@ rw_lock_x_lock_wait(
 
 	rw_lock_stats.rw_x_spin_round_count.add(counter_index, i);
 }
+#ifdef UNIV_SYNC_DEBUG
+# define rw_lock_x_lock_wait(L, H, P, T, F, O)		\
+	rw_lock_x_lock_wait_func(L, H, P, T, F, O)
+#else
+# define rw_lock_x_lock_wait(L, H, P, T, F, O)		\
+	rw_lock_x_lock_wait_func(L, H, T, F, O)
+#endif
 
 /******************************************************************//**
 Low-level function for acquiring an exclusive lock.
@@ -705,7 +738,7 @@ rw_lock_x_lock_low(
 {
 	ibool local_recursive= lock->recursive;
 
-	if (rw_lock_lock_word_decr(lock, X_LOCK_DECR)) {
+	if (rw_lock_lock_word_decr(lock, X_LOCK_DECR, X_LOCK_HALF_DECR)) {
 
 		/* lock->recursive also tells us if the writer_thread
 		field is stale or active. As we are going to write
@@ -715,29 +748,47 @@ rw_lock_x_lock_low(
 
 		/* Decrement occurred: we are writer or next-writer. */
 		rw_lock_set_writer_id_and_recursion_flag(
-			lock, pass ? FALSE : TRUE);
+			lock, !pass);
 
-		rw_lock_x_lock_wait(lock, high_priority,
-#ifdef UNIV_SYNC_DEBUG
+		rw_lock_x_lock_wait(lock, high_priority,
 				    pass,
-#endif
-				    file_name, line);
+				    0, file_name, line);
 
 	} else {
 		os_thread_id_t	thread_id = os_thread_get_curr_id();
 
-		/* Decrement failed: relock or failed lock
+		/* Decrement failed: An X or SX lock is held by either
+		this thread or another. Try to relock.
 		Note: recursive must be loaded before writer_thread see
 		comment for rw_lock_set_writer_id_and_recursion_flag().
 		To achieve this we load it before rw_lock_lock_word_decr(),
 		which implies full memory barrier in current implementation. */
 		if (!pass && local_recursive
 		    && os_thread_eq(lock->writer_thread, thread_id)) {
-			/* Relock */
-			if (lock->lock_word == 0) {
-				lock->lock_word -= X_LOCK_DECR;
+			/* Other S-locks can still be granted. If an X lock
+			is requested recursively while an SX lock is held,
+			this X lock must follow the latching order. */
+
+			/* The existing X or SX lock is from this thread */
+			if (rw_lock_lock_word_decr(lock, X_LOCK_DECR, 0)) {
+				/* There is at least one SX-lock from this
+				thread, but no X-lock. */
+
+				/* Wait for any other S-locks to be
+				released. */
+				rw_lock_x_lock_wait(lock, high_priority, pass,
+						    -X_LOCK_HALF_DECR,
+						    file_name, line);
 			} else {
-				--lock->lock_word;
+				/* At least one X lock by this thread already
+				exists. Add another. */
+				if (lock->lock_word == 0
+				    || lock->lock_word == -X_LOCK_HALF_DECR) {
+					lock->lock_word -= X_LOCK_DECR;
+				} else {
+					ut_ad(lock->lock_word <= -X_LOCK_DECR);
+					--lock->lock_word;
+				}
 			}
 
 		} else {
@@ -761,6 +812,83 @@ rw_lock_x_lock_low(
 }
 
 /******************************************************************//**
+Low-level function for acquiring an sx lock.
+@return FALSE if did not succeed, TRUE if success. */
+UNIV_INLINE
+ibool
+rw_lock_sx_lock_low(
+/*================*/
+	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
+	ulint		pass,	/*!< in: pass value; != 0, if the lock will
+				be passed to another thread to unlock */
+	const char*	file_name,/*!< in: file name where lock requested */
+	ulint		line)	/*!< in: line where requested */
+{
+	if (rw_lock_lock_word_decr(lock, X_LOCK_HALF_DECR, X_LOCK_HALF_DECR)) {
+
+		/* lock->recursive also tells us if the writer_thread
+		field is stale or active. As we are going to write
+		our own thread id in that field it must be that the
+		current writer_thread value is not active. */
+		ut_a(!lock->recursive);
+
+		/* Decrement occurred: we are the SX lock owner. */
+		rw_lock_set_writer_id_and_recursion_flag(
+			lock, !pass);
+
+		lock->sx_recursive = 1;
+
+	} else {
+		os_thread_id_t	thread_id = os_thread_get_curr_id();
+
+		/* Decrement failed: the lock is already held in X or SX
+		mode by this thread or another. If this thread holds it,
+		relock; otherwise fail. */
+		if (!pass && lock->recursive
+		    && os_thread_eq(lock->writer_thread, thread_id)) {
+			/* This thread owns an X or SX lock */
+			if (lock->sx_recursive++ == 0) {
+				/* This thread is making its first SX-lock request
+				and it must be holding at least one X-lock here
+				because:
+
+				* There can't be a WAIT_EX thread because we are
+				  the thread which has its thread_id written in
+				  the writer_thread field and we are not waiting.
+
+				* Any other X-lock thread cannot exist because
+				  it must update the recursive flag only after
+				  updating the thread_id. Had there been
+				  a concurrent X-locking thread which succeeded
+				  in decrementing the lock_word, it must have
+				  written its thread_id before setting the
+				  recursive flag. As we cleared the if()
+				  condition above, we must be the only
+				  thread working on this lock and it is safe to
+				  read and write to the lock_word. */
+
+				ut_ad((lock->lock_word == 0)
+				      || ((lock->lock_word <= -X_LOCK_DECR)
+					  && (lock->lock_word
+					      > -(X_LOCK_DECR
+						  + X_LOCK_HALF_DECR))));
+				lock->lock_word -= X_LOCK_HALF_DECR;
+			}
+		} else {
+			/* Another thread locked before us */
+			return(FALSE);
+		}
+	}
+#ifdef UNIV_SYNC_DEBUG
+	rw_lock_add_debug_info(lock, pass, RW_LOCK_SX, file_name, line);
+#endif /* UNIV_SYNC_DEBUG */
+	lock->last_x_file_name = file_name;
+	lock->last_x_line = (unsigned int) line;
+
+	return(TRUE);
+}
+
+/******************************************************************//**
 NOTE! Use the corresponding macro, not directly this function! Lock an
 rw-lock in exclusive mode for the current thread. If the rw-lock is locked
 in shared or exclusive mode, or there is an exclusive lock request waiting,
@@ -787,7 +915,7 @@ rw_lock_x_lock_func(
 	ulint		i;	/*!< spin round count */
 	ulint		index;	/*!< index of the reserved wait cell */
 	sync_array_t*	sync_arr;
-	ibool		spinning = FALSE;
+	bool		spinning = false;
 	size_t		counter_index;
 	prio_rw_lock_t*	prio_lock = NULL;
 
@@ -796,7 +924,7 @@ rw_lock_x_lock_func(
 
 	counter_index = (size_t) os_thread_get_curr_id();
 
-	ut_ad(rw_lock_validate(lock));
+	ut_ad(rw_lock_validate((const rw_lock_t *)lock));
 #ifdef UNIV_SYNC_DEBUG
 	ut_ad(!rw_lock_own(lock, RW_LOCK_SHARED));
 #endif /* UNIV_SYNC_DEBUG */
@@ -819,7 +947,7 @@ rw_lock_x_lock_func(
 						      high_priority, lock)) {
 
 		if (!spinning) {
-			spinning = TRUE;
+			spinning = true;
 
 			rw_lock_stats.rw_x_spin_wait_count.add(
 				counter_index, 1);
@@ -829,7 +957,7 @@ rw_lock_x_lock_func(
 		os_rmb;
 		HMT_low();
 		while (i < SYNC_SPIN_ROUNDS
-		       && lock->lock_word <= 0) {
+		       && lock->lock_word <= X_LOCK_HALF_DECR) {
 			if (srv_spin_wait_delay) {
 				ut_delay(ut_rnd_interval(0,
 							 srv_spin_wait_delay));
@@ -940,6 +1068,103 @@ rw_lock_x_lock_func(
 			    srv_current_thread_priority > 0);
 }
 
+/******************************************************************//**
+NOTE! Use the corresponding macro, not directly this function! Lock an
+rw-lock in SX mode for the current thread. If the rw-lock is locked
+in exclusive mode, or there is an exclusive lock request waiting,
+the function spins a preset time (controlled by SYNC_SPIN_ROUNDS), waiting
+for the lock, before suspending the thread. If the same thread has an x-lock
+on the rw-lock, locking succeeds, with the following exception: if pass != 0,
+only a single sx-lock may be taken on the lock. NOTE: If the same thread has
+an s-lock, locking does not succeed! */
+UNIV_INTERN
+void
+rw_lock_sx_lock_func(
+/*=================*/
+	rw_lock_t*	lock,	/*!< in: pointer to rw-lock */
+	ulint		pass,	/*!< in: pass value; != 0, if the lock will
+				be passed to another thread to unlock */
+	const char*	file_name,/*!< in: file name where lock requested */
+	ulint		line)	/*!< in: line where requested */
+
+{
+	ulint		i;	/*!< spin round count */
+	ulint		index;	/*!< index of the reserved wait cell */
+	sync_array_t*	sync_arr;
+	bool		spinning = false;
+	size_t		counter_index;
+
+	/* We reuse the thread id to index into the counter, cache
+	it here for efficiency. */
+
+	counter_index = (size_t) os_thread_get_curr_id();
+
+	ut_ad(rw_lock_validate((const rw_lock_t *)lock));
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(!rw_lock_own(lock, RW_LOCK_SHARED));
+#endif /* UNIV_SYNC_DEBUG */
+
+	i = 0;
+
+lock_loop:
+
+	if (rw_lock_sx_lock_low(lock, pass, file_name, line)) {
+		rw_lock_stats.rw_sx_spin_round_count.add(counter_index, i);
+
+		return;	/* Locking succeeded */
+
+	} else {
+
+		if (!spinning) {
+			spinning = true;
+
+			rw_lock_stats.rw_sx_spin_wait_count.add(
+				counter_index, 1);
+		}
+
+		/* Spin waiting for the lock_word to become free */
+		os_rmb;
+		while (i < SYNC_SPIN_ROUNDS
+		       && lock->lock_word <= X_LOCK_HALF_DECR) {
+			if (srv_spin_wait_delay) {
+				ut_delay(ut_rnd_interval(0,
+							 srv_spin_wait_delay));
+			}
+
+			i++;
+		}
+		if (i == SYNC_SPIN_ROUNDS) {
+			os_thread_yield();
+		} else {
+			goto lock_loop;
+		}
+	}
+
+	rw_lock_stats.rw_sx_spin_round_count.add(counter_index, i);
+
+	sync_arr = sync_array_get();
+
+	sync_array_reserve_cell(
+		sync_arr, lock, RW_LOCK_SX, file_name, line, &index);
+
+	/* Waiters must be set before checking lock_word, to ensure signal
+	is sent. This could lead to a few unnecessary wake-up signals. */
+	rw_lock_set_waiter_flag(lock);
+
+	if (rw_lock_sx_lock_low(lock, pass, file_name, line)) {
+		sync_array_free_cell(sync_arr, index);
+		return; /* Locking succeeded */
+	}
+
+	/* these stats may not be accurate */
+	lock->count_os_wait++;
+	rw_lock_stats.rw_sx_os_wait_count.add(counter_index, 1);
+
+	sync_array_wait_event(sync_arr, index);
+
+	i = 0;
+	goto lock_loop;
+}
+
 #ifdef UNIV_SYNC_DEBUG
 /******************************************************************//**
 Acquires the debug mutex. We cannot use the mutex defined in sync0sync,
@@ -997,9 +1222,16 @@ rw_lock_add_debug_info(
 	rw_lock_debug_mutex_exit();
 
 	if ((pass == 0) && (lock_type != RW_LOCK_WAIT_EX)) {
+		/* recursive x while holding sx
+		(lock_type == RW_LOCK_EX && lock_word == -X_LOCK_HALF_DECR)
+		is treated as a new lock, not a relock. */
+		lint	lock_word = lock->lock_word;
 		sync_thread_add_level(lock, lock->level,
-				      lock_type == RW_LOCK_EX
-				      && lock->lock_word < 0);
+				      (lock_type == RW_LOCK_EX
+				       && lock_word < -X_LOCK_HALF_DECR)
+				      || (lock_type == RW_LOCK_SX
+					  && (lock_word < 0
+					      || lock->sx_recursive > 1)));
 	}
 }
 
@@ -1064,7 +1296,7 @@ rw_lock_own(
 	rw_lock_debug_t*	info;
 
 	ut_ad(lock);
-	ut_ad(rw_lock_validate(lock));
+	ut_ad(rw_lock_validate((const rw_lock_t *)lock));
 
 	rw_lock_debug_mutex_enter();
 
@@ -1103,37 +1335,87 @@ rw_lock_own(
 	return(rw_lock_own(&lock->base_lock, lock_type));
 }
 
+/******************************************************************//**
+Checks if the thread has locked the rw-lock in the specified mode, with
+the pass value == 0.
+@return true if locked */
+UNIV_INTERN
+bool
+rw_lock_own_flagged(
+/*================*/
+	const rw_lock_t*	lock,	/*!< in: rw-lock */
+	rw_lock_flags_t		flags)	/*!< in: specify lock types with
+					OR of the rw_lock_flag_t values */
+{
+	rw_lock_debug_t*	info;
+
+	ut_ad(lock);
+	ut_ad(rw_lock_validate(lock));
+
+	rw_lock_debug_mutex_enter();
+
+	info = UT_LIST_GET_FIRST(lock->debug_list);
+
+	while (info != NULL) {
+
+		if (os_thread_eq(info->thread_id, os_thread_get_curr_id())
+		    && (info->pass == 0)) {
+			ulint	flag = 0;
+
+			switch (info->lock_type) {
+			case RW_LOCK_SHARED:
+				flag = RW_LOCK_FLAG_S;
+				break;
+			case RW_LOCK_EX:
+				flag = RW_LOCK_FLAG_X;
+				break;
+			case RW_LOCK_SX:
+				flag = RW_LOCK_FLAG_SX;
+				break;
+			}
+
+			if (flags & flag) {
+				rw_lock_debug_mutex_exit();
+				/* Found! */
+
+				return(true);
+			}
+		}
+
+		info = UT_LIST_GET_NEXT(list, info);
+	}
+	rw_lock_debug_mutex_exit();
+
+	return(false);
+}
 #endif /* UNIV_SYNC_DEBUG */
 
 /******************************************************************//**
 Checks if somebody has locked the rw-lock in the specified mode.
-@return	TRUE if locked */
+@return	true if locked */
 UNIV_INTERN
-ibool
+bool
 rw_lock_is_locked(
 /*==============*/
 	rw_lock_t*	lock,		/*!< in: rw-lock */
 	ulint		lock_type)	/*!< in: lock type: RW_LOCK_SHARED,
-					RW_LOCK_EX */
+					RW_LOCK_EX or RW_LOCK_SX */
 {
-	ibool	ret	= FALSE;
-
 	ut_ad(lock);
-	ut_ad(rw_lock_validate(lock));
+	ut_ad(rw_lock_validate((const rw_lock_t *)lock));
 
-	if (lock_type == RW_LOCK_SHARED) {
-		if (rw_lock_get_reader_count(lock) > 0) {
-			ret = TRUE;
-		}
-	} else if (lock_type == RW_LOCK_EX) {
-		if (rw_lock_get_writer(lock) == RW_LOCK_EX) {
-			ret = TRUE;
-		}
-	} else {
+	switch (lock_type) {
+	case RW_LOCK_SHARED:
+		return(rw_lock_get_reader_count(lock) > 0);
+
+	case RW_LOCK_EX:
+		return(rw_lock_get_writer(lock) == RW_LOCK_EX);
+
+	case RW_LOCK_SX:
+		return(rw_lock_get_sx_lock_count(lock) > 0);
+
+	default:
 		ut_error;
 	}
-
-	return(ret);
 }
 
 #ifdef UNIV_SYNC_DEBUG
@@ -1250,15 +1532,24 @@ rw_lock_debug_print(
 	fprintf(f, "Locked: thread %lu file %s line %lu  ",
 		(ulong) os_thread_pf(info->thread_id), info->file_name,
 		(ulong) info->line);
-	if (rwt == RW_LOCK_SHARED) {
+
+	switch (rwt) {
+	case RW_LOCK_SHARED:
 		fputs("S-LOCK", f);
-	} else if (rwt == RW_LOCK_EX) {
+		break;
+	case RW_LOCK_EX:
 		fputs("X-LOCK", f);
-	} else if (rwt == RW_LOCK_WAIT_EX) {
+		break;
+	case RW_LOCK_SX:
+		fputs("SX-LOCK", f);
+		break;
+	case RW_LOCK_WAIT_EX:
 		fputs("WAIT X-LOCK", f);
-	} else {
+		break;
+	default:
 		ut_error;
 	}
+
 	if (info->pass != 0) {
 		fprintf(f, " pass value %lu", (ulong) info->pass);
 	}
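
The lock_word encoding documented at the top of this file can be read
back with a small classifier; a sketch for reference only (hypothetical
helper; assumes X_LOCK_HALF_DECR == X_LOCK_DECR / 2, as sync0rw.h
defines in this patch):

	static const char*
	rw_lock_state_name(lint lock_word)
	{
		if (lock_word == X_LOCK_DECR) {
			return("unlocked");
		} else if (lock_word > X_LOCK_HALF_DECR) {
			return("S locked");
		} else if (lock_word == X_LOCK_HALF_DECR) {
			return("SX locked");
		} else if (lock_word > 0) {
			return("SX and S locked");
		} else if (lock_word == 0) {
			return("X locked");
		} else if (lock_word > -X_LOCK_HALF_DECR) {
			return("S locked, with a waiting writer");
		} else if (lock_word == -X_LOCK_HALF_DECR) {
			return("X and SX locked");
		} else if (lock_word > -X_LOCK_DECR) {
			return("S locked, waiting writer holds SX");
		} else {
			return("recursively X locked (possibly plus SX)");
		}
	}
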
diff --git a/storage/xtradb/sync/sync0sync.cc b/storage/xtradb/sync/sync0sync.cc
index 2bdaaa3..8ec61cd 100644
--- a/storage/xtradb/sync/sync0sync.cc
+++ b/storage/xtradb/sync/sync0sync.cc
@@ -1706,12 +1706,14 @@ sync_print_wait_info(
 	FILE*	file)		/*!< in: file where to print */
 {
 	fprintf(file,
-		"Mutex spin waits " UINT64PF ", rounds " UINT64PF ", "
-		"OS waits " UINT64PF "\n"
-		"RW-shared spins " UINT64PF ", rounds " UINT64PF ", "
-		"OS waits " UINT64PF "\n"
-		"RW-excl spins " UINT64PF ", rounds " UINT64PF ", "
-		"OS waits " UINT64PF "\n",
+		"Mutex spin waits " UINT64PF ", rounds " UINT64PF ", "
+		"OS waits " UINT64PF "\n"
+		"RW-shared spins " UINT64PF ", rounds " UINT64PF ", "
+		"OS waits " UINT64PF "\n"
+		"RW-excl spins " UINT64PF ", rounds " UINT64PF ", "
+		"OS waits " UINT64PF "\n"
+		"RW-sx spins " UINT64PF ", rounds " UINT64PF ", "
+		"OS waits " UINT64PF "\n",
 		(ib_uint64_t) mutex_spin_wait_count,
 		(ib_uint64_t) mutex_spin_round_count,
 		(ib_uint64_t) mutex_os_wait_count,
@@ -1720,11 +1722,14 @@ sync_print_wait_info(
 		(ib_uint64_t) rw_lock_stats.rw_s_os_wait_count,
 		(ib_uint64_t) rw_lock_stats.rw_x_spin_wait_count,
 		(ib_uint64_t) rw_lock_stats.rw_x_spin_round_count,
-		(ib_uint64_t) rw_lock_stats.rw_x_os_wait_count);
+		(ib_uint64_t) rw_lock_stats.rw_x_os_wait_count,
+		(ib_uint64_t) rw_lock_stats.rw_sx_spin_wait_count,
+		(ib_uint64_t) rw_lock_stats.rw_sx_spin_round_count,
+		(ib_uint64_t) rw_lock_stats.rw_sx_os_wait_count);
 
 	fprintf(file,
 		"Spin rounds per wait: %.2f mutex, %.2f RW-shared, "
-		"%.2f RW-excl\n",
+		"%.2f RW-excl, %.2f RW-sx\n",
 		(double) mutex_spin_round_count /
 		(mutex_spin_wait_count ? mutex_spin_wait_count : 1),
 		(double) rw_lock_stats.rw_s_spin_round_count /
@@ -1732,7 +1737,10 @@ sync_print_wait_info(
 		 ? rw_lock_stats.rw_s_spin_wait_count : 1),
 		(double) rw_lock_stats.rw_x_spin_round_count /
 		(rw_lock_stats.rw_x_spin_wait_count
-		 ? rw_lock_stats.rw_x_spin_wait_count : 1));
+		 ? rw_lock_stats.rw_x_spin_wait_count : 1),
+		(double) rw_lock_stats.rw_sx_spin_round_count /
+		(rw_lock_stats.rw_sx_spin_wait_count
+		 ? rw_lock_stats.rw_sx_spin_wait_count : 1));
 }
 
 /*******************************************************************//**
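
Finally, a hedged usage sketch of the new mode as a whole (hypothetical
caller; rw_lock_sx_lock()/rw_lock_sx_unlock() are the wrappers this
patch adds in sync0rw.h, and the PFS key and latch level below are made
up for illustration):

	rw_lock_t	latch;

	rw_lock_create(dummy_latch_key, &latch, SYNC_LEVEL_VARYING);

	rw_lock_sx_lock(&latch);	/* blocks X and other SX; admits S */
	/* ... long read/restructure phase: S readers still proceed ... */

	rw_lock_x_lock(&latch);		/* upgrade: waits for S readers */
	/* ... short modification phase ... */
	rw_lock_x_unlock(&latch);

	rw_lock_sx_unlock(&latch);
	rw_lock_free(&latch);

The SX holder keeps other writers out for the whole sequence, but pays
the full exclusion cost only around the actual modification.
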


