[Commits] 50e44623e: TEST PATCH: Range Locking Lazy Lock Release

psergey sergey at mariadb.com
Mon Jun 14 22:29:04 EEST 2021


revision-id: 50e44623e441cba43dbe4d20b25959ee37af5c5d (v5.8-3396-g50e44623e)
parent(s): 56846855b351a199d707e1b44c2d5f70e77d5535
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2021-06-14 22:28:25 +0300
message:

TEST PATCH: Range Locking Lazy Lock Release

STO, lock escalations, and overlapping locks do not work, yet.

---
 utilities/transactions/lock/lock_manager.h         |   6 +-
 utilities/transactions/lock/lock_tracker.h         |   4 +
 .../transactions/lock/point/point_lock_manager.cc  |   5 +-
 .../transactions/lock/point/point_lock_manager.h   |   4 +-
 .../transactions/lock/range/range_lock_manager.h   |   5 +-
 .../range_tree/lib/locktree/concurrent_tree.cc     |   9 +-
 .../range_tree/lib/locktree/concurrent_tree.h      |   2 +-
 .../range/range_tree/lib/locktree/lock_request.cc  |  10 +-
 .../range/range_tree/lib/locktree/lock_request.h   |   1 +
 .../lock/range/range_tree/lib/locktree/locktree.cc |  93 ++++++++++----
 .../lock/range/range_tree/lib/locktree/locktree.h  |  12 +-
 .../lock/range/range_tree/lib/locktree/manager.cc  |   4 +-
 .../range/range_tree/lib/locktree/range_buffer.cc  |  21 +--
 .../range/range_tree/lib/locktree/range_buffer.h   |  11 +-
 .../lock/range/range_tree/lib/locktree/treenode.cc | 143 +++++++++++++++++++--
 .../lock/range/range_tree/lib/locktree/treenode.h  |   9 +-
 .../range/range_tree/range_tree_lock_manager.cc    |  24 +++-
 .../range/range_tree/range_tree_lock_manager.h     |   2 +-
 .../range/range_tree/range_tree_lock_tracker.cc    |   8 +-
 .../range/range_tree/range_tree_lock_tracker.h     |   2 +-
 utilities/transactions/pessimistic_transaction.cc  |  11 +-
 .../transactions/pessimistic_transaction_db.cc     |  10 +-
 .../transactions/pessimistic_transaction_db.h      |   5 +-
 utilities/transactions/transaction_base.cc         |   3 +-
 utilities/transactions/transaction_base.h          |   2 +-
 25 files changed, 314 insertions(+), 92 deletions(-)

diff --git a/utilities/transactions/lock/lock_manager.h b/utilities/transactions/lock/lock_manager.h
index a5ce1948c..e236faec2 100644
--- a/utilities/transactions/lock/lock_manager.h
+++ b/utilities/transactions/lock/lock_manager.h
@@ -42,11 +42,13 @@ class LockManager {
   // is responsible for calling UnLock() on this key.
   virtual Status TryLock(PessimisticTransaction* txn,
                          ColumnFamilyId column_family_id,
-                         const std::string& key, Env* env, bool exclusive) = 0;
+                         const std::string& key, Env* env, bool exclusive,
+                         void **lock_data=nullptr) = 0;
   // The range [start, end] are inclusive at both sides.
   virtual Status TryLock(PessimisticTransaction* txn,
                          ColumnFamilyId column_family_id, const Endpoint& start,
-                         const Endpoint& end, Env* env, bool exclusive) = 0;
+                         const Endpoint& end, Env* env, bool exclusive,
+                         void **lock_data=nullptr) = 0;
 
   // Unlock a key or a range locked by TryLock().  txn must be the same
   // Transaction that locked this key.
diff --git a/utilities/transactions/lock/lock_tracker.h b/utilities/transactions/lock/lock_tracker.h
index 5fa228a82..4952fe471 100644
--- a/utilities/transactions/lock/lock_tracker.h
+++ b/utilities/transactions/lock/lock_tracker.h
@@ -27,6 +27,8 @@ struct PointLockRequest {
   bool read_only = false;
   // Whether the lock is in exclusive mode.
   bool exclusive = true;
+
+  void *lock_data = nullptr;
 };
 
 // Request for locking a range of keys.
@@ -37,6 +39,8 @@ struct RangeLockRequest {
   // The range to be locked
   Endpoint start_endp;
   Endpoint end_endp;
+
+  void *lock_data = nullptr;
 };
 
 struct PointLockStatus {
diff --git a/utilities/transactions/lock/point/point_lock_manager.cc b/utilities/transactions/lock/point/point_lock_manager.cc
index 79954d8f0..950fa9ea3 100644
--- a/utilities/transactions/lock/point/point_lock_manager.cc
+++ b/utilities/transactions/lock/point/point_lock_manager.cc
@@ -227,7 +227,7 @@ bool PointLockManager::IsLockExpired(TransactionID txn_id,
 Status PointLockManager::TryLock(PessimisticTransaction* txn,
                                  ColumnFamilyId column_family_id,
                                  const std::string& key, Env* env,
-                                 bool exclusive) {
+                                 bool exclusive, void**) {
   // Lookup lock map for this column family id
   std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
   LockMap* lock_map = lock_map_ptr.get();
@@ -704,7 +704,8 @@ Status PointLockManager::TryLock(PessimisticTransaction* /* txn */,
                                  ColumnFamilyId /* cf_id */,
                                  const Endpoint& /* start */,
                                  const Endpoint& /* end */, Env* /* env */,
-                                 bool /* exclusive */) {
+                                 bool /* exclusive */,
+                                 void **) {
   return Status::NotSupported(
       "PointLockManager does not support range locking");
 }
diff --git a/utilities/transactions/lock/point/point_lock_manager.h b/utilities/transactions/lock/point/point_lock_manager.h
index 3c541eb3a..d701cc644 100644
--- a/utilities/transactions/lock/point/point_lock_manager.h
+++ b/utilities/transactions/lock/point/point_lock_manager.h
@@ -133,10 +133,10 @@ class PointLockManager : public LockManager {
   void RemoveColumnFamily(const ColumnFamilyHandle* cf) override;
 
   Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
-                 const std::string& key, Env* env, bool exclusive) override;
+                 const std::string& key, Env* env, bool exclusive, void **lock_data) override;
   Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
                  const Endpoint& start, const Endpoint& end, Env* env,
-                 bool exclusive) override;
+                 bool exclusive, void **lock_data) override;
 
   void UnLock(PessimisticTransaction* txn, const LockTracker& tracker,
               Env* env) override;
