[Commits] Rev 3491: Read records in batches when doing full table scan. in file:///data0/psergey/dev2/5.5-cassandra-r01/

Sergey Petrunya psergey at askmonty.org
Mon Aug 20 11:08:30 EEST 2012


At file:///data0/psergey/dev2/5.5-cassandra-r01/

------------------------------------------------------------
revno: 3491
revision-id: psergey at askmonty.org-20120820080829-m5hixakcwpbd42mf
parent: psergey at askmonty.org-20120819105458-52zs1mjbrfqaji1h
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: 5.5-cassandra-r01
timestamp: Mon 2012-08-20 12:08:29 +0400
message:
  Read records in batches when doing full table scan.
=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-08-19 09:21:23 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-08-20 08:08:29 +0000
@@ -68,6 +68,7 @@
   std::string rowkey; /* key of the record we're returning now */
 
   SlicePredicate slice_pred;
+  bool get_slices_returned_less;
 public:
   Cassandra_se_impl() : cass(NULL) {}
   virtual ~Cassandra_se_impl(){ delete cass; }
@@ -92,9 +93,9 @@
   void get_read_rowkey(char **value, int *value_len);
 
   /* Reads, multi-row scans */
-  bool get_range_slices();
+  bool get_range_slices(bool last_key_as_start_key);
   void finish_reading_range_slices();
-  bool get_next_range_slice_row();
+  bool get_next_range_slice_row(bool *eof);
 
   /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
   void clear_read_columns();
@@ -369,7 +370,7 @@
 }
 
 
-bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as parameters
+bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
 {
   bool res= true;
   
@@ -380,11 +381,16 @@
    // Try passing nothing...
 
   KeyRange key_range; // Try passing nothing, too.
-  key_range.__isset.start_key=true;
-  key_range.__isset.end_key=true;
-  key_range.start_key.assign("", 0);
+  key_range.__isset.start_key= true;
+  key_range.__isset.end_key= true;
+
+  if (last_key_as_start_key)
+    key_range.start_key= rowkey;
+  else
+    key_range.start_key.assign("", 0);
+
   key_range.end_key.assign("", 0);
-
+  key_range.count= read_batch_size;
   try {
   
     cass->get_range_slices(key_slice_vec,
@@ -392,6 +398,11 @@
                            cur_consistency_level);
     res= false;
 
+    if (key_slice_vec.size() < (uint)read_batch_size)
+      get_slices_returned_less= true;
+    else
+      get_slices_returned_less= false;
+
   } catch (InvalidRequestException ire) {
     print_error("%s [%s]", ire.what(), ire.why.c_str());
   } catch (UnavailableException ue) {
@@ -405,11 +416,32 @@
 }
 
 
-bool Cassandra_se_impl::get_next_range_slice_row()
+/* Switch to next row. This may produce an error */
+bool Cassandra_se_impl::get_next_range_slice_row(bool *eof)
 {
   if (key_slice_it == key_slice_vec.end())
-    return true;
-  
+  {
+    if (get_slices_returned_less)
+    {
+      *eof= true;
+      return false;
+    }
+
+    /*
+      We have read through all columns in this batch. Try getting the next
+      batch.
+    */
+    if (get_range_slices(true))
+      return true;
+
+    if (key_slice_vec.empty())
+    {
+      *eof= true;
+      return false;
+    }
+  }
+ 
+  *eof= false;
   column_data_vec= key_slice_it->columns;
   rowkey= key_slice_it->key;
   column_data_it= column_data_vec.begin();

=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h	2012-08-19 09:21:23 +0000
+++ b/storage/cassandra/cassandra_se.h	2012-08-20 08:08:29 +0000
@@ -39,9 +39,11 @@
   virtual void get_read_rowkey(char **value, int *value_len)=0;
 
   /* Reads, multi-row scans */
-  virtual bool get_range_slices()=0;
+  int read_batch_size;
+
+  virtual bool get_range_slices(bool last_key_as_start_key)=0;
   virtual void finish_reading_range_slices()=0;
-  virtual bool get_next_range_slice_row()=0;
+  virtual bool get_next_range_slice_row(bool *eof)=0;
   
   /* read_set setup */
   virtual void clear_read_columns()=0;

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-08-19 10:54:58 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-08-20 08:08:29 +0000
@@ -212,7 +212,8 @@
 
 ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
   :handler(hton, table_arg),
-   se(NULL), field_converters(NULL),rowkey_converter(NULL)
+   se(NULL), field_converters(NULL), rowkey_converter(NULL),
+   rnd_batch_size(10*1000)
 {}
 
 
@@ -760,7 +761,8 @@
   for (uint i= 1; i < table->s->fields; i++)
     se->add_read_column(table->field[i]->field_name);
 
-  bres= se->get_range_slices();
+  se->read_batch_size= rnd_batch_size;
+  bres= se->get_range_slices(false);
   if (bres)
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
 
@@ -780,17 +782,23 @@
 int ha_cassandra::rnd_next(uchar *buf)
 {
   int rc;
+  bool reached_eof;
   DBUG_ENTER("ha_cassandra::rnd_next");
 
   // Unpack and return the next record.
-  if (se->get_next_range_slice_row())
+  if (se->get_next_range_slice_row(&reached_eof))
   {
-    rc= HA_ERR_END_OF_FILE;
+    rc= HA_ERR_INTERNAL_ERROR;
   }
   else
   {
-    read_cassandra_columns(true);
-    rc= 0;
+    if (reached_eof)
+      rc= HA_ERR_END_OF_FILE;
+    else
+    {
+      read_cassandra_columns(true);
+      rc= 0;
+    }
   }
 
   DBUG_RETURN(rc);

=== modified file 'storage/cassandra/ha_cassandra.h'
--- a/storage/cassandra/ha_cassandra.h	2012-08-19 08:50:53 +0000
+++ b/storage/cassandra/ha_cassandra.h	2012-08-20 08:08:29 +0000
@@ -45,6 +45,8 @@
   void free_field_converters();
   
   void read_cassandra_columns(bool unpack_pk);
+
+  ha_rows rnd_batch_size;
 public:
   ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
   ~ha_cassandra()



More information about the commits mailing list