[Commits] Rev 3485: MDEV-431: Cassandra storage engine in file:///data0/psergey/dev2/5.5-cassandra-r01/

Sergey Petrunya psergey at askmonty.org
Sat Aug 18 15:28:36 EEST 2012


At file:///data0/psergey/dev2/5.5-cassandra-r01/

------------------------------------------------------------
revno: 3485
revision-id: psergey at askmonty.org-20120818122835-uceiz93vnnhyuqx8
parent: psergey at askmonty.org-20120817171320-m3v5t0ac0upd5cxc
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: 5.5-cassandra-r01
timestamp: Sat 2012-08-18 16:28:35 +0400
message:
  MDEV-431: Cassandra storage engine
  - Introduce type converters (so far rather trivial)
  - switch INSERT to using batch_mutate()
=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test	2012-08-17 17:13:20 +0000
+++ b/mysql-test/t/cassandra.test	2012-08-18 12:28:35 +0000
@@ -49,7 +49,7 @@
 ############################################################################
 
 # Now, create a table for real and insert data
-create table t1 (rowkey char(36) primary key, column1 char(60)) engine=cassandra 
+create table t1 (rowkey char(36) primary key, data1 varchar(60)) engine=cassandra 
   thrift_host='localhost' keyspace='mariadbtest' column_family='cf1';
 
 insert into t1 values ('key0', 'data1');

=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-08-17 17:13:20 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-08-18 12:28:35 +0000
@@ -43,53 +43,56 @@
   std::string column_family;
   std::string keyspace;
   
-  /* DDL checks */
+  /* DDL data */
   KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
   CfDef cf_def; /* Column family we're using (TODO: put in table->share)*/
   std::vector<ColumnDef>::iterator column_ddl_it;
 
   /* The list that was returned by the last key lookup */
-  std::vector<ColumnOrSuperColumn> col_supercol_vec;
+  std::vector<ColumnOrSuperColumn> column_data_vec;
+  std::vector<ColumnOrSuperColumn>::iterator column_data_it;
+
+  /* Insert preparation */
+  typedef std::map<std::string, std::vector<Mutation> > ColumnFamilyToMutation;
+  typedef std::map<std::string,  ColumnFamilyToMutation> KeyToCfMutationMap;
+   
+  KeyToCfMutationMap batch_mutation; /* Prepare operation here */
+  std::string key_to_insert;
+  int64_t insert_timestamp;
+  std::vector<Mutation>* insert_list;
 
 public:
-  
   Cassandra_se_impl() : cass(NULL) {}
   virtual ~Cassandra_se_impl(){ delete cass; }
   
+  /* Connection and DDL checks */
   bool connect(const char *host, const char *keyspace);
-
-  virtual void set_column_family(const char *cfname)
-  {
-    column_family.assign(cfname); 
-  }
-
-  virtual bool insert(NameAndValue *fields);
-  virtual bool get_slice(char *key, size_t key_len, NameAndValue *row, bool *found);
-   
-
-  /* Functions to enumerate ColumnFamily's DDL data */
+  void set_column_family(const char *cfname) { column_family.assign(cfname); }
+
   bool setup_ddl_checks();
   void first_ddl_column();
   bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
+
+  /* Writes */
+  void start_prepare_insert(const char *key, int key_len);
+  void add_insert_column(const char *name, const char *value, int value_len);
+  bool do_insert();
+
+  /* Reads */
+  bool get_slice(char *key, size_t key_len, bool *found);
+  bool get_next_read_column(char **name, char **value, int *value_len);
+
 };
 
 
+/////////////////////////////////////////////////////////////////////////////
+// Connection and setup
+/////////////////////////////////////////////////////////////////////////////
 Cassandra_se_interface *get_cassandra_se()
 {
   return new Cassandra_se_impl;
 }
 
