[Commits] Rev 4584: MDEV-4122: LevelDB: store key/index numbers, not names in file:///home/psergey/dev2/mysql-5.6-leveldb/

Sergey Petrunya psergey at askmonty.org
Fri Feb 1 15:30:38 EET 2013


At file:///home/psergey/dev2/mysql-5.6-leveldb/

------------------------------------------------------------
revno: 4584
revision-id: psergey at askmonty.org-20130201133037-nc555rilct4v9yg7
parent: psergey at askmonty.org-20130201123559-w6rler9dilc7fo66
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: mysql-5.6-leveldb
timestamp: Fri 2013-02-01 17:30:37 +0400
message:
  MDEV-4122: LevelDB: store key/index numbers, not names
  - Wrap Key_store access in RW-locks, in case somebody is creating a table
    while somebody else is opening some other table.
=== modified file 'storage/leveldb/ha_leveldb.cc'
--- a/storage/leveldb/ha_leveldb.cc	2013-02-01 12:35:59 +0000
+++ b/storage/leveldb/ha_leveldb.cc	2013-02-01 13:30:37 +0000
@@ -842,7 +842,7 @@ int ha_leveldb::close(void)
 const int HA_ERR_LEVELDB_UNIQUE_NOT_SUPPORTED= HA_ERR_LAST+1;
 
 