diff --git a/utilities/transactions/lock/range/range_lock_manager.h b/utilities/transactions/lock/range/range_lock_manager.h
index 91619934b..70827b36a 100644
--- a/utilities/transactions/lock/range/range_lock_manager.h
+++ b/utilities/transactions/lock/range/range_lock_manager.h
@@ -20,9 +20,10 @@ class RangeLockManagerBase : public LockManager {
   // range
   using LockManager::TryLock;
   Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
-                 const std::string& key, Env* env, bool exclusive) override {
+                 const std::string& key, Env* env, bool exclusive,
+                 void **lock_data) override {
     Endpoint endp(key.data(), key.size(), false);
-    return TryLock(txn, column_family_id, endp, endp, env, exclusive);
+    return TryLock(txn, column_family_id, endp, endp, env, exclusive, lock_data);
   }
 };
 
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.cc
index 5110cd482..a794dc43f 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.cc
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.cc
@@ -101,7 +101,7 @@ void concurrent_tree::locked_keyrange::acquire(const keyrange &range) {
 
 bool concurrent_tree::locked_keyrange::add_shared_owner(const keyrange &range,
                                                         TXNID new_owner) {
-  return m_subtree->insert(range, new_owner, /*is_shared*/ true);
+  return m_subtree->insert(range, new_owner, /*is_shared*/ true, NULL /*lock_data*/);
 }
 
 void concurrent_tree::locked_keyrange::release(void) {
@@ -109,12 +109,15 @@ void concurrent_tree::locked_keyrange::release(void) {
 }
 
 void concurrent_tree::locked_keyrange::insert(const keyrange &range,
-                                              TXNID txnid, bool is_shared) {
+                                              TXNID txnid, bool is_shared,
+                                              void **lock_data) {
   // empty means no children, and only the root should ever be empty
   if (m_subtree->is_empty()) {
     m_subtree->set_range_and_txnid(range, txnid, is_shared);
+    if (lock_data)
+        *lock_data= NULL; //psergey-todo: inserted root is here
   } else {
-    m_subtree->insert(range, txnid, is_shared);
+    m_subtree->insert(range, txnid, is_shared, lock_data);
   }
 }
 
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.h b/utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.h
index e1bfb86c5..5e2d4705e 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.h
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/concurrent_tree.h
@@ -120,7 +120,7 @@ class concurrent_tree {
     // inserts the given range into the tree, with an associated txnid.
     // requires: range does not overlap with anything in this locked_keyrange
     // rationale: caller is responsible for only inserting unique ranges
-    void insert(const keyrange &range, TXNID txnid, bool is_shared);
+    void insert(const keyrange &range, TXNID txnid, bool is_shared, void **lock_data);
 
     // effect: removes the given range from the tree.
     //         - txnid=TXNID_ANY means remove the range no matter what its
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc
index ec7bd04dc..fba48ad84 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc
@@ -196,13 +196,14 @@ int lock_request::start(void) {
 
   txnid_set conflicts;
   conflicts.create();
+  acquired_lock_node= nullptr; // psergey
   if (m_type == type::WRITE) {
     r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts,
-                                 m_big_txn);
+                                 m_big_txn, &acquired_lock_node);
   } else {
     invariant(m_type == type::READ);
     r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts,
-                                m_big_txn);
+                                m_big_txn, &acquired_lock_node);
   }
 
   // if the lock is not granted, save it to the set of lock requests
@@ -329,12 +330,13 @@ int lock_request::retry(lock_wait_infos *conflicts_collector) {
   txnid_set conflicts;
   conflicts.create();
 
+  acquired_lock_node= nullptr; // psergey
   if (m_type == type::WRITE) {
     r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts,
-                                 m_big_txn);
+                                 m_big_txn, &acquired_lock_node);
   } else {
     r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts,
-                                m_big_txn);
+                                m_big_txn, &acquired_lock_node);
   }
 
   // if the acquisition succeeded then remove ourselves from the
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h
index 3544f102f..2cc2c189e 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h
@@ -152,6 +152,7 @@ class lock_request {
   void kill_waiter(void);
   static void kill_waiter(locktree *lt, void *extra);
 
+  void *acquired_lock_node;
  private:
   enum state {
     UNINITIALIZED,
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc
index c238b0204..3ba04b79b 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc
@@ -224,9 +224,10 @@ static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
 // the memory tracker of this newly acquired lock.
 static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr,
                                       const row_lock &lock,
-                                      locktree_manager *mgr) {
+                                      locktree_manager *mgr,
+                                      void **lock_data) {
   uint64_t mem_used = row_lock_size_in_tree(lock);
-  lkr->insert(lock.range, lock.txnid, lock.is_shared);
+  lkr->insert(lock.range, lock.txnid, lock.is_shared, lock_data);
   if (mgr != nullptr) {
     mgr->note_mem_used(mem_used);
   }
@@ -297,7 +298,7 @@ void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
     sto_lkr.prepare(&sto_rangetree);
     int r = acquire_lock_consolidated(&sto_lkr, m_sto_txnid, rec.get_left_key(),
                                       rec.get_right_key(),
-                                      rec.get_exclusive_flag(), nullptr);
+                                      rec.get_exclusive_flag(), nullptr, nullptr);
     invariant_zero(r);
     sto_lkr.release();
     iter.next();
@@ -311,7 +312,9 @@ void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
             TxnidVector *owners) {
       // There can't be multiple owners in STO mode
       invariant_zero(owners);
-      dst_lkr->insert(range, txnid, is_shared);
+      // psergey-todo: here is the migration from STO to non-STO, without
+      // psergey-todo:remembering the ranges...
+      dst_lkr->insert(range, txnid, is_shared, NULL);
       return true;
     }
   } migrate_fn;