-#define CASS_TRY(x) try { \
-     x;  \
-  }catch(TTransportException te){ \
-    print_error("%s [%d]", te.what(), te.getType()); \
-  }catch(InvalidRequestException ire){ \
-    print_error("%s [%s]", ire.what(), ire.why.c_str()); \
-  }catch(NotFoundException nfe){ \
-    print_error("%s", nfe.what()); \
-  } catch(...) { \
-    print_error("Unknown Exception"); \
-  }
 
 bool Cassandra_se_impl::connect(const char *host, const char *keyspace_arg)
 {
@@ -121,10 +124,9 @@
     print_error("Unknown Exception");
   }
 
-  // For now:
   cur_consistency_level= ConsistencyLevel::ONE;
 
-  if (setup_ddl_checks())
+  if (!res && setup_ddl_checks())
     res= true;
   return res;
 }
@@ -176,13 +178,20 @@
   return false;
 }
 
+/////////////////////////////////////////////////////////////////////////////
+// Data writes
+/////////////////////////////////////////////////////////////////////////////
 
-bool Cassandra_se_impl::insert(NameAndValue *fields)
+void Cassandra_se_impl::start_prepare_insert(const char *key, int key_len)
 {
-  ColumnParent cparent;
-  cparent.column_family= column_family;
-  
-  Column c;
+  key_to_insert.assign(key, key_len);
+  batch_mutation.clear();
+  batch_mutation[key_to_insert]= ColumnFamilyToMutation();
+  ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_insert];
+
+  cf_mut[column_family]= std::vector<Mutation>();
+  insert_list= &cf_mut[column_family];
+
   struct timeval td;
   gettimeofday(&td, NULL);
   int64_t ms = td.tv_sec;
@@ -190,37 +199,57 @@
   int64_t usec = td.tv_usec;
   usec = usec / 1000;
   ms += usec;
-  c.timestamp = ms;
-  c.__isset.timestamp = true;
-
-  std::string key;
-  key.assign(fields->value, fields->value_len);
-  fields++;
-
-  bool res= false;
+  insert_timestamp= ms;
+}
+
+
+void Cassandra_se_impl::add_insert_column(const char *name, const char *value, 
+                                          int value_len)
+{
+  Mutation mut;
+  mut.__isset.column_or_supercolumn= true;
+  mut.column_or_supercolumn.__isset.column= true;
+
+  Column& col=mut.column_or_supercolumn.column;
+  col.name.assign(name);
+  col.value.assign(value, value_len);
+  col.timestamp= insert_timestamp;
+  col.__isset.value= true;
+  col.__isset.timestamp= true;
+  insert_list->push_back(mut);
+}
+
+
+bool Cassandra_se_impl::do_insert()
+{
+  bool res= true;
   try {
-    /* TODO: switch to batch_mutate(). Or, even to CQL? */
-
-    // TODO: what should INSERT table (co1, col2) VALUES ('foo', 'bar') mean?
-    //       in SQL, it sets all columns.. what should it mean here? can we have
-    //       it to work only for specified columns? (if yes, what do for
-    //       VALUES()?)    
-    c.__isset.value= true;
-    for(;fields->name; fields++)
-    {
-      c.name.assign(fields->name);
-      c.value.assign(fields->value, fields->value_len);
-      cass->insert(key, cparent, c, ConsistencyLevel::ONE);
-    }
-
-  } catch (...) {
-   res= true;
+    
+    cass->batch_mutate(batch_mutation, cur_consistency_level);
+    res= false;
+
+  } catch (InvalidRequestException ire) {
+    print_error("%s [%s]", ire.what(), ire.why.c_str());
+  } catch (UnavailableException ue) {
+    print_error("UnavailableException: %s", ue.what());
+  } catch (TimedOutException te) {
+    print_error("TimedOutException: %s", te.what());
   }
+
   return res;
 }
 
 
