[Commits] Rev 3486: MDEV-431: Cassandra storage engine in file:///data0/psergey/dev2/5.5-cassandra-r01/

Sergey Petrunya psergey at askmonty.org
Sat Aug 18 20:21:51 EEST 2012


At file:///data0/psergey/dev2/5.5-cassandra-r01/

------------------------------------------------------------
revno: 3486
revision-id: psergey at askmonty.org-20120818172150-fnccxt4rh7s7zqqw
parent: psergey at askmonty.org-20120818122835-uceiz93vnnhyuqx8
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: 5.5-cassandra-r01
timestamp: Sat 2012-08-18 21:21:50 +0400
message:
  MDEV-431: Cassandra storage engine
  - Got range reads to work (except for unpacking of the rowkey value)
=== modified file 'mysql-test/r/cassandra.result'
--- a/mysql-test/r/cassandra.result	2012-08-17 17:13:20 +0000
+++ b/mysql-test/r/cassandra.result	2012-08-18 17:21:50 +0000
@@ -14,7 +14,12 @@
 create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra 
 thrift_host='localhost' keyspace='no_such_keyspace';
 ERROR HY000: Can't create table 'test.t1' (errno: 140)
-create table t1 (rowkey char(36) primary key, column1 char(60)) engine=cassandra 
-thrift_host='localhost' keyspace='mariadbtest' column_family='cf1';
-insert into t1 values ('key0', 'data1');
+create table t1 (rowkey char(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra 
+thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
+insert into t1 values ('rowkey10', 'data1-value', 123456);
+insert into t1 values ('rowkey11', 'data1-value2', 34543);
+select * from t1;
+rowkey	data1	data2
+	data1-value	123456
+	data1-value2	34543
 drop table t1;

=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test	2012-08-18 12:28:35 +0000
+++ b/mysql-test/t/cassandra.test	2012-08-18 17:21:50 +0000
@@ -36,12 +36,12 @@
 
 ./cqlsh --cql3
 
-CREATE KEYSPACE mariadbtest
+CREATE KEYSPACE mariadbtest2
   WITH strategy_class = 'org.apache.cassandra.locator.SimpleStrategy'
   AND strategy_options:replication_factor='1';
 
-USE mariadbtest;
-create columnfamily cf1 ( pk varchar primary key, data1 varchar);
+USE mariadbtest2;
+create columnfamily cf1 ( pk varchar primary key, data1 varchar, data2 bigint);
 
 --enable_parsing
 ############################################################################
@@ -49,11 +49,12 @@
 ############################################################################
 
 # Now, create a table for real and insert data
-create table t1 (rowkey char(36) primary key, data1 varchar(60)) engine=cassandra 
-  thrift_host='localhost' keyspace='mariadbtest' column_family='cf1';
-
-insert into t1 values ('key0', 'data1');
-
+create table t1 (rowkey char(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra 
+  thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
+
+insert into t1 values ('rowkey10', 'data1-value', 123456);
+insert into t1 values ('rowkey11', 'data1-value2', 34543);
+select * from t1;
 drop table t1;
 
 ############################################################################

=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-08-18 12:28:35 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-08-18 17:21:50 +0000
@@ -60,7 +60,12 @@
   std::string key_to_insert;
   int64_t insert_timestamp;
   std::vector<Mutation>* insert_list;
+   
+  /* Resultset we're reading */
+  std::vector<KeySlice> key_slice_vec;
+  std::vector<KeySlice>::iterator key_slice_it;
 
+  SlicePredicate slice_pred;
 public:
   Cassandra_se_impl() : cass(NULL) {}
   virtual ~Cassandra_se_impl(){ delete cass; }
@@ -78,10 +83,18 @@
   void add_insert_column(const char *name, const char *value, int value_len);
   bool do_insert();
 
-  /* Reads */
+  /* Reads, point lookups */
   bool get_slice(char *key, size_t key_len, bool *found);
   bool get_next_read_column(char **name, char **value, int *value_len);
 
+  /* Reads, multi-row scans */
+  bool get_range_slices();
+  void finish_reading_range_slices();
+  bool get_next_range_slice_row();
+
+  /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
+  void clear_read_columns();
+  void add_read_column(const char *name);
 };
 
 
@@ -265,7 +278,7 @@
 
   try {
     cass->get_slice(column_data_vec, rowkey_str, cparent, slice_pred, 
-                    ConsistencyLevel::ONE);
+                    cur_consistency_level);
 
     if (column_data_vec.size() == 0)
     {
@@ -318,3 +331,70 @@
 }
 
 
+bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as parameters
+{
+  /*
+    Fetch rows for a multi-row scan into key_slice_vec and rewind key_slice_it.
+    Returns false on success, true if a handled Thrift exception was caught.
+  */
+  bool res= true;
+  ColumnParent cparent;
+  cparent.column_family= column_family;
+
+  /* slice_pred (see add_read_column()) names the columns to retrieve */
+  KeyRange key_range; // empty start/end keys: no key bounds are supplied
+  key_range.__isset.start_key=true;
+  key_range.__isset.end_key=true;
+  key_range.start_key.assign("", 0);
+  key_range.end_key.assign("", 0);
+
+  try {
+    cass->get_range_slices(key_slice_vec,
+                           cparent, slice_pred, key_range, 
+                           cur_consistency_level);
+    res= false;
+  } catch (const InvalidRequestException &ire) {
+    print_error("%s [%s]", ire.what(), ire.why.c_str());
+  } catch (const UnavailableException &ue) {
+    print_error("UnavailableException: %s", ue.what());
+  } catch (const TimedOutException &te) {
+    print_error("TimedOutException: %s", te.what());
+  }
+
+  /* Position the row cursor on the first fetched row, or end() if none */
+  key_slice_it= key_slice_vec.begin();
+  return res;
+}
+
+
+bool Cassandra_se_impl::get_next_range_slice_row()
+{
+  /* Returns true once every fetched row has been handed out, false otherwise */
+  if (key_slice_vec.end() == key_slice_it)
+    return true;
+  /* Make this row's columns current (column_data_vec/_it), then advance */
+  column_data_vec= (key_slice_it++)->columns;
+  column_data_it= column_data_vec.begin();
+  return false;
+}
+
+
+void Cassandra_se_impl::finish_reading_range_slices()
+{
+  key_slice_vec.clear(); /* discard the rows held for the range read */
+}
+
+
+void Cassandra_se_impl::clear_read_columns()
+{
+  slice_pred.column_names.clear(); /* empties the list; __isset is left as is */
+}
+
+
+void Cassandra_se_impl::add_read_column(const char *name_arg)
+{
+  /* Flag the predicate's column list as set and append name_arg to it */
+  slice_pred.__isset.column_names= true;
+  slice_pred.column_names.push_back(std::string(name_arg));
+}
+

=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h	2012-08-18 12:28:35 +0000
+++ b/storage/cassandra/cassandra_se.h	2012-08-18 17:21:50 +0000
@@ -7,22 +7,6 @@
 */
 
 
-/* 
-  Storage for (name,value) pairs. name==NULL means 'non-object'.
-
-  This should be used for 
-  - shipping data from sql to cassandra for INSERTs 
-  - shipping data from cassandra to SQL for record reads.
-
-*/
-class NameAndValue
-{
-public:
-  char *name;
-  char *value;
-  size_t value_len;
-};
-
 /*
   Interface to one cassandra column family, i.e. one 'table'
 */
@@ -52,6 +36,15 @@
   virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ;
   virtual bool get_next_read_column(char **name, char **value, int *value_len)=0;
 
+  /* Reads, multi-row scans */
+  virtual bool get_range_slices()=0;
+  virtual void finish_reading_range_slices()=0;
+  virtual bool get_next_range_slice_row()=0;
+  
+  /* read_set setup */
+  virtual void clear_read_columns()=0;
+  virtual void add_read_column(const char *name)=0;
+
   /* Passing error messages up to ha_cassandra */
   char err_buffer[512];
   const char *error_str() { return err_buffer; }

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-08-18 12:28:35 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-08-18 17:21:50 +0000
@@ -212,8 +212,7 @@
 
 ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
   :handler(hton, table_arg),
-   se(NULL), names_and_vals(NULL),
-   field_converters(NULL)
+   se(NULL), field_converters(NULL)
 {}
 
 
@@ -265,11 +264,7 @@
   DBUG_ENTER("ha_cassandra::close");
   delete se;
   se= NULL;
-  if (names_and_vals)
-  {
-    my_free(names_and_vals);
-    names_and_vals= NULL;
-  }
+  free_field_converters();
   DBUG_RETURN(free_share(share));
 }
 
@@ -589,6 +584,7 @@
   }
 }
 
