[Commits] Rev 4588: LevelDB: bulk loading support. in file:///home/psergey/dev2/mysql-5.6-leveldb/

Sergey Petrunya psergey at askmonty.org
Fri Feb 1 19:06:39 EET 2013


At file:///home/psergey/dev2/mysql-5.6-leveldb/

------------------------------------------------------------
revno: 4588
revision-id: psergey at askmonty.org-20130201170639-xng9xmo6l7fhca8o
parent: psergey at askmonty.org-20130201153402-4i4k4cneqk2ntf6g
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: mysql-5.6-leveldb
timestamp: Fri 2013-02-01 21:06:39 +0400
message:
  LevelDB: bulk loading support.
=== modified file 'mysql-test/r/leveldb.result'
--- a/mysql-test/r/leveldb.result	2013-02-01 14:44:52 +0000
+++ b/mysql-test/r/leveldb.result	2013-02-01 17:06:39 +0000
@@ -800,7 +800,7 @@ drop table t44;
 #
 # ALTER TABLE tests
 #
-create table t45 (pk int primary key, col1 varchar(12));
+create table t45 (pk int primary key, col1 varchar(12)) engine=leveldb;
 insert into t45 values (1, 'row1');
 insert into t45 values (2, 'row2');
 alter table t45 rename t46;
@@ -811,3 +811,22 @@ pk	col1
 drop table t46;
 drop table t45;
 ERROR 42S02: Unknown table 'test.t45'
+#
+# Check Bulk loading
+#
+show variables like 'leveldb%';
+Variable_name	Value
+leveldb_bulk_load	OFF
+leveldb_bulk_load_size	1000
+leveldb_lock_wait_timeout	1
+create table t47 (pk int primary key, col1 varchar(12)) engine=leveldb;
+insert into t47 values (1, 'row1');
+insert into t47 values (2, 'row2');
+set leveldb_bulk_load=1;
+insert into t47 values (1, 'row1-NEW'),(2, 'row2-NEW');
+set leveldb_bulk_load=0;
+select * from t47;
+pk	col1
+1	row1-NEW
+2	row2-NEW
+drop table t47;

=== modified file 'mysql-test/t/leveldb.test'
--- a/mysql-test/t/leveldb.test	2013-02-01 14:44:52 +0000
+++ b/mysql-test/t/leveldb.test	2013-02-01 17:06:39 +0000
@@ -704,7 +704,7 @@ drop table t44;
 --echo #
 --echo # ALTER TABLE tests
 --echo #
-create table t45 (pk int primary key, col1 varchar(12));
+create table t45 (pk int primary key, col1 varchar(12)) engine=leveldb;
 insert into t45 values (1, 'row1');
 insert into t45 values (2, 'row2');
 alter table t45 rename t46;
@@ -713,3 +713,17 @@ drop table t46;
 --error ER_BAD_TABLE_ERROR
 drop table t45;
 
+
+--echo #
+--echo # Check Bulk loading
+--echo #
+show variables like 'leveldb%';
+create table t47 (pk int primary key, col1 varchar(12)) engine=leveldb;
+insert into t47 values (1, 'row1');
+insert into t47 values (2, 'row2');
+set leveldb_bulk_load=1;
+insert into t47 values (1, 'row1-NEW'),(2, 'row2-NEW');
+set leveldb_bulk_load=0;
+select * from t47;
+drop table t47;
+

