[Commits] f286a3586f1: Apply patch: Implement iterator class

psergey sergey at mariadb.com
Mon May 17 17:43:07 EEST 2021


revision-id: f286a3586f1fec1f9c7bad5314136e176ea30653 (percona-202102-52-gf286a3586f1)
parent(s): 0f4980afd8faa61b23a3008d3cba726881072174
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2021-05-17 17:42:54 +0300
message:

Apply patch: Implement iterator class

Summary: This abstracts basic iteration over keys (with TTL filtering) into the `Rdb_iterator_base` class. Logic that does ICP or primary key locking is not included.

Test Plan: mtr

Reviewers: luqun, herman, yzha, #mysql_eng

Subscribers: pgl

Differential Revision: https://phabricator.intern.facebook.com/D25933169

---
 mysql-test/suite/rocksdb/r/check_flags.result |   2 +-
 mysql-test/suite/rocksdb/t/check_flags.test   |   2 +-
 storage/rocksdb/CMakeLists.txt                |   1 +
 storage/rocksdb/ha_rocksdb.cc                 | 676 +++++++-------------------
 storage/rocksdb/ha_rocksdb.h                  |  94 ++--
 storage/rocksdb/ha_rocksdb_proto.h            |   2 +-
 storage/rocksdb/nosql_access.cc               |   4 +-
 storage/rocksdb/rdb_converter.h               |   2 +-
 storage/rocksdb/rdb_iterator.cc               | 359 ++++++++++++++
 storage/rocksdb/rdb_iterator.h                | 121 +++++
 10 files changed, 703 insertions(+), 560 deletions(-)

diff --git a/mysql-test/suite/rocksdb/r/check_flags.result b/mysql-test/suite/rocksdb/r/check_flags.result
index 8ff4153707e..9fe20b968a6 100644
--- a/mysql-test/suite/rocksdb/r/check_flags.result
+++ b/mysql-test/suite/rocksdb/r/check_flags.result
@@ -34,7 +34,7 @@ KILL QUERY $conn1_id;
 set debug_sync='now SIGNAL go';
 ERROR 70100: Query execution was interrupted
 set debug_sync='RESET';
-set debug_sync='rocksdb.check_flags_inwdi SIGNAL parked WAIT_FOR go';
+set debug_sync='rocksdb.check_flags_nwd SIGNAL parked WAIT_FOR go';
 SELECT kp1 FROM t3 ORDER BY kp1;
 set debug_sync='now WAIT_FOR parked';
 KILL QUERY $conn1_id;
diff --git a/mysql-test/suite/rocksdb/t/check_flags.test b/mysql-test/suite/rocksdb/t/check_flags.test
index 58dc1f4f8da..c100dce8afc 100644
--- a/mysql-test/suite/rocksdb/t/check_flags.test
+++ b/mysql-test/suite/rocksdb/t/check_flags.test
@@ -91,7 +91,7 @@ set debug_sync='RESET';
 
 
 connection conn1;
-set debug_sync='rocksdb.check_flags_inwdi SIGNAL parked WAIT_FOR go';
+set debug_sync='rocksdb.check_flags_nwd SIGNAL parked WAIT_FOR go';
 send SELECT kp1 FROM t3 ORDER BY kp1;
 
 connection default;
