[Commits] 4da9cb93fb8: Use InnoDB-like transaction isolation with Range Locking mode.

Sergei Petrunia psergey at askmonty.org
Mon Jan 21 20:36:09 EET 2019


revision-id: 4da9cb93fb8c291a0023451963c15fd2971b0210 (fb-prod201801-192-g4da9cb93fb8)
parent(s): 70d97cc103fd98c7a4952e7b3a54f272fa7b36f4
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2019-01-21 21:36:09 +0300
message:

Use InnoDB-like transaction isolation with Range Locking mode.

In Range Locking mode, DML statements (UPDATE/DELETE/...) will always read
the latest committed data (as opposed to the transaction's snapshot).

---
 mysql-test/suite/rocksdb/r/range_locking.result | 109 ++++++++++++++++++++++++
 mysql-test/suite/rocksdb/t/range_locking.test   |  95 +++++++++++++++++++++
 storage/rocksdb/ha_rocksdb.cc                   |  81 +++++++++++++++---
 3 files changed, 271 insertions(+), 14 deletions(-)

diff --git a/mysql-test/suite/rocksdb/r/range_locking.result b/mysql-test/suite/rocksdb/r/range_locking.result
index a43f7d668d4..b0217d5269a 100644
--- a/mysql-test/suite/rocksdb/r/range_locking.result
+++ b/mysql-test/suite/rocksdb/r/range_locking.result
@@ -192,3 +192,112 @@ rollback;
 disconnect con1;
 connection default;
 drop table t0,t1;
+#
+# Transaction isolation test
+#
+create table t1 (pk int primary key, a int) engine=rocksdb;
+insert into t1 values (1,1),(2,2),(3,3);
+connect  con1,localhost,root,,;
+# TRX1: Start, Allocate a snapshot
+connection con1;
+begin;
+select * from t1;
+pk	a
+1	1
+2	2
+3	3
+# TRX2: Make a change that TRX1 will not see
+connection default;
+update t1 set a=2222 where pk=2;
+# TRX1: Now, make a change that would overwrite TRX2'x change and commit
+connection con1;
+update t1 set a=a+1 where pk=2;
+commit;
+# Examine the result:
+#   pk=2, a=2223 means UPDATE in TRX1 used "read committed" (InnoDB-like isolation)
+#   pk=2, a=3 means UPDATE in TRX1 silently overwrote TRX2
+#   (and with key tracking, one would get an error on the second UPDATE)
+connection default;
+select * from t1;
+pk	a
+1	1
+2	2223
+3	3
+disconnect con1;
+connection default;
+drop table t1;
+#
+# Same test as above, but check the range scan
+#
+create table t1 (pk int primary key, a int) engine=rocksdb;
+insert into t1 values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6);
+connect  con1,localhost,root,,;
+# TRX1: Start, Allocate a snapshot
+connection con1;
+begin;
+select * from t1;
+pk	a
+1	1
+2	2
+3	3
+4	4
+5	5
+6	6
+# TRX2: Make a change that TRX1 will not see
+connection default;
+update t1 set a=2222 where pk between 3 and 5;
+# TRX1: Now, make a change that would overwrite TRX2'x change and commit
+connection con1;
+update t1 set a=a+1 where pk between 3 and 5;
+commit;
+# Examine the result:
+#   pk={3,4,5} a=2223 means UPDATE in TRX1 used "read committed" (InnoDB-like isolation)
+connection default;
+select * from t1;
+pk	a
+1	1
+2	2
+3	2223
+4	2223
+5	2223
+6	6
+disconnect con1;
+connection default;
+drop table t1;
+#
+# Same as above, but test SELECT FOR UPDATE.
+#
+create table t1 (pk int primary key, a int) engine=rocksdb;
+insert into t1 values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6);
+connect  con1,localhost,root,,;
+# TRX1: Start, Allocate a snapshot
+connection con1;
+begin;
+select * from t1;
+pk	a
+1	1
+2	2
+3	3
+4	4
+5	5
+6	6
+# TRX2: Make a change that TRX1 will not see
+connection default;
+update t1 set a=222 where pk=2;
+update t1 set a=333 where pk=3;
+# TRX1: Check what select [FOR UPDATE] sees
+connection con1;
+select * from t1 where pk in (2,3);
+pk	a
+2	2
+3	3
+select * from t1 where pk=2 for update;
+pk	a
+2	222
+select * from t1 where pk=2;
+pk	a
+2	2
+commit;
+disconnect con1;
+connection default;
+drop table t1;
diff --git a/mysql-test/suite/rocksdb/t/range_locking.test b/mysql-test/suite/rocksdb/t/range_locking.test
index 5c21f374bac..e1e8ac92bdc 100644
--- a/mysql-test/suite/rocksdb/t/range_locking.test
+++ b/mysql-test/suite/rocksdb/t/range_locking.test
@@ -211,3 +211,98 @@ disconnect con1;
 connection default;
 drop table t0,t1;
 
