[Commits] Rev 3523: Cassandra SE in file:///data0/psergey/dev2/5.5-cassandra-r01/

Sergey Petrunya psergey at askmonty.org
Thu Sep 27 10:59:15 EEST 2012


At file:///data0/psergey/dev2/5.5-cassandra-r01/

------------------------------------------------------------
revno: 3523
revision-id: psergey at askmonty.org-20120927075914-c90so471vax0fhop
parent: psergey at askmonty.org-20120926150212-6v3iffny5o0exffu
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: 5.5-cassandra-r01
timestamp: Thu 2012-09-27 11:59:14 +0400
message:
  Cassandra SE
  - Support UPDATE statements
  - Follow what CQL does: don't show deleted rows (they show up as rows without any columns in reads)
=== modified file 'mysql-test/r/cassandra.result'
--- a/mysql-test/r/cassandra.result	2012-09-26 15:02:12 +0000
+++ b/mysql-test/r/cassandra.result	2012-09-27 07:59:14 +0000
@@ -41,7 +41,6 @@
 pk	data1	data2
 rowkey12	data1-value3	454
 rowkey10	data1-value	123456
-rowkey11	NULL	NULL
 delete from t1;
 select * from t1;
 pk	data1	data2
@@ -381,3 +380,30 @@
 Writes made during ALTER TABLE
 0
 drop table t2;
+#
+# UPDATE command support
+#
+create table t1 (pk varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra 
+thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
+insert into t1 values ('rowkey10', 'data1-value', 123456);
+insert into t1 values ('rowkey11', 'data1-value2', 34543);
+insert into t1 values ('rowkey12', 'data1-value3', 454);
+select * from t1;
+pk	data1	data2
+rowkey12	data1-value3	454
+rowkey10	data1-value	123456
+rowkey11	data1-value2	34543
+update t1 set data1='updated-1' where pk='rowkey11';
+select * from t1;
+pk	data1	data2
+rowkey12	data1-value3	454
+rowkey10	data1-value	123456
+rowkey11	updated-1	34543
+update t1 set pk='new-rowkey12' where pk='rowkey12';
+select * from t1;
+pk	data1	data2
+rowkey10	data1-value	123456
+new-rowkey12	data1-value3	454
+rowkey11	updated-1	34543
+delete from t1;
+drop table t1;

=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test	2012-09-26 15:02:12 +0000
+++ b/mysql-test/t/cassandra.test	2012-09-27 07:59:14 +0000
@@ -492,6 +492,25 @@
 
 drop table t2;
 
+--echo #
+--echo # UPDATE command support
+--echo #
+create table t1 (pk varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra 
+  thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
+
+insert into t1 values ('rowkey10', 'data1-value', 123456);
+insert into t1 values ('rowkey11', 'data1-value2', 34543);
+insert into t1 values ('rowkey12', 'data1-value3', 454);
+select * from t1;
+
+update t1 set data1='updated-1' where pk='rowkey11';
+select * from t1;
+update t1 set pk='new-rowkey12' where pk='rowkey12';
+select * from t1;
+
+delete from t1;
+drop table t1;
+
 ############################################################################
 ## Cassandra cleanup
 ############################################################################

=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-09-26 10:57:45 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-09-27 07:59:14 +0000
@@ -99,6 +99,8 @@
   void clear_insert_buffer();
   void start_row_insert(const char *key, int key_len);
   void add_insert_column(const char *name, const char *value, int value_len);
+  void add_row_deletion(const char *key, int key_len,
+                        Column_name_enumerator *col_names);
   
   bool do_insert();
 
@@ -313,6 +315,42 @@
 }
 
 
+void Cassandra_se_impl::add_row_deletion(const char *key, int key_len, 
+                                         Column_name_enumerator *col_names)
+{
+  std::string key_to_delete;
+  key_to_delete.assign(key, key_len);
+  
+  batch_mutation[key_to_delete]= ColumnFamilyToMutation();
+  ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_delete];
+
+  cf_mut[column_family]= std::vector<Mutation>();
+  std::vector<Mutation> &mutation_list= cf_mut[column_family];
+
+  Mutation mut;
+  mut.__isset.deletion= true;
+  mut.deletion.__isset.timestamp= true;
+  mut.deletion.timestamp= get_i64_timestamp();
+  mut.deletion.__isset.predicate= true;
+  
+  /*
+    Attempting to delete columns with SliceRange causes exception with message
+    "Deletion does not yet support SliceRange predicates".
+
+    Delete all columns individually.
+  */
+  SlicePredicate slice_pred;
+  slice_pred.__isset.column_names= true;
+  const char *col_name;
+  while ((col_name= col_names->get_next_name()))
+    slice_pred.column_names.push_back(std::string(col_name));
+
+  mut.deletion.predicate= slice_pred;
+
+  mutation_list.push_back(mut);
+}
+
+
 void Cassandra_se_impl::add_insert_column(const char *name, const char *value, 
                                           int value_len)
 {
@@ -531,7 +569,13 @@
     }
   }
   
