[Commits] 1c288c2: MDEV-16428: Concurrent DML on RocksDB tables makes optimistic parallel replication abort

Sergei Petrunia psergey at askmonty.org
Mon Jun 25 23:15:18 EEST 2018


revision-id: 1c288c266e0208e98d55e5348fa78c27517022c9
parent(s): a107c79fcdde80d1dea0a1caf5859647f77b48c9
committer: Sergei Petrunia
branch nick: 10.2-r12-new-submodule
timestamp: 2018-06-25 23:15:18 +0300
message:

MDEV-16428: Concurrent DML on RocksDB tables makes optimistic parallel replication abort

DRAFT:
Make MyRocks call thd_rpl_deadlock_check(X, Y) whenever a transaction X
is about to wait on a row that's locked by transaction Y.
This allows the SQL layer to check if transaction Y has "ran ahead" of
transaction X and abort it if it is necessary.

The patch includes adding a hook into RocksDB to detect the waits.

---
 .gitmodules                   |   2 +-
 storage/rocksdb/ha_rocksdb.cc | 132 ++++++++++++++++++++++++++++++++++++++++++
 storage/rocksdb/rocksdb       |   2 +-
 3 files changed, 134 insertions(+), 2 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index 6419657..2ef79a2 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -3,4 +3,4 @@
 	url = https://github.com/MariaDB/mariadb-connector-c
 [submodule "storage/rocksdb/rocksdb"]
 	path = storage/rocksdb/rocksdb
-	url = https://github.com/facebook/rocksdb.git
+	url = https://github.com/spetrunia/rocksdb.git
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
index 96bc7da..bd2e0bf 100644
--- a/storage/rocksdb/ha_rocksdb.cc
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -53,6 +53,9 @@
 #endif
 #include <mysys_err.h>
 
+#include "my_sys.h"
+#include "lf.h"
+
 // Both MySQL and RocksDB define the same constant. To avoid compilation errors
 // till we make the fix in RocksDB, we'll temporary undefine it here.
 #undef CACHE_LINE_SIZE
@@ -116,6 +119,101 @@ MYSQL_PLUGIN_IMPORT bool my_disable_leak_check;
 void ignore_db_dirs_append(const char *dirname_arg);
 
 
