[Commits] 8eb09c859: Remove Endpoints as a separate data format

Sergei Petrunia psergey at askmonty.org
Sun Apr 7 20:49:34 EEST 2019


revision-id: 8eb09c8598238945560b9750425b2a64238bec1d (v5.8-1028-g8eb09c859)
parent(s): 3abdc95f082d80a3ce76bad7d6d319f8706429b2
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2019-04-07 20:49:34 +0300
message:

Remove Endpoints as a separate data format

Before, Range Endpoints were passed as Slice object, and also the user
provided compare_endpoints_func and convert_key_to_endpoint_func.

Now, the user passes rocksdb::Endpoint objects.
The lock tree stores endpoints, but the encoding is handled internally
by the range locking module.

The patch is work-in-progress as the endpoint comparison function still
uses memcmp.

---
 include/rocksdb/utilities/transaction.h            | 49 ++++++++++-
 include/rocksdb/utilities/transaction_db.h         | 21 -----
 utilities/transactions/pessimistic_transaction.cc  |  4 +-
 utilities/transactions/pessimistic_transaction.h   |  4 +-
 .../transactions/pessimistic_transaction_db.cc     |  7 +-
 .../transactions/pessimistic_transaction_db.h      |  4 +-
 utilities/transactions/range_locking_test.cc       | 23 +----
 utilities/transactions/transaction_lock_mgr.cc     | 99 ++++++++++++++++++----
 utilities/transactions/transaction_lock_mgr.h      | 16 +---
 9 files changed, 144 insertions(+), 83 deletions(-)

diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h
index 8a15a3bbd..18ae4726c 100644
--- a/include/rocksdb/utilities/transaction.h
+++ b/include/rocksdb/utilities/transaction.h
@@ -24,6 +24,53 @@ using TransactionName = std::string;
 
 using TransactionID = uint64_t;
 
