[Commits] 2c1e3ad: # This is a combination of 3 commits.

Oleksandr Byelkin sanja at mariadb.com
Sat Oct 8 00:52:59 EEST 2016


revision-id: 2c1e3adc940f7e14398cc6850eddec5ed83f2f22 (mariadb-10.2.2-13-g2c1e3ad)
parent(s): a0a4079b78a6afc5cf8332fed40efef6eb5711d4
committer: Oleksandr Byelkin
timestamp: 2016-10-07 23:52:59 +0200
message:

# This is a combination of 3 commits.
# The first commit's message is:
MDEV-9114: Bulk operations (Array binding)

(+ default values)

# The 2nd commit message will be skipped:

#	CCCC

# The 3rd commit message will be skipped:

#	CCCCC

---
 include/mysql.h.pp        |  16 ++-
 include/mysql_com.h       |  27 +++-
 sql/item.cc               | 105 ++++++++++++---
 sql/item.h                |  48 ++++++-
 sql/protocol.cc           |   1 +
 sql/share/errmsg-utf8.txt |   5 +
 sql/sql_base.cc           |  18 ++-
 sql/sql_class.cc          |  17 ++-
 sql/sql_class.h           |  17 +++
 sql/sql_error.cc          |  22 +++-
 sql/sql_error.h           |  29 +++-
 sql/sql_insert.cc         | 239 +++++++++++++++++++++------------
 sql/sql_parse.cc          |  17 ++-
 sql/sql_prepare.cc        | 327 +++++++++++++++++++++++++++++++++++++++++++++-
 sql/sql_prepare.h         |   3 +
 sql/sql_update.cc         |   4 +
 sql/wsrep_thd.cc          |   3 +-
 storage/perfschema/pfs.cc |   2 +
 18 files changed, 756 insertions(+), 144 deletions(-)

diff --git a/include/mysql.h.pp b/include/mysql.h.pp
index 857f5b9..c985792 100644
--- a/include/mysql.h.pp
+++ b/include/mysql.h.pp
@@ -11,11 +11,17 @@ enum enum_server_command
   COM_STMT_RESET, COM_SET_OPTION, COM_STMT_FETCH, COM_DAEMON,
   COM_MDB_GAP_BEG,
   COM_MDB_GAP_END=250,
-  COM_SLAVE_WORKER,
-  COM_SLAVE_IO,
-  COM_SLAVE_SQL,
-  COM_MULTI,
-  COM_END
+  COM_SLAVE_WORKER=251,
+  COM_SLAVE_IO=252,
+  COM_SLAVE_SQL=253,
+  COM_MULTI=254,
+  COM_END=255
+};
+enum enum_indicator_type
+{
+  STMT_INDICATOR_NONE= 0,
+  STMT_INDICATOR_NULL,
+  STMT_INDICATOR_DEFAULT
 };
 struct st_vio;
 typedef struct st_vio Vio;
diff --git a/include/mysql_com.h b/include/mysql_com.h
index 461800f..5e79ba9 100644
--- a/include/mysql_com.h
+++ b/include/mysql_com.h
@@ -114,12 +114,24 @@ enum enum_server_command
   /* don't forget to update const char *command_name[] in sql_parse.cc */
   COM_MDB_GAP_BEG,
   COM_MDB_GAP_END=250,
-  COM_SLAVE_WORKER,
-  COM_SLAVE_IO,
-  COM_SLAVE_SQL,
-  COM_MULTI,
+  COM_SLAVE_WORKER=251,
+  COM_SLAVE_IO=252,
+  COM_SLAVE_SQL=253,
+  COM_MULTI=254,
   /* Must be last */
-  COM_END
+  COM_END=255
+};
+
+
+/*
+  Bulk PS protocol indicator value:
+  TODO: discuss maybe it should be a flag
+*/
+enum enum_indicator_type
+{
+  STMT_INDICATOR_NONE= 0,
+  STMT_INDICATOR_NULL,
+  STMT_INDICATOR_DEFAULT
 };
 
 /* sql type stored in .frm files for virtual fields */
@@ -256,6 +268,8 @@ enum enum_server_command
 #define MARIADB_CLIENT_PROGRESS (1ULL << 32)
 /* support COM_MULTI */
 #define MARIADB_CLIENT_COM_MULTI (1ULL << 33)
+/* support of array binding */
+#define MARIADB_CLIENT_STMT_BULK_OPERATIONS (1UL << 34)
 
 #ifdef HAVE_COMPRESS
 #define CAN_CLIENT_COMPRESS CLIENT_COMPRESS
@@ -295,7 +309,8 @@ enum enum_server_command
                            CLIENT_SESSION_TRACK |\
                            CLIENT_DEPRECATE_EOF |\
                            CLIENT_CONNECT_ATTRS |\
-                           MARIADB_CLIENT_COM_MULTI)
+                           MARIADB_CLIENT_COM_MULTI |\
+                           MARIADB_CLIENT_STMT_BULK_OPERATIONS)
 
 /*
   To be added later:
diff --git a/sql/item.cc b/sql/item.cc
index 61635ea..b59d35c 100644
--- a/sql/item.cc
+++ b/sql/item.cc
@@ -724,7 +724,9 @@ Item_ident::Item_ident(THD *thd, Name_resolution_context *context_arg,
 		       const char *field_name_arg)
   :Item_result_field(thd), orig_db_name(db_name_arg),
    orig_table_name(table_name_arg),
-   orig_field_name(field_name_arg), context(context_arg),
+   orig_field_name(field_name_arg),
+   default_value_target(0),
+   context(context_arg),
    db_name(db_name_arg), table_name(table_name_arg),
    field_name(field_name_arg),
    alias_name_used(FALSE), cached_field_index(NO_CACHED_FIELD_INDEX),
@@ -737,7 +739,9 @@ Item_ident::Item_ident(THD *thd, Name_resolution_context *context_arg,
 Item_ident::Item_ident(THD *thd, TABLE_LIST *view_arg, const char *field_name_arg)
   :Item_result_field(thd), orig_db_name(NullS),
    orig_table_name(view_arg->table_name),
-   orig_field_name(field_name_arg), context(&view_arg->view->select_lex.context),
+   orig_field_name(field_name_arg),
+   default_value_target(0),
+   context(&view_arg->view->select_lex.context),
    db_name(NullS), table_name(view_arg->alias),
    field_name(field_name_arg),
    alias_name_used(FALSE), cached_field_index(NO_CACHED_FIELD_INDEX),
@@ -756,6 +760,7 @@ Item_ident::Item_ident(THD *thd, Item_ident *item)
    orig_db_name(item->orig_db_name),
    orig_table_name(item->orig_table_name), 
    orig_field_name(item->orig_field_name),
+   default_value_target(item->default_value_target),
    context(item->context),
    db_name(item->db_name),
    table_name(item->table_name),
@@ -784,6 +789,7 @@ void Item_ident::cleanup()
     */
     can_be_depended= MY_TEST(depended_from);
   }
+  default_value_target= NULL;
   DBUG_VOID_RETURN;
 }
 
@@ -812,6 +818,17 @@ bool Item_ident::collect_outer_ref_processor(void *param)
   return FALSE;
 }
 