-bool Cassandra_se_impl::get_slice(char *key, size_t key_len, NameAndValue *row, bool *found)
+/////////////////////////////////////////////////////////////////////////////
+// Reading data
+/////////////////////////////////////////////////////////////////////////////
+
+/*
+  Make one key lookup. If the record is found, the result is stored locally and 
+  the caller should iterate over it.
+*/
+
+bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
 {
   ColumnParent cparent;
   cparent.column_family= column_family;
@@ -235,37 +264,57 @@
   slice_pred.__set_slice_range(sr);
 
   try {
-    std::vector<ColumnOrSuperColumn> &res= col_supercol_vec;
-    cass->get_slice(res, rowkey_str, cparent, slice_pred, ConsistencyLevel::ONE);
-    *found= true;
+    cass->get_slice(column_data_vec, rowkey_str, cparent, slice_pred, 
+                    ConsistencyLevel::ONE);
 
-    std::vector<ColumnOrSuperColumn>::iterator it;
-    if (res.size() == 0)
+    if (column_data_vec.size() == 0)
     {
-      /* 
+      /*
         No columns found. Cassandra doesn't allow records without any column =>
         this means the seach key doesn't exist
       */
       *found= false;
       return false;
     }
-    for (it= res.begin(); it < res.end(); it++)
-    {
-      ColumnOrSuperColumn cs= *it;
-      if (!cs.__isset.column)
-        return true;
-      row->name= (char*)cs.column.name.c_str();
-      row->value= (char*)cs.column.value.c_str();
-      row->value_len= cs.column.value.length();
-      row++;
-    }
-    row->name= NULL;
+    *found= true;
+
   } catch (InvalidRequestException ire) {
+    print_error("%s [%s]", ire.what(), ire.why.c_str());
     return true;
   } catch (UnavailableException ue) {
+    print_error("UnavailableException: %s", ue.what());
     return true;
   } catch (TimedOutException te) {
+    print_error("TimedOutException: %s", te.what());
     return true;
   }
-  return false;
-}
+
+  column_data_it= column_data_vec.begin();
+  return false;
+}
+
+
+bool Cassandra_se_impl::get_next_read_column(char **name, char **value, 
+                                             int *value_len)
+{
+  while (1)
+  {
+    if (column_data_it == column_data_vec.end())
+      return true;
+
+    if (((*column_data_it).__isset.column))
+      break; /* Ok it's a real column. Should be always the case. */
+
+    column_data_it++;
+  }
+
+  ColumnOrSuperColumn& cs= *column_data_it;
+  *name= (char*)cs.column.name.c_str();
+  *value= (char*)cs.column.value.c_str();
+  *value_len= cs.column.value.length();
+
+  column_data_it++;
+  return false;
+}
+
+

=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h	2012-08-17 17:13:20 +0000
+++ b/storage/cassandra/cassandra_se.h	2012-08-18 12:28:35 +0000
@@ -43,10 +43,14 @@
                                int *value_len)=0;
 
   /* Writes */
-  virtual bool insert(NameAndValue *fields)=0;
+  virtual void start_prepare_insert(const char *key, int key_len)=0;
+  virtual void add_insert_column(const char *name, const char *value, 
+                                 int value_len)=0;
+  virtual bool do_insert()=0;
 
   /* Reads */
-  virtual bool get_slice(char *key, size_t key_len, NameAndValue *row, bool *found)=0 ;
+  virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ;
+  virtual bool get_next_read_column(char **name, char **value, int *value_len)=0;
 
   /* Passing error messages up to ha_cassandra */
   char err_buffer[512];

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-08-17 17:13:20 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-08-18 12:28:35 +0000
@@ -212,7 +212,8 @@
 
 ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
   :handler(hton, table_arg),
-   se(NULL), names_and_vals(NULL)
+   se(NULL), names_and_vals(NULL),
+   field_converters(NULL)
 {}
 
 
@@ -249,6 +250,12 @@
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
 
+  if (setup_field_converters(table->field, table->s->fields))
+  {
+    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "setup_field_converters");
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+  }
+
   DBUG_RETURN(0);
 }
 
@@ -317,16 +324,6 @@
     DBUG_RETURN(HA_WRONG_CREATE_OPTION);
   }
 
