[Commits] Rev 3526: MDEV-506 Cassandra dynamic columns access in file:///home/bell/maria/bzr/work-maria-5.5-cassandra/

sanja at montyprogram.com sanja at montyprogram.com
Fri Sep 28 15:27:20 EEST 2012


At file:///home/bell/maria/bzr/work-maria-5.5-cassandra/

------------------------------------------------------------
revno: 3526
revision-id: sanja at montyprogram.com-20120928122716-g2n99x05m8rntvqm
parent: sanja at montyprogram.com-20120928110117-pfwaprvex9dwaxsd
committer: sanja at montyprogram.com
branch nick: work-maria-5.5-cassandra
timestamp: Fri 2012-09-28 15:27:16 +0300
message:
  MDEV-506 Cassandra dynamic columns access
-------------- next part --------------
=== modified file 'include/ma_dyncol.h'
--- a/include/ma_dyncol.h	2012-09-28 11:01:17 +0000
+++ b/include/ma_dyncol.h	2012-09-28 12:27:16 +0000
@@ -39,6 +39,12 @@
 */
 #define MAX_DYNAMIC_COLUMN_LENGTH 0X1FFFFFFFL
 
+/*
+  Limits of implementation
+*/
+#define MAX_NAME_LENGTH 255
+#define MAX_TOTAL_NAME_LENGTH 65535
+
 /* NO and OK is the same used just to show semantics */
 #define ER_DYNCOL_NO ER_DYNCOL_OK
 
@@ -50,7 +56,8 @@ enum enum_dyncol_func_result
   ER_DYNCOL_LIMIT=  -2,            /* Some limit reached */
   ER_DYNCOL_RESOURCE= -3,          /* Out of resourses */
   ER_DYNCOL_DATA= -4,              /* Incorrect input data */
-  ER_DYNCOL_UNKNOWN_CHARSET= -5    /* Unknown character set */
+  ER_DYNCOL_UNKNOWN_CHARSET= -5,   /* Unknown character set */
+  ER_DYNCOL_TRUNCATED= 2           /* OK, but data was truncated */
 };
 
 typedef DYNAMIC_STRING DYNAMIC_COLUMN;
