[Commits] Rev 3488: MDEV-431: Cassandra storage engine in file:///data0/psergey/dev2/5.5-cassandra-r01/

Sergey Petrunya psergey at askmonty.org
Sun Aug 19 11:50:53 EEST 2012


At file:///data0/psergey/dev2/5.5-cassandra-r01/

------------------------------------------------------------
revno: 3488
revision-id: psergey at askmonty.org-20120819085053-ehoiekekjbwz0udm
parent: psergey at askmonty.org-20120818172931-fhf3hhwmshl164dr
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: 5.5-cassandra-r01
timestamp: Sun 2012-08-19 12:50:53 +0400
message:
  MDEV-431: Cassandra storage engine
  - Descriptive error messages
  - Unpack PK column on range scans
=== modified file 'mysql-test/r/cassandra.result'
--- a/mysql-test/r/cassandra.result	2012-08-18 17:29:31 +0000
+++ b/mysql-test/r/cassandra.result	2012-08-19 08:50:53 +0000
@@ -13,15 +13,26 @@
 ERROR HY000: Unable to connect to foreign data source: Default TException. [Keyspace no_such_keyspace does not exist]
 create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra 
 thrift_host='localhost' keyspace='no_such_keyspace';
-ERROR HY000: Can't create table 'test.t1' (errno: 140)
-create table t1 (rowkey char(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra 
+ERROR HY000: Unable to connect to foreign data source: thrift_host, keyspace, and column_family table options must be s
+create table t1 (rowkey varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra 
 thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
+select * from t1;
+rowkey	data1	data2
 insert into t1 values ('rowkey10', 'data1-value', 123456);
 insert into t1 values ('rowkey11', 'data1-value2', 34543);
+insert into t1 values ('rowkey12', 'data1-value3', 454);
 select * from t1;
 rowkey	data1	data2
-	data1-value	123456
-	data1-value2	34543
+rowkey12	data1-value3	454
+rowkey10	data1-value	123456
+rowkey11	data1-value2	34543
+explain
+select * from t1 where rowkey='rowkey11';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t1	const	PRIMARY	PRIMARY	38	const	1	
+select * from t1 where rowkey='rowkey11';
+rowkey	data1	data2
+rowkey11	data1-value2	34543
 delete from t1;
 select * from t1;
 rowkey	data1	data2

=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test	2012-08-18 17:29:31 +0000
+++ b/mysql-test/t/cassandra.test	2012-08-19 08:50:53 +0000
@@ -25,7 +25,7 @@
   thrift_host='localhost' keyspace='no_such_keyspace' column_family='colfam';
 
 # No column family specified
---error ER_CANT_CREATE_TABLE
+--error ER_CONNECT_TO_FOREIGN_DATA_SOURCE
 create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra 
   thrift_host='localhost' keyspace='no_such_keyspace';
 
@@ -49,13 +49,19 @@
 ############################################################################
 
 # Now, create a table for real and insert data
-create table t1 (rowkey char(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra 
+create table t1 (rowkey varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra 
   thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
+select * from t1;
 
 insert into t1 values ('rowkey10', 'data1-value', 123456);
 insert into t1 values ('rowkey11', 'data1-value2', 34543);
+insert into t1 values ('rowkey12', 'data1-value3', 454);
 select * from t1;
 
+explain
+select * from t1 where rowkey='rowkey11';
+select * from t1 where rowkey='rowkey11';
+
 # Check if deletion works
 delete from t1;
 select * from t1;

=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-08-18 17:29:31 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-08-19 08:50:53 +0000
@@ -64,6 +64,8 @@
   /* Resultset we're reading */
   std::vector<KeySlice> key_slice_vec;
   std::vector<KeySlice>::iterator key_slice_it;
+  
+  std::string rowkey; /* key of the record we're returning now */
 
   SlicePredicate slice_pred;
 public:
@@ -77,6 +79,7 @@
   bool setup_ddl_checks();
   void first_ddl_column();
   bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
+  void get_rowkey_type(char **name, char **type);
 
   /* Writes */
   void start_prepare_insert(const char *key, int key_len);
@@ -86,6 +89,7 @@
   /* Reads, point lookups */
   bool get_slice(char *key, size_t key_len, bool *found);
   bool get_next_read_column(char **name, char **value, int *value_len);
+  void get_read_rowkey(char **value, int *value_len);
 
   /* Reads, multi-row scans */
   bool get_range_slices();
@@ -193,6 +197,21 @@
   return false;
 }
 
+
+void Cassandra_se_impl::get_rowkey_type(char **name, char **type)
+{
+  if (cf_def.__isset.key_validation_class)
+    *type= (char*)cf_def.key_validation_class.c_str();
+  else
+    *type= NULL;
+
+  if (cf_def.__isset.key_alias)
+    *name= (char*)cf_def.key_alias.c_str();
+  else
+    *name= NULL;
+}
+
+
 /////////////////////////////////////////////////////////////////////////////
 // Data writes
 /////////////////////////////////////////////////////////////////////////////
@@ -269,8 +288,7 @@
   ColumnParent cparent;
   cparent.column_family= column_family;
 
-  std::string rowkey_str;
-  rowkey_str.assign(key, key_len);
+  rowkey.assign(key, key_len);
 
   SlicePredicate slice_pred;
   SliceRange sr;
@@ -279,7 +297,7 @@
   slice_pred.__set_slice_range(sr);
 
   try {
-    cass->get_slice(column_data_vec, rowkey_str, cparent, slice_pred, 
+    cass->get_slice(column_data_vec, rowkey, cparent, slice_pred, 
                     cur_consistency_level);
 
     if (column_data_vec.size() == 0)
@@ -333,6 +351,15 @@
 }
 
 
+/* Return the rowkey for the record that was read */
+
+void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len)
+{
+  *value= (char*)rowkey.c_str();
+  *value_len= rowkey.length();
+}
+
+
 bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as parameters
 {
   bool res= true;
@@ -375,6 +402,7 @@
     return true;
   
   column_data_vec= key_slice_it->columns;
+  rowkey= key_slice_it->key;
   column_data_it= column_data_vec.begin();
   key_slice_it++;
   return false;

=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h	2012-08-18 17:29:31 +0000
+++ b/storage/cassandra/cassandra_se.h	2012-08-19 08:50:53 +0000
@@ -25,6 +25,7 @@
   virtual void first_ddl_column()=0;
   virtual bool next_ddl_column(char **name, int *name_len, char **value, 
                                int *value_len)=0;
+  virtual void get_rowkey_type(char **name, char **type)=0;
 
   /* Writes */
   virtual void start_prepare_insert(const char *key, int key_len)=0;
@@ -35,6 +36,7 @@
   /* Reads */
   virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ;
   virtual bool get_next_read_column(char **name, char **value, int *value_len)=0;
+  virtual void get_read_rowkey(char **value, int *value_len)=0;
 
   /* Reads, multi-row scans */
   virtual bool get_range_slices()=0;

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-08-18 17:29:31 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-08-19 08:50:53 +0000
@@ -212,7 +212,7 @@
 
 ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
   :handler(hton, table_arg),
-   se(NULL), field_converters(NULL)
+   se(NULL), field_converters(NULL),rowkey_converter(NULL)
 {}
 
 
@@ -251,7 +251,6 @@
 
   if (setup_field_converters(table->field, table->s->fields))
   {
-    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "setup_field_converters");
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
 
@@ -341,8 +340,12 @@
 */
 #endif
   DBUG_ASSERT(!se);
-  if (!options->host || !options->keyspace || !options->column_family)
+  if (!options->host  || !options->keyspace || !options->column_family)
+  {
+    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), 
+             "thrift_host, keyspace, and column_family table options must be specified");
     DBUG_RETURN(HA_WRONG_CREATE_OPTION);
+  }
   se= get_cassandra_se();
   se->set_column_family(options->column_family);
   if (se->connect(options->host, options->keyspace))
@@ -515,6 +518,7 @@
 
     case MYSQL_TYPE_VAR_STRING:
     case MYSQL_TYPE_VARCHAR:
+    //case MYSQL_TYPE_STRING:  <-- todo: should we allow end-padded 'CHAR(N)'?
       if (!strcmp(validator_name, validator_blob) ||
           !strcmp(validator_name, validator_ascii) ||
           !strcmp(validator_name, validator_text))
@@ -560,7 +564,12 @@
         n_mapped++;
         ColumnDataConverter **conv= field_converters + (*field)->field_index;
         if (!(*conv= map_field_to_validator(*field, col_type)))
+        {
+          se->print_error("Failed to map column %s to datatype %s", 
+                          (*field)->field_name, col_type);
+          my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
           return true;
+        }
         (*conv)->field= *field;
       }
     }
@@ -568,6 +577,28 @@
 
   if (n_mapped != n_fields - 1)
     return true;
+  
+  /* 
+    Setup type conversion for row_key. It may also have a name, but we ignore
+    it currently
+  */
+  se->get_rowkey_type(&col_name, &col_type);
+  if (col_type != NULL)
+  {
+    if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type)))
+    {
+      se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
+      my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+      return true;
+    }
+    rowkey_converter->field= *field_arg;
+  }
+  else
+  {
+    se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+    return true;
+  }
 
   return false;
 }
