[Commits] 714ca49: MDEV-11419: Report all INSERT ID for bulk operation INSERT

Oleksandr Byelkin sanja at mariadb.com
Thu Mar 16 10:10:28 EET 2017


revision-id: 714ca49a8b29cb23cb6520574b0d09dd683e5ef6 (mariadb-10.2.4-66-g714ca49)
parent(s): aad15eae89e9700d4c1ed4c83a68f8c7b6775a27
committer: Oleksandr Byelkin
timestamp: 2017-03-16 09:10:28 +0100
message:

MDEV-11419: Report all INSERT ID for bulk operation INSERT

Send all Insert IDs of the buld operation to client (JDBC need it)

---
 include/mysql.h.pp  |   9 +++-
 include/mysql_com.h |  10 ++++-
 sql/protocol.cc     |  22 +++++++---
 sql/protocol.h      |   6 +++
 sql/sql_class.cc    | 116 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 sql/sql_class.h     |   8 ++++
 sql/sql_insert.cc   |  11 ++++-
 sql/sql_prepare.cc  |  22 ++++++++--
 8 files changed, 191 insertions(+), 13 deletions(-)

diff --git a/include/mysql.h.pp b/include/mysql.h.pp
index 517516a..9c1fbd8 100644
--- a/include/mysql.h.pp
+++ b/include/mysql.h.pp
@@ -89,7 +89,14 @@ enum enum_cursor_type
   CURSOR_TYPE_NO_CURSOR= 0,
   CURSOR_TYPE_READ_ONLY= 1,
   CURSOR_TYPE_FOR_UPDATE= 2,
-  CURSOR_TYPE_SCROLLABLE= 4
+  CURSOR_TYPE_SCROLLABLE= 4,
+};
+enum stmt_flags_type
+{
+  STMTFLG_CURSOR_TYPE_READ_ONLY= CURSOR_TYPE_READ_ONLY,
+  STMTFLG_CURSOR_TYPE_FOR_UPDATE= CURSOR_TYPE_FOR_UPDATE,
+  STMTFLG_CURSOR_TYPE_SCROLLABLE= CURSOR_TYPE_FOR_UPDATE,
+  STMTFLG_INSERT_ID_REQUEST= 128
 };
 enum enum_mysql_set_option
 {
diff --git a/include/mysql_com.h b/include/mysql_com.h
index 30f2b5d..eb75d74 100644
--- a/include/mysql_com.h
+++ b/include/mysql_com.h
@@ -553,7 +553,15 @@ enum enum_cursor_type
   CURSOR_TYPE_NO_CURSOR= 0,
   CURSOR_TYPE_READ_ONLY= 1,
   CURSOR_TYPE_FOR_UPDATE= 2,
-  CURSOR_TYPE_SCROLLABLE= 4
+  CURSOR_TYPE_SCROLLABLE= 4,
+};
+/* first values should be the same as enum_cursor_type */
+enum stmt_flags_type
+{
+  STMTFLG_CURSOR_TYPE_READ_ONLY= CURSOR_TYPE_READ_ONLY,
+  STMTFLG_CURSOR_TYPE_FOR_UPDATE= CURSOR_TYPE_FOR_UPDATE,
+  STMTFLG_CURSOR_TYPE_SCROLLABLE= CURSOR_TYPE_FOR_UPDATE,
+  STMTFLG_INSERT_ID_REQUEST= 128
 };
 
 
