[Commits] Rev 3516: Cassandra SE: make consistency settings user-settable. in file:///data0/psergey/dev2/5.5-cassandra-r01/

Sergey Petrunya psergey at askmonty.org
Sat Sep 22 22:30:30 EEST 2012


At file:///data0/psergey/dev2/5.5-cassandra-r01/

------------------------------------------------------------
revno: 3516
revision-id: psergey at askmonty.org-20120922193029-vfubttsblnmj8wz5
parent: psergey at askmonty.org-20120920102236-1wb6p3dfkui79yux
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: 5.5-cassandra-r01
timestamp: Sat 2012-09-22 23:30:29 +0400
message:
  Cassandra SE: make consistency settings user-settable.
=== modified file 'mysql-test/r/cassandra.result'
--- a/mysql-test/r/cassandra.result	2012-09-20 10:22:36 +0000
+++ b/mysql-test/r/cassandra.result	2012-09-22 19:30:29 +0000
@@ -295,6 +295,7 @@
 show variables like 'cassandra_default_thrift_host';
 Variable_name	Value
 cassandra_default_thrift_host	
+set @tmp=@@cassandra_default_thrift_host;
 set cassandra_default_thrift_host='localhost';
 ERROR HY000: Variable 'cassandra_default_thrift_host' is a GLOBAL variable and should be set with SET GLOBAL
 set global cassandra_default_thrift_host='localhost';
@@ -306,3 +307,22 @@
 cnt1	1
 cnt2	100
 drop table t2;
+set global cassandra_default_thrift_host=@tmp;
+#
+# Consistency settings
+#
+show variables like 'cassandra_%consistency';
+Variable_name	Value
+cassandra_read_consistency	ONE
+cassandra_write_consistency	ONE
+set @tmp=@@cassandra_write_consistency;
+# Unfortunately, there is no easy way to check if setting have the effect..
+set cassandra_write_consistency='ONE';
+set cassandra_write_consistency='QUORUM';
+set cassandra_write_consistency='LOCAL_QUORUM';
+set cassandra_write_consistency='EACH_QUORUM';
+set cassandra_write_consistency='ALL';
+set cassandra_write_consistency='ANY';
+set cassandra_write_consistency='TWO';
+set cassandra_write_consistency='THREE';
+set cassandra_write_consistency=@tmp;

=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test	2012-09-20 10:22:36 +0000
+++ b/mysql-test/t/cassandra.test	2012-09-22 19:30:29 +0000
@@ -382,6 +382,7 @@
 --echo # Check that @@cassandra_default_thrift_host works
 --echo #
 show variables like 'cassandra_default_thrift_host';
+set @tmp=@@cassandra_default_thrift_host;
 --error ER_GLOBAL_VARIABLE
 set cassandra_default_thrift_host='localhost';
 set global cassandra_default_thrift_host='localhost';
@@ -392,6 +393,26 @@
 select * from t2;
 drop table t2;
 
+set global cassandra_default_thrift_host=@tmp;
+
+--echo #
+--echo # Consistency settings
+--echo #
+show variables like 'cassandra_%consistency';
+set @tmp=@@cassandra_write_consistency;
+
+--echo # Unfortunately, there is no easy way to check if setting have the effect..
+set cassandra_write_consistency='ONE';
+set cassandra_write_consistency='QUORUM';
+set cassandra_write_consistency='LOCAL_QUORUM';
+set cassandra_write_consistency='EACH_QUORUM';
+set cassandra_write_consistency='ALL';
+set cassandra_write_consistency='ANY';
+set cassandra_write_consistency='TWO';
+set cassandra_write_consistency='THREE';
+
+set cassandra_write_consistency=@tmp;
+
 ############################################################################
 ## Cassandra cleanup
 ############################################################################

=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-09-16 08:22:21 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-09-22 19:30:29 +0000
@@ -23,6 +23,7 @@
 using namespace apache::thrift::protocol;
 using namespace org::apache::cassandra;
 
+
 void Cassandra_se_interface::print_error(const char *format, ...)
 {
   va_list ap;
@@ -38,10 +39,13 @@
 class Cassandra_se_impl: public Cassandra_se_interface
 {
   CassandraClient *cass; /* Connection to cassandra */
-  ConsistencyLevel::type cur_consistency_level;
 
   std::string column_family;
   std::string keyspace;
+
+  ConsistencyLevel::type write_consistency;
+  ConsistencyLevel::type read_consistency;
+
   
   /* DDL data */
   KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
@@ -69,7 +73,9 @@
   SlicePredicate slice_pred;
   bool get_slices_returned_less;
 public:
-  Cassandra_se_impl() : cass(NULL) {}
+  Cassandra_se_impl() : cass(NULL), 
+                        write_consistency(ConsistencyLevel::ONE),
+                        read_consistency(ConsistencyLevel::ONE) {}
   virtual ~Cassandra_se_impl(){ delete cass; }
   
   /* Connection and DDL checks */
@@ -81,6 +87,9 @@
   bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
   void get_rowkey_type(char **name, char **type);
 
+  /* Settings */
+  void set_consistency_levels(ulong read_cons_level, ulong write_cons_level);
+
   /* Writes */
   void clear_insert_buffer();
   void start_row_insert(const char *key, int key_len);
@@ -166,14 +175,20 @@
     print_error("Unknown exception");
   }
 
-  cur_consistency_level= ConsistencyLevel::ONE;
-
   if (!res && setup_ddl_checks())
     res= true;
   return res;
 }
 
 
