[Commits] 38080be: MDEV-16428: Make optimistic parallel replication work

Sergei Petrunia psergey at askmonty.org
Mon Jun 25 22:43:23 EEST 2018


revision-id: 38080be88bf20e8956754d62322de1c98ab5d736
parent(s): ba295cda29daee3ffe58549542804efdfd969784
committer: Sergei Petrunia
branch nick: rocksdb-10.2-r12-new-submodule
timestamp: 2018-06-25 22:43:23 +0300
message:

MDEV-16428: Make optimistic parallel replication work

DRAFT PATCH, RocksDB part of it:

Add a callback function that is used to inform the SQL layer
that transaction X is waiting on a row lock that is held by transaction Y.

Optimistic parallel slave uses it to determine that transaction Y has
ran ahead of transaction X. If they need to be committed in the other
order (first X, then Y), then Y will be rolled back to allow X to proceed
(and Y will be retried later on).

---
 include/rocksdb/utilities/transaction_db.h          |  7 +++++++
 utilities/transactions/pessimistic_transaction_db.h |  4 ++++
 utilities/transactions/transaction_lock_mgr.cc      | 15 ++++++++++++++-
 utilities/transactions/transaction_lock_mgr.h       |  3 +++
 4 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h
index b5a33d1..1d5d3dd 100644
--- a/include/rocksdb/utilities/transaction_db.h
+++ b/include/rocksdb/utilities/transaction_db.h
@@ -156,6 +156,11 @@ struct DeadlockPath {
   bool empty() { return path.empty() && !limit_exceeded; }
 };
 
+
+typedef void (*row_wait_callback_t)(rocksdb::TransactionID waiter, 
+                                  rocksdb::TransactionID waitee);
+
+
 class TransactionDB : public StackableDB {
  public:
   // Open a TransactionDB similar to DB::Open().
@@ -217,6 +222,8 @@ class TransactionDB : public StackableDB {
   virtual std::vector<DeadlockPath> GetDeadlockInfoBuffer() = 0;
   virtual void SetDeadlockInfoBufferSize(uint32_t target_size) = 0;
 
+  virtual void SetRowWaitCallback(row_wait_callback_t callback) {}
+
  protected:
   // To Create an TransactionDB, call Open()
   explicit TransactionDB(DB* db) : StackableDB(db) {}
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h
index 4311e88..8a47cbd 100644
--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -113,6 +113,10 @@ class PessimisticTransactionDB : public TransactionDB {
   std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
   void SetDeadlockInfoBufferSize(uint32_t target_size) override;
 
+  virtual void SetRowWaitCallback(row_wait_callback_t callback) override {
+    lock_mgr_.SetRowWaitCallback(callback);
+  }
+
  protected:
   DBImpl* db_impl_;
   std::shared_ptr<Logger> info_log_;
diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc
index 9485067..e6593b0 100644
--- a/utilities/transactions/transaction_lock_mgr.cc
+++ b/utilities/transactions/transaction_lock_mgr.cc
@@ -173,7 +173,8 @@ TransactionLockMgr::TransactionLockMgr(
       max_num_locks_(max_num_locks),
       lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)),
       dlock_buffer_(max_num_deadlocks),
-      mutex_factory_(mutex_factory) {
+      mutex_factory_(mutex_factory),
+      row_wait_callback(NULL) {
   assert(txn_db);
   txn_db_impl_ =
       static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db);
@@ -383,6 +384,18 @@ Status TransactionLockMgr::AcquireWithTimeout(
       }
 
       TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn");
+
+      //psergey: Call the MariaDB callback to inform it that 
+      //  transaction X is doing a row lock wait for a lock that
+      //  is held by transaction Y.
+      //
+      //  Transaction Y 
+      if (row_wait_callback)
+      {
+        for (auto it= wait_ids.begin(); it != wait_ids.end(); it++)
+          row_wait_callback(txn->GetID(), *it);
+      }
+
       if (cv_end_time < 0) {
         // Wait indefinitely
         result = stripe->stripe_cv->Wait(stripe->stripe_mutex);
diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h
index abf7c5d..86a9904 100644
--- a/utilities/transactions/transaction_lock_mgr.h
+++ b/utilities/transactions/transaction_lock_mgr.h
@@ -84,6 +84,8 @@ class TransactionLockMgr {
   std::vector<DeadlockPath> GetDeadlockInfoBuffer();
   void Resize(uint32_t);
 
+  void SetRowWaitCallback(row_wait_callback_t callback) { row_wait_callback= callback; }
+
  private:
   PessimisticTransactionDB* txn_db_impl_;
 
@@ -149,6 +151,7 @@ class TransactionLockMgr {
   void DecrementWaitersImpl(const PessimisticTransaction* txn,
                             const autovector<TransactionID>& wait_ids);
 
+  row_wait_callback_t row_wait_callback;
   // No copying allowed
   TransactionLockMgr(const TransactionLockMgr&);
   void operator=(const TransactionLockMgr&);


More information about the commits mailing list