+////////////////////////////////////////////////////////////////////////
+
+
+/*
+  A thread-safe, concurrent way to maintain a map from RocksDB's 
+  TransactionIDs to MariaDB's THD*.
+*/
+
+class Transaction_id_map
+{
+  typedef rocksdb::TransactionID key_type;
+  typedef THD*                   value_type;
+
+  typedef struct
+  {
+    key_type   key;
+    value_type val;
+  } ELEMENT;
+
+  LF_HASH lf_hash;
+
+public:
+  void init();
+  void cleanup();
+
+  /*
+    Before using the LockTable, each thread should get its own "pins". 
+  */
+  LF_PINS* get_pins() { return lf_hash_get_pins(&lf_hash); }
+  void put_pins(LF_PINS *pins) { return lf_hash_put_pins(pins); }
+  
+  void put(LF_PINS *pins, key_type key, value_type value);
+  value_type get(LF_PINS *pins, key_type key);
+  void remove(LF_PINS *pins, key_type key);
+};
+
+void Transaction_id_map::init()
+{
+  lf_hash_init(&lf_hash,
+               sizeof(ELEMENT), 
+               LF_HASH_UNIQUE, 
+               0 /* key offset */,
+               sizeof(key_type)/*key_len*/, NULL /*get_hash_key*/,
+               NULL /*charset*/);
+
+  lf_hash.alloc.constructor= NULL;
+  lf_hash.alloc.destructor=  NULL;
+  lf_hash.element_size= sizeof(ELEMENT);
+}
+
+
+void Transaction_id_map::cleanup()
+{
+  DBUG_ASSERT(lf_hash.count == 0);
+  lf_hash_destroy(&lf_hash);
+}
+
+
+void Transaction_id_map::put(LF_PINS *pins, key_type key, value_type value)
+{
+  ELEMENT new_elem;
+  new_elem.key= key;
+  new_elem.val= value;
+ 
+  /* 
+    The following call may return:
+     0:  OK
+     1:  Element already exists (should not happen) 
+    -1:  out-of-memory condition.
+    Dont do error handling as we don't have any way to do it.
+  */
+  lf_hash_insert(&lf_hash, pins, &new_elem);
+}
+
+
+void Transaction_id_map::remove(LF_PINS *pins, key_type key)
+{
+  lf_hash_delete(&lf_hash, pins, &key, sizeof(key_type));
+}
+
+
+Transaction_id_map::value_type Transaction_id_map::get(LF_PINS *pins, key_type key)
+{
+  ELEMENT *ptr= (ELEMENT*)lf_hash_search(&lf_hash, pins, &key, sizeof(key_type));
+
+  if (ptr)
+    return ptr->val;
+
+  return NULL;
+}
+
+Transaction_id_map trx_id_map;
+///////////////////////////////////////////////////////////////////////////
+
+
 namespace myrocks {
 
 static st_global_stats global_stats;
@@ -2492,6 +2590,7 @@ class Rdb_transaction_impl : public Rdb_transaction {
   rocksdb::Transaction *m_rocksdb_reuse_tx = nullptr;
 
 public:
+  LF_PINS *lf_pins; // LF-hash pins to use with trx_id_map
   void set_lock_timeout(int timeout_sec_arg) override {
     if (m_rocksdb_tx)
       m_rocksdb_tx->SetLockTimeout(rdb_convert_sec_to_ms(m_timeout_sec));
@@ -2555,6 +2654,7 @@ class Rdb_transaction_impl : public Rdb_transaction {
 
     release_snapshot();
     s = m_rocksdb_tx->Commit();
+    trx_id_map.remove(lf_pins, m_rocksdb_tx->GetID());
     if (!s.ok()) {
       rdb_handle_io_error(s, RDB_IO_ERROR_TX_COMMIT);
       res = true;
@@ -2589,6 +2689,7 @@ class Rdb_transaction_impl : public Rdb_transaction {
       /* This will also release all of the locks: */
       m_rocksdb_tx->Rollback();
 
+      trx_id_map.remove(lf_pins, m_rocksdb_tx->GetID());
       /* Save the transaction object to be reused */
       release_tx();
 
@@ -2743,6 +2844,8 @@ class Rdb_transaction_impl : public Rdb_transaction {
         rdb->BeginTransaction(write_opts, tx_opts, m_rocksdb_reuse_tx);
     m_rocksdb_reuse_tx = nullptr;
 
+    trx_id_map.put(lf_pins, m_rocksdb_tx->GetID(), m_thd);
+
     m_read_opts = rocksdb::ReadOptions();
 
     set_initial_savepoint();
@@ -2801,6 +2904,7 @@ class Rdb_transaction_impl : public Rdb_transaction {
       : Rdb_transaction(thd), m_rocksdb_tx(nullptr) {
     // Create a notifier that can be called when a snapshot gets generated.
     m_notifier = std::make_shared<Rdb_snapshot_notifier>(this);
+    lf_pins= trx_id_map.get_pins();
   }
 
   virtual ~Rdb_transaction_impl() {
@@ -2813,6 +2917,7 @@ class Rdb_transaction_impl : public Rdb_transaction {
 
     // Free any transaction memory that is still hanging around.
     delete m_rocksdb_reuse_tx;
+    trx_id_map.put_pins(lf_pins);
     DBUG_ASSERT(m_rocksdb_tx == nullptr);
   }
 };
@@ -4131,6 +4236,28 @@ static int rocksdb_start_tx_and_assign_read_view(
   return HA_EXIT_SUCCESS;
 }
 
+
+/* There is no prototype of this function in any header */
+extern "C" int thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);
+
+
+/* A row_wait_callback_t-Compatible function */
+void rdb_row_wait_callback(rocksdb::TransactionID waiter,
+                           rocksdb::TransactionID waitee)
+{
+  Rdb_transaction *&tx = get_tx_from_thd(current_thd);
+
+  if (!tx->is_writebatch_trx()) {
+    const auto tx_impl = static_cast<const Rdb_transaction_impl *>(tx);
+
+    THD *waiter_thd= trx_id_map.get(tx_impl->lf_pins, waiter);
+    THD *waitee_thd= trx_id_map.get(tx_impl->lf_pins, waitee);
+    //DBUG_ASSERT(waiter_thd == tx->m_thd);
+    thd_rpl_deadlock_check(waiter_thd, waitee_thd);
+  }
+}
+
+
 /* Dummy SAVEPOINT support. This is needed for long running transactions
  * like mysqldump (https://bugs.mysql.com/bug.php?id=71017).
  * Current SAVEPOINT does not correctly handle ROLLBACK and does not return
@@ -4663,6 +4790,10 @@ static int rocksdb_init_func(void *const p) {
 
   Rdb_sst_info::init(rdb);
 
+  trx_id_map.init();
+
+  rdb->SetRowWaitCallback(&rdb_row_wait_callback);
+
   /*
     Enable auto compaction, things needed for compaction filter are finished
     initializing
@@ -4887,6 +5018,7 @@ static int rocksdb_done_func(void *const p) {
 
   delete rdb;
   rdb = nullptr;
+  trx_id_map.cleanup();
 
   delete commit_latency_stats;
   commit_latency_stats = nullptr;
diff --git a/storage/rocksdb/rocksdb b/storage/rocksdb/rocksdb
index ba295cd..38080be 160000
--- a/storage/rocksdb/rocksdb
+++ b/storage/rocksdb/rocksdb
@@ -1 +1 @@
-Subproject commit ba295cda29daee3ffe58549542804efdfd969784
+Subproject commit 38080be88bf20e8956754d62322de1c98ab5d736


More information about the commits mailing list