[Commits] a49400fed32: Range Locking: Make range locks on secondary indexes inhibit DMLs

Sergei Petrunia psergey at askmonty.org
Mon Jan 7 18:52:48 EET 2019


revision-id: a49400fed3292fa3e2dde4215f50c58bd4d6b342 (fb-prod201801-186-ga49400fed32)
parent(s): 85cebce065b7e2880fa33db34673ede2b9fd391c
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2019-01-07 19:52:48 +0300
message:

Range Locking: Make range locks on secondary indexes inhibit DMLs

---
 mysql-test/suite/rocksdb/r/range_locking.result | 45 ++++++++++++
 mysql-test/suite/rocksdb/t/range_locking.test   | 52 ++++++++++++++
 storage/rocksdb/ha_rocksdb.cc                   | 93 ++++++++++++++++++-------
 storage/rocksdb/ha_rocksdb.h                    |  4 +-
 4 files changed, 166 insertions(+), 28 deletions(-)

diff --git a/mysql-test/suite/rocksdb/r/range_locking.result b/mysql-test/suite/rocksdb/r/range_locking.result
index c3271db2730..a43f7d668d4 100644
--- a/mysql-test/suite/rocksdb/r/range_locking.result
+++ b/mysql-test/suite/rocksdb/r/range_locking.result
@@ -147,3 +147,48 @@ rollback;
 connection default;
 disconnect con1;
 drop table t0,t1;
+#
+# Test that locks on ranges on non-unique secondary keys inhibit
+# modifications of the contents of these ranges
+#
+create table t0(a int);
+insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
+create table t1 (
+kp1 int not null,
+kp2 int not null,
+a int,
+key(kp1, kp2)
+) engine=rocksdb;
+insert into t1 select  1, a, 1234 from t0;
+insert into t1 values (2, 3, 1234);
+insert into t1 values (2, 5, 1234);
+insert into t1 values (2, 7, 1234);
+insert into t1 select  3, a, 1234 from t0;
+connect  con1,localhost,root,,;
+connection con1;
+begin;
+explain
+select * from t1 where kp1=2 for update;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t1	ref	kp1	kp1	4	const	#	NULL
+select * from t1 where kp1=2 for update;
+kp1	kp2	a
+2	3	1234
+2	5	1234
+2	7	1234
+connection default;
+begin;
+insert into t1 values (2, 9, 9999);
+ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on index: test.t1.kp1
+delete from t1 where kp1=2 and kp2=5;
+ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on index: test.t1.kp1
+update t1 set kp1=333 where kp1=2 and kp2=3;
+ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on index: test.t1.kp1
+update t1 set kp1=2 where kp1=1 and kp2=8;
+ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on index: test.t1.kp1
+rollback;
+connection con1;
+rollback;
+disconnect con1;
+connection default;
+drop table t0,t1;
diff --git a/mysql-test/suite/rocksdb/t/range_locking.test b/mysql-test/suite/rocksdb/t/range_locking.test
index 40743f1bf63..5c21f374bac 100644
--- a/mysql-test/suite/rocksdb/t/range_locking.test
+++ b/mysql-test/suite/rocksdb/t/range_locking.test
@@ -157,5 +157,57 @@ connection con1;
 rollback;
 connection default;
 disconnect con1;
+drop table t0,t1;
+
+--echo #
+--echo # Test that locks on ranges on non-unique secondary keys inhibit
+--echo # modifications of the contents of these ranges
+--echo #
+
+create table t0(a int);
+insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
+create table t1 (
+  kp1 int not null,
+  kp2 int not null,
+  a int,
+  key(kp1, kp2)
+) engine=rocksdb;
+
+insert into t1 select  1, a, 1234 from t0;
+insert into t1 values (2, 3, 1234);
+insert into t1 values (2, 5, 1234);
+insert into t1 values (2, 7, 1234);
+insert into t1 select  3, a, 1234 from t0;
+
+connect (con1,localhost,root,,);
+connection con1;
+begin;
+--replace_column 9 #
+explain
+select * from t1 where kp1=2 for update;
+select * from t1 where kp1=2 for update;
+
+connection default;
+begin;
+--error ER_LOCK_WAIT_TIMEOUT
+insert into t1 values (2, 9, 9999);
+
+--error ER_LOCK_WAIT_TIMEOUT
+delete from t1 where kp1=2 and kp2=5;
+
+# Update that "moves a row away" from the locked range
+--error ER_LOCK_WAIT_TIMEOUT
+update t1 set kp1=333 where kp1=2 and kp2=3;
 
+# Update that "moves a row into" the locked range
+--error ER_LOCK_WAIT_TIMEOUT
+update t1 set kp1=2 where kp1=1 and kp2=8;
+
+rollback;
+
+connection con1;
+rollback;
+disconnect con1;
+connection default;
 drop table t0,t1;
