[Commits] Rev 3527: cassandra dynamic columns update in file:///home/bell/maria/bzr/work-maria-5.5-cassandra/

sanja at montyprogram.com sanja at montyprogram.com
Fri Sep 28 01:53:27 EEST 2012


At file:///home/bell/maria/bzr/work-maria-5.5-cassandra/

------------------------------------------------------------
revno: 3527
revision-id: sanja at montyprogram.com-20120927225325-93f24l6jzfwjn8ax
parent: sanja at montyprogram.com-20120927153344-21z1v4ct9tka3ft4
committer: sanja at montyprogram.com
branch nick: work-maria-5.5-cassandra
timestamp: Fri 2012-09-28 01:53:25 +0300
message:
  cassandra dynamic columns update
-------------- next part --------------
=== modified file 'mysql-test/r/cassandra.result'
--- a/mysql-test/r/cassandra.result	2012-09-27 14:15:13 +0000
+++ b/mysql-test/r/cassandra.result	2012-09-27 22:53:25 +0000
@@ -486,6 +486,40 @@ select rowkey, column_json(dyn) from t1;
 rowkey	column_json(dyn)
 1	[{"dyn1":"1"},{"dyn2":"two"},{"boolcol":1}]
 2	[{"dyn1":"1"},{"dyn2":"two"},{"boolcol":0}]
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"dyn1":"1"},{"dyn2":"two"},{"boolcol":1}]
+2	[{"dyn1":"1"},{"dyn2":"two"},{"boolcol":0}]
+update t1 set dyn=column_add(dyn, "dyn2", null, "dyn3", "3");
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"dyn1":"1"},{"dyn3":"3"},{"boolcol":1}]
+2	[{"dyn1":"1"},{"dyn3":"3"},{"boolcol":0}]
+update t1 set dyn=column_add(dyn, "dyn1", null) where rowkey= 1;
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"dyn3":"3"},{"boolcol":1}]
+2	[{"dyn1":"1"},{"dyn3":"3"},{"boolcol":0}]
+update t1 set dyn=column_add(dyn, "dyn3", null, "a", "ddd");
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"a":"ddd"},{"boolcol":1}]
+2	[{"a":"ddd"},{"dyn1":"1"},{"boolcol":0}]
+update t1 set dyn=column_add(dyn, "12345678901234", "ddd");
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"a":"ddd"},{"boolcol":1},{"12345678901234":"ddd"}]
+2	[{"a":"ddd"},{"dyn1":"1"},{"boolcol":0},{"12345678901234":"ddd"}]
+update t1 set dyn=column_add(dyn, "12345678901234", null);
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"a":"ddd"},{"boolcol":1}]
+2	[{"a":"ddd"},{"dyn1":"1"},{"boolcol":0}]
+update t1 set dyn=column_add(dyn, 'boolcol', null) where rowkey= 2;
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"a":"ddd"},{"boolcol":1}]
+2	[{"a":"ddd"},{"dyn1":"1"}]
 delete from t1;
 drop table t1;
 CREATE TABLE t1 (rowkey varchar(10) PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd1';

=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test	2012-09-27 14:15:13 +0000
+++ b/mysql-test/t/cassandra.test	2012-09-27 22:53:25 +0000
@@ -588,6 +588,19 @@ CREATE TABLE t1 (rowkey int PRIMARY KEY,
 insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 254324)); 
 insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 0)); 
 select rowkey, column_json(dyn) from t1;
+select rowkey, column_json(dyn) from t1;
+update t1 set dyn=column_add(dyn, "dyn2", null, "dyn3", "3"); 
+select rowkey, column_json(dyn) from t1;
+update t1 set dyn=column_add(dyn, "dyn1", null) where rowkey= 1;
+select rowkey, column_json(dyn) from t1;
+update t1 set dyn=column_add(dyn, "dyn3", null, "a", "ddd");
+select rowkey, column_json(dyn) from t1;
+update t1 set dyn=column_add(dyn, "12345678901234", "ddd");
+select rowkey, column_json(dyn) from t1;
+update t1 set dyn=column_add(dyn, "12345678901234", null);
+select rowkey, column_json(dyn) from t1;
+update t1 set dyn=column_add(dyn, 'boolcol', null) where rowkey= 2;
+select rowkey, column_json(dyn) from t1;
 delete from t1;
 drop table t1;
 

