[Commits] 2df5774d27d: MDEV-12471: BULK Command

Oleksandr Byelkin sanja at mariadb.com
Wed Jun 14 10:25:32 EEST 2017


revision-id: 2df5774d27dcdd0cf03064738a1910d3e3629f5b (mariadb-10.2.6-43-g2df5774d27d)
parent(s): e813fe862226554cfe31754b3dfeafbb2b9a7159
committer: Oleksandr Byelkin
timestamp: 2017-06-14 09:25:00 +0200
message:

MDEV-12471: BULK Command

BULK execution moved to a new command.

---
 include/mysql.h.pp                                 |   3 +-
 include/mysql_com.h                                |  13 +-
 mysql-test/r/mysqld--help.result                   |   2 +-
 .../sys_vars/r/sysvars_server_notembedded.result   |   4 +-
 sql/sql_insert.cc                                  |  11 +-
 sql/sql_parse.cc                                   |   7 +-
 sql/sql_prepare.cc                                 | 141 +++++++++++++++------
 sql/sql_prepare.h                                  |   1 +
 8 files changed, 134 insertions(+), 48 deletions(-)

diff --git a/include/mysql.h.pp b/include/mysql.h.pp
index 517516aeb30..9c5d9fa4303 100644
--- a/include/mysql.h.pp
+++ b/include/mysql.h.pp
@@ -12,7 +12,8 @@ enum enum_server_command
   COM_UNIMPLEMENTED,
   COM_RESET_CONNECTION,
   COM_MDB_GAP_BEG,
-  COM_MDB_GAP_END=250,
+  COM_MDB_GAP_END=249,
+  COM_STMT_BULK_EXECUTE=250,
   COM_SLAVE_WORKER=251,
   COM_SLAVE_IO=252,
   COM_SLAVE_SQL=253,
diff --git a/include/mysql_com.h b/include/mysql_com.h
index ace54767b06..0ca40dd0418 100644
--- a/include/mysql_com.h
+++ b/include/mysql_com.h
@@ -115,7 +115,8 @@ enum enum_server_command
   COM_RESET_CONNECTION,
   /* don't forget to update const char *command_name[] in sql_parse.cc */
   COM_MDB_GAP_BEG,
-  COM_MDB_GAP_END=250,
+  COM_MDB_GAP_END=249,
+  COM_STMT_BULK_EXECUTE=250,
   COM_SLAVE_WORKER=251,
   COM_SLAVE_IO=252,
   COM_SLAVE_SQL=253,
@@ -136,6 +137,13 @@ enum enum_indicator_type
   STMT_INDICATOR_IGNORE
 };
 
+/*
+  bulk PS flags
+*/
+#define STMT_BULK_FLAG_CLIENT_SEND_TYPES 128
+#define STMT_BULK_FLAG_INSERT_ID_REQUEST 64
+
+
 /* sql type stored in .frm files for virtual fields */
 #define MYSQL_TYPE_VIRTUAL 245
 /*
@@ -311,7 +319,8 @@ enum enum_indicator_type
                            CLIENT_SESSION_TRACK |\
                            CLIENT_DEPRECATE_EOF |\
                            CLIENT_CONNECT_ATTRS |\
-                           MARIADB_CLIENT_COM_MULTI)
+                           MARIADB_CLIENT_COM_MULTI |\
+                           MARIADB_CLIENT_STMT_BULK_OPERATIONS)
 
 /*
   To be added later:
diff --git a/mysql-test/r/mysqld--help.result b/mysql-test/r/mysqld--help.result
index cfd2ede6ef9..24827da53be 100644
--- a/mysql-test/r/mysqld--help.result
+++ b/mysql-test/r/mysqld--help.result
@@ -1396,7 +1396,7 @@ performance-schema-max-rwlock-instances -1
 performance-schema-max-socket-classes 10
 performance-schema-max-socket-instances -1
 performance-schema-max-stage-classes 150
-performance-schema-max-statement-classes 187
+performance-schema-max-statement-classes 188
 performance-schema-max-table-handles -1
 performance-schema-max-table-instances -1
 performance-schema-max-thread-classes 50
diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
index 11d6594ee80..8b062075db8 100644
--- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
+++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
@@ -3063,9 +3063,9 @@ READ_ONLY	YES
 COMMAND_LINE_ARGUMENT	REQUIRED
 VARIABLE_NAME	PERFORMANCE_SCHEMA_MAX_STATEMENT_CLASSES
 SESSION_VALUE	NULL
-GLOBAL_VALUE	187
+GLOBAL_VALUE	188
 GLOBAL_VALUE_ORIGIN	COMPILE-TIME
-DEFAULT_VALUE	187
+DEFAULT_VALUE	188
 VARIABLE_SCOPE	GLOBAL
 VARIABLE_TYPE	BIGINT UNSIGNED
 VARIABLE_COMMENT	Maximum number of statement instruments.
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 8e5dfb4f69c..0049c1dc58b 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -697,9 +697,9 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
   bool using_bulk_insert= 0;
   uint value_count;
   ulong counter = 1;
-  ulong iteration= 0;
+  /* counter of iteration in bulk PS operation*/
+  ulonglong iteration= 0;
   ulonglong id;