@@ -81,6 +88,7 @@ struct st_dynamic_column_value
     struct {
       LEX_STRING value;
       CHARSET_INFO *charset;
+      my_bool nonfreeable;
     } string;
     struct {
       decimal_digit_t buffer[DECIMAL_BUFF_LENGTH];
@@ -108,6 +116,13 @@ dynamic_column_create_many_fmt(DYNAMIC_C
                                uchar *column_keys,
                                DYNAMIC_COLUMN_VALUE *values,
                                my_bool names);
+enum enum_dyncol_func_result
+dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str,
+                                        uint column_count,
+                                        void *column_keys,
+                                        DYNAMIC_COLUMN_VALUE *values,
+                                        my_bool new_str,
+                                        my_bool string_keys);
 
 enum enum_dyncol_func_result
 dynamic_column_update(DYNAMIC_COLUMN *org, uint column_nr,
@@ -163,6 +178,21 @@ dynamic_column_json(DYNAMIC_COLUMN *str,
 #define dynamic_column_initialize(A) memset((A), 0, sizeof(*(A)))
 #define dynamic_column_column_free(V) dynstr_free(V)
 
+/* conversion of values to 3 base types */
+enum enum_dyncol_func_result
+dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
+                       CHARSET_INFO *cs, my_bool quote);
+enum enum_dyncol_func_result
+dynamic_column_val_long(longlong *ll, DYNAMIC_COLUMN_VALUE *val);
+enum enum_dyncol_func_result
+dynamic_column_val_double(double *dbl, DYNAMIC_COLUMN_VALUE *val);
+
+
+enum enum_dyncol_func_result
+dynamic_column_vals(DYNAMIC_COLUMN *str,
+                    DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals,
+                    char **free_names);
+
 /***************************************************************************
  Internal functions, don't use if you don't know what you are doing...
 ***************************************************************************/

=== modified file 'mysql-test/r/cassandra.result'
--- a/mysql-test/r/cassandra.result	2012-09-27 07:59:14 +0000
+++ b/mysql-test/r/cassandra.result	2012-09-28 12:27:16 +0000
@@ -365,8 +365,9 @@ CREATE TABLE t2 (rowkey bigint PRIMARY K
 thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4';
 select * from t2;
 rowkey	datecol
-1	1346189025000
-10	1346189026000
+1	1346192625000
+10	1346192626000
+delete from t2;
 drop table t2;
 #
 # Check whether changing parameters with ALTER TABLE works.
@@ -407,3 +408,141 @@ new-rowkey12	data1-value3	454
 rowkey11	updated-1	34543
 delete from t1;
 drop table t1;
+#
+# Dynamic columns support
+#
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+drop table t2;
+#error: dynamic column is not a blob
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36) DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+ERROR 42000: Incorrect column specifier for column 'uuidcol'
+#error: double dynamic column
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1, textcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+ERROR 42000: Incorrect column specifier for column 'textcol'
+#
+# Dynamic column read
+#
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
+thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+delete from t2;
+insert into t2 values(1,'9b5658dc-f32f-11e1-94cd-f46d046e9f09');
+insert into t2 values(2,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a');
+drop table t2;
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+select rowkey, column_list(dyn), column_get(dyn, 'uuidcol' as char) from t2;
+rowkey	column_list(dyn)	column_get(dyn, 'uuidcol' as char)
+1	`uuidcol`	9b5658dc-f32f-11e1-94cd-f46d046e9f09
+2	`uuidcol`	9b5658dc-f32f-11e1-94cd-f46d046e9f0a
+drop table t2;
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
+thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+delete from t2;
+drop table t2;
+#
+# Dynamic column insert
+#
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+insert into t2 values (1, column_create("dyn1", 1, "dyn2", "two"));
+select rowkey, column_json(dyn) from t2;
+rowkey	column_json(dyn)
+1	[{"dyn1":"1"},{"dyn2":"two"}]
+delete from t2;
+drop table t2;
+# bigint
+CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'a', 254324));
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'a', 2543));
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"a":254324},{"dyn1":"1"},{"dyn2":"two"}]
+2	[{"a":2543},{"dyn1":"1"},{"dyn2":"two"}]
+delete from t1;
+drop table t1;
+# int
+CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf3';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'intcol', 254324));
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'intcol', 2543));
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"dyn1":"1"},{"dyn2":"two"},{"intcol":254324}]
+2	[{"dyn1":"1"},{"dyn2":"two"},{"intcol":2543}]
+delete from t1;
+drop table t1;
+# timestamp
+CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'datecol', 254324));
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'datecol', 2543));
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"dyn1":"1"},{"dyn2":"two"},{"datecol":254324}]
+2	[{"dyn1":"1"},{"dyn2":"two"},{"datecol":2543}]
+delete from t1;
+drop table t1;
+# boolean
+CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf7';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 254324));
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 0));
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"dyn1":"1"},{"dyn2":"two"},{"boolcol":1}]
+2	[{"dyn1":"1"},{"dyn2":"two"},{"boolcol":0}]
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"dyn1":"1"},{"dyn2":"two"},{"boolcol":1}]
+2	[{"dyn1":"1"},{"dyn2":"two"},{"boolcol":0}]
+update t1 set dyn=column_add(dyn, "dyn2", null, "dyn3", "3");
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"dyn1":"1"},{"dyn3":"3"},{"boolcol":1}]
+2	[{"dyn1":"1"},{"dyn3":"3"},{"boolcol":0}]
+update t1 set dyn=column_add(dyn, "dyn1", null) where rowkey= 1;
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"dyn3":"3"},{"boolcol":1}]
+2	[{"dyn1":"1"},{"dyn3":"3"},{"boolcol":0}]
+update t1 set dyn=column_add(dyn, "dyn3", null, "a", "ddd");
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"a":"ddd"},{"boolcol":1}]
+2	[{"a":"ddd"},{"dyn1":"1"},{"boolcol":0}]
+update t1 set dyn=column_add(dyn, "12345678901234", "ddd");
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"a":"ddd"},{"boolcol":1},{"12345678901234":"ddd"}]
+2	[{"a":"ddd"},{"dyn1":"1"},{"boolcol":0},{"12345678901234":"ddd"}]
+update t1 set dyn=column_add(dyn, "12345678901234", null);
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"a":"ddd"},{"boolcol":1}]
+2	[{"a":"ddd"},{"dyn1":"1"},{"boolcol":0}]
+update t1 set dyn=column_add(dyn, 'boolcol', null) where rowkey= 2;
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"a":"ddd"},{"boolcol":1}]
+2	[{"a":"ddd"},{"dyn1":"1"}]
+update t1 set rowkey= 3, dyn=column_add(dyn, "dyn1", null, 'boolcol', 0) where rowkey= 2;
+select rowkey, column_json(dyn) from t1;
+rowkey	column_json(dyn)
+1	[{"a":"ddd"},{"boolcol":1}]
+3	[{"a":"ddd"},{"boolcol":0}]
+delete from t1;
+drop table t1;
+CREATE TABLE t1 (rowkey varchar(10) PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd1';
+select * from t1;
+ERROR HY000: Internal error: 'Unable to convert value for field `dyn` from Cassandra's data format. Name length exceed limit of 255: 'very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_ver'
+drop table t1;
+CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) 
+ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd2';
+DELETE FROM t1;
+insert into t1 values (1, column_create("dyn", 1));
+select rowkey, column_list(dyn) from t1;
+rowkey	column_list(dyn)
+1	`dyn`
+delete from t1;
+DROP TABLE t1;
+CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) 
+ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd2';
+insert into t1 values (1,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a');
+ERROR HY000: Encountered illegal format of dynamic column string
+delete from t1;
+DROP TABLE t1;

=== modified file 'mysql-test/r/mysqld--help.result'
--- a/mysql-test/r/mysqld--help.result	2012-06-19 12:06:45 +0000
+++ b/mysql-test/r/mysqld--help.result	2012-09-28 12:27:16 +0000
@@ -89,6 +89,24 @@ The following options may be given as th
  --bulk-insert-buffer-size=# 
  Size of tree cache used in bulk insert optimisation. Note
  that this is a limit per thread!
+ --cassandra[=name]  Enable or disable CASSANDRA plugin. Possible values are
+ ON, OFF, FORCE (don't start if the plugin fails to load).
+ --cassandra-default-thrift-host=name 
+ Default host for Cassandra thrift connections
+ --cassandra-failure-retries=# 
+ Number of times to retry Cassandra calls that failed due
+ to timeouts or network communication problems. The
+ default, 0, means not to retry.
+ --cassandra-insert-batch-size=# 
+ Number of rows in an INSERT batch
+ --cassandra-multiget-batch-size=# 
+ Number of rows in a multiget(MRR) batch
+ --cassandra-read-consistency=name 
+ Cassandra consistency level to use for read operations
+ --cassandra-rnd-batch-size=# 
+ Number of rows in an rnd_read (full scan) batch
+ --cassandra-write-consistency=name 
+ Cassandra consistency level to use for write operations
  --character-set-client-handshake 
  Don't ignore client side character set value sent during
  handshake.
@@ -863,6 +881,14 @@ binlog-optimize-thread-scheduling TRUE
 binlog-row-event-max-size 1024
 binlog-stmt-cache-size 32768
 bulk-insert-buffer-size 8388608
+cassandra ON
+cassandra-default-thrift-host (No default value)
+cassandra-failure-retries 0
+cassandra-insert-batch-size 100
+cassandra-multiget-batch-size 100
+cassandra-read-consistency ONE
+cassandra-rnd-batch-size 10000
+cassandra-write-consistency ONE
 character-set-client-handshake TRUE
 character-set-filesystem binary
 character-set-server latin1

=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test	2012-09-27 07:59:14 +0000
+++ b/mysql-test/t/cassandra.test	2012-09-28 12:27:16 +0000
@@ -94,6 +94,18 @@ CREATE COLUMN FAMILY cf10
   WITH comparator = UTF8Type
   AND key_validation_class=UTF8Type
   AND default_validation_class = UTF8Type;
+
+CREATE COLUMN FAMILY cfd1
+  WITH comparator = UTF8Type
+  AND key_validation_class=UTF8Type 
+  AND default_validation_class = UTF8Type;
+SET cfd1['1']['very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_long_name']='1';
+
+CREATE COLUMN FAMILY cfd2 
+  WITH comparator = UTF8Type
+    AND key_validation_class=Int32Type
+    AND default_validation_class = UTF8Type;
+
 EOF
 
 --error 0,1,2
@@ -463,7 +475,7 @@ drop table t2;
 CREATE TABLE t2 (rowkey bigint PRIMARY KEY, datecol bigint) ENGINE=CASSANDRA
   thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4';
 select * from t2;
-
+delete from t2;
 drop table t2;
 
 --echo #
@@ -511,6 +523,118 @@ select * from t1;
 delete from t1;
 drop table t1;
 
+--echo #
+--echo # Dynamic columns support
+--echo #
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+drop table t2;
+
+--echo #error: dynamic column is not a blob
+--error ER_WRONG_FIELD_SPEC
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36) DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+
+--echo #error: double dynamic column
+--error ER_WRONG_FIELD_SPEC
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1, textcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+
+--echo #
+--echo # Dynamic column read
+--echo #
+#prepare data
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
+  thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+delete from t2;
+insert into t2 values(1,'9b5658dc-f32f-11e1-94cd-f46d046e9f09');
+insert into t2 values(2,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a');
+drop table t2;
+
+#test dynamic column read
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+select rowkey, column_list(dyn), column_get(dyn, 'uuidcol' as char) from t2;
+drop table t2;
+
+#cleanup data
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
+  thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+delete from t2;
+drop table t2;
+
+--echo #
+--echo # Dynamic column insert
+--echo #
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+insert into t2 values (1, column_create("dyn1", 1, "dyn2", "two")); 
+select rowkey, column_json(dyn) from t2;
+delete from t2;
+drop table t2;
+--echo # bigint
+CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'a', 254324)); 
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'a', 2543)); 
+select rowkey, column_json(dyn) from t1;
+delete from t1;
+drop table t1;
+--echo # int
+CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf3';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'intcol', 254324)); 
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'intcol', 2543)); 
+select rowkey, column_json(dyn) from t1;
+delete from t1;
+drop table t1;
+--echo # timestamp
+CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'datecol', 254324)); 
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'datecol', 2543)); 
+select rowkey, column_json(dyn) from t1;
+delete from t1;
+drop table t1;
+--echo # boolean
+CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf7';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 254324)); 
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 0)); 
+select rowkey, column_json(dyn) from t1;
+select rowkey, column_json(dyn) from t1;
+update t1 set dyn=column_add(dyn, "dyn2", null, "dyn3", "3"); 
+select rowkey, column_json(dyn) from t1;
+update t1 set dyn=column_add(dyn, "dyn1", null) where rowkey= 1;
+select rowkey, column_json(dyn) from t1;
+update t1 set dyn=column_add(dyn, "dyn3", null, "a", "ddd");
+select rowkey, column_json(dyn) from t1;
+update t1 set dyn=column_add(dyn, "12345678901234", "ddd");
+select rowkey, column_json(dyn) from t1;
+update t1 set dyn=column_add(dyn, "12345678901234", null);
+select rowkey, column_json(dyn) from t1;
+update t1 set dyn=column_add(dyn, 'boolcol', null) where rowkey= 2;
+select rowkey, column_json(dyn) from t1;
+update t1 set rowkey= 3, dyn=column_add(dyn, "dyn1", null, 'boolcol', 0) where rowkey= 2;
+select rowkey, column_json(dyn) from t1;
+delete from t1;
+drop table t1;
+
+CREATE TABLE t1 (rowkey varchar(10) PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd1';
+--error ER_INTERNAL_ERROR
+select * from t1;
+drop table t1;
+
+# MDEV-560
+CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) 
+ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd2';
+DELETE FROM t1;
+insert into t1 values (1, column_create("dyn", 1));
+select rowkey, column_list(dyn) from t1;
+# Cleanup
+delete from t1;
+DROP TABLE t1;
+
+# MDEV-561 (incorrect format data to dynamic column)
+CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) 
+ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd2';
+--error ER_DYN_COL_WRONG_FORMAT
+insert into t1 values (1,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a');
+delete from t1;
+DROP TABLE t1;
+
+
 ############################################################################
 ## Cassandra cleanup
 ############################################################################

=== modified file 'mysys/ma_dyncol.c'
--- a/mysys/ma_dyncol.c	2012-09-28 11:01:17 +0000
+++ b/mysys/ma_dyncol.c	2012-09-28 12:27:16 +0000
@@ -68,6 +68,8 @@ uint32 copy_and_convert(char *to, uint32
 
 #define MAX_OFFSET_LENGTH  5
 
+#define DYNCOL_NUM_CHAR 6
+
 my_bool dynamic_column_has_names(DYNAMIC_COLUMN *str)
 {
   if (str->length < 1)
@@ -211,7 +213,7 @@ static my_bool check_limit_num(const voi
 
 static my_bool check_limit_str(const void *val)
 {
-  return (*((LEX_STRING **)val))->length > 255;
+  return (*((LEX_STRING **)val))->length > MAX_NAME_LENGTH;
 }
 
 
@@ -288,7 +290,7 @@ my_bool put_header_entry_str(DYN_HEADER
                              size_t offset)
 {
   LEX_STRING *column_name= (LEX_STRING *)column_key;
-  DBUG_ASSERT(column_name->length <= 255);
+  DBUG_ASSERT(column_name->length <= MAX_NAME_LENGTH);
   hdr->entry[0]= column_name->length;
   DBUG_ASSERT(hdr->name - hdr->nmpool < (long) 0x10000L);
   int2store(hdr->entry + 1, hdr->name - hdr->nmpool);
@@ -1381,6 +1383,9 @@ dynamic_new_column_store(DYNAMIC_COLUMN
                        DYNCOL_SYZERESERVE))
       goto err;
   }
+  if (!column_count)
+    return ER_DYNCOL_OK;
+
   bzero(str->str, fmt->fixed_hdr);
   str->length= fmt->fixed_hdr;
 
@@ -1501,7 +1506,7 @@ calc_var_sizes(DYN_HEADER *hdr,
   @return ER_DYNCOL_* return code
 */
 
-static enum enum_dyncol_func_result
+enum enum_dyncol_func_result
 dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str,
                                         uint column_count,
                                         void *column_keys,
@@ -1761,7 +1766,7 @@ static my_bool
 find_column(DYN_HEADER *hdr, uint numkey, LEX_STRING *strkey)
 {
   LEX_STRING nmkey;
-  char nmkeybuff[6]; /* to fit max 2 bytes number */
+  char nmkeybuff[DYNCOL_NUM_CHAR]; /* to fit max 2 bytes number */
   DBUG_ASSERT(hdr->header != NULL);
 
   if (hdr->header + hdr->header_size > hdr->data_end)
@@ -2169,10 +2174,10 @@ dynamic_column_list_str(DYNAMIC_COLUMN *
     if (header.format == DYNCOL_FMT_NUM)
     {
       uint nm= uint2korr(read);
-      tmp.str= my_malloc(6, MYF(0));
+      tmp.str= my_malloc(DYNCOL_NUM_CHAR, MYF(0));
       if (!tmp.str)
         return ER_DYNCOL_RESOURCE;
-      tmp.length= snprintf(tmp.str, 6, "%u", nm);
+      tmp.length= snprintf(tmp.str, DYNCOL_NUM_CHAR, "%u", nm);
     }
     else
     {
@@ -2208,7 +2213,7 @@ find_place(DYN_HEADER *hdr, void *key, m
   uint mid, start, end, val;
   int flag;
   LEX_STRING str;
-  char buff[6];
+  char buff[DYNCOL_NUM_CHAR];
   my_bool need_conversion= ((string_keys ? DYNCOL_FMT_STR : DYNCOL_FMT_NUM) !=
                             hdr->format);
   LINT_INIT(flag);                              /* 100 % safe */
@@ -2425,7 +2430,7 @@ dynamic_column_update_copy(DYNAMIC_COLUM
       size_t offs;
       uint nm;
       DYNAMIC_COLUMN_TYPE tp;
-      char buff[6];
+      char buff[DYNCOL_NUM_CHAR];
 
       if (hdr->format == DYNCOL_FMT_NUM)
       {
@@ -3438,7 +3443,7 @@ end:
 
 enum enum_dyncol_func_result
 dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
-                       my_bool quote)
+                       CHARSET_INFO *cs, my_bool quote)
 {
   char buff[40];
   int len;
@@ -3468,24 +3473,22 @@ dynamic_column_val_str(DYNAMIC_STRING *s
         char *alloc= NULL;
         char *from= val->x.string.value.str;
         uint bufflen;
-        my_bool conv= !my_charset_same(val->x.string.charset,
-                                       &my_charset_utf8_general_ci);
+        my_bool conv= !my_charset_same(val->x.string.charset, cs);
         my_bool rc;
         len= val->x.string.value.length;
-        bufflen= (len * (conv ? my_charset_utf8_general_ci.mbmaxlen : 1));
+        bufflen= (len * (conv ? cs->mbmaxlen : 1));
         if (dynstr_realloc(str, bufflen))
             return ER_DYNCOL_RESOURCE;
 
         // guaranty UTF-8 string for value
-        if (!my_charset_same(val->x.string.charset,
-                             &my_charset_utf8_general_ci))
+        if (!my_charset_same(val->x.string.charset, cs))
         {
           uint dummy_errors;
           if (!quote)
           {
             /* convert to the destination */
             str->length+= copy_and_convert_extended(str->str, bufflen,
-                                                    &my_charset_utf8_general_ci,
+                                                    cs,
                                                     from, len,
                                                     val->x.string.charset,
                                                     &dummy_errors);
@@ -3494,8 +3497,7 @@ dynamic_column_val_str(DYNAMIC_STRING *s
           if ((alloc= (char *)my_malloc(bufflen, MYF(0))))
           {
             len=
-              copy_and_convert_extended(alloc, bufflen,
-                                        &my_charset_utf8_general_ci,
+              copy_and_convert_extended(alloc, bufflen, cs,
                                         from, len, val->x.string.charset,
                                         &dummy_errors);
             from= alloc;
@@ -3543,6 +3545,155 @@ dynamic_column_val_str(DYNAMIC_STRING *s
   return(ER_DYNCOL_OK);
 }
 
+
+enum enum_dyncol_func_result
+dynamic_column_val_long(longlong *ll, DYNAMIC_COLUMN_VALUE *val)
+{
+  enum enum_dyncol_func_result rc= ER_DYNCOL_OK;
+  *ll= 0;
+  switch (val->type) {
+  case DYN_COL_INT:
+      *ll= val->x.long_value;
+      break;
+    case DYN_COL_UINT:
+      *ll= (longlong)val->x.ulong_value;
+      if (val->x.ulong_value > ULONGLONG_MAX)
+         rc= ER_DYNCOL_TRUNCATED;
+      break;
+    case DYN_COL_DOUBLE:
+      *ll= (longlong)val->x.double_value;
+      if (((double) *ll) != val->x.double_value)
+        rc= ER_DYNCOL_TRUNCATED;
+      break;
+    case DYN_COL_STRING:
+      {
+        longlong i= 0, sign= 1;
+        char *src= val->x.string.value.str;
+        uint len= val->x.string.value.length;
+
+        while (len && my_isspace(&my_charset_latin1, *src)) src++,len--;
+
+        if (len)
+        {
+          if (*src == '-')
+          {
+            sign= -1;
+            src++;
+          } else if (*src == '-')
+            src++;
+          while(len && my_isdigit(&my_charset_latin1, *src))
+          {
+            i= i * 10 + (*src - '0');
+            src++;
+          }
+        }
+        else
+          rc= ER_DYNCOL_TRUNCATED;
+        if (len)
+          rc= ER_DYNCOL_TRUNCATED;
+        *ll= i * sign;
+        break;
+      }
+    case DYN_COL_DECIMAL:
+      if (decimal2longlong(&val->x.decimal.value, ll) != E_DEC_OK)
+        rc= ER_DYNCOL_TRUNCATED;
+      break;
+    case DYN_COL_DATETIME:
+      *ll= (val->x.time_value.year * 10000000000L +
+            val->x.time_value.month * 100000000L +
+            val->x.time_value.day * 1000000 +
+            val->x.time_value.hour * 10000 +
+            val->x.time_value.minute * 100 +
+            val->x.time_value.second) *
+        (val->x.time_value.neg ? -1 : 1);
+      break;
+    case DYN_COL_DATE:
+      *ll= (val->x.time_value.year * 10000 +
+            val->x.time_value.month * 100 +
+            val->x.time_value.day) *
+        (val->x.time_value.neg ? -1 : 1);
+      break;
+    case DYN_COL_TIME:
+      *ll= (val->x.time_value.hour * 10000 +
+            val->x.time_value.minute * 100 +
+            val->x.time_value.second) *
+        (val->x.time_value.neg ? -1 : 1);
+      break;
+    case DYN_COL_NULL:
+      rc= ER_DYNCOL_TRUNCATED;
+      break;
+    default:
+      return(ER_DYNCOL_FORMAT);
+  }
+  return(rc);
+}
+
+
+enum enum_dyncol_func_result
+dynamic_column_val_double(double *dbl, DYNAMIC_COLUMN_VALUE *val)
+{
+  enum enum_dyncol_func_result rc= ER_DYNCOL_OK;
+  *dbl= 0;
+  switch (val->type) {
+  case DYN_COL_INT:
+      *dbl= (double)val->x.long_value;
+      if (((longlong) *dbl) != val->x.long_value)
+        rc= ER_DYNCOL_TRUNCATED;
+      break;
+    case DYN_COL_UINT:
+      *dbl= (double)val->x.ulong_value;
+      if (((ulonglong) *dbl) != val->x.ulong_value)
+        rc= ER_DYNCOL_TRUNCATED;
+      break;
+    case DYN_COL_DOUBLE:
+      *dbl= val->x.double_value;
+      break;
+    case DYN_COL_STRING:
+      {
+        char *str, *end;
+        if ((str= malloc(val->x.string.value.length + 1)))
+          return ER_DYNCOL_RESOURCE;
+        memcpy(str, val->x.string.value.str, val->x.string.value.length);
+        str[val->x.string.value.length]= '\0';
+        *dbl= strtod(str, &end);
+        if (*end != '\0')
+          rc= ER_DYNCOL_TRUNCATED;
+      }
+    case DYN_COL_DECIMAL:
+      if (decimal2double(&val->x.decimal.value, dbl) != E_DEC_OK)
+        rc= ER_DYNCOL_TRUNCATED;
+      break;
+    case DYN_COL_DATETIME:
+      *dbl= (double)(val->x.time_value.year * 10000000000L +
+                     val->x.time_value.month * 100000000L +
+                     val->x.time_value.day * 1000000 +
+                     val->x.time_value.hour * 10000 +
+                     val->x.time_value.minute * 100 +
+                     val->x.time_value.second) *
+        (val->x.time_value.neg ? -1 : 1);
+      break;
+    case DYN_COL_DATE:
+      *dbl= (double)(val->x.time_value.year * 10000 +
+                     val->x.time_value.month * 100 +
+                     val->x.time_value.day) *
+        (val->x.time_value.neg ? -1 : 1);
+      break;
+    case DYN_COL_TIME:
+      *dbl= (double)(val->x.time_value.hour * 10000 +
+                     val->x.time_value.minute * 100 +
+                     val->x.time_value.second) *
+        (val->x.time_value.neg ? -1 : 1);
+      break;
+    case DYN_COL_NULL:
+      rc= ER_DYNCOL_TRUNCATED;
+      break;
+    default:
+      return(ER_DYNCOL_FORMAT);
+  }
+  return(rc);
+}
+
+
 /**
   Convert to JSON
 
@@ -3602,10 +3753,11 @@ dynamic_column_json(DYNAMIC_COLUMN *str,
     if (header.format == DYNCOL_FMT_NUM)
     {
       uint nm= uint2korr(header.entry);
-      if (dynstr_realloc(json, 6 + 3))
+      if (dynstr_realloc(json, DYNCOL_NUM_CHAR + 3))
         goto err;
       json->str[json->length++]= '"';
-      json->length+= (snprintf(json->str + json->length, 6, "%u", nm));
+      json->length+= (snprintf(json->str + json->length,
+                               DYNCOL_NUM_CHAR, "%u", nm));
     }
     else
     {
@@ -3619,7 +3771,8 @@ dynamic_column_json(DYNAMIC_COLUMN *str,
     }
     json->str[json->length++]= '"';
     json->str[json->length++]= ':';
-    if ((rc= dynamic_column_val_str(json, &val, TRUE)) < 0 ||
+    if ((rc= dynamic_column_val_str(json, &val,
+                                    &my_charset_utf8_general_ci, TRUE)) < 0 ||
         dynstr_append_mem(json, "}", 1))
       goto err;
   }
@@ -3631,3 +3784,99 @@ err:
   json->length= 0;
   return rc;
 }
+
+
+/**
+  Convert to DYNAMIC_COLUMN_VALUE values and names (LEX_STING) dynamic array
+
+  @param str             The packed string
+  @param names           Where to put names
+  @param vals            Where to put values
+  @param free_names      pointer to free names buffer if there is it.
+
+  @return ER_DYNCOL_* return code
+*/
+
+enum enum_dyncol_func_result
+dynamic_column_vals(DYNAMIC_COLUMN *str,
+                    DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals,
+                    char **free_names)
+{
+  DYN_HEADER header;
+  char *nm;
+  uint i;
+  enum enum_dyncol_func_result rc;
+
+  *free_names= 0;
+  bzero(names, sizeof(DYNAMIC_ARRAY));        /* In case of errors */
+  bzero(vals, sizeof(DYNAMIC_ARRAY));         /* In case of errors */
+  if (str->length == 0)
+    return ER_DYNCOL_OK;                      /* no columns */
+
+  if ((rc= init_read_hdr(&header, str)) < 0)
+    return rc;
+
+  if (header.entry_size * header.column_count + FIXED_HEADER_SIZE >
+      str->length)
+    return ER_DYNCOL_FORMAT;
+
+  if (init_dynamic_array(names, sizeof(LEX_STRING),
+                         header.column_count, 0) ||
+      init_dynamic_array(vals, sizeof(DYNAMIC_COLUMN_VALUE),
+                         header.column_count, 0) ||
+      (header.format == DYNCOL_FMT_NUM &&
+       !(*free_names= (char *)malloc(DYNCOL_NUM_CHAR * header.column_count))))
+  {
+    rc= ER_DYNCOL_RESOURCE;
+    goto err;
+  }
+  nm= *free_names;
+
+  for (i= 0, header.entry= header.header;
+       i < header.column_count;
+       i++, header.entry+= header.entry_size)
+  {
+    DYNAMIC_COLUMN_VALUE val;
+    LEX_STRING name;
+    header.length=
+      hdr_interval_length(&header, header.entry + header.entry_size);
+    header.data= header.dtpool + header.offset;
+    /*
+      Check that the found data is withing the ranges. This can happen if
+      we get data with wrong offsets.
+    */
+    if (header.length == DYNCOL_OFFSET_ERROR ||
+        header.length > INT_MAX || header.offset > header.data_size)
+    {
+      rc= ER_DYNCOL_FORMAT;
+      goto err;
+    }
+    if ((rc= dynamic_column_get_value(&header, &val)) < 0)
+      goto err;
+
+    if (header.format == DYNCOL_FMT_NUM)
+    {
+      uint num= uint2korr(header.entry);
+      name.str= nm;
+      name.length= snprintf(nm, DYNCOL_NUM_CHAR, "%u", num);
+      nm+= name.length + 1;
+    }
+    else
+    {
+      name.length= header.entry[0];
+      name.str= (char *)header.nmpool + uint2korr(header.entry + 1);
+    }
+    /* following is preallocated and so do not fail */
+    (void) insert_dynamic(names, (uchar *)&name);
+    (void) insert_dynamic(vals, (uchar *)&val);
+  }
+  return ER_DYNCOL_OK;
+
+err:
+  delete_dynamic(names);
+  delete_dynamic(vals);
+  if (*free_names)
+    my_free(*free_names);
+  *free_names= 0;
+  return rc;
+}

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2012-06-14 18:05:31 +0000
+++ b/sql/sql_base.cc	2012-09-28 12:27:16 +0000
@@ -9773,6 +9773,7 @@ int dynamic_column_error_message(enum_dy
   switch (rc) {
   case ER_DYNCOL_YES:
   case ER_DYNCOL_OK:
+  case ER_DYNCOL_TRUNCATED:
     break; // it is not an error
   case ER_DYNCOL_FORMAT:
     my_error(ER_DYN_COL_WRONG_FORMAT, MYF(0));

=== modified file 'sql/sql_base.h'
--- a/sql/sql_base.h	2012-05-16 15:44:17 +0000
+++ b/sql/sql_base.h	2012-09-28 12:27:16 +0000
@@ -272,6 +272,7 @@ bool rename_temporary_table(THD* thd, TA
 			    const char *table_name);
 bool is_equal(const LEX_STRING *a, const LEX_STRING *b);
 
+class Open_tables_backup;
 /* Functions to work with system tables. */
 bool open_system_tables_for_read(THD *thd, TABLE_LIST *table_list,
                                  Open_tables_backup *backup);

=== modified file 'storage/cassandra/CMakeLists.txt'
--- a/storage/cassandra/CMakeLists.txt	2012-08-17 17:13:20 +0000
+++ b/storage/cassandra/CMakeLists.txt	2012-09-28 12:27:16 +0000
@@ -12,7 +12,7 @@ SET(cassandra_sources
      gen-cpp/Cassandra.h)
 
 #INCLUDE_DIRECTORIES(BEFORE ${Boost_INCLUDE_DIRS})
-INCLUDE_DIRECTORIES(AFTER /home/psergey/cassandra/thrift/include/thrift/) 
+INCLUDE_DIRECTORIES(AFTER /usr/local/include/thrift) 
 # 
 STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
 STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})

=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc	2012-09-27 12:08:28 +0000
+++ b/storage/cassandra/cassandra_se.cc	2012-09-28 12:27:16 +0000
@@ -17,6 +17,12 @@
 
 #include "cassandra_se.h"
 
+struct st_mysql_lex_string
+{
+  char *str;
+  size_t length;
+};
+
 using namespace std;
 using namespace apache::thrift;
 using namespace apache::thrift::transport;
@@ -74,6 +80,7 @@ class Cassandra_se_impl: public Cassandr
   std::string rowkey; /* key of the record we're returning now */
 
   SlicePredicate slice_pred;
+  SliceRange slice_pred_sr;
   bool get_slices_returned_less;
   bool get_slice_found_rows;
 public:
@@ -91,6 +98,8 @@ public:
   void first_ddl_column();
   bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
   void get_rowkey_type(char **name, char **type);
+  size_t get_ddl_size();
+  const char* get_default_validator();
 
   /* Settings */
   void set_consistency_levels(ulong read_cons_level, ulong write_cons_level);
@@ -98,15 +107,19 @@ public:
   /* Writes */
   void clear_insert_buffer();
   void start_row_insert(const char *key, int key_len);
-  void add_insert_column(const char *name, const char *value, int value_len);
+  void add_insert_column(const char *name, int name_len,
+                         const char *value, int value_len);
+  void add_insert_delete_column(const char *name, int name_len);
   void add_row_deletion(const char *key, int key_len,
-                        Column_name_enumerator *col_names);
-  
+                        Column_name_enumerator *col_names,
+                        LEX_STRING *names, uint nnames);
+
   bool do_insert();
 
   /* Reads, point lookups */
   bool get_slice(char *key, size_t key_len, bool *found);
-  bool get_next_read_column(char **name, char **value, int *value_len);
+  bool get_next_read_column(char **name, int *name_len,
+                            char **value, int *value_len );
   void get_read_rowkey(char **value, int *value_len);
 
   /* Reads, multi-row scans */
@@ -122,6 +135,7 @@ public:
 
   /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
   void clear_read_columns();
+  void clear_read_all_columns();
   void add_read_column(const char *name);
  
   /* Reads, MRR scans */
@@ -277,6 +291,16 @@ void Cassandra_se_impl::get_rowkey_type(
     *name= NULL;
 }
 
+size_t Cassandra_se_impl::get_ddl_size()
+{
+  return cf_def.column_metadata.size();
+}
+
+const char* Cassandra_se_impl::get_default_validator()
+{
+  return cf_def.default_validation_class.c_str();
+}
+
 
 /////////////////////////////////////////////////////////////////////////////
 // Data writes
@@ -315,8 +339,9 @@ void Cassandra_se_impl::start_row_insert
 }
 
 
-void Cassandra_se_impl::add_row_deletion(const char *key, int key_len, 
-                                         Column_name_enumerator *col_names)
+void Cassandra_se_impl::add_row_deletion(const char *key, int key_len,
+                                         Column_name_enumerator *col_names,
+                                         LEX_STRING *names, uint nnames)
 {
   std::string key_to_delete;
   key_to_delete.assign(key, key_len);
@@ -344,6 +369,9 @@ void Cassandra_se_impl::add_row_deletion
   const char *col_name;
   while ((col_name= col_names->get_next_name()))
     slice_pred.column_names.push_back(std::string(col_name));
+  for (uint i= 0; i < nnames; i++)
+    slice_pred.column_names.push_back(std::string(names[i].str,
+                                                  names[i].length));
 
   mut.deletion.predicate= slice_pred;
 
@@ -351,7 +379,9 @@ void Cassandra_se_impl::add_row_deletion
 }
 
 
-void Cassandra_se_impl::add_insert_column(const char *name, const char *value, 
+void Cassandra_se_impl::add_insert_column(const char *name,
+                                          int name_len,
+                                          const char *value,
                                           int value_len)
 {
   Mutation mut;
@@ -359,7 +389,10 @@ void Cassandra_se_impl::add_insert_colum
   mut.column_or_supercolumn.__isset.column= true;
 
   Column& col=mut.column_or_supercolumn.column;
-  col.name.assign(name);
+  if (name_len)
+    col.name.assign(name, name_len);
+  else
+    col.name.assign(name);
   col.value.assign(value, value_len);
   col.timestamp= insert_timestamp;
   col.__isset.value= true;
@@ -367,6 +400,23 @@ void Cassandra_se_impl::add_insert_colum
   insert_list->push_back(mut);
 }
 
+void Cassandra_se_impl::add_insert_delete_column(const char *name,
+                                                 int name_len)
+{
+  Mutation mut;
+  mut.__isset.deletion= true;
+  mut.deletion.__isset.timestamp= true;
+  mut.deletion.timestamp= insert_timestamp;
+  mut.deletion.__isset.predicate= true;
+
+  SlicePredicate slice_pred;
+  slice_pred.__isset.column_names= true;
+  slice_pred.column_names.push_back(std::string(name, name_len));
+  mut.deletion.predicate= slice_pred;
+
+  insert_list->push_back(mut);
+}
+
 
 bool Cassandra_se_impl::retryable_do_insert()
 {
@@ -444,8 +494,8 @@ bool Cassandra_se_impl::retryable_get_sl
 }
 
 
-bool Cassandra_se_impl::get_next_read_column(char **name, char **value, 
-                                             int *value_len)
+bool Cassandra_se_impl::get_next_read_column(char **name, int *name_len,
+                                             char **value, int *value_len)
 {
   bool use_counter=false;
   while (1)
@@ -468,12 +518,14 @@ bool Cassandra_se_impl::get_next_read_co
   ColumnOrSuperColumn& cs= *column_data_it;
   if (use_counter)
   {
+    *name_len= cs.counter_column.name.size();
     *name= (char*)cs.counter_column.name.c_str();
     *value= (char*)&cs.counter_column.value;
     *value_len= sizeof(cs.counter_column.value);
   }
   else
   {
+    *name_len= cs.column.name.size();
     *name= (char*)cs.column.name.c_str();
     *value= (char*)cs.column.value.c_str();
     *value_len= cs.column.value.length();
@@ -601,6 +653,13 @@ void Cassandra_se_impl::clear_read_colum
   slice_pred.column_names.clear();
 }
 
+void Cassandra_se_impl::clear_read_all_columns()
+{
+  slice_pred_sr.start = "";
+  slice_pred_sr.finish = "";
+  slice_pred.__set_slice_range(slice_pred_sr);
+}
+
 
 void Cassandra_se_impl::add_read_column(const char *name_arg)
 {

=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h	2012-09-27 12:08:28 +0000
+++ b/storage/cassandra/cassandra_se.h	2012-09-28 12:27:16 +0000
@@ -6,6 +6,8 @@
   both together causes compile errors due to conflicts).
 */
 
+struct st_mysql_lex_string;
+typedef struct st_mysql_lex_string LEX_STRING;
 
 /* We need to define this here so that ha_cassandra.cc also has access to it */
 typedef enum
@@ -50,19 +52,25 @@ public:
   virtual bool next_ddl_column(char **name, int *name_len, char **value, 
                                int *value_len)=0;
   virtual void get_rowkey_type(char **name, char **type)=0;
+  virtual size_t get_ddl_size()=0;
+  virtual const char* get_default_validator()=0;
 
   /* Writes */
   virtual void clear_insert_buffer()=0;
   virtual void add_row_deletion(const char *key, int key_len,
-                                Column_name_enumerator *col_names)=0;
+                                Column_name_enumerator *col_names,
+                                LEX_STRING *names, uint nnames)=0;
   virtual void start_row_insert(const char *key, int key_len)=0;
-  virtual void add_insert_column(const char *name, const char *value, 
+  virtual void add_insert_delete_column(const char *name, int name_len)= 0;
+  virtual void add_insert_column(const char *name, int name_len,
+                                 const char *value,
                                  int value_len)=0;
   virtual bool do_insert()=0;
 
   /* Reads */
   virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ;
-  virtual bool get_next_read_column(char **name, char **value, int *value_len)=0;
+  virtual bool get_next_read_column(char **name, int *name_len,
+                                    char **value, int *value_len)=0;
   virtual void get_read_rowkey(char **value, int *value_len)=0;
 
   /* Reads, multi-row scans */
@@ -70,7 +78,7 @@ public:
   virtual bool get_range_slices(bool last_key_as_start_key)=0;
   virtual void finish_reading_range_slices()=0;
   virtual bool get_next_range_slice_row(bool *eof)=0;
-  
+
   /* Reads, MRR scans */
   virtual void new_lookup_keys()=0;
   virtual int  add_lookup_key(const char *key, size_t key_len)=0;
@@ -79,8 +87,9 @@ public:
 
   /* read_set setup */
   virtual void clear_read_columns()=0;
+  virtual void clear_read_all_columns()=0;
   virtual void add_read_column(const char *name)=0;
-  
+
   virtual bool truncate()=0;
   virtual bool remove_row()=0;
 

=== modified file 'storage/cassandra/gen-cpp/cassandra_constants.cpp'
--- a/storage/cassandra/gen-cpp/cassandra_constants.cpp	2012-08-17 17:13:20 +0000
+++ b/storage/cassandra/gen-cpp/cassandra_constants.cpp	2012-09-28 12:27:16 +0000
@@ -11,7 +11,7 @@ namespace org { namespace apache { names
 const cassandraConstants g_cassandra_constants;
 
 cassandraConstants::cassandraConstants() {
-  cassandra_const_VERSION = "19.32.0";
+  cassandra_const_VERSION = (char *)"19.32.0";
 }
 
 }}} // namespace

=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc	2012-09-27 12:08:28 +0000
+++ b/storage/cassandra/ha_cassandra.cc	2012-09-28 12:27:16 +0000
@@ -1,4 +1,4 @@
-/* 
+/*
    Copyright (c) 2012, Monty Program Ab
 
    This program is free software; you can redistribute it and/or modify
@@ -22,15 +22,21 @@
 #include "ha_cassandra.h"
 #include "sql_class.h"
 
+#define DYNCOL_USUAL 20
+#define DYNCOL_DELTA 100
+#define DYNCOL_USUAL_REC 1024
+#define DYNCOL_DELTA_REC 1024
+
 static handler *cassandra_create_handler(handlerton *hton,
-                                       TABLE_SHARE *table, 
+                                       TABLE_SHARE *table,
                                        MEM_ROOT *mem_root);
 
+extern int dynamic_column_error_message(enum_dyncol_func_result rc);
 
 handlerton *cassandra_hton;
 
 
-/* 
+/*
    Hash used to track the number of open tables; variable for example share
    methods
 */
@@ -69,6 +75,25 @@ ha_create_table_option cassandra_table_o
   HA_TOPTION_END
 };
 
+/**
+  Structure for CREATE TABLE options (field options).
+*/
+
+struct ha_field_option_struct
+{
+  bool dyncol_field;
+};
+
+ha_create_table_option cassandra_field_option_list[]=
+{
+  /*
+    Collect all other columns as dynamic here,
+    the valid values are YES/NO, ON/OFF, 1/0.
+    The default is 0, that is true, yes, on.
+  */
+  HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0),
+  HA_FOPTION_END
+};
 
 static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
   "Number of rows in an INSERT batch",
@@ -245,17 +270,16 @@ static int cassandra_init_func(void *p)
 
   cassandra_hton->state=   SHOW_OPTION_YES;
   cassandra_hton->create=  cassandra_create_handler;
-  /* 
+  /*
     Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE
     TABLE to create an *empty* table from scratch. Cassandra table won't be
     emptied if re-created.
   */
-  cassandra_hton->flags=   0; 
+  cassandra_hton->flags=   0;
   cassandra_hton->table_options= cassandra_table_option_list;
-  //cassandra_hton->field_options= example_field_option_list;
-  cassandra_hton->field_options= NULL;
-  
-  mysql_mutex_init(0 /* no instrumentation */, 
+  cassandra_hton->field_options= cassandra_field_option_list;
+
+  mysql_mutex_init(0 /* no instrumentation */,
                    &cassandra_default_host_lock, MY_MUTEX_INIT_FAST);
 
   DBUG_RETURN(0);
@@ -352,7 +376,7 @@ static int free_share(CASSANDRA_SHARE *s
 
 
 static handler* cassandra_create_handler(handlerton *hton,
-                                       TABLE_SHARE *table, 
+                                       TABLE_SHARE *table,
                                        MEM_ROOT *mem_root)
 {
   return new (mem_root) ha_cassandra(hton, table);
@@ -361,7 +385,11 @@ static handler* cassandra_create_handler
 
 ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
   :handler(hton, table_arg),
-   se(NULL), field_converters(NULL), rowkey_converter(NULL)
+   se(NULL), field_converters(NULL),
+   special_type_field_converters(NULL),
+   special_type_field_names(NULL), n_special_type_fields(0),
+   rowkey_converter(NULL),
+   dyncol_field(0), dyncol_set(0)
 {}
 
 
@@ -381,7 +409,8 @@ int ha_cassandra::connect_and_check_opti
   int res;
   DBUG_ENTER("ha_cassandra::connect_and_check_options");
 
-  if ((res= check_table_options(options)))
+  if ((res= check_field_options(table_arg->s->field)) ||
+      (res= check_table_options(options)))
     DBUG_RETURN(res);
 
   se= create_cassandra_se();
@@ -403,6 +432,32 @@ int ha_cassandra::connect_and_check_opti
 }
 
 
+int ha_cassandra::check_field_options(Field **fields)
+{
+  Field **field;
+  uint i;
+  DBUG_ENTER("ha_cassandra::check_field_options");
+  for (field= fields, i= 0; *field; field++, i++)
+  {
+    ha_field_option_struct *field_options= (*field)->option_struct;
+    if (field_options && field_options->dyncol_field)
+    {
+      if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB)
+      {
+         my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name);
+         DBUG_RETURN(HA_WRONG_CREATE_OPTION);
+      }
+      dyncol_set= 1;
+      dyncol_field= i;
+      bzero(&dynamic_values, sizeof(dynamic_values));
+      bzero(&dynamic_names, sizeof(dynamic_names));
+      bzero(&dynamic_rec, sizeof(dynamic_rec));
+    }
+  }
+  DBUG_RETURN(0);
+}
+
+
 int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
 {
   DBUG_ENTER("ha_cassandra::open");
@@ -578,7 +633,7 @@ public:
     field->store(*pdata);
     return 0;
   }
-  
+
   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
   {
     buf= field->val_real();
@@ -770,6 +825,43 @@ static int convert_hex_digit(const char
 
 const char map2number[]="0123456789abcdef";
 
+static void convert_uuid2string(char *str, const char *cass_data)
+{
+  char *ptr= str;
+  /* UUID arrives as 16-byte number in network byte order */
+  for (uint i=0; i < 16; i++)
+  {
+    *(ptr++)= map2number[(cass_data[i] >> 4) & 0xF];
+    *(ptr++)= map2number[cass_data[i] & 0xF];
+    if (i == 3 || i == 5 || i == 7 || i == 9)
+      *(ptr++)= '-';
+  }
+  *ptr= 0;
+}
+
+static bool convert_string2uuid(char *buf, const char *str)
+{
+  int lower, upper;
+  for (uint i= 0; i < 16; i++)
+  {
+    if ((upper= convert_hex_digit(str[0])) == -1 ||
+        (lower= convert_hex_digit(str[1])) == -1)
+    {
+      return true;
+    }
+    buf[i]= lower | (upper << 4);
+    str += 2;
+    if (i == 3 || i == 5 || i == 7 || i == 9)
+    {
+      if (str[0] != '-')
+        return true;
+      str++;
+    }
+  }
+  return false;
+}
+
+
 class UuidDataConverter : public ColumnDataConverter
 {
   char buf[16]; /* Binary UUID representation */
@@ -779,16 +871,7 @@ public:
   {
     DBUG_ASSERT(cass_data_len==16);
     char str[37];
-    char *ptr= str;
-    /* UUID arrives as 16-byte number in network byte order */
-    for (uint i=0; i < 16; i++)
-    {
-      *(ptr++)= map2number[(cass_data[i] >> 4) & 0xF];
-      *(ptr++)= map2number[cass_data[i] & 0xF];
-      if (i == 3 || i == 5 || i == 7 || i == 9)
-        *(ptr++)= '-';
-    }
-    *ptr= 0;
+    convert_uuid2string(str, cass_data);
     field->store(str, 36,field->charset());
     return 0;
   }
@@ -796,29 +879,12 @@ public:
   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
   {
     String *uuid_str= field->val_str(&str_buf);
-    char *pstr= (char*)uuid_str->c_ptr();
 
-    if (uuid_str->length() != 36) 
+    if (uuid_str->length() != 36)
+      return true;
+
+    if (convert_string2uuid(buf, (char*)uuid_str->c_ptr()))
       return true;
-    
-    int lower, upper;
-    for (uint i=0; i < 16; i++)
-    {
-      if ((upper= convert_hex_digit(pstr[0])) == -1 ||
-          (lower= convert_hex_digit(pstr[1])) == -1)
-      {
-        return true;
-      }
-      buf[i]= lower | (upper << 4);
-      pstr += 2;
-      if (i == 3 || i == 5 || i == 7 || i == 9)
-      {
-        if (pstr[0] != '-')
-          return true;
-        pstr++;
-      }
-    }
-     
     *cass_data= buf;
     *cass_data_len= 16;
     return false;
@@ -826,6 +892,302 @@ public:
   ~UuidDataConverter(){}
 };
 
+/**
+  Converting dynamic columns types to/from casandra types
+*/
+bool cassandra_to_dyncol_intLong(const char *cass_data,
+                                 int cass_data_len __attribute__((unused)),
+                                 DYNAMIC_COLUMN_VALUE *value)
+{
+  value->type= DYN_COL_INT;
+#ifdef WORDS_BIGENDIAN
+  value->x.long_value= (longlong *)*cass_data;
+#else
+  flip64(cass_data, (char *)&value->x.long_value);
+#endif
+  return 0;
+}
+
+bool dyncol_to_cassandraLong(DYNAMIC_COLUMN_VALUE *value,
+                             char **cass_data, int *cass_data_len,
+                             void* buff, void **freemem)
+{
+  longlong *tmp= (longlong *) buff;
+  enum enum_dyncol_func_result rc=
+    dynamic_column_val_long(tmp, value);
+  if (rc < 0)
+    return true;
+  *cass_data_len= sizeof(longlong);
+#ifdef WORDS_BIGENDIAN
+  *cass_data= (char *)buff;
+#else
+  flip64((char *)buff, (char *)buff + sizeof(longlong));
+  *cass_data= (char *)buff + sizeof(longlong);
+#endif
+  *freemem= NULL;
+  return false;
+}
+
+bool cassandra_to_dyncol_intInt32(const char *cass_data,
+                                  int cass_data_len __attribute__((unused)),
+                                  DYNAMIC_COLUMN_VALUE *value)
+{
+  int32 tmp;
+  value->type= DYN_COL_INT;
+#ifdef WORDS_BIGENDIAN
+  tmp= *((int32 *)cass_data);
+#else
+  flip32(cass_data, (char *)&tmp);
+#endif
+  value->x.long_value= tmp;
+  return 0;
+}
+
+
+bool dyncol_to_cassandraInt32(DYNAMIC_COLUMN_VALUE *value,
+                              char **cass_data, int *cass_data_len,
+                              void* buff, void **freemem)
+{
+  longlong *tmp= (longlong *) ((char *)buff + sizeof(longlong));
+  enum enum_dyncol_func_result rc=
+    dynamic_column_val_long(tmp, value);
+  if (rc < 0)
+    return true;
+  *cass_data_len= sizeof(int32);
+  *cass_data= (char *)buff;
+#ifdef WORDS_BIGENDIAN
+  *((int32 *) buff) = (int32) *tmp;
+#else
+  {
+    int32 tmp2= (int32) *tmp;
+    flip32((char *)&tmp2, (char *)buff);
+  }
+#endif
+  *freemem= NULL;
+  return false;
+}
+
+
+bool cassandra_to_dyncol_intCounter(const char *cass_data,
+                                    int cass_data_len __attribute__((unused)),
+                                    DYNAMIC_COLUMN_VALUE *value)
+{
+  value->type= DYN_COL_INT;
+  value->x.long_value= *((longlong *)cass_data);
+  return 0;
+}
+
+
+bool dyncol_to_cassandraCounter(DYNAMIC_COLUMN_VALUE *value,
+                                char **cass_data, int *cass_data_len,
+                                void* buff, void **freemem)
+{
+  longlong *tmp= (longlong *)buff;
+  enum enum_dyncol_func_result rc=
+    dynamic_column_val_long(tmp, value);
+  if (rc < 0)
+    return true;
+  *cass_data_len= sizeof(longlong);
+  *cass_data= (char *)buff;
+  *freemem= NULL;
+  return false;
+}
+
+bool cassandra_to_dyncol_doubleFloat(const char *cass_data,
+                                     int cass_data_len __attribute__((unused)),
+                                     DYNAMIC_COLUMN_VALUE *value)
+{
+  value->type= DYN_COL_DOUBLE;
+  value->x.double_value= *((float *)cass_data);
+  return 0;
+}
+
+bool dyncol_to_cassandraFloat(DYNAMIC_COLUMN_VALUE *value,
+                              char **cass_data, int *cass_data_len,
+                              void* buff, void **freemem)
+{
+  double tmp;
+  enum enum_dyncol_func_result rc=
+    dynamic_column_val_double(&tmp, value);
+  if (rc < 0)
+    return true;
+  *((float *)buff)= (float) tmp;
+  *cass_data_len= sizeof(float);
+  *cass_data= (char *)buff;
+  *freemem= NULL;
+  return false;
+}
+
+bool cassandra_to_dyncol_doubleDouble(const char *cass_data,
+                                      int cass_data_len __attribute__((unused)),
+                                      DYNAMIC_COLUMN_VALUE *value)
+{
+  value->type= DYN_COL_DOUBLE;
+  value->x.double_value= *((double *)cass_data);
+  return 0;
+}
+
+bool dyncol_to_cassandraDouble(DYNAMIC_COLUMN_VALUE *value,
+                               char **cass_data, int *cass_data_len,
+                               void* buff, void **freemem)
+{
+  double *tmp= (double *)buff;
+  enum enum_dyncol_func_result rc=
+    dynamic_column_val_double(tmp, value);
+  if (rc < 0)
+    return true;
+  *cass_data_len= sizeof(double);
+  *cass_data= (char *)buff;
+  *freemem= NULL;
+  return false;
+}
+
+bool cassandra_to_dyncol_strStr(const char *cass_data,
+                                int cass_data_len,
+                                DYNAMIC_COLUMN_VALUE *value,
+                                CHARSET_INFO *cs)
+{
+  value->type= DYN_COL_STRING;
+  value->x.string.charset= cs;
+  value->x.string.value.str= (char *)cass_data;
+  value->x.string.value.length= cass_data_len;
+  value->x.string.nonfreeable= TRUE; // do not try to free
+  return 0;
+}
+
+bool dyncol_to_cassandraStr(DYNAMIC_COLUMN_VALUE *value,
+                            char **cass_data, int *cass_data_len,
+                            void* buff, void **freemem, CHARSET_INFO *cs)
+{
+  DYNAMIC_STRING tmp;
+  if (init_dynamic_string(&tmp, NULL, 1024, 1024))
+    return 1;
+  enum enum_dyncol_func_result rc=
+    dynamic_column_val_str(&tmp, value, cs, FALSE);
+  if (rc < 0)
+  {
+    dynstr_free(&tmp);
+    return 1;
+  }
+  *cass_data_len= tmp.length;
+  *(cass_data)= tmp.str;
+  *freemem= tmp.str;
+  return 0;
+}
+
+bool cassandra_to_dyncol_strBytes(const char *cass_data,
+                                  int cass_data_len,
+                                  DYNAMIC_COLUMN_VALUE *value)
+{
+  return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
+                                    &my_charset_bin);
+}
+
+bool dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE *value,
+                              char **cass_data, int *cass_data_len,
+                              void* buff, void **freemem)
+{
+  return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
+                                buff, freemem, &my_charset_bin);
+}
+
+bool cassandra_to_dyncol_strAscii(const char *cass_data,
+                                  int cass_data_len,
+                                  DYNAMIC_COLUMN_VALUE *value)
+{
+  return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
+                                    &my_charset_latin1_bin);
+}
+
+bool dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE *value,
+                              char **cass_data, int *cass_data_len,
+                              void* buff, void **freemem)
+{
+  return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
+                                buff, freemem, &my_charset_latin1_bin);
+}
+
+bool cassandra_to_dyncol_strUTF8(const char *cass_data,
+                                 int cass_data_len,
+                                 DYNAMIC_COLUMN_VALUE *value)
+{
+  return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
+                                    &my_charset_utf8_unicode_ci);
+}
+
+bool dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE *value,
+                             char **cass_data, int *cass_data_len,
+                             void* buff, void **freemem)
+{
+  return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
+                                buff, freemem, &my_charset_utf8_unicode_ci);
+}
+
+bool cassandra_to_dyncol_strUUID(const char *cass_data,
+                                 int cass_data_len,
+                                 DYNAMIC_COLUMN_VALUE *value)
+{
+  value->type= DYN_COL_STRING;
+  value->x.string.charset= &my_charset_bin;
+  value->x.string.value.str= (char *)my_malloc(37, MYF(0));
+  if (!value->x.string.value.str)
+  {
+    value->x.string.value.length= 0;
+    value->x.string.nonfreeable= TRUE;
+    return 1;
+  }
+  convert_uuid2string(value->x.string.value.str, cass_data);
+  value->x.string.value.length= 36;
+  value->x.string.nonfreeable= FALSE;
+  return 0;
+}
+
+bool dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE *value,
+                             char **cass_data, int *cass_data_len,
+                             void* buff, void **freemem)
+{
+  DYNAMIC_STRING tmp;
+  if (init_dynamic_string(&tmp, NULL, 1024, 1024))
+    return true;
+  enum enum_dyncol_func_result rc=
+    dynamic_column_val_str(&tmp, value, &my_charset_latin1_bin, FALSE);
+  if (rc < 0 || tmp.length != 36 || convert_string2uuid((char *)buff, tmp.str))
+  {
+    dynstr_free(&tmp);
+    return true;
+  }
+
+  *cass_data_len= tmp.length;
+  *(cass_data)= tmp.str;
+  *freemem= tmp.str;
+  return 0;
+}
+
+bool cassandra_to_dyncol_intBool(const char *cass_data,
+                                 int cass_data_len,
+                                 DYNAMIC_COLUMN_VALUE *value)
+{
+  value->type= DYN_COL_INT;
+  value->x.long_value= (cass_data[0] ? 1 : 0);
+  return 0;
+}
+
+bool dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE *value,
+                             char **cass_data, int *cass_data_len,
+                             void* buff, void **freemem)
+{
+  longlong tmp;
+  enum enum_dyncol_func_result rc=
+    dynamic_column_val_long(&tmp, value);
+  if (rc < 0)
+    return true;
+  ((char *)buff)[0]= (tmp ? 1 : 0);
+  *cass_data_len= 1;
+  *(cass_data)= (char *)buff;
+  *freemem= 0;
+  return 0;
+}
+
 
 const char * const validator_bigint=  "org.apache.cassandra.db.marshal.LongType";
 const char * const validator_int=     "org.apache.cassandra.db.marshal.Int32Type";
@@ -849,6 +1211,126 @@ const char * const validator_varint= "or
 const char * const validator_decimal= "org.apache.cassandra.db.marshal.DecimalType";
 
 
+static CASSANDRA_TYPE_DEF cassandra_types[]=
+{
+  {
+    validator_bigint,
+    &cassandra_to_dyncol_intLong,
+    &dyncol_to_cassandraLong
+  },
+  {
+    validator_int,
+    &cassandra_to_dyncol_intInt32,
+    &dyncol_to_cassandraInt32
+  },
+  {
+    validator_counter,
+    cassandra_to_dyncol_intCounter,
+    &dyncol_to_cassandraCounter
+  },
+  {
+    validator_float,
+    &cassandra_to_dyncol_doubleFloat,
+    &dyncol_to_cassandraFloat
+  },
+  {
+    validator_double,
+    &cassandra_to_dyncol_doubleDouble,
+    &dyncol_to_cassandraDouble
+  },
+  {
+    validator_blob,
+    &cassandra_to_dyncol_strBytes,
+    &dyncol_to_cassandraBytes
+  },
+  {
+    validator_ascii,
+    &cassandra_to_dyncol_strAscii,
+    &dyncol_to_cassandraAscii
+  },
+  {
+    validator_text,
+    &cassandra_to_dyncol_strUTF8,
+    &dyncol_to_cassandraUTF8
+  },
+  {
+    validator_timestamp,
+    &cassandra_to_dyncol_intLong,
+    &dyncol_to_cassandraLong
+  },
+  {
+    validator_uuid,
+    &cassandra_to_dyncol_strUUID,
+    &dyncol_to_cassandraUUID
+  },
+  {
+    validator_boolean,
+    &cassandra_to_dyncol_intBool,
+    &dyncol_to_cassandraBool
+  },
+  {
+    validator_varint,
+    &cassandra_to_dyncol_strBytes,
+    &dyncol_to_cassandraBytes
+  },
+  {
+    validator_decimal,
+    &cassandra_to_dyncol_strBytes,
+    &dyncol_to_cassandraBytes
+  }
+};
+
+CASSANDRA_TYPE get_cassandra_type(const char *validator)
+{
+  CASSANDRA_TYPE rc;
+  switch(validator[32])
+  {
+  case 'L':
+    rc= CT_BIGINT;
+    break;
+  case 'I':
+    rc= (validator[35] == '3' ? CT_INT : CT_VARINT);
+    rc= CT_INT;
+    break;
+  case 'C':
+    rc= CT_COUNTER;
+    break;
+  case 'F':
+    rc= CT_FLOAT;
+    break;
+  case 'D':
+    switch (validator[33])
+    {
+    case 'o':
+      rc= CT_DOUBLE;
+      break;
+    case 'a':
+      rc= CT_TIMESTAMP;
+      break;
+    case 'e':
+      rc= CT_DECIMAL;
+      break;
+    default:
+      rc= CT_BLOB;
+      break;
+    }
+    break;
+  case 'B':
+    rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB);
+    break;
+  case 'A':
+    rc= CT_ASCII;
+    break;
+  case 'U':
+    rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID);
+    break;
+  default:
+    rc= CT_BLOB;
+  }
+  DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0);
+  return rc;
+}
+
 ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
 {
   ColumnDataConverter *res= NULL;
@@ -880,16 +1362,16 @@ ColumnDataConverter *map_field_to_valida
       if (!strcmp(validator_name, validator_double))
         res= new DoubleDataConverter;
       break;
-    
+
     case MYSQL_TYPE_TIMESTAMP:
       if (!strcmp(validator_name, validator_timestamp))
         res= new TimestampDataConverter;
       break;
 
     case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
-      if (!strcmp(validator_name, validator_uuid) && 
+      if (!strcmp(validator_name, validator_uuid) &&
           field->real_type() == MYSQL_TYPE_STRING &&
-          field->field_length == 36) 
+          field->field_length == 36)
       {
         // UUID maps to CHAR(36), its text representation
         res= new UuidDataConverter;
@@ -943,39 +1425,117 @@ bool ha_cassandra::setup_field_converter
   int  col_name_len;
   char *col_type;
   int col_type_len;
+  size_t ddl_fields= se->get_ddl_size();
+  const char *default_type= se->get_default_validator();
+  uint max_non_default_fields;
+  DBUG_ENTER("ha_cassandra::setup_field_converters");
+  DBUG_ASSERT(default_type);
 
   DBUG_ASSERT(!field_converters);
-  size_t memsize= sizeof(ColumnDataConverter*) * n_fields;
+  DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);
+
+  /*
+    We always should take into account that in case of using dynamic columns
+    sql description contain one field which does not described in
+    Cassandra DDL also key field is described separately. So that
+    is why we use "n_fields - dyncol_set - 1" or "ddl_fields + 2".
+  */
+  max_non_default_fields= ddl_fields + 2 - n_fields;
+  if (ddl_fields < (n_fields - dyncol_set - 1))
+  {
+    se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+    DBUG_RETURN(true);
+  }
+
+  /* allocate memory in one chunk */
+  size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
+    (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
+    (dyncol_set ? max_non_default_fields : 0);
   if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
-    return true;
+    DBUG_RETURN(true);
   bzero(field_converters, memsize);
   n_field_converters= n_fields;
 
+  if (dyncol_set)
+  {
+    special_type_field_converters=
+      (CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
+    special_type_field_names=
+      ((LEX_STRING*)(special_type_field_converters + max_non_default_fields));
+  }
+
+  if (dyncol_set)
+  {
+    if (init_dynamic_array(&dynamic_values,
+                           sizeof(DYNAMIC_COLUMN_VALUE),
+                           DYNCOL_USUAL, DYNCOL_DELTA))
+      DBUG_RETURN(true);
+    else
+      if (init_dynamic_array(&dynamic_names,
+                             sizeof(LEX_STRING),
+                             DYNCOL_USUAL, DYNCOL_DELTA))
+      {
+        delete_dynamic(&dynamic_values);
+        DBUG_RETURN(true);
+      }
+      else
+        if (init_dynamic_string(&dynamic_rec, NULL,
+                                DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
+        {
+          delete_dynamic(&dynamic_values);
+          delete_dynamic(&dynamic_names);
+          DBUG_RETURN(true);
+        }
+
+    /* Dynamic column field has special processing */
+    field_converters[dyncol_field]= NULL;
+
+    default_type_def= cassandra_types + get_cassandra_type(default_type);
+  }
+
   se->first_ddl_column();
   uint n_mapped= 0;
   while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
                               &col_type_len))
   {
+    Field **field;
+    uint i;
     /* Mapping for the 1st field is already known */
-    for (Field **field= field_arg + 1; *field; field++)
+    for (field= field_arg + 1, i= 1; *field; field++, i++)
     {
-      if (!strcmp((*field)->field_name, col_name))
+      if ((!dyncol_set || dyncol_field != i) &&
+          !strcmp((*field)->field_name, col_name))
       {
         n_mapped++;
         ColumnDataConverter **conv= field_converters + (*field)->field_index;
         if (!(*conv= map_field_to_validator(*field, col_type)))
         {
-          se->print_error("Failed to map column %s to datatype %s", 
+          se->print_error("Failed to map column %s to datatype %s",
                           (*field)->field_name, col_type);
           my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-          return true;
+          DBUG_RETURN(true);
         }
         (*conv)->field= *field;
       }
     }
+    if (dyncol_set && !(*field)) // is needed and not found
+    {
+      DBUG_PRINT("info",("Field not found: %s", col_name));
+      if (strcmp(col_type, default_type))
+      {
+        DBUG_PRINT("info",("Field '%s' non-default type: '%s'",
+                           col_name, col_type));
+        special_type_field_names[n_special_type_fields].length= col_name_len;
+        special_type_field_names[n_special_type_fields].str= col_name;
+        special_type_field_converters[n_special_type_fields]=
+          cassandra_types[get_cassandra_type(col_type)];
+        n_special_type_fields++;
+      }
+    }
   }
 
-  if (n_mapped != n_fields - 1)
+  if (n_mapped != n_fields - 1 - dyncol_set)
   {
     Field *first_unmapped= NULL;
     /* Find the first field */
@@ -990,27 +1550,28 @@ bool ha_cassandra::setup_field_converter
     DBUG_ASSERT(first_unmapped);
 
     se->print_error("Field `%s` could not be mapped to any field in Cassandra",
-                    first_unmapped->field_name); 
+                    first_unmapped->field_name);
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-    return true;
+    DBUG_RETURN(true);
   }
-  
-  /* 
+
+  /*
     Setup type conversion for row_key.
   */
   se->get_rowkey_type(&col_name, &col_type);
   if (col_name && strcmp(col_name, (*field_arg)->field_name))
   {
-    se->print_error("PRIMARY KEY column must match Cassandra's name '%s'", col_name);
+    se->print_error("PRIMARY KEY column must match Cassandra's name '%s'",
+                    col_name);
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-    return true;
+    DBUG_RETURN(true);
   }
   if (!col_name && strcmp("rowkey", (*field_arg)->field_name))
   {
     se->print_error("target column family has no key_alias defined, "
                     "PRIMARY KEY column must be named 'rowkey'");
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-    return true;
+    DBUG_RETURN(true);
   }
 
   if (col_type != NULL)
@@ -1019,7 +1580,7 @@ bool ha_cassandra::setup_field_converter
     {
       se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
       my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-      return true;
+      DBUG_RETURN(true);
     }
     rowkey_converter->field= *field_arg;
   }
@@ -1027,10 +1588,10 @@ bool ha_cassandra::setup_field_converter
   {
     se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-    return true;
+    DBUG_RETURN(true);
   }
 
-  return false;
+  DBUG_RETURN(false);
 }
 
 
@@ -1039,10 +1600,20 @@ void ha_cassandra::free_field_converters
   delete rowkey_converter;
   rowkey_converter= NULL;
 
+  if (dyncol_set)
+  {
+    delete_dynamic(&dynamic_values);
+    delete_dynamic(&dynamic_names);
+    dynstr_free(&dynamic_rec);
+  }
   if (field_converters)
   {
     for (uint i=0; i < n_field_converters; i++)
-      delete field_converters[i];
+      if (field_converters[i])
+      {
+        DBUG_ASSERT(!dyncol_set || i == dyncol_field);
+        delete field_converters[i];
+      }
     my_free(field_converters);
     field_converters= NULL;
   }
@@ -1065,7 +1636,7 @@ int ha_cassandra::index_read_map(uchar *
 {
   int rc= 0;
   DBUG_ENTER("ha_cassandra::index_read_map");
-  
+
   if (find_flag != HA_READ_KEY_EXACT)
     DBUG_RETURN(HA_ERR_WRONG_COMMAND);
 
@@ -1081,6 +1652,7 @@ int ha_cassandra::index_read_map(uchar *
   if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
   {
     /* We get here when making lookups like uuid_column='not-an-uuid' */
+    dbug_tmp_restore_column_map(table->read_set, old_map);
     DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
   }
 
@@ -1092,7 +1664,7 @@ int ha_cassandra::index_read_map(uchar *
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
     rc= HA_ERR_INTERNAL_ERROR;
   }
-  
+
   /* TODO: what if we're not reading all columns?? */
   if (!found)
     rc= HA_ERR_KEY_NOT_FOUND;
@@ -1125,15 +1697,42 @@ void ha_cassandra::print_conversion_erro
 }
 
 
+void free_strings(DYNAMIC_COLUMN_VALUE *vals, uint num)
+{
+  for (uint i= 0; i < num; i++)
+    if (vals[i].type == DYN_COL_STRING &&
+        !vals[i].x.string.nonfreeable)
+      my_free(vals[i].x.string.value.str);
+}
+
+
+CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name,
+                                                           int cass_name_len)
+{
+  CASSANDRA_TYPE_DEF *type= default_type_def;
+  for(uint i= 0; i < n_special_type_fields; i++)
+  {
+    if (cass_name_len == (int)special_type_field_names[i].length &&
+        memcmp(cass_name, special_type_field_names[i].str,
+               cass_name_len) == 0)
+    {
+      type= special_type_field_converters + i;
+      break;
+    }
+  }
+  return type;
+}
+
 int ha_cassandra::read_cassandra_columns(bool unpack_pk)
 {
   char *cass_name;
   char *cass_value;
-  int cass_value_len;
+  int cass_value_len, cass_name_len;
   Field **field;
   int res= 0;
-  
-  /* 
+  ulong total_name_len= 0;
+
+  /*
     cassandra_to_mariadb() calls will use field->store(...) methods, which
     require that the column is in the table->write_set
   */
@@ -1144,16 +1743,18 @@ int ha_cassandra::read_cassandra_columns
   for (field= table->field + 1; *field; field++)
     (*field)->set_null();
 
-  while (!se->get_next_read_column(&cass_name, &cass_value, &cass_value_len))
+  while (!se->get_next_read_column(&cass_name, &cass_name_len,
+                                   &cass_value, &cass_value_len))
   {
     // map to our column. todo: use hash or something..
-    int idx=1;
+    bool found= 0;
     for (field= table->field + 1; *field; field++)
     {
-      idx++;
-      if (!strcmp((*field)->field_name, cass_name))
+      uint fieldnr= (*field)->field_index;
+      if ((!dyncol_set || dyncol_field != fieldnr) &&
+          !strcmp((*field)->field_name, cass_name))
       {
-        int fieldnr= (*field)->field_index;
+        found= 1;
         (*field)->set_notnull();
         if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
                                                             cass_value_len))
@@ -1166,8 +1767,86 @@ int ha_cassandra::read_cassandra_columns
         break;
       }
     }
+    if (dyncol_set && !found)
+    {
+      DYNAMIC_COLUMN_VALUE val;
+      LEX_STRING nm;
+      CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name,
+                                                        cass_name_len);
+      nm.str= cass_name;
+      nm.length= cass_name_len;
+      if (nm.length > MAX_NAME_LENGTH)
+      {
+        se->print_error("Unable to convert value for field `%s`"
+                        " from Cassandra's data format. Name"
+                        " length exceed limit of %u: '%s'",
+                        table->field[dyncol_field]->field_name,
+                        (uint)MAX_NAME_LENGTH, cass_name);
+        my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+        res=1;
+        goto err;
+      }
+      total_name_len+= cass_name_len;
+      if (nm.length > MAX_TOTAL_NAME_LENGTH)
+      {
+        se->print_error("Unable to convert value for field `%s`"
+                        " from Cassandra's data format. Sum of all names"
+                        " length exceed limit of %lu",
+                        table->field[dyncol_field]->field_name,
+                        cass_name, (uint)MAX_TOTAL_NAME_LENGTH);
+        my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+        res=1;
+        goto err;
+      }
+
+      if ((res= (*(type->cassandra_to_dynamic))(cass_value,
+                                                cass_value_len, &val)) ||
+          insert_dynamic(&dynamic_names, (uchar *) &nm) ||
+          insert_dynamic(&dynamic_values, (uchar *) &val))
+      {
+        if (res)
+        {
+          print_conversion_error(cass_name, cass_value, cass_value_len);
+        }
+        free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
+                     dynamic_values.elements);
+        // EOM shouldm be already reported if happened
+        res=1;
+        goto err;
+      }
+    }
   }