=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-09-27 14:15:13 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-09-27 22:53:25 +0000
@@ -17,6 +17,12 @@
 
 #include "cassandra_se.h"
 
+struct st_mysql_lex_string
+{
+  char *str;
+  size_t length;
+};
+
 using namespace std;
 using namespace apache::thrift;
 using namespace apache::thrift::transport;
@@ -101,10 +107,13 @@ public:
   /* Writes */
   void clear_insert_buffer();
   void start_row_insert(const char *key, int key_len);
-  void add_insert_column(const char *name, const char *value, int value_len);
+  void add_insert_column(const char *name, int name_len,
+                         const char *value, int value_len);
+  void add_insert_delete_column(const char *name, int name_len);
   void add_row_deletion(const char *key, int key_len,
-                        Column_name_enumerator *col_names);
-  
+                        Column_name_enumerator *col_names,
+                        LEX_STRING *names, uint nnames);
+
   bool do_insert();
 
   /* Reads, point lookups */
@@ -330,8 +339,9 @@ void Cassandra_se_impl::start_row_insert
 }
 
 
-void Cassandra_se_impl::add_row_deletion(const char *key, int key_len, 
-                                         Column_name_enumerator *col_names)
+void Cassandra_se_impl::add_row_deletion(const char *key, int key_len,
+                                         Column_name_enumerator *col_names,
+                                         LEX_STRING *names, uint nnames)
 {
   std::string key_to_delete;
   key_to_delete.assign(key, key_len);
@@ -359,6 +369,9 @@ void Cassandra_se_impl::add_row_deletion
   const char *col_name;
   while ((col_name= col_names->get_next_name()))
     slice_pred.column_names.push_back(std::string(col_name));
+  for (uint i= 0; i < nnames; i++)
+    slice_pred.column_names.push_back(std::string(names[i].str,
+                                                  names[i].length));
 
   mut.deletion.predicate= slice_pred;
 
@@ -366,7 +379,9 @@ void Cassandra_se_impl::add_row_deletion
 }
 
 
-void Cassandra_se_impl::add_insert_column(const char *name, const char *value, 
+void Cassandra_se_impl::add_insert_column(const char *name,
+                                          int name_len,
+                                          const char *value,
                                           int value_len)
 {
   Mutation mut;
@@ -374,7 +389,10 @@ void Cassandra_se_impl::add_insert_colum
   mut.column_or_supercolumn.__isset.column= true;
 
   Column& col=mut.column_or_supercolumn.column;
-  col.name.assign(name);
+  if (name_len)
+    col.name.assign(name, name_len);
+  else
+    col.name.assign(name);
   col.value.assign(value, value_len);
   col.timestamp= insert_timestamp;
   col.__isset.value= true;
@@ -382,6 +400,23 @@ void Cassandra_se_impl::add_insert_colum
   insert_list->push_back(mut);
 }
 
+void Cassandra_se_impl::add_insert_delete_column(const char *name,
+                                                 int name_len)
+{
+  Mutation mut;
+  mut.__isset.deletion= true;
+  mut.deletion.__isset.timestamp= true;
+  mut.deletion.timestamp= insert_timestamp;
+  mut.deletion.__isset.predicate= true;
+
+  SlicePredicate slice_pred;
+  slice_pred.__isset.column_names= true;
+  slice_pred.column_names.push_back(std::string(name, name_len));
+  mut.deletion.predicate= slice_pred;
+
+  insert_list->push_back(mut);
+}
+
 
 bool Cassandra_se_impl::retryable_do_insert()
 {

=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h	2012-09-27 14:15:13 +0000
+++ b/storage/cassandra/cassandra_se.h	2012-09-27 22:53:25 +0000
@@ -6,6 +6,8 @@
   both together causes compile errors due to conflicts).
 */
 
+struct st_mysql_lex_string;
+typedef struct st_mysql_lex_string LEX_STRING;
 
 /* We need to define this here so that ha_cassandra.cc also has access to it */
 typedef enum
@@ -56,9 +58,12 @@ public:
   /* Writes */
   virtual void clear_insert_buffer()=0;
   virtual void add_row_deletion(const char *key, int key_len,
-                                Column_name_enumerator *col_names)=0;
+                                Column_name_enumerator *col_names,
+                                LEX_STRING *names, uint nnames)=0;
   virtual void start_row_insert(const char *key, int key_len)=0;