+void Item_ident::set_default_value_target(Item *item)
+{
+  DBUG_ENTER("Item_ident::set_default_value_target");
+  DBUG_PRINT("XXX", ("Itent: 0x%lx  Item  0x%lx", (ulong) this,
+                     (ulong) item));
+  if ((default_value_target= item) && fixed &&
+      type() == FIELD_ITEM)
+    default_value_target->set_default_value_source(((Item_field *)this)->
+                                                   field);
+  DBUG_VOID_RETURN;
+}
 
 /**
   Store the pointer to this item field into a list if not already there.
@@ -2534,6 +2551,10 @@ void Item_field::set_field(Field *field_par)
   max_length= adjust_max_effective_column_length(field_par, max_length);
 
   fixed= 1;
+  if (default_value_target != 0)
+  {
+    default_value_target->set_default_value_source(field);
+  }
   if (field->table->s->tmp_table == SYSTEM_TMP_TABLE)
     any_privileges= 0;
 }
@@ -3227,7 +3248,8 @@ Item_param::Item_param(THD *thd, uint pos_in_query_arg):
   Item_basic_value(thd),
   Rewritable_query_parameter(pos_in_query_arg, 1),
   Type_handler_hybrid_field_type(MYSQL_TYPE_VARCHAR),
-  state(NO_VALUE),
+  state(NO_VALUE), default_value_ref(NULL), default_value_source(NULL),
+  indicators(0), indicator(STMT_INDICATOR_NONE),
   /* Don't pretend to be a literal unless value for this item is set. */
   item_type(PARAM_ITEM),
   set_param_func(default_set_param_func),
@@ -3527,6 +3549,8 @@ void Item_param::reset()
   state= NO_VALUE;
   maybe_null= 1;
   null_value= 0;
+  default_value_source= NULL;
+  default_value_ref= NULL;
   /*
     Don't reset item_type to PARAM_ITEM: it's only needed to guard
     us from item optimizations at prepare stage, when item doesn't yet
@@ -3919,6 +3943,9 @@ Item_param::set_param_type_and_swap_value(Item_param *src)
   null_value= src->null_value;
   state= src->state;
   value= src->value;
+  default_value_ref= src->default_value_ref;
+  default_value_source= src->default_value_source;
+
 
   decimal_value.swap(src->decimal_value);
   str_value.swap(src->str_value);
@@ -3926,6 +3953,32 @@ Item_param::set_param_type_and_swap_value(Item_param *src)
 }
 
 
+bool Item_param::set_default(bool can_be_missed)
+{
+  DBUG_ENTER("Item_param::set_default");
+  DBUG_PRINT("XXX", ("Item_param: 0x%lx", (ulong) this));
+  if (!default_value_ref)
+  {
+    if (can_be_missed)
+      DBUG_RETURN(FALSE);
+    my_message(ER_INVALID_DEFAULT_PARAM,
+               ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
+    DBUG_RETURN(TRUE);
+  }
+  THD *thd= default_value_ref->table->in_use;
+  if (!default_value_source)
+  {
+    default_value_source= new (thd->mem_root)
+      Item_default_value(thd, &thd->lex->select_lex.context, default_value_ref);
+    if (!default_value_source ||
+        default_value_source->fix_fields(thd, (Item **)&default_value_source))
+      DBUG_RETURN(TRUE);
+    bitmap_set_bit(default_value_ref->table->read_set,
+                   default_value_ref->field_index);
+  }
+  DBUG_RETURN(set_value(thd, NULL, (Item**)&default_value_source));
+}
+
 /**
   This operation is intended to store some item value in Item_param to be
   used later.
@@ -4084,6 +4137,17 @@ bool Item_param::append_for_log(THD *thd, String *str)
   return str->append(*val);
 }
 
+
+bool Item_param::walk(Item_processor processor, bool walk_subquery, void *arg)
+{
+  if (default_value_source &&
+      default_value_source->walk(processor, walk_subquery, arg))
+  {
+    return TRUE;
+  }
+  return (this->*processor)(arg);
+}
+
 /****************************************************************************
   Item_copy
 ****************************************************************************/
@@ -7242,6 +7306,7 @@ bool Item_ref::fix_fields(THD *thd, Item **reference)
         Item_field* fld;
         if (!(fld= new (thd->mem_root) Item_field(thd, from_field)))
           goto error;
+        fld->set_default_value_target(default_value_target);
         thd->change_item_tree(reference, fld);
         mark_as_dependent(thd, last_checked_context->select_lex,
                           current_sel, fld, fld);
@@ -8428,36 +8493,38 @@ bool Item_default_value::eq(const Item *item, bool binary_cmp) const
 bool Item_default_value::fix_fields(THD *thd, Item **items)
 {
   Item *real_arg;
-  Item_field *field_arg;
   Field *def_field;
   DBUG_ASSERT(fixed == 0);
 
-  if (!arg)
+  if (!arg && !arg_fld)
   {
     fixed= 1;
     return FALSE;
   }
-  if (!arg->fixed && arg->fix_fields(thd, &arg))
-    goto error;
+  if (arg)
+  {
+    if (!arg->fixed && arg->fix_fields(thd, &arg))
+      goto error;
 
 
-  real_arg= arg->real_item();
-  if (real_arg->type() != FIELD_ITEM)
-  {
-    my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), arg->name);
-    goto error;
-  }
+    real_arg= arg->real_item();
+    if (real_arg->type() != FIELD_ITEM)
+    {
+      my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), arg->name);
+      goto error;
+    }
 
-  field_arg= (Item_field *)real_arg;
-  if ((field_arg->field->flags & NO_DEFAULT_VALUE_FLAG))
+    arg_fld= ((Item_field *)real_arg)->field;
+  }
+  if ((arg_fld->flags & NO_DEFAULT_VALUE_FLAG))
   {
-    my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), field_arg->field->field_name);
+    my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), arg_fld->field_name);
     goto error;
   }
-  if (!(def_field= (Field*) thd->alloc(field_arg->field->size_of())))
+  if (!(def_field= (Field*) thd->alloc(arg_fld->size_of())))
     goto error;
-  memcpy((void *)def_field, (void *)field_arg->field,
-         field_arg->field->size_of());
+  memcpy((void *)def_field, (void *)arg_fld,
+         arg_fld->size_of());
   def_field->move_field_offset((my_ptrdiff_t)
                                (def_field->table->s->default_values -
                                 def_field->table->record[0]));
diff --git a/sql/item.h b/sql/item.h
index 5b82548..c37d0ef 100644
--- a/sql/item.h
+++ b/sql/item.h
@@ -1872,6 +1872,9 @@ class Item: public Value_source,
   {
     marker &= ~EXTRACTION_MASK;
   }
+
+  virtual void set_default_value_source(Field *fld) {};
+  virtual bool set_default_if_needed() { return FALSE; };
 };
 
 
@@ -2351,6 +2354,8 @@ class Item_ident :public Item_result_field
   const char *orig_table_name;
   const char *orig_field_name;
 
+  Item *default_value_target;
+
 public:
   Name_resolution_context *context;
   const char *db_name;
@@ -2395,6 +2400,7 @@ class Item_ident :public Item_result_field
   virtual void print(String *str, enum_query_type query_type);
   virtual bool change_context_processor(void *cntx)
     { context= (Name_resolution_context *)cntx; return FALSE; }
+  void set_default_value_target(Item *item);
   /**
     Collect outer references
   */
@@ -2702,6 +2708,9 @@ class Item_null_result :public Item_null
   }
 };
 