-/*
-  pfield++;
-  if (strcmp((*pfield)->field_name, "data"))
-  {
-    my_error(ER_WRONG_COLUMN_NAME, MYF(0), "Second column must be named 'data'");
-    DBUG_RETURN(HA_WRONG_CREATE_OPTION);
-  }
-*/
-  
-
 
 #ifndef DBUG_OFF
 /*  
@@ -358,29 +355,12 @@
     my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
-
-  /* 
-    TODO: what about mapping the primary key? It has a 'type', too...
-    see CfDef::key_validation_class ? see also CfDef::key_alias?
-  */
-  se->first_ddl_column();
-  char *col_name;
-  int  col_name_len;
-  char *col_type;
-  int col_type_len;
-  while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
-                              &col_type_len))
+  
+  if (setup_field_converters(table_arg->s->field, table_arg->s->fields))
   {
-    /* Mapping for the 1st field is already known */
-    for (Field **field= table_arg->s->field + 1; *field; field++)
-    {
-      if (!strcmp((*field)->field_name, col_name))
-      {
-        //map_field_to_type(field, col_type);
-      }
-    }
+    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "setup_field_converters");
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
-
   DBUG_RETURN(0);
 }
 
@@ -391,36 +371,223 @@
 
 */
 
-const char * const validator_bigint="org.apache.cassandra.db.marshal.LongType";
-const char * const validator_int="org.apache.cassandra.db.marshal.Int32Type";
+/* Converter base */
+class ColumnDataConverter
+{
+public:
+  Field *field;
+
+  /* This will save Cassandra's data in the Field */
+  virtual void cassandra_to_mariadb(const char *cass_data, 
+                                    int cass_data_len)=0;
+
+  /*
+    This will get data from the Field pointer, store Cassandra's form
+    in internal buffer, and return pointer/size.
+  */
+  virtual void mariadb_to_cassandra(char **cass_data, int *cass_data_len)=0;
+  virtual ~ColumnDataConverter() {};
+};
+
+
+class DoubleDataConverter : public ColumnDataConverter
+{
+  double buf;
+public:
+  void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+  {
+    DBUG_ASSERT(cass_data_len == sizeof(double));
+    double *pdata= (double*) cass_data;
+    field->store(*pdata);
+  }
+  
+  void mariadb_to_cassandra(char **cass_data, int *cass_data_len)
+  {
+    buf= field->val_real();
+    *cass_data= (char*)&buf;
+    *cass_data_len=sizeof(double);
+  }
+  ~DoubleDataConverter(){}
+};
+
+
+class FloatDataConverter : public ColumnDataConverter
+{
+  float buf;
+public:
+  void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+  {
+    DBUG_ASSERT(cass_data_len == sizeof(float));
+    float *pdata= (float*) cass_data;
+    field->store(*pdata);
+  }
+  
+  void mariadb_to_cassandra(char **cass_data, int *cass_data_len)
+  {
+    buf= field->val_real();
+    *cass_data= (char*)&buf;
+    *cass_data_len=sizeof(float);
+  }
+  ~FloatDataConverter(){}
+};
+
+
+class BigintDataConverter : public ColumnDataConverter
+{
+  longlong buf;
+public:
+  void flip(const char *from, char* to)
+  {
+    to[0]= from[7];
+    to[1]= from[6];
+    to[2]= from[5];
+    to[3]= from[4];
+    to[4]= from[3];
+    to[5]= from[2];
+    to[6]= from[1];
+    to[7]= from[0];
+  }
+  void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+  {
+    longlong tmp;
+    DBUG_ASSERT(cass_data_len == sizeof(longlong));
+    flip(cass_data, (char*)&tmp);
+    field->store(tmp);
+  }
+  
+  void mariadb_to_cassandra(char **cass_data, int *cass_data_len)
+  {
+    longlong tmp= field->val_int();
+    flip((const char*)&tmp, (char*)&buf);
+    *cass_data= (char*)&buf;
+    *cass_data_len=sizeof(longlong);
+  }
+  ~BigintDataConverter(){}
+};
+
+
+class StringCopyConverter : public ColumnDataConverter
+{
+  String buf;
+public:
+  void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+  {
+    field->store(cass_data, cass_data_len,field->charset());
+  }
+  
+  void mariadb_to_cassandra(char **cass_data, int *cass_data_len)
+  {
+    String *pstr= field->val_str(&buf);
+    *cass_data= (char*)pstr->c_ptr();
+    *cass_data_len= pstr->length();
+  }
+  ~StringCopyConverter(){}
+};
+
+
+const char * const validator_bigint=  "org.apache.cassandra.db.marshal.LongType";
+const char * const validator_int=     "org.apache.cassandra.db.marshal.Int32Type";
 const char * const validator_counter= "org.apache.cassandra.db.marshal.CounterColumnType";
 