-  virtual void add_insert_column(const char *name, const char *value, 
+  virtual void add_insert_delete_column(const char *name, int name_len)= 0;
+  virtual void add_insert_column(const char *name, int name_len,
+                                 const char *value,
                                  int value_len)=0;
   virtual bool do_insert()=0;
 

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-09-27 14:15:13 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-09-27 22:53:25 +0000
@@ -1830,10 +1830,11 @@ int ha_cassandra::read_cassandra_columns
       table->field[dyncol_field]->set_null();
     else
     {
-      table->field[dyncol_field]->set_notnull();
-      table->field[dyncol_field]->store(dynamic_rec.str,
-                                        dynamic_rec.length,
-                                        &my_charset_bin);
+      Field_blob *blob= (Field_blob *)table->field[dyncol_field];
+      blob->set_notnull();
+      blob->store_length(dynamic_rec.length);
+      *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))=
+        dynamic_rec.str;
     }
   }
 
@@ -1856,21 +1857,24 @@ err:
   return res;
 }
 
-int ha_cassandra::write_dynamic_row()
+int ha_cassandra::read_dyncol(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
+                              String *valcol, char **freenames)
 {
-  DYNAMIC_ARRAY vals, names;
-  String valcol, *strcol;
+  String *strcol;
   DYNAMIC_COLUMN col;
-  char *free_names;
-  uint i;
   enum enum_dyncol_func_result rc;
-  DBUG_ENTER("ha_cassandra::write_dynamic_row");
-  DBUG_ASSERT(dyncol_set);
+  DBUG_ENTER("ha_cassandra::read_dyncol");
 
   Field *field= table->field[dyncol_field];
   DBUG_ASSERT(field->type() == MYSQL_TYPE_BLOB);
   /* It is blob and it does not use buffer */
-  strcol= field->val_str(NULL, &valcol);
+  strcol= field->val_str(NULL, valcol);
+  if (field->is_null())
+  {
+    bzero(vals, sizeof(DYNAMIC_ARRAY));
+    bzero(names, sizeof(DYNAMIC_ARRAY));
+    DBUG_RETURN(0); // nothing to write
+  }
   /*
     dynamic_column_vals only read the string so we can
     cheat here with assignment
@@ -1878,21 +1882,31 @@ int ha_cassandra::write_dynamic_row()
   bzero(&col, sizeof(col));
   col.str= (char *)strcol->ptr();
   col.length= strcol->length();
-  if ((rc= dynamic_column_vals(&col, &names, &vals, &free_names)) < 0)
+  if ((rc= dynamic_column_vals(&col, names, vals, freenames)) < 0)
   {
     dynamic_column_error_message(rc);
     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
   }
-  DBUG_ASSERT(names.elements == vals.elements);
-  for (i= 0; i < names.elements; i++)
+  DBUG_RETURN(0);
+}
+
+int ha_cassandra::write_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names)
+{
+  uint i;
+  DBUG_ENTER("ha_cassandra::write_dynamic_row");
+  DBUG_ASSERT(dyncol_set);
+
+
+  DBUG_ASSERT(names->elements == vals->elements);
+  for (i= 0; i < names->elements; i++)
   {
-    char buff[16], stringname[256];
+    char buff[16];
     CASSANDRA_TYPE_DEF *type;
     void *freemem= NULL;
     char *cass_data;
     int cass_data_len;
-    LEX_STRING *name= dynamic_element(&names, i, LEX_STRING*);
-    DYNAMIC_COLUMN_VALUE *val= dynamic_element(&vals, i, DYNAMIC_COLUMN_VALUE*);
+    LEX_STRING *name= dynamic_element(names, i, LEX_STRING*);
+    DYNAMIC_COLUMN_VALUE *val= dynamic_element(vals, i, DYNAMIC_COLUMN_VALUE*);
 
     DBUG_PRINT("info", ("field %*s", (int)name->length, name->str));
     type= get_cassandra_field_def(name->str, (int) name->length);
@@ -1903,21 +1917,21 @@ int ha_cassandra::write_dynamic_row()
                name->str, insert_lineno);
       DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
     }
-    /* prepare \0 ending name */
-    DBUG_ASSERT(name->length < sizeof(stringname) - 1);
-    memcpy(stringname, (name->str), name->length);
-    stringname[name->length]= '\0';
-    se->add_insert_column(stringname,
+    se->add_insert_column(name->str, name->length,
                           cass_data, cass_data_len);
     if (freemem)
       my_free(freemem);
   }
+  DBUG_RETURN(0);
+}
 