+
+class Item_default_value;
+
 /*
   Item represents one placeholder ('?') of prepared statement
 
@@ -2727,6 +2736,15 @@ class Item_param :public Item_basic_value,
     DECIMAL_VALUE
   } state;
 
+  Field *default_value_ref;
+  Item_default_value *default_value_source;
+  /*
+    Used for bulk protocol. Indicates if we should expect
+    indicators byte before value of the parameter
+  */
+  my_bool indicators;
+  uint indicator;
+
   /*
     A buffer for string and long data values. Historically all allocated
     values returned from val_str() were treated as eligible to
@@ -2784,6 +2802,16 @@ class Item_param :public Item_basic_value,
   bool get_date(MYSQL_TIME *tm, ulonglong fuzzydate);
   int  save_in_field(Field *field, bool no_conversions);
 
+  virtual void set_default_value_source(Field *fld)
+  {
+    DBUG_ENTER("Item_param::set_default_value_source");
+    DBUG_PRINT("XXX", ("Item_param: 0x%lx  Field: 0x%lx",
+                       (ulong) this, (ulong) fld));
+    default_value_ref= fld;
+    DBUG_VOID_RETURN;
+  }
+
+  bool set_default(bool can_be_missed);
   void set_null();
   void set_int(longlong i, uint32 max_length_arg);
   void set_double(double i);
@@ -2827,8 +2855,8 @@ class Item_param :public Item_basic_value,
     constant, assert otherwise. This method is called only if
     basic_const_item returned TRUE.
   */
-  Item *safe_charset_converter(THD *thd, CHARSET_INFO *tocs);
   Item *clone_item(THD *thd);
+  Item *safe_charset_converter(THD *thd, CHARSET_INFO *tocs);
   /*
     Implement by-value equality evaluation if parameter value
     is set and is a basic constant (integer, real or string).
@@ -2852,10 +2880,19 @@ class Item_param :public Item_basic_value,
   virtual void set_out_param_info(Send_field *info);
 
 public:
+  bool set_default_if_needed()
+  {
+    if (state == NO_VALUE && indicator == STMT_INDICATOR_DEFAULT &&
+        set_default(FALSE))
+      return TRUE;
+    return FALSE;
+  }
+
   virtual const Send_field *get_out_param_info() const;
 
   virtual void make_field(THD *thd, Send_field *field);
 
+  virtual bool walk(Item_processor processor, bool walk_subquery, void *arg);
 private:
   Send_field *m_out_param_info;
 };
@@ -4980,14 +5017,19 @@ class Item_default_value : public Item_field
   void calculate();
 public:
   Item *arg;
+  Field *arg_fld;
   Item_default_value(THD *thd, Name_resolution_context *context_arg)
     :Item_field(thd, context_arg, (const char *)NULL, (const char *)NULL,
                (const char *)NULL),
-     arg(NULL) {}
+     arg(NULL), arg_fld(NULL) {}
   Item_default_value(THD *thd, Name_resolution_context *context_arg, Item *a)
     :Item_field(thd, context_arg, (const char *)NULL, (const char *)NULL,
                 (const char *)NULL),
-     arg(a) {}
+     arg(a), arg_fld(NULL) {}
+  Item_default_value(THD *thd, Name_resolution_context *context_arg, Field *a)
+    :Item_field(thd, context_arg, (const char *)NULL, (const char *)NULL,
+                (const char *)NULL),
+     arg(NULL), arg_fld(a) {}
   enum Type type() const { return DEFAULT_VALUE_ITEM; }
   bool eq(const Item *item, bool binary_cmp) const;
   bool fix_fields(THD *, Item **);
diff --git a/sql/protocol.cc b/sql/protocol.cc
index be73c94..64bdce5 100644
--- a/sql/protocol.cc
+++ b/sql/protocol.cc
@@ -572,6 +572,7 @@ void Protocol::end_statement()
                     thd->get_stmt_da()->statement_warn_count());
     break;
   case Diagnostics_area::DA_OK:
+  case Diagnostics_area::DA_OK_BULK:
     error= send_ok(thd->server_status,
                    thd->get_stmt_da()->statement_warn_count(),
                    thd->get_stmt_da()->affected_rows(),
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 4d3861b..a61e9af 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -7230,3 +7230,8 @@ ER_EXPRESSION_REFERS_TO_UNINIT_FIELD 01000
 ER_PARTITION_DEFAULT_ERROR
         eng "Only one DEFAULT partition allowed"
         ukr "Припустимо мати тільки один DEFAULT розділ" 
+ER_INVALID_DEFAULT_PARAM
+        eng "Default value is not supported for such parameter usage"
+        ukr "Значення за замовчуванням не підтримано для цього випадку використання параьетра"
+ER_BINLOG_NON_SUPPORTED_BULK
+        eng "Only row based replication supported for bulk operations"
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index d3832a7..fe4aec7 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -7810,9 +7810,10 @@ fill_record(THD *thd, TABLE *table_arg, List<Item> &fields, List<Item> &values,
     if (table->next_number_field &&
         rfield->field_index ==  table->next_number_field->field_index)
       table->auto_increment_field_not_null= TRUE;
-    if (rfield->vcol_info && 
-        value->type() != Item::DEFAULT_VALUE_ITEM && 
-        value->type() != Item::NULL_ITEM &&
+    Item::Type type= value->type();
+    if (rfield->vcol_info &&
+        type != Item::DEFAULT_VALUE_ITEM &&
+        type != Item::NULL_ITEM &&
         table->s->table_category != TABLE_CATEGORY_TEMPORARY)
     {
       push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
@@ -7820,6 +7821,8 @@ fill_record(THD *thd, TABLE *table_arg, List<Item> &fields, List<Item> &values,
                           ER_THD(thd, ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN),
                           rfield->field_name, table->s->table_name.str);
     }
+    if (value->set_default_if_needed())
+      goto err;
     if (rfield->stored_in_db() &&
         (value->save_in_field(rfield, 0)) < 0 && !ignore_errors)
     {
@@ -8060,9 +8063,10 @@ fill_record(THD *thd, TABLE *table, Field **ptr, List<Item> &values,
     value=v++;
     if (field->field_index == autoinc_index)
       table->auto_increment_field_not_null= TRUE;
-    if (field->vcol_info && 
-        value->type() != Item::DEFAULT_VALUE_ITEM && 
-        value->type() != Item::NULL_ITEM &&
+    Item::Type type= value->type();
+    if (field->vcol_info &&
+        type != Item::DEFAULT_VALUE_ITEM &&
+        type != Item::NULL_ITEM &&
         table->s->table_category != TABLE_CATEGORY_TEMPORARY)
     {
       push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
@@ -8070,6 +8074,8 @@ fill_record(THD *thd, TABLE *table, Field **ptr, List<Item> &values,
                           ER_THD(thd, ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN),
                           field->field_name, table->s->table_name.str);
     }
+    if (value->set_default_if_needed())
+      goto err;
 
     if (use_value)
       value->save_val(field);
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 6433786..08f8a16 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -854,6 +854,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
    in_sub_stmt(0), log_all_errors(0),
    binlog_unsafe_warning_flags(0),
    binlog_table_maps(0),
+   bulk_param(0),
    table_map_for_update(0),
    m_examined_row_count(0),
    accessed_rows_and_keys(0),
@@ -5766,6 +5767,17 @@ int THD::decide_logging_format(TABLE_LIST *tables)
       !(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
         !binlog_filter->db_ok(db)))
   {
+
+    if (is_bulk_op())
+    {
+      if (wsrep_binlog_format() == BINLOG_FORMAT_STMT)
+      {
+        my_error(ER_BINLOG_NON_SUPPORTED_BULK, MYF(0));
+        DBUG_PRINT("info",
+                   ("decision: no logging since an error was generated"));
+        DBUG_RETURN(-1);
+      }
+    }
     /*
       Compute one bit field with the union of all the engine
       capabilities, and one with the intersection of all the engine
@@ -6024,7 +6036,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
         */
         my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0));
       }
