[Commits] Rev 4582: MDEV-4122: LevelDB: store key/index numbers, not names in file:///home/psergey/dev2/mysql-5.6-leveldb/

Sergey Petrunya psergey at askmonty.org
Fri Feb 1 12:12:10 EET 2013


At file:///home/psergey/dev2/mysql-5.6-leveldb/

------------------------------------------------------------
revno: 4582
revision-id: psergey at askmonty.org-20130201101208-tmts96q8d5jk8rmv
parent: psergey at askmonty.org-20130130042107-1qt0d5mog2jvbvcl
committer: Sergey Petrunya <psergey at askmonty.org>
branch nick: mysql-5.6-leveldb
timestamp: Fri 2013-02-01 14:12:08 +0400
message:
  MDEV-4122: LevelDB: store key/index numbers, not names
  - First implementation that passes a testcase (no new features yet).
=== modified file 'storage/leveldb/ha_leveldb.cc'
--- a/storage/leveldb/ha_leveldb.cc	2013-01-30 04:21:07 +0000
+++ b/storage/leveldb/ha_leveldb.cc	2013-02-01 10:12:08 +0000
@@ -41,7 +41,10 @@ handlerton *leveldb_hton;
 
 leveldb::DB *ldb= NULL;
 leveldb::DB *ldb_dict= NULL;
+
 Table_ddl_manager ddl_manager;
+Keydef_store keydef_store;
+
 LockTable row_locks;
 
 LDBSE_KEYDEF *make_pk_keyseg(TABLE *table_arg, uint keynr);
@@ -147,19 +150,17 @@ class Primary_key_comparator : public le
       If they are not equal, end of story. We don't follow the comparison
       collation but that does not matter.
     */
-    while (pa < pa_end && pb < pb_end)
-    {
-      if (*pa < *pb)
-        return -1;
-      if (*pa > *pb)
-        return 1;
-      // ok, now *pa==*pb
-      char c= *pa;
-      pa++; pb++;
+    DBUG_ASSERT(pa_end - pa >= LDBSE_KEYDEF::INDEX_NUMBER_SIZE);
+    DBUG_ASSERT(pb_end - pb >= LDBSE_KEYDEF::INDEX_NUMBER_SIZE);
+    int cmp;
+    if ((cmp= memcmp(pa, pb, LDBSE_KEYDEF::INDEX_NUMBER_SIZE)))
+      return cmp;
 
-      if (c == '\0')
-        break;
-    }
+    uint num;
+    memcpy(&num, pa, LDBSE_KEYDEF::INDEX_NUMBER_SIZE);
+
+    pa += LDBSE_KEYDEF::INDEX_NUMBER_SIZE;
+    pb += LDBSE_KEYDEF::INDEX_NUMBER_SIZE;
 
     /* Ok, both tuples are from the same table.  */
     if (pa_end == pa)
@@ -169,7 +170,7 @@ class Primary_key_comparator : public le
     if (pb_end == pb)
       return 1;
 
-    LDBSE_KEYDEF *kdef= ddl_manager.find((uchar*)a.data(), pa - a.data() - 1);
+    LDBSE_KEYDEF *kdef= keydef_store.find(num);
     DBUG_ASSERT(kdef);
     
     // for length, must pass length(a).
@@ -436,7 +437,10 @@ static int leveldb_init_func(void *p)
     DBUG_ASSERT(0);
   }
 
