[Commits] ee1080e: MDEV-9114: Bulk operations (Array binding)

Oleksandr Byelkin sanja at mariadb.com
Sat Oct 8 00:52:53 EEST 2016


revision-id: ee1080e91dc87900e40523d5d794351a9ff7e8fe (mariadb-10.2.2-13-gee1080e)
parent(s): a0a4079b78a6afc5cf8332fed40efef6eb5711d4
committer: Oleksandr Byelkin
timestamp: 2016-10-07 23:52:30 +0200
message:

MDEV-9114: Bulk operations (Array binding)

(+ default values)

---
 include/mysql.h.pp        |  19 ++-
 include/mysql_com.h       |  30 ++++-
 sql/item.cc               |   2 +-
 sql/item.h                |   6 +
 sql/protocol.cc           |   1 +
 sql/sql_class.cc          |   1 +
 sql/sql_class.h           |  17 +++
 sql/sql_error.cc          |  22 +++-
 sql/sql_error.h           |  24 +++-
 sql/sql_insert.cc         | 175 +++++++++++++------------
 sql/sql_parse.cc          |  17 ++-
 sql/sql_prepare.cc        | 318 +++++++++++++++++++++++++++++++++++++++++++++-
 sql/sql_prepare.h         |   3 +
 sql/wsrep_thd.cc          |   3 +-
 storage/perfschema/pfs.cc |   2 +
 15 files changed, 525 insertions(+), 115 deletions(-)

diff --git a/include/mysql.h.pp b/include/mysql.h.pp
index 857f5b9..1a04116 100644
--- a/include/mysql.h.pp
+++ b/include/mysql.h.pp
@@ -10,12 +10,19 @@ enum enum_server_command
   COM_STMT_PREPARE, COM_STMT_EXECUTE, COM_STMT_SEND_LONG_DATA, COM_STMT_CLOSE,
   COM_STMT_RESET, COM_SET_OPTION, COM_STMT_FETCH, COM_DAEMON,
   COM_MDB_GAP_BEG,
-  COM_MDB_GAP_END=250,
-  COM_SLAVE_WORKER,
-  COM_SLAVE_IO,
-  COM_SLAVE_SQL,
-  COM_MULTI,
-  COM_END
+  COM_MDB_GAP_END=249,
+  COM_STMT_BULK_EXECUTE=250,
+  COM_SLAVE_WORKER=251,
+  COM_SLAVE_IO=252,
+  COM_SLAVE_SQL=253,
+  COM_MULTI=254,
+  COM_END=255
+};
+enum enum_indicator_type
+{
+  STMT_INDICATOR_NONE= 0,
+  STMT_INDICATOR_NULL,
+  STMT_INDICATOR_DEFAULT
 };
 struct st_vio;
 typedef struct st_vio Vio;
diff --git a/include/mysql_com.h b/include/mysql_com.h
index 461800f..491ea95 100644
--- a/include/mysql_com.h
+++ b/include/mysql_com.h
@@ -113,13 +113,26 @@ enum enum_server_command
   COM_STMT_RESET, COM_SET_OPTION, COM_STMT_FETCH, COM_DAEMON,
   /* don't forget to update const char *command_name[] in sql_parse.cc */
   COM_MDB_GAP_BEG,
-  COM_MDB_GAP_END=250,
-  COM_SLAVE_WORKER,
-  COM_SLAVE_IO,
-  COM_SLAVE_SQL,
-  COM_MULTI,
+  COM_MDB_GAP_END=249,
+  COM_STMT_BULK_EXECUTE=250,
+  COM_SLAVE_WORKER=251,
+  COM_SLAVE_IO=252,
+  COM_SLAVE_SQL=253,
+  COM_MULTI=254,
   /* Must be last */
-  COM_END
+  COM_END=255
+};
+
+
+/*
+  Bulk PS protocol indicator value:
+  TODO: discuss maybe it should be a flag
+*/
+enum enum_indicator_type
+{
+  STMT_INDICATOR_NONE= 0,
+  STMT_INDICATOR_NULL,
+  STMT_INDICATOR_DEFAULT
 };
 
 /* sql type stored in .frm files for virtual fields */
@@ -256,6 +269,8 @@ enum enum_server_command
 #define MARIADB_CLIENT_PROGRESS (1ULL << 32)
 /* support COM_MULTI */
 #define MARIADB_CLIENT_COM_MULTI (1ULL << 33)
+/* support of array binding */
+#define MARIADB_CLIENT_STMT_BULK_OPERATIONS (1UL << 34)
 
 #ifdef HAVE_COMPRESS
 #define CAN_CLIENT_COMPRESS CLIENT_COMPRESS
@@ -295,7 +310,8 @@ enum enum_server_command
                            CLIENT_SESSION_TRACK |\
                            CLIENT_DEPRECATE_EOF |\
                            CLIENT_CONNECT_ATTRS |\
