[Commits] Rev 3496: Cassandra storage engine: BKA support in file:///data0/psergey/dev2/5.5-cassandra-r01/

Sergey Petrunya psergey at askmonty.org
Mon Aug 27 07:44:59 EEST 2012


At file:///data0/psergey/dev2/5.5-cassandra-r01/

------------------------------------------------------------
revno: 3496
revision-id: psergey at askmonty.org-20120827044458-y0x419xfr7n22q1q
parent: psergey at askmonty.org-20120826120639-9a4gunor39gkqd5q
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: 5.5-cassandra-r01
timestamp: Mon 2012-08-27 08:44:58 +0400
message:
  Cassandra storage engine: BKA support
  - We use the HA_MRR_NO_ASSOC mode ("optimizer_switch=join_cache_hashed")
  - Not yet able to use BKA's buffers.
  - There is a session variable, cassandra_multiget_batch_size, to control the batch size.
  - There are status counters (Cassandra_multiget_reads, _keys_scanned, _rows_read).
  - Needed to make some fixes in the BKA code (to be checked with Igor).
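
In outline, the new code collects join keys into a batch and resolves them with a single Thrift multiget_slice() call: multi_range_read_init() calls mrr_start_read(), which feeds keys to Cassandra_se_impl through new_lookup_keys()/add_lookup_key() until cassandra_multiget_batch_size is exceeded and then issues multiget_slice(); multi_range_read_next() drains the result map with get_next_multiget_row() and refills the batch when it runs out. Below is a simplified, self-contained C++ sketch of that batching flow; the MultigetBatcher class and the fake column family are illustrative stand-ins, not the actual handler code (the real implementation in the diff additionally converts MariaDB key images to Cassandra row keys via rowkey_converter->mariadb_to_cassandra() and updates the Cassandra_multiget_* counters).

// Simplified, standalone sketch of the batched key lookup flow this patch
// implements. Names here (MultigetBatcher, FakeColumnFamily) are illustrative
// only; the real path is multi_range_read_init() -> mrr_start_read() ->
// Cassandra_se_impl::multiget_slice() -> get_next_multiget_row().
#include <cstdio>
#include <map>
#include <string>
#include <vector>

typedef std::map<std::string, std::string> FakeColumnFamily;

class MultigetBatcher
{
  std::vector<std::string> keys;                    // cf. mrr_keys
  std::map<std::string, std::string> result;        // cf. mrr_result
  std::map<std::string, std::string>::iterator it;  // cf. mrr_result_it
public:
  void new_lookup_keys() { keys.clear(); }

  size_t add_lookup_key(const std::string &key)
  {
    keys.push_back(key);
    return keys.size();            // the caller compares this to the batch size
  }

  /* "multiget_slice": resolve all collected keys in one round trip */
  void multiget(const FakeColumnFamily &cf)
  {
    result.clear();
    for (size_t i= 0; i < keys.size(); i++)
    {
      FakeColumnFamily::const_iterator found= cf.find(keys[i]);
      if (found != cf.end())
        result[found->first]= found->second;
    }
    it= result.begin();
  }

  /* "get_next_multiget_row": returns true at EOF, like the patch */
  bool get_next_row(std::string *rowkey, std::string *value)
  {
    if (it == result.end())
      return true;
    *rowkey= it->first;
    *value= it->second;
    ++it;
    return false;
  }
};

int main()
{
  FakeColumnFamily cf;
  cf["1"]= "one"; cf["3"]= "three"; cf["5"]= "five";

  const size_t batch_size= 2;      // cf. cassandra_multiget_batch_size
  MultigetBatcher batcher;
  batcher.new_lookup_keys();

  const char *join_keys[]= {"1", "5", "3"};
  for (size_t i= 0; i < 3; i++)
  {
    /* Primitive buffer control, as in mrr_start_read() */
    if (batcher.add_lookup_key(join_keys[i]) > batch_size)
      break;
  }
  batcher.multiget(cf);

  std::string k, v;
  while (!batcher.get_next_row(&k, &v))
    printf("%s -> %s\n", k.c_str(), v.c_str());
  return 0;
}
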
=== modified file 'mysql-test/r/cassandra.result'
--- a/mysql-test/r/cassandra.result	2012-08-26 12:06:39 +0000
+++ b/mysql-test/r/cassandra.result	2012-08-27 04:44:58 +0000
@@ -79,6 +79,7 @@
 Cassandra_row_insert_batches	7
 CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
 thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+delete from t1;
 INSERT INTO t1 VALUES (1,1),(2,2);
 DELETE FROM t1 ORDER BY a LIMIT 1;
 DROP TABLE t1;
@@ -92,3 +93,66 @@
 Variable_name	Value
 Cassandra_row_inserts	10
 Cassandra_row_insert_batches	8
+#
+# Batched Key Access
+#
+# Control variable (we are not yet able to make use of MRR's buffer)
+show variables like 'cassandra_multi%';
+Variable_name	Value
+cassandra_multiget_batch_size	100
+# MRR-related status variables:
+show status like 'cassandra_multi%';
+Variable_name	Value
+Cassandra_multiget_reads	0
+Cassandra_multiget_keys_scanned	0
+Cassandra_multiget_rows_read	0
+CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
+thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+delete from t1;
+INSERT INTO t1 VALUES (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9);
+set @tmp_jcl=@@join_cache_level;
+set join_cache_level=8;
+explain select * from t1 A, t1 B where B.rowkey=A.a;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	A	ALL	NULL	NULL	NULL	NULL	1000	Using where
+1	SIMPLE	B	eq_ref	PRIMARY	PRIMARY	8	test.A.a	1	Using join buffer (flat, BKAH join); multiget_slice
+select * from t1 A, t1 B where B.rowkey=A.a;
+rowkey	a	rowkey	a
+0	0	0	0
+1	1	1	1
+2	2	2	2
+3	3	3	3
+4	4	4	4
+5	5	5	5
+6	6	6	6
+7	7	7	7
+8	8	8	8
+9	9	9	9
+show status like 'cassandra_multi%';
+Variable_name	Value
+Cassandra_multiget_reads	1
+Cassandra_multiget_keys_scanned	10
+Cassandra_multiget_rows_read	10
+insert into t1 values(1, 8);
+insert into t1 values(3, 8);
+insert into t1 values(5, 8);
+insert into t1 values(7, 8);
+select * from t1 A, t1 B where B.rowkey=A.a;
+rowkey	a	rowkey	a
+0	0	0	0
+2	2	2	2
+4	4	4	4
+6	6	6	6
+1	8	8	8
+7	8	8	8
+8	8	8	8
+5	8	8	8
+3	8	8	8
+9	9	9	9
+show status like 'cassandra_multi%';
+Variable_name	Value
+Cassandra_multiget_reads	2
+Cassandra_multiget_keys_scanned	16
+Cassandra_multiget_rows_read	16
+delete from t1;
+drop table t1;

=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test	2012-08-26 12:06:39 +0000
+++ b/mysql-test/t/cassandra.test	2012-08-27 04:44:58 +0000
@@ -117,6 +117,7 @@
 CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
   thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
 
+delete from t1;
 INSERT INTO t1 VALUES (1,1),(2,2);
 DELETE FROM t1 ORDER BY a LIMIT 1;
 
@@ -127,6 +128,40 @@
 flush status;
 show status like 'cassandra_row_insert%';
 
+--echo #
+--echo # Batched Key Access
+--echo #
+
+--echo # Control variable (we are not yet able to make use of MRR's buffer)
+show variables like 'cassandra_multi%';
+
+--echo # MRR-related status variables:
+show status like 'cassandra_multi%';
+
+CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
+  thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+delete from t1;
+INSERT INTO t1 VALUES (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9);
+
+set @tmp_jcl=@@join_cache_level;
+set join_cache_level=8;
+explain select * from t1 A, t1 B where B.rowkey=A.a;
+
+select * from t1 A, t1 B where B.rowkey=A.a;
+show status like 'cassandra_multi%';
+
+# The following INSERTs are really UPDATEs
+insert into t1 values(1, 8);
+insert into t1 values(3, 8);
+insert into t1 values(5, 8);
+insert into t1 values(7, 8);
+
+select * from t1 A, t1 B where B.rowkey=A.a;
+show status like 'cassandra_multi%';
+
+
+delete from t1;
+drop table t1;
 ############################################################################
 ## Cassandra cleanup
 ############################################################################

=== modified file 'sql/sql_join_cache.cc'
--- a/sql/sql_join_cache.cc	2012-03-24 17:21:22 +0000
+++ b/sql/sql_join_cache.cc	2012-08-27 04:44:58 +0000
@@ -3876,8 +3876,11 @@
       If a record in an incremental cache contains no fields then the
       association for the last record in cache will be equal to cache->end_pos
     */ 
+    /* 
+    psergey: this makes no sense where HA_MRR_NO_ASSOC is used.
     DBUG_ASSERT(cache->buff <= (uchar *) (*ptr) &&
                 (uchar *) (*ptr) <= cache->end_pos);
+    */
     if (join_tab->table->vfield)
       update_virtual_fields(join->thd, join_tab->table);
   }
@@ -4543,7 +4546,7 @@
 {
   last_matching_rec_ref_ptr= next_matching_rec_ref_ptr= 0;
   if (no_association &&
-      (curr_matching_chain= get_matching_chain_by_join_key()))
+      !(curr_matching_chain= get_matching_chain_by_join_key())) //psergey: added '!'
     return 1;
   last_matching_rec_ref_ptr= get_next_rec_ref(curr_matching_chain);
   return 0;

=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-08-26 12:06:39 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-08-27 04:44:58 +0000
@@ -100,7 +100,18 @@
   /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
   void clear_read_columns();
   void add_read_column(const char *name);
-  
+ 
+  /* Reads, MRR scans */
+  void new_lookup_keys();
+  int  add_lookup_key(const char *key, size_t key_len);
+  bool multiget_slice();
+
+  std::vector<std::string> mrr_keys; /* TODO: can we use an allocator to put these into the MRR buffer? */
+  std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
+  std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;
+
+  bool get_next_multiget_row();
+
   bool truncate();
   bool remove_row();
 
@@ -522,3 +533,72 @@
 
   return res;
 }
+
+/////////////////////////////////////////////////////////////////////////////
+// MRR reads
+/////////////////////////////////////////////////////////////////////////////
+
+void Cassandra_se_impl::new_lookup_keys()
+{
+  mrr_keys.clear();
+}
+
+
+int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len)
+{
+  mrr_keys.push_back(std::string(key, key_len));
+  return mrr_keys.size();
+}
+
+
+bool Cassandra_se_impl::multiget_slice()
+{
+  ColumnParent cparent;
+  cparent.column_family= column_family;
+
+  SlicePredicate slice_pred;
+  SliceRange sr;
+  sr.start = "";
+  sr.finish = "";
+  slice_pred.__set_slice_range(sr);
+
+  bool res= true;
+
+  try {
+    
+    cassandra_counters.multiget_reads++;
+    cassandra_counters.multiget_keys_scanned += mrr_keys.size();
+
+    cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred, 
+                         cur_consistency_level);
+
+    cassandra_counters.multiget_rows_read += mrr_result.size();
+    
+    res= false;
+    mrr_result_it= mrr_result.begin();
+
+  } catch (InvalidRequestException ire) {
+    print_error("%s [%s]", ire.what(), ire.why.c_str());
+  } catch (UnavailableException ue) {
+    print_error("UnavailableException: %s", ue.what());
+  } catch (TimedOutException te) {
+    print_error("TimedOutException: %s", te.what());
+  }
+
+  return res;
+}
+
+
+bool Cassandra_se_impl::get_next_multiget_row()
+{
+  if (mrr_result_it == mrr_result.end())
+    return true; /* EOF */
+
+  column_data_vec= mrr_result_it->second;
+  rowkey= mrr_result_it->first;
+
+  column_data_it= column_data_vec.begin();
+  mrr_result_it++;
+  return false;
+}
+

=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h	2012-08-26 12:06:39 +0000
+++ b/storage/cassandra/cassandra_se.h	2012-08-27 04:44:58 +0000
@@ -41,11 +41,16 @@
 
   /* Reads, multi-row scans */
   int read_batch_size;
-
   virtual bool get_range_slices(bool last_key_as_start_key)=0;
   virtual void finish_reading_range_slices()=0;
   virtual bool get_next_range_slice_row(bool *eof)=0;
   
+  /* Reads, MRR scans */
+  virtual void new_lookup_keys()=0;
+  virtual int  add_lookup_key(const char *key, size_t key_len)=0;
+  virtual bool multiget_slice()=0;
+  virtual bool get_next_multiget_row()=0;
+
   /* read_set setup */
   virtual void clear_read_columns()=0;
   virtual void add_read_column(const char *name)=0;
@@ -59,13 +64,20 @@
   void print_error(const char *format, ...);
 };
 
+
 /* A structure with global counters */
 class Cassandra_status_vars
 {
 public:
   ulong row_inserts;
   ulong row_insert_batches;
+  
+  ulong multiget_reads;
+  ulong multiget_keys_scanned;
+  ulong multiget_rows_read;
 };
+
+
 extern Cassandra_status_vars cassandra_counters;
 
 

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-08-26 12:06:39 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-08-27 04:44:58 +0000
@@ -60,15 +60,35 @@
   "Number of rows in an INSERT batch",
   NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
 
+static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG,
+  "Number of rows in a multiget(MRR) batch",
+  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
 
 static struct st_mysql_sys_var* cassandra_system_variables[]= {
   MYSQL_SYSVAR(insert_batch_size),
+  MYSQL_SYSVAR(multiget_batch_size),
 //  MYSQL_SYSVAR(enum_var),
 //  MYSQL_SYSVAR(ulong_var),
   NULL
 };
 
 
+static SHOW_VAR cassandra_status_variables[]= {
+  {"row_inserts",
+    (char*) &cassandra_counters.row_inserts,         SHOW_LONG},
+  {"row_insert_batches",
+    (char*) &cassandra_counters.row_insert_batches,  SHOW_LONG},
+
+  {"multiget_reads",
+    (char*) &cassandra_counters.multiget_reads,      SHOW_LONG},
+  {"multiget_keys_scanned",
+    (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
+  {"multiget_rows_read",
+    (char*) &cassandra_counters.multiget_rows_read,  SHOW_LONG},
+  {NullS, NullS, SHOW_LONG}
+};
+
+
 Cassandra_status_vars cassandra_counters;
 Cassandra_status_vars cassandra_counters_copy;
 
@@ -772,8 +792,7 @@
   if (doing_insert_batch)
   {
     res= 0;
-    if (++insert_rows_batched >= /*insert_batch_size*/
-                                 THDVAR(table->in_use, insert_batch_size))
+    if (++insert_rows_batched >= THDVAR(table->in_use, insert_batch_size))
     {
       res= se->do_insert();
       insert_rows_batched= 0;
@@ -955,6 +974,135 @@
   return 0;
 }
 
+/////////////////////////////////////////////////////////////////////////////
+// MRR implementation
+/////////////////////////////////////////////////////////////////////////////
+
+
+/*
+  - The key can only be the primary key
+  - Allow equality ranges only.
+  - anything else?
+*/
+ha_rows ha_cassandra::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
+                                                  void *seq_init_param, 
+                                                  uint n_ranges, uint *bufsz,
+                                                  uint *flags, COST_VECT *cost)
+{
+  /* No support for const ranges so far */
+  return HA_POS_ERROR;
+}
+
+
+ha_rows ha_cassandra::multi_range_read_info(uint keyno, uint n_ranges, uint keys,
+                              uint key_parts, uint *bufsz, 
+                              uint *flags, COST_VECT *cost)
+{
+  /* Can only be equality lookups on the primary key... */
+  // TODO anything else?
+  *flags &= ~HA_MRR_USE_DEFAULT_IMPL;
+  *flags |= HA_MRR_NO_ASSOCIATION;
+
+  return 10;
+}
+
+
+int ha_cassandra::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
+                          uint n_ranges, uint mode, HANDLER_BUFFER *buf)
+{
+  int res;
+  mrr_iter= seq->init(seq_init_param, n_ranges, mode);
+  mrr_funcs= *seq;
+  res= mrr_start_read();
+  return (res? HA_ERR_INTERNAL_ERROR: 0);
+}
+
+
+bool ha_cassandra::mrr_start_read()
+{
+  uint key_len;
+
+  my_bitmap_map *old_map;
+  old_map= dbug_tmp_use_all_columns(table, table->read_set);
+  
+  se->new_lookup_keys();
+
+  while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
+  {
+    char *cass_key;
+    int cass_key_len;
+    
+    DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE);
+
+    uchar *key= (uchar*)mrr_cur_range.start_key.key;
+    key_len= mrr_cur_range.start_key.length;
+    //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS??
+    store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
+
+    rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
+    
+    // Primitive buffer control
+    if (se->add_lookup_key(cass_key, cass_key_len) > 
+        THDVAR(table->in_use, multiget_batch_size))
+      break;
+  }
+
+  dbug_tmp_restore_column_map(table->read_set, old_map);
+
+  return se->multiget_slice();
+}
+
+
+int ha_cassandra::multi_range_read_next(range_id_t *range_info)
+{
+  int res;
+  while(1)
+  {
+    if (!se->get_next_multiget_row())
+    {
+      read_cassandra_columns(true);
+      res= 0;
+      break;
+    }
+    else 
+    {
+      if (source_exhausted)
+      {
+        res= HA_ERR_END_OF_FILE;
+        break;
+      }
+      else
+      {
+        if (mrr_start_read())
+        {
+          res= HA_ERR_INTERNAL_ERROR;
+          break;
+        }
+      }
+    }
+    /* 
+      We get here if we've refilled the buffer and done another read. Try
+      reading from results again
+    */
+  }
+  return res;
+}
+
+
+int ha_cassandra::multi_range_read_explain_info(uint mrr_mode, char *str, size_t size)
+{
+  const char *mrr_str= "multiget_slice";
+
+  if (!(mrr_mode & HA_MRR_USE_DEFAULT_IMPL))
+  {
+    uint mrr_str_len= strlen(mrr_str);
+    uint copy_len= min(mrr_str_len, size);
+    memcpy(str, mrr_str, copy_len); /* don't read past the end of mrr_str */
+    return copy_len;
+  }
+  return 0;
+}
+
 
 /////////////////////////////////////////////////////////////////////////////
 // Dummy implementations start
@@ -1073,15 +1221,6 @@
 // Dummy implementations end
 /////////////////////////////////////////////////////////////////////////////
 
-static SHOW_VAR cassandra_status_variables[]= {
-  {"row_inserts",
-    (char*) &cassandra_counters.row_inserts,         SHOW_LONG},
-  {"row_insert_batches",
-    (char*) &cassandra_counters.row_insert_batches,  SHOW_LONG},
-  {NullS, NullS, SHOW_LONG}
-};
-
-
 static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
 {
   //innodb_export_status();

=== modified file 'storage/cassandra/ha_cassandra.h'
--- a/storage/cassandra/ha_cassandra.h	2012-08-26 12:06:39 +0000
+++ b/storage/cassandra/ha_cassandra.h	2012-08-27 04:44:58 +0000
@@ -154,6 +154,24 @@
   
   virtual int reset();
   
+
+  int multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
+                            uint n_ranges, uint mode, HANDLER_BUFFER *buf);
+  int multi_range_read_next(range_id_t *range_info);
+  ha_rows multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
+                                      void *seq_init_param, 
+                                      uint n_ranges, uint *bufsz,
+                                      uint *flags, COST_VECT *cost);
+  ha_rows multi_range_read_info(uint keyno, uint n_ranges, uint keys,
+                                uint key_parts, uint *bufsz, 
+                                uint *flags, COST_VECT *cost);
+  int multi_range_read_explain_info(uint mrr_mode, char *str, size_t size);
+
+private:
+  bool source_exhausted;
+  bool mrr_start_read();
+public:
+
   /*
     Everything below are methods that we implement in ha_example.cc.
 