-  if (ddl_manager.init(ldb_dict))
+  if (keydef_store.init(ldb_dict))
+    DBUG_RETURN(1);
+
+  if (ddl_manager.init(ldb_dict, &keydef_store))
     DBUG_RETURN(1);
 
   row_locks.init(Primary_key_comparator::compare_keys, 
@@ -613,8 +617,8 @@ static handler* leveldb_create_handler(h
 
 ha_leveldb::ha_leveldb(handlerton *hton, TABLE_SHARE *table_arg)
   : handler(hton, table_arg), scan_it(NULL), pk_descr(NULL), 
-    key_descr(NULL), free_key_defs(false), pk_tuple(NULL),
-    pk_packed_tuple(NULL), timeout_sec(1), lock_rows(FALSE),
+    key_descr(NULL), pk_tuple(NULL), pk_packed_tuple(NULL), 
+    timeout_sec(1), lock_rows(FALSE),
     keyread_only(FALSE)
 {}
 
@@ -746,23 +750,34 @@ void ha_leveldb::unpack_blobs_from_retri
 }
 
 
+static void make_dbname_tablename(StringBuffer<64> *str, TABLE *table_arg)
+{
+  str->append(table_arg->s->db.str, table_arg->s->db.length);
+  str->append('.');
+  str->append(table_arg->s->table_name.str, table_arg->s->table_name.length);
+  str->c_ptr_safe();
+}
+
+
 int ha_leveldb::open(const char *name, int mode, uint test_if_locked)
 {
-  int res;
   DBUG_ENTER("ha_leveldb::open");
 
   if (!(share = get_share(name, table)))
-    DBUG_RETURN(1);
+    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
   thr_lock_data_init(&share->lock,&lock,NULL);
   
-  /* Set up reading/writing */
   /* 
     note: pk_descr may be non-NULL here, when using partitions. It seems, 
     it's ok to discard it
   */
-  if ((res= create_key_defs(table, false)))
-    DBUG_RETURN(res);
-  free_key_defs= true; // pk_descr is our property, it is not from the cache
+  StringBuffer<64> fullname;
+  make_dbname_tablename(&fullname, table);
+  if (!(tbl_def= ddl_manager.find((uchar*)fullname.c_ptr(), fullname.length())))
+    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+
+  key_descr= tbl_def->key_descr;
+  pk_descr= key_descr[table->s->primary_key];
 
   pk_key_parts= table->key_info[table->s->primary_key].user_defined_key_parts;
 
@@ -809,18 +824,8 @@ int ha_leveldb::close(void)
 {
   DBUG_ENTER("ha_leveldb::close");
   
-  if (key_descr)
-  {
-    if (free_key_defs)
-    {
-      for (uint i= 0; i < table->s->keys; i++)
-        delete key_descr[i];
-    }
-    free_key_defs= false;
-    pk_descr= NULL;
-    my_free(key_descr);
-    key_descr= NULL;
-  }
+  pk_descr= NULL;
+  key_descr= NULL;
 
   if (pk_tuple)
     my_free(pk_tuple);
@@ -833,42 +838,57 @@ int ha_leveldb::close(void)
   DBUG_RETURN(free_share(share));
 }
 
+
 const int HA_ERR_LEVELDB_UNIQUE_NOT_SUPPORTED= HA_ERR_LAST+1;
 
+
 int ha_leveldb::create_key_defs(TABLE *table_arg, bool write_ddl)
 {
+  uint i;
   DBUG_ENTER("ha_leveldb::create_key_defs");
-
   uint n_keys= table_arg->s->keys;
-
-  if (!(key_descr= (LDBSE_KEYDEF**)my_malloc(sizeof(LDBSE_KEYDEF*) * n_keys,
-                                             MYF(MY_ZEROFILL))))
-    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
-
-  for (uint i= 0; i < table_arg->s->keys; i++)
+  
+  /* Don't allow to create unique indexes */
+  for (i= 0; i < table_arg->s->keys; i++)
   {
-    //TODO: balk at attempts to create unique indexes.
     if (table_arg->key_info[i].flags & HA_NOSAME && 
         i != table_arg->s->primary_key)
     {
-      goto error;
+      DBUG_RETURN(HA_ERR_LEVELDB_UNIQUE_NOT_SUPPORTED); 
     }
+  }
+
+  /* Create table/key descriptions and put them into the data dictionary */
+  if (!(key_descr= (LDBSE_KEYDEF**)my_malloc(sizeof(LDBSE_KEYDEF*) * n_keys,
+                                             MYF(MY_ZEROFILL))))
+    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+  
+  for (i= 0; i < table_arg->s->keys; i++)
+  {
     key_descr[i]= make_pk_keyseg(table_arg, i);
+    key_descr[i]->index_number= keydef_store.get_next_number();
 
     if (write_ddl)
     {
-      ddl_manager.put(key_descr[i]);
+      keydef_store.put(key_descr[i]);
       key_descr[i]->write_to(ldb_dict);
     }
   }
-
-  //pk_descr= make_pk_keyseg(table_arg, table_arg->s->primary_key);
   pk_descr= key_descr[table_arg->s->primary_key];
+
+  if (!(tbl_def= new LDBSE_TABLE_DEF))
+    goto error;
+  
+  tbl_def->n_keys= n_keys;
+  tbl_def->key_descr= key_descr;
+  make_dbname_tablename(&tbl_def->dbname_tablename, table_arg);
+  ddl_manager.put(tbl_def);
+  tbl_def->write_to(ldb_dict);
+
   DBUG_RETURN(0);
 error:
   //todo: free what was allocated.
-  //todo: serg's recommended way of error handling, with get_error_message
-  DBUG_RETURN(HA_ERR_LEVELDB_UNIQUE_NOT_SUPPORTED); 
+  DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
 }
 
 
@@ -921,11 +941,6 @@ int ha_leveldb::create(const char *name,
         share->next_number_key_offset)
       DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
   }
-  /*
-    Don't free key definitions. We have inserted them into the DDL cache.
-  */
-  free_key_defs= false;
-
   DBUG_RETURN(0);
 }
 
