[Commits] Rev 3495: Cassandra storage engine: bulk INSERT support in file:///data0/psergey/dev2/5.5-cassandra-r01/

Sergey Petrunya psergey at askmonty.org
Sun Aug 26 15:06:39 EEST 2012


At file:///data0/psergey/dev2/5.5-cassandra-r01/

------------------------------------------------------------
revno: 3495
revision-id: psergey at askmonty.org-20120826120639-9a4gunor39gkqd5q
parent: psergey at askmonty.org-20120823171601-31juuznejpayb5yb
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: 5.5-cassandra-r01
timestamp: Sun 2012-08-26 16:06:39 +0400
message:
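  Cassandra storage engine: bulk INSERT support
  - bulk inserts themselves (start_bulk_insert()/end_bulk_insert())
  - control variable (cassandra_insert_batch_size) and counters
    (Cassandra_row_inserts, Cassandra_row_insert_batches).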
=== modified file 'mysql-test/r/cassandra.result'
--- a/mysql-test/r/cassandra.result	2012-08-23 12:15:28 +0000
+++ b/mysql-test/r/cassandra.result	2012-08-26 12:06:39 +0000
@@ -67,3 +67,28 @@
 INSERT INTO t1 VALUES (1,1),(2,2);
 DELETE FROM t1 ORDER BY a LIMIT 1;
 DROP TABLE t1;
+#
+# Batched INSERT
+#
+show variables like 'cassandra_insert_batch_size';
+Variable_name	Value
+cassandra_insert_batch_size	100
+show status like 'cassandra_row_insert%';
+Variable_name	Value
+Cassandra_row_inserts	8
+Cassandra_row_insert_batches	7
+CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
+thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+INSERT INTO t1 VALUES (1,1),(2,2);
+DELETE FROM t1 ORDER BY a LIMIT 1;
+DROP TABLE t1;
+show status like 'cassandra_row_insert%';
+Variable_name	Value
+Cassandra_row_inserts	10
+Cassandra_row_insert_batches	8
+# FLUSH STATUS doesn't work for our variables, just like with InnoDB.
+flush status;
+show status like 'cassandra_row_insert%';
+Variable_name	Value
+Cassandra_row_inserts	10
+Cassandra_row_insert_batches	8

=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test	2012-08-23 12:15:28 +0000
+++ b/mysql-test/t/cassandra.test	2012-08-26 12:06:39 +0000
@@ -109,6 +109,24 @@
 
 DROP TABLE t1;
 
+--echo #
+--echo # Batched INSERT
+--echo #
+show variables like 'cassandra_insert_batch_size';
+show status like 'cassandra_row_insert%';
+CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
+  thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+
+INSERT INTO t1 VALUES (1,1),(2,2);
+DELETE FROM t1 ORDER BY a LIMIT 1;
+
+DROP TABLE t1;
+show status like 'cassandra_row_insert%';
+
+--echo # FLUSH STATUS doesn't work for our variables, just like with InnoDB.
+flush status;
+show status like 'cassandra_row_insert%';
+
 ############################################################################
 ## Cassandra cleanup
 ############################################################################

=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-08-20 08:08:29 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-08-26 12:06:39 +0000
@@ -57,7 +57,6 @@
   typedef std::map<std::string,  ColumnFamilyToMutation> KeyToCfMutationMap;
    
   KeyToCfMutationMap batch_mutation; /* Prepare operation here */
-  std::string key_to_insert;
   int64_t insert_timestamp;
   std::vector<Mutation>* insert_list;
    
@@ -83,7 +82,8 @@
   void get_rowkey_type(char **name, char **type);
 
   /* Writes */
-  void start_prepare_insert(const char *key, int key_len);
+  void clear_insert_buffer();
+  void start_row_insert(const char *key, int key_len);
   void add_insert_column(const char *name, const char *value, int value_len);
   bool do_insert();
 
@@ -233,10 +233,17 @@
   return ms;
 }
 
-void Cassandra_se_impl::start_prepare_insert(const char *key, int key_len)
-{
+
+void Cassandra_se_impl::clear_insert_buffer()
+{
+  batch_mutation.clear();
+}
+
+
+void Cassandra_se_impl::start_row_insert(const char *key, int key_len)
+{
+  std::string key_to_insert;
   key_to_insert.assign(key, key_len);
-  batch_mutation.clear();
   batch_mutation[key_to_insert]= ColumnFamilyToMutation();
   ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_insert];
 
@@ -270,6 +277,10 @@
   try {
     
     cass->batch_mutate(batch_mutation, cur_consistency_level);
+
+    cassandra_counters.row_inserts+= batch_mutation.size();
+    cassandra_counters.row_insert_batches++;
+
     res= false;
 
   } catch (InvalidRequestException ire) {

=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h	2012-08-20 08:08:29 +0000
+++ b/storage/cassandra/cassandra_se.h	2012-08-26 12:06:39 +0000
@@ -28,7 +28,8 @@
   virtual void get_rowkey_type(char **name, char **type)=0;
 
   /* Writes */
-  virtual void start_prepare_insert(const char *key, int key_len)=0;
+  virtual void clear_insert_buffer()=0;
+  virtual void start_row_insert(const char *key, int key_len)=0;
   virtual void add_insert_column(const char *name, const char *value, 
                                  int value_len)=0;
   virtual bool do_insert()=0;
@@ -58,4 +59,14 @@
   void print_error(const char *format, ...);
 };
 
+/* A structure with global counters */
+class Cassandra_status_vars
+{
+public:
+  ulong row_inserts;
+  ulong row_insert_batches;
+};
+extern Cassandra_status_vars cassandra_counters;
+
+
 Cassandra_se_interface *get_cassandra_se();

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-08-23 17:16:01 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-08-26 12:06:39 +0000
@@ -56,6 +56,23 @@
 };
 
 
+static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
+  "Number of rows in an INSERT batch",
+  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
+
+
+static struct st_mysql_sys_var* cassandra_system_variables[]= {
+  MYSQL_SYSVAR(insert_batch_size),
+//  MYSQL_SYSVAR(enum_var),
+//  MYSQL_SYSVAR(ulong_var),
+  NULL
+};
+
+
+Cassandra_status_vars cassandra_counters;
+Cassandra_status_vars cassandra_counters_copy;
+
+
 /**
   @brief
   Function we use in the creation of our hash to get key.
@@ -727,13 +744,16 @@
   my_bitmap_map *old_map;
   DBUG_ENTER("ha_cassandra::write_row");
   
+  if (!doing_insert_batch)
+    se->clear_insert_buffer();
+
   old_map= dbug_tmp_use_all_columns(table, table->read_set);
   
   /* Convert the key */
   char *cass_key;
   int cass_key_len;
   rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
-  se->start_prepare_insert(cass_key, cass_key_len);
+  se->start_row_insert(cass_key, cass_key_len);
 
   /* Convert other fields */
   for (uint i= 1; i < table->s->fields; i++)
@@ -747,7 +767,20 @@
 
   dbug_tmp_restore_column_map(table->read_set, old_map);
   
-  bool res= se->do_insert();
+  bool res;
+  
+  if (doing_insert_batch)
+  {
+    res= 0;
+    if (++insert_rows_batched >= /*insert_batch_size*/
+                                 THDVAR(table->in_use, insert_batch_size))
+    {
+      res= se->do_insert();
+      insert_rows_batched= 0;
+    }
+  }
+  else
+    res= se->do_insert();
 
   if (res)
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
@@ -756,6 +789,28 @@
 }
 
 
+void ha_cassandra::start_bulk_insert(ha_rows rows)
+{
+  doing_insert_batch= true;
+  insert_rows_batched= 0;
+
+  se->clear_insert_buffer();
+}
+
+
+int ha_cassandra::end_bulk_insert()
+{
+  DBUG_ENTER("ha_cassandra::end_bulk_insert");
+  
+  /* Flush out the insert buffer */
+  doing_insert_batch= false;
+  bool bres= se->do_insert();
+  se->clear_insert_buffer();
+
+  DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
+}
+
+
 int ha_cassandra::rnd_init(bool scan)
 {
   bool bres;
@@ -893,19 +948,14 @@
   DBUG_RETURN(rc);
 }
 
-#if 0
-void ha_cassandra::start_bulk_insert(ha_rows rows)
-{
-  /* Do nothing? */
-}
-
-
-int ha_cassandra::end_bulk_insert()
-{
-  // TODO!
+
+int ha_cassandra::reset()
+{
+  doing_insert_batch= false;
   return 0;
 }
-#endif 
+
+
 /////////////////////////////////////////////////////////////////////////////
 // Dummy implementations start
 /////////////////////////////////////////////////////////////////////////////
@@ -1023,20 +1073,32 @@
 // Dummy implementations end
 /////////////////////////////////////////////////////////////////////////////
 
-
-static struct st_mysql_sys_var* cassandra_system_variables[]= {
-//  MYSQL_SYSVAR(enum_var),
-//  MYSQL_SYSVAR(ulong_var),
-  NULL
+static SHOW_VAR cassandra_status_variables[]= {
+  {"row_inserts",
+    (char*) &cassandra_counters.row_inserts,         SHOW_LONG},
+  {"row_insert_batches",
+    (char*) &cassandra_counters.row_insert_batches,  SHOW_LONG},
+  {NullS, NullS, SHOW_LONG}
 };
 
 
+static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
+{
+  //innodb_export_status();
+  cassandra_counters_copy= cassandra_counters; 
+
+  var->type= SHOW_ARRAY;
+  var->value= (char *) &cassandra_status_variables;
+  return 0;
+}
+
+
 struct st_mysql_storage_engine cassandra_storage_engine=
 { MYSQL_HANDLERTON_INTERFACE_VERSION };
 
 static struct st_mysql_show_var func_status[]=
 {
-//  {"example_func_example",  (char *)show_func_example, SHOW_FUNC},
+  {"Cassandra",  (char *)show_cassandra_vars, SHOW_FUNC},
   {0,0,SHOW_UNDEF}
 };
 

=== modified file 'storage/cassandra/ha_cassandra.h'
--- a/storage/cassandra/ha_cassandra.h	2012-08-23 17:16:01 +0000
+++ b/storage/cassandra/ha_cassandra.h	2012-08-26 12:06:39 +0000
@@ -47,6 +47,9 @@
   void read_cassandra_columns(bool unpack_pk);
 
   ha_rows rnd_batch_size;
+
+  bool doing_insert_batch;
+  ha_rows insert_rows_batched;
 public:
   ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
   ~ha_cassandra()
@@ -145,10 +148,12 @@
   */
   virtual double read_time(uint, uint, ha_rows rows)
   { return (double) rows /  20.0+1; }
-#if 0
+
   virtual void start_bulk_insert(ha_rows rows);
   virtual int end_bulk_insert();
-#endif  
+  
+  virtual int reset();
+  
   /*
     Everything below are methods that we implement in ha_example.cc.
 



More information about the commits mailing list