-const char * const validator_float= "org.apache.cassandra.db.marshal.FloatType";
-const char * const validator_double= "org.apache.cassandra.db.marshal.DoubleType";
-
-void map_field_to_type(Field *field, const char *validator_name)
+const char * const validator_float=   "org.apache.cassandra.db.marshal.FloatType";
+const char * const validator_double=  "org.apache.cassandra.db.marshal.DoubleType";
+
+const char * const validator_blob=    "org.apache.cassandra.db.marshal.BytesType";
+const char * const validator_ascii=   "org.apache.cassandra.db.marshal.AsciiType";
+const char * const validator_text=    "org.apache.cassandra.db.marshal.UTF8Type";
+
+
+ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
 {
+  ColumnDataConverter *res= NULL;
+
   switch(field->type()) {
     case MYSQL_TYPE_TINY:
     case MYSQL_TYPE_SHORT:
     case MYSQL_TYPE_LONG:
     case MYSQL_TYPE_LONGLONG:
       if (!strcmp(validator_name, validator_bigint))
-      {
-        //setup bigint validator
-      }
+        res= new BigintDataConverter;
       break;
     case MYSQL_TYPE_FLOAT:
       if (!strcmp(validator_name, validator_float))
+        res= new FloatDataConverter;
       break;
     case MYSQL_TYPE_DOUBLE:
       if (!strcmp(validator_name, validator_double))
-      break;
-    default:
-      DBUG_ASSERT(0);
-  }
-}
-
+        res= new DoubleDataConverter;
+      break;
+
+    case MYSQL_TYPE_VAR_STRING:
+    case MYSQL_TYPE_VARCHAR:
+      if (!strcmp(validator_name, validator_blob) ||
+          !strcmp(validator_name, validator_ascii) ||
+          !strcmp(validator_name, validator_text))
+      {
+        res= new StringCopyConverter;
+      }
+      break;
+    default:;
+  }
+  return res;
+}
+
+
+bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
+{
+  char *col_name;
+  int  col_name_len;
+  char *col_type;
+  int col_type_len;
+
+  DBUG_ASSERT(!field_converters);
+  size_t memsize= sizeof(ColumnDataConverter*) * n_fields;
+  if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
+    return true;
+  bzero(field_converters, memsize);
+  n_field_converters= n_fields;
+
+  /* 
+    TODO: what about mapping the primary key? It has a 'type', too...
+    see CfDef::key_validation_class ? see also CfDef::key_alias?
+  */
+
+  se->first_ddl_column();
+  uint n_mapped= 0;
+  while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
+                              &col_type_len))
+  {
+    /* Mapping for the 1st field is already known */
+    for (Field **field= field_arg + 1; *field; field++)
+    {
+      if (!strcmp((*field)->field_name, col_name))
+      {
+        n_mapped++;
+        ColumnDataConverter **conv= field_converters + (*field)->field_index;
+        if (!(*conv= map_field_to_validator(*field, col_type)))
+          return true;
+        (*conv)->field= *field;
+      }
+    }
+  }
+
+  if (n_mapped != n_fields - 1)
+    return true;
+
+  return false;
+}
+
+
+void ha_cassandra::free_field_converters()
+{
+  if (field_converters)
+  {
+    for (uint i=0; i < n_field_converters; i++)
+      delete field_converters[i];
+    my_free(field_converters);
+    field_converters= NULL;
+  }
+}
 
 void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
 
@@ -445,27 +612,42 @@
   str= table->field[0]->val_str(&tmp);
   
   bool found;
-  if (se->get_slice((char*)str->ptr(), str->length(), get_names_and_vals(), &found))
+  if (se->get_slice((char*)str->ptr(), str->length(), &found))
     rc= HA_ERR_INTERNAL_ERROR;
