[Commits] 248acd103: Use a different locktree for each column family

Sergei Petrunia psergey at askmonty.org
Sun Apr 28 23:39:10 EEST 2019


revision-id: 248acd10346428b078aed780183ffe7f9f3c6896 (v5.8-1036-g248acd103)
parent(s): 97b782b47ae55675e2b0132d6332824343fe141e
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2019-04-28 23:39:10 +0300
message:

Use a different locktree for each column family

This is a preparation for range locking to properly support
reverse-ordered column families (and not just assume an identical
ordering across the whole DB).

---
 .../transactions/pessimistic_transaction_db.cc     |   6 +-
 utilities/transactions/transaction_lock_mgr.cc     | 184 +++++++++++++++++----
 utilities/transactions/transaction_lock_mgr.h      |  33 ++--
 3 files changed, 178 insertions(+), 45 deletions(-)

diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc
index e50fb1dad..e3ce435af 100644
--- a/utilities/transactions/pessimistic_transaction_db.cc
+++ b/utilities/transactions/pessimistic_transaction_db.cc
@@ -356,7 +356,7 @@ Status TransactionDB::WrapStackableDB(
 // allocate a LockMap for it.
 void PessimisticTransactionDB::AddColumnFamily(
     const ColumnFamilyHandle* handle) {
-  lock_mgr_->AddColumnFamily(handle->GetID());
+  lock_mgr_->AddColumnFamily(handle);
 }
 
 Status PessimisticTransactionDB::CreateColumnFamily(
@@ -370,7 +370,7 @@ Status PessimisticTransactionDB::CreateColumnFamily(
 
   s = db_->CreateColumnFamily(options, column_family_name, handle);
   if (s.ok()) {
-    lock_mgr_->AddColumnFamily((*handle)->GetID());
+    lock_mgr_->AddColumnFamily(*handle);
     UpdateCFComparatorMap(*handle);
   }
 
@@ -385,7 +385,7 @@ Status PessimisticTransactionDB::DropColumnFamily(
 
   Status s = db_->DropColumnFamily(column_family);
   if (s.ok()) {
-    lock_mgr_->RemoveColumnFamily(column_family->GetID());
+    lock_mgr_->RemoveColumnFamily(column_family);
   }
 
   return s;
diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc
index edec58261..9b6fd9381 100644
--- a/utilities/transactions/transaction_lock_mgr.cc
+++ b/utilities/transactions/transaction_lock_mgr.cc
@@ -189,7 +189,8 @@ size_t LockMap::GetStripe(const std::string& key) const {
   return stripe;
 }
 
-void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) {
+void TransactionLockMgr::AddColumnFamily(const ColumnFamilyHandle *cfh) {
+  uint32_t column_family_id= cfh->GetID();
   InstrumentedMutexLock l(&lock_map_mutex_);
 
   if (lock_maps_.find(column_family_id) == lock_maps_.end()) {
@@ -202,7 +203,8 @@ void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) {
   }
 }
 
-void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) {
+void TransactionLockMgr::RemoveColumnFamily(const ColumnFamilyHandle *cfh) {
+  uint32_t column_family_id= cfh->GetID();
   // Remove lock_map for this column family.  Since the lock map is stored
   // as a shared ptr, concurrent transactions can still keep using it
   // until they release their references to it.
@@ -780,23 +782,31 @@ RangeLockMgrHandle* NewRangeLockManager(
 class RangeLockList: public PessimisticTransaction::LockStorage {
 public:
   virtual ~RangeLockList() {
-    buffer_.destroy();
+    for(auto it : buffers_) {
+      it.second->destroy();
+    }
   }
 
   RangeLockList() : releasing_locks_(false) {
-    buffer_.create();
   }
 
-  void append(const DBT *left_key, const DBT *right_key) {
+  void append(uint32_t cf_id, const DBT *left_key, const DBT *right_key) {
     MutexLock l(&mutex_);
     // there's only one thread that calls this function.
     // the same thread will do lock release.
     assert(!releasing_locks_);
-    buffer_.append(left_key, right_key);
+    auto it= buffers_.find(cf_id);
+    if (it == buffers_.end()) {
+      // create a new one
+      //it->second.create();
+      it= buffers_.emplace(cf_id, std::shared_ptr<toku::range_buffer>(new toku::range_buffer())).first;
+      it->second->create();
+    }
+    else
+      it->second->append(left_key, right_key);
   }
 
-  // Ranges that the transaction is holding locks on
-  toku::range_buffer buffer_;
+  std::unordered_map<uint32_t, std::shared_ptr<toku::range_buffer>> buffers_;
 
   // Synchronization. See RangeLockMgr::UnLockAll for details
   port::Mutex mutex_;
@@ -836,6 +846,8 @@ Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn,
   // locktree::kill_waiter call. Do we need this anymore?
   TransactionID wait_txn_id = txn->GetID();
 
+  auto lt= get_locktree_by_cfid(column_family_id);
+
   request.set(lt, (TXNID)txn, &start_key_dbt, &end_key_dbt,
               toku::lock_request::WRITE, false /* not a big txn */,
               (void*)wait_txn_id);
@@ -911,7 +923,7 @@ Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn,
     txn->owned_locks= std::unique_ptr<RangeLockList>(new RangeLockList);
   }
   RangeLockList* range_list= (RangeLockList*)txn->owned_locks.get();
-  range_list->append(&start_key_dbt, &end_key_dbt);
+  range_list->append(column_family_id, &start_key_dbt, &end_key_dbt);
 
   return Status::OK();
 }
@@ -948,6 +960,7 @@ range_lock_mgr_release_lock_int(toku::locktree *lt,
 void RangeLockMgr::UnLock(PessimisticTransaction* txn,
                           uint32_t column_family_id,
                           const std::string& key, Env*) {
+  auto lt= get_locktree_by_cfid(column_family_id);
   range_lock_mgr_release_lock_int(lt, txn, column_family_id, key);
   toku::lock_request::retry_all_lock_requests(lt, nullptr /* lock_wait_needed_callback */);
 }
@@ -959,14 +972,14 @@ void RangeLockMgr::UnLock(const PessimisticTransaction* txn,
   for (auto& key_map_iter : *key_map) {
     uint32_t column_family_id = key_map_iter.first;
     auto& keys = key_map_iter.second;
+    auto lt= get_locktree_by_cfid(column_family_id);
 
     for (auto& key_iter : keys) {
       const std::string& key = key_iter.first;
       range_lock_mgr_release_lock_int(lt, txn, column_family_id, key);
     }
+    toku::lock_request::retry_all_lock_requests(lt, nullptr /* lock_wait_needed_callback */);
   }
-
-  toku::lock_request::retry_all_lock_requests(lt, nullptr /* lock_wait_needed_callback */);
 }
 
 void RangeLockMgr::UnLockAll(const PessimisticTransaction* txn, Env*) {
@@ -1012,13 +1025,18 @@ void RangeLockMgr::UnLockAll(const PessimisticTransaction* txn, Env*) {
     //  not holding any locks, the lock tree might be in the STO-mode with 
     //  another transaction, and our attempt to release an empty set of locks 
     //  will cause an assertion failure.
-    if (range_list->buffer_.get_num_ranges())
-      lt->release_locks((TXNID)txn, &range_list->buffer_, true);
-    range_list->buffer_.destroy();
-    range_list->buffer_.create();
-    range_list->releasing_locks_= false;
+    for (auto it: range_list->buffers_) {
+      if (it.second->get_num_ranges()) {
+        toku::locktree *lt = get_locktree_by_cfid(it.first);
+        lt->release_locks((TXNID)txn, it.second.get(), true);
 
-    toku::lock_request::retry_all_lock_requests(lt, nullptr /* lock_wait_needed_callback */);
+        it.second->destroy();
+        it.second->create();
+
+        toku::lock_request::retry_all_lock_requests(lt, nullptr);
+      }
+    }
+    range_list->releasing_locks_= false;
   }
 }
 
@@ -1082,13 +1100,21 @@ int RangeLockMgr::compare_dbt_endpoints(__toku_db*, void *arg,
     return res;
 }
 
+int RangeLockMgr::compare_dbt_endpoints_rev(__toku_db* db, void *arg,
+                                            const DBT *a_key,
+                                            const DBT *b_key) {
+  return -compare_dbt_endpoints(db, arg, a_key, b_key);
+}
+
 
 RangeLockMgr::RangeLockMgr(std::shared_ptr<TransactionDBMutexFactory> mutex_factory) :
-  mutex_factory_(mutex_factory) {
-  ltm.create(on_create, on_destroy, on_escalate, NULL, mutex_factory_);
-  cmp_.create(compare_dbt_endpoints, (void*)this, NULL);
-  DICTIONARY_ID dict_id = { .dictid = 1 };
-  lt= ltm.get_lt(dict_id, cmp_, /* on_create_extra*/nullptr);
+  mutex_factory_(mutex_factory),
+  ltree_lookup_cache_(new ThreadLocalPtr(nullptr)) {
+
+  ltm_.create(on_create, on_destroy, on_escalate, NULL, mutex_factory_);
+
+  fw_cmp_.create(compare_dbt_endpoints, (void*)this, NULL);
+  bw_cmp_.create(compare_dbt_endpoints, (void*)this, NULL);
 }
 
 
@@ -1102,7 +1128,7 @@ RangeLockMgr::RangeLockMgr(std::shared_ptr<TransactionDBMutexFactory> mutex_fact
   @param void*   Callback context
 */
 
-void RangeLockMgr::on_escalate(TXNID txnid, const locktree*,
+void RangeLockMgr::on_escalate(TXNID txnid, const locktree* lt,
                                const range_buffer &buffer, void *) {
   auto txn= (PessimisticTransaction*)txnid;
 
@@ -1119,12 +1145,17 @@ void RangeLockMgr::on_escalate(TXNID txnid, const locktree*,
   }
 
   // TODO: are we tracking this mem: lt->get_manager()->note_mem_released(trx_locks->ranges.buffer->total_memory_size());
-  trx_locks->buffer_.destroy();
-  trx_locks->buffer_.create();
+
+  uint32_t cf_id = (uint32_t)lt->get_dict_id().dictid;
+
+  auto it= trx_locks->buffers_.find(cf_id);
+  it->second->destroy();
+  it->second->create();
+
   toku::range_buffer::iterator iter(&buffer);
   toku::range_buffer::iterator::record rec;
   while (iter.current(&rec)) {
-    trx_locks->buffer_.append(rec.get_left_key(), rec.get_right_key());
+    it->second->append(rec.get_left_key(), rec.get_right_key());
     iter.next();
   }
   // TODO: same as above: lt->get_manager()->note_mem_used(ranges.buffer->total_memory_size());
@@ -1132,16 +1163,18 @@ void RangeLockMgr::on_escalate(TXNID txnid, const locktree*,
 
 
 RangeLockMgr::~RangeLockMgr() {
-  if (lt) {
-    ltm.release_lt(lt);
+   //TODO: at this point, synchronization is not needed, right?
+  for (auto it: ltree_map_) {
+    ltm_.release_lt(it.second);
   }
-  ltm.destroy();
-  cmp_.destroy();
+  ltm_.destroy();
+  fw_cmp_.destroy();
+  bw_cmp_.destroy();
 }
 
 uint64_t RangeLockMgr::get_escalation_count() {
   LTM_STATUS_S ltm_status_test;
-  ltm.get_status(&ltm_status_test);
+  ltm_.get_status(&ltm_status_test);
   
   // Searching status variable by its string name is how Toku's unit tests
   // do it (why didn't they make LTM_ESCALATION_COUNT constant visible?)
@@ -1160,6 +1193,87 @@ uint64_t RangeLockMgr::get_escalation_count() {
   return key_status->value.num;
 }
 
+void RangeLockMgr::AddColumnFamily(const ColumnFamilyHandle *cfh) {
+  uint32_t column_family_id= cfh->GetID();
+
+  InstrumentedMutexLock l(&ltree_map_mutex_);
+  if (ltree_map_.find(column_family_id) == ltree_map_.end()) {
+    DICTIONARY_ID dict_id = { .dictid = column_family_id };
+    toku::comparator *ltree_cmp;
+    // "RocksDB_SE_v3.10" // BytewiseComparator() ,ReverseBytewiseComparator()
+    if (!strcmp(cfh->GetComparator()->Name(), "RocksDB_SE_v3.10"))
+      ltree_cmp = &fw_cmp_;
+    else if (!strcmp(cfh->GetComparator()->Name(),"rev:RocksDB_SE_v3.10"))
+      ltree_cmp = &bw_cmp_;
+    else {
+      assert(false);
+      ltree_cmp= nullptr;
+    }
+    toku::locktree *ltree= ltm_.get_lt(dict_id, *ltree_cmp,
+                                       /* on_create_extra*/nullptr);
+    ltree_map_.emplace(column_family_id, ltree);
+  } else {
+    // column_family already exists in lock map
+    assert(false);
+  }
+}
+
+void RangeLockMgr::RemoveColumnFamily(const ColumnFamilyHandle *cfh) {
+  uint32_t column_family_id= cfh->GetID();
+  // Remove lock_map for this column family.  Since the lock map is stored
+  // as a shared ptr, concurrent transactions can still keep using it
+  // until they release their references to it.
+  {
+    InstrumentedMutexLock l(&ltree_map_mutex_);
+
+    auto lock_maps_iter = ltree_map_.find(column_family_id);
+    assert(lock_maps_iter != ltree_map_.end());
+
+    ltm_.release_lt(lock_maps_iter->second);
+
+    ltree_map_.erase(lock_maps_iter);
+  }  // lock_map_mutex_
+
+  //TODO: why do we delete first and clear the caches second? Shouldn't this be
+  // done in the reverse order? (if we do it in the reverse order, how will we
+  // prevent somebody from re-populating the cache?)
+
+  // Clear all thread-local caches. We collect a vector of caches but we dont
+  // really need them.
+  autovector<void*> local_caches;
+  ltree_lookup_cache_->Scrape(&local_caches, nullptr);
+}
+
+toku::locktree *RangeLockMgr::get_locktree_by_cfid(uint32_t column_family_id) {
+
+  // First check thread-local cache
+  if (ltree_lookup_cache_->Get() == nullptr) {
+    ltree_lookup_cache_->Reset(new LockTreeMap());
+  }
+
+  auto ltree_map_cache = static_cast<LockTreeMap*>(ltree_lookup_cache_->Get());
+
+  auto it = ltree_map_cache->find(column_family_id);
+  if (it != ltree_map_cache->end()) {
+    // Found lock map for this column family.
+    return it->second;
+  }
+
+  // Not found in local cache, grab mutex and check shared LockMaps
+  InstrumentedMutexLock l(&ltree_map_mutex_);
+
+  it = ltree_map_.find(column_family_id);
+  if (it == ltree_map_.end()) {
+    return nullptr;
+  } else {
+    // Found lock map.  Store in thread-local cache and return.
+    //std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
+    ltree_map_cache->insert({column_family_id, it->second});
+    return it->second;
+  }
+
+  return nullptr;
+}
 
 struct LOCK_PRINT_CONTEXT {
   BaseLockMgr::LockStatusData *data;
@@ -1193,7 +1307,13 @@ void push_into_lock_status_data(void* param, const DBT *left,
 BaseLockMgr::LockStatusData RangeLockMgr::GetLockStatusData() {
   LockStatusData data;
   LOCK_PRINT_CONTEXT ctx = {&data, GetColumnFamilyID(my_txn_db_->DefaultColumnFamily()) };
-  lt->dump_locks((void*)&ctx, push_into_lock_status_data);
+
+  {
+    InstrumentedMutexLock l(&ltree_map_mutex_);
+    for (auto it : ltree_map_) {
+      it.second->dump_locks((void*)&ctx, push_into_lock_status_data);
+    }
+  }
   return data;
 }
 
diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h
index 02c68964e..e41a4961a 100644
--- a/utilities/transactions/transaction_lock_mgr.h
+++ b/utilities/transactions/transaction_lock_mgr.h
@@ -60,8 +60,8 @@ class PessimisticTransactionDB;
 //
 class BaseLockMgr {
  public:
-  virtual void AddColumnFamily(uint32_t column_family_id) = 0;
-  virtual void RemoveColumnFamily(uint32_t column_family_id) = 0;
+  virtual void AddColumnFamily(const ColumnFamilyHandle *cfh) = 0;
+  virtual void RemoveColumnFamily(const ColumnFamilyHandle *cfh) = 0;
 
   virtual
   Status TryLock(PessimisticTransaction* txn, uint32_t column_family_id,
@@ -97,11 +97,11 @@ class TransactionLockMgr : public BaseLockMgr {
 
   // Creates a new LockMap for this column family.  Caller should guarantee
   // that this column family does not already exist.
-  void AddColumnFamily(uint32_t column_family_id);
+  void AddColumnFamily(const ColumnFamilyHandle *cfh);
 
   // Deletes the LockMap for this column family.  Caller should guarantee that
   // this column family is no longer in use.
-  void RemoveColumnFamily(uint32_t column_family_id);
+  void RemoveColumnFamily(const ColumnFamilyHandle *cfh);
 
   // Attempt to lock key.  If OK status is returned, the caller is responsible
   // for calling UnLock() on this key.
@@ -199,8 +199,8 @@ class RangeLockMgr :
   public BaseLockMgr, 
   public RangeLockMgrHandle {
  public:
-  void AddColumnFamily(uint32_t) override { /* do nothing */ }
-  void RemoveColumnFamily(uint32_t) override { /* do nothing */ }
+  void AddColumnFamily(const ColumnFamilyHandle *cfh) override;
+  void RemoveColumnFamily(const ColumnFamilyHandle *cfh) override;
 
   Status TryLock(PessimisticTransaction* txn, uint32_t column_family_id,
                  const std::string& key, Env* env, bool exclusive) override ;
@@ -240,7 +240,7 @@ class RangeLockMgr :
 
   int set_max_lock_memory(size_t max_lock_memory) override
   {
-    return ltm.set_max_lock_memory(max_lock_memory);
+    return ltm_.set_max_lock_memory(max_lock_memory);
   }
 
   uint64_t get_escalation_count() override;
@@ -248,15 +248,28 @@ class RangeLockMgr :
   LockStatusData GetLockStatusData() override;
 
  private:
-  toku::locktree_manager ltm;
-  toku::locktree *lt; // only one tree for now
+  toku::locktree_manager ltm_;
 
-  toku::comparator cmp_;
+  toku::comparator fw_cmp_;
+  toku::comparator bw_cmp_;
 
   TransactionDB* my_txn_db_;
   std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
 
+  // Map from cf_id to locktree*. Can only be accessed while holding the
+  // ltree_map_mutex_.
+  using LockTreeMap = std::unordered_map<uint32_t, locktree*>;
+  LockTreeMap ltree_map_;
+
+  InstrumentedMutex ltree_map_mutex_;
+
+  // Per-thread cache of ltree_map_.
+  std::unique_ptr<ThreadLocalPtr> ltree_lookup_cache_;
+
+  toku::locktree *get_locktree_by_cfid(uint32_t cf_id);
+
   static int compare_dbt_endpoints(__toku_db*, void *arg, const DBT *a_key, const DBT *b_key);
+  static int compare_dbt_endpoints_rev(__toku_db*, void *arg, const DBT *a_key, const DBT *b_key);
   
   // Callbacks
   static int  on_create(locktree*, void*) { return 0; /* no error */ }


More information about the commits mailing list