diff --git a/sql/protocol.cc b/sql/protocol.cc
index b9d9f28..2d6b83a 100644
--- a/sql/protocol.cc
+++ b/sql/protocol.cc
@@ -562,6 +562,7 @@ void Protocol::end_statement()
 
   switch (thd->get_stmt_da()->status()) {
   case Diagnostics_area::DA_ERROR:
+    thd->stop_collecting_insert_id();
     /* The query failed, send error to log and abort bootstrap. */
     error= send_error(thd->get_stmt_da()->sql_errno(),
                       thd->get_stmt_da()->message(),
@@ -573,12 +574,21 @@ void Protocol::end_statement()
     break;
   case Diagnostics_area::DA_OK:
   case Diagnostics_area::DA_OK_BULK:
-    error= send_ok(thd->server_status,
-                   thd->get_stmt_da()->statement_warn_count(),
-                   thd->get_stmt_da()->affected_rows(),
-                   thd->get_stmt_da()->last_insert_id(),
-                   thd->get_stmt_da()->message(),
-                   thd->get_stmt_da()->skip_flush());
+    if (thd->report_collected_insert_id())
+      if (thd->is_error())
+        error= send_error(thd->get_stmt_da()->sql_errno(),
+                          thd->get_stmt_da()->message(),
+                          thd->get_stmt_da()->get_sqlstate());
+      else
+        error= send_eof(thd->server_status,
+                        thd->get_stmt_da()->statement_warn_count());
+    else
+      error= send_ok(thd->server_status,
+                     thd->get_stmt_da()->statement_warn_count(),
+                     thd->get_stmt_da()->affected_rows(),
+                     thd->get_stmt_da()->last_insert_id(),
+                     thd->get_stmt_da()->message(),
+                     thd->get_stmt_da()->skip_flush());
     break;
   case Diagnostics_area::DA_DISABLED:
     break;
diff --git a/sql/protocol.h b/sql/protocol.h
index 6397e3d..bf74f52 100644
--- a/sql/protocol.h
+++ b/sql/protocol.h
@@ -30,6 +30,12 @@ class Item_param;
 typedef struct st_mysql_field MYSQL_FIELD;
 typedef struct st_mysql_rows MYSQL_ROWS;
 
+struct insert_id_desc
+{
+  ulonglong first_id;
+  ulonglong sequence;
+};
+
 class Protocol
 {
 protected:
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index f042e66..553aa05 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1319,6 +1319,7 @@ void THD::init(void)
 #endif //EMBEDDED_LIBRARY
 
   apc_target.init(&LOCK_thd_data);
+  insert_ids= NULL;
   DBUG_VOID_RETURN;
 }
 
@@ -7299,4 +7300,119 @@ bool Discrete_intervals_list::append(Discrete_interval *new_interval)
   DBUG_RETURN(0);
 }
 
+bool THD::init_collecting_insert_id()
+{
+  if (!insert_ids)
+  {
+    void *buff;
+    if (!(my_multi_malloc(MYF(MY_WME), &insert_ids, sizeof(DYNAMIC_ARRAY),
+                          &buff, sizeof(insert_id_desc) * 10,
+                          NullS)) ||
+        my_init_dynamic_array2(insert_ids, sizeof(insert_id_desc),
+                               buff, 10, 100, MYF(MY_WME)))
+    {
+      if (insert_ids)
+        my_free(insert_ids);
+      insert_ids= NULL;
+      return TRUE;
+    }
+    collect_auto_increment_increment= variables.auto_increment_increment;
+  }
+  return FALSE;
+}
+
+void THD::stop_collecting_insert_id()
+{
+  if (insert_ids)
+  {
+    delete_dynamic(insert_ids);
+    my_free(insert_ids);
+    insert_ids= NULL;
+  }
+}
+
+bool THD::collect_insert_id(ulonglong id)
+{
+  if (insert_ids)
+  {
+    if (insert_ids->elements)
+    {
+      insert_id_desc *last=
+        (insert_id_desc *)dynamic_array_ptr(insert_ids,
+                                            insert_ids->elements - 1);
+      if (id == last->first_id)
+      {
+        return FALSE; // no new insert id
+      }
+      if (id == last->first_id + (last->sequence *
+                                  collect_auto_increment_increment))
+      {
+        last->sequence++;
+        return FALSE;
+      }
+    }
+    insert_id_desc el;
+    el.first_id= id;
+    el.sequence= 1;
+    if (insert_dynamic(insert_ids, &el))
+    {
+      return TRUE;
+    }
+  }
+  return FALSE;
+}
+
+
+bool THD::report_collected_insert_id()
+{
+  if (insert_ids)
+  {
+    List<Item> field_list;
+    MEM_ROOT tmp_mem_root;
+    Query_arena arena(&tmp_mem_root, Query_arena::STMT_INITIALIZED), backup;
+
+    init_alloc_root(arena.mem_root, 2048, 4096, MYF(0));
+    set_n_backup_active_arena(&arena, &backup);
+    DBUG_ASSERT(mem_root == &tmp_mem_root);
+
+    field_list.push_back(new (mem_root)
+                         Item_int(this, "Id", 0, MY_INT64_NUM_DECIMAL_DIGITS),
+                         mem_root);
+    field_list.push_back(new (mem_root)
+                         Item_int(this, "Len", 0, MY_INT64_NUM_DECIMAL_DIGITS),
+                         mem_root);
+    field_list.push_back(new (mem_root)
+                         Item_return_int(this, "Inc", 0, MYSQL_TYPE_LONG),
+                         mem_root);
+
+    if (protocol_binary.send_result_set_metadata(&field_list,
+                                                  Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
+      goto error;
+
+    for (ulonglong i= 0; i < insert_ids->elements; i++)
+    {
+      insert_id_desc *last=
+        (insert_id_desc *)dynamic_array_ptr(insert_ids, i);
+      if (insert_ids->elements == 1 && last->first_id == 0 &&
+          get_stmt_da()->affected_rows() != 1)
+        continue; // No insert IDs
+      protocol_binary.prepare_for_resend();
+      protocol_binary.store_longlong(last->first_id, TRUE);
+      protocol_binary.store_longlong(last->sequence, TRUE);
+      protocol_binary.store_long(collect_auto_increment_increment);
+      if (protocol_binary.write())
+        goto error;
+    }
+error:
+    restore_active_arena(&arena, &backup);
+    DBUG_ASSERT(arena.mem_root == &tmp_mem_root);
+    // no need free Items because they was only constants
+    free_root(arena.mem_root, MYF(0));
+    stop_collecting_insert_id();
+    return TRUE;
+  }
+  return FALSE;
+
+}
+
 #endif /* !defined(MYSQL_CLIENT) */
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 22895d7..1ed2f8e 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -4286,6 +4286,14 @@ class THD :public Statement,
     current_linfo= 0;
     mysql_mutex_unlock(&LOCK_thread_count);
   }
+
+  /* Data and methods for buld INSERT IDs reporting */
+  DYNAMIC_ARRAY *insert_ids;
+  ulong collect_auto_increment_increment;
+  bool init_collecting_insert_id();
+  bool collect_insert_id(ulonglong id);
+  bool report_collected_insert_id();
+  void stop_collecting_insert_id();
 };
 
 inline void add_to_active_threads(THD *thd)
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 27820c1..4773a3e 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -1054,6 +1054,12 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
     }
     its.rewind();
     iteration++;