-      else if (wsrep_binlog_format() == BINLOG_FORMAT_ROW &&
+      else if ((wsrep_binlog_format() == BINLOG_FORMAT_ROW || is_bulk_op()) &&
                sqlcom_can_generate_row_events(this))
       {
         /*
@@ -6097,7 +6109,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
       else
       {
         if (lex->is_stmt_unsafe() || lex->is_stmt_row_injection()
-            || (flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0)
+            || (flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0 ||
+            is_bulk_op())
         {
           /* log in row format! */
           set_current_stmt_binlog_format_row_if_mixed();
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 51642ec..b444b36 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2465,6 +2465,8 @@ class THD :public Statement,
   */
   Query_arena *stmt_arena;
 
+  void *bulk_param;
+
   /*
     map for tables that will be updated for a multi-table update query
     statement, for other query statements, this will be zero.
@@ -3440,6 +3442,12 @@ class THD :public Statement,
     To raise this flag, use my_error().
   */
   inline bool is_error() const { return m_stmt_da->is_error(); }
+  void set_bulk_execution(void *bulk)
+  {
+    bulk_param= bulk;
+    m_stmt_da->set_bulk_execution(MY_TEST(bulk));
+  }
+  bool is_bulk_op() const { return m_stmt_da->is_bulk_op(); }
 
   /// Returns Diagnostics-area for the current statement.
   Diagnostics_area *get_stmt_da()
@@ -5512,6 +5520,15 @@ class select_dumpvar :public select_result_interceptor {
 */
 #define CF_UPDATES_DATA (1U << 18)
 
+/**
+  SP Bulk execution safe
+*/
+#define CF_SP_BULK_SAFE (1U << 19)
+/**
+  SP Bulk execution optimized
+*/
+#define CF_SP_BULK_OPTIMIZED (1U << 20)
+
 /* Bits in server_command_flags */
 
 /**
diff --git a/sql/sql_error.cc b/sql/sql_error.cc
index 1d234c5..34d608f 100644
--- a/sql/sql_error.cc
+++ b/sql/sql_error.cc
@@ -320,7 +320,7 @@ Sql_condition::set_sqlstate(const char* sqlstate)
 }
 
 Diagnostics_area::Diagnostics_area(bool initialize)
-  : m_main_wi(0, false, initialize)
+  : is_bulk_execution(0), m_main_wi(0, false, initialize)
 {
   push_warning_info(&m_main_wi);
 
@@ -330,7 +330,8 @@ Diagnostics_area::Diagnostics_area(bool initialize)
 Diagnostics_area::Diagnostics_area(ulonglong warning_info_id,
                                    bool allow_unlimited_warnings,
                                    bool initialize)
-  : m_main_wi(warning_info_id, allow_unlimited_warnings, initialize)
+  : is_bulk_execution(0),
+  m_main_wi(warning_info_id, allow_unlimited_warnings, initialize)
 {
   push_warning_info(&m_main_wi);
 
@@ -376,7 +377,7 @@ Diagnostics_area::set_ok_status(ulonglong affected_rows,
                                 const char *message)
 {
   DBUG_ENTER("set_ok_status");
-  DBUG_ASSERT(! is_set());
+  DBUG_ASSERT(!is_set() || (m_status == DA_OK_BULK && is_bulk_op()));
   /*
     In production, refuse to overwrite an error or a custom response
     with an OK packet.
@@ -384,14 +385,23 @@ Diagnostics_area::set_ok_status(ulonglong affected_rows,
   if (is_error() || is_disabled())
     return;
 
-  m_statement_warn_count= current_statement_warn_count();
-  m_affected_rows= affected_rows;
+  if (m_status == DA_OK_BULK)
+  {
+    DBUG_ASSERT(is_bulk_op());
+    m_statement_warn_count+= current_statement_warn_count();
+    m_affected_rows+= affected_rows;
+  }
+  else
+  {
+    m_statement_warn_count= current_statement_warn_count();
+    m_affected_rows= affected_rows;
+    m_status= (is_bulk_op() ? DA_OK_BULK : DA_OK);
+  }
   m_last_insert_id= last_insert_id;
   if (message)
     strmake_buf(m_message, message);
   else
     m_message[0]= '\0';
-  m_status= DA_OK;
   DBUG_VOID_RETURN;
 }
 
diff --git a/sql/sql_error.h b/sql/sql_error.h
index 8fb1aba..aa8e6c6b 100644
--- a/sql/sql_error.h
+++ b/sql/sql_error.h
@@ -658,6 +658,8 @@ class Diagnostics_area
     DA_OK,
     /** Set whenever one calls my_eof(). */
     DA_EOF,
+    /** Set whenever one calls my_ok() in PS bulk mode. */
+    DA_OK_BULK,
     /** Set whenever one calls my_error() or my_message(). */
     DA_ERROR,
     /** Set in case of a custom response, such as one from COM_STMT_PREPARE. */
@@ -699,13 +701,21 @@ class Diagnostics_area
 
   bool is_disabled() const { return m_status == DA_DISABLED; }
 
+  void set_bulk_execution(bool bulk) { is_bulk_execution= bulk; }
+
+  bool is_bulk_op() const { return is_bulk_execution; }
+
   enum_diagnostics_status status() const { return m_status; }
 
   const char *message() const
-  { DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK); return m_message; }
+  { DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK ||
+                m_status == DA_OK_BULK); return m_message; }
 
   bool skip_flush() const
-  { DBUG_ASSERT(m_status == DA_OK); return m_skip_flush; }
+  {
+    DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK);
+    return m_skip_flush;
+  }
 
   void set_skip_flush()
   { m_skip_flush= TRUE; }
@@ -717,14 +727,21 @@ class Diagnostics_area
   { DBUG_ASSERT(m_status == DA_ERROR); return m_sqlstate; }
 
   ulonglong affected_rows() const
-  { DBUG_ASSERT(m_status == DA_OK); return m_affected_rows; }
+  {
+    DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK);
+    return m_affected_rows;
+  }
 
   ulonglong last_insert_id() const
-  { DBUG_ASSERT(m_status == DA_OK); return m_last_insert_id; }
+  {
+    DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK);
+    return m_last_insert_id;
+  }
 
   uint statement_warn_count() const
   {
-    DBUG_ASSERT(m_status == DA_OK || m_status == DA_EOF);
+    DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK ||
+                m_status == DA_EOF);
     return m_statement_warn_count;
   }
 
@@ -907,6 +924,8 @@ class Diagnostics_area
 
   enum_diagnostics_status m_status;
 
+  my_bool is_bulk_execution;
+
   Warning_info m_main_wi;
 
   Warning_info_list m_wi_stack;
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 3025c6e..d2ec008 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -77,6 +77,7 @@
 #include "transaction.h"
 #include "sql_audit.h"
 #include "sql_derived.h"                        // mysql_handle_derived
+#include "sql_prepare.h"
 
 #include "debug_sync.h"
 
@@ -637,6 +638,66 @@ static void save_insert_query_plan(THD* thd, TABLE_LIST *table_list)
 }
 
 