-  
+
+  dynamic_rec.length= 0;
+  if (dyncol_set)
+  {
+    if (dynamic_column_create_many_internal_fmt(&dynamic_rec,
+                                                dynamic_names.elements,
+                                                dynamic_names.buffer,
+                                                (DYNAMIC_COLUMN_VALUE *)
+                                                dynamic_values.buffer,
+                                                FALSE,
+                                                TRUE) < 0)
+      dynamic_rec.length= 0;
+
+    free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
+                 dynamic_values.elements);
+    dynamic_values.elements= dynamic_names.elements= 0;
+  }
+  if (dyncol_set)
+  {
+    if (dynamic_rec.length == 0)
+      table->field[dyncol_field]->set_null();
+    else
+    {
+      Field_blob *blob= (Field_blob *)table->field[dyncol_field];
+      blob->set_notnull();
+      blob->store_length(dynamic_rec.length);
+      *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))=
+        dynamic_rec.str;
+    }
+  }
+
   if (unpack_pk)
   {
     /* Unpack rowkey to primary key */
@@ -1187,6 +1866,82 @@ err:
   return res;
 }
 
+int ha_cassandra::read_dyncol(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
+                              String *valcol, char **freenames)
+{
+  String *strcol;
+  DYNAMIC_COLUMN col;
+  enum enum_dyncol_func_result rc;
+  DBUG_ENTER("ha_cassandra::read_dyncol");
+
+  Field *field= table->field[dyncol_field];
+  DBUG_ASSERT(field->type() == MYSQL_TYPE_BLOB);
+  /* It is blob and it does not use buffer */
+  strcol= field->val_str(NULL, valcol);
+  if (field->is_null())
+  {
+    bzero(vals, sizeof(DYNAMIC_ARRAY));
+    bzero(names, sizeof(DYNAMIC_ARRAY));
+    DBUG_RETURN(0); // nothing to write
+  }
+  /*
+    dynamic_column_vals only read the string so we can
+    cheat here with assignment
+  */
+  bzero(&col, sizeof(col));
+  col.str= (char *)strcol->ptr();
+  col.length= strcol->length();
+  if ((rc= dynamic_column_vals(&col, names, vals, freenames)) < 0)
+  {
+    dynamic_column_error_message(rc);
+    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+  }
+  DBUG_RETURN(0);
+}
+
+int ha_cassandra::write_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names)
+{
+  uint i;
+  DBUG_ENTER("ha_cassandra::write_dynamic_row");
+  DBUG_ASSERT(dyncol_set);
+
+
+  DBUG_ASSERT(names->elements == vals->elements);
+  for (i= 0; i < names->elements; i++)
+  {
+    char buff[16];
+    CASSANDRA_TYPE_DEF *type;
+    void *freemem= NULL;
+    char *cass_data;
+    int cass_data_len;
+    LEX_STRING *name= dynamic_element(names, i, LEX_STRING*);
+    DYNAMIC_COLUMN_VALUE *val= dynamic_element(vals, i, DYNAMIC_COLUMN_VALUE*);
+
+    DBUG_PRINT("info", ("field %*s", (int)name->length, name->str));
+    type= get_cassandra_field_def(name->str, (int) name->length);
+    if ((*type->dynamic_to_cassandra)(val, &cass_data, &cass_data_len,
+                                      buff, &freemem))
+    {
+      my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
+               name->str, insert_lineno);
+      DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+    }
+    se->add_insert_column(name->str, name->length,
+                          cass_data, cass_data_len);
+    if (freemem)
+      my_free(freemem);
+  }
+  DBUG_RETURN(0);
+}
+
+void ha_cassandra::free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
+                                    char *free_names)
+{
+  delete_dynamic(names);
+  delete_dynamic(vals);
+  if (free_names)
+    my_free(free_names);
+}
 
 int ha_cassandra::write_row(uchar *buf)
 {
@@ -1221,15 +1976,35 @@ int ha_cassandra::write_row(uchar *buf)
   {
     char *cass_data;
     int cass_data_len;
-    if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
+    if (dyncol_set && dyncol_field == i)
     {
-      my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
-               field_converters[i]->field->field_name, insert_lineno);
-      dbug_tmp_restore_column_map(table->read_set, old_map);
-      DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+      String valcol;
+      DYNAMIC_ARRAY vals, names;
+      char *free_names;
+      int rc;
+      DBUG_ASSERT(field_converters[i] == NULL);
+      if (!(rc= read_dyncol(&vals, &names, &valcol, &free_names)))
+        rc= write_dynamic_row(&vals, &names);
+      free_dynamic_row(&vals, &names, free_names);
+      if (rc)
+      {
+        dbug_tmp_restore_column_map(table->read_set, old_map);
+        DBUG_RETURN(rc);
+      }
+    }
+    else
+    {
+      if (field_converters[i]->mariadb_to_cassandra(&cass_data,
+                                                    &cass_data_len))
+      {
+        my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
+                 field_converters[i]->field->field_name, insert_lineno);
+        dbug_tmp_restore_column_map(table->read_set, old_map);
+        DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+      }
+      se->add_insert_column(field_converters[i]->field->field_name, 0,
+                            cass_data, cass_data_len);
     }
