[Commits] Rev 3517: Cassandra read to dynamic columns in file:///home/bell/maria/bzr/work-maria-5.5-cassandra/

sanja at montyprogram.com
Mon Sep 24 23:36:24 EEST 2012


At file:///home/bell/maria/bzr/work-maria-5.5-cassandra/

------------------------------------------------------------
revno: 3517
revision-id: sanja at montyprogram.com-20120924203621-geuw8b1w9facr6p1
parent: sanja at montyprogram.com-20120920193706-4ox14o05cbro1fhv
committer: sanja at montyprogram.com
branch nick: work-maria-5.5-cassandra
timestamp: Mon 2012-09-24 23:36:21 +0300
message:
  Cassandra read to dynamic columns
-------------- next part --------------
=== modified file 'include/ma_dyncol.h'
--- a/include/ma_dyncol.h	2012-09-20 19:37:06 +0000
+++ b/include/ma_dyncol.h	2012-09-24 20:36:21 +0000
@@ -81,6 +81,7 @@ struct st_dynamic_column_value
     struct {
       LEX_STRING value;
       CHARSET_INFO *charset;
+      my_bool nonfreeable;
     } string;
     struct {
       decimal_digit_t buffer[DECIMAL_BUFF_LENGTH];
@@ -108,6 +109,13 @@ dynamic_column_create_many_fmt(DYNAMIC_C
                                uchar *column_keys,
                                DYNAMIC_COLUMN_VALUE *values,
                                my_bool names);
+enum enum_dyncol_func_result
+dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str,
+                                        uint column_count,
+                                        void *column_keys,
+                                        DYNAMIC_COLUMN_VALUE *values,
+                                        my_bool new_str,
+                                        my_bool string_keys);
 
 enum enum_dyncol_func_result
 dynamic_column_update(DYNAMIC_COLUMN *org, uint column_nr,

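A note on the nonfreeable flag added above: it lets a DYNAMIC_COLUMN_VALUE string borrow its bytes from a buffer the dyncol code does not own, so cleanup helpers (see free_strings() in ha_cassandra.cc further down) know not to my_free() the pointer. A minimal sketch of the calling convention, mirroring cassandra_to_dyncol_strStr() from this commit; wrap_external_string() is a hypothetical helper, not part of the patch:

    /* Fill a value that borrows the caller's buffer instead of copying it. */
    static void wrap_external_string(DYNAMIC_COLUMN_VALUE *value,
                                     const char *data, size_t len,
                                     CHARSET_INFO *cs)
    {
      value->type= DYN_COL_STRING;
      value->x.string.charset= cs;
      value->x.string.value.str= (char *) data;   /* not copied */
      value->x.string.value.length= len;
      value->x.string.nonfreeable= TRUE;          /* free_strings() must skip this */
    }
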
=== modified file 'mysql-test/r/cassandra.result'
--- a/mysql-test/r/cassandra.result	2012-09-20 10:22:36 +0000
+++ b/mysql-test/r/cassandra.result	2012-09-24 20:36:21 +0000
@@ -306,3 +306,33 @@ rowkey	countercol
 cnt1	1
 cnt2	100
 drop table t2;
+#
+# Dynamic columns support
+#
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNCOL=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+drop table t2;
+#error: dynamic column is not a blob
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36) DYNCOL=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+ERROR 42000: Incorrect column specifier for column 'uuidcol'
+#error: double dynamic column
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNCOL=1, textcol blob DYNCOL=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+ERROR 42000: Incorrect column specifier for column 'textcol'
+#
+# Dynamic column read
+#
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
+thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+delete from t2;
+insert into t2 values(1,'9b5658dc-f32f-11e1-94cd-f46d046e9f09');
+insert into t2 values(2,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a');
+drop table t2;
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNCOL=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+select rowkey, column_list(dyn), column_get(dyn, 'uuidcol' as char) from t2;
+rowkey	column_list(dyn)	column_get(dyn, 'uuidcol' as char)
+1	`uuidcol`	9b5658dc-f32f-11e1-94cd-f46d046e9f09
+2	`uuidcol`	9b5658dc-f32f-11e1-94cd-f46d046e9f0a
+drop table t2;
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
+thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+delete from t2;
+drop table t2;

=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test	2012-09-20 10:22:36 +0000
+++ b/mysql-test/t/cassandra.test	2012-09-24 20:36:21 +0000
@@ -392,6 +392,42 @@ CREATE TABLE t2 (rowkey varchar(32) PRIM
 select * from t2;
 drop table t2;
 
+--echo #
+--echo # Dynamic columns support
+--echo #
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNCOL=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+drop table t2;
+
+--echo #error: dynamic column is not a blob
+--error ER_WRONG_FIELD_SPEC
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36) DYNCOL=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+
+--echo #error: double dynamic column
+--error ER_WRONG_FIELD_SPEC
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNCOL=1, textcol blob DYNCOL=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+
+--echo #
+--echo # Dynamic column read
+--echo #
+#prepare data
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
+  thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+delete from t2;
+insert into t2 values(1,'9b5658dc-f32f-11e1-94cd-f46d046e9f09');
+insert into t2 values(2,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a');
+drop table t2;
+
+#test dynamic column read
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNCOL=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+select rowkey, column_list(dyn), column_get(dyn, 'uuidcol' as char) from t2;
+drop table t2;
+
+#cleanup data
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
+  thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+delete from t2;
+drop table t2;
+
 ############################################################################
 ## Cassandra cleanup
 ############################################################################

=== modified file 'mysys/ma_dyncol.c'
--- a/mysys/ma_dyncol.c	2012-09-20 19:37:06 +0000
+++ b/mysys/ma_dyncol.c	2012-09-24 20:36:21 +0000
@@ -1377,6 +1377,9 @@ dynamic_new_column_store(DYNAMIC_COLUMN
                        DYNCOL_SYZERESERVE))
       goto err;
   }
+  if (!column_count)
+    return ER_DYNCOL_OK;
+
   bzero(str->str, fmt->fixed_hdr);
   str->length= fmt->fixed_hdr;
 
@@ -1497,7 +1500,7 @@ calc_var_sizes(DYN_HEADER *hdr,
   @return ER_DYNCOL_* return code
 */
 
-static enum enum_dyncol_func_result
+enum enum_dyncol_func_result
 dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str,
                                         uint column_count,
                                         void *column_keys,

=== modified file 'storage/cassandra/CMakeLists.txt'
--- a/storage/cassandra/CMakeLists.txt	2012-08-17 17:13:20 +0000
+++ b/storage/cassandra/CMakeLists.txt	2012-09-24 20:36:21 +0000
@@ -12,7 +12,7 @@ SET(cassandra_sources
      gen-cpp/Cassandra.h)
 
 #INCLUDE_DIRECTORIES(BEFORE ${Boost_INCLUDE_DIRS})
-INCLUDE_DIRECTORIES(AFTER /home/psergey/cassandra/thrift/include/thrift/) 
+INCLUDE_DIRECTORIES(AFTER /usr/local/include/thrift) 
 # 
 STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
 STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})

=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-09-16 08:22:21 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-09-24 20:36:21 +0000
@@ -67,6 +67,7 @@ class Cassandra_se_impl: public Cassandr
   std::string rowkey; /* key of the record we're returning now */
 
   SlicePredicate slice_pred;
+  SliceRange slice_pred_sr;
   bool get_slices_returned_less;
 public:
   Cassandra_se_impl() : cass(NULL) {}
@@ -80,6 +81,8 @@ public:
   void first_ddl_column();
   bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
   void get_rowkey_type(char **name, char **type);
+  size_t get_ddl_size();
+  const char* get_default_validator();
 
   /* Writes */
   void clear_insert_buffer();
@@ -89,7 +92,8 @@ public:
 
   /* Reads, point lookups */
   bool get_slice(char *key, size_t key_len, bool *found);
-  bool get_next_read_column(char **name, char **value, int *value_len);
+  bool get_next_read_column(char **name, int *name_len,
+                            char **value, int *value_len );
   void get_read_rowkey(char **value, int *value_len);
 
   /* Reads, multi-row scans */
@@ -103,6 +107,7 @@ public:
 
   /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
   void clear_read_columns();
+  void clear_read_all_columns();
   void add_read_column(const char *name);
  
   /* Reads, MRR scans */
@@ -239,6 +244,16 @@ void Cassandra_se_impl::get_rowkey_type(
     *name= NULL;
 }
 
+size_t Cassandra_se_impl::get_ddl_size()
+{
+  return cf_def.column_metadata.size();
+}
+
+const char* Cassandra_se_impl::get_default_validator()
+{
+  return cf_def.default_validation_class.c_str();
+}
+
 
 /////////////////////////////////////////////////////////////////////////////
 // Data writes
@@ -297,7 +312,7 @@ void Cassandra_se_impl::add_insert_colum
 bool Cassandra_se_impl::do_insert()
 {
   bool res= true;
-  
+
   /*
     zero-size mutations are allowed by Cassandra's batch_mutate but lets not 
     do them (we may attempt to do it if there is a bulk insert that stores
@@ -390,8 +405,8 @@ bool Cassandra_se_impl::get_slice(char *
 }
 
 
-bool Cassandra_se_impl::get_next_read_column(char **name, char **value, 
-                                             int *value_len)
+bool Cassandra_se_impl::get_next_read_column(char **name, int *name_len,
+                                             char **value, int *value_len)
 {
   bool use_counter=false;
   while (1)
@@ -414,12 +429,14 @@ bool Cassandra_se_impl::get_next_read_co
   ColumnOrSuperColumn& cs= *column_data_it;
   if (use_counter)
   {
+    *name_len= cs.counter_column.name.size();
     *name= (char*)cs.counter_column.name.c_str();
     *value= (char*)&cs.counter_column.value;
     *value_len= sizeof(cs.counter_column.value);
   }
   else
   {
+    *name_len= cs.column.name.size();
     *name= (char*)cs.column.name.c_str();
     *value= (char*)cs.column.value.c_str();
     *value_len= cs.column.value.length();
@@ -548,6 +565,13 @@ void Cassandra_se_impl::clear_read_colum
   slice_pred.column_names.clear();
 }
 
+void Cassandra_se_impl::clear_read_all_columns()
+{
+  slice_pred_sr.start = "";
+  slice_pred_sr.finish = "";
+  slice_pred.__set_slice_range(slice_pred_sr);
+}
+
 
 void Cassandra_se_impl::add_read_column(const char *name_arg)
 {
@@ -561,7 +585,7 @@ bool Cassandra_se_impl::truncate()
 {
   bool res= true;
   try {
-    
+
     cass->truncate(column_family);
     res= false;
 

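Two reader-side changes above are worth spelling out: get_next_read_column() now also returns the column name length, because Cassandra column names are arbitrary byte strings and the dynamic-column path needs to build LEX_STRING names without assuming NUL-terminated text; and clear_read_all_columns() replaces the explicit column_names list in the slice predicate with an open-ended SliceRange ("" to ""), so a scan returns every column of each row even when the names are not known in advance. A rough caller-side sketch, assuming se points to a Cassandra_se_interface as in the handler code below:

    /* After rnd_init()-style setup (clear_read_all_columns(), get_range_slices())
       and get_next_range_slice_row() has positioned us on a row: */
    char *name, *value;
    int name_len, value_len;
    while (!se->get_next_read_column(&name, &name_len, &value, &value_len))
    {
      LEX_STRING col_name;
      col_name.str= name;              /* may hold arbitrary bytes */
      col_name.length= name_len;       /* so the length is passed explicitly */
      /* ... dispatch on the column's validator to build a dynamic-column value ... */
    }
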
=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h	2012-09-16 08:22:21 +0000
+++ b/storage/cassandra/cassandra_se.h	2012-09-24 20:36:21 +0000
@@ -26,6 +26,8 @@ public:
   virtual bool next_ddl_column(char **name, int *name_len, char **value, 
                                int *value_len)=0;
   virtual void get_rowkey_type(char **name, char **type)=0;
+  virtual size_t get_ddl_size()=0;
+  virtual const char* get_default_validator()=0;
 
   /* Writes */
   virtual void clear_insert_buffer()=0;
@@ -36,7 +38,8 @@ public:
 
   /* Reads */
   virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ;
-  virtual bool get_next_read_column(char **name, char **value, int *value_len)=0;
+  virtual bool get_next_read_column(char **name, int *name_len,
+                                    char **value, int *value_len)=0;
   virtual void get_read_rowkey(char **value, int *value_len)=0;
 
   /* Reads, multi-row scans */
@@ -44,7 +47,7 @@ public:
   virtual bool get_range_slices(bool last_key_as_start_key)=0;
   virtual void finish_reading_range_slices()=0;
   virtual bool get_next_range_slice_row(bool *eof)=0;
-  
+
   /* Reads, MRR scans */
   virtual void new_lookup_keys()=0;
   virtual int  add_lookup_key(const char *key, size_t key_len)=0;
@@ -53,8 +56,9 @@ public:
 
   /* read_set setup */
   virtual void clear_read_columns()=0;
+  virtual void clear_read_all_columns()=0;
   virtual void add_read_column(const char *name)=0;
-  
+
   virtual bool truncate()=0;
   virtual bool remove_row()=0;
 

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-09-20 10:22:36 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-09-24 20:36:21 +0000
@@ -1,4 +1,4 @@
-/* 
+/*
    Copyright (c) 2012, Monty Program Ab
 
    This program is free software; you can redistribute it and/or modify
@@ -22,15 +22,20 @@
 #include "ha_cassandra.h"
 #include "sql_class.h"
 
+#define DYNCOL_USUAL 20
+#define DYNCOL_DELTA 100
+#define DYNCOL_USUAL_REC 1024
+#define DYNCOL_DELTA_REC 1024
+
 static handler *cassandra_create_handler(handlerton *hton,
-                                       TABLE_SHARE *table, 
+                                       TABLE_SHARE *table,
                                        MEM_ROOT *mem_root);
 
 
 handlerton *cassandra_hton;
 
 
-/* 
+/*
    Hash used to track the number of open tables; variable for example share
    methods
 */
@@ -69,6 +74,25 @@ ha_create_table_option cassandra_table_o
   HA_TOPTION_END
 };
 
+/**
+  Structure for CREATE TABLE options (field options).
+*/
+
+struct ha_field_option_struct
+{
+  bool dyncol_field;
+};
+
+ha_create_table_option cassandra_field_option_list[]=
+{
+  /*
+    Marks the column that collects all other (dynamic) columns.
+    Valid values are YES/NO, ON/OFF, 1/0.
+    The default is 0, i.e. NO/OFF.
+  */
+  HA_FOPTION_BOOL("DYNCOL", dyncol_field, 0),
+  HA_FOPTION_END
+};
 
 static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
   "Number of rows in an INSERT batch",
@@ -203,17 +227,16 @@ static int cassandra_init_func(void *p)
 
   cassandra_hton->state=   SHOW_OPTION_YES;
   cassandra_hton->create=  cassandra_create_handler;
-  /* 
+  /*
     Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE
     TABLE to create an *empty* table from scratch. Cassandra table won't be
     emptied if re-created.
   */
-  cassandra_hton->flags=   0; 
+  cassandra_hton->flags=   0;
   cassandra_hton->table_options= cassandra_table_option_list;
-  //cassandra_hton->field_options= example_field_option_list;
-  cassandra_hton->field_options= NULL;
-  
-  mysql_mutex_init(0 /* no instrumentation */, 
+  cassandra_hton->field_options= cassandra_field_option_list;
+
+  mysql_mutex_init(0 /* no instrumentation */,
                    &cassandra_default_host_lock, MY_MUTEX_INIT_FAST);
 
   DBUG_RETURN(0);
@@ -310,7 +333,7 @@ static int free_share(CASSANDRA_SHARE *s
 
 
 static handler* cassandra_create_handler(handlerton *hton,
-                                       TABLE_SHARE *table, 
+                                       TABLE_SHARE *table,
                                        MEM_ROOT *mem_root)
 {
   return new (mem_root) ha_cassandra(hton, table);
@@ -319,7 +342,11 @@ static handler* cassandra_create_handler
 
 ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
   :handler(hton, table_arg),
-   se(NULL), field_converters(NULL), rowkey_converter(NULL)
+   se(NULL), field_converters(NULL),
+   special_type_field_converters(NULL),
+   special_type_field_names(NULL), n_special_type_fields(0),
+   rowkey_converter(NULL),
+   dyncol_field(0), dyncol_set(0)
 {}
 
 
@@ -333,6 +360,29 @@ const char **ha_cassandra::bas_ext() con
 }
 
 
+int ha_cassandra::check_field_options(Field **fields)
+{
+  Field **field;
+  uint i;
+  DBUG_ENTER("ha_cassandra::check_field_options");
+  for (field= fields, i= 0; *field; field++, i++)
+  {
+    ha_field_option_struct *field_options= (*field)->option_struct;
+    if (field_options && field_options->dyncol_field)
+    {
+      if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB)
+      {
+         my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name);
+         DBUG_RETURN(HA_WRONG_CREATE_OPTION);
+      }
+      dyncol_set= 1;
+      dyncol_field= i;
+    }
+  }
+  DBUG_RETURN(0);
+}
+
+
 int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
 {
   ha_table_option_struct *options= table->s->option_struct;
@@ -357,6 +407,9 @@ int ha_cassandra::open(const char *name,
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
 
+  if ((res= check_field_options(table->s->field)))
+    DBUG_RETURN(res);
+
   if (setup_field_converters(table->field, table->s->fields))
   {
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
@@ -438,31 +491,12 @@ int ha_cassandra::create(const char *nam
       table_arg->key_info[0].key_parts != 1 ||
       table_arg->key_info[0].key_part[0].fieldnr != 1)
   {
-    my_error(ER_WRONG_COLUMN_NAME, MYF(0), 
+    my_error(ER_WRONG_COLUMN_NAME, MYF(0),
              "Table must have PRIMARY KEY defined over the first column");
     DBUG_RETURN(HA_WRONG_CREATE_OPTION);
   }
-
-#ifndef DBUG_OFF
-/*  
-  DBUG_PRINT("info", ("strparam: '%-.64s'  ullparam: %llu  enumparam: %u  "\
-                      "boolparam: %u",
-                      (options->strparam ? options->strparam : "<NULL>"),
-                      options->ullparam, options->enumparam, options->boolparam));
-
-  psergey-todo: check table definition!
-  for (Field **field= table_arg->s->field; *field; field++)
-  {
-    ha_field_option_struct *field_options= (*field)->option_struct;
-    DBUG_ASSERT(field_options);
-    DBUG_PRINT("info", ("field: %s  complex: '%-.64s'",
-                         (*field)->field_name,
-                         (field_options->complex_param_to_parse_it_in_engine ?
-                          field_options->complex_param_to_parse_it_in_engine :
-                          "<NULL>")));
-  }
-*/
-#endif
+  if ((res= check_field_options(table_arg->s->field)))
+    DBUG_RETURN(res);
   DBUG_ASSERT(!se);
   if ((res= check_table_options(options)))
     DBUG_RETURN(res);
@@ -476,7 +510,7 @@ int ha_cassandra::create(const char *nam
     my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
-  
+
   if (setup_field_converters(table_arg->s->field, table_arg->s->fields))
   {
     my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "setup_field_converters");
@@ -500,7 +534,7 @@ public:
   Field *field;
 
   /* This will save Cassandra's data in the Field */
-  virtual void cassandra_to_mariadb(const char *cass_data, 
+  virtual void cassandra_to_mariadb(const char *cass_data,
                                     int cass_data_len)=0;
 
   /*
@@ -527,7 +561,7 @@ public:
     double *pdata= (double*) cass_data;
     field->store(*pdata);
   }
-  
+
   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
   {
     buf= field->val_real();
@@ -732,6 +766,20 @@ static int convert_hex_digit(const char
 
 const char map2number[]="0123456789abcdef";
 
+static void convert_uuid2string (char *str, const char *cass_data)
+{
+  char *ptr= str;
+  /* UUID arrives as 16-byte number in network byte order */
+  for (uint i=0; i < 16; i++)
+  {
+    *(ptr++)= map2number[(cass_data[i] >> 4) & 0xF];
+    *(ptr++)= map2number[cass_data[i] & 0xF];
+    if (i == 3 || i == 5 || i == 7 || i == 9)
+      *(ptr++)= '-';
+  }
+  *ptr= 0;
+}
+
 class UuidDataConverter : public ColumnDataConverter
 {
   char buf[16]; /* Binary UUID representation */
@@ -741,16 +789,7 @@ public:
   {
     DBUG_ASSERT(cass_data_len==16);
     char str[37];
-    char *ptr= str;
-    /* UUID arrives as 16-byte number in network byte order */
-    for (uint i=0; i < 16; i++)
-    {
-      *(ptr++)= map2number[(cass_data[i] >> 4) & 0xF];
-      *(ptr++)= map2number[cass_data[i] & 0xF];
-      if (i == 3 || i == 5 || i == 7 || i == 9)
-        *(ptr++)= '-';
-    }
-    *ptr= 0;
+    convert_uuid2string(str, cass_data);
     field->store(str, 36,field->charset());
   }
 
@@ -787,6 +826,124 @@ public:
   ~UuidDataConverter(){}
 };
 
+/**
+  Converters between Cassandra column types and dynamic-column values
+*/
+bool cassandra_to_dyncol_intLong(const char *cass_data,
+                                 int cass_data_len __attribute__((unused)),
+                                 DYNAMIC_COLUMN_VALUE *value)
+{
+  value->type= DYN_COL_INT;
+#ifdef WORDS_BIGENDIAN
+  value->x.long_value= (longlong *)*cass_data;
+#else
+  flip64(cass_data, (char *)&value->x.long_value);
+#endif
+  return 0;
+}
+bool cassandra_to_dyncol_intInt32(const char *cass_data,
+                                  int cass_data_len __attribute__((unused)),
+                                  DYNAMIC_COLUMN_VALUE *value)
+{
+  int32 tmp;
+  value->type= DYN_COL_INT;
+#ifdef WORDS_BIGENDIAN
+  tmp= *((int32 *)cass_data);
+#else
+  flip32(cass_data, (char *)&tmp);
+#endif
+  value->x.long_value= tmp;
+  return 0;
+}
+bool cassandra_to_dyncol_intCounter(const char *cass_data,
+                                    int cass_data_len __attribute__((unused)),
+                                    DYNAMIC_COLUMN_VALUE *value)
+{
+  value->type= DYN_COL_INT;
+#ifdef WORDS_BIGENDIAN
+  flip64(cass_data, &value->x.long_value);
+#else
+  value->x.long_value= *((longlong *)cass_data);
+#endif
+  return 0;
+}
+bool cassandra_to_dyncol_doubleFloat(const char *cass_data,
+                                     int cass_data_len __attribute__((unused)),
+                                     DYNAMIC_COLUMN_VALUE *value)
+{
+  value->type= DYN_COL_DOUBLE;
+  value->x.double_value= *((float *)cass_data);
+  return 0;
+}
+bool cassandra_to_dyncol_doubleDouble(const char *cass_data,
+                                      int cass_data_len __attribute__((unused)),
+                                      DYNAMIC_COLUMN_VALUE *value)
+{
+  value->type= DYN_COL_DOUBLE;
+  value->x.double_value= *((double *)cass_data);
+  return 0;
+}
+bool cassandra_to_dyncol_strStr(const char *cass_data,
+                                int cass_data_len,
+                                DYNAMIC_COLUMN_VALUE *value,
+                                CHARSET_INFO *cs)
+{
+  value->type= DYN_COL_STRING;
+  value->x.string.charset= cs;
+  value->x.string.value.str= (char *)cass_data;
+  value->x.string.value.length= cass_data_len;
+  value->x.string.nonfreeable= TRUE; // do not try to free
+  return 0;
+}
+bool cassandra_to_dyncol_strBytes(const char *cass_data,
+                                  int cass_data_len,
+                                  DYNAMIC_COLUMN_VALUE *value)
+{
+  return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
+                                    &my_charset_bin);
+}
+bool cassandra_to_dyncol_strAscii(const char *cass_data,
+                                  int cass_data_len,
+                                  DYNAMIC_COLUMN_VALUE *value)
+{
+  return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
+                                    &my_charset_latin1_bin);
+}
+bool cassandra_to_dyncol_strUTF8(const char *cass_data,
+                                 int cass_data_len,
+                                 DYNAMIC_COLUMN_VALUE *value)
+{
+  return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
+                                    &my_charset_utf8_unicode_ci);
+}
+
+bool cassandra_to_dyncol_strUUID(const char *cass_data,
+                                 int cass_data_len,
+                                 DYNAMIC_COLUMN_VALUE *value)
+{
+  value->type= DYN_COL_STRING;
+  value->x.string.charset= &my_charset_bin;
+  value->x.string.value.str= (char *)my_malloc(37, MYF(0));
+  if (!value->x.string.value.str)
+  {
+    value->x.string.value.length= 0;
+    value->x.string.nonfreeable= TRUE;
+    return 1;
+  }
+  convert_uuid2string(value->x.string.value.str, cass_data);
+  value->x.string.value.length= 36;
+  value->x.string.nonfreeable= FALSE;
+  return 0;
+}
+
+bool cassandra_to_dyncol_intBool(const char *cass_data,
+                                 int cass_data_len,
+                                 DYNAMIC_COLUMN_VALUE *value)
+{
+  value->type= DYN_COL_INT;
+  value->x.long_value= (cass_data[0] ? 1 : 0);
+  return 0;
+}
 
 const char * const validator_bigint=  "org.apache.cassandra.db.marshal.LongType";
 const char * const validator_int=     "org.apache.cassandra.db.marshal.Int32Type";
@@ -806,6 +963,90 @@ const char * const validator_uuid= "org.
 const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType";
 
 
+static CASSANDRA_TYPE_DEF cassandra_types[]=
+{
+  {
+    validator_bigint,
+    &cassandra_to_dyncol_intLong
+  },
+  {
+    validator_int,
+    &cassandra_to_dyncol_intInt32
+  },
+  {
+    validator_counter,
+    cassandra_to_dyncol_intCounter
+  },
+  {
+    validator_float,
+    &cassandra_to_dyncol_doubleFloat
+  },
+  {
+    validator_double,
+    &cassandra_to_dyncol_doubleDouble
+  },
+  {
+    validator_blob,
+    &cassandra_to_dyncol_strBytes
+  },
+  {
+    validator_ascii,
+    &cassandra_to_dyncol_strAscii
+  },
+  {
+    validator_text,
+    &cassandra_to_dyncol_strUTF8
+  },
+  {
+    validator_timestamp,
+    &cassandra_to_dyncol_intLong
+  },
+  {
+    validator_uuid,
+    &cassandra_to_dyncol_strUUID
+  },
+  {
+    validator_boolean,
+    &cassandra_to_dyncol_intBool
+  }
+};
+
+CASSANDRA_TYPE get_cassandra_type(const char *validator)
+{
+  CASSANDRA_TYPE rc;
+  switch(validator[32])
+  {
+  case 'L':
+    rc= CT_BIGINT;
+    break;
+  case 'I':
+    rc= CT_INT;
+    break;
+  case 'C':
+    rc= CT_COUNTER;
+    break;
+  case 'F':
+    rc= CT_FLOAT;
+    break;
+  case 'D':
+    rc= (validator[33] == 'o' ? CT_DOUBLE : CT_TIMESTAMP);
+    break;
+  case 'B':
+    rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB);
+    break;
+  case 'A':
+    rc= CT_ASCII;
+    break;
+  case 'U':
+    rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID);
+    break;
+  default:
+    rc= CT_BLOB;
+  }
+  DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0);
+  return rc;
+}
+
 ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
 {
   ColumnDataConverter *res= NULL;
@@ -836,16 +1077,16 @@ ColumnDataConverter *map_field_to_valida
       if (!strcmp(validator_name, validator_double))
         res= new DoubleDataConverter;
       break;
-    
+
     case MYSQL_TYPE_TIMESTAMP:
       if (!strcmp(validator_name, validator_timestamp))
         res= new TimestampDataConverter;
       break;
 
     case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
-      if (!strcmp(validator_name, validator_uuid) && 
+      if (!strcmp(validator_name, validator_uuid) &&
           field->real_type() == MYSQL_TYPE_STRING &&
-          field->field_length == 36) 
+          field->field_length == 36)
       {
         // UUID maps to CHAR(36), its text representation
         res= new UuidDataConverter;
@@ -879,61 +1120,139 @@ bool ha_cassandra::setup_field_converter
   int  col_name_len;
   char *col_type;
   int col_type_len;
+  size_t ddl_fields= se->get_ddl_size();
+  const char *default_type= se->get_default_validator();
+  uint max_non_default_fields;
+  DBUG_ENTER("ha_cassandra::setup_field_converters");
+  DBUG_ASSERT(default_type);
 
   DBUG_ASSERT(!field_converters);
-  size_t memsize= sizeof(ColumnDataConverter*) * n_fields;
+  DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);
+
+  /*
+    When dynamic columns are in use, the SQL table contains one field (the
+    DYNCOL blob) that is not described in the Cassandra DDL, and the key
+    field is described separately. That is why the arithmetic below uses
+    "n_fields - dyncol_set - 1" and "ddl_fields + 2".
+  */
+  max_non_default_fields= ddl_fields + 2 - n_fields;
+  if (ddl_fields < (n_fields - dyncol_set - 1))
+  {
+    se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+    DBUG_RETURN(true);
+  }
+
+  /* allocate memory in one chunk */
+  size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
+    (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
+    (dyncol_set ? max_non_default_fields : 0);
   if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
-    return true;
+    DBUG_RETURN(true);
   bzero(field_converters, memsize);
   n_field_converters= n_fields;
+  if (dyncol_set)
+  {
+    special_type_field_converters=
+      (CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
+    special_type_field_names=
+      ((LEX_STRING*)(special_type_field_converters + max_non_default_fields));
+  }
+
+  if (dyncol_set)
+  {
+    if (init_dynamic_array(&dynamic_values,
+                           sizeof(DYNAMIC_COLUMN_VALUE),
+                           DYNCOL_USUAL, DYNCOL_DELTA))
+      DBUG_RETURN(true);
+    else
+      if (init_dynamic_array(&dynamic_names,
+                             sizeof(LEX_STRING),
+                             DYNCOL_USUAL, DYNCOL_DELTA))
+      {
+        delete_dynamic(&dynamic_values);
+        DBUG_RETURN(true);
+      }
+      else
+        if (init_dynamic_string(&dynamic_rec, NULL,
+                                DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
+        {
+          delete_dynamic(&dynamic_values);
+          delete_dynamic(&dynamic_names);
+          DBUG_RETURN(true);
+        }
+
+    /* Dynamic column field has special processing */
+    field_converters[dyncol_field]= NULL;
+
+    default_type_def= cassandra_types + get_cassandra_type(default_type);
+  }
 
   se->first_ddl_column();
   uint n_mapped= 0;
   while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
                               &col_type_len))
   {
+    Field **field;
+    uint i;
     /* Mapping for the 1st field is already known */
-    for (Field **field= field_arg + 1; *field; field++)
+    for (field= field_arg + 1, i= 1; *field; field++, i++)
     {
-      if (!strcmp((*field)->field_name, col_name))
+      if ((!dyncol_set || dyncol_field != i) &&
+          !strcmp((*field)->field_name, col_name))
       {
         n_mapped++;
         ColumnDataConverter **conv= field_converters + (*field)->field_index;
         if (!(*conv= map_field_to_validator(*field, col_type)))
         {
-          se->print_error("Failed to map column %s to datatype %s", 
+          se->print_error("Failed to map column %s to datatype %s",
                           (*field)->field_name, col_type);
           my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-          return true;
+          DBUG_RETURN(true);
         }
         (*conv)->field= *field;
       }
     }
+    if (dyncol_set && !(*field)) // dyncol in use and column not mapped to a static field
+    {
+      DBUG_PRINT("info",("Field not found: %s", col_name));
+      if (strcmp(col_type, default_type))
+      {
+        DBUG_PRINT("info",("Field '%s' non-default type: '%s'",
+                           col_name, col_type));
+        special_type_field_names[n_special_type_fields].length= col_name_len;
+        special_type_field_names[n_special_type_fields].str= col_name;
+        special_type_field_converters[n_special_type_fields]=
+          cassandra_types[get_cassandra_type(col_type)];
+        n_special_type_fields++;
+      }
+    }
   }
 
-  if (n_mapped != n_fields - 1)
+  if (n_mapped != n_fields - 1 - dyncol_set)
   {
-    se->print_error("Some of SQL fields were not mapped to Cassandra's fields"); 
+    se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-    return true;
+    DBUG_RETURN(true);
   }
-  
-  /* 
+
+  /*
     Setup type conversion for row_key.
   */
   se->get_rowkey_type(&col_name, &col_type);
   if (col_name && strcmp(col_name, (*field_arg)->field_name))
   {
-    se->print_error("PRIMARY KEY column must match Cassandra's name '%s'", col_name);
+    se->print_error("PRIMARY KEY column must match Cassandra's name '%s'",
+                    col_name);
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-    return true;
+    DBUG_RETURN(true);
   }
   if (!col_name && strcmp("rowkey", (*field_arg)->field_name))
   {
     se->print_error("target column family has no key_alias defined, "
                     "PRIMARY KEY column must be named 'rowkey'");
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-    return true;
+    DBUG_RETURN(true);
   }
 
   if (col_type != NULL)
@@ -942,7 +1261,7 @@ bool ha_cassandra::setup_field_converter
     {
       se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
       my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-      return true;
+      DBUG_RETURN(true);
     }
     rowkey_converter->field= *field_arg;
   }
@@ -950,10 +1269,10 @@ bool ha_cassandra::setup_field_converter
   {
     se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-    return true;
+    DBUG_RETURN(true);
   }
 
-  return false;
+  DBUG_RETURN(false);
 }
 
 
@@ -962,10 +1281,20 @@ void ha_cassandra::free_field_converters
   delete rowkey_converter;
   rowkey_converter= NULL;
 
+  if (dyncol_set)
+  {
+    delete_dynamic(&dynamic_values);
+    delete_dynamic(&dynamic_names);
+    dynstr_free(&dynamic_rec);
+  }
   if (field_converters)
   {
     for (uint i=0; i < n_field_converters; i++)
-      delete field_converters[i];
+      if (field_converters[i])
+      {
+        DBUG_ASSERT(!dyncol_set || i == dyncol_field);
+        delete field_converters[i];
+      }
     my_free(field_converters);
     field_converters= NULL;
   }
@@ -980,7 +1309,7 @@ int ha_cassandra::index_read_map(uchar *
 {
   int rc= 0;
   DBUG_ENTER("ha_cassandra::index_read_map");
-  
+
   if (find_flag != HA_READ_KEY_EXACT)
     DBUG_RETURN(HA_ERR_WRONG_COMMAND);
 
@@ -1007,7 +1336,7 @@ int ha_cassandra::index_read_map(uchar *
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
     rc= HA_ERR_INTERNAL_ERROR;
   }
-  
+
   /* TODO: what if we're not reading all columns?? */
   if (!found)
   {
@@ -1021,15 +1350,24 @@ int ha_cassandra::index_read_map(uchar *
   DBUG_RETURN(rc);
 }
 
+void free_strings(DYNAMIC_COLUMN_VALUE *vals, uint num)
+{
+  for (uint i= 0; i < num; i++)
+    if (vals[i].type == DYN_COL_STRING &&
+        !vals[i].x.string.nonfreeable)
+      my_free(vals[i].x.string.value.str);
+}
 
 void ha_cassandra::read_cassandra_columns(bool unpack_pk)
 {
   char *cass_name;
   char *cass_value;
-  int cass_value_len;
+  int cass_value_len, cass_name_len;
   Field **field;
-  
-  /* 
+  bool do_dyncol= dyncol_set;
+
+
+  /*
     cassandra_to_mariadb() calls will use field->store(...) methods, which
     require that the column is in the table->write_set
   */
@@ -1040,23 +1378,84 @@ void ha_cassandra::read_cassandra_column
   for (field= table->field + 1; *field; field++)
     (*field)->set_null();
 
-  while (!se->get_next_read_column(&cass_name, &cass_value, &cass_value_len))
+  while (!se->get_next_read_column(&cass_name, &cass_name_len,
+                                   &cass_value, &cass_value_len))
   {
     // map to our column. todo: use hash or something..
-    int idx=1;
+    uint idx=1;
+    bool found= 0;
     for (field= table->field + 1; *field; field++)
     {
       idx++;
-      if (!strcmp((*field)->field_name, cass_name))
+      if ((!dyncol_set || dyncol_field != idx) &&
+          !strcmp((*field)->field_name, cass_name))
       {
         int fieldnr= (*field)->field_index;
+        found= 1;
         (*field)->set_notnull();
-        field_converters[fieldnr]->cassandra_to_mariadb(cass_value, cass_value_len);
+        field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
+                                                        cass_value_len);
         break;
       }
     }
+    if (do_dyncol && !found)
+    {
+      DYNAMIC_COLUMN_VALUE val;
+      LEX_STRING nm;
+      CASSANDRA_TYPE_DEF *type= default_type_def;
+      for(uint i= 0; i < n_special_type_fields; i++)
+      {
+        if (cass_name_len == (int)special_type_field_names[i].length &&
+            memcmp(cass_name, special_type_field_names[i].str,
+                   cass_name_len) == 0)
+        {
+          type= special_type_field_converters + i;
+          break;
+        }
+      }
+
+      nm.str= cass_name;
+      nm.length= cass_name_len;
+      if ((*(type->cassandra_to_dynamic))(cass_value, cass_value_len, &val) ||
+          insert_dynamic(&dynamic_names, (uchar *) &nm) ||
+          insert_dynamic(&dynamic_values, (uchar *) &val))
+      {
+        free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
+                     dynamic_values.elements);
+        do_dyncol= FALSE;
+      }
+    }
   }
-  
+
+  dynamic_rec.length= 0;
+  if (do_dyncol)
+  {
+    if (dynamic_column_create_many_internal_fmt(&dynamic_rec,
+                                                dynamic_names.elements,
+                                                dynamic_names.buffer,
+                                                (DYNAMIC_COLUMN_VALUE *)
+                                                dynamic_values.buffer,
+                                                FALSE,
+                                                TRUE) < 0)
+      dynamic_rec.length= 0;
+
+    free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
+                 dynamic_values.elements);
+    dynamic_values.elements= dynamic_names.elements= 0;
+  }
+  if (dyncol_set)
+  {
+    if (dynamic_rec.length == 0)
+      table->field[dyncol_field]->set_null();
+    else
+    {
+      table->field[dyncol_field]->set_notnull();
+      table->field[dyncol_field]->store(dynamic_rec.str,
+                                        dynamic_rec.length,
+                                        &my_charset_bin);
+    }
+  }
+
   if (unpack_pk)
   {
     /* Unpack rowkey to primary key */
@@ -1165,9 +1564,16 @@ int ha_cassandra::rnd_init(bool scan)
     DBUG_RETURN(0);
   }
 
-  se->clear_read_columns();
-  for (uint i= 1; i < table->s->fields; i++)
-    se->add_read_column(table->field[i]->field_name);
+  if (dyncol_set)
+  {
+    se->clear_read_all_columns();
+  }
+  else
+  {
+    se->clear_read_columns();
+    for (uint i= 1; i < table->s->fields; i++)
+      se->add_read_column(table->field[i]->field_name);
+  }
 
   se->read_batch_size= THDVAR(table->in_use, rnd_batch_size);
   bres= se->get_range_slices(false);

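On get_cassandra_type() above: all validator names share the 32-character prefix "org.apache.cassandra.db.marshal.", so the switch can dispatch on validator[32], the first letter of the marshal class name, and only needs validator[33] to split DoubleType from the timestamp validator, BooleanType from BytesType, and UTF8Type from UUIDType; the trailing DBUG_ASSERT then cross-checks the chosen entry against cassandra_types[]. A small standalone check of that assumption (validator_text is taken as Cassandra's standard UTF8Type marshal class, which the 'U'/'T' branch above implies but the hunk elides):

    #include <assert.h>
    #include <string.h>

    int main(void)
    {
      const char *prefix= "org.apache.cassandra.db.marshal.";
      const char *validator_bigint= "org.apache.cassandra.db.marshal.LongType";
      const char *validator_text=   "org.apache.cassandra.db.marshal.UTF8Type";

      assert(strlen(prefix) == 32);          /* hence the switch on validator[32] */
      assert(validator_bigint[32] == 'L');   /* maps to CT_BIGINT */
      assert(validator_text[32] == 'U' &&
             validator_text[33] == 'T');     /* maps to CT_TEXT, not CT_UUID */
      return 0;
    }
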
=== modified file 'storage/cassandra/ha_cassandra.h'
--- a/storage/cassandra/ha_cassandra.h	2012-09-20 10:22:36 +0000
+++ b/storage/cassandra/ha_cassandra.h	2012-09-24 20:36:21 +0000
@@ -40,6 +40,27 @@ class ColumnDataConverter;
 
 struct ha_table_option_struct;
 
+
+struct st_dynamic_column_value;
+
+typedef bool (* CAS2DYN_CONVERTER)(const char *cass_data,
+                                   int cass_data_len,
+                                   struct st_dynamic_column_value *value);
+struct cassandra_type_def
+{
+  const char *name;
+  CAS2DYN_CONVERTER cassandra_to_dynamic;
+};
+
+typedef struct cassandra_type_def CASSANDRA_TYPE_DEF;
+
+enum cassandra_type_enum {CT_BIGINT, CT_INT, CT_COUNTER, CT_FLOAT, CT_DOUBLE,
+  CT_BLOB, CT_ASCII, CT_TEXT, CT_TIMESTAMP, CT_UUID, CT_BOOLEAN};
+
+typedef enum cassandra_type_enum CASSANDRA_TYPE;
+
+
+
 /** @brief
   Class definition for the storage engine
 */
@@ -47,23 +68,35 @@ class ha_cassandra: public handler
 {
   THR_LOCK_DATA lock;      ///< MySQL lock
   CASSANDRA_SHARE *share;    ///< Shared lock info
-  
+
   Cassandra_se_interface *se;
 
+  /* description of static part of the table definition */
   ColumnDataConverter **field_converters;
   uint n_field_converters;
 
+  CASSANDRA_TYPE_DEF *default_type_def;
+  /* description of dynamic columns part */
+  CASSANDRA_TYPE_DEF *special_type_field_converters;
+  LEX_STRING *special_type_field_names;
+  uint n_special_type_fields;
+  DYNAMIC_ARRAY dynamic_values, dynamic_names;
+  DYNAMIC_STRING dynamic_rec;
+
   ColumnDataConverter *rowkey_converter;
 
   bool setup_field_converters(Field **field, uint n_fields);
   void free_field_converters();
-  
+
   void read_cassandra_columns(bool unpack_pk);
   int check_table_options(struct ha_table_option_struct* options);
 
   bool doing_insert_batch;
   ha_rows insert_rows_batched;
-  
+
+  uint dyncol_field;
+  bool dyncol_set;
+
   /* Used to produce 'wrong column %s at row %lu' warnings */
   ha_rows insert_lineno;
 public:
@@ -186,6 +219,7 @@ public:
 private:
   bool source_exhausted;
   bool mrr_start_read();
+  int check_field_options(Field **fields);
 public:
 
   /*