+  
+  /* TODO: what if we're not reading all columns?? */
+  if (!found)
+  {
+    rc= HA_ERR_KEY_NOT_FOUND;
+  }
   else
   {
-    if (found)
+    char *cass_name;
+    char *cass_value;
+    int cass_value_len;
+    Field **field;
+
+    /* Start with all fields being NULL */
+    for (field= table->field + 1; *field; field++)
+      (*field)->set_null();
+
+    while (!se->get_next_read_column(&cass_name, &cass_value, &cass_value_len))
     {
-      //NameAndValue *nv= get_names_and_vals();
-      // TODO: walk through the (name, value) pairs and return values.
+      // map to our column. todo: use hash or something..
+      int idx=1;
+      for (field= table->field + 1; *field; field++)
+      {
+        idx++;
+        if (!strcmp((*field)->field_name, cass_name))
+        {
+          int fieldnr= (*field)->field_index;
+          (*field)->set_notnull();
+          field_converters[fieldnr]->cassandra_to_mariadb(cass_value, cass_value_len);
+          break;
+        }
+      }
     }
-    else
-      rc= HA_ERR_KEY_NOT_FOUND;
-  }
-#ifdef NEW_CODE
-  
-  se->get_slice();
-
-  for each column
-  {
-    find column;
-  }
-#endif
+  }
 
   DBUG_RETURN(rc);
 }
@@ -475,35 +657,34 @@
 {
   my_bitmap_map *old_map;
   char buff[512]; 
-  NameAndValue *tuple;
-  NameAndValue *nv;
   DBUG_ENTER("ha_cassandra::write_row");
   
-  /* Temporary malloc-happy code just to get INSERTs to work */
-  nv= tuple= get_names_and_vals();
   old_map= dbug_tmp_use_all_columns(table, table->read_set);
-
-  for (Field **field= table->field; *field; field++, nv++)
+  
+  /* Convert the key (todo: unify with the rest of the processing) */
   {
+    Field *pk_col= table->field[0];
     String tmp(buff,sizeof(buff), &my_charset_bin);
+    String *str;
     tmp.length(0);
-    String *str;
-    str= (*field)->val_str(&tmp);
-    nv->name= (char*)(*field)->field_name;
-    nv->value_len= str->length();
-    nv->value= (char*)my_malloc(nv->value_len, MYF(0));
-    memcpy(nv->value, str->ptr(), nv->value_len);
-  }
-  nv->name= NULL;
+    str= pk_col->val_str(&tmp);
+
+    se->start_prepare_insert(str->ptr(), str->length());
+  }
+
+  /* Convert other fields */
+  for (uint i= 1; i < table->s->fields; i++)
+  {
+    char *cass_data;
+    int cass_data_len;
+    field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len);
+    se->add_insert_column(field_converters[i]->field->field_name, 
+                          cass_data, cass_data_len);
+  }
+
   dbug_tmp_restore_column_map(table->read_set, old_map);
   
-  //invoke!
-  bool res= se->insert(tuple);
-
-  for (nv= tuple; nv->name; nv++)
-  {
-    my_free(nv->value);
-  }
+  bool res= se->do_insert();
 
   DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
 }

=== modified file 'storage/cassandra/ha_cassandra.h'
--- a/storage/cassandra/ha_cassandra.h	2012-08-17 17:13:20 +0000
+++ b/storage/cassandra/ha_cassandra.h	2012-08-18 12:28:35 +0000
@@ -24,6 +24,7 @@
   THR_LOCK lock;
 } CASSANDRA_SHARE;
 
+class ColumnDataConverter;
 
 /** @brief
   Class definition for the storage engine
@@ -38,6 +39,12 @@
   /* pre-allocated array of #fields elements */
   NameAndValue *names_and_vals;
   NameAndValue *get_names_and_vals();
+
+
+  ColumnDataConverter **field_converters;
+  uint n_field_converters;
+  bool setup_field_converters(Field **field, uint n_fields);
+  void free_field_converters();
 public:
   ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
   ~ha_cassandra()



More information about the commits mailing list