@@ -575,6 +606,9 @@
 
 void ha_cassandra::free_field_converters()
 {
+  delete rowkey_converter;
+  rowkey_converter= NULL;
+
   if (field_converters)
   {
     for (uint i=0; i < n_field_converters; i++)
@@ -588,8 +622,8 @@
 void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
 
 int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
-                               key_part_map keypart_map,
-                               enum ha_rkey_function find_flag)
+                                 key_part_map keypart_map,
+                                 enum ha_rkey_function find_flag)
 {
   int rc;
   DBUG_ENTER("ha_cassandra::index_read_map");
@@ -597,19 +631,26 @@
   if (find_flag != HA_READ_KEY_EXACT)
     DBUG_RETURN(HA_ERR_WRONG_COMMAND);
 
-  // todo: decode the search key.
   uint key_len= calculate_key_len(table, active_index, key, keypart_map);
   store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
-
+#if 0  
   char buff[256]; 
   String tmp(buff,sizeof(buff), &my_charset_bin);
   tmp.length(0);
   String *str;
   str= table->field[0]->val_str(&tmp);
-  
+#endif
+
+  char *cass_key;
+  int cass_key_len;
+  rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
+
   bool found;
-  if (se->get_slice((char*)str->ptr(), str->length(), &found))
+  if (se->get_slice(cass_key, cass_key_len, &found))
+  {
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
     rc= HA_ERR_INTERNAL_ERROR;
+  }
   
   /* TODO: what if we're not reading all columns?? */
   if (!found)
@@ -618,14 +659,14 @@
   }
   else
   {
-    read_cassandra_columns();
+    read_cassandra_columns(false);
   }
 
   DBUG_RETURN(rc);
 }
 
 