-                           MARIADB_CLIENT_COM_MULTI)
+                           MARIADB_CLIENT_COM_MULTI |\
+                           MARIADB_CLIENT_STMT_BULK_OPERATIONS)
 
 /*
   To be added later:
diff --git a/sql/item.cc b/sql/item.cc
index 61635ea..8399012 100644
--- a/sql/item.cc
+++ b/sql/item.cc
@@ -3227,7 +3227,7 @@ Item_param::Item_param(THD *thd, uint pos_in_query_arg):
   Item_basic_value(thd),
   Rewritable_query_parameter(pos_in_query_arg, 1),
   Type_handler_hybrid_field_type(MYSQL_TYPE_VARCHAR),
-  state(NO_VALUE),
+  state(NO_VALUE), indicators(0),
   /* Don't pretend to be a literal unless value for this item is set. */
   item_type(PARAM_ITEM),
   set_param_func(default_set_param_func),
diff --git a/sql/item.h b/sql/item.h
index 5b82548..7e9cfbb 100644
--- a/sql/item.h
+++ b/sql/item.h
@@ -2728,6 +2728,12 @@ class Item_param :public Item_basic_value,
   } state;
 
   /*
+    Used for bulk protocol. Indicates if we should expect
+    indicators byte before value of the parameter
+  */
+  my_bool indicators;
+
+  /*
     A buffer for string and long data values. Historically all allocated
     values returned from val_str() were treated as eligible to
     modification. I. e. in some cases Item_func_concat can append it's
diff --git a/sql/protocol.cc b/sql/protocol.cc
index be73c94..64bdce5 100644
--- a/sql/protocol.cc
+++ b/sql/protocol.cc
@@ -572,6 +572,7 @@ void Protocol::end_statement()
                     thd->get_stmt_da()->statement_warn_count());
     break;
   case Diagnostics_area::DA_OK:
+  case Diagnostics_area::DA_OK_BULK:
     error= send_ok(thd->server_status,
                    thd->get_stmt_da()->statement_warn_count(),
                    thd->get_stmt_da()->affected_rows(),
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 6433786..887b153 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -854,6 +854,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
    in_sub_stmt(0), log_all_errors(0),
    binlog_unsafe_warning_flags(0),
    binlog_table_maps(0),
+   bulk_param(0),
    table_map_for_update(0),
    m_examined_row_count(0),
    accessed_rows_and_keys(0),
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 51642ec..b444b36 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2465,6 +2465,8 @@ class THD :public Statement,
   */
   Query_arena *stmt_arena;
 
+  void *bulk_param;
+
   /*
     map for tables that will be updated for a multi-table update query
     statement, for other query statements, this will be zero.
@@ -3440,6 +3442,12 @@ class THD :public Statement,
     To raise this flag, use my_error().
   */
   inline bool is_error() const { return m_stmt_da->is_error(); }
+  void set_bulk_execution(void *bulk)
+  {
+    bulk_param= bulk;
+    m_stmt_da->set_bulk_execution(MY_TEST(bulk));
+  }
+  bool is_bulk_op() const { return m_stmt_da->is_bulk_op(); }
 
   /// Returns Diagnostics-area for the current statement.
   Diagnostics_area *get_stmt_da()
@@ -5512,6 +5520,15 @@ class select_dumpvar :public select_result_interceptor {
 */
 #define CF_UPDATES_DATA (1U << 18)
 
+/**
+  SP Bulk execution safe
+*/
+#define CF_SP_BULK_SAFE (1U << 19)
+/**
+  SP Bulk execution optimized
+*/
+#define CF_SP_BULK_OPTIMIZED (1U << 20)
+
 /* Bits in server_command_flags */
 
 /**
diff --git a/sql/sql_error.cc b/sql/sql_error.cc
index 1d234c5..34d608f 100644
--- a/sql/sql_error.cc
+++ b/sql/sql_error.cc
@@ -320,7 +320,7 @@ Sql_condition::set_sqlstate(const char* sqlstate)
 }
 
 Diagnostics_area::Diagnostics_area(bool initialize)
-  : m_main_wi(0, false, initialize)
+  : is_bulk_execution(0), m_main_wi(0, false, initialize)
 {
   push_warning_info(&m_main_wi);
 
@@ -330,7 +330,8 @@ Diagnostics_area::Diagnostics_area(bool initialize)
 Diagnostics_area::Diagnostics_area(ulonglong warning_info_id,
                                    bool allow_unlimited_warnings,
                                    bool initialize)
-  : m_main_wi(warning_info_id, allow_unlimited_warnings, initialize)
+  : is_bulk_execution(0),
+  m_main_wi(warning_info_id, allow_unlimited_warnings, initialize)
 {
   push_warning_info(&m_main_wi);
 
@@ -376,7 +377,7 @@ Diagnostics_area::set_ok_status(ulonglong affected_rows,
                                 const char *message)
 {
   DBUG_ENTER("set_ok_status");
-  DBUG_ASSERT(! is_set());
+  DBUG_ASSERT(!is_set() || (m_status == DA_OK_BULK && is_bulk_op()));
   /*
     In production, refuse to overwrite an error or a custom response
     with an OK packet.
@@ -384,14 +385,23 @@ Diagnostics_area::set_ok_status(ulonglong affected_rows,
   if (is_error() || is_disabled())
     return;
 
-  m_statement_warn_count= current_statement_warn_count();
-  m_affected_rows= affected_rows;
+  if (m_status == DA_OK_BULK)
+  {
+    DBUG_ASSERT(is_bulk_op());
+    m_statement_warn_count+= current_statement_warn_count();
+    m_affected_rows+= affected_rows;
+  }
+  else
+  {
+    m_statement_warn_count= current_statement_warn_count();
+    m_affected_rows= affected_rows;
+    m_status= (is_bulk_op() ? DA_OK_BULK : DA_OK);
+  }
   m_last_insert_id= last_insert_id;
   if (message)
     strmake_buf(m_message, message);
   else
     m_message[0]= '\0';
-  m_status= DA_OK;
   DBUG_VOID_RETURN;
 }
 
diff --git a/sql/sql_error.h b/sql/sql_error.h
index 8fb1aba..70d40e3 100644
--- a/sql/sql_error.h
+++ b/sql/sql_error.h
@@ -658,6 +658,8 @@ class Diagnostics_area
     DA_OK,
     /** Set whenever one calls my_eof(). */
     DA_EOF,
+    /** Set whenever one calls my_ok() in PS bulk mode. */
+    DA_OK_BULK,
     /** Set whenever one calls my_error() or my_message(). */
     DA_ERROR,
     /** Set in case of a custom response, such as one from COM_STMT_PREPARE. */
@@ -699,10 +701,15 @@ class Diagnostics_area
 
   bool is_disabled() const { return m_status == DA_DISABLED; }
 
+  void set_bulk_execution(bool bulk) { is_bulk_execution= bulk; }
+
+  bool is_bulk_op() const { return is_bulk_execution; }
+
   enum_diagnostics_status status() const { return m_status; }
 
   const char *message() const
-  { DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK); return m_message; }
+  { DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK ||
+                m_status == DA_OK_BULK); return m_message; }
 
   bool skip_flush() const
   { DBUG_ASSERT(m_status == DA_OK); return m_skip_flush; }