-int ha_leveldb::create_key_defs(TABLE *table_arg, bool write_ddl)
+int ha_leveldb::create_key_defs(TABLE *table_arg, const char *db_table, uint len)
 {
   uint i;
   DBUG_ENTER("ha_leveldb::create_key_defs");
@@ -868,11 +868,8 @@ int ha_leveldb::create_key_defs(TABLE *t
     key_descr[i]= make_pk_keyseg(table_arg, i);
     key_descr[i]->index_number= keydef_store.get_next_number();
 
-    if (write_ddl)
-    {
-      keydef_store.put(key_descr[i]);
-      key_descr[i]->write_to(ldb_dict);
-    }
+    keydef_store.put(key_descr[i]);
+    key_descr[i]->write_to(ldb_dict);
   }
   pk_descr= key_descr[table_arg->s->primary_key];
 
@@ -881,7 +878,8 @@ int ha_leveldb::create_key_defs(TABLE *t
   
   tbl_def->n_keys= n_keys;
   tbl_def->key_descr= key_descr;
-  make_dbname_tablename(&tbl_def->dbname_tablename, table_arg);
+  //make_dbname_tablename(&tbl_def->dbname_tablename, table_arg);
+  tbl_def->dbname_tablename.append(db_table, len);
   ddl_manager.put(tbl_def);
   tbl_def->write_to(ldb_dict);
 
@@ -922,13 +920,37 @@ int ha_leveldb::create(const char *name,
                        HA_CREATE_INFO *create_info)
 {
   int res;
+  const char *tablename= name;
   DBUG_ENTER("ha_leveldb::create");
   
   /* 
+    Construct dbname.tablename ourselves, because parititioning 
+    passes strings like "./test/t14#P#p0" for individual partitions,
+    while table_arg->s->table_name has none of that.
+  */
+  DBUG_ASSERT(tablename[0] == '.' && tablename[1] == '/');
+  tablename += 2;
+
+  const char *p= tablename;
+  for (; *p != '/'; p++)
+  {
+    if (*p =='\0')
+    {
+      DBUG_ASSERT(0); // We were not passed table name?
+      DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+    }
+  }
+  /* Note: this will delete all indexes, primary and secondary */
+  StringBuffer<256> strbuf;
+  strbuf.append(tablename, p - tablename);
+  strbuf.append('.');
+  strbuf.append(p + 1);
+
+  /* 
     We have HA_REQUIRE_PRIMARY_KEY flag, which guarantees there will be a
     primary key
   */
-  if ((res= create_key_defs(table_arg, true)))
+  if ((res= create_key_defs(table_arg, strbuf.ptr(), strbuf.length())))
     DBUG_RETURN(res);
   
   /*

=== modified file 'storage/leveldb/ha_leveldb.h'
--- a/storage/leveldb/ha_leveldb.h	2013-02-01 10:12:08 +0000
+++ b/storage/leveldb/ha_leveldb.h	2013-02-01 13:30:37 +0000
@@ -100,7 +100,7 @@ class ha_leveldb: public handler
   
   bool skip_scan_it_next_call;
 
-  int create_key_defs(TABLE *table_arg, bool write_ddl);
+  int create_key_defs(TABLE *table_arg, const char *db_table, uint len);
   int secondary_index_read(int keyno, uchar *buf);
   void setup_index_scan(leveldb::Slice *slice);
   int get_row_by_rowid(const uchar *pk_tuple_for_lookup, uchar *buf);

=== modified file 'storage/leveldb/ldb_datadic.cc'
--- a/storage/leveldb/ldb_datadic.cc	2013-02-01 12:35:59 +0000
+++ b/storage/leveldb/ldb_datadic.cc	2013-02-01 13:30:37 +0000
@@ -710,6 +710,7 @@ void Keydef_store::free_hash_elem(void* 
 
 bool Keydef_store::init(leveldb::DB *ldb_dict)
 {
+  mysql_rwlock_init(0, &rwlock);
   (void) my_hash_init(&keydef_hash, /*system_charset_info*/&my_charset_bin, 32,0,0,
                       (my_hash_get_key) Keydef_store::get_hash_key,
                       Keydef_store::free_hash_elem, 0);
@@ -759,11 +760,16 @@ bool Keydef_store::init(leveldb::DB *ldb
 }
 
 
-LDBSE_KEYDEF* Keydef_store::find(uint index_no)
+LDBSE_KEYDEF* Keydef_store::find(uint index_no, bool lock)
 {
   LDBSE_KEYDEF *rec;
+  
+  if (lock)
+    mysql_rwlock_rdlock(&rwlock);
   rec= (LDBSE_KEYDEF*)my_hash_search(&keydef_hash, (uchar*)&index_no,
                                      LDBSE_KEYDEF::INDEX_NUMBER_SIZE);
+  if (lock)
+    mysql_rwlock_unlock(&rwlock);
   return rec;
 }
 
@@ -773,24 +779,25 @@ LDBSE_KEYDEF* Keydef_store::find(uint in
 int Keydef_store::put(LDBSE_KEYDEF *key_descr)
 {
   LDBSE_KEYDEF *rec;
-  rec= (LDBSE_KEYDEF*)find(key_descr->index_number);
+  my_bool res;
+  mysql_rwlock_wrlock(&rwlock);
+  rec= (LDBSE_KEYDEF*)find(key_descr->index_number, false);
   if (rec)
   {
     // this will free the old record.
     my_hash_delete(&keydef_hash, (uchar*) rec);
   }
-
-  if (my_hash_insert(&keydef_hash, (uchar*)key_descr))
-  {
-    return 1;
-  }
-  return 0;
+  res= my_hash_insert(&keydef_hash, (uchar*)key_descr);
+ 
+  mysql_rwlock_unlock(&rwlock);
+  return res;
 }
 
 
 void Keydef_store::cleanup()
 {
   my_hash_free(&keydef_hash);
+  mysql_rwlock_destroy(&rwlock);
   sequence.cleanup();
 }
 

=== modified file 'storage/leveldb/ldb_datadic.h'
--- a/storage/leveldb/ldb_datadic.h	2013-02-01 12:35:59 +0000
+++ b/storage/leveldb/ldb_datadic.h	2013-02-01 13:30:37 +0000
@@ -205,13 +205,15 @@ class Keydef_store 
 {
   HASH keydef_hash; // Contains LDBSE_KEYDEF elements
   Sequence_generator sequence;
+  
+  mysql_rwlock_t rwlock;
 public:
   int get_next_number() { return sequence.get_next_number(); }
   bool init(leveldb::DB *ldb_dict);
   void cleanup();
   
   int put(LDBSE_KEYDEF *key_descr);
-  LDBSE_KEYDEF *find(uint index_no);
+  LDBSE_KEYDEF *find(uint index_no, bool lock=true);
 private:
   static uchar* get_hash_key(LDBSE_KEYDEF *rec, size_t *length,
                              my_bool not_used __attribute__((unused)));



More information about the commits mailing list