+
+    if (!error && thd->bulk_param)
+    {
+      thd->collect_insert_id(table->file->insert_id_for_cur_row);
+    }
+
   } while (iteration < bulk_iterations);
 
 values_loop_end:
@@ -1201,8 +1207,9 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
     retval= thd->lex->explain->send_explain(thd);
     goto abort;
   }
-  if ((bulk_iterations * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
-				    !thd->cuted_fields))
+  if ((bulk_iterations * values_list.elements) == 1 &&
+      (!(thd->variables.option_bits & OPTION_WARNINGS) ||
+       !thd->cuted_fields))
   {
     my_ok(thd, info.copied + info.deleted +
                ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc
index c8765f4..923952d 100644
--- a/sql/sql_prepare.cc
+++ b/sql/sql_prepare.cc
@@ -214,7 +214,7 @@ class Prepared_statement: public Statement
   bool execute_bulk_loop(String *expanded_query,
                          bool open_cursor,
                          uchar *packet_arg, uchar *packet_end_arg,
-                         ulong iterations);
+                         ulong iterations, bool insert_id_request);
   bool execute_server_runnable(Server_runnable *server_runnable);
   my_bool set_bulk_parameters(bool reset);
   ulong bulk_iterations();
@@ -3089,11 +3089,20 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length)
   open_cursor= MY_TEST(flags & (ulong) CURSOR_TYPE_READ_ONLY);
 
   thd->protocol= &thd->protocol_binary;
+
+  if (!(thd->client_capabilities & MARIADB_CLIENT_STMT_BULK_OPERATIONS))
+  {
+    DBUG_PRINT("info",
+               ("There is no bulk capability so reset iteration counter"));
+    iterations= 0;
+  }
   if (iterations <= 1)
     stmt->execute_loop(&expanded_query, open_cursor, packet, packet_end);
   else
     stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end,
-                            iterations);
+                            iterations,
+                            MY_TEST((flags &
+                                     (ulong) STMTFLG_INSERT_ID_REQUEST)));
   thd->protocol= save_protocol;
 
   sp_cache_enforce_limit(thd->sp_proc_cache, stored_program_cache_size);
@@ -4170,7 +4179,8 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
                                       bool open_cursor,
                                       uchar *packet_arg,
                                       uchar *packet_end_arg,
-                                      ulong iterations_arg)
+                                      ulong iterations_arg,
+                                      bool insert_id_request)
 {
   Reprepare_observer reprepare_observer;
   bool error= 0;
@@ -4197,6 +4207,12 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
     thd->set_bulk_execution(0);
     return TRUE;
   }
+  if (lex->sql_command == SQLCOM_INSERT && insert_id_request &&
+      thd->init_collecting_insert_id())
+  {
+    thd->set_bulk_execution(0);
+    return TRUE;
+  }
 
 #ifndef EMBEDDED_LIBRARY
   if (setup_conversion_functions(this, &packet, packet_end, TRUE))


More information about the commits mailing list