@@ -717,14 +724,21 @@ class Diagnostics_area
   { DBUG_ASSERT(m_status == DA_ERROR); return m_sqlstate; }
 
   ulonglong affected_rows() const
-  { DBUG_ASSERT(m_status == DA_OK); return m_affected_rows; }
+  {
+    DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK);
+    return m_affected_rows;
+  }
 
   ulonglong last_insert_id() const
-  { DBUG_ASSERT(m_status == DA_OK); return m_last_insert_id; }
+  {
+    DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK);
+    return m_last_insert_id;
+  }
 
   uint statement_warn_count() const
   {
-    DBUG_ASSERT(m_status == DA_OK || m_status == DA_EOF);
+    DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK ||
+                m_status == DA_EOF);
     return m_statement_warn_count;
   }
 
@@ -907,6 +921,8 @@ class Diagnostics_area
 
   enum_diagnostics_status m_status;
 
+  my_bool is_bulk_execution;
+
   Warning_info m_main_wi;
 
   Warning_info_list m_wi_stack;
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 3025c6e..7dcade9 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -77,6 +77,7 @@
 #include "transaction.h"
 #include "sql_audit.h"
 #include "sql_derived.h"                        // mysql_handle_derived
+#include "sql_prepare.h"
 
 #include "debug_sync.h"
 
@@ -662,6 +663,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
   uint value_count;
   ulong counter = 1;
   ulonglong id;
+  ulong bulk_iterations= bulk_parameters_iterations(thd);
   COPY_INFO info;
   TABLE *table= 0;
   List_iterator_fast<List_item> its(values_list);
@@ -725,6 +727,9 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
   THD_STAGE_INFO(thd, stage_init);
   thd->lex->used_tables=0;
   values= its++;
+  DBUG_ASSERT(bulk_iterations > 0);
+  if (bulk_parameters_set(thd))
+    DBUG_RETURN(TRUE);
   value_count= values->elements;
 
   if (mysql_prepare_insert(thd, table_list, table, fields, values,
@@ -885,105 +890,113 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
       goto values_loop_end;
     }
   }