=== modified file 'storage/leveldb/ha_leveldb.cc'
--- a/storage/leveldb/ha_leveldb.cc	2013-02-01 15:34:02 +0000
+++ b/storage/leveldb/ha_leveldb.cc	2013-02-01 17:06:39 +0000
@@ -64,9 +64,24 @@ static MYSQL_THDVAR_ULONG(lock_wait_time
   "Number of seconds to wait for lock",
   NULL, NULL, /*default*/ 1, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
 
+// #define MYSQL_THDVAR_BOOL(name, opt, comment, check, update, def)
+static MYSQL_THDVAR_BOOL(bulk_load, PLUGIN_VAR_RQCMDARG,
+  "Use bulk-load mode for inserts", NULL, NULL, FALSE);
+
+#if 0
+static MYSQL_THDVAR_ULONG(bulk_load, PLUGIN_VAR_RQCMDARG,
+  "Number of seconds to wait for lock",
+  NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1, 0);
+#endif
+
+static MYSQL_THDVAR_ULONG(bulk_load_size, PLUGIN_VAR_RQCMDARG,
+  "Max #records in a batch for bulk-load mode",
+  NULL, NULL, /*default*/ 1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
 
 static struct st_mysql_sys_var* leveldb_system_variables[]= {
   MYSQL_SYSVAR(lock_wait_timeout),
+  MYSQL_SYSVAR(bulk_load),
+  MYSQL_SYSVAR(bulk_load_size),
   NULL
 };
 
@@ -224,7 +239,8 @@ class Ldb_transaction
 
   /* The changes made by this transaction (and not yet applied) */
   leveldb::WriteBatch *batch;
-  
+   
+  uint n_writes;
   /* 
     These are needed to use LF-Hash. They are allocated per-thread. Logically,
     they are not part of the transaction but it's convenient to have them here.
@@ -256,17 +272,39 @@ class Ldb_transaction
   bool commit()
   {
     bool res= false;
+    flush_batch();
+    res= flush_batch_intern();
+    /* rollback() will delete snapshot, batch and locks */
+    rollback();
+    return res;
+  }
+
+private:
+  int flush_batch_intern()
+  {
+    bool res= false;
     if (batch) 
     {
       leveldb::Status s;
       s= ldb->Write(leveldb::WriteOptions(), batch);
       res= !s.ok(); // we return true when something failed
     }
-    /* rollback() will delete snapshot, batch and locks */
-    rollback();
     return res;
   }
 
+public:
+  int flush_batch()
+  {
+    bool bres= flush_batch_intern();
+    if (batch)
+    {
+      delete batch;
+      batch= NULL;
+    }
+    n_writes= 0;
+    return bres;
+  }
+
   void prepare_for_write()
   {
     if (!batch)
@@ -287,10 +325,11 @@ class Ldb_transaction
       batch= NULL;
     }
     release_locks();
+    n_writes++;
   }
 
   Ldb_transaction() : 
-    snapshot(NULL), batch(NULL), pins(NULL)
+    snapshot(NULL), batch(NULL), n_writes(0), pins(NULL)
   {}
 
   ~Ldb_transaction() 
@@ -1543,28 +1582,34 @@ int ha_leveldb::write_row(uchar *buf)
   DBUG_ASSERT(lock_rows);
 
   Ldb_transaction *trx= get_or_create_trx(table->in_use);
-
-  /* Get a lock */
-  Row_lock *lock;
-  if (!(lock= row_locks.get_lock(trx->pins, pk_packed_tuple, packed_size,
-                                 timeout_sec)))
+  
+  bool do_locking= !THDVAR(table->in_use, bulk_load);
+  Row_lock *lock= NULL; // init to shut up the compiler
+  if (do_locking)
   {
-    DBUG_RETURN(HA_ERR_LOCK_WAIT_TIMEOUT);
-  }
-   
-  // Make a lookup to make sure we're not overwriting anything
+    /* Get a lock */
+    if (!(lock= row_locks.get_lock(trx->pins, pk_packed_tuple, packed_size,
+                                   timeout_sec)))
+    {
+      DBUG_RETURN(HA_ERR_LOCK_WAIT_TIMEOUT);
+    }
+     
+    // Make a lookup to make sure we're not overwriting anything
 
-  leveldb::ReadOptions options; // note: intentionally not using snapshot
-  leveldb::Status s= ldb->Get(options, key_slice, &retrieved_record);
-  if (!s.IsNotFound()) 
-  {
-    row_locks.release_lock(trx->pins, lock);
-    DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
+    leveldb::ReadOptions options; // note: intentionally not using snapshot
+    leveldb::Status s= ldb->Get(options, key_slice, &retrieved_record);
+    if (!s.IsNotFound()) 
+    {
+      row_locks.release_lock(trx->pins, lock);
+      DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
+    }
   }
 
   // Ok, we keep a lock. Prepare a write.
   trx->prepare_for_write();
-  trx->add_lock(lock);
+  
+  if (do_locking)
+    trx->add_lock(lock);
 
   // Write every key.
   for (uint i= 0; i < table->s->keys; i++)
@@ -1584,6 +1629,7 @@ int ha_leveldb::write_row(uchar *buf)
                                          packed_size);
       leveldb::Slice empty_slice(empty_str, 0);
       trx->batch->Put(secondary_key_slice, empty_slice);
+      trx->n_writes++;
     }
   }
  
@@ -1593,8 +1639,16 @@ int ha_leveldb::write_row(uchar *buf)
 
   leveldb::Slice value_slice;
   convert_record_to_storage_format(&value_slice);
-
+  
   trx->batch->Put(key_slice, value_slice);
+  trx->n_writes++;
+
+  if (do_locking && trx->n_writes > THDVAR(table->in_use, bulk_load_size))
+  {
+    // bulk insert mode: flush the batch if it became too big.
+    if (trx->flush_batch())
+      DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+  }
 
   DBUG_RETURN(0);
 }



More information about the commits mailing list