+
+/*
+  A range endpoint.
+
+  Basic ranges can be defined over rowkeys. A Comparator function defines
+  ordering, a range endpoint is just a rowkey.
+
+  When one use lexicographic-like ordering, they may want to request "prefix
+  ranges".
+
+  == Lexicographic ordering ==
+  A lexicographic-like ordering satisfies these criteria:
+
+  1.The ordering is prefix-based. If there are two keys in form
+
+    key_a = {prefix_a suffix_a}
+    key_b = {prefix_b suffix_b}
+  and
+    prefix_a < prefix_b
+  then
+    key_a < key_b.
+
+  An empty string is less than any other constant (from this it follows that
+  for any prefix and suffix, {prefix, suffix} > {prefix})
+
+  == Prefix ranges ==
+  With lexicographic-like ordering, one may to construct ranges from a
+  restriction in form prefix=P:
+   - the left endpoint would would be {P, inf_suffix=false}
+   - the right endpoint would be {P, inf_suffix=true}.
+
+  (TODO: or should we instead of the above just require that [Reverse]ByteWiseComparator
+  is used?)
+*/
+
+class Endpoint {
+ public:
+  Slice slice;
+  bool inf_suffix;
+
+  Endpoint(const char* s, bool inf_suffix_arg=false) :
+    slice(s), inf_suffix(inf_suffix_arg) {}
+
+  Endpoint(const char* s, size_t size, bool inf_suffix_arg=false) :
+    slice(s, size), inf_suffix(inf_suffix_arg) {}
+};
+
 // Provides notification to the caller of SetSnapshotOnNextOperation when
 // the actual snapshot gets created
 class TransactionNotifier {
@@ -256,7 +303,7 @@ class Transaction {
   //  Note: range endpoints generally a use a different data format than
   //  ranges.
   virtual Status GetRangeLock(ColumnFamilyHandle*,
-                              const Slice&, const Slice&) {
+                              const Endpoint&, const Endpoint&) {
     return Status::NotSupported();
   }
 
diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h
index 06d73bedd..b12c70785 100644
--- a/include/rocksdb/utilities/transaction_db.h
+++ b/include/rocksdb/utilities/transaction_db.h
@@ -33,24 +33,6 @@ enum TxnDBWritePolicy {
 
 const uint32_t kInitialMaxDeadlocks = 5;
 
-struct RangeLockingOptions {
-  typedef void (*convert_key_to_endpoint_func)(const rocksdb::Slice &key,
-                                               std::string *endpoint);
-
-  typedef int (*compare_endpoints_func)(const char *a, size_t a_len,
-                                         const char *b, size_t b_len);
-
-  // TODO:  So, functions to compare ranges are here, while
-  // functions to compare rowkeys are in per-column family and are in
-  //  rocksdb::ColumnFamilyOptions
-  //
-  // TODO: Can we change this to work in a way that does not expose the endpoints
-  //  to the user (like discussed on the meeting?)
-  //
-  convert_key_to_endpoint_func cvt_func;
-  compare_endpoints_func cmp_func;
-};
-
 struct TransactionDBOptions {
   // Specifies the maximum number of keys that can be locked at the same time
   // per column family.
@@ -115,9 +97,6 @@ struct TransactionDBOptions {
   // If true, range_locking_opts specifies options on range locking (filling
   // the struct is mandatory)
   bool use_range_locking = false;
-
-  // Members are valid if use_range_locking= true.
-  RangeLockingOptions range_locking_opts;
 };
 
 struct TransactionOptions {
diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc
index 339dcf422..7ca6ab981 100644
--- a/utilities/transactions/pessimistic_transaction.cc
+++ b/utilities/transactions/pessimistic_transaction.cc
@@ -601,8 +601,8 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
 
 Status
 PessimisticTransaction::GetRangeLock(ColumnFamilyHandle* column_family,
-                                     const Slice& start_endp,
-                                     const Slice& end_endp) {
+                                     const Endpoint& start_endp,
+                                     const Endpoint& end_endp) {
   ColumnFamilyHandle* cfh =
       column_family ? column_family : db_impl_->DefaultColumnFamily();
   uint32_t cfh_id= GetColumnFamilyID(cfh);
diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h
index 50030700d..9fb44c74e 100644
--- a/utilities/transactions/pessimistic_transaction.h
+++ b/utilities/transactions/pessimistic_transaction.h
@@ -113,8 +113,8 @@ class PessimisticTransaction : public TransactionBaseImpl {
   int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; }
 
   virtual Status GetRangeLock(ColumnFamilyHandle* column_family,
-                              const Slice& start_key,
-                              const Slice& end_key);
+                              const Endpoint& start_key,
+                              const Endpoint& end_key);
  protected:
   // Refer to
   // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery
diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc
index 270859d1a..06981adf4 100644
--- a/utilities/transactions/pessimistic_transaction_db.cc
+++ b/utilities/transactions/pessimistic_transaction_db.cc
@@ -50,8 +50,7 @@ void PessimisticTransactionDB::init_lock_manager() {
                new TransactionDBMutexFactoryImpl());
 
   if (txn_db_options_.use_range_locking) {
-    range_lock_mgr_= new RangeLockMgr(this, txn_db_options_.range_locking_opts,
-                                      mutex_factory);
+    range_lock_mgr_= new RangeLockMgr(this, mutex_factory);
     lock_mgr = range_lock_mgr_;
   } else {
     lock_mgr = new TransactionLockMgr(this, txn_db_options_.num_stripes,
@@ -399,8 +398,8 @@ Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
 Status
 PessimisticTransactionDB::TryRangeLock(PessimisticTransaction *txn,
                                        uint32_t cfh_id,
-                                       const Slice& start_endp,
-                                       const Slice& end_endp) {
+                                       const Endpoint& start_endp,
+                                       const Endpoint& end_endp) {
   if (range_lock_mgr_) {
     return range_lock_mgr_->TryRangeLock(txn, cfh_id, start_endp,
                                          end_endp, /*exclusive=*/false);
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h
index ad6eaf4a9..59da1634c 100644
--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -80,8 +80,8 @@ class PessimisticTransactionDB : public TransactionDB {
                  const std::string& key, bool exclusive);
   Status TryRangeLock(PessimisticTransaction* txn,
                       uint32_t cfh_id,
-                      const Slice& start_endp,
-                      const Slice& end_endp);
+                      const Endpoint& start_endp,
+                      const Endpoint& end_endp);
 
   void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys,
               bool all_keys_hint=false);
diff --git a/utilities/transactions/range_locking_test.cc b/utilities/transactions/range_locking_test.cc
index 26ee9b84c..aef345be5 100644
--- a/utilities/transactions/range_locking_test.cc
+++ b/utilities/transactions/range_locking_test.cc
@@ -36,19 +36,6 @@ using std::string;
 namespace rocksdb {
 
 
-void range_endpoint_convert_same(const rocksdb::Slice &key,
-                                 std::string *res)
-{
-  res->clear();
-  res->append(key.data(), key.size());
-}
-
-int range_endpoints_compare_default(const char *a, size_t a_len,
-                                    const char *b, size_t b_len)
-{
-  return Slice(a, a_len).compare(Slice(b, b_len));
-}
-
 class RangeLockingTest : public ::testing::Test {
  public:
   TransactionDB* db;
@@ -65,10 +52,6 @@ class RangeLockingTest : public ::testing::Test {
     DestroyDB(dbname, options);
     Status s;
     txn_db_options.use_range_locking = true;
-    txn_db_options.range_locking_opts.cvt_func =
-      range_endpoint_convert_same;
-    txn_db_options.range_locking_opts.cmp_func =
-      range_endpoints_compare_default;
     s = TransactionDB::Open(options, txn_db_options, dbname, &db);
     assert(s.ok());
 
@@ -101,7 +84,7 @@ TEST_F(RangeLockingTest, BasicRangeLocking) {
   // Get a range lock
   {
     auto s= txn0->GetRangeLock(db->DefaultColumnFamily(), 
-                               Slice("a"), Slice("c"));
+                               Endpoint("a"), Endpoint("c"));
     ASSERT_EQ(s, Status::OK());
   }
  
@@ -109,7 +92,7 @@ TEST_F(RangeLockingTest, BasicRangeLocking) {
   // Check that range Lock inhibits an overlapping range lock
   {
     auto s= txn1->GetRangeLock(db->DefaultColumnFamily(), 
-                                Slice("b"), Slice("z"));
+                                Endpoint("b"), Endpoint("z"));
     ASSERT_TRUE(s.IsTimedOut());
   }
 
@@ -127,7 +110,7 @@ TEST_F(RangeLockingTest, BasicRangeLocking) {
     ASSERT_EQ(s, Status::OK());
 
     auto s2= txn1->GetRangeLock(db->DefaultColumnFamily(),
-                                Slice("c"), Slice("e"));
+                                Endpoint("c"), Endpoint("e"));
     ASSERT_TRUE(s2.IsTimedOut());
   }
 
diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc
index e0802fb25..d6c221cc7 100644
--- a/utilities/transactions/transaction_lock_mgr.cc
+++ b/utilities/transactions/transaction_lock_mgr.cc
@@ -790,16 +790,30 @@ public:
   bool releasing_locks_;
 };
 
+static
+void serialize_endpoint(const Endpoint &endp, std::string *buf) {
+  const char SUFFIX_INF= 0x0;
+  const char SUFFIX_SUP= 0x1;
+  buf->push_back(endp.inf_suffix ? SUFFIX_SUP : SUFFIX_INF);
+  buf->append(endp.slice.data(), endp.slice.size());
+}
+
+
 // Get a range lock on [start_key; end_key] range
 Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn,
                                   uint32_t column_family_id,
-                                  const rocksdb::Slice &start_key,
-                                  const rocksdb::Slice &end_key,
+                                  const Endpoint &start_endp,
+                                  const Endpoint &end_endp,
                                   bool /*exclusive*/) {
   toku::lock_request request;
   request.create(mutex_factory_);
   DBT start_key_dbt, end_key_dbt;
 
+  std::string start_key;
+  std::string end_key;
+  serialize_endpoint(start_endp, &start_key);
+  serialize_endpoint(end_endp, &end_key);
+
   toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size());
   toku_fill_dbt(&end_key_dbt, end_key.data(), end_key.size());
 
@@ -896,10 +910,8 @@ Status RangeLockMgr::TryLock(PessimisticTransaction* txn,
                              uint32_t column_family_id,
                              const std::string& key, Env*,
                              bool exclusive) {
-  std::string endpoint;
-  convert_key_to_endpoint(rocksdb::Slice(key.data(), key.size()), &endpoint);
-  rocksdb::Slice endp_slice(endpoint.data(), endpoint.length());
-  return TryRangeLock(txn, column_family_id, endp_slice, endp_slice, exclusive);
+  Endpoint endp(key.data(), key.size(), false);
+  return TryRangeLock(txn, column_family_id, endp, endp, exclusive);
 }
 
 static void 
@@ -907,8 +919,12 @@ range_lock_mgr_release_lock_int(toku::locktree *lt,
                                 const PessimisticTransaction* txn,
                                 uint32_t /*column_family_id*/,
                                 const std::string& key) {
-  DBT key_dbt; 
-  toku_fill_dbt(&key_dbt, key.data(), key.size());
+  DBT key_dbt;
+  Endpoint endp(key.data(), key.size(), false);
+  std::string endp_image;
+  serialize_endpoint(endp, &endp_image);
+
+  toku_fill_dbt(&key_dbt, endp_image.data(), endp_image.size());
   toku::range_buffer range_buf;
   range_buf.create();
   range_buf.append(&key_dbt, &key_dbt);
@@ -917,8 +933,8 @@ range_lock_mgr_release_lock_int(toku::locktree *lt,
 }
 
 void RangeLockMgr::UnLock(PessimisticTransaction* txn,
-                            uint32_t column_family_id,
-                            const std::string& key, Env*) {
+                          uint32_t column_family_id,
+                          const std::string& key, Env*) {
   range_lock_mgr_release_lock_int(lt, txn, column_family_id, key);
   toku::lock_request::retry_all_lock_requests(lt, nullptr /* lock_wait_needed_callback */);
 }
@@ -993,24 +1009,73 @@ void RangeLockMgr::UnLockAll(const PessimisticTransaction* txn, Env*) {
   }
 }
 
+
+
+
 int RangeLockMgr::compare_dbt_endpoints(__toku_db*, void *arg,
                                         const DBT *a_key,
                                         const DBT *b_key) {
   RangeLockMgr* mgr= (RangeLockMgr*) arg;
-  return mgr->compare_endpoints((const char*)a_key->data, a_key->size,
-                                (const char*)b_key->data, b_key->size);
+  // TODO: this should compare endpoints using the user-provided comparator +
+  // endpoint encoding.
+  // (just use one from any column family)
+  
+  const char *a= (const char*)a_key->data;
+  const char *b= (const char*)b_key->data;
+
+  size_t a_len= a_key->size;
+  size_t b_len= b_key->size;
+
+  size_t min_len= std::min(a_len, b_len);
+
+  //compare the values. Skip the first byte as it is the endpoint signifier
+
+  //TODO: use the upper layer' comparison function.
+  int res= memcmp(a+1, b+1, min_len-1);
+  if (!res)
+  {
+    if (b_len > min_len)
+    {
+      // a is shorter;
+      if (a[0] == 0)
+        return  -1; //"a is smaller"
+      else
+      {
+        // a is considered padded with 0xFF:FF:FF:FF...
+        return 1; // "a" is bigger
+      }
+    }
+    else if (a_len > min_len)
+    {
+      // the opposite of the above: b is shorter.
+      if (b[0] == 0)
+        return  1; //"b is smaller"
+      else
+      {
+        // b is considered padded with 0xFF:FF:FF:FF...
+        return -1; // "b" is bigger
+      }
+    }
+    else
+    {
+      // the lengths are equal (and the key values, too)
+      if (a[0] < b[0])
+        return -1;
+      else if (a[0] > b[0])
+        return 1;
+      else
+        return 0;
+    }
+  }
+  else
+    return res;
 }
 
 
 RangeLockMgr::RangeLockMgr(TransactionDB* txn_db,
-                           const RangeLockingOptions& opts,
                            std::shared_ptr<TransactionDBMutexFactory> mutex_factory) :
                            my_txn_db_(txn_db), mutex_factory_(mutex_factory) {
-  convert_key_to_endpoint= opts.cvt_func;
-  compare_endpoints=       opts.cmp_func;
-
   ltm.create(on_create, on_destroy, on_escalate, NULL, mutex_factory_);
-
   cmp_.create(compare_dbt_endpoints, (void*)this, NULL);
   DICTIONARY_ID dict_id = { .dictid = 1 };
   lt= ltm.get_lt(dict_id, cmp_, /* on_create_extra*/nullptr);
diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h
index d6c5479f2..c45ebae3f 100644
--- a/utilities/transactions/transaction_lock_mgr.h
+++ b/utilities/transactions/transaction_lock_mgr.h
@@ -212,8 +212,8 @@ class RangeLockMgr :
   //  used ATM)
   Status TryRangeLock(PessimisticTransaction* txn,
                       uint32_t column_family_id,
-                      const rocksdb::Slice &start_key,
-                      const rocksdb::Slice &end_key,
+                      const Endpoint &start_endp,
+                      const Endpoint &end_endp,
                       bool exclusive);
   
   void UnLock(const PessimisticTransaction* txn, const TransactionKeyMap* keys,
@@ -227,7 +227,6 @@ class RangeLockMgr :
               const std::string& key, Env* env) override ;
 
   RangeLockMgr(TransactionDB* txn_db,
-               const RangeLockingOptions& opts,
                std::shared_ptr<TransactionDBMutexFactory> mutex_factory);
   ~RangeLockMgr();
 
@@ -240,23 +239,12 @@ class RangeLockMgr :
 
   LockStatusData GetLockStatusData() override;
 
-  typedef RangeLockingOptions::convert_key_to_endpoint_func convert_key_to_endpoint_func;
-  typedef RangeLockingOptions::compare_endpoints_func compare_endpoints_func;
-
  private:
   toku::locktree_manager ltm;
   toku::locktree *lt; // only one tree for now
 
   toku::comparator cmp_;
 
-  // Convert rowkey to endpoint (TODO: shouldn't "rowkey=const" translate into
-  // a pair of [start; end] endpoints in general? They translate into the same
-  // value in our current encoding, but...)
-  convert_key_to_endpoint_func convert_key_to_endpoint;
-
-  // User-provided endpoint comparison function
-  compare_endpoints_func compare_endpoints;
-
   TransactionDB* my_txn_db_;
   std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
 


More information about the commits mailing list