-void ha_cassandra::read_cassandra_columns()
+void ha_cassandra::read_cassandra_columns(bool unpack_pk)
 {
   char *cass_name;
   char *cass_value;
@@ -659,6 +700,15 @@
       }
     }
   }
+  
+  if (unpack_pk)
+  {
+    /* Unpack rowkey to primary key */
+    field= table->field;
+    (*field)->set_notnull();
+    se->get_read_rowkey(&cass_value, &cass_value_len);
+    rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len);
+  }
 
   dbug_tmp_restore_column_map(table->write_set, old_map);
 }
@@ -667,12 +717,13 @@
 int ha_cassandra::write_row(uchar *buf)
 {
   my_bitmap_map *old_map;
-  char buff[512]; 
+//  char buff[512]; 
   DBUG_ENTER("ha_cassandra::write_row");
   
   old_map= dbug_tmp_use_all_columns(table, table->read_set);
   
   /* Convert the key (todo: unify with the rest of the processing) */
+#if 0  
   {
     Field *pk_col= table->field[0];
     String tmp(buff,sizeof(buff), &my_charset_bin);
@@ -682,6 +733,11 @@
 
     se->start_prepare_insert(str->ptr(), str->length());
   }
+#endif
+  char *cass_key;
+  int cass_key_len;
+  rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
+  se->start_prepare_insert(cass_key, cass_key_len);
 
   /* Convert other fields */
   for (uint i= 1; i < table->s->fields; i++)
@@ -697,6 +753,9 @@
   
   bool res= se->do_insert();
 
+  if (res)
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+
   DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
 }
 
@@ -713,6 +772,8 @@
     se->add_read_column(table->field[i]->field_name);
 
   bres= se->get_range_slices();
+  if (bres)
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
 
   DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
 }
@@ -739,7 +800,7 @@
   }
   else
   {
-    read_cassandra_columns();
+    read_cassandra_columns(true);
     rc= 0;
   }
 
@@ -753,11 +814,22 @@
   DBUG_ENTER("ha_cassandra::delete_all_rows");
 
   bres= se->truncate();
+  
+  if (bres)
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
 
   DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
 }
 
 
+int ha_cassandra::delete_row(const uchar *buf)
+{
+  DBUG_ENTER("ha_cassandra::delete_row");
+  // todo: delete the row we've just read.
+  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+}
+
+
 /////////////////////////////////////////////////////////////////////////////
 // Dummy implementations start
 /////////////////////////////////////////////////////////////////////////////
@@ -815,7 +887,8 @@
                                      key_range *max_key)
 {
   DBUG_ENTER("ha_cassandra::records_in_range");
-  DBUG_RETURN(10);                         // low number to force index usage
+  //DBUG_RETURN(10);                         // low number to force index usage
+  DBUG_RETURN(HA_POS_ERROR);
 }
 
 
@@ -866,13 +939,6 @@
 }
 
 
-int ha_cassandra::delete_row(const uchar *buf)
-{
-  DBUG_ENTER("ha_cassandra::delete_row");
-  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
-}
-
-
 /**
   check_if_incompatible_data() called if ALTER TABLE can't detect otherwise
   if new and old definition are compatible

=== modified file 'storage/cassandra/ha_cassandra.h'
--- a/storage/cassandra/ha_cassandra.h	2012-08-18 17:21:50 +0000
+++ b/storage/cassandra/ha_cassandra.h	2012-08-19 08:50:53 +0000
@@ -38,10 +38,13 @@
 
   ColumnDataConverter **field_converters;
   uint n_field_converters;
+
+  ColumnDataConverter *rowkey_converter;
+
   bool setup_field_converters(Field **field, uint n_fields);
   void free_field_converters();
   
-  void read_cassandra_columns();
+  void read_cassandra_columns(bool unpack_pk);
 public:
   ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
   ~ha_cassandra()



More information about the commits mailing list