@@ -1392,11 +1407,16 @@ int ha_leveldb::index_next_with_directio
 
 int ha_leveldb::index_first(uchar *buf)
 {
+  uchar *key;
+  uint key_size;
   int rc;
   DBUG_ENTER("ha_leveldb::index_first");
+  
+  key= (active_index == table->s->primary_key)? pk_packed_tuple : 
+                                                sec_key_packed_tuple;
+  key_descr[active_index]->get_infimum_key(key, &key_size);
+  leveldb::Slice index_key((const char*)key, key_size);
 
-  leveldb::Slice index_key(key_descr[active_index]->dbname_tablename_key.c_ptr(),
-                           key_descr[active_index]->dbname_tablename_key.length());
   setup_index_scan(&index_key);
   skip_scan_it_next_call= TRUE;
 
@@ -1407,17 +1427,15 @@ int ha_leveldb::index_first(uchar *buf)
 
 int ha_leveldb::index_last(uchar *buf)
 {
+  uchar *key;
+  uint key_size;
   int rc;
   DBUG_ENTER("ha_leveldb::index_last");
   
-  /* Make a key that's right after this index */
-  uint namelen= key_descr[active_index]->dbname_tablename_key.length();
-  memcpy(sec_key_packed_tuple, 
-         key_descr[active_index]->dbname_tablename_key.c_ptr(),
-         namelen+1);
-  sec_key_packed_tuple[namelen-1]= ((char)active_index) + 2;
-
-  leveldb::Slice index_key((char*)sec_key_packed_tuple, namelen + 1);
+  key= (active_index == table->s->primary_key)? pk_packed_tuple : 
+                                                sec_key_packed_tuple;
+  key_descr[active_index]->get_supremum_key(key, &key_size);
+  leveldb::Slice index_key((const char*)key, key_size);
 
   setup_index_scan(&index_key);
 
@@ -1582,8 +1600,10 @@ int ha_leveldb::rnd_init(bool scan)
   
   if (scan)
   {
-    leveldb::Slice table_key(pk_descr->dbname_tablename_key.c_ptr(),
-                             pk_descr->dbname_tablename_key.length());
+    uint key_size;
+    pk_descr->get_infimum_key(pk_packed_tuple, &key_size);
+    leveldb::Slice table_key((const char*)pk_packed_tuple, key_size);
+
     setup_index_scan(&table_key);
     skip_scan_it_next_call= true;
   }
@@ -2227,19 +2247,16 @@ ulong Primary_key_comparator::get_hashnr
 {
   ulong nr=1, nr2=4;
   HA_KEYSEG *seg,*endseg;
-  
   const uchar *key= (uchar*)key_arg;
-  const uchar *key_end = key + key_len;
-  const uchar *ptr;
-  for (ptr= key; ptr != key_end && *ptr; ptr++) {}
-  DBUG_ASSERT(ptr[0] =='\0');
+  int keyno;
+  memcpy(&keyno, key_arg, LDBSE_KEYDEF::INDEX_NUMBER_SIZE);
 
-  LDBSE_KEYDEF *kdef= ddl_manager.find((uchar*)key, ptr - key);
+  LDBSE_KEYDEF *kdef= keydef_store.find(keyno);
   DBUG_ASSERT(kdef);
   
-  my_charset_bin.coll->hash_sort(&my_charset_bin, (const uchar*)key, ptr-key, &nr, &nr2);
-
-  key= ptr + 1; // Walk over the \0 terminator and then the key tuple begins.
+  my_charset_bin.coll->hash_sort(&my_charset_bin, (const uchar*)key, 
+                                 LDBSE_KEYDEF::INDEX_NUMBER_SIZE, &nr, &nr2);
+  key += LDBSE_KEYDEF::INDEX_NUMBER_SIZE;
 
   // The code below code was borrowed from hp_hashnr:
 

=== modified file 'storage/leveldb/ha_leveldb.h'
--- a/storage/leveldb/ha_leveldb.h	2013-01-29 18:06:11 +0000
+++ b/storage/leveldb/ha_leveldb.h	2013-02-01 10:12:08 +0000
@@ -43,6 +43,7 @@ typedef struct st_leveldb_share {
 } LEVELDB_SHARE;
 
 class LDBSE_KEYDEF;
+class LDBSE_TABLE_DEF;
 class Row_lock;
 
 /** @brief
@@ -56,12 +57,13 @@ class ha_leveldb: public handler
   /* When doing a full table scan, this is the iterator we're using */
   leveldb::Iterator* scan_it;
 
+  LDBSE_TABLE_DEF *tbl_def;
+
   /* Primary Key encoder from KeyTupleFormat to StorageFormat */
   LDBSE_KEYDEF *pk_descr;
 
   /* Array of index descriptors */
   LDBSE_KEYDEF **key_descr;
-  bool free_key_defs;
   
   /* 
     Number of key parts in PK. This is the same as 

=== modified file 'storage/leveldb/ldb_datadic.cc'
--- a/storage/leveldb/ldb_datadic.cc	2013-01-30 04:21:07 +0000
+++ b/storage/leveldb/ldb_datadic.cc	2013-02-01 10:12:08 +0000
@@ -288,14 +288,14 @@ LDBSE_KEYDEF *table2myisam_equivalent(TA
 }
 
 
-void LDBSE_KEYDEF::write_int(String *out, uint32 val)
+void write_int(String *out, uint32 val)
 {
   uint buf= htonl(val);
   out->append((char*)&buf, 4);
 }
 
 
-uint32 LDBSE_KEYDEF::read_int(char **data)
+uint32 read_int(char **data)
 {
   uint buf;
   memcpy(&buf, *data, sizeof(uint32));
@@ -316,29 +316,22 @@ LDBSE_KEYDEF *make_pk_keyseg(TABLE *tabl
   if (mi_open_equivalent(keydef))
     return NULL; // TODO: free memory
 
-  DBUG_ASSERT(keynr < MAX_INDEXES);
-  uchar key_byte= keynr;
-
-  TABLE_SHARE *s= table_arg->s;
-  keydef->dbname_tablename_key.length(0);
-  keydef->dbname_tablename_key.append(s->db.str, s->db.length);
-  keydef->dbname_tablename_key.append('.');
-  keydef->dbname_tablename_key.append(s->table_name.str, s->table_name.length);
-  keydef->dbname_tablename_key.append(key_byte + 1);
-  keydef->dbname_tablename_key.c_ptr_safe();
-
-  keydef->keyno_= keynr + 1;
   return keydef;
 }
 
 
+/*
+  Write an entry that maps index_nr -> index_ddl_data.
+*/
 void LDBSE_KEYDEF::write_to(leveldb::DB *ldb_dict)
 {
-  StringBuffer<64> keybuf;
-  
-  keybuf.append(dbname_tablename_key);
-
+  StringBuffer<32> indexno;
   StringBuffer<512> value;
+   
+  indexno.append('\0');
+  write_int(&indexno, index_number);
+  DBUG_ASSERT(indexno.length() == INDEX_NUMBER_SIZE + 1);
+
   /* Store values of all fields */
   write_int(&value, keysegs);
   write_int(&value, flag);
@@ -361,7 +354,7 @@ void LDBSE_KEYDEF::write_to(leveldb::DB 
     write_int(&value, s->bit_length);
   }
   
-  leveldb::Slice skey(keybuf.c_ptr(), keybuf.length());
+  leveldb::Slice skey(indexno.ptr(), indexno.length());
   leveldb::Slice svalue(value.c_ptr(), value.length());
 
   // todo: the below should have sync=on
@@ -369,12 +362,19 @@ void LDBSE_KEYDEF::write_to(leveldb::DB 
 }
 
 
-bool LDBSE_KEYDEF::read_from(const char *name, size_t name_len, 
+/*
+  Read an entry that maps index_nr -> index_ddl_data.
+*/
+bool LDBSE_KEYDEF::read_from(const char *index_nr, size_t index_nr_len, 
                              const char *data, size_t datalen)
 {
-  dbname_tablename_key.set(name, name_len, &my_charset_bin);
+  //dbname_tablename_key.set(name, name_len, &my_charset_bin);
   //keyno_= *((uchar*)name + name_len) - 1;
-  //DBUG_ASSERT(keyno_ < MAX_INDEXES);
+
+  if (index_nr_len != INDEX_NUMBER_SIZE)
+    return true;
+  char *keybuf= (char*)index_nr;
+  index_number= read_int(&keybuf);
 
   char *pbuf= (char*)data;
 
@@ -448,9 +448,8 @@ uint ldbse_pack_key(LDBSE_KEYDEF *keydef
   /* 
     Addition for leveldb: first, store the {dbname.tablename}{KEYNO+1}\0 
   */
-  uint name_size= keydef->dbname_tablename_key.length() + 1;
-  memcpy(key, keydef->dbname_tablename_key.c_ptr(), name_size);
-  key += name_size;
+  memcpy(key, &keydef->index_number, LDBSE_KEYDEF::INDEX_NUMBER_SIZE);
+  key += LDBSE_KEYDEF::INDEX_NUMBER_SIZE;
   
   for (; keyseg != keyseg_end; old+= keyseg->length, keyseg++)
   {
@@ -552,11 +551,9 @@ int ldbse_put_key_in_record(LDBSE_KEYDEF
   uchar *blob_ptr;
   DBUG_ENTER("ldbse_put_key_in_record");
   
-  DBUG_ASSERT(!memcmp(keydef->dbname_tablename_key.c_ptr(), 
-                      key,
-                      keydef->dbname_tablename_key.length() + 1));
+  DBUG_ASSERT(!memcmp(&keydef->index_number, key, LDBSE_KEYDEF::INDEX_NUMBER_SIZE));
   //key_end= key + key_len;
-  key += keydef->dbname_tablename_key.length() + 1;
+  key += LDBSE_KEYDEF::INDEX_NUMBER_SIZE;
 
   //blob_ptr= (uchar*) info->lastkey2;             /* Place to put blob parts */
   //key=(uchar*) info->lastkey;                    /* KEy that was read */
@@ -692,87 +689,242 @@ int ldbse_put_key_in_record(LDBSE_KEYDEF
 #endif  
 } /* _mi_put_key_in_record */
 
+
+///////////////////////////////////////////////////////////////////////////////////////////
+// Keydef_store
+///////////////////////////////////////////////////////////////////////////////////////////
+uchar* Keydef_store::get_hash_key(LDBSE_KEYDEF *rec, size_t *length,
+                                  my_bool not_used __attribute__((unused)))
+{
+  *length= LDBSE_KEYDEF::INDEX_NUMBER_SIZE;
+  return (uchar*) &(rec->index_number);
+}
+
+
+void Keydef_store::free_hash_elem(void* data)
+{
+  LDBSE_KEYDEF* elem= (LDBSE_KEYDEF*)data;
+  delete elem;
+}
+
+
+bool Keydef_store::init(leveldb::DB *ldb_dict)
+{
+  (void) my_hash_init(&keydef_hash, /*system_charset_info*/&my_charset_bin, 32,0,0,
+                      (my_hash_get_key) Keydef_store::get_hash_key,
+                      Keydef_store::free_hash_elem, 0);
+
+  /* Read the data dictionary and populate the hash */
+  leveldb::Iterator* it;
+  it= ldb_dict->NewIterator(leveldb::ReadOptions());
+  int i= 0;
+  uint max_index_number= 0;
+  for (it->SeekToFirst(); it->Valid(); it->Next()) 
+  {
+    LDBSE_KEYDEF* keydef= new LDBSE_KEYDEF;
+    leveldb::Slice key= it->key();
+    leveldb::Slice val= it->value();
+  
+    if (key.size() && *key.data() != 0) /* Not an numeric id */
+      break;
+
+    if (key.size() != LDBSE_KEYDEF::INDEX_NUMBER_SIZE + 1)
+    {
+      sql_print_error("LevelDB: Key store got key of size %d", key.size());
+      delete it;
+      return true;
+    }
+    if (keydef->read_from(key.data() + 1, key.size() - 1, val.data(), val.size()))
+    {
+      sql_print_error("LevelDB: Key store failed to load key definition");
+      delete it;
+      return true;
+    }
+    if (max_index_number < keydef->index_number)
+      max_index_number= keydef->index_number;
+    put(keydef);
+    i++;
+  }
+
+  if (!it->status().ok())
+  {
+    std::string s= it->status().ToString();
+    sql_print_error("LevelDB: Key Store: load error: %s", s.c_str());
+  }
+  delete it;
+  
+  sequence.init(max_index_number+1);
+  sql_print_information("LevelDB: Key Store: loaded DDL mappings for %d indexes", i);
+  return false;
+}
+
+
+LDBSE_KEYDEF* Keydef_store::find(uint index_no)
+{
+  LDBSE_KEYDEF *rec;
+  rec= (LDBSE_KEYDEF*)my_hash_search(&keydef_hash, (uchar*)&index_no,
+                                     LDBSE_KEYDEF::INDEX_NUMBER_SIZE);
+  return rec;
+}
+
+
+/* Return 0 - ok, other value - error */
+
+int Keydef_store::put(LDBSE_KEYDEF *key_descr)
+{
+  LDBSE_KEYDEF *rec;
+  rec= (LDBSE_KEYDEF*)find(key_descr->index_number);
+  if (rec)
+  {
+    // this will free the old record.
+    my_hash_delete(&keydef_hash, (uchar*) rec);
+  }
+
+  if (my_hash_insert(&keydef_hash, (uchar*)key_descr))
+  {
+    return 1;
+  }
+  return 0;
+}
+
+
+void Keydef_store::cleanup()
+{
+  my_hash_free(&keydef_hash);
+  sequence.cleanup();
+}
+
+
 ///////////////////////////////////////////////////////////////////////////////////////////
 // Table_ddl_manager
 ///////////////////////////////////////////////////////////////////////////////////////////
 
-uchar* Table_ddl_manager::get_hash_key(LDBSE_KEYDEF *rec, size_t *length,
+void LDBSE_TABLE_DEF::write_to(leveldb::DB *ldb_dict)
+{
+  StringBuffer<32> indexes;
+
+  for (uint i; i < n_keys; i++)
+  {
+    write_int(&indexes, key_descr[i]->index_number);
+  }
+  leveldb::Slice skey(dbname_tablename.ptr(), 
+                      dbname_tablename.length());
+
+  leveldb::Slice svalue(indexes.c_ptr(), indexes.length());
+
+  // todo: the below should have sync=on
+  ldb_dict->Put(leveldb::WriteOptions(), skey, svalue); 
+}
+
+
+uchar* Table_ddl_manager::get_hash_key(LDBSE_TABLE_DEF *rec, size_t *length,
                                        my_bool not_used __attribute__((unused)))
 {
-  *length= rec->dbname_tablename_key.length();
-  return (uchar*) rec->dbname_tablename_key.c_ptr();
+  *length= rec->dbname_tablename.length();
+  return (uchar*) rec->dbname_tablename.c_ptr();
 }
 
 
 void Table_ddl_manager::free_hash_elem(void* data)
 {
-  LDBSE_KEYDEF* elem= (LDBSE_KEYDEF*)data;
+  LDBSE_TABLE_DEF* elem= (LDBSE_TABLE_DEF*)data;
   delete elem;
 }
 
 
-bool Table_ddl_manager::init(leveldb::DB *ldb_dict)
+bool Table_ddl_manager::init(leveldb::DB *ldb_dict, Keydef_store *keydef_store)
 {
   (void) my_hash_init(&ddl_hash, /*system_charset_info*/&my_charset_bin, 32,0,0,
                       (my_hash_get_key) Table_ddl_manager::get_hash_key,
                       Table_ddl_manager::free_hash_elem, 0);
-  
-  leveldb::Options options;
-  options.create_if_missing = true;
+
+  char number_one=1;
+  leveldb::Slice one_in_slice(&number_one, 1);
   
   /* Read the data dictionary and populate the hash */
   leveldb::Iterator* it;
   it= ldb_dict->NewIterator(leveldb::ReadOptions());
   int i= 0;
-  for (it->SeekToFirst(); it->Valid(); it->Next()) 
+  for (it->Seek(one_in_slice); it->Valid(); it->Next()) 
   {
-    LDBSE_KEYDEF* keydef= new LDBSE_KEYDEF;
+    char *ptr;
+    char *ptr_end;
+    LDBSE_TABLE_DEF *tdef= new LDBSE_TABLE_DEF;
     leveldb::Slice key= it->key();
     leveldb::Slice val= it->value();
-    if (keydef->read_from(key.data(), key.size(), val.data(), val.size()))
+
+    tdef->dbname_tablename.append(key.data(), key.size());
+    // Now, read the DDLs.
+    
+    if (val.size() < LDBSE_KEYDEF::INDEX_NUMBER_SIZE)
     {
-      sql_print_error("Incorrect DDL data for table %s", key.data());
+      sql_print_error("LevelDB: Table_store: no keys defined in %s", key.data());
       return true;
     }
-    put(keydef);
+    if (val.size() % LDBSE_KEYDEF::INDEX_NUMBER_SIZE)
+    {
+      sql_print_error("LevelDB: Table_store: invalid keylist for table %s", 
+                      tdef->dbname_tablename.c_ptr_safe());
+      return true;
+    }
+    tdef->n_keys= val.size() / LDBSE_KEYDEF::INDEX_NUMBER_SIZE;
+    if (!(tdef->key_descr= (LDBSE_KEYDEF**)my_malloc(sizeof(LDBSE_KEYDEF*) * 
+                                                     tdef->n_keys, 
+                                                     MYF(MY_ZEROFILL))))
+      return true;
+
+    ptr= (char*)val.data();
+    ptr_end= ptr + val.size();
+    for (uint keyno=0; ptr < ptr_end; keyno++)
+    {
+      int index_number= read_int(&ptr);
+      LDBSE_KEYDEF *key_def= keydef_store->find(index_number);
+      if (!key_def)
+      {
+        sql_print_error("LevelDB: Table_store: table %s couldn't find key %d", 
+                        tdef->dbname_tablename.c_ptr_safe(), index_number);
+        return true;
+      }
+      tdef->key_descr[keyno]= key_def;
+    }
+    put(tdef);
     i++;
   }
 
   if (!it->status().ok())
   {
     std::string s= it->status().ToString();
-    sql_print_error("Error scanning DDLs: %s", s.c_str());
+    sql_print_error("LevelDB: Table_store: load error: %s", s.c_str());
   }
   delete it;
-
-  sql_print_information("LevelDB: loaded DDL data for %d tables", i);
+  sql_print_information("LevelDB: Table_store: loaded DDL data for %d tables", i);
   return false;
 }
 
 
-LDBSE_KEYDEF* Table_ddl_manager::find(uchar *table_name, uint table_name_len)
+LDBSE_TABLE_DEF* Table_ddl_manager::find(uchar *table_name, uint table_name_len)
 {
-  LDBSE_KEYDEF *rec;
-  rec= (LDBSE_KEYDEF*)my_hash_search(&ddl_hash, (uchar*)table_name,
-                                     table_name_len);
+  LDBSE_TABLE_DEF *rec;
+  rec= (LDBSE_TABLE_DEF*)my_hash_search(&ddl_hash, (uchar*)table_name,
+                                        table_name_len);
   return rec;
 }
 
 
 /* Return 0 - ok, other value - error */
 
-int Table_ddl_manager::put(LDBSE_KEYDEF *key_descr)
+int Table_ddl_manager::put(LDBSE_TABLE_DEF *tbl)
 {
-  LDBSE_KEYDEF *rec;
-  rec= (LDBSE_KEYDEF*)find((uchar*)key_descr->dbname_tablename_key.c_ptr(),
-                           key_descr->dbname_tablename_key.length());
+  LDBSE_TABLE_DEF *rec;
+  rec= (LDBSE_TABLE_DEF*)find((uchar*)tbl->dbname_tablename.c_ptr(),
+                               tbl->dbname_tablename.length());
   if (rec)
   {
     // this will free the old record.
     my_hash_delete(&ddl_hash, (uchar*) rec);
   }
 
-  if (my_hash_insert(&ddl_hash, (uchar*)key_descr))
+  if (my_hash_insert(&ddl_hash, (uchar*)tbl))
   {
     return 1;
   }
@@ -780,10 +932,10 @@ int Table_ddl_manager::put(LDBSE_KEYDEF 
 }
 
 
-void Table_ddl_manager::remove(LDBSE_KEYDEF *rec)
+void Table_ddl_manager::remove(LDBSE_TABLE_DEF *tbl)
 {
-  my_hash_delete(&ddl_hash, (uchar*) rec);
-  delete rec;
+  my_hash_delete(&ddl_hash, (uchar*) tbl);
+  delete tbl;
 }
 
 

=== modified file 'storage/leveldb/ldb_datadic.h'
--- a/storage/leveldb/ldb_datadic.h	2013-01-24 16:47:13 +0000
+++ b/storage/leveldb/ldb_datadic.h	2013-02-01 10:12:08 +0000
@@ -35,7 +35,27 @@ class LDBSE_KEYDEF
       parts= keysegs;
     return ldbse_pack_key(this, parts, key, old);
   }
-   
+  
+  /*
+    Get the key that is "infimum" for all keys in the table
+     
+  */
+  inline void get_infimum_key(uchar *key, uint *size)
+  {
+    memcpy(key, &index_number, INDEX_NUMBER_SIZE);
+    *size= INDEX_NUMBER_SIZE;
+  }
+  
+  /*
+    Get the key that is a "supremum" for all keys in the table
+    TODO: maybe, use LookupTuple technique to cover this case also?
+  */
+  inline void get_supremum_key(uchar *key, uint *size)
+  {
+    uint next_index_number= index_number +1;
+    memcpy(key, &next_index_number, INDEX_NUMBER_SIZE);
+    *size= INDEX_NUMBER_SIZE;
+  }
 
   /*
     This can be used to compare prefixes.
@@ -46,7 +66,7 @@ class LDBSE_KEYDEF
   {
     DBUG_ASSERT(in_table(pa, a_len));
     DBUG_ASSERT(in_table(pb, b_len));
-    uint size= dbname_tablename_key.length() + 1;
+    uint size= INDEX_NUMBER_SIZE;
     pa += size;
     pb += size;
     a_len -= size;
@@ -69,9 +89,9 @@ class LDBSE_KEYDEF
 
   bool in_table(const char *key, uint keylen)
   {
-    if (keylen < dbname_tablename_key.length())
+    if (keylen < INDEX_NUMBER_SIZE)
       return false;
-    if (memcmp(key, dbname_tablename_key.ptr(), dbname_tablename_key.length()))
+    if (memcmp(key, &index_number, INDEX_NUMBER_SIZE))
       return false;
     else
       return true;
@@ -80,10 +100,10 @@ class LDBSE_KEYDEF
   /* Return max key length in StorageFormat */
   uint max_storage_fmt_length() 
   {
-    return dbname_tablename_key.length() + 1 + maxlength;
+    return INDEX_NUMBER_SIZE + maxlength;
   }
 
-  bool read_from(const char *name, size_t name_len, 
+  bool read_from(const char *index_nr, size_t index_nr_len, 
                  const char *data, size_t datalen);
   void write_to(leveldb::DB *ldb_dict);
 
@@ -91,13 +111,13 @@ class LDBSE_KEYDEF
   LDBSE_KEYDEF(): seg(NULL), end(NULL) {}
   ~LDBSE_KEYDEF();
 
-  /* 
-    Stores 'dbname.tablename'{keyno}\0
-    keyno is a number in binary form. It takes one byte.
-  */
-  String dbname_tablename_key;
+  
+  enum {
+    INDEX_NUMBER_SIZE= 4
+  };
 
-  uint keyno_;
+  /* Global number of this index (used as prefix in StorageFormat) */
+  uint index_number;
 
 public: // but ought to be private:
   /* Primary key parameters: */
@@ -111,31 +131,106 @@ class LDBSE_KEYDEF
 
   bool alloc_key_parts(uint n_parts);
 private:
-  void   write_int(String *out, uint32 val);
-  uint32 read_int(char **data);
+//  void   write_int(String *out, uint32 val);
+//  uint32 read_int(char **data);
 };
 
 
+class LDBSE_TABLE_DEF
+{
+public:
+  /* Stores 'dbname.tablename' */
+  StringBuffer<64> dbname_tablename;
+
+  uint n_keys;
+  LDBSE_KEYDEF **key_descr;
+
+  void write_to(leveldb::DB *ldb_dict);
+};
+
+
+/* 
+  A thread-safe sequential number generator. Its performance is not a concern
+*/
+class Sequence_generator
+{
+  int next_number;
+
+  mysql_mutex_t mutex;
+public:
+  void init(int initial_number)
+  {
+    mysql_mutex_init(0 , &mutex, MY_MUTEX_INIT_FAST);
+    next_number= initial_number;
+  }
+
+  int get_next_number() 
+  {
+    int res;
+    mysql_mutex_lock(&mutex);
+    res= next_number++;
+    mysql_mutex_unlock(&mutex);
+    return res;
+  }
+
+  void cleanup()
+  {
+    mysql_mutex_destroy(&mutex);
+  }
+};
+
 /*
-  This is a limited "data dictionary". 
+  This contains a mapping of 
+
+    number -> LDBSE_KEYDEF*
 
-  It maps "dbname.tablename" to LDBSE_KEYDEF objects. 
+  and also keeps max_used_number, so that one can insert a new element.
 
-  It ought to be thread-safe but currently isn't.
+  One should never delete elements.
+
+  STORAGE 
+    keys are in form: '\0' {number} 
+
+    so that they are different from "dbname.tablename" keys that are stored in
+    the same db.
 */
-class Table_ddl_manager
+class Keydef_store 
 {
-  HASH ddl_hash; // Contains LDBSE_KEYDEF elements
+  HASH keydef_hash; // Contains LDBSE_KEYDEF elements
+  Sequence_generator sequence;
 public:
+  int get_next_number() { return sequence.get_next_number(); }
   bool init(leveldb::DB *ldb_dict);
   void cleanup();
+  
   int put(LDBSE_KEYDEF *key_descr);
-  void remove(LDBSE_KEYDEF *rec);
-  LDBSE_KEYDEF *find(uchar *table_name, uint len);
-
+  LDBSE_KEYDEF *find(uint index_no);
 private:
   static uchar* get_hash_key(LDBSE_KEYDEF *rec, size_t *length,
                              my_bool not_used __attribute__((unused)));
   static void free_hash_elem(void* data);
 };
 
+
+/*
+  This contains a mapping of 
+
+     dbname.table_name -> array{LDBSE_KEYDEF}.
+*/
+
+class Table_ddl_manager
+{
+  HASH ddl_hash; // Contains LDBSE_TABLE_DEF elements
+public:
+  bool init(leveldb::DB *ldb_dict, Keydef_store *keydef_store);
+  void cleanup();
+  int put(LDBSE_TABLE_DEF *key_descr);
+  void remove(LDBSE_TABLE_DEF *rec);
+  LDBSE_TABLE_DEF *find(uchar *table_name, uint len);
+
+private:
+  static uchar* get_hash_key(LDBSE_TABLE_DEF *rec, size_t *length,
+                             my_bool not_used __attribute__((unused)));
+  static void free_hash_elem(void* data);
+};
+



More information about the commits mailing list