[Commits] cba804d57: MDEV-19986: MyRocks: Range Locking: SeekForUpdate support

Sergei Petrunia psergey at askmonty.org
Mon Jul 8 19:01:56 EEST 2019


revision-id: cba804d57d693e5cf7763d404f29d2483992f96a (v5.8-1043-gcba804d57)
parent(s): c91095e08fbdacb69831a93ab403a80487dee435
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2019-07-08 19:01:56 +0300
message:

MDEV-19986: MyRocks: Range Locking: SeekForUpdate support

Initial implementation, RocksDB part

---
 include/rocksdb/utilities/transaction.h            |   4 +
 utilities/transactions/pessimistic_transaction.cc  | 172 +++++++++++++++++++++
 utilities/transactions/pessimistic_transaction.h   |   3 +
 .../transactions/pessimistic_transaction_db.h      |   1 +
 4 files changed, 180 insertions(+)

diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h
index 6b84d4a44..731f76708 100644
--- a/include/rocksdb/utilities/transaction.h
+++ b/include/rocksdb/utilities/transaction.h
@@ -366,6 +366,10 @@ class Transaction {
   virtual Iterator* GetIterator(const ReadOptions& read_options,
                                 ColumnFamilyHandle* column_family) = 0;
 
+  virtual Iterator* GetLockingIterator(const ReadOptions& read_options,
+                                       ColumnFamilyHandle* column_family) {
+    return nullptr;
+  };
   // Put, Merge, Delete, and SingleDelete behave similarly to the corresponding
   // functions in WriteBatch, but will also do conflict checking on the
   // keys being written.
diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc
index 4c0578444..cbac16437 100644
--- a/utilities/transactions/pessimistic_transaction.cc
+++ b/utilities/transactions/pessimistic_transaction.cc
@@ -56,6 +56,178 @@ PessimisticTransaction::PessimisticTransaction(
   Initialize(txn_options);
 }
 
+//////////////////////////////////////////////////////////////////////////////
+// Locking iterator
+//////////////////////////////////////////////////////////////////////////////
+
+//
+// LockingIterator is an iterator that locks the rows before returning, as well
+// as scanned gaps between the rows.
+//
+//  Example:
+//    lock_iter= trx->GetLockingIterator();
+//    lock_iter->Seek('abc');
+//    lock_iter->Valid()==true && lock_iter->key() == 'bcd';
+//
+//   After the above, the returned record 'bcd' is locked by transaction trx.
+//   Also, the range between ['abc'..'bcd'] is empty and locked by trx.
+//
+//    lock_iter->Next();
+//    lock_iter->Valid()==true && lock_iter->key() == 'efg'
+//
+//   Now, the range ['bcd'.. 'efg'] (bounds incluive) is also locked, and there are no
+//   records between 'bcd'  and 'efg'.
+//
+class LockingIterator : public Iterator {
+
+  ColumnFamilyHandle* cfh_;
+  PessimisticTransaction *txn_;
+  Iterator *iter_;
+  Status status_;
+
+ public:
+  LockingIterator(Iterator *iter, ColumnFamilyHandle *cfh,
+                  PessimisticTransaction *txn) :
+    cfh_(cfh), txn_(txn), iter_(iter), status_(Status::InvalidArgument()) {}
+
+  // An iterator is either positioned at a key/value pair, or
+  // not valid.  This method returns true iff the iterator is valid.
+  // Always returns false if !status().ok().
+  virtual bool Valid() const override { return status_.ok(); }
+
+  // Note: MyRocks doesn't ever call these:
+  virtual void SeekToFirst() override;
+  virtual void SeekToLast() override { assert(0); }
+
+  virtual void Seek(const Slice& target) override;
+
+  // Position at the last key in the source that at or before target.
+  // The iterator is Valid() after this call iff the source contains
+  // an entry that comes at or before target.
+  virtual void SeekForPrev(const Slice& target) { assert(0); }
+
+  virtual void Next() override;
+  virtual void Prev() override {
+    assert(0); // TODO: implement this
+  }
+
+  virtual Slice key() const override {
+    assert(Valid());
+    return iter_->key();
+  }
+
+  virtual Slice value() const override {
+    assert(Valid());
+    return iter_->value();
+  }
+
+  virtual Status status() const override {
+    return status_;
+  }
+
+private:
+  void ScanForward(const Slice& target, bool call_next);
+};
+
+
+Iterator* PessimisticTransaction::GetLockingIterator(
+    const ReadOptions& read_options,
+    ColumnFamilyHandle* column_family) {
+
+  if (!txn_db_impl_->UsesRangeLocking()) {
+    return nullptr;
+  }
+  auto iter= GetIterator(read_options, column_family);
+
+  if (iter)
+    return new LockingIterator(iter, column_family, this);
+  else
+    return nullptr;
+}
+
+
+void LockingIterator::Seek(const Slice& target) {
+  iter_->Seek(target);
+  ScanForward(target, false);
+}
+
+/*
+  Lock from the current position up to the next key
+  (This is basically like Seek(right_after(current_position))
+*/
+void LockingIterator::Next() {
+  assert(Valid());
+  // Save the current key value. We need it as the left endpoint
+  // of the range lock we're going to acquire
+  std::string current_key = iter_->key().ToString();
+
+  iter_->Next();
+  ScanForward(Slice(current_key), true);
+}
+
+// Note: MyRocks never uses this as call as it has index_nr as prefix for all
+// keys.
+void LockingIterator::SeekToFirst() {
+
+  iter_->SeekToFirst();
+  if (!iter_->Valid()) {
+    status_ = iter_->status();
+    return;
+  }
+
+  std::string current_key = iter_->key().ToString();
+  ScanForward(Slice(current_key), true);
+}
+
+void LockingIterator::ScanForward(const Slice& target, bool call_next) {
+
+  if (!iter_->Valid()) {
+    status_ = iter_->status();
+    return;
+  }
+
+  while (1) {
+    /*
+      TODO: the underlying iterator respects iterator bounds, so we don't need
+      to check them here
+    */
+    auto end_key = iter_->key();
+    status_ = txn_->GetRangeLock(cfh_, Endpoint(target), Endpoint(end_key));
+    if (!status_.ok()) {
+      // Failed to get a lock (most likely lock wait timeout)
+      return;
+    }
+
+    //Ok, now we have a lock which is inhibiting modifications in the range
+    // Somebody might have done external modifications, though:
+    //  - removed the key we've found
+    //  - added a key before that key.
+    iter_->Seek(target);
+    if (call_next && iter_->Valid())
+      iter_->Next();
+
+    if (iter_->Valid()) {
+      if (cfh_->GetComparator()->Compare(iter_->key(), end_key) <= 0) {
+        // Ok, the key is within the range.
+        status_ = Status::OK();
+        break;
+      } else {
+        // We've got a row but it is outside the range we've locked.
+        // Re-try the lock-and-read step.
+        continue;
+      }
+    } else {
+      // There's no row (within the iterator bounds perhaps). Exit now.
+      // (we might already have locked a range in this function but there's
+      // nothing we can do about it)
+      status_ = iter_->status();
+      break;
+    }
+  }
+}
+
+/////////////////////////////////////////////////////////////////////////////
+
 void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
   txn_id_ = GenTxnID();
 
diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h
index 9fb44c74e..4fb5f63e0 100644
--- a/utilities/transactions/pessimistic_transaction.h
+++ b/utilities/transactions/pessimistic_transaction.h
@@ -60,6 +60,9 @@ class PessimisticTransaction : public TransactionBaseImpl {
 
   Status SetName(const TransactionName& name) override;
 
+  virtual Iterator* GetLockingIterator(const ReadOptions& read_options,
+                                       ColumnFamilyHandle* column_family) override;
+
   // Generate a new unique transaction identifier
   static TransactionID GenTxnID();
 
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h
index e88856342..2646228f5 100644
--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -129,6 +129,7 @@ class PessimisticTransactionDB : public TransactionDB {
 
   // Key Tracking should be done only with point lock manager.
   bool ShouldDoKeyTracking() const { return range_lock_mgr_ == nullptr; }
+  bool UsesRangeLocking() const { return range_lock_mgr_ != nullptr; }
  protected:
   DBImpl* db_impl_;
   std::shared_ptr<Logger> info_log_;


More information about the commits mailing list