@@ -329,7 +332,9 @@ bool locktree::sto_try_acquire(void *prepared_lkr, TXNID txnid,
                                const DBT *left_key, const DBT *right_key,
                                bool is_write_request) {
   if (m_rangetree->is_empty() && m_sto_buffer.is_empty() &&
-      toku_unsafe_fetch(m_sto_score) >= STO_SCORE_THRESHOLD) {
+      toku_unsafe_fetch(m_sto_score) >= STO_SCORE_THRESHOLD && 
+      false // psergey: disable STO optimization
+      ) {
     // We can do the optimization because the rangetree is empty, and
     // we know its worth trying because the sto score is big enough.
     sto_begin(txnid);
@@ -412,7 +417,8 @@ int locktree::acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
                                         const DBT *left_key,
                                         const DBT *right_key,
                                         bool is_write_request,
-                                        txnid_set *conflicts) {
+                                        txnid_set *conflicts,
+                                        void **lock_data) {
   int r = 0;
   concurrent_tree::locked_keyrange *lkr;
 
@@ -471,19 +477,35 @@ int locktree::acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
     // requested range into one dominating range. then we insert the dominating
     // range.
     bool all_shared = !is_write_request;
+    bool need_insert = true;
     for (size_t i = 0; i < num_overlapping_row_locks; i++) {
       row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i);
       invariant(overlapping_lock.txnid == txnid);
-      requested_range.extend(m_cmp, overlapping_lock.range);
-      remove_row_lock_from_tree(lkr, overlapping_lock, TXNID_ANY, m_mgr);
-      all_shared = all_shared && overlapping_lock.is_shared;
+      
+      // psergey: dont "remove and add back" if we're getting the same lock
+      // as we already have
+      keyrange::comparison c = requested_range.compare(m_cmp, overlapping_lock.range);
+      if (num_overlapping_row_locks == 1 && c == keyrange::EQUALS &&
+          !overlapping_lock.is_shared == is_write_request) {
+        // This is the same lock as we already have
+        need_insert = false;
+        *lock_data=(void*)0x1; // Reused
+        break;
+      } else {
+        need_insert = true;
+        requested_range.extend(m_cmp, overlapping_lock.range);
+        remove_row_lock_from_tree(lkr, overlapping_lock, TXNID_ANY, m_mgr);
+        all_shared = all_shared && overlapping_lock.is_shared;
+      }
+    }
+    
+    if (need_insert) {
+      row_lock new_lock = {.range = requested_range,
+                           .txnid = txnid,
+                           .is_shared = all_shared,
+                           .owners = nullptr};
+      insert_row_lock_into_tree(lkr, new_lock, m_mgr, lock_data);
     }
-
-    row_lock new_lock = {.range = requested_range,
-                         .txnid = txnid,
-                         .is_shared = all_shared,
-                         .owners = nullptr};
-    insert_row_lock_into_tree(lkr, new_lock, m_mgr);
   } else {
     r = DB_LOCK_NOTGRANTED;
   }
@@ -498,7 +520,8 @@ int locktree::acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
 // transactions that conflict with this request.
 int locktree::acquire_lock(bool is_write_request, TXNID txnid,
                            const DBT *left_key, const DBT *right_key,
-                           txnid_set *conflicts) {
+                           txnid_set *conflicts,
+                           void **lock_data) {
   int r = 0;
 
   // we are only supporting write locks for simplicity
@@ -514,7 +537,7 @@ int locktree::acquire_lock(bool is_write_request, TXNID txnid,
       sto_try_acquire(&lkr, txnid, left_key, right_key, is_write_request);
   if (!acquired) {
     r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key,
-                                  is_write_request, conflicts);
+                                  is_write_request, conflicts, lock_data);
   }
 
   lkr.release();