+inline static void set_defaults_relation(Item *fld, Item *val)
+{
+  Item::Type type= fld->type();
+  if (type == Item::FIELD_ITEM)
+  {
+    Item_field *item_field= (Item_field *)fld;
+    if (item_field->fixed)
+      val->set_default_value_source(item_field->field);
+    else
+      item_field->set_default_value_target(val);
+  }
+  else if (type == Item::REF_ITEM)
+  {
+    Item_ref *item_field= (Item_ref *)fld;
+    // may turn to Item_field after fix_fields()
+    if (!item_field->fixed)
+      item_field->set_default_value_target(val);
+  }
+}
+
+void setup_deault_parameters(TABLE_LIST *table, List<Item> *fields,
+                             List<Item> *values)
+{
+
+  List_iterator_fast<Item> itv(*values);
+  Item *val;
+  if (fields->elements)
+  {
+    List_iterator_fast<Item> itf(*fields);
+    Item *fld;
+    while((fld= itf++) && (val= itv++))
+    {
+      set_defaults_relation(fld->real_item(), val);
+    }
+  }
+  else if (table != NULL)
+  {
+    if (table->view)
+    {
+      Field_iterator_view field_it;
+      field_it.set(table);
+      for (; !field_it.end_of_fields() && (val= itv++); field_it.next())
+      {
+        set_defaults_relation(field_it.item()->real_item(), val);
+      }
+    }
+    else
+    {
+      Field_iterator_table_ref field_it;
+      field_it.set(table);
+      for (; !field_it.end_of_fields() && (val= itv++); field_it.next())
+      {
+        Field *fld= field_it.field();
+        val->set_default_value_source(fld);
+      }
+    }
+  }
+}
+
+
 /**
   INSERT statement implementation
 
@@ -662,6 +723,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
   uint value_count;
   ulong counter = 1;
   ulonglong id;
+  ulong bulk_iterations= bulk_parameters_iterations(thd);
   COPY_INFO info;
   TABLE *table= 0;
   List_iterator_fast<List_item> its(values_list);
@@ -727,6 +789,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
   values= its++;
   value_count= values->elements;
 
+  DBUG_ASSERT(bulk_iterations > 0);
   if (mysql_prepare_insert(thd, table_list, table, fields, values,
 			   update_fields, update_values, duplic, &unused_conds,
                            FALSE,
@@ -770,6 +833,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
     if (setup_fields(thd, Ref_ptr_array(), *values, MARK_COLUMNS_READ, 0, 0))
       goto abort;
     switch_to_nullable_trigger_fields(*values, table);
+    setup_deault_parameters(table_list, &fields, values);
   }
   its.rewind ();
  
@@ -885,105 +949,113 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
       goto values_loop_end;
     }
   }
-
-  while ((values= its++))
+  for (ulong iteration= 0; iteration < bulk_iterations; iteration++)
   {
-    if (fields.elements || !value_count)
+
+    if (iteration && bulk_parameters_set(thd))
+      goto abort;
+
+    while ((values= its++))
     {
-      /*
-        There are possibly some default values:
-        INSERT INTO t1 (fields) VALUES ...
-        INSERT INTO t1 VALUES ()
-      */
-      restore_record(table,s->default_values);	// Get empty record
-      table->reset_default_fields();
-      if (fill_record_n_invoke_before_triggers(thd, table, fields, *values, 0,
-                                               TRG_EVENT_INSERT))
+      if (fields.elements || !value_count)
       {
-	if (values_list.elements != 1 && ! thd->is_error())
-	{
-	  info.records++;
-	  continue;
-	}
-	/*
-	  TODO: set thd->abort_on_warning if values_list.elements == 1
-	  and check that all items return warning in case of problem with
-	  storing field.
+        /*
+          There are possibly some default values:
+          INSERT INTO t1 (fields) VALUES ...
+          INSERT INTO t1 VALUES ()
         */
-	error=1;
-	break;
+        restore_record(table,s->default_values);	// Get empty record
+        table->reset_default_fields();
+        if (fill_record_n_invoke_before_triggers(thd, table, fields, *values, 0,
+                                                 TRG_EVENT_INSERT))
+        {
+          if (values_list.elements != 1 && ! thd->is_error())
+          {
+            info.records++;
+            continue;
+          }
+          /*
+            TODO: set thd->abort_on_warning if values_list.elements == 1
+	    and check that all items return warning in case of problem with
+	    storing field.
+          */
+	  error=1;
+	  break;
+        }
       }
-    }
-    else
-    {
-      /*
-        No field list, all fields are set explicitly:
-        INSERT INTO t1 VALUES (values)
-      */
-      if (thd->lex->used_tables)		      // Column used in values()
-	restore_record(table,s->default_values);	// Get empty record
       else
       {
-        TABLE_SHARE *share= table->s;
-
         /*
-          Fix delete marker. No need to restore rest of record since it will
-          be overwritten by fill_record() anyway (and fill_record() does not
-          use default values in this case).
+          No field list, all fields are set explicitly:
+          INSERT INTO t1 VALUES (values)
         */
-#ifdef HAVE_valgrind
-        if (table->file->ha_table_flags() && HA_RECORD_MUST_BE_CLEAN_ON_WRITE)
-          restore_record(table,s->default_values);	// Get empty record
+        if (thd->lex->used_tables)		      // Column used in values()
+	  restore_record(table,s->default_values);	// Get empty record
         else
+        {
+          TABLE_SHARE *share= table->s;
+
+          /*
+            Fix delete marker. No need to restore rest of record since it will
+            be overwritten by fill_record() anyway (and fill_record() does not
+            use default values in this case).
+          */
+#ifdef HAVE_valgrind
+          if (table->file->ha_table_flags() && HA_RECORD_MUST_BE_CLEAN_ON_WRITE)
+            restore_record(table,s->default_values);	// Get empty record
+          else
 #endif
-          table->record[0][0]= share->default_values[0];
+            table->record[0][0]= share->default_values[0];
 
-        /* Fix undefined null_bits. */
-        if (share->null_bytes > 1 && share->last_null_bit_pos)
+          /* Fix undefined null_bits. */
+          if (share->null_bytes > 1 && share->last_null_bit_pos)
+          {
+            table->record[0][share->null_bytes - 1]= 
+              share->default_values[share->null_bytes - 1];
+          }
+        }
+        if (fill_record_n_invoke_before_triggers(thd, table,
+                                                 table->field_to_fill(),
+                                                 *values, 0, TRG_EVENT_INSERT))
         {
-          table->record[0][share->null_bytes - 1]= 
-            share->default_values[share->null_bytes - 1];
+          if (values_list.elements != 1 && ! thd->is_error())
+	  {
+	    info.records++;
+	    continue;
+	  }
+	  error=1;
+	  break;
         }
       }
-      if (fill_record_n_invoke_before_triggers(thd, table, table->field_to_fill(),
-                                               *values, 0, TRG_EVENT_INSERT))
+
+      if ((res= table_list->view_check_option(thd,
+                                              (values_list.elements == 1 ?
+                                               0 :
+                                               ignore))) ==
+          VIEW_CHECK_SKIP)
+        continue;
+      else if (res == VIEW_CHECK_ERROR)
       {
-	if (values_list.elements != 1 && ! thd->is_error())
-	{
-	  info.records++;
-	  continue;
-	}
-	error=1;
-	break;
+        error= 1;
+        break;
       }
-    }
-
-    if ((res= table_list->view_check_option(thd,
-					    (values_list.elements == 1 ?
-					     0 :
-					     ignore))) ==
-        VIEW_CHECK_SKIP)
-      continue;
-    else if (res == VIEW_CHECK_ERROR)
-    {
-      error= 1;
-      break;
-    }
 #ifndef EMBEDDED_LIBRARY
-    if (lock_type == TL_WRITE_DELAYED)
-    {
-      LEX_STRING const st_query = { query, thd->query_length() };
-      DEBUG_SYNC(thd, "before_write_delayed");
-      error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
-      DEBUG_SYNC(thd, "after_write_delayed");
-      query=0;
-    }
-    else
+      if (lock_type == TL_WRITE_DELAYED)
+      {
+        LEX_STRING const st_query = { query, thd->query_length() };
+        DEBUG_SYNC(thd, "before_write_delayed");
+        error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
+        DEBUG_SYNC(thd, "after_write_delayed");
+        query=0;
+      }
+      else
 #endif
