[Commits] Rev 3517: Cassandra SE in file:///data0/psergey/dev2/5.5-cassandra-r01/

Sergey Petrunya psergey at askmonty.org
Mon Sep 24 18:15:13 EEST 2012


At file:///data0/psergey/dev2/5.5-cassandra-r01/

------------------------------------------------------------
revno: 3517
revision-id: psergey at askmonty.org-20120924151512-g9t6nh7pc816iib2
parent: psergey at askmonty.org-20120922193029-vfubttsblnmj8wz5
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: 5.5-cassandra-r01
timestamp: Mon 2012-09-24 19:15:12 +0400
message:
  Cassandra SE
  - Add support for Cassandra's 'varint' datatype, mappable to VARBINARY.
=== modified file 'mysql-test/r/cassandra.result'
--- a/mysql-test/r/cassandra.result	2012-09-22 19:30:29 +0000
+++ b/mysql-test/r/cassandra.result	2012-09-24 15:15:12 +0000
@@ -326,3 +326,20 @@
 set cassandra_write_consistency='TWO';
 set cassandra_write_consistency='THREE';
 set cassandra_write_consistency=@tmp;
+#
+# varint datatype support
+#
+CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(32)) ENGINE=CASSANDRA
+thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9';
+select rowkey, hex(varint_col) from t2;
+rowkey	hex(varint_col)
+val-01	01
+val-0x123456	123456
+val-0x12345678	12345678
+drop table t2;
+# now, let's check what happens when MariaDB's column is not wide enough:
+CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(2)) ENGINE=CASSANDRA
+thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9';
+select rowkey, hex(varint_col) from t2;
+ERROR HY000: Internal error: 'Unable to convert value of field `varint_col` from cassandra's data  format. Source has 4 bytes, data: 12345678'
+drop table t2;

=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test	2012-09-22 19:30:29 +0000
+++ b/mysql-test/t/cassandra.test	2012-09-24 15:15:12 +0000
@@ -68,6 +68,11 @@
 update cf8 set countercol=countercol+1 where rowkey='cnt1';
 update cf8 set countercol=countercol+100 where rowkey='cnt2';
 
+create columnfamily cf9 (rowkey varchar primary key, varint_col varint);
+insert into cf9 (rowkey, varint_col) values ('val-01', 1);
+insert into cf9 (rowkey, varint_col) values ('val-0x123456', 1193046);
+insert into cf9 (rowkey, varint_col) values ('val-0x12345678', 305419896);
+
 EOF
 --error 0,1,2
 --system cqlsh -3 -f $MYSQLTEST_VARDIR/cassandra_test_init.cql
@@ -413,6 +418,25 @@
 
 set cassandra_write_consistency=@tmp;
 
+--echo #
+--echo # varint datatype support
+--echo #
+# create columnfamily cf9 (rowkey varchar primary key, varint_col varint);
+CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(32)) ENGINE=CASSANDRA
+  thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9';
+--sorted_result
+select rowkey, hex(varint_col) from t2;
+drop table t2;
+
+--echo # now, let's check what happens when MariaDB's column is not wide enough:
+CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(2)) ENGINE=CASSANDRA
+  thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9';
+--sorted_result
+--error ER_INTERNAL_ERROR
+select rowkey, hex(varint_col) from t2;
+drop table t2;
+
+
 ############################################################################
 ## Cassandra cleanup
 ############################################################################

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-09-22 19:30:29 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-09-24 15:15:12 +0000
@@ -531,7 +531,7 @@
   Field *field;
 
   /* This will save Cassandra's data in the Field */