+
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
index 9720fc0bd3b..d9758b8cf00 100644
--- a/storage/rocksdb/ha_rocksdb.cc
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -2273,11 +2273,15 @@ protected:
   virtual void release_lock(rocksdb::ColumnFamilyHandle *const column_family,
                             const std::string &rowkey) = 0;
 
-  virtual rocksdb::Status lock_range(rocksdb::ColumnFamilyHandle *const cf,
-                                     const rocksdb::Slice &start,
-                                     bool  start_inf_suffix,
-                                     const rocksdb::Slice &end,
-                                     bool  end_inf_suffix) = 0;
+  virtual
+  rocksdb::Status lock_range(rocksdb::ColumnFamilyHandle *const cf,
+                             const rocksdb::Slice &start, bool start_inf_suffix,
+                             const rocksdb::Slice &end, bool end_inf_suffix)=0;
+
+  rocksdb::Status lock_singlepoint_range(rocksdb::ColumnFamilyHandle *const cf,
+                                         const rocksdb::Slice &point) {
+    return lock_range(cf, point, false, point, false);
+  }
 
   virtual bool prepare(const rocksdb::TransactionName &name) = 0;
 
@@ -2735,13 +2739,12 @@ public:
     Both start and end endpoint may may be prefixes.
     Both bounds are inclusive.
   */