-      error=write_record(thd, table ,&info);
-    if (error)
-      break;
-    thd->get_stmt_da()->inc_current_row_for_warning();
+        error=write_record(thd, table ,&info);
+      if (error)
+        break;
+      thd->get_stmt_da()->inc_current_row_for_warning();
+    }
+    its.rewind();
   }
 
 values_loop_end:
@@ -1131,7 +1203,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
     retval= thd->lex->explain->send_explain(thd);
     goto abort;
   }
-  if (values_list.elements == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
+  if ((bulk_iterations * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
 				    !thd->cuted_fields))
   {
     my_ok(thd, info.copied + info.deleted +
@@ -1445,6 +1517,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
   /* Prepare the fields in the statement. */
   if (values)
   {
+
     /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
     DBUG_ASSERT (!select_lex->group_list.elements);
 
@@ -1463,6 +1536,10 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
           check_insert_fields(thd, context->table_list, fields, *values,
                               !insert_into_view, 0, &map));
 
+    setup_deault_parameters(table_list, &fields, values);
+    if (bulk_parameters_set(thd))
+      DBUG_RETURN(TRUE);
+
     if (!res && check_fields)
     {
       bool saved_abort_on_warning= thd->abort_on_warning;
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index ac00b21..956dea9 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -570,17 +570,19 @@ void init_update_queries(void)
                                             CF_CAN_GENERATE_ROW_EVENTS |
                                             CF_OPTIMIZER_TRACE |
                                             CF_CAN_BE_EXPLAINED |
-                                            CF_UPDATES_DATA;
+                                            CF_UPDATES_DATA | CF_SP_BULK_SAFE;
   sql_command_flags[SQLCOM_UPDATE_MULTI]=   CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
                                             CF_CAN_GENERATE_ROW_EVENTS |
                                             CF_OPTIMIZER_TRACE |
                                             CF_CAN_BE_EXPLAINED |
-                                            CF_UPDATES_DATA;
+                                            CF_UPDATES_DATA | CF_SP_BULK_SAFE;
   sql_command_flags[SQLCOM_INSERT]=	    CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
                                             CF_CAN_GENERATE_ROW_EVENTS |
                                             CF_OPTIMIZER_TRACE |
                                             CF_CAN_BE_EXPLAINED |
-                                            CF_INSERTS_DATA;
+                                            CF_INSERTS_DATA |
+                                            CF_SP_BULK_SAFE |
+                                            CF_SP_BULK_OPTIMIZED;
   sql_command_flags[SQLCOM_INSERT_SELECT]=  CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
                                             CF_CAN_GENERATE_ROW_EVENTS |
                                             CF_OPTIMIZER_TRACE |
@@ -598,7 +600,7 @@ void init_update_queries(void)
                                             CF_CAN_GENERATE_ROW_EVENTS |
                                             CF_OPTIMIZER_TRACE |
                                             CF_CAN_BE_EXPLAINED |
-                                            CF_INSERTS_DATA;;
+                                            CF_INSERTS_DATA | CF_SP_BULK_SAFE;
   sql_command_flags[SQLCOM_REPLACE_SELECT]= CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
                                             CF_CAN_GENERATE_ROW_EVENTS |
                                             CF_OPTIMIZER_TRACE |
@@ -1744,7 +1746,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
   }
   case COM_STMT_EXECUTE:
   {
-    mysqld_stmt_execute(thd, packet, packet_length);
+    ulong iterations= uint4korr(packet + 5);
+    if (iterations < 2  ||
+        (thd->client_capabilities & MARIADB_CLIENT_STMT_BULK_OPERATIONS))
+      mysqld_stmt_execute(thd, packet, packet_length);
+    else
+      mysqld_stmt_bulk_execute(thd, packet, packet_length);
     break;
   }
   case COM_STMT_FETCH:
diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc
index df15a62..f3c22bc 100644
--- a/sql/sql_prepare.cc
+++ b/sql/sql_prepare.cc
@@ -162,6 +162,9 @@ class Prepared_statement: public Statement
   Select_fetch_protocol_binary result;
   Item_param **param_array;
   Server_side_cursor *cursor;
+  uchar *packet;
+  uchar *packet_end;
+  ulong iterations;
   uint param_count;
   uint last_errno;
   uint flags;
@@ -180,11 +183,15 @@ class Prepared_statement: public Statement
   */
   uint select_number_after_prepare;
   char last_error[MYSQL_ERRMSG_SIZE];
+  my_bool start_param;
 #ifndef EMBEDDED_LIBRARY
   bool (*set_params)(Prepared_statement *st, uchar *data, uchar *data_end,
                      uchar *read_pos, String *expanded_query);
+  bool (*set_bulk_params)(Prepared_statement *st,
+                          uchar **read_pos, uchar *data_end, bool reset);
 #else
   bool (*set_params_data)(Prepared_statement *st, String *expanded_query);
+  /*TODO: add bulk support for builtin server */
 #endif
   bool (*set_params_from_vars)(Prepared_statement *stmt,
                                List<LEX_STRING>& varnames,
@@ -204,7 +211,13 @@ class Prepared_statement: public Statement
   bool execute_loop(String *expanded_query,
                     bool open_cursor,
                     uchar *packet_arg, uchar *packet_end_arg);
+  bool execute_bulk_loop(String *expanded_query,
+                         bool open_cursor,
+                         uchar *packet_arg, uchar *packet_end_arg,
+                         ulong iterations);
   bool execute_server_runnable(Server_runnable *server_runnable);
+  my_bool set_bulk_parameters(bool reset);
+  ulong bulk_iterations();
   /* Destroy this statement */
   void deallocate();
 private:
@@ -960,11 +973,65 @@ static bool insert_params(Prepared_statement *stmt, uchar *null_array,
 }
 
 
+static bool insert_bulk_params(Prepared_statement *stmt,
+                               uchar **read_pos, uchar *data_end,
+                               bool reset)
+{
+  Item_param **begin= stmt->param_array;
+  Item_param **end= begin + stmt->param_count;
+
+  DBUG_ENTER("insert_params");
+
+  for (Item_param **it= begin; it < end; ++it)
+  {
+    Item_param *param= *it;
+    if (reset)
+      param->reset();
+    if (param->state != Item_param::LONG_DATA_VALUE)
+    {
+      if (param->indicators)
+        param->indicator= *((*read_pos)++);
+      else
+        param->indicator= STMT_INDICATOR_NONE;
+      if ((*read_pos) > data_end)
+        DBUG_RETURN(1);
+      switch (param->indicator) {
+      case STMT_INDICATOR_NONE:
+        if ((*read_pos) >= data_end)
+          DBUG_RETURN(1);
+        param->set_param_func(param, read_pos, (uint) (data_end - (*read_pos)));
+        if (param->state == Item_param::NO_VALUE)
+          DBUG_RETURN(1);
+        break;
+      case STMT_INDICATOR_NULL:
+        param->set_null();
+        break;
+      case STMT_INDICATOR_DEFAULT:
+        if (param->set_default(TRUE))
+          DBUG_RETURN(1);
+        break;
+      }
+    }
+    /*
+      A long data stream was supplied for this parameter marker.
+      This was done after prepare, prior to providing a placeholder
+      type (the types are supplied at execute). Check that the
+      supplied type of placeholder can accept a data stream.
+    */
+    else
+      DBUG_RETURN(1); // long is not supported here
+  }
+  DBUG_RETURN(0);
+}
+
 static bool setup_conversion_functions(Prepared_statement *stmt,
-                                       uchar **data, uchar *data_end)
+                                       uchar **data, uchar *data_end,
+                                       bool bulk_protocol= 0)
 {
   /* skip null bits */
-  uchar *read_pos= *data + (stmt->param_count+7) / 8;
+  uchar *read_pos= *data;
+  if (!bulk_protocol)
+    read_pos+= (stmt->param_count+7) / 8;
 
   DBUG_ENTER("setup_conversion_functions");
 
@@ -981,6 +1048,7 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
     {
       ushort typecode;
       const uint signed_bit= 1 << 15;
+      const uint indicators_bit= 1 << 14;
 
       if (read_pos >= data_end)
         DBUG_RETURN(1);
@@ -988,7 +1056,10 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
       typecode= sint2korr(read_pos);
       read_pos+= 2;
       (**it).unsigned_flag= MY_TEST(typecode & signed_bit);
-      setup_one_conversion_function(thd, *it, (uchar) (typecode & ~signed_bit));
+      if (bulk_protocol)
+        (**it).indicators= MY_TEST(typecode & indicators_bit);
+      setup_one_conversion_function(thd, *it,
+                                    (uchar) (typecode & 0xff));
     }
   }
   *data= read_pos;
@@ -997,6 +1068,8 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
 
 #else
 
+//TODO: support bulk parameters
+
 /**
   Embedded counterparts of parameter assignment routines.
 
@@ -2999,6 +3072,54 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length)
   DBUG_VOID_RETURN;
 }
 
+void mysqld_stmt_bulk_execute(THD *thd, char *packet_arg, uint packet_length)
+{
+  uchar *packet= (uchar*)packet_arg; // GCC 4.0.1 workaround
+  ulong stmt_id= uint4korr(packet);
+  ulong flags= (ulong) packet[4];
+  ulong iterations= uint4korr(packet + 5);
+  /* Query text for binary, general or slow log, if any of them is open */
+  String expanded_query;
+  uchar *packet_end= packet + packet_length;
+  Prepared_statement *stmt;
+  Protocol *save_protocol= thd->protocol;
+  bool open_cursor;
+  DBUG_ENTER("mysqld_stmt_bulk_execute");
+
+  packet+= 9;                 /* stmt_id + flags + iterations */
+
+  /* First of all clear possible warnings from the previous command */
+  thd->reset_for_next_command();
+
+  if (!(stmt= find_prepared_statement(thd, stmt_id)))
+  {
+    char llbuf[22];
+    my_error(ER_UNKNOWN_STMT_HANDLER, MYF(0), static_cast<int>(sizeof(llbuf)),
+             llstr(stmt_id, llbuf), "mysqld_stmt_execute");
+    DBUG_VOID_RETURN;
+  }
+
+#if defined(ENABLED_PROFILING)
+  thd->profiling.set_query_source(stmt->query(), stmt->query_length());
+#endif
+  DBUG_PRINT("exec_query", ("%s", stmt->query()));
+  DBUG_PRINT("info",("stmt: 0x%lx", (long) stmt));
+
+  open_cursor= MY_TEST(flags & (ulong) CURSOR_TYPE_READ_ONLY);
+
+  thd->protocol= &thd->protocol_binary;
+  stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end,
+                          iterations);
+  thd->protocol= save_protocol;
+
+  sp_cache_enforce_limit(thd->sp_proc_cache, stored_program_cache_size);
+  sp_cache_enforce_limit(thd->sp_func_cache, stored_program_cache_size);
+
+  /* Close connection socket; for use with client testing (Bug#43560). */
+  DBUG_EXECUTE_IF("close_conn_after_stmt_execute", vio_close(thd->net.vio););
+
+  DBUG_VOID_RETURN;
+}
 
 /**
   SQLCOM_EXECUTE implementation.
@@ -3455,9 +3576,13 @@ Prepared_statement::Prepared_statement(THD *thd_arg)
   result(thd_arg),
   param_array(0),
   cursor(0),
+  packet(0),
+  packet_end(0),
+  iterations(0),
   param_count(0),
   last_errno(0),
-  flags((uint) IS_IN_USE)
+  flags((uint) IS_IN_USE),
+  start_param(0)
 {
   init_sql_alloc(&main_mem_root, thd_arg->variables.query_alloc_block_size,
                  thd_arg->variables.query_prealloc_size, MYF(MY_THREAD_SPECIFIC));
@@ -3493,7 +3618,9 @@ void Prepared_statement::setup_set_params()
     set_params_from_vars= insert_params_from_vars_with_log;
 #ifndef EMBEDDED_LIBRARY
     set_params= insert_params_with_log;
+    set_bulk_params= insert_bulk_params; // TODO: add binlog support
 #else
+    //TODO: add bulk support for bulk parameters
     set_params_data= emb_insert_params_with_log;
 #endif
   }
@@ -3502,7 +3629,9 @@ void Prepared_statement::setup_set_params()
     set_params_from_vars= insert_params_from_vars;
 #ifndef EMBEDDED_LIBRARY
     set_params= insert_params;
+    set_bulk_params= insert_bulk_params;
 #else
+    //TODO: add bulk support for bulk parameters
     set_params_data= emb_insert_params;
 #endif
   }
@@ -3859,6 +3988,7 @@ Prepared_statement::set_parameters(String *expanded_query,
   @retval  FALSE   successfully executed the statement, perhaps
                    after having reprepared it a few times.
 */
+const static int MAX_REPREPARE_ATTEMPTS= 3;
 
 bool
 Prepared_statement::execute_loop(String *expanded_query,
@@ -3866,10 +3996,10 @@ Prepared_statement::execute_loop(String *expanded_query,
                                  uchar *packet,
                                  uchar *packet_end)
 {
-  const int MAX_REPREPARE_ATTEMPTS= 3;
   Reprepare_observer reprepare_observer;
   bool error;
   int reprepare_attempt= 0;
+  iterations= 0;
 #ifndef DBUG_OFF
   Item *free_list_state= thd->free_list;
 #endif
@@ -3961,6 +4091,193 @@ Prepared_statement::execute_loop(String *expanded_query,
   return error;
 }
 
+my_bool bulk_parameters_set(THD *thd)
+{
+  DBUG_ENTER("bulk_parameters_set");
+  Prepared_statement *stmt= (Prepared_statement *) thd->bulk_param;
+
+  if (stmt && stmt->set_bulk_parameters(FALSE))
+    DBUG_RETURN(TRUE);
+  DBUG_RETURN(FALSE);
+}
+
+ulong bulk_parameters_iterations(THD *thd)
+{
+  Prepared_statement *stmt= (Prepared_statement *) thd->bulk_param;
+  if (!stmt)
+    return 1;
+  return stmt->bulk_iterations();
+}
+
+
+my_bool Prepared_statement::set_bulk_parameters(bool reset)
+{
+  DBUG_ENTER("Prepared_statement::set_bulk_parameters");
+  DBUG_PRINT("info", ("iteration: %lu", iterations));
+  if (iterations)
+  {
+#ifndef EMBEDDED_LIBRARY
+    if ((*set_bulk_params)(this, &packet, packet_end, reset))
+#else
+      DBUG_ASSERT(0); //TODO: support bulk parameters for embedded server
+#endif
+    {
+      my_error(ER_WRONG_ARGUMENTS, MYF(0),
+               "mysqld_stmt_bulk_execute");
+      reset_stmt_params(this);
+      DBUG_RETURN(true);
+    }
+    iterations--;
+  }
+  start_param= 0;
+  DBUG_RETURN(false);
+}
+
+ulong Prepared_statement::bulk_iterations()
+{
+  if (iterations)
+    return iterations;
+  return start_param ? 1 : 0;
+}
+
+bool
+Prepared_statement::execute_bulk_loop(String *expanded_query,
+                                      bool open_cursor,
+                                      uchar *packet_arg,
+                                      uchar *packet_end_arg,
+                                      ulong iterations_arg)
+{
+  Reprepare_observer reprepare_observer;
+  bool error= 0;
+  packet= packet_arg;
+  packet_end= packet_end_arg;
+  iterations= iterations_arg;
+  start_param= true;
+#ifndef DBUG_OFF
+  Item *free_list_state= thd->free_list;
+#endif
+  thd->select_number= select_number_after_prepare;
+  thd->set_bulk_execution((void *)this);
+  /* Check if we got an error when sending long data */
+  if (state == Query_arena::STMT_ERROR)
+  {
+    my_message(last_errno, last_error, MYF(0));
+    thd->set_bulk_execution(0);
+    return TRUE;
+  }
+
+  if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_SAFE))
+  {
+    my_error(ER_UNSUPPORTED_PS, MYF(0));
+    thd->set_bulk_execution(0);
+    return TRUE;
+  }
+
+#ifndef EMBEDDED_LIBRARY
+  if (setup_conversion_functions(this, &packet, packet_end, TRUE))
+#else
+  DBUG_ASSERT(0); //TODO: support bulk parameters for embedded server
+#endif
+  {
+    my_error(ER_WRONG_ARGUMENTS, MYF(0),
+            "mysqld_stmt_bulk_execute");
+    reset_stmt_params(this);
+    thd->set_bulk_execution(0);
+    return true;
+  }
+
+#ifdef NOT_YET_FROM_MYSQL_5_6
+  if (unlikely(thd->security_ctx->password_expired &&
+               !lex->is_change_password))
+  {
+    my_error(ER_MUST_CHANGE_PASSWORD, MYF(0));
+    thd->set_bulk_execution(0);
+    return true;
+  }
+#endif
+
+  while((iterations || start_param) && !error && !thd->is_error())
+  {
+    int reprepare_attempt= 0;
+
+    if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_OPTIMIZED))
+    {
+      if (set_bulk_parameters(TRUE))
+      {
+        thd->set_bulk_execution(0);
+        return true;
+      }
+    }
+
+reexecute:
+    /*
+      If the free_list is not empty, we'll wrongly free some externally
+      allocated items when cleaning up after validation of the prepared
+      statement.
+    */
+    DBUG_ASSERT(thd->free_list == free_list_state);
+
+    /*
+      Install the metadata observer. If some metadata version is
+      different from prepare time and an observer is installed,
+      the observer method will be invoked to push an error into
+      the error stack.
+    */
+
+    if (sql_command_flags[lex->sql_command] & CF_REEXECUTION_FRAGILE)
+    {
+      reprepare_observer.reset_reprepare_observer();
+      DBUG_ASSERT(thd->m_reprepare_observer == NULL);
+      thd->m_reprepare_observer= &reprepare_observer;
+    }
+
+    error= execute(expanded_query, open_cursor) || thd->is_error();
+
+    thd->m_reprepare_observer= NULL;
+#ifdef WITH_WSREP
+
+    if (WSREP_ON)
+    {
+      mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+      switch (thd->wsrep_conflict_state)
+      {
+      case CERT_FAILURE:
+        WSREP_DEBUG("PS execute fail for CERT_FAILURE: thd: %ld err: %d",
+                    thd->thread_id, thd->get_stmt_da()->sql_errno() );
+        thd->wsrep_conflict_state = NO_CONFLICT;
+        break;
+
+      case MUST_REPLAY:
+        (void)wsrep_replay_transaction(thd);
+        break;
+
+      default:
+        break;
+      }
+      mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+    }
+#endif /* WITH_WSREP */
+
+    if ((sql_command_flags[lex->sql_command] & CF_REEXECUTION_FRAGILE) &&
+        error && !thd->is_fatal_error && !thd->killed &&
+        reprepare_observer.is_invalidated() &&
+        reprepare_attempt++ < MAX_REPREPARE_ATTEMPTS)
+    {
+      DBUG_ASSERT(thd->get_stmt_da()->sql_errno() == ER_NEED_REPREPARE);
+      thd->clear_error();
+
+      error= reprepare();
+
+      if (! error)                                /* Success */
+        goto reexecute;
+    }
+  }
+  reset_stmt_params(this);
+  thd->set_bulk_execution(0);
+
+  return error;
+}
+
 
 bool
 Prepared_statement::execute_server_runnable(Server_runnable *server_runnable)