-
-  while ((values= its++))
+  for (ulong iteration= 0; iteration < bulk_iterations; iteration++)
   {
-    if (fields.elements || !value_count)
+
+    if (iteration && bulk_parameters_set(thd))
+      goto abort;
+
+    while ((values= its++))
     {
-      /*
-        There are possibly some default values:
-        INSERT INTO t1 (fields) VALUES ...
-        INSERT INTO t1 VALUES ()
-      */
-      restore_record(table,s->default_values);	// Get empty record
-      table->reset_default_fields();
-      if (fill_record_n_invoke_before_triggers(thd, table, fields, *values, 0,
-                                               TRG_EVENT_INSERT))
+      if (fields.elements || !value_count)
       {
-	if (values_list.elements != 1 && ! thd->is_error())
-	{
-	  info.records++;
-	  continue;
-	}
-	/*
-	  TODO: set thd->abort_on_warning if values_list.elements == 1
-	  and check that all items return warning in case of problem with
-	  storing field.
+        /*
+          There are possibly some default values:
+          INSERT INTO t1 (fields) VALUES ...
+          INSERT INTO t1 VALUES ()
         */
-	error=1;
-	break;
+        restore_record(table,s->default_values);	// Get empty record
+        table->reset_default_fields();
+        if (fill_record_n_invoke_before_triggers(thd, table, fields, *values, 0,
+                                                 TRG_EVENT_INSERT))
+        {
+          if (values_list.elements != 1 && ! thd->is_error())
+          {
+            info.records++;
+            continue;
+          }
+          /*
+            TODO: set thd->abort_on_warning if values_list.elements == 1
+	    and check that all items return warning in case of problem with
+	    storing field.
+          */
+	  error=1;
+	  break;
+        }
       }
-    }
-    else
-    {
-      /*
-        No field list, all fields are set explicitly:
-        INSERT INTO t1 VALUES (values)
-      */
-      if (thd->lex->used_tables)		      // Column used in values()
-	restore_record(table,s->default_values);	// Get empty record
       else
       {
-        TABLE_SHARE *share= table->s;
-
         /*
-          Fix delete marker. No need to restore rest of record since it will
-          be overwritten by fill_record() anyway (and fill_record() does not
-          use default values in this case).
+          No field list, all fields are set explicitly:
+          INSERT INTO t1 VALUES (values)
         */
-#ifdef HAVE_valgrind
-        if (table->file->ha_table_flags() && HA_RECORD_MUST_BE_CLEAN_ON_WRITE)
-          restore_record(table,s->default_values);	// Get empty record
+        if (thd->lex->used_tables)		      // Column used in values()
+	  restore_record(table,s->default_values);	// Get empty record
         else
+        {
+          TABLE_SHARE *share= table->s;
+
+          /*
+            Fix delete marker. No need to restore rest of record since it will
+            be overwritten by fill_record() anyway (and fill_record() does not
+            use default values in this case).
+          */
+#ifdef HAVE_valgrind
+          if (table->file->ha_table_flags() && HA_RECORD_MUST_BE_CLEAN_ON_WRITE)
+            restore_record(table,s->default_values);	// Get empty record
+          else
 #endif
-          table->record[0][0]= share->default_values[0];
+            table->record[0][0]= share->default_values[0];
 
-        /* Fix undefined null_bits. */
-        if (share->null_bytes > 1 && share->last_null_bit_pos)
+          /* Fix undefined null_bits. */
+          if (share->null_bytes > 1 && share->last_null_bit_pos)
+          {
+            table->record[0][share->null_bytes - 1]= 
+              share->default_values[share->null_bytes - 1];
+          }
+        }
+        if (fill_record_n_invoke_before_triggers(thd, table,
+                                                 table->field_to_fill(),
+                                                 *values, 0, TRG_EVENT_INSERT))
         {
-          table->record[0][share->null_bytes - 1]= 
-            share->default_values[share->null_bytes - 1];
+          if (values_list.elements != 1 && ! thd->is_error())
+	  {
+	    info.records++;
+	    continue;
+	  }
+	  error=1;
+	  break;
         }
       }
-      if (fill_record_n_invoke_before_triggers(thd, table, table->field_to_fill(),
-                                               *values, 0, TRG_EVENT_INSERT))
+
+      if ((res= table_list->view_check_option(thd,
+                                              (values_list.elements == 1 ?
+                                               0 :
+                                               ignore))) ==
+          VIEW_CHECK_SKIP)
+        continue;
+      else if (res == VIEW_CHECK_ERROR)
       {
-	if (values_list.elements != 1 && ! thd->is_error())
-	{
-	  info.records++;
-	  continue;
-	}
-	error=1;
-	break;
+        error= 1;
+        break;
       }
-    }
-
-    if ((res= table_list->view_check_option(thd,
-					    (values_list.elements == 1 ?
-					     0 :
-					     ignore))) ==
-        VIEW_CHECK_SKIP)
-      continue;
-    else if (res == VIEW_CHECK_ERROR)
-    {
-      error= 1;
-      break;
-    }
 #ifndef EMBEDDED_LIBRARY
-    if (lock_type == TL_WRITE_DELAYED)
-    {
-      LEX_STRING const st_query = { query, thd->query_length() };
-      DEBUG_SYNC(thd, "before_write_delayed");
-      error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
-      DEBUG_SYNC(thd, "after_write_delayed");
-      query=0;
-    }
-    else
+      if (lock_type == TL_WRITE_DELAYED)
+      {
+        LEX_STRING const st_query = { query, thd->query_length() };
+        DEBUG_SYNC(thd, "before_write_delayed");
+        error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
+        DEBUG_SYNC(thd, "after_write_delayed");
+        query=0;
+      }
+      else
 #endif
-      error=write_record(thd, table ,&info);
-    if (error)
-      break;
-    thd->get_stmt_da()->inc_current_row_for_warning();
+        error=write_record(thd, table ,&info);
+      if (error)
+        break;
+      thd->get_stmt_da()->inc_current_row_for_warning();
+    }
+    its.rewind();
   }
 
 values_loop_end:
@@ -1131,7 +1144,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
     retval= thd->lex->explain->send_explain(thd);
     goto abort;
   }
