[Commits] Rev 3518: Cassandra SE, multiple connections in file:///data0/psergey/dev2/5.5-cassandra-r02/

Sergey Petrunya psergey at askmonty.org
Fri Sep 21 16:27:05 EEST 2012


At file:///data0/psergey/dev2/5.5-cassandra-r02/

------------------------------------------------------------
revno: 3518
revision-id: psergey at askmonty.org-20120921132705-d7h7hz04wtm8h1z2
parent: psergey at askmonty.org-20120920175224-mgeb9lxj0s89uudb
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: 5.5-cassandra-r02
timestamp: Fri 2012-09-21 17:27:05 +0400
message:
  Cassandra SE, multiple connections
  - Started to add error handling
=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-09-20 17:52:24 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-09-21 13:27:05 +0000
@@ -153,8 +153,9 @@
   void on_set_keyspace();
   void on_set_keyspace_fail();
   void on_describe_keyspace(org::apache::cassandra::KsDef ks_def_arg);
-  void on_batch_mutate_done();
-  void on_batch_mutate_fail();
+  typedef std::pair<Cassandra_se_impl*, int> Fail_func_param;
+  static void on_batch_mutate_fail(Fail_func_param param, const Cassandra_batch_mutate_result&);
+  void on_batch_mutate_done(int idx);
 
   uint connected;
   uint keyspace_set;
@@ -233,9 +234,10 @@
 
   keyspace.assign(keyspace_arg);
 
+  protocolFactory= boost::shared_ptr<protocol::TProtocolFactory>(new protocol::TBinaryProtocolFactory());
+  clients.resize(hosts_list.size());
+
   try {
-    protocolFactory= boost::shared_ptr<protocol::TProtocolFactory>(new protocol::TBinaryProtocolFactory());
-    clients.resize(hosts_list.size());
 
     for (uint i= 0; i < hosts_list.size(); i++)
     {
@@ -462,15 +464,36 @@
 }
 
 
-void Cassandra_se_impl::on_batch_mutate_done()
+void Cassandra_se_impl::on_batch_mutate_done(int idx)
 {
   batch_mutations_to_do--;
 }
 
-void Cassandra_se_impl::on_batch_mutate_fail()
+
+void 
+Cassandra_se_impl::on_batch_mutate_fail(Cassandra_se_impl::Fail_func_param param, 
+                                        const Cassandra_batch_mutate_result& res)
 {
-  batch_mutations_to_do--;
-  error_happened= true;
+  Cassandra_se_impl *obj= param.first;
+  int idx= param.second;
+  obj->batch_mutations_to_do--;
+  if (res.__isset.ire)
+  {
+    obj->print_error("%s [%s]", res.ire.what(), res.ire.why.c_str());
+  }
+  else if (res.__isset.ue)
+  {
+    obj->print_error("UnavailableException: %s", res.ue.what());
+  }
+  else if (res.__isset.te)
+  {
+    obj->print_error("TimedOutException: %s", res.te.what());
+  }
+  else
+    obj->print_error("Unknown error");
+
+  fprintf(stderr, "on_batch_mutate_fail obj %d reason %s\n", idx, obj->err_buffer);
+  obj->error_happened= true;
 }
 
 
@@ -491,6 +514,14 @@
   
   batch_mutations_to_do= 0;
   error_happened= false;
+
+  /*
+  fprintf(stderr, "batch_mutation: ");
+  for (uint j= 0; j < size; j++)
+    fprintf(stderr, " %d ", (int)batch_mutations[j].size());
+  fprintf(stderr, "\n");
+  */
+
   for (uint i= 0; i < size; i++)
   {
     /*
@@ -503,8 +534,10 @@
  
     batch_mutations_to_do++;
     async_clients[i]->batch_mutate(batch_mutations[i], cur_consistency_level).
-      setCallback(boost::bind(&Cassandra_se_impl::on_batch_mutate_done, this)).
-        setErrback(boost::bind(&Cassandra_se_impl::on_batch_mutate_fail, this));
+      setCallback(boost::bind(&Cassandra_se_impl::on_batch_mutate_done, this, i)).
+        setErrback(
+            boost::bind(&Cassandra_se_impl::on_batch_mutate_fail,  
+                        Cassandra_se_impl::Fail_func_param(this, i), _1));
 
     cassandra_counters.row_inserts+= batch_mutations[i].size();
     cassandra_counters.row_insert_batches++;



More information about the commits mailing list