[Commits] Rev 3525: Current state of dyncol cassandra support (read/insert). (Moved to new cassandra tree) in file:///home/bell/maria/bzr/work-maria-5.5-cassandra/
sanja at montyprogram.com
sanja at montyprogram.com
Thu Sep 27 17:15:15 EEST 2012
At file:///home/bell/maria/bzr/work-maria-5.5-cassandra/
------------------------------------------------------------
revno: 3525
revision-id: sanja at montyprogram.com-20120927141513-nkx1jh7p38arzb02
parent: sanja at montyprogram.com-20120927122244-t5e26fniecj00157
committer: sanja at montyprogram.com
branch nick: work-maria-5.5-cassandra
timestamp: Thu 2012-09-27 17:15:13 +0300
message:
Current state of dyncol cassandra support (read/insert). (Moved to new cassandra tree)
-------------- next part --------------
=== modified file 'include/ma_dyncol.h'
--- a/include/ma_dyncol.h 2012-09-27 12:22:44 +0000
+++ b/include/ma_dyncol.h 2012-09-27 14:15:13 +0000
@@ -39,6 +39,12 @@
*/
#define MAX_DYNAMIC_COLUMN_LENGTH 0X1FFFFFFFL
+/*
+ Limits of implementation
+*/
+#define MAX_NAME_LENGTH 255
+#define MAX_TOTAL_NAME_LENGTH 65535
+
/* NO and OK is the same used just to show semantics */
#define ER_DYNCOL_NO ER_DYNCOL_OK
@@ -50,7 +56,8 @@ enum enum_dyncol_func_result
ER_DYNCOL_LIMIT= -2, /* Some limit reached */
ER_DYNCOL_RESOURCE= -3, /* Out of resourses */
ER_DYNCOL_DATA= -4, /* Incorrect input data */
- ER_DYNCOL_UNKNOWN_CHARSET= -5 /* Unknown character set */
+ ER_DYNCOL_UNKNOWN_CHARSET= -5, /* Unknown character set */
+ ER_DYNCOL_TRUNCATED= 2 /* OK, but data was truncated */
};
typedef DYNAMIC_STRING DYNAMIC_COLUMN;
@@ -81,6 +88,7 @@ struct st_dynamic_column_value
struct {
LEX_STRING value;
CHARSET_INFO *charset;
+ my_bool nonfreeable;
} string;
struct {
decimal_digit_t buffer[DECIMAL_BUFF_LENGTH];
@@ -108,6 +116,13 @@ dynamic_column_create_many_fmt(DYNAMIC_C
uchar *column_keys,
DYNAMIC_COLUMN_VALUE *values,
my_bool names);
+enum enum_dyncol_func_result
+dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str,
+ uint column_count,
+ void *column_keys,
+ DYNAMIC_COLUMN_VALUE *values,
+ my_bool new_str,
+ my_bool string_keys);
enum enum_dyncol_func_result
dynamic_column_update(DYNAMIC_COLUMN *org, uint column_nr,
@@ -163,6 +178,21 @@ dynamic_column_json(DYNAMIC_COLUMN *str,
#define dynamic_column_initialize(A) memset((A), 0, sizeof(*(A)))
#define dynamic_column_column_free(V) dynstr_free(V)
+/* conversion of values to 3 base types */
+enum enum_dyncol_func_result
+dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
+ CHARSET_INFO *cs, my_bool quote);
+enum enum_dyncol_func_result
+dynamic_column_val_long(longlong *ll, DYNAMIC_COLUMN_VALUE *val);
+enum enum_dyncol_func_result
+dynamic_column_val_double(double *dbl, DYNAMIC_COLUMN_VALUE *val);
+
+
+enum enum_dyncol_func_result
+dynamic_column_vals(DYNAMIC_COLUMN *str,
+ DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals,
+ char **free_names);
+
/***************************************************************************
Internal functions, don't use if you don't know what you are doing...
***************************************************************************/
=== modified file 'mysql-test/r/cassandra.result'
--- a/mysql-test/r/cassandra.result 2012-09-27 07:59:14 +0000
+++ b/mysql-test/r/cassandra.result 2012-09-27 14:15:13 +0000
@@ -365,8 +365,9 @@ CREATE TABLE t2 (rowkey bigint PRIMARY K
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4';
select * from t2;
rowkey datecol
-1 1346189025000
-10 1346189026000
+1 1346192625000
+10 1346192626000
+delete from t2;
drop table t2;
#
# Check whether changing parameters with ALTER TABLE works.
@@ -407,3 +408,87 @@ new-rowkey12 data1-value3 454
rowkey11 updated-1 34543
delete from t1;
drop table t1;
+#
+# Dynamic columns support
+#
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+drop table t2;
+#error: dynamic column is not a blob
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36) DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+ERROR 42000: Incorrect column specifier for column 'uuidcol'
+#error: double dynamic column
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1, textcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+ERROR 42000: Incorrect column specifier for column 'textcol'
+#
+# Dynamic column read
+#
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
+thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+delete from t2;
+insert into t2 values(1,'9b5658dc-f32f-11e1-94cd-f46d046e9f09');
+insert into t2 values(2,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a');
+drop table t2;
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+select rowkey, column_list(dyn), column_get(dyn, 'uuidcol' as char) from t2;
+rowkey column_list(dyn) column_get(dyn, 'uuidcol' as char)
+1 `uuidcol` 9b5658dc-f32f-11e1-94cd-f46d046e9f09
+2 `uuidcol` 9b5658dc-f32f-11e1-94cd-f46d046e9f0a
+drop table t2;
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
+thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+delete from t2;
+drop table t2;
+#
+# Dynamic column insert
+#
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+insert into t2 values (1, column_create("dyn1", 1, "dyn2", "two"));
+select rowkey, column_json(dyn) from t2;
+rowkey column_json(dyn)
+1 [{"dyn1":"1"},{"dyn2":"two"}]
+delete from t2;
+drop table t2;
+# bigint
+CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'a', 254324));
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'a', 2543));
+select rowkey, column_json(dyn) from t1;
+rowkey column_json(dyn)
+1 [{"a":254324},{"dyn1":"1"},{"dyn2":"two"}]
+2 [{"a":2543},{"dyn1":"1"},{"dyn2":"two"}]
+delete from t1;
+drop table t1;
+# int
+CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf3';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'intcol', 254324));
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'intcol', 2543));
+select rowkey, column_json(dyn) from t1;
+rowkey column_json(dyn)
+1 [{"dyn1":"1"},{"dyn2":"two"},{"intcol":254324}]
+2 [{"dyn1":"1"},{"dyn2":"two"},{"intcol":2543}]
+delete from t1;
+drop table t1;
+# timestamp
+CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'datecol', 254324));
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'datecol', 2543));
+select rowkey, column_json(dyn) from t1;
+rowkey column_json(dyn)
+1 [{"dyn1":"1"},{"dyn2":"two"},{"datecol":254324}]
+2 [{"dyn1":"1"},{"dyn2":"two"},{"datecol":2543}]
+delete from t1;
+drop table t1;
+# boolean
+CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf7';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 254324));
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 0));
+select rowkey, column_json(dyn) from t1;
+rowkey column_json(dyn)
+1 [{"dyn1":"1"},{"dyn2":"two"},{"boolcol":1}]
+2 [{"dyn1":"1"},{"dyn2":"two"},{"boolcol":0}]
+delete from t1;
+drop table t1;
+CREATE TABLE t1 (rowkey varchar(10) PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd1';
+select * from t1;
+ERROR HY000: Internal error: 'Unable to convert value for field `dyn` from Cassandra's data format. Name length exceed limit of 255: 'very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_ver'
+drop table t1;
=== modified file 'mysql-test/t/cassandra.test'
--- a/mysql-test/t/cassandra.test 2012-09-27 07:59:14 +0000
+++ b/mysql-test/t/cassandra.test 2012-09-27 14:15:13 +0000
@@ -94,6 +94,13 @@ CREATE COLUMN FAMILY cf10
WITH comparator = UTF8Type
AND key_validation_class=UTF8Type
AND default_validation_class = UTF8Type;
+
+CREATE COLUMN FAMILY cfd1
+ WITH comparator = UTF8Type
+ AND key_validation_class=UTF8Type
+ AND default_validation_class = UTF8Type;
+SET cfd1['1']['very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_long_name']='1';
+
EOF
--error 0,1,2
@@ -463,7 +470,7 @@ drop table t2;
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, datecol bigint) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4';
select * from t2;
-
+delete from t2;
drop table t2;
--echo #
@@ -511,6 +518,84 @@ select * from t1;
delete from t1;
drop table t1;
+--echo #
+--echo # Dynamic columns support
+--echo #
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+drop table t2;
+
+--echo #error: dynamic column is not a blob
+--error ER_WRONG_FIELD_SPEC
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36) DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+
+--echo #error: double dynamic column
+--error ER_WRONG_FIELD_SPEC
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1, textcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+
+--echo #
+--echo # Dynamic column read
+--echo #
+#prepare data
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
+ thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+delete from t2;
+insert into t2 values(1,'9b5658dc-f32f-11e1-94cd-f46d046e9f09');
+insert into t2 values(2,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a');
+drop table t2;
+
+#test dynamic column read
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+select rowkey, column_list(dyn), column_get(dyn, 'uuidcol' as char) from t2;
+drop table t2;
+
+#cleanup data
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
+ thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+delete from t2;
+drop table t2;
+
+--echo #
+--echo # Dynamic column insert
+--echo #
+CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
+insert into t2 values (1, column_create("dyn1", 1, "dyn2", "two"));
+select rowkey, column_json(dyn) from t2;
+delete from t2;
+drop table t2;
+--echo # bigint
+CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'a', 254324));
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'a', 2543));
+select rowkey, column_json(dyn) from t1;
+delete from t1;
+drop table t1;
+--echo # int
+CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf3';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'intcol', 254324));
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'intcol', 2543));
+select rowkey, column_json(dyn) from t1;
+delete from t1;
+drop table t1;
+--echo # timestamp
+CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'datecol', 254324));
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'datecol', 2543));
+select rowkey, column_json(dyn) from t1;
+delete from t1;
+drop table t1;
+--echo # boolean
+CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf7';
+insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 254324));
+insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 0));
+select rowkey, column_json(dyn) from t1;
+delete from t1;
+drop table t1;
+
+CREATE TABLE t1 (rowkey varchar(10) PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd1';
+--error ER_INTERNAL_ERROR
+select * from t1;
+drop table t1;
+
############################################################################
## Cassandra cleanup
############################################################################
=== modified file 'mysys/ma_dyncol.c'
--- a/mysys/ma_dyncol.c 2012-09-27 12:22:44 +0000
+++ b/mysys/ma_dyncol.c 2012-09-27 14:15:13 +0000
@@ -68,6 +68,8 @@ uint32 copy_and_convert(char *to, uint32
#define MAX_OFFSET_LENGTH 5
+#define DYNCOL_NUM_CHAR 6
+
my_bool dynamic_column_has_names(DYNAMIC_COLUMN *str)
{
if (str->length < 1)
@@ -211,7 +213,7 @@ static my_bool check_limit_num(const voi
static my_bool check_limit_str(const void *val)
{
- return (*((LEX_STRING **)val))->length > 255;
+ return (*((LEX_STRING **)val))->length > MAX_NAME_LENGTH;
}
@@ -288,7 +290,7 @@ my_bool put_header_entry_str(DYN_HEADER
size_t offset)
{
LEX_STRING *column_name= (LEX_STRING *)column_key;
- DBUG_ASSERT(column_name->length <= 255);
+ DBUG_ASSERT(column_name->length <= MAX_NAME_LENGTH);
hdr->entry[0]= column_name->length;
DBUG_ASSERT(hdr->name - hdr->nmpool < (long) 0x10000L);
int2store(hdr->entry + 1, hdr->name - hdr->nmpool);
@@ -1381,6 +1383,9 @@ dynamic_new_column_store(DYNAMIC_COLUMN
DYNCOL_SYZERESERVE))
goto err;
}
+ if (!column_count)
+ return ER_DYNCOL_OK;
+
bzero(str->str, fmt->fixed_hdr);
str->length= fmt->fixed_hdr;
@@ -1501,7 +1506,7 @@ calc_var_sizes(DYN_HEADER *hdr,
@return ER_DYNCOL_* return code
*/
-static enum enum_dyncol_func_result
+enum enum_dyncol_func_result
dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str,
uint column_count,
void *column_keys,
@@ -1761,7 +1766,7 @@ static my_bool
find_column(DYN_HEADER *hdr, uint numkey, LEX_STRING *strkey)
{
LEX_STRING nmkey;
- char nmkeybuff[6]; /* to fit max 2 bytes number */
+ char nmkeybuff[DYNCOL_NUM_CHAR]; /* to fit max 2 bytes number */
DBUG_ASSERT(hdr->header != NULL);
if (hdr->header + hdr->header_size > hdr->data_end)
@@ -2169,10 +2174,10 @@ dynamic_column_list_str(DYNAMIC_COLUMN *
if (header.format == DYNCOL_FMT_NUM)
{
uint nm= uint2korr(read);
- tmp.str= my_malloc(6, MYF(0));
+ tmp.str= my_malloc(DYNCOL_NUM_CHAR, MYF(0));
if (!tmp.str)
return ER_DYNCOL_RESOURCE;
- tmp.length= snprintf(tmp.str, 6, "%u", nm);
+ tmp.length= snprintf(tmp.str, DYNCOL_NUM_CHAR, "%u", nm);
}
else
{
@@ -2208,7 +2213,7 @@ find_place(DYN_HEADER *hdr, void *key, m
uint mid, start, end, val;
int flag;
LEX_STRING str;
- char buff[6];
+ char buff[DYNCOL_NUM_CHAR];
my_bool need_conversion= ((string_keys ? DYNCOL_FMT_STR : DYNCOL_FMT_NUM) !=
hdr->format);
LINT_INIT(flag); /* 100 % safe */
@@ -2425,7 +2430,7 @@ dynamic_column_update_copy(DYNAMIC_COLUM
size_t offs;
uint nm;
DYNAMIC_COLUMN_TYPE tp;
- char buff[6];
+ char buff[DYNCOL_NUM_CHAR];
if (hdr->format == DYNCOL_FMT_NUM)
{
@@ -3438,7 +3443,7 @@ end:
enum enum_dyncol_func_result
dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
- my_bool quote)
+ CHARSET_INFO *cs, my_bool quote)
{
char buff[40];
int len;
@@ -3468,24 +3473,22 @@ dynamic_column_val_str(DYNAMIC_STRING *s
char *alloc= NULL;
char *from= val->x.string.value.str;
uint bufflen;
- my_bool conv= !my_charset_same(val->x.string.charset,
- &my_charset_utf8_general_ci);
+ my_bool conv= !my_charset_same(val->x.string.charset, cs);
my_bool rc;
len= val->x.string.value.length;
- bufflen= (len * (conv ? my_charset_utf8_general_ci.mbmaxlen : 1));
+ bufflen= (len * (conv ? cs->mbmaxlen : 1));
if (dynstr_realloc(str, bufflen))
return ER_DYNCOL_RESOURCE;
// guaranty UTF-8 string for value
- if (!my_charset_same(val->x.string.charset,
- &my_charset_utf8_general_ci))
+ if (!my_charset_same(val->x.string.charset, cs))
{
uint dummy_errors;
if (!quote)
{
/* convert to the destination */
str->length+= copy_and_convert_extended(str->str, bufflen,
- &my_charset_utf8_general_ci,
+ cs,
from, len,
val->x.string.charset,
&dummy_errors);
@@ -3494,8 +3497,7 @@ dynamic_column_val_str(DYNAMIC_STRING *s
if ((alloc= (char *)my_malloc(bufflen, MYF(0))))
{
len=
- copy_and_convert_extended(alloc, bufflen,
- &my_charset_utf8_general_ci,
+ copy_and_convert_extended(alloc, bufflen, cs,
from, len, val->x.string.charset,
&dummy_errors);
from= alloc;
@@ -3543,6 +3545,155 @@ dynamic_column_val_str(DYNAMIC_STRING *s
return(ER_DYNCOL_OK);
}
+
+enum enum_dyncol_func_result
+dynamic_column_val_long(longlong *ll, DYNAMIC_COLUMN_VALUE *val)
+{
+ enum enum_dyncol_func_result rc= ER_DYNCOL_OK;
+ *ll= 0;
+ switch (val->type) {
+ case DYN_COL_INT:
+ *ll= val->x.long_value;
+ break;
+ case DYN_COL_UINT:
+ *ll= (longlong)val->x.ulong_value;
+ if (val->x.ulong_value > ULONGLONG_MAX)
+ rc= ER_DYNCOL_TRUNCATED;
+ break;
+ case DYN_COL_DOUBLE:
+ *ll= (longlong)val->x.double_value;
+ if (((double) *ll) != val->x.double_value)
+ rc= ER_DYNCOL_TRUNCATED;
+ break;
+ case DYN_COL_STRING:
+ {
+ longlong i= 0, sign= 1;
+ char *src= val->x.string.value.str;
+ uint len= val->x.string.value.length;
+
+ while (len && my_isspace(&my_charset_latin1, *src)) src++,len--;
+
+ if (len)
+ {
+ if (*src == '-')
+ {
+ sign= -1;
+ src++;
+ } else if (*src == '-')
+ src++;
+ while(len && my_isdigit(&my_charset_latin1, *src))
+ {
+ i= i * 10 + (*src - '0');
+ src++;
+ }
+ }
+ else
+ rc= ER_DYNCOL_TRUNCATED;
+ if (len)
+ rc= ER_DYNCOL_TRUNCATED;
+ *ll= i * sign;
+ break;
+ }
+ case DYN_COL_DECIMAL:
+ if (decimal2longlong(&val->x.decimal.value, ll) != E_DEC_OK)
+ rc= ER_DYNCOL_TRUNCATED;
+ break;
+ case DYN_COL_DATETIME:
+ *ll= (val->x.time_value.year * 10000000000L +
+ val->x.time_value.month * 100000000L +
+ val->x.time_value.day * 1000000 +
+ val->x.time_value.hour * 10000 +
+ val->x.time_value.minute * 100 +
+ val->x.time_value.second) *
+ (val->x.time_value.neg ? -1 : 1);
+ break;
+ case DYN_COL_DATE:
+ *ll= (val->x.time_value.year * 10000 +
+ val->x.time_value.month * 100 +
+ val->x.time_value.day) *
+ (val->x.time_value.neg ? -1 : 1);
+ break;
+ case DYN_COL_TIME:
+ *ll= (val->x.time_value.hour * 10000 +
+ val->x.time_value.minute * 100 +
+ val->x.time_value.second) *
+ (val->x.time_value.neg ? -1 : 1);
+ break;
+ case DYN_COL_NULL:
+ rc= ER_DYNCOL_TRUNCATED;
+ break;
+ default:
+ return(ER_DYNCOL_FORMAT);
+ }
+ return(rc);
+}
+
+
+enum enum_dyncol_func_result
+dynamic_column_val_double(double *dbl, DYNAMIC_COLUMN_VALUE *val)
+{
+ enum enum_dyncol_func_result rc= ER_DYNCOL_OK;
+ *dbl= 0;
+ switch (val->type) {
+ case DYN_COL_INT:
+ *dbl= (double)val->x.long_value;
+ if (((longlong) *dbl) != val->x.long_value)
+ rc= ER_DYNCOL_TRUNCATED;
+ break;
+ case DYN_COL_UINT:
+ *dbl= (double)val->x.ulong_value;
+ if (((ulonglong) *dbl) != val->x.ulong_value)
+ rc= ER_DYNCOL_TRUNCATED;
+ break;
+ case DYN_COL_DOUBLE:
+ *dbl= val->x.double_value;
+ break;
+ case DYN_COL_STRING:
+ {
+ char *str, *end;
+ if ((str= malloc(val->x.string.value.length + 1)))
+ return ER_DYNCOL_RESOURCE;
+ memcpy(str, val->x.string.value.str, val->x.string.value.length);
+ str[val->x.string.value.length]= '\0';
+ *dbl= strtod(str, &end);
+ if (*end != '\0')
+ rc= ER_DYNCOL_TRUNCATED;
+ }
+ case DYN_COL_DECIMAL:
+ if (decimal2double(&val->x.decimal.value, dbl) != E_DEC_OK)
+ rc= ER_DYNCOL_TRUNCATED;
+ break;
+ case DYN_COL_DATETIME:
+ *dbl= (double)(val->x.time_value.year * 10000000000L +
+ val->x.time_value.month * 100000000L +
+ val->x.time_value.day * 1000000 +
+ val->x.time_value.hour * 10000 +
+ val->x.time_value.minute * 100 +
+ val->x.time_value.second) *
+ (val->x.time_value.neg ? -1 : 1);
+ break;
+ case DYN_COL_DATE:
+ *dbl= (double)(val->x.time_value.year * 10000 +
+ val->x.time_value.month * 100 +
+ val->x.time_value.day) *
+ (val->x.time_value.neg ? -1 : 1);
+ break;
+ case DYN_COL_TIME:
+ *dbl= (double)(val->x.time_value.hour * 10000 +
+ val->x.time_value.minute * 100 +
+ val->x.time_value.second) *
+ (val->x.time_value.neg ? -1 : 1);
+ break;
+ case DYN_COL_NULL:
+ rc= ER_DYNCOL_TRUNCATED;
+ break;
+ default:
+ return(ER_DYNCOL_FORMAT);
+ }
+ return(rc);
+}
+
+
/**
Convert to JSON
@@ -3602,10 +3753,11 @@ dynamic_column_json(DYNAMIC_COLUMN *str,
if (header.format == DYNCOL_FMT_NUM)
{
uint nm= uint2korr(header.entry);
- if (dynstr_realloc(json, 6 + 3))
+ if (dynstr_realloc(json, DYNCOL_NUM_CHAR + 3))
goto err;
json->str[json->length++]= '"';
- json->length+= (snprintf(json->str + json->length, 6, "%u", nm));
+ json->length+= (snprintf(json->str + json->length,
+ DYNCOL_NUM_CHAR, "%u", nm));
}
else
{
@@ -3619,7 +3771,8 @@ dynamic_column_json(DYNAMIC_COLUMN *str,
}
json->str[json->length++]= '"';
json->str[json->length++]= ':';
- if ((rc= dynamic_column_val_str(json, &val, TRUE)) < 0 ||
+ if ((rc= dynamic_column_val_str(json, &val,
+ &my_charset_utf8_general_ci, TRUE)) < 0 ||
dynstr_append_mem(json, "}", 1))
goto err;
}
@@ -3631,3 +3784,99 @@ err:
json->length= 0;
return rc;
}
+
+
+/**
+ Convert to DYNAMIC_COLUMN_VALUE values and names (LEX_STING) dynamic array
+
+ @param str The packed string
+ @param names Where to put names
+ @param vals Where to put values
+ @param free_names pointer to free names buffer if there is it.
+
+ @return ER_DYNCOL_* return code
+*/
+
+enum enum_dyncol_func_result
+dynamic_column_vals(DYNAMIC_COLUMN *str,
+ DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals,
+ char **free_names)
+{
+ DYN_HEADER header;
+ char *nm;
+ uint i;
+ enum enum_dyncol_func_result rc;
+
+ *free_names= 0;
+ bzero(names, sizeof(DYNAMIC_ARRAY)); /* In case of errors */
+ bzero(vals, sizeof(DYNAMIC_ARRAY)); /* In case of errors */
+ if (str->length == 0)
+ return ER_DYNCOL_OK; /* no columns */
+
+ if ((rc= init_read_hdr(&header, str)) < 0)
+ return rc;
+
+ if (header.entry_size * header.column_count + FIXED_HEADER_SIZE >
+ str->length)
+ return ER_DYNCOL_FORMAT;
+
+ if (init_dynamic_array(names, sizeof(LEX_STRING),
+ header.column_count, 0) ||
+ init_dynamic_array(vals, sizeof(DYNAMIC_COLUMN_VALUE),
+ header.column_count, 0) ||
+ (header.format == DYNCOL_FMT_NUM &&
+ !(*free_names= (char *)malloc(DYNCOL_NUM_CHAR * header.column_count))))
+ {
+ rc= ER_DYNCOL_RESOURCE;
+ goto err;
+ }
+ nm= *free_names;
+
+ for (i= 0, header.entry= header.header;
+ i < header.column_count;
+ i++, header.entry+= header.entry_size)
+ {
+ DYNAMIC_COLUMN_VALUE val;
+ LEX_STRING name;
+ header.length=
+ hdr_interval_length(&header, header.entry + header.entry_size);
+ header.data= header.dtpool + header.offset;
+ /*
+ Check that the found data is withing the ranges. This can happen if
+ we get data with wrong offsets.
+ */
+ if (header.length == DYNCOL_OFFSET_ERROR ||
+ header.length > INT_MAX || header.offset > header.data_size)
+ {
+ rc= ER_DYNCOL_FORMAT;
+ goto err;
+ }
+ if ((rc= dynamic_column_get_value(&header, &val)) < 0)
+ goto err;
+
+ if (header.format == DYNCOL_FMT_NUM)
+ {
+ uint num= uint2korr(header.entry);
+ name.str= nm;
+ name.length= snprintf(nm, DYNCOL_NUM_CHAR, "%u", num);
+ nm+= name.length + 1;
+ }
+ else
+ {
+ name.length= header.entry[0];
+ name.str= (char *)header.nmpool + uint2korr(header.entry + 1);
+ }
+ /* following is preallocated and so do not fail */
+ (void) insert_dynamic(names, (uchar *)&name);
+ (void) insert_dynamic(vals, (uchar *)&val);
+ }
+ return ER_DYNCOL_OK;
+
+err:
+ delete_dynamic(names);
+ delete_dynamic(vals);
+ if (*free_names)
+ my_free(*free_names);
+ *free_names= 0;
+ return rc;
+}
=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc 2012-06-14 18:05:31 +0000
+++ b/sql/sql_base.cc 2012-09-27 14:15:13 +0000
@@ -9773,6 +9773,7 @@ int dynamic_column_error_message(enum_dy
switch (rc) {
case ER_DYNCOL_YES:
case ER_DYNCOL_OK:
+ case ER_DYNCOL_TRUNCATED:
break; // it is not an error
case ER_DYNCOL_FORMAT:
my_error(ER_DYN_COL_WRONG_FORMAT, MYF(0));
=== modified file 'sql/sql_base.h'
--- a/sql/sql_base.h 2012-05-16 15:44:17 +0000
+++ b/sql/sql_base.h 2012-09-27 14:15:13 +0000
@@ -272,6 +272,7 @@ bool rename_temporary_table(THD* thd, TA
const char *table_name);
bool is_equal(const LEX_STRING *a, const LEX_STRING *b);
+class Open_tables_backup;
/* Functions to work with system tables. */
bool open_system_tables_for_read(THD *thd, TABLE_LIST *table_list,
Open_tables_backup *backup);
=== modified file 'storage/cassandra/CMakeLists.txt'
--- a/storage/cassandra/CMakeLists.txt 2012-08-17 17:13:20 +0000
+++ b/storage/cassandra/CMakeLists.txt 2012-09-27 14:15:13 +0000
@@ -12,7 +12,7 @@ SET(cassandra_sources
gen-cpp/Cassandra.h)
#INCLUDE_DIRECTORIES(BEFORE ${Boost_INCLUDE_DIRS})
-INCLUDE_DIRECTORIES(AFTER /home/psergey/cassandra/thrift/include/thrift/)
+INCLUDE_DIRECTORIES(AFTER /usr/local/include/thrift)
#
STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
=== modified file 'storage/cassandra/cassandra_se.cc'
--- a/storage/cassandra/cassandra_se.cc 2012-09-27 07:59:14 +0000
+++ b/storage/cassandra/cassandra_se.cc 2012-09-27 14:15:13 +0000
@@ -74,6 +74,7 @@ class Cassandra_se_impl: public Cassandr
std::string rowkey; /* key of the record we're returning now */
SlicePredicate slice_pred;
+ SliceRange slice_pred_sr;
bool get_slices_returned_less;
bool get_slice_found_rows;
public:
@@ -91,6 +92,8 @@ public:
void first_ddl_column();
bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
void get_rowkey_type(char **name, char **type);
+ size_t get_ddl_size();
+ const char* get_default_validator();
/* Settings */
void set_consistency_levels(ulong read_cons_level, ulong write_cons_level);
@@ -106,7 +109,8 @@ public:
/* Reads, point lookups */
bool get_slice(char *key, size_t key_len, bool *found);
- bool get_next_read_column(char **name, char **value, int *value_len);
+ bool get_next_read_column(char **name, int *name_len,
+ char **value, int *value_len );
void get_read_rowkey(char **value, int *value_len);
/* Reads, multi-row scans */
@@ -122,6 +126,7 @@ public:
/* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
void clear_read_columns();
+ void clear_read_all_columns();
void add_read_column(const char *name);
/* Reads, MRR scans */
@@ -277,6 +282,16 @@ void Cassandra_se_impl::get_rowkey_type(
*name= NULL;
}
+size_t Cassandra_se_impl::get_ddl_size()
+{
+ return cf_def.column_metadata.size();
+}
+
+const char* Cassandra_se_impl::get_default_validator()
+{
+ return cf_def.default_validation_class.c_str();
+}
+
/////////////////////////////////////////////////////////////////////////////
// Data writes
@@ -444,8 +459,8 @@ bool Cassandra_se_impl::retryable_get_sl
}
-bool Cassandra_se_impl::get_next_read_column(char **name, char **value,
- int *value_len)
+bool Cassandra_se_impl::get_next_read_column(char **name, int *name_len,
+ char **value, int *value_len)
{
bool use_counter=false;
while (1)
@@ -468,12 +483,14 @@ bool Cassandra_se_impl::get_next_read_co
ColumnOrSuperColumn& cs= *column_data_it;
if (use_counter)
{
+ *name_len= cs.counter_column.name.size();
*name= (char*)cs.counter_column.name.c_str();
*value= (char*)&cs.counter_column.value;
*value_len= sizeof(cs.counter_column.value);
}
else
{
+ *name_len= cs.column.name.size();
*name= (char*)cs.column.name.c_str();
*value= (char*)cs.column.value.c_str();
*value_len= cs.column.value.length();
@@ -601,6 +618,13 @@ void Cassandra_se_impl::clear_read_colum
slice_pred.column_names.clear();
}
+void Cassandra_se_impl::clear_read_all_columns()
+{
+ slice_pred_sr.start = "";
+ slice_pred_sr.finish = "";
+ slice_pred.__set_slice_range(slice_pred_sr);
+}
+
void Cassandra_se_impl::add_read_column(const char *name_arg)
{
=== modified file 'storage/cassandra/cassandra_se.h'
--- a/storage/cassandra/cassandra_se.h 2012-09-27 07:59:14 +0000
+++ b/storage/cassandra/cassandra_se.h 2012-09-27 14:15:13 +0000
@@ -50,6 +50,8 @@ public:
virtual bool next_ddl_column(char **name, int *name_len, char **value,
int *value_len)=0;
virtual void get_rowkey_type(char **name, char **type)=0;
+ virtual size_t get_ddl_size()=0;
+ virtual const char* get_default_validator()=0;
/* Writes */
virtual void clear_insert_buffer()=0;
@@ -62,7 +64,8 @@ public:
/* Reads */
virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ;
- virtual bool get_next_read_column(char **name, char **value, int *value_len)=0;
+ virtual bool get_next_read_column(char **name, int *name_len,
+ char **value, int *value_len)=0;
virtual void get_read_rowkey(char **value, int *value_len)=0;
/* Reads, multi-row scans */
@@ -70,7 +73,7 @@ public:
virtual bool get_range_slices(bool last_key_as_start_key)=0;
virtual void finish_reading_range_slices()=0;
virtual bool get_next_range_slice_row(bool *eof)=0;
-
+
/* Reads, MRR scans */
virtual void new_lookup_keys()=0;
virtual int add_lookup_key(const char *key, size_t key_len)=0;
@@ -79,8 +82,9 @@ public:
/* read_set setup */
virtual void clear_read_columns()=0;
+ virtual void clear_read_all_columns()=0;
virtual void add_read_column(const char *name)=0;
-
+
virtual bool truncate()=0;
virtual bool remove_row()=0;
=== modified file 'storage/cassandra/gen-cpp/cassandra_constants.cpp'
--- a/storage/cassandra/gen-cpp/cassandra_constants.cpp 2012-08-17 17:13:20 +0000
+++ b/storage/cassandra/gen-cpp/cassandra_constants.cpp 2012-09-27 14:15:13 +0000
@@ -11,7 +11,7 @@ namespace org { namespace apache { names
const cassandraConstants g_cassandra_constants;
cassandraConstants::cassandraConstants() {
- cassandra_const_VERSION = "19.32.0";
+ cassandra_const_VERSION = (char *)"19.32.0";
}
}}} // namespace
=== modified file 'storage/cassandra/ha_cassandra.cc'
--- a/storage/cassandra/ha_cassandra.cc 2012-09-27 07:59:14 +0000
+++ b/storage/cassandra/ha_cassandra.cc 2012-09-27 14:15:13 +0000
@@ -1,4 +1,4 @@
-/*
+/*
Copyright (c) 2012, Monty Program Ab
This program is free software; you can redistribute it and/or modify
@@ -22,15 +22,21 @@
#include "ha_cassandra.h"
#include "sql_class.h"
+#define DYNCOL_USUAL 20
+#define DYNCOL_DELTA 100
+#define DYNCOL_USUAL_REC 1024
+#define DYNCOL_DELTA_REC 1024
+
static handler *cassandra_create_handler(handlerton *hton,
- TABLE_SHARE *table,
+ TABLE_SHARE *table,
MEM_ROOT *mem_root);
+extern int dynamic_column_error_message(enum_dyncol_func_result rc);
handlerton *cassandra_hton;
-/*
+/*
Hash used to track the number of open tables; variable for example share
methods
*/
@@ -69,6 +75,25 @@ ha_create_table_option cassandra_table_o
HA_TOPTION_END
};
+/**
+ Structure for CREATE TABLE options (field options).
+*/
+
+struct ha_field_option_struct
+{
+ bool dyncol_field;
+};
+
+ha_create_table_option cassandra_field_option_list[]=
+{
+ /*
+ Collect all other columns as dynamic here,
+ the valid values are YES/NO, ON/OFF, 1/0.
+ The default is 0, that is true, yes, on.
+ */
+ HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0),
+ HA_FOPTION_END
+};
static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
"Number of rows in an INSERT batch",
@@ -245,17 +270,16 @@ static int cassandra_init_func(void *p)
cassandra_hton->state= SHOW_OPTION_YES;
cassandra_hton->create= cassandra_create_handler;
- /*
+ /*
Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE
TABLE to create an *empty* table from scratch. Cassandra table won't be
emptied if re-created.
*/
- cassandra_hton->flags= 0;
+ cassandra_hton->flags= 0;
cassandra_hton->table_options= cassandra_table_option_list;
- //cassandra_hton->field_options= example_field_option_list;
- cassandra_hton->field_options= NULL;
-
- mysql_mutex_init(0 /* no instrumentation */,
+ cassandra_hton->field_options= cassandra_field_option_list;
+
+ mysql_mutex_init(0 /* no instrumentation */,
&cassandra_default_host_lock, MY_MUTEX_INIT_FAST);
DBUG_RETURN(0);
@@ -352,7 +376,7 @@ static int free_share(CASSANDRA_SHARE *s
static handler* cassandra_create_handler(handlerton *hton,
- TABLE_SHARE *table,
+ TABLE_SHARE *table,
MEM_ROOT *mem_root)
{
return new (mem_root) ha_cassandra(hton, table);
@@ -361,7 +385,11 @@ static handler* cassandra_create_handler
ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
:handler(hton, table_arg),
- se(NULL), field_converters(NULL), rowkey_converter(NULL)
+ se(NULL), field_converters(NULL),
+ special_type_field_converters(NULL),
+ special_type_field_names(NULL), n_special_type_fields(0),
+ rowkey_converter(NULL),
+ dyncol_field(0), dyncol_set(0)
{}
@@ -375,6 +403,32 @@ const char **ha_cassandra::bas_ext() con
}
+int ha_cassandra::check_field_options(Field **fields)
+{
+ Field **field;
+ uint i;
+ DBUG_ENTER("ha_cassandra::check_field_options");
+ for (field= fields, i= 0; *field; field++, i++)
+ {
+ ha_field_option_struct *field_options= (*field)->option_struct;
+ if (field_options && field_options->dyncol_field)
+ {
+ if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB)
+ {
+ my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name);
+ DBUG_RETURN(HA_WRONG_CREATE_OPTION);
+ }
+ dyncol_set= 1;
+ dyncol_field= i;
+ bzero(&dynamic_values, sizeof(dynamic_values));
+ bzero(&dynamic_names, sizeof(dynamic_names));
+ bzero(&dynamic_rec, sizeof(dynamic_rec));
+ }
+ }
+ DBUG_RETURN(0);
+}
+
+
int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
{
ha_table_option_struct *options= table->s->option_struct;
@@ -399,6 +453,9 @@ int ha_cassandra::open(const char *name,
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
+ if ((res= check_field_options(table->s->field)))
+ DBUG_RETURN(res);
+
if (setup_field_converters(table->field, table->s->fields))
{
DBUG_RETURN(HA_ERR_NO_CONNECTION);
@@ -480,31 +537,12 @@ int ha_cassandra::create(const char *nam
table_arg->key_info[0].key_parts != 1 ||
table_arg->key_info[0].key_part[0].fieldnr != 1)
{
- my_error(ER_WRONG_COLUMN_NAME, MYF(0),
+ my_error(ER_WRONG_COLUMN_NAME, MYF(0),
"Table must have PRIMARY KEY defined over the first column");
DBUG_RETURN(HA_WRONG_CREATE_OPTION);
}
-
-#ifndef DBUG_OFF
-/*
- DBUG_PRINT("info", ("strparam: '%-.64s' ullparam: %llu enumparam: %u "\
- "boolparam: %u",
- (options->strparam ? options->strparam : "<NULL>"),
- options->ullparam, options->enumparam, options->boolparam));
-
- psergey-todo: check table definition!
- for (Field **field= table_arg->s->field; *field; field++)
- {
- ha_field_option_struct *field_options= (*field)->option_struct;
- DBUG_ASSERT(field_options);
- DBUG_PRINT("info", ("field: %s complex: '%-.64s'",
- (*field)->field_name,
- (field_options->complex_param_to_parse_it_in_engine ?
- field_options->complex_param_to_parse_it_in_engine :
- "<NULL>")));
- }
-*/
-#endif
+ if ((res= check_field_options(table_arg->s->field)))
+ DBUG_RETURN(res);
DBUG_ASSERT(!se);
if ((res= check_table_options(options)))
DBUG_RETURN(res);
@@ -518,7 +556,7 @@ int ha_cassandra::create(const char *nam
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
-
+
if (setup_field_converters(table_arg->s->field, table_arg->s->fields))
{
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "setup_field_converters");
@@ -570,7 +608,7 @@ public:
field->store(*pdata);
return 0;
}
-
+
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{
buf= field->val_real();
@@ -785,6 +823,43 @@ static int convert_hex_digit(const char
const char map2number[]="0123456789abcdef";
+static void convert_uuid2string(char *str, const char *cass_data)
+{
+ char *ptr= str;
+ /* UUID arrives as 16-byte number in network byte order */
+ for (uint i=0; i < 16; i++)
+ {
+ *(ptr++)= map2number[(cass_data[i] >> 4) & 0xF];
+ *(ptr++)= map2number[cass_data[i] & 0xF];
+ if (i == 3 || i == 5 || i == 7 || i == 9)
+ *(ptr++)= '-';
+ }
+ *ptr= 0;
+}
+
+static bool convert_string2uuid(char *buf, const char *str)
+{
+ int lower, upper;
+ for (uint i= 0; i < 16; i++)
+ {
+ if ((upper= convert_hex_digit(str[0])) == -1 ||
+ (lower= convert_hex_digit(str[1])) == -1)
+ {
+ return true;
+ }
+ buf[i]= lower | (upper << 4);
+ str += 2;
+ if (i == 3 || i == 5 || i == 7 || i == 9)
+ {
+ if (str[0] != '-')
+ return true;
+ str++;
+ }
+ }
+ return false;
+}
+
+
class UuidDataConverter : public ColumnDataConverter
{
char buf[16]; /* Binary UUID representation */
@@ -794,16 +869,7 @@ public:
{
DBUG_ASSERT(cass_data_len==16);
char str[37];
- char *ptr= str;
- /* UUID arrives as 16-byte number in network byte order */
- for (uint i=0; i < 16; i++)
- {
- *(ptr++)= map2number[(cass_data[i] >> 4) & 0xF];
- *(ptr++)= map2number[cass_data[i] & 0xF];
- if (i == 3 || i == 5 || i == 7 || i == 9)
- *(ptr++)= '-';
- }
- *ptr= 0;
+ convert_uuid2string(str, cass_data);
field->store(str, 36,field->charset());
return 0;
}
@@ -811,29 +877,12 @@ public:
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{
String *uuid_str= field->val_str(&str_buf);
- char *pstr= (char*)uuid_str->c_ptr();
- if (uuid_str->length() != 36)
+ if (uuid_str->length() != 36)
+ return true;
+
+ if (convert_string2uuid(buf, (char*)uuid_str->c_ptr()))
return true;
-
- int lower, upper;
- for (uint i=0; i < 16; i++)
- {
- if ((upper= convert_hex_digit(pstr[0])) == -1 ||
- (lower= convert_hex_digit(pstr[1])) == -1)
- {
- return true;
- }
- buf[i]= lower | (upper << 4);
- pstr += 2;
- if (i == 3 || i == 5 || i == 7 || i == 9)
- {
- if (pstr[0] != '-')
- return true;
- pstr++;
- }
- }
-
*cass_data= buf;
*cass_data_len= 16;
return false;
@@ -841,6 +890,302 @@ public:
~UuidDataConverter(){}
};
+/**
+ Converting dynamic columns types to/from casandra types
+*/
+bool cassandra_to_dyncol_intLong(const char *cass_data,
+ int cass_data_len __attribute__((unused)),
+ DYNAMIC_COLUMN_VALUE *value)
+{
+ value->type= DYN_COL_INT;
+#ifdef WORDS_BIGENDIAN
+ value->x.long_value= (longlong *)*cass_data;
+#else
+ flip64(cass_data, (char *)&value->x.long_value);
+#endif
+ return 0;
+}
+
+bool dyncol_to_cassandraLong(DYNAMIC_COLUMN_VALUE *value,
+ char **cass_data, int *cass_data_len,
+ void* buff, void **freemem)
+{
+ longlong *tmp= (longlong *) buff;
+ enum enum_dyncol_func_result rc=
+ dynamic_column_val_long(tmp, value);
+ if (rc < 0)
+ return true;
+ *cass_data_len= sizeof(longlong);
+#ifdef WORDS_BIGENDIAN
+ *cass_data= (char *)buff;
+#else
+ flip64((char *)buff, (char *)buff + sizeof(longlong));
+ *cass_data= (char *)buff + sizeof(longlong);
+#endif
+ *freemem= NULL;
+ return false;
+}
+
+bool cassandra_to_dyncol_intInt32(const char *cass_data,
+ int cass_data_len __attribute__((unused)),
+ DYNAMIC_COLUMN_VALUE *value)
+{
+ int32 tmp;
+ value->type= DYN_COL_INT;
+#ifdef WORDS_BIGENDIAN
+ tmp= *((int32 *)cass_data);
+#else
+ flip32(cass_data, (char *)&tmp);
+#endif
+ value->x.long_value= tmp;
+ return 0;
+}
+
+
+bool dyncol_to_cassandraInt32(DYNAMIC_COLUMN_VALUE *value,
+ char **cass_data, int *cass_data_len,
+ void* buff, void **freemem)
+{
+ longlong *tmp= (longlong *) ((char *)buff + sizeof(longlong));
+ enum enum_dyncol_func_result rc=
+ dynamic_column_val_long(tmp, value);
+ if (rc < 0)
+ return true;
+ *cass_data_len= sizeof(int32);
+ *cass_data= (char *)buff;
+#ifdef WORDS_BIGENDIAN
+ *((int32 *) buff) = (int32) *tmp;
+#else
+ {
+ int32 tmp2= (int32) *tmp;
+ flip32((char *)&tmp2, (char *)buff);
+ }
+#endif
+ *freemem= NULL;
+ return false;
+}
+
+
+bool cassandra_to_dyncol_intCounter(const char *cass_data,
+ int cass_data_len __attribute__((unused)),
+ DYNAMIC_COLUMN_VALUE *value)
+{
+ value->type= DYN_COL_INT;
+ value->x.long_value= *((longlong *)cass_data);
+ return 0;
+}
+
+
+bool dyncol_to_cassandraCounter(DYNAMIC_COLUMN_VALUE *value,
+ char **cass_data, int *cass_data_len,
+ void* buff, void **freemem)
+{
+ longlong *tmp= (longlong *)buff;
+ enum enum_dyncol_func_result rc=
+ dynamic_column_val_long(tmp, value);
+ if (rc < 0)
+ return true;
+ *cass_data_len= sizeof(longlong);
+ *cass_data= (char *)buff;
+ *freemem= NULL;
+ return false;
+}
+
+bool cassandra_to_dyncol_doubleFloat(const char *cass_data,
+ int cass_data_len __attribute__((unused)),
+ DYNAMIC_COLUMN_VALUE *value)
+{
+ value->type= DYN_COL_DOUBLE;
+ value->x.double_value= *((float *)cass_data);
+ return 0;
+}
+
+bool dyncol_to_cassandraFloat(DYNAMIC_COLUMN_VALUE *value,
+ char **cass_data, int *cass_data_len,
+ void* buff, void **freemem)
+{
+ double tmp;
+ enum enum_dyncol_func_result rc=
+ dynamic_column_val_double(&tmp, value);
+ if (rc < 0)
+ return true;
+ *((float *)buff)= (float) tmp;
+ *cass_data_len= sizeof(float);
+ *cass_data= (char *)buff;
+ *freemem= NULL;
+ return false;
+}
+
+bool cassandra_to_dyncol_doubleDouble(const char *cass_data,
+ int cass_data_len __attribute__((unused)),
+ DYNAMIC_COLUMN_VALUE *value)
+{
+ value->type= DYN_COL_DOUBLE;
+ value->x.double_value= *((double *)cass_data);
+ return 0;
+}
+
+bool dyncol_to_cassandraDouble(DYNAMIC_COLUMN_VALUE *value,
+ char **cass_data, int *cass_data_len,
+ void* buff, void **freemem)
+{
+ double *tmp= (double *)buff;
+ enum enum_dyncol_func_result rc=
+ dynamic_column_val_double(tmp, value);
+ if (rc < 0)
+ return true;
+ *cass_data_len= sizeof(double);
+ *cass_data= (char *)buff;
+ *freemem= NULL;
+ return false;
+}
+
+bool cassandra_to_dyncol_strStr(const char *cass_data,
+ int cass_data_len,
+ DYNAMIC_COLUMN_VALUE *value,
+ CHARSET_INFO *cs)
+{
+ value->type= DYN_COL_STRING;
+ value->x.string.charset= cs;
+ value->x.string.value.str= (char *)cass_data;
+ value->x.string.value.length= cass_data_len;
+ value->x.string.nonfreeable= TRUE; // do not try to free
+ return 0;
+}
+
+bool dyncol_to_cassandraStr(DYNAMIC_COLUMN_VALUE *value,
+ char **cass_data, int *cass_data_len,
+ void* buff, void **freemem, CHARSET_INFO *cs)
+{
+ DYNAMIC_STRING tmp;
+ if (init_dynamic_string(&tmp, NULL, 1024, 1024))
+ return 1;
+ enum enum_dyncol_func_result rc=
+ dynamic_column_val_str(&tmp, value, cs, FALSE);
+ if (rc < 0)
+ {
+ dynstr_free(&tmp);
+ return 1;
+ }
+ *cass_data_len= tmp.length;
+ *(cass_data)= tmp.str;
+ *freemem= tmp.str;
+ return 0;
+}
+
+bool cassandra_to_dyncol_strBytes(const char *cass_data,
+ int cass_data_len,
+ DYNAMIC_COLUMN_VALUE *value)
+{
+ return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
+ &my_charset_bin);
+}
+
+bool dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE *value,
+ char **cass_data, int *cass_data_len,
+ void* buff, void **freemem)
+{
+ return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
+ buff, freemem, &my_charset_bin);
+}
+
+bool cassandra_to_dyncol_strAscii(const char *cass_data,
+ int cass_data_len,
+ DYNAMIC_COLUMN_VALUE *value)
+{
+ return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
+ &my_charset_latin1_bin);
+}
+
+bool dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE *value,
+ char **cass_data, int *cass_data_len,
+ void* buff, void **freemem)
+{
+ return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
+ buff, freemem, &my_charset_latin1_bin);
+}
+
+bool cassandra_to_dyncol_strUTF8(const char *cass_data,
+ int cass_data_len,
+ DYNAMIC_COLUMN_VALUE *value)
+{
+ return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
+ &my_charset_utf8_unicode_ci);
+}
+
+bool dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE *value,
+ char **cass_data, int *cass_data_len,
+ void* buff, void **freemem)
+{
+ return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
+ buff, freemem, &my_charset_utf8_unicode_ci);
+}
+
+bool cassandra_to_dyncol_strUUID(const char *cass_data,
+ int cass_data_len,
+ DYNAMIC_COLUMN_VALUE *value)
+{
+ value->type= DYN_COL_STRING;
+ value->x.string.charset= &my_charset_bin;
+ value->x.string.value.str= (char *)my_malloc(37, MYF(0));
+ if (!value->x.string.value.str)
+ {
+ value->x.string.value.length= 0;
+ value->x.string.nonfreeable= TRUE;
+ return 1;
+ }
+ convert_uuid2string(value->x.string.value.str, cass_data);
+ value->x.string.value.length= 36;
+ value->x.string.nonfreeable= FALSE;
+ return 0;
+}
+
+bool dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE *value,
+ char **cass_data, int *cass_data_len,
+ void* buff, void **freemem)
+{
+ DYNAMIC_STRING tmp;
+ if (init_dynamic_string(&tmp, NULL, 1024, 1024))
+ return true;
+ enum enum_dyncol_func_result rc=
+ dynamic_column_val_str(&tmp, value, &my_charset_latin1_bin, FALSE);
+ if (rc < 0 || tmp.length != 36 || convert_string2uuid((char *)buff, tmp.str))
+ {
+ dynstr_free(&tmp);
+ return true;
+ }
+
+ *cass_data_len= tmp.length;
+ *(cass_data)= tmp.str;
+ *freemem= tmp.str;
+ return 0;
+}
+
+bool cassandra_to_dyncol_intBool(const char *cass_data,
+ int cass_data_len,
+ DYNAMIC_COLUMN_VALUE *value)
+{
+ value->type= DYN_COL_INT;
+ value->x.long_value= (cass_data[0] ? 1 : 0);
+ return 0;
+}
+
+bool dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE *value,
+ char **cass_data, int *cass_data_len,
+ void* buff, void **freemem)
+{
+ longlong tmp;
+ enum enum_dyncol_func_result rc=
+ dynamic_column_val_long(&tmp, value);
+ if (rc < 0)
+ return true;
+ ((char *)buff)[0]= (tmp ? 1 : 0);
+ *cass_data_len= 1;
+ *(cass_data)= (char *)buff;
+ *freemem= 0;
+ return 0;
+}
+
const char * const validator_bigint= "org.apache.cassandra.db.marshal.LongType";
const char * const validator_int= "org.apache.cassandra.db.marshal.Int32Type";
@@ -864,6 +1209,126 @@ const char * const validator_varint= "or
const char * const validator_decimal= "org.apache.cassandra.db.marshal.DecimalType";
+static CASSANDRA_TYPE_DEF cassandra_types[]=
+{
+ {
+ validator_bigint,
+ &cassandra_to_dyncol_intLong,
+ &dyncol_to_cassandraLong
+ },
+ {
+ validator_int,
+ &cassandra_to_dyncol_intInt32,
+ &dyncol_to_cassandraInt32
+ },
+ {
+ validator_counter,
+ cassandra_to_dyncol_intCounter,
+ &dyncol_to_cassandraCounter
+ },
+ {
+ validator_float,
+ &cassandra_to_dyncol_doubleFloat,
+ &dyncol_to_cassandraFloat
+ },
+ {
+ validator_double,
+ &cassandra_to_dyncol_doubleDouble,
+ &dyncol_to_cassandraDouble
+ },
+ {
+ validator_blob,
+ &cassandra_to_dyncol_strBytes,
+ &dyncol_to_cassandraBytes
+ },
+ {
+ validator_ascii,
+ &cassandra_to_dyncol_strAscii,
+ &dyncol_to_cassandraAscii
+ },
+ {
+ validator_text,
+ &cassandra_to_dyncol_strUTF8,
+ &dyncol_to_cassandraUTF8
+ },
+ {
+ validator_timestamp,
+ &cassandra_to_dyncol_intLong,
+ &dyncol_to_cassandraLong
+ },
+ {
+ validator_uuid,
+ &cassandra_to_dyncol_strUUID,
+ &dyncol_to_cassandraUUID
+ },
+ {
+ validator_boolean,
+ &cassandra_to_dyncol_intBool,
+ &dyncol_to_cassandraBool
+ },
+ {
+ validator_varint,
+ &cassandra_to_dyncol_strBytes,
+ &dyncol_to_cassandraBytes
+ },
+ {
+ validator_decimal,
+ &cassandra_to_dyncol_strBytes,
+ &dyncol_to_cassandraBytes
+ }
+};
+
+CASSANDRA_TYPE get_cassandra_type(const char *validator)
+{
+ CASSANDRA_TYPE rc;
+ switch(validator[32])
+ {
+ case 'L':
+ rc= CT_BIGINT;
+ break;
+ case 'I':
+ rc= (validator[35] == '3' ? CT_INT : CT_VARINT);
+ rc= CT_INT;
+ break;
+ case 'C':
+ rc= CT_COUNTER;
+ break;
+ case 'F':
+ rc= CT_FLOAT;
+ break;
+ case 'D':
+ switch (validator[33])
+ {
+ case 'o':
+ rc= CT_DOUBLE;
+ break;
+ case 'a':
+ rc= CT_TIMESTAMP;
+ break;
+ case 'e':
+ rc= CT_DECIMAL;
+ break;
+ default:
+ rc= CT_BLOB;
+ break;
+ }
+ break;
+ case 'B':
+ rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB);
+ break;
+ case 'A':
+ rc= CT_ASCII;
+ break;
+ case 'U':
+ rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID);
+ break;
+ default:
+ rc= CT_BLOB;
+ }
+ DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0);
+ return rc;
+}
+
ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
{
ColumnDataConverter *res= NULL;
@@ -895,16 +1360,16 @@ ColumnDataConverter *map_field_to_valida
if (!strcmp(validator_name, validator_double))
res= new DoubleDataConverter;
break;
-
+
case MYSQL_TYPE_TIMESTAMP:
if (!strcmp(validator_name, validator_timestamp))
res= new TimestampDataConverter;
break;
case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
- if (!strcmp(validator_name, validator_uuid) &&
+ if (!strcmp(validator_name, validator_uuid) &&
field->real_type() == MYSQL_TYPE_STRING &&
- field->field_length == 36)
+ field->field_length == 36)
{
// UUID maps to CHAR(36), its text representation
res= new UuidDataConverter;
@@ -958,39 +1423,117 @@ bool ha_cassandra::setup_field_converter
int col_name_len;
char *col_type;
int col_type_len;
+ size_t ddl_fields= se->get_ddl_size();
+ const char *default_type= se->get_default_validator();
+ uint max_non_default_fields;
+ DBUG_ENTER("ha_cassandra::setup_field_converters");
+ DBUG_ASSERT(default_type);
DBUG_ASSERT(!field_converters);
- size_t memsize= sizeof(ColumnDataConverter*) * n_fields;
+ DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);
+
+ /*
+ We always should take into account that in case of using dynamic columns
+ sql description contain one field which does not described in
+ Cassandra DDL also key field is described separately. So that
+ is why we use "n_fields - dyncol_set - 1" or "ddl_fields + 2".
+ */
+ max_non_default_fields= ddl_fields + 2 - n_fields;
+ if (ddl_fields < (n_fields - dyncol_set - 1))
+ {
+ se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
+ my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+ DBUG_RETURN(true);
+ }
+
+ /* allocate memory in one chunk */
+ size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
+ (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
+ (dyncol_set ? max_non_default_fields : 0);
if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
- return true;
+ DBUG_RETURN(true);
bzero(field_converters, memsize);
n_field_converters= n_fields;
+ if (dyncol_set)
+ {
+ special_type_field_converters=
+ (CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
+ special_type_field_names=
+ ((LEX_STRING*)(special_type_field_converters + max_non_default_fields));
+ }
+
+ if (dyncol_set)
+ {
+ if (init_dynamic_array(&dynamic_values,
+ sizeof(DYNAMIC_COLUMN_VALUE),
+ DYNCOL_USUAL, DYNCOL_DELTA))
+ DBUG_RETURN(true);
+ else
+ if (init_dynamic_array(&dynamic_names,
+ sizeof(LEX_STRING),
+ DYNCOL_USUAL, DYNCOL_DELTA))
+ {
+ delete_dynamic(&dynamic_values);
+ DBUG_RETURN(true);
+ }
+ else
+ if (init_dynamic_string(&dynamic_rec, NULL,
+ DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
+ {
+ delete_dynamic(&dynamic_values);
+ delete_dynamic(&dynamic_names);
+ DBUG_RETURN(true);
+ }
+
+ /* Dynamic column field has special processing */
+ field_converters[dyncol_field]= NULL;
+
+ default_type_def= cassandra_types + get_cassandra_type(default_type);
+ }
+
se->first_ddl_column();
uint n_mapped= 0;
while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
&col_type_len))
{
+ Field **field;
+ uint i;
/* Mapping for the 1st field is already known */
- for (Field **field= field_arg + 1; *field; field++)
+ for (field= field_arg + 1, i= 1; *field; field++, i++)
{
- if (!strcmp((*field)->field_name, col_name))
+ if ((!dyncol_set || dyncol_field != i) &&
+ !strcmp((*field)->field_name, col_name))
{
n_mapped++;
ColumnDataConverter **conv= field_converters + (*field)->field_index;
if (!(*conv= map_field_to_validator(*field, col_type)))
{
- se->print_error("Failed to map column %s to datatype %s",
+ se->print_error("Failed to map column %s to datatype %s",
(*field)->field_name, col_type);
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
- return true;
+ DBUG_RETURN(true);
}
(*conv)->field= *field;
}
}
+ if (dyncol_set && !(*field)) // is needed and not found
+ {
+ DBUG_PRINT("info",("Field not found: %s", col_name));
+ if (strcmp(col_type, default_type))
+ {
+ DBUG_PRINT("info",("Field '%s' non-default type: '%s'",
+ col_name, col_type));
+ special_type_field_names[n_special_type_fields].length= col_name_len;
+ special_type_field_names[n_special_type_fields].str= col_name;
+ special_type_field_converters[n_special_type_fields]=
+ cassandra_types[get_cassandra_type(col_type)];
+ n_special_type_fields++;
+ }
+ }
}
- if (n_mapped != n_fields - 1)
+ if (n_mapped != n_fields - 1 - dyncol_set)
{
Field *first_unmapped= NULL;
/* Find the first field */
@@ -1005,27 +1548,28 @@ bool ha_cassandra::setup_field_converter
DBUG_ASSERT(first_unmapped);
se->print_error("Field `%s` could not be mapped to any field in Cassandra",
- first_unmapped->field_name);
+ first_unmapped->field_name);
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
- return true;
+ DBUG_RETURN(true);
}
-
- /*
+
+ /*
Setup type conversion for row_key.
*/
se->get_rowkey_type(&col_name, &col_type);
if (col_name && strcmp(col_name, (*field_arg)->field_name))
{
- se->print_error("PRIMARY KEY column must match Cassandra's name '%s'", col_name);
+ se->print_error("PRIMARY KEY column must match Cassandra's name '%s'",
+ col_name);
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
- return true;
+ DBUG_RETURN(true);
}
if (!col_name && strcmp("rowkey", (*field_arg)->field_name))
{
se->print_error("target column family has no key_alias defined, "
"PRIMARY KEY column must be named 'rowkey'");
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
- return true;
+ DBUG_RETURN(true);
}
if (col_type != NULL)
@@ -1034,7 +1578,7 @@ bool ha_cassandra::setup_field_converter
{
se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
- return true;
+ DBUG_RETURN(true);
}
rowkey_converter->field= *field_arg;
}
@@ -1042,10 +1586,10 @@ bool ha_cassandra::setup_field_converter
{
se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
- return true;
+ DBUG_RETURN(true);
}
- return false;
+ DBUG_RETURN(false);
}
@@ -1054,10 +1598,20 @@ void ha_cassandra::free_field_converters
delete rowkey_converter;
rowkey_converter= NULL;
+ if (dyncol_set)
+ {
+ delete_dynamic(&dynamic_values);
+ delete_dynamic(&dynamic_names);
+ dynstr_free(&dynamic_rec);
+ }
if (field_converters)
{
for (uint i=0; i < n_field_converters; i++)
- delete field_converters[i];
+ if (field_converters[i])
+ {
+ DBUG_ASSERT(!dyncol_set || i == dyncol_field);
+ delete field_converters[i];
+ }
my_free(field_converters);
field_converters= NULL;
}
@@ -1072,7 +1626,7 @@ int ha_cassandra::index_read_map(uchar *
{
int rc= 0;
DBUG_ENTER("ha_cassandra::index_read_map");
-
+
if (find_flag != HA_READ_KEY_EXACT)
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
@@ -1099,7 +1653,7 @@ int ha_cassandra::index_read_map(uchar *
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
rc= HA_ERR_INTERNAL_ERROR;
}
-
+
/* TODO: what if we're not reading all columns?? */
if (!found)
rc= HA_ERR_KEY_NOT_FOUND;
@@ -1132,15 +1686,42 @@ void ha_cassandra::print_conversion_erro
}
+void free_strings(DYNAMIC_COLUMN_VALUE *vals, uint num)
+{
+ for (uint i= 0; i < num; i++)
+ if (vals[i].type == DYN_COL_STRING &&
+ !vals[i].x.string.nonfreeable)
+ my_free(vals[i].x.string.value.str);
+}
+
+
+CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name,
+ int cass_name_len)
+{
+ CASSANDRA_TYPE_DEF *type= default_type_def;
+ for(uint i= 0; i < n_special_type_fields; i++)
+ {
+ if (cass_name_len == (int)special_type_field_names[i].length &&
+ memcmp(cass_name, special_type_field_names[i].str,
+ cass_name_len) == 0)
+ {
+ type= special_type_field_converters + i;
+ break;
+ }
+ }
+ return type;
+}
+
int ha_cassandra::read_cassandra_columns(bool unpack_pk)
{
char *cass_name;
char *cass_value;
- int cass_value_len;
+ int cass_value_len, cass_name_len;
Field **field;
int res= 0;
-
- /*
+ ulong total_name_len= 0;
+
+ /*
cassandra_to_mariadb() calls will use field->store(...) methods, which
require that the column is in the table->write_set
*/
@@ -1151,16 +1732,20 @@ int ha_cassandra::read_cassandra_columns
for (field= table->field + 1; *field; field++)
(*field)->set_null();
- while (!se->get_next_read_column(&cass_name, &cass_value, &cass_value_len))
+ while (!se->get_next_read_column(&cass_name, &cass_name_len,
+ &cass_value, &cass_value_len))
{
// map to our column. todo: use hash or something..
- int idx=1;
+ uint idx=1;
+ bool found= 0;
for (field= table->field + 1; *field; field++)
{
idx++;
- if (!strcmp((*field)->field_name, cass_name))
+ if ((!dyncol_set || dyncol_field != idx) &&
+ !strcmp((*field)->field_name, cass_name))
{
int fieldnr= (*field)->field_index;
+ found= 1;
(*field)->set_notnull();
if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
cass_value_len))
@@ -1173,8 +1758,85 @@ int ha_cassandra::read_cassandra_columns
break;
}
}
+ if (dyncol_set && !found)
+ {
+ DYNAMIC_COLUMN_VALUE val;
+ LEX_STRING nm;
+ CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name,
+ cass_name_len);
+ nm.str= cass_name;
+ nm.length= cass_name_len;
+ if (nm.length > MAX_NAME_LENGTH)
+ {
+ se->print_error("Unable to convert value for field `%s`"
+ " from Cassandra's data format. Name"
+ " length exceed limit of %u: '%s'",
+ table->field[dyncol_field]->field_name,
+ (uint)MAX_NAME_LENGTH, cass_name);
+ my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+ res=1;
+ goto err;
+ }
+ total_name_len+= cass_name_len;
+ if (nm.length > MAX_TOTAL_NAME_LENGTH)
+ {
+ se->print_error("Unable to convert value for field `%s`"
+ " from Cassandra's data format. Sum of all names"
+ " length exceed limit of %lu",
+ table->field[dyncol_field]->field_name,
+ cass_name, (uint)MAX_TOTAL_NAME_LENGTH);
+ my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+ res=1;
+ goto err;
+ }
+
+ if ((res= (*(type->cassandra_to_dynamic))(cass_value,
+ cass_value_len, &val)) ||
+ insert_dynamic(&dynamic_names, (uchar *) &nm) ||
+ insert_dynamic(&dynamic_values, (uchar *) &val))
+ {
+ if (res)
+ {
+ print_conversion_error(cass_name, cass_value, cass_value_len);
+ }
+ free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
+ dynamic_values.elements);
+ // EOM shouldm be already reported if happened
+ res=1;
+ goto err;
+ }
+ }
}
-
+
+ dynamic_rec.length= 0;
+ if (dyncol_set)
+ {
+ if (dynamic_column_create_many_internal_fmt(&dynamic_rec,
+ dynamic_names.elements,
+ dynamic_names.buffer,
+ (DYNAMIC_COLUMN_VALUE *)
+ dynamic_values.buffer,
+ FALSE,
+ TRUE) < 0)
+ dynamic_rec.length= 0;
+
+ free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
+ dynamic_values.elements);
+ dynamic_values.elements= dynamic_names.elements= 0;
+ }
+ if (dyncol_set)
+ {
+ if (dynamic_rec.length == 0)
+ table->field[dyncol_field]->set_null();
+ else
+ {
+ table->field[dyncol_field]->set_notnull();
+ table->field[dyncol_field]->store(dynamic_rec.str,
+ dynamic_rec.length,
+ &my_charset_bin);
+ }
+ }
+
if (unpack_pk)
{
/* Unpack rowkey to primary key */
@@ -1194,6 +1856,69 @@ err:
return res;
}
+int ha_cassandra::write_dynamic_row()
+{
+ DYNAMIC_ARRAY vals, names;
+ String valcol, *strcol;
+ DYNAMIC_COLUMN col;
+ char *free_names;
+ uint i;
+ enum enum_dyncol_func_result rc;
+ DBUG_ENTER("ha_cassandra::write_dynamic_row");
+ DBUG_ASSERT(dyncol_set);
+
+ Field *field= table->field[dyncol_field];
+ DBUG_ASSERT(field->type() == MYSQL_TYPE_BLOB);
+ /* It is blob and it does not use buffer */
+ strcol= field->val_str(NULL, &valcol);
+ /*
+ dynamic_column_vals only read the string so we can
+ cheat here with assignment
+ */
+ bzero(&col, sizeof(col));
+ col.str= (char *)strcol->ptr();
+ col.length= strcol->length();
+ if ((rc= dynamic_column_vals(&col, &names, &vals, &free_names)) < 0)
+ {
+ dynamic_column_error_message(rc);
+ DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+ }
+ DBUG_ASSERT(names.elements == vals.elements);
+ for (i= 0; i < names.elements; i++)
+ {
+ char buff[16], stringname[256];
+ CASSANDRA_TYPE_DEF *type;
+ void *freemem= NULL;
+ char *cass_data;
+ int cass_data_len;
+ LEX_STRING *name= dynamic_element(&names, i, LEX_STRING*);
+ DYNAMIC_COLUMN_VALUE *val= dynamic_element(&vals, i, DYNAMIC_COLUMN_VALUE*);
+
+ DBUG_PRINT("info", ("field %*s", (int)name->length, name->str));
+ type= get_cassandra_field_def(name->str, (int) name->length);
+ if ((*type->dynamic_to_cassandra)(val, &cass_data, &cass_data_len,
+ buff, &freemem))
+ {
+ my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
+ name->str, insert_lineno);
+ DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+ }
+ /* prepare \0 ending name */
+ DBUG_ASSERT(name->length < sizeof(stringname) - 1);
+ memcpy(stringname, (name->str), name->length);
+ stringname[name->length]= '\0';
+ se->add_insert_column(stringname,
+ cass_data, cass_data_len);
+ if (freemem)
+ my_free(freemem);
+ }
+
+ delete_dynamic(&names);
+ delete_dynamic(&vals);
+ if (free_names)
+ my_free(free_names);
+ DBUG_RETURN(0);
+}
int ha_cassandra::write_row(uchar *buf)
{
@@ -1224,15 +1949,26 @@ int ha_cassandra::write_row(uchar *buf)
{
char *cass_data;
int cass_data_len;
- if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
+ if (dyncol_set && dyncol_field == i)
{
- my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
- field_converters[i]->field->field_name, insert_lineno);
- dbug_tmp_restore_column_map(table->read_set, old_map);
- DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+ int rc;
+ DBUG_ASSERT(field_converters[i] == NULL);
+ if ((rc= write_dynamic_row()))
+ return rc;
+ }
+ else
+ {
+ if (field_converters[i]->mariadb_to_cassandra(&cass_data,
+ &cass_data_len))
+ {
+ my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
+ field_converters[i]->field->field_name, insert_lineno);
+ dbug_tmp_restore_column_map(table->read_set, old_map);
+ DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+ }
+ se->add_insert_column(field_converters[i]->field->field_name,
+ cass_data, cass_data_len);
}
- se->add_insert_column(field_converters[i]->field->field_name,
- cass_data, cass_data_len);
}
dbug_tmp_restore_column_map(table->read_set, old_map);
@@ -1290,9 +2026,16 @@ int ha_cassandra::rnd_init(bool scan)
DBUG_RETURN(0);
}
- se->clear_read_columns();
- for (uint i= 1; i < table->s->fields; i++)
- se->add_read_column(table->field[i]->field_name);
+ if (dyncol_set)
+ {
+ se->clear_read_all_columns();
+ }
+ else
+ {
+ se->clear_read_columns();
+ for (uint i= 1; i < table->s->fields; i++)
+ se->add_read_column(table->field[i]->field_name);
+ }
se->read_batch_size= THDVAR(table->in_use, rnd_batch_size);
bres= se->get_range_slices(false);
=== modified file 'storage/cassandra/ha_cassandra.h'
--- a/storage/cassandra/ha_cassandra.h 2012-09-27 07:59:14 +0000
+++ b/storage/cassandra/ha_cassandra.h 2012-09-27 14:15:13 +0000
@@ -40,6 +40,33 @@ class ColumnDataConverter;
struct ha_table_option_struct;
+
+struct st_dynamic_column_value;
+
+typedef bool (* CAS2DYN_CONVERTER)(const char *cass_data,
+ int cass_data_len,
+ struct st_dynamic_column_value *value);
+typedef bool (* DYN2CAS_CONVERTER)(struct st_dynamic_column_value *value,
+ char **cass_data,
+ int *cass_data_len,
+ void *buf, void **freemem);
+struct cassandra_type_def
+{
+ const char *name;
+ CAS2DYN_CONVERTER cassandra_to_dynamic;
+ DYN2CAS_CONVERTER dynamic_to_cassandra;
+};
+
+typedef struct cassandra_type_def CASSANDRA_TYPE_DEF;
+
+enum cassandtra_type_enum {CT_BIGINT, CT_INT, CT_COUNTER, CT_FLOAT, CT_DOUBLE,
+ CT_BLOB, CT_ASCII, CT_TEXT, CT_TIMESTAMP, CT_UUID, CT_BOOLEAN, CT_VARINT,
+ CT_DECIMAL};
+
+typedef enum cassandtra_type_enum CASSANDRA_TYPE;
+
+
+
/** @brief
Class definition for the storage engine
*/
@@ -48,23 +75,35 @@ class ha_cassandra: public handler
friend class Column_name_enumerator_impl;
THR_LOCK_DATA lock; ///< MySQL lock
CASSANDRA_SHARE *share; ///< Shared lock info
-
+
Cassandra_se_interface *se;
+ /* description of static part of the table definition */
ColumnDataConverter **field_converters;
uint n_field_converters;
+ CASSANDRA_TYPE_DEF *default_type_def;
+ /* description of dynamic columns part */
+ CASSANDRA_TYPE_DEF *special_type_field_converters;
+ LEX_STRING *special_type_field_names;
+ uint n_special_type_fields;
+ DYNAMIC_ARRAY dynamic_values, dynamic_names;
+ DYNAMIC_STRING dynamic_rec;
+
ColumnDataConverter *rowkey_converter;
bool setup_field_converters(Field **field, uint n_fields);
void free_field_converters();
-
+
int read_cassandra_columns(bool unpack_pk);
int check_table_options(struct ha_table_option_struct* options);
bool doing_insert_batch;
ha_rows insert_rows_batched;
-
+
+ uint dyncol_field;
+ bool dyncol_set;
+
/* Used to produce 'wrong column %s at row %lu' warnings */
ha_rows insert_lineno;
void print_conversion_error(const char *field_name,
@@ -189,6 +228,10 @@ public:
private:
bool source_exhausted;
bool mrr_start_read();
+ int check_field_options(Field **fields);
+ int write_dynamic_row();
+ CASSANDRA_TYPE_DEF * get_cassandra_field_def(char *cass_name,
+ int cass_name_length);
public:
/*
More information about the commits
mailing list