-  if (have_rowkey_to_skip && !rowkey_to_skip.compare(key_slice_it->key))
+  /*
+    (1) - skip the last row that we have read in the previous batch.
+    (2) - Rows that were deleted show up as rows without any columns. Skip
+          them, like CQL does.
+  */
+  if ((have_rowkey_to_skip && !rowkey_to_skip.compare(key_slice_it->key)) || // (1)
+      key_slice_it->columns.size() == 0) // (2)
   {
     key_slice_it++;
     goto restart;

=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h	2012-09-26 10:13:03 +0000
+++ b/storage/cassandra/cassandra_se.h	2012-09-27 07:59:14 +0000
@@ -20,6 +20,14 @@
   THREE = 8-1,
 } enum_cassandra_consistency_level;
 
+
+class Column_name_enumerator
+{
+public:
+  virtual const char* get_next_name()=0;
+  virtual ~Column_name_enumerator(){}
+};
+
 /*
   Interface to one cassandra column family, i.e. one 'table'
 */
@@ -45,6 +53,8 @@
 
   /* Writes */
   virtual void clear_insert_buffer()=0;
+  virtual void add_row_deletion(const char *key, int key_len,
+                                Column_name_enumerator *col_names)=0;
   virtual void start_row_insert(const char *key, int key_len)=0;
   virtual void add_insert_column(const char *name, const char *value, 
                                  int value_len)=0;

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-09-26 15:02:12 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-09-27 07:59:14 +0000
@@ -1605,11 +1605,95 @@
 }
 
 
+class Column_name_enumerator_impl : public Column_name_enumerator
+{
+  ha_cassandra *obj;
+  uint idx;
+public:
+  Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {}
+  const char* get_next_name()
+  {
+    if (idx == obj->table->s->fields)
+      return NULL;
+    else
+      return obj->table->field[idx++]->field_name;
+  }
+};
+
+
 int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
 {
-
+  my_bitmap_map *old_map;
   DBUG_ENTER("ha_cassandra::update_row");
-  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+  /* Currently, it is guaranteed that new_data == table->record[0] */
+  
+  /* For now, just rewrite the full record */
+  se->clear_insert_buffer();
+  
+
+  old_map= dbug_tmp_use_all_columns(table, table->read_set);
+
+  char *old_key;
+  int old_key_len;
+  se->get_read_rowkey(&old_key, &old_key_len);
+
+  /* Get the key we're going to write */
+  char *new_key;
+  int new_key_len;
+  if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len))
+  {
+    my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
+             rowkey_converter->field->field_name, insert_lineno);
+    dbug_tmp_restore_column_map(table->read_set, old_map);
+    DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+  }
+
+  /*
+    Compare it to the key we've read. For all types that Cassandra supports, 
+    binary byte-wise comparison can be used
+  */
+  bool new_primary_key;
+  if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len))
+    new_primary_key= true;
+  else
+    new_primary_key= false;
+
+
+  if (new_primary_key)
+  {
+    /* 
+      Primary key value changed. This is essentially a DELETE + INSERT. 
+      Add a DELETE operation into the batch
+    */
+    Column_name_enumerator_impl name_enumerator(this);
+    se->add_row_deletion(old_key, old_key_len, &name_enumerator);
+  }
+
+  se->start_row_insert(new_key, new_key_len);
+
+  /* Convert other fields */
+  for (uint i= 1; i < table->s->fields; i++)
+  {
+    char *cass_data;
+    int cass_data_len;
+    if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
+    {
+      my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
+               field_converters[i]->field->field_name, insert_lineno);
+      dbug_tmp_restore_column_map(table->read_set, old_map);
+      DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+    }
+    se->add_insert_column(field_converters[i]->field->field_name, 
+                          cass_data, cass_data_len);
+  }
+  dbug_tmp_restore_column_map(table->read_set, old_map);
+  
+  bool res= se->do_insert();
+
+  if (res)
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+  
+  DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
 }
 
 

=== modified file 'storage/cassandra/ha_cassandra.h'
--- a/storage/cassandra/ha_cassandra.h	2012-09-24 15:15:12 +0000
+++ b/storage/cassandra/ha_cassandra.h	2012-09-27 07:59:14 +0000
@@ -45,6 +45,7 @@
 */
 class ha_cassandra: public handler
 {
+  friend class Column_name_enumerator_impl;
   THR_LOCK_DATA lock;      ///< MySQL lock
   CASSANDRA_SHARE *share;    ///< Shared lock info
   



More information about the commits mailing list