[Commits] dbf594d: Re-commit in Git: Port to RocksDB. Let LevelDB engine be optional.

Sergei Petrunia psergey at askmonty.org
Fri Jul 11 23:59:52 EEST 2014


revision-id: dbf594d7bccf16585bd721e37bcf5d3559c4556e
parent(s): 491fbd248d0cda2c7920af425194ccf1e1660a2f
committer: Sergei Petrunia
branch nick: webscalesql-5.6
timestamp: 2014-07-12 00:59:52 +0400
message:

Re-commit in Git: Port to RocksDB. Let LevelDB engine be optional.
Also added HTON_SUPPORTS_EXTENDED_KEYS to both engines.
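
For context, HTON_SUPPORTS_EXTENDED_KEYS is the server-side handlerton flag that
lets the optimizer treat the PK columns implicitly appended to each secondary key
as usable key parts. A minimal sketch of what setting it amounts to in an engine's
init function follows (MySQL 5.6 plugin API; rocksdb_init_func and
rocksdb_create_handler are illustrative stand-ins, not the exact code from this
commit -- the real code is in ha_rocksdb.cc in the diff below):

  /* Sketch only: advertising engine capabilities via handlerton flags. */
  static int rocksdb_init_func(void *p)
  {
    handlerton *hton= (handlerton*) p;

    hton->state=  SHOW_OPTION_YES;
    hton->create= rocksdb_create_handler;   /* handler factory (not shown) */

    /* Refuse TEMPORARY tables (exercised by the MDEV-3961 test below)
       and declare extended-key support: */
    hton->flags= HTON_TEMPORARY_NOT_SUPPORTED |
                 HTON_SUPPORTS_EXTENDED_KEYS;
    return 0;
  }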

-----------------------

 mysql-test/r/rocksdb.result               | 1171 ++++++++++++++
 mysql-test/r/rocksdb_qcache.result        |   37 +
 mysql-test/t/rocksdb.test                 | 1053 ++++++++++++
 mysql-test/t/rocksdb_qcache-master.opt    |    1 +
 mysql-test/t/rocksdb_qcache.test          |   28 +
 storage/rocksdb/CMakeLists.txt            |   56 +
 storage/rocksdb/ha_rocksdb.cc             | 2512 +++++++++++++++++++++++++++++
 storage/rocksdb/ha_rocksdb.h              |  307 ++++
 storage/rocksdb/rdb_applyiter.cc          |  207 +++
 storage/rocksdb/rdb_applyiter.h           |   53 +
 storage/rocksdb/rdb_datadic.cc            |  795 +++++++++
 storage/rocksdb/rdb_datadic.h             |  359 +++++
 storage/rocksdb/rdb_locks.cc              |  311 ++++
 storage/rocksdb/rdb_locks.h               |   96 ++
 storage/rocksdb/rdb_rowmods.cc            |  364 +++++
 storage/rocksdb/rdb_rowmods.h             |  140 ++
 storage/rocksdb/unittest/CMakeLists.txt   |   11 +
 storage/rocksdb/unittest/test_rowlocks.cc |  165 ++
 storage/rocksdb/unittest/thr_template.cc  |   93 ++
 19 files changed, 7759 insertions(+)

diff --git a/mysql-test/r/rocksdb.result b/mysql-test/r/rocksdb.result
new file mode 100644
index 0000000..553c43a
--- /dev/null
+++ b/mysql-test/r/rocksdb.result
@@ -0,0 +1,1171 @@
+select * from information_schema.engines where engine = 'rocksdb';
+ENGINE	SUPPORT	COMMENT	TRANSACTIONS	XA	SAVEPOINTS
+ROCKSDB	YES	RocksDB storage engine	YES	NO	NO
+drop table if exists t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10;
+drop table if exists t11,t12,t13,t14,t15,t16,t17,t18,t19,t20;
+drop table if exists t21,t22,t23,t24,t25,t26,t27,t28,t29;
+drop table if exists t30,t31,t32,t33,t34,t35,t36,t37,t38,t39;
+drop table if exists t40,t41,t42,t43,t44,t45,t46,t47,t48,t49;
+create table t0 (a int primary key) engine=rocksdb;
+show create table t0;
+Table	Create Table
+t0	CREATE TABLE `t0` (
+  `a` int(11) NOT NULL,
+  PRIMARY KEY (`a`)
+) ENGINE=ROCKSDB DEFAULT CHARSET=latin1
+drop table t0;
+# Try creating a table without PK:
+create table t1(a int) engine=rocksdb;
+ERROR 42000: This table type requires a primary key
+create table t1 (a int primary key, b int) engine=rocksdb;
+insert into t1 values (1,1);
+insert into t1 values (2,2);
+select * from t1;
+a	b
+1	1
+2	2
+# Check that we can create another table and insert there
+create table t2 (a varchar(10) primary key, b varchar(10)) engine=rocksdb;
+insert into t2 value ('abc','def');
+insert into t2 value ('hijkl','mnopq');
+select * from t2;
+a	b
+abc	def
+hijkl	mnopq
+# Select again from t1 to see that records from different tables don't mix
+select * from t1;
+a	b
+1	1
+2	2
+explain select * from t2 where a='no-such-key';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	NULL	NULL	NULL	NULL	NULL	NULL	NULL	Impossible WHERE noticed after reading const tables
+explain select * from t2 where a='abc';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t2	const	PRIMARY	PRIMARY	12	const	1	NULL
+select * from t2 where a='abc';
+a	b
+abc	def
+# Try a composite PK
+create table t3 (
+pk1 int, 
+pk2 varchar(10),
+col1 varchar(10),
+primary key(pk1, pk2)
+) engine=rocksdb;
+insert into t3 values (2,'two', 'row#2');
+insert into t3 values (3,'three', 'row#3');
+insert into t3 values (1,'one', 'row#1');
+select * from t3;
+pk1	pk2	col1
+1	one	row#1
+2	two	row#2
+3	three	row#3
+select * from t3 where pk1=3 and pk2='three';
+pk1	pk2	col1
+3	three	row#3
+drop table t1, t2, t3;
+# 
+# Test blob values
+#
+create table t4 (a int primary key, b blob) engine=rocksdb;
+insert into t4 values (1, repeat('quux-quux', 60));
+insert into t4 values (10, repeat('foo-bar', 43));
+insert into t4 values (5, repeat('foo-bar', 200));
+insert into t4 values (2, NULL);
+select 
+a,
+(case a 
+when 1  then b=repeat('quux-quux', 60)
+when 10 then b=repeat('foo-bar', 43)
+when 5  then b=repeat('foo-bar', 200)
+when 2  then b is null
+else 'IMPOSSIBLE!' end) as CMP
+from t4;
+a	CMP
+1	1
+2	1
+5	1
+10	1
+drop table t4;
+#
+# Test blobs of various sizes
+# 
+# TINYBLOB
+create table t5 (a int primary key, b tinyblob) engine=rocksdb;
+insert into t5 values (1, repeat('quux-quux', 6));
+insert into t5 values (10, repeat('foo-bar', 4));
+insert into t5 values (5, repeat('foo-bar', 2));
+select 
+a,
+(case a 
+when 1  then b=repeat('quux-quux', 6)
+when 10 then b=repeat('foo-bar', 4)
+when 5  then b=repeat('foo-bar', 2)
+else 'IMPOSSIBLE!' end) as CMP
+from t5;
+a	CMP
+1	1
+5	1
+10	1
+drop table t5;
+# MEDIUMBLOB
+create table t6 (a int primary key, b mediumblob) engine=rocksdb;
+insert into t6 values (1, repeat('AB', 65000));
+insert into t6 values (10, repeat('bbb', 40000));
+insert into t6 values (5, repeat('foo-bar', 2));
+select 
+a,
+(case a 
+when 1  then b=repeat('AB', 65000)
+when 10 then b=repeat('bbb', 40000)
+when 5  then b=repeat('foo-bar', 2)
+else 'IMPOSSIBLE!' end) as CMP
+from t6;
+a	CMP
+1	1
+5	1
+10	1
+drop table t6;
+# LONGBLOB
+create table t7 (a int primary key, b longblob) engine=rocksdb;
+insert into t7 values (1, repeat('AB', 65000));
+insert into t7 values (10, repeat('bbb', 40000));
+insert into t7 values (5, repeat('foo-bar', 2));
+select 
+a,
+(case a 
+when 1  then b=repeat('AB', 65000)
+when 10 then b=repeat('bbb', 40000)
+when 5  then b=repeat('foo-bar', 2)
+else 'IMPOSSIBLE!' end) as CMP
+from t7;
+a	CMP
+1	1
+5	1
+10	1
+drop table t7;
+#
+# Check if DELETEs work
+# 
+create table t8 (a varchar(10) primary key, col1 varchar(12)) engine=rocksdb;
+insert into t8 values 
+('one', 'eins'),
+('two', 'zwei'),
+('three', 'drei'),
+('four', 'vier'),
+('five', 'funf');
+# Delete by PK
+explain delete from t8 where a='three';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t8	range	PRIMARY	PRIMARY	12	NULL	1	Using where
+delete from t8 where a='three';
+select * from t8;
+a	col1
+five	funf
+four	vier
+one	eins
+two	zwei
+# Delete while doing a full table scan
+delete from t8 where col1='eins' or col1='vier';
+select * from t8;
+a	col1
+five	funf
+two	zwei
+# delete w/o WHERE:
+delete from t8;
+select * from t8;
+a	col1
+#
+# Test UPDATEs
+#
+insert into t8 values 
+('one', 'eins'),
+('two', 'zwei'),
+('three', 'drei'),
+('four', 'vier'),
+('five', 'funf');
+update t8 set col1='dva' where a='two';
+update t8 set a='fourAAA' where col1='vier';
+select * from t8;
+a	col1
+five	funf
+fourAAA	vier
+one	eins
+three	drei
+two	dva
+delete from t8;
+#
+# Basic transactions tests
+#
+begin;
+insert into t8 values ('trx1-val1', 'data');
+insert into t8 values ('trx1-val2', 'data');
+rollback;
+select * from t8;
+a	col1
+begin;
+insert into t8 values ('trx1-val1', 'data');
+insert into t8 values ('trx1-val2', 'data');
+commit;
+select * from t8;
+a	col1
+trx1-val1	data
+trx1-val2	data
+drop table t8;
+#
+# Check if DROP TABLE works
+#
+create table t8 (a varchar(10) primary key, col1 varchar(12)) engine=rocksdb;
+select * from t8;
+a	col1
+insert into t8 values ('foo','foo');
+drop table t8;
+create table t8 (a varchar(10) primary key, col1 varchar(12)) engine=rocksdb;
+select * from t8;
+a	col1
+drop table t8;
+#
+# MDEV-3961: Assertion ... on creating a TEMPORARY RocksDB table
+#
+CREATE TEMPORARY TABLE t10 (pk INT PRIMARY KEY) ENGINE=RocksDB;
+ERROR HY000: Table storage engine 'ROCKSDB' does not support the create option 'TEMPORARY'
+#
+# MDEV-3963: JOIN or WHERE conditions involving keys on RocksDB tables don't work
+#
+CREATE TABLE t10 (i INT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t10 VALUES (1),(3);
+CREATE TABLE t11 (j INT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t11 VALUES (1),(4);
+select * from t10;
+i
+1
+3
+select * from t11;
+j
+1
+4
+EXPLAIN
+SELECT * FROM t10, t11 WHERE i=j;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10	index	PRIMARY	PRIMARY	4	NULL	1000	Using index
+1	SIMPLE	t11	eq_ref	PRIMARY	PRIMARY	4	test.t10.i	1	Using index
+SELECT * FROM t10, t11 WHERE i=j;
+i	j
+1	1
+DROP TABLE t10,t11;
+#
+# MDEV-3962: SELECT with ORDER BY causes "ERROR 1030 (HY000): Got error 122
+#
+CREATE TABLE t12 (pk INT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t12 VALUES (2),(1);
+SELECT * FROM t12 ORDER BY pk;
+pk
+1
+2
+DROP TABLE t12;
+#
+# MDEV-3964: Assertion `!pk_descr' fails in ha_rocksdb::open on adding partitions ...
+#
+create table t14 (pk int primary key) engine=RocksDB partition by hash(pk) partitions 2;
+drop table t14;
+#
+# MDEV-3960: Server crashes on running DISCARD TABLESPACE on a RocksDB table
+#
+create table t9 (i int primary key) engine=rocksdb;
+alter table t9 discard tablespace;
+ERROR HY000: Table storage engine for 't9' doesn't have this option
+drop table t9;
+#
+# MDEV-3959: Assertion `slice->size() == table->s->reclength' fails ...
+#   on accessing a table after ALTER
+#
+CREATE TABLE t15 (a INT, rocksdb_pk INT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t15 VALUES (1,1),(5,2);
+ALTER TABLE t15 DROP COLUMN a;
+DROP TABLE t15;
+#
+# MDEV-3968: UPDATE produces a wrong result while modifying a PK on a RocksDB table
+#
+create table t16 (pk int primary key, a char(8)) engine=RocksDB;
+insert into t16 values (1,'a'),(2,'b'),(3,'c'),(4,'d');
+update t16 set pk=100, a = 'updated' where a in ('b','c');
+ERROR 23000: Duplicate entry '100' for key 'PRIMARY'
+select * from t16;
+pk	a
+1	a
+2	b
+3	c
+4	d
+drop table t16;
+#
+# MDEV-3970: A set of assorted crashes on inserting a row into a RocksDB table 
+#
+drop table if exists t_very_long_table_name;
+CREATE TABLE `t_very_long_table_name` (
+`c` char(1) NOT NULL,
+`c0` char(0) NOT NULL,
+`c1` char(1) NOT NULL,
+`c20` char(20) NOT NULL,
+`c255` char(255) NOT NULL,
+PRIMARY KEY (`c255`)
+) ENGINE=RocksDB DEFAULT CHARSET=latin1;
+INSERT INTO t_very_long_table_name VALUES ('a', '', 'c', REPEAT('a',20), REPEAT('x',255));
+drop table t_very_long_table_name;
+#
+# Test table locking and read-before-write checks.
+#
+create table t17 (pk varchar(12) primary key, col1 varchar(12)) engine=rocksdb;
+insert into t17 values ('row1', 'val1');
+insert into t17 values ('row1', 'val1-try2');
+ERROR 23000: Duplicate entry 'row1' for key 'PRIMARY'
+insert into t17 values ('ROW1', 'val1-try2');
+ERROR 23000: Duplicate entry 'ROW1' for key 'PRIMARY'
+insert into t17 values ('row2', 'val2');
+insert into t17 values ('row3', 'val3');
+# This is ok
+update t17 set pk='row4' where pk='row1';
+# This will try to overwrite another row:
+update t17 set pk='row3' where pk='row2';
+ERROR 23000: Duplicate entry 'row3' for key 'PRIMARY'
+select * from t17;
+pk	col1
+row2	val2
+row3	val3
+row4	val1
+#
+# Locking tests
+#
+# First, make sure there's no locking when transactions update different rows
+set autocommit=0;
+update t17 set col1='UPD1' where pk='row2';
+update t17 set col1='UPD2' where pk='row3';
+commit;
+select * from t17;
+pk	col1
+row2	UPD1
+row3	UPD2
+row4	val1
+# Check the variable
+show variables like 'rocksdb_lock_wait_timeout';
+Variable_name	Value
+rocksdb_lock_wait_timeout	1
+set rocksdb_lock_wait_timeout=2;
+show variables like 'rocksdb_lock_wait_timeout';
+Variable_name	Value
+rocksdb_lock_wait_timeout	2
+# Try updating the same row from two transactions
+begin;
+update t17 set col1='UPD2-AA' where pk='row2';
+update t17 set col1='UPD2-BB' where pk='row2';
+ERROR HY000: Lock wait timeout exceeded; try restarting transaction
+set rocksdb_lock_wait_timeout=1000;
+update t17 set col1='UPD2-CC' where pk='row2';
+rollback;
+select * from t17 where pk='row2';
+pk	col1
+row2	UPD2-CC
+drop table t17;
+#
+#  MDEV-4035: RocksDB: SELECT produces different results inside a transaction (read is not repeatable)
+#
+create table t18 (pk int primary key, i int) engine=RocksDB;
+begin;
+select * from t18;
+pk	i
+select * from t18 where pk = 1;
+pk	i
+connect  con1,localhost,root,,;
+insert into t18 values (1,100);
+connection default;
+select * from t18;
+pk	i
+select * from t18 where pk = 1;
+pk	i
+commit;
+drop table t18;
+#
+# MDEV-4036: RocksDB: INSERT .. ON DUPLICATE KEY UPDATE does not work, produces ER_DUP_KEY
+#
+create table t19 (pk int primary key, i int) engine=RocksDB;
+insert into t19 values (1,1);
+insert into t19 values (1,100) on duplicate key update i = 102;
+select * from t19;
+pk	i
+1	102
+drop table t19;
+# MDEV-4037: RocksDB: REPLACE doesn't work, produces ER_DUP_KEY
+create table t20 (pk int primary key, i int) engine=RocksDB;
+insert into t20 values (1,1);
+replace into t20 values (1,100);
+select * from t20;
+pk	i
+1	100
+drop table t20;
+#
+# MDEV-4041: Server crashes in Primary_key_comparator::get_hashnr on INSERT 
+#
+create table t21 (v varbinary(16) primary key, i int) engine=RocksDB;
+insert into t21 values ('a',1);
+select * from t21;
+v	i
+a	1
+drop table t21;
+#
+# MDEV-4047: RocksDB: Assertion `0' fails in Protocol::end_statement() on multi-table INSERT IGNORE
+#
+CREATE TABLE t22 (a int primary key) ENGINE=RocksDB;
+INSERT INTO t22 VALUES (1),(2);
+CREATE TABLE t23 (b int primary key) ENGINE=RocksDB;
+INSERT INTO t23 SELECT * FROM t22;
+DELETE IGNORE t22.*, t23.* FROM t22, t23 WHERE b < a;
+DROP TABLE t22,t23;
+#
+# MDEV-4046: RocksDB: Multi-table DELETE locks itself and ends with ER_LOCK_WAIT_TIMEOUT
+#
+CREATE TABLE t24 (pk int primary key) ENGINE=RocksDB;
+INSERT INTO t24 VALUES (1),(2);
+CREATE TABLE t25 LIKE t24;
+INSERT INTO t25 SELECT * FROM t24;
+DELETE t25.* FROM t24, t25;
+DROP TABLE t24,t25;
+#
+# MDEV-4044: RocksDB: UPDATE or DELETE with ORDER BY locks itself
+#
+create table t26 (pk int primary key, c char(1)) engine=RocksDB;
+insert into t26 values (1,'a'),(2,'b');
+update t26 set c = 'x' order by pk limit 1;
+delete from t26 order by pk limit 1;
+select * from t26;
+pk	c
+2	b
+drop table t26;
+#
+# Test whether SELECT ... FOR UPDATE puts locks
+#
+create table t27(pk varchar(10) primary key, col1 varchar(20)) engine=RocksDB;
+insert into t27 values 
+('row1', 'row1data'),
+('row2', 'row2data'),
+('row3', 'row3data');
+connection con1;
+begin;
+select * from t27 where pk='row3' for update;
+pk	col1
+row3	row3data
+connection default;
+set rocksdb_lock_wait_timeout=1;
+update t27 set col1='row2-modified' where pk='row3';
+ERROR HY000: Lock wait timeout exceeded; try restarting transaction
+connection con1;
+rollback;
+connection default;
+disconnect con1;
+drop table t27;
+#
+# MDEV-4060: RocksDB: Assertion `! trx->batch' fails in 
+#
+create table t28 (pk int primary key, a int) engine=RocksDB;
+insert into t28 values (1,10),(2,20);
+begin;
+update t28 set a = 100 where pk = 3;
+rollback;
+select * from t28;
+pk	a
+1	10
+2	20
+drop table t28;
+# 
+# Secondary indexes
+#
+create table t30 (
+pk varchar(16) not null primary key, 
+key1 varchar(16) not null, 
+col1 varchar(16) not null,
+key(key1)
+) engine=rocksdb;
+insert into t30 values ('row1', 'row1-key', 'row1-data');
+insert into t30 values ('row2', 'row2-key', 'row2-data');
+insert into t30 values ('row3', 'row3-key', 'row3-data');
+explain
+select * from t30 where key1='row2-key';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	ref	key1	key1	18	const	10	Using where
+select * from t30 where key1='row2-key';
+pk	key1	col1
+row2	row2-key	row2-data
+explain 
+select * from t30 where key1='row1';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	ref	key1	key1	18	const	10	Using where
+# This will produce nothing:
+select * from t30 where key1='row1';
+pk	key1	col1
+explain
+select key1 from t30;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	ALL	NULL	NULL	NULL	NULL	1000	NULL
+select key1 from t30;
+key1
+row1-key
+row2-key
+row3-key
+# Create a duplicate record
+insert into t30 values ('row2a', 'row2-key', 'row2a-data');
+# Can we see it?
+select * from t30 where key1='row2-key';
+pk	key1	col1
+row2	row2-key	row2-data
+row2a	row2-key	row2a-data
+delete from t30 where pk='row2';
+select * from t30 where key1='row2-key';
+pk	key1	col1
+row2a	row2-key	row2a-data
+#
+# Range scans on secondary index
+#
+delete from t30;
+insert into t30 values 
+('row1', 'row1-key', 'row1-data'),
+('row2', 'row2-key', 'row2-data'),
+('row3', 'row3-key', 'row3-data'),
+('row4', 'row4-key', 'row4-data'),
+('row5', 'row5-key', 'row5-data');
+explain 
+select * from t30 where key1 <='row3-key';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	range	key1	key1	18	NULL	10	Using where
+select * from t30 where key1 <='row3-key';
+pk	key1	col1
+row1	row1-key	row1-data
+row2	row2-key	row2-data
+row3	row3-key	row3-data
+explain 
+select * from t30 where key1 between 'row2-key' and 'row4-key';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	range	key1	key1	18	NULL	10	Using where
+select * from t30 where key1 between 'row2-key' and 'row4-key';
+pk	key1	col1
+row2	row2-key	row2-data
+row3	row3-key	row3-data
+row4	row4-key	row4-data
+explain 
+select * from t30 where key1 in ('row2-key','row4-key');
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	range	key1	key1	18	NULL	20	Using where
+select * from t30 where key1 in ('row2-key','row4-key');
+pk	key1	col1
+row2	row2-key	row2-data
+row4	row4-key	row4-data
+explain 
+select key1 from t30 where key1 in ('row2-key','row4-key');
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	range	key1	key1	18	NULL	20	Using where
+select key1 from t30 where key1 in ('row2-key','row4-key');
+key1
+row2-key
+row4-key
+explain 
+select * from t30 where key1 > 'row1-key' and key1 < 'row4-key';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	range	key1	key1	18	NULL	10	Using where
+select * from t30 where key1 > 'row1-key' and key1 < 'row4-key';
+pk	key1	col1
+row2	row2-key	row2-data
+row3	row3-key	row3-data
+explain 
+select * from t30 order by key1 limit 3;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	index	NULL	key1	18	NULL	3	NULL
+select * from t30 order by key1 limit 3;
+pk	key1	col1
+row1	row1-key	row1-data
+row2	row2-key	row2-data
+row3	row3-key	row3-data
+explain 
+select * from t30 order by key1 desc limit 3;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	index	NULL	key1	18	NULL	3	NULL
+select * from t30 order by key1 desc limit 3;
+pk	key1	col1
+row5	row5-key	row5-data
+row4	row4-key	row4-data
+row3	row3-key	row3-data
+#
+# Range scans on primary key
+#
+explain 
+select * from t30 where pk <='row3';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	range	PRIMARY	PRIMARY	18	NULL	10	Using where
+select * from t30 where pk <='row3';
+pk	key1	col1
+row1	row1-key	row1-data
+row2	row2-key	row2-data
+row3	row3-key	row3-data
+explain 
+select * from t30 where pk between 'row2' and 'row4';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	range	PRIMARY	PRIMARY	18	NULL	10	Using where
+select * from t30 where pk between 'row2' and 'row4';
+pk	key1	col1
+row2	row2-key	row2-data
+row3	row3-key	row3-data
+row4	row4-key	row4-data
+explain 
+select * from t30 where pk in ('row2','row4');
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	range	PRIMARY	PRIMARY	18	NULL	2	Using where
+select * from t30 where pk in ('row2','row4');
+pk	key1	col1
+row2	row2-key	row2-data
+row4	row4-key	row4-data
+explain 
+select * from t30 order by pk limit 3;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t30	index	NULL	PRIMARY	18	NULL	3	NULL
+select * from t30 order by pk limit 3;
+pk	key1	col1
+row1	row1-key	row1-data
+row2	row2-key	row2-data
+row3	row3-key	row3-data
+drop table t30;
+#
+# MDEV-3841: RocksDB: Reading by PK prefix does not work
+#
+create table t31 (i int, j int, k int, primary key(i,j,k)) engine=RocksDB;
+insert into t31 values (1,10,100),(2,20,200);
+select * from t31 where i = 1;
+i	j	k
+1	10	100
+select * from t31 where j = 10;
+i	j	k
+1	10	100
+select * from t31 where k = 100;
+i	j	k
+1	10	100
+select * from t31 where i = 1 and j = 10;
+i	j	k
+1	10	100
+select * from t31 where i = 1 and k = 100;
+i	j	k
+1	10	100
+select * from t31 where j = 10 and k = 100;
+i	j	k
+1	10	100
+select * from t31 where i = 1 and j = 10 and k = 100;
+i	j	k
+1	10	100
+drop table t31;
+#
+# MDEV-4055: RocksDB: UPDATE/DELETE by a multi-part PK does not work
+#
+create table t32 (i int, j int, k int, primary key(i,j,k), a varchar(8)) engine=RocksDB;
+insert into t32 values 
+(1,10,100,''),
+(2,20,200,'');
+select * from t32 where i = 1 and j = 10 and k = 100;
+i	j	k	a
+1	10	100	
+update t32 set a = 'updated' where i = 1 and j = 10 and k = 100;
+select * from t32;
+i	j	k	a
+1	10	100	updated
+2	20	200	
+drop table t32;
+#
+# MDEV-3841: RocksDB: Assertion `0' fails in ha_rocksdb::index_read_map on range select with ORDER BY .. DESC
+#
+CREATE TABLE t33 (pk INT PRIMARY KEY, a CHAR(1)) ENGINE=RocksDB;
+INSERT INTO t33 VALUES (1,'a'),(2,'b');
+SELECT * FROM t33 WHERE pk <= 10 ORDER BY pk DESC;
+pk	a
+2	b
+1	a
+DROP TABLE t33;
+#
+# MDEV-4081: RocksDB throws error 122 on an attempt to create a table with unique index
+#
+create table t33 (pk int primary key, u int, unique index(u)) engine=RocksDB;
+ERROR HY000: Got error 187 'Unique indexes are not supported' from ROCKSDB
+#
+# MDEV-4077: RocksDB: Wrong result (duplicate row) on select with range 
+#
+CREATE TABLE t34 (pk INT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t34 VALUES (10),(11);
+SELECT pk FROM t34 WHERE pk > 5 AND pk < 15;
+pk
+10
+11
+SELECT pk FROM t34 WHERE pk BETWEEN 5 AND 15;
+pk
+10
+11
+SELECT pk FROM t34 WHERE pk > 5;
+pk
+10
+11
+SELECT pk FROM t34 WHERE pk < 15;
+pk
+10
+11
+drop table t34;
+#
+# MDEV-4086: RocksDB does not allow a query with multi-part pk and index and ORDER BY .. DESC
+#
+create table t35 (a int, b int, c int, d int, e int, primary key (a,b,c), key (a,c,d,e)) engine=RocksDB;
+insert into t35 values (1,1,1,1,1),(2,2,2,2,2);
+select * from t35 where a = 1 and c = 1 and d = 1 order by e desc;
+a	b	c	d	e
+1	1	1	1	1
+drop table t35;
+#
+# MDEV-4084: RocksDB: Wrong result on IN subquery with index
+#
+CREATE TABLE t36 (pk INT PRIMARY KEY, a INT, KEY(a)) ENGINE=RocksDB;
+INSERT INTO t36 VALUES (1,10),(2,20);
+SELECT 3 IN ( SELECT a FROM t36 );
+3 IN ( SELECT a FROM t36 )
+0
+drop table t36;
+#
+# MDEV-4084: RocksDB: Wrong result on IN subquery with index
+#
+CREATE TABLE t37 (pk INT PRIMARY KEY, a INT, b CHAR(1), KEY(a), KEY(a,b)) 
+ENGINE=RocksDB;
+INSERT INTO t37 VALUES (1,10,'x'), (2,20,'y');
+SELECT MAX(a) FROM t37 WHERE a < 100;
+MAX(a)
+20
+DROP TABLE t37;
+#
+# MDEV-4090: RocksDB: Wrong result (duplicate rows) on range access with secondary key and ORDER BY DESC
+#
+CREATE TABLE t38 (pk INT PRIMARY KEY, i INT, KEY(i)) ENGINE=RocksDB;
+INSERT INTO t38 VALUES (1,10), (2,20);
+SELECT i FROM t38 WHERE i NOT IN (8) ORDER BY i DESC;
+i
+20
+10
+drop table t38;
+#
+# MDEV-4092: RocksDB: Assertion `in_table(pa, a_len)' fails in LDBSE_KEYDEF::cmp_full_keys 
+#            with a multi-part key and ORDER BY .. DESC
+#
+CREATE TABLE t40 (pk1 INT PRIMARY KEY, a INT, b VARCHAR(1), KEY(b,a)) ENGINE=RocksDB;
+INSERT INTO t40 VALUES (1, 7,'x'),(2,8,'y');
+CREATE TABLE t41 (pk2 INT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t41 VALUES (1),(2);
+SELECT * FROM t40, t41 WHERE pk1 = pk2 AND b = 'o' ORDER BY a DESC;
+pk1	a	b	pk2
+DROP TABLE t40,t41;
+#
+# MDEV-4093: RocksDB: IN subquery by secondary key with NULL among values returns true instead of NULL
+#
+CREATE TABLE t42 (pk INT PRIMARY KEY, a INT, KEY(a)) ENGINE=RocksDB;
+INSERT INTO t42 VALUES (1, NULL),(2, 8);
+SELECT ( 3 ) NOT IN ( SELECT a FROM t42 );
+( 3 ) NOT IN ( SELECT a FROM t42 )
+NULL
+DROP TABLE t42;
+#
+# MDEV-4094: RocksDB: Wrong result on SELECT and ER_KEY_NOT_FOUND on 
+#            DELETE with search by NULL-able secondary key ...
+#
+CREATE TABLE t43 (pk INT PRIMARY KEY, a INT, b CHAR(1), KEY(a)) ENGINE=RocksDB;
+INSERT INTO t43 VALUES (1,8,'g'),(2,9,'x');
+UPDATE t43 SET pk = 10 WHERE a = 8;
+REPLACE INTO t43 ( a ) VALUES ( 8 );
+Warnings:
+Warning	1364	Field 'pk' doesn't have a default value
+REPLACE INTO t43 ( b ) VALUES ( 'y' );
+Warnings:
+Warning	1364	Field 'pk' doesn't have a default value
+SELECT * FROM t43 WHERE a = 8;
+pk	a	b
+10	8	g
+DELETE FROM t43 WHERE a = 8;
+DROP TABLE t43;
+#
+# Basic AUTO_INCREMENT tests
+#
+create table t44(pk int primary key auto_increment, col1 varchar(12)) engine=rocksdb;
+insert into t44 (col1) values ('row1');
+insert into t44 (col1) values ('row2');
+insert into t44 (col1) values ('row3');
+select * from t44;
+pk	col1
+1	row1
+2	row2
+3	row3
+drop table t44;
+#
+# ALTER TABLE tests
+#
+create table t45 (pk int primary key, col1 varchar(12)) engine=rocksdb;
+insert into t45 values (1, 'row1');
+insert into t45 values (2, 'row2');
+alter table t45 rename t46;
+select * from t46;
+pk	col1
+1	row1
+2	row2
+drop table t46;
+drop table t45;
+ERROR 42S02: Unknown table 'test.t45'
+#
+# Check Bulk loading
+#
+show variables like 'rocksdb%';
+Variable_name	Value
+rocksdb_bulk_load	OFF
+rocksdb_bulk_load_size	1000
+rocksdb_lock_wait_timeout	1
+rocksdb_max_row_locks	1073741824
+create table t47 (pk int primary key, col1 varchar(12)) engine=rocksdb;
+insert into t47 values (1, 'row1');
+insert into t47 values (2, 'row2');
+set rocksdb_bulk_load=1;
+insert into t47 values (1, 'row1-NEW'),(2, 'row2-NEW');
+set rocksdb_bulk_load=0;
+select * from t47;
+pk	col1
+1	row1-NEW
+2	row2-NEW
+drop table t47;
+#
+# Fix TRUNCATE over empty table (transaction is committed when it wasn't
+# started)
+#
+create table t48(pk int primary key auto_increment, col1 varchar(12)) engine=rocksdb;
+set autocommit=0;
+truncate table t48;
+set autocommit=1;
+drop table t48;
+#
+# MDEV-4059: RocksDB: query waiting for a lock cannot be killed until query timeout exceeded
+#
+create table t49 (pk int primary key, a int) engine=RocksDB;
+insert into t49 values (1,10),(2,20);
+begin;
+update t49 set a = 100 where pk = 1;
+connect  con1,localhost,root,,;
+set rocksdb_lock_wait_timeout=5000;
+set @var1= to_seconds(now());
+update t49 set a = 1000 where pk = 1;
+connect  con2,localhost,root,,;
+kill query $con1_id;
+connection con1;
+ERROR 70100: Query execution was interrupted
+set @var2= to_seconds(now());
+select (@var2 -@var1) < 400;
+(@var2 -@var1) < 400
+1
+connection default;
+disconnect con1;
+commit;
+drop table t49;
+#
+# Index-only tests for INT-based columns
+#
+create table t1 (pk int primary key, key1 int, col1 int, key(key1)) engine=rocksdb;
+insert into t1 values (1,1,1);
+insert into t1 values (2,2,2);
+insert into t1 values (-5,-5,-5);
+# INT column uses index-only:
+explain
+select key1 from t1 where key1=2;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t1	ref	key1	key1	5	const	10	Using index
+select key1 from t1 where key1=2;
+key1
+2
+select key1 from t1 where key1=-5;
+key1
+-5
+drop table t1;
+create table t2 (pk int primary key, key1 int unsigned, col1 int, key(key1)) engine=rocksdb;
+insert into t2 values (1,1,1), (2,2,2);
+# INT UNSIGNED column uses index-only:
+explain
+select key1 from t2 where key1=2;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t2	ref	key1	key1	5	const	10	Using index
+select key1 from t2 where key1=2;
+key1
+2
+drop table t2;
+create table t3 (pk bigint primary key, key1 bigint, col1 int, key(key1)) engine=rocksdb;
+insert into t3 values (1,1,1), (2,2,2);
+# BIGINT uses index-only:
+explain 
+select key1 from t3 where key1=2;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t3	ref	key1	key1	9	const	10	Using index
+select key1 from t3 where key1=2;
+key1
+2
+drop table t3;
+#
+# Index-only reads for string columns
+#
+create table t1 (
+pk int primary key, 
+key1 char(10) character set binary,
+col1 int,
+key (key1)
+) engine=rocksdb;
+insert into t1 values(1, 'one',11), (2,'two',22);
+explain 
+select key1 from t1 where key1='one';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t1	ref	key1	key1	11	const	10	Using where; Using index
+# The following will produce no rows. This looks like a bug,
+#  but it is actually correct behavior: binary strings are end-padded
+#  with \0 characters (not spaces), and comparison does not ignore
+#  the trailing \0 bytes.
+select key1 from t1 where key1='one';
+key1
+explain
+select hex(key1) from t1 where key1='one\0\0\0\0\0\0\0';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t1	ref	key1	key1	11	const	10	Using where; Using index
+select hex(key1) from t1 where key1='one\0\0\0\0\0\0\0';
+hex(key1)
+6F6E6500000000000000
+drop table t1;
+create table t2 (
+pk int primary key, 
+key1 char(10) collate latin1_bin,
+col1 int,
+key (key1)
+) engine=rocksdb;
+insert into t2 values(1, 'one',11), (2,'two',22);
+explain 
+select key1 from t2 where key1='one';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t2	ref	key1	key1	11	const	10	Using where; Using index
+select key1 from t2 where key1='one';
+key1
+one
+drop table t2;
+create table t3 (
+pk int primary key, 
+key1 char(10) collate utf8_bin,
+col1 int,
+key (key1)
+) engine=rocksdb;
+insert into t3 values(1, 'one',11), (2,'two',22);
+explain 
+select key1 from t3 where key1='one';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t3	ref	key1	key1	31	const	10	Using where; Using index
+select key1 from t3 where key1='one';
+key1
+one
+drop table t3;
+# a VARCHAR column
+create table t4 (
+pk int primary key, 
+key1 varchar(10) collate latin1_bin, 
+key(key1)
+) engine=rocksdb;
+insert into t4 values(1, 'one'), (2,'two'),(3,'threee'),(55,'fifty-five');
+explain 
+select key1 from t4 where key1='two';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t4	ref	key1	key1	13	const	10	Using where; Using index
+select key1 from t4 where key1='two';
+key1
+two
+select key1 from t4 where key1='fifty-five';
+key1
+fifty-five
+explain 
+select key1 from t4 where key1 between 's' and 'u';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t4	range	key1	key1	13	NULL	10	Using where; Using index
+select key1 from t4 where key1 between 's' and 'u';
+key1
+threee
+two
+drop table t4;
+# 
+# MDEV-4305: RocksDB: Assertion `((keypart_map + 1) & keypart_map) == 0' fails in calculate_key_len
+# 
+CREATE TABLE t1 (pk1 INT, pk2 CHAR(32), i INT, PRIMARY KEY(pk1,pk2), KEY(i)) ENGINE=RocksDB;
+INSERT INTO t1 VALUES (1,'test1',6),(2,'test2',8);
+SELECT * FROM t1 WHERE i != 3 OR  pk1 > 9;
+pk1	pk2	i
+1	test1	6
+2	test2	8
+DROP TABLE t1;
+# 
+# MDEV-4298: RocksDB: Assertion `thd->is_error() || kill_errno' fails in ha_rows filesort
+# 
+CREATE TABLE t1 (pk INT PRIMARY KEY, i INT, KEY(i)) ENGINE=RocksDB;
+INSERT INTO t1 VALUES (1,1),(2,2);
+BEGIN;
+UPDATE t1 SET i = 100;
+connect  con1,localhost,root,,test;
+DELETE IGNORE FROM t1 ORDER BY i;
+ERROR HY000: Lock wait timeout exceeded; try restarting transaction
+disconnect con1;
+connection default;
+COMMIT;
+DROP TABLE t1;
+#
+# MDEV-4313: RocksDB: Server crashes in LDBSE_KEYDEF::setup on dropping the primary key column
+#
+CREATE TABLE t1 (pk INT PRIMARY KEY, i INT NOT NULL, KEY(i)) ENGINE=RocksDB;
+ALTER TABLE t1 DROP COLUMN `pk`;
+ERROR HY000: Got error 188 'Table must have a PRIMARY KEY' from ROCKSDB
+DROP TABLE t1;
+#
+# MDEV-4324: RocksDB: Valgrind "Use of uninitialised value" warnings on inserting value into varchar field
+#  (testcase only)
+#
+CREATE TABLE t1 (pk INT PRIMARY KEY, c VARCHAR(4)) ENGINE=RocksDB;
+INSERT INTO t1 VALUES (1,'foo'), (2,'bar');
+DROP TABLE t1;
+#
+# MDEV-4304: RocksDB: Index-only scan by a field with utf8_bin collation returns garbage symbols
+#
+CREATE TABLE t1 (pk INT PRIMARY KEY, c1 CHAR(1), c2 CHAR(1), KEY(c1)) ENGINE=RocksDB CHARSET utf8 COLLATE utf8_bin;
+INSERT INTO t1 VALUES (1,'h','h');
+SELECT * FROM t1;
+pk	c1	c2
+1	h	h
+SELECT c1 FROM t1;
+c1
+h
+DROP TABLE t1;
+#
+# MDEV-4300: RocksDB: Server crashes in inline_mysql_mutex_lock on SELECT .. FOR UPDATE
+#
+CREATE TABLE t2 (pk INT PRIMARY KEY, i INT, KEY (i)) ENGINE=RocksDB;
+INSERT INTO t2 VALUES (1,4),(2,5);
+SELECT 1 FROM t2 WHERE i < 0 FOR UPDATE;
+1
+DROP TABLE t2;
+#
+# MDEV-4301: RocksDB: Assertion `pack_info != __null' fails in LDBSE_KEYDEF::unpack_record
+#
+CREATE TABLE t1 (pk INT PRIMARY KEY, i INT, c CHAR(1), KEY(c,i)) ENGINE=RocksDB;
+INSERT INTO t1 VALUES (1,4,'d'),(2,8,'e');
+SELECT MAX( pk ) FROM t1 WHERE i = 105 AND c = 'h';
+MAX( pk )
+NULL
+DROP TABLE t1;
+#
+# MDEV-4337: RocksDB: Inconsistent results comparing a char field with an int field
+#
+create table t1 (c char(1), i int, primary key(c), key(i)) engine=RocksDB;
+insert into t1 values ('2',2),('6',6);
+select * from t1 where c = i;
+c	i
+2	2
+6	6
+select * from t1 ignore index (i) where c = i;
+c	i
+2	2
+6	6
+drop table t1;
+#
+# Test statement rollback inside a transaction
+#
+create table t1 (pk varchar(12) primary key) engine=rocksdb;
+insert into t1 values ('old-val1'),('old-val2');
+create table t2 (pk varchar(12) primary key) engine=rocksdb;
+insert into t2 values ('new-val2'),('old-val1');
+begin;
+insert into t1 values ('new-val1');
+insert into t1 select * from t2;
+ERROR 23000: Duplicate entry 'old-val1' for key 'PRIMARY'
+commit;
+select * from t1;
+pk
+new-val1
+old-val1
+old-val2
+drop table t1, t2;
+#
+# MDEV-4383: RocksDB: Wrong result of DELETE .. ORDER BY .. LIMIT: 
+#   rows that should be deleted remain in the table
+#
+CREATE TABLE t2 (pk INT AUTO_INCREMENT PRIMARY KEY) ENGINE=RocksDB;
+CREATE TABLE t1 (pk INT AUTO_INCREMENT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t1 (pk) VALUES (NULL),(NULL);
+BEGIN;
+INSERT INTO t2 (pk) VALUES (NULL),(NULL);
+INSERT INTO t1 (pk) VALUES (NULL),(NULL),(NULL),(NULL),(NULL),(NULL);
+SELECT * FROM t1 ORDER BY pk LIMIT 9;
+pk
+1
+2
+3
+4
+5
+6
+7
+8
+affected rows: 8
+DELETE FROM t1 ORDER BY pk LIMIT 9;
+affected rows: 8
+SELECT * FROM t1 ORDER BY pk LIMIT 9;
+pk
+affected rows: 0
+DROP TABLE t1,t2;
+#
+# MDEV-4374: RocksDB: Valgrind warnings 'Use of uninitialised value' on 
+#   inserting into a varchar column
+#
+CREATE TABLE t1 (pk INT PRIMARY KEY, a VARCHAR(32)) ENGINE=RocksDB;
+INSERT INTO t1 VALUES (1,'foo'),(2,'bar');
+DROP TABLE t1;
+#
+# MDEV-4061: RocksDB: Changes from an interrupted query are still applied
+#
+create table t1 (pk int primary key, a int) engine=RocksDB;
+insert into t1 values (1,10),(2,20);
+set autocommit = 1;
+update t1 set a = sleep(100) where pk = 1;
+connect  con1,localhost,root,,;
+kill query $con_id;
+connection default;
+ERROR 70100: Query execution was interrupted
+select * from t1;
+pk	a
+1	10
+2	20
+disconnect con1;
+drop table t1;
+#
+# MDEV-4099: RocksDB: Wrong results with index and range access after INSERT IGNORE or REPLACE
+#
+CREATE TABLE t1 (pk INT PRIMARY KEY, a SMALLINT, b INT, KEY (a)) ENGINE=RocksDB;
+INSERT IGNORE INTO t1 VALUES (1, 157, 0), (2, 1898, -504403), (1, -14659,  0);
+SELECT * FROM t1;
+pk	a	b
+1	157	0
+2	1898	-504403
+SELECT pk FROM t1;
+pk
+1
+2
+SELECT * FROM t1 WHERE a != 97;
+pk	a	b
+1	157	0
+2	1898	-504403
+DROP TABLE t1;
+#
+# Test @@rocksdb_max_row_locks
+#
+CREATE TABLE t1 (pk INT PRIMARY KEY, a int) ENGINE=RocksDB;
+set @a=-1;
+insert into t1 select (@a:=@a+1), 1234 from information_schema.session_variables limit 100;
+set @tmp1= @@rocksdb_max_row_locks;
+set rocksdb_max_row_locks= 20;
+update t1 set a=a+10;
+ERROR HY000: Got error 189 'Number of locks held reached @@rocksdb_max_row_locks' from ROCKSDB
+DROP TABLE t1;
diff --git a/mysql-test/r/rocksdb_qcache.result b/mysql-test/r/rocksdb_qcache.result
new file mode 100644
index 0000000..7a17dab
--- /dev/null
+++ b/mysql-test/r/rocksdb_qcache.result
@@ -0,0 +1,37 @@
+create table t1 (pk int primary key, c char(8)) engine=RocksDB;
+insert into t1 values (1,'new'),(2,'new');
+select * from t1;
+pk	c
+1	new
+2	new
+connect  con1,localhost,root,,;
+update t1 set c = 'updated';
+connection default;
+flush status;
+show status like 'Qcache_hits';
+Variable_name	Value
+Qcache_hits	0
+show global status like 'Qcache_hits';
+Variable_name	Value
+Qcache_hits	0
+select * from t1;
+pk	c
+1	updated
+2	updated
+select sql_no_cache * from t1;
+pk	c
+1	updated
+2	updated
+select * from t1 where pk = 1;
+pk	c
+1	updated
+show status like 'Qcache_hits';
+Variable_name	Value
+Qcache_hits	0
+show status like 'Qcache_not_cached';
+Variable_name	Value
+Qcache_not_cached	3
+show global status like 'Qcache_hits';
+Variable_name	Value
+Qcache_hits	0
+drop table t1;
diff --git a/mysql-test/t/rocksdb.test b/mysql-test/t/rocksdb.test
new file mode 100644
index 0000000..52be54a
--- /dev/null
+++ b/mysql-test/t/rocksdb.test
@@ -0,0 +1,1053 @@
+#
+# RocksDB Storage Engine tests
+#
+select * from information_schema.engines where engine = 'rocksdb';
+
+--disable_warnings
+drop table if exists t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10;
+drop table if exists t11,t12,t13,t14,t15,t16,t17,t18,t19,t20;
+drop table if exists t21,t22,t23,t24,t25,t26,t27,t28,t29;
+drop table if exists t30,t31,t32,t33,t34,t35,t36,t37,t38,t39;
+drop table if exists t40,t41,t42,t43,t44,t45,t46,t47,t48,t49;
+--enable_warnings
+create table t0 (a int primary key) engine=rocksdb;
+show create table t0;
+drop table t0;
+
+--echo # Try creating a table without PK:
+--error ER_REQUIRES_PRIMARY_KEY
+create table t1(a int) engine=rocksdb;
+
+create table t1 (a int primary key, b int) engine=rocksdb;
+insert into t1 values (1,1);
+insert into t1 values (2,2);
+
+select * from t1;
+
+--echo # Check that we can create another table and insert there
+create table t2 (a varchar(10) primary key, b varchar(10)) engine=rocksdb;
+insert into t2 value ('abc','def');
+insert into t2 value ('hijkl','mnopq');
+select * from t2;
+
+--echo # Select again from t1 to see that records from different tables don't mix
+select * from t1;
+
+explain select * from t2 where a='no-such-key';
+explain select * from t2 where a='abc';
+select * from t2 where a='abc';
+
+--echo # Try a composite PK
+create table t3 (
+  pk1 int, 
+  pk2 varchar(10),
+  col1 varchar(10),
+  primary key(pk1, pk2)
+) engine=rocksdb;
+
+insert into t3 values (2,'two', 'row#2');
+insert into t3 values (3,'three', 'row#3');
+insert into t3 values (1,'one', 'row#1');
+
+select * from t3;
+select * from t3 where pk1=3 and pk2='three';
+
+drop table t1, t2, t3;
+
+--echo # 
+--echo # Test blob values
+--echo #
+
+create table t4 (a int primary key, b blob) engine=rocksdb;
+insert into t4 values (1, repeat('quux-quux', 60));
+insert into t4 values (10, repeat('foo-bar', 43));
+insert into t4 values (5, repeat('foo-bar', 200));
+
+insert into t4 values (2, NULL);
+
+
+select 
+ a,
+ (case a 
+   when 1  then b=repeat('quux-quux', 60)
+   when 10 then b=repeat('foo-bar', 43)
+   when 5  then b=repeat('foo-bar', 200)
+   when 2  then b is null
+   else 'IMPOSSIBLE!' end) as CMP
+from t4;
+
+drop table t4;
+
+--echo #
+--echo # Test blobs of various sizes
+--echo # 
+
+--echo # TINYBLOB
+create table t5 (a int primary key, b tinyblob) engine=rocksdb;
+insert into t5 values (1, repeat('quux-quux', 6));
+insert into t5 values (10, repeat('foo-bar', 4));
+insert into t5 values (5, repeat('foo-bar', 2));
+select 
+ a,
+ (case a 
+   when 1  then b=repeat('quux-quux', 6)
+   when 10 then b=repeat('foo-bar', 4)
+   when 5  then b=repeat('foo-bar', 2)
+   else 'IMPOSSIBLE!' end) as CMP
+from t5;
+drop table t5;
+
+--echo # MEDIUMBLOB
+create table t6 (a int primary key, b mediumblob) engine=rocksdb;
+insert into t6 values (1, repeat('AB', 65000));
+insert into t6 values (10, repeat('bbb', 40000));
+insert into t6 values (5, repeat('foo-bar', 2));
+select 
+ a,
+ (case a 
+   when 1  then b=repeat('AB', 65000)
+   when 10 then b=repeat('bbb', 40000)
+   when 5  then b=repeat('foo-bar', 2)
+   else 'IMPOSSIBLE!' end) as CMP
+from t6;
+drop table t6;
+
+--echo # LONGBLOB
+create table t7 (a int primary key, b longblob) engine=rocksdb;
+insert into t7 values (1, repeat('AB', 65000));
+insert into t7 values (10, repeat('bbb', 40000));
+insert into t7 values (5, repeat('foo-bar', 2));
+select 
+ a,
+ (case a 
+   when 1  then b=repeat('AB', 65000)
+   when 10 then b=repeat('bbb', 40000)
+   when 5  then b=repeat('foo-bar', 2)
+   else 'IMPOSSIBLE!' end) as CMP
+from t7;
+drop table t7;
+
+
+--echo #
+--echo # Check if DELETEs work
+--echo # 
+create table t8 (a varchar(10) primary key, col1 varchar(12)) engine=rocksdb;
+
+insert into t8 values 
+ ('one', 'eins'),
+ ('two', 'zwei'),
+ ('three', 'drei'),
+ ('four', 'vier'),
+ ('five', 'funf');
+
+--echo # Delete by PK
+explain delete from t8 where a='three';
+delete from t8 where a='three';
+
+select * from t8;
+
+--echo # Delete while doing a full table scan
+delete from t8 where col1='eins' or col1='vier';
+select * from t8;
+
+--echo # delete w/o WHERE:
+delete from t8;
+select * from t8;
+
+--echo #
+--echo # Test UPDATEs
+--echo #
+insert into t8 values 
+ ('one', 'eins'),
+ ('two', 'zwei'),
+ ('three', 'drei'),
+ ('four', 'vier'),
+ ('five', 'funf');
+
+update t8 set col1='dva' where a='two';
+
+update t8 set a='fourAAA' where col1='vier';
+
+select * from t8;
+delete from t8;
+
+--echo #
+--echo # Basic transactions tests
+--echo #
+begin; 
+insert into t8 values ('trx1-val1', 'data');
+insert into t8 values ('trx1-val2', 'data');
+rollback;
+select * from t8;
+
+begin; 
+insert into t8 values ('trx1-val1', 'data');
+insert into t8 values ('trx1-val2', 'data');
+commit;
+select * from t8;
+
+drop table t8;
+
+--echo #
+--echo # Check if DROP TABLE works
+--echo #
+create table t8 (a varchar(10) primary key, col1 varchar(12)) engine=rocksdb;
+select * from t8;
+insert into t8 values ('foo','foo');
+drop table t8;
+create table t8 (a varchar(10) primary key, col1 varchar(12)) engine=rocksdb;
+select * from t8;
+drop table t8;
+
+--echo #
+--echo # MDEV-3961: Assertion ... on creating a TEMPORARY RocksDB table
+--echo #
+--error ER_ILLEGAL_HA_CREATE_OPTION
+CREATE TEMPORARY TABLE t10 (pk INT PRIMARY KEY) ENGINE=RocksDB;
+
+--echo #
+--echo # MDEV-3963: JOIN or WHERE conditions involving keys on RocksDB tables don't work
+--echo #
+CREATE TABLE t10 (i INT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t10 VALUES (1),(3);
+CREATE TABLE t11 (j INT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t11 VALUES (1),(4);
+
+select * from t10;
+select * from t11;
+EXPLAIN
+SELECT * FROM t10, t11 WHERE i=j;
+SELECT * FROM t10, t11 WHERE i=j;
+
+DROP TABLE t10,t11;
+
+--echo #
+--echo # MDEV-3962: SELECT with ORDER BY causes "ERROR 1030 (HY000): Got error 122
+--echo #
+CREATE TABLE t12 (pk INT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t12 VALUES (2),(1);
+SELECT * FROM t12 ORDER BY pk;
+DROP TABLE t12;
+
+--echo #
+--echo # MDEV-3964: Assertion `!pk_descr' fails in ha_rocksdb::open on adding partitions ...
+--echo #
+create table t14 (pk int primary key) engine=RocksDB partition by hash(pk) partitions 2;
+#--error ER_GET_ERRNO
+#alter table t14 add partition partitions 2;
+# ^^ works, but causes weird warnings in error log.
+drop table t14;
+
+--echo #
+--echo # MDEV-3960: Server crashes on running DISCARD TABLESPACE on a RocksDB table
+--echo #
+create table t9 (i int primary key) engine=rocksdb;
+--error ER_ILLEGAL_HA
+alter table t9 discard tablespace;
+drop table t9;
+
+--echo #
+--echo # MDEV-3959: Assertion `slice->size() == table->s->reclength' fails ...
+--echo #   on accessing a table after ALTER
+--echo #
+CREATE TABLE t15 (a INT, rocksdb_pk INT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t15 VALUES (1,1),(5,2);
+#--error ER_ILLEGAL_HA
+ALTER TABLE t15 DROP COLUMN a;
+DROP TABLE t15;
+
+--echo #
+--echo # MDEV-3968: UPDATE produces a wrong result while modifying a PK on a RocksDB table
+--echo #
+create table t16 (pk int primary key, a char(8)) engine=RocksDB;
+insert into t16 values (1,'a'),(2,'b'),(3,'c'),(4,'d');
+
+# 
+# Not anymore: this query used to eat a record because of the
+# CANT-SEE-OWN-CHANGES property; now it correctly fails with a duplicate-key error.
+#
+--error ER_DUP_ENTRY
+update t16 set pk=100, a = 'updated' where a in ('b','c');
+select * from t16;
+drop table t16;
+
+--echo #
+--echo # MDEV-3970: A set of assorted crashes on inserting a row into a RocksDB table 
+--echo #
+--disable_warnings
+drop table if exists t_very_long_table_name;
+--enable_warnings
+
+CREATE TABLE `t_very_long_table_name` (
+   `c` char(1) NOT NULL,
+   `c0` char(0) NOT NULL,
+   `c1` char(1) NOT NULL,
+   `c20` char(20) NOT NULL,
+   `c255` char(255) NOT NULL,
+   PRIMARY KEY (`c255`)
+ ) ENGINE=RocksDB DEFAULT CHARSET=latin1;
+INSERT INTO t_very_long_table_name VALUES ('a', '', 'c', REPEAT('a',20), REPEAT('x',255));
+drop table t_very_long_table_name;
+
+
+--echo #
+--echo # Test table locking and read-before-write checks.
+--echo #
+create table t17 (pk varchar(12) primary key, col1 varchar(12)) engine=rocksdb;
+insert into t17 values ('row1', 'val1');
+
+--error ER_DUP_ENTRY
+insert into t17 values ('row1', 'val1-try2');
+--error ER_DUP_ENTRY
+insert into t17 values ('ROW1', 'val1-try2');
+
+insert into t17 values ('row2', 'val2');
+insert into t17 values ('row3', 'val3');
+
+--echo # This is ok
+update t17 set pk='row4' where pk='row1';
+
+--echo # This will try to overwrite another row:
+--error ER_DUP_ENTRY
+update t17 set pk='row3' where pk='row2';
+
+select * from t17;
+
+--echo #
+--echo # Locking tests
+--echo #
+
+connect (con1,localhost,root,,);
+
+--echo # First, make sure there's no locking when transactions update different rows
+connection con1;
+set autocommit=0;
+update t17 set col1='UPD1' where pk='row2';
+
+connection default;
+update t17 set col1='UPD2' where pk='row3';
+
+connection con1;
+commit;
+
+connection default;
+select * from t17;
+
+--echo # Check the variable
+show variables like 'rocksdb_lock_wait_timeout';
+set rocksdb_lock_wait_timeout=2; # seconds
+show variables like 'rocksdb_lock_wait_timeout';
+
+--echo # Try updating the same row from two transactions
+connection con1;
+begin;
+update t17 set col1='UPD2-AA' where pk='row2';
+
+connection default;
+--error ER_LOCK_WAIT_TIMEOUT
+update t17 set col1='UPD2-BB' where pk='row2';
+
+set rocksdb_lock_wait_timeout=1000; # seconds
+--send 
+  update t17 set col1='UPD2-CC' where pk='row2';
+
+connection con1;
+rollback;
+
+connection default;
+reap;
+select * from t17 where pk='row2';
+
+drop table t17;
+
+disconnect con1;
+--echo #
+--echo #  MDEV-4035: RocksDB: SELECT produces different results inside a transaction (read is not repeatable)
+--echo #
+--enable_connect_log
+
+create table t18 (pk int primary key, i int) engine=RocksDB;
+begin;
+select * from t18;
+select * from t18 where pk = 1;
+
+--connect (con1,localhost,root,,)
+insert into t18 values (1,100);
+
+--connection default
+select * from t18;
+select * from t18 where pk = 1;
+commit;
+
+drop table t18;
+
+--echo #
+--echo # MDEV-4036: RocksDB: INSERT .. ON DUPLICATE KEY UPDATE does not work, produces ER_DUP_KEY
+--echo #
+create table t19 (pk int primary key, i int) engine=RocksDB;
+insert into t19 values (1,1);
+insert into t19 values (1,100) on duplicate key update i = 102;
+select * from t19;
+drop table t19;
+
+--echo # MDEV-4037: RocksDB: REPLACE doesn't work, produces ER_DUP_KEY
+create table t20 (pk int primary key, i int) engine=RocksDB;
+insert into t20 values (1,1);
+replace into t20 values (1,100);
+select * from t20;
+drop table t20;
+
+--echo #
+--echo # MDEV-4041: Server crashes in Primary_key_comparator::get_hashnr on INSERT 
+--echo #
+create table t21 (v varbinary(16) primary key, i int) engine=RocksDB;
+insert into t21 values ('a',1);
+select * from t21;
+drop table t21;
+
+--echo #
+--echo # MDEV-4047: RocksDB: Assertion `0' fails in Protocol::end_statement() on multi-table INSERT IGNORE
+--echo #
+
+CREATE TABLE t22 (a int primary key) ENGINE=RocksDB;
+INSERT INTO t22 VALUES (1),(2);
+CREATE TABLE t23 (b int primary key) ENGINE=RocksDB;
+INSERT INTO t23 SELECT * FROM t22;
+DELETE IGNORE t22.*, t23.* FROM t22, t23 WHERE b < a;
+DROP TABLE t22,t23;
+
+--echo #
+--echo # MDEV-4046: RocksDB: Multi-table DELETE locks itself and ends with ER_LOCK_WAIT_TIMEOUT
+--echo #
+CREATE TABLE t24 (pk int primary key) ENGINE=RocksDB;
+INSERT INTO t24 VALUES (1),(2);
+
+CREATE TABLE t25 LIKE t24;
+INSERT INTO t25 SELECT * FROM t24;
+
+DELETE t25.* FROM t24, t25;
+DROP TABLE t24,t25;
+
+--echo #
+--echo # MDEV-4044: RocksDB: UPDATE or DELETE with ORDER BY locks itself
+--echo #
+create table t26 (pk int primary key, c char(1)) engine=RocksDB;
+insert into t26 values (1,'a'),(2,'b');
+update t26 set c = 'x' order by pk limit 1;
+delete from t26 order by pk limit 1;
+select * from t26;
+drop table t26;
+
+
+--echo #
+--echo # Test whether SELECT ... FOR UPDATE puts locks
+--echo #
+create table t27(pk varchar(10) primary key, col1 varchar(20)) engine=RocksDB;
+insert into t27 values 
+  ('row1', 'row1data'),
+  ('row2', 'row2data'),
+  ('row3', 'row3data');
+
+connection con1;
+begin;
+select * from t27 where pk='row3' for update;
+
+connection default;
+set rocksdb_lock_wait_timeout=1;
+--error ER_LOCK_WAIT_TIMEOUT
+update t27 set col1='row2-modified' where pk='row3';
+
+connection con1;
+rollback;
+connection default;
+disconnect con1;
+
+drop table t27;
+
+--echo #
+--echo # MDEV-4060: RocksDB: Assertion `! trx->batch' fails in 
+--echo #
+create table t28 (pk int primary key, a int) engine=RocksDB;
+insert into t28 values (1,10),(2,20);
+begin;
+update t28 set a = 100 where pk = 3;
+rollback;
+select * from t28;
+drop table t28;
+
+
+--echo # 
+--echo # Secondary indexes
+--echo #
+create table t30 (
+  pk varchar(16) not null primary key, 
+  key1 varchar(16) not null, 
+  col1 varchar(16) not null,
+  key(key1)
+) engine=rocksdb;
+
+insert into t30 values ('row1', 'row1-key', 'row1-data');
+insert into t30 values ('row2', 'row2-key', 'row2-data');
+insert into t30 values ('row3', 'row3-key', 'row3-data');
+
+explain
+select * from t30 where key1='row2-key';
+select * from t30 where key1='row2-key';
+
+explain 
+select * from t30 where key1='row1';
+--echo # This will produce nothing:
+select * from t30 where key1='row1';
+
+explain
+select key1 from t30;
+select key1 from t30;
+
+--echo # Create a duplicate record
+insert into t30 values ('row2a', 'row2-key', 'row2a-data');
+
+--echo # Can we see it?
+select * from t30 where key1='row2-key';
+
+delete from t30 where pk='row2';
+select * from t30 where key1='row2-key';
+
+--echo #
+--echo # Range scans on secondary index
+--echo #
+delete from t30;
+insert into t30 values 
+  ('row1', 'row1-key', 'row1-data'),
+  ('row2', 'row2-key', 'row2-data'),
+  ('row3', 'row3-key', 'row3-data'),
+  ('row4', 'row4-key', 'row4-data'),
+  ('row5', 'row5-key', 'row5-data');
+
+explain 
+select * from t30 where key1 <='row3-key'; 
+select * from t30 where key1 <='row3-key'; 
+
+explain 
+select * from t30 where key1 between 'row2-key' and 'row4-key';
+select * from t30 where key1 between 'row2-key' and 'row4-key';
+
+explain 
+select * from t30 where key1 in ('row2-key','row4-key');
+select * from t30 where key1 in ('row2-key','row4-key');
+
+explain 
+select key1 from t30 where key1 in ('row2-key','row4-key');
+select key1 from t30 where key1 in ('row2-key','row4-key');
+
+explain 
+select * from t30 where key1 > 'row1-key' and key1 < 'row4-key';
+select * from t30 where key1 > 'row1-key' and key1 < 'row4-key';
+
+explain 
+select * from t30 order by key1 limit 3;
+select * from t30 order by key1 limit 3;
+
+explain 
+select * from t30 order by key1 desc limit 3;
+select * from t30 order by key1 desc limit 3;
+
+--echo #
+--echo # Range scans on primary key
+--echo #
+explain 
+select * from t30 where pk <='row3'; 
+select * from t30 where pk <='row3'; 
+
+explain 
+select * from t30 where pk between 'row2' and 'row4';
+select * from t30 where pk between 'row2' and 'row4';
+
+explain 
+select * from t30 where pk in ('row2','row4');
+select * from t30 where pk in ('row2','row4');
+
+explain 
+select * from t30 order by pk limit 3;
+select * from t30 order by pk limit 3;
+
+drop table t30;
+
+
+--echo #
+--echo # MDEV-3841: RocksDB: Reading by PK prefix does not work
+--echo #
+create table t31 (i int, j int, k int, primary key(i,j,k)) engine=RocksDB;
+insert into t31 values (1,10,100),(2,20,200);
+select * from t31 where i = 1;
+select * from t31 where j = 10;
+select * from t31 where k = 100;
+select * from t31 where i = 1 and j = 10;
+select * from t31 where i = 1 and k = 100;
+select * from t31 where j = 10 and k = 100;
+select * from t31 where i = 1 and j = 10 and k = 100;
+drop table t31;
+
+--echo #
+--echo # MDEV-4055: RocksDB: UPDATE/DELETE by a multi-part PK does not work
+--echo #
+create table t32 (i int, j int, k int, primary key(i,j,k), a varchar(8)) engine=RocksDB;
+insert into t32 values 
+  (1,10,100,''),
+  (2,20,200,'');
+select * from t32 where i = 1 and j = 10 and k = 100;
+update t32 set a = 'updated' where i = 1 and j = 10 and k = 100;
+select * from t32;
+drop table t32;
+
+--echo #
+--echo # MDEV-3841: RocksDB: Assertion `0' fails in ha_rocksdb::index_read_map on range select with ORDER BY .. DESC
+--echo #
+CREATE TABLE t33 (pk INT PRIMARY KEY, a CHAR(1)) ENGINE=RocksDB;
+INSERT INTO t33 VALUES (1,'a'),(2,'b');
+SELECT * FROM t33 WHERE pk <= 10 ORDER BY pk DESC;
+DROP TABLE t33;
+
+--echo #
+--echo # MDEV-4081: RocksDB throws error 122 on an attempt to create a table with unique index
+--echo #
+--error ER_GET_ERRMSG
+create table t33 (pk int primary key, u int, unique index(u)) engine=RocksDB;
+
+
+--echo #
+--echo # MDEV-4077: RocksDB: Wrong result (duplicate row) on select with range 
+--echo #
+CREATE TABLE t34 (pk INT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t34 VALUES (10),(11);
+SELECT pk FROM t34 WHERE pk > 5 AND pk < 15;
+SELECT pk FROM t34 WHERE pk BETWEEN 5 AND 15;
+SELECT pk FROM t34 WHERE pk > 5;
+SELECT pk FROM t34 WHERE pk < 15;
+drop table t34;
+
+--echo #
+--echo # MDEV-4086: RocksDB does not allow a query with multi-part pk and index and ORDER BY .. DESC
+--echo #
+create table t35 (a int, b int, c int, d int, e int, primary key (a,b,c), key (a,c,d,e)) engine=RocksDB;
+insert into t35 values (1,1,1,1,1),(2,2,2,2,2);
+select * from t35 where a = 1 and c = 1 and d = 1 order by e desc;
+drop table t35;
+
+--echo #
+--echo # MDEV-4084: RocksDB: Wrong result on IN subquery with index
+--echo #
+CREATE TABLE t36 (pk INT PRIMARY KEY, a INT, KEY(a)) ENGINE=RocksDB;
+INSERT INTO t36 VALUES (1,10),(2,20);
+SELECT 3 IN ( SELECT a FROM t36 );
+drop table t36;
+
+--echo #
+--echo # MDEV-4084: RocksDB: Wrong result on IN subquery with index
+--echo #
+CREATE TABLE t37 (pk INT PRIMARY KEY, a INT, b CHAR(1), KEY(a), KEY(a,b)) 
+  ENGINE=RocksDB;
+INSERT INTO t37 VALUES (1,10,'x'), (2,20,'y');
+SELECT MAX(a) FROM t37 WHERE a < 100;
+DROP TABLE t37;
+
+--echo #
+--echo # MDEV-4090: RocksDB: Wrong result (duplicate rows) on range access with secondary key and ORDER BY DESC
+--echo #
+CREATE TABLE t38 (pk INT PRIMARY KEY, i INT, KEY(i)) ENGINE=RocksDB;
+INSERT INTO t38 VALUES (1,10), (2,20);
+SELECT i FROM t38 WHERE i NOT IN (8) ORDER BY i DESC;
+drop table t38;
+
+--echo #
+--echo # MDEV-4092: RocksDB: Assertion `in_table(pa, a_len)' fails in LDBSE_KEYDEF::cmp_full_keys 
+--echo #            with a multi-part key and ORDER BY .. DESC
+--echo #
+CREATE TABLE t40 (pk1 INT PRIMARY KEY, a INT, b VARCHAR(1), KEY(b,a)) ENGINE=RocksDB;
+INSERT INTO t40 VALUES (1, 7,'x'),(2,8,'y');
+
+CREATE TABLE t41 (pk2 INT PRIMARY KEY) ENGINE=RocksDB;
+INSERT INTO t41 VALUES (1),(2);
+
+SELECT * FROM t40, t41 WHERE pk1 = pk2 AND b = 'o' ORDER BY a DESC;
+DROP TABLE t40,t41;
+
+--echo #
+--echo # MDEV-4093: RocksDB: IN subquery by secondary key with NULL among values returns true instead of NULL
+--echo #
+CREATE TABLE t42 (pk INT PRIMARY KEY, a INT, KEY(a)) ENGINE=RocksDB;
+INSERT INTO t42 VALUES (1, NULL),(2, 8);
+SELECT ( 3 ) NOT IN ( SELECT a FROM t42 );
+DROP TABLE t42;
+
+--echo #
+--echo # MDEV-4094: RocksDB: Wrong result on SELECT and ER_KEY_NOT_FOUND on 
+--echo #            DELETE with search by NULL-able secondary key ...
+--echo #
+CREATE TABLE t43 (pk INT PRIMARY KEY, a INT, b CHAR(1), KEY(a)) ENGINE=RocksDB;
+INSERT INTO t43 VALUES (1,8,'g'),(2,9,'x');
+UPDATE t43 SET pk = 10 WHERE a = 8;
+REPLACE INTO t43 ( a ) VALUES ( 8 );
+REPLACE INTO t43 ( b ) VALUES ( 'y' );
+SELECT * FROM t43 WHERE a = 8;
+DELETE FROM t43 WHERE a = 8;
+DROP TABLE t43;
+
+--echo #
+--echo # Basic AUTO_INCREMENT tests
+--echo #
+create table t44(pk int primary key auto_increment, col1 varchar(12)) engine=rocksdb;
+insert into t44 (col1) values ('row1');
+insert into t44 (col1) values ('row2');
+insert into t44 (col1) values ('row3');
+select * from t44;
+drop table t44;
+
+--echo #
+--echo # ALTER TABLE tests
+--echo #
+create table t45 (pk int primary key, col1 varchar(12)) engine=rocksdb;
+insert into t45 values (1, 'row1');
+insert into t45 values (2, 'row2');
+alter table t45 rename t46;
+select * from t46;
+drop table t46;
+--error ER_BAD_TABLE_ERROR
+drop table t45;
+
+
+--echo #
+--echo # Check Bulk loading
+--echo #
+show variables like 'rocksdb%';
+create table t47 (pk int primary key, col1 varchar(12)) engine=rocksdb;
+insert into t47 values (1, 'row1');
+insert into t47 values (2, 'row2');
+set rocksdb_bulk_load=1;
+insert into t47 values (1, 'row1-NEW'),(2, 'row2-NEW');
+set rocksdb_bulk_load=0;
+select * from t47;
+drop table t47;
+
+--echo #
+--echo # Fix TRUNCATE over empty table (transaction is committed when it wasn't
+--echo # started)
+--echo #
+create table t48(pk int primary key auto_increment, col1 varchar(12)) engine=rocksdb;
+set autocommit=0;
+#--error ER_ILLEGAL_HA
+truncate table t48;
+set autocommit=1;
+drop table t48;
+
+--echo #
+--echo # MDEV-4059: RocksDB: query waiting for a lock cannot be killed until query timeout exceeded
+--echo #
+--enable_connect_log
+
+create table t49 (pk int primary key, a int) engine=RocksDB;
+insert into t49 values (1,10),(2,20);
+begin;
+update t49 set a = 100 where pk = 1;
+
+--connect (con1,localhost,root,,)
+--let $con1_id = `SELECT CONNECTION_ID()`
+set rocksdb_lock_wait_timeout=5000;
+set @var1= to_seconds(now());
+send update t49 set a = 1000 where pk = 1;
+
+--connect (con2,localhost,root,,)
+--echo kill query \$con1_id;
+--disable_query_log
+eval kill query $con1_id;
+--enable_query_log
+--connection con1
+--error ER_QUERY_INTERRUPTED
+--reap
+set @var2= to_seconds(now());
+
+select (@var2 - @var1) < 400;
+
+--connection default
+--disconnect con1
+
+commit;
+drop table t49;
+
+--echo #
+--echo # Index-only tests for INT-based columns
+--echo #
+create table t1 (pk int primary key, key1 int, col1 int, key(key1)) engine=rocksdb;
+insert into t1 values (1,1,1);
+insert into t1 values (2,2,2);
+insert into t1 values (-5,-5,-5);
+--echo # INT column uses index-only:
+explain
+select key1 from t1 where key1=2;
+select key1 from t1 where key1=2;
+select key1 from t1 where key1=-5;
+drop table t1;
+
+
+create table t2 (pk int primary key, key1 int unsigned, col1 int, key(key1)) engine=rocksdb;
+insert into t2 values (1,1,1), (2,2,2);
+--echo # INT UNSIGNED column uses index-only:
+explain
+select key1 from t2 where key1=2;
+select key1 from t2 where key1=2;
+drop table t2;
+
+
+create table t3 (pk bigint primary key, key1 bigint, col1 int, key(key1)) engine=rocksdb;
+insert into t3 values (1,1,1), (2,2,2);
+--echo # BIGINT uses index-only:
+explain 
+select key1 from t3 where key1=2;
+select key1 from t3 where key1=2;
+drop table t3;
+
+--echo #
+--echo # Index-only reads for string columns
+--echo #
+create table t1 (
+  pk int primary key, 
+  key1 char(10) character set binary,
+  col1 int,
+  key (key1)
+) engine=rocksdb;
+insert into t1 values(1, 'one',11), (2,'two',22);
+explain 
+select key1 from t1 where key1='one';
+--echo # The following will produce no rows. This looks like a bug,
+--echo #  but it is actually correct behavior. Binary strings are end-padded
+--echo #  with the \0 character (not with spaces), and comparison does not
+--echo #  ignore the trailing \0 bytes.
+select key1 from t1 where key1='one';
+explain
+select hex(key1) from t1 where key1='one\0\0\0\0\0\0\0';
+select hex(key1) from t1 where key1='one\0\0\0\0\0\0\0';
+drop table t1;
+
+
+create table t2 (
+  pk int primary key, 
+  key1 char(10) collate latin1_bin,
+  col1 int,
+  key (key1)
+) engine=rocksdb;
+insert into t2 values(1, 'one',11), (2,'two',22);
+explain 
+select key1 from t2 where key1='one';
+select key1 from t2 where key1='one';
+drop table t2;
+
+
+create table t3 (
+  pk int primary key, 
+  key1 char(10) collate utf8_bin,
+  col1 int,
+  key (key1)
+) engine=rocksdb;
+insert into t3 values(1, 'one',11), (2,'two',22);
+explain 
+select key1 from t3 where key1='one';
+select key1 from t3 where key1='one';
+drop table t3;
+
+
+--echo # a VARCHAR column
+create table t4 (
+  pk int primary key, 
+  key1 varchar(10) collate latin1_bin, 
+  key(key1)
+) engine=rocksdb; 
+insert into t4 values(1, 'one'), (2,'two'),(3,'threee'),(55,'fifty-five');
+
+explain 
+select key1 from t4 where key1='two';
+select key1 from t4 where key1='two';
+
+select key1 from t4 where key1='fifty-five';
+
+explain 
+select key1 from t4 where key1 between 's' and 'u';
+select key1 from t4 where key1 between 's' and 'u';
+
+drop table t4;
+
+--echo # 
+--echo # MDEV-4305: RocksDB: Assertion `((keypart_map + 1) & keypart_map) == 0' fails in calculate_key_len
+--echo # 
+CREATE TABLE t1 (pk1 INT, pk2 CHAR(32), i INT, PRIMARY KEY(pk1,pk2), KEY(i)) ENGINE=RocksDB;
+INSERT INTO t1 VALUES (1,'test1',6),(2,'test2',8);
+SELECT * FROM t1 WHERE i != 3 OR  pk1 > 9;
+DROP TABLE t1;
+
+--echo # 
+--echo # MDEV-4298: RocksDB: Assertion `thd->is_error() || kill_errno' fails in ha_rows filesort
+--echo # 
+CREATE TABLE t1 (pk INT PRIMARY KEY, i INT, KEY(i)) ENGINE=RocksDB;
+INSERT INTO t1 VALUES (1,1),(2,2);
+BEGIN;
+UPDATE t1 SET i = 100;
+
+--connect (con1,localhost,root,,test)
+--error ER_LOCK_WAIT_TIMEOUT
+DELETE IGNORE FROM t1 ORDER BY i;
+--disconnect con1
+
+--connection default
+COMMIT;
+DROP TABLE t1;
+
+--echo #
+--echo # MDEV-4313: RocksDB: Server crashes in LDBSE_KEYDEF::setup on dropping the primary key column
+--echo #
+CREATE TABLE t1 (pk INT PRIMARY KEY, i INT NOT NULL, KEY(i)) ENGINE=RocksDB;
+--error ER_GET_ERRMSG
+ALTER TABLE t1 DROP COLUMN `pk`;
+DROP TABLE t1;
+
+--echo #
+--echo # MDEV-4324: RocksDB: Valgrind "Use of uninitialised value" warnings on inserting value into varchar field
+--echo #  (testcase only)
+--echo #
+CREATE TABLE t1 (pk INT PRIMARY KEY, c VARCHAR(4)) ENGINE=RocksDB;
+INSERT INTO t1 VALUES (1,'foo'), (2,'bar');
+DROP TABLE t1;
+
+--echo #
+--echo # MDEV-4304: RocksDB: Index-only scan by a field with utf8_bin collation returns garbage symbols
+--echo #
+CREATE TABLE t1 (pk INT PRIMARY KEY, c1 CHAR(1), c2 CHAR(1), KEY(c1)) ENGINE=RocksDB CHARSET utf8 COLLATE utf8_bin;
+INSERT INTO t1 VALUES (1,'h','h');
+SELECT * FROM t1;
+SELECT c1 FROM t1;
+DROP TABLE t1;
+
+--echo #
+--echo # MDEV-4300: RocksDB: Server crashes in inline_mysql_mutex_lock on SELECT .. FOR UPDATE
+--echo #
+CREATE TABLE t2 (pk INT PRIMARY KEY, i INT, KEY (i)) ENGINE=RocksDB;
+INSERT INTO t2 VALUES (1,4),(2,5);
+SELECT 1 FROM t2 WHERE i < 0 FOR UPDATE;
+DROP TABLE t2;
+
+--echo #
+--echo # MDEV-4301: RocksDB: Assertion `pack_info != __null' fails in LDBSE_KEYDEF::unpack_record
+--echo #
+CREATE TABLE t1 (pk INT PRIMARY KEY, i INT, c CHAR(1), KEY(c,i)) ENGINE=RocksDB;
+INSERT INTO t1 VALUES (1,4,'d'),(2,8,'e');
+SELECT MAX( pk ) FROM t1 WHERE i = 105 AND c = 'h';
+DROP TABLE t1;
+
+--echo #
+--echo # MDEV-4337: RocksDB: Inconsistent results comparing a char field with an int field
+--echo #
+create table t1 (c char(1), i int, primary key(c), key(i)) engine=RocksDB;
+insert into t1 values ('2',2),('6',6);
+select * from t1 where c = i;
+select * from t1 ignore index (i) where c = i;
+drop table t1;
+
+
+--echo #
+--echo # Test statement rollback inside a transaction
+--echo #
+create table t1 (pk varchar(12) primary key) engine=rocksdb;
+insert into t1 values ('old-val1'),('old-val2');
+
+create table t2 (pk varchar(12) primary key) engine=rocksdb;
+insert into t2 values ('new-val2'),('old-val1');
+
+begin;
+insert into t1 values ('new-val1');
+--error ER_DUP_ENTRY
+insert into t1 select * from t2;
+commit;
+
+select * from t1;
+drop table t1, t2;
+
+--echo #
+--echo # MDEV-4383: RocksDB: Wrong result of DELETE .. ORDER BY .. LIMIT: 
+--echo #   rows that should be deleted remain in the table
+--echo #
+CREATE TABLE t2 (pk INT AUTO_INCREMENT PRIMARY KEY) ENGINE=RocksDB;
+CREATE TABLE t1 (pk INT AUTO_INCREMENT PRIMARY KEY) ENGINE=RocksDB;
+
+INSERT INTO t1 (pk) VALUES (NULL),(NULL);
+BEGIN;
+INSERT INTO t2 (pk) VALUES (NULL),(NULL);
+INSERT INTO t1 (pk) VALUES (NULL),(NULL),(NULL),(NULL),(NULL),(NULL);
+
+--enable_info
+SELECT * FROM t1 ORDER BY pk LIMIT 9; 
+DELETE FROM t1 ORDER BY pk LIMIT 9;
+SELECT * FROM t1 ORDER BY pk LIMIT 9;
+--disable_info
+
+DROP TABLE t1,t2;
+
+--echo #
+--echo # MDEV-4374: RocksDB: Valgrind warnings 'Use of uninitialised value' on 
+--echo #   inserting into a varchar column
+--echo #
+CREATE TABLE t1 (pk INT PRIMARY KEY, a VARCHAR(32)) ENGINE=RocksDB;
+INSERT INTO t1 VALUES (1,'foo'),(2,'bar');
+DROP TABLE t1;
+
+
+--echo #
+--echo # MDEV-4061: RocksDB: Changes from an interrupted query are still applied
+--echo #
+
+--enable_connect_log
+
+create table t1 (pk int primary key, a int) engine=RocksDB;
+insert into t1 values (1,10),(2,20);
+
+--let $con_id = `select connection_id()`
+
+set autocommit = 1;
+--send
+update t1 set a = sleep(100) where pk = 1;
+
+--connect (con1,localhost,root,,)
+--echo kill query \$con_id;
+--disable_query_log
+eval kill query $con_id;
+--enable_query_log
+
+--connection default
+--error ER_QUERY_INTERRUPTED
+--reap
+
+select * from t1;
+--disconnect con1
+--disable_connect_log
+drop table t1;
+
+
+--echo #
+--echo # MDEV-4099: RocksDB: Wrong results with index and range access after INSERT IGNORE or REPLACE
+--echo #
+CREATE TABLE t1 (pk INT PRIMARY KEY, a SMALLINT, b INT, KEY (a)) ENGINE=RocksDB;
+INSERT IGNORE INTO t1 VALUES (1, 157, 0), (2, 1898, -504403), (1, -14659,  0);
+SELECT * FROM t1;
+SELECT pk FROM t1;
+SELECT * FROM t1 WHERE a != 97;
+DROP TABLE t1;
+
+
+--echo #
+--echo # Test @@rocksdb_max_row_locks
+--echo #
+CREATE TABLE t1 (pk INT PRIMARY KEY, a int) ENGINE=RocksDB;
+set @a=-1;
+insert into t1 select (@a:=@a+1), 1234 from information_schema.session_variables limit 100;
+set @tmp1= @@rocksdb_max_row_locks;
+set rocksdb_max_row_locks= 20;
+--error ER_GET_ERRMSG
+update t1 set a=a+10;
+DROP TABLE t1;
+
diff --git a/mysql-test/t/rocksdb_qcache-master.opt b/mysql-test/t/rocksdb_qcache-master.opt
new file mode 100644
index 0000000..a00258b
--- /dev/null
+++ b/mysql-test/t/rocksdb_qcache-master.opt
@@ -0,0 +1 @@
+--query_cache_type=1
diff --git a/mysql-test/t/rocksdb_qcache.test b/mysql-test/t/rocksdb_qcache.test
new file mode 100644
index 0000000..4747599
--- /dev/null
+++ b/mysql-test/t/rocksdb_qcache.test
@@ -0,0 +1,28 @@
+# Important:
+# The test needs to be run with --mysqld=--query-cache-type=1
+
+-- source include/have_query_cache.inc
+--enable_connect_log
+
+create table t1 (pk int primary key, c char(8)) engine=RocksDB;
+insert into t1 values (1,'new'),(2,'new');
+
+select * from t1;
+
+--connect (con1,localhost,root,,)
+
+update t1 set c = 'updated';
+#select * from t1;
+
+--connection default
+flush status;
+show status like 'Qcache_hits';
+show global status like 'Qcache_hits';
+select * from t1;
+select sql_no_cache * from t1;
+select * from t1 where pk = 1;
+show status like 'Qcache_hits';
+show status like 'Qcache_not_cached';
+show global status like 'Qcache_hits';
+
+drop table t1;
diff --git a/storage/rocksdb/CMakeLists.txt b/storage/rocksdb/CMakeLists.txt
new file mode 100644
index 0000000..3dbb922
--- /dev/null
+++ b/storage/rocksdb/CMakeLists.txt
@@ -0,0 +1,56 @@
+# TODO: Copyrights
+#
+# Optional environment variables that may be used if the library
+# files are located in a non-standard path:
+#
+# ROCKSDB_INCLUDE=   # path to rocksdb include directory
+# ROCKSDB_LIBRARIES= # path to rocksdb shared or static libraries
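+#
+# Example invocation (paths are illustrative):
+#   ROCKSDB_INCLUDE=/opt/rocksdb/include ROCKSDB_LIBRARIES=/opt/rocksdb/lib cmake .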
+
+SET(ROCKSDB_SE_PLUGIN_STATIC  "rocksdb")
+SET(ROCKSDB_SE_PLUGIN_MANDATORY TRUE)
+
+
+FIND_PATH(Rocksdb_INCLUDE_DIRS rocksdb/db.h PATHS
+$ENV{ROCKSDB_INCLUDE}      # environment variable to be used optionally
+${Rocksdb_INCLUDE_DIR}     # this may be set
+)
+IF(NOT EXISTS ${Rocksdb_INCLUDE_DIRS})
+  MESSAGE(SEND_ERROR "No rocksdb include directory found! Consider using environment variable ROCKSDB_INCLUDE=...")
+  RETURN()
+ENDIF()
+
+FIND_LIBRARY(Rocksdb_LIBS NAMES rocksdb
+             PATHS ${Rocksdb_LIB_PATHS} ${Rocksdb_LIB} $ENV{ROCKSDB_LIBRARIES})
+IF(EXISTS ${Rocksdb_LIBS})
+  GET_FILENAME_COMPONENT(LINK_DIR ${Rocksdb_LIBS} ABSOLUTE)
+ELSE()
+  MESSAGE(SEND_ERROR "No rocksdb library directory found! Consider using environment variable ROCKSDB_LIBRARIES=...")
+  RETURN()
+ENDIF()
+
+INCLUDE_DIRECTORIES(AFTER ${Rocksdb_INCLUDE_DIRS})
+SET(CMAKE_REQUIRED_INCLUDES ${Rocksdb_INCLUDE_DIRS})
+
+SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11")
+
+# TODO is there a better way to do this?
+LINK_LIBRARIES(snappy)
+
+CHECK_CXX_SOURCE_COMPILES(
+"
+#include <rocksdb/db.h>
+int main() { return 0; }
+" ROCKSDB_OK)
+
+IF(ROCKSDB_OK)
+  SET(ROCKSDB_SOURCES 
+      ha_rocksdb.cc ha_rocksdb.h 
+      rdb_datadic.cc rdb_datadic.h
+      rdb_locks.cc rdb_locks.h 
+      rdb_rowmods.cc rdb_rowmods.h
+      rdb_applyiter.cc rdb_applyiter.h)
+
+  MYSQL_ADD_PLUGIN(rocksdb_se ${ROCKSDB_SOURCES} STORAGE_ENGINE STATIC_ONLY MANDATORY LINK_LIBRARIES ${LINK_DIR})
+
+  ADD_SUBDIRECTORY(unittest)
+ENDIF(ROCKSDB_OK)
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
new file mode 100644
index 0000000..28c1dc1
--- /dev/null
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -0,0 +1,2512 @@
+/* 
+   Copyright (c) 2012, Monty Program Ab
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation        // gcc: Class implementation
+#endif
+
+#include <mysql/plugin.h>
+#include "ha_rocksdb.h"
+#include "sql_class.h"
+#include "sql_array.h"
+
+#include "my_bit.h"
+
+#include "rdb_datadic.h"
+#include "rdb_locks.h"
+#include "rdb_rowmods.h"
+
+void dbug_dump_database(rocksdb::DB *db);
+
+static handler *rocksdb_create_handler(handlerton *hton,
+                                       TABLE_SHARE *table, 
+                                       MEM_ROOT *mem_root);
+
+void key_copy(uchar *to_key, uchar *from_record, KEY *key_info,
+              uint key_length);
+
+handlerton *rocksdb_hton;
+
+rocksdb::DB *rdb= NULL;
+
+Table_ddl_manager ddl_manager;
+
+LockTable row_locks;
+
+/* 
+   Hash used to track the number of open tables; used by the "share"
+   methods below.
+*/
+static HASH rocksdb_open_tables;
+
+/* The mutex used to init the hash; used by the "share" methods below */
+mysql_mutex_t rocksdb_mutex;
+
+
+//TODO: 0 means don't wait at all, and we don't support it yet?
+static MYSQL_THDVAR_ULONG(lock_wait_timeout, PLUGIN_VAR_RQCMDARG,
+  "Number of seconds to wait for lock",
+  NULL, NULL, /*default*/ 1, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
+
+static MYSQL_THDVAR_BOOL(bulk_load, PLUGIN_VAR_RQCMDARG,
+  "Use bulk-load mode for inserts", NULL, NULL, FALSE);
+
+static MYSQL_THDVAR_ULONG(max_row_locks, PLUGIN_VAR_RQCMDARG,
+  "Maximum number of locks a transaction can have",
+  NULL, NULL, /*default*/ 1024*1024*1024, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
+
+static MYSQL_THDVAR_ULONG(bulk_load_size, PLUGIN_VAR_RQCMDARG,
+  "Max #records in a batch for bulk-load mode",
+  NULL, NULL, /*default*/ 1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
+
+static struct st_mysql_sys_var* rocksdb_system_variables[]= {
+  MYSQL_SYSVAR(lock_wait_timeout),
+  MYSQL_SYSVAR(max_row_locks),
+  MYSQL_SYSVAR(bulk_load),
+  MYSQL_SYSVAR(bulk_load_size),
+  NULL
+};
+
+
+#if 0
+static SHOW_VAR rocksdb_status_variables[]= {
+  {"row_insert_batches",
+    (char*) &rocksdb_counters.row_insert_batches,  SHOW_LONG},
+...
+  {NullS, NullS, SHOW_LONG}
+};
+#endif
+
+///////////////////////////////////////////////////////////////////////////////////////////
+
+/**
+  @brief
+  Function used by the hash of open tables to get the key of a ROCKSDB_SHARE.
+*/
+
+static uchar* rocksdb_get_key(ROCKSDB_SHARE *share, size_t *length,
+                             my_bool not_used __attribute__((unused)))
+{
+  *length=share->table_name_length;
+  return (uchar*) share->table_name;
+}
+
+/* 
+  The following is needed as an argument for thd_enter_cond, irrespective of
+  whether we're compiling with P_S or not.
+*/
+PSI_stage_info stage_waiting_on_row_lock= { 0, "Waiting for row lock", 0};
+
+static PSI_stage_info *all_rocksdb_stages[]=
+{
+  & stage_waiting_on_row_lock
+};
+
+
+#ifdef HAVE_PSI_INTERFACE
+static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_ROCKSDB_SHARE_mutex;
+
+static PSI_mutex_info all_rocksdb_mutexes[]=
+{
+  { &ex_key_mutex_example, "rocksdb", PSI_FLAG_GLOBAL},
+  { &ex_key_mutex_ROCKSDB_SHARE_mutex, "ROCKSDB_SHARE::mutex", 0}
+};
+
+static void init_rocksdb_psi_keys()
+{
+  const char* category= "rocksdb";
+  int count;
+
+  if (PSI_server == NULL)
+    return;
+
+  count= array_elements(all_rocksdb_mutexes);
+  PSI_server->register_mutex(category, all_rocksdb_mutexes, count);
+
+  count= array_elements(all_rocksdb_stages);
+  mysql_stage_register(category, all_rocksdb_stages, count);
+}
+#endif
+
+
+/*
+  The keys are in form: {index_number} {mem-comparable-key}
+
+  (todo: knowledge about this format is shared between this class and 
+   RDBSE_KEYDEF)
+*/
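+/*
+  For illustration (values are hypothetical, assuming a 4-byte big-endian
+  index_number): a row with PK=5 in an index numbered 0x100 would be stored
+  under the key
+    {0x00,0x00,0x01,0x00} {mem-comparable form of 5}
+  so the plain memcmp() in the comparator below orders entries first by
+  index, then by key value within the index.
+*/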
+
+class Primary_key_comparator : public rocksdb::Comparator
+{
+public:
+  int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const
+  {
+    size_t a_size= a.size();
+    size_t b_size= b.size();
+    size_t len= (a_size < b_size) ? a_size : b_size;
+    int res;
+    
+    if ((res= memcmp(a.data(), b.data(), len)))
+      return res;
+
+    /* Ok, res == 0 */
+    if (a_size != b_size)
+    {
+      return a_size < b_size? -1 : 1;
+    }
+    return 0;
+  }
+
+  /* The following is not needed by RocksDB, but conceptually should be here: */
+  static ulong get_hashnr(const char *key, size_t key_len);
+  const char* Name() const { return "RocksDB_SE_v2.2"; }
+
+  //TODO: advanced funcs:
+  // - FindShortestSeparator
+  // - FindShortSuccessor
+  // for now, do-nothing implementations:
+  void FindShortestSeparator(std::string* start, const rocksdb::Slice& limit) const {}
+  void FindShortSuccessor(std::string* key) const {}
+};
+
+Primary_key_comparator primary_key_comparator;
+
+int compare_mem_comparable_keys(const uchar *a, size_t a_len, const uchar *b, size_t b_len)
+{
+  rocksdb::Slice a_slice((char*)a, a_len);
+  rocksdb::Slice b_slice((char*)b, b_len);
+  return primary_key_comparator.Compare(a_slice, b_slice);
+}
+
+
+
+/*
+  This is a rocksdb connection. Its members represent the current transaction,
+  which consists of:
+  - the snapshot
+  - the changes we've made, which are not yet applied to the database.
+  
+  Changes are made through individual table handlers; they are buffered here
+  and this object applies them to the database when the transaction commits.
+*/
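+/*
+  Rough lifecycle, as implemented below: get_or_create_trx() takes a
+  snapshot; writes are buffered in 'changes' (a Row_table); commit() applies
+  them in a single rocksdb::WriteBatch via flush_batch(), and rollback()
+  then releases the snapshot and the row locks.
+*/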
+
+class Rdb_transaction
+{
+public:
+  /* The snapshot the transaction uses */
+  const rocksdb::Snapshot *snapshot;
+
+  /* The changes made by this transaction (and not yet applied) */
+  Row_table changes;
+
+  uint n_writes;
+
+  /* 
+    These are needed to use LF-Hash. They are allocated per-thread. Logically,
+    they are not part of the transaction but it's convenient to have them here.
+  */
+  LF_PINS *pins;
+
+  /* Row locks taken by this transaction */
+  Dynamic_array<Row_lock*> trx_locks;
+
+  int timeout_sec; /* Cached value of @@rocksdb_lock_wait_timeout */
+  int max_row_locks;
+
+  void set_params(int timeout_sec_arg, int max_row_locks_arg)
+  {
+    timeout_sec= timeout_sec_arg;
+    max_row_locks= max_row_locks_arg;
+  }
+
+  Row_lock *get_lock(const uchar* key, size_t keylen, bool *timed_out)
+  {
+    Row_lock *lock;
+    if (trx_locks.elements() > max_row_locks)
+    {
+      *timed_out= false;
+      return NULL;
+    }
+    if (!(lock= row_locks.get_lock(pins, key, keylen, timeout_sec)))
+    {
+      *timed_out= true;
+      return NULL;
+    }
+    return lock;
+  }
+
+  void add_lock(Row_lock* lock)
+  {
+    trx_locks.append(lock);
+  }
+
+  void release_last_lock()
+  {
+    row_locks.release_lock(pins, trx_locks.at(trx_locks.elements() - 1));
+    trx_locks.pop();
+  }
+
+  void release_locks()
+  {
+    int size= trx_locks.elements();
+    for (int i= 0; i < size; i++)
+      row_locks.release_lock(pins, trx_locks.at(i));
+    trx_locks.clear();
+  }
+  
+  bool commit()
+  {
+    bool res= false;
+    flush_batch();
+    /* rollback() will delete snapshot, batch and locks */
+    rollback();
+    return res;
+  }
+
+private:
+
+  int flush_batch_intern()
+  {
+    bool res= false;
+    rocksdb::WriteBatch batch;
+
+    if (changes.is_empty())
+      return false;
+
+    Row_table_iter iter(&changes);
+
+    for (iter.SeekToFirst(); iter.Valid(); iter.Next())
+    {
+      if (iter.is_tombstone())
+      {
+        batch.Delete(iter.key());
+      }
+      else
+      {
+        batch.Put(iter.key(), iter.value());
+      }
+    }
+    rocksdb::Status s= rdb->Write(rocksdb::WriteOptions(), &batch);
+    res= !s.ok(); // we return true when something failed
+    return res;
+  }
+
+public:
+  int flush_batch()
+  {
+    bool bres= flush_batch_intern();
+    changes.reinit();
+    n_writes= 0;
+    return bres;
+  }
+
+  void prepare_for_write()
+  {
+    /* Currently, we don't do anything here */
+  }
+   
+  /* 
+    This must be called when the last statement is rolled back but the
+    transaction continues.
+  */
+  void rollback_stmt() { changes.rollback_stmt(); }
+
+  void start_stmt() { changes.start_stmt(); } 
+  void rollback()
+  {
+    if (snapshot)
+    {
+      rdb->ReleaseSnapshot(snapshot);
+      snapshot= NULL;
+    }
+
+    changes.reinit();
+    release_locks();
+    n_writes= 0;
+  }
+
+  Rdb_transaction() : 
+    snapshot(NULL), n_writes(0), pins(NULL)
+  {
+    changes.init();
+  }
+
+  ~Rdb_transaction() 
+  { 
+    if (snapshot)
+    {
+      rdb->ReleaseSnapshot(snapshot);
+      snapshot= NULL;
+    }
+    changes.cleanup();
+  }
+};
+
+
+static Rdb_transaction* &get_trx_from_thd(THD *thd)
+{
+  return *(Rdb_transaction**) thd_ha_data(thd, rocksdb_hton);
+}
+
+
+/* 
+  TODO: maybe, call this in external_lock() and store in ha_rocksdb.. 
+*/
+
+static Rdb_transaction *get_or_create_trx(THD *thd)
+{
+  Rdb_transaction*& trx= get_trx_from_thd(thd);
+  if (trx == NULL)
+  {
+    if (!(trx= new Rdb_transaction))
+      return NULL;
+    trx->pins= row_locks.get_pins();
+    trx->snapshot= rdb->GetSnapshot();
+  }
+  else
+  {
+    if (!trx->snapshot)
+      trx->snapshot= rdb->GetSnapshot();
+  }
+  return trx;
+}
+
+
+static int rocksdb_close_connection(handlerton* hton, THD* thd)
+{
+  Rdb_transaction*& trx= get_trx_from_thd(thd);
+  if (trx && trx->pins)
+    row_locks.put_pins(trx->pins);
+  delete trx;
+  trx= NULL;
+  return 0;
+}
+
+
+static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_trx)
+{
+  if (commit_trx)
+  {
+    Rdb_transaction*& trx= get_trx_from_thd(thd);
+    if (trx)
+      trx->commit(); // todo: return error code.
+  }
+  else
+  {
+    /*
+      Committing a statement within a transaction. We don't need to do anything
+      here. trx->start_stmt() will notify Rdb_transaction that another
+      statement has started.
+    */
+  }
+  return 0;
+}
+
+
+static int rocksdb_rollback(handlerton* hton, THD* thd, bool rollback_trx)
+{
+  if (rollback_trx)
+  {
+    /* Discard the changes made by the transaction */
+    Rdb_transaction*& trx= get_trx_from_thd(thd);
+    if (trx)
+      trx->rollback();
+  }
+  else
+  {
+    Rdb_transaction*& trx= get_trx_from_thd(thd);
+    if (trx)
+      trx->rollback_stmt();
+  }
+  return 0;
+}
+
+
+static int rocksdb_init_func(void *p)
+{
+  DBUG_ENTER("rocksdb_init_func");
+  
+#ifdef HAVE_PSI_INTERFACE
+  init_rocksdb_psi_keys();
+#endif
+
+  rocksdb_hton= (handlerton *)p;
+  mysql_mutex_init(ex_key_mutex_example, &rocksdb_mutex, MY_MUTEX_INIT_FAST);
+  (void) my_hash_init(&rocksdb_open_tables,system_charset_info,32,0,0,
+                      (my_hash_get_key) rocksdb_get_key,0,0);
+
+  rocksdb_hton->state=    SHOW_OPTION_YES;
+  rocksdb_hton->create=   rocksdb_create_handler;
+  rocksdb_hton->close_connection= rocksdb_close_connection;
+  rocksdb_hton->commit=   rocksdb_commit;
+  rocksdb_hton->rollback= rocksdb_rollback;
+  rocksdb_hton->db_type=  DB_TYPE_ROCKSDB;
+
+  /* 
+    Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE
+    TABLE to create an empty table from scratch. RocksDB cannot efficiently
+    re-create a table.
+  */
+  rocksdb_hton->flags= HTON_TEMPORARY_NOT_SUPPORTED |
+                       HTON_SUPPORTS_EXTENDED_KEYS;
+
+  /*
+    As for the datadir, innobase_init() uses mysql_real_data_home for
+    embedded server, and current directory for the "full server".
+  */
+  DBUG_ASSERT(!mysqld_embedded);
+  
+  row_locks.init(compare_mem_comparable_keys, 
+                 Primary_key_comparator::get_hashnr);
+
+  rocksdb::Options main_opts;
+  main_opts.create_if_missing = true;
+  main_opts.comparator= &primary_key_comparator;
+  rocksdb::Status status;
+  status= rocksdb::DB::Open(main_opts, "./rocksdb", &rdb);
+
+  if (!status.ok())
+  {
+    std::string err_text= status.ToString();
+    sql_print_error("RocksDB: Error opening instance: %s", err_text.c_str());
+    DBUG_RETURN(1);
+  }
+
+  if (ddl_manager.init(rdb))
+    DBUG_RETURN(1);
+
+  sql_print_information("RocksDB instance opened");
+  DBUG_RETURN(0);
+}
+
+
+static int rocksdb_done_func(void *p)
+{
+  int error= 0;
+  DBUG_ENTER("rocksdb_done_func");
+  if (rocksdb_open_tables.records)
+    error= 1;
+  my_hash_free(&rocksdb_open_tables);
+  mysql_mutex_destroy(&rocksdb_mutex);
+
+  row_locks.cleanup();
+  ddl_manager.cleanup();
+
+  delete rdb;
+  rdb= NULL;
+
+  DBUG_RETURN(error);
+}
+
+
+/**
+  @brief
+  Simple lock controls. Creates (or looks up) the ROCKSDB_SHARE structure
+  that is passed to each rocksdb handler opened for the same table. It holds
+  the pieces used for table locking, which every handler needs in order to
+  function.
+*/
+
+static ROCKSDB_SHARE *get_share(const char *table_name, TABLE *table)
+{
+  ROCKSDB_SHARE *share;
+  uint length;
+  char *tmp_name;
+
+  mysql_mutex_lock(&rocksdb_mutex);
+  length=(uint) strlen(table_name);
+
+  if (!(share=(ROCKSDB_SHARE*) my_hash_search(&rocksdb_open_tables,
+                                              (uchar*) table_name,
+                                              length)))
+  {
+    if (!(share=(ROCKSDB_SHARE *)
+          my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
+                          &share, sizeof(*share),
+                          &tmp_name, length+1,
+                          NullS)))
+    {
+      mysql_mutex_unlock(&rocksdb_mutex);
+      return NULL;
+    }
+
+    share->use_count=0;
+    share->table_name_length=length;
+    share->table_name=tmp_name;
+    strmov(share->table_name,table_name);
+
+    if (my_hash_insert(&rocksdb_open_tables, (uchar*) share))
+      goto error;
+    thr_lock_init(&share->lock);
+    //mysql_mutex_init(ex_key_mutex_ROCKSDB_SHARE_mutex,
+    //                 &share->mutex, MY_MUTEX_INIT_FAST);
+  }
+  share->use_count++;
+  mysql_mutex_unlock(&rocksdb_mutex);
+
+  return share;
+
+error:
+ // mysql_mutex_destroy(&share->mutex);
+  my_free(share);
+
+  return NULL;
+}
+
+
+void ha_rocksdb::load_auto_incr_value()
+{
+  int save_active_index= active_index;
+  active_index= table->s->primary_key;
+  uint8 save_table_status= table->status;
+
+  // Do a lookup.
+  if (!index_last(table->record[0]))
+    update_auto_incr_val();
+
+  table->status= save_table_status;
+  active_index= save_active_index;
+
+  /* 
+    Do what ha_rocksdb::index_end() does.
+    (Why don't we use index_init/index_end? class handler defines index_init
+     as private, for some reason).
+  */
+  if (scan_it)
+  {
+    delete scan_it;
+    scan_it= NULL;
+  }
+}
+
+
+/* Get PK value from table->record[0]. */
+void ha_rocksdb::update_auto_incr_val()
+{
+  Field *pk_field;
+  longlong new_val;
+  pk_field= table->key_info[table->s->primary_key].key_part[0].field;
+
+  my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
+  new_val= pk_field->val_int() + 1;
+  dbug_tmp_restore_column_map(table->read_set, old_map);
+
+  /* TODO: switch to compare-and-swap? */
+  mysql_mutex_lock(&tbl_def->mutex);
+  if (tbl_def->auto_incr_val < new_val)
+    tbl_def->auto_incr_val= new_val;
+  mysql_mutex_unlock(&tbl_def->mutex);
+}
+
+
+/**
+  @brief
+  Free lock controls. We call this whenever we close a table. If the table had
+  the last reference to the share, then we free memory associated with it.
+*/
+
+static int free_share(ROCKSDB_SHARE *share)
+{
+  mysql_mutex_lock(&rocksdb_mutex);
+  if (!--share->use_count)
+  {
+    my_hash_delete(&rocksdb_open_tables, (uchar*) share);
+    thr_lock_delete(&share->lock);
+    //mysql_mutex_destroy(&share->mutex);
+    my_free(share);
+  }
+  mysql_mutex_unlock(&rocksdb_mutex);
+
+  return 0;
+}
+
+
+static handler* rocksdb_create_handler(handlerton *hton,
+                                       TABLE_SHARE *table, 
+                                       MEM_ROOT *mem_root)
+{
+  return new (mem_root) ha_rocksdb(hton, table);
+}
+
+
+ha_rocksdb::ha_rocksdb(handlerton *hton, TABLE_SHARE *table_arg)
+  : handler(hton, table_arg), scan_it(NULL), pk_descr(NULL), 
+    key_descr(NULL), 
+    pk_can_be_decoded(false),
+    pk_tuple(NULL), pk_packed_tuple(NULL), 
+    sec_key_packed_tuple(NULL), sec_key_tails(NULL),
+    lock_rows(FALSE),
+    keyread_only(FALSE)
+{}
+
+
+static const char *ha_rocksdb_exts[] = {
+  NullS
+};
+
+
+const char **ha_rocksdb::bas_ext() const
+{
+  return ha_rocksdb_exts;
+}
+
+
+/*
+  Convert record from table->record[0] form into a form that can be written
+  into rocksdb.
+
+  @param packed_rec OUT Data slice with record data.
+*/
+
+void ha_rocksdb::convert_record_to_storage_format(rocksdb::Slice *packed_rec)
+{
+  Field **field;
+
+  for (field= table->field; *field; field++)
+  {
+    if ((*field)->real_type() == MYSQL_TYPE_VARCHAR)
+    {
+      Field_varstring* field_var= (Field_varstring*)*field;
+      /* Fill unused bytes with zeros */
+      uint used_size= field_var->length_bytes + (*field)->data_length();
+      uint total_size= (*field)->pack_length();
+      memset((*field)->ptr + used_size, 0, total_size - used_size);
+    }
+  }
+
+  if (!table->s->blob_fields)
+  {
+    *packed_rec = rocksdb::Slice((char*)table->record[0], table->s->reclength);
+    return;
+  }
+
+  /* Ok, we have blob fields */
+  storage_record.length(0);
+  storage_record.append((const char*)table->record[0], table->s->reclength);
+
+  // for each blob column
+  for (field= table->field; *field; field++)
+  {
+    if ((*field)->type() == MYSQL_TYPE_BLOB)
+    {
+      Field_blob *blob= (Field_blob*)(*field);
+      uint32 value_len= blob->get_length();
+      uint length_bytes= blob->pack_length() - 8;
+      char *data_ptr;
+      memcpy(&data_ptr, blob->ptr + length_bytes, sizeof(uchar*));
+
+      storage_record.append(data_ptr, value_len);
+      uint32 size_to_write= htons(value_len);
+      size_t pointer_offset= (blob->ptr - table->record[0]) + length_bytes;
+      memcpy((char*)storage_record.ptr() + pointer_offset, &size_to_write,
+             sizeof(uint32));
+    }
+  }
+  *packed_rec= rocksdb::Slice(storage_record.ptr(), storage_record.length());
+}
+
+
+void ha_rocksdb::convert_record_from_storage_format(rocksdb::Slice *slice, 
+                                                    uchar *buf)
+{
+  if (!table->s->blob_fields)
+  {
+    DBUG_ASSERT(slice->size() == table->s->reclength);
+    memcpy(buf, slice->data(), slice->size());
+  }
+  else
+  {
+    retrieved_record.assign(slice->data(), slice->size());
+    convert_record_from_storage_format(buf);
+  }
+}
+
+
+/*
+  Unpack the record in this->retrieved_record from storage format into 
+  buf (which can be table->record[0] or table->record[1])
+
+  If the table has blobs, the unpacked data in buf may keep pointers to the
+  data in this->retrieved_record.
+*/
+
+void ha_rocksdb::convert_record_from_storage_format(uchar * buf)
+{
+  if (!table->s->blob_fields)
+  {
+    DBUG_ASSERT(retrieved_record.length() == table->s->reclength);
+    memcpy(buf, retrieved_record.c_str(), retrieved_record.length());
+    return;
+  }
+  else
+    unpack_blobs_from_retrieved_record(buf);
+}
+
+
+void ha_rocksdb::unpack_blobs_from_retrieved_record(uchar *buf)
+{
+  /*
+    Unpack the blobs.
+    Blobs in the record are stored as
+      [record-0 format] [blob data#1] [blob data#2]
+  */
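+  /*
+    For example (sizes are hypothetical): with reclength=20 and a single
+    7-byte blob, retrieved_record is 27 bytes: bytes 0..19 are the record-0
+    image (whose blob pointer field is patched below), and bytes 20..26 are
+    the blob data.
+  */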
+  memcpy(buf, retrieved_record.c_str(), table->s->reclength);
+
+  const char *blob_ptr= retrieved_record.c_str() + table->s->reclength;
+ 
+  // for each blob column
+  for (Field **field= table->field; *field; field++)
+  {
+    if ((*field)->type() == MYSQL_TYPE_BLOB)
+    {
+      Field_blob *blob= (Field_blob*)(*field);
+      my_ptrdiff_t ptr_diff= buf - table->record[0];
+      blob->move_field_offset(ptr_diff);
+      /*
+        We got the blob length when we memcpy'ed table->record[0] above,
+        but the pointer field still holds an offset instead of a real
+        blob pointer.
+      */
+      uint32 value_len= blob->get_length();
+      uint length_bytes= blob->pack_length() - 8;
+
+      // set 8-byte pointer to 0, like innodb does.
+      memset(blob->ptr + length_bytes, 0, 8); 
+
+      memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar*));
+
+      blob_ptr += value_len;
+      blob->move_field_offset(-ptr_diff);
+    }
+  }
+}
+
+
+static void make_dbname_tablename(StringBuffer<64> *str, TABLE *table_arg)
+{
+  str->append(table_arg->s->db.str, table_arg->s->db.length);
+  str->append('.');
+  str->append(table_arg->s->table_name.str, table_arg->s->table_name.length);
+  str->c_ptr_safe();
+}
+
+
+int ha_rocksdb::open(const char *name, int mode, uint test_if_locked)
+{
+  DBUG_ENTER("ha_rocksdb::open");
+
+  if (!(share = get_share(name, table)))
+    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+  thr_lock_data_init(&share->lock,&lock,NULL);
+  
+  /* 
+    note: pk_descr may be non-NULL here when using partitions. It seems
+    to be ok to discard it.
+  */
+  StringBuffer<64> fullname;
+  make_dbname_tablename(&fullname, table);
+  if (!(tbl_def= ddl_manager.find((uchar*)fullname.c_ptr(), fullname.length())))
+    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+
+  key_descr= tbl_def->key_descr;
+  pk_descr= key_descr[table->s->primary_key];
+
+  pk_key_parts= table->key_info[table->s->primary_key].user_defined_key_parts;
+
+  uint key_len= table->key_info[table->s->primary_key].key_length; 
+  pk_descr->setup(table);  // move this into get_share() ??
+  uint packed_key_len= pk_descr->max_storage_fmt_length();
+
+  if (!(pk_tuple= (uchar*)my_malloc(key_len, MYF(0))) ||
+      !(pk_packed_tuple= (uchar*)my_malloc(packed_key_len, MYF(0))))
+    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+  
+  /* Sometimes, we may use sec_key_packed_tuple for storing packed PK */
+  uint max_packed_sec_key_len= packed_key_len;
+  for (uint i= 0; i < table->s->keys; i++)
+  {
+    if (i == table->s->primary_key) /* Primary key was processed above */
+      continue;
+
+    key_descr[i]->setup(table);  // move this into get_share() ??
+
+    uint packed_len= key_descr[i]->max_storage_fmt_length();
+    if (packed_len > max_packed_sec_key_len)
+      max_packed_sec_key_len= packed_len;
+  }
+
+  if (!(sec_key_packed_tuple= (uchar*)my_malloc(max_packed_sec_key_len,
+                                                MYF(0))) ||
+      !((sec_key_tails= (uchar*)my_malloc(max_packed_sec_key_len, MYF(0)))))
+    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+
+  info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
+  
+  /* TODO: move the following to where TABLE_SHARE is opened: */
+  if (table->next_number_field)
+    load_auto_incr_value();
+
+  DBUG_RETURN(0);
+}
+
+
+int ha_rocksdb::close(void)
+{
+  DBUG_ENTER("ha_rocksdb::close");
+  
+  pk_descr= NULL;
+  key_descr= NULL;
+
+  if (pk_tuple)
+    my_free(pk_tuple);
+  pk_tuple= NULL;
+
+  if (pk_packed_tuple)
+    my_free(pk_packed_tuple);
+  pk_packed_tuple= NULL;
+  
+  if (sec_key_packed_tuple)
+    my_free(sec_key_packed_tuple);
+  sec_key_packed_tuple= NULL;
+  
+  if (sec_key_tails)
+    my_free(sec_key_tails);
+  sec_key_tails= NULL;
+
+  DBUG_RETURN(free_share(share));
+}
+
+
+const int HA_ERR_ROCKSDB_UNIQUE_NOT_SUPPORTED= HA_ERR_LAST+1;
+const int HA_ERR_ROCKSDB_PK_REQUIRED= HA_ERR_LAST+2;
+const int HA_ERR_ROCKSDB_TOO_MANY_LOCKS= HA_ERR_LAST+3;
+
+bool ha_rocksdb::get_error_message(int error, String *buf)
+{
+  if (error == HA_ERR_ROCKSDB_PK_REQUIRED)
+    buf->append("Table must have a PRIMARY KEY");
+  else if (error == HA_ERR_ROCKSDB_UNIQUE_NOT_SUPPORTED)
+    buf->append("Unique indexes are not supported");
+  else if (error == HA_ERR_ROCKSDB_TOO_MANY_LOCKS)
+    buf->append("Number of locks held reached @@rocksdb_max_row_locks");
+  return FALSE; /* not a temporary error */
+}
+
+
+/*
+  Create structures needed for storing data in rocksdb. This is called when the
+  table is created. The structures will be shared by all TABLE* objects.
+
+  @param
+    table_arg  Table with definition
+    db_table   "dbname.tablename" 
+    len        strlen of the above
+
+  @return
+    0      - Ok 
+    other  - error: either the given table DDL is not supported by rocksdb, or OOM.
+*/
+
+int ha_rocksdb::create_key_defs(TABLE *table_arg, const char *db_table, uint len)
+{
+  uint i;
+  DBUG_ENTER("ha_rocksdb::create_key_defs");
+  uint n_keys= table_arg->s->keys;
+   
+  /* Don't allow creating unique secondary indexes */
+  for (i= 0; i < table_arg->s->keys; i++)
+  {
+    if (table_arg->key_info[i].flags & HA_NOSAME && 
+        i != table_arg->s->primary_key)
+    {
+      DBUG_RETURN(HA_ERR_ROCKSDB_UNIQUE_NOT_SUPPORTED); 
+    }
+  }
+
+  /* Create table/key descriptions and put them into the data dictionary */
+  if (!(key_descr= (RDBSE_KEYDEF**)my_malloc(sizeof(RDBSE_KEYDEF*) * n_keys,
+                                             MYF(MY_ZEROFILL))))
+    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+  
+  memset(key_descr, 0, sizeof(RDBSE_KEYDEF*) * n_keys);
+  tbl_def= NULL;
+
+  for (i= 0; i < table_arg->s->keys; i++)
+  {
+    if (!(key_descr[i]= new RDBSE_KEYDEF(ddl_manager.get_next_number(), i)))
+      goto error;
+  }
+  pk_descr= key_descr[table_arg->s->primary_key];
+
+  if (!(tbl_def= new RDBSE_TABLE_DEF))
+    goto error;
+  
+  tbl_def->n_keys= n_keys;
+  tbl_def->key_descr= key_descr;
+  tbl_def->dbname_tablename.append(db_table, len);
+  if (ddl_manager.put_and_write(tbl_def, rdb))
+    goto error;
+
+  DBUG_RETURN(0);
+
+error:
+  /* Delete what we have allocated so far */
+  for (i= 0; i < table_arg->s->keys;i++)
+    delete key_descr[i];
+  delete tbl_def;
+
+  DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+}
+
+
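+/*
+  Turn a path of the form "./db/table", as passed by the SQL layer, into
+  "db.table". For example, "./test/t1" becomes "test.t1".
+*/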
+static int rocksdb_normalize_tablename(const char *tablename,
+                                       StringBuffer<256> *strbuf)
+{
+  DBUG_ASSERT(tablename[0] == '.' && tablename[1] == '/');
+  tablename += 2;
+
+  const char *p= tablename;
+  for (; *p != '/'; p++)
+  {
+    if (*p =='\0')
+    {
+      DBUG_ASSERT(0); // We were not passed a table name?
+      return HA_ERR_INTERNAL_ERROR;
+    }
+  }
+  strbuf->append(tablename, p - tablename);
+  strbuf->append('.');
+  strbuf->append(p + 1);
+  return 0;
+}
+
+
+/**
+  @brief
+  create() is called to create a table. The variable name contains the name
+  of the table.
+
+  @details
+  When create() is called you do not need to worry about
+  opening the table. Also, the .frm file will have already been
+  created so adjusting create_info is not necessary. You can overwrite
+  the .frm file at this point if you wish to change the table
+  definition, but there are no methods currently provided for doing
+  so.
+
+  Called from handler.cc by ha_create_table().
+
+  @see
+  ha_create_table() in handler.cc
+*/
+
+int ha_rocksdb::create(const char *name, TABLE *table_arg,
+                       HA_CREATE_INFO *create_info)
+{
+  int res;
+  StringBuffer<256> strbuf;
+  DBUG_ENTER("ha_rocksdb::create");
+  
+  /* 
+    Construct dbname.tablename ourselves, because partitioning
+    passes strings like "./test/t14#P#p0" for individual partitions,
+    while table_arg->s->table_name has none of that.
+  */
+  if (rocksdb_normalize_tablename(name, &strbuf))
+    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+
+  /* 
+    We have HA_REQUIRE_PRIMARY_KEY flag, which guarantees there will be a
+    primary key. 
+    But there is an exception: the query
+
+      ALTER TABLE ... DROP COLUMN primary_key_column
+
+    will attempt to create a table without primary key.
+  */
+  if (table_arg->s->primary_key == MAX_INDEXES)
+  {
+    DBUG_RETURN(HA_ERR_ROCKSDB_PK_REQUIRED);
+  }
+
+  if ((res= create_key_defs(table_arg, strbuf.ptr(), strbuf.length())))
+    DBUG_RETURN(res);
+  
+  if (create_info->auto_increment_value)
+     tbl_def->auto_incr_val= create_info->auto_increment_value;
+
+  /*
+    We only support auto_increment at start of the PRIMARY KEY.
+  */ 
+  Field *field;
+  //if ((field= table_arg->next_number_field))
+  if ((field= table_arg->found_next_number_field))
+  {
+    int pk= table_arg->s->primary_key;
+    Field *pk_field= table_arg->key_info[pk].key_part[0].field;
+    if (field->field_index !=  pk_field->field_index)
+      DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+  }
+  DBUG_RETURN(0);
+}
+
+
+/*
+  This function is used only when the table has not yet been opened, and
+  the keyread_allowed bitmap doesn't have the correct values yet.
+  
+  See the comment in ha_rocksdb::index_flags() for details.
+*/
+
+bool ha_rocksdb::check_keyread_allowed(uint inx, uint part, bool all_parts) const
+{
+  bool res= true;
+  KEY *key_info= &table_share->key_info[inx];
+  
+  Field_pack_info dummy1;
+  res= dummy1.setup(key_info->key_part[part].field);
+
+  if (res && all_parts)
+  {
+    for (uint i= 0; i < part; i++)
+    {
+      Field *field;
+      if ((field= key_info->key_part[i].field))
+      {
+        Field_pack_info dummy;
+        if (!dummy.setup(field))
+        {
+          /* Cannot do index-only reads for this column */
+          res= false;
+          break;
+        }
+      }
+    }
+  }
+  
+  uint pk= table_share->primary_key;
+  if (inx == pk && all_parts && 
+      part == table_share->key_info[pk].user_defined_key_parts)
+  {
+    pk_can_be_decoded= res;
+  }
+
+  return res;
+}
+
+
+/*
+  @note
+    The problem with this function is that the SQL layer calls it when
+     - the table has not yet been opened (no ::open() call done)
+     - this->table_share already exists, but it is in the process of being
+       filled, so some of its fields are still NULL.
+     - In particular, table_share->key_info[inx].key_part[] is filled only up
+       to part #part. Subsequent key parts are not yet filled.
+
+    To complicate things further, the SQL layer will call index_flags() with
+    all_parts=TRUE. Essentially, we're asked to provide flags for reading
+    keyparts whose datatype is not yet known.
+
+    We work around this problem by using check_keyread_allowed(), which uses
+    the table_share object and is careful not to step on uninitialized data.
+
+    When we get a call with all_parts=TRUE, we try to analyze all parts but
+    ignore those that have key_part->field==NULL (these are not initialized
+    yet).
+*/
+
+ulong ha_rocksdb::index_flags(uint inx, uint part, bool all_parts) const
+{
+  ulong base_flags= HA_READ_NEXT  |   // doesn't seem to be used
+                    HA_READ_ORDER | 
+                    HA_READ_RANGE |
+                    HA_READ_PREV;
+
+  if (check_keyread_allowed(inx, part, all_parts))
+    base_flags |= HA_KEYREAD_ONLY;
+
+  /* 
+    Index-only reads on the primary key are the same as a table scan for us.
+    Still, we need to explicitly "allow" them, otherwise the SQL layer will
+    miss some plans.
+  */
+  if (inx == table_share->primary_key)
+    base_flags |= HA_KEYREAD_ONLY;
+
+  return base_flags;
+}
+
+
+/*
+  Read next index tuple through the secondary index.
+*/
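+/*
+  A secondary index entry's key embeds the primary key, so we first check
+  that the iterator is still within this index (covers_key), extract the PK
+  with get_primary_key_tuple(), and then either unpack the row from the
+  index entry itself (index-only read) or fetch it by rowid.
+*/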
+
+int ha_rocksdb::secondary_index_read(int keyno, uchar *buf)
+{
+  if (scan_it->Valid())
+  {
+    rocksdb::Slice key= scan_it->key();
+
+    /* Check if we've run out of records of this index */
+    if (key_descr[keyno]->covers_key(key.data(), key.size()))
+    {
+      int rc;
+      //TODO: We could check here whether we've run out of the range we're scanning
+      uint size= key_descr[keyno]->get_primary_key_tuple(pk_descr, key.data(), 
+                                                         (char*) pk_packed_tuple);
+      last_rowkey.copy((const char*)pk_packed_tuple, size, &my_charset_bin);
+
+      if (keyread_only && !lock_rows)
+      {
+        rocksdb::Slice value= scan_it->value();
+        if (key_descr[keyno]->unpack_record(table, buf, &key, &value))
+          return HA_ERR_INTERNAL_ERROR;
+        rc= 0;
+      }
+      else
+        rc= get_row_by_rowid(buf, (const char*)pk_packed_tuple, size);
+
+      if (!rc)
+      {
+        table->status= 0;
+        return rc;
+      }
+    }
+  }
+  table->status= STATUS_NOT_FOUND;
+  return HA_ERR_END_OF_FILE;
+}
+
+
+//void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
+
+int ha_rocksdb::index_read_map(uchar *buf, const uchar *key,
+                               key_part_map keypart_map,
+                               enum ha_rkey_function find_flag)
+{
+  int rc= 0;
+  DBUG_ENTER("ha_rocksdb::index_read_map");
+
+  uint key_parts= table->key_info[active_index].user_defined_key_parts;
+  RDBSE_KEYDEF *kd= key_descr[active_index];
+
+  if (active_index == table->s->primary_key && 
+      find_flag == HA_READ_KEY_EXACT &&
+      ((keypart_map == ((key_part_map(1) << key_parts) - 1) ||
+        keypart_map == HA_WHOLE_KEY)))
+  {
+    /* 
+      Equality lookup over primary key, using full tuple. 
+      This is a special case, use DB::Get.
+    */
+    uint size= kd->pack_index_tuple(table, pk_packed_tuple, key, keypart_map);
+    rc= get_row_by_rowid(buf, (const char*)pk_packed_tuple, size);
+    DBUG_RETURN(rc);
+  }
+
+  uint n_used_parts= my_count_bits(keypart_map);
+
+  uint packed_size; 
+
+  packed_size= kd->pack_index_tuple(table, sec_key_packed_tuple, key, 
+                                    keypart_map);
+
+  if (find_flag == HA_READ_PREFIX_LAST_OR_PREV || 
+      find_flag == HA_READ_PREFIX_LAST)
+  {
+    /* See below */
+    kd->successor(sec_key_packed_tuple, packed_size);
+  }
+
+  rocksdb::Slice slice((char*)sec_key_packed_tuple, packed_size);
+  
+
+  rocksdb::Slice rkey;
+  /*
+    This will open the iterator and position it at a record that's equal or
+    greater than the lookup tuple.
+  */
+  setup_index_scan(&slice);
+  bool move_forward= true;
+
+
+  switch (find_flag) {
+  case HA_READ_KEY_EXACT:
+  {
+    if (!scan_it->Valid())
+      rc= HA_ERR_KEY_NOT_FOUND;
+    else
+    {
+      rkey= scan_it->key();
+      if (!kd->covers_key(rkey.data(), rkey.size()) ||
+           kd->cmp_full_keys(rkey.data(), rkey.size(), 
+                             slice.data(), slice.size(), n_used_parts))
+      {
+        /* 
+          The record we've got is not from this index, or is not equal to the
+          lookup tuple.
+        */
+        rc= HA_ERR_KEY_NOT_FOUND;
+      }
+    }
+    break;
+  }
+  case HA_READ_KEY_OR_NEXT:
+  {
+    if (!scan_it->Valid())
+      rc= HA_ERR_KEY_NOT_FOUND;
+    else
+    {
+      rkey= scan_it->key();
+      if (!kd->covers_key(rkey.data(), rkey.size()))
+      {
+        /* The record we've got is not from this index */
+        rc= HA_ERR_KEY_NOT_FOUND;
+      }
+    }
+    break;
+  }
+  case HA_READ_BEFORE_KEY:
+  {
+    move_forward= false;
+    /* We want to read the record that's right *before* the given key.  */
+    if (!scan_it->Valid())
+    {
+      /*
+        All the values in the database are smaller than our key. Two cases:
+         - our index is the last in the db. Its last value is a match.
+         - our index has no records (in that case we will get a record from 
+           our index and detect it below)
+      */
+      scan_it->SeekToLast();
+    }
+    else
+    {
+      /*
+        The RocksDB iterator is positioned at "the first key in the source
+        that is at or past target".
+        We need to step one key back, so that we're at the last key that is
+        before the target.
+        If the passed key is greater than the maximum value found in the
+        table, the iterator is pointing at the *first* record of the
+        subsequent table/index.
+      */
+      scan_it->Prev();
+    }
+    /* fall through */
+  }
+  case HA_READ_AFTER_KEY:
+  {
+    bool in_key;
+    bool have_row;
+    /* 
+      Walk forward until we've found a record that is not equal to the lookup
+      tuple, but still belongs to this index.
+    */
+    while ((have_row= scan_it->Valid()))
+    {
+      rkey= scan_it->key();
+      if (!(in_key= kd->covers_key(rkey.data(), rkey.size())) ||
+          kd->cmp_full_keys(rkey.data(), rkey.size(), 
+                            slice.data(), slice.size(),
+                            n_used_parts))
+        break;
+
+      if (move_forward) 
+        scan_it->Next();
+      else
+        scan_it->Prev();
+    }
+    if (!have_row || !in_key)
+      rc= HA_ERR_END_OF_FILE;
+    break;
+  }
+  case HA_READ_KEY_OR_PREV:
+  {
+    if (!scan_it->Valid())
+    {
+      /*
+        We're past the last value in the database. It could be that we need
+        the last one.
+      */
+      scan_it->SeekToLast();
+    }
+    /* We should see a key that is less than or equal to the one specified */
+    bool in_key;
+    bool have_row;
+    while ((have_row= scan_it->Valid()))
+    {
+      rkey= scan_it->key();
+      if (!(in_key= kd->covers_key(rkey.data(), rkey.size())) ||
+           kd->cmp_full_keys(rkey.data(), rkey.size(), 
+                             slice.data(), slice.size(),
+                             n_used_parts) <= 0)
+        break;
+      scan_it->Prev();
+    }
+    if (!have_row || !in_key)
+      rc= HA_ERR_END_OF_FILE;
+    break;
+  }
+  case HA_READ_PREFIX_LAST:
+  case HA_READ_PREFIX_LAST_OR_PREV:
+  {
+    /* 
+      Given a prefix of (VAL1,VAL2), get the last record that has
+      (kp1,kp2)=(VAL1,VAL2).  This cannot be translated directly to RocksDB
+      Iterator command.
+
+      We navigate to (VAL1,VAL2+1) and then step one record back. 
+    */
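+    /*
+      (successor() was already applied to the packed lookup tuple before the
+       seek, so the iterator starts at the first key past the prefix and
+       Prev() below lands on the last record with that prefix, if any.)
+    */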
+    if (!scan_it->Valid())
+    {
+      /*
+        We're past the last value in the database. It could be that we need
+        the last one.
+      */
+      scan_it->SeekToLast();
+    }
+    else
+      scan_it->Prev();
+
+    if (scan_it->Valid())
+    {
+      rc= 0;
+      rkey= scan_it->key();
+      if (!kd->covers_key(rkey.data(), rkey.size()))
+      {
+        /* The record we've got is not from this index */
+        rc= HA_ERR_KEY_NOT_FOUND;
+      }
+      else
+      if (find_flag == HA_READ_PREFIX_LAST) 
+      {
+        packed_size= kd->pack_index_tuple(table, sec_key_packed_tuple, key, 
+                                          n_used_parts);
+        /*
+          Check if the record has the same search prefix.
+        */
+        if (kd->cmp_full_keys(rkey.data(), rkey.size(), 
+                              (const char*)sec_key_packed_tuple, packed_size,
+                              n_used_parts))
+          rc= HA_ERR_END_OF_FILE;
+      }
+    }
+    else
+      rc= HA_ERR_KEY_NOT_FOUND;
+
+    break;
+  }
+  default:
+    DBUG_ASSERT(0);
+    break;
+  }
+
+  if (rc)
+  {
+    table->status= STATUS_NOT_FOUND; /* Only not found error is possible */
+    DBUG_RETURN(rc);
+  }
+
+  skip_scan_it_next_call= FALSE;
+  uint pk_size;
+  
+  if (active_index == table->s->primary_key)
+  {
+    pk_size= rkey.size();
+    memcpy(pk_packed_tuple, rkey.data(), pk_size);
+    last_rowkey.copy(rkey.data(), pk_size, &my_charset_bin);
+    if (lock_rows)
+    {
+      /* We need to take a lock and re-read */
+      // TODO: what if we find that the row is gone? Need a re-read?
+      rc= get_row_by_rowid(buf, (const char*)pk_packed_tuple, pk_size);
+    }
+    else
+    {
+      /* Unpack from the row we've read */
+      rocksdb::Slice value= scan_it->value();
+      convert_record_from_storage_format(&value, buf);
+    }
+  }
+  else
+  {
+    pk_size= kd->get_primary_key_tuple(pk_descr, rkey.data(), 
+                                       (char*) pk_packed_tuple);
+    last_rowkey.copy((const char*)pk_packed_tuple, pk_size, &my_charset_bin);
+
+    if (keyread_only && !lock_rows)
+    {
+      /* Get the key columns and primary key value */
+      rocksdb::Slice value= scan_it->value();
+      if (kd->unpack_record(table, buf, &rkey, &value))
+        rc= HA_ERR_INTERNAL_ERROR;
+      else
+        rc= 0;
+    }
+    else
+      rc= get_row_by_rowid(buf, (const char*)pk_packed_tuple, pk_size);
+  }
+
+  if (rc)
+  {
+    /* the only possible error condition is record-not-found */
+    table->status= STATUS_NOT_FOUND;
+  }
+  else
+    table->status= 0;
+  DBUG_RETURN(rc);
+}
+
+
+int ha_rocksdb::index_read_last_map(uchar * buf, const uchar * key, 
+                                    key_part_map keypart_map)
+{
+  return index_read_map(buf, key, keypart_map, HA_READ_PREFIX_LAST);
+}
+
+
+static void dbug_dump_str(FILE *out, const char *str, int len)
+{
+  fprintf(out, "\"");
+  for (int i= 0; i < len; i++)
+  {
+    if (str[i] > 32)
+      fprintf(out, "%c", str[i]);
+    else
+      fprintf(out, "\\%d", str[i]);
+  }
+  fprintf(out, "\"");
+}
+
+
+/*
+  Debugging help: dump the whole database into a human-readable file.
+  Usage: 
+    dbug_dump_database(rdb);
+*/
+
+void dbug_dump_database(rocksdb::DB *db)
+{
+  FILE *out= fopen("/tmp/rocksdb.dump", "wt");
+  if (!out)
+    return;
+
+  rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    rocksdb::Slice key= it->key();
+    rocksdb::Slice val= it->value();
+    dbug_dump_str(out, key.data(), key.size());
+    fprintf(out, " -> ");
+    dbug_dump_str(out, val.data(), val.size());
+    fprintf(out, "\n");
+  }
+  //assert(it->status().ok());  // Check for any errors found during the scan
+  delete it;
+  fclose(out);
+}
+
+
+int ha_rocksdb::get_row_by_rowid(uchar *buf, const char *rowid, uint rowid_size)
+{
+  int rc;
+  DBUG_ENTER("ha_rocksdb::get_row_by_rowid");
+
+  rocksdb::Slice key_slice(rowid, rowid_size);
+
+  Rdb_transaction *trx= get_or_create_trx(table->in_use);
+  Row_lock *lock;
+  bool timed_out;
+  if (lock_rows)
+  {
+    if (!(lock= trx->get_lock((const uchar*)rowid, rowid_size, &timed_out)))
+      DBUG_RETURN(return_lock_failure(timed_out));
+  }
+
+  rocksdb::ReadOptions options;
+  if (!lock_rows)
+    options.snapshot= trx->snapshot;
+  
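+  /*
+    Look in the transaction's own buffered (not yet applied) changes first,
+    then in the db; this is how it sees its own uncommitted writes.
+  */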
+  bool found;
+  if (!trx->changes.Get(key_slice, &retrieved_record, &found))
+  {
+    rocksdb::Status s= rdb->Get(options, key_slice, &retrieved_record);
+    found= !s.IsNotFound();
+  }
+
+  if (found)
+  {
+    if (lock_rows)
+      trx->add_lock(lock);
+    last_rowkey.copy((const char*)rowid, rowid_size, &my_charset_bin);
+    convert_record_from_storage_format(buf);
+    table->status= 0;
+    rc= 0;
+  }
+  else
+  {
+    if (lock_rows)
+      row_locks.release_lock(trx->pins, lock);
+    table->status= STATUS_NOT_FOUND;
+    rc= HA_ERR_KEY_NOT_FOUND;
+  }
+
+  DBUG_RETURN(rc);
+}
+
+
+int ha_rocksdb::index_next(uchar *buf)
+{
+  return index_next_with_direction(buf, true);
+}
+
+
+int ha_rocksdb::index_prev(uchar *buf)
+{
+  return index_next_with_direction(buf, false);
+}
+
+
+int ha_rocksdb::index_next_with_direction(uchar *buf, bool move_forward)
+{
+  int rc;
+  DBUG_ENTER("ha_rocksdb::index_next");
+
+  if (active_index == table->s->primary_key)
+    rc= rnd_next_with_direction(buf, move_forward);
+  else
+  {
+    if (skip_scan_it_next_call)
+      skip_scan_it_next_call= FALSE;
+    else
+    {
+      if (move_forward)
+        scan_it->Next();  /* this call cannot fail */
+      else
+        scan_it->Prev();
+    }
+
+    rc= secondary_index_read(active_index, buf);
+  }
+
+  DBUG_RETURN(rc);
+}
+
+
+int ha_rocksdb::index_first(uchar *buf)
+{
+  uchar *key;
+  uint key_size;
+  int rc;
+  DBUG_ENTER("ha_rocksdb::index_first");
+  
+  key= (active_index == table->s->primary_key)? pk_packed_tuple : 
+                                                sec_key_packed_tuple;
+  key_descr[active_index]->get_infimum_key(key, &key_size);
+  rocksdb::Slice index_key((const char*)key, key_size);
+
+  setup_index_scan(&index_key);
+  skip_scan_it_next_call= TRUE;
+
+  rc= index_next(buf);
+  DBUG_RETURN(rc);
+}
+
+
+int ha_rocksdb::index_last(uchar *buf)
+{
+  uchar *key;
+  uint key_size;
+  int rc;
+  DBUG_ENTER("ha_rocksdb::index_last");
+  
+  key= (active_index == table->s->primary_key)? pk_packed_tuple : 
+                                                sec_key_packed_tuple;
+  key_descr[active_index]->get_supremum_key(key, &key_size);
+  rocksdb::Slice index_key((const char*)key, key_size);
+
+  setup_index_scan(&index_key);
+
+  if (!scan_it->Valid())
+  {
+    /*
+      The RocksDB keyspace has no records that follow our index; ours may be
+      the last index in the keyspace. Navigate to the last record in the
+      keyspace. (A worked example follows this function.)
+    */
+    scan_it->SeekToLast();
+  }
+  else
+  {
+    /*
+      Iterator is pointing at some record right after the end of the index.
+      Move one record back.
+    */
+    scan_it->Prev();
+  }
+  skip_scan_it_next_call= FALSE;
+  
+  if (active_index == table->s->primary_key)
+  {
+    skip_scan_it_next_call= true;
+    rc= rnd_next_with_direction(buf, false);
+  }
+  else
+    rc= secondary_index_read(active_index, buf);
+
+  DBUG_RETURN(rc);
+}
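+/*
+  Worked example for index_last() (byte values are illustrative): suppose our
+  index has number 5 and the keyspace holds keys
+
+    {5,'a'}, {5,'b'}, {6,'a'}
+
+  Seeking to the supremum key of index 5 (a key that compares greater than
+  any real key of the index) positions the iterator at {6,'a'}, and one
+  Prev() lands on {5,'b'}, the last record of our index. If index 5 were the
+  last index in the keyspace, the seek would leave the iterator invalid and
+  we would use SeekToLast() instead.
+*/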
+
+
+void ha_rocksdb::unlock_row()
+{
+  if (lock_rows)
+  {
+    Rdb_transaction *trx= get_or_create_trx(table->in_use);
+    trx->release_last_lock();
+  }
+}
+
+
+//static const char* empty_str="";
+
+int ha_rocksdb::write_row(uchar *buf)
+{
+  DBUG_ENTER("ha_rocksdb::write_row");
+  DBUG_ASSERT(buf == table->record[0]);
+  DBUG_ASSERT(lock_rows);
+  
+  /* Update AUTO_INCREMENT column */
+  bool using_auto_incr_value= false;
+
+  /*
+    Note: "buf == table->record[0]" is copied from innodb. I am not aware of 
+    any use cases where this condition is not true.
+  */
+  if (table->next_number_field && buf == table->record[0])
+  {
+    int err;
+    if ((err= update_auto_increment()))
+    {
+      DBUG_RETURN(err);
+    }
+    using_auto_incr_value= true;
+  }
+
+  /* Get a packed primary key value */
+  uint packed_size= pk_descr->pack_record(table, buf, pk_packed_tuple, NULL, NULL);
+  rocksdb::Slice key_slice((const char*)pk_packed_tuple, packed_size);
+
+  Rdb_transaction *trx= get_or_create_trx(table->in_use);
+  bool do_locking= !THDVAR(table->in_use, bulk_load);
+  Row_lock *lock= NULL; // init to shut up the compiler
+
+  if (do_locking)
+  {
+    /* 
+      Get a record lock to make sure we do not overwrite somebody's changes 
+    */
+    bool timed_out;
+    if (!(lock= trx->get_lock(pk_packed_tuple, packed_size, &timed_out)))
+    {
+      DBUG_RETURN(return_lock_failure(timed_out));
+    }
+
+    /* 
+      note: we intentionally don't set options.snapshot here. We want to read 
+      the latest committed data.
+    */ 
+    rocksdb::ReadOptions options; 
+
+    bool found;
+    if (!trx->changes.Get(key_slice, &retrieved_record, &found))
+    {
+      rocksdb::Status s= rdb->Get(options, key_slice, &retrieved_record);
+      found= !s.IsNotFound();
+    }
+
+    if (found) 
+    {
+      /* There is a row with this rowid already */
+      row_locks.release_lock(trx->pins, lock);
+      DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
+    }
+  }
+
+  /* We now hold the lock. It prevents concurrent writes to this rowid. */
+  trx->prepare_for_write();
+  
+  if (do_locking)
+    trx->add_lock(lock); /* the lock will be released when trx commits */
+
+  /* Write every secondary index */
+  for (uint i= 0; i < table->s->keys; i++)
+  {
+    if (i != table->s->primary_key)
+    {
+      int packed_size;
+      int tail_size;
+
+      packed_size= key_descr[i]->pack_record(table, buf, sec_key_packed_tuple,
+                                             sec_key_tails, &tail_size);
+
+      rocksdb::Slice secondary_key_slice((char*)sec_key_packed_tuple, 
+                                         packed_size);
+      rocksdb::Slice value_slice((const char*)sec_key_tails, tail_size);
+      trx->changes.Put(secondary_key_slice, value_slice);
+      trx->n_writes++;
+    }
+  }
+ 
+  /* Write primary_key -> record */
+  if (!using_auto_incr_value && table->next_number_field)
+    update_auto_incr_val();
+
+  rocksdb::Slice value_slice;
+  convert_record_to_storage_format(&value_slice);
+  
+  trx->changes.Put(key_slice, value_slice);
+  trx->n_writes++;
+
+  if (do_locking && trx->n_writes > THDVAR(table->in_use, bulk_load_size))
+  {
+    /* Bulk insert mode: flush the write batch if it became too big */
+    if (trx->flush_batch())
+      DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+  }
+
+  DBUG_RETURN(0);
+}
+
+
+/*
+  Open a cursor and position it at the passed record
+*/
+
+void ha_rocksdb::setup_index_scan(rocksdb::Slice *slice)
+{
+  Rdb_transaction *trx= get_or_create_trx(table->in_use);
+
+  /*
+    SQL layer can call rnd_init() multiple times in a row.
+    In that case, re-use the iterator, but re-position it at the table start.
+  */
+  if (!scan_it)
+  {
+    rocksdb::ReadOptions options;
+    if (!lock_rows)
+      options.snapshot= trx->snapshot;
+    rocksdb::Iterator* rocksdb_it= rdb->NewIterator(options);
+    scan_it= new Apply_changes_iter;
+    scan_it->init(&trx->changes, rocksdb_it);
+  }
+  
+  /* 
+    Seek() will "position at the first key in the source that is at or past
+    target". The operation cannot fail. (An example follows this function.)
+  */
+  scan_it->Seek(*slice);
+}
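+/*
+  Example of the Seek() semantics (illustrative keys): if the merged view
+  contains {"aaa", "abc", "abd"}, then Seek("abb") positions the iterator at
+  "abc", the first key at or past the target, while Seek("abc") positions it
+  exactly at "abc". This is why callers that need "start from the first key
+  of the index" seek to the infimum key and set skip_scan_it_next_call.
+*/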
+
+
+int ha_rocksdb::rnd_init(bool scan)
+{
+  DBUG_ENTER("ha_rocksdb::rnd_init");
+  
+  if (scan)
+  {
+    uint key_size;
+    pk_descr->get_infimum_key(pk_packed_tuple, &key_size);
+    rocksdb::Slice table_key((const char*)pk_packed_tuple, key_size);
+
+    setup_index_scan(&table_key);
+    skip_scan_it_next_call= true;
+  }
+  else
+  {
+    /* We don't need any preparations for rnd_pos() calls. */
+  }
+
+  DBUG_RETURN(0);
+}
+
+
+int ha_rocksdb::rnd_next(uchar *buf)
+{
+  return rnd_next_with_direction(buf, true);
+}
+
+
+/*
+  See also secondary_index_read(). 
+*/
+int ha_rocksdb::rnd_next_with_direction(uchar *buf, bool move_forward)
+{
+  int rc;
+  DBUG_ENTER("ha_rocksdb::rnd_next");
+  
+  if (!scan_it)
+  {
+    /*
+      We can get here when SQL layer has called
+
+        h->index_init(PRIMARY);
+        h->index_read_map(full index tuple, HA_READ_KEY_EXACT);
+
+      In this case, we should return EOF.
+    */
+    table->status= STATUS_NOT_FOUND;
+    DBUG_RETURN(HA_ERR_END_OF_FILE);
+  }
+
+retry:
+  if (skip_scan_it_next_call)
+    skip_scan_it_next_call= false;
+  else
+  {
+    if (move_forward)
+      scan_it->Next();  /* this call cannot fail */
+    else
+      scan_it->Prev();  /* this call cannot fail */
+  }
+
+  if (scan_it->Valid())
+  {
+    /* check if we're out of this table */
+    rocksdb::Slice key= scan_it->key();
+    if (!pk_descr->covers_key(key.data(), key.size()))
+    {
+      rc= HA_ERR_END_OF_FILE;
+      table->status= STATUS_NOT_FOUND;
+    }
+    else
+    {
+      if (lock_rows)
+      {
+        Rdb_transaction *trx= get_or_create_trx(table->in_use);
+        /*
+          Here is a problem: we've read a row without locking it. We could
+          not lock the row before reading it, because we didn't know its
+          rowkey. The current solution is to lock the row and then re-read it.
+        */
+        Row_lock *lock;
+        bool timed_out;
+        if (!(lock= trx->get_lock((const uchar*)key.data(), key.size(), &timed_out)))
+          DBUG_RETURN(return_lock_failure(timed_out));
+        
+        rocksdb::ReadOptions options;
+        /* lock_rows==TRUE, so not setting options.snapshot */
+        bool found;
+        if (!trx->changes.Get(key, &retrieved_record, &found))
+        {
+          rocksdb::Status s= rdb->Get(options, key, &retrieved_record);
+          found= !s.IsNotFound();
+        }
+
+        if (!found) 
+        {
+          /* The row is no longer there. Try the next one. */
+          row_locks.release_lock(trx->pins, lock);
+          goto retry;
+        }
+        trx->add_lock(lock);
+
+        last_rowkey.copy(key.data(), key.size(), &my_charset_bin);
+        convert_record_from_storage_format(buf);
+        table->status= 0;
+        rc= 0;
+      }
+      else
+      {
+        last_rowkey.copy(key.data(), key.size(), &my_charset_bin);
+        rocksdb::Slice value= scan_it->value();
+        convert_record_from_storage_format(&value, buf);
+        rc= 0;
+        table->status= 0;
+      }
+    }
+  }
+  else
+  {
+    /* Set the status, like the other EOF paths above do */
+    table->status= STATUS_NOT_FOUND;
+    rc= HA_ERR_END_OF_FILE;
+  }
+
+  DBUG_RETURN(rc);
+}
+
+
+int ha_rocksdb::rnd_end()
+{
+  DBUG_ENTER("ha_rocksdb::rnd_end");
+
+  if (scan_it)
+  {
+    delete scan_it;
+    scan_it= NULL;
+  }
+  DBUG_RETURN(0);
+}
+
+
+int ha_rocksdb::index_end() 
+{
+
+  if (scan_it)
+  {
+    delete scan_it;
+    scan_it= NULL;
+  }
+
+  active_index= MAX_KEY; 
+  return 0;
+}
+
+
+int ha_rocksdb::delete_all_rows()
+{
+  DBUG_ENTER("ha_rocksdb::delete_all_rows");
+  /*
+    We will have to delete records one-by-one anyway. Let the SQL layer do
+    it.
+  */
+  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+}
+
+int ha_rocksdb::truncate()
+{
+  mysql_mutex_lock(&tbl_def->mutex);
+  tbl_def->auto_incr_val= 1;
+  mysql_mutex_unlock(&tbl_def->mutex);
+  remove_rows(tbl_def);
+  return 0;
+}
+
+/*
+  Delete the row we've last read. Ignore the parameter.
+  (Check out InnoDB: row_update_for_mysql() has "UT_NOT_USED(mysql_rec)")
+*/
+int ha_rocksdb::delete_row(const uchar *buf)
+{
+  DBUG_ENTER("ha_rocksdb::delete_row");
+  
+  rocksdb::Slice key_slice(last_rowkey.ptr(), last_rowkey.length());
+  Rdb_transaction *trx= get_or_create_trx(table->in_use);
+  trx->prepare_for_write();
+  trx->changes.Delete(key_slice);
+  DBUG_ASSERT(buf == table->record[0]);
+
+  // Delete the record for every secondary index
+  for (uint i= 0; i < table->s->keys; i++)
+  {
+    if (i != table->s->primary_key)
+    {
+      int packed_size;
+      packed_size= key_descr[i]->pack_record(table, buf, sec_key_packed_tuple,
+                                             NULL, NULL);
+      rocksdb::Slice secondary_key_slice((char*)sec_key_packed_tuple, 
+                                         packed_size);
+      trx->changes.Delete(secondary_key_slice);
+    }
+  }
+
+  DBUG_RETURN(0);
+}
+
+
+int ha_rocksdb::info(uint flag)
+{
+  DBUG_ENTER("ha_rocksdb::info");
+  
+  if (!table)
+    DBUG_RETURN(1); /* use DBUG_RETURN to match the DBUG_ENTER above */
+
+  if (flag & HA_STATUS_VARIABLE)
+  {
+    stats.records= 1000;  /* a fake estimate; we don't maintain a row count yet */
+  }
+  if (flag & HA_STATUS_CONST)
+  {
+    DBUG_ASSERT(table->s->primary_key != MAX_KEY);
+
+    ref_length= pk_descr->max_storage_fmt_length();
+  }
+
+  if (flag & HA_STATUS_ERRKEY)
+  {
+    /* 
+      Currently we only enforce uniqueness for the primary key, so we know
+      which key had the uniqueness violation.
+    */
+    errkey= 0;
+    dup_ref= pk_tuple; //TODO: this should store packed PK.
+  }
+
+  DBUG_RETURN(0);
+}
+
+
+void ha_rocksdb::position(const uchar *record)
+{
+  DBUG_ENTER("ha_rocksdb::position");
+
+  memcpy(ref, last_rowkey.ptr(), ref_length);
+  DBUG_VOID_RETURN;
+}
+
+
+int ha_rocksdb::rnd_pos(uchar *buf, uchar *pos)
+{
+  int rc;
+  DBUG_ENTER("ha_rocksdb::rnd_pos");
+
+  rc= get_row_by_rowid(buf, (const char*)pos, ref_length);
+
+  DBUG_RETURN(rc);
+}
+
+
+int ha_rocksdb::return_lock_failure(bool timed_out)
+{
+  THD *thd;
+  if ((thd= table->in_use)) 
+  {
+    thd_mark_transaction_to_rollback(thd, false);
+  }
+  return timed_out? HA_ERR_LOCK_WAIT_TIMEOUT : HA_ERR_ROCKSDB_TOO_MANY_LOCKS;
+}
+
+
+int ha_rocksdb::update_row(const uchar *old_data, uchar *new_data)
+{
+  DBUG_ENTER("ha_rocksdb::update_row");
+  DBUG_ASSERT(lock_rows);
+
+  /*
+    old_data points to record we're updating. It is the same as the record
+    we've just read (for multi-table UPDATE, too, because SQL layer will make
+    an rnd_pos() call to re-read the record before calling update_row())
+  */
+  DBUG_ASSERT(new_data == table->record[0]);
+ 
+  /* Get a rowkey for the new record */
+  uint packed_size= pk_descr->pack_record(table, new_data, pk_packed_tuple,
+                                          NULL, NULL);
+
+  rocksdb::Slice key_slice((const char*)pk_packed_tuple, packed_size);
+  Rdb_transaction *trx= get_or_create_trx(table->in_use);
+
+  /* Compare rowkey for the new record with rowkey for the old one */
+  rocksdb::Slice old_key_slice(last_rowkey.ptr(), last_rowkey.length());
+  if (primary_key_comparator.Compare(key_slice, old_key_slice))
+  {
+    /* Ok, they are different. We're doing DELETE+PUT */
+
+    /* 
+      We need to make sure that the Put() will not overwrite an existing
+      record. Get a lock on the rowkey we're going to write.
+    */
+    Row_lock *lock;
+    bool timed_out;
+    if (!(lock= trx->get_lock(pk_packed_tuple, packed_size, &timed_out)))
+      DBUG_RETURN(return_lock_failure(timed_out));
+
+    bool found;
+    if (!trx->changes.Get(key_slice, &retrieved_record, &found))
+    {
+      rocksdb::ReadOptions options; 
+      /* Not setting options.snapshot, we need to check the real contents */
+      rocksdb::Status s= rdb->Get(options, key_slice, &retrieved_record);
+      found= !s.IsNotFound();
+    }
+
+    if (found) 
+    {
+      row_locks.release_lock(trx->pins, lock);
+      DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
+    }
+
+    trx->prepare_for_write();
+    trx->add_lock(lock);
+    trx->changes.Delete(old_key_slice);
+  }
+
+  trx->prepare_for_write();
+
+  /* Update every index */
+  for (uint i= 0; i < table->s->keys; i++)
+  {
+    if (i != table->s->primary_key)
+    {
+      int packed_size;
+      int tail_size;
+      // TODO: compare old/new fields, and update only when actually necessary.
+
+      // First, a Delete()
+      {
+        packed_size= key_descr[i]->pack_record(table, old_data, 
+                                               sec_key_packed_tuple, 
+                                               NULL, NULL);
+        rocksdb::Slice secondary_key_slice((char*)sec_key_packed_tuple, 
+                                           packed_size);
+        trx->changes.Delete(secondary_key_slice);
+      }
+
+      // Then, Put().
+      {
+        packed_size= key_descr[i]->pack_record(table, new_data, 
+                                               sec_key_packed_tuple,
+                                               sec_key_tails, &tail_size);
+        rocksdb::Slice secondary_key_slice((char*)sec_key_packed_tuple, 
+                                           packed_size);
+        rocksdb::Slice value_slice((const char*)sec_key_tails, tail_size);
+        trx->changes.Put(secondary_key_slice, value_slice);
+      }
+    }
+  }
+  
+  if (table->next_number_field)
+    update_auto_incr_val();
+  /* Write the table record */
+  rocksdb::Slice value_slice;
+  convert_record_to_storage_format(&value_slice);
+  trx->changes.Put(key_slice, value_slice);
+
+  DBUG_RETURN(0);
+}
+
+
+/* The following function was copied from ha_blackhole::store_lock: */
+THR_LOCK_DATA **ha_rocksdb::store_lock(THD *thd,
+                                         THR_LOCK_DATA **to,
+                                         enum thr_lock_type lock_type)
+{
+  DBUG_ENTER("ha_rocksdb::store_lock");
+  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
+  {
+    /*
+      Here is where we get into the guts of a row-level lock.
+      If TL_UNLOCK is set and we are not doing a LOCK TABLE or
+      DISCARD/IMPORT TABLESPACE, then allow multiple writers.
+    */
+
+    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
+         lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
+        && !thd_tablespace_op(thd))
+      lock_type = TL_WRITE_ALLOW_WRITE;
+
+    /*
+      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
+      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
+      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
+      to t2. Convert the lock to a normal read lock to allow
+      concurrent inserts to t2.
+    */
+
+    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
+      lock_type = TL_READ;
+
+    lock.type= lock_type;
+  }
+  *to++= &lock;
+  DBUG_RETURN(to);
+}
+
+
+int ha_rocksdb::external_lock(THD *thd, int lock_type)
+{
+  int res= 0;
+  DBUG_ENTER("ha_rocksdb::external_lock");
+  if (lock_type == F_UNLCK)
+  {
+    if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) 
+    {
+      /*
+        Do like InnoDB: when we get here, it's time to commit a
+        single-statement transaction.
+        
+        If the statement involved multiple tables, this code will be executed
+        for each of them, but that's ok because non-first trx->commit() calls
+        will be no-ops.
+      */
+      Rdb_transaction* trx= get_or_create_trx(thd);
+      res= trx->commit();
+      delete scan_it;
+      scan_it= NULL;
+    }
+    lock_rows= FALSE;
+  } 
+  else
+  {
+    /*
+      We would like to do the following at the start of every statement, but
+      handler->start_stmt() is not called for INSERTs, so we put this code
+      here.
+    */
+    Rdb_transaction* trx= get_or_create_trx(thd);
+    trx->set_params(THDVAR(table->in_use, lock_wait_timeout), 
+                    THDVAR(table->in_use, max_row_locks));
+
+    if (lock_type == F_WRLCK)
+    {
+      /* 
+        The SQL layer signals us to take a write lock when it starts a DML
+        statement. We should place locks on the rows we are reading.
+      */
+      lock_rows= TRUE;
+    }
+
+    trans_register_ha(thd, FALSE, rocksdb_hton);
+    if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
+    {
+      trx->start_stmt();
+      trans_register_ha(thd, TRUE, rocksdb_hton);
+    }
+  }
+
+  DBUG_RETURN(res);
+}
+
+
+int ha_rocksdb::start_stmt(THD *thd, thr_lock_type lock_type)
+{
+  if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
+  {
+    Rdb_transaction* trx= get_or_create_trx(thd);
+    trx->set_params(THDVAR(table->in_use, lock_wait_timeout), 
+                    THDVAR(table->in_use, max_row_locks));
+    trx->start_stmt();
+  }
+  return 0;
+}
+
+/*
+  Note: the following function is called when the table is not open. That is, 
+  this->table==NULL, pk_key_descr==NULL, etc. 
+
+  tablename points to a string of the form "./dbname/tablename".
+*/
+
+int ha_rocksdb::delete_table(const char *tablename)
+{
+  StringBuffer<256> strbuf;
+  DBUG_ENTER("ha_rocksdb::delete_table");
+
+  if (rocksdb_normalize_tablename(tablename, &strbuf))
+  {
+    DBUG_ASSERT(0); // We were not passed a table name?
+    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+  }
+
+
+  /* Find the table in the hash */
+  RDBSE_TABLE_DEF *tbl;
+  if (!(tbl= ddl_manager.find((uchar*)strbuf.c_ptr(), strbuf.length())))
+    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+
+  remove_rows(tbl);
+
+  /* 
+    Remove the table entry in data dictionary (this will also remove it from
+    the persistent data dictionary).
+  */
+  ddl_manager.remove(tbl, rdb, true);
+
+  DBUG_RETURN(0);
+}
+
+
+void ha_rocksdb::remove_rows(RDBSE_TABLE_DEF *tbl)
+{
+  rocksdb::Iterator* it= rdb->NewIterator(rocksdb::ReadOptions());
+  
+  char key_buf[MAX_KEY_LENGTH];
+  uint key_len;
+  /* 
+    Remove all records in each index.
+    (This is not crash-safe, but it doesn't matter, because bulk row
+    deletion will be handled on the RocksDB side.)
+  */
+  for (uint i= 0; i < tbl->n_keys ; i++)
+  {
+    tbl->key_descr[i]->get_infimum_key((uchar*)key_buf, &key_len);
+
+    rocksdb::Slice table_key(key_buf, key_len);
+    it->Seek(table_key);
+    while (it->Valid())
+    {
+      rocksdb::Slice key= it->key();
+      if (!tbl->key_descr[i]->covers_key(key.data(), key.size()))
+        break;
+      rdb->Delete(rocksdb::WriteOptions(), key);
+      it->Next();
+    }
+  }
+  delete it;
+}
+
+
+int ha_rocksdb::rename_table(const char *from, const char *to)
+{
+  StringBuffer<256> from_str;
+  StringBuffer<256> to_str;
+  
+  if (rocksdb_normalize_tablename(from, &from_str))
+    return HA_ERR_INTERNAL_ERROR;
+
+  if (rocksdb_normalize_tablename(to, &to_str))
+    return HA_ERR_INTERNAL_ERROR;
+
+  if (ddl_manager.rename((uchar*)from_str.ptr(), from_str.length(),
+                         (uchar*)to_str.ptr(), to_str.length(), rdb))
+    return HA_ERR_INTERNAL_ERROR;
+
+  return 0;
+}
+
+/**
+  check_if_incompatible_data() is called if ALTER TABLE cannot otherwise
+  detect whether the new and old definitions are compatible.
+
+  @details If there are no other explicit signs, like a changed number of
+  fields, this function is called by compare_tables() (sql/sql_table.cc) to
+  decide whether we must rewrite the whole table or only the .frm file.
+*/
+
+bool ha_rocksdb::check_if_incompatible_data(HA_CREATE_INFO *info,
+                                            uint table_changes)
+{
+  DBUG_ENTER("ha_rocksdb::check_if_incompatible_data");
+  // this function is needed only for online alter-table
+  DBUG_RETURN(COMPATIBLE_DATA_NO);
+}
+
+
+int ha_rocksdb::extra(enum ha_extra_function operation)
+{
+  DBUG_ENTER("ha_rocksdb::extra");
+
+  switch (operation) {
+  case HA_EXTRA_KEYREAD:
+    keyread_only= TRUE;
+    break;
+  case HA_EXTRA_NO_KEYREAD:
+    keyread_only= FALSE;
+    break;
+  default:
+    break;
+  }
+  DBUG_RETURN(0);
+}
+
+
+ha_rows ha_rocksdb::records_in_range(uint inx, key_range *min_key,
+                                     key_range *max_key)
+{
+  DBUG_ENTER("ha_rocksdb::records_in_range");
+  DBUG_RETURN(10);                         // low number to force index usage
+}
+
+
+void ha_rocksdb::update_create_info(HA_CREATE_INFO *create_info)
+{
+  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
+  {
+    table->file->info(HA_STATUS_AUTO);
+    // TODO: the following is basic, we need to actually
+    //  store the specified value somewhere..
+    create_info->auto_increment_value= tbl_def->auto_incr_val;
+  }
+}
+
+
+void ha_rocksdb::get_auto_increment(ulonglong offset, ulonglong increment,
+                                    ulonglong nb_desired_values,
+                                    ulonglong *first_value,
+                                    ulonglong *nb_reserved_values)
+{
+  if (!nb_desired_values)
+    nb_desired_values= 1;
+
+  mysql_mutex_lock(&tbl_def->mutex);
+  *first_value= tbl_def->auto_incr_val;
+  *nb_reserved_values= nb_desired_values;
+  tbl_def->auto_incr_val += nb_desired_values;
+  mysql_mutex_unlock(&tbl_def->mutex);
+}
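+/*
+  Example: if tbl_def->auto_incr_val is 11 and the caller asks for 3 values,
+  it gets first_value=11 with values 11..13 reserved, and the shared counter
+  advances to 14. The mutex makes the reservation atomic across handlers
+  that share this tbl_def.
+*/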
+
+
+#if 0
+//TODO: do we have any status vars?
+static int show_rocksdb_vars(THD *thd, SHOW_VAR *var, char *buff)
+{
+  //innodb_export_status();
+  //rocksdb_counters_copy= rocksdb_counters; 
+
+  var->type= SHOW_ARRAY;
+  //var->value= (char *) &rocksdb_status_variables;
+  return 0;
+}
+#endif
+
+struct st_mysql_storage_engine rocksdb_storage_engine=
+{ MYSQL_HANDLERTON_INTERFACE_VERSION };
+
+static struct st_mysql_show_var func_status[]=
+{
+ // {"Cassandra",  (char *)show_rocksdb_vars, SHOW_FUNC},
+  {0,0,SHOW_UNDEF}
+};
+
+mysql_declare_plugin(rocksdb_se)
+{
+  MYSQL_STORAGE_ENGINE_PLUGIN,
+  &rocksdb_storage_engine,
+  "ROCKSDB",
+  "Monty Program Ab",
+  "RocksDB storage engine",
+  PLUGIN_LICENSE_GPL,
+  rocksdb_init_func,                            /* Plugin Init */
+  rocksdb_done_func,                            /* Plugin Deinit */
+  0x0001,                                       /* version number (0.1) */
+  func_status,                                  /* status variables */
+  rocksdb_system_variables,                     /* system variables */
+  NULL,                                         /* config options */
+  0,                                            /* flags */
+}
+mysql_declare_plugin_end;
+
+
+/*
+  Compute a hash number for a PK value in RowKeyFormat.
+
+  @note 
+    RowKeyFormat is comparable with memcmp. This means any hash function
+    will work correctly. We use my_charset_bin's hash function. 
+
+    Note from Bar: could also use a crc32 function.
+*/
+
+ulong Primary_key_comparator::get_hashnr(const char *key, size_t key_len)
+{
+  ulong nr=1, nr2=4;
+  my_charset_bin.coll->hash_sort(&my_charset_bin, (const uchar*)key, key_len,
+                                 &nr, &nr2);
+  return((ulong) nr);
+}
+
diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h
new file mode 100644
index 0000000..b43efd8
--- /dev/null
+++ b/storage/rocksdb/ha_rocksdb.h
@@ -0,0 +1,307 @@
+/* 
+   Copyright (c) 2012,2013 Monty Program Ab
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+#ifdef USE_PRAGMA_INTERFACE
+#pragma interface			/* gcc class implementation */
+#endif
+
+
+#include "my_global.h"                   /* ulonglong */
+#include "thr_lock.h"                    /* THR_LOCK, THR_LOCK_DATA */
+#include "handler.h"                     /* handler */
+#include "my_base.h"                     /* ha_rows */
+
+#include "rocksdb/db.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/write_batch.h"
+
+#include "sql_string.h"
+
+#include "rdb_applyiter.h"
+
+//#ifdef HAVE_PSI_INTERFACE
+extern PSI_stage_info stage_waiting_on_row_lock;
+
+extern "C"
+void thd_enter_cond(MYSQL_THD thd, mysql_cond_t *cond, mysql_mutex_t *mutex,
+                    const PSI_stage_info *stage, PSI_stage_info *old_stage);
+extern "C"
+void thd_exit_cond(MYSQL_THD thd, const PSI_stage_info *stage);
+
+/**
+  Mark transaction to rollback and mark error as fatal to a sub-statement.
+  @param  thd   Thread handle
+  @param  all   TRUE <=> rollback main transaction.
+*/
+extern "C"
+void thd_mark_transaction_to_rollback(MYSQL_THD thd, bool all);
+
+//#endif
+
+/** @brief
+  ROCKSDB_SHARE is a structure that will be shared among all open handlers
+  of the same table. It holds the minimum that a handler share needs.
+
+  //TODO: join this with RDBSE_TABLE_DEF ? 
+*/
+typedef struct st_rocksdb_share {
+  char *table_name;
+  uint table_name_length,use_count;
+  //mysql_mutex_t mutex;
+
+  THR_LOCK lock;
+} ROCKSDB_SHARE;
+
+class RDBSE_KEYDEF;
+class RDBSE_TABLE_DEF;
+class Row_lock;
+class Rdb_transaction;
+
+/** @brief
+  Class definition for the storage engine
+*/
+
+class ha_rocksdb: public handler
+{
+  THR_LOCK_DATA lock;      ///< MySQL lock
+  ROCKSDB_SHARE *share;    ///< Shared lock info
+  
+  /* Iterator used for range scans and for full table/index scans */
+  Apply_changes_iter *scan_it;
+
+  RDBSE_TABLE_DEF *tbl_def;
+
+  /* Primary Key encoder from KeyTupleFormat to StorageFormat */
+  RDBSE_KEYDEF *pk_descr;
+
+  /* Array of index descriptors */
+  RDBSE_KEYDEF **key_descr;
+
+  bool check_keyread_allowed(uint inx, uint part, bool all_parts) const;
+  /* 
+    Number of key parts in PK. This is the same as 
+      table->key_info[table->s->primary_key].keyparts
+  */
+  uint pk_key_parts;
+  
+  //Rdb_transaction *cur_trx;
+
+  /*
+    TRUE <=> Primary Key columns can be decoded from the index
+  */
+  mutable bool pk_can_be_decoded;
+
+  uchar *pk_tuple; /* Buffer for storing PK in KeyTupleFormat */
+  uchar *pk_packed_tuple; /* Buffer for storing PK in StorageFormat */
+  // TODO: change pk_packed_tuple to 'char*'? Can we join it with last_rowkey?
+
+  uchar *sec_key_packed_tuple;
+  uchar *sec_key_tails;
+ 
+  /* rowkey of the last record we've read, in StorageFormat. */
+  String last_rowkey; 
+  
+  /* Buffer used by convert_record_to_storage_format() */
+  String storage_record;
+
+  /* Last retrieved record, in table->record[0] data format */
+  std::string retrieved_record;
+  
+  /* If TRUE, reads should place locks on rows */
+  bool lock_rows;
+
+  /* Lock that we've placed on the row we've just read. */
+  Row_lock *last_row_lock;
+ 
+  /* TRUE means we're doing an index-only read. FALSE means otherwise. */
+  bool keyread_only;
+  
+  bool skip_scan_it_next_call;
+
+  int create_key_defs(TABLE *table_arg, const char *db_table, uint len);
+  int secondary_index_read(int keyno, uchar *buf);
+  void setup_index_scan(rocksdb::Slice *slice);
+  int get_row_by_rowid(uchar *buf, const char *pk_tuple, uint pk_tuple_size);
+
+  void update_auto_incr_val();
+  void load_auto_incr_value();
+  
+  /* Idea copied from InnoDB's convert_error_code_to_mysql */
+  int return_lock_failure(bool timed_out);
+
+public:
+  ha_rocksdb(handlerton *hton, TABLE_SHARE *table_arg);
+  ~ha_rocksdb() {}
+  
+  /** @brief
+    The name that will be used for display purposes.
+   */
+  const char *table_type() const { return "ROCKSDB"; }
+
+  /* The following is only used by SHOW KEYS: */
+  const char *index_type(uint inx) { return "BTREE"; }
+
+  /** @brief
+    The file extensions.
+   */
+  const char **bas_ext() const;
+
+  /** @brief
+    This is a list of flags that indicate what functionality the storage engine
+    implements. The current table flags are documented in handler.h
+  */
+  ulonglong table_flags() const
+  {
+    /*
+      HA_BINLOG_STMT_CAPABLE
+        The engine is only capable of statement-based binary logging.
+      HA_REC_NOT_IN_SEQ 
+        If we don't set it, filesort crashes, because it assumes rowids are 
+        1..8 byte numbers.
+    */
+    return HA_BINLOG_STMT_CAPABLE | 
+           HA_REC_NOT_IN_SEQ |
+           HA_REQUIRE_PRIMARY_KEY |
+           (pk_can_be_decoded? HA_PRIMARY_KEY_IN_READ_INDEX:0) |
+           HA_PRIMARY_KEY_REQUIRED_FOR_POSITION |
+           HA_NULL_IN_KEY;
+  }
+
+  /** @brief
+    This is a bitmap of flags that indicates how the storage engine
+    implements indexes. The current index flags are documented in
+    handler.h. If you do not implement indexes, just return zero here.
+
+      @details
+    part is the key part to check. First key part is 0.
+    If all_parts is set, MySQL wants to know the flags for the combined
+    index, up to and including 'part'.
+  */
+  ulong index_flags(uint inx, uint part, bool all_parts) const;
+
+  int rename_table(const char *from, const char *to);
+
+  void unpack_blobs_from_retrieved_record(uchar *buf);
+  void convert_record_from_storage_format(rocksdb::Slice *slice, uchar *buf);
+  void convert_record_from_storage_format(uchar *buf);
+
+  void convert_record_to_storage_format(rocksdb::Slice *packed_rec);
+
+  /** @brief
+    unireg.cc will call max_supported_record_length(), max_supported_keys(),
+    max_supported_key_parts(), uint max_supported_key_length()
+    to make sure that the storage engine can handle the data it is about to
+    send. Return *real* limits of your storage engine here; MySQL will do
+    min(your_limits, MySQL_limits) automatically.
+   */
+  uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; }
+
+  uint max_supported_keys()          const { return MAX_INDEXES; }
+  uint max_supported_key_parts()     const { return MAX_REF_PARTS; }
+
+  /** @brief
+    unireg.cc will call this to make sure that the storage engine can handle
+    the data it is about to send. Return *real* limits of your storage engine
+    here; MySQL will do min(your_limits, MySQL_limits) automatically.
+
+      @details
+    There is no need to implement ..._key_... methods if your engine doesn't
+    support indexes.
+   */
+  uint max_supported_key_length()    const { return 16*1024; /* just to return something*/ }
+  
+  /* At the moment, we're ok with default handler::index_init() implementation. */
+  int index_read_map(uchar * buf, const uchar * key,
+                     key_part_map keypart_map,
+                     enum ha_rkey_function find_flag);
+  
+  int index_read_last_map(uchar * buf, const uchar * key, 
+                          key_part_map keypart_map);
+
+  virtual double scan_time() { return (double) (stats.records+stats.deleted) / 20.0+10; }
+  virtual double read_time(uint, uint, ha_rows rows)
+  { return (double) rows /  20.0+1; }
+
+  int open(const char *name, int mode, uint test_if_locked);
+  int close(void);
+
+  int write_row(uchar *buf);
+  int update_row(const uchar *old_data, uchar *new_data);
+  int delete_row(const uchar *buf);
+
+  int index_next(uchar *buf);
+  int index_next_with_direction(uchar *buf, bool move_forward);
+  int index_prev(uchar *buf);
+
+  int index_first(uchar *buf);
+  int index_last(uchar *buf);
+  int index_end();
+
+  void unlock_row();
+
+  /** @brief
+    Unlike index_init(), rnd_init() can be called two consecutive times
+    without rnd_end() in between (it only makes sense if scan=1). In this
+    case, the second call should prepare for the new table scan (e.g if
+    rnd_init() allocates the cursor, the second call should position the
+    cursor to the start of the table; no need to deallocate and allocate
+    it again. This is a required method.
+  */
+  int rnd_init(bool scan);
+  int rnd_end();
+
+  int rnd_next(uchar *buf);
+  int rnd_next_with_direction(uchar *buf, bool move_forward);
+
+  int rnd_pos(uchar *buf, uchar *pos);
+  void position(const uchar *record);
+  int info(uint);
+  int extra(enum ha_extra_function operation);
+  int start_stmt(THD *thd, thr_lock_type lock_type);
+  int external_lock(THD *thd, int lock_type);
+  int delete_all_rows(void);
+  int truncate();
+
+  void remove_rows(RDBSE_TABLE_DEF *tbl);
+  ha_rows records_in_range(uint inx, key_range *min_key,
+                           key_range *max_key);
+  int delete_table(const char *from);
+  int create(const char *name, TABLE *form,
+             HA_CREATE_INFO *create_info);
+  bool check_if_incompatible_data(HA_CREATE_INFO *info,
+                                  uint table_changes);
+
+  THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
+                             enum thr_lock_type lock_type);
+
+  my_bool register_query_cache_table(THD *thd, char *table_key,
+                                     uint key_length,
+                                     qc_engine_callback
+                                     *engine_callback,
+                                     ulonglong *engine_data)
+  {
+    /* Currently, we don't support query cache */
+    return FALSE;
+  }
+  bool get_error_message(int error, String *buf);
+
+  void get_auto_increment(ulonglong offset, ulonglong increment,
+                          ulonglong nb_desired_values,
+                          ulonglong *first_value,
+                          ulonglong *nb_reserved_values);
+  void update_create_info(HA_CREATE_INFO *create_info);
+};
diff --git a/storage/rocksdb/rdb_applyiter.cc b/storage/rocksdb/rdb_applyiter.cc
new file mode 100644
index 0000000..c499d04
--- /dev/null
+++ b/storage/rocksdb/rdb_applyiter.cc
@@ -0,0 +1,207 @@
+/* 
+   Copyright (c) 2013 Monty Program Ab
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include "my_global.h"                   /* ulonglong */
+#include "my_base.h"                     /* ha_rows */
+#include "my_sys.h"
+#include "my_tree.h"
+
+#include "rocksdb/db.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/write_batch.h"
+
+#include "rdb_rowmods.h"
+#include "rdb_applyiter.h"
+
+int compare_mem_comparable_keys(const uchar *a, size_t a_len, const uchar *b, size_t b_len);
+
+
+Apply_changes_iter::Apply_changes_iter() :
+  trx(NULL), rdb(NULL) 
+{}
+
+
+Apply_changes_iter::~Apply_changes_iter() 
+{ 
+  delete trx;
+  delete rdb;
+}
+
+
+void Apply_changes_iter::init(Row_table *trx_arg, rocksdb::Iterator *rdb_arg)
+{
+  delete trx;
+  delete rdb;
+  trx= new Row_table_iter(trx_arg);
+  rdb= rdb_arg; 
+  valid= false;
+}
+
+
+void Apply_changes_iter::Next()
+{
+  DBUG_ASSERT(valid);
+  if (cur_is_trx)
+    trx->Next();
+  else
+    rdb->Next();
+
+  advance(1);
+}
+
+
+void Apply_changes_iter::Prev()
+{
+  DBUG_ASSERT(valid);
+  if (cur_is_trx)
+    trx->Prev();
+  else
+    rdb->Prev();
+
+  advance(-1);
+}
+
+
+void Apply_changes_iter::Seek(rocksdb::Slice &key)
+{
+  rdb->Seek(key);
+  trx->Seek(key);
+  advance(1);
+}
+
+
+void Apply_changes_iter::SeekToLast()
+{
+  rdb->SeekToLast();
+  trx->SeekToLast();
+  advance(-1);
+}
+
+
+/*
+  @param direction  1 means forward, -1 means backward.
+*/
+
+void Apply_changes_iter::advance(int direction)
+{
+  valid= true;
+  while (1)
+  {
+    if (!trx->Valid() && !rdb->Valid())
+    {
+      // We get here when neither rdb nor trx has any more records.
+      cur_is_trx= false;  // just set it to something
+      valid= false;
+      return;
+    }
+
+    if (!trx->Valid())
+    {
+      /* Got record from rocksdb but not from trx */
+      cur_is_trx= false;
+      break;
+    }
+
+    if (!rdb->Valid())
+    {
+      cur_is_trx= true;
+      if (trx->is_tombstone())
+      {
+        if (direction == 1)
+          trx->Next();
+        else
+          trx->Prev();
+        continue;  /* A tombstone.. (but no matching record? odd..) */
+      }
+      break;
+    }
+
+    if (rdb->Valid() && trx->Valid())
+    {
+      rocksdb::Slice rdb_key= rdb->key();
+      rocksdb::Slice trx_key= trx->key();
+      int cmp= direction * 
+               compare_mem_comparable_keys((const uchar*)trx_key.data(), trx_key.size(),
+                                           (const uchar*)rdb_key.data(), rdb_key.size());
+      if (!cmp) // keys are equal
+      {
+        if (trx->is_tombstone())
+        {
+          /* rocksdb has a record, but trx says we have deleted it */
+          if (direction == 1)
+          {
+            rdb->Next();
+            trx->Next();
+          }
+          else
+          {
+            rdb->Prev();
+            trx->Prev();
+          }
+          continue;  // restart the logic
+        }
+
+        /* trx has a newer version of the record */
+        if (direction == 1) 
+          rdb->Next();
+        else
+          rdb->Prev();
+        cur_is_trx= true;
+        break;
+      }
+      else if (cmp > 0)
+      {
+        /* record from rocksdb comes first */
+        cur_is_trx= false;
+        break;
+      }
+      else // cmp < 0
+      {
+        /* record from transaction comes first */
+        if (trx->is_tombstone())
+        {
+          if (direction == 1)
+            trx->Next();
+          else
+            trx->Prev();
+          continue;  /* A tombstone.. (but no matching record? odd..) */
+        }
+        /* A record from transaction but not in the db */
+        cur_is_trx= true;
+        break;
+      }
+    }
+  }
+}
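+/*
+  Worked example of the merge logic in advance() (illustrative keys): suppose
+  rocksdb holds {a, c, d} and the transaction's change list holds
+  {b=Put, c=Tombstone, d=Put}. A forward scan yields
+
+    a   from rocksdb (no trx entry)
+    b   from trx     (no rocksdb entry)
+    d   from trx     (its Put shadows the rocksdb record)
+
+  Key c is skipped: the tombstone hides the matching rocksdb record, so both
+  iterators step over it and the loop restarts.
+*/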
+
+
+rocksdb::Slice Apply_changes_iter::value()
+{
+  if (cur_is_trx)
+    return trx->value();
+  else
+    return rdb->value();
+}
+
+
+rocksdb::Slice Apply_changes_iter::key()
+{
+  if (cur_is_trx)
+    return trx->key();
+  else
+    return rdb->key();
+}
+
diff --git a/storage/rocksdb/rdb_applyiter.h b/storage/rocksdb/rdb_applyiter.h
new file mode 100644
index 0000000..9b65336
--- /dev/null
+++ b/storage/rocksdb/rdb_applyiter.h
@@ -0,0 +1,53 @@
+/* 
+   Copyright (c) 2013 Monty Program Ab
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+class Row_table;
+class Row_table_iter;
+
+/*
+  A class that looks like RocksDB's iterator, but internally it takes into 
+  account the changes made by the transaction.
+
+  In other words, the iterator gives a view of the data inside rocksdb, but
+  also applies the changes made by the transaction.
+*/
+
+class Apply_changes_iter
+{
+  bool valid;
+  bool cur_is_trx;
+  
+  /* These are the iterators we're merging. We own them, so should free them */
+  Row_table_iter *trx; 
+  rocksdb::Iterator* rdb;
+public:
+  Apply_changes_iter();
+  ~Apply_changes_iter();
+  void init(Row_table *trx_arg, rocksdb::Iterator *rdb_arg);
+
+  void Next();
+  void Prev();
+  
+  void Seek(rocksdb::Slice &key);
+  void SeekToLast();
+
+  bool Valid() { return valid; }
+  rocksdb::Slice key();
+  rocksdb::Slice value();
+private:
+  void advance(int direction);
+};
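+/*
+  Typical usage, a sketch based on ha_rocksdb::setup_index_scan() (the
+  in_range()/process() calls stand in for the caller's logic):
+
+    Apply_changes_iter *it= new Apply_changes_iter;
+    it->init(&trx->changes, rdb->NewIterator(options)); // takes ownership
+    it->Seek(start_key);
+    while (it->Valid() && in_range(it->key()))
+    {
+      process(it->key(), it->value());
+      it->Next();
+    }
+    delete it;
+*/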
+
diff --git a/storage/rocksdb/rdb_datadic.cc b/storage/rocksdb/rdb_datadic.cc
new file mode 100644
index 0000000..72adb48
--- /dev/null
+++ b/storage/rocksdb/rdb_datadic.cc
@@ -0,0 +1,795 @@
+/* 
+   Copyright (c) 2012,2013 Monty Program Ab
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation        // gcc: Class implementation
+#endif
+
+#include <mysql/plugin.h>
+#include "ha_rocksdb.h"
+#include "sql_class.h"
+#include "my_bit.h"
+
+#include "rdb_datadic.h"
+
+
+void key_restore(uchar *to_record, uchar *from_key, KEY *key_info,
+                 uint key_length);
+
+
+void write_int(String *out, uint32 val)
+{
+  uint buf= htonl(val);
+  out->append((char*)&buf, 4);
+}
+
+
+uint32 read_int(char **data)
+{
+  uint buf;
+  memcpy(&buf, *data, sizeof(uint32));
+  *data += sizeof(uint32);
+  return ntohl(buf);
+}
+
+
+RDBSE_KEYDEF::~RDBSE_KEYDEF()
+{
+  if (pk_key_parts)
+    my_free(pk_key_parts);
+  if (pack_info)
+    my_free(pack_info);
+}
+
+
+void RDBSE_KEYDEF::setup(TABLE *tbl)
+{
+  /* 
+    Set max_length based on the table. If we're unlucky, setup() may be
+    called concurrently from multiple threads, but that is ok because the
+    computation always assigns the same value to maxlength.
+    ^^ TODO: is this still true? Concurrent setup() calls are not safe
+    anymore...
+  */
+  if (!maxlength)
+  {
+    KEY *key_info= &tbl->key_info[keyno];
+    KEY *pk_info=  &tbl->key_info[tbl->s->primary_key];
+
+    if (keyno != tbl->s->primary_key)
+    {
+      n_pk_key_parts= pk_info->actual_key_parts;
+      pk_key_parts= (PK_KEY_PART*)my_malloc(sizeof(PK_KEY_PART) * n_pk_key_parts, MYF(0));
+    }
+    else
+    {
+      pk_info= NULL;
+      pk_key_parts= 0;
+    }
+
+    size_t size= sizeof(Field_pack_info) * key_info->actual_key_parts;
+    pack_info= (Field_pack_info*)my_malloc(size, MYF(0));
+ 
+    uint len= INDEX_NUMBER_SIZE;
+    int unpack_len= 0;
+    /* this loop also loops over the 'extended key' tail */
+    for (uint i= 0; i < key_info->actual_key_parts; i++)
+    {
+      Field *field= key_info->key_part[i].field;
+
+      if (field->real_maybe_null())
+        len +=1; // NULL-byte
+
+      pack_info[i].setup(key_info->key_part[i].field);
+      pack_info[i].image_offset= len;
+      pack_info[i].unpack_data_offset= unpack_len;
+
+      if (pk_info)
+      {
+        for (uint j= 0; j < n_pk_key_parts; j++)
+        {
+          if (field->field_index == pk_info->key_part[j].field->field_index)
+          {
+            pk_key_parts[j].offset= len;
+            pk_key_parts[j].size=   pack_info[i].image_len;
+          }
+        }
+      }
+
+      len        += pack_info[i].image_len;
+      unpack_len += pack_info[i].unpack_data_len;
+    }
+    maxlength= len;
+    unpack_data_len= unpack_len;
+  }
+}
+
+
+/*
+  Get a mem-comparable form of Primary Key from mem-comparable form of this key
+
+  @param
+    pk_descr        Primary Key descriptor
+    key             Index tuple from this key in mem-comparable form
+    pk_buffer  OUT  Put here mem-comparable form of the Primary Key.
+
+  @note
+    It may or may not be possible to restore primary key column values from
+    their mem-comparable form. To handle all cases, this function copies the
+    mem-comparable images directly.
+
+    RocksDB SE supports "extended keys": PK columns are present at the end of
+    every secondary key. If a PK column is already part of the secondary key,
+    it is not duplicated at the end.
+
+    Because of the above, we copy each primary key column individually.
+    (A layout sketch follows this function.)
+*/
+
+uint RDBSE_KEYDEF::get_primary_key_tuple(RDBSE_KEYDEF *pk_descr, 
+                                         const char *key, char *pk_buffer)
+{
+  uint size= 0;
+  char *buf= pk_buffer;
+  DBUG_ASSERT(n_pk_key_parts);
+  
+  // copy the PK number
+  store_index_number((uchar*)buf, pk_descr->index_number);
+  buf += INDEX_NUMBER_SIZE;
+  size += INDEX_NUMBER_SIZE;
+
+  for (uint j= 0; j < n_pk_key_parts; j++)
+  {
+    uint len= pk_key_parts[j].size;
+    memcpy(buf, key + pk_key_parts[j].offset, len);
+    buf += len;
+    size += len;
+  }
+  return size;
+}
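+/*
+  Layout sketch for the copying above (field sizes are illustrative). For a
+  table with PRIMARY KEY(pk) and KEY(a), the secondary key's mem-comparable
+  form is
+
+    {sec_index_no} {image of a} {image of pk}    <-- the 'extended key' tail
+
+  setup() has recorded the offset and size of every PK column image in
+  pk_key_parts[], so the PK tuple {pk_index_no} {image of pk} can be
+  assembled with plain memcpy, without unpacking any column values.
+*/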
+
+
+uint RDBSE_KEYDEF::pack_index_tuple(TABLE *tbl, uchar *packed_tuple, 
+                                    const uchar *key_tuple, 
+                                    key_part_map keypart_map)
+{
+  /* We were given a record in KeyTupleFormat. First, save it to record */
+  uint key_len= calculate_key_len(tbl, keyno, key_tuple, keypart_map);
+  key_restore(tbl->record[0], (uchar*)key_tuple, &tbl->key_info[keyno], 
+              key_len);
+
+  uint n_used_parts= my_count_bits(keypart_map);
+  if (keypart_map == HA_WHOLE_KEY)
+    n_used_parts= 0; // Full key is used
+
+  /* Then, convert the record into a mem-comparable form */
+  return pack_record(tbl, tbl->record[0], packed_tuple, NULL, NULL, 
+                     n_used_parts);
+}
+
+
+void RDBSE_KEYDEF::successor(uchar *packed_tuple, uint len)
+{
+  uchar *p= packed_tuple + len - 1;
+  for (; p > packed_tuple; p--)
+  {
+    if (*p != uchar(0xFF))
+    {
+      *p= *p + 1;
+      break;
+    }
+    *p='\0';
+  }
+}
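+/*
+  Example: successor({0x12, 0x34, 0xFF, 0xFF}) gives {0x12, 0x35, 0x00, 0x00}.
+  Scanning from the end, trailing 0xFF bytes are reset to 0x00 and the first
+  non-0xFF byte is incremented, yielding a key that compares greater than any
+  key that begins with the original bytes.
+*/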
+
+
+/*
+  Get index columns from the record and pack them into mem-comparable form.
+
+  @param
+    tbl                   Table we're working on
+    record           IN   Record buffer with fields in table->record format
+    packed_tuple     OUT  Key in the mem-comparable form
+    unpack_info      OUT  Unpack data
+    unpack_info_len  OUT  Unpack data length
+    n_key_parts           Number of keyparts to process. 0 means all of them.
+
+  @detail
+    Some callers do not need the unpack information, they can pass
+    unpack_info=NULL, unpack_info_len=NULL.
+*/
+
+uint RDBSE_KEYDEF::pack_record(TABLE *tbl, const uchar *record, 
+                               uchar *packed_tuple, 
+                               uchar *unpack_info, int *unpack_info_len,
+                               uint n_key_parts)
+{
+  uchar *tuple= packed_tuple;
+  uchar *unpack_end= unpack_info;
+  KEY *key_info= &tbl->key_info[keyno];
+  
+  store_index_number(tuple, index_number);
+  tuple += INDEX_NUMBER_SIZE;
+  
+  // The following includes the 'extended key' tail:
+  if (n_key_parts == 0 || n_key_parts == MAX_REF_PARTS)
+    n_key_parts= key_info->actual_key_parts;
+
+  for (uint i=0; i < n_key_parts; i++)
+  {
+    Field *field= key_info->key_part[i].field;
+    my_ptrdiff_t ptr_diff= record - tbl->record[0];
+    field->move_field_offset(ptr_diff);
+    
+    const int length= pack_info[i].image_len;
+    if (field->real_maybe_null())
+    {
+      if (field->is_real_null())
+      {
+        /* NULL value. store '\0' so that it sorts before non-NULL values */
+        *tuple++ = 0;
+        memset(tuple, 0, length);
+      }
+      else
+      {
+        // store '1'
+        *tuple++ = 1;
+        field->make_sort_key(tuple, length);
+      }
+    }
+    else
+      field->make_sort_key(tuple, length);
+
+    tuple += length;
+
+    if (unpack_end && pack_info && pack_info[i].make_unpack_info_func)
+    {
+      pack_info[i].make_unpack_info_func(&pack_info[i], field, unpack_end);
+      unpack_end += pack_info[i].unpack_data_len;
+    }
+
+    field->move_field_offset(-ptr_diff);
+  }
+ 
+  if (unpack_info_len)
+    *unpack_info_len= unpack_end - unpack_info;
+
+  return tuple - packed_tuple;
+}
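+/*
+  Sketch of the output tuple (sizes are illustrative). For an index on
+  (INT NOT NULL, VARCHAR(10) NULL), pack_record() produces
+
+    [index_number, INDEX_NUMBER_SIZE bytes]
+    [int image: big-endian with the sign bit flipped]
+    [NULL-byte: 0x00 = NULL, 0x01 = not NULL]
+    [strnxfrm image of the varchar, padded to its maximum length]
+
+  so that plain memcmp() on packed tuples orders them the same way as
+  comparing the column values.
+*/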
+
+
+/*
+  Take mem-comparable form and unpack_info and unpack it to Table->record
+
+  @detail
+    not all indexes support this
+*/
+
+int RDBSE_KEYDEF::unpack_record(TABLE *table, uchar *buf, 
+                                 const rocksdb::Slice *packed_key, 
+                                 const rocksdb::Slice *unpack_info)
+{
+  int res= 0;
+  KEY * const key_info= &table->key_info[keyno];
+
+  const uchar * const key_ptr= (const uchar*)packed_key->data();
+  const uchar * const unpack_ptr= (const uchar*)unpack_info->data();
+
+  if (packed_key->size() != max_storage_fmt_length())
+    return 1;
+  
+  if (unpack_info->size() != unpack_data_len)
+    return 1;
+
+  for (uint i= 0; i < key_info->actual_key_parts ; i++)
+  {
+    Field *field= key_info->key_part[i].field;
+    Field_pack_info *fpi= &pack_info[i];
+    
+    if (fpi->unpack_func)
+    {
+      my_ptrdiff_t ptr_diff= buf - table->record[0];
+      field->move_field_offset(ptr_diff);
+
+      if (fpi->maybe_null)
+      {
+        if (*(key_ptr + (fpi->image_offset - 1)) == 0)
+          field->set_null();
+        else
+          field->set_notnull();
+      }
+
+      res= fpi->unpack_func(fpi, field, key_ptr + fpi->image_offset,
+                            unpack_ptr + fpi->unpack_data_offset);
+      field->move_field_offset(-ptr_diff);
+
+      if (res) 
+        break; /* Error */
+    }
+  }
+  return res;
+}
+
+///////////////////////////////////////////////////////////////////////////////////////////
+// Field_pack_info
+///////////////////////////////////////////////////////////////////////////////////////////
+
+int unpack_integer(Field_pack_info *fpi, Field *field, 
+                   const uchar *from, const uchar *unpack_info)
+{
+  const int length= field->pack_length();
+  uchar *to= field->ptr;
+
+#ifdef WORDS_BIGENDIAN
+  {
+    if (((Field_num*)field)->unsigned_flag)
+      to[0]= from[0];
+    else
+      to[0]= (char)(from[0] ^ 128); // Reverse the sign bit.
+    memcpy(to + 1, from + 1, length - 1);
+  }
+#else  
+  {
+    const int sign_byte= from[0];
+    if (((Field_num*)field)->unsigned_flag)
+      to[length - 1]= sign_byte;
+    else
+      to[length - 1]= static_cast<char>(sign_byte ^ 128); // Reverse the sign bit.
+    for (int i= 0, j= length - 1; i < length-1; ++i, --j)
+      to[i]= from[j];
+  }
+#endif
+  return 0;
+}
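+/*
+  Example (signed 4-byte INT on a little-endian machine): the value 5 is
+  stored in the key as {0x80, 0x00, 0x00, 0x05} -- big-endian with the sign
+  bit flipped so that negative values sort first. unpack_integer() flips the
+  sign bit back and reverses the byte order, writing the native
+  {0x05, 0x00, 0x00, 0x00} into the record.
+*/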
+
+
+/* Unpack the string by copying it over */
+int unpack_binary_str(Field_pack_info *fpi, Field *field, 
+                      const uchar *tuple,
+                      const uchar *unpack_info)
+{
+  memcpy(field->ptr + fpi->field_data_offset, tuple, fpi->image_len);
+  return 0;
+}
+
+
+/*
+  For UTF-8, we need to convert 2-byte wide-character entities back into
+  UTF8 sequences.
+*/
+
+int unpack_utf8_str(Field_pack_info *fpi, Field *field,
+                    const uchar *tuple,
+                    const uchar *unpack_info)
+{
+  CHARSET_INFO *cset= (CHARSET_INFO*)field->charset();
+  const uchar *src= tuple;
+  const uchar *src_end= tuple + fpi->image_len;
+  uchar *dst= field->ptr + fpi->field_data_offset;
+  uchar *dst_end= dst + fpi->image_len;
+
+  while (src < src_end)
+  {
+    my_wc_t wc= (src[0] <<8) | src[1]; 
+    src += 2;
+    int res= cset->cset->wc_mb(cset, wc, dst, dst_end);
+    DBUG_ASSERT(res > 0 && res <= 3);
+    if (res <= 0)  /* wc_mb() returns 0 or a negative value on failure */
+      return 1;
+    dst += res;
+  }
+  return 0;
+}
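+/*
+  Example: for utf8_bin, the packed image stores each character as a 2-byte
+  big-endian code point, so 'é' (U+00E9) appears in the key as {0x00, 0xE9}.
+  The loop above feeds that wide character to wc_mb(), which emits the
+  original UTF-8 sequence {0xC3, 0xA9} into the record buffer.
+*/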
+
+
+int unpack_binary_varchar(Field_pack_info *fpi, Field *field, 
+                          const uchar *tuple,
+                          const uchar *unpack_info)
+{
+  uint32 length_bytes= ((Field_varstring*)field)->length_bytes;
+  //copy the length bytes
+  memcpy(field->ptr, unpack_info, length_bytes);
+
+  return unpack_binary_str(fpi, field, tuple, unpack_info);
+}
+
+
+int unpack_utf8_varchar(Field_pack_info *fpi, Field *field, 
+                        const uchar *tuple,
+                        const uchar *unpack_info)
+{
+  uint32 length_bytes= ((Field_varstring*)field)->length_bytes;
+  //copy the length bytes
+  memcpy(field->ptr, unpack_info, length_bytes);
+
+  return unpack_utf8_str(fpi, field, tuple, unpack_info);
+}
+
+
+/*
+  For varchar, save the length.
+*/
+void make_varchar_unpack_info(Field_pack_info *fsi, Field *field, uchar *unpack_data)
+{
+  // TODO: use length from fsi.
+  Field_varstring *fv= (Field_varstring*)field;
+  memcpy(unpack_data, fv->ptr, fv->length_bytes);
+}
+
+
+/*
+  Setup index-only read of a field
+
+  @param
+    field  IN  field to be packed/un-packed
+
+  @return 
+    TRUE  -  Field can be read with index-only reads
+    FALSE -  Otherwise
+*/
+
+bool Field_pack_info::setup(Field *field)
+{
+  int res= false;
+  enum_field_types type= field->real_type();
+
+  maybe_null= field->real_maybe_null();
+  make_unpack_info_func= NULL;
+  unpack_func= NULL;
+  unpack_data_len= 0;
+  field_data_offset= 0;
+
+  /* Calculate image length. By default, it is pack_length() */
+  image_len= field->pack_length();
+  if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING)
+  {
+    /* 
+      For CHAR-based columns, check how much space the strxfrm image will
+      take. field->field_length = field->char_length() * cs->mbmaxlen.
+    */
+    const CHARSET_INFO *cs= field->charset();
+    image_len= cs->coll->strnxfrmlen(cs, field->field_length);
+  }
+
+  if (type == MYSQL_TYPE_LONGLONG ||
+      type == MYSQL_TYPE_LONG ||
+      type == MYSQL_TYPE_INT24 ||
+      type == MYSQL_TYPE_SHORT ||
+      type == MYSQL_TYPE_TINY)
+  {
+    unpack_func= unpack_integer;
+    make_unpack_info_func= NULL;
+    return true;
+  }
+ 
+  const bool is_varchar= (type == MYSQL_TYPE_VARCHAR);
+  if (is_varchar)
+  {
+    make_unpack_info_func= make_varchar_unpack_info;
+    unpack_data_len= ((Field_varstring*)field)->length_bytes;
+    field_data_offset= ((Field_varstring*)field)->length_bytes;
+  }
+
+  if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING)
+  {
+    const CHARSET_INFO *cs= field->charset();
+
+    if (cs == &my_charset_bin ||
+        cs == &my_charset_latin1_bin)
+    {
+      unpack_func= is_varchar? unpack_binary_varchar : unpack_binary_str;
+      res= true; 
+    }
+    else if(cs == &my_charset_utf8_bin)
+    {
+      unpack_func= is_varchar? unpack_utf8_varchar : unpack_utf8_str;
+      res= true;
+    }
+  }
+  return res;
+}
+
+
+#if 0
+void _rdbse_store_blob_length(uchar *pos,uint pack_length,uint length)
+{
+  switch (pack_length) {
+  case 1:
+    *pos= (uchar) length;
+    break;
+  case 2:
+    int2store(pos,length);
+    break;
+  case 3:
+    int3store(pos,length);
+    break;
+  case 4:
+    int4store(pos,length);
+  default:
+    break;
+  }
+  return;
+}
+#endif
+
+
+///////////////////////////////////////////////////////////////////////////////////////////
+// Table_ddl_manager
+///////////////////////////////////////////////////////////////////////////////////////////
+
+/*
+  Write table definition DDL entry.
+
+  We write
+    dbname.tablename -> {index_nr, index_nr, index_nr, ... }
+*/
+
+void RDBSE_TABLE_DEF::write_to(rocksdb::DB *rdb_dict, uchar *key, size_t keylen)
+{
+  StringBuffer<32> indexes;
+
+  for (uint i=0; i < n_keys; i++)
+  {
+    write_int(&indexes, key_descr[i]->index_number);
+  }
+  rocksdb::Slice skey((char*)key, keylen); 
+  rocksdb::Slice svalue(indexes.c_ptr(), indexes.length());
+
+  rocksdb::WriteOptions options;
+  options.sync= true;
+  rdb_dict->Put(options, skey, svalue); 
+}
+
+
+uchar* Table_ddl_manager::get_hash_key(RDBSE_TABLE_DEF *rec, size_t *length,
+                                       my_bool not_used __attribute__((unused)))
+{
+  *length= rec->dbname_tablename.length();
+  return (uchar*) rec->dbname_tablename.c_ptr();
+}
+
+
+void Table_ddl_manager::free_hash_elem(void* data)
+{
+  RDBSE_TABLE_DEF* elem= (RDBSE_TABLE_DEF*)data;
+  delete elem;
+}
+
+
+bool Table_ddl_manager::init(rocksdb::DB *rdb_dict)
+{
+  mysql_rwlock_init(0, &rwlock);
+  (void) my_hash_init(&ddl_hash, /*system_charset_info*/&my_charset_bin, 32,0,0,
+                      (my_hash_get_key) Table_ddl_manager::get_hash_key,
+                      Table_ddl_manager::free_hash_elem, 0);
+  
+  /* Read the data dictionary and populate the hash */
+  uchar ddl_entry[RDBSE_KEYDEF::INDEX_NUMBER_SIZE];
+  store_index_number(ddl_entry, DDL_ENTRY_INDEX_NUMBER);
+  rocksdb::Slice ddl_entry_slice((char*)ddl_entry, RDBSE_KEYDEF::INDEX_NUMBER_SIZE);
+  
+  rocksdb::Iterator* it;
+  it= rdb_dict->NewIterator(rocksdb::ReadOptions());
+  int i= 0;
+  int max_number= DDL_ENTRY_INDEX_NUMBER + 1;
+  for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) 
+  {
+    char *ptr;
+    char *ptr_end;
+    RDBSE_TABLE_DEF *tdef= new RDBSE_TABLE_DEF;
+    rocksdb::Slice key= it->key();
+    rocksdb::Slice val= it->value();
+
+    if (key.size() <= RDBSE_KEYDEF::INDEX_NUMBER_SIZE)
+    {
+      sql_print_error("RocksDB: Table_store: key has length %d (corruption?)",
+                      (int)key.size());
+      return true;
+    }
+
+    if (memcmp(key.data(), ddl_entry, RDBSE_KEYDEF::INDEX_NUMBER_SIZE))
+      break;
+
+    tdef->dbname_tablename.append(key.data() + RDBSE_KEYDEF::INDEX_NUMBER_SIZE, 
+                                  key.size() - RDBSE_KEYDEF::INDEX_NUMBER_SIZE);
+    
+    // Now, read the DDLs.
+
+    if (val.size() < RDBSE_KEYDEF::INDEX_NUMBER_SIZE)
+    {
+      sql_print_error("RocksDB: Table_store: no keys defined in %*s",
+                      (int)key.size(), key.data());
+      return true;
+    }
+    if (val.size() % RDBSE_KEYDEF::INDEX_NUMBER_SIZE)
+    {
+      sql_print_error("RocksDB: Table_store: invalid keylist for table %s", 
+                      tdef->dbname_tablename.c_ptr_safe());
+      return true;
+    }
+    tdef->n_keys= val.size() / RDBSE_KEYDEF::INDEX_NUMBER_SIZE;
+    if (!(tdef->key_descr= (RDBSE_KEYDEF**)my_malloc(sizeof(RDBSE_KEYDEF*) * 
+                                                     tdef->n_keys, 
+                                                     MYF(MY_ZEROFILL))))
+      return true;
+
+    ptr= (char*)val.data();
+    ptr_end= ptr + val.size();
+    for (uint keyno=0; ptr < ptr_end; keyno++)
+    {
+      int index_number= read_int(&ptr);
+
+      /* 
+        We can't fully initialize RDBSE_KEYDEF object here, because full
+        initialization requires that there is an open TABLE* where we could
+        look at Field* objects and set max_length and other attributes.
+      */
+      tdef->key_descr[keyno]= new RDBSE_KEYDEF(index_number, keyno); 
+      
+      /* Keep track of the largest index number we have seen */
+      if (max_number < index_number)
+        max_number= index_number;
+    }
+    put(tdef);
+    i++;
+  }
+
+  sequence.init(max_number+1);
+
+  if (!it->status().ok())
+  {
+    std::string s= it->status().ToString();
+    sql_print_error("RocksDB: Table_store: load error: %s", s.c_str());
+    return true;
+  }
+  delete it;
+  sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables", i);
+  return false;
+}
+
+
+RDBSE_TABLE_DEF* Table_ddl_manager::find(uchar *table_name, 
+                                         uint table_name_len, 
+                                         bool lock)
+{
+  RDBSE_TABLE_DEF *rec;
+  if (lock)
+    mysql_rwlock_rdlock(&rwlock);
+  rec= (RDBSE_TABLE_DEF*)my_hash_search(&ddl_hash, (uchar*)table_name,
+                                        table_name_len);
+  if (lock)
+    mysql_rwlock_unlock(&rwlock);
+  return rec;
+}
+
+
+int Table_ddl_manager::put_and_write(RDBSE_TABLE_DEF *tbl, rocksdb::DB *rdb_dict)
+{
+  uchar buf[NAME_LEN * 2 + RDBSE_KEYDEF::INDEX_NUMBER_SIZE];
+  uint pos= 0;
+
+  store_index_number(buf, DDL_ENTRY_INDEX_NUMBER);
+  pos+= RDBSE_KEYDEF::INDEX_NUMBER_SIZE; 
+  
+  memcpy(buf + pos, tbl->dbname_tablename.ptr(), tbl->dbname_tablename.length());
+  pos += tbl->dbname_tablename.length();
+
+  int res;
+  if ((res= put(tbl)))
+    return res;
+
+  tbl->write_to(rdb_dict, buf, pos);
+  return 0;
+}
+
+
+/* Return 0 - ok, other value - error */
+
+int Table_ddl_manager::put(RDBSE_TABLE_DEF *tbl, bool lock)
+{
+  RDBSE_TABLE_DEF *rec;
+  my_bool result;
+
+  if (lock)
+    mysql_rwlock_wrlock(&rwlock);
+  rec= (RDBSE_TABLE_DEF*)find((uchar*)tbl->dbname_tablename.c_ptr(),
+                               tbl->dbname_tablename.length(), false);
+  if (rec)
+  {
+    // this will free the old record.
+    my_hash_delete(&ddl_hash, (uchar*) rec);
+  }
+  result= my_hash_insert(&ddl_hash, (uchar*)tbl);
+
+  if (lock)
+    mysql_rwlock_unlock(&rwlock);
+  return result;
+}
+
+
+void Table_ddl_manager::remove(RDBSE_TABLE_DEF *tbl, rocksdb::DB *rdb_dict, bool lock)
+{
+  if (lock)
+    mysql_rwlock_wrlock(&rwlock);
+
+  uchar buf[NAME_LEN * 2 + RDBSE_KEYDEF::INDEX_NUMBER_SIZE];
+  uint pos= 0;
+
+  store_index_number(buf, DDL_ENTRY_INDEX_NUMBER);
+  pos+= RDBSE_KEYDEF::INDEX_NUMBER_SIZE; 
+  
+  memcpy(buf + pos, tbl->dbname_tablename.ptr(), tbl->dbname_tablename.length());
+  pos += tbl->dbname_tablename.length();
+
+  rocksdb::Slice tkey((char*)buf, pos);
+  rdb_dict->Delete(rocksdb::WriteOptions(), tkey); 
+
+  /* The following will also delete the object: */
+  my_hash_delete(&ddl_hash, (uchar*) tbl);
+
+  if (lock)
+    mysql_rwlock_unlock(&rwlock);
+}
+
+
+bool Table_ddl_manager::rename(uchar *from, uint from_len, 
+                               uchar *to, uint to_len, 
+                               rocksdb::DB *rdb_dict)
+{
+  RDBSE_TABLE_DEF *rec;
+  RDBSE_TABLE_DEF *new_rec;
+  bool res= true;
+  uchar new_buf[NAME_LEN * 2 + RDBSE_KEYDEF::INDEX_NUMBER_SIZE];
+  uint new_pos= 0;
+
+  mysql_rwlock_wrlock(&rwlock);
+  if (!(rec= (RDBSE_TABLE_DEF*)find(from, from_len, false)))
+    goto err;
+
+  if (!(new_rec= new RDBSE_TABLE_DEF))
+    goto err;
+
+  new_rec->dbname_tablename.append((char*)to, to_len);
+  new_rec->n_keys= rec->n_keys;
+  new_rec->auto_incr_val= rec->auto_incr_val;
+  new_rec->key_descr= rec->key_descr;
+  rec->key_descr= NULL; /* so that it's not free'd when deleting the old rec */
+  
+  // Create a new key 
+  store_index_number(new_buf, DDL_ENTRY_INDEX_NUMBER);
+  new_pos+= RDBSE_KEYDEF::INDEX_NUMBER_SIZE; 
+  
+  memcpy(new_buf + new_pos, new_rec->dbname_tablename.ptr(), 
+         new_rec->dbname_tablename.length());
+  new_pos += new_rec->dbname_tablename.length();
+  
+  // Write the new entry, then replace the old entry with the new one
+  new_rec->write_to(rdb_dict, new_buf, new_pos);
+  remove(rec, rdb_dict, false);
+  put(new_rec, false);
+  res= false; // ok
+err:
+  mysql_rwlock_unlock(&rwlock);
+  return res;
+}
+
+
+void Table_ddl_manager::cleanup()
+{
+  my_hash_free(&ddl_hash);
+  mysql_rwlock_destroy(&rwlock);
+  sequence.cleanup();
+}
+
diff --git a/storage/rocksdb/rdb_datadic.h b/storage/rocksdb/rdb_datadic.h
new file mode 100644
index 0000000..9015e9a
--- /dev/null
+++ b/storage/rocksdb/rdb_datadic.h
@@ -0,0 +1,359 @@
+/* 
+   Copyright (c) 2012,2013 Monty Program Ab
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+class RDBSE_KEYDEF;
+class Field_pack_info;
+
+inline void store_index_number(uchar *dst, uint32 number)
+{
+#ifdef WORDS_BIGENDIAN
+    memcpy(dst, &number, RDBSE_KEYDEF::INDEX_NUMBER_SIZE);
+#else
+    const uchar *src= (uchar*)&number;
+    dst[0]= src[3];
+    dst[1]= src[2];
+    dst[2]= src[1];
+    dst[3]= src[0];
+#endif
+}
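+
+/*
+  Illustrative sketch, not compiled into the engine: big-endian storage
+  makes memcmp() on the stored form agree with numeric comparison of
+  index numbers, which is what makes them usable as memcmp-ordered key
+  prefixes.
+*/
+#if 0
+void example_index_number_order()
+{
+  uchar a[4], b[4];
+  store_index_number(a, 5);
+  store_index_number(b, 260);   /* 260 == 0x00000104 */
+  /* a= {00,00,00,05}, b= {00,00,01,04}: memcmp() order matches 5 < 260 */
+  DBUG_ASSERT(memcmp(a, b, 4) < 0);
+}
+#endif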
+
+/*
+  An object of this class represents information about an index in an SQL 
+  table. It provides services to encode and decode index tuples.
+
+  There are several data encodings. 
+  
+  === SQL LAYER ===
+  SQL layer uses two encodings:
+
+  - "Table->record format". This is the format that is used for the data in
+     the record buffers, table->record[i]
+
+  - KeyTupleFormat (see opt_range.cc) - this is used in parameters to index
+    lookup functions, like handler::index_read_map().
+
+  === Inside RocksDB === 
+  Primary Key is stored as a mapping:
+
+    index_tuple -> StoredRecord
+  
+  StoredRecord is in Table->record format, except for blobs, which are stored
+  in-place. See ha_rocksdb::convert_record_to_storage_format for details.
+
+  Secondary indexes are stored as one of two variants:
+   
+    index_tuple -> unpack_info
+    index_tuple -> empty_string
+  
+  index_tuple here is the form of key that can be compared with memcmp(), aka
+  "mem-comparable form".
+
+  unpack_info is extra data that allows one to restore the original value
+  from its mem-comparable form. It is present only if the index supports
+  index-only reads.
+*/
+
+class RDBSE_KEYDEF
+{
+public:
+  /* Convert a key from KeyTupleFormat to mem-comparable form */
+  uint pack_index_tuple(TABLE *tbl, uchar *packed_tuple, 
+                        const uchar *key_tuple, key_part_map keypart_map);
+  
+  /* Convert a key from Table->record format to mem-comparable form */
+  uint pack_record(TABLE *tbl, const uchar *record, uchar *packed_tuple, 
+                   uchar *unpack_info, int *unpack_info_len,
+                   uint n_key_parts=0);
+  int unpack_record(TABLE *table, uchar *buf, const rocksdb::Slice *packed_key,
+                    const rocksdb::Slice *unpack_info);
+
+  /* Get the key that is the "infimum" for this index */
+  inline void get_infimum_key(uchar *key, uint *size)
+  {
+    store_index_number(key, index_number);
+    *size= INDEX_NUMBER_SIZE;
+  }
+  
+  /* Get the key that is a "supremum" for this index */
+  inline void get_supremum_key(uchar *key, uint *size)
+  {
+    store_index_number(key, index_number+1);
+    *size= INDEX_NUMBER_SIZE;
+  }
+  
+  /* Make a key that is right after the given key. */
+  void successor(uchar *packed_tuple, uint len);
+
+  /*
+    This can be used to compare prefixes:
+    if X is a prefix of Y, then we consider that X = Y
+    (e.g. "ab" compares as equal to "abc").
+  */
+  int cmp_full_keys(const char *pa, uint a_len, const char *pb, uint b_len,
+                    uint n_parts)
+  {
+    DBUG_ASSERT(covers_key(pa, a_len));
+    DBUG_ASSERT(covers_key(pb, b_len));
+
+    uint min_len= a_len < b_len? a_len : b_len;
+    int res= memcmp(pa, pb, min_len);
+    return res;
+  }
+  
+  /* Check if given mem-comparable key belongs to this index */
+  bool covers_key(const char *key, uint keylen)
+  {
+    if (keylen < INDEX_NUMBER_SIZE)
+      return false;
+    if (memcmp(key, index_number_storage_form, INDEX_NUMBER_SIZE))
+      return false;
+    else
+      return true;
+  }
+  
+  /* Must only be called for secondary keys: */
+  uint get_primary_key_tuple(RDBSE_KEYDEF *pk_descr, const char *key, 
+                             char *pk_buffer);
+   
+  /* Return max length of mem-comparable form */
+  uint max_storage_fmt_length() 
+  {
+    return maxlength;
+  }
+
+  RDBSE_KEYDEF(uint indexnr_arg, uint keyno_arg) : 
+    index_number(indexnr_arg), 
+    pk_key_parts(NULL),
+    pack_info(NULL),
+    keyno(keyno_arg), 
+    maxlength(0) // means 'not initialized'
+  {
+    store_index_number(index_number_storage_form, indexnr_arg);
+  }
+  ~RDBSE_KEYDEF();
+  
+  enum {
+    INDEX_NUMBER_SIZE= 4
+  };
+
+  void setup(TABLE *table);
+  
+private:
+  
+  /* Global number of this index (used as prefix in StorageFormat) */
+  const uint32 index_number;
+
+  uchar index_number_storage_form[INDEX_NUMBER_SIZE]; 
+  
+  friend class RDBSE_TABLE_DEF; // for index_number above
+
+  class PK_KEY_PART
+  {
+  public:
+    uint offset;
+    uint size;
+  };
+
+  /*
+    Array of descriptions of primary key columns:
+     - element #0 describes the first PK column,
+     - element #1 describes the second PK column, and so forth.
+    The offsets are the offsets of the columns' images within the
+    StorageFormat representation of this index.
+  */
+  PK_KEY_PART *pk_key_parts;
+  uint n_pk_key_parts;
+  
+  /* Array of index-part descriptors. */
+  Field_pack_info *pack_info;
+  
+  uint keyno; /* number of this index in the table */
+  
+  /*
+    Length of the mem-comparable form. In the encoding we're using, it is
+    constant (any value will have this length).
+  */
+  uint maxlength;
+  
+  /* Length of the unpack_data */
+  uint unpack_data_len;
+};
+
+
+typedef void (*make_unpack_info_t) (Field_pack_info *fpi, Field *field, uchar *dst);
+typedef int (*index_field_unpack_t)(Field_pack_info *fpi, Field *field,
+                                    const uchar *tuple,
+                                    const uchar *unpack_info);
+
+/*
+  This stores information about how a field can be packed to mem-comparable
+  form and unpacked back.
+*/
+
+class Field_pack_info
+{
+public:
+  /* 
+    Offset of the image of this field in the mem-comparable image. This field
+    must be set from outside of the class
+  */
+  int image_offset;
+
+  /* Length of mem-comparable image of the field, in bytes */
+  int image_len;
+  
+  /* Length of image in the unpack data */
+  int unpack_data_len;
+  int unpack_data_offset;
+
+  /* Offset of field data in table->record[i] from field->ptr. */
+  int field_data_offset;
+  
+  bool maybe_null; /* TRUE <=> NULL-byte is stored */
+
+  /*
+    The pack function is assumed to:
+     - store the NULL-byte, if needed
+     - call field->make_sort_key().
+    If you need to unpack the value later, you should also call
+    make_unpack_info_func to save the unpack data.
+  */
+  make_unpack_info_t make_unpack_info_func;
+
+  /*
+    This function takes
+    - mem-comparable form 
+    - unpack_info data 
+    and restores the original value.
+  */
+  index_field_unpack_t unpack_func;
+
+  bool setup(Field *field);
+};
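+
+/*
+  Illustrative flow sketch (simplified; the buffer names are hypothetical):
+
+    Field_pack_info fpi;
+    if (fpi.setup(field))   // index-only reads are possible for this field
+      fpi.unpack_func(&fpi, field, tuple_image, unpack_info);
+*/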
+
+
+/* 
+  A table definition. This is an entry in the mapping
+    
+    dbname.tablename -> {index_nr, index_nr, ... }
+
+  There is only one RDBSE_TABLE_DEF object for a given table.
+  That's why we keep auto_increment value here, too.
+*/
+
+class RDBSE_TABLE_DEF
+{
+public:
+  RDBSE_TABLE_DEF() : key_descr(NULL), auto_incr_val(1)
+  {
+    mysql_mutex_init(0, &mutex, MY_MUTEX_INIT_FAST);
+  }
+  ~RDBSE_TABLE_DEF()
+  {
+    mysql_mutex_destroy(&mutex);
+    /* key_descr is NULL if ownership was transferred away (see rename()) */
+    if (key_descr)
+    {
+      for (uint i= 0; i < n_keys; i++)
+        delete key_descr[i];
+      my_free(key_descr);
+    }
+  }
+  /* Stores 'dbname.tablename' */
+  StringBuffer<64> dbname_tablename;
+  
+  /* Number of indexes */
+  uint n_keys;
+  
+  /* Array of index descriptors */
+  RDBSE_KEYDEF **key_descr;
+  
+  mysql_mutex_t mutex; // guards the following:
+  longlong auto_incr_val;
+
+  void write_to(rocksdb::DB *rdb_dict, uchar *key, size_t keylen);
+};
+
+
+/* 
+  A thread-safe sequential number generator. Its performance is not a concern.
+*/
+
+class Sequence_generator
+{
+  int next_number;
+
+  mysql_mutex_t mutex;
+public:
+  void init(int initial_number)
+  {
+    mysql_mutex_init(0 , &mutex, MY_MUTEX_INIT_FAST);
+    next_number= initial_number;
+  }
+
+  int get_next_number() 
+  {
+    int res;
+    mysql_mutex_lock(&mutex);
+    res= next_number++;
+    mysql_mutex_unlock(&mutex);
+    return res;
+  }
+
+  void cleanup()
+  {
+    mysql_mutex_destroy(&mutex);
+  }
+};
+
+
+/*
+  This contains a mapping of
+
+     dbname.table_name -> array{RDBSE_KEYDEF}.
+
+  Objects are shared among all threads.
+*/
+
+class Table_ddl_manager
+{
+  HASH ddl_hash; // Contains RDBSE_TABLE_DEF elements
+  mysql_rwlock_t rwlock;
+
+  Sequence_generator sequence;
+
+  enum {
+    DDL_ENTRY_INDEX_NUMBER=1
+  };
+
+public:
+  bool init(rocksdb::DB *rdb_dict);
+  void cleanup();
+
+  int put_and_write(RDBSE_TABLE_DEF *key_descr, rocksdb::DB *rdb_dict);
+  int put(RDBSE_TABLE_DEF *key_descr, bool lock= true);
+  void remove(RDBSE_TABLE_DEF *rec, rocksdb::DB *rdb_dict, bool lock=true);
+
+  RDBSE_TABLE_DEF *find(uchar *table_name, uint len, bool lock=true);
+  
+  bool rename(uchar *from, uint from_len, uchar *to, uint to_len, 
+              rocksdb::DB *rdb_dict);
+
+  int get_next_number() { return sequence.get_next_number(); }
+private:
+  static uchar* get_hash_key(RDBSE_TABLE_DEF *rec, size_t *length,
+                             my_bool not_used __attribute__((unused)));
+  static void free_hash_elem(void* data);
+};
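+
+/*
+  Typical usage sketch (illustrative; the local names are hypothetical):
+
+    Table_ddl_manager ddl_manager;
+    ddl_manager.init(rdb);                // load the dictionary from RocksDB
+    RDBSE_TABLE_DEF *tbl=
+      ddl_manager.find((uchar*)"test.t1", 7);
+    int next_index= ddl_manager.get_next_number();
+    ddl_manager.cleanup();
+*/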
+
diff --git a/storage/rocksdb/rdb_locks.cc b/storage/rocksdb/rdb_locks.cc
new file mode 100644
index 0000000..1ee15cb
--- /dev/null
+++ b/storage/rocksdb/rdb_locks.cc
@@ -0,0 +1,311 @@
+/* 
+   Copyright (c) 2012,2013 Monty Program Ab
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation        // gcc: Class implementation
+#endif
+
+#include <mysql/plugin.h>
+
+#include "ha_rocksdb.h"  // TODO: this is too much
+
+#include "rdb_locks.h"
+
+static uchar* get_row_lock_hash_key(const uchar *entry, size_t* key_len, my_bool)
+{
+  Row_lock *rlock= (Row_lock*)entry;
+  *key_len= rlock->len;
+  return (uchar*) rlock->rowkey;
+}
+
+/**
+  Row_lock constructor
+
+  It is called from lf_hash and takes a pointer to an LF_SLIST instance.
+  The Row_lock is located at arg + LF_HASH_OVERHEAD.
+*/
+static void rowlock_init(uchar *arg)
+{
+  Row_lock *rc= (Row_lock*)(arg+LF_HASH_OVERHEAD);
+  DBUG_ENTER("rowlock_init");
+
+  memset(rc, 0, sizeof(*rc));
+ 
+  mysql_mutex_init(0 /* TODO: register in P_S. */, &rc->mutex, MY_MUTEX_INIT_FAST);
+  mysql_cond_init(0, &rc->cond, 0);
+  
+  rc->waiters= 0;
+  DBUG_VOID_RETURN;
+}
+
+
+/**
+  Row_lock destructor
+
+  It is called from lf_hash and takes a pointer to an LF_SLIST instance.
+  The Row_lock is located at arg + LF_HASH_OVERHEAD.
+*/
+static void rowlock_destroy(uchar *arg)
+{
+  Row_lock *rc= (Row_lock*)(arg+LF_HASH_OVERHEAD);
+  DBUG_ENTER("rowlock_destroy");
+
+  mysql_mutex_destroy(&rc->mutex);
+  mysql_cond_destroy(&rc->cond);
+
+  DBUG_ASSERT(rc->waiters == 0);
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Global initialization. Should be called before any other operation.
+*/
+
+void LockTable::init(lf_key_comparison_func_t key_cmp_func, 
+                     lf_hashfunc_t hashfunc)
+{
+  lf_hash_init(&lf_hash, sizeof(Row_lock), LF_HASH_UNIQUE, 0 /* key offset */,
+               0 /*key_len*/, get_row_lock_hash_key /*get_hash_key*/,
+               NULL /*charset*/);
+
+  lf_hash.alloc.constructor= rowlock_init;
+  lf_hash.alloc.destructor=  rowlock_destroy;
+
+  lf_hash.key_comparator= key_cmp_func;
+  lf_hash.hashfunc=       hashfunc;
+
+  lf_hash.element_size= offsetof(Row_lock, mutex);
+}
+
+
+/* This should be called when shutting down */
+
+void LockTable::cleanup()
+{
+  /* We should not have any locks at this point */ 
+  DBUG_ASSERT(lf_hash.count == 0);
+  lf_hash_destroy(&lf_hash);
+}
+
+
+/*
+  Get a lock for given row
+
+  @param pins          Pins for this thread as returned by LockTable::get_pins().
+  @param key           Row key
+  @param keylen        Length of the row key, in bytes.
+  @param timeout_sec   Wait at most this many seconds.
+
+  @return
+    pointer  Pointer to the obtained lock
+    NULL     Failed to acquire the lock (timeout or out-of-memory error).
+
+
+  @note 
+    The code is based on wt_thd_will_wait_for() in mysys/waiting_threads.c
+*/
+
+Row_lock* LockTable::get_lock(LF_PINS *pins, const uchar* key, size_t keylen,
+                              int timeout_sec)
+{
+  Row_lock *found_lock;
+  void *ptr;
+  bool inserted= false;
+
+  uchar *key_copy= NULL;
+
+retry:
+  while (!(ptr= lf_hash_search(&lf_hash, pins, key, keylen)))
+  {
+    Row_lock new_lock;
+    new_lock.deleted= FALSE;
+    new_lock.waiters= 0;
+    new_lock.busy= 0;
+
+    if (!key_copy && !(key_copy= (uchar*)my_malloc(keylen, MYF(0))))
+      return NULL;
+    memcpy(key_copy, key, keylen);
+    new_lock.rowkey= (char*)key_copy;
+    new_lock.len= keylen;
+    
+    int res= lf_hash_insert(&lf_hash, pins, &new_lock);
+    
+    if (res == -1)
+       goto return_null; /* out of memory */
+    
+    inserted= !res;
+    if (inserted)
+    {
+      /*
+        key_copy is now used by the entry in the hash table.
+        We must not free it.
+      */
+      key_copy= NULL;
+    }
+
+    /*
+      Two cases: either lf_hash_insert() failed because another thread has
+      just inserted a resource with the same id, and we need to retry;
+      or lf_hash_insert() succeeded, and we need to repeat lf_hash_search()
+      to find the real address of the newly inserted element.
+
+      Either way, we don't care what lf_hash_insert() returned, and we
+      need to repeat the loop.
+    */
+  }
+    
+  if (ptr == MY_ERRPTR)
+    goto return_null; /* Out of memory */
+
+  found_lock= (Row_lock*)ptr;
+  mysql_mutex_lock(&found_lock->mutex);
+
+  if (found_lock->deleted)
+  {
+    /* We have found the lock, but it was deleted after that */
+    mysql_mutex_unlock(&found_lock->mutex);
+    lf_hash_search_unpin(pins);
+    goto retry;
+  }
+
+  /* We're holding Row_lock's mutex, which prevents anybody from deleting it */
+  lf_hash_search_unpin(pins);
+
+  if (!found_lock->busy)
+  {
+    /* We got the Row_lock. Do nothing. */
+    found_lock->busy= 1;
+    found_lock->owner_data= pins;
+    mysql_mutex_unlock(&found_lock->mutex);
+  }
+  else
+  {
+    if (found_lock->owner_data == pins)
+    {
+      /* We already own this lock */
+      found_lock->busy++;
+      mysql_mutex_unlock(&found_lock->mutex);
+    }
+    else
+    {
+      /* The found row_lock is not ours. Wait for it. */
+      found_lock->waiters++;
+      int res= 0;
+
+      struct timespec wait_timeout;
+      set_timespec(wait_timeout, timeout_sec);
+#ifndef STANDALONE_UNITTEST
+      THD *thd= current_thd;
+      PSI_stage_info old_stage;
+      thd_enter_cond(thd, &found_lock->cond, &found_lock->mutex,
+                     &stage_waiting_on_row_lock, &old_stage);
+#endif
+      while (found_lock->busy)
+      {
+        res= mysql_cond_timedwait(&found_lock->cond, &found_lock->mutex, 
+                                  &wait_timeout);
+        bool killed= false;
+#ifndef STANDALONE_UNITTEST
+        killed= thd_killed(thd);
+#endif
+        if (res == ETIMEDOUT || killed)
+        {
+          if (found_lock->busy)
+          {
+            // We own the mutex still
+            found_lock->waiters--; // we're not waiting anymore
+            mysql_mutex_unlock(&found_lock->mutex);
+            goto return_null;
+          }
+          else
+            break;
+        }
+        if (res!=0)
+          fprintf(stderr, "wait failed: %d\n", res);
+      }
+      
+      /*
+        Ok, now we own the mutex again, and the lock is released. Take it.
+      */
+      DBUG_ASSERT(!found_lock->busy);
+      found_lock->busy= 1;
+      found_lock->owner_data= pins;
+      found_lock->waiters--; // we're not waiting anymore
+#ifndef STANDALONE_UNITTEST
+      thd_exit_cond(thd, &old_stage);
+#else
+      mysql_mutex_unlock(&found_lock->mutex);
+#endif
+    }
+  }
+
+  if (key_copy)
+    my_free(key_copy);
+  return found_lock;
+
+return_null:
+  if (key_copy)
+    my_free(key_copy);
+  return NULL;
+}
+
+
+/*
+  Release the previously obtained lock
+    @param pins      This thread pins
+    @param own_lock  Previously obtained lock
+*/
+
+void LockTable::release_lock(LF_PINS *pins, Row_lock *own_lock)
+{
+  /* Acquire the mutex to prevent anybody from getting into the queue */
+  mysql_mutex_lock(&own_lock->mutex);
+
+  DBUG_ASSERT(own_lock->owner_data == pins);
+ 
+  if (--own_lock->busy)
+  {
+    /* 
+      We've released the lock once. We've acquired it more than once though,
+      so we still keep it.
+    */
+    mysql_mutex_unlock(&own_lock->mutex);
+    return; 
+  }
+
+  if (own_lock->waiters)
+  {
+    /* 
+      Somebody is waiting for this lock (they can't stop as we're holding the
+      mutex). They are now responsible for disposing of the lock.
+    */
+    mysql_cond_signal(&own_lock->cond);
+    mysql_mutex_unlock(&own_lock->mutex);
+  }
+  else
+  {
+    /* Nobody's waiting. Release the lock */
+    char *rowkey= own_lock->rowkey;
+    own_lock->deleted= true;
+    mysql_mutex_unlock(&own_lock->mutex);
+    int res= lf_hash_delete(&lf_hash, pins, own_lock->rowkey, own_lock->len);
+    DBUG_ASSERT(res == 0);
+    my_free(rowkey);
+  }
+}
+
diff --git a/storage/rocksdb/rdb_locks.h b/storage/rocksdb/rdb_locks.h
new file mode 100644
index 0000000..e83ccc2
--- /dev/null
+++ b/storage/rocksdb/rdb_locks.h
@@ -0,0 +1,96 @@
+/* 
+   Copyright (c) 2012,2013 Monty Program Ab
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifdef USE_PRAGMA_INTERFACE
+#pragma interface			/* gcc class implementation */
+#endif
+
+#include "my_sys.h"
+#include "lf.h"
+
+/*
+  A row lock that one gets from LockTable.
+
+  Note: the structure is stored in LF_HASH, which will copy a part of the
+  structure with memcpy(). See LockTable::init().
+*/
+class Row_lock
+{
+public:
+  char *rowkey; /* The key this lock is for */
+  int len; /* length of the rowkey */
+  
+  /* TRUE - this row_lock is being deleted */
+  bool deleted;
+  
+  /* How many are waiting for the lock */
+  int waiters;
+  
+  /*
+    busy==0 means the lock is free.
+    busy>=1 means the lock is occupied; the number tells how many times the
+            owner has acquired it (the lock is re-entrant for its owner).
+  */
+  int busy;
+  
+  /* 
+    Some opaque data that identifies the lock owner.  This is needed so we can
+    tell if this is the lock owner requesting the lock the second time, or
+    somebody else.
+  */
+  void *owner_data;
+  
+  /*
+    One must hold this mutex
+     - when marking the lock as busy or free,
+     - when adding/removing itself from the waiters.
+    The mutex is also used with the condition variable when waiting for the
+    lock.
+  */
+  mysql_mutex_t mutex;
+  mysql_cond_t cond;
+};
+
+
+/*
+  A table of locks. It is backed by a lock-free hash.
+
+  INTERNALS
+  - Locks are exclusive.
+  - If a thread has an element in the hashtable, it has a lock.
+*/
+
+class LockTable
+{
+public:
+  LF_HASH lf_hash;
+
+public:
+  void init(lf_key_comparison_func_t key_cmp_func,
+            lf_hashfunc_t hashfunc);
+
+  void cleanup();
+  /*
+    Before using the LockTable, each thread should get its own "pins". 
+  */
+  LF_PINS* get_pins() { return lf_hash_get_pins(&lf_hash); }
+  void put_pins(LF_PINS *pins) { return lf_hash_put_pins(pins); }
+
+  Row_lock* get_lock(LF_PINS *pins, const uchar* key, size_t keylen,
+                     int timeout_sec);
+  void release_lock(LF_PINS *pins, Row_lock *own_lock);
+};
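+
+/*
+  Typical usage sketch (illustrative; error handling omitted):
+
+    LockTable locks;
+    locks.init(my_key_cmp, my_hashfunc);    // comparison/hash are caller's
+    LF_PINS *pins= locks.get_pins();        // once per thread
+    Row_lock *lock= locks.get_lock(pins, key, keylen, timeout_sec);
+    if (lock)
+      locks.release_lock(pins, lock);       // the lock is recursive per owner
+    locks.put_pins(pins);
+    locks.cleanup();
+*/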
+
+
diff --git a/storage/rocksdb/rdb_rowmods.cc b/storage/rocksdb/rdb_rowmods.cc
new file mode 100644
index 0000000..08e579c
--- /dev/null
+++ b/storage/rocksdb/rdb_rowmods.cc
@@ -0,0 +1,364 @@
+/* 
+   Copyright (c) 2013 Monty Program Ab
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include "my_global.h"                   /* ulonglong */
+#include "my_base.h"                     /* ha_rows */
+#include "my_sys.h"
+#include "my_tree.h"
+
+#include "rocksdb/db.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/write_batch.h"
+#include "rdb_rowmods.h"
+
+void Row_table::init()
+{
+  init_tree(&tree, 512 /*default_alloc_size*/, 0 /*memory_limit*/,
+            sizeof(void*)/*size*/, Row_table::compare_rows, 1 /*with_delete*/,
+            NULL /*free_element*/, NULL/*custom_arg*/);
+  tree.flag |= TREE_NO_DUPS;
+  init_alloc_root(&mem_root, 512, 512);
+  stmt_id= 1;
+  change_id= 0;
+}
+
+
+void Row_table::reinit()
+{
+  if (tree.elements_in_tree > 0)
+  {
+    cleanup();
+    init();
+  }
+}
+
+
+void Row_table::cleanup()
+{
+  delete_tree(&tree);
+  free_root(&mem_root, MYF(0));
+}
+
+
+/*
+  This function may:
+  - find no record with the key,
+  - find a record with the key, or
+  - find a tombstone with the key.
+
+  @param found  OUT  TRUE means we've found a record,
+                     FALSE means we've found a tombstone
+
+  @return true  - we've found either a record or a tombstone
+          false - we've found nothing
+*/
+
+bool Row_table::Get(rocksdb::Slice &key, std::string *record, bool *found)
+{
+  ROW_DATA **row_ptr;
+  if ((row_ptr= (ROW_DATA**)tree_search(&tree, &key, &key)))
+  {
+    ROW_DATA *row= *row_ptr;
+    if (row->value_len == DATA_IS_TOMBSTONE)
+      *found= false; 
+    else
+    {
+      *found= true;
+      record->assign(((char*)row) + ROW_DATA_SIZE + row->key_len, row->value_len);
+    }
+    return true; /* Found either a record or a tombstone */
+  }
+  else
+    return false; /* Not found */
+}
+
+
+int Row_table::compare_rows(const void* arg, const void *a, const void *b)
+{
+  uchar *pa, *pb;
+  size_t a_size, b_size;
+  
+  /* One of the parameters may be a rocksdb slice */
+  if (a == arg)
+  {
+    rocksdb::Slice *slice= (rocksdb::Slice*)a;
+    pa= (uchar*)slice->data();
+    a_size= slice->size();
+  }
+  else
+  {
+    ROW_DATA *row = *((ROW_DATA**)a);
+    a_size= row->key_len;
+    pa= ((uchar*)row) + ROW_DATA_SIZE;
+  }
+  
+  /* Same as above for b */
+  if (b == arg)
+  {
+    rocksdb::Slice *slice= (rocksdb::Slice*)b;
+    pb= (uchar*)slice->data();
+    b_size= slice->size();
+  }
+  else
+  {
+    ROW_DATA *row = *((ROW_DATA**)b);
+    b_size= row->key_len;
+    pb= ((uchar*)row) + ROW_DATA_SIZE;
+  }
+
+  size_t len= (a_size < b_size) ? a_size : b_size;
+  int res= memcmp(pa, pb, len);
+
+  if (!res)
+  {
+    if (a_size < b_size)
+      res= -1;
+    if (a_size > b_size)
+      res= 1;
+  }
+
+  return res;
+}
+
+
+bool Row_table::Put(rocksdb::Slice& key, rocksdb::Slice& val)
+{
+  uchar *data = (uchar*)alloc_root(&mem_root, ROW_DATA_SIZE + key.size() + 
+                                              val.size());
+
+  ROW_DATA *rdata= (ROW_DATA*)data;
+  rdata->key_len= key.size();
+  rdata->value_len= val.size();
+  rdata->stmt_id= stmt_id;
+  rdata->prev_version= NULL;
+  memcpy(data + ROW_DATA_SIZE, key.data(), key.size());
+  memcpy(data + ROW_DATA_SIZE + key.size(), val.data(), val.size());
+ 
+  change_id++;
+  if (!tree_insert(&tree, &data, /*key_size*/0, NULL/*custom_arg*/))
+  {
+    /* There is already a record with this key (or Out-Of-Memory) */
+    ROW_DATA **row_ptr;
+    row_ptr= (ROW_DATA**)tree_search(&tree, &key, &key);
+    if (!row_ptr)
+      return true;
+
+    /*
+      The record is from a previous statement. We may need to get back to
+      that record. Save a pointer to it.
+    */
+    if ((*row_ptr)->stmt_id != stmt_id)
+    {
+      rdata->prev_version= *row_ptr;
+    }
+    *row_ptr= rdata;
+  }
+  return false;
+}
+
+
+/*
+  Put a tombstone into the table
+*/
+
+bool Row_table::Delete(rocksdb::Slice& key)
+{
+  uchar *data = (uchar*)alloc_root(&mem_root, ROW_DATA_SIZE + key.size());
+  ROW_DATA *rdata= (ROW_DATA*)data;
+  rdata->key_len= key.size();
+  rdata->value_len= DATA_IS_TOMBSTONE;
+  rdata->stmt_id= stmt_id;
+  rdata->prev_version= NULL;
+  memcpy(data + ROW_DATA_SIZE, key.data(), key.size());
+
+  change_id++;
+
+  if (!tree_insert(&tree, &data, /*key_size*/0, NULL/*custom_arg*/))
+  {
+    /* There is already a record with this key (or Out-Of-Memory) */
+    ROW_DATA **row_ptr;
+    row_ptr= (ROW_DATA**)tree_search(&tree, &key, &key);
+    if (!row_ptr)
+      return true; /* OOM */
+    
+    if ((*row_ptr)->stmt_id != stmt_id)
+    {
+      /*
+        The record is from a previous statement. We may need to get back to
+        that record. Save a pointer to it.
+      */
+      rdata->prev_version= *row_ptr;
+    }
+
+    /* Put the new record instead of the old one */
+    *row_ptr= rdata;
+  }
+  return false;
+}
+
+
+void Row_table::start_stmt()
+{
+  stmt_id++;
+}
+
+
+/*
+  Undo all changes made with the current stmt_id.
+*/
+void Row_table::rollback_stmt()
+{
+  ROW_DATA *delete_list= NULL;
+  Row_table_iter iter(this);
+
+  /*
+    To avoid invalidating the iterator, first collect all items that need to be
+    deleted in a linked list, and then actually do the deletes.
+  */
+  for (iter.SeekToFirst(); iter.Valid(); iter.Next())
+  {
+    if ((*iter.row_ptr)->stmt_id == stmt_id)
+    {
+      if ((*iter.row_ptr)->prev_version)
+      {
+        /*
+          This element has a previous version (the previous version is what the
+          element was before the current statement).
+          Replace the element with its previous version. They have the same
+          key value, so there is no need to re-balance the tree.
+        */
+        *iter.row_ptr= (*iter.row_ptr)->prev_version;
+      }
+      else
+      {
+        /* No previous version. Record for removal */
+        (*iter.row_ptr)->prev_version= delete_list;
+        delete_list= (*iter.row_ptr);
+      }
+    }
+  }
+  
+  /* Do all of the recorded deletes */
+  while (delete_list) 
+  {
+    ROW_DATA *next= delete_list->prev_version;
+    
+    tree_delete(&tree, &delete_list, /*key_size*/ 0, NULL);
+
+    delete_list= next;
+  }
+
+  change_id++;
+}
+
+
+/****************************************************************************
+ * Row_table_iter
+ ***************************************************************************/
+
+Row_table_iter::Row_table_iter(Row_table *rtable_arg) : 
+  rtable(rtable_arg), row_ptr(NULL), change_id(rtable_arg->change_id)
+{}
+
+
+void Row_table_iter::Seek(const rocksdb::Slice &slice)
+{
+  row_ptr= (ROW_DATA**)tree_search_key(&rtable->tree, &slice, parents, &last_pos,
+                                       HA_READ_KEY_OR_NEXT, &slice/*custom_arg*/);
+  change_id= rtable->change_id;
+}
+
+
+void Row_table_iter::SeekToFirst()
+{
+  row_ptr= (ROW_DATA**)tree_search_edge(&rtable->tree, parents, &last_pos, 
+                                        offsetof(TREE_ELEMENT, left));
+  change_id= rtable->change_id;
+}
+
+
+void Row_table_iter::SeekToLast()
+{
+  row_ptr= (ROW_DATA**)tree_search_edge(&rtable->tree, parents, &last_pos, 
+                                        offsetof(TREE_ELEMENT, right));
+  change_id= rtable->change_id;
+}
+
+
+void Row_table_iter::Next()
+{
+  if (rtable->change_id != change_id)
+  {
+    change_id= rtable->change_id;
+    row_ptr= (ROW_DATA**)tree_search_key(&rtable->tree, row_ptr, parents, 
+                                         &last_pos, HA_READ_AFTER_KEY,
+                                         NULL/*custom_arg*/);
+  }
+  else
+  {
+    row_ptr= (ROW_DATA**)tree_search_next(&rtable->tree, &last_pos, 
+                                          offsetof(TREE_ELEMENT, left),
+                                          offsetof(TREE_ELEMENT, right));
+  }
+}
+
+
+void Row_table_iter::Prev()
+{
+  if (rtable->change_id != change_id)
+  {
+    change_id= rtable->change_id;
+    row_ptr= (ROW_DATA**)tree_search_key(&rtable->tree, row_ptr, parents, 
+                                         &last_pos, HA_READ_BEFORE_KEY,
+                                         NULL/*custom_arg*/);
+  }
+  else
+  {
+    row_ptr= (ROW_DATA**)tree_search_next(&rtable->tree, &last_pos,
+                                          offsetof(TREE_ELEMENT, right),
+                                          offsetof(TREE_ELEMENT, left));
+  }
+}
+
+
+bool Row_table_iter::Valid()
+{
+  return (row_ptr != NULL);
+}
+
+
+bool Row_table_iter::is_tombstone()
+{
+  DBUG_ASSERT(Valid());
+  return ((*row_ptr)->value_len == DATA_IS_TOMBSTONE);
+}
+
+
+rocksdb::Slice Row_table_iter::key()
+{
+  DBUG_ASSERT(Valid());
+  return rocksdb::Slice(((char*)*row_ptr) + ROW_DATA_SIZE, (*row_ptr)->key_len);
+}
+
+
+rocksdb::Slice Row_table_iter::value()
+{
+  DBUG_ASSERT(Valid() && !is_tombstone());
+  ROW_DATA *row= *row_ptr;
+  return rocksdb::Slice(((char*)row) + ROW_DATA_SIZE + row->key_len, 
+                        row->value_len);
+}
+
diff --git a/storage/rocksdb/rdb_rowmods.h b/storage/rocksdb/rdb_rowmods.h
new file mode 100644
index 0000000..bc5a53c
--- /dev/null
+++ b/storage/rocksdb/rdb_rowmods.h
@@ -0,0 +1,140 @@
+/* 
+   Copyright (c) 2013 Monty Program Ab
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include "my_tree.h"
+
+
+typedef struct st_row_data
+{
+  size_t key_len;
+
+  /* Can have a special value: DATA_IS_TOMBSTONE */
+  size_t value_len;
+  
+  /* Previous version */
+  struct st_row_data *prev_version;
+ 
+  /* Number of the statement that inserted this row/tombstone */
+  int stmt_id;
+
+  /* 
+    This structure is always followed by the key, which is followed by the
+    value
+  */
+} ROW_DATA;
+
+const size_t ROW_DATA_SIZE= ALIGN_SIZE(sizeof(ROW_DATA));
+
+const size_t DATA_IS_TOMBSTONE= size_t(-1);
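+
+/*
+  Layout of one entry, as allocated by Row_table::Put()/Delete():
+
+    offset 0                       : ROW_DATA header
+    offset ROW_DATA_SIZE           : key bytes   (key_len bytes)
+    offset ROW_DATA_SIZE + key_len : value bytes (value_len bytes, if any)
+*/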
+
+class Row_table;
+
+
+/*
+  A rocksdb-like iterator for traversing contents of Row_table. 
+  
+  Changes (insertion/removal of records) to the underlying Row_table will not
+  invalidate this iterator (internally, the iterator will detect the change
+  and re-position itself).
+
+  The iterator uses ideas from B-TREE index scans on ha_heap tables.
+*/
+
+class Row_table_iter
+{
+  Row_table *rtable; /* Table this iterator is for */
+
+  /* The following are for tree iteration: */
+  TREE_ELEMENT *parents[MAX_TREE_HEIGHT+1];
+  TREE_ELEMENT **last_pos;
+  ROW_DATA **row_ptr;
+
+  /* 
+    If rtable->change_id is greater than ours, the iterator is invalidated and
+    we need to re-position in the tree 
+  */
+  int change_id;
+  friend class Row_table;
+public:
+  Row_table_iter(Row_table *rtable_arg);
+
+  /* Scanning functions */
+  void Seek(const rocksdb::Slice &slice);
+  void SeekToFirst();
+  void SeekToLast();
+
+  void Next();
+  void Prev();
+  
+  /* Functions to get information about the current element */
+  bool Valid();
+  bool is_tombstone();
+  rocksdb::Slice key();
+  rocksdb::Slice value();
+};
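+
+/*
+  Illustrative scan sketch (process() is hypothetical):
+
+    Row_table_iter it(&rtable);
+    for (it.SeekToFirst(); it.Valid(); it.Next())
+    {
+      if (!it.is_tombstone())
+        process(it.key(), it.value());
+    }
+*/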
+
+
+/*
+  A storage for rows, or their tombstones. One can use rocksdb-like iterators
+  to traverse the rows.
+*/
+
+class Row_table
+{
+  TREE tree;
+  MEM_ROOT mem_root;
+
+  /* Current statement id */
+  int stmt_id;
+  
+  /* 
+    This is incremented on every change, so iterators can know 
+    if they were invalidated and should re-position themselves.
+  */
+  int change_id;
+
+  friend class Row_table_iter;
+public:
+  /* This is like a constructor */
+  void init(); 
+
+  void cleanup();
+  void reinit();
+
+  /* Operations to put a row, or a tombstone */
+  bool Put(rocksdb::Slice& key, rocksdb::Slice& val);
+  bool Delete(rocksdb::Slice& key);
+  
+  /* Lookup may find nothing, find a row, or find a tombstone */
+  bool Get(rocksdb::Slice &key, std::string *record, bool *found);
+  
+  /*
+    Statement support. It is possible to rollback all changes made by the
+    current statement.
+  */
+  void start_stmt();
+  void rollback_stmt();
+
+  /* This may return false even when there are no real changes (TODO: still true?) */
+  bool is_empty()
+  {
+    return test(tree.elements_in_tree == 0);
+  };
+private:
+  static int compare_rows(const void* arg, const void *a,const void *b);
+};
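+
+/*
+  Illustrative usage sketch (key, key2 and value are rocksdb::Slice objects):
+
+    Row_table rtable;
+    rtable.init();
+    rtable.start_stmt();
+    rtable.Put(key, value);    // buffer a row change
+    rtable.Delete(key2);       // buffer a tombstone
+    rtable.rollback_stmt();    // undo both changes
+    rtable.cleanup();
+*/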
+
+
diff --git a/storage/rocksdb/unittest/CMakeLists.txt b/storage/rocksdb/unittest/CMakeLists.txt
new file mode 100644
index 0000000..b543151
--- /dev/null
+++ b/storage/rocksdb/unittest/CMakeLists.txt
@@ -0,0 +1,11 @@
+
+INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/zlib
+                    ${CMAKE_SOURCE_DIR}/unittest/mytap)
+LINK_LIBRARIES(mytap mysys dbug strings)
+
+ADD_DEFINITIONS(-DSTANDALONE_UNITTEST)
+
+ADD_EXECUTABLE(test_rowlocks
+        test_rowlocks.cc ../rdb_locks.cc ../rdb_locks.h)
+#MY_ADD_TEST(test_rowlocks)
+
diff --git a/storage/rocksdb/unittest/test_rowlocks.cc b/storage/rocksdb/unittest/test_rowlocks.cc
new file mode 100644
index 0000000..d065a34
--- /dev/null
+++ b/storage/rocksdb/unittest/test_rowlocks.cc
@@ -0,0 +1,165 @@
+/*
+  TODO: MP AB Copyrights
+*/
+#include <my_sys.h>
+#include <string.h>
+#include "../rdb_locks.h"
+
+#include "thr_template.cc"
+
+
+/* This will hold one lock table that we're testing */
+LockTable *lock_table;
+
+const int N_ACCTS= 100;
+int bank_accounts[N_ACCTS];
+int total_money;
+
+bool prevent_deadlocks= true;
+
+int n_deadlocks= 0;
+
+int timeout_sec;
+
+int compare_int_keys(const uchar *s, size_t slen, 
+                     const uchar *t, size_t tlen)
+{
+  DBUG_ASSERT(slen==sizeof(int));
+  DBUG_ASSERT(tlen==sizeof(int));
+  int sval;
+  int tval;
+  memcpy(&sval, s, sizeof(int));
+  memcpy(&tval, t, sizeof(int));
+  if (sval < tval)
+    return -1;
+  else if (sval > tval)
+    return 1;
+  else
+    return 0;
+}
+
+/* Not really a hash function */
+ulong int_hashfunc(const char *key, size_t key_len)
+{
+  DBUG_ASSERT(key_len == sizeof(int));
+  int keyval;
+  memcpy(&keyval, key, sizeof(int));
+  return keyval;
+}
+
+
+/* This is one thread */
+pthread_handler_t locktable_test1(void *arg)
+{
+  LF_PINS *pins;
+  pins= lf_hash_get_pins(&lock_table->lf_hash);
+
+  /* In a loop, get a couple of locks */
+  int loop;
+  DBUG_ASSERT(RAND_MAX > N_ACCTS);
+
+  for (loop=0; loop < 200*1000; loop++)
+  {
+    int val1, val2;
+    val1= rand() % N_ACCTS;
+    do {
+      val2= rand() % N_ACCTS;
+    } while (val2 == val1);
+
+    if (prevent_deadlocks && val2 > val1)
+    {
+      int tmp=val2;
+      val2= val1;
+      val1= tmp;
+    }
+
+    int transfer=150;
+
+    Row_lock *lock1;
+    Row_lock *lock2;
+
+    lock1= lock_table->get_lock(pins, (uchar*)&val1, sizeof(int), timeout_sec);
+    DBUG_ASSERT(lock1);
+    lock2= lock_table->get_lock(pins, (uchar*)&val2, sizeof(int), timeout_sec);
+
+    if (!prevent_deadlocks && !lock2)
+    {
+      //Couldn't get lock2, must be a deadlock
+      lock_table->release_lock(pins, lock1);
+
+      mysql_mutex_lock(&mutex);
+      n_deadlocks++;
+      mysql_mutex_unlock(&mutex);
+      continue;
+    }
+
+    DBUG_ASSERT(lock2);
+    bank_accounts[val1] -= transfer;
+    bank_accounts[val2] += transfer;
+
+    lock_table->release_lock(pins, lock1);
+    lock_table->release_lock(pins, lock2);
+  }
+
+  lf_hash_put_pins(pins);
+ 
+  // Test harness needs us to signal that we're done:
+  mysql_mutex_lock(&mutex);
+  if (!--running_threads) mysql_cond_signal(&cond);
+  mysql_mutex_unlock(&mutex);
+
+  return 0;
+}
+
+
+void init_shared_data()
+{
+  total_money= 0;
+  for (int i=0; i < N_ACCTS;i++)
+  {
+    bank_accounts[i]= 1000;
+    total_money += bank_accounts[i]; 
+  }
+}
+
+void check_shared_data(const char *name)
+{
+  int money_after= 0;
+  for (int i=0; i < N_ACCTS; i++)
+    money_after += bank_accounts[i];
+  if (money_after == total_money)
+    fprintf(stderr, "# validation %s ok\n", name);
+  else
+    fprintf(stderr, "# validation %s failed: expected %d found %d\n", name,
+            total_money, money_after);
+}
+
+void do_tests()
+{
+  fprintf(stderr, "# lf_hash based lock table tests\n");
+
+  /* Global initialization */
+  lock_table= new LockTable;
+  lock_table->init(compare_int_keys, int_hashfunc);
+
+  init_shared_data();
+  prevent_deadlocks= true;
+  timeout_sec= 10*1000;
+
+  locktable_test1(NULL);
+
+  test_concurrently("locktable_test1", locktable_test1, 2 /*THREADS*/, 10 /*CYCLES*/);
+  check_shared_data("1");
+  
+
+  prevent_deadlocks= false;
+  timeout_sec= 2;
+  test_concurrently("locktable_test1", locktable_test1, 2 /*THREADS*/, 10 /*CYCLES*/);
+  check_shared_data("2");
+  fprintf(stderr, "# n_deadlocks=%d\n", n_deadlocks);
+  
+  lock_table->cleanup();
+
+  fprintf(stderr, "# tests end\n");
+}
+
diff --git a/storage/rocksdb/unittest/thr_template.cc b/storage/rocksdb/unittest/thr_template.cc
new file mode 100644
index 0000000..a0e1e5b
--- /dev/null
+++ b/storage/rocksdb/unittest/thr_template.cc
@@ -0,0 +1,93 @@
+/* Copyright (c) 2006-2008 MySQL AB, 2009 Sun Microsystems, Inc.
+   Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
+
+#include <my_global.h>
+#include <my_sys.h>
+#include <my_atomic.h>
+#include <tap.h>
+
+volatile uint32 bad;
+pthread_attr_t thr_attr;
+mysql_mutex_t mutex;
+mysql_cond_t cond;
+uint running_threads;
+
+void do_tests();
+
+void test_concurrently(const char *test, pthread_handler handler, int n, int m)
+{
+  pthread_t t;
+  ulonglong now= my_getsystime();
+
+  bad= 0;
+
+  diag("Testing %s with %d threads, %d iterations... ", test, n, m);
+  for (running_threads= n ; n ; n--)
+  {
+    if (pthread_create(&t, &thr_attr, handler, &m) != 0)
+    {
+      diag("Could not create thread");
+      abort();
+    }
+  }
+  mysql_mutex_lock(&mutex);
+  while (running_threads)
+    mysql_cond_wait(&cond, &mutex);
+  mysql_mutex_unlock(&mutex);
+
+  now= my_getsystime()-now;
+  ok(!bad, "tested %s in %g secs (%d)", test, ((double)now)/1e7, bad);
+}
+
+int main(int argc __attribute__((unused)), char **argv)
+{
+  MY_INIT("thd_template");
+
+  if (argv[1] && *argv[1])
+    DBUG_SET_INITIAL(argv[1]);
+
+  mysql_mutex_init(0, &mutex, 0);
+  mysql_cond_init(0, &cond, 0);
+  pthread_attr_init(&thr_attr);
+  pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
+
+#ifdef MY_ATOMIC_MODE_RWLOCKS
+#if defined(HPUX11) || defined(__POWERPC__) /* showed to be very slow (scheduler-related) */
+#define CYCLES 300
+#else
+#define CYCLES 3000
+#endif
+#else
+#define CYCLES 3000
+#endif
+#define THREADS 30
+
+  diag("N CPUs: %d, atomic ops: %s", my_getncpus(), MY_ATOMIC_MODE);
+
+  do_tests();
+
+  /*
+    workaround until we know why it crashes randomly on some machine
+    (BUG#22320).
+  */
+  sleep(2);
+  mysql_mutex_destroy(&mutex);
+  mysql_cond_destroy(&cond);
+  pthread_attr_destroy(&thr_attr);
+  my_end(0);
+  return exit_status();
+}
+


More information about the commits mailing list