+--echo #
+--echo # Transaction isolation test
+--echo #
+
+create table t1 (pk int primary key, a int) engine=rocksdb;
+insert into t1 values (1,1),(2,2),(3,3);
+
+connect (con1,localhost,root,,);
+
+--echo # TRX1: Start, Allocate a snapshot
+connection con1;
+begin;
+select * from t1;
+
+--echo # TRX2: Make a change that TRX1 will not see
+connection default;
+update t1 set a=2222 where pk=2;
+
+--echo # TRX1: Now, make a change that would overwrite TRX2'x change and commit
+connection con1;
+update t1 set a=a+1 where pk=2;
+commit;
+
+--echo # Examine the result:
+--echo #   pk=2, a=2223 means UPDATE in TRX1 used "read committed" (InnoDB-like isolation)
+--echo #   pk=2, a=3 means UPDATE in TRX1 silently overwrote TRX2
+--echo #   (and with key tracking, one would get an error on the second UPDATE)
+connection default;
+select * from t1;
+
+disconnect con1;
+connection default;
+drop table t1;
+
+--echo #
+--echo # Same test as above, but check the range scan
+--echo #
+
+create table t1 (pk int primary key, a int) engine=rocksdb;
+insert into t1 values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6);
+
+connect (con1,localhost,root,,);
+
+--echo # TRX1: Start, Allocate a snapshot
+connection con1;
+begin;
+select * from t1;
+
+--echo # TRX2: Make a change that TRX1 will not see
+connection default;
+update t1 set a=2222 where pk between 3 and 5;
+
+--echo # TRX1: Now, make a change that would overwrite TRX2'x change and commit
+connection con1;
+update t1 set a=a+1 where pk between 3 and 5;
+commit;
+
+--echo # Examine the result:
+--echo #   pk={3,4,5} a=2223 means UPDATE in TRX1 used "read committed" (InnoDB-like isolation)
+connection default;
+select * from t1;
+
+disconnect con1;
+connection default;
+drop table t1;
+
+--echo #
+--echo # Same as above, but test SELECT FOR UPDATE.
+--echo #
+create table t1 (pk int primary key, a int) engine=rocksdb;
+insert into t1 values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6);
+
+connect (con1,localhost,root,,);
+
+--echo # TRX1: Start, Allocate a snapshot
+connection con1;
+begin;
+select * from t1;
+
+--echo # TRX2: Make a change that TRX1 will not see
+connection default;
+update t1 set a=222 where pk=2;
+update t1 set a=333 where pk=3;
+
+--echo # TRX1: Check what select [FOR UPDATE] sees
+connection con1;
+select * from t1 where pk in (2,3);
+select * from t1 where pk=2 for update;
+select * from t1 where pk=2;
+
+commit;
+
+disconnect con1;
+connection default;
+drop table t1;
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
index c4f611a663f..f6f11ad4133 100644
--- a/storage/rocksdb/ha_rocksdb.cc
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -2101,8 +2101,39 @@ protected:
   virtual void do_set_savepoint() = 0;
   virtual void do_rollback_to_savepoint() = 0;
 
+ private:
+  /*
+    If true, the current statement should not use a snapshot for reading.
+    Note that in a multi-statement transaction, the snapshot may have been
+    allocated by another statement.
+  */
+  bool m_stmt_ignores_snapshot = false;
+
+  /* Snapshot-ignore mode will put away m_read_opts.snapshot here: */
+  const rocksdb::Snapshot *m_saved_snapshot;
+
  public:
+
+  void start_ignore_snapshot() {
+    // note: this may be called several times for the same statement
+    if (!m_stmt_ignores_snapshot) {
+      m_saved_snapshot = m_read_opts.snapshot;
+      m_read_opts.snapshot = nullptr;
+      m_stmt_ignores_snapshot= true;
+    }
+  }
+
+  void end_ignore_snapshot_if_needed() {
+    if (m_stmt_ignores_snapshot) {
+      m_stmt_ignores_snapshot = false;
+      m_read_opts.snapshot = m_saved_snapshot;
+      m_saved_snapshot = nullptr;
+    }
+  }
+  bool in_snapshot_ignore_mode() const { return m_stmt_ignores_snapshot; }
+
   rocksdb::ReadOptions m_read_opts;
+
   const char *m_mysql_log_file_name;
   my_off_t m_mysql_log_offset;
   const char *m_mysql_gtid;
@@ -2596,7 +2627,7 @@ public:
 
   virtual bool is_tx_started() const = 0;
   virtual void start_tx() = 0;