@@ -523,13 +546,14 @@ int locktree::acquire_lock(bool is_write_request, TXNID txnid,
 
 int locktree::try_acquire_lock(bool is_write_request, TXNID txnid,
                                const DBT *left_key, const DBT *right_key,
-                               txnid_set *conflicts, bool big_txn) {
+                               txnid_set *conflicts, bool big_txn, 
+                               void **lock_data) {
   // All ranges in the locktree must have left endpoints <= right endpoints.
   // Range comparisons rely on this fact, so we make a paranoid invariant here.
   paranoid_invariant(m_cmp(left_key, right_key) <= 0);
   int r = m_mgr == nullptr ? 0 : m_mgr->check_current_lock_constraints(big_txn);
   if (r == 0) {
-    r = acquire_lock(is_write_request, txnid, left_key, right_key, conflicts);
+    r = acquire_lock(is_write_request, txnid, left_key, right_key, conflicts, lock_data);
   }
   return r;
 }
@@ -537,15 +561,15 @@ int locktree::try_acquire_lock(bool is_write_request, TXNID txnid,
 // the locktree silently upgrades read locks to write locks for simplicity
 int locktree::acquire_read_lock(TXNID txnid, const DBT *left_key,
                                 const DBT *right_key, txnid_set *conflicts,
-                                bool big_txn) {
+                                bool big_txn, void **lock_data) {
   return try_acquire_lock(false, txnid, left_key, right_key, conflicts,
-                          big_txn);
+                          big_txn, lock_data);
 }
 
 int locktree::acquire_write_lock(TXNID txnid, const DBT *left_key,
                                  const DBT *right_key, txnid_set *conflicts,
-                                 bool big_txn) {
-  return try_acquire_lock(true, txnid, left_key, right_key, conflicts, big_txn);
+                                 bool big_txn, void **lock_data) {
+  return try_acquire_lock(true, txnid, left_key, right_key, conflicts, big_txn, lock_data);
 }
 
 // typedef void (*dump_callback)(void *cdata, const DBT *left, const DBT *right,
@@ -736,7 +760,26 @@ void locktree::release_locks(TXNID txnid, const range_buffer *ranges,
       // Range comparisons rely on this fact, so we make a paranoid invariant
       // here.
       paranoid_invariant(m_cmp(left_key, right_key) <= 0);
-      remove_overlapping_locks_for_txnid(txnid, left_key, right_key);
+
+
+      //psergey-todo: here, check rec._header.lock_data...
+      void *lock_data= rec.get_lock_data();
+      if (lock_data == (void*)0x1) {
+         // Do nothing. Double-added nodea
+      } else if (lock_data) {
+        // psergey-todo: new removal procedure.
+        // The caller will call retry_all_lock_requests. 
+        // We just set m_is_deleted...
+        treenode *node= (treenode*)lock_data;
+        node->m_is_deleted.store(1);
+      } else {
+        // psergey-todo: this should be the exception. It should locate the
+        // node and set m_is_deleted=1...
+        fprintf(stderr, "AAAA wrong removal!\n");
+        assert(0);
+        remove_overlapping_locks_for_txnid(txnid, left_key, right_key);
+      }
+
       iter.next();
     }
     // Increase the sto score slightly. Eventually it will hit
@@ -955,7 +998,7 @@ void locktree::escalate(lt_escalate_cb after_escalate_callback,
                        .txnid = current_txnid,
                        .is_shared = !rec.get_exclusive_flag(),
                        .owners = nullptr};
-      insert_row_lock_into_tree(&lkr, lock, m_mgr);
+      insert_row_lock_into_tree(&lkr, lock, m_mgr, NULL);
       iter.next();
     }
 
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h
index 3e438f502..4d7d5e243 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h
@@ -309,7 +309,7 @@ class locktree {
   // note: Read locks cannot be shared between txnids, as one would expect.
   //       This is for simplicity since read locks are rare in MySQL.
   int acquire_read_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
-                        txnid_set *conflicts, bool big_txn);
+                        txnid_set *conflicts, bool big_txn, void **lock_data);
 
   // effect: Attempts to grant a write lock for the range of keys between
   // [left_key, right_key]. returns: If the lock cannot be granted, return
@@ -318,7 +318,7 @@ class locktree {
   //          the range. If the locktree cannot create more locks, return
   //          TOKUDB_OUT_OF_LOCKS.
   int acquire_write_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
-                         txnid_set *conflicts, bool big_txn);
+                         txnid_set *conflicts, bool big_txn, void **lock_data);
 
   // effect: populate the conflicts set with the txnids that would preventing
   //         the given txnid from getting a lock on [left_key, right_key]
