[Commits] Rev 3520: Cassandra SE: Add capability to retry failed API calls in file:///data0/psergey/dev2/5.5-cassandra-r01/

Sergey Petrunya psergey at askmonty.org
Wed Sep 26 13:13:04 EEST 2012


At file:///data0/psergey/dev2/5.5-cassandra-r01/

------------------------------------------------------------
revno: 3520
revision-id: psergey at askmonty.org-20120926101303-7jtpgv0wmnp7b3lb
parent: psergey at askmonty.org-20120925122019-qbbrzjcimn3o5smc
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: 5.5-cassandra-r01
timestamp: Wed 2012-09-26 14:13:03 +0400
message:
  Cassandra SE: Add capability to retry failed API calls
  - Add capability to retry calls that have failed with UnavailableException or
    [Cassandra's] TimedOutException. 
  - We don't retry for Thrift errors yet, although could easily do, now.
=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-09-22 19:30:29 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-09-26 10:13:03 +0000
@@ -45,6 +45,9 @@
 
   ConsistencyLevel::type write_consistency;
   ConsistencyLevel::type read_consistency;
+  
+  /* How many times to retry an operation before giving up */
+  int thrift_call_retries_to_do;
 
   
   /* DDL data */
@@ -72,10 +75,12 @@
 
   SlicePredicate slice_pred;
   bool get_slices_returned_less;
+  bool get_slice_found_rows;
 public:
   Cassandra_se_impl() : cass(NULL), 
                         write_consistency(ConsistencyLevel::ONE),
-                        read_consistency(ConsistencyLevel::ONE) {}
+                        read_consistency(ConsistencyLevel::ONE),
+                        thrift_call_retries_to_do(0) {}
   virtual ~Cassandra_se_impl(){ delete cass; }
   
   /* Connection and DDL checks */
@@ -94,6 +99,7 @@
   void clear_insert_buffer();
   void start_row_insert(const char *key, int key_len);
   void add_insert_column(const char *name, const char *value, int value_len);
+  
   bool do_insert();
 
   /* Reads, point lookups */
@@ -105,6 +111,8 @@
 private:
   bool have_rowkey_to_skip;
   std::string rowkey_to_skip;
+
+  bool get_range_slices_param_last_key_as_start_key;
 public:
   bool get_range_slices(bool last_key_as_start_key);
   void finish_reading_range_slices();
@@ -119,19 +127,30 @@
   int  add_lookup_key(const char *key, size_t key_len);
   bool multiget_slice();
 
-private:
-  std::vector<std::string> mrr_keys; /* TODO: can we use allocator to put them onto MRR buffer? */
-  std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
-  std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;
-public:
   bool get_next_multiget_row();
 
   bool truncate();
+
   bool remove_row();
 
 private:
+  bool retryable_truncate();
+  bool retryable_do_insert();
+  bool retryable_remove_row();
+  bool retryable_setup_ddl_checks();
+  bool retryable_multiget_slice();
+  bool retryable_get_range_slices();
+  bool retryable_get_slice();
+
+  std::vector<std::string> mrr_keys; /* can we use allocator to put these into MRR buffer? */
+  std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
+  std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;
+
   /* Non-inherited utility functions: */
   int64_t get_i64_timestamp();
+  
+  typedef bool (Cassandra_se_impl::*retryable_func_t)();
+  bool try_operation(retryable_func_t func);
 };
 
 
@@ -189,34 +208,36 @@
 }
 
 
-bool Cassandra_se_impl::setup_ddl_checks()
+bool Cassandra_se_impl::retryable_setup_ddl_checks()
 {
   try {
+
     cass->describe_keyspace(ks_def, keyspace);
-
-    std::vector<CfDef>::iterator it;
-    for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++)
-    {
-      cf_def= *it;
-      if (!cf_def.name.compare(column_family))
-        return false;
-    }
-
-    print_error("describe_keyspace() didn't return our column family");
-
-  } catch (InvalidRequestException ire) {
-    print_error("%s [%s]", ire.what(), ire.why.c_str());
+    
   } catch (NotFoundException nfe) {
-    print_error("keyspace not found: %s", nfe.what());
-  }catch(TException e){
-    print_error("Thrift exception: %s", e.what());
-  } catch (...) {
-    print_error("Unknown exception");
-  }
-
+    print_error("keyspace `%s` not found: %s", keyspace.c_str(), nfe.what());
+    return true;
+  }
+
+  std::vector<CfDef>::iterator it;
+  for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++)
+  {
+    cf_def= *it;
+    if (!cf_def.name.compare(column_family))
+      return false;
+  }
+
+  print_error("Column family %s not found in keyspace %s",
+               column_family.c_str(),
+               keyspace.c_str());
   return true;
 }
 