-  if (values_list.elements == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
+  if ((bulk_iterations * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
 				    !thd->cuted_fields))
   {
     my_ok(thd, info.copied + info.deleted +
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index ac00b21..f0a0f51 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -391,7 +391,7 @@ const LEX_STRING command_name[257]={
   { 0, 0 }, //247
   { 0, 0 }, //248
   { 0, 0 }, //249
-  { 0, 0 }, //250
+  { C_STRING_WITH_LEN("Execute_bulk") }, //250
   { C_STRING_WITH_LEN("Slave_worker") }, //251
   { C_STRING_WITH_LEN("Slave_IO") }, //252
   { C_STRING_WITH_LEN("Slave_SQL") }, //253
@@ -570,17 +570,19 @@ void init_update_queries(void)
                                             CF_CAN_GENERATE_ROW_EVENTS |
                                             CF_OPTIMIZER_TRACE |
                                             CF_CAN_BE_EXPLAINED |
-                                            CF_UPDATES_DATA;
+                                            CF_UPDATES_DATA | CF_SP_BULK_SAFE;
   sql_command_flags[SQLCOM_UPDATE_MULTI]=   CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
                                             CF_CAN_GENERATE_ROW_EVENTS |
                                             CF_OPTIMIZER_TRACE |
                                             CF_CAN_BE_EXPLAINED |
-                                            CF_UPDATES_DATA;
+                                            CF_UPDATES_DATA | CF_SP_BULK_SAFE;
   sql_command_flags[SQLCOM_INSERT]=	    CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
                                             CF_CAN_GENERATE_ROW_EVENTS |
                                             CF_OPTIMIZER_TRACE |
                                             CF_CAN_BE_EXPLAINED |
-                                            CF_INSERTS_DATA;
+                                            CF_INSERTS_DATA |
+                                            CF_SP_BULK_SAFE |
+                                            CF_SP_BULK_OPTIMIZED;
   sql_command_flags[SQLCOM_INSERT_SELECT]=  CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
                                             CF_CAN_GENERATE_ROW_EVENTS |
                                             CF_OPTIMIZER_TRACE |
@@ -598,7 +600,7 @@ void init_update_queries(void)
                                             CF_CAN_GENERATE_ROW_EVENTS |
                                             CF_OPTIMIZER_TRACE |
                                             CF_CAN_BE_EXPLAINED |
-                                            CF_INSERTS_DATA;;
+                                            CF_INSERTS_DATA | CF_SP_BULK_SAFE;
   sql_command_flags[SQLCOM_REPLACE_SELECT]= CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
                                             CF_CAN_GENERATE_ROW_EVENTS |
                                             CF_OPTIMIZER_TRACE |
@@ -1747,6 +1749,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
     mysqld_stmt_execute(thd, packet, packet_length);
     break;
   }
+  case COM_STMT_BULK_EXECUTE:
+  {
+    mysqld_stmt_bulk_execute(thd, packet, packet_length);
+    break;
+  }
   case COM_STMT_FETCH:
   {
     mysqld_stmt_fetch(thd, packet, packet_length);
diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc
index df15a62..d7ee99e 100644
--- a/sql/sql_prepare.cc
+++ b/sql/sql_prepare.cc
@@ -162,6 +162,9 @@ class Prepared_statement: public Statement
   Select_fetch_protocol_binary result;
   Item_param **param_array;
   Server_side_cursor *cursor;
+  uchar *packet;
+  uchar *packet_end;
+  ulong iterations;
   uint param_count;
   uint last_errno;
   uint flags;
@@ -180,11 +183,15 @@ class Prepared_statement: public Statement
   */
   uint select_number_after_prepare;
   char last_error[MYSQL_ERRMSG_SIZE];
+  my_bool start_param;
 #ifndef EMBEDDED_LIBRARY
   bool (*set_params)(Prepared_statement *st, uchar *data, uchar *data_end,
                      uchar *read_pos, String *expanded_query);
+  bool (*set_bulk_params)(Prepared_statement *st,
+                          uchar **read_pos, uchar *data_end);
 #else
   bool (*set_params_data)(Prepared_statement *st, String *expanded_query);
+  /*TODO: add bulk support for builtin server */
 #endif
   bool (*set_params_from_vars)(Prepared_statement *stmt,
                                List<LEX_STRING>& varnames,
@@ -204,7 +211,13 @@ class Prepared_statement: public Statement
   bool execute_loop(String *expanded_query,
                     bool open_cursor,
                     uchar *packet_arg, uchar *packet_end_arg);
+  bool execute_bulk_loop(String *expanded_query,
+                         bool open_cursor,
+                         uchar *packet_arg, uchar *packet_end_arg,
+                         ulong iterations);
   bool execute_server_runnable(Server_runnable *server_runnable);
+  my_bool set_bulk_parameters();
+  ulong bulk_iterations();
   /* Destroy this statement */
   void deallocate();
 private:
@@ -960,11 +973,63 @@ static bool insert_params(Prepared_statement *stmt, uchar *null_array,
 }
 
 
+static bool insert_bulk_params(Prepared_statement *stmt,
+                               uchar **read_pos, uchar *data_end)
+{
+  Item_param **begin= stmt->param_array;
+  Item_param **end= begin + stmt->param_count;
+
+  DBUG_ENTER("insert_params");
+
+  for (Item_param **it= begin; it < end; ++it)
+  {
+    Item_param *param= *it;
+    if (param->state != Item_param::LONG_DATA_VALUE)
+    {
+      uint indicators;
+      if (param->indicators)
+        indicators= *((*read_pos)++);
+      else
+        indicators= STMT_INDICATOR_NONE;
+      if ((*read_pos) >= data_end)
+        DBUG_RETURN(1);
+      switch (indicators) {
+      case STMT_INDICATOR_NONE:
+        if ((*read_pos) >= data_end)
+          DBUG_RETURN(1);
+        param->set_param_func(param, read_pos, (uint) (data_end - (*read_pos)));
+        if (param->state == Item_param::NO_VALUE)
+          DBUG_RETURN(1);
+        break;
+      case STMT_INDICATOR_NULL:
+        param->set_null();
+        break;
+      case STMT_INDICATOR_DEFAULT:
+        // TODO: support default
+        DBUG_ASSERT(0);
+        break;
+      }
+    }
+    /*
+      A long data stream was supplied for this parameter marker.
+      This was done after prepare, prior to providing a placeholder
+      type (the types are supplied at execute). Check that the
+      supplied type of placeholder can accept a data stream.
+    */
+    else
+      DBUG_RETURN(1); // long is not supported here
+  }
+  DBUG_RETURN(0);
+}
+
 static bool setup_conversion_functions(Prepared_statement *stmt,
-                                       uchar **data, uchar *data_end)
+                                       uchar **data, uchar *data_end,
+                                       bool bulk_protocol= 0)
 {
   /* skip null bits */
-  uchar *read_pos= *data + (stmt->param_count+7) / 8;
+  uchar *read_pos= *data;
+  if (!bulk_protocol)
+    read_pos+= (stmt->param_count+7) / 8;
 
   DBUG_ENTER("setup_conversion_functions");
 
@@ -981,6 +1046,7 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
     {
       ushort typecode;
       const uint signed_bit= 1 << 15;
+      const uint indicators_bit= 1 << 14;
 
       if (read_pos >= data_end)
         DBUG_RETURN(1);
@@ -988,7 +1054,10 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
       typecode= sint2korr(read_pos);
       read_pos+= 2;
       (**it).unsigned_flag= MY_TEST(typecode & signed_bit);
-      setup_one_conversion_function(thd, *it, (uchar) (typecode & ~signed_bit));
+      if (bulk_protocol)
+        (**it).indicators= MY_TEST(typecode & indicators_bit);
+      setup_one_conversion_function(thd, *it,
+                                    (uchar) (typecode & 0xff));
     }
   }
   *data= read_pos;
@@ -997,6 +1066,8 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
 
 #else
 
+//TODO: support bulk parameters
+
 /**
   Embedded counterparts of parameter assignment routines.
 
@@ -2999,6 +3070,54 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length)
   DBUG_VOID_RETURN;
 }
 
+void mysqld_stmt_bulk_execute(THD *thd, char *packet_arg, uint packet_length)
+{
+  uchar *packet= (uchar*)packet_arg; // GCC 4.0.1 workaround
+  ulong stmt_id= uint4korr(packet);
+  ulong flags= (ulong) packet[4];
+  ulong iterations= uint4korr(packet + 5);
+  /* Query text for binary, general or slow log, if any of them is open */
+  String expanded_query;
+  uchar *packet_end= packet + packet_length;
+  Prepared_statement *stmt;
+  Protocol *save_protocol= thd->protocol;
+  bool open_cursor;
+  DBUG_ENTER("mysqld_stmt_bulk_execute");
+
+  packet+= 9;                 /* stmt_id + flags + iterations */
+
+  /* First of all clear possible warnings from the previous command */
+  thd->reset_for_next_command();
+
+  if (!(stmt= find_prepared_statement(thd, stmt_id)))
+  {
+    char llbuf[22];
+    my_error(ER_UNKNOWN_STMT_HANDLER, MYF(0), static_cast<int>(sizeof(llbuf)),
+             llstr(stmt_id, llbuf), "mysqld_stmt_execute");
+    DBUG_VOID_RETURN;
+  }
+
+#if defined(ENABLED_PROFILING)
+  thd->profiling.set_query_source(stmt->query(), stmt->query_length());
+#endif
+  DBUG_PRINT("exec_query", ("%s", stmt->query()));
+  DBUG_PRINT("info",("stmt: 0x%lx", (long) stmt));
+
+  open_cursor= MY_TEST(flags & (ulong) CURSOR_TYPE_READ_ONLY);
+
+  thd->protocol= &thd->protocol_binary;
+  stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end,
+                          iterations);
+  thd->protocol= save_protocol;
+
+  sp_cache_enforce_limit(thd->sp_proc_cache, stored_program_cache_size);
+  sp_cache_enforce_limit(thd->sp_func_cache, stored_program_cache_size);
+
+  /* Close connection socket; for use with client testing (Bug#43560). */
+  DBUG_EXECUTE_IF("close_conn_after_stmt_execute", vio_close(thd->net.vio););
+
+  DBUG_VOID_RETURN;
+}
 
 /**
   SQLCOM_EXECUTE implementation.
@@ -3455,6 +3574,9 @@ Prepared_statement::Prepared_statement(THD *thd_arg)
   result(thd_arg),
   param_array(0),
   cursor(0),
+  packet(0),
+  packet_end(0),
+  iterations(0),
   param_count(0),
   last_errno(0),
   flags((uint) IS_IN_USE)
@@ -3493,7 +3615,9 @@ void Prepared_statement::setup_set_params()
     set_params_from_vars= insert_params_from_vars_with_log;
 #ifndef EMBEDDED_LIBRARY
     set_params= insert_params_with_log;
+    set_bulk_params= insert_bulk_params; // TODO: add binlog support
 #else
+    //TODO: add bulk support for bulk parameters
     set_params_data= emb_insert_params_with_log;
 #endif
   }
@@ -3502,7 +3626,9 @@ void Prepared_statement::setup_set_params()
     set_params_from_vars= insert_params_from_vars;
 #ifndef EMBEDDED_LIBRARY
     set_params= insert_params;
+    set_bulk_params= insert_bulk_params;
 #else
+    //TODO: add bulk support for bulk parameters
     set_params_data= emb_insert_params;
 #endif
   }
@@ -3859,6 +3985,7 @@ Prepared_statement::set_parameters(String *expanded_query,
   @retval  FALSE   successfully executed the statement, perhaps
                    after having reprepared it a few times.
 */
+const static int MAX_REPREPARE_ATTEMPTS= 3;
 
 bool
 Prepared_statement::execute_loop(String *expanded_query,
@@ -3866,7 +3993,6 @@ Prepared_statement::execute_loop(String *expanded_query,
                                  uchar *packet,
                                  uchar *packet_end)
 {
-  const int MAX_REPREPARE_ATTEMPTS= 3;
   Reprepare_observer reprepare_observer;
   bool error;
   int reprepare_attempt= 0;
@@ -3961,6 +4087,190 @@ Prepared_statement::execute_loop(String *expanded_query,
   return error;
 }
 
+my_bool bulk_parameters_set(THD *thd)
+{
+  Prepared_statement *stmt= (Prepared_statement *) thd->bulk_param;
+
+  if (stmt && stmt->set_bulk_parameters())
+    return FALSE;
+  return FALSE;
+}
+
+ulong bulk_parameters_iterations(THD *thd)
+{
+  Prepared_statement *stmt= (Prepared_statement *) thd->bulk_param;
+  if (!stmt)
+    return 1;
+  return stmt->bulk_iterations();
+}
+
+
+my_bool Prepared_statement::set_bulk_parameters()
+{
+  if (iterations)
+  {
+#ifndef EMBEDDED_LIBRARY
+    if ((*set_bulk_params)(this, &packet, packet_end))
+#else
+      DBUG_ASSERT(0); //TODO: support bulk parameters for embedded server
+#endif
+    {
+      my_error(ER_WRONG_ARGUMENTS, MYF(0),
+               "mysqld_stmt_bulk_execute");
+      reset_stmt_params(this);
+      return true;
+    }
+    iterations--;
+  }
+  start_param= 0;
+  return false;
+}
+
+ulong Prepared_statement::bulk_iterations()
+{
+  if (iterations)
+    return iterations;
+  return start_param ? 1 : 0;
+}
+
+bool
+Prepared_statement::execute_bulk_loop(String *expanded_query,
+                                      bool open_cursor,
+                                      uchar *packet_arg,
+                                      uchar *packet_end_arg,
+                                      ulong iterations_arg)
+{
+  Reprepare_observer reprepare_observer;
+  bool error= 0;
+  packet= packet_arg;
+  packet_end= packet_end_arg;
+  iterations= iterations_arg;
+  start_param= true;
+#ifndef DBUG_OFF
+  Item *free_list_state= thd->free_list;
+#endif
+  thd->select_number= select_number_after_prepare;
+  thd->set_bulk_execution((void *)this);
+  /* Check if we got an error when sending long data */
+  if (state == Query_arena::STMT_ERROR)
+  {
+    my_message(last_errno, last_error, MYF(0));
+    thd->set_bulk_execution(0);
+    return TRUE;
+  }
+
+  if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_SAFE))
+  {
+    my_error(ER_UNSUPPORTED_PS, MYF(0));
+    thd->set_bulk_execution(0);
+    return TRUE;
+  }
+
+#ifndef EMBEDDED_LIBRARY
+  if (setup_conversion_functions(this, &packet, packet_end, TRUE))
+#else
+  DBUG_ASSERT(0); //TODO: support bulk parameters for embedded server
+#endif
+  {
+    my_error(ER_WRONG_ARGUMENTS, MYF(0),
+            "mysqld_stmt_bulk_execute");
+    reset_stmt_params(this);
+    thd->set_bulk_execution(0);
+    return true;
+  }
+
+#ifdef NOT_YET_FROM_MYSQL_5_6
+  if (unlikely(thd->security_ctx->password_expired && 
+               !lex->is_change_password))
+  {
+    my_error(ER_MUST_CHANGE_PASSWORD, MYF(0));
+    thd->set_bulk_execution(0);
+    return true;
+  }
+#endif
+
+  while((iterations || start_param) && !error && !thd->is_error())
+  {
+    int reprepare_attempt= 0;
+
+    if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_OPTIMIZED))
+    {
+      if (set_bulk_parameters())
+      {
+        thd->set_bulk_execution(0);
+        return true;
+      }
+    }
+
+reexecute:
+    /*
+      If the free_list is not empty, we'll wrongly free some externally
+      allocated items when cleaning up after validation of the prepared
+      statement.
+    */
+    DBUG_ASSERT(thd->free_list == free_list_state);
+
+    /*
+      Install the metadata observer. If some metadata version is
+      different from prepare time and an observer is installed,
+      the observer method will be invoked to push an error into
+      the error stack.
+    */
+
+    if (sql_command_flags[lex->sql_command] & CF_REEXECUTION_FRAGILE)
+    {
+      reprepare_observer.reset_reprepare_observer();
+      DBUG_ASSERT(thd->m_reprepare_observer == NULL);
+      thd->m_reprepare_observer= &reprepare_observer;
+    }
+
+    error= execute(expanded_query, open_cursor) || thd->is_error();
+
+    thd->m_reprepare_observer= NULL;
+#ifdef WITH_WSREP
+
+    if (WSREP_ON)
+    {
+      mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+      switch (thd->wsrep_conflict_state)
+      {
+      case CERT_FAILURE:
+        WSREP_DEBUG("PS execute fail for CERT_FAILURE: thd: %ld err: %d",
+                    thd->thread_id, thd->get_stmt_da()->sql_errno() );
+        thd->wsrep_conflict_state = NO_CONFLICT;
+        break;
+
+      case MUST_REPLAY:
+        (void)wsrep_replay_transaction(thd);
+        break;
+
+      default:
+        break;
+      }
+      mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+    }
+#endif /* WITH_WSREP */
+
+    if ((sql_command_flags[lex->sql_command] & CF_REEXECUTION_FRAGILE) &&
+        error && !thd->is_fatal_error && !thd->killed &&
+        reprepare_observer.is_invalidated() &&
+        reprepare_attempt++ < MAX_REPREPARE_ATTEMPTS)
+    {
+      DBUG_ASSERT(thd->get_stmt_da()->sql_errno() == ER_NEED_REPREPARE);
+      thd->clear_error();
+
+      error= reprepare();
+
+      if (! error)                                /* Success */
+        goto reexecute;
+    }
+  }
+  reset_stmt_params(this);
+  thd->set_bulk_execution(0);
+
+  return error;
+}
+
 
 bool
 Prepared_statement::execute_server_runnable(Server_runnable *server_runnable)