-  virtual void start_stmt() = 0;
+  virtual void start_stmt(bool is_dml_statement) = 0;
 
   void set_initial_savepoint() {
     /*
@@ -2849,7 +2880,7 @@ public:
   }
 
   void acquire_snapshot(bool acquire_now) override {
-    if (m_read_opts.snapshot == nullptr) {
+    if (m_read_opts.snapshot == nullptr && !in_snapshot_ignore_mode()) {
       const auto thd_ss = std::static_pointer_cast<Rdb_explicit_snapshot>(
           m_thd->get_explicit_snapshot());
       if (thd_ss) {
@@ -2964,7 +2995,7 @@ public:
 
     if (value != nullptr) {
       value->Reset();
-    }
+    } // psergey-todo: m_read_opts.snapshot below!
     return m_rocksdb_tx->GetForUpdate(m_read_opts, column_family, key, value,
                                       exclusive);
   }
@@ -3028,13 +3059,25 @@ public:
   /*
     Start a statement inside a multi-statement transaction.
 
-    @todo: are we sure this is called once (and not several times) per
-    statement start?
+    @note: If a statement uses N tables, this function will be called N times,
+    for each TABLE object that is used.
 
     For hooking to start of statement that is its own transaction, see
     ha_rocksdb::external_lock().
   */
-  void start_stmt() override {
+  void start_stmt(bool is_dml_statement) override {
+
+    if (rocksdb_use_range_locking && is_dml_statement) {
+      /*
+        In Range Locking mode, RocksDB does not do "key tracking".
+        Use InnoDB-like concurrency mode: make the DML statements always read
+        the latest data (instead of using transaction's snapshot).
+        This "downgrades" the transaction isolation to READ-COMMITTED on the
+        master, but in return the actions can be replayed on the slave.
+      */
+      start_ignore_snapshot();
+    }
+
     // Set the snapshot to delayed acquisition (SetSnapshotOnNextOperation)
     acquire_snapshot(false);
   }
@@ -3270,7 +3313,7 @@ public:
     set_initial_savepoint();
   }
 
-  void start_stmt() override {}
+  void start_stmt(bool is_dml_statement) override {}
 
   void rollback_stmt() override {
     if (m_batch)
@@ -3484,8 +3527,10 @@ static int rocksdb_prepare(handlerton *const hton, THD *const thd,
 
     DEBUG_SYNC(thd, "rocksdb.prepared");
   }
-  else
+  else {
     tx->make_stmt_savepoint_permanent();
+    tx->end_ignore_snapshot_if_needed();
+  }
 
   return HA_EXIT_SUCCESS;
 }
@@ -3660,6 +3705,7 @@ static int rocksdb_commit(handlerton *const hton, THD *const thd,
       */
       tx->set_tx_failed(false);
       tx->make_stmt_savepoint_permanent();
+      tx->end_ignore_snapshot_if_needed();
     }
 
     if (my_core::thd_tx_isolation(thd) <= ISO_READ_COMMITTED) {
@@ -3698,6 +3744,7 @@ static int rocksdb_rollback(handlerton *const hton, THD *const thd,
       */
 
       tx->rollback_stmt();
+      tx->end_ignore_snapshot_if_needed();
       tx->set_tx_failed(true);
     }
 
@@ -4242,13 +4289,19 @@ static bool rocksdb_show_status(handlerton *const hton, THD *const thd,
   return res;
 }
 
+
+/*
+  @param is_dml_stmt   If true, we are in a DML statement
+*/
+
 static inline void rocksdb_register_tx(handlerton *const hton, THD *const thd,
-                                       Rdb_transaction *const tx) {
+                                       Rdb_transaction *const tx,
+                                       bool is_dml_stmt) {
   DBUG_ASSERT(tx != nullptr);
 
   trans_register_ha(thd, FALSE, rocksdb_hton);
   if (my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
-    tx->start_stmt();
+    tx->start_stmt(is_dml_stmt);
     trans_register_ha(thd, TRUE, rocksdb_hton);
   }
 }
@@ -4344,7 +4397,7 @@ static int rocksdb_start_tx_and_assign_read_view(
 
   DBUG_ASSERT(!tx->has_snapshot());
   tx->set_tx_read_only(true);
-  rocksdb_register_tx(hton, thd, tx);
+  rocksdb_register_tx(hton, thd, tx, false);
   tx->acquire_snapshot(true);
 
   if (ss_info) {
@@ -4492,7 +4545,7 @@ static int rocksdb_start_tx_with_shared_read_view(
 
     DBUG_ASSERT(!tx->has_snapshot());
     tx->set_tx_read_only(true);
-    rocksdb_register_tx(hton, thd, tx);
+    rocksdb_register_tx(hton, thd, tx, false);
     tx->acquire_snapshot(true);
 
     // case: an explicit snapshot was not assigned to this transaction
@@ -11010,7 +11063,7 @@ int ha_rocksdb::external_lock(THD *const thd, int lock_type) {
       }
     }
     tx->m_n_mysql_tables_in_use++;
-    rocksdb_register_tx(rocksdb_hton, thd, tx);
+    rocksdb_register_tx(rocksdb_hton, thd, tx, (lock_type == F_WRLCK));
     tx->io_perf_start(&m_io_perf);
   }
 
@@ -11037,7 +11090,7 @@ int ha_rocksdb::start_stmt(THD *const thd, thr_lock_type lock_type) {
 
   Rdb_transaction *const tx = get_or_create_tx(thd);
   read_thd_vars(thd);
-  rocksdb_register_tx(ht, thd, tx);
+  rocksdb_register_tx(ht, thd, tx, (lock_type == F_WRLCK));
   tx->io_perf_start(&m_io_perf);
 
   DBUG_RETURN(HA_EXIT_SUCCESS);


More information about the commits mailing list