[Commits] Rev 3490: MDEV-377 - Name support for dynamic columns in file:///home/bell/maria/bzr/work-maria-5.5-dnames/

sanja at askmonty.org sanja at askmonty.org
Thu Sep 20 18:10:55 EEST 2012


At file:///home/bell/maria/bzr/work-maria-5.5-dnames/

------------------------------------------------------------
revno: 3490
revision-id: sanja at askmonty.org-20120920151050-zw337eh1m75t6l10
parent: sergii at pisem.net-20120811083110-abzobkos5o6aw5hi
committer: sanja at askmonty.org
branch nick: work-maria-5.5-dnames
timestamp: Thu 2012-09-20 18:10:50 +0300
message:
  MDEV-377 - Name support for dynamic columns
-------------- next part --------------
=== modified file 'include/ma_dyncol.h'
--- a/include/ma_dyncol.h	2011-09-22 09:04:00 +0000
+++ b/include/ma_dyncol.h	2012-09-20 15:10:50 +0000
@@ -103,6 +103,13 @@ dynamic_column_create_many(DYNAMIC_COLUM
                            DYNAMIC_COLUMN_VALUE *values);
 
 enum enum_dyncol_func_result
+dynamic_column_create_many_fmt(DYNAMIC_COLUMN *str,
+                               uint column_count,
+                               uchar *column_keys,
+                               DYNAMIC_COLUMN_VALUE *values,
+                               my_bool names);
+
+enum enum_dyncol_func_result
 dynamic_column_update(DYNAMIC_COLUMN *org, uint column_nr,
                       DYNAMIC_COLUMN_VALUE *value);
 enum enum_dyncol_func_result
@@ -110,16 +117,30 @@ dynamic_column_update_many(DYNAMIC_COLUM
                            uint add_column_count,
                            uint *column_numbers,
                            DYNAMIC_COLUMN_VALUE *values);
+enum enum_dyncol_func_result
+dynamic_column_update_many_fmt(DYNAMIC_COLUMN *str,
+                               uint add_column_count,
+                               void *column_keys,
+                               DYNAMIC_COLUMN_VALUE *values,
+                               my_bool string_keys);
 
 enum enum_dyncol_func_result
 dynamic_column_delete(DYNAMIC_COLUMN *org, uint column_nr);
 
 enum enum_dyncol_func_result
 dynamic_column_exists(DYNAMIC_COLUMN *org, uint column_nr);
+enum enum_dyncol_func_result
+dynamic_column_exists_str(DYNAMIC_COLUMN *str, LEX_STRING *name);
+enum enum_dyncol_func_result
+dynamic_column_exists_fmt(DYNAMIC_COLUMN *str, void *key, my_bool string_keys);
 
 /* List of not NULL columns */
 enum enum_dyncol_func_result
 dynamic_column_list(DYNAMIC_COLUMN *org, DYNAMIC_ARRAY *array_of_uint);
+enum enum_dyncol_func_result
+dynamic_column_list_str(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array_of_lexstr);
+enum enum_dyncol_func_result
+dynamic_column_list_fmt(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array, my_bool string_keys);
 
 /*
    if the column do not exists it is NULL
@@ -127,6 +148,14 @@ dynamic_column_list(DYNAMIC_COLUMN *org,
 enum enum_dyncol_func_result
 dynamic_column_get(DYNAMIC_COLUMN *org, uint column_nr,
                    DYNAMIC_COLUMN_VALUE *store_it_here);
+enum enum_dyncol_func_result
+dynamic_column_get_str(DYNAMIC_COLUMN *str, LEX_STRING *name,
+                           DYNAMIC_COLUMN_VALUE *store_it_here);
+
+my_bool dynamic_column_has_names(DYNAMIC_COLUMN *str);
+
+enum enum_dyncol_func_result
+dynamic_column_check(DYNAMIC_COLUMN *str);
 
 #define dynamic_column_initialize(A) memset((A), 0, sizeof(*(A)))
 #define dynamic_column_column_free(V) dynstr_free(V)

=== modified file 'mysql-test/r/dyncol.result'
--- a/mysql-test/r/dyncol.result	2011-12-12 22:58:40 +0000
+++ b/mysql-test/r/dyncol.result	2012-09-20 15:10:50 +0000
@@ -1088,7 +1088,7 @@ column_list(column_add(column_create(1,
 
 select column_list(column_add(column_create(1, 1), 1, ""));
 column_list(column_add(column_create(1, 1), 1, ""))
-1
+`1`
 select hex(column_add("", 1, 1));
 hex(column_add("", 1, 1))
 00010001000002
@@ -1133,10 +1133,10 @@ column_exists(column_create(1, 1212 as i
 # column list
 select column_list(column_create(1, 1212 as integer, 2, 1212 as integer));
 column_list(column_create(1, 1212 as integer, 2, 1212 as integer))
-1,2
+`1`,`2`
 select column_list(column_create(1, 1212 as integer));
 column_list(column_create(1, 1212 as integer))
-1
+`1`
 select column_list(column_create(1, NULL as integer));
 column_list(column_create(1, NULL as integer))
 
@@ -1218,35 +1218,35 @@ sum(column_get(str, 1 as int))
 11
 select id, column_list(str) from t1 where id= 5;
 id	column_list(str)
-5	1,2,3,10
+5	`1`,`2`,`3`,`10`
 update t1 set str=column_delete(str, 3, 4, 2) where id= 5;
 select id, length(str), column_list(str), column_get(str, 1 as int),  column_get(str, 2 as char), column_get(str, 3 as int) from t1;
 id	length(str)	column_list(str)	column_get(str, 1 as int)	column_get(str, 2 as char)	column_get(str, 3 as int)
-1	12	1,2	1	a	NULL
-2	12	1,2	2	a	NULL
-3	12	2,3	NULL	c	100
-4	16	1,2,3	5	c	100
-5	15	1,10	6	NULL	NULL
-6	21	2,3,10	NULL	c	100
+1	12	`1`,`2`	1	a	NULL
+2	12	`1`,`2`	2	a	NULL
+3	12	`2`,`3`	NULL	c	100
+4	16	`1`,`2`,`3`	5	c	100
+5	15	`1`,`10`	6	NULL	NULL
+6	21	`2`,`3`,`10`	NULL	c	100
 update t1 set str=column_add(str, 4, 45 as char, 2, 'c') where id= 5;
 select id, length(str), column_list(str), column_get(str, 1 as int),  column_get(str, 2 as char), column_get(str, 3 as int) from t1 where id = 5;
 id	length(str)	column_list(str)	column_get(str, 1 as int)	column_get(str, 2 as char)	column_get(str, 3 as int)
-5	26	1,2,4,10	6	c	NULL
+5	26	`1`,`2`,`4`,`10`	6	c	NULL
 select id, length(str), column_list(str), column_exists(str, 4) from t1;
 id	length(str)	column_list(str)	column_exists(str, 4)
-1	12	1,2	0
-2	12	1,2	0
-3	12	2,3	0
-4	16	1,2,3	0
-5	26	1,2,4,10	1
-6	21	2,3,10	0
+1	12	`1`,`2`	0
+2	12	`1`,`2`	0
+3	12	`2`,`3`	0
+4	16	`1`,`2`,`3`	0
+5	26	`1`,`2`,`4`,`10`	1
+6	21	`2`,`3`,`10`	0
 select sum(column_get(str, 1 as int)), column_list(str) from t1 group by 2;
 sum(column_get(str, 1 as int))	column_list(str)
-3	1,2
-5	1,2,3
-6	1,2,4,10
-NULL	2,3
-NULL	2,3,10
+3	`1`,`2`
+5	`1`,`2`,`3`
+6	`1`,`2`,`4`,`10`
+NULL	`2`,`3`
+NULL	`2`,`3`,`10`
 select id, hex(str) from t1;
 id	hex(str)
 1	00020001000002000B020861
@@ -1282,11 +1282,11 @@ id
 5
 select id, column_list(str), length(str) from t1 where id=5;
 id	column_list(str)	length(str)
-5	1,2,4,5,10	100048
+5	`1`,`2`,`4`,`5`,`10`	100048
 update t1 set str=column_delete(str, 5) where id=5;
 select id, column_list(str), length(str) from t1 where id=5;
 id	column_list(str)	length(str)
-5	1,2,4,10	34
+5	`1`,`2`,`4`,`10`	34
 drop table t1;
 #
 # LP#778905: Assertion `value->year <= 9999' failed in
@@ -1306,7 +1306,7 @@ INSERT INTO t1 SET f1 = COLUMN_CREATE( 2
 SELECT HEX(COLUMN_ADD(f1, 1, 'abc')), COLUMN_LIST(f1) FROM t1;
 HEX(COLUMN_ADD(f1, 1, 'abc'))	COLUMN_LIST(f1)
 NULL	NULL
-0002000100030200230861626308636465	2
+0002000100030200230861626308636465	`2`
 SELECT COLUMN_ADD(f1, 1, 'abc'), COLUMN_LIST(f1) FROM t1;
 DROP TABLE t1;
 #
@@ -1335,3 +1335,212 @@ hex(COLUMN_CREATE(0, COLUMN_GET(COLUMN_C
 select hex(COLUMN_CREATE(0, 0.0 as decimal));
 hex(COLUMN_CREATE(0, 0.0 as decimal))
 000100000004
+#
+# test of symbolic names
+#
+# creation test (names)
+set names utf8;
+select hex(column_create("????????", 1212));
+hex(column_create("????????", 1212))
+040100080008000000D0B0D0B4D18BD0BD7809
+select hex(column_create("1212", 1212));
+hex(column_create("1212", 1212))
+040100040004000000313231327809
+select hex(column_create(1212, 2, "www", 3));
+hex(column_create(1212, 2, "www", 3))
+04020007000300000004030008777777313231320604
+select hex(column_create("1212", 2, "www", 3));
+hex(column_create("1212", 2, "www", 3))
+04020007000300000004030008777777313231320604
+select hex(column_create("1212", 2, 3, 3));
+hex(column_create("1212", 2, 3, 3))
+0402000500010000000401000833313231320604
+select hex(column_create("1212", 2, "????????", 1, 3, 3));
+hex(column_create("1212", 2, "????????", 1, 3, 3))
+0403000D000100000004010008080500103331323132D0B0D0B4D18BD0BD060402
+set names default;
+# fetching column test (names)
+set names utf8;
+select column_get(column_create("????????", 1212), "????????" as int);
+column_get(column_create("????????", 1212), "????????" as int)
+1212
+select column_get(column_create("1212", 2, "????????", 1, 3, 3), "????????" as int);
+column_get(column_create("1212", 2, "????????", 1, 3, 3), "????????" as int)
+1
+select column_get(column_create("1212", 2, "????????", 1, 3, 3), 1212 as int);
+column_get(column_create("1212", 2, "????????", 1, 3, 3), 1212 as int)
+2
+select column_get(column_create("1212", 2, "????????", 1, 3, 3), "3" as int);
+column_get(column_create("1212", 2, "????????", 1, 3, 3), "3" as int)
+3
+select column_get(column_create("1212", 2, "????????", 1, 3, 3), 3 as int);
+column_get(column_create("1212", 2, "????????", 1, 3, 3), 3 as int)
+3
+select column_get(column_create("1212", 2, "????????", 1, 3, 3), 4 as int);
+column_get(column_create("1212", 2, "????????", 1, 3, 3), 4 as int)
+NULL
+select column_get(column_create("1212", 2, "????????", 1, 3, 3), "4" as int);
+column_get(column_create("1212", 2, "????????", 1, 3, 3), "4" as int)
+NULL
+set names default;
+# column existance test (names)
+set names utf8;
+select column_exists(column_create("????????", 1212), "????????");
+column_exists(column_create("????????", 1212), "????????")
+1
+select column_exists(column_create("????????", 1212), "a??????");
+column_exists(column_create("????????", 1212), "a??????")
+0
+select column_exists(column_create("1212", 2, "????????", 1, 3, 3), "????????");
+column_exists(column_create("1212", 2, "????????", 1, 3, 3), "????????")
+1
+select column_exists(column_create("1212", 2, "????????", 1, 3, 3), 1212);
+column_exists(column_create("1212", 2, "????????", 1, 3, 3), 1212)
+1
+select column_exists(column_create("1212", 2, "????????", 1, 3, 3), "3");
+column_exists(column_create("1212", 2, "????????", 1, 3, 3), "3")
+1
+select column_exists(column_create("1212", 2, "????????", 1, 3, 3), 3);
+column_exists(column_create("1212", 2, "????????", 1, 3, 3), 3)
+1
+select column_exists(column_create("1212", 2, "????????", 1, 3, 3), 4);
+column_exists(column_create("1212", 2, "????????", 1, 3, 3), 4)
+0
+select column_exists(column_create("1212", 2, "????????", 1, 3, 3), "4");
+column_exists(column_create("1212", 2, "????????", 1, 3, 3), "4")
+0
+set names default;
+# column changing test (names)
+select hex(column_add(column_create(1, "AAA"), "b", "BBB"));
+hex(column_add(column_create(1, "AAA"), "b", "BBB"))
+0402000200010000030101002331620841414108424242
+select hex(column_add(column_create("1", "AAA"), "b", "BBB"));
+hex(column_add(column_create("1", "AAA"), "b", "BBB"))
+0402000200010000030101002331620841414108424242
+select column_get(column_add(column_create(1, "AAA"), "b", "BBB"), 1 as char);
+column_get(column_add(column_create(1, "AAA"), "b", "BBB"), 1 as char)
+AAA
+select column_get(column_add(column_create(1, "AAA"), "b", "BBB"), "b" as char);
+column_get(column_add(column_create(1, "AAA"), "b", "BBB"), "b" as char)
+BBB
+select hex(column_add(column_create("a", "AAA"), 1, "BBB"));
+hex(column_add(column_create("a", "AAA"), 1, "BBB"))
+0402000200010000030101002331610842424208414141
+select hex(column_add(column_create("a", "AAA"), "1", "BBB"));
+hex(column_add(column_create("a", "AAA"), "1", "BBB"))
+0402000200010000030101002331610842424208414141
+select hex(column_add(column_create("a", 1212 as integer), "b", "1212" as integer));
+hex(column_add(column_create("a", 1212 as integer), "b", "1212" as integer))
+04020002000100000001010010616278097809
+select hex(column_add(column_create("a", 1212 as integer), "a", "1212" as integer));
+hex(column_add(column_create("a", 1212 as integer), "a", "1212" as integer))
+040100010001000000617809
+select hex(column_add(column_create("a", 1212 as integer), "a", NULL as integer));
+hex(column_add(column_create("a", 1212 as integer), "a", NULL as integer))
+0400000000
+select hex(column_add(column_create("a", 1212 as integer), "b", NULL as integer));
+hex(column_add(column_create("a", 1212 as integer), "b", NULL as integer))
+040100010001000000617809
+select hex(column_add(column_create("a", 1212 as integer), "b", 1212 as integer, "a", 11 as integer));
+hex(column_add(column_create("a", 1212 as integer), "b", 1212 as integer, "a", 11 as integer))
+040200020001000000010100086162167809
+select column_get(column_add(column_create("a", 1212 as integer), "b", 1212 as integer, "a", 11 as integer), "a" as integer);
+column_get(column_add(column_create("a", 1212 as integer), "b", 1212 as integer, "a", 11 as integer), "a" as integer)
+11
+select column_get(column_add(column_create("a", 1212 as integer), "b", 1212 as integer, "a", 11 as integer), "b" as integer);
+column_get(column_add(column_create("a", 1212 as integer), "b", 1212 as integer, "a", 11 as integer), "b" as integer)
+1212
+select hex(column_add(column_create("a", 1212 as integer), "a", 1212 as integer, "b", 11 as integer));
+hex(column_add(column_create("a", 1212 as integer), "a", 1212 as integer, "b", 11 as integer))
+040200020001000000010100106162780916
+select hex(column_add(column_create("a", NULL as integer), "a", 1212 as integer, "b", 11 as integer));
+hex(column_add(column_create("a", NULL as integer), "a", 1212 as integer, "b", 11 as integer))
+040200020001000000010100106162780916
+select hex(column_add(column_create("a", 1212 as integer, "b", 1212 as integer), "a", 11 as integer));
+hex(column_add(column_create("a", 1212 as integer, "b", 1212 as integer), "a", 11 as integer))
+040200020001000000010100086162167809
+select hex(column_add(column_create("a", 1), "a", null));
+hex(column_add(column_create("a", 1), "a", null))
+0400000000
+select column_list(column_add(column_create("a", 1), "a", null));
+column_list(column_add(column_create("a", 1), "a", null))
+
+select column_list(column_add(column_create("a", 1), "a", ""));
+column_list(column_add(column_create("a", 1), "a", ""))
+`a`
+select hex(column_add("", "a", 1));
+hex(column_add("", "a", 1))
+0401000100010000006102
+# column delete (names)
+select hex(column_delete(column_create("a", 1212 as integer, "b", 1212 as integer), "a"));
+hex(column_delete(column_create("a", 1212 as integer, "b", 1212 as integer), "a"))
+040100010001000000627809
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "b"));
+hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "b"))
+0402000200010000000101000861630206
+select hex(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer));
+hex(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer))
+0403000300010000000101000801020010616263020406
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "c"));
+hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "c"))
+0402000200010000000101000861620204
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "d"));
+hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "d"))
+0403000300010000000101000801020010616263020406
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "b", "a"));
+hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "b", "a"))
+0401000100010000006306
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "b", "c"));
+hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "b", "c"))
+0401000100010000006102
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "a", "b", "c"));
+hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "a", "b", "c"))
+0400000000
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "a", "b", "c", "e"));
+hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "a", "b", "c", "e"))
+0400000000
+select hex(column_delete(column_create("a", 1), "a"));
+hex(column_delete(column_create("a", 1), "a"))
+0400000000
+select hex(column_delete("", "a"));
+hex(column_delete("", "a"))
+
+#
+# MDEV-458 DNAMES: Server crashes on using an unquoted string
+# as a dynamic column name
+#
+select COLUMN_CREATE(color, "black");
+ERROR 42S22: Unknown column 'color' in 'field list'
+#
+# MDEV-489 Assertion `offset < 0x1f' failed in
+# type_and_offset_store on COLUMN_ADD
+#
+CREATE TABLE t1 (f1 tinyblob);
+INSERT INTO t1 VALUES (COLUMN_CREATE('col1', REPEAT('a',30)));
+UPDATE t1 SET f1 = COLUMN_ADD( f1, REPEAT('b',211), 'val2' );
+Warnings:
+Warning	1265	Data truncated for column 'f1' at row 1
+UPDATE t1 SET f1 = COLUMN_ADD( f1, REPEAT('c',211), 'val3' );
+ERROR HY000: Encountered illegal format of dynamic column string
+drop table t1;
+#
+# MDEV-490 null as arguments
+#
+SELECT COLUMN_GET( COLUMN_CREATE( 'col', 'val' ), NULL AS CHAR );
+COLUMN_GET( COLUMN_CREATE( 'col', 'val' ), NULL AS CHAR )
+NULL
+SELECT COLUMN_GET( NULL, 'col' as char );
+COLUMN_GET( NULL, 'col' as char )
+NULL
+SELECT COLUMN_EXISTS( COLUMN_CREATE( 'col', 'val' ), NULL);
+COLUMN_EXISTS( COLUMN_CREATE( 'col', 'val' ), NULL)
+NULL
+SELECT COLUMN_EXISTS( NULL, 'col');
+COLUMN_EXISTS( NULL, 'col')
+NULL
+SELECT COLUMN_CREATE( NULL, 'val' );
+COLUMN_CREATE( NULL, 'val' )
+NULL
+SELECT COLUMN_ADD( NULL, 'val', 'col');
+COLUMN_ADD( NULL, 'val', 'col')
+NULL

=== added file 'mysql-test/r/dyncol_koi8.result'
--- a/mysql-test/r/dyncol_koi8.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/r/dyncol_koi8.result	2012-09-20 15:10:50 +0000
@@ -0,0 +1,18 @@
+set names koi8r;
+select hex(column_create("????", 1212));
+hex(column_create("????", 1212))
+040100080008000000D0B0D0B4D18BD0BD7809
+set names utf8;
+select hex(column_create("????????", 1212));
+hex(column_create("????????", 1212))
+040100080008000000D0B0D0B4D18BD0BD7809
+#
+# MDEV-461: DNAMES: column names do not work with non-trivial encodings
+#
+select column_exists(column_create('???','value'),'???');
+column_exists(column_create('???','value'),'???')
+1
+select column_get(column_create('???','value'),'???' as char);
+column_get(column_create('???','value'),'???' as char)
+value
+set names default;

=== modified file 'mysql-test/t/dyncol.test'
--- a/mysql-test/t/dyncol.test	2012-02-29 20:55:04 +0000
+++ b/mysql-test/t/dyncol.test	2012-09-20 15:10:50 +0000
@@ -550,3 +550,103 @@ select hex(COLUMN_CREATE(0, COLUMN_GET(@
 
 select hex(COLUMN_CREATE(0, COLUMN_GET(COLUMN_CREATE(0, 0.0 as decimal), 0 as decimal)));
 select hex(COLUMN_CREATE(0, 0.0 as decimal));
+
+--echo #
+--echo # test of symbolic names
+--echo #
+--echo # creation test (names)
+set names utf8;
+select hex(column_create("????????", 1212));
+select hex(column_create("1212", 1212));
+select hex(column_create(1212, 2, "www", 3));
+select hex(column_create("1212", 2, "www", 3));
+select hex(column_create("1212", 2, 3, 3));
+select hex(column_create("1212", 2, "????????", 1, 3, 3));
+set names default;
+
+--echo # fetching column test (names)
+set names utf8;
+select column_get(column_create("????????", 1212), "????????" as int);
+select column_get(column_create("1212", 2, "????????", 1, 3, 3), "????????" as int);
+select column_get(column_create("1212", 2, "????????", 1, 3, 3), 1212 as int);
+select column_get(column_create("1212", 2, "????????", 1, 3, 3), "3" as int);
+select column_get(column_create("1212", 2, "????????", 1, 3, 3), 3 as int);
+select column_get(column_create("1212", 2, "????????", 1, 3, 3), 4 as int);
+select column_get(column_create("1212", 2, "????????", 1, 3, 3), "4" as int);
+set names default;
+
+--echo # column existance test (names)
+set names utf8;
+select column_exists(column_create("????????", 1212), "????????");
+select column_exists(column_create("????????", 1212), "a??????");
+select column_exists(column_create("1212", 2, "????????", 1, 3, 3), "????????");
+select column_exists(column_create("1212", 2, "????????", 1, 3, 3), 1212);
+select column_exists(column_create("1212", 2, "????????", 1, 3, 3), "3");
+select column_exists(column_create("1212", 2, "????????", 1, 3, 3), 3);
+select column_exists(column_create("1212", 2, "????????", 1, 3, 3), 4);
+select column_exists(column_create("1212", 2, "????????", 1, 3, 3), "4");
+set names default;
+
+--echo # column changing test (names)
+select hex(column_add(column_create(1, "AAA"), "b", "BBB"));
+select hex(column_add(column_create("1", "AAA"), "b", "BBB"));
+select column_get(column_add(column_create(1, "AAA"), "b", "BBB"), 1 as char);
+select column_get(column_add(column_create(1, "AAA"), "b", "BBB"), "b" as char);
+select hex(column_add(column_create("a", "AAA"), 1, "BBB"));
+select hex(column_add(column_create("a", "AAA"), "1", "BBB"));
+select hex(column_add(column_create("a", 1212 as integer), "b", "1212" as integer));
+select hex(column_add(column_create("a", 1212 as integer), "a", "1212" as integer));
+select hex(column_add(column_create("a", 1212 as integer), "a", NULL as integer));
+select hex(column_add(column_create("a", 1212 as integer), "b", NULL as integer));
+select hex(column_add(column_create("a", 1212 as integer), "b", 1212 as integer, "a", 11 as integer));
+select column_get(column_add(column_create("a", 1212 as integer), "b", 1212 as integer, "a", 11 as integer), "a" as integer);
+select column_get(column_add(column_create("a", 1212 as integer), "b", 1212 as integer, "a", 11 as integer), "b" as integer);
+select hex(column_add(column_create("a", 1212 as integer), "a", 1212 as integer, "b", 11 as integer));
+select hex(column_add(column_create("a", NULL as integer), "a", 1212 as integer, "b", 11 as integer));
+select hex(column_add(column_create("a", 1212 as integer, "b", 1212 as integer), "a", 11 as integer));
+select hex(column_add(column_create("a", 1), "a", null));
+select column_list(column_add(column_create("a", 1), "a", null));
+select column_list(column_add(column_create("a", 1), "a", ""));
+select hex(column_add("", "a", 1));
+
+-- echo # column delete (names)
+select hex(column_delete(column_create("a", 1212 as integer, "b", 1212 as integer), "a"));
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "b"));
+select hex(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer));
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "c"));
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "d"));
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "b", "a"));
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "b", "c"));
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "a", "b", "c"));
+select hex(column_delete(column_create("a", 1 as integer, "b", 2 as integer, "c", 3 as integer), "a", "b", "c", "e"));
+select hex(column_delete(column_create("a", 1), "a"));
+select hex(column_delete("", "a"));
+
+--echo #
+--echo # MDEV-458 DNAMES: Server crashes on using an unquoted string
+--echo # as a dynamic column name
+--echo #
+--error ER_BAD_FIELD_ERROR
+select COLUMN_CREATE(color, "black");
+
+--echo #
+--echo # MDEV-489 Assertion `offset < 0x1f' failed in
+--echo # type_and_offset_store on COLUMN_ADD
+--echo #
+CREATE TABLE t1 (f1 tinyblob);
+
+INSERT INTO t1 VALUES (COLUMN_CREATE('col1', REPEAT('a',30)));
+UPDATE t1 SET f1 = COLUMN_ADD( f1, REPEAT('b',211), 'val2' );
+--error ER_DYN_COL_WRONG_FORMAT
+UPDATE t1 SET f1 = COLUMN_ADD( f1, REPEAT('c',211), 'val3' );
+drop table t1;
+
+--echo #
+--echo # MDEV-490 null as arguments
+--echo #
+SELECT COLUMN_GET( COLUMN_CREATE( 'col', 'val' ), NULL AS CHAR );
+SELECT COLUMN_GET( NULL, 'col' as char );
+SELECT COLUMN_EXISTS( COLUMN_CREATE( 'col', 'val' ), NULL);
+SELECT COLUMN_EXISTS( NULL, 'col');
+SELECT COLUMN_CREATE( NULL, 'val' );
+SELECT COLUMN_ADD( NULL, 'val', 'col');

=== added file 'mysql-test/t/dyncol_koi8.test'
--- a/mysql-test/t/dyncol_koi8.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/t/dyncol_koi8.test	2012-09-20 15:10:50 +0000
@@ -0,0 +1,14 @@
+--source include/not_embedded.inc
+--source include/have_utf8.inc
+--source include/have_koi8r.inc
+
+set names koi8r;
+select hex(column_create("????", 1212));
+set names utf8;
+select hex(column_create("????????", 1212));
+--echo #
+--echo # MDEV-461: DNAMES: column names do not work with non-trivial encodings
+--echo #
+select column_exists(column_create('???','value'),'???');
+select column_get(column_create('???','value'),'???' as char);
+set names default;

=== modified file 'mysys/ma_dyncol.c'
--- a/mysys/ma_dyncol.c	2011-11-22 17:04:38 +0000
+++ b/mysys/ma_dyncol.c	2012-09-20 15:10:50 +0000
@@ -37,19 +37,40 @@
 */
 /* mask to get above bits */
 #define DYNCOL_FLG_OFFSET  3
