[Commits] Rev 3503: Cassandra SE: fix batched insert to flush its buffers after insert operation. in file:///data0/psergey/dev2/5.5-cassandra-r01/

Sergey Petrunya psergey at askmonty.org
Wed Aug 29 19:27:12 EEST 2012


At file:///data0/psergey/dev2/5.5-cassandra-r01/

------------------------------------------------------------
revno: 3503
revision-id: psergey at askmonty.org-20120829162711-cdqfvic3os2rkd0o
parent: psergey at askmonty.org-20120829071404-zlqj9qm5co7tc22b
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: 5.5-cassandra-r01
timestamp: Wed 2012-08-29 20:27:11 +0400
message:
  Cassandra SE: fix batched insert to flush its buffers after insert operation.
=== modified file 'mysql-test/r/cassandra.result'
--- a/mysql-test/r/cassandra.result	2012-08-29 07:05:46 +0000
+++ b/mysql-test/r/cassandra.result	2012-08-29 16:27:11 +0000
@@ -210,4 +210,19 @@
 rowkey	datecol
 1	2012-08-29 01:23:45
 delete from t2;
+#
+# (no MDEV#) Check that insert counters work correctly
+#
+create table t0 (a int);
+insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
+set cassandra_insert_batch_size=10;
+insert into t2 select A.a+10*B.a, now() from t0 A, t0 B;
+inserts	insert_batches
+100	10
+set cassandra_insert_batch_size=1;
+insert into t2 select A.a+10*B.a+100, now() from t0 A, t0 B;
+inserts	insert_batches
+100	100
+delete from t2;
 drop table t2;
+drop table t0;

=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test	2012-08-29 07:05:46 +0000
+++ b/mysql-test/t/cassandra.test	2012-08-29 16:27:11 +0000
@@ -232,7 +232,51 @@
 select * from t2;
 delete from t2;
 
+--echo #
+--echo # (no MDEV#) Check that insert counters work correctly
+--echo #
+create table t0 (a int);
+insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
+let $start_inserts=`select variable_value from information_schema.SESSION_STATUS 
+                   where variable_name ='Cassandra_row_inserts'`;
+let $start_insert_batches=`select variable_value from information_schema.SESSION_STATUS 
+                          where variable_name ='Cassandra_row_insert_batches'`;
+
+set cassandra_insert_batch_size=10;
+insert into t2 select A.a+10*B.a, now() from t0 A, t0 B;
+
+--disable_query_log
+eval select
+   (select variable_value - $start_inserts from information_schema.SESSION_STATUS 
+     where variable_name ='Cassandra_row_inserts') 
+   AS 'inserts',
+   (select variable_value - $start_insert_batches from information_schema.SESSION_STATUS 
+     where variable_name ='Cassandra_row_insert_batches') 
+   AS 'insert_batches';
+--enable_query_log
+
+let $start_inserts=`select variable_value from information_schema.SESSION_STATUS 
+                   where variable_name ='Cassandra_row_inserts'`;
+let $start_insert_batches=`select variable_value from information_schema.SESSION_STATUS 
+                          where variable_name ='Cassandra_row_insert_batches'`;
+
+set cassandra_insert_batch_size=1;
+insert into t2 select A.a+10*B.a+100, now() from t0 A, t0 B;
+
+--disable_query_log
+eval select
+   (select variable_value - $start_inserts from information_schema.SESSION_STATUS 
+     where variable_name ='Cassandra_row_inserts') 
+   AS 'inserts',
+   (select variable_value - $start_insert_batches from information_schema.SESSION_STATUS 
+     where variable_name ='Cassandra_row_insert_batches') 
+   AS 'insert_batches';
+--enable_query_log
+
+delete from t2;
 drop table t2;
+drop table t0;
+
 ############################################################################
 ## Cassandra cleanup
 ############################################################################

=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-08-28 16:22:45 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-08-29 16:27:11 +0000
@@ -291,6 +291,15 @@
 bool Cassandra_se_impl::do_insert()
 {
   bool res= true;
+  
+  /*
+    zero-size mutations are allowed by Cassandra's batch_mutate but lets not 
+    do them (we may attempt to do it if there is a bulk insert that stores
+    exactly @@cassandra_insert_batch_size*n elements.
+  */
+  if (batch_mutation.empty())
+    return false;
+
   try {
     
     cass->batch_mutate(batch_mutation, cur_consistency_level);
@@ -298,6 +307,7 @@
     cassandra_counters.row_inserts+= batch_mutation.size();
     cassandra_counters.row_insert_batches++;
 
+    clear_insert_buffer();
     res= false;
 
   } catch (InvalidRequestException ire) {
@@ -623,3 +633,4 @@
   return false;
 }
 
+



More information about the commits mailing list