+bool Cassandra_se_impl::setup_ddl_checks()
+{
+  return try_operation(&Cassandra_se_impl::retryable_setup_ddl_checks);
+}
+
 
 void Cassandra_se_impl::first_ddl_column()
 {
@@ -309,41 +330,29 @@
 }
 
 
+bool Cassandra_se_impl::retryable_do_insert()
+{
+  cass->batch_mutate(batch_mutation, write_consistency);
+
+  cassandra_counters.row_inserts+= batch_mutation.size();
+  cassandra_counters.row_insert_batches++;
+
+  clear_insert_buffer();
+  return 0;
+}
+
+
 bool Cassandra_se_impl::do_insert()
 {
-  bool res= true;
-  
   /*
-    zero-size mutations are allowed by Cassandra's batch_mutate but lets not 
+    zero-size mutations are allowed by Cassandra's batch_mutate but lets not
     do them (we may attempt to do it if there is a bulk insert that stores
     exactly @@cassandra_insert_batch_size*n elements.
   */
   if (batch_mutation.empty())
     return false;
-
-  try {
-    
-    cass->batch_mutate(batch_mutation, write_consistency);
-
-    cassandra_counters.row_inserts+= batch_mutation.size();
-    cassandra_counters.row_insert_batches++;
-
-    clear_insert_buffer();
-    res= false;
-
-  } catch (InvalidRequestException ire) {
-    print_error("%s [%s]", ire.what(), ire.why.c_str());
-  } catch (UnavailableException ue) {
-    print_error("UnavailableException: %s", ue.what());
-  } catch (TimedOutException te) {
-    print_error("TimedOutException: %s", te.what());
-  }catch(TException e){
-    print_error("Thrift exception: %s", e.what());
-  } catch (...) {
-    print_error("Unknown exception");
-  }
-
-  return res;
+ 
+  return try_operation(&Cassandra_se_impl::retryable_do_insert);
 }
 
 
