[Commits] c15474bf4: Backport of: Initial support for shared point locks, support lock escalations

psergey sergey at mariadb.com
Sun Nov 24 16:03:40 EET 2019


revision-id: c15474bf496a227c6247e2e56040009c55c4bbc0 (v5.8-1895-gc15474bf4)
parent(s): 156a12604b830df3765a12d7194ca5f9ca7d67e6
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2019-11-24 17:03:40 +0300
message:

Backport of: Initial support for shared point locks, support lock escalations

- Locks can now be shared.
- Sharing is only supported as long as the locked ranges are an exact match.
  If this requirement is not met, the locks behave as exclusive locks.

- Make Lock Escalation keep shared locks. Shared locks are not collapsed
  with other kinds of locks.
- Replace RangeLockMgrHandle::get_escalation_count() with GetStatus(),
  which also reports the amount of memory used for Range Locking (and
  there is more data we could report through it)
- Initialize LTM_STATUS_S::m_initialized.

---
 include/rocksdb/utilities/transaction_db.h         |   9 +-
 utilities/transactions/range_locking/db.h          |   3 +
 .../transactions/range_locking/ft/ft-status.h      |   2 +-
 .../range_locking/locktree/concurrent_tree.cc      |  17 +-
 .../range_locking/locktree/concurrent_tree.h       |  18 +-
 .../range_locking/locktree/locktree.cc             | 240 +++++++++++++++++----
 .../transactions/range_locking/locktree/locktree.h |  19 +-
 .../range_locking/locktree/range_buffer.cc         |  20 +-
 .../range_locking/locktree/range_buffer.h          |  15 +-
 .../range_locking/locktree/treenode.cc             |  84 ++++++--
 .../transactions/range_locking/locktree/treenode.h |  30 ++-
 .../range_locking/portability/txn_subst.h          |  16 ++
 utilities/transactions/transaction_lock_mgr.cc     |  42 ++--
 utilities/transactions/transaction_lock_mgr.h      |   6 +-
 14 files changed, 410 insertions(+), 111 deletions(-)

diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h
index 19e6d2411..bb13bba07 100644
--- a/include/rocksdb/utilities/transaction_db.h
+++ b/include/rocksdb/utilities/transaction_db.h
@@ -56,7 +56,14 @@ class LockManagerHandle {
 class RangeLockMgrHandle : public LockManagerHandle {
  public:
   virtual int set_max_lock_memory(size_t max_lock_memory) = 0;
-  virtual uint64_t get_escalation_count() = 0;
+
+  class Counters {
+   public:
+    uint64_t escalation_count;
+    uint64_t current_lock_memory;
+  };
+
+  virtual Counters GetStatus() = 0;
   virtual ~RangeLockMgrHandle() {};
 };
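
Not part of the diff: a minimal caller-side sketch of the new API. It
assumes the rocksdb namespace used by transaction_db.h and a
RangeLockMgrHandle* kept from when the TransactionDB was opened.

  #include <cstdio>
  #include "rocksdb/utilities/transaction_db.h"

  // Sketch: read the counters that replace get_escalation_count().
  void print_range_lock_status(rocksdb::RangeLockMgrHandle *mgr) {
    rocksdb::RangeLockMgrHandle::Counters c = mgr->GetStatus();
    std::printf("escalations=%llu, lock memory=%llu bytes\n",
                (unsigned long long)c.escalation_count,
                (unsigned long long)c.current_lock_memory);
  }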
 
diff --git a/utilities/transactions/range_locking/db.h b/utilities/transactions/range_locking/db.h
index f9349c6ae..64ea28345 100644
--- a/utilities/transactions/range_locking/db.h
+++ b/utilities/transactions/range_locking/db.h
@@ -6,6 +6,8 @@
 
 typedef struct __toku_db DB;
 typedef struct __toku_dbt DBT;
+
+// port: this is currently not used
 struct simple_dbt {
     uint32_t len;
     void     *data;
@@ -72,6 +74,7 @@ struct __toku_dbt {
   void*data;
   uint32_t size;
   uint32_t ulen;
+  // One of DB_DBT_XXX flags
   uint32_t flags;
 };
 typedef struct __toku_descriptor {
diff --git a/utilities/transactions/range_locking/ft/ft-status.h b/utilities/transactions/range_locking/ft/ft-status.h
index 25051f1ed..242964f0b 100644
--- a/utilities/transactions/range_locking/ft/ft-status.h
+++ b/utilities/transactions/range_locking/ft/ft-status.h
@@ -80,7 +80,7 @@ public:
     TOKU_ENGINE_STATUS_ROW_S status[LTM_STATUS_NUM_ROWS];
 
 private:
-    bool m_initialized;
+    bool m_initialized = false;
 };
 typedef  LTM_STATUS_S* LTM_STATUS;
 extern LTM_STATUS_S ltm_status;
diff --git a/utilities/transactions/range_locking/locktree/concurrent_tree.cc b/utilities/transactions/range_locking/locktree/concurrent_tree.cc
index a35a9e40b..74d65f710 100644
--- a/utilities/transactions/range_locking/locktree/concurrent_tree.cc
+++ b/utilities/transactions/range_locking/locktree/concurrent_tree.cc
@@ -97,6 +97,12 @@ void concurrent_tree::locked_keyrange::acquire(const keyrange &range) {
     m_subtree = subtree;
 }
 
+void concurrent_tree::locked_keyrange::add_shared_owner(const keyrange &range,
+                                                        TXNID new_owner)
+{
+    m_subtree->insert(range, new_owner, /*is_shared*/ true);
+}
+
 void concurrent_tree::locked_keyrange::release(void) {
     m_subtree->mutex_unlock();
 }
@@ -110,18 +116,19 @@ void concurrent_tree::locked_keyrange::iterate(F *function) const {
     }
 }
 
-void concurrent_tree::locked_keyrange::insert(const keyrange &range, TXNID txnid) {
+void concurrent_tree::locked_keyrange::insert(const keyrange &range,
+                                              TXNID txnid, bool is_shared) {
     // empty means no children, and only the root should ever be empty
     if (m_subtree->is_empty()) {
-        m_subtree->set_range_and_txnid(range, txnid);
+        m_subtree->set_range_and_txnid(range, txnid, is_shared);
     } else {
-        m_subtree->insert(range, txnid);
+        m_subtree->insert(range, txnid, is_shared);
     }
 }
 
-void concurrent_tree::locked_keyrange::remove(const keyrange &range) {
+void concurrent_tree::locked_keyrange::remove(const keyrange &range, TXNID txnid) {
     invariant(!m_subtree->is_empty());
-    treenode *new_subtree = m_subtree->remove(range);
+    treenode *new_subtree = m_subtree->remove(range, txnid);
     // if removing range changed the root of the subtree,
     // then the subtree must be the root of the entire tree.
     if (new_subtree == nullptr) {
diff --git a/utilities/transactions/range_locking/locktree/concurrent_tree.h b/utilities/transactions/range_locking/locktree/concurrent_tree.h
index 66a7ff176..fabda7294 100644
--- a/utilities/transactions/range_locking/locktree/concurrent_tree.h
+++ b/utilities/transactions/range_locking/locktree/concurrent_tree.h
@@ -106,15 +106,25 @@ public:
         template <class F>
         void iterate(F *function) const;
 
+        // Adds another owner to the lock on the specified keyrange.
+        // requires: the keyrange contains one treenode whose bounds are
+        //           exactly equal to the specified range (no sub/supersets)
+        void add_shared_owner(const keyrange &range, TXNID new_owner);
+
         // inserts the given range into the tree, with an associated txnid.
         // requires: range does not overlap with anything in this locked_keyrange
         // rationale: caller is responsible for only inserting unique ranges
-        void insert(const keyrange &range, TXNID txnid);
-
-        // effect: removes the given range from the tree
+        void insert(const keyrange &range, TXNID txnid, bool is_shared);
+
+        // effect: removes the given range from the tree.
+        //         - txnid=TXNID_ANY means remove the range no matter what its
+        //           owners are
+        //         - Any other value means remove the specified txnid from
+        //           ownership (if the range has other owners, it will remain
+        //           in the tree)
         // requires: range exists exactly in this locked_keyrange
         // rationale: caller is responsible for only removing existing ranges
-        void remove(const keyrange &range);
+        void remove(const keyrange &range, TXNID txnid);
 
         // effect: removes all of the keys represented by this locked keyrange
         // rationale: we'd like a fast way to empty out a tree
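
To make the new ownership semantics concrete, a sketch (not part of the
diff) of the insert/add_shared_owner/remove flow; it assumes a
locked_keyrange that has already been prepare()d and acquire()d over
"range", and the hypothetical txnids 10 and 11:

  #include "utilities/transactions/range_locking/locktree/concurrent_tree.h"
  #include "utilities/transactions/range_locking/portability/txn_subst.h"

  using namespace toku;

  // Sketch: two readers share one range, then leave one at a time.
  void shared_owner_example(concurrent_tree::locked_keyrange &lkr,
                            const keyrange &range) {
    lkr.insert(range, /*txnid=*/10, /*is_shared=*/true);  // first reader
    lkr.add_shared_owner(range, /*new_owner=*/11);        // second reader
    lkr.remove(range, /*txnid=*/10);  // txn 11 still owns it; range stays
    lkr.remove(range, /*txnid=*/11);  // last owner gone; range is removed
  }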
diff --git a/utilities/transactions/range_locking/locktree/locktree.cc b/utilities/transactions/range_locking/locktree/locktree.cc
index 9b530c7b0..00ce5aace 100644
--- a/utilities/transactions/range_locking/locktree/locktree.cc
+++ b/utilities/transactions/range_locking/locktree/locktree.cc
@@ -147,6 +147,8 @@ uint32_t locktree::get_reference_count(void) {
 struct row_lock {
     keyrange range;
     TXNID txnid;
+    bool is_shared;
+    TxnidVector *owners;
 };
 
 // iterate over a locked keyrange and copy out all of the data,
@@ -157,8 +159,10 @@ static void iterate_and_get_overlapping_row_locks(const concurrent_tree::locked_
                                                   GrowableArray<row_lock> *row_locks) {
     struct copy_fn_obj {
         GrowableArray<row_lock> *row_locks;
-        bool fn(const keyrange &range, TXNID txnid) {
-            row_lock lock = { .range = range, .txnid = txnid };
+        bool fn(const keyrange &range, TXNID txnid, bool is_shared,
+                TxnidVector *owners) {
+            row_lock lock = { .range = range, .txnid = txnid,
+                              .is_shared = is_shared, .owners = owners};
             row_locks->push(lock);
             return true;
         }
@@ -196,9 +200,10 @@ static uint64_t row_lock_size_in_tree(const row_lock &lock) {
 // remove and destroy the given row lock from the locked keyrange,
 // then notify the memory tracker of the newly freed lock.
 static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
-                                      const row_lock &lock, locktree_manager *mgr) {
+                                      const row_lock &lock, TXNID txnid,
+                                      locktree_manager *mgr) {
     const uint64_t mem_released = row_lock_size_in_tree(lock);
-    lkr->remove(lock.range);
+    lkr->remove(lock.range, txnid);
     if (mgr != nullptr) {
         mgr->note_mem_released(mem_released);
     }
@@ -209,7 +214,7 @@ static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
 static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr,
                                       const row_lock &lock, locktree_manager *mgr) {
     uint64_t mem_used = row_lock_size_in_tree(lock);
-    lkr->insert(lock.range, lock.txnid);
+    lkr->insert(lock.range, lock.txnid, lock.is_shared);
     if (mgr != nullptr) {
         mgr->note_mem_used(mem_used);
     }
@@ -221,13 +226,17 @@ void locktree::sto_begin(TXNID txnid) {
     m_sto_txnid = txnid;
 }
 
-void locktree::sto_append(const DBT *left_key, const DBT *right_key) {
+void locktree::sto_append(const DBT *left_key, const DBT *right_key,
+                          bool is_write_request) {
     uint64_t buffer_mem, delta;
+
+    // psergey: the two lines below do not make any sense
+    // (and it's the same in upstream TokuDB)
     keyrange range;
     range.create(left_key, right_key);
 
     buffer_mem = m_sto_buffer.total_memory_size();
-    m_sto_buffer.append(left_key, right_key);
+    m_sto_buffer.append(left_key, right_key, is_write_request);
     delta = m_sto_buffer.total_memory_size() - buffer_mem;
     if (m_mgr != nullptr) {
         m_mgr->note_mem_used(delta);
@@ -274,8 +283,10 @@ void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
     range_buffer::iterator::record rec;
     while (iter.current(&rec)) {
         sto_lkr.prepare(&sto_rangetree);
-        int r = acquire_lock_consolidated(&sto_lkr,
-                m_sto_txnid, rec.get_left_key(), rec.get_right_key(), nullptr);
+        int r = acquire_lock_consolidated(&sto_lkr, m_sto_txnid,
+                                          rec.get_left_key(),
+                                          rec.get_right_key(),
+                                          rec.get_exclusive_flag(), nullptr);
         invariant_zero(r);
         sto_lkr.release();
         iter.next();
@@ -285,8 +296,10 @@ void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
     // locktree's rangetree, on behalf of the old single txnid.
     struct migrate_fn_obj {
         concurrent_tree::locked_keyrange *dst_lkr;
-        bool fn(const keyrange &range, TXNID txnid) {
-            dst_lkr->insert(range, txnid);
+        bool fn(const keyrange &range, TXNID txnid, bool is_shared,
+                TxnidVector *owners) {
+            assert(owners == nullptr);
+            dst_lkr->insert(range, txnid, is_shared);
             return true;
         }
     } migrate_fn;
@@ -301,7 +314,8 @@ void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
 
 bool locktree::sto_try_acquire(void *prepared_lkr,
                                TXNID txnid,
-                               const DBT *left_key, const DBT *right_key) {
+                               const DBT *left_key, const DBT *right_key,
+                               bool is_write_request) {
     if (m_rangetree->is_empty() && m_sto_buffer.is_empty() && toku_unsafe_fetch(m_sto_score) >= STO_SCORE_THRESHOLD) {
         // We can do the optimization because the rangetree is empty, and
         // we know its worth trying because the sto score is big enough.
@@ -319,7 +333,7 @@ bool locktree::sto_try_acquire(void *prepared_lkr,
     // this txnid can append its lock to the sto buffer successfully.
     if (m_sto_txnid != TXNID_NONE) {
         invariant(m_sto_txnid == txnid);
-        sto_append(left_key, right_key);
+        sto_append(left_key, right_key, is_write_request);
         return true;
     } else {
         invariant(m_sto_buffer.is_empty());
@@ -327,12 +341,66 @@ bool locktree::sto_try_acquire(void *prepared_lkr,
     }
 }
 
+
+/*
+  Do the same as iterate_and_get_overlapping_row_locks does, but also check
+  for the following:
+    The set of overlapping row locks consists of just one read-only shared
+    lock with the same endpoints as specified (in that case, we can just add
+    ourselves to its owner list)
+
+  @return true  - exactly one compatible shared lock was found
+          false - otherwise
+*/
+static 
+bool iterate_and_get_overlapping_row_locks2(const concurrent_tree::locked_keyrange *lkr,
+                                            const DBT *left_key, const DBT *right_key,
+                                            comparator *cmp,
+                                            TXNID txnid,
+                                            GrowableArray<row_lock> *row_locks) {
+    struct copy_fn_obj {
+        GrowableArray<row_lock> *row_locks;
+        bool first_call= true;
+        bool matching_lock_found = false;
+        const DBT *left_key, *right_key;
+        comparator *cmp;
+
+        bool fn(const keyrange &range, TXNID txnid, bool is_shared,
+                TxnidVector *owners) {
+            
+            if (first_call) {
+              first_call = false;
+              if (is_shared &&
+                 !(*cmp)(left_key,  range.get_left_key()) &&
+                 !(*cmp)(right_key, range.get_right_key())) {
+                matching_lock_found = true;
+              }
+            } else {
+              // if we see more than one overlapping lock, it doesn't matter
+              // whether the first one was an exact match.
+              matching_lock_found = false;
+            }
+            row_lock lock = { .range = range, .txnid = txnid, 
+                              .is_shared = is_shared, .owners = owners };
+            row_locks->push(lock);
+            return true;
+        }
+    } copy_fn;
+    copy_fn.row_locks = row_locks;
+    copy_fn.left_key = left_key;
+    copy_fn.right_key = right_key;
+    copy_fn.cmp = cmp;
+    lkr->iterate(&copy_fn);
+    return copy_fn.matching_lock_found;
+}
+  
 // try to acquire a lock and consolidate it with existing locks if possible
 // param: lkr, a prepared locked keyrange
 // return: 0 on success, DB_LOCK_NOTGRANTED if conflicting locks exist.
 int locktree::acquire_lock_consolidated(void *prepared_lkr,
                                         TXNID txnid,
                                         const DBT *left_key, const DBT *right_key,
+                                        bool is_write_request,
                                         txnid_set *conflicts) {
     int r = 0;
     concurrent_tree::locked_keyrange *lkr;
@@ -345,24 +413,60 @@ int locktree::acquire_lock_consolidated(void *prepared_lkr,
     // copy out the set of overlapping row locks.
     GrowableArray<row_lock> overlapping_row_locks;
     overlapping_row_locks.init();
-    iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks);
+    bool matching_shared_lock_found= false;
+
+    if (is_write_request)
+      iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks);
+    else {
+      matching_shared_lock_found= 
+        iterate_and_get_overlapping_row_locks2(lkr, left_key, right_key, &m_cmp,
+                                               txnid, &overlapping_row_locks);
+      // psergey-todo: what to do now? So, we have figured out we have just
+      // one shareable lock. We need to add ourselves to it as an owner, but
+      // the lock pointer cannot be kept?
+      // A: use find_node_with_overlapping_child(key_range, nullptr);
+      //  then, add ourselves to the owner list.
+      // Don't forget to release the subtree after that.
+    }
+
+    if (matching_shared_lock_found) {
+        // there is just one non-conflicting matching shared lock.
+        //  we are holding a lock on it (see acquire() call above).
+        //  we need to modify it to indicate there is another locker...
+        lkr->add_shared_owner(requested_range, txnid);
+
+        // Pretend the shared lock uses as much memory as a regular lock.
+        row_lock new_lock = { .range = requested_range, .txnid = txnid,
+                              .is_shared = false, .owners = nullptr };
+        uint64_t mem_used = row_lock_size_in_tree(new_lock);
+        if (m_mgr) {
+            m_mgr->note_mem_used(mem_used);
+        }
+        return 0;
+    }
+
+
     size_t num_overlapping_row_locks = overlapping_row_locks.get_size();
 
     // if any overlapping row locks conflict with this request, bail out.
+
     bool conflicts_exist = determine_conflicting_txnids(overlapping_row_locks,
                                                         txnid, conflicts);
     if (!conflicts_exist) {
         // there are no conflicts, so all of the overlaps are for the requesting txnid.
         // so, we must consolidate all existing overlapping ranges and the requested
         // range into one dominating range. then we insert the dominating range.
+        bool all_shared = !is_write_request;
         for (size_t i = 0; i < num_overlapping_row_locks; i++) {
             row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i);
             invariant(overlapping_lock.txnid == txnid);
             requested_range.extend(m_cmp, overlapping_lock.range);
-            remove_row_lock_from_tree(lkr, overlapping_lock, m_mgr);
+            remove_row_lock_from_tree(lkr, overlapping_lock, TXNID_ANY, m_mgr);
+            all_shared = all_shared && overlapping_lock.is_shared;
         }
 
-        row_lock new_lock = { .range = requested_range, .txnid = txnid };
+        row_lock new_lock = { .range = requested_range, .txnid = txnid,
+                              .is_shared = all_shared, .owners = nullptr };
         insert_row_lock_into_tree(lkr, new_lock, m_mgr);
     } else {
         r = DB_LOCK_NOTGRANTED;
@@ -383,7 +487,7 @@ int locktree::acquire_lock(bool is_write_request,
     int r = 0;
 
     // we are only supporting write locks for simplicity
-    invariant(is_write_request);
+    //invariant(is_write_request);
 
     // acquire and prepare a locked keyrange over the requested range.
     // prepare is a serialzation point, so we take the opportunity to
@@ -391,9 +495,11 @@ int locktree::acquire_lock(bool is_write_request,
     concurrent_tree::locked_keyrange lkr;
     lkr.prepare(m_rangetree);
 
-    bool acquired = sto_try_acquire(&lkr, txnid, left_key, right_key);
+    bool acquired = sto_try_acquire(&lkr, txnid, left_key, right_key, 
+                                    is_write_request);
     if (!acquired) {
-        r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key, conflicts);
+        r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key,
+                                      is_write_request, conflicts);
     }
 
     lkr.release();
@@ -418,7 +524,7 @@ int locktree::try_acquire_lock(bool is_write_request,
 // the locktree silently upgrades read locks to write locks for simplicity
 int locktree::acquire_read_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
                                 txnid_set *conflicts, bool big_txn) {
-    return acquire_write_lock(txnid, left_key, right_key, conflicts, big_txn);
+    return try_acquire_lock(false, txnid, left_key, right_key, conflicts, big_txn);
 }
 
 int locktree::acquire_write_lock(TXNID txnid, const DBT *left_key, const DBT *right_key,
@@ -447,7 +553,9 @@ void locktree::dump_locks(void *cdata, dump_callback cb)
         (*cb)(cdata, 
               lock.range.get_left_key(), 
               lock.range.get_right_key(), 
-              lock.txnid);
+              lock.txnid,
+              lock.is_shared,
+              lock.owners);
     }
     lkr.release();
     all_locks.deinit();
@@ -525,8 +633,11 @@ void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
         row_lock lock = overlapping_row_locks.fetch_unchecked(i);
         // If this isn't our lock, that's ok, just don't remove it.
         // See rationale above.
-        if (lock.txnid == txnid) {
-            remove_row_lock_from_tree(&lkr, lock, m_mgr);
+        // psergey-todo: for shared locks, just remove ourselves from the
+        //               owners.
+        if (lock.txnid == txnid ||
+            (lock.owners && lock.owners->contains(txnid))) {
+            remove_row_lock_from_tree(&lkr, lock, txnid, m_mgr);
         }
     }
 
@@ -630,11 +741,17 @@ static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
         int num_extracted;
         int num_to_extract;
         row_lock *row_locks;
-        bool fn(const keyrange &range, TXNID txnid) {
+        bool fn(const keyrange &range, TXNID txnid, bool is_shared, TxnidVector *owners) {
             if (num_extracted < num_to_extract) {
                 row_lock lock;
                 lock.range.create_copy(range);
                 lock.txnid = txnid;
+                lock.is_shared= is_shared;
+                // deep-copy the set of owners:
+                if (owners)
+                    lock.owners = new TxnidVector(*owners);
+                else
+                    lock.owners = nullptr;
                 row_locks[num_extracted++] = lock;
                 return true;
             } else {
@@ -655,7 +772,7 @@ static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
     int num_extracted = extract_fn.num_extracted;
     invariant(num_extracted <= num_to_extract);
     for (int i = 0; i < num_extracted; i++) {
-        remove_row_lock_from_tree(lkr, row_locks[i], mgr);
+        remove_row_lock_from_tree(lkr, row_locks[i], TXNID_ANY, mgr);
     }
 
     return num_extracted;
@@ -722,38 +839,60 @@ void locktree::escalate(lt_escalate_cb after_escalate_callback, void *after_esca
             // through them and merge adjacent locks with the same txnid into
             // one dominating lock and save it to a set of escalated locks.
             //
-            // first, find the index of the next row lock with a different txnid
+            // first, find the index of the next row lock that
+            //  - belongs to a different txnid, or
+            //  - belongs to several txnids, or
+            //  - is a shared lock (we could potentially merge those but
+            //    currently we don't)
             int next_txnid_index = current_index + 1;
+
             while (next_txnid_index < num_extracted &&
-                    extracted_buf[current_index].txnid == extracted_buf[next_txnid_index].txnid) {
+                   (extracted_buf[current_index].txnid ==
+                    extracted_buf[next_txnid_index].txnid) &&
+                   !extracted_buf[next_txnid_index].is_shared &&
+                   !extracted_buf[next_txnid_index].owners) {
                 next_txnid_index++;
             }
 
             // Create an escalated range for the current txnid that dominates
             // each range between the current indext and the next txnid's index.
-            const TXNID current_txnid = extracted_buf[current_index].txnid;
+            //const TXNID current_txnid = extracted_buf[current_index].txnid;
             const DBT *escalated_left_key = extracted_buf[current_index].range.get_left_key(); 
             const DBT *escalated_right_key = extracted_buf[next_txnid_index - 1].range.get_right_key();
 
             // Try to find a range buffer for the current txnid. Create one if it doesn't exist.
             // Then, append the new escalated range to the buffer.
-            uint32_t idx;
-            struct txnid_range_buffer *existing_range_buffer;
-            int r = range_buffers.find_zero<TXNID, txnid_range_buffer::find_by_txnid>(
-                    current_txnid,
-                    &existing_range_buffer,
-                    &idx
-                    );
-            if (r == DB_NOTFOUND) {
-                struct txnid_range_buffer *XMALLOC(new_range_buffer);
-                new_range_buffer->txnid = current_txnid;
-                new_range_buffer->buffer.create();
-                new_range_buffer->buffer.append(escalated_left_key, escalated_right_key);
-                range_buffers.insert_at(new_range_buffer, idx);
-            } else {
-                invariant_zero(r);
-                invariant(existing_range_buffer->txnid == current_txnid);
-                existing_range_buffer->buffer.append(escalated_left_key, escalated_right_key);
+            // (If a lock is shared by multiple txnids, append it to each txnid's list)
+            TxnidVector *owners_ptr;
+            TxnidVector singleton_owner;
+            if (extracted_buf[current_index].owners)
+                owners_ptr = extracted_buf[current_index].owners;
+            else {
+                singleton_owner.insert(extracted_buf[current_index].txnid);
+                owners_ptr = &singleton_owner;
+            }
+
+            for (auto cur_txnid : *owners_ptr) {
+                uint32_t idx;
+                struct txnid_range_buffer *existing_range_buffer;
+                int r = range_buffers.find_zero<TXNID, txnid_range_buffer::find_by_txnid>(
+                        cur_txnid,
+                        &existing_range_buffer,
+                        &idx
+                        );
+                if (r == DB_NOTFOUND) {
+                    struct txnid_range_buffer *XMALLOC(new_range_buffer);
+                    new_range_buffer->txnid = cur_txnid;
+                    new_range_buffer->buffer.create();
+                    new_range_buffer->buffer.append(escalated_left_key, escalated_right_key,
+                                                    !extracted_buf[current_index].is_shared);
+                    range_buffers.insert_at(new_range_buffer, idx);
+                } else {
+                    invariant_zero(r);
+                    invariant(existing_range_buffer->txnid == cur_txnid);
+                    existing_range_buffer->buffer.append(escalated_left_key, escalated_right_key,
+                                                         !extracted_buf[current_index].is_shared);
+                }
             }
 
             current_index = next_txnid_index;
@@ -761,6 +900,7 @@ void locktree::escalate(lt_escalate_cb after_escalate_callback, void *after_esca
 
         // destroy the ranges copied during the extraction
         for (int i = 0; i < num_extracted; i++) {
+            delete extracted_buf[i].owners;
             extracted_buf[i].range.destroy();
         }
     }
@@ -768,6 +908,12 @@ void locktree::escalate(lt_escalate_cb after_escalate_callback, void *after_esca
 
     // Rebuild the locktree from each range in each range buffer,
     // then notify higher layers that the txnid's locks have changed.
+    //
+    // (shared locks: if a lock was initially shared between transactions TRX1,
+    //  TRX2, etc, we will now try to acquire it acting on behalf of TRX1, of
+    //  TRX2, etc.  This will succeed and an identical shared lock will be
+    //  constructed)
+
     invariant(m_rangetree->is_empty());
     const size_t num_range_buffers = range_buffers.size();
     for (size_t i = 0; i < num_range_buffers; i++) {
@@ -781,7 +927,9 @@ void locktree::escalate(lt_escalate_cb after_escalate_callback, void *after_esca
         while (iter.current(&rec)) {
             keyrange range;
             range.create(rec.get_left_key(), rec.get_right_key());
-            row_lock lock = { .range = range, .txnid = current_txnid };
+            row_lock lock = { .range = range, .txnid = current_txnid,
+                              .is_shared= !rec.get_exclusive_flag(),
+                              .owners= nullptr };
             insert_row_lock_into_tree(&lkr, lock, m_mgr);
             iter.next();
         }
diff --git a/utilities/transactions/range_locking/locktree/locktree.h b/utilities/transactions/range_locking/locktree/locktree.h
index 5ff4f7449..e7c909be0 100644
--- a/utilities/transactions/range_locking/locktree/locktree.h
+++ b/utilities/transactions/range_locking/locktree/locktree.h
@@ -339,7 +339,10 @@ namespace toku {
         // since the lock_request object is opaque 
         struct lt_lock_request_info *get_lock_request_info(void);
 
-        typedef void (*dump_callback)(void *cdata, const DBT *left, const DBT *right, TXNID txnid);
+        typedef void (*dump_callback)(void *cdata,
+                                      const DBT *left, const DBT *right,
+                                      TXNID txnid, bool is_shared,
+                                      TxnidVector *owners);
         void dump_locks(void *cdata, dump_callback cb);
     private:
         locktree_manager *m_mgr;
@@ -360,6 +363,12 @@ namespace toku {
         void *m_userdata;
         struct lt_lock_request_info m_lock_request_info;
 
+        // psergey-todo:
+        //  Each transaction also keeps a list of ranges it has locked.
+        //  So, when a transaction is running in STO mode, two identical
+        //  lists are kept: the STO lock list and the transaction's list of
+        //  owned locks. Why can't we make do with just one list?
+
         // The following fields and members prefixed with "sto_" are for
         // the single txnid optimization, intended to speed up the case
         // when only one transaction is using the locktree. If we know
@@ -453,7 +462,8 @@ namespace toku {
 
         // effect: append a range to the sto buffer
         // requires: m_sto_txnid is valid
-        void sto_append(const DBT *left_key, const DBT *right_key);
+        void sto_append(const DBT *left_key, const DBT *right_key,
+                        bool is_write_request);
 
         // effect: ends the single txnid optimization, releaseing any memory
         //         stored in the sto buffer, notifying the tracker, and
@@ -494,7 +504,8 @@ namespace toku {
         //        back to zero.
         // returns: true if the lock was acquired for this txnid
         bool sto_try_acquire(void *prepared_lkr, TXNID txnid,
-                const DBT *left_key, const DBT *right_key);
+                const DBT *left_key, const DBT *right_key,
+                bool is_write_request);
 
         // Effect:
         //  Provides a hook for a helgrind suppression.
@@ -513,7 +524,7 @@ namespace toku {
 
         int acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
                                       const DBT *left_key, const DBT *right_key,
-                                      txnid_set *conflicts);
+                                      bool is_write_request, txnid_set *conflicts);
 
         int acquire_lock(bool is_write_request, TXNID txnid,
                          const DBT *left_key, const DBT *right_key,
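
A sketch (not part of the diff) of a callback matching the new
dump_callback signature above; everything other than the signature is an
assumption for illustration:

  #include <cstdio>
  #include "utilities/transactions/range_locking/locktree/locktree.h"

  // Sketch: print each lock. Multi-owner locks arrive with
  // txnid == TXNID_SHARED and the real ids in *owners.
  static void print_lock(void *cdata, const DBT *left, const DBT *right,
                         TXNID txnid, bool is_shared, TxnidVector *owners) {
    (void)cdata; (void)left; (void)right; (void)txnid;
    std::printf("lock: shared=%d owners=%zu\n", (int)is_shared,
                owners ? owners->size() : (size_t)1);
  }

  // Usage would be: lt->dump_locks(nullptr, &print_lock);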
diff --git a/utilities/transactions/range_locking/locktree/range_buffer.cc b/utilities/transactions/range_locking/locktree/range_buffer.cc
index d1f14fc4a..eab374945 100644
--- a/utilities/transactions/range_locking/locktree/range_buffer.cc
+++ b/utilities/transactions/range_locking/locktree/range_buffer.cc
@@ -66,7 +66,9 @@ namespace toku {
         return right_neg_inf || right_pos_inf;
     }
 
-    void range_buffer::record_header::init(const DBT *left_key, const DBT *right_key) {
+    void range_buffer::record_header::init(const DBT *left_key, const DBT *right_key,
+                                           bool is_exclusive) {
+        is_exclusive_lock= is_exclusive;
         left_neg_inf = left_key == toku_dbt_negative_infinity();
         left_pos_inf = left_key == toku_dbt_positive_infinity();
         left_key_size = toku_dbt_is_infinite(left_key) ? 0 : left_key->size;
@@ -186,15 +188,16 @@ namespace toku {
         _num_ranges = 0;
     }
 
-    void range_buffer::append(const DBT *left_key, const DBT *right_key) {
+    void range_buffer::append(const DBT *left_key, const DBT *right_key,
+                              bool is_write_request) {
         // if the keys are equal, then only one copy is stored.
         if (toku_dbt_equals(left_key, right_key)) {
             invariant(left_key->size <= MAX_KEY_SIZE);
-            append_point(left_key);
+            append_point(left_key, is_write_request);
         } else {
             invariant(left_key->size <= MAX_KEY_SIZE);
             invariant(right_key->size <= MAX_KEY_SIZE);
-            append_range(left_key, right_key);
+            append_range(left_key, right_key, is_write_request);
         }
         _num_ranges++;
     }
@@ -215,12 +218,13 @@ namespace toku {
         _arena.destroy();
     }
 
-    void range_buffer::append_range(const DBT *left_key, const DBT *right_key) {
+    void range_buffer::append_range(const DBT *left_key, const DBT *right_key,
+                                    bool is_exclusive) {
         size_t record_length = sizeof(record_header) + left_key->size + right_key->size;
         char *buf = reinterpret_cast<char *>(_arena.malloc_from_arena(record_length));
 
         record_header h;
-        h.init(left_key, right_key);
+        h.init(left_key, right_key, is_exclusive);
 
         // serialize the header
         memcpy(buf, &h, sizeof(record_header));
@@ -238,12 +242,12 @@ namespace toku {
         }
     }
 
-    void range_buffer::append_point(const DBT *key) {
+    void range_buffer::append_point(const DBT *key, bool is_exclusive) {
         size_t record_length = sizeof(record_header) + key->size;
         char *buf = reinterpret_cast<char *>(_arena.malloc_from_arena(record_length));
 
         record_header h;
-        h.init(key, nullptr);
+        h.init(key, nullptr, is_exclusive);
 
         // serialize the header
         memcpy(buf, &h, sizeof(record_header));
diff --git a/utilities/transactions/range_locking/locktree/range_buffer.h b/utilities/transactions/range_locking/locktree/range_buffer.h
index 9bc02dc22..e8869fae5 100644
--- a/utilities/transactions/range_locking/locktree/range_buffer.h
+++ b/utilities/transactions/range_locking/locktree/range_buffer.h
@@ -77,12 +77,14 @@ namespace toku {
             bool right_neg_inf;
             uint16_t left_key_size;
             uint16_t right_key_size;
+            bool is_exclusive_lock;
 
             bool left_is_infinite(void) const;
 
             bool right_is_infinite(void) const;
 
-            void init(const DBT *left_key, const DBT *right_key);
+            void init(const DBT *left_key, const DBT *right_key,
+                      bool is_exclusive);
         };
         // PORT static_assert(sizeof(record_header) == 8, "record header format is off");
         
@@ -109,6 +111,10 @@ namespace toku {
                 // how big is this record? this tells us where the next record is
                 size_t size(void) const;
 
+                bool get_exclusive_flag() const {
+                    return _header.is_exclusive_lock;
+                }
+
                 // populate a record header and point our DBT's
                 // buffers into ours if they are not infinite.
                 void deserialize(const char *buf);
@@ -145,7 +151,8 @@ namespace toku {
 
         // append a left/right key range to the buffer.
         // if the keys are equal, then only one copy is stored.
-        void append(const DBT *left_key, const DBT *right_key);
+        void append(const DBT *left_key, const DBT *right_key,
+                    bool is_write_request=false);
 
         // is this range buffer empty?
         bool is_empty(void) const;
@@ -162,11 +169,11 @@ namespace toku {
         memarena _arena;
         int _num_ranges;
 
-        void append_range(const DBT *left_key, const DBT *right_key);
+        void append_range(const DBT *left_key, const DBT *right_key, bool is_write_request);
 
         // append a point to the buffer. this is the space/time saving
         // optimization for key ranges where left == right.
-        void append_point(const DBT *key);
+        void append_point(const DBT *key, bool is_write_request);
     };
 
 } /* namespace toku */
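
For illustration, a sketch (not part of the diff) of how the new exclusive
flag round-trips through a range_buffer; it assumes the
iterator(const range_buffer*) constructor used elsewhere in this code:

  #include "utilities/transactions/range_locking/locktree/range_buffer.h"

  using namespace toku;

  // Sketch: append one exclusive range, then read the flag back.
  bool roundtrip_exclusive_flag(range_buffer &buf,
                                const DBT *left, const DBT *right) {
    buf.append(left, right, /*is_write_request=*/true);
    range_buffer::iterator iter(&buf);
    range_buffer::iterator::record rec;
    bool exclusive = false;
    while (iter.current(&rec)) {
      exclusive = rec.get_exclusive_flag();  // true for the range above
      iter.next();
    }
    return exclusive;
  }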
diff --git a/utilities/transactions/range_locking/locktree/treenode.cc b/utilities/transactions/range_locking/locktree/treenode.cc
index 051ec7d1c..5bf349749 100644
--- a/utilities/transactions/range_locking/locktree/treenode.cc
+++ b/utilities/transactions/range_locking/locktree/treenode.cc
@@ -64,6 +64,10 @@ void treenode::init(const comparator *cmp) {
     m_is_root = false;
     m_is_empty = true;
     m_cmp = cmp;
+
+    m_is_shared= false;
+    m_owners= nullptr;
+
     // use an adaptive mutex at each node since we expect the time the
     // lock is held to be relatively short compared to a context switch.
     // indeed, this improves performance at high thread counts considerably.
@@ -89,10 +93,11 @@ void treenode::destroy_root(void) {
     m_cmp = nullptr;
 }
 
-void treenode::set_range_and_txnid(const keyrange &range, TXNID txnid) {
+void treenode::set_range_and_txnid(const keyrange &range, TXNID txnid, bool is_shared) {
     // allocates a new copy of the range for this node
     m_range.create_copy(range);
     m_txnid = txnid;
+    m_is_shared= is_shared;
     m_is_empty = false;
 }
 
@@ -108,10 +113,11 @@ bool treenode::range_overlaps(const keyrange &range) {
     return m_range.overlaps(*m_cmp, range);
 }
 
-treenode *treenode::alloc(const comparator *cmp, const keyrange &range, TXNID txnid) {
+treenode *treenode::alloc(const comparator *cmp, const keyrange &range,
+                          TXNID txnid, bool is_shared) {
     treenode *XCALLOC(node);
     node->init(cmp);
-    node->set_range_and_txnid(range, txnid);
+    node->set_range_and_txnid(range, txnid, is_shared);
     return node;
 }
 
@@ -122,12 +128,31 @@ void treenode::swap_in_place(treenode *node1, treenode *node2) {
     node1->m_txnid = node2->m_txnid;
     node2->m_range = tmp_range;
     node2->m_txnid = tmp_txnid;
+
+    bool tmp_is_shared= node1->m_is_shared;
+    node1->m_is_shared= node2->m_is_shared;
+    node2->m_is_shared= tmp_is_shared;
+}
+
+void treenode::add_shared_owner(TXNID txnid) {
+    assert(m_is_shared);
+    if (m_txnid != TXNID_SHARED) {
+        m_owners= new TxnidVector;
+        m_owners->insert(m_txnid);
+        m_txnid= TXNID_SHARED;
+    }
+    m_owners->insert(txnid);
 }
 
 void treenode::free(treenode *node) {
     // destroy the range, freeing any copied keys
     node->m_range.destroy();
 
+    if (node->m_owners) {
+        delete node->m_owners;
+        node->m_owners = nullptr; // need this?
+    }
+
     // the root is simply marked as empty.
     if (node->is_root()) {
         // PORT toku_mutex_assert_locked(&node->m_mutex);
@@ -189,7 +214,7 @@ void treenode::traverse_overlaps(const keyrange &range, F *function) {
     if (c == keyrange::comparison::EQUALS) {
         // Doesn't matter if fn wants to keep going, there
         // is nothing left, so return.
-        function->fn(m_range, m_txnid);
+        function->fn(m_range, m_txnid, m_is_shared, m_owners);
         return;
     }
 
@@ -204,7 +229,7 @@ void treenode::traverse_overlaps(const keyrange &range, F *function) {
     }
 
     if (c == keyrange::comparison::OVERLAPS) {
-        bool keep_going = function->fn(m_range, m_txnid);
+        bool keep_going = function->fn(m_range, m_txnid, m_is_shared, m_owners);
         if (!keep_going) {
             return;
         }
@@ -221,29 +246,35 @@ void treenode::traverse_overlaps(const keyrange &range, F *function) {
     }
 }
 
-void treenode::insert(const keyrange &range, TXNID txnid) {
+void treenode::insert(const keyrange &range, TXNID txnid, bool is_shared) {
     // choose a child to check. if that child is null, then insert the new node there.
     // otherwise recur down that child's subtree
     keyrange::comparison c = range.compare(*m_cmp, m_range);
     if (c == keyrange::comparison::LESS_THAN) {
         treenode *left_child = lock_and_rebalance_left();
         if (left_child == nullptr) {
-            left_child = treenode::alloc(m_cmp, range, txnid);
+            left_child = treenode::alloc(m_cmp, range, txnid, is_shared);
             m_left_child.set(left_child);
         } else {
-            left_child->insert(range, txnid);
+            left_child->insert(range, txnid, is_shared);
             left_child->mutex_unlock();
         }
-    } else {
-        invariant(c == keyrange::comparison::GREATER_THAN);
+    } else if (c == keyrange::comparison::GREATER_THAN) {
+        //invariant(c == keyrange::comparison::GREATER_THAN);
         treenode *right_child = lock_and_rebalance_right();
         if (right_child == nullptr) {
-            right_child = treenode::alloc(m_cmp, range, txnid);
+            right_child = treenode::alloc(m_cmp, range, txnid, is_shared);
             m_right_child.set(right_child);
         } else {
-            right_child->insert(range, txnid);
+            right_child->insert(range, txnid, is_shared);
             right_child->mutex_unlock();
         }
+    } else if (c == keyrange::comparison::EQUALS) {
+        invariant(is_shared);
+        invariant(m_is_shared);
+        add_shared_owner(txnid);
+    } else {
+        invariant(0);
     }
 }
 
@@ -337,19 +368,38 @@ void treenode::recursive_remove(void) {
     treenode::free(this);
 }
 
-treenode *treenode::remove(const keyrange &range) {
+void treenode::remove_shared_owner(TXNID txnid) {
+    m_owners->erase(txnid);
+    /* if there is just one owner left, move it to m_txnid */
+    if (m_owners->size() == 1)
+    {
+        m_txnid = *m_owners->begin();
+        delete m_owners;
+        m_owners = nullptr;
+    }
+}
+
+treenode *treenode::remove(const keyrange &range, TXNID txnid) {
     treenode *child;
     // if the range is equal to this node's range, then just remove
     // the root of this subtree. otherwise search down the tree
     // in either the left or right children.
     keyrange::comparison c = range.compare(*m_cmp, m_range);
     switch (c) {
-    case keyrange::comparison::EQUALS:
-        return remove_root_of_subtree();
+    case keyrange::comparison::EQUALS: {
+        // if we are the only owners, remove. Otherwise, just remove
+        // us from the owners list.
+        if (txnid != TXNID_ANY && has_multiple_owners()) {
+            remove_shared_owner(txnid);
+            return this;
+        } else {
+            return remove_root_of_subtree();
+        }
+    }
     case keyrange::comparison::LESS_THAN:
         child = m_left_child.get_locked();
         invariant_notnull(child);
-        child = child->remove(range);
+        child = child->remove(range, txnid);
 
         // unlock the child if there still is one.
         // regardless, set the right child pointer
@@ -361,7 +411,7 @@ treenode *treenode::remove(const keyrange &range) {
     case keyrange::comparison::GREATER_THAN:
         child = m_right_child.get_locked();
         invariant_notnull(child);
-        child = child->remove(range);
+        child = child->remove(range, txnid);
 
         // unlock the child if there still is one.
         // regardless, set the right child pointer
diff --git a/utilities/transactions/range_locking/locktree/treenode.h b/utilities/transactions/range_locking/locktree/treenode.h
index a4b01f1cc..f23324f03 100644
--- a/utilities/transactions/range_locking/locktree/treenode.h
+++ b/utilities/transactions/range_locking/locktree/treenode.h
@@ -92,7 +92,7 @@ public:
     void destroy_root(void);
 
     // effect: sets the txnid and copies the given range for this node
-    void set_range_and_txnid(const keyrange &range, TXNID txnid);
+    void set_range_and_txnid(const keyrange &range, TXNID txnid, bool is_shared);
 
     // returns: true iff this node is marked as empty
     bool is_empty(void);
@@ -127,12 +127,12 @@ public:
 
     // effect: inserts the given range and txnid into a subtree, recursively
     // requires: range does not overlap with any node below the subtree
-    void insert(const keyrange &range, TXNID txnid);
+    void insert(const keyrange &range, TXNID txnid, bool is_shared);
 
     // effect: removes the given range from the subtree
     // requires: range exists in the subtree
     // returns: the root of the resulting subtree
-    treenode *remove(const keyrange &range);
+    treenode *remove(const keyrange &range, TXNID txnid);
 
     // effect: removes this node and all of its children, recursively
     // requires: every node at and below this node is unlocked
@@ -166,13 +166,30 @@ private:
     // destroyed, it frees the memory associated with whatever range
     // it has at the time of destruction.
     keyrange m_range;
+
+    void remove_shared_owner(TXNID txnid);
+
+    bool has_multiple_owners() { return (m_txnid == TXNID_SHARED); }
+
+private:
+    // Owner transaction id.
+    // A value of TXNID_SHARED means this node has multiple owners
     TXNID m_txnid;
 
+    // If true, this lock is a non-exclusive lock, and it can have either
+    // one or several owners.
+    bool m_is_shared;
+
+    // List of the owners, or nullptr if there's just one owner.
+    TxnidVector *m_owners;
+
     // two child pointers
     child_ptr m_left_child;
     child_ptr m_right_child;
 
     // comparator for ranges
+    // psergey-todo: Does it make sense to store the comparator in each tree
+    // node?
     const comparator *m_cmp;
 
     // marked for the root node. the root node is never free()'d
@@ -185,6 +202,10 @@ private:
     // effect: initializes an empty node with the given comparator
     void init(const comparator *cmp);
 
+    // requires: this is a shared node (m_is_shared==true)
+    // effect: another transaction is added as an owner.
+    void add_shared_owner(TXNID txnid);
+
     // requires: *parent is initialized to something meaningful.
     // requires: subtree is non-empty
     // returns: the leftmost child of the given subtree
@@ -230,7 +251,8 @@ private:
     treenode *maybe_rebalance(void);
 
     // returns: allocated treenode populated with a copy of the range and txnid
-    static treenode *alloc(const comparator *cmp, const keyrange &range, TXNID txnid);
+    static treenode *alloc(const comparator *cmp, const keyrange &range,
+                           TXNID txnid, bool is_shared);
 
     // requires: node is a locked root node, or an unlocked non-root node
     static void free(treenode *node);
diff --git a/utilities/transactions/range_locking/portability/txn_subst.h b/utilities/transactions/range_locking/portability/txn_subst.h
index 3882eb1c5..58c3fced0 100644
--- a/utilities/transactions/range_locking/portability/txn_subst.h
+++ b/utilities/transactions/range_locking/portability/txn_subst.h
@@ -3,8 +3,24 @@
 //
 #pragma once
 
+#include <set>
 #include "util/omt.h"
 
 typedef uint64_t TXNID;
 #define TXNID_NONE        ((TXNID)0)
 
+// A set of transactions
+//  (TODO: consider using class toku::txnid_set. The reason for using an STL
+//   container was that its API is easier)
+class TxnidVector : public std::set<TXNID> {
+public:
+  bool contains(TXNID txnid) { return find(txnid) != end(); }
+};
+
+// A value for lock structures meaning "the lock is owned by multiple
+// transactions (and one has to check the TxnidVector to get their ids)"
+#define TXNID_SHARED      (TXNID(-1))
+
+// Auxiliary value meaning "any transaction id will do".  No real transaction
+// may have this as its id.
+#define TXNID_ANY      (TXNID(-2))
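
To show how this encoding is meant to be read, a sketch (not part of the
diff) that mirrors the ownership check used in
locktree::remove_overlapping_locks_for_txnid above:

  #include "utilities/transactions/range_locking/portability/txn_subst.h"

  // Sketch: does transaction "txn" own a lock whose owner fields are
  // (lock_txnid, owners)?
  bool lock_owned_by(TXNID lock_txnid, TxnidVector *owners, TXNID txn) {
    if (lock_txnid != TXNID_SHARED)
      return lock_txnid == txn;              // single owner, stored inline
    return owners && owners->contains(txn);  // multiple owners: check vector
  }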
diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc
index bd81dbdfc..fa8e49476 100644
--- a/utilities/transactions/transaction_lock_mgr.cc
+++ b/utilities/transactions/transaction_lock_mgr.cc
@@ -820,7 +820,7 @@ Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn,
                                   uint32_t column_family_id,
                                   const Endpoint &start_endp,
                                   const Endpoint &end_endp,
-                                  bool /*exclusive*/) {
+                                  bool exclusive) {
   toku::lock_request request;
   request.create(mutex_factory_);
   DBT start_key_dbt, end_key_dbt;
@@ -842,7 +842,8 @@ Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn,
   auto lt= get_locktree_by_cfid(column_family_id);
 
   request.set(lt, (TXNID)txn, &start_key_dbt, &end_key_dbt,
-              toku::lock_request::WRITE, false /* not a big txn */,
+              exclusive? toku::lock_request::WRITE: toku::lock_request::READ,
+              false /* not a big txn */,
               (void*)wait_txn_id);
   
   uint64_t killed_time_msec = 0; // TODO: what should this have?
@@ -1147,25 +1148,25 @@ RangeLockMgr::~RangeLockMgr() {
   ltm_.destroy();
 }
 
-uint64_t RangeLockMgr::get_escalation_count() {
+RangeLockMgrHandle::Counters RangeLockMgr::GetStatus() {
   LTM_STATUS_S ltm_status_test;
   ltm_.get_status(&ltm_status_test);
+  Counters res;
   
   // Searching status variable by its string name is how Toku's unit tests
   // do it (why didn't they make LTM_ESCALATION_COUNT constant visible?)
-  TOKU_ENGINE_STATUS_ROW key_status = NULL;
   // lookup keyname in status
-  for (int i = 0; ; i++) {
+  for (int i = 0; i < LTM_STATUS_S::LTM_STATUS_NUM_ROWS; i++) {
       TOKU_ENGINE_STATUS_ROW status = &ltm_status_test.status[i];
-      if (status->keyname == NULL)
-          break;
       if (strcmp(status->keyname, "LTM_ESCALATION_COUNT") == 0) {
-          key_status = status;
-          break;
+          res.escalation_count = status->value.num;
+          continue;
+      }
+      if (strcmp(status->keyname, "LTM_SIZE_CURRENT") == 0) {
+          res.current_lock_memory = status->value.num;
       }
   }
-  assert(key_status);
-  return key_status->value.num;
+  return res;
 }
 
 void RangeLockMgr::AddColumnFamily(const ColumnFamilyHandle *cfh) {
@@ -1254,13 +1255,15 @@ struct LOCK_PRINT_CONTEXT {
 };
 
 static 
-void push_into_lock_status_data(void* param, const DBT *left, 
-                                const DBT *right, TXNID txnid_arg) {
+void push_into_lock_status_data(void* param,
+                                const DBT *left, const DBT *right,
+                                TXNID txnid_arg, bool is_shared,
+                                TxnidVector *owners) {
   struct LOCK_PRINT_CONTEXT *ctx= (LOCK_PRINT_CONTEXT*)param;
   struct KeyLockInfo info;
 
   info.key.append((const char*)left->data, (size_t)left->size);
-  info.exclusive= true;
+  info.exclusive= !is_shared;
 
   if (!(left->size == right->size && 
         !memcmp(left->data, right->data, left->size)))
@@ -1270,8 +1273,15 @@ void push_into_lock_status_data(void* param, const DBT *left,
     info.key2.append((const char*)right->data, right->size);
   }
 
-  TXNID txnid= ((PessimisticTransaction*)txnid_arg)->GetID();
-  info.ids.push_back(txnid);
+  if (txnid_arg != TXNID_SHARED) {
+    TXNID txnid= ((PessimisticTransaction*)txnid_arg)->GetID();
+    info.ids.push_back(txnid);
+  } else {
+    for (auto it : *owners) {
+      TXNID real_id= ((PessimisticTransaction*)it)->GetID();
+      info.ids.push_back(real_id);
+    }
+  }
   ctx->data->insert({ctx->cfh_id, info});
 }
 
diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h
index 33ebbeb03..4bb66febf 100644
--- a/utilities/transactions/transaction_lock_mgr.h
+++ b/utilities/transactions/transaction_lock_mgr.h
@@ -240,7 +240,11 @@ class RangeLockMgr :
     return ltm_.set_max_lock_memory(max_lock_memory);
   }
 
-  uint64_t get_escalation_count() override;
+  size_t get_max_lock_memory() {
+    return ltm_.get_max_lock_memory();
+  }
+
+  Counters GetStatus() override;
 
   LockStatusData GetLockStatusData() override;
 