diff --git a/sql/sql_prepare.h b/sql/sql_prepare.h
index aec4ac4..ddbca68 100644
--- a/sql/sql_prepare.h
+++ b/sql/sql_prepare.h
@@ -72,6 +72,7 @@ class Reprepare_observer
 
 void mysqld_stmt_prepare(THD *thd, const char *packet, uint packet_length);
 void mysqld_stmt_execute(THD *thd, char *packet, uint packet_length);
+void mysqld_stmt_bulk_execute(THD *thd, char *packet, uint packet_length);
 void mysqld_stmt_close(THD *thd, char *packet);
 void mysql_sql_stmt_prepare(THD *thd);
 void mysql_sql_stmt_execute(THD *thd);
@@ -81,6 +82,8 @@ void mysqld_stmt_reset(THD *thd, char *packet);
 void mysql_stmt_get_longdata(THD *thd, char *pos, ulong packet_length);
 void reinit_stmt_before_use(THD *thd, LEX *lex);
 
+ulong bulk_parameters_iterations(THD *thd);
+my_bool bulk_parameters_set(THD *thd);
 /**
   Execute a fragment of server code in an isolated context, so that
   it doesn't leave any effect on THD. THD must have no open tables.
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index a810a5a..4ad4404 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -266,7 +266,8 @@ void wsrep_replay_transaction(THD *thd)
         }
         else if (thd->get_stmt_da()->is_set())
         {
-          if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK)
+          if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK &&
+              thd->get_stmt_da()->status() != Diagnostics_area::DA_OK_BULK)
           {
             WSREP_WARN("replay ok, thd has error status %d",
                        thd->get_stmt_da()->status());
diff --git a/storage/perfschema/pfs.cc b/storage/perfschema/pfs.cc
index dbe3241..4cd1a35 100644
--- a/storage/perfschema/pfs.cc
+++ b/storage/perfschema/pfs.cc
@@ -4827,6 +4827,7 @@ static void end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
 
       switch(da->status())
       {
+        case Diagnostics_area::DA_OK_BULK:
         case Diagnostics_area::DA_EMPTY:
           break;
         case Diagnostics_area::DA_OK:
@@ -4960,6 +4961,7 @@ static void end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
 
   switch (da->status())
   {
+    case Diagnostics_area::DA_OK_BULK:
     case Diagnostics_area::DA_EMPTY:
       break;
     case Diagnostics_area::DA_OK:



More information about the commits mailing list