@@ -358,47 +367,39 @@
 
 bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
 {
-  ColumnParent cparent;
-  cparent.column_family= column_family;
-
+  bool res;
   rowkey.assign(key, key_len);
 
+  if (!(res= try_operation(&Cassandra_se_impl::retryable_get_slice)))
+    *found= get_slice_found_rows;
+  return res;
+}
+
+
+bool Cassandra_se_impl::retryable_get_slice()
+{
+  ColumnParent cparent;
+  cparent.column_family= column_family;
+
   SlicePredicate slice_pred;
   SliceRange sr;
   sr.start = "";
   sr.finish = "";
   slice_pred.__set_slice_range(sr);
 
-  try {
-    cass->get_slice(column_data_vec, rowkey, cparent, slice_pred, 
-                    read_consistency);
-
-    if (column_data_vec.size() == 0)
-    {
-      /*
-        No columns found. Cassandra doesn't allow records without any column =>
-        this means the seach key doesn't exist
-      */
-      *found= false;
-      return false;
-    }
-    *found= true;
-
-  } catch (InvalidRequestException ire) {
-    print_error("%s [%s]", ire.what(), ire.why.c_str());
-    return true;
-  } catch (UnavailableException ue) {
-    print_error("UnavailableException: %s", ue.what());
-    return true;
-  } catch (TimedOutException te) {
-    print_error("TimedOutException: %s", te.what());
-    return true;
-  }catch(TException e){
-    print_error("Thrift exception: %s", e.what());
-  } catch (...) {
-    print_error("Unknown exception");
-    return true;
+  cass->get_slice(column_data_vec, rowkey, cparent, slice_pred, 
+                  read_consistency);
+
+  if (column_data_vec.size() == 0)
+  {
+    /*
+      No columns found. Cassandra doesn't allow records without any column =>
+      this means the seach key doesn't exist
+    */
+    get_slice_found_rows= false;
+    return false;
   }
+  get_slice_found_rows= true;
 
   column_data_it= column_data_vec.begin();
   return false;
@@ -456,7 +457,15 @@
 
 bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
 {
-  bool res= true;
+  get_range_slices_param_last_key_as_start_key= last_key_as_start_key;
+  
+  return try_operation(&Cassandra_se_impl::retryable_get_range_slices);
+}
+
+
+bool Cassandra_se_impl::retryable_get_range_slices()
+{
+  bool last_key_as_start_key= get_range_slices_param_last_key_as_start_key;
   
   ColumnParent cparent;
   cparent.column_family= column_family;
@@ -482,32 +491,17 @@
 
   key_range.end_key.assign("", 0);
   key_range.count= read_batch_size;
-  try {
-  
-    cass->get_range_slices(key_slice_vec,
-                           cparent, slice_pred, key_range, 
-                           read_consistency);
-    res= false;
-
-    if (key_slice_vec.size() < (uint)read_batch_size)
-      get_slices_returned_less= true;
-    else
-      get_slices_returned_less= false;
-
-  } catch (InvalidRequestException ire) {
-    print_error("%s [%s]", ire.what(), ire.why.c_str());
-  } catch (UnavailableException ue) {
-    print_error("UnavailableException: %s", ue.what());
-  } catch (TimedOutException te) {
-    print_error("TimedOutException: %s", te.what());
-  }catch(TException e){
-    print_error("Thrift exception: %s", e.what());
-  } catch (...) {
-    print_error("Unknown exception");
-  }
+
+  cass->get_range_slices(key_slice_vec, cparent, slice_pred, key_range,
+                         read_consistency);
+
+  if (key_slice_vec.size() < (uint)read_batch_size)
+    get_slices_returned_less= true;
+  else
+    get_slices_returned_less= false;
 
   key_slice_it= key_slice_vec.begin();
-  return res;
+  return false;
 }
 
 
@@ -574,50 +568,78 @@
 
 bool Cassandra_se_impl::truncate()
 {
-  bool res= true;
-  try {
-    
-    cass->truncate(column_family);
-    res= false;
-
-  } catch (InvalidRequestException ire) {
-    print_error("%s [%s]", ire.what(), ire.why.c_str());
-  } catch (UnavailableException ue) {
-    print_error("UnavailableException: %s", ue.what());
-  } catch (TimedOutException te) {
-    print_error("TimedOutException: %s", te.what());
-  }catch(TException e){
-    print_error("Thrift exception: %s", e.what());
-  } catch (...) {
-    print_error("Unknown exception");
-  }
-
-  return res;
-}
+  return try_operation(&Cassandra_se_impl::retryable_truncate);
+}
+
+
+bool Cassandra_se_impl::retryable_truncate()
+{
+  cass->truncate(column_family);
+  return 0;
+}
+
 
 bool Cassandra_se_impl::remove_row()
 {
-  bool res= true;
-
+  return try_operation(&Cassandra_se_impl::retryable_remove_row);
+}
+
+
+bool Cassandra_se_impl::retryable_remove_row()
+{
   ColumnPath column_path;
   column_path.column_family= column_family;
-
-  try {
-    
-    cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency);
-    res= false;
-
-  } catch (InvalidRequestException ire) {
-    print_error("%s [%s]", ire.what(), ire.why.c_str());
-  } catch (UnavailableException ue) {
-    print_error("UnavailableException: %s", ue.what());
-  } catch (TimedOutException te) {
-    print_error("TimedOutException: %s", te.what());
-  }catch(TException e){
-    print_error("Thrift exception: %s", e.what());
-  } catch (...) {
-    print_error("Unknown exception");
-  }
+  cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency);
+  return 0;
+}
+
+/*
+  This function will try a Cassandra operation, and handle errors.
+
+*/
+bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call)
+{
+  bool res;
+  int n_retries= thrift_call_retries_to_do;
+
+  do
+  {
+    res= true;
+
+    try {
+      
+      if ((res= (this->*func_to_call)()))
+      {
+        /*
+          The function call was made successfully (without timeouts, etc),
+          but something inside it returned 'true'. 
+          This is supposedly a failure (or "not found" or other negative
+          result). We need to return this to the caller.
+        */
+        n_retries= 0; 
+      }
+
+    } catch (InvalidRequestException ire) {
+      n_retries= 0; /* there is no point in retrying this operation */
+      print_error("%s [%s]", ire.what(), ire.why.c_str());
+    } catch (UnavailableException ue) {
+      cassandra_counters.unavailable_exceptions++;
+      if (!--n_retries)
+        print_error("UnavailableException: %s", ue.what());
+    } catch (TimedOutException te) {
+      cassandra_counters.timeout_exceptions++;
+      if (!--n_retries)
+        print_error("TimedOutException: %s", te.what());
+    }catch(TException e){
+      /* todo: we may use retry for certain kinds of Thrift errors */
+      n_retries= 0; 
+      print_error("Thrift exception: %s", e.what());
+    } catch (...) {
+      n_retries= 0; /* Don't retry */
+      print_error("Unknown exception");
+    }
+
+  } while (res && n_retries > 0);
 
   return res;
 }