-  delete_dynamic(&names);
-  delete_dynamic(&vals);
+void ha_cassandra::free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
+                                    char *free_names)
+{
+  delete_dynamic(names);
+  delete_dynamic(vals);
   if (free_names)
     my_free(free_names);
-  DBUG_RETURN(0);
 }
 
 int ha_cassandra::write_row(uchar *buf)
@@ -1951,9 +1965,15 @@ int ha_cassandra::write_row(uchar *buf)
     int cass_data_len;
     if (dyncol_set && dyncol_field == i)
     {
+      String valcol;
+      DYNAMIC_ARRAY vals, names;
+      char *free_names;
       int rc;
       DBUG_ASSERT(field_converters[i] == NULL);
-      if ((rc= write_dynamic_row()))
+      if (!(rc= read_dyncol(&vals, &names, &valcol, &free_names)))
+        rc= write_dynamic_row(&vals, &names);
+      free_dynamic_row(&vals, &names, free_names);
+      if (rc)
         return rc;
     }
     else
@@ -1966,7 +1986,7 @@ int ha_cassandra::write_row(uchar *buf)
         dbug_tmp_restore_column_map(table->read_set, old_map);
         DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
       }
-      se->add_insert_column(field_converters[i]->field->field_name,
+      se->add_insert_column(field_converters[i]->field->field_name, 0,
                             cass_data, cass_data_len);
     }
   }