+  virtual
   rocksdb::Status lock_range(rocksdb::ColumnFamilyHandle *const cf,
                              const rocksdb::Slice &start,
-                             bool  start_inf_suffix,
+                             bool start_inf_suffix,
                              const rocksdb::Slice &end,
-                             bool  end_inf_suffix
-                             ) override
-  {
+                             bool end_inf_suffix) override {
     const char SUFFIX_INF= 0x0;
     const char SUFFIX_SUP= 0x1;
     //  GetRangeLock accepts range endpoints, not keys.
@@ -2755,9 +2758,9 @@ public:
     right.append(end_inf_suffix? SUFFIX_SUP :SUFFIX_INF);
     right.append(end.data(), end.size());
 
-    return m_rocksdb_tx->GetRangeLock(cf,
-                           rocksdb::Slice(left.ptr(),left.length()),
-                           rocksdb::Slice(right.ptr(),right.length()));
+    return
+      m_rocksdb_tx->GetRangeLock(cf, rocksdb::Slice(left.ptr(),left.length()),
+                                 rocksdb::Slice(right.ptr(),right.length()));
   }
 private:
   void release_tx(void) {
@@ -3159,10 +3162,9 @@ public:
 
   rocksdb::Status lock_range(rocksdb::ColumnFamilyHandle *const cf,
                              const rocksdb::Slice &start,
-                             bool  start_inf_suffix,
+                             bool start_inf_suffix,
                              const rocksdb::Slice &end,
-                             bool  end_inf_suffix) override
-  {
+                             bool end_inf_suffix) override {
     return rocksdb::Status::OK();
   }
 
@@ -8209,7 +8211,7 @@ int ha_rocksdb::index_read_map(uchar *const buf, const uchar *const key,
 }
 
 
-void ha_rocksdb::set_range_lock(Rdb_transaction *tx,
+int ha_rocksdb::set_range_lock(Rdb_transaction *tx,
                                 const Rdb_key_def &kd, 
                                 const enum ha_rkey_function &find_flag,
                                 const rocksdb::Slice &slice,
@@ -8220,7 +8222,7 @@ void ha_rocksdb::set_range_lock(Rdb_transaction *tx,
   bool start_has_inf_suffix, end_has_inf_suffix;
 
   if (m_lock_rows != RDB_LOCK_WRITE || !rocksdb_use_range_locking) {
-    return;
+    return 0;
   }
 
   /*
@@ -8295,9 +8297,14 @@ void ha_rocksdb::set_range_lock(Rdb_transaction *tx,
     end_slice= rocksdb::Slice((char*)end_slice_buf, end_slice_size);
     end_has_inf_suffix= true;
   }
-  tx->lock_range(kd.get_cf(),
-                 slice,     start_has_inf_suffix,
-                 end_slice, end_has_inf_suffix);
+
+  auto s= tx->lock_range(kd.get_cf(), slice, start_has_inf_suffix,
+                         end_slice, end_has_inf_suffix);
+  if (!s.ok()) {
+    return (tx->set_status_error(table->in_use, s, kd, m_tbl_def,
+                                 m_table_handler));
+  }
+  return 0;
 }
 
 /*
@@ -8401,10 +8408,10 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key,
   Rdb_transaction *const tx = get_or_create_tx(table->in_use);
   const bool is_new_snapshot = !tx->has_snapshot();
 
-  set_range_lock(tx, kd, find_flag,
-                 rocksdb::Slice(reinterpret_cast<const char *>(m_sk_packed_tuple),
-                                packed_size),
-                 end_key);
+  rocksdb::Slice lock_slice(reinterpret_cast<const char *>(m_sk_packed_tuple),
+                                    packed_size);
+  if ((rc = set_range_lock(tx, kd, find_flag, lock_slice, end_key)))
+    DBUG_RETURN(rc);
 
   int bytes_changed_by_succ = 0;
   if (find_flag == HA_READ_PREFIX_LAST_OR_PREV ||
@@ -9908,6 +9915,15 @@ int ha_rocksdb::update_sk(const TABLE *const table_arg, const Rdb_key_def &kd,
     old_key_slice = rocksdb::Slice(
         reinterpret_cast<const char *>(m_sk_packed_tuple_old), old_packed_size);
 
+    /* Range locking: lock the index tuple being deleted */
+    if (rocksdb_use_range_locking) {
+      auto s= row_info.tx->lock_singlepoint_range(kd.get_cf(), old_key_slice);
+      if (!s.ok()) {
+        return (row_info.tx->set_status_error(table->in_use, s, kd,
+                                              m_tbl_def, m_table_handler));
+      }
+    }
+
     row_info.tx->get_indexed_write_batch()->SingleDelete(kd.get_cf(),
                                                          old_key_slice);
 
@@ -9923,6 +9939,14 @@ int ha_rocksdb::update_sk(const TABLE *const table_arg, const Rdb_key_def &kd,
   if (bulk_load_sk && row_info.old_data == nullptr) {
     rc = bulk_load_key(row_info.tx, kd, new_key_slice, new_value_slice, true);
   } else {
+    /* Range locking: lock the index tuple being inserted */
+    if (rocksdb_use_range_locking) {
+      auto s= row_info.tx->lock_singlepoint_range(kd.get_cf(), new_key_slice);
+      if (!s.ok()) {
+        return (row_info.tx->set_status_error(table->in_use, s, kd,
+                                              m_tbl_def, m_table_handler));
+      }
+    }
     row_info.tx->get_indexed_write_batch()->Put(kd.get_cf(), new_key_slice,
                                                 new_value_slice);
   }
@@ -9996,6 +10020,10 @@ int ha_rocksdb::update_write_row(const uchar *const old_data,
     DBUG_RETURN(rc);
   }
 
+  // Range Locking: do we have a lock on the old PK value here? 
+  //  - we have read the row we are about to update, right? (except for some
+  //  RBR mode? (in which we won't want to acquire locks anyway?))
+
   if (!skip_unique_check) {
     /*
       Check to see if we are going to have failures because of unique
@@ -10431,7 +10459,8 @@ int ha_rocksdb::delete_row(const uchar *const buf) {
   rocksdb::Slice key_slice(m_last_rowkey.ptr(), m_last_rowkey.length());
   Rdb_transaction *const tx = get_or_create_tx(table->in_use);
   ulonglong bytes_written = 0;
-
+  // Range Locking: we are certain that the PK record is already locked here,
+  // right? 
   const uint index = pk_index(table, m_tbl_def);
   rocksdb::Status s =
       delete_or_singledelete(index, tx, m_pk_descr->get_cf(), key_slice);
@@ -10459,7 +10488,19 @@ int ha_rocksdb::delete_row(const uchar *const buf) {
                                    nullptr, false, hidden_pk_id);
       rocksdb::Slice secondary_key_slice(
           reinterpret_cast<const char *>(m_sk_packed_tuple), packed_size);
-      /* Deleting on secondary key doesn't need any locks: */
+
+      /*
+        For point locking, Deleting on secondary key doesn't need any locks.
+        Range locking must set locks
+      */
+      if (rocksdb_use_range_locking) {
+        auto s= tx->lock_singlepoint_range(kd.get_cf(), secondary_key_slice);
+        if (!s.ok()) {
+          return (tx->set_status_error(table->in_use, s, kd, m_tbl_def,
+                                       m_table_handler));
+        }
+      }
+
       tx->get_indexed_write_batch()->SingleDelete(kd.get_cf(),
                                                   secondary_key_slice);
       bytes_written += secondary_key_slice.size();
diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h
index 87d1d5c09c1..46a189682c7 100644
--- a/storage/rocksdb/ha_rocksdb.h
+++ b/storage/rocksdb/ha_rocksdb.h
@@ -658,8 +658,8 @@ class ha_rocksdb : public my_core::handler {
   void setup_scan_iterator(const Rdb_key_def &kd, rocksdb::Slice *slice,
                            const bool use_all_keys, const uint eq_cond_len)
       MY_ATTRIBUTE((__nonnull__));
-  //psergey:
-  void set_range_lock(Rdb_transaction *tx,
+
+  int set_range_lock(Rdb_transaction *tx,
                       const Rdb_key_def &kd, 
                       const enum ha_rkey_function &find_flag,
                       const rocksdb::Slice &slice,


More information about the commits mailing list