+
 void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
 
 int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
@@ -622,34 +618,49 @@
   }
   else
   {
-    char *cass_name;
-    char *cass_value;
-    int cass_value_len;
-    Field **field;
-
-    /* Start with all fields being NULL */
+    read_cassandra_columns();
+  }
+
+  DBUG_RETURN(rc);
+}
+
+
+void ha_cassandra::read_cassandra_columns()
+{
+  /* Unpack the columns handed out by se->get_next_read_column() into table fields */
+  char *cass_name;
+  char *cass_value;
+  int cass_value_len;
+  Field **field;
+  /* 
+    cassandra_to_mariadb() calls will use field->store(...) methods, which
+    require that the column is in the table->write_set
+  */
+  my_bitmap_map *old_map;
+  old_map= dbug_tmp_use_all_columns(table, table->write_set);
+
+  /* Start with every field from field[1] onwards being NULL */
+  for (field= table->field + 1; *field; field++)
+    (*field)->set_null();
+
+  while (!se->get_next_read_column(&cass_name, &cass_value, &cass_value_len))
+  {
+    /* Map the Cassandra column to our field by name. todo: use hash or something.. */
+    /* Linear scan from field[1]; a name with no matching field is skipped */
     for (field= table->field + 1; *field; field++)
-      (*field)->set_null();
-
-    while (!se->get_next_read_column(&cass_name, &cass_value, &cass_value_len))
     {
-      // map to our column. todo: use hash or something..
-      int idx=1;
-      for (field= table->field + 1; *field; field++)
+      /* strcmp(): names must match exactly, including case */
+      if (!strcmp((*field)->field_name, cass_name))
       {
-        idx++;
-        if (!strcmp((*field)->field_name, cass_name))
-        {
-          int fieldnr= (*field)->field_index;
-          (*field)->set_notnull();
-          field_converters[fieldnr]->cassandra_to_mariadb(cass_value, cass_value_len);
-          break;
-        }
+        int fieldnr= (*field)->field_index;
+        (*field)->set_notnull();
+        field_converters[fieldnr]->cassandra_to_mariadb(cass_value, cass_value_len);
+        break;
       }
     }
   }
 
-  DBUG_RETURN(rc);
+  dbug_tmp_restore_column_map(table->write_set, old_map);
 }
 
 