+void Cassandra_se_impl::set_consistency_levels(ulong read_cons_level, 
+                                               ulong write_cons_level)
+{
+  write_cons_level= (ConsistencyLevel::type)(write_cons_level + 1);
+  read_cons_level=  (ConsistencyLevel::type)(read_cons_level + 1);
+}
+
+
 bool Cassandra_se_impl::setup_ddl_checks()
 {
   try {
@@ -308,7 +323,7 @@
 
   try {
     
-    cass->batch_mutate(batch_mutation, cur_consistency_level);
+    cass->batch_mutate(batch_mutation, write_consistency);
 
     cassandra_counters.row_inserts+= batch_mutation.size();
     cassandra_counters.row_insert_batches++;
@@ -356,7 +371,7 @@
 
   try {
     cass->get_slice(column_data_vec, rowkey, cparent, slice_pred, 
-                    cur_consistency_level);
+                    read_consistency);
 
     if (column_data_vec.size() == 0)
     {
@@ -471,7 +486,7 @@
   
     cass->get_range_slices(key_slice_vec,
                            cparent, slice_pred, key_range, 
-                           cur_consistency_level);
+                           read_consistency);
     res= false;
 
     if (key_slice_vec.size() < (uint)read_batch_size)
@@ -589,7 +604,7 @@
 
   try {
     
-    cass->remove(rowkey, column_path, get_i64_timestamp(), cur_consistency_level);
+    cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency);
     res= false;
 
   } catch (InvalidRequestException ire) {
@@ -643,7 +658,7 @@
     cassandra_counters.multiget_keys_scanned += mrr_keys.size();
 
     cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred, 
-                         cur_consistency_level);
+                         read_consistency);
 
     cassandra_counters.multiget_rows_read += mrr_result.size();
     

=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h	2012-09-16 08:22:21 +0000
+++ b/storage/cassandra/cassandra_se.h	2012-09-22 19:30:29 +0000
@@ -7,6 +7,19 @@
 */
 
 
+/* We need to define this here so that ha_cassandra.cc also has access to it */
+typedef enum
+{
+  ONE = 1-1,
+  QUORUM = 2-1,
+  LOCAL_QUORUM = 3-1,
+  EACH_QUORUM = 4-1,
+  ALL = 5-1,
+  ANY = 6-1,
+  TWO = 7-1,
+  THREE = 8-1,
+} enum_cassandra_consistency_level;
+
 /*
   Interface to one cassandra column family, i.e. one 'table'
 */
@@ -19,6 +32,9 @@
   /* Init */
   virtual bool connect(const char *host, int port, const char *keyspace)=0;
   virtual void set_column_family(const char *cfname) = 0;
+
+  /* Settings */
+  virtual void set_consistency_levels(ulong read_cons_level, ulong write_cons_level)=0;
    
   /* Check underlying DDL */
   virtual bool setup_ddl_checks()=0;

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-09-20 10:22:36 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-09-22 19:30:29 +0000
@@ -82,6 +82,35 @@
   "Number of rows in an rnd_read (full scan) batch",
   NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
 
+/* These match values in enum_cassandra_consistency_level */
+const char *cassandra_consistency_level[] = 
+{
+  "ONE",
+  "QUORUM",
+  "LOCAL_QUORUM",
+  "EACH_QUORUM",
+  "ALL",
+  "ANY",
+  "TWO",
+  "THREE",
+   NullS
+};
+
+TYPELIB cassandra_consistency_level_typelib= {
+  array_elements(cassandra_consistency_level) - 1, "",
+  cassandra_consistency_level, NULL
+};
+
+
+static MYSQL_THDVAR_ENUM(write_consistency, PLUGIN_VAR_RQCMDARG,
+  "Cassandra consistency level to use for write operations", NULL, NULL,
+  ONE, &cassandra_consistency_level_typelib);
+
+static MYSQL_THDVAR_ENUM(read_consistency, PLUGIN_VAR_RQCMDARG,
+  "Cassandra consistency level to use for read operations", NULL, NULL,
+  ONE, &cassandra_consistency_level_typelib);
+
+
 mysql_mutex_t cassandra_default_host_lock;
 static char* cassandra_default_thrift_host = NULL;
 static char cassandra_default_host_buf[256]="";
@@ -130,6 +159,8 @@
   MYSQL_SYSVAR(rnd_batch_size),
 
   MYSQL_SYSVAR(default_thrift_host),
+  MYSQL_SYSVAR(write_consistency),
+  MYSQL_SYSVAR(read_consistency),
   NULL
 };
 
@@ -1297,6 +1328,11 @@
 {
   doing_insert_batch= false;
   insert_lineno= 0;
+  if (se)
+  {
+    se->set_consistency_levels(THDVAR(table->in_use, read_consistency),
+                               THDVAR(table->in_use, write_consistency));
+  }
   return 0;
 }
 



More information about the commits mailing list