@@ -638,9 +660,14 @@
   return mrr_keys.size();
 }
 
-
 bool Cassandra_se_impl::multiget_slice()
 {
+  return try_operation(&Cassandra_se_impl::retryable_multiget_slice);
+}
+
+
+bool Cassandra_se_impl::retryable_multiget_slice()
+{
   ColumnParent cparent;
   cparent.column_family= column_family;
 
@@ -650,34 +677,15 @@
   sr.finish = "";
   slice_pred.__set_slice_range(sr);
 
-  bool res= true;
-
-  try {
-    
-    cassandra_counters.multiget_reads++;
-    cassandra_counters.multiget_keys_scanned += mrr_keys.size();
-
-    cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred, 
-                         read_consistency);
-
-    cassandra_counters.multiget_rows_read += mrr_result.size();
-    
-    res= false;
-    mrr_result_it= mrr_result.begin();
-
-  } catch (InvalidRequestException ire) {
-    print_error("%s [%s]", ire.what(), ire.why.c_str());
-  } catch (UnavailableException ue) {
-    print_error("UnavailableException: %s", ue.what());
-  } catch (TimedOutException te) {
-    print_error("TimedOutException: %s", te.what());
-  }catch(TException e){
-    print_error("Thrift exception: %s", e.what());
-  } catch (...) {
-    print_error("Unknown exception");
-  }
-
-  return res;
+  cassandra_counters.multiget_reads++;
+  cassandra_counters.multiget_keys_scanned += mrr_keys.size();
+  cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred,
+                       read_consistency);
+
+  cassandra_counters.multiget_rows_read += mrr_result.size();
+  mrr_result_it= mrr_result.begin();
+
+  return false;
 }
 
 

=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h	2012-09-22 19:30:29 +0000
+++ b/storage/cassandra/cassandra_se.h	2012-09-26 10:13:03 +0000
@@ -91,6 +91,9 @@
   ulong multiget_reads;
   ulong multiget_keys_scanned;
   ulong multiget_rows_read;
+
+  ulong timeout_exceptions;
+  ulong unavailable_exceptions;
 };
 
 

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-09-25 12:20:19 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-09-26 10:13:03 +0000
@@ -82,6 +82,11 @@
   "Number of rows in an rnd_read (full scan) batch",
   NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
 
+static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
+  "Number of times to retry Cassandra calls that failed due to timeouts or "
+  "network communication problems. The default, 0, means not to retry.",
+  NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0);
+
 /* These match values in enum_cassandra_consistency_level */
 const char *cassandra_consistency_level[] = 
 {
@@ -161,6 +166,7 @@
   MYSQL_SYSVAR(default_thrift_host),
   MYSQL_SYSVAR(write_consistency),
   MYSQL_SYSVAR(read_consistency),
+  MYSQL_SYSVAR(failure_retries),
   NULL
 };
 
@@ -177,6 +183,11 @@
     (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
   {"multiget_rows_read",
     (char*) &cassandra_counters.multiget_rows_read,  SHOW_LONG},
+
+  {"timeout_exceptions",
+    (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
+  {"unavailable_exceptions",
+    (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG},
   {NullS, NullS, SHOW_LONG}
 };
 
@@ -1678,7 +1689,6 @@
 
 static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
 {
-  //innodb_export_status();
   cassandra_counters_copy= cassandra_counters; 
 
   var->type= SHOW_ARRAY;



More information about the commits mailing list