diff --git a/sql/sql_prepare.h b/sql/sql_prepare.h
index aec4ac4..ddbca68 100644
--- a/sql/sql_prepare.h
+++ b/sql/sql_prepare.h
@@ -72,6 +72,7 @@ class Reprepare_observer
 
 void mysqld_stmt_prepare(THD *thd, const char *packet, uint packet_length);
 void mysqld_stmt_execute(THD *thd, char *packet, uint packet_length);
+void mysqld_stmt_bulk_execute(THD *thd, char *packet, uint packet_length);
 void mysqld_stmt_close(THD *thd, char *packet);
 void mysql_sql_stmt_prepare(THD *thd);
 void mysql_sql_stmt_execute(THD *thd);
@@ -81,6 +82,8 @@ void mysqld_stmt_reset(THD *thd, char *packet);
 void mysql_stmt_get_longdata(THD *thd, char *pos, ulong packet_length);
 void reinit_stmt_before_use(THD *thd, LEX *lex);
 
+ulong bulk_parameters_iterations(THD *thd);
+my_bool bulk_parameters_set(THD *thd);
 /**
   Execute a fragment of server code in an isolated context, so that
   it doesn't leave any effect on THD. THD must have no open tables.
diff --git a/sql/sql_update.cc b/sql/sql_update.cc
index b452e4f..ecd32e2 100644
--- a/sql/sql_update.cc
+++ b/sql/sql_update.cc
@@ -221,6 +221,8 @@ static void prepare_record_for_error_message(int error, TABLE *table)
   DBUG_VOID_RETURN;
 }
 
+void setup_deault_parameters(TABLE_LIST *table, List<Item> *fields,
+                             List<Item> *values);
 
 /*
   Process usual UPDATE
@@ -354,6 +356,8 @@ int mysql_update(THD *thd,
     DBUG_RETURN(1);
   }
 
+  setup_deault_parameters(NULL, &fields, &values);
+
 #ifndef NO_EMBEDDED_ACCESS_CHECKS
   /* Check values */
   table_list->grant.want_privilege= table->grant.want_privilege=
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index a810a5a..4ad4404 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -266,7 +266,8 @@ void wsrep_replay_transaction(THD *thd)
         }
         else if (thd->get_stmt_da()->is_set())
         {
-          if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK)
+          if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK &&
+              thd->get_stmt_da()->status() != Diagnostics_area::DA_OK_BULK)
           {
             WSREP_WARN("replay ok, thd has error status %d",
                        thd->get_stmt_da()->status());
diff --git a/storage/perfschema/pfs.cc b/storage/perfschema/pfs.cc
index dbe3241..4cd1a35 100644
--- a/storage/perfschema/pfs.cc
+++ b/storage/perfschema/pfs.cc
@@ -4827,6 +4827,7 @@ static void end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
 
       switch(da->status())
       {
+        case Diagnostics_area::DA_OK_BULK:
         case Diagnostics_area::DA_EMPTY:
           break;
         case Diagnostics_area::DA_OK:
@@ -4960,6 +4961,7 @@ static void end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
 
   switch (da->status())
   {
+    case Diagnostics_area::DA_OK_BULK:
     case Diagnostics_area::DA_EMPTY:
       break;
     case Diagnostics_area::DA_OK:


More information about the commits mailing list