diff --git a/storage/rocksdb/CMakeLists.txt b/storage/rocksdb/CMakeLists.txt
index 3fc21fb97cc..135a6af62df 100644
--- a/storage/rocksdb/CMakeLists.txt
+++ b/storage/rocksdb/CMakeLists.txt
@@ -125,6 +125,7 @@ SET(ROCKSDB_SOURCES
   ha_rocksdb.cc ha_rocksdb.h ha_rocksdb_proto.h
   logger.h
   rdb_datadic.cc rdb_datadic.h
+  rdb_iterator.cc rdb_iterator.h
   rdb_cf_options.cc rdb_cf_options.h
   rdb_cf_manager.cc rdb_cf_manager.h
   rdb_converter.cc rdb_converter.h
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
index 2f41ba40a17..c484dfb5894 100644
--- a/storage/rocksdb/ha_rocksdb.cc
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -94,6 +94,7 @@
 #include "./rdb_datadic.h"
 #include "./rdb_i_s.h"
 #include "./rdb_index_merge.h"
+#include "./rdb_iterator.h"
 #include "./rdb_mutex_wrapper.h"
 #include "./rdb_psi.h"
 #include "./rdb_threads.h"
@@ -2975,8 +2976,9 @@ class Rdb_transaction {
   }
 
   int set_status_error(THD *const thd, const rocksdb::Status &s,
-                       const Rdb_key_def &kd, Rdb_tbl_def *const tbl_def,
-                       Rdb_table_handler *const table_handler) {
+                       const Rdb_key_def &kd, const Rdb_tbl_def *const tbl_def,
+                       Rdb_table_handler *const table_handler
+                           MY_ATTRIBUTE((unused))) {
     DBUG_ASSERT(!s.ok());
     DBUG_ASSERT(tbl_def != nullptr);
 
@@ -2993,7 +2995,8 @@ class Rdb_transaction {
                                                 rocksdb_rollback_on_timeout);
       m_detailed_error.copy(timeout_message(
           "index", tbl_def->full_tablename().c_str(), kd.get_name().c_str()));
-      table_handler->m_lock_wait_timeout_counter.inc();
+      /* TODO(yzha) - row stats are gone in 8.0
+      table_handler->m_lock_wait_timeout_counter.inc(); */
       rocksdb_row_lock_wait_timeouts++;
 
       return HA_ERR_LOCK_WAIT_TIMEOUT;
@@ -3002,7 +3005,8 @@ class Rdb_transaction {
     if (s.IsDeadlock()) {
       my_core::thd_mark_transaction_to_rollback(thd, 1 /* whole transaction */);
       m_detailed_error = String();
-      table_handler->m_deadlock_counter.inc();
+      /* TODO(yzha) - row stats are gone in 8.0
+      table_handler->m_deadlock_counter.inc(); */
       rocksdb_row_lock_deadlocks++;
       return HA_ERR_LOCK_DEADLOCK;
     } else if (s.IsBusy()) {
@@ -3017,7 +3021,8 @@ class Rdb_transaction {
             user_host_buff, thd->query());
       }
       m_detailed_error = String(" (snapshot conflict)", system_charset_info);
-      table_handler->m_deadlock_counter.inc();
+      /* TODO(yzha) - row stats are gone in 8.0
+      table_handler->m_deadlock_counter.inc(); */
       return HA_ERR_ROCKSDB_STATUS_BUSY;
     }
 
@@ -3530,7 +3535,7 @@ class Rdb_transaction {
 
   rocksdb::Iterator *get_iterator(
       rocksdb::ColumnFamilyHandle *const column_family, bool skip_bloom_filter,
-      bool fill_cache, const rocksdb::Slice &eq_cond_lower_bound,
+      const rocksdb::Slice &eq_cond_lower_bound,
       const rocksdb::Slice &eq_cond_upper_bound, bool read_current = false,
       bool create_snapshot = true) {
     // Make sure we are not doing both read_current (which implies we don't
@@ -3542,6 +3547,7 @@ class Rdb_transaction {
     if (create_snapshot) acquire_snapshot(true);
 
     rocksdb::ReadOptions options = m_read_opts;
+    const bool fill_cache = !THDVAR(get_thd(), skip_fill_cache);
 
     if (skip_bloom_filter) {
       const bool enable_iterate_bounds =
@@ -6787,7 +6793,7 @@ static void dbug_change_status_to_corrupted(rocksdb::Status *status) {
 // If the iterator is not valid it might be because of EOF but might be due
 // to IOError or corruption. The good practice is always check it.
 // https://github.com/facebook/rocksdb/wiki/Iterator#error-handling
-inline bool is_valid_iterator(rocksdb::Iterator *scan_it) {
+bool is_valid_iterator(rocksdb::Iterator *scan_it) {
   if (scan_it->Valid()) {
     return true;
   } else {
@@ -6915,6 +6921,11 @@ ulonglong ha_rocksdb::load_auto_incr_value_from_index() {
   const int save_active_index = active_index;
   active_index = table->s->next_number_index;
   const uint8 save_table_status = table->m_status;
+
+  std::unique_ptr<Rdb_iterator> save_iterator(new Rdb_iterator_base(
+      ha_thd(), m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def));
+  std::swap(m_iterator, save_iterator);
+
   ulonglong last_val = 0;
 
   Rdb_transaction *const tx = get_or_create_tx(table->in_use);
@@ -6966,7 +6977,7 @@ ulonglong ha_rocksdb::load_auto_incr_value_from_index() {
     (Why don't we use index_init/index_end? class handler defines index_init
     as private, for some reason).
     */
-  release_scan_iterator();
+  std::swap(m_iterator, save_iterator);
 
   return last_val;
 }
@@ -7012,41 +7023,44 @@ int ha_rocksdb::load_hidden_pk_value() {
   active_index = MAX_KEY;
   const uint8 save_table_status = table->m_status;
 
+  std::unique_ptr<Rdb_iterator> save_iterator(new Rdb_iterator_base(
+      ha_thd(), m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def));
+  std::swap(m_iterator, save_iterator);
+
   Rdb_transaction *const tx = get_or_create_tx(table->in_use);
   const bool is_new_snapshot = !tx->has_snapshot();
 
   longlong hidden_pk_id = 1;
+  longlong old = 0;
+  int rc = 0;
   // Do a lookup.
   if (!index_last(table->record[0])) {
     /*
       Decode PK field from the key
     */
-    auto err = read_hidden_pk_id_from_rowkey(&hidden_pk_id);
-    if (err) {
-      if (is_new_snapshot) {
-        tx->release_snapshot();
-      }
-      return err;
+    rc = read_hidden_pk_id_from_rowkey(&hidden_pk_id);
+    if (rc) {
+      goto exit;
     }
 
     hidden_pk_id++;
   }
 
-  longlong old = m_tbl_def->m_hidden_pk_val;
+  old = m_tbl_def->m_hidden_pk_val;
   while (old < hidden_pk_id &&
          !m_tbl_def->m_hidden_pk_val.compare_exchange_weak(old, hidden_pk_id)) {
   }
 
+exit:
   if (is_new_snapshot) {
     tx->release_snapshot();
   }
 
   table->m_status = save_table_status;
   active_index = save_active_index;
+  std::swap(m_iterator, save_iterator);
 
-  release_scan_iterator();
-
-  return HA_EXIT_SUCCESS;
+  return rc;
 }
 
 /* Get PK value from m_tbl_def->m_hidden_pk_info. */
@@ -7125,11 +7139,6 @@ ha_rocksdb::ha_rocksdb(my_core::handlerton *const hton,
                        my_core::TABLE_SHARE *const table_arg)
     : handler(hton, table_arg),
       m_table_handler(nullptr),
-      m_scan_it(nullptr),
-      m_scan_it_skips_bloom(false),
-      m_scan_it_snapshot(nullptr),
-      m_scan_it_lower_bound(nullptr),
-      m_scan_it_upper_bound(nullptr),
       m_tbl_def(nullptr),
       m_pk_descr(nullptr),
       m_key_descr_arr(nullptr),
@@ -7137,7 +7146,6 @@ ha_rocksdb::ha_rocksdb(my_core::handlerton *const hton,
       m_pk_packed_tuple(nullptr),
       m_sk_packed_tuple(nullptr),
       m_end_key_packed_tuple(nullptr),
-      m_sk_match_prefix(nullptr),
       m_sk_packed_tuple_old(nullptr),
       m_dup_sk_packed_tuple(nullptr),
       m_dup_sk_packed_tuple_old(nullptr),
@@ -7203,12 +7211,13 @@ bool ha_rocksdb::init_with_fields() {
   rows within a transaction, etc, because the compaction filter ignores
   snapshots when filtering keys.
 */
-bool ha_rocksdb::should_hide_ttl_rec(const Rdb_key_def &kd,
-                                     const rocksdb::Slice &ttl_rec_val,
-                                     const int64_t curr_ts) {
+bool rdb_should_hide_ttl_rec(const Rdb_key_def &kd,
+                             const rocksdb::Slice &ttl_rec_val,
+                             Rdb_transaction *tx) {
   DBUG_ASSERT(kd.has_ttl());
   DBUG_ASSERT(kd.m_ttl_rec_offset != UINT_MAX);
-  THD *thd = ha_thd();
+  THD *thd = tx->get_thd();
+  const int64_t curr_ts = tx->m_snapshot_timestamp;
 
   /*
     Curr_ts can only be 0 if there are no snapshots open.
@@ -7223,7 +7232,7 @@ bool ha_rocksdb::should_hide_ttl_rec(const Rdb_key_def &kd,
     DBUG_ASSERT(false);
     push_warning_printf(thd, Sql_condition::SL_WARNING, ER_WRONG_ARGUMENTS,
                         "TTL read filtering called with no snapshot.");
-    update_row_stats(ROWS_UNFILTERED_NO_SNAPSHOT);
+    rdb_update_global_stats(ROWS_UNFILTERED_NO_SNAPSHOT, 1);
     return false;
   }
 
@@ -7263,7 +7272,7 @@ bool ha_rocksdb::should_hide_ttl_rec(const Rdb_key_def &kd,
   bool is_hide_ttl =
       ts + kd.m_ttl_duration + read_filter_ts <= static_cast<uint64>(curr_ts);
   if (is_hide_ttl) {
-    update_row_stats(ROWS_FILTERED);
+    rdb_update_global_stats(ROWS_FILTERED, 1);
 
     /* increment examined row count when rows are skipped */
     thd->inc_examined_row_count(1);
@@ -7381,8 +7390,6 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg,
 
   m_sk_packed_tuple = reinterpret_cast<uchar *>(
       my_malloc(PSI_NOT_INSTRUMENTED, max_packed_sk_len, MYF(0)));
-  m_sk_match_prefix = reinterpret_cast<uchar *>(
-      my_malloc(PSI_NOT_INSTRUMENTED, max_packed_sk_len, MYF(0)));
   m_sk_packed_tuple_old = reinterpret_cast<uchar *>(
       my_malloc(PSI_NOT_INSTRUMENTED, max_packed_sk_len, MYF(0)));
   m_end_key_packed_tuple = reinterpret_cast<uchar *>(
@@ -7390,11 +7397,6 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg,
   m_pack_buffer = reinterpret_cast<uchar *>(
       my_malloc(PSI_NOT_INSTRUMENTED, max_packed_sk_len, MYF(0)));
 
-  m_scan_it_lower_bound = reinterpret_cast<uchar *>(
-      my_malloc(PSI_NOT_INSTRUMENTED, max_packed_sk_len, MYF(0)));
-  m_scan_it_upper_bound = reinterpret_cast<uchar *>(
-      my_malloc(PSI_NOT_INSTRUMENTED, max_packed_sk_len, MYF(0)));
-
   /*
     If inplace alter is happening, allocate special buffers for unique
     secondary index duplicate checking.
@@ -7408,8 +7410,7 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg,
 
   if (m_pk_packed_tuple == nullptr || m_sk_packed_tuple == nullptr ||
       m_sk_packed_tuple_old == nullptr || m_end_key_packed_tuple == nullptr ||
-      m_pack_buffer == nullptr || m_scan_it_upper_bound == nullptr ||
-      m_scan_it_lower_bound == nullptr ||
+      m_pack_buffer == nullptr ||
       (alloc_alter_buffers && (m_dup_sk_packed_tuple == nullptr ||
                                m_dup_sk_packed_tuple_old == nullptr))) {
     // One or more of the above allocations failed.  Clean up and exit
@@ -7428,9 +7429,6 @@ void ha_rocksdb::free_key_buffers() {
   my_free(m_sk_packed_tuple);
   m_sk_packed_tuple = nullptr;
 
-  my_free(m_sk_match_prefix);
-  m_sk_match_prefix = nullptr;
-
   my_free(m_sk_packed_tuple_old);
   m_sk_packed_tuple_old = nullptr;
 
@@ -7446,12 +7444,6 @@ void ha_rocksdb::free_key_buffers() {
   my_free(m_dup_sk_packed_tuple_old);
   m_dup_sk_packed_tuple_old = nullptr;
 
-  my_free(m_scan_it_lower_bound);
-  m_scan_it_lower_bound = nullptr;
-
-  my_free(m_scan_it_upper_bound);
-  m_scan_it_upper_bound = nullptr;
-
   release_blob_buffer();
 }
 
@@ -7576,6 +7568,7 @@ int ha_rocksdb::close(void) {
   m_pk_descr = nullptr;
   m_key_descr_arr = nullptr;
   m_converter = nullptr;
+  m_iterator = nullptr;
   free_key_buffers();
 
   if (m_table_handler != nullptr) {
@@ -8738,173 +8731,6 @@ bool ha_rocksdb::check_keyread_allowed(bool &pk_can_be_decoded,
   return true;
 }
 
-int ha_rocksdb::read_key_exact(const Rdb_key_def &kd,
-                               rocksdb::Iterator *const iter,
-                               const rocksdb::Slice &key_slice,
-                               const int64_t ttl_filter_ts) {
-  THD *thd = ha_thd();
-  /*
-    We are looking for the first record such that
-      index_tuple= lookup_tuple.
-    lookup_tuple may be a prefix of the index.
-  */
-  rocksdb_smart_seek(kd.m_is_reverse_cf, iter, key_slice);
-
-  while (iter->Valid() && kd.value_matches_prefix(iter->key(), key_slice)) {
-    if (thd && thd->killed) {
-      return HA_ERR_QUERY_INTERRUPTED;
-    }
-    /*
-      If TTL is enabled we need to check if the given key has already expired
-      from the POV of the current transaction.  If it has, try going to the next
-      key.
-    */
-    if (kd.has_ttl() && should_hide_ttl_rec(kd, iter->value(), ttl_filter_ts)) {
-      rocksdb_smart_next(kd.m_is_reverse_cf, iter);
-      continue;
-    }
-
-    return HA_EXIT_SUCCESS;
-  }
-
-  /*
-    Got a record that is not equal to the lookup value, or even a record
-    from another table.index.
-  */
-  return HA_ERR_KEY_NOT_FOUND;
-}
-
-int ha_rocksdb::read_before_key(const Rdb_key_def &kd,
-                                const bool full_key_match,
-                                const rocksdb::Slice &key_slice) {
-  THD *thd = ha_thd();
-  /*
-    We are looking for the first record such that
-
-      index_tuple $LT lookup_tuple
-
-    with HA_READ_BEFORE_KEY, $LT = '<',
-    with HA_READ_PREFIX_LAST_OR_PREV, $LT = '<='
-    with HA_READ_PREFIX_LAST, $LT = '=='
-
-    Symmetry with read_after_key is possible if rocksdb supported prefix seeks.
-  */
-  rocksdb_smart_seek(!kd.m_is_reverse_cf, m_scan_it, key_slice);
-
-  while (is_valid_iterator(m_scan_it)) {
-    if (thd && thd->killed) {
-      return HA_ERR_QUERY_INTERRUPTED;
-    }
-    /*
-      We are using full key and we've hit an exact match.
-    */
-    if ((full_key_match &&
-         kd.value_matches_prefix(m_scan_it->key(), key_slice))) {
-      rocksdb_smart_next(!kd.m_is_reverse_cf, m_scan_it);
-      continue;
-    }
-
-    return HA_EXIT_SUCCESS;
-  }
-
-  return HA_ERR_KEY_NOT_FOUND;
-}
-
-int ha_rocksdb::read_after_key(const Rdb_key_def &kd,
-                               const rocksdb::Slice &key_slice) {
-  /*
-    We are looking for the first record such that
-
-      index_tuple $GT lookup_tuple
-
-    with HA_READ_AFTER_KEY, $GT = '>',
-    with HA_READ_KEY_OR_NEXT, $GT = '>='
-    with HA_READ_KEY_EXACT, $GT = '=='
-  */
-  rocksdb_smart_seek(kd.m_is_reverse_cf, m_scan_it, key_slice);
-
-  return is_valid_iterator(m_scan_it) ? HA_EXIT_SUCCESS : HA_ERR_KEY_NOT_FOUND;
-}
-
-int ha_rocksdb::position_to_correct_key(const Rdb_key_def &kd,
-                                        const enum ha_rkey_function &find_flag,
-                                        const bool full_key_match,
-                                        const rocksdb::Slice &key_slice,
-                                        bool *const move_forward) {
-  int rc = 0;
-
-  *move_forward = true;
-
-  switch (find_flag) {
-    case HA_READ_KEY_EXACT:
-    case HA_READ_AFTER_KEY:
-    case HA_READ_KEY_OR_NEXT:
-      rc = read_after_key(kd, key_slice);
-      break;
-    case HA_READ_BEFORE_KEY:
-    case HA_READ_PREFIX_LAST:
-    case HA_READ_PREFIX_LAST_OR_PREV:
-      *move_forward = false;
-      rc = read_before_key(kd, full_key_match, key_slice);
-      break;
-    case HA_READ_KEY_OR_PREV:
-    case HA_READ_PREFIX:
-      /* These flags are not used by the SQL layer, so we don't support them
-       * yet. */
-      rc = HA_ERR_UNSUPPORTED;
-      break;
-    default:
-      DBUG_ASSERT(0);
-      break;
-  }
-
-  return rc;
-}
-
-int ha_rocksdb::calc_eq_cond_len(const Rdb_key_def &kd,
-                                 const enum ha_rkey_function &find_flag,
-                                 const rocksdb::Slice &slice,
-                                 const int bytes_changed_by_succ,
-                                 const key_range *const end_key) {
-  if (find_flag == HA_READ_KEY_EXACT) return slice.size();
-
-  if (find_flag == HA_READ_PREFIX_LAST) {
-    /*
-      We have made the kd.successor(m_sk_packed_tuple) call above.
-
-      The slice is at least Rdb_key_def::INDEX_NUMBER_SIZE bytes long.
-    */
-    return slice.size() - bytes_changed_by_succ;
-  }
-
-  if (end_key) {
-    uint end_key_packed_size = 0;
-    end_key_packed_size =
-        kd.pack_index_tuple(table, m_pack_buffer, m_end_key_packed_tuple,
-                            end_key->key, end_key->keypart_map);
-
-    /*
-      Calculating length of the equal conditions here. 4 byte index id is
-      included.
-      Example1: id1 BIGINT, id2 INT, id3 BIGINT, PRIMARY KEY (id1, id2, id3)
-       WHERE id1=1 AND id2=1 AND id3>=2 => eq_cond_len= 4+8+4= 16
-       WHERE id1=1 AND id2>=1 AND id3>=2 => eq_cond_len= 4+8= 12
-      Example2: id1 VARCHAR(30), id2 INT, PRIMARY KEY (id1, id2)
-       WHERE id1 = 'AAA' and id2 < 3; => eq_cond_len=13 (varchar used 9 bytes)
-    */
-    rocksdb::Slice end_slice(reinterpret_cast<char *>(m_end_key_packed_tuple),
-                             end_key_packed_size);
-    return slice.difference_offset(end_slice);
-  }
-
-  /*
-    On range scan without any end key condition, there is no
-    eq cond, and eq cond length is the same as index_id size (8 bytes).
-    Example1: id1 BIGINT, id2 INT, id3 BIGINT, PRIMARY KEY (id1, id2, id3)
-     WHERE id1>=1 AND id2 >= 2 and id2 <= 5 => eq_cond_len= 4
-  */
-  return Rdb_key_def::INDEX_ID_SIZE;
-}
 
 /**
   @note
@@ -9088,7 +8914,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key,
           next/prev anyway. To avoid correctness issues, just free the
           iterator.
         */
-        release_scan_iterator();
+        m_iterator->reset();
         DBUG_RETURN(rc);
       } else {
         /*
@@ -9124,7 +8950,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key,
 
           rc = get_row_by_rowid(buf, m_last_rowkey.ptr(),
                                 m_last_rowkey.length());
-          release_scan_iterator();
+          m_iterator->reset();
           DBUG_RETURN(rc);
         }
 
@@ -9143,7 +8969,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key,
             update_row_stats(ROWS_READ);
           }
 
-          release_scan_iterator();
+          m_iterator->reset();
           DBUG_RETURN(rc);
         }
       }
@@ -9153,28 +8979,23 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key,
                                       key, keypart_map);
   }
 
-  if (find_flag == HA_READ_KEY_EXACT || find_flag == HA_READ_PREFIX_LAST) {
-    m_sk_match_length = packed_size;
-    memcpy(m_sk_match_prefix, m_sk_packed_tuple, packed_size);
-  } else {
-    kd.get_infimum_key(m_sk_match_prefix, &m_sk_match_length);
-  }
-
-  int bytes_changed_by_succ = 0;
-  if (find_flag == HA_READ_PREFIX_LAST_OR_PREV ||
-      find_flag == HA_READ_PREFIX_LAST || find_flag == HA_READ_AFTER_KEY) {
-    /* See below */
-    bytes_changed_by_succ = kd.successor(m_sk_packed_tuple, packed_size);
-  }
-
   rocksdb::Slice slice(reinterpret_cast<const char *>(m_sk_packed_tuple),
                        packed_size);
 
-  const uint eq_cond_len =
-      calc_eq_cond_len(kd, find_flag, slice, bytes_changed_by_succ, end_range);
+  rocksdb::Slice end_slice;
+  if (end_range && find_flag != HA_READ_KEY_EXACT &&
+      find_flag != HA_READ_PREFIX_LAST) {
+    uint end_key_packed_size = 0;
+    end_key_packed_size =
+        kd.pack_index_tuple(table, m_pack_buffer, m_end_key_packed_tuple,
+                            end_range->key, end_range->keypart_map);
+    end_slice =
+        rocksdb::Slice((char *)m_end_key_packed_tuple, end_key_packed_size);
+  }
 
   Rdb_transaction *const tx = get_or_create_tx(table->in_use);
   const bool is_new_snapshot = !tx->has_snapshot();
+
   // Loop as long as we get a deadlock error AND we end up creating the
   // snapshot here (i.e. it did not exist prior to this)
   for (;;) {
@@ -9187,15 +9008,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key,
       This will open the iterator and position it at a record that's equal or
       greater than the lookup tuple.
     */
-    setup_scan_iterator(kd, &slice, eq_cond_len);
-
-    /*
-      Once we are positioned on from above, move to the position we really
-      want: See storage/rocksdb/rocksdb-range-access.txt
-    */
-    bool move_forward;
-    rc = position_to_correct_key(kd, find_flag, using_full_key, slice,
-                                 &move_forward);
+    rc = m_iterator->seek(find_flag, slice, using_full_key, end_slice);
 
     if (rc) {
       break;
@@ -9206,7 +9019,10 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key,
       then we have all the rows we need.  For a secondary key we now need to
       lookup the primary key.
     */
-    rc = index_next_with_direction_intern(buf, move_forward, true);
+    bool direction = (find_flag == HA_READ_KEY_EXACT) ||
+                     (find_flag == HA_READ_AFTER_KEY) ||
+                     (find_flag == HA_READ_KEY_OR_NEXT);
+    rc = index_next_with_direction_intern(buf, direction, true);
 
     if (!should_recreate_snapshot(rc, is_new_snapshot)) {
       break; /* Exit the loop */
@@ -9214,7 +9030,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key,
 
     // release the snapshot and iterator so they will be regenerated
     tx->release_snapshot();
-    release_scan_iterator();
+    m_iterator->reset();
   }
 
   if (!rc) {
@@ -9247,7 +9063,14 @@ int ha_rocksdb::index_read_map(uchar *const buf, const uchar *const key,
   DBUG_ENTER_FUNC();
   ha_statistic_increment(&System_status_var::ha_read_key_count);
 
-  DBUG_RETURN(index_read_intern(buf, key, keypart_map, find_flag));
+  int rc = index_read_intern(buf, key, keypart_map, find_flag);
+
+  // The SQL layer generally expects HA_ERR_KEY_NOT_FOUND for this call.
+  if (rc == HA_ERR_END_OF_FILE) {
+    rc = HA_ERR_KEY_NOT_FOUND;
+  }
+
+  DBUG_RETURN(rc);
 }
 
 /**
@@ -9324,7 +9147,7 @@ int ha_rocksdb::check(THD *const thd MY_ATTRIBUTE((__unused__)),
                           table_name, rows, res);
           goto error;
         }
-        rocksdb::Slice key = m_scan_it->key();
+        rocksdb::Slice key = m_iterator->key();
         sec_key_copy.copy(key.data(), key.size(), &my_charset_bin);
         rowkey_copy.copy(m_last_rowkey.ptr(), m_last_rowkey.length(),
                          &my_charset_bin);
@@ -9470,16 +9293,9 @@ rocksdb::Status ha_rocksdb::get_for_update(
     Rdb_transaction *const tx, const Rdb_key_def &key_descr,
     const rocksdb::Slice &key, rocksdb::PinnableSlice *const value) const {
   DBUG_ASSERT(m_lock_rows != RDB_LOCK_NONE);
-
+  DBUG_ASSERT(value == nullptr);
   bool exclusive = m_lock_rows != RDB_LOCK_READ;
-  bool do_validate = my_core::thd_tx_isolation(ha_thd()) > ISO_READ_COMMITTED;
-  rocksdb::Status s =
-      tx->get_for_update(key_descr, key, value, exclusive, do_validate);
-
-#ifndef DBUG_OFF
-  ++rocksdb_num_get_for_update_calls;
-#endif
-  return s;
+  return rdb_tx_get_for_update(tx, key_descr, key, value, exclusive);
 }
 
 bool ha_rocksdb::is_blind_delete_enabled() {
@@ -9522,9 +9338,6 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid,
     DBUG_ASSERT(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
   };);
 
-  bool found;
-  rocksdb::Status s;
-
   /* Pretend row found without looking up */
   if (skip_lookup) {
     /* TODO(yzha) - rows stas are gone in 8.0
@@ -9535,11 +9348,9 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid,
     DBUG_RETURN(0);
   }
 
-  if (m_lock_rows == RDB_LOCK_NONE) {
-    tx->acquire_snapshot(true);
-    s = tx->get(m_pk_descr->get_cf(), key_slice, &m_retrieved_record);
-  } else if (m_insert_with_update && m_dup_key_found &&
-             m_pk_descr->get_keyno() == m_dupp_errkey) {
+  if (m_insert_with_update && m_dup_key_found &&
+      m_pk_descr->get_keyno() == m_dupp_errkey) {
+    DBUG_ASSERT(m_lock_rows == RDB_LOCK_WRITE);
     DBUG_ASSERT(m_dup_key_tuple.length() == key_slice.size());
     DBUG_ASSERT(
         memcmp(m_dup_key_tuple.ptr(), key_slice.data(), key_slice.size()) == 0);
@@ -9548,43 +9359,26 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid,
     // m_dup_key_retrieved_record during write_row already, so just move it
     // over.
     m_retrieved_record = std::move(m_dup_key_retrieved_record);
-    s = rocksdb::Status::OK();
+    rc = HA_EXIT_SUCCESS;
   } else {
-    s = get_for_update(tx, *m_pk_descr, key_slice, &m_retrieved_record);
+    tx->acquire_snapshot(false);
+    Rdb_iterator_base iter(ha_thd(), m_pk_descr, m_pk_descr, m_tbl_def);
+    rc = iter.get(&key_slice, &m_retrieved_record, m_lock_rows, skip_ttl_check);
   }
 
-  DBUG_EXECUTE_IF("rocksdb_return_status_corrupted",
-                  dbug_change_status_to_corrupted(&s););
-
-  if (!s.IsNotFound() && !s.ok()) {
-    DBUG_RETURN(tx->set_status_error(table->in_use, s, *m_pk_descr, m_tbl_def,
-                                     m_table_handler));
-  }
-  found = !s.IsNotFound();
-
-  table->m_status = STATUS_NOT_FOUND;
-  if (found) {
-    /* If we found the record, but it's expired, pretend we didn't find it.  */
-    if (!skip_ttl_check && m_pk_descr->has_ttl() &&
-        should_hide_ttl_rec(*m_pk_descr, m_retrieved_record,
-                            tx->m_snapshot_timestamp)) {
-      DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
-    }
-
+  if (!rc) {
     m_last_rowkey.copy((const char *)rowid, rowid_size, &my_charset_bin);
     rc = convert_record_from_storage_format(&key_slice, buf);
 
     if (!rc) {
       table->m_status = 0;
     }
-  } else {
-    /*
-      Note: we don't need to unlock the row. It is intentional that we keep
-      locks on rows that don't exist.
-    */
-    rc = HA_ERR_KEY_NOT_FOUND;
   }
 
+  /*
+    Note: we don't need to unlock the row. It is intentional that we keep
+    locks on rows that don't exist.
+  */
   DBUG_RETURN(rc);
 }
 
@@ -9615,23 +9409,9 @@ int ha_rocksdb::records_from_index(ha_rows *num_rows, uint index) {
 int ha_rocksdb::get_row_by_sk(uchar *buf, const Rdb_key_def &kd,
                               const rocksdb::Slice *key) {
   DBUG_ENTER_FUNC();
-  Rdb_transaction *const tx = get_or_create_tx(table->in_use);
-
-  auto s = tx->get(kd.get_cf(), *key, &m_retrieved_record);
-
-  if (!s.IsNotFound() && !s.ok()) {
-    DBUG_RETURN(
-        tx->set_status_error(table->in_use, s, kd, m_tbl_def, m_table_handler));
-  }
 
-  if (s.IsNotFound()) {
-    DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
-  }
-
-  if (kd.has_ttl() &&
-      should_hide_ttl_rec(kd, m_retrieved_record, tx->m_snapshot_timestamp)) {
-    DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
-  }
+  int rc = m_iterator->get(key, &m_retrieved_record, RDB_LOCK_NONE);
+  if (rc) DBUG_RETURN(rc);
 
   const uint size =
       kd.get_primary_key_tuple(table, *m_pk_descr, key, m_pk_packed_tuple);
@@ -9641,7 +9421,7 @@ int ha_rocksdb::get_row_by_sk(uchar *buf, const Rdb_key_def &kd,
 
   m_last_rowkey.copy((const char *)m_pk_packed_tuple, size, &my_charset_bin);
 
-  int rc = secondary_index_read(active_index, buf, &m_retrieved_record);
+  rc = secondary_index_read(active_index, buf, &m_retrieved_record);
   if (!rc) {
     table->m_status = 0;
   }
@@ -9700,9 +9480,6 @@ int ha_rocksdb::index_next_with_direction_intern(uchar *const buf,
   THD *thd = ha_thd();
   int rc = 0;
   const Rdb_key_def &kd = *m_key_descr_arr[active_index_pos()];
-  Rdb_transaction *const tx = get_or_create_tx(thd);
-  rocksdb::Slice prefix_tuple(reinterpret_cast<char *>(m_sk_match_prefix),
-                              m_sk_match_length);
 
   table->m_status = STATUS_NOT_FOUND;
   /* TODO(yzha) - row stats are gone in 8.0
@@ -9715,8 +9492,8 @@ int ha_rocksdb::index_next_with_direction_intern(uchar *const buf,
       break;
     }
 
-    DBUG_ASSERT(m_scan_it);
-    if (m_scan_it == nullptr) {
+    DBUG_ASSERT(m_iterator != nullptr);
+    if (m_iterator == nullptr) {
       rc = HA_ERR_INTERNAL_ERROR;
       break;
     }
@@ -9725,31 +9502,18 @@ int ha_rocksdb::index_next_with_direction_intern(uchar *const buf,
       skip_next = false;
     } else {
       if (move_forward) {
-        rocksdb_smart_next(kd.m_is_reverse_cf, m_scan_it);
+        rc = m_iterator->next();
       } else {
-        rocksdb_smart_prev(kd.m_is_reverse_cf, m_scan_it);
+        rc = m_iterator->prev();
       }
     }
 
-    if (!is_valid_iterator(m_scan_it)) {
-      rc = HA_ERR_END_OF_FILE;
+    if (rc == HA_ERR_END_OF_FILE) {
       break;
     }
 
-    const rocksdb::Slice &key = m_scan_it->key();
-    const rocksdb::Slice &value = m_scan_it->value();
-
-    // Outside our range, return EOF.
-    if (!kd.value_matches_prefix(key, prefix_tuple)) {
-      rc = HA_ERR_END_OF_FILE;
-      break;
-    }
-
-    // Record is not visible due to TTL, move to next record.
-    if (m_pk_descr->has_ttl() &&
-        should_hide_ttl_rec(kd, value, tx->m_snapshot_timestamp)) {
-      continue;
-    }
+    const rocksdb::Slice &key = m_iterator->key();
+    const rocksdb::Slice &value = m_iterator->value();
 
     if (active_index == table->s->primary_key) {
       if (m_lock_rows != RDB_LOCK_NONE) {
@@ -9958,15 +9722,14 @@ bool ha_rocksdb::skip_unique_check() const {
          use_read_free_rpl();
 }
 
-bool ha_rocksdb::commit_in_the_middle() {
+bool commit_in_the_middle(THD *thd) {
   // It does not make sense to use write unprepared with commit in the middle,
   // since both handle large transactions by flushing the write batches onto
   // disk.
   //
   // For the two to work together, we would need to assign a new xid after
   // committing.
-  return (THDVAR(table->in_use, bulk_load) ||
-          THDVAR(table->in_use, commit_in_the_middle)) &&
+  return (THDVAR(thd, bulk_load) || THDVAR(thd, commit_in_the_middle)) &&
          rocksdb_write_policy != rocksdb::TxnDBWritePolicy::WRITE_UNPREPARED;
 }
 
@@ -9976,7 +9739,7 @@ bool ha_rocksdb::commit_in_the_middle() {
   @retval false if bulk commit was skipped or succeeded
 */
 bool ha_rocksdb::do_bulk_commit(Rdb_transaction *const tx) {
-  return commit_in_the_middle() &&
+  return commit_in_the_middle(table->in_use) &&
          tx->get_write_count() >= THDVAR(table->in_use, bulk_load_size) &&
          tx->flush_batch();
 }
@@ -10200,6 +9963,10 @@ void ha_rocksdb::set_last_rowkey(
   }
 }
 
+void ha_rocksdb::set_last_rowkey(const char *str, size_t len) {
+  m_last_rowkey.copy(str, len, &my_charset_bin);
+}
+
 /**
   Collect update data for primary key
 
@@ -10294,28 +10061,18 @@ int ha_rocksdb::check_and_lock_unique_pk(const uint key_id,
     2) T1 Get(empty) -> T1 Put(insert, not committed yet) -> T2 Get(empty)
        -> T2 Put(insert, blocked) -> T1 commit -> T2 commit(overwrite)
   */
-  const rocksdb::Status s =
-      get_for_update(row_info.tx, *m_pk_descr, row_info.new_pk_slice,
-                     ignore_pk_unique_check ? nullptr : pslice);
-  if (!s.ok() && !s.IsNotFound()) {
-    return row_info.tx->set_status_error(
-        table->in_use, s, *m_key_descr_arr[key_id], m_tbl_def, m_table_handler);
-  }
+  Rdb_iterator_base iter(ha_thd(), m_key_descr_arr[key_id], m_pk_descr,
+                         m_tbl_def);
 
-  bool key_found = ignore_pk_unique_check ? false : !s.IsNotFound();
+  int rc = iter.get(&row_info.new_pk_slice,
+                    ignore_pk_unique_check ? nullptr : pslice, m_lock_rows);
 
-  /*
-    If the pk key has ttl, we may need to pretend the row wasn't
-    found if it is already expired.
-  */
-  DBUG_ASSERT(row_info.tx->has_snapshot() &&
-              row_info.tx->m_snapshot_timestamp != 0);
-  if (key_found && m_pk_descr->has_ttl() &&
-      should_hide_ttl_rec(*m_pk_descr, *pslice,
-                          row_info.tx->m_snapshot_timestamp)) {
-    key_found = false;
+  if (rc && rc != HA_ERR_KEY_NOT_FOUND) {
+    return rc;
   }
 
+  bool key_found = ignore_pk_unique_check ? false : (rc == HA_EXIT_SUCCESS);
+
   if (key_found && row_info.old_data == nullptr && m_insert_with_update) {
     // In INSERT ON DUPLICATE KEY UPDATE ... case, if the insert failed
     // due to a duplicate key, remember the last key and skip the check
@@ -10441,53 +10198,30 @@ int ha_rocksdb::check_and_lock_sk(
 
     The bloom filter may need to be disabled for this lookup.
   */
-  uchar lower_bound_buf[Rdb_key_def::INDEX_ID_SIZE];
-  uchar upper_bound_buf[Rdb_key_def::INDEX_ID_SIZE];
-  rocksdb::Slice lower_bound_slice;
-  rocksdb::Slice upper_bound_slice;
+  Rdb_iterator_base iter(ha_thd(), m_key_descr_arr[key_id], m_pk_descr,
+                         m_tbl_def);
+  int rc = HA_EXIT_SUCCESS;
 
-  const rocksdb::Status s =
-      get_for_update(row_info.tx, kd, new_slice,
-                     all_parts_used ? &m_retrieved_record : nullptr);
-  if (!s.ok() && !s.IsNotFound()) {
-    return row_info.tx->set_status_error(table->in_use, s, kd, m_tbl_def,
-                                         m_table_handler);
+  rc = iter.get(&new_slice, all_parts_used ? &m_retrieved_record : nullptr,
+                m_lock_rows);
+  if (rc && rc != HA_ERR_KEY_NOT_FOUND) {
+    return rc;
   }
 
-  rocksdb::Iterator *iter = nullptr;
+  if (!all_parts_used) {
+    rc = iter.seek(HA_READ_KEY_EXACT, new_slice, false /* full_key_match */,
+                   new_slice, true /* read current */);
 
-  if (all_parts_used) {
-    *found = !s.IsNotFound();
-    if (*found && kd.has_ttl() &&
-        should_hide_ttl_rec(kd, m_retrieved_record,
-                            row_info.tx->m_snapshot_timestamp)) {
-      *found = false;
+    if (rc && rc != HA_ERR_END_OF_FILE) {
+      return rc;
     }
-  } else {
-    const bool total_order_seek = !check_bloom_and_set_bounds(
-        ha_thd(), kd, new_slice, Rdb_key_def::INDEX_ID_SIZE, lower_bound_buf,
-        upper_bound_buf, &lower_bound_slice, &upper_bound_slice);
-    const bool fill_cache = !THDVAR(ha_thd(), skip_fill_cache);
-
-    iter = row_info.tx->get_iterator(kd.get_cf(), total_order_seek, fill_cache,
-                                     lower_bound_slice, upper_bound_slice,
-                                     true /* read current data */,
-                                     false /* acquire snapshot */);
-    /*
-      Need to scan the transaction to see if there is a duplicate key.
-      Also need to scan RocksDB and verify the key has not been deleted
-      in the transaction.
-    */
-    DBUG_ASSERT(row_info.tx->has_snapshot() &&
-                row_info.tx->m_snapshot_timestamp != 0);
-    *found =
-        !read_key_exact(kd, iter, new_slice, row_info.tx->m_snapshot_timestamp);
   }
 
-  int rc = HA_EXIT_SUCCESS;
+  *found = (rc == HA_EXIT_SUCCESS);
 
+  rc = HA_EXIT_SUCCESS;
   if (*found && m_insert_with_update) {
-    const rocksdb::Slice &rkey = all_parts_used ? new_slice : iter->key();
+    const rocksdb::Slice &rkey = all_parts_used ? new_slice : iter.key();
     uint pk_size =
         kd.get_primary_key_tuple(table, *m_pk_descr, &rkey, m_pk_packed_tuple);
     if (pk_size == RDB_INVALID_KEY_LEN) {
@@ -10503,7 +10237,6 @@ int ha_rocksdb::check_and_lock_sk(
     }
   }
 
-  delete iter;
   return rc;
 }
 
@@ -11040,10 +10773,12 @@ int ha_rocksdb::update_write_row(const uchar *const old_data,
  0x0000b3eb003f65c5e78857, and lower bound would be
  0x0000b3eb003f65c5e78859. These cover given eq condition range.
 */
-void ha_rocksdb::setup_iterator_bounds(
-    const Rdb_key_def &kd, const rocksdb::Slice &eq_cond, size_t bound_len,
-    uchar *const lower_bound, uchar *const upper_bound,
-    rocksdb::Slice *lower_bound_slice, rocksdb::Slice *upper_bound_slice) {
+static void setup_iterator_bounds(const Rdb_key_def &kd,
+                                  const rocksdb::Slice &eq_cond,
+                                  size_t bound_len, uchar *const lower_bound,
+                                  uchar *const upper_bound,
+                                  rocksdb::Slice *lower_bound_slice,
+                                  rocksdb::Slice *upper_bound_slice) {
   // If eq_cond is shorter than Rdb_key_def::INDEX_NUMBER_SIZE, we should be
   // able to get better bounds just by using index id directly.
   if (eq_cond.size() <= Rdb_key_def::INDEX_ID_SIZE) {
@@ -11070,91 +10805,6 @@ void ha_rocksdb::setup_iterator_bounds(
   }
 }
 
-/*
-  Open a cursor
-*/
-
-void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd,
-                                     rocksdb::Slice *const slice,
-                                     const uint eq_cond_len) {
-  DBUG_ASSERT(slice->size() >= eq_cond_len);
-
-  Rdb_transaction *const tx = get_or_create_tx(table->in_use);
-
-  bool skip_bloom = true;
-
-  const rocksdb::Slice eq_cond(slice->data(), eq_cond_len);
-  // The size of m_scan_it_lower_bound (and upper) is technically
-  // max_packed_sk_len as calculated in ha_rocksdb::alloc_key_buffers.  Rather
-  // than recalculating that number, we pass in the max of eq_cond_len and
-  // Rdb_key_def::INDEX_NUMBER_SIZE which is guaranteed to be smaller than
-  // max_packed_sk_len, hence ensuring no buffer overrun.
-  //
-  // See ha_rocksdb::setup_iterator_bounds on how the bound_len parameter is
-  // used.
-  if (check_bloom_and_set_bounds(
-          ha_thd(), kd, eq_cond,
-          std::max(eq_cond_len, (uint)Rdb_key_def::INDEX_ID_SIZE),
-          m_scan_it_lower_bound, m_scan_it_upper_bound,
-          &m_scan_it_lower_bound_slice, &m_scan_it_upper_bound_slice)) {
-    skip_bloom = false;
-  }
-
-  /*
-    In some cases, setup_scan_iterator() is called multiple times from
-    the same query but bloom filter can not always be used.
-    Suppose the following query example. id2 is VARCHAR(30) and PRIMARY KEY
-    (id1, id2).
-     select count(*) from t2 WHERE id1=100 and id2 IN ('00000000000000000000',
-    '100');
-    In this case, setup_scan_iterator() is called twice, the first time is for
-    (id1, id2)=(100, '00000000000000000000') and the second time is for (100,
-    '100').
-    If prefix bloom filter length is 24 bytes, prefix bloom filter can be used
-    for the
-    first condition but not for the second condition.
-    If bloom filter condition is changed, currently it is necessary to destroy
-    and
-    re-create Iterator.
-  */
-  if (m_scan_it_skips_bloom != skip_bloom) {
-    release_scan_iterator();
-  }
-
-  /*
-    SQL layer can call rnd_init() multiple times in a row.
-    In that case, re-use the iterator, but re-position it at the table start.
-  */
-  if (!m_scan_it) {
-    const bool fill_cache = !THDVAR(ha_thd(), skip_fill_cache);
-    if (commit_in_the_middle()) {
-      DBUG_ASSERT(m_scan_it_snapshot == nullptr);
-      m_scan_it_snapshot = rdb->GetSnapshot();
-
-      auto read_opts = rocksdb::ReadOptions();
-      // TODO(mung): set based on WHERE conditions
-      read_opts.total_order_seek = true;
-      read_opts.snapshot = m_scan_it_snapshot;
-      m_scan_it = rdb->NewIterator(read_opts, kd.get_cf());
-    } else {
-      m_scan_it = tx->get_iterator(kd.get_cf(), skip_bloom, fill_cache,
-                                   m_scan_it_lower_bound_slice,
-                                   m_scan_it_upper_bound_slice);
-    }
-    m_scan_it_skips_bloom = skip_bloom;
-  }
-}
-
-void ha_rocksdb::release_scan_iterator() {
-  delete m_scan_it;
-  m_scan_it = nullptr;
-
-  if (m_scan_it_snapshot) {
-    rdb->ReleaseSnapshot(m_scan_it_snapshot);
-    m_scan_it_snapshot = nullptr;
-  }
-}
-
 /**
   @return
     HA_EXIT_SUCCESS  OK
@@ -11241,6 +10891,10 @@ int ha_rocksdb::index_init(uint idx, bool sorted MY_ATTRIBUTE((__unused__))) {
   Rdb_transaction *const tx = get_or_create_tx(thd);
   DBUG_ASSERT(tx != nullptr);
 
+  active_index = idx;
+  m_iterator.reset(new Rdb_iterator_base(
+      thd, m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def));
+
   // If m_lock_rows is not RDB_LOCK_NONE then we will be doing a get_for_update
   // when accessing the index, so don't acquire the snapshot right away.
   // Otherwise acquire the snapshot immediately.
@@ -11257,8 +10911,7 @@ int ha_rocksdb::index_end() {
   DBUG_ENTER_FUNC();
 
   m_need_build_decoder = false;
-
-  release_scan_iterator();
+  m_iterator = nullptr;
 
   active_index = MAX_KEY;
   in_range_check_pushed_down = false;
@@ -15141,14 +14794,14 @@ bool rdb_dbug_set_ttl_ignore_pk() { return rocksdb_debug_ttl_ignore_pk; }
 #endif
 
 void rdb_update_global_stats(const operation_type &type, uint count,
-                             bool is_system_table) {
+                             Rdb_tbl_def *td) {
   DBUG_ASSERT(type < ROWS_MAX);
 
   if (count == 0) {
     return;
   }
 
-  if (is_system_table) {
+  if (td && td->m_is_mysql_system_table) {
     global_stats.system_rows[type].add(count);
   } else {
     global_stats.rows[type].add(count);
@@ -16132,13 +15785,33 @@ const rocksdb::ReadOptions &rdb_tx_acquire_snapshot(Rdb_transaction *tx) {
 
 rocksdb::Iterator *rdb_tx_get_iterator(
     Rdb_transaction *tx, rocksdb::ColumnFamilyHandle *const column_family,
-    bool skip_bloom_filter, bool fill_cache,
-    const rocksdb::Slice &lower_bound_slice,
+    bool skip_bloom_filter, const rocksdb::Slice &lower_bound_slice,
     const rocksdb::Slice &upper_bound_slice, bool read_current,
     bool create_snapshot) {
-  return tx->get_iterator(column_family, skip_bloom_filter, fill_cache,
-                          lower_bound_slice, upper_bound_slice, read_current,
-                          create_snapshot);
+  return tx->get_iterator(column_family, skip_bloom_filter, lower_bound_slice,
+                          upper_bound_slice, read_current, create_snapshot);
+}
+
+rocksdb::Iterator *rdb_tx_get_iterator(
+    THD *thd, rocksdb::ColumnFamilyHandle *const cf, bool skip_bloom_filter,
+    const rocksdb::Slice &eq_cond_lower_bound,
+    const rocksdb::Slice &eq_cond_upper_bound,
+    const rocksdb::Snapshot **snapshot, bool read_current,
+    bool create_snapshot) {
+  if (commit_in_the_middle(thd)) {
+    DBUG_ASSERT(*snapshot == nullptr);
+    *snapshot = rdb->GetSnapshot();
+
+    auto read_opts = rocksdb::ReadOptions();
+    // TODO(mung): set based on WHERE conditions
+    read_opts.total_order_seek = true;
+    read_opts.snapshot = *snapshot;
+    return rdb->NewIterator(read_opts, cf);
+  } else {
+    Rdb_transaction *tx = get_tx_from_thd(thd);
+    return tx->get_iterator(cf, skip_bloom_filter, eq_cond_lower_bound,
+                            eq_cond_upper_bound, read_current, create_snapshot);
+  }
 }
 
 bool rdb_tx_started(Rdb_transaction *tx) { return tx->is_tx_started(); }
@@ -16150,6 +15823,22 @@ rocksdb::Status rdb_tx_get(Rdb_transaction *tx,
   return tx->get(column_family, key, value);
 }
 
+rocksdb::Status rdb_tx_get_for_update(Rdb_transaction *tx,
+                                      const Rdb_key_def &kd,
+                                      const rocksdb::Slice &key,
+                                      rocksdb::PinnableSlice *const value,
+                                      bool exclusive) {
+  bool do_validate =
+      my_core::thd_tx_isolation(tx->get_thd()) > ISO_READ_COMMITTED;
+  rocksdb::Status s =
+      tx->get_for_update(kd, key, value, exclusive, do_validate);
+
+#ifndef DBUG_OFF
+  ++rocksdb_num_get_for_update_calls;
+#endif
+  return s;
+}
+
 void rdb_tx_multi_get(Rdb_transaction *tx,
                       rocksdb::ColumnFamilyHandle *const column_family,
                       const size_t num_keys, const rocksdb::Slice *keys,
@@ -16158,6 +15847,12 @@ void rdb_tx_multi_get(Rdb_transaction *tx,
   tx->multi_get(column_family, num_keys, keys, values, statuses, sorted_input);
 }
 
+int rdb_tx_set_status_error(Rdb_transaction *tx, const rocksdb::Status &s,
+                            const Rdb_key_def &kd,
+                            const Rdb_tbl_def *const tbl_def) {
+  return tx->set_status_error(tx->get_thd(), s, kd, tbl_def, nullptr);
+}
+
 /****************************************************************************
  * Multi-Range-Read implementation based on RocksDB's MultiGet() call
  ***************************************************************************/
@@ -16685,8 +16380,7 @@ int ha_rocksdb::multi_range_read_next(char **range_info) {
 
     /* If we found the record, but it's expired, pretend we didn't find it.  */
     if (m_pk_descr->has_ttl() &&
-        should_hide_ttl_rec(*m_pk_descr, m_retrieved_record,
-                            tx->m_snapshot_timestamp)) {
+        rdb_should_hide_ttl_rec(*m_pk_descr, m_retrieved_record, tx)) {
       continue;
     }
 
diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h
index 9c3c3927498..369af0bc9c4 100644
--- a/storage/rocksdb/ha_rocksdb.h
+++ b/storage/rocksdb/ha_rocksdb.h
@@ -77,6 +77,7 @@
 namespace myrocks {
 
 class Rdb_converter;
+class Rdb_iterator;
 class Rdb_key_def;
 class Rdb_tbl_def;
 class Rdb_transaction;
@@ -134,6 +135,8 @@ enum table_cardinality_scan_type {
   SCAN_TYPE_FULL_TABLE,
 };
 
+enum Rdb_lock_type { RDB_LOCK_NONE, RDB_LOCK_READ, RDB_LOCK_WRITE };
+
 class Mrr_rowid_source;
 
 uint32_t rocksdb_perf_context_level(THD *const thd);
@@ -148,20 +151,6 @@ class ha_rocksdb : public my_core::handler {
 
   Rdb_table_handler *m_table_handler;  ///< Open table handler
 
-  /* Iterator used for range scans and for full table/index scans */
-  rocksdb::Iterator *m_scan_it;
-
-  /* Whether m_scan_it was created with skip_bloom=true */
-  bool m_scan_it_skips_bloom;
-
-  const rocksdb::Snapshot *m_scan_it_snapshot;
-
-  /* Buffers used for upper/lower bounds for m_scan_it. */
-  uchar *m_scan_it_lower_bound;
-  uchar *m_scan_it_upper_bound;
-  rocksdb::Slice m_scan_it_lower_bound_slice;
-  rocksdb::Slice m_scan_it_upper_bound_slice;
-
   Rdb_tbl_def *m_tbl_def;
 
   /* Primary Key encoder from KeyTupleFormat to StorageFormat */
@@ -197,13 +186,6 @@ class ha_rocksdb : public my_core::handler {
   Rdb_string_writer m_sk_tails;
   Rdb_string_writer m_pk_unpack_info;
 
-  /*
-    ha_rockdb->index_read_map(.. HA_READ_KEY_EXACT or similar) will save here
-    mem-comparable form of the index lookup tuple.
-  */
-  uchar *m_sk_match_prefix;
-  uint m_sk_match_length;
-
   /* Second buffers, used by UPDATE. */
   uchar *m_sk_packed_tuple_old;
   Rdb_string_writer m_sk_tails_old;
@@ -221,6 +203,8 @@ class ha_rocksdb : public my_core::handler {
   /* class to convert between Mysql format and RocksDB format*/
   std::unique_ptr<Rdb_converter> m_converter;
 
+  std::unique_ptr<Rdb_iterator> m_iterator;
+
   /*
     Pointer to the original TTL timestamp value (8 bytes) during UPDATE.
   */
@@ -269,7 +253,7 @@ class ha_rocksdb : public my_core::handler {
   uint m_total_blob_buffer_allocated = 0;
 
   /* Type of locking to apply to rows */
-  enum { RDB_LOCK_NONE, RDB_LOCK_READ, RDB_LOCK_WRITE } m_lock_rows;
+  Rdb_lock_type m_lock_rows;
 
   /* true means we're doing an index-only read. false means otherwise. */
   bool m_keyread_only;
@@ -327,17 +311,6 @@ class ha_rocksdb : public my_core::handler {
   int secondary_index_read(const int keyno, uchar *const buf,
                            const rocksdb::Slice *value)
       MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
-  static void setup_iterator_bounds(const Rdb_key_def &kd,
-                                    const rocksdb::Slice &eq_cond,
-                                    size_t bound_len, uchar *const lower_bound,
-                                    uchar *const upper_bound,
-                                    rocksdb::Slice *lower_bound_slice,
-                                    rocksdb::Slice *upper_bound_slice);
-  static bool can_use_bloom_filter(THD *thd, const Rdb_key_def &kd,
-                                   const rocksdb::Slice &eq_cond);
-  void setup_scan_iterator(const Rdb_key_def &kd, rocksdb::Slice *slice,
-                           const uint eq_cond_len) MY_ATTRIBUTE((__nonnull__));
-  void release_scan_iterator(void);
 
   rocksdb::Status get_for_update(Rdb_transaction *const tx,
                                  const Rdb_key_def &kd,
@@ -373,7 +346,6 @@ class ha_rocksdb : public my_core::handler {
       MY_ATTRIBUTE((__warn_unused_result__));
   bool is_blind_delete_enabled();
   bool skip_unique_check() const;
-  bool commit_in_the_middle() MY_ATTRIBUTE((__warn_unused_result__));
   bool do_bulk_commit(Rdb_transaction *const tx)
       MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
   bool has_hidden_pk(const TABLE *const table) const
@@ -382,6 +354,7 @@ class ha_rocksdb : public my_core::handler {
   void update_row_stats(const operation_type &type, ulonglong count = 1);
 
   void set_last_rowkey(const uchar *const old_data);
+  void set_last_rowkey(const char *str, size_t len);
 
   int alloc_key_buffers(const TABLE *const table_arg,
                         const Rdb_tbl_def *const tbl_def_arg,
@@ -666,6 +639,8 @@ class ha_rocksdb : public my_core::handler {
       THD *thd, const Rdb_key_def &kd, const rocksdb::Slice &eq_cond,
       size_t bound_len, uchar *const lower_bound, uchar *const upper_bound,
       rocksdb::Slice *lower_bound_slice, rocksdb::Slice *upper_bound_slice);
+  static bool can_use_bloom_filter(THD *thd, const Rdb_key_def &kd,
+                                   const rocksdb::Slice &eq_cond);
 
  private:
   // true <=> The scan uses the default MRR implementation, just redirect all
@@ -680,6 +655,7 @@ class ha_rocksdb : public my_core::handler {
   friend class Mrr_rowid_source;
   friend class Mrr_pk_scan_rowid_source;
   friend class Mrr_sec_key_rowid_source;
+  friend class Rdb_iterator;
 
   // MRR parameters and output values
   rocksdb::Slice *mrr_keys;
@@ -778,11 +754,6 @@ class ha_rocksdb : public my_core::handler {
   int compare_keys(const KEY *const old_key, const KEY *const new_key) const
       MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
 
-  bool should_hide_ttl_rec(const Rdb_key_def &kd,
-                           const rocksdb::Slice &ttl_rec_val,
-                           const int64_t curr_ts)
-      MY_ATTRIBUTE((__warn_unused_result__));
-
   int index_read_intern(uchar *const buf, const uchar *const key,
                         key_part_map keypart_map,
                         enum ha_rkey_function find_flag)
@@ -834,29 +805,6 @@ class ha_rocksdb : public my_core::handler {
                            const bool pk_changed)
       MY_ATTRIBUTE((__warn_unused_result__));
 
-  int read_key_exact(const Rdb_key_def &kd, rocksdb::Iterator *const iter,
-                     const rocksdb::Slice &key_slice,
-                     const int64_t ttl_filter_ts)
-      MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
-  int read_before_key(const Rdb_key_def &kd, const bool using_full_key,
-                      const rocksdb::Slice &key_slice)
-      MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
-  int read_after_key(const Rdb_key_def &kd, const rocksdb::Slice &key_slice)
-      MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
-  int position_to_correct_key(const Rdb_key_def &kd,
-                              const enum ha_rkey_function &find_flag,
-                              const bool full_key_match,
-                              const rocksdb::Slice &key_slice,
-                              bool *const move_forward)
-      MY_ATTRIBUTE((__warn_unused_result__));
-
-  int calc_eq_cond_len(const Rdb_key_def &kd,
-                       const enum ha_rkey_function &find_flag,
-                       const rocksdb::Slice &slice,
-                       const int bytes_changed_by_succ,
-                       const key_range *const end_key)
-      MY_ATTRIBUTE((__warn_unused_result__));
-
   Rdb_tbl_def *get_table_if_exists(const char *const tablename)
       MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
   void read_thd_vars(THD *const thd) MY_ATTRIBUTE((__nonnull__));
@@ -1196,15 +1144,28 @@ const rocksdb::ReadOptions &rdb_tx_acquire_snapshot(Rdb_transaction *tx);
 
 rocksdb::Iterator *rdb_tx_get_iterator(
     Rdb_transaction *tx, rocksdb::ColumnFamilyHandle *const column_family,
-    bool skip_bloom, bool fill_cache, const rocksdb::Slice &lower_bound_slice,
+    bool skip_bloom, const rocksdb::Slice &lower_bound_slice,
     const rocksdb::Slice &upper_bound_slice, bool read_current = false,
     bool create_snapshot = true);
 
+rocksdb::Iterator *rdb_tx_get_iterator(
+    THD *thd, rocksdb::ColumnFamilyHandle *const cf, bool skip_bloom_filter,
+    const rocksdb::Slice &eq_cond_lower_bound,
+    const rocksdb::Slice &eq_cond_upper_bound,
+    const rocksdb::Snapshot **snapshot, bool read_current = false,
+    bool create_snapshot = true);
+
 rocksdb::Status rdb_tx_get(Rdb_transaction *tx,
                            rocksdb::ColumnFamilyHandle *const column_family,
                            const rocksdb::Slice &key,
                            rocksdb::PinnableSlice *const value);
 
+rocksdb::Status rdb_tx_get_for_update(Rdb_transaction *tx,
+                                      const Rdb_key_def &kd,
+                                      const rocksdb::Slice &key,
+                                      rocksdb::PinnableSlice *const value,
+                                      bool exclusive);
+
 void rdb_tx_multi_get(Rdb_transaction *tx,
                       rocksdb::ColumnFamilyHandle *const column_family,
                       const size_t num_keys, const rocksdb::Slice *keys,
@@ -1244,7 +1205,14 @@ inline void rocksdb_smart_prev(bool seek_backward,
 // https://github.com/facebook/rocksdb/wiki/Iterator#error-handling
 bool is_valid_iterator(rocksdb::Iterator *scan_it);
 
+bool rdb_should_hide_ttl_rec(const Rdb_key_def &kd,
+                             const rocksdb::Slice &ttl_rec_val,
+                             Rdb_transaction *tx);
+
 bool rdb_tx_started(Rdb_transaction *tx);
+int rdb_tx_set_status_error(Rdb_transaction *tx, const rocksdb::Status &s,
+                            const Rdb_key_def &kd,
+                            const Rdb_tbl_def *const tbl_def);
 
 extern std::atomic<uint64_t> rocksdb_select_bypass_executed;
 extern std::atomic<uint64_t> rocksdb_select_bypass_rejected;
diff --git a/storage/rocksdb/ha_rocksdb_proto.h b/storage/rocksdb/ha_rocksdb_proto.h
index e72d666a781..58eee57ac27 100644
--- a/storage/rocksdb/ha_rocksdb_proto.h
+++ b/storage/rocksdb/ha_rocksdb_proto.h
@@ -105,7 +105,7 @@ bool rdb_sync_wal_supported();
 
 enum operation_type : int;
 void rdb_update_global_stats(const operation_type &type, uint count,
-                             bool is_system_table = false);
+                             Rdb_tbl_def *td = nullptr);
 
 class Rdb_dict_manager;
 Rdb_dict_manager *rdb_get_dict_manager(void)
diff --git a/storage/rocksdb/nosql_access.cc b/storage/rocksdb/nosql_access.cc
index 1788486ee9f..f13b94b32f1 100644
--- a/storage/rocksdb/nosql_access.cc
+++ b/storage/rocksdb/nosql_access.cc
@@ -678,8 +678,8 @@ class select_exec {
                                     bool use_bloom,
                                     const rocksdb::Slice &lower_bound,
                                     const rocksdb::Slice &upper_bound) {
-      return rdb_tx_get_iterator(m_tx, cf, !use_bloom, true /* fill_cache */,
-                                 lower_bound, upper_bound);
+      return rdb_tx_get_iterator(m_tx, cf, !use_bloom, lower_bound,
+                                 upper_bound);
     }
 
     rocksdb::Status get(rocksdb::ColumnFamilyHandle *cf,
diff --git a/storage/rocksdb/rdb_converter.h b/storage/rocksdb/rdb_converter.h
index 2e6f1ed9689..e121215c2e8 100644
--- a/storage/rocksdb/rdb_converter.h
+++ b/storage/rocksdb/rdb_converter.h
@@ -173,12 +173,12 @@ class Rdb_converter {
   }
 
   const MY_BITMAP *get_lookup_bitmap() { return &m_lookup_bitmap; }
+ private:
 
   int decode_value_header_for_pk(Rdb_string_reader *reader,
                                  const std::shared_ptr<Rdb_key_def> &pk_def,
                                  rocksdb::Slice *unpack_slice);
 
- private:
   void setup_field_encoders();
 
   void get_storage_type(Rdb_field_encoder *const encoder, const uint kp);
diff --git a/storage/rocksdb/rdb_iterator.cc b/storage/rocksdb/rdb_iterator.cc
new file mode 100644
index 00000000000..529cd6dacae
--- /dev/null
+++ b/storage/rocksdb/rdb_iterator.cc
@@ -0,0 +1,359 @@
+/*
+   Copyright (c) 2020, Facebook, Inc.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include "./rdb_iterator.h"
+
+#include "scope_guard.h"
+
+namespace myrocks {
+
+Rdb_iterator::~Rdb_iterator() {}
+
+Rdb_iterator_base::Rdb_iterator_base(THD *thd,
+                                     const std::shared_ptr<Rdb_key_def> kd,
+                                     const std::shared_ptr<Rdb_key_def> pkd,
+                                     const Rdb_tbl_def *tbl_def)
+    : m_kd(kd),
+      m_pkd(pkd),
+      m_tbl_def(tbl_def),
+      m_thd(thd),
+      m_scan_it(nullptr),
+      m_scan_it_skips_bloom(false),
+      m_scan_it_snapshot(nullptr),
+      m_scan_it_lower_bound(nullptr),
+      m_scan_it_upper_bound(nullptr),
+      m_prefix_buf(nullptr) {}
+
+Rdb_iterator_base::~Rdb_iterator_base() {
+  release_scan_iterator();
+  my_free(m_scan_it_lower_bound);
+  m_scan_it_lower_bound = nullptr;
+  my_free(m_scan_it_upper_bound);
+  m_scan_it_upper_bound = nullptr;
+  my_free(m_prefix_buf);
+  m_prefix_buf = nullptr;
+}
+
+int Rdb_iterator_base::read_before_key(const bool full_key_match,
+                                       const rocksdb::Slice &key_slice) {
+  /*
+    We are looking for the first record such that
+
+      index_tuple $LT lookup_tuple
+
+    with HA_READ_BEFORE_KEY, $LT = '<',
+    with HA_READ_PREFIX_LAST_OR_PREV, $LT = '<='
+    with HA_READ_PREFIX_LAST, $LT = '=='
+
+    Symmetry with read_after_key is possible if rocksdb supported prefix seeks.
+  */
+  rocksdb_smart_seek(!m_kd->m_is_reverse_cf, m_scan_it, key_slice);
+
+  while (is_valid_iterator(m_scan_it)) {
+    if (thd_killed(m_thd)) {
+      return HA_ERR_QUERY_INTERRUPTED;
+    }
+    /*
+      We are using the full key and we've hit an exact match.
+    */
+    if ((full_key_match &&
+         m_kd->value_matches_prefix(m_scan_it->key(), key_slice))) {
+      rocksdb_smart_next(!m_kd->m_is_reverse_cf, m_scan_it);
+      continue;
+    }
+
+    return HA_EXIT_SUCCESS;
+  }
+
+  return HA_ERR_END_OF_FILE;
+}
+
+int Rdb_iterator_base::read_after_key(const rocksdb::Slice &key_slice) {
+  /*
+    We are looking for the first record such that
+
+    index_tuple $GT lookup_tuple
+
+    with HA_READ_AFTER_KEY, $GT = '>',
+    with HA_READ_KEY_OR_NEXT, $GT = '>='
+    with HA_READ_KEY_EXACT, $GT = '=='
+  */
+  rocksdb_smart_seek(m_kd->m_is_reverse_cf, m_scan_it, key_slice);
+
+  return is_valid_iterator(m_scan_it) ? HA_EXIT_SUCCESS : HA_ERR_END_OF_FILE;
+}
+
+void Rdb_iterator_base::release_scan_iterator() {
+  delete m_scan_it;
+  m_scan_it = nullptr;
+
+  if (m_scan_it_snapshot) {
+    auto rdb = rdb_get_rocksdb_db();
+    rdb->ReleaseSnapshot(m_scan_it_snapshot);
+    m_scan_it_snapshot = nullptr;
+  }
+}
+
+void Rdb_iterator_base::setup_scan_iterator(const rocksdb::Slice *const slice,
+                                            const uint eq_cond_len,
+                                            bool read_current) {
+  DBUG_ASSERT(slice->size() >= eq_cond_len);
+
+  bool skip_bloom = true;
+
+  const rocksdb::Slice eq_cond(slice->data(), eq_cond_len);
+
+  // The size of m_scan_it_lower_bound (and upper) is technically
+  // max_packed_sk_len as calculated in ha_rocksdb::alloc_key_buffers.  Rather
+  // than recalculating that number, we pass in the max of eq_cond_len and
+  // Rdb_key_def::INDEX_ID_SIZE which is guaranteed to be smaller than
+  // max_packed_sk_len, hence ensuring no buffer overrun.
+  //
+  // See setup_iterator_bounds on how the bound_len parameter is
+  // used.
+  if (ha_rocksdb::check_bloom_and_set_bounds(
+          m_thd, *m_kd, eq_cond,
+          std::max(eq_cond_len, (uint)Rdb_key_def::INDEX_ID_SIZE),
+          m_scan_it_lower_bound, m_scan_it_upper_bound,
+          &m_scan_it_lower_bound_slice, &m_scan_it_upper_bound_slice)) {
+    skip_bloom = false;
+  }
+
+  /*
+    In some cases, setup_scan_iterator() is called multiple times from
+    the same query but bloom filter can not always be used.
+    Suppose the following query example. id2 is VARCHAR(30) and PRIMARY KEY
+    (id1, id2).
+    select count(*) from t2 WHERE id1=100 and id2 IN ('00000000000000000000',
+    '100');
+    In this case, setup_scan_iterator() is called twice, the first time is for
+    (id1, id2)=(100, '00000000000000000000') and the second time is for (100,
+    '100').
+    If prefix bloom filter length is 24 bytes, prefix bloom filter can be used
+    for the
+    first condition but not for the second condition.
+    If bloom filter condition is changed, currently it is necessary to destroy
+    and
+    re-create Iterator.
+  */
+  if (m_scan_it_skips_bloom != skip_bloom) {
+    release_scan_iterator();
+  }
+
+  /*
+    SQL layer can call rnd_init() multiple times in a row.
+    In that case, re-use the iterator, but re-position it at the table start.
+  */
+  if (!m_scan_it) {
+    m_scan_it = rdb_tx_get_iterator(
+        m_thd, m_kd->get_cf(), skip_bloom, m_scan_it_lower_bound_slice,
+        m_scan_it_upper_bound_slice, &m_scan_it_snapshot, read_current,
+        !read_current);
+    m_scan_it_skips_bloom = skip_bloom;
+  }
+}
+
+int Rdb_iterator_base::calc_eq_cond_len(enum ha_rkey_function find_flag,
+                                        const rocksdb::Slice &start_key,
+                                        const int bytes_changed_by_succ,
+                                        const rocksdb::Slice &end_key) {
+  if (find_flag == HA_READ_KEY_EXACT) return start_key.size();
+
+  if (find_flag == HA_READ_PREFIX_LAST) {
+    /*
+      The caller (seek()) has already applied kd.successor() to the start key.
+
+      The slice is at least Rdb_key_def::INDEX_ID_SIZE bytes long.
+    */
+    return start_key.size() - bytes_changed_by_succ;
+  }
+
+  if (!end_key.empty()) {
+    /*
+      Calculating length of the equal conditions here. 4 byte index id is
+      included.
+      Example1: id1 BIGINT, id2 INT, id3 BIGINT, PRIMARY KEY (id1, id2, id3)
+       WHERE id1=1 AND id2=1 AND id3>=2 => eq_cond_len= 4+8+4= 16
+       WHERE id1=1 AND id2>=1 AND id3>=2 => eq_cond_len= 4+8= 12
+      Example2: id1 VARCHAR(30), id2 INT, PRIMARY KEY (id1, id2)
+       WHERE id1 = 'AAA' and id2 < 3; => eq_cond_len=13 (varchar used 9 bytes)
+    */
+    return start_key.difference_offset(end_key);
+  }
+
+  /*
+    On range scan without any end key condition, there is no
+    eq cond, and eq cond length is the same as index_id size (4 bytes).
+    Example1: id1 BIGINT, id2 INT, id3 BIGINT, PRIMARY KEY (id1, id2, id3)
+     WHERE id1>=1 AND id2 >= 2 and id2 <= 5 => eq_cond_len= 4
+  */
+  return Rdb_key_def::INDEX_ID_SIZE;
+}
+
+int Rdb_iterator_base::next_with_direction(bool move_forward, bool skip_next) {
+  int rc = 0;
+  const auto &kd = *m_kd;
+  Rdb_transaction *const tx = get_tx_from_thd(m_thd);
+
+  for (;;) {
+    DEBUG_SYNC(m_thd, "rocksdb.check_flags_nwd");
+    if (thd_killed(m_thd)) {
+      rc = HA_ERR_QUERY_INTERRUPTED;
+      break;
+    }
+
+    DBUG_ASSERT(m_scan_it != nullptr);
+    if (m_scan_it == nullptr) {
+      rc = HA_ERR_INTERNAL_ERROR;
+      break;
+    }
+
+    if (skip_next) {
+      skip_next = false;
+    } else {
+      if (move_forward) {
+        rocksdb_smart_next(kd.m_is_reverse_cf, m_scan_it);
+      } else {
+        rocksdb_smart_prev(kd.m_is_reverse_cf, m_scan_it);
+      }
+    }
+
+    if (!is_valid_iterator(m_scan_it)) {
+      rc = HA_ERR_END_OF_FILE;
+      break;
+    }
+
+    const rocksdb::Slice &key = m_scan_it->key();
+    const rocksdb::Slice &value = m_scan_it->value();
+
+    // Outside our range, return EOF.
+    if (!kd.value_matches_prefix(key, m_prefix_tuple)) {
+      rc = HA_ERR_END_OF_FILE;
+      break;
+    }
+
+    // Record is not visible due to TTL, move to next record.
+    if (m_pkd->has_ttl() && rdb_should_hide_ttl_rec(kd, value, tx)) {
+      continue;
+    }
+
+    break;
+  }
+
+  return rc;
+}
+
+int Rdb_iterator_base::seek(enum ha_rkey_function find_flag,
+                            const rocksdb::Slice start_key, bool full_key_match,
+                            const rocksdb::Slice end_key, bool read_current) {
+  int rc = 0;
+
+  uint prefix_key_len;
+
+  if (!m_prefix_buf) {
+    const uint packed_len = m_kd->max_storage_fmt_length();
+    m_scan_it_lower_bound = reinterpret_cast<uchar *>(
+        my_malloc(PSI_NOT_INSTRUMENTED, packed_len, MYF(0)));
+    m_scan_it_upper_bound = reinterpret_cast<uchar *>(
+        my_malloc(PSI_NOT_INSTRUMENTED, packed_len, MYF(0)));
+    m_prefix_buf = reinterpret_cast<uchar *>(
+        my_malloc(PSI_NOT_INSTRUMENTED, packed_len, MYF(0)));
+  }
+
+  if (find_flag == HA_READ_KEY_EXACT || find_flag == HA_READ_PREFIX_LAST) {
+    memcpy(m_prefix_buf, start_key.data(), start_key.size());
+    prefix_key_len = start_key.size();
+  } else {
+    m_kd->get_infimum_key(m_prefix_buf, &prefix_key_len);
+  }
+  m_prefix_tuple = rocksdb::Slice((char *)m_prefix_buf, prefix_key_len);
+
+  int bytes_changed_by_succ = 0;
+  uchar *start_key_buf = (uchar *)start_key.data();
+  // We need to undo mutating the start key in case of retries using the same
+  // buffer.
+  auto start_key_guard = create_scope_guard([this, start_key_buf, start_key] {
+    this->m_kd->predecessor(start_key_buf, start_key.size());
+  });
+  if (find_flag == HA_READ_PREFIX_LAST_OR_PREV ||
+      find_flag == HA_READ_PREFIX_LAST || find_flag == HA_READ_AFTER_KEY) {
+    bytes_changed_by_succ = m_kd->successor(start_key_buf, start_key.size());
+  } else {
+    start_key_guard.commit();
+  }
+
+  const uint eq_cond_len =
+      calc_eq_cond_len(find_flag, start_key, bytes_changed_by_succ, end_key);
+
+  /*
+    This will open the iterator and position it at a record that's equal or
+    greater than the lookup tuple.
+  */
+  setup_scan_iterator(&start_key, eq_cond_len, read_current);
+
+  /*
+    Once we are positioned on from above, move to the position we really
+    want: See storage/rocksdb/rocksdb-range-access.txt
+  */
+  bool direction = (find_flag == HA_READ_KEY_EXACT) ||
+                   (find_flag == HA_READ_AFTER_KEY) ||
+                   (find_flag == HA_READ_KEY_OR_NEXT);
+  if (direction) {
+    rc = read_after_key(start_key);
+  } else {
+    rc = read_before_key(full_key_match, start_key);
+  }
+
+  if (rc) {
+    return rc;
+  }
+
+  rc = next_with_direction(direction, true);
+  return rc;
+}
+
+int Rdb_iterator_base::get(const rocksdb::Slice *key,
+                           rocksdb::PinnableSlice *value, Rdb_lock_type type,
+                           bool skip_ttl_check) {
+  int rc = HA_EXIT_SUCCESS;
+  Rdb_transaction *const tx = get_tx_from_thd(m_thd);
+  rocksdb::Status s;
+  if (type == RDB_LOCK_NONE) {
+    s = rdb_tx_get(tx, m_kd->get_cf(), *key, value);
+  } else {
+    s = rdb_tx_get_for_update(tx, *m_kd, *key, value, type == RDB_LOCK_WRITE);
+  }
+
+  DBUG_EXECUTE_IF("rocksdb_return_status_corrupted",
+                  { s = rocksdb::Status::Corruption(); });
+
+  if (!s.IsNotFound() && !s.ok()) {
+    return rdb_tx_set_status_error(tx, s, *m_kd, m_tbl_def);
+  }
+
+  if (s.IsNotFound()) {
+    return HA_ERR_KEY_NOT_FOUND;
+  }
+
+  if (!skip_ttl_check && m_kd->has_ttl() &&
+      rdb_should_hide_ttl_rec(*m_kd, *value, tx)) {
+    return HA_ERR_KEY_NOT_FOUND;
+  }
+
+  return rc;
+}
+
+}  // namespace myrocks
diff --git a/storage/rocksdb/rdb_iterator.h b/storage/rocksdb/rdb_iterator.h
new file mode 100644
index 00000000000..2a0f5bd5760
--- /dev/null
+++ b/storage/rocksdb/rdb_iterator.h
@@ -0,0 +1,121 @@
+/*
+   Copyright (c) 2020, Facebook, Inc.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#pragma once
+
+// MySQL header files
+#include "sql/debug_sync.h"
+#include "sql/handler.h"
+
+// MyRocks header files
+#include "./ha_rocksdb.h"
+#include "./ha_rocksdb_proto.h"
+#include "./rdb_converter.h"
+#include "./rdb_datadic.h"
+
+namespace myrocks {
+
+class Rdb_iterator {
+ public:
+  virtual ~Rdb_iterator() = 0;
+
+  /*
+    direction specifies which logical direction the table is scanned in.
+    start_key is inclusive if scanning forwards, but exclusive when scanning
+    backwards. full_key_match indicates whether the seek key covers the full
+    index key.
+    Once rocksdb supports prefix seeks, the API can be simplified since
+    full_key_match is no longer needed.
+  */
+  virtual int seek(enum ha_rkey_function find_flag,
+                   const rocksdb::Slice start_key, bool full_key_match,
+                   const rocksdb::Slice end_key, bool read_current = false) = 0;
+  virtual int get(const rocksdb::Slice *key, rocksdb::PinnableSlice *value,
+                  Rdb_lock_type type, bool skip_ttl_check = false) = 0;
+  virtual int next() = 0;
+  virtual int prev() = 0;
+  virtual rocksdb::Slice key() = 0;
+  virtual rocksdb::Slice value() = 0;
+  virtual void reset() = 0;
+};
+
+class Rdb_iterator_base : public Rdb_iterator {
+ private:
+  int read_before_key(const bool full_key_match,
+                      const rocksdb::Slice &key_slice);
+  int read_after_key(const rocksdb::Slice &key_slice);
+  void release_scan_iterator();
+  void setup_scan_iterator(const rocksdb::Slice *const slice,
+                           const uint eq_cond_len, bool read_current);
+  int calc_eq_cond_len(enum ha_rkey_function find_flag,
+                       const rocksdb::Slice &start_key,
+                       const int bytes_changed_by_succ,
+                       const rocksdb::Slice &end_key);
+  int next_with_direction(bool move_forward, bool skip_next);
+
+ public:
+  Rdb_iterator_base(THD *thd, const std::shared_ptr<Rdb_key_def> kd,
+                    const std::shared_ptr<Rdb_key_def> pkd,
+                    const Rdb_tbl_def *tbl_def);
+
+  ~Rdb_iterator_base() override;
+
+  int seek(enum ha_rkey_function find_flag, const rocksdb::Slice start_key,
+           bool full_key_match, const rocksdb::Slice end_key,
+           bool read_current) override;
+  int get(const rocksdb::Slice *key, rocksdb::PinnableSlice *value,
+          Rdb_lock_type type, bool skip_ttl_check = false) override;
+
+  int next() override { return next_with_direction(true, false); }
+
+  int prev() override { return next_with_direction(false, false); }
+
+  rocksdb::Slice key() override { return m_scan_it->key(); }
+
+  rocksdb::Slice value() override { return m_scan_it->value(); }
+
+  void reset() override { release_scan_iterator(); }
+
+ protected:
+  friend class Rdb_iterator;
+  const std::shared_ptr<Rdb_key_def> m_kd;
+
+  // Rdb_key_def of the primary key
+  const std::shared_ptr<Rdb_key_def> m_pkd;
+
+  const Rdb_tbl_def *m_tbl_def;
+
+  THD *m_thd;
+
+  /* Iterator used for range scans and for full table/index scans */
+  rocksdb::Iterator *m_scan_it;
+
+  /* Whether m_scan_it was created with skip_bloom=true */
+  bool m_scan_it_skips_bloom;
+
+  const rocksdb::Snapshot *m_scan_it_snapshot;
+
+  /* Buffers used for upper/lower bounds for m_scan_it. */
+  uchar *m_scan_it_lower_bound;
+  uchar *m_scan_it_upper_bound;
+  rocksdb::Slice m_scan_it_lower_bound_slice;
+  rocksdb::Slice m_scan_it_upper_bound_slice;
+
+  uchar *m_prefix_buf;
+  rocksdb::Slice m_prefix_tuple;
+};
+
+}  // namespace myrocks


More information about the commits mailing list