-  ulong bulk_iterations= bulk_parameters_iterations(thd);
   COPY_INFO info;
   TABLE *table= 0;
   List_iterator_fast<List_item> its(values_list);
@@ -767,7 +767,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
     DBUG_RETURN(TRUE);
   value_count= values->elements;
 
-  DBUG_ASSERT(bulk_iterations > 0);
   if (mysql_prepare_insert(thd, table_list, table, fields, values,
 			   update_fields, update_values, duplic, &unused_conds,
                            FALSE))
@@ -939,6 +938,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
   }
   do
   {
+    DBUG_PRINT("info", ("iteration %llu", iteration));
+    /* for 0 iteration parameters are already set */
     if (iteration && bulk_parameters_set(thd))
       goto abort;
 
@@ -1059,7 +1060,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
     }
     its.rewind();
     iteration++;
-  } while (iteration < bulk_iterations);
+  } while (bulk_parameters_iterations(thd));
 
 values_loop_end:
   free_underlaid_joins(thd, &thd->lex->select_lex);
@@ -1206,7 +1207,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
     retval= thd->lex->explain->send_explain(thd);
     goto abort;
   }
-  if ((bulk_iterations * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
+  if ((iteration * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
 				    !thd->cuted_fields))
   {
     my_ok(thd, info.copied + info.deleted +
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index e97871e3dcf..412cd1dac70 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -391,7 +391,7 @@ const LEX_STRING command_name[257]={
   { 0, 0 }, //247
   { 0, 0 }, //248
   { 0, 0 }, //249
-  { 0, 0 }, //250
+  { C_STRING_WITH_LEN("Bulk_execute") }, //250
   { C_STRING_WITH_LEN("Slave_worker") }, //251
   { C_STRING_WITH_LEN("Slave_IO") }, //252
   { C_STRING_WITH_LEN("Slave_SQL") }, //253
@@ -1749,6 +1749,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
     }
     break;
   }
+  case COM_STMT_BULK_EXECUTE:
+  {
+    mysqld_stmt_bulk_execute(thd, packet, packet_length);
+    break;
+  }
   case COM_STMT_EXECUTE:
   {
     mysqld_stmt_execute(thd, packet, packet_length);
diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc
index cd330d8ee26..f574016221c 100644
--- a/sql/sql_prepare.cc
+++ b/sql/sql_prepare.cc
@@ -164,7 +164,6 @@ class Prepared_statement: public Statement
   Server_side_cursor *cursor;
   uchar *packet;
   uchar *packet_end;
-  ulong iterations;
   uint param_count;
   uint last_errno;
   uint flags;
@@ -183,6 +182,7 @@ class Prepared_statement: public Statement
   */
   uint select_number_after_prepare;
   char last_error[MYSQL_ERRMSG_SIZE];
+  my_bool iterations;
   my_bool start_param;
 #ifndef EMBEDDED_LIBRARY
   bool (*set_params)(Prepared_statement *st, uchar *data, uchar *data_end,
@@ -213,11 +213,10 @@ class Prepared_statement: public Statement
                     uchar *packet_arg, uchar *packet_end_arg);
   bool execute_bulk_loop(String *expanded_query,
                          bool open_cursor,
-                         uchar *packet_arg, uchar *packet_end_arg,
-                         ulong iterations);
+                         uchar *packet_arg, uchar *packet_end_arg);
   bool execute_server_runnable(Server_runnable *server_runnable);
   my_bool set_bulk_parameters(bool reset);
-  ulong bulk_iterations();
+  bool bulk_iterations() { return iterations; };
   /* Destroy this statement */
   void deallocate();
   bool execute_immediate(const char *query, uint query_length);
@@ -1024,6 +1023,7 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
 
   if (*read_pos++) //types supplied / first execute
   {
+    const uint signed_bit= 1 << 15;
     /*
       First execute or types altered by the client, setup the
       conversion routines for all parameters (one time)
@@ -1034,8 +1034,6 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
     for (; it < end; ++it)
     {
       ushort typecode;
-      const uint signed_bit= 1 << 15;
-      const uint indicators_bit= 1 << 14;
 
       if (read_pos >= data_end)
         DBUG_RETURN(1);
@@ -1044,7 +1042,7 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
       read_pos+= 2;
       (**it).unsigned_flag= MY_TEST(typecode & signed_bit);
       if (bulk_protocol)
-        (**it).indicators= MY_TEST(typecode & indicators_bit);
+        (**it).indicators= TRUE;
       setup_one_conversion_function(thd, *it,
                                     (uchar) (typecode & 0xff));
     }
@@ -3032,6 +3030,13 @@ static void reset_stmt_params(Prepared_statement *stmt)
 }
 
 
+static void mysql_stmt_execute_common(THD *thd,
+                                      ulong stmt_id,
+                                      uchar *packet,
+                                      uchar *packet_end,
+                                      ulong cursor_flags,
+                                      bool iteration);
+
 /**
   COM_STMT_EXECUTE handler: execute a previously prepared statement.
 
@@ -3054,20 +3059,92 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length)
   uchar *packet= (uchar*)packet_arg; // GCC 4.0.1 workaround
   ulong stmt_id= uint4korr(packet);
   ulong flags= (ulong) packet[4];
-#ifndef EMBEDDED_LIBRARY
-  ulong iterations= uint4korr(packet + 5);
-#else
-  ulong iterations= 0; // no support
-#endif
+  uchar *packet_end= packet + packet_length;
+  DBUG_ENTER("mysqld_stmt_execute");
+
+  packet+= 9;                               /* stmt_id + 5 bytes of flags */
+
+  mysql_stmt_execute_common(thd, stmt_id, packet, packet_end, flags, FALSE);
+  DBUG_VOID_RETURN;
+}
+
+
+/**
+  COM_STMT_BULK_EXECUTE handler: execute a previously prepared statement.
+
+    If there are any parameters, then replace parameter markers with the
+    data supplied from the client, and then execute the statement.
+    This function uses binary protocol to send a possible result set
+    to the client.
+
+  @param thd                current thread
+  @param packet_arg         parameter types and data, if any
+  @param packet_length      packet length, including the terminator character.
+
+  @return
+    none: in case of success OK packet or a result set is sent to the
+    client, otherwise an error message is set in THD.
+*/
+
+void mysqld_stmt_bulk_execute(THD *thd, char *packet_arg, uint packet_length)
+{
+  uchar *packet= (uchar*)packet_arg; // GCC 4.0.1 workaround
+  ulong stmt_id= uint4korr(packet);
+  ulong flags= (ulong) uint2korr(packet + 4);
+  uchar *packet_end= packet + packet_length;
+  DBUG_ENTER("mysqld_stmt_execute_bulk");
+
+  if (!(thd->client_capabilities &
+        MARIADB_CLIENT_STMT_BULK_OPERATIONS))
+  {
+    DBUG_PRINT("error",
+               ("An attempt to execute bulk operation without support"));
+    my_error(ER_UNSUPPORTED_PS, MYF(0));
+  }
+  /* Check for implemented parameters */
+  if (flags & (~STMT_BULK_FLAG_CLIENT_SEND_TYPES))
+  {
+    DBUG_PRINT("error", ("unsupported bulk execute flags %lx", flags));
+    my_error(ER_UNSUPPORTED_PS, MYF(0));
+  }
+
+  /* stmt id and one byte of flegs (other will be used for emulation) */
+  packet+= 4 + 2 - 1;
+  /*
+    Emulate all command buffer, where there was a bit before parameter
+    which tell if there is type following
+  */
+  packet[0]= MY_TEST(flags & STMT_BULK_FLAG_CLIENT_SEND_TYPES);
+
+  mysql_stmt_execute_common(thd, stmt_id, packet, packet_end, 0, TRUE);
+  DBUG_VOID_RETURN;
+}
+
+
+/**
+  Common part of prepared statement execution
+
+  @param thd             THD handle
+  @param stmt_id         id of the prepared statement
+  @param paket           packet with parameters to bind
+  @param packet_end      pointer to the byte after parameters end
+  @param cursor_flags    cursor flags
+  @param bulk_op
+*/
+
+static void mysql_stmt_execute_common(THD *thd,
+                                      ulong stmt_id,
+                                      uchar *packet,
+                                      uchar *packet_end,
+                                      ulong cursor_flags,
+                                      bool bulk_op)
+{
   /* Query text for binary, general or slow log, if any of them is open */
   String expanded_query;
-  uchar *packet_end= packet + packet_length;
   Prepared_statement *stmt;
   Protocol *save_protocol= thd->protocol;
   bool open_cursor;
-  DBUG_ENTER("mysqld_stmt_execute");
-
-  packet+= 9;                               /* stmt_id + 5 bytes of flags */
+  DBUG_ENTER("mysqld_stmt_execute_common");
 
   /* First of all clear possible warnings from the previous command */
   thd->reset_for_next_command();
@@ -3084,16 +3161,15 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length)
   thd->profiling.set_query_source(stmt->query(), stmt->query_length());
 #endif
   DBUG_PRINT("exec_query", ("%s", stmt->query()));
-  DBUG_PRINT("info",("stmt: %p iterations: %lu", stmt, iterations));
+  DBUG_PRINT("info",("stmt: %p bulk_op %d", stmt, bulk_op));
 
-  open_cursor= MY_TEST(flags & (ulong) CURSOR_TYPE_READ_ONLY);
+  open_cursor= MY_TEST(cursor_flags & (ulong) CURSOR_TYPE_READ_ONLY);
 
   thd->protocol= &thd->protocol_binary;
-  if (iterations <= 1)
+  if (!bulk_op)
     stmt->execute_loop(&expanded_query, open_cursor, packet, packet_end);
   else
-    stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end,
-                            iterations);
+    stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end);
   thd->protocol= save_protocol;
 
   sp_cache_enforce_limit(thd->sp_proc_cache, stored_program_cache_size);
@@ -3600,10 +3676,10 @@ Prepared_statement::Prepared_statement(THD *thd_arg)
   cursor(0),
   packet(0),
   packet_end(0),
-  iterations(0),
   param_count(0),
   last_errno(0),
   flags((uint) IS_IN_USE),
+  iterations(0),
   start_param(0),
   m_sql_mode(thd->variables.sql_mode)
 {
@@ -4023,7 +4099,7 @@ Prepared_statement::execute_loop(String *expanded_query,
   Reprepare_observer reprepare_observer;
   bool error;
   int reprepare_attempt= 0;
-  iterations= 0;
+  iterations= FALSE;
 
   /*
     - In mysql_sql_stmt_execute() we hide all "external" Items
@@ -4130,7 +4206,7 @@ ulong bulk_parameters_iterations(THD *thd)
 {
   Prepared_statement *stmt= (Prepared_statement *) thd->bulk_param;
   if (!stmt)
-    return 1;
+    return FALSE;
   return stmt->bulk_iterations();
 }
 
@@ -4138,7 +4214,7 @@ ulong bulk_parameters_iterations(THD *thd)
 my_bool Prepared_statement::set_bulk_parameters(bool reset)
 {
   DBUG_ENTER("Prepared_statement::set_bulk_parameters");
-  DBUG_PRINT("info", ("iteration: %lu", iterations));
+  DBUG_PRINT("info", ("iteration: %d", iterations));
   if (iterations)
   {
 #ifndef EMBEDDED_LIBRARY
@@ -4152,31 +4228,24 @@ my_bool Prepared_statement::set_bulk_parameters(bool reset)
       reset_stmt_params(this);
       DBUG_RETURN(true);
     }
-    iterations--;
+    if (packet >= packet_end)
+      iterations= FALSE;
   }
   start_param= 0;
   DBUG_RETURN(false);
 }
 
-ulong Prepared_statement::bulk_iterations()
-{
-  if (iterations)
-    return iterations;
-  return start_param ? 1 : 0;
-}
-
 bool
 Prepared_statement::execute_bulk_loop(String *expanded_query,
                                       bool open_cursor,
                                       uchar *packet_arg,
-                                      uchar *packet_end_arg,
-                                      ulong iterations_arg)
+                                      uchar *packet_end_arg)
 {
   Reprepare_observer reprepare_observer;
   bool error= 0;
   packet= packet_arg;
   packet_end= packet_end_arg;
-  iterations= iterations_arg;
+  iterations= TRUE;
   start_param= true;
 #ifndef DBUG_OFF
   Item *free_list_state= thd->free_list;
diff --git a/sql/sql_prepare.h b/sql/sql_prepare.h
index 820cb43e6d5..dcd094031c4 100644
--- a/sql/sql_prepare.h
+++ b/sql/sql_prepare.h
@@ -72,6 +72,7 @@ class Reprepare_observer
 
 void mysqld_stmt_prepare(THD *thd, const char *packet, uint packet_length);
 void mysqld_stmt_execute(THD *thd, char *packet, uint packet_length);
+void mysqld_stmt_execute_bulk(THD *thd, char *packet, uint packet_length);
 void mysqld_stmt_bulk_execute(THD *thd, char *packet, uint packet_length);
 void mysqld_stmt_close(THD *thd, char *packet);
 void mysql_sql_stmt_prepare(THD *thd);


More information about the commits mailing list