@@ -690,20 +701,51 @@
 }
 
 
-NameAndValue *ha_cassandra::get_names_and_vals()
-{
-  if (names_and_vals)
-    return names_and_vals;
+int ha_cassandra::rnd_init(bool scan)
+{
+  DBUG_ENTER("ha_cassandra::rnd_init");
+  /* Only sequential scans are implemented; scan == false is rejected */
+  if (!scan)
+    DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+
+  /* Ask the Cassandra layer for every column from field[1] onwards */
+  se->clear_read_columns();
+  for (uint fieldnr= 1; fieldnr < table->s->fields; fieldnr++)
+    se->add_read_column(table->field[fieldnr]->field_name);
+
+  const bool failed= se->get_range_slices(); /* true means the read failed */
+  DBUG_RETURN(failed ? HA_ERR_INTERNAL_ERROR : 0);
+}
+
+
+int ha_cassandra::rnd_end()
+{
+  DBUG_ENTER("ha_cassandra::rnd_end");
+  /* End of the scan: let the Cassandra layer release the rows it fetched */
+  se->finish_reading_range_slices();
+  DBUG_RETURN(0);
+}
+
+
+int ha_cassandra::rnd_next(uchar *buf)
+{
+  int rc;
+  DBUG_ENTER("ha_cassandra::rnd_next");
+  /* Step the range scan by one row; 'buf' is not referenced in this function. */
+  /* Returns HA_ERR_END_OF_FILE when no rows remain, else unpacks the row and returns 0. */
+  if (se->get_next_range_slice_row())
+  {
+    rc= HA_ERR_END_OF_FILE;
+  }
   else
   {
-    size_t size= sizeof(NameAndValue) * (table->s->fields + 1);
-    names_and_vals= (NameAndValue*)my_malloc(size ,0);
-    memset(names_and_vals, 0, size);
-    return names_and_vals;
+    read_cassandra_columns();
+    rc= 0;
   }
+
+  DBUG_RETURN(rc);
 }
 
-
 /////////////////////////////////////////////////////////////////////////////
 // Dummy implementations start
 /////////////////////////////////////////////////////////////////////////////
@@ -743,27 +785,6 @@
   DBUG_RETURN(rc);
 }
 
-int ha_cassandra::rnd_init(bool scan)
-{
-  DBUG_ENTER("ha_cassandra::rnd_init");
-  DBUG_RETURN(0);
-}
-
-int ha_cassandra::rnd_end()
-{
-  DBUG_ENTER("ha_cassandra::rnd_end");
-  DBUG_RETURN(0);
-}
-
-
-int ha_cassandra::rnd_next(uchar *buf)
-{
-  int rc;
-  DBUG_ENTER("ha_cassandra::rnd_next");
-  rc= HA_ERR_END_OF_FILE;
-  DBUG_RETURN(rc);
-}
-
 void ha_cassandra::position(const uchar *record)
 {
   DBUG_ENTER("ha_cassandra::position");

=== modified file 'storage/cassandra/ha_cassandra.h'
--- a/storage/cassandra/ha_cassandra.h	2012-08-18 12:28:35 +0000
+++ b/storage/cassandra/ha_cassandra.h	2012-08-18 17:21:50 +0000
@@ -36,19 +36,17 @@
   
   Cassandra_se_interface *se;
 
-  /* pre-allocated array of #fields elements */
-  NameAndValue *names_and_vals;
-  NameAndValue *get_names_and_vals();
-
-
   ColumnDataConverter **field_converters;
   uint n_field_converters;
   bool setup_field_converters(Field **field, uint n_fields);
   void free_field_converters();
+  
+  void read_cassandra_columns();
 public:
   ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
   ~ha_cassandra()
   {
+    free_field_converters();
     delete se;
   }
 



More information about the commits mailing list