-  virtual void cassandra_to_mariadb(const char *cass_data, 
+  virtual int cassandra_to_mariadb(const char *cass_data, 
                                     int cass_data_len)=0;
 
   /*
@@ -552,11 +552,12 @@
 {
   double buf;
 public:
-  void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
   {
     DBUG_ASSERT(cass_data_len == sizeof(double));
     double *pdata= (double*) cass_data;
     field->store(*pdata);
+    return 0;
   }
   
   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -574,11 +575,12 @@
 {
   float buf;
 public:
-  void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
   {
     DBUG_ASSERT(cass_data_len == sizeof(float));
     float *pdata= (float*) cass_data;
     field->store(*pdata);
+    return 0;
   }
   
   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -608,7 +610,7 @@
   longlong buf;
   bool flip; /* is false when reading counter columns */
 public:
-  void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
   {
     longlong tmp;
     DBUG_ASSERT(cass_data_len == sizeof(longlong));
@@ -617,6 +619,7 @@
     else
       memcpy(&tmp, cass_data, sizeof(longlong));
     field->store(tmp);
+    return 0;
   }
   
   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -647,10 +650,11 @@
 {
   char buf;
 public:
-  void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
   {
     DBUG_ASSERT(cass_data_len == 1);
     field->store(cass_data[0]);
+    return 0;
   }
 
   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -668,12 +672,13 @@
 {
   int32_t buf;
 public:
-  void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
   {
     int32_t tmp;
     DBUG_ASSERT(cass_data_len == sizeof(int32_t));
     flip32(cass_data, (char*)&tmp);
     field->store(tmp);
+    return 0;
   }
   
   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -691,10 +696,14 @@
 class StringCopyConverter : public ColumnDataConverter
 {
   String buf;
+  size_t max_length;
 public:
-  void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
   {
+    if ((size_t)cass_data_len > max_length)
+      return 1;
     field->store(cass_data, cass_data_len,field->charset());
+    return 0;
   }
   
   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -704,6 +713,7 @@
     *cass_data_len= pstr->length();
     return false;
   }
+  StringCopyConverter(size_t max_length_arg) : max_length(max_length_arg) {}
   ~StringCopyConverter(){}
 };
 
@@ -712,7 +722,7 @@
 {
   int64_t buf;
 public:
-  void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
   {
     /* Cassandra data is milliseconds-since-epoch in network byte order */
     int64_t tmp;
@@ -724,6 +734,7 @@
       - microsecond fraction of a second.
     */
     ((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000);
+    return 0;
   }
 
   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -768,7 +779,7 @@
   char buf[16]; /* Binary UUID representation */
   String str_buf;
 public:
-  void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
   {
     DBUG_ASSERT(cass_data_len==16);
     char str[37];
@@ -783,6 +794,7 @@
     }
     *ptr= 0;
     field->store(str, 36,field->charset());
+    return 0;
   }
 
   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -836,6 +848,11 @@
 
 const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType";
 
+/*
+  VARINTs are stored as little-endian big numbers.
+*/
+const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerType";
+
 
 ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
 {
@@ -885,14 +902,20 @@
       /* fall through: */
     case MYSQL_TYPE_VARCHAR:
     case MYSQL_TYPE_VAR_STRING:
+    {
+      bool is_varint;
       if (!strcmp(validator_name, validator_blob) ||
           !strcmp(validator_name, validator_ascii) ||
-          !strcmp(validator_name, validator_text))
+          !strcmp(validator_name, validator_text) ||
+          (is_varint= !strcmp(validator_name, validator_varint)))
       {
-        res= new StringCopyConverter;
+        size_t max_size= (size_t)-1;
+        if (is_varint)
+          max_size= field->field_length;
+        res= new StringCopyConverter(max_size);
       }
       break;
-
+    }
     case MYSQL_TYPE_LONG:
       if (!strcmp(validator_name, validator_int))
         res= new Int32DataConverter;
@@ -1041,24 +1064,43 @@
   
   /* TODO: what if we're not reading all columns?? */
   if (!found)
-  {
     rc= HA_ERR_KEY_NOT_FOUND;
-  }
   else
+    rc= read_cassandra_columns(false);
+
+  DBUG_RETURN(rc);
+}
+
+
+void ha_cassandra::print_conversion_error(const char *field_name, 
+                                          char *cass_value,
+                                          int cass_value_len)
+{
+  char buf[32];
+  char *p= cass_value;
+  size_t i= 0;
+  for (; (i < (int)sizeof(buf)-1) && (p < cass_value + cass_value_len); p++)
   {
-    read_cassandra_columns(false);
+    buf[i++]= map2number[(*p >> 4) & 0xF];
+    buf[i++]= map2number[*p & 0xF];
   }
+  buf[i]=0;
 
-  DBUG_RETURN(rc);
+  se->print_error("Unable to convert value for field `%s` from Cassandra's data"
+                  " format. Source data is %d bytes, 0x%s%s",
+                  field_name, cass_value_len, buf, 
+                  (i == sizeof(buf) - 1)? "..." : "");
+  my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
 }
 
 
-void ha_cassandra::read_cassandra_columns(bool unpack_pk)
+int ha_cassandra::read_cassandra_columns(bool unpack_pk)
 {
   char *cass_name;
   char *cass_value;
   int cass_value_len;
   Field **field;
+  int res= 0;
   
   /* 
     cassandra_to_mariadb() calls will use field->store(...) methods, which
@@ -1082,7 +1124,14 @@
       {
         int fieldnr= (*field)->field_index;
         (*field)->set_notnull();
-        field_converters[fieldnr]->cassandra_to_mariadb(cass_value, cass_value_len);
+        if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
+                                                            cass_value_len))
+        {
+          print_conversion_error((*field)->field_name, cass_value, 
+                                 cass_value_len);
+          res=1;
+          goto err;
+        }
         break;
       }
     }
@@ -1094,10 +1143,17 @@
     field= table->field;
     (*field)->set_notnull();
     se->get_read_rowkey(&cass_value, &cass_value_len);
-    rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len);
+    if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len))
+    {
+      print_conversion_error((*field)->field_name, cass_value, cass_value_len);
+      res=1;
+      goto err;
+    }
   }
 
+err:
   dbug_tmp_restore_column_map(table->write_set, old_map);
+  return res;
 }
 
 
@@ -1234,10 +1290,7 @@
     if (reached_eof)
       rc= HA_ERR_END_OF_FILE;
     else
-    {
-      read_cassandra_columns(true);
-      rc= 0;
-    }
+      rc= read_cassandra_columns(true);
   }
 
   DBUG_RETURN(rc);
@@ -1422,8 +1475,7 @@
   {
     if (!se->get_next_multiget_row())
     {
-      read_cassandra_columns(true);
-      res= 0;
+      res= read_cassandra_columns(true);
       break;
     }
     else 

=== modified file 'storage/cassandra/ha_cassandra.h'
--- a/storage/cassandra/ha_cassandra.h	2012-09-20 10:22:36 +0000
+++ b/storage/cassandra/ha_cassandra.h	2012-09-24 15:15:12 +0000
@@ -58,7 +58,7 @@
   bool setup_field_converters(Field **field, uint n_fields);
   void free_field_converters();
   
-  void read_cassandra_columns(bool unpack_pk);
+  int read_cassandra_columns(bool unpack_pk);
   int check_table_options(struct ha_table_option_struct* options);
 
   bool doing_insert_batch;
@@ -66,6 +66,8 @@
   
   /* Used to produce 'wrong column %s at row %lu' warnings */
   ha_rows insert_lineno;
+  void print_conversion_error(const char *field_name, 
+                              char *cass_value, int cass_value_len);
 public:
   ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
   ~ha_cassandra()



More information about the commits mailing list