+#define DYNCOL_FLG_NAMES  4
 /* All known flags mask */
-#define DYNCOL_FLG_KNOWN  3
+#define DYNCOL_FLG_KNOWN  7
+
+/* formats */
+#define DYNCOL_FMT_NUM 0
+#define DYNCOL_FMT_STR 1
 
 /* dynamic column size reserve */
 #define DYNCOL_SYZERESERVE 80
 
+#define DYNCOL_OFFSET_ERROR 0xffffffff
+
 /* length of fixed string header 1 byte - flags, 2 bytes - columns counter */
 #define FIXED_HEADER_SIZE 3
+/*
+  length of fixed string header with names
+  1 byte - flags, 2 bytes - columns counter,  2 bytes - name pool size
+*/
+#define FIXED_HEADER_SIZE_NM 5
 
 #define COLUMN_NUMBER_SIZE 2
+/* 1 byte name length + 2 bytes offset from the name pool */
+#define COLUMN_NAMEPTR_SIZE 3
 
 #define MAX_OFFSET_LENGTH  5
 
+my_bool dynamic_column_has_names(DYNAMIC_COLUMN *str)
+{
+  if (str->length < 1)
+    return FALSE;
+  return test(str->str[0] & DYNCOL_FLG_NAMES);
+}
+
 static enum enum_dyncol_func_result
 dynamic_column_time_store(DYNAMIC_COLUMN *str,
                           MYSQL_TIME *value);
@@ -62,6 +83,311 @@ dynamic_column_time_read_internal(DYNAMI
 static enum enum_dyncol_func_result
 dynamic_column_date_read_internal(DYNAMIC_COLUMN_VALUE *store_it_here,
                                   uchar *data, size_t length);
+static enum enum_dyncol_func_result
+dynamic_column_get_internal(DYNAMIC_COLUMN *str,
+                                DYNAMIC_COLUMN_VALUE *store_it_here,
+                                uint num_key, LEX_STRING *str_key);
+static enum enum_dyncol_func_result
+dynamic_column_exists_internal(DYNAMIC_COLUMN *str, uint num_key,
+                               LEX_STRING *str_key);
+enum enum_dyncol_func_result
+dynamic_column_update_many_fmt(DYNAMIC_COLUMN *str,
+                               uint add_column_count,
+                               void *column_keys,
+                               DYNAMIC_COLUMN_VALUE *values,
+                               my_bool string_keys);
+static int plan_sort_num(const void *a, const void *b);
+static int plan_sort_str(const void *a, const void *b);
+
+/*
+  Structure to hold information about dynamic columns record and
+  iterate through it.
+*/
+
+struct st_dyn_header
+{
+  uchar *header, *nmpool, *dtpool, *data_end;
+  size_t offset_size;
+  size_t entry_size;
+  size_t header_size;
+  size_t nmpool_size;
+  size_t data_size;
+  /* DYNCOL_FMT_NUM - numeric columns, DYNCOL_FMT_STR - column names */
+  uint format;
+  uint column_count;
+
+  uchar *entry, *data, *name;
+  size_t offset;
+  uint length;
+  enum enum_dynamic_column_type type;
+};
+
+typedef struct st_dyn_header DYN_HEADER;
+
+static inline my_bool read_fixed_header(DYN_HEADER *hdr,
+                                        DYNAMIC_COLUMN *str);
+static void set_fixed_header(DYNAMIC_COLUMN *str,
+                             uint offset_size,
+                             uint column_count);
+static my_bool type_and_offset_store(uchar *place, size_t offset_size,
+                                     DYNAMIC_COLUMN_TYPE type,
+                                     size_t offset);
+
+/*
+  Calculate entry size (E) and header size (H) by offset size (O) and column
+  count (C) and fixed part of entry size (F).
+*/
+
+#define calc_param(E,H,F,O,C) do { \
+  (*(E))= (O) + F;                 \
+  (*(H))= (*(E)) * (C);            \
+}while(0);
+
+
+/**
+  Name pool size functions, for numeric format it is 0
+*/
+
+size_t name_size_num(void *keys __attribute__((unused)),
+                     uint i __attribute__((unused)))
+{
+  return 0;
+}
+
+
+/**
+  Name pool size functions.
+*/
+size_t name_size_str(void *keys, uint i)
+{
+  return ((LEX_STRING *) keys)[i].length;
+}
+
+
+/**
+  Comparator function for references on column numbers for qsort
+  (numeric format)
+*/
+
+static int column_sort_num(const void *a, const void *b)
+{
+  return **((uint **)a) - **((uint **)b);
+}
+
+
+/**
+  Comparator function for references on column numbers for qsort
+  (names format)
+*/
+
+static int column_sort_str(const void *a, const void *b)
+{
+  LEX_STRING *s1= *((LEX_STRING **)a);
+  LEX_STRING *s2= *((LEX_STRING **)b);
+  int rc= s1->length - s2->length;
+  if (rc == 0)
+    rc= memcmp((void *)s1->str, (void *)s2->str, (size_t) s1->length);
+  return rc;
+}
+
+
+/**
+  Check limit function (numeric format)
+*/
+
+static my_bool check_limit_num(const void *val)
+{
+  return **((uint **)val) > UINT_MAX16;
+}
+
+
+/**
+  Check limit function (names format)
+*/
+
+static my_bool check_limit_str(const void *val)
+{
+  return (*((LEX_STRING **)val))->length > 255;
+}
+
+
+/**
+  Write numeric format static header part.
+*/
+
+void set_fixed_header_num(DYNAMIC_COLUMN *str, DYN_HEADER *hdr)
+{
+  set_fixed_header(str, hdr->offset_size, hdr->column_count);
+  hdr->header= (uchar *)str->str + FIXED_HEADER_SIZE;
+  hdr->nmpool= hdr->dtpool= hdr->header + hdr->header_size;
+}
+
+
+/**
+  Write names format static header part.
+*/
+
+void set_fixed_header_str(DYNAMIC_COLUMN *str, DYN_HEADER *hdr)
+{
+  set_fixed_header(str, hdr->offset_size, hdr->column_count);
+  str->str[0]|= DYNCOL_FLG_NAMES;
+  int2store(str->str + 3, hdr->nmpool_size);
+  hdr->header= (uchar *)str->str + FIXED_HEADER_SIZE_NM;
+  hdr->nmpool= hdr->header + hdr->header_size;
+  hdr->dtpool= hdr->nmpool + hdr->nmpool_size;
+}
+
+
+/**
+  Write numeric format header entry
+   2 bytes - column number
+   1-4 bytes - data offset combined with type
+
+  @param hdr             descriptor of dynamic column record
+  @param column_key      pointer to uint (column number)
+  @param value           value which will be written (only type used)
+  @param offset          offset of the data
+*/
+
+my_bool put_header_entry_num(DYN_HEADER *hdr,
+                             void *column_key,
+                             DYNAMIC_COLUMN_VALUE *value,
+                             size_t offset)
+{
+  uint *column_number= (uint *)column_key;
+  int2store(hdr->entry, *column_number);
+  DBUG_ASSERT(hdr->nmpool_size == 0);
+  if (type_and_offset_store(hdr->entry, hdr->offset_size,
+                            value->type,
+                            offset))
+      return TRUE;
+  hdr->entry= hdr->entry + hdr->entry_size;
+  return FALSE;
+}
+
+
+/**
+  Write names format header entry
+   1 byte - name length
+   2 bytes - name offset in the name pool
+   1-4 bytes - data offset combined with type
+
+  @param hdr             descriptor of dynamic column record
+  @param column_key      pointer to LEX_STRING (column name)
+  @param value           value which will be written (only type used)
+  @param offset          offset of the data
+*/
+
+my_bool put_header_entry_str(DYN_HEADER *hdr,
+                             void *column_key,
+                             DYNAMIC_COLUMN_VALUE *value,
+                             size_t offset)
+{
+  LEX_STRING *column_name= (LEX_STRING *)column_key;
+  DBUG_ASSERT(column_name->length < 255);
+  hdr->entry[0]= column_name->length;
+  DBUG_ASSERT(hdr->name - hdr->nmpool < (long) 0x10000L);
+  int2store(hdr->entry + 1, hdr->name - hdr->nmpool);
+  memcpy(hdr->name, column_name->str, column_name->length);
+  DBUG_ASSERT(hdr->nmpool_size != 0 || column_name->length == 0);
+  if (type_and_offset_store(hdr->entry + 1, hdr->offset_size,
+                            value->type,
+                            offset))
+    return TRUE;
+  hdr->entry+= hdr->entry_size;
+  hdr->name+= column_name->length;
+  return FALSE;
+}
+
+
+/**
+  Format descriptor, contain constants and function references for
+  format processing
+*/
+
+struct st_service_funcs
+{
+  /* size of fixed header */
+  uint fixed_hdr;
+  /* size of fixed part of header entry */
+  uint fixed_hdr_entry;
+
+  /*size of array element which stores keys */
+  uint key_size_in_array;
+
+  size_t (*name_size)
+    (void *, uint);
+  int (*column_sort)
+    (const void *a, const void *b);
+  my_bool (*check_limit)
+    (const void *val);
+  void (*set_fixed_hdr)
+    (DYNAMIC_COLUMN *str, DYN_HEADER *hdr);
+  my_bool (*put_header_entry)(DYN_HEADER *hdr,
+                              void *column_key,
+                              DYNAMIC_COLUMN_VALUE *value,
+                              size_t offset);
+  int (*plan_sort)(const void *a, const void *b);
+};
+
+
+/**
+  Actual our 2 format descriptors
+*/
+
+static struct st_service_funcs fmt_data[2]=
+{
+  {
+    FIXED_HEADER_SIZE,
+    COLUMN_NUMBER_SIZE,
+    sizeof(uint),
+    &name_size_num,
+    &column_sort_num,
+    &check_limit_num,
+    &set_fixed_header_num,
+    &put_header_entry_num,
+    &plan_sort_num
+  },
+  {
+    FIXED_HEADER_SIZE_NM,
+    COLUMN_NAMEPTR_SIZE,
+    sizeof(LEX_STRING),
+    &name_size_str,
+    &column_sort_str,
+    &check_limit_str,
+    &set_fixed_header_str,
+    &put_header_entry_str,
+    &plan_sort_str
+  }
+};
+
+
+/**
+  Read dynamic column record header and fill the descriptor
+
+  @param hdr             dynamic columns record descriptor to fill
+  @param str             dynamic columns record
+
+  @return ER_DYNCOL_* return code
+*/
+
+enum enum_dyncol_func_result
+init_read_hdr(DYN_HEADER *hdr, DYNAMIC_COLUMN *str)
+{
+  if (read_fixed_header(hdr, str))
+    return ER_DYNCOL_FORMAT;
+  hdr->header= (uchar*)str->str + fmt_data[hdr->format].fixed_hdr;
+  calc_param(&hdr->entry_size, &hdr->header_size,
+             fmt_data[hdr->format].fixed_hdr_entry, hdr->offset_size,
+             hdr->column_count);
+  hdr->nmpool= hdr->header + hdr->header_size;
+  hdr->dtpool= hdr->nmpool + hdr->nmpool_size;
+  hdr->data_size= str->length - fmt_data[hdr->format].fixed_hdr -
+    hdr->header_size - hdr->nmpool_size;
+  hdr->data_end= (uchar*)str->str + str->length;
+  return ER_DYNCOL_OK;
+}
+
 
 /**
   Initialize dynamic column string with (make it empty but correct format)
@@ -82,11 +408,8 @@ static my_bool dynamic_column_init_str(D
     - First \0 is flags
     - other 2 \0 is number of fields
   */