@@ -540,14 +540,16 @@ class locktree {
 
   int acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
                                 const DBT *left_key, const DBT *right_key,
-                                bool is_write_request, txnid_set *conflicts);
+                                bool is_write_request, txnid_set *conflicts,
+                                void **lock_data);
 
   int acquire_lock(bool is_write_request, TXNID txnid, const DBT *left_key,
-                   const DBT *right_key, txnid_set *conflicts);
+                   const DBT *right_key, txnid_set *conflicts,
+                   void **lock_data);
 
   int try_acquire_lock(bool is_write_request, TXNID txnid, const DBT *left_key,
                        const DBT *right_key, txnid_set *conflicts,
-                       bool big_txn);
+                       bool big_txn, void **lock_data);
 
   friend class locktree_unit_test;
   friend class manager_unit_test;
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/manager.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/manager.cc
index 4186182be..ed2aae3d2 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/manager.cc
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/manager.cc
@@ -303,11 +303,11 @@ void locktree_manager::note_mem_released(uint64_t mem_released) {
 }
 
 bool locktree_manager::out_of_locks(void) const {
-  return m_current_lock_memory >= m_max_lock_memory;
+  return false;// m_current_lock_memory >= m_max_lock_memory;
 }
 
 bool locktree_manager::over_big_threshold(void) {
-  return m_current_lock_memory >= m_max_lock_memory / 2;
+  return false; //m_current_lock_memory >= m_max_lock_memory / 2;
 }
 
 int locktree_manager::iterate_pending_lock_requests(
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/range_buffer.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/range_buffer.cc
index 1e1d23ef8..e34728659 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/range_buffer.cc
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/range_buffer.cc
@@ -72,8 +72,10 @@ bool range_buffer::record_header::right_is_infinite(void) const {
 
 void range_buffer::record_header::init(const DBT *left_key,
                                        const DBT *right_key,
-                                       bool is_exclusive) {
+                                       bool is_exclusive, 
+                                       void *lock_data_arg) {
   is_exclusive_lock = is_exclusive;
+  lock_data = lock_data_arg;
   left_neg_inf = left_key == toku_dbt_negative_infinity();
   left_pos_inf = left_key == toku_dbt_positive_infinity();
   left_key_size = toku_dbt_is_infinite(left_key) ? 0 : left_key->size;
@@ -88,6 +90,9 @@ void range_buffer::record_header::init(const DBT *left_key,
   }
 }
 
+void* range_buffer::iterator::record::get_lock_data() const {
+  return _header.lock_data;
+}
 const DBT *range_buffer::iterator::record::get_left_key(void) const {
   if (_header.left_neg_inf) {
     return toku_dbt_negative_infinity();
@@ -195,15 +200,15 @@ void range_buffer::create(void) {
 }
 
 void range_buffer::append(const DBT *left_key, const DBT *right_key,
-                          bool is_write_request) {
+                          bool is_write_request, void *lock_data) {
   // if the keys are equal, then only one copy is stored.
   if (toku_dbt_equals(left_key, right_key)) {
     invariant(left_key->size <= MAX_KEY_SIZE);
-    append_point(left_key, is_write_request);
+    append_point(left_key, is_write_request, lock_data);
   } else {
     invariant(left_key->size <= MAX_KEY_SIZE);
     invariant(right_key->size <= MAX_KEY_SIZE);
-    append_range(left_key, right_key, is_write_request);
+    append_range(left_key, right_key, is_write_request, lock_data);
   }
   _num_ranges++;
 }
@@ -219,13 +224,13 @@ int range_buffer::get_num_ranges(void) const { return _num_ranges; }
 void range_buffer::destroy(void) { _arena.destroy(); }
 
 void range_buffer::append_range(const DBT *left_key, const DBT *right_key,
-                                bool is_exclusive) {
+                                bool is_exclusive, void *lock_data) {
   size_t record_length =
       sizeof(record_header) + left_key->size + right_key->size;
   char *buf = reinterpret_cast<char *>(_arena.malloc_from_arena(record_length));
 
   record_header h;
-  h.init(left_key, right_key, is_exclusive);
+  h.init(left_key, right_key, is_exclusive, lock_data);
 
   // serialize the header
   memcpy(buf, &h, sizeof(record_header));
@@ -243,12 +248,12 @@ void range_buffer::append_range(const DBT *left_key, const DBT *right_key,
   }
 }
 
-void range_buffer::append_point(const DBT *key, bool is_exclusive) {
+void range_buffer::append_point(const DBT *key, bool is_exclusive, void *lock_data) {
   size_t record_length = sizeof(record_header) + key->size;
   char *buf = reinterpret_cast<char *>(_arena.malloc_from_arena(record_length));
 
   record_header h;
-  h.init(key, nullptr, is_exclusive);
+  h.init(key, nullptr, is_exclusive, lock_data);
 
   // serialize the header
   memcpy(buf, &h, sizeof(record_header));
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/range_buffer.h b/utilities/transactions/lock/range/range_tree/lib/locktree/range_buffer.h
index 76e28d747..318cbef69 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/range_buffer.h
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/range_buffer.h
@@ -79,12 +79,13 @@ class range_buffer {
     uint16_t left_key_size;
     uint16_t right_key_size;
     bool is_exclusive_lock;
+    void *lock_data;
 
     bool left_is_infinite(void) const;
 
     bool right_is_infinite(void) const;
 
-    void init(const DBT *left_key, const DBT *right_key, bool is_exclusive);
+    void init(const DBT *left_key, const DBT *right_key, bool is_exclusive, void *lock_data_arg);
   };
   // PORT static_assert(sizeof(record_header) == 8, "record header format is
   // off");
@@ -108,6 +109,8 @@ class range_buffer {
       // get a read-only pointer to the right key of this record's range
       const DBT *get_right_key(void) const;
 
+      void *get_lock_data() const;
+
       // how big is this record? this tells us where the next record is
       size_t size(void) const;
 
@@ -150,7 +153,7 @@ class range_buffer {
   // append a left/right key range to the buffer.
   // if the keys are equal, then only one copy is stored.
   void append(const DBT *left_key, const DBT *right_key,
-              bool is_write_request = false);
+              bool is_write_request = false, void *lock_data=nullptr);
 
   // is this range buffer empty?
   bool is_empty(void) const;
@@ -168,11 +171,11 @@ class range_buffer {
   int _num_ranges;
 
   void append_range(const DBT *left_key, const DBT *right_key,
-                    bool is_write_request);
+                    bool is_write_request, void *lock_data=nullptr);
 
   // append a point to the buffer. this is the space/time saving
   // optimization for key ranges where left == right.
-  void append_point(const DBT *key, bool is_write_request);
+  void append_point(const DBT *key, bool is_write_request, void *lock_data=nullptr);
 };
 
 } /* namespace toku */
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/treenode.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/treenode.cc
index bcdaa672f..bf82e1d37 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/treenode.cc
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/treenode.cc
@@ -1,5 +1,5 @@
 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+// vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2:
 #ifndef ROCKSDB_LITE
 #ifndef OS_WIN
 #ident "$Id$"
@@ -84,6 +84,8 @@ void treenode::init(const comparator *cmp) {
   m_is_shared = false;
   m_owners = nullptr;
 
+  m_is_deleted= false;
+
   // use an adaptive mutex at each node since we expect the time the
   // lock is held to be relatively short compared to a context switch.
   // indeed, this improves performance at high thread counts considerably.
@@ -135,6 +137,7 @@ treenode *treenode::alloc(const comparator *cmp, const keyrange &range,
 }
 
 void treenode::swap_in_place(treenode *node1, treenode *node2) {
+  assert(0);
   keyrange tmp_range = node1->m_range;
   TXNID tmp_txnid = node1->m_txnid;
   node1->m_range = node2->m_range;
@@ -230,18 +233,22 @@ treenode *treenode::find_node_with_overlapping_child(
   }
 }
 
-bool treenode::insert(const keyrange &range, TXNID txnid, bool is_shared) {
+bool treenode::insert(const keyrange &range, TXNID txnid, bool is_shared, void **lock_data) {
   int rc = true;
   // choose a child to check. if that child is null, then insert the new node
   // there. otherwise recur down that child's subtree
   keyrange::comparison c = range.compare(*m_cmp, m_range);
+  if (lock_data)
+      *lock_data= NULL;
   if (c == keyrange::comparison::LESS_THAN) {
     treenode *left_child = lock_and_rebalance_left();
     if (left_child == nullptr) {
       left_child = treenode::alloc(m_cmp, range, txnid, is_shared);
+      if (lock_data)
+         *lock_data= left_child;
       m_left_child.set(left_child);
     } else {
-      left_child->insert(range, txnid, is_shared);
+      left_child->insert(range, txnid, is_shared, lock_data);
       left_child->mutex_unlock();
     }
   } else if (c == keyrange::comparison::GREATER_THAN) {
@@ -249,9 +256,11 @@ bool treenode::insert(const keyrange &range, TXNID txnid, bool is_shared) {
     treenode *right_child = lock_and_rebalance_right();
     if (right_child == nullptr) {
       right_child = treenode::alloc(m_cmp, range, txnid, is_shared);
+      if (lock_data)
+         *lock_data= right_child;
       m_right_child.set(right_child);
     } else {
-      right_child->insert(range, txnid, is_shared);
+      right_child->insert(range, txnid, is_shared, lock_data);
       right_child->mutex_unlock();
     }
   } else if (c == keyrange::comparison::EQUALS) {
@@ -286,6 +295,13 @@ treenode *treenode::find_rightmost_child(treenode **parent) {
   return find_child_at_extreme(1, parent);
 }
 
+/*
+   psergey: remove the root node of this subtree.
+   Entry: this node is locked (otherwise its contents could not be read safely).
+   Return:
+     the node that should be put in place of this node.
+     (at the moment, the return value is either nullptr or "this")
+*/
 treenode *treenode::remove_root_of_subtree() {
   // if this node has no children, just free it and return null
   if (m_left_child.ptr == nullptr && m_right_child.ptr == nullptr) {
@@ -335,6 +351,64 @@ treenode *treenode::remove_root_of_subtree() {
   return this;
 }
 
+// psergey: this removes this node, but without the use of swap_in_place
+// operations.
+//
+//  @return
+//   The node that should be put in place of this node. Can be NULL.
+
+treenode *treenode::remove_root_of_subtree2() {
+  // if this node has no children, just free it and return null
+  if (m_left_child.ptr == nullptr && m_right_child.ptr == nullptr) {
+    // treenode::free requires that non-root nodes are unlocked
+    if (!is_root()) {
+      mutex_unlock();
+    }
+    treenode::free(this);
+    return nullptr;
+  }
+
+  // we have a child, so get either the in-order successor or
+  // predecessor of this node to be our replacement.
+  // replacement_parent is updated by the find functions as
+  // they recur down the tree, so initialize it to this.
+  treenode *child, *replacement;
+  treenode *replacement_parent = this;
+  if (m_left_child.ptr != nullptr) {
+    child = m_left_child.get_locked();
+    replacement = child->find_rightmost_child(&replacement_parent);
+    invariant(replacement == child || replacement_parent != this);
+
+    // detach the replacement from its parent
+    if (replacement_parent == this) {
+      m_left_child = replacement->m_left_child;
+    } else {
+      replacement_parent->m_right_child = replacement->m_left_child;
+    }
+  } else {
+    child = m_right_child.get_locked();
+    replacement = child->find_leftmost_child(&replacement_parent);
+    invariant(replacement == child || replacement_parent != this);
+
+    // detach the replacement from its parent
+    if (replacement_parent == this) {
+      m_right_child = replacement->m_right_child;
+    } else {
+      replacement_parent->m_left_child = replacement->m_right_child;
+    }
+  }
+  child->mutex_unlock();
+
+  
+  //DONT:
+  // swap in place with the detached replacement, then destroy it
+  //treenode::swap_in_place(replacement, this);
+  //treenode::free(replacement);
+
+  return replacement;
+}
+
+
 void treenode::recursive_remove(void) {
   treenode *left = m_left_child.ptr;
   if (left) {
@@ -365,7 +439,7 @@ void treenode::remove_shared_owner(TXNID txnid) {
     m_owners = nullptr;
   }
 }
-
+// psergey: return the node that should be put in place of this node...
 treenode *treenode::remove(const keyrange &range, TXNID txnid) {
   treenode *child;
   // if the range is equal to this node's range, then just remove
@@ -492,26 +566,73 @@ treenode *treenode::maybe_rebalance(void) {
   return new_root;
 }
 
+treenode *maybe_delete(treenode *node);
+
 treenode *treenode::lock_and_rebalance_left(void) {
   treenode *child = m_left_child.get_locked();
+
   if (child) {
-    treenode *new_root = child->maybe_rebalance();
-    m_left_child.set(new_root);
-    child = new_root;
+    child = maybe_delete(child);
+    
+    if (child) {
+      treenode *new_root = child->maybe_rebalance();
+      if (new_root != child)
+        child= maybe_delete(new_root);
+    }
   }
+  
+  m_left_child.set(child);
   return child;
 }
 
 treenode *treenode::lock_and_rebalance_right(void) {
   treenode *child = m_right_child.get_locked();
+
   if (child) {
-    treenode *new_root = child->maybe_rebalance();
-    m_right_child.set(new_root);
-    child = new_root;
+    child = maybe_delete(child);
+
+    if (child) {
+      treenode *new_root = child->maybe_rebalance();
+      if (new_root != child)
+        child= maybe_delete(new_root);
+    }
   }
+
+  m_right_child.set(child);
   return child;
 }
 
+/*
+  Lazily remove the node if it is marked as deleted.
+
+  @param
+    node   The node, which is locked. Its parent is also locked, so we are
+           allowed to modify the parent's child pointer.
+           Nobody is waiting on this node's mutex (since we hold the parent's
+           lock, any waiters are blocked on the parent instead).
+
+  @return
+     The node to put in place of the passed node: either the node itself
+     (when it is not marked deleted), or a replacement taken from its
+     subtree after the deleted node(s) have been removed. Can be NULL.
+     The returned node, if non-NULL, is locked.
+*/
+
+treenode *maybe_delete(treenode *node) {
+
+  while (node) {
+
+    if (!node->m_is_deleted.load())
+      return node; // Not deleted. Do nothing
+
+    node = node->remove_root_of_subtree2();
+    // Remove this node and return a tree made of its children...
+    if (node)
+     node->mutex_lock();
+  }
+  return node;
+}
+
 void treenode::child_ptr::set(treenode *node) {
   ptr = node;
   depth_est = ptr ? ptr->get_depth_estimate() : 0;
diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/treenode.h b/utilities/transactions/lock/range/range_tree/lib/locktree/treenode.h
index ec25a8c58..457e0d2af 100644
--- a/utilities/transactions/lock/range/range_tree/lib/locktree/treenode.h
+++ b/utilities/transactions/lock/range/range_tree/lib/locktree/treenode.h
@@ -54,6 +54,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
 #pragma once
 
 #include <string.h>
+#include <atomic>
 
 #include "../ft/comparator.h"
 #include "../portability/memory.h"
@@ -161,7 +162,7 @@ class treenode {
 
   // effect: inserts the given range and txnid into a subtree, recursively
   // requires: range does not overlap with any node below the subtree
-  bool insert(const keyrange &range, TXNID txnid, bool is_shared);
+  bool insert(const keyrange &range, TXNID txnid, bool is_shared, void **lock_data);
 
   // effect: removes the given range from the subtree
   // requires: range exists in the subtree
@@ -172,6 +173,8 @@ class treenode {
   // requires: every node at and below this node is unlocked
   void recursive_remove(void);
 
+  std::atomic<bool> m_is_deleted;
+
  private:
   // the child_ptr is a light abstraction for the locking of
   // a child and the maintenence of its depth estimate.
@@ -259,6 +262,8 @@ class treenode {
   // returns: the new root of the subtree
   treenode *remove_root_of_subtree(void);
 
+  treenode *remove_root_of_subtree2(void);
+
   // requires: subtree is non-empty, direction is not 0
   // returns: the child of the subtree at either the left or rightmost extreme
   treenode *find_child_at_extreme(int direction, treenode **parent);
@@ -297,6 +302,8 @@ class treenode {
   static void swap_in_place(treenode *node1, treenode *node2);
 
   friend class concurrent_tree_unit_test;
+
+  friend treenode *maybe_delete(treenode *node);
 };
 
 } /* namespace toku */
diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc
index 1c07f0992..9cfff4968 100644
--- a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc
+++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc
@@ -67,7 +67,8 @@ Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn,
                                      uint32_t column_family_id,
                                      const Endpoint& start_endp,
                                      const Endpoint& end_endp, Env*,
-                                     bool exclusive) {
+                                     bool exclusive, 
+                                     void **lock_data) {
   toku::lock_request request;
   request.create(mutex_factory_);
   DBT start_key_dbt, end_key_dbt;
@@ -134,6 +135,9 @@ Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn,
   request.destroy();
   switch (r) {
     case 0:
+      //
+      if (lock_data)
+         *lock_data= request.acquired_lock_node;
       break;  // fall through
     case DB_LOCK_NOTGRANTED:
       return Status::TimedOut(Status::SubCode::kLockTimeout);
@@ -363,6 +367,24 @@ void RangeTreeLockManager::AddColumnFamily(const ColumnFamilyHandle* cfh) {
     cmp.destroy();
 
     ltree_map_.insert({column_family_id, MakeLockTreePtr(ltree)});
+
+    //if (column_family_id) 
+    {
+      const char *b="Fake-Root";
+      std::string key;
+      serialize_endpoint(Endpoint(Slice(b, strlen(b)), false), &key);
+      DBT key_dbt;
+      toku_fill_dbt(&key_dbt, key.data(), key.size());
+      TXNID dummy_txn_id(123);
+      void *dummy;
+      txnid_set conflicts; 
+      conflicts.create();
+
+      ltree->acquire_write_lock(dummy_txn_id, &key_dbt, &key_dbt, &conflicts,
+                                false/*big_txn*/, &dummy);
+      
+      conflicts.destroy();
+    }
   }
 }
 
diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h
index 5d55ded02..5df3de0e2 100644
--- a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h
+++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h
@@ -44,7 +44,7 @@ class RangeTreeLockManager : public RangeLockManagerBase,
   using LockManager::TryLock;
   Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id,
                  const Endpoint& start_endp, const Endpoint& end_endp, Env* env,
-                 bool exclusive) override;
+                 bool exclusive, void **lock_data) override;
 
   void UnLock(PessimisticTransaction* txn, const LockTracker& tracker,
               Env* env) override;
diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc
index d138ed91f..0ac19e926 100644
--- a/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc
+++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc
@@ -26,7 +26,7 @@ void RangeTreeLockTracker::Track(const PointLockRequest &lock_req) {
   serialize_endpoint(Endpoint(lock_req.key, false), &key);
   toku_fill_dbt(&key_dbt, key.data(), key.size());
   RangeLockList *rl = getOrCreateList();
-  rl->Append(lock_req.column_family_id, &key_dbt, &key_dbt);
+  rl->Append(lock_req.column_family_id, &key_dbt, &key_dbt, lock_req.lock_data);
 }
 
 void RangeTreeLockTracker::Track(const RangeLockRequest &lock_req) {
@@ -40,7 +40,7 @@ void RangeTreeLockTracker::Track(const RangeLockRequest &lock_req) {
   toku_fill_dbt(&end_dbt, end_key.data(), end_key.size());
 
   RangeLockList *rl = getOrCreateList();
-  rl->Append(lock_req.column_family_id, &start_dbt, &end_dbt);
+  rl->Append(lock_req.column_family_id, &start_dbt, &end_dbt, lock_req.lock_data);
 }
 
 PointLockStatus RangeTreeLockTracker::GetPointLockStatus(
@@ -58,7 +58,7 @@ PointLockStatus RangeTreeLockTracker::GetPointLockStatus(
 void RangeTreeLockTracker::Clear() { range_list_.reset(); }
 
 void RangeLockList::Append(ColumnFamilyId cf_id, const DBT *left_key,
-                           const DBT *right_key) {
+                           const DBT *right_key, void *lock_data) {
   MutexLock l(&mutex_);
   // Only the transaction owner thread calls this function.
   // The same thread does the lock release, so we can be certain nobody is
@@ -70,7 +70,7 @@ void RangeLockList::Append(ColumnFamilyId cf_id, const DBT *left_key,
     it = buffers_.emplace(cf_id, std::make_shared<toku::range_buffer>()).first;
     it->second->create();
   }
-  it->second->append(left_key, right_key);
+  it->second->append(left_key, right_key, true /*is_write_request*/, lock_data);
 }
 
 void RangeLockList::ReleaseLocks(RangeTreeLockManager *mgr,
diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h
index 4ef48d252..08bd1d17f 100644
--- a/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h
+++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h
@@ -40,7 +40,7 @@ class RangeLockList {
 
   RangeLockList() : releasing_locks_(false) {}
 
-  void Append(ColumnFamilyId cf_id, const DBT* left_key, const DBT* right_key);
+  void Append(ColumnFamilyId cf_id, const DBT* left_key, const DBT* right_key, void *lock_data);
   void ReleaseLocks(RangeTreeLockManager* mgr, PessimisticTransaction* txn,
                     bool all_trx_locks);
   void ReplaceLocks(const toku::locktree* lt, const toku::range_buffer& buffer);
diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc
index 7d4276972..166ea56f7 100644
--- a/utilities/transactions/pessimistic_transaction.cc
+++ b/utilities/transactions/pessimistic_transaction.cc
@@ -581,8 +581,9 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
 
   // Lock this key if this transactions hasn't already locked it or we require
   // an upgrade.
+  void *lock_data= NULL;
   if (!previously_locked || lock_upgrade) {
-    s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
+    s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive, &lock_data);
   }
 
   SetSnapshotIfNeeded();
@@ -651,7 +652,7 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
     // setting, and at a lower sequence number, so skipping here should be
     // safe.
     if (!assume_tracked) {
-      TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
+      TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive, lock_data);
     } else {
 #ifndef NDEBUG
       if (tracked_locks_->IsPointLockSupported()) {
@@ -674,11 +675,11 @@ Status PessimisticTransaction::GetRangeLock(ColumnFamilyHandle* column_family,
   ColumnFamilyHandle* cfh =
       column_family ? column_family : db_impl_->DefaultColumnFamily();
   uint32_t cfh_id = GetColumnFamilyID(cfh);
-
-  Status s = txn_db_impl_->TryRangeLock(this, cfh_id, start_endp, end_endp);
+  void *lock_data;
+  Status s = txn_db_impl_->TryRangeLock(this, cfh_id, start_endp, end_endp, &lock_data);
 
   if (s.ok()) {
-    RangeLockRequest req{cfh_id, start_endp, end_endp};
+    RangeLockRequest req{cfh_id, start_endp, end_endp, lock_data};
     tracked_locks_->Track(req);
   }
   return s;
diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc
index 2c69f7359..7813374c3 100644
--- a/utilities/transactions/pessimistic_transaction_db.cc
+++ b/utilities/transactions/pessimistic_transaction_db.cc
@@ -387,16 +387,18 @@ Status PessimisticTransactionDB::DropColumnFamily(
 Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
                                          uint32_t cfh_id,
                                          const std::string& key,
-                                         bool exclusive) {
-  return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive);
+                                         bool exclusive,
+                                         void **lock_data) {
+  return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive, lock_data);
 }
 
 Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction* txn,
                                               uint32_t cfh_id,
                                               const Endpoint& start_endp,
-                                              const Endpoint& end_endp) {
+                                              const Endpoint& end_endp,
+                                              void **lock_data) {
   return lock_manager_->TryLock(txn, cfh_id, start_endp, end_endp, GetEnv(),
-                                /*exclusive=*/true);
+                                /*exclusive=*/true, lock_data);
 }
 
 void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h
index eb0dd2f05..24db46cac 100644
--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -98,9 +98,10 @@ class PessimisticTransactionDB : public TransactionDB {
   virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
 
   Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
-                 const std::string& key, bool exclusive);
+                 const std::string& key, bool exclusive, void **lock_data=nullptr);
   Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id,
-                      const Endpoint& start_endp, const Endpoint& end_endp);
+                      const Endpoint& start_endp, const Endpoint& end_endp,
+                      void **lock_data=nullptr);
 
   void UnLock(PessimisticTransaction* txn, const LockTracker& keys);
   void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc
index 49fa99d7d..055ea3723 100644
--- a/utilities/transactions/transaction_base.cc
+++ b/utilities/transactions/transaction_base.cc
@@ -555,13 +555,14 @@ uint64_t TransactionBaseImpl::GetNumKeys() const {
 
 void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
                                    SequenceNumber seq, bool read_only,
-                                   bool exclusive) {
+                                   bool exclusive, void *lock_data) {
   PointLockRequest r;
   r.column_family_id = cfh_id;
   r.key = key;
   r.seq = seq;
   r.read_only = read_only;
   r.exclusive = exclusive;
+  r.lock_data = lock_data;
 
   // Update map of all tracked keys for this transaction
   tracked_locks_->Track(r);
diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h
index 2c5770d8a..76d24ca8f 100644
--- a/utilities/transactions/transaction_base.h
+++ b/utilities/transactions/transaction_base.h
@@ -258,7 +258,7 @@ class TransactionBaseImpl : public Transaction {
   // seqno is the earliest seqno this key was involved with this transaction.
   // readonly should be set to true if no data was written for this key
   void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno,
-                bool readonly, bool exclusive);
+                bool readonly, bool exclusive, void *lock_data=nullptr);
 
   // Called when UndoGetForUpdate determines that this key can be unlocked.
   virtual void UnlockGetForUpdate(ColumnFamilyHandle* column_family,


More information about the commits mailing list