-    se->add_insert_column(field_converters[i]->field->field_name, 
-                          cass_data, cass_data_len);
   }
 
   dbug_tmp_restore_column_map(table->read_set, old_map);
@@ -1296,9 +2071,16 @@ int ha_cassandra::rnd_init(bool scan)
     DBUG_RETURN(0);
   }
 
-  se->clear_read_columns();
-  for (uint i= 1; i < table->s->fields; i++)
-    se->add_read_column(table->field[i]->field_name);
+  if (dyncol_set)
+  {
+    se->clear_read_all_columns();
+  }
+  else
+  {
+    se->clear_read_columns();
+    for (uint i= 1; i < table->s->fields; i++)
+      se->add_read_column(table->field[i]->field_name);
+  }
 
   se->read_batch_size= THDVAR(table->in_use, rnd_batch_size);
   bres= se->get_range_slices(false);
@@ -1633,13 +2415,16 @@ public:
 
 int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
 {
+  DYNAMIC_ARRAY oldvals, oldnames, vals, names;
+  String oldvalcol, valcol;
+  char *oldfree_names= NULL, *free_names= NULL;
   my_bitmap_map *old_map;
+  int res;
   DBUG_ENTER("ha_cassandra::update_row");
   /* Currently, it is guaranteed that new_data == table->record[0] */
-  
+  DBUG_ASSERT(new_data == table->record[0]);
   /* For now, just rewrite the full record */
   se->clear_insert_buffer();
-  
 
   old_map= dbug_tmp_use_all_columns(table, table->read_set);
 
@@ -1668,6 +2453,22 @@ int ha_cassandra::update_row(const uchar
   else
     new_primary_key= false;
 
+  if (dyncol_set)
+  {
+    Field *field= table->field[dyncol_field];
+    /* move to get old_data */
+    my_ptrdiff_t diff;
+    diff= (my_ptrdiff_t) (old_data - new_data);
+    field->move_field_offset(diff);      // Points now at old_data
+    if ((res= read_dyncol(&oldvals, &oldnames, &oldvalcol, &oldfree_names)))
+      DBUG_RETURN(res);
+    field->move_field_offset(-diff);     // back to new_data
+    if ((res= read_dyncol(&vals, &names, &valcol, &free_names)))
+    {
+      free_dynamic_row(&oldnames, &oldvals, oldfree_names);
+      DBUG_RETURN(res);
+    }
+  }
 
   if (new_primary_key)
   {
@@ -1676,7 +2477,10 @@ int ha_cassandra::update_row(const uchar
       Add a DELETE operation into the batch
     */
     Column_name_enumerator_impl name_enumerator(this);
-    se->add_row_deletion(old_key, old_key_len, &name_enumerator);
+    se->add_row_deletion(old_key, old_key_len, &name_enumerator,
+                         (LEX_STRING *)oldnames.buffer,
+                         (dyncol_set ? oldnames.elements : 0));
+    oldnames.elements= oldvals.elements= 0; // they will be deleted
   }
 
   se->start_row_insert(new_key, new_key_len);
@@ -1686,23 +2490,64 @@ int ha_cassandra::update_row(const uchar
   {
     char *cass_data;
     int cass_data_len;
-    if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
+    if (dyncol_set && dyncol_field == i)
     {
-      my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
-               field_converters[i]->field->field_name, insert_lineno);
-      dbug_tmp_restore_column_map(table->read_set, old_map);
-      DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+      DBUG_ASSERT(field_converters[i] == NULL);
+      if ((res= write_dynamic_row(&vals, &names)))
+        goto err;
+    }
+    else
+    {
+      if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
+      {
+        my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
+                 field_converters[i]->field->field_name, insert_lineno);
+        dbug_tmp_restore_column_map(table->read_set, old_map);
+        DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+      }
+      se->add_insert_column(field_converters[i]->field->field_name, 0,
+                            cass_data, cass_data_len);
+    }
+  }
+  if (dyncol_set)
+  {
+    /* find removed fields */
+    uint i= 0, j= 0;
+    LEX_STRING *onames= (LEX_STRING *)oldnames.buffer;
+    LEX_STRING *nnames= (LEX_STRING *)names.buffer;
+    /* both array are sorted */
+    for(; i < oldnames.elements; i++)
+    {
+      int scmp= 0;
+      while (j < names.elements &&
+             (nnames[j].length < onames[i].length ||
+             (nnames[j].length == onames[i].length &&
+              (scmp= memcmp(nnames[j].str, onames[i].str,
+                            onames[i].length)) < 0)))
+        j++;
+      if (j < names.elements &&
+          nnames[j].length == onames[i].length &&
+          scmp == 0)
+        j++;
+      else
+        se->add_insert_delete_column(onames[i].str, onames[i].length);
     }
-    se->add_insert_column(field_converters[i]->field->field_name, 
-                          cass_data, cass_data_len);
   }
+
   dbug_tmp_restore_column_map(table->read_set, old_map);
-  
-  bool res= se->do_insert();
+
+  res= se->do_insert();
 
   if (res)
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-  
+
+err:
+  if (dyncol_set)
+  {
+    free_dynamic_row(&oldnames, &oldvals, oldfree_names);
+    free_dynamic_row(&names, &vals, free_names);
+  }
+
   DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
 }
 

=== modified file 'storage/cassandra/ha_cassandra.h'
--- a/storage/cassandra/ha_cassandra.h	2012-09-27 12:08:28 +0000
+++ b/storage/cassandra/ha_cassandra.h	2012-09-28 12:27:16 +0000
@@ -40,6 +40,33 @@ class ColumnDataConverter;
 
 struct ha_table_option_struct;
 
+
+struct st_dynamic_column_value;
+
+typedef bool (* CAS2DYN_CONVERTER)(const char *cass_data,
+                                   int cass_data_len,
+                                   struct st_dynamic_column_value *value);
+typedef bool (* DYN2CAS_CONVERTER)(struct st_dynamic_column_value *value,
+                                   char **cass_data,
+                                   int *cass_data_len,
+                                   void *buf, void **freemem);
+struct cassandra_type_def
+{
+  const char *name;
+  CAS2DYN_CONVERTER cassandra_to_dynamic;
+  DYN2CAS_CONVERTER dynamic_to_cassandra;
+};
+
+typedef struct cassandra_type_def CASSANDRA_TYPE_DEF;
+
+enum cassandtra_type_enum {CT_BIGINT, CT_INT, CT_COUNTER, CT_FLOAT, CT_DOUBLE,
+  CT_BLOB, CT_ASCII, CT_TEXT, CT_TIMESTAMP, CT_UUID, CT_BOOLEAN, CT_VARINT,
+  CT_DECIMAL};
+
+typedef enum cassandtra_type_enum CASSANDRA_TYPE;
+
+
+
 /** @brief
   Class definition for the storage engine
 */
@@ -48,23 +75,35 @@ class ha_cassandra: public handler
   friend class Column_name_enumerator_impl;
   THR_LOCK_DATA lock;      ///< MySQL lock
   CASSANDRA_SHARE *share;    ///< Shared lock info
-  
+
   Cassandra_se_interface *se;
 
+  /* description of static part of the table definition */
   ColumnDataConverter **field_converters;
   uint n_field_converters;
 
+  CASSANDRA_TYPE_DEF *default_type_def;
+  /* description of dynamic columns part */
+  CASSANDRA_TYPE_DEF *special_type_field_converters;
+  LEX_STRING *special_type_field_names;
+  uint n_special_type_fields;
+  DYNAMIC_ARRAY dynamic_values, dynamic_names;
+  DYNAMIC_STRING dynamic_rec;
+
   ColumnDataConverter *rowkey_converter;
 
   bool setup_field_converters(Field **field, uint n_fields);
   void free_field_converters();
-  
+
   int read_cassandra_columns(bool unpack_pk);
   int check_table_options(struct ha_table_option_struct* options);
 
   bool doing_insert_batch;
   ha_rows insert_rows_batched;
-  
+
+  uint dyncol_field;
+  bool dyncol_set;
+
   /* Used to produce 'wrong column %s at row %lu' warnings */
   ha_rows insert_lineno;
   void print_conversion_error(const char *field_name, 
@@ -191,6 +230,14 @@ public:
 private:
   bool source_exhausted;
   bool mrr_start_read();
+  int check_field_options(Field **fields);
+  int read_dyncol(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
+                  String *valcol, char **freenames);
+  int write_dynamic_row(DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals);
+  void static free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
+                               char *free_names);
+  CASSANDRA_TYPE_DEF * get_cassandra_field_def(char *cass_name,
+                                               int cass_name_length);
 public:
 
   /*



More information about the commits mailing list