-  if (init_dynamic_string(str, NULL,
-                          size + FIXED_HEADER_SIZE, DYNCOL_SYZERESERVE))
+  if (init_dynamic_string(str, NULL, size, DYNCOL_SYZERESERVE))
     return TRUE;
-  bzero(str->str, FIXED_HEADER_SIZE);
-  str->length= FIXED_HEADER_SIZE;
   return FALSE;
 }
 
@@ -902,37 +1225,42 @@ static size_t dynamic_column_offset_byte
   @param offset          Offset to be written
 */
 
-static void type_and_offset_store(uchar *place, size_t offset_size,
-                                  DYNAMIC_COLUMN_TYPE type,
-                                  size_t offset)
+static my_bool type_and_offset_store(uchar *place, size_t offset_size,
+                                     DYNAMIC_COLUMN_TYPE type,
+                                     size_t offset)
 {
   ulong val = (((ulong) offset) << 3) | (type - 1);
   DBUG_ASSERT(type != DYN_COL_NULL);
   DBUG_ASSERT(((type - 1) & (~7)) == 0); /* fit in 3 bits */
 
   /* Index entry starts with column number; Jump over it */
-  place+= COLUMN_NUMBER_SIZE;                   
+  place+= COLUMN_NUMBER_SIZE;
 
   switch (offset_size) {
   case 1:
-    DBUG_ASSERT(offset < 0x1f);          /* all 1 value is reserved */
+    if (offset >= 0x1f)          /* all 1 value is reserved */
+      return TRUE;
     place[0]= (uchar)val;
     break;
   case 2:
-    DBUG_ASSERT(offset < 0x1fff);        /* all 1 value is reserved */
+    if (offset >= 0x1fff)        /* all 1 value is reserved */
+      return TRUE;
     int2store(place, val);
     break;
   case 3:
-    DBUG_ASSERT(offset < 0x1fffff);      /* all 1 value is reserved */
+    if (offset >= 0x1fffff)      /* all 1 value is reserved */
+      return TRUE;
     int3store(place, val);
     break;
   case 4:
-    DBUG_ASSERT(offset < 0x1fffffff);    /* all 1 value is reserved */
+    if (offset >= 0x1fffffff)    /* all 1 value is reserved */
+      return TRUE;
     int4store(place, val);
     break;
   default:
-    DBUG_ASSERT(0);                             /* impossible */
+      return TRUE;
   }
+  return FALSE;
 }
 
 
@@ -941,45 +1269,40 @@ static void type_and_offset_store(uchar
 
   @param type            Where to put type info
   @param offset          Where to put offset info
-  @param place           Beginning of the index entry
+  @param place           beginning of the type and offset
   @param offset_size     Size of offset field in bytes
 */
 
-static void type_and_offset_read(DYNAMIC_COLUMN_TYPE *type,
-                                 size_t *offset,
-                                 uchar *place, size_t offset_size)
+static my_bool type_and_offset_read(DYNAMIC_COLUMN_TYPE *type,
+                                    size_t *offset,
+                                    uchar *place, size_t offset_size)
 {
   ulong UNINIT_VAR(val);
+  ulong UNINIT_VAR(lim);
 
-  place+= COLUMN_NUMBER_SIZE;                 /* skip column number */
   switch (offset_size) {
   case 1:
     val= (ulong)place[0];
+    lim= 0x1f;
     break;
   case 2:
     val= uint2korr(place);
+    lim= 0x1fff;
     break;
   case 3:
     val= uint3korr(place);
+    lim= 0x1fffff;
     break;
   case 4:
     val= uint4korr(place);
+    lim= 0x1fffffff;
     break;
   default:
     DBUG_ASSERT(0);                             /* impossible */
   }
   *type= (val & 0x7) + 1;
   *offset= val >> 3;
-}
-
-
-/**
-  Comparator function for references on column numbers for qsort
-*/
-
-static int column_sort(const void *a, const void *b)
-{
-  return **((uint **)a) - **((uint **)b);
+  return (*offset >= lim);
 }
 
 
@@ -1003,27 +1326,13 @@ static void set_fixed_header(DYNAMIC_COL
   DBUG_ASSERT((str->str[0] & (~DYNCOL_FLG_KNOWN)) == 0);
 }
 
-/*
-  Calculate entry size (E) and header size (H) by offset size (O) and column
-  count (C).
-*/
-
-#define calc_param(E,H,O,C) do { \
-  (*(E))= (O) + COLUMN_NUMBER_SIZE;           \
-  (*(H))= (*(E)) * (C);                       \
-}while(0);
-
-
 /**
   Adds columns into the empty string
 
-  @param str             String where to write the data
-  @param header_size     Size of the header without fixed part
-  @param offset_size     Size of offset field in bytes
+  @param str             String where to write the data (the record)
+  @param hdr             Dynamic columns record descriptor
   @param column_count    Number of columns in the arrays
-  @parem not_null_count  Number of non-null columns in the arrays
-  @param data_size       Size of the data segment
-  @param column_numbers  Array of columns numbers
+  @param column_keys     Array of columns keys (uint or LEX_STRING)
   @param values          Array of columns values
   @param new_str         True if we need to allocate new string
 
@@ -1032,42 +1341,51 @@ static void set_fixed_header(DYNAMIC_COL
 
 static enum enum_dyncol_func_result
 dynamic_new_column_store(DYNAMIC_COLUMN *str,
-                         size_t header_size,
-                         size_t offset_size,
+                         DYN_HEADER *hdr,
                          uint column_count,
-                         uint not_null_count,
-                         size_t data_size,
-                         uint *column_numbers,
+                         void *column_keys,
                          DYNAMIC_COLUMN_VALUE *values,
                          my_bool new_str)
 {
-  uchar *header_end;
-  uint **columns_order;
+  struct st_service_funcs *fmt= fmt_data + hdr->format;
+  void **columns_order;
+  uchar *element;
   uint i;
-  uint entry_size= COLUMN_NUMBER_SIZE + offset_size;
   enum enum_dyncol_func_result rc= ER_DYNCOL_RESOURCE;
+  size_t all_headers_size;
 
-  if (!(columns_order= malloc(sizeof(uint*)*column_count)))
+  if (!(columns_order= malloc(sizeof(void*)*column_count)))
     return ER_DYNCOL_RESOURCE;
   if (new_str)
   {
     if (dynamic_column_init_str(str,
-                                data_size + header_size + DYNCOL_SYZERESERVE))
+                                fmt->fixed_hdr +
+                                hdr->header_size +
+                                hdr->nmpool_size +
+                                hdr->data_size +
+                                DYNCOL_SYZERESERVE))
       goto err;
   }
   else
   {
     str->length= 0;
-    if (dynstr_realloc(str, data_size + header_size + DYNCOL_SYZERESERVE))
+    if (dynstr_realloc(str,
+                       fmt->fixed_hdr +
+                       hdr->header_size +
+                       hdr->nmpool_size +
+                       hdr->data_size +
+                       DYNCOL_SYZERESERVE))
       goto err;
-    bzero(str->str, FIXED_HEADER_SIZE);
-    str->length= FIXED_HEADER_SIZE;
   }
+  bzero(str->str, fmt->fixed_hdr);
+  str->length= fmt->fixed_hdr;
 
   /* sort columns for the header */
-  for (i= 0; i < column_count; i++)
-    columns_order[i]= column_numbers + i;
-  qsort(columns_order, (size_t)column_count, sizeof(uint*), &column_sort);
+  for (i= 0, element= (uchar *) column_keys;
+       i < column_count;
+       i++, element+= fmt->key_size_in_array)
+    columns_order[i]= (void *)element;
+  qsort(columns_order, (size_t)column_count, sizeof(void*), fmt->column_sort);
 
   /*
     For now we don't allow creating two columns with the same number
@@ -1076,38 +1394,43 @@ dynamic_new_column_store(DYNAMIC_COLUMN
   */
   for (i= 0; i < column_count - 1; i++)
   {
-    if (columns_order[i][0] > UINT_MAX16 ||
-        columns_order[i][0] == columns_order[i + 1][0])
+    if ((*fmt->check_limit)(&columns_order[i]) ||
+        (*fmt->column_sort)(&columns_order[i], &columns_order[i + 1]) == 0)
     {
       rc= ER_DYNCOL_DATA;
       goto err;
     }
   }
-  if (columns_order[i][0] > UINT_MAX16)
+  if ((*fmt->check_limit)(&columns_order[i]))
   {
     rc= ER_DYNCOL_DATA;
     goto err;
   }
 
-  DBUG_ASSERT(str->max_length >= str->length + header_size);
-  set_fixed_header(str, offset_size, not_null_count);
-  str->length+= header_size; /* reserve place for header */
-  header_end= (uchar *)str->str + FIXED_HEADER_SIZE;
+  (*fmt->set_fixed_hdr)(str, hdr);
+  /* reserve place for header and name pool */
+  str->length+= hdr->header_size + hdr->nmpool_size;
+
+  hdr->entry= hdr->header;
+  hdr->name= hdr->nmpool;
+  all_headers_size= fmt->fixed_hdr + hdr->header_size + hdr->nmpool_size;
   for (i= 0; i < column_count; i++)
   {
-    uint ord= columns_order[i] - column_numbers;
+    uint ord= ((uchar*)columns_order[i] - (uchar*)column_keys) /
+      fmt->key_size_in_array;
     if (values[ord].type != DYN_COL_NULL)
     {
       /* Store header first in the str */
-      int2store(header_end, column_numbers[ord]);
-      type_and_offset_store(header_end, offset_size,
-                            values[ord].type,
-                            str->length - header_size - FIXED_HEADER_SIZE);
+      if ((*fmt->put_header_entry)(hdr, columns_order[i], values + ord,
+                                   str->length - all_headers_size))
+      {
+        rc= ER_DYNCOL_FORMAT;
+        goto err;
+      }
 
       /* Store value in 'str + str->length' and increase str->length */
       if ((rc= data_store(str, values + ord)))
         goto err;
-      header_end+= entry_size;
     }
   }
   rc= ER_DYNCOL_OK;
@@ -1117,61 +1440,88 @@ err:
 }
 
 /**
-  Create packed string which contains given columns (internal)
+  Calculate size of header, name pool and data pool
 
-  @param str             String where to write the data
+  @param hdr             descriptor of dynamic column record
+  @param column_count    number of elements in arrays
   @param column_count    Number of columns in the arrays
-  @param column_numbers  Array of columns numbers
+  @param column_keys     Array of columns keys (uint or LEX_STRING)
   @param values          Array of columns values
-  @param new_str         True if we need allocate new string
 
   @return ER_DYNCOL_* return code
 */
 
 static enum enum_dyncol_func_result