@@ -2366,13 +2386,16 @@ public:
 
 int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
 {
+  DYNAMIC_ARRAY oldvals, oldnames, vals, names;
+  String oldvalcol, valcol;
+  char *oldfree_names= NULL, *free_names= NULL;
   my_bitmap_map *old_map;
+  int res;
   DBUG_ENTER("ha_cassandra::update_row");
   /* Currently, it is guaranteed that new_data == table->record[0] */
-  
+  DBUG_ASSERT(new_data == table->record[0]);
   /* For now, just rewrite the full record */
   se->clear_insert_buffer();
-  
 
   old_map= dbug_tmp_use_all_columns(table, table->read_set);
 
@@ -2401,6 +2424,22 @@ int ha_cassandra::update_row(const uchar
   else
     new_primary_key= false;
 
+  if (dyncol_set)
+  {
+    Field *field= table->field[dyncol_field];
+    /* move to get old_data */
+    my_ptrdiff_t diff;
+    diff= (my_ptrdiff_t) (old_data - new_data);
+    field->move_field_offset(diff);      // Points now at old_data
+    if ((res= read_dyncol(&oldvals, &oldnames, &oldvalcol, &oldfree_names)))
+      DBUG_RETURN(res);
+    field->move_field_offset(-diff);     // back to new_data
+    if ((res= read_dyncol(&vals, &names, &valcol, &free_names)))
+    {
+      free_dynamic_row(&oldnames, &oldvals, oldfree_names);
+      DBUG_RETURN(res);
+    }
+  }
 
   if (new_primary_key)
   {
@@ -2409,7 +2448,10 @@ int ha_cassandra::update_row(const uchar
       Add a DELETE operation into the batch
     */
     Column_name_enumerator_impl name_enumerator(this);
-    se->add_row_deletion(old_key, old_key_len, &name_enumerator);
+    se->add_row_deletion(old_key, old_key_len, &name_enumerator,
+                         (LEX_STRING *)oldnames.buffer,
+                         (dyncol_set ? oldnames.elements : 0));
+    oldnames.elements= oldvals.elements= 0; // they will be deleted
   }
 
   se->start_row_insert(new_key, new_key_len);
@@ -2419,23 +2461,64 @@ int ha_cassandra::update_row(const uchar
   {
     char *cass_data;
     int cass_data_len;
-    if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
+    if (dyncol_set && dyncol_field == i)
     {
-      my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
-               field_converters[i]->field->field_name, insert_lineno);
-      dbug_tmp_restore_column_map(table->read_set, old_map);
-      DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+      DBUG_ASSERT(field_converters[i] == NULL);
+      if ((res= write_dynamic_row(&vals, &names)))
+        goto err;
+    }
+    else
+    {
+      if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
+      {
+        my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
+                 field_converters[i]->field->field_name, insert_lineno);
+        dbug_tmp_restore_column_map(table->read_set, old_map);
+        DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+      }
+      se->add_insert_column(field_converters[i]->field->field_name, 0,
+                            cass_data, cass_data_len);
+    }
+  }
+  if (dyncol_set)
+  {
+    /* find removed fields */
+    uint i= 0, j= 0;
+    LEX_STRING *onames= (LEX_STRING *)oldnames.buffer;
+    LEX_STRING *nnames= (LEX_STRING *)names.buffer;
+    /* both array are sorted */
+    for(; i < oldnames.elements; i++)
+    {
+      int scmp= 0;
+      while (j < names.elements &&
+             (nnames[j].length < onames[i].length ||
+             (nnames[j].length == onames[i].length &&
+              (scmp= memcmp(nnames[j].str, onames[i].str,
+                            onames[i].length)) < 0)))
+        j++;
+      if (j < names.elements &&
+          nnames[j].length == onames[i].length &&
+          scmp == 0)
+        j++;
+      else
+        se->add_insert_delete_column(onames[i].str, onames[i].length);
     }
-    se->add_insert_column(field_converters[i]->field->field_name, 
-                          cass_data, cass_data_len);
   }
+
   dbug_tmp_restore_column_map(table->read_set, old_map);
-  
-  bool res= se->do_insert();
+
+  res= se->do_insert();
 
   if (res)
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-  
+
+err:
+  if (dyncol_set)
+  {
+    free_dynamic_row(&oldnames, &oldvals, oldfree_names);
+    free_dynamic_row(&names, &vals, free_names);
+  }
+
   DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
 }
 

=== modified file 'storage/cassandra/ha_cassandra.h'
--- a/storage/cassandra/ha_cassandra.h	2012-09-27 14:15:13 +0000
+++ b/storage/cassandra/ha_cassandra.h	2012-09-27 22:53:25 +0000
@@ -229,7 +229,11 @@ private:
   bool source_exhausted;
   bool mrr_start_read();
   int check_field_options(Field **fields);
-  int write_dynamic_row();
+  int read_dyncol(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
+                  String *valcol, char **freenames);
+  int write_dynamic_row(DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals);
+  void static free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
+                               char *free_names);
   CASSANDRA_TYPE_DEF * get_cassandra_field_def(char *cass_name,
                                                int cass_name_length);
 public:



More information about the commits mailing list