-dynamic_column_create_many_internal(DYNAMIC_COLUMN *str,
-                                    uint column_count,
-                                    uint *column_numbers,
-                                    DYNAMIC_COLUMN_VALUE *values,
-                                    my_bool new_str)
+calc_var_sizes(DYN_HEADER *hdr,
+               uint column_count,
+               void *column_keys,
+               DYNAMIC_COLUMN_VALUE *values)
 {
-  size_t data_size= 0;
-  size_t header_size, offset_size;
+  struct st_service_funcs *fmt= fmt_data + hdr->format;
   uint i;
-  int not_null_column_count= 0;
-
-  if (new_str)
-  {
-    /* to make dynstr_free() working in case of errors */
-    bzero(str, sizeof(DYNAMIC_COLUMN));
-  }
-
+  hdr->nmpool_size= hdr->data_size= 0;
+  hdr->column_count= 0;
   for (i= 0; i < column_count; i++)
   {
     if (values[i].type != DYN_COL_NULL)
     {
       size_t tmp;
-      not_null_column_count++;
-      data_size+= (tmp=dynamic_column_value_len(values + i));
+      hdr->column_count++;
+      hdr->data_size+= (tmp= dynamic_column_value_len(values + i));
       if (tmp == (size_t) ~0)
         return ER_DYNCOL_DATA;
+      hdr->nmpool_size+= (*fmt->name_size)(column_keys, i);
     }
   }
-
   /* We can handle data up to 1fffffff = 536870911 bytes now */
-  if ((offset_size= dynamic_column_offset_bytes(data_size)) >=
+  if ((hdr->offset_size= dynamic_column_offset_bytes(hdr->data_size)) >=
       MAX_OFFSET_LENGTH)
     return ER_DYNCOL_LIMIT;
 
-  /* header entry is column number + offset & type */
-  header_size= not_null_column_count * (offset_size + 2);
+  /* header entry is column number or string pointer + offset & type */
+  hdr->entry_size= fmt->fixed_hdr_entry + hdr->offset_size;
+  hdr->header_size= hdr->column_count * hdr->entry_size;
+  return ER_DYNCOL_OK;
+}
+
+/**
+  Create packed string which contains given columns (internal multi format)
+
+  @param str             String where to write the data
+  @param column_count    Number of columns in the arrays
+  @param column_keys     Array of columns keys (format dependent)
+  @param values          Array of columns values
+  @param new_str         True if we need allocate new string
+  @param string_keys     keys are strings
+
+  @return ER_DYNCOL_* return code
+*/
+
+static enum enum_dyncol_func_result
+dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str,
+                                        uint column_count,
+                                        void *column_keys,
+                                        DYNAMIC_COLUMN_VALUE *values,
+                                        my_bool new_str,
+                                        my_bool string_keys)
+{
+  DYN_HEADER header;
+  enum enum_dyncol_func_result rc;
+  bzero(&header, sizeof(header));
+  header.format= (string_keys ? 1 : 0);
+
+  if (new_str)
+  {
+    /* to make dynstr_free() working in case of errors */
+    bzero(str, sizeof(DYNAMIC_COLUMN));
+  }
+
+  if ((rc= calc_var_sizes(&header, column_count, column_keys, values)) < 0)
+    return rc;
 
-  return dynamic_new_column_store(str,
-                                  header_size, offset_size,
+  return dynamic_new_column_store(str, &header,
                                   column_count,
-                                  not_null_column_count,
-                                  data_size,
-                                  column_numbers, values,
+                                  column_keys, values,
                                   new_str);
 }
 
@@ -1194,11 +1544,35 @@ dynamic_column_create_many(DYNAMIC_COLUM
                            DYNAMIC_COLUMN_VALUE *values)
 {
   DBUG_ENTER("dynamic_column_create_many");
-  DBUG_RETURN(dynamic_column_create_many_internal(str, column_count,
-                                                  column_numbers, values,
-                                                  TRUE));
+  DBUG_RETURN(dynamic_column_create_many_internal_fmt(str, column_count,
+                                                      column_numbers, values,
+                                                      TRUE, FALSE));
 }
 
+/**
+  Create packed string which contains given columns
+
+  @param str             String where to write the data
+  @param column_count    Number of columns in the arrays
+  @param column_keys     Array of columns keys
+  @param values          Array of columns value
+  @param names           use string names as keys
+
+  @return ER_DYNCOL_* return code
+*/
+
+enum enum_dyncol_func_result
+dynamic_column_create_many_fmt(DYNAMIC_COLUMN *str,
+                               uint column_count,
+                               uchar *column_keys,
+                               DYNAMIC_COLUMN_VALUE *values,
+                               my_bool names)
+{
+  DBUG_ENTER("dynamic_column_create_many");
+  DBUG_RETURN(dynamic_column_create_many_internal_fmt(str, column_count,
+                                                      column_keys, values,
+                                                      TRUE, names));
+}
 
 /**
   Create packed string which contains given column
@@ -1239,32 +1613,46 @@ static size_t get_length_interval(uchar
   DYNAMIC_COLUMN_TYPE type, type_next;
   DBUG_ASSERT(entry < entry_next);
 
-  type_and_offset_read(&type, &offset, entry, offset_size);
+  if (type_and_offset_read(&type, &offset, entry + COLUMN_NUMBER_SIZE,
+                           offset_size))
+      return DYNCOL_OFFSET_ERROR;
   if (entry_next >= header_end)
     return (last_offset - offset);
-  type_and_offset_read(&type_next, &offset_next, entry_next, offset_size);
+  if (type_and_offset_read(&type_next, &offset_next,
+                           entry_next + COLUMN_NUMBER_SIZE, offset_size))
+    return DYNCOL_OFFSET_ERROR;
   return (offset_next - offset);
 }
 
-/*
-  Calculate length of data of one column
 
+/**
+  Calculate length of data between given hdr->entry and next_entry
 
-  @param entry           Pointer to the first entry
-  @param header_end      Pointer to the header end
-  @param offset_size     Size of offset field in bytes
-  @param last_offset     Size of the data segment
+  @param hdr             descriptor of dynamic column record
+  @param next_entry      next header entry (can point just after last header
+                         entry)
 
   @return number of bytes
 */
 
-static size_t get_length(uchar *entry, uchar *header_end,
-                         size_t offset_size,
-                         size_t last_offset)
-{
-  return get_length_interval(entry,
-                             entry + offset_size + COLUMN_NUMBER_SIZE,
-                             header_end, offset_size, last_offset);
+static size_t hdr_interval_length(DYN_HEADER *hdr, uchar *next_entry)
+{
+  struct st_service_funcs *fmt= fmt_data + hdr->format;
+  size_t next_entry_offset;
+  DYNAMIC_COLUMN_TYPE next_entry_type;
+  DBUG_ASSERT(hdr->entry < next_entry);
+  DBUG_ASSERT(hdr->entry >= hdr->header);
+  DBUG_ASSERT(next_entry <= hdr->header + hdr->header_size);
+
+  if (type_and_offset_read(&hdr->type, &hdr->offset,
+                           hdr->entry + fmt->fixed_hdr_entry, hdr->offset_size))
+    return DYNCOL_OFFSET_ERROR;
+  if (next_entry == hdr->header + hdr->header_size)
+    return hdr->data_size - hdr->offset;
+  if (type_and_offset_read(&next_entry_type, &next_entry_offset,
+                           next_entry + fmt->fixed_hdr_entry, hdr->offset_size))
+    return DYNCOL_OFFSET_ERROR;
+  return (next_entry_offset - hdr->offset);
 }
 
 
@@ -1272,7 +1660,7 @@ static size_t get_length(uchar *entry, u
   Comparator function for references to header entries for qsort
 */
 
-static int header_compar(const void *a, const void *b)
+static int header_compar_num(const void *a, const void *b)
 {
   uint va= uint2korr((uchar*)a), vb= uint2korr((uchar*)b);
   return (va > vb ? 1 : (va < vb ? -1 : 0));
@@ -1280,151 +1668,300 @@ static int header_compar(const void *a,
 
 
 /**
-  Find column and fill information about it
+  Find entry in the numeric format header by the column number
 
-  @param type            Returns type of the column
-  @param data            Returns a pointer to the data
-  @param length          Returns length of the data
-  @param offset_size     Size of offset field in bytes
-  @param column_count    Number of column in the packed string
-  @param data_end        Pointer to the data end
-  @param num             Number of the column we want to fetch
-  @param entry_pos       NULL or place where to put reference to the entry
+  @param hdr             descriptor of dynamic column record
+  @param key             number to find
 
-  @return 0 ok
-  @return 1 error in data
+  @return pointer to the entry or NULL
 */
 
-static my_bool
-find_column(DYNAMIC_COLUMN_TYPE *type, uchar **data, size_t *length,
-            uchar *header, size_t offset_size, uint column_count,
-            uchar *data_end, uint num, uchar **entry_pos)
+static uchar *find_entry_num(DYN_HEADER *hdr, uint key)
 {
-  uchar *entry;
-  size_t offset, total_data, header_size, entry_size;
-  uchar key[2+4];
+  uchar header_entry[2+4];
+  DBUG_ASSERT(hdr->format == DYNCOL_FMT_NUM);
+  int2store(header_entry, key);
+  return hdr->entry= bsearch(header_entry, hdr->header,
+                             (size_t)hdr->column_count,
+                             hdr->entry_size, &header_compar_num);
+}
 
-  if (!entry_pos)
-    entry_pos= &entry;
 
-  calc_param(&entry_size, &header_size, offset_size, column_count);
+/**
+  Find entry in the names format header by the column number
 
-  if (header + header_size > data_end)
-    return 1;
+  @param hdr             descriptor of dynamic column record
+  @param key             name to find
 
-  int2store(key, num);
-  entry= bsearch(key, header, (size_t)column_count, entry_size,
-                 &header_compar);
-  if (!entry)
+  @return pointer to the entry or NULL
+*/
+static uchar *find_entry_str(DYN_HEADER *hdr, LEX_STRING *key)
+{
+  uchar *min= hdr->header;
+  uchar *max= hdr->header + (hdr->column_count - 1) * hdr->entry_size;
+  uchar *mid;
+  DBUG_ASSERT(hdr->format == DYNCOL_FMT_STR);
+  DBUG_ASSERT(hdr->nmpool != NULL);
+  while (max >= min)
   {
-    /* Column not found */
-    *type= DYN_COL_NULL;
-    *entry_pos= NULL;
-    return 0;
+    uint len;
+    int cmp;
+    mid= hdr->header + ((min - hdr->header) + (max - hdr->header)) / 2 / hdr->entry_size * hdr->entry_size;
+    len= mid[0];
+    cmp= len - key->length;
+    if (cmp == 0)
+      cmp= memcmp(hdr->nmpool +  uint2korr(mid + 1), key->str, len);
+    if (cmp < 0)
+      min= mid + hdr->entry_size;
+    else if (cmp > 0)
+      max= mid - hdr->entry_size;
+    else
+      return mid;
   }
-  type_and_offset_read(type, &offset, entry, offset_size);
-  total_data= data_end - (header + header_size);
-  if (offset > total_data)
-    return 1;
-  *data= header + header_size + offset;
-  *length= get_length(entry, header + header_size, offset_size,
-                      total_data);
-  /*
-    Check that the found data is withing the ranges. This can happen if
-    we get data with wrong offsets.
-  */
-  if ((long) *length < 0 || offset + *length > total_data)
-    return 1;
-
-  *entry_pos= entry;
-  return 0;
+  return NULL;
 }
 
 
 /**
-   Read and check the header of the dynamic string
-
-  @param str             Dynamic string
+  Write number in the buffer (backward direction - starts from the buffer end)
 
-  @retval FALSE OK
-  @retval TRUE  error
-
-  Note
-    We don't check for str->length == 0 as all code that calls this
-    already have handled this case.
+  @return pointer on the number begining
 */
 
-static inline my_bool read_fixed_header(DYNAMIC_COLUMN *str,
-                                        size_t *offset_size,
-                                        uint *column_count)
+static char *backwritenum(char *chr, uint numkey)
 {
-  DBUG_ASSERT(str != NULL && str->length != 0);
-  if ((str->length < FIXED_HEADER_SIZE) ||
-      (str->str[0] & (~DYNCOL_FLG_KNOWN)))
-    return 1;                                   /* Wrong header */
-  *offset_size= (str->str[0] & DYNCOL_FLG_OFFSET) + 1;
-  *column_count= uint2korr(str->str + 1);
-  return 0;
+  if (numkey == 0)
+    *(--chr)= '0';
+  else
+    while (numkey > 0)
+    {
+      *(--chr)= '0' + numkey % 10;
+      numkey/= 10;
+    }
+  return chr;
 }
 
 
 /**
-  Get dynamic column value
+  Find column and fill information about it
 
-  @param str             The packed string to extract the column
-  @param column_nr       Number of column to fetch
+  @param hdr             descriptor of dynamic column record
+  @param numkey          Number of the column to fetch (if strkey is NULL)
+  @param strkey          Name of the column to fetch (or NULL)
+
+  @return 0 ok
+  @return 1 error in data
+*/
+
+static my_bool
+find_column(DYN_HEADER *hdr, uint numkey, LEX_STRING *strkey)
+{
+  LEX_STRING nmkey;
+  char nmkeybuff[6]; /* to fit max 2 bytes number */
+  DBUG_ASSERT(hdr->header != NULL);
+
+  if (hdr->header + hdr->header_size > hdr->data_end)
+    return TRUE;
+
+  /* fix key */
+  if (hdr->format == DYNCOL_FMT_NUM && strkey != NULL)
+  {
+    char *end;
+    numkey= (uint) strtoul(strkey->str, &end, 10);
+    if (end != strkey->str + strkey->length)
+    {
+      /* we can't find non-numeric key among numeric ones */
+      hdr->type= DYN_COL_NULL;
+      return 0;
+    }
+  }
+  else if (hdr->format == DYNCOL_FMT_STR && strkey == NULL)
+  {
+    nmkey.str= backwritenum(nmkeybuff + sizeof(nmkeybuff), numkey);
+    nmkey.length= (nmkeybuff + sizeof(nmkeybuff)) - nmkey.str;
+    strkey= &nmkey;
+  }
+  if (hdr->format == DYNCOL_FMT_NUM)
+    hdr->entry= find_entry_num(hdr, numkey);
+  else
+    hdr->entry= find_entry_str(hdr, strkey);
+
+  if (!hdr->entry)
+  {
+    /* Column not found */
+    hdr->type= DYN_COL_NULL;
+    return 0;
+  }
+  hdr->length= hdr_interval_length(hdr, hdr->entry + hdr->entry_size);
+  hdr->data= hdr->dtpool + hdr->offset;
+  /*
+    Check that the found data is withing the ranges. This can happen if
+    we get data with wrong offsets.
+  */
+  if (hdr->length == DYNCOL_OFFSET_ERROR ||
+      hdr->length > INT_MAX || hdr->offset > hdr->data_size)
+    return 1;
+
+  return 0;
+}
+
+
+/**
+  Read and check the header of the dynamic string
+
+  @param hdr             descriptor of dynamic column record
+  @param str             Dynamic string
+
+  @retval FALSE OK
+  @retval TRUE  error
+
+  Note
+    We don't check for str->length == 0 as all code that calls this
+    already have handled this case.
+*/
+
+static inline my_bool read_fixed_header(DYN_HEADER *hdr,
+                                        DYNAMIC_COLUMN *str)
+{
+  DBUG_ASSERT(str != NULL && str->length != 0);
+  if ((str->length < 1)  ||
+      (str->str[0] & (~DYNCOL_FLG_KNOWN)))
+    return 1;
+  hdr->format= ((str->str[0] & DYNCOL_FLG_NAMES) ?
+                DYNCOL_FMT_STR:
+                DYNCOL_FMT_NUM);
+  if ((str->length < fmt_data[hdr->format].fixed_hdr))
+    return 1;                                   /* Wrong header */
+  hdr->offset_size= (str->str[0] & DYNCOL_FLG_OFFSET) + 1;
+  hdr->column_count= uint2korr(str->str + 1);
+  if (hdr->format == DYNCOL_FMT_STR)
+    hdr->nmpool_size= uint2korr(str->str + 3);
+  else
+    hdr->nmpool_size= 0;
+  return 0;
+}
+
+
+/**
+  Get dynamic column value by column number
+
+  @param str             The packed string to extract the column
+  @param column_nr       Number of column to fetch
   @param store_it_here   Where to store the extracted value
 
   @return ER_DYNCOL_* return code
 */
 
-int dynamic_column_get(DYNAMIC_COLUMN *str, uint column_nr,
+enum enum_dyncol_func_result
+dynamic_column_get(DYNAMIC_COLUMN *str, uint column_nr,
                        DYNAMIC_COLUMN_VALUE *store_it_here)
 {
-  uchar *data;
-  size_t offset_size, length;
-  uint column_count;
+  return dynamic_column_get_internal(str, store_it_here, column_nr, NULL);
+}
+
+
+/**
+  Get dynamic column value by name
+
+  @param str             The packed string to extract the column
+  @param name            Name of column to fetch
+  @param store_it_here   Where to store the extracted value
+
+  @return ER_DYNCOL_* return code
+*/
+
+enum enum_dyncol_func_result
+dynamic_column_get_str(DYNAMIC_COLUMN *str, LEX_STRING *name,
+                           DYNAMIC_COLUMN_VALUE *store_it_here)
+{
+  DBUG_ASSERT(name != NULL);
+  return dynamic_column_get_internal(str, store_it_here, 0, name);
+}
+
+
+/**
+  Get dynamic column value by number or name
+
+  @param str             The packed string to extract the column
+  @param key             Name or number of column to fetch
+                         (depends on string_key)
+  @param store_it_here   Where to store the extracted value
+  @param string_key      True if we gave pointer to LEX_STRING.
+
+  @return ER_DYNCOL_* return code
+*/
+
+enum enum_dyncol_func_result
+dynamic_column_get_fmt(DYNAMIC_COLUMN *str, void *key,
+                           DYNAMIC_COLUMN_VALUE *store_it_here,
+                           my_bool string_key)
+{
+  DBUG_ASSERT(key != NULL);
+  if (string_key)
+    return dynamic_column_get_internal(str, store_it_here,
+                                       0, (LEX_STRING *)key);
+  return dynamic_column_get_internal(str, store_it_here,
+                                     *((uint *)key), NULL);
+}
+
+
+/**
+  Get dynamic column value by number or name
+
+  @param str             The packed string to extract the column
+  @param store_it_here   Where to store the extracted value
+  @param numkey          Number of the column to fetch (if strkey is NULL)
+  @param strkey          Name of the column to fetch (or NULL)
+
+  @return ER_DYNCOL_* return code
+*/
+
+static enum enum_dyncol_func_result
+dynamic_column_get_internal(DYNAMIC_COLUMN *str,
+                            DYNAMIC_COLUMN_VALUE *store_it_here,
+                            uint num_key, LEX_STRING *str_key)
+{
+  DYN_HEADER header;
   enum enum_dyncol_func_result rc= ER_DYNCOL_FORMAT;
+  bzero(&header, sizeof(header));
 
   if (str->length == 0)
     goto null;
 
-  if (read_fixed_header(str, &offset_size, &column_count))
+  if ((rc= init_read_hdr(&header, str)) < 0)
     goto err;
 
-  if (column_count == 0)
+  if (header.column_count == 0)
     goto null;
 
-  if (find_column(&store_it_here->type, &data, &length,
-                  (uchar*)str->str + FIXED_HEADER_SIZE,
-                  offset_size, column_count, (uchar*)str->str + str->length,
-                  column_nr, NULL))
+  if (find_column(&header, num_key, str_key))
     goto err;
 
-  switch (store_it_here->type) {
+  switch ((store_it_here->type= header.type)) {
   case DYN_COL_INT:
-    rc= dynamic_column_sint_read(store_it_here, data, length);
+    rc= dynamic_column_sint_read(store_it_here, header.data, header.length);
     break;
   case DYN_COL_UINT:
-    rc= dynamic_column_uint_read(store_it_here, data, length);
+    rc= dynamic_column_uint_read(store_it_here, header.data, header.length);
     break;
   case DYN_COL_DOUBLE:
-    rc= dynamic_column_double_read(store_it_here, data, length);
+    rc= dynamic_column_double_read(store_it_here, header.data, header.length);
     break;
   case DYN_COL_STRING:
-    rc= dynamic_column_string_read(store_it_here, data, length);
+    rc= dynamic_column_string_read(store_it_here, header.data, header.length);
     break;
   case DYN_COL_DECIMAL:
-    rc= dynamic_column_decimal_read(store_it_here, data, length);
+    rc= dynamic_column_decimal_read(store_it_here, header.data, header.length);
     break;
   case DYN_COL_DATETIME:
-    rc= dynamic_column_date_time_read(store_it_here, data, length);
+    rc= dynamic_column_date_time_read(store_it_here, header.data,
+                                      header.length);
     break;
   case DYN_COL_DATE:
-    rc= dynamic_column_date_read(store_it_here, data, length);
+    rc= dynamic_column_date_read(store_it_here, header.data, header.length);
     break;
   case DYN_COL_TIME:
-    rc= dynamic_column_time_read(store_it_here, data, length);
+    rc= dynamic_column_time_read(store_it_here, header.data, header.length);
     break;
   case DYN_COL_NULL:
     rc= ER_DYNCOL_OK;
@@ -1441,282 +1978,899 @@ err:
     return rc;
 }
 
+
+/**
+  Check existence of the column in the packed string (by number)
+
+  @param str             The packed string to check the column
+  @param column_nr       Number of column to check
+
+  @return ER_DYNCOL_* return code
+*/
+
+enum enum_dyncol_func_result
+dynamic_column_exists(DYNAMIC_COLUMN *str, uint column_nr)
+{
+  return dynamic_column_exists_internal(str, column_nr, NULL);
+}
+
+
+/**
+  Check existence of the column in the packed string (by name)
+
+  @param str             The packed string to check the column
+  @param name            Name of column to check
+
+  @return ER_DYNCOL_* return code
+*/
+
+enum enum_dyncol_func_result
+dynamic_column_exists_str(DYNAMIC_COLUMN *str, LEX_STRING *name)
+{
+  DBUG_ASSERT(name != NULL);
+  return dynamic_column_exists_internal(str, 0, name);
+}
+
+
 /**
-  Delete column with given number from the packed string
+  Check existence of the column in the packed string (by name of number)
 
-  @param str             The packed string to delete the column
-  @param column_nr       Number of column to delete
+  @param str             The packed string to check the column
+  @param key             Name or number of column to fetch
+                         (depends on string_key)
+  @param string_key      True if we gave pointer to LEX_STRING.
 
   @return ER_DYNCOL_* return code
 */
 
-int dynamic_column_delete(DYNAMIC_COLUMN *str, uint column_nr)
+enum enum_dyncol_func_result
+dynamic_column_exists_fmt(DYNAMIC_COLUMN *str, void *key, my_bool string_key)
 {
-  uchar *data, *header_entry, *read, *write;
-  size_t offset_size, new_offset_size, length, entry_size, new_entry_size,
-         header_size, new_header_size, data_size, new_data_size,
-         deleted_entry_offset;
-  uint column_count, i;
-  DYNAMIC_COLUMN_TYPE type;
+  DBUG_ASSERT(key != NULL);
+  if (string_key)
+    return dynamic_column_exists_internal(str, 0, (LEX_STRING *) key);
+  return dynamic_column_exists_internal(str, *((uint *)key), NULL);
+}
+
+
+/**
+  Check existence of the column in the packed string (by name of number)
+
+  @param str             The packed string to check the column
+  @param num_key         Number of the column to fetch (if strkey is NULL)
+  @param str_key         Name of the column to fetch (or NULL)
+
+  @return ER_DYNCOL_* return code
+*/
+
+static enum enum_dyncol_func_result
+dynamic_column_exists_internal(DYNAMIC_COLUMN *str, uint num_key,
+                               LEX_STRING *str_key)
+{
+  DYN_HEADER header;
+  enum enum_dyncol_func_result rc;
+  bzero(&header, sizeof(header));
 
   if (str->length == 0)
-    return ER_DYNCOL_OK;  /* no columns */
+    return ER_DYNCOL_NO;                        /* no columns */
 
-  if (read_fixed_header(str, &offset_size, &column_count))
+  if ((rc= init_read_hdr(&header, str)) < 0)
+    return rc;
+
+  if (header.column_count == 0)
+    return ER_DYNCOL_NO;                        /* no columns */
+
+  if (find_column(&header, num_key, str_key))
     return ER_DYNCOL_FORMAT;
 
-  if (column_count == 0)
-  {
-    str->length= 0;
-    return ER_DYNCOL_OK;  /* no columns */
-  }
+  return (header.type != DYN_COL_NULL ? ER_DYNCOL_YES : ER_DYNCOL_NO);
+}
 
-  if (find_column(&type, &data, &length, (uchar*)str->str + FIXED_HEADER_SIZE,
-                  offset_size, column_count, (uchar*)str->str + str->length,
-                  column_nr, &header_entry))
+
+/**
+  List not-null columns in the packed string (only numeric foemat)
+
+  @param str             The packed string
+  @param array_of_uint   Where to put reference on created array
+
+  @return ER_DYNCOL_* return code
+*/
+
+enum enum_dyncol_func_result
+dynamic_column_list(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array_of_uint)
+{
+  DYN_HEADER header;
+  uchar *read;
+  uint i;
+  enum enum_dyncol_func_result rc;
+
+  bzero(array_of_uint, sizeof(*array_of_uint)); /* In case of errors */
+  if (str->length == 0)
+    return ER_DYNCOL_OK;                        /* no columns */
+
+  if ((rc= init_read_hdr(&header, str)) < 0)
+    return rc;
+
+  if (header.format != DYNCOL_FMT_NUM)
     return ER_DYNCOL_FORMAT;
 
-  if (type == DYN_COL_NULL)
-    return ER_DYNCOL_OK;  /* no such column */
+  if (header.entry_size * header.column_count + FIXED_HEADER_SIZE >
+      str->length)
+    return ER_DYNCOL_FORMAT;
 
-  if (column_count == 1)
+  if (init_dynamic_array(array_of_uint, sizeof(uint), header.column_count, 0))
+    return ER_DYNCOL_RESOURCE;
+
+  for (i= 0, read= header.header;
+       i < header.column_count;
+       i++, read+= header.entry_size)
   {
-    /* delete the only column; Return empty string */
-    str->length= 0;
-    return ER_DYNCOL_OK;
+    uint nm= uint2korr(read);
+    /* Insert can't never fail as it's pre-allocated above */
+    (void) insert_dynamic(array_of_uint, (uchar *)&nm);
   }
+  return ER_DYNCOL_OK;
+}
 
-  /* Calculate entry_size and header_size */
-  calc_param(&entry_size, &header_size, offset_size, column_count);
-  data_size= str->length - FIXED_HEADER_SIZE - header_size;
 
-  new_data_size= data_size - length;
-  if ((new_offset_size= dynamic_column_offset_bytes(new_data_size)) >=
-      MAX_OFFSET_LENGTH)
-    return ER_DYNCOL_LIMIT;
-  DBUG_ASSERT(new_offset_size <= offset_size);
+/**
+  List not-null columns in the packed string (any format)
 
-  calc_param(&new_entry_size, &new_header_size,
-             new_offset_size, column_count - 1);
+  @param str             The packed string
+  @param array_of_lexstr Where to put reference on created array
 
-  deleted_entry_offset= ((data - (uchar*) str->str) -
-                         header_size - FIXED_HEADER_SIZE);
+  @return ER_DYNCOL_* return code
+*/
 
-  /* rewrite header*/
-  set_fixed_header(str, new_offset_size, column_count - 1);
-  for (i= 0, write= read= (uchar *)str->str + FIXED_HEADER_SIZE;
-       i < column_count;
-       i++, read+= entry_size, write+= new_entry_size)
+enum enum_dyncol_func_result
+dynamic_column_list_str(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array_of_lexstr)
+{
+  DYN_HEADER header;
+  uchar *read;
+  struct st_service_funcs *fmt;
+  uint i;
+  enum enum_dyncol_func_result rc;
+
+  bzero(array_of_lexstr, sizeof(*array_of_lexstr)); /* In case of errors */
+  if (str->length == 0)
+    return ER_DYNCOL_OK;                        /* no columns */
+
+  if ((rc= init_read_hdr(&header, str)) < 0)
+    return rc;
+
+  fmt= fmt_data + header.format;
+
+  if (header.entry_size * header.column_count + fmt->fixed_hdr >
+      str->length)
+    return ER_DYNCOL_FORMAT;
+
+  if (init_dynamic_array(array_of_lexstr, sizeof(LEX_STRING),
+                         header.column_count, 0))
+    return ER_DYNCOL_RESOURCE;
+
+  for (i= 0, read= header.header;
+       i < header.column_count;
+       i++, read+= header.entry_size)
   {
-    size_t offs;
-    uint nm;
-    DYNAMIC_COLUMN_TYPE tp;
-    if (read == header_entry)
+    LEX_STRING tmp;
+    if (header.format == DYNCOL_FMT_NUM)
+    {
+      uint nm= uint2korr(read);
+      tmp.str= my_malloc(6, MYF(0));
+      if (!tmp.str)
+        return ER_DYNCOL_RESOURCE;
+      tmp.length= snprintf(tmp.str, 6, "%u", nm);
+    }
+    else
     {
-#ifndef DBUG_OFF
-      nm= uint2korr(read);
-      type_and_offset_read(&tp, &offs, read,
-                           offset_size);
-      DBUG_ASSERT(nm == column_nr);
-      DBUG_ASSERT(offs == deleted_entry_offset);
-#endif
-      write-= new_entry_size;                 /* do not move writer */
-      continue;                               /* skip removed field */
+      tmp.length= read[0];
+      tmp.str= my_malloc(tmp.length + 1, MYF(0));
+      if(!tmp.str)
+        return ER_DYNCOL_RESOURCE;
+      memcpy(tmp.str, (const void *)header.nmpool + uint2korr(read + 1),
+             tmp.length);
+      tmp.str[tmp.length]= '\0'; // just for safety
     }
+    /* Insert can't never fail as it's pre-allocated above */
+    (void) insert_dynamic(array_of_lexstr, (uchar *)&tmp);
+  }
+  return ER_DYNCOL_OK;
+}
 
-    nm= uint2korr(read),
-    type_and_offset_read(&tp, &offs, read,
-                         offset_size);
+/**
+  Find the place of the column in the header or place where it should be put
 
-    if (offs > deleted_entry_offset)
-      offs-= length;              /* data stored after removed data */
+  @param hdr             descriptor of dynamic column record
+  @param key             Name or number of column to fetch
+                         (depends on string_key)
+  @param string_key      True if we gave pointer to LEX_STRING.
 
-    int2store(write, nm);
-    type_and_offset_store(write, new_offset_size, tp, offs);
-  }
+  @retval TRUE found
+  @retval FALSE pointer set to the next row
+*/
+
+static my_bool
+find_place(DYN_HEADER *hdr, void *key, my_bool string_keys)
+{
+  uint mid, start, end, val;
+  int flag;
+  LEX_STRING str;
+  char buff[6];
+  my_bool need_conversion= ((string_keys ? DYNCOL_FMT_STR : DYNCOL_FMT_NUM) !=
+                            hdr->format);
+  LINT_INIT(flag);                              /* 100 % safe */
+  /* new format can't be numeric if the old one is names */
+  DBUG_ASSERT(string_keys ||
+              hdr->format == DYNCOL_FMT_NUM);
 
-  /* move data */
+  start= 0;
+  end= hdr->column_count -1;
+  mid= 1;
+  while (start != end)
   {
-    size_t first_chunk_len= ((data - (uchar *)str->str) -
-                             FIXED_HEADER_SIZE - header_size);
-    size_t second_chunk_len= new_data_size - first_chunk_len;
-    if (first_chunk_len)
-      memmove(str->str + FIXED_HEADER_SIZE + new_header_size,
-              str->str + FIXED_HEADER_SIZE + header_size,
-              first_chunk_len);
-    if (second_chunk_len)
-      memmove(str->str +
-              FIXED_HEADER_SIZE + new_header_size + first_chunk_len,
-              str->str +
-              FIXED_HEADER_SIZE + header_size + first_chunk_len + length,
-              second_chunk_len);
+    uint val;
+    mid= (start + end) / 2;
+    hdr->entry= hdr->header + mid * hdr->entry_size;
+    if (!string_keys)
+    {
+      val= uint2korr(hdr->entry);
+      flag= CMP_NUM(*((uint *)key), val);
+    }
+    else
+    {
+      if (need_conversion)
+      {
+        str.str= backwritenum(buff + sizeof(buff), uint2korr(hdr->entry));
+        str.length= (buff + sizeof(buff)) - str.str;
+      }
+      else
+      {
+        DBUG_ASSERT(hdr->format == DYNCOL_FMT_STR);
+        str.length= hdr->entry[0];
+        str.str= (char *)hdr->nmpool + uint2korr(hdr->entry + 1);
+      }
+      flag= ((LEX_STRING *) key)->length - str.length;
+      if (flag == 0)
+        flag= memcmp(((LEX_STRING *) key)->str, str.str, str.length);
+    }
+    if (flag <= 0)
+      end= mid;
+    else
+      start= mid + 1;
   }
+  hdr->entry= hdr->header + start * hdr->entry_size;
+  if (start != mid)
+  {
+    if (!string_keys)
+    {
+      val= uint2korr(hdr->entry);
+      flag= CMP_NUM(*((uint *)key), val);
+    }
+    else
+    {
+      if (need_conversion)
+      {
+        str.str= backwritenum(buff + sizeof(buff), uint2korr(hdr->entry));
+        str.length= (buff + sizeof(buff)) - str.str;
+      }
+      else
+      {
+        DBUG_ASSERT(hdr->format == DYNCOL_FMT_STR);
+        str.length= hdr->entry[0];
+        str.str= (char*) hdr->nmpool + uint2korr(hdr->entry + 1);
+      }
+      flag= ((LEX_STRING *) key)->length - str.length;
+      if (flag == 0)
+        flag= memcmp(((LEX_STRING *) key)->str, str.str, str.length);
+    }
+  }
+  if (flag > 0)
+    hdr->entry+= hdr->entry_size; /* Point at next bigger key */
+  return flag == 0;
+}
 
-  /* fix str length */
-  DBUG_ASSERT(str->length >=
-              FIXED_HEADER_SIZE + new_header_size + new_data_size);
-  str->length= FIXED_HEADER_SIZE + new_header_size + new_data_size;
 
-  return ER_DYNCOL_OK;
+/*
+  It is internal structure which describes plan of chenging the record
+  of dynamic columns
+*/
+
+typedef enum {PLAN_REPLACE, PLAN_ADD, PLAN_DELETE, PLAN_NOP} PLAN_ACT;
+
+struct st_plan {
+  DYNAMIC_COLUMN_VALUE *val;
+  void *key;
+  uchar *place;
+  size_t length;
+  int hdelta, ddelta, ndelta;
+  uint mv_offset, mv_length, mv_end;
+  PLAN_ACT act;
+};
+typedef struct st_plan PLAN;
+
+
+/**
+  Sort function for plan by column number
+*/
+
+static int plan_sort_num(const void *a, const void *b)
+{
+  return *((uint *)((PLAN *)a)->key) - *((uint *)((PLAN *)b)->key);
 }
 
 
 /**
-  Check existence of the column in the packed string
+  Sort function for plan by column name
+*/
 
-  @param str             The packed string to check the column
-  @param column_nr       Number of column to check
+static int plan_sort_str(const void *a, const void *b)
+{
+  int res= (((LEX_STRING *)((PLAN *)a)->key)->length -
+            ((LEX_STRING *)((PLAN *)b)->key)->length);
+  if (res == 0)
+    res= memcmp(((LEX_STRING *)((PLAN *)a)->key)->str,
+                ((LEX_STRING *)((PLAN *)b)->key)->str,
+                ((LEX_STRING *)((PLAN *)a)->key)->length);
+  return res;
+}
+
+#define DELTA_CHECK(S, D, C)        \
+  if ((S) == 0)                     \
+    (S)= (D);                       \
+  else if (((S) > 0 && (D) < 0) ||  \
+            ((S) < 0 && (D) > 0))   \
+  {                                 \
+    (C)= TRUE;                      \
+  }
+
+/**
+  Update dynamic column by copying in a new record (string).
+
+  @param str             Dynamic column record to change
+  @param plan            Plan of changing the record
+  @param add_column_count number of records in the plan array.
+  @param hdr             descriptor of old dynamic column record
+  @param new_hdr         descriptor of new dynamic column record
+  @param convert         need conversion from numeric to names format
 
   @return ER_DYNCOL_* return code
 */
 
 enum enum_dyncol_func_result
-dynamic_column_exists(DYNAMIC_COLUMN *str, uint column_nr)
+dynamic_column_update_copy(DYNAMIC_COLUMN *str, PLAN *plan,
+                           uint add_column_count,
+                           DYN_HEADER *hdr, DYN_HEADER *new_hdr,
+                           my_bool convert)
 {
-  uchar *data;
-  size_t offset_size, length;
-  uint column_count;
-  DYNAMIC_COLUMN_TYPE type;
+  DYNAMIC_COLUMN tmp;
+  struct st_service_funcs *fmt= fmt_data + hdr->format,
+                          *new_fmt= fmt_data + new_hdr->format;
+  uint i, j, k;
+  size_t all_headers_size;
 
-  if (str->length == 0)
-    return ER_DYNCOL_NO;                        /* no columns */
+  if (dynamic_column_init_str(&tmp,
+                              (new_fmt->fixed_hdr + new_hdr->header_size +
+                               new_hdr->nmpool_size +
+                               new_hdr->data_size + DYNCOL_SYZERESERVE)))
+  {
+    return ER_DYNCOL_RESOURCE;
+  }
+  bzero(tmp.str, new_fmt->fixed_hdr);
+  (*new_fmt->set_fixed_hdr)(&tmp, new_hdr);
+  /* Adjust tmp to contain whole the future header */
+  tmp.length= new_fmt->fixed_hdr + new_hdr->header_size + new_hdr->nmpool_size;
 
-  if (read_fixed_header(str, &offset_size, &column_count))
-    return ER_DYNCOL_FORMAT;
 
-  if (column_count == 0)
-    return ER_DYNCOL_NO;                        /* no columns */
+  /*
+    Copy data to the new string
+    i= index in array of changes
+    j= index in packed string header index
+  */
+  new_hdr->entry= new_hdr->header;
+  new_hdr->name= new_hdr->nmpool;
+  all_headers_size= new_fmt->fixed_hdr +
+    new_hdr->header_size + new_hdr->nmpool_size;
+  for (i= 0, j= 0; i < add_column_count || j < hdr->column_count; i++)
+  {
+    size_t first_offset;
+    uint start= j, end;
+    LINT_INIT(first_offset);
 
-  if (find_column(&type, &data, &length, (uchar*)str->str + FIXED_HEADER_SIZE,
-                  offset_size, column_count, (uchar*)str->str + str->length,
-                  column_nr, NULL))
-    return ER_DYNCOL_FORMAT;
+    /*
+      Search in i and j for the next column to add from i and where to
+      add.
+    */
 
-  return (type != DYN_COL_NULL ? ER_DYNCOL_YES : ER_DYNCOL_NO);
-}
+    while (i < add_column_count && plan[i].act == PLAN_NOP)
+      i++;                                    /* skip NOP */
 
+    if (i == add_column_count)
+      j= end= hdr->column_count;
+    else
+    {
+      /*
+        old data portion. We don't need to check that j < column_count
+        as plan[i].place is guaranteed to have a pointer inside the
+        data.
+      */
+      while (hdr->header + j * hdr->entry_size < plan[i].place)
+        j++;
+      end= j;
+      if ((plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
+        j++;                              /* data at 'j' will be removed */
+    }
 
-/**
-  List not-null columns in the packed string
+    /*
+      Adjust all headers since last loop.
+      We have to do this as the offset for data has moved
+    */
+    for (k= start; k < end; k++)
+    {
+      uchar *read= hdr->header + k * hdr->entry_size;
+      void *key;
+      LEX_STRING name;
+      size_t offs;
+      uint nm;
+      DYNAMIC_COLUMN_TYPE tp;
+      char buff[6];
 
-  @param str             The packed string
-  @param array_of_uint   Where to put reference on created array
+      if (hdr->format == DYNCOL_FMT_NUM)
+      {
+        if (convert)
+        {
+          name.str= backwritenum(buff + sizeof(buff), uint2korr(read));
+          name.length= (buff + sizeof(buff)) - name.str;
+          key= &name;
+        }
+        else
+        {
+          nm= uint2korr(read);                    /* Column nummber */
+          key= &nm;
+        }
+      }
+      else
+      {
+        name.length= read[0];
+        name.str= (char *) hdr->nmpool +  uint2korr(read + 1);
+        key= &name;
+      }
+      if (type_and_offset_read(&tp, &offs,
+                               read + fmt->fixed_hdr_entry, hdr->offset_size))
+          goto err;
+      if (k == start)
+        first_offset= offs;
+      else if (offs < first_offset)
+        goto err;
 
-  @return ER_DYNCOL_* return code
-*/
+      offs+= plan[i].ddelta;
+      {
+        DYNAMIC_COLUMN_VALUE val;
+        val.type= tp; // only the type used in the header
+        if ((*new_fmt->put_header_entry)(new_hdr, key, &val, offs))
+          goto err;
+      }
+    }
+
+    /* copy first the data that was not replaced in original packed data */
+    if (start < end)
+    {
+      size_t data_size;
+      /* Add old data last in 'tmp' */
+      hdr->entry= hdr->header + start * hdr->entry_size;
+      data_size=
+        hdr_interval_length(hdr, hdr->header + end * hdr->entry_size);
+      if (data_size == DYNCOL_OFFSET_ERROR ||
+          (long) data_size < 0 ||
+          data_size > hdr->data_size - first_offset)
+        goto err;
+
+      memcpy(tmp.str + tmp.length, (char *)hdr->dtpool + first_offset,
+             data_size);
+      tmp.length+= data_size;
+    }
+
+    /* new data adding */
+    if (i < add_column_count)
+    {
+      if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
+      {
+        if ((*new_fmt->put_header_entry)(new_hdr, plan[i].key,
+                                         plan[i].val,
+                                         tmp.length - all_headers_size))
+          goto err;
+        data_store(&tmp, plan[i].val);        /* Append new data */
+      }
+    }
+  }
+  dynamic_column_column_free(str);
+  *str= tmp;
+  return ER_DYNCOL_OK;
+err:
+  dynamic_column_column_free(&tmp);
+  return ER_DYNCOL_FORMAT;
+}
 
 enum enum_dyncol_func_result
-dynamic_column_list(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array_of_uint)
+dynamic_column_update_move_left(DYNAMIC_COLUMN *str, PLAN *plan,
+                                size_t offset_size,
+                                size_t entry_size,
+                                size_t header_size,
+                                size_t new_offset_size,
+                                size_t new_entry_size,
+                                size_t new_header_size,
+                                uint column_count,
+                                uint new_column_count,
+                                uint add_column_count,
+                                uchar *header_end,
+                                size_t max_offset)
 {
-  uchar *read;
-  size_t offset_size, entry_size;
-  uint column_count, i;
+  uchar *write;
+  uchar *header_base= (uchar *)str->str + FIXED_HEADER_SIZE;
+  uint i, j, k;
+  size_t curr_offset;
+
+  write= (uchar *)str->str + FIXED_HEADER_SIZE;
+  set_fixed_header(str, new_offset_size, new_column_count);
+
+  /*
+    Move headers first.
+    i= index in array of changes
+    j= index in packed string header index
+  */
+  for (curr_offset= 0, i= 0, j= 0;
+       i < add_column_count || j < column_count;
+       i++)
+  {
+    size_t first_offset;
+    uint start= j, end;
+    LINT_INIT(first_offset);
+
+    /*
+      Search in i and j for the next column to add from i and where to
+      add.
+    */
+
+    while (i < add_column_count && plan[i].act == PLAN_NOP)
+      i++;                                    /* skip NOP */
+
+    if (i == add_column_count)
+      j= end= column_count;
+    else
+    {
+      /*
+        old data portion. We don't need to check that j < column_count
+        as plan[i].place is guaranteed to have a pointer inside the
+        data.
+      */
+      while (header_base + j * entry_size < plan[i].place)
+        j++;
+      end= j;
+      if ((plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
+        j++;                              /* data at 'j' will be removed */
+    }
+    plan[i].mv_end= end;
+
+    {
+      DYNAMIC_COLUMN_TYPE tp;
+      if (type_and_offset_read(&tp, &first_offset,
+                               header_base + start * entry_size +
+                               COLUMN_NUMBER_SIZE, offset_size))
+        return ER_DYNCOL_FORMAT;
+    }
+    /* find data to be moved */
+    if (start < end)
+    {
+      size_t data_size=
+        get_length_interval(header_base + start * entry_size,
+                            header_base + end * entry_size,
+                            header_end, offset_size, max_offset);
+      if (data_size == DYNCOL_OFFSET_ERROR ||
+          (long) data_size < 0 ||
+          data_size > max_offset - first_offset)
+      {
+        str->length= 0; // just something valid
+        return ER_DYNCOL_FORMAT;
+      }
+      DBUG_ASSERT(curr_offset == first_offset + plan[i].ddelta);
+      plan[i].mv_offset= first_offset;
+      plan[i].mv_length= data_size;
+      curr_offset+= data_size;
+    }
+    else
+    {
+      plan[i].mv_length= 0;
+      plan[i].mv_offset= curr_offset;
+    }
+
+    if (plan[i].ddelta == 0 && offset_size == new_offset_size &&
+        plan[i].act != PLAN_DELETE)
+      write+= entry_size * (end - start);
+    else
+    {
+      /*
+        Adjust all headers since last loop.
+        We have to do this as the offset for data has moved
+      */
+      for (k= start; k < end; k++)
+      {
+        uchar *read= header_base + k * entry_size;
+        size_t offs;
+        uint nm;
+        DYNAMIC_COLUMN_TYPE tp;
+
+        nm= uint2korr(read);                    /* Column nummber */
+        if (type_and_offset_read(&tp, &offs, read + COLUMN_NUMBER_SIZE,
+                                 offset_size))
+          return ER_DYNCOL_FORMAT;
+
+        if (k > start && offs < first_offset)
+        {
+          str->length= 0; // just something valid
+          return ER_DYNCOL_FORMAT;
+        }
+
+        offs+= plan[i].ddelta;
+        int2store(write, nm);
+        /* write rest of data at write + COLUMN_NUMBER_SIZE */
+        type_and_offset_store(write, new_offset_size, tp, offs);
+        write+= new_entry_size;
+      }
+    }
+
+    /* new data adding */
+    if (i < add_column_count)
+    {
+      if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
+      {
+        int2store(write, *((uint *)plan[i].key));
+        type_and_offset_store(write, new_offset_size,
+                              plan[i].val[0].type,
+                              curr_offset);
+        write+= new_entry_size;
+        curr_offset+= plan[i].length;
+      }
+    }
+  }
 
-  bzero(array_of_uint, sizeof(*array_of_uint)); /* In case of errors */
-  if (str->length == 0)
-    return ER_DYNCOL_OK;                        /* no columns */
+  /*
+    Move data.
+    i= index in array of changes
+    j= index in packed string header index
+  */
+  str->length= (FIXED_HEADER_SIZE + new_header_size);
+  for (i= 0, j= 0;
+       i < add_column_count || j < column_count;
+       i++)
+  {
+    uint start= j, end;
 
-  if (read_fixed_header(str, &offset_size, &column_count))
-    return ER_DYNCOL_FORMAT;
+    /*
+      Search in i and j for the next column to add from i and where to
+      add.
+    */
 
-  entry_size= COLUMN_NUMBER_SIZE + offset_size;
+    while (i < add_column_count && plan[i].act == PLAN_NOP)
+      i++;                                    /* skip NOP */
 
-  if (entry_size * column_count + FIXED_HEADER_SIZE > str->length)
-    return ER_DYNCOL_FORMAT;
+    j= end= plan[i].mv_end;
+    if (i != add_column_count &&
+        (plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
+      j++;
 
-  if (init_dynamic_array(array_of_uint, sizeof(uint), column_count, 0))
-    return ER_DYNCOL_RESOURCE;
+    /* copy first the data that was not replaced in original packed data */
+    if (start < end && plan[i].mv_length)
+    {
+      memmove((header_base + new_header_size +
+               plan[i].mv_offset + plan[i].ddelta),
+              header_base + header_size + plan[i].mv_offset,
+              plan[i].mv_length);
+    }
+    str->length+= plan[i].mv_length;
 
-  for (i= 0, read= (uchar *)str->str + FIXED_HEADER_SIZE;
-       i < column_count;
-       i++, read+= entry_size)
-  {
-    uint nm= uint2korr(read);
-    /* Insert can't never fail as it's pre-allocated above */
-    (void) insert_dynamic(array_of_uint, (uchar *)&nm);
+    /* new data adding */
+    if (i < add_column_count)
+    {
+      if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
+      {
+        data_store(str, plan[i].val);        /* Append new data */
+      }
+    }
   }
   return ER_DYNCOL_OK;
 }
 
+enum enum_dyncol_func_result
+dynamic_column_update_move_right(DYNAMIC_COLUMN *str, PLAN *plan,
+                                 size_t offset_size,
+                                 size_t entry_size,
+                                 size_t header_size,
+                                 size_t new_offset_size,
+                                 size_t new_entry_size,
+                                 size_t new_header_size,
+                                 uint column_count,
+                                 uint new_column_count,
+                                 uint add_column_count,
+                                 uchar *header_end,
+                                 size_t max_offset)
+{
+  uchar *write;
+  uchar *header_base= (uchar *)str->str + FIXED_HEADER_SIZE;
+  uint i, j, k;
+  size_t curr_offset;
 
-/**
-  Find the place of the column in the header or place where it should be put
+  write= (uchar *)str->str + FIXED_HEADER_SIZE;
+  set_fixed_header(str, new_offset_size, new_column_count);
 
-  @param num             Number of the column
-  @param header          Pointer to the header
-  @param entry_size      Size of a header entry
-  @param column_count    Number of columns in the packed string
-  @param entry           Return pointer to the entry or next entry
+  /*
+    Move data first.
+    i= index in array of changes
+    j= index in packed string header index
+  */
+  for (curr_offset= 0, i= 0, j= 0;
+       i < add_column_count || j < column_count;
+       i++)
+  {
+    size_t first_offset;
+    uint start= j, end;
+    LINT_INIT(first_offset);
 
-  @retval TRUE found
-  @retval FALSE pointer set to the next row
-*/
+    /*
+      Search in i and j for the next column to add from i and where to
+      add.
+    */
 
-static my_bool
-find_place(uint num, uchar *header, size_t entry_size,
-           uint column_count, uchar **entry)
-{
-  uint mid, start, end, val;
-  int flag;
-  LINT_INIT(flag);                              /* 100 % safe */
+    while (i < add_column_count && plan[i].act == PLAN_NOP)
+      i++;                                    /* skip NOP */
 
-  start= 0;
-  end= column_count -1;
-  mid= 1;
-  while (start != end)
-  {
-   uint val;
-   mid= (start + end) / 2;
-   val= uint2korr(header + mid * entry_size);
-   if ((flag= CMP_NUM(num, val)) <= 0)
-     end= mid;
-   else
-     start= mid + 1;
-  }
-  if (start != mid)
-  {
-    val= uint2korr(header + start * entry_size);
-    flag= CMP_NUM(num, val);
+    if (i == add_column_count)
+      j= end= column_count;
+    else
+    {
+      /*
+        old data portion. We don't need to check that j < column_count
+        as plan[i].place is guaranteed to have a pointer inside the
+        data.
+      */
+      while (header_base + j * entry_size < plan[i].place)
+        j++;
+      end= j;
+      if ((plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
+        j++;                              /* data at 'j' will be removed */
+    }
+    plan[i].mv_end= end;
+
+    {
+      DYNAMIC_COLUMN_TYPE tp;
+      type_and_offset_read(&tp, &first_offset,
+                           header_base + start * entry_size + COLUMN_NUMBER_SIZE, offset_size);
+    }
+    /* find data to be moved */
+    if (start < end)
+    {
+      size_t data_size=
+        get_length_interval(header_base + start * entry_size,
+                            header_base + end * entry_size,
+                            header_end, offset_size, max_offset);
+      if (data_size == DYNCOL_OFFSET_ERROR ||
+          (long) data_size < 0 ||
+          data_size > max_offset - first_offset)
+      {
+        str->length= 0; // just something valid
+        return ER_DYNCOL_FORMAT;
+      }
+      DBUG_ASSERT(curr_offset == first_offset + plan[i].ddelta);
+      plan[i].mv_offset= first_offset;
+      plan[i].mv_length= data_size;
+      curr_offset+= data_size;
+    }
+    else
+    {
+      plan[i].mv_length= 0;
+      plan[i].mv_offset= curr_offset;
+    }
+
+    if (plan[i].ddelta == 0 && offset_size == new_offset_size &&
+        plan[i].act != PLAN_DELETE)
+      write+= entry_size * (end - start);
+    else
+    {
+      /*
+        Adjust all headers since last loop.
+        We have to do this as the offset for data has moved
+      */
+      for (k= start; k < end; k++)
+      {
+        uchar *read= header_base + k * entry_size;
+        size_t offs;
+        uint nm;
+        DYNAMIC_COLUMN_TYPE tp;
+
+        nm= uint2korr(read);                    /* Column nummber */
+        type_and_offset_read(&tp, &offs, read + COLUMN_NUMBER_SIZE, offset_size);
+        if (k > start && offs < first_offset)
+        {
+          str->length= 0; // just something valid
+          return ER_DYNCOL_FORMAT;
+        }
+
+        offs+= plan[i].ddelta;
+        int2store(write, nm);
+        /* write rest of data at write + COLUMN_NUMBER_SIZE */
+        if (type_and_offset_store(write, new_offset_size, tp, offs))
+        {
+          str->length= 0; // just something valid
+          return ER_DYNCOL_FORMAT;
+        }
+        write+= new_entry_size;
+      }
+    }
+
+    /* new data adding */
+    if (i < add_column_count)
+    {
+      if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
+      {
+        int2store(write, *((uint *)plan[i].key));
+        if (type_and_offset_store(write, new_offset_size,
+                                  plan[i].val[0].type,
+                                  curr_offset))
+        {
+          str->length= 0; // just something valid
+          return ER_DYNCOL_FORMAT;
+        }
+        write+= new_entry_size;
+        curr_offset+= plan[i].length;
+      }
+    }
   }
-  *entry= header + start * entry_size;
-  if (flag > 0)
-    *entry+= entry_size;        /* Point at next bigger key */
-  return flag == 0;
-}
 
+  /*
+    Move headers.
+    i= index in array of changes
+    j= index in packed string header index
+  */
+  str->length= (FIXED_HEADER_SIZE + new_header_size);
+  for (i= 0, j= 0;
+       i < add_column_count || j < column_count;
+       i++)
+  {
+    uint start= j, end;
 
-/*
-  Description of plan of adding/removing/updating a packed string
-*/
+    /*
+      Search in i and j for the next column to add from i and where to
+      add.
+    */
 
-typedef enum {PLAN_REPLACE, PLAN_ADD, PLAN_DELETE, PLAN_NOP} PLAN_ACT;
+    while (i < add_column_count && plan[i].act == PLAN_NOP)
+      i++;                                    /* skip NOP */
 
-struct st_plan {
-  DYNAMIC_COLUMN_VALUE *val;
-  uint *num;
-  uchar *place;
-  size_t length;
-  int hdelta, ddelta;
-  PLAN_ACT act;
-};
-typedef struct st_plan PLAN;
+    j= end= plan[i].mv_end;
+    if (i != add_column_count &&
+        (plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
+      j++;
 
+    /* copy first the data that was not replaced in original packed data */
+    if (start < end && plan[i].mv_length)
+    {
+      memmove((header_base + new_header_size +
+               plan[i].mv_offset + plan[i].ddelta),
+              header_base + header_size + plan[i].mv_offset,
+              plan[i].mv_length);
+    }
+    str->length+= plan[i].mv_length;
 
-static int plan_sort(const void *a, const void *b)
-{
-  return ((PLAN *)a)->num[0] - ((PLAN *)b)->num[0];
+    /* new data adding */
+    if (i < add_column_count)
+    {
+      if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
+      {
+        data_store(str, plan[i].val);        /* Append new data */
+      }
+    }
+  }
+  return ER_DYNCOL_OK;
 }
 
-#define DELTA_CHECK(S, D, C)        \
-  if ((S) == 0)                     \
-    (S)= (D);                       \
-  else if (((S) > 0 && (D) < 0) ||  \
-            ((S) < 0 && (D) > 0))   \
-  {                                 \
-    (C)= TRUE;                      \
-    break;                          \
-  }                                 \
-
 
 /**
   Update the packed string with the given columns
@@ -1728,6 +2882,8 @@ static int plan_sort(const void *a, cons
 
   @return ER_DYNCOL_* return code
 */
+/* plan allocated on the stack */
+#define IN_PLACE_PLAN 4
 
 enum enum_dyncol_func_result
 dynamic_column_update_many(DYNAMIC_COLUMN *str,
@@ -1735,39 +2891,75 @@ dynamic_column_update_many(DYNAMIC_COLUM
                            uint *column_numbers,
                            DYNAMIC_COLUMN_VALUE *values)
 {
-  PLAN *plan;
-  uchar *header_end;
-  long data_delta= 0;
-  uint i, j, k;
-  uint new_column_count, column_count, not_null;
+  return dynamic_column_update_many_fmt(str, add_column_count, column_numbers,
+                                        values, FALSE);
+}
+
+uint numlen(uint val)
+{
+  uint res;
+  if (val == 0)
+    return 1;
+  res= 0;
+  while(val)
+  {
+    res++;
+    val/=10;
+  }
+  return res;
+}
+
+enum enum_dyncol_func_result
+dynamic_column_update_many_fmt(DYNAMIC_COLUMN *str,
+                               uint add_column_count,
+                               void *column_keys,
+                               DYNAMIC_COLUMN_VALUE *values,
+                               my_bool string_keys)
+{
+  PLAN *plan, *alloc_plan= NULL, in_place_plan[IN_PLACE_PLAN];
+  uchar *element;
+  DYN_HEADER header, new_header;
+  struct st_service_funcs *fmt, *new_fmt;
+  long data_delta= 0, name_delta= 0;
+  uint i;
+  uint not_null;
+  int header_delta= 0;
+  int copy= FALSE;
+  int header_delta_sign, data_delta_sign;
   enum enum_dyncol_func_result rc;
-  int header_delta;
-  size_t offset_size, entry_size, header_size, data_size;
-  size_t new_offset_size, new_entry_size, new_header_size, new_data_size;
-  size_t max_offset;
+  my_bool convert;
 
   if (add_column_count == 0)
     return ER_DYNCOL_OK;
 
+  bzero(&header, sizeof(header));
+  bzero(&new_header, sizeof(new_header));
+  new_header.format= (string_keys ? DYNCOL_FMT_STR : DYNCOL_FMT_NUM);
+  new_fmt= fmt_data + new_header.format;
+
   /*
     Get columns in column order. As the data in 'str' is already
     in column order this allows to replace all columns in one loop.
   */
-
-  if (!(plan= my_malloc(sizeof(PLAN) * (add_column_count + 1), MYF(0))))
+  if (IN_PLACE_PLAN > add_column_count)
+    plan= in_place_plan;
+  else if (!(alloc_plan= plan=
+             my_malloc(sizeof(PLAN) * (add_column_count + 1), MYF(0))))
     return ER_DYNCOL_RESOURCE;
 
   not_null= add_column_count;
-  for (i= 0; i < add_column_count; i++)
+  for (i= 0, element= (uchar *) column_keys;
+       i < add_column_count;
+       i++, element+= new_fmt->key_size_in_array)
   {
-    if (column_numbers[i] > UINT_MAX16)
+    if ((*new_fmt->check_limit)(&element))
     {
       rc= ER_DYNCOL_DATA;
       goto end;
     }
 
     plan[i].val= values + i;
-    plan[i].num= column_numbers + i;
+    plan[i].key= element;
     if (values[i].type == DYN_COL_NULL)
       not_null--;
 
@@ -1783,22 +2975,32 @@ dynamic_column_update_many(DYNAMIC_COLUM
   }
 
   /* Check that header is ok */
-  if (read_fixed_header(str, &offset_size, &column_count))
-  {
-    rc= ER_DYNCOL_FORMAT;
+  if ((rc= init_read_hdr(&header, str)) < 0)
     goto end;
-  }
-  if (column_count == 0)
+  fmt= fmt_data + header.format;
+  /* new format can't be numeric if the old one is names */
+  DBUG_ASSERT(new_header.format == DYNCOL_FMT_STR ||
+              header.format == DYNCOL_FMT_NUM);
+  if (header.column_count == 0)
     goto create_new_string;
 
-  qsort(plan, (size_t)add_column_count, sizeof(PLAN), &plan_sort);
+  qsort(plan, (size_t)add_column_count, sizeof(PLAN), new_fmt->plan_sort);
 
-  new_column_count= column_count;
-  calc_param(&entry_size, &header_size, offset_size, column_count);
-  max_offset= str->length - (FIXED_HEADER_SIZE + header_size);
-  header_end= (uchar*) str->str + FIXED_HEADER_SIZE + header_size;
+  new_header.column_count= header.column_count;
+  new_header.nmpool_size= header.nmpool_size;
+  if ((convert= (new_header.format == DYNCOL_FMT_STR &&
+                 header.format == DYNCOL_FMT_NUM)))
+  {
+    DBUG_ASSERT(new_header.nmpool_size == 0);
+    for(i= 0, header.entry= header.header;
+        i < header.column_count;
+        i++, header.entry+= header.entry_size)
+    {
+      new_header.nmpool_size+= numlen(uint2korr(header.entry));
+    }
+  }
 
-  if (header_size + FIXED_HEADER_SIZE > str->length)
+  if (fmt->fixed_hdr + header.header_size + header.nmpool_size > str->length)
   {
     rc= ER_DYNCOL_FORMAT;
     goto end;
@@ -1808,17 +3010,15 @@ dynamic_column_update_many(DYNAMIC_COLUM
     Calculate how many columns and data is added/deleted and make a 'plan'
     for each of them.
   */
-  header_delta= 0;
   for (i= 0; i < add_column_count; i++)
   {
-    uchar *entry;
-
     /*
       For now we don't allow creating two columns with the same number
       at the time of create.  This can be fixed later to just use the later
       by comparing the pointers.
     */
-    if (i < add_column_count - 1 && plan[i].num[0] == plan[i + 1].num[0])
+    if (i < add_column_count - 1 &&
+        new_fmt->column_sort(&plan[i].key, &plan[i + 1].key) == 0)
     {
       rc= ER_DYNCOL_DATA;
       goto end;
@@ -1826,26 +3026,36 @@ dynamic_column_update_many(DYNAMIC_COLUM
 
     /* Set common variables for all plans */
     plan[i].ddelta= data_delta;
+    plan[i].ndelta= name_delta;
     /* get header delta in entries */
     plan[i].hdelta= header_delta;
     plan[i].length= 0;                          /* Length if NULL */
 
-    if (find_place(plan[i].num[0],
-                   (uchar *)str->str + FIXED_HEADER_SIZE,
-                   entry_size, column_count, &entry))
+    if (find_place(&header, plan[i].key, string_keys))
     {
-      size_t entry_data_size;
+      size_t entry_data_size, entry_name_size= 0;
 
       /* Data existed; We have to replace or delete it */
 
-      entry_data_size= get_length(entry, header_end,
-                                  offset_size, max_offset);
-      if ((long) entry_data_size < 0)
+      entry_data_size= hdr_interval_length(&header, header.entry +
+                                           header.entry_size);
+      if (entry_data_size == DYNCOL_OFFSET_ERROR ||
+          (long) entry_data_size < 0)
       {
         rc= ER_DYNCOL_FORMAT;
         goto end;
       }
 
+        //get_length(header.entry, header.dtpool, header.offset_size,
+        //header.data_size);
+      if (new_header.format == DYNCOL_FMT_STR)
+      {
+        if (header.format == DYNCOL_FMT_STR)
+          entry_name_size= header.entry[0];
+        else
+          entry_name_size= numlen(uint2korr(header.entry));
+      }
+
       if (plan[i].val->type == DYN_COL_NULL)
       {
         /* Inserting a NULL means delete the old data */
@@ -1853,6 +3063,7 @@ dynamic_column_update_many(DYNAMIC_COLUM
         plan[i].act= PLAN_DELETE;	        /* Remove old value */
         header_delta--;                         /* One row less in header */
         data_delta-= entry_data_size;           /* Less data to store */
+        name_delta-= entry_name_size;
       }
       else
       {
@@ -1867,6 +3078,10 @@ dynamic_column_update_many(DYNAMIC_COLUM
           goto end;
         }
         data_delta+= plan[i].length - entry_data_size;
+        if (new_header.format == DYNCOL_FMT_STR)
+        {
+          name_delta+= ((LEX_STRING *)(plan[i].key))->length - entry_name_size;
+        }
       }
     }
     else
@@ -1891,202 +3106,105 @@ dynamic_column_update_many(DYNAMIC_COLUM
           goto end;
         }
         data_delta+= plan[i].length;
+        if (new_header.format == DYNCOL_FMT_STR)
+          name_delta+= ((LEX_STRING *)plan[i].key)->length;
       }
     }
-    plan[i].place= entry;
+    plan[i].place= header.entry;
   }
   plan[add_column_count].hdelta= header_delta;
   plan[add_column_count].ddelta= data_delta;
-  new_column_count= column_count + header_delta;
+  plan[add_column_count].act= PLAN_NOP;
+  plan[add_column_count].place= header.dtpool;
+
+  new_header.column_count= header.column_count + header_delta;
 
   /*
     Check if it is only "increasing" or only "decreasing" plan for (header
     and data separately).
   */
-  data_size= str->length - header_size - FIXED_HEADER_SIZE;
-  new_data_size= data_size + data_delta;
-  if ((new_offset_size= dynamic_column_offset_bytes(new_data_size)) >=
+  new_header.data_size= header.data_size + data_delta;
+  new_header.nmpool_size= new_header.nmpool_size + name_delta;
+  DBUG_ASSERT(new_header.format != DYNCOL_FMT_NUM ||
+              new_header.nmpool_size == 0);
+  if ((new_header.offset_size=
+       dynamic_column_offset_bytes(new_header.data_size)) >=
       MAX_OFFSET_LENGTH)
   {
     rc= ER_DYNCOL_LIMIT;
     goto end;
   }
 
-#ifdef NOT_IMPLEMENTED
-  /* if (new_offset_size != offset_size) then we have to rewrite header */
-  header_delta_sign= new_offset_size - offset_size;
+  copy= ((header.format != new_header.format) ||
+         (new_header.format == DYNCOL_FMT_STR));
+  /* if (new_header.offset_size!=offset_size) then we have to rewrite header */
+  header_delta_sign=
+    ((int)new_header.offset_size + new_fmt->fixed_hdr_entry) -
+    ((int)header.offset_size + fmt->fixed_hdr_entry);
   data_delta_sign= 0;
-  for (i= 0; i < add_column_count; i++)
+  // plan[add_column_count] contains last deltas.
+  for (i= 0; i <= add_column_count && !copy; i++)
   {
     /* This is the check for increasing/decreasing */
     DELTA_CHECK(header_delta_sign, plan[i].hdelta, copy);
     DELTA_CHECK(data_delta_sign, plan[i].ddelta, copy);
   }
-#endif
-  calc_param(&new_entry_size, &new_header_size,
-             new_offset_size, new_column_count);
+  calc_param(&new_header.entry_size, &new_header.header_size,
+             new_fmt->fixed_hdr_entry,
+             new_header.offset_size, new_header.column_count);
 
   /*
-    The following code always make a copy. In future we can do a more
-    optimized version when data is only increasing / decreasing.
+    Need copy because:
+    1. Header/data parts moved in different directions.
+    2. There is no enough allocated space in the string.
+    3. Header and data moved in different directions.
   */
-
-  /*if (copy) */
-  {
-    DYNAMIC_COLUMN tmp;
-    uchar *header_base= (uchar *)str->str + FIXED_HEADER_SIZE,
-          *write;
-    if (dynamic_column_init_str(&tmp,
-                                (FIXED_HEADER_SIZE + new_header_size +
-                                 new_data_size + DYNCOL_SYZERESERVE)))
-    {
-      rc= ER_DYNCOL_RESOURCE;
-      goto end;
-    }
-    write= (uchar *)tmp.str + FIXED_HEADER_SIZE;
-    /* Adjust tmp to contain whole the future header */
-    tmp.length= FIXED_HEADER_SIZE + new_header_size;
-    set_fixed_header(&tmp, new_offset_size, new_column_count);
-    data_delta= 0;
-
-    /*
-      Copy data to the new string
-      i= index in array of changes
-      j= index in packed string header index
-    */
-
-    for (i= 0, j= 0; i < add_column_count || j < column_count; i++)
-    {
-      size_t first_offset;
-      uint start= j, end;
-      LINT_INIT(first_offset);
-
+  if (copy || /*1*/
+      str->max_length < str->length + header_delta + data_delta || /*2*/
+      ((header_delta_sign < 0 && data_delta_sign > 0) ||
+       (header_delta_sign > 0 && data_delta_sign < 0))) /*3*/
+    rc= dynamic_column_update_copy(str, plan, add_column_count,
+                                   &header, &new_header,
+                                   convert);
+  else
+    if (header_delta_sign < 0)
+      rc= dynamic_column_update_move_left(str, plan, header.offset_size,
+                                          header.entry_size,
+                                          header.header_size,
+                                          new_header.offset_size,
+                                          new_header.entry_size,
+                                          new_header.header_size,
+                                          header.column_count,
+                                          new_header.column_count,
+                                          add_column_count, header.dtpool,
+                                          header.data_size);
+    else
       /*
-        Search in i and j for the next column to add from i and where to
-        add.
-      */
-
-      while (i < add_column_count && plan[i].act == PLAN_NOP)
-        i++;                                    /* skip NOP */
-      if (i == add_column_count)
-        j= end= column_count;
-      else
-      {
-        /*
-          old data portion. We don't need to check that j < column_count
-          as plan[i].place is guaranteed to have a pointer inside the
-          data.
-        */
-        while (header_base + j * entry_size < plan[i].place)
-          j++;
-        end= j;
-        if ((plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
-          j++;                              /* data at 'j' will be removed */
-      }
-
-      if (plan[i].ddelta == 0 && offset_size == new_offset_size)
-      {
-        uchar *read= header_base + start * entry_size;
-        DYNAMIC_COLUMN_TYPE tp;
-        /*
-          It's safe to copy the header unchanged. This is usually the
-          case for the first header block before any changed data.
-        */
-        if (start < end)                        /* Avoid memcpy with 0 */
-        {
-          size_t length= entry_size * (end - start);
-          memcpy(write, read, length);
-          write+= length;
-        }
-        /* Read first_offset */
-        type_and_offset_read(&tp, &first_offset, read, offset_size);
-      }
-      else
-      {
-        /*
-          Adjust all headers since last loop.
-          We have to do this as the offset for data has moved
-        */
-        for (k= start; k < end; k++)
-        {
-          uchar *read= header_base + k * entry_size;
-          size_t offs;
-          uint nm;
-          DYNAMIC_COLUMN_TYPE tp;
-
-          nm= uint2korr(read);                    /* Column nummber */
-          type_and_offset_read(&tp, &offs, read, offset_size);
-          if (k == start)
-            first_offset= offs;
-          else if (offs < first_offset)
-          {
-            dynamic_column_column_free(&tmp);
-            rc= ER_DYNCOL_FORMAT;
-            goto end;
-          }
-
-          offs+= plan[i].ddelta;
-          int2store(write, nm);
-          /* write rest of data at write + COLUMN_NUMBER_SIZE */
-          type_and_offset_store(write, new_offset_size, tp, offs);
-          write+= new_entry_size;
-        }
-      }
-
-      /* copy first the data that was not replaced in original packed data */
-      if (start < end)
-      {
-        /* Add old data last in 'tmp' */
-        size_t data_size=
-          get_length_interval(header_base + start * entry_size,
-                              header_base + end * entry_size,
-                              header_end, offset_size, max_offset);
-        if ((long) data_size < 0 ||
-            data_size > max_offset - first_offset)
-        {
-          dynamic_column_column_free(&tmp);
-          rc= ER_DYNCOL_FORMAT;
-          goto end;
-        }
-
-        memcpy(tmp.str + tmp.length, (char *)header_end + first_offset,
-               data_size);
-        tmp.length+= data_size;
-      }
-
-      /* new data adding */
-      if (i < add_column_count)
-      {
-        if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
-        {
-          int2store(write, plan[i].num[0]);
-          type_and_offset_store(write, new_offset_size,
-                                plan[i].val[0].type,
-                                tmp.length -
-                                (FIXED_HEADER_SIZE + new_header_size));
-          write+= new_entry_size;
-          data_store(&tmp, plan[i].val);        /* Append new data */
-        }
-        data_delta= plan[i].ddelta;
-      }
-    }
-    dynamic_column_column_free(str);
-    *str= tmp;
-  }
-
-  rc= ER_DYNCOL_OK;
-
+      rc= dynamic_column_update_move_right(str, plan, offset_size,
+                                           entry_size,  header_size,
+                                           new_header.offset_size,
+                                           new_header.entry_size,
+                                           new_heder.header_size, column_count,
+                                           new_header.column_count,
+                                           add_column_count, header_end,
+                                           header.data_size);
+                                         */
+      rc= dynamic_column_update_copy(str, plan, add_column_count,
+                                     &header, &new_header,
+                                     convert);
 end:
-  my_free(plan);
+  my_free(alloc_plan);
   return rc;
 
 create_new_string:
   /* There is no columns from before, so let's just add the new ones */
   rc= ER_DYNCOL_OK;
+  my_free(alloc_plan);
   if (not_null != 0)
-    rc= dynamic_column_create_many_internal(str, add_column_count,
-                                            column_numbers, values,
-                                            str->str == NULL);
+    rc= dynamic_column_create_many_internal_fmt(str, add_column_count,
+                                                (uint*)column_keys, values,
+                                                str->str == NULL,
+                                                string_keys);
   goto end;
 }
 
@@ -2107,3 +3225,199 @@ int dynamic_column_update(DYNAMIC_COLUMN
 {
   return dynamic_column_update_many(str, 1, &column_nr, value);
 }
+
+
+enum enum_dyncol_func_result
+dynamic_column_check(DYNAMIC_COLUMN *str)
+{
+  struct st_service_funcs *fmt;
+  enum enum_dyncol_func_result rc= ER_DYNCOL_FORMAT;
+  DYN_HEADER header;
+  uint i;
+  size_t data_offset= 0, name_offset= 0;
+  size_t prev_data_offset= 0, prev_name_offset= 0;
+  LEX_STRING name= {0,0}, prev_name= {0,0};
+  uint num= 0, prev_num= 0;
+  void *key, *prev_key;
+  enum enum_dynamic_column_type type= DYN_COL_NULL, prev_type= DYN_COL_NULL;
+
+  DBUG_ENTER("dynamic_column_check");
+
+  if (str->length == 0)
+  {
+    DBUG_PRINT("info", ("empty string is OK"));
+    DBUG_RETURN(ER_DYNCOL_OK);
+  }
+
+  bzero(&header, sizeof(header));
+
+  /* Check that header is OK */
+  if (read_fixed_header(&header, str))
+  {
+    DBUG_PRINT("info", ("Reading fixed string header failed"));
+    goto end;
+  }
+  fmt= fmt_data + header.format;
+  calc_param(&header.entry_size, &header.header_size,
+             fmt->fixed_hdr_entry, header.offset_size,
+             header.column_count);
+  /* headers are out of string length (no space for data and part of headers) */
+  if (fmt->fixed_hdr + header.header_size + header.nmpool_size > str->length)
+  {
+    DBUG_PRINT("info", ("Fixed header: %u  Header size: %u  "
+                        "Name pool size: %u  but Strig length: %u",
+                        (uint)fmt->fixed_hdr,
+                        (uint)header.header_size,
+                        (uint)header.nmpool_size,
+                        (uint)str->length));
+    goto end;
+  }
+  header.header= (uchar*)str->str + fmt->fixed_hdr;
+  header.nmpool= header.header + header.header_size;
+  header.dtpool= header.nmpool + header.nmpool_size;
+  header.data_size= str->length - fmt->fixed_hdr -
+    header.header_size - header.nmpool_size;
+
+  /* read and check headers */
+  if (header.format == DYNCOL_FMT_NUM)
+  {
+    key= &num;
+    prev_key= &prev_num;
+  }
+  else
+  {
+    key= &name;
+    prev_key= &prev_name;
+  }
+  for (i= 0, header.entry= header.dtpool;
+       i < header.column_count;
+       i++, header.entry+= header.entry_size)
+  {
+
+    if (header.format == DYNCOL_FMT_NUM)
+    {
+       num= uint2korr(header.entry);
+    }
+    else
+    {
+      DBUG_ASSERT(header.format == DYNCOL_FMT_STR);
+      name.length= header.entry[0];
+      name_offset= uint2korr(header.entry + 1);
+      name.str= (char *)header.nmpool + name_offset;
+    }
+    if (type_and_offset_read(&type, &data_offset,
+                             header.entry + fmt->fixed_hdr_entry,
+                             header.offset_size))
+      goto end;
+
+    DBUG_ASSERT(type != DYN_COL_NULL);
+    if (data_offset > header.data_size)
+    {
+      DBUG_PRINT("info", ("Field order: %u  Data offset: %u"
+                          " > Data pool size: %u",
+                          (uint)i,
+                          (uint)name_offset,
+                          (uint)header.nmpool_size));
+      goto end;
+    }
+    if (name_offset > header.nmpool_size)
+    {
+      DBUG_PRINT("info", ("Field order: %u  Name offset: %u"
+                          " > Name pool size: %u",
+                          (uint)i,
+                          (uint)name_offset,
+                          (uint)header.nmpool_size));
+      goto end;
+    }
+    if (prev_type != DYN_COL_NULL)
+    {
+      /* It is not first entry */
+      if (prev_data_offset >= data_offset)
+      {
+        DBUG_PRINT("info", ("Field order: %u  Previous data offset: %u"
+                            " >= Current data offset: %u",
+                            (uint)i,
+                            (uint)prev_data_offset,
+                            (uint)data_offset));
+        goto end;
+      }
+      if (prev_name_offset > name_offset)
+      {
+        DBUG_PRINT("info", ("Field order: %u  Previous name offset: %u"
+                            " > Current name offset: %u",
+                            (uint)i,
+                            (uint)prev_data_offset,
+                            (uint)data_offset));
+        goto end;
+      }
+      if ((*fmt->column_sort)(&prev_key, &key) >= 0)
+      {
+        DBUG_PRINT("info", ("Field order: %u  Previous key >= Current key",
+                            (uint)i));
+        goto end;
+      }
+    }
+    prev_num= num;
+    prev_name= name;
+    prev_data_offset= data_offset;
+    prev_name_offset= name_offset;
+    prev_type= type;
+  }
+
+  /* check data, which we can */
+  for (i= 0, header.entry= header.dtpool;
+       i < header.column_count;
+       i++, header.entry+= header.entry_size)
+  {
+    DYNAMIC_COLUMN_VALUE store;
+    // already checked by previouse pass
+    type_and_offset_read(&header.type, &header.offset,
+                         header.entry + fmt->fixed_hdr_entry,
+                         header.offset_size);
+    header.length=
+      hdr_interval_length(&header, header.entry + header.entry_size);
+    header.data= header.dtpool + header.offset;
+    switch ((header.type)) {
+    case DYN_COL_INT:
+      rc= dynamic_column_sint_read(&store, header.data, header.length);
+      break;
+    case DYN_COL_UINT:
+      rc= dynamic_column_uint_read(&store, header.data, header.length);
+      break;
+    case DYN_COL_DOUBLE:
+      rc= dynamic_column_double_read(&store, header.data, header.length);
+      break;
+    case DYN_COL_STRING:
+      rc= dynamic_column_string_read(&store, header.data, header.length);
+      break;
+    case DYN_COL_DECIMAL:
+      rc= dynamic_column_decimal_read(&store, header.data, header.length);
+      break;
+    case DYN_COL_DATETIME:
+      rc= dynamic_column_date_time_read(&store, header.data,
+                                        header.length);
+      break;
+    case DYN_COL_DATE:
+      rc= dynamic_column_date_read(&store, header.data, header.length);
+      break;
+    case DYN_COL_TIME:
+      rc= dynamic_column_time_read(&store, header.data, header.length);
+      break;
+    case DYN_COL_NULL:
+    default:
+      rc= ER_DYNCOL_FORMAT;
+      goto end;
+    }
+    if (rc != ER_DYNCOL_OK)
+    {
+      DBUG_ASSERT(rc < 0);
+      DBUG_PRINT("info", ("Field order: %u  Can't read data: %i",
+                          (uint)i, (int) rc));
+      goto end;
+    }
+  }
+
+  rc= ER_DYNCOL_OK;
+end:
+  DBUG_RETURN(rc);
+}

=== modified file 'sql/item.h'
--- a/sql/item.h	2012-06-06 19:26:40 +0000
+++ b/sql/item.h	2012-09-20 15:10:50 +0000
@@ -523,7 +523,7 @@ public:
 
 struct st_dyncall_create_def
 {
-  Item  *num, *value;
+  Item  *key, *value;
   CHARSET_INFO *cs;
   uint len, frac;
   DYNAMIC_COLUMN_TYPE type;

=== modified file 'sql/item_cmpfunc.cc'
--- a/sql/item_cmpfunc.cc	2012-08-09 15:22:00 +0000
+++ b/sql/item_cmpfunc.cc	2012-09-20 15:10:50 +0000
@@ -6048,24 +6048,87 @@ Item* Item_equal::get_first(JOIN_TAB *co
   return NULL;
 }
 
-
-longlong Item_func_dyncol_exists::val_int()
+longlong Item_func_dyncol_check::val_int()
 {
   char buff[STRING_BUFFER_USUAL_SIZE];
   String tmp(buff, sizeof(buff), &my_charset_bin);
   DYNAMIC_COLUMN col;
   String *str;
-  ulonglong num;
   enum enum_dyncol_func_result rc;
 
-  num= args[1]->val_int();
+  str= args[0]->val_str(&tmp);
+  if (args[0]->null_value || args[1]->null_value)
+    goto null;
+  col.length= str->length();
+  /* We do not change the string, so could do this trick */
+  col.str= (char *)str->ptr();
+  rc= dynamic_column_check(&col);
+  if (rc < 0)
+  {
+    dynamic_column_error_message(rc);
+    goto null;
+  }
+  null_value= FALSE;
+  return rc == ER_DYNCOL_YES;
+
+null:
+  null_value= TRUE;
+  return 0;
+}
+
+longlong Item_func_dyncol_exists::val_int()
+{
+  char buff[STRING_BUFFER_USUAL_SIZE], nmstrbuf[11];
+  String tmp(buff, sizeof(buff), &my_charset_bin),
+         nmbuf(nmstrbuf, sizeof(nmstrbuf), system_charset_info);
+  DYNAMIC_COLUMN col;
+  String *str;
+  LEX_STRING buf, *name= NULL;
+  ulonglong num= 0;
+  enum enum_dyncol_func_result rc;
+
+  if (args[1]->result_type() == INT_RESULT)
+    num= args[1]->val_int();
+  else
+  {
+    String *nm= args[1]->val_str(&nmbuf);
+    if (!nm || args[1]->null_value)
+    {
+      null_value= 1;
+      return 1;
+    }
+    if (my_charset_same(nm->charset(), &my_charset_utf8_general_ci))
+    {
+      buf.str= (char *) nm->ptr();
+      buf.length= nm->length();
+    }
+    else
+    {
+      uint strlen;
+      uint dummy_errors;
+      buf.str= (char *)sql_alloc((strlen= nm->length() *
+                                     my_charset_utf8_general_ci.mbmaxlen + 1));
+      if (buf.str)
+      {
+        buf.length=
+          copy_and_convert(buf.str, strlen, &my_charset_utf8_general_ci,
+                           nm->ptr(), nm->length(), nm->charset(),
+                           &dummy_errors);
+      }
+      else
+        buf.length= 0;
+    }
+    name= &buf;
+  }
   str= args[0]->val_str(&tmp);
   if (args[0]->null_value || args[1]->null_value || num > UINT_MAX16)
     goto null;
   col.length= str->length();
   /* We do not change the string, so could do this trick */
   col.str= (char *)str->ptr();
-  rc= dynamic_column_exists(&col, (uint) num);
+  rc= ((name == NULL) ?
+       dynamic_column_exists(&col, (uint) num) :
+       dynamic_column_exists_str(&col, name));
   if (rc < 0)
   {
     dynamic_column_error_message(rc);

=== modified file 'sql/item_cmpfunc.h'
--- a/sql/item_cmpfunc.h	2012-06-20 11:01:28 +0000
+++ b/sql/item_cmpfunc.h	2012-09-20 15:10:50 +0000
@@ -1861,6 +1861,14 @@ public:
   Item *neg_transformer(THD *thd);
 };
 
+class Item_func_dyncol_check :public Item_bool_func
+{
+public:
+  Item_func_dyncol_check(Item *str) :Item_bool_func(str) {}
+  longlong val_int();
+  const char *func_name() const { return "column_check"; }
+};
+
 class Item_func_dyncol_exists :public Item_bool_func
 {
 public:

=== modified file 'sql/item_create.cc'
--- a/sql/item_create.cc	2012-01-13 14:50:02 +0000
+++ b/sql/item_create.cc	2012-09-20 15:10:50 +0000
@@ -5704,7 +5704,7 @@ static List<Item> *create_func_dyncol_pr
   for (uint i= 0; (def= li++) ;)
   {
     dfs[0][i++]= *def;
-    args->push_back(def->num);
+    args->push_back(def->key);
     args->push_back(def->value);
   }
   return args;
@@ -5740,7 +5740,7 @@ Item *create_func_dyncol_add(THD *thd, I
 Item *create_func_dyncol_delete(THD *thd, Item *str, List<Item> &nums)
 {
   DYNCALL_CREATE_DEF *dfs;
-  Item *num;
+  Item *key;
   List_iterator_fast<Item> it(nums);
   List<Item> *args= new (thd->mem_root) List<Item>;
 
@@ -5750,12 +5750,12 @@ Item *create_func_dyncol_delete(THD *thd
   if (!args || !dfs)
     return NULL;
 
-  for (uint i= 0; (num= it++); i++)
+  for (uint i= 0; (key= it++); i++)
   {
-    dfs[i].num= num;
+    dfs[i].key= key;
     dfs[i].value= new Item_null();
     dfs[i].type= DYN_COL_INT;
-    args->push_back(dfs[i].num);
+    args->push_back(dfs[i].key);
     args->push_back(dfs[i].value);
   }
 

=== modified file 'sql/item_strfunc.cc'
--- a/sql/item_strfunc.cc	2012-04-07 13:58:46 +0000
+++ b/sql/item_strfunc.cc	2012-09-20 15:10:50 +0000
@@ -3760,7 +3760,8 @@ String *Item_func_uuid::val_str(String *
 
 Item_func_dyncol_create::Item_func_dyncol_create(List<Item> &args,
                                                  DYNCALL_CREATE_DEF *dfs)
-  : Item_str_func(args), defs(dfs), vals(0), nums(0)
+  : Item_str_func(args), defs(dfs), vals(0), keys(NULL), names(FALSE),
+  force_names(FALSE)
 {
   DBUG_ASSERT((args.elements & 0x1) == 0); // even number of arguments
 }
@@ -3768,13 +3769,26 @@ Item_func_dyncol_create::Item_func_dynco
 
 bool Item_func_dyncol_create::fix_fields(THD *thd, Item **ref)
 {
+  uint i;
   bool res= Item_func::fix_fields(thd, ref); // no need Item_str_func here
-  vals= (DYNAMIC_COLUMN_VALUE *) alloc_root(thd->mem_root,
-                                            sizeof(DYNAMIC_COLUMN_VALUE) *
-                                            (arg_count / 2));
-  nums= (uint *) alloc_root(thd->mem_root,
-                            sizeof(uint) * (arg_count / 2));
-  return res || vals == 0 || nums == 0;
+  if (!res)
+  {
+    vals= (DYNAMIC_COLUMN_VALUE *) alloc_root(thd->mem_root,
+                                              sizeof(DYNAMIC_COLUMN_VALUE) *
+                                              (arg_count / 2));
+    for (i= 0; i + 1 < arg_count && args[i]->result_type() == INT_RESULT; i+= 2);
+    if (i + 1 < arg_count)
+    {
+      names= TRUE;
+    }
+
+    keys= (uchar *) alloc_root(thd->mem_root,
+                               (sizeof(LEX_STRING) > sizeof(uint) ?
+                                sizeof(LEX_STRING) :
+                                sizeof(uint)) *
+                               (arg_count / 2));
+  }
+  return res || vals == 0 || keys == 0;
 }
 
 
@@ -3785,13 +3799,14 @@ void Item_func_dyncol_create::fix_length
   decimals= 0;
 }
 
-void Item_func_dyncol_create::prepare_arguments()
+bool Item_func_dyncol_create::prepare_arguments(bool force_names_arg)
 {
   char buff[STRING_BUFFER_USUAL_SIZE];
   String *res, tmp(buff, sizeof(buff), &my_charset_bin);
   uint column_count= (arg_count / 2);
   uint i;
   my_decimal dtmp, *dres;
+  force_names= force_names_arg;
 
   /* get values */
   for (i= 0; i < column_count; i++)
@@ -3850,7 +3865,54 @@ void Item_func_dyncol_create::prepare_ar
         break;
       }
     }
-    nums[i]= (uint) args[i * 2]->val_int();
+    if (names || force_names)
+    {
+      res= args[i * 2]->val_str(&tmp);
+      if (res)
+      {
+        // guaranty UTF-8 string for names
+        if (my_charset_same(res->charset(), &my_charset_utf8_general_ci))
+        {
+          ((LEX_STRING *)keys)[i].length= res->length();
+          if (!(((LEX_STRING *)keys)[i].str=
+                (char *)sql_memdup(res->ptr(), res->length())))
+            ((LEX_STRING *)keys)[i].length= 0;
+        }
+        else
+        {
+          uint strlen;
+          uint dummy_errors;
+          char *str=
+            (char *)sql_alloc((strlen= res->length() *
+                               my_charset_utf8_general_ci.mbmaxlen + 1));
+          if (str)
+          {
+            ((LEX_STRING *)keys)[i].length=
+              copy_and_convert(str, strlen, &my_charset_utf8_general_ci,
+                               res->ptr(), res->length(), res->charset(),
+                               &dummy_errors);
+              ((LEX_STRING *)keys)[i].str= str;
+          }
+          else
+            ((LEX_STRING *)keys)[i].length= 0;
+
+        }
+      }
+      else
+      {
+        ((LEX_STRING *)keys)[i].length= 0;
+        ((LEX_STRING *)keys)[i].str= NULL;
+      }
+    }
+    else
+      ((uint *)keys)[i]= (uint) args[i * 2]->val_int();
+    if (args[i * 2]->null_value)
+    {
+      /* to make cleanup possible */
+      for (; i < column_count; i++)
+        vals[i].type= DYN_COL_NULL;
+      return 1;
+    }
     vals[i].type= type;
     switch (type) {
     case DYN_COL_NULL:
@@ -3918,6 +3980,7 @@ void Item_func_dyncol_create::prepare_ar
       vals[i].type= DYN_COL_NULL;
     }
   }
+  return FALSE;
 }
 
 void Item_func_dyncol_create::cleanup_arguments()
@@ -3930,6 +3993,7 @@ void Item_func_dyncol_create::cleanup_ar
     if (vals[i].type == DYN_COL_STRING)
       my_free(vals[i].x.string.value.str);
   }
+  force_names= FALSE;
 }
 
 String *Item_func_dyncol_create::val_str(String *str)
@@ -3940,25 +4004,32 @@ String *Item_func_dyncol_create::val_str
   enum enum_dyncol_func_result rc;
   DBUG_ASSERT((arg_count & 0x1) == 0); // even number of arguments
 
-  prepare_arguments();
-
-  if ((rc= dynamic_column_create_many(&col, column_count, nums, vals)))
+  if (prepare_arguments(FALSE))
   {
-    dynamic_column_error_message(rc);
-    dynamic_column_column_free(&col);
     res= NULL;
-    null_value= TRUE;
+    null_value= 1;
   }
   else
   {
-    /* Move result from DYNAMIC_COLUMN to str_value */
-    char *ptr;
-    size_t length, alloc_length;
-    dynamic_column_reassociate(&col, &ptr, &length, &alloc_length);
-    str_value.reassociate(ptr, (uint32) length, (uint32) alloc_length,
-                          &my_charset_bin);
-    res= &str_value;
-    null_value= FALSE;
+    if ((rc= dynamic_column_create_many_fmt(&col, column_count, keys,
+                                            vals, names || force_names)))
+    {
+      dynamic_column_error_message(rc);
+      dynamic_column_column_free(&col);
+      res= NULL;
+      null_value= TRUE;
+    }
+    else
+    {
+      /* Move result from DYNAMIC_COLUMN to str_value */
+      char *ptr;
+      size_t length, alloc_length;
+      dynamic_column_reassociate(&col, &ptr, &length, &alloc_length);
+      str_value.reassociate(ptr, (uint32) length, (uint32) alloc_length,
+                            &my_charset_bin);
+      res= &str_value;
+      null_value= FALSE;
+    }
   }
 
   /* cleanup */
@@ -4045,9 +4116,11 @@ String *Item_func_dyncol_add::val_str(St
   col.length= res->length();
   memcpy(col.str, res->ptr(), col.length);
 
-  prepare_arguments();
+  if (prepare_arguments(dynamic_column_has_names(&col)))
+    goto null;
 
-  if ((rc= dynamic_column_update_many(&col, column_count, nums, vals)))
+  if ((rc= dynamic_column_update_many_fmt(&col, column_count, keys,
+                                          vals, names || force_names)))
   {
     dynamic_column_error_message(rc);
     dynamic_column_column_free(&col);
@@ -4100,10 +4173,48 @@ bool Item_dyncol_get::get_dyn_value(DYNA
 {
   DYNAMIC_COLUMN dyn_str;
   String *res;
-  longlong num;
+  longlong num= 0;
+  LEX_STRING buf, *name= NULL;
+  char nmstrbuf[11];
+  String nmbuf(nmstrbuf, sizeof(nmstrbuf), system_charset_info);
   enum enum_dyncol_func_result rc;
 
-  num= args[1]->val_int();
+  if (args[1]->result_type() == INT_RESULT)
+    num= args[1]->val_int();
+  else
+  {
+    String *nm= args[1]->val_str(&nmbuf);
+    if (!nm || args[1]->null_value)
+    {
+      null_value= 1;
+      return 1;
+    }
+
+    if (my_charset_same(nm->charset(), &my_charset_utf8_general_ci))
+    {
+      buf.str= (char *) nm->ptr();
+      buf.length= nm->length();
+    }
+    else
+    {
+      uint strlen;
+      uint dummy_errors;
+      buf.str= (char *)sql_alloc((strlen= nm->length() *
+                                     my_charset_utf8_general_ci.mbmaxlen + 1));
+      if (buf.str)
+      {
+        buf.length=
+          copy_and_convert(buf.str, strlen, &my_charset_utf8_general_ci,
+                           nm->ptr(), nm->length(), nm->charset(),
+                           &dummy_errors);
+      }
+      else
+        buf.length= 0;
+    }
+    name= &buf;
+  }
+
+
   if (args[1]->null_value || num < 0 || num > INT_MAX)
   {
     null_value= 1;
@@ -4119,7 +4230,9 @@ bool Item_dyncol_get::get_dyn_value(DYNA
 
   dyn_str.str=   (char*) res->ptr();
   dyn_str.length= res->length();
-  if ((rc= dynamic_column_get(&dyn_str, (uint) num, val)))
+  if ((rc= ((name == NULL) ?
+            dynamic_column_get(&dyn_str, (uint) num, val) :
+            dynamic_column_get_str(&dyn_str, name, val))))
   {
     dynamic_column_error_message(rc);
     null_value= 1;
@@ -4468,6 +4581,8 @@ null:
   return 1;
 }
 
+void
+append_identifier(THD *thd, String *packet, const char *name, uint length);
 
 void Item_dyncol_get::print(String *str, enum_query_type query_type)
 {
@@ -4492,26 +4607,30 @@ String *Item_func_dyncol_list::val_str(S
   col.length= res->length();
   /* We do not change the string, so could do this trick */
   col.str= (char *)res->ptr();
-  if ((rc= dynamic_column_list(&col, &arr)))
+  if ((rc= dynamic_column_list_str(&col, &arr)))
   {
     dynamic_column_error_message(rc);
+    for (i= 0; i < arr.elements; i++)
+      my_free(dynamic_element(&arr, i, LEX_STRING*)->str);
     delete_dynamic(&arr);
     goto null;
   }
 
   /*
-    We support elements from 0 - 65536, so max size for one element is
-    6 (including ,).
+    We estimate average name length as 10
   */
-  if (str->alloc(arr.elements * 6))
+  if (str->alloc(arr.elements * 13))
     goto null;
 
   str->length(0);
+  str->set_charset(&my_charset_utf8_general_ci);
   for (i= 0; i < arr.elements; i++)
   {
-    str->qs_append(*dynamic_element(&arr, i, uint*));
+    LEX_STRING *name= dynamic_element(&arr, i, LEX_STRING *);
+    append_identifier(current_thd, str, name->str, name->length);
     if (i < arr.elements - 1)
       str->qs_append(',');
+    my_free(name->str);
   }
 
   null_value= FALSE;

=== modified file 'sql/item_strfunc.h'
--- a/sql/item_strfunc.h	2012-06-19 12:06:45 +0000
+++ b/sql/item_strfunc.h	2012-09-20 15:10:50 +0000
@@ -1001,8 +1001,9 @@ class Item_func_dyncol_create: public It
 protected:
   DYNCALL_CREATE_DEF *defs;
   DYNAMIC_COLUMN_VALUE *vals;
-  uint *nums;
-  void prepare_arguments();
+  uchar *keys;
+  bool names, force_names;
+  bool prepare_arguments(bool force_names);
   void cleanup_arguments();
   void print_arguments(String *str, enum_query_type query_type);
 public:

=== modified file 'sql/lex.h'
--- a/sql/lex.h	2012-03-11 22:45:18 +0000
+++ b/sql/lex.h	2012-09-20 15:10:50 +0000
@@ -123,6 +123,7 @@ static SYMBOL symbols[] = {
   { "COLUMN_NAME",      SYM(COLUMN_NAME_SYM)},
   { "COLUMNS",		SYM(COLUMNS)},
   { "COLUMN_ADD",       SYM(COLUMN_ADD_SYM)},
+  { "COLUMN_CHECK",     SYM(COLUMN_CHECK_SYM)},
   { "COLUMN_CREATE",    SYM(COLUMN_CREATE_SYM)},
   { "COLUMN_DELETE",    SYM(COLUMN_DELETE_SYM)},
   { "COLUMN_EXISTS",    SYM(COLUMN_EXISTS_SYM)},

=== modified file 'sql/sql_yacc.yy'
--- a/sql/sql_yacc.yy	2012-08-09 15:22:00 +0000
+++ b/sql/sql_yacc.yy	2012-09-20 15:10:50 +0000
@@ -882,6 +882,7 @@ bool my_yyoverflow(short **a, YYSTYPE **
 %token  COLLATION_SYM                 /* SQL-2003-N */
 %token  COLUMNS
 %token  COLUMN_ADD_SYM
+%token  COLUMN_CHECK_SYM
 %token  COLUMN_CREATE_SYM
 %token  COLUMN_DELETE_SYM
 %token  COLUMN_EXISTS_SYM
@@ -8261,7 +8262,7 @@ dyncall_create_element:
        alloc_root(YYTHD->mem_root, sizeof(DYNCALL_CREATE_DEF));
      if ($$ == NULL)
        MYSQL_YYABORT;
-     $$->num= $1;
+     $$->key= $1;
      $$->value= $3;
      $$->type= (DYNAMIC_COLUMN_TYPE)$4;
      $$->cs= lex->charset;
@@ -8815,6 +8816,13 @@ function_call_nonkeyword:
               MYSQL_YYABORT;
           }
         |
+          COLUMN_CHECK_SYM '(' expr ')'
+          {
+            $$= new (YYTHD->mem_root) Item_func_dyncol_check($3);
+            if ($$ == NULL)
+              MYSQL_YYABORT;
+          }
+        |
           COLUMN_EXISTS_SYM '(' expr ',' expr ')'
           {
             $$= new (YYTHD->mem_root) Item_func_dyncol_exists($3, $5);
@@ -12924,6 +12932,7 @@ keyword:
         | CHECKPOINT_SYM        {}
         | CLOSE_SYM             {}
         | COLUMN_ADD_SYM        {}
+        | COLUMN_CHECK_SYM      {}
         | COLUMN_CREATE_SYM     {}
         | COLUMN_DELETE_SYM     {}
         | COLUMN_EXISTS_SYM     {}



More information about the commits mailing list