[Commits] f6d4df8: MDEV-9059: draft for integration with client

Oleksandr Byelkin sanja at mariadb.com
Tue Oct 11 16:55:59 EEST 2016


revision-id: f6d4df871901671847cb38291fab746662530cd0 (mariadb-10.2.2-35-gf6d4df8)
parent(s): 553ca406cd44946d454d84c08a7247ef595378f5
committer: Oleksandr Byelkin
timestamp: 2016-10-11 15:55:58 +0200
message:

MDEV-9059: draft for integration with client

---
 include/mysql_com.h      |  5 +++-
 sql/sql_acl.cc           | 60 ++++++++++++++++++++++++++++++++++++++++++++----
 sql/sql_class.cc         | 11 +++++++++
 sql/sql_class.h          |  2 ++
 sql/sql_connect.cc       | 28 ++++++++++++++++++----
 sql/sql_parse.cc         | 39 +++++++++++++++----------------
 sql/sql_parse.h          |  1 +
 sql/threadpool_common.cc | 16 ++++++++++++-
 8 files changed, 132 insertions(+), 30 deletions(-)

diff --git a/include/mysql_com.h b/include/mysql_com.h
index 461800f..65ad3d9 100644
--- a/include/mysql_com.h
+++ b/include/mysql_com.h
@@ -256,6 +256,8 @@ enum enum_server_command
 #define MARIADB_CLIENT_PROGRESS (1ULL << 32)
 /* support COM_MULTI */
 #define MARIADB_CLIENT_COM_MULTI (1ULL << 33)
+/* support bundle first command with the authentication packet */
+#define MARIADB_CLIENT_COM_IN_AUTH (1ULL << 34)
 
 #ifdef HAVE_COMPRESS
 #define CAN_CLIENT_COMPRESS CLIENT_COMPRESS
@@ -295,7 +297,8 @@ enum enum_server_command
                            CLIENT_SESSION_TRACK |\
                            CLIENT_DEPRECATE_EOF |\
                            CLIENT_CONNECT_ATTRS |\
-                           MARIADB_CLIENT_COM_MULTI)
+                           MARIADB_CLIENT_COM_MULTI |\
+                           MARIADB_CLIENT_COM_IN_AUTH)
 
 /*
   To be added later:
diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc
index ab6a803..31b792b 100644
--- a/sql/sql_acl.cc
+++ b/sql/sql_acl.cc
@@ -11687,38 +11687,78 @@ static bool find_mpvio_user(MPVIO_EXT *mpvio)
 }
 
 static bool
-read_client_connect_attrs(char **ptr, char *end, CHARSET_INFO *from_cs)
+read_bundle_length (size_t *length, char **ptr, char *end)
 {
-  size_t length;
   char *ptr_save= *ptr;
 
   /* not enough bytes to hold the length */
   if (ptr_save >= end)
     return true;
 
-  length= safe_net_field_length_ll((uchar **) ptr, end - ptr_save);
+  *length= safe_net_field_length_ll((uchar **) ptr, end - ptr_save);
 
   /* cannot even read the length */
   if (*ptr == NULL)
     return true;
 
   /* length says there're more data than can fit into the packet */
-  if (*ptr + length > end)
+  if (*ptr + *length > end)
+    return true;
+
+  return false;
+}
+
+static bool
+read_client_connect_attrs(char **ptr, char *end, CHARSET_INFO *from_cs)
+{
+  size_t length;
+
+  if (read_bundle_length(&length, ptr, end))
     return true;
 
   /* impose an artificial length limit of 64k */
   if (length > 65535)
     return true;
 
+
 #ifdef HAVE_PSI_THREAD_INTERFACE
   if (PSI_THREAD_CALL(set_thread_connect_attrs)(*ptr, length, from_cs) &&
       current_thd->variables.log_warnings)
     sql_print_warning("Connection attributes of length %lu were truncated",
                       (unsigned long) length);
 #endif
+  *ptr+= length;
   return false;
 }
 
+static LEX_STRING
+read_client_bundle_com(char **ptr, char *end)
+{
+  LEX_STRING res= {0, packet_error};
+
+  if (read_bundle_length(&res.length, ptr, end))
+    return res;
+
+  if (!res.length)
+    return res;
+
+  /* do_command add \0 to the end so we need allocate more */
+  res.str= (char *)my_malloc(res.length + 1, MYF(MY_WME));
+
+  if (likely(res.str))
+  {
+    memcpy(res.str, *ptr, res.length);
+    *ptr+= res.length;
+  }
+  else
+  {
+    *ptr+= res.length;
+    res.length= packet_error;
+  }
+
+  return res;
+}
+
 #endif
 
 /* the packet format is described in send_change_user_packet() */
@@ -12096,6 +12136,18 @@ static ulong parse_client_handshake_packet(MPVIO_EXT *mpvio,
                                 mpvio->thd->charset()))
     return packet_error;
 
+  if (thd->client_capabilities & MARIADB_CLIENT_COM_IN_AUTH)
+  {
+    thd->bundle_command=
+      read_client_bundle_com(&next_field,
+                             ((char *)net->read_pos) + pkt_len);
+    if (thd->bundle_command.length == packet_error)
+    {
+      thd->bundle_command.length= 0;
+      return packet_error;
+    }
+  }
+
   /*
     if the acl_user needs a different plugin to authenticate
     (specified in GRANT ... AUTHENTICATED VIA plugin_name ..)
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 6433786..8bb7adf 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1077,6 +1077,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
   prepare_derived_at_open= FALSE;
   create_tmp_table_for_derived= FALSE;
   save_prep_leaf_list= FALSE;
+  bundle_command.str= NULL;
+  bundle_command.length= 0;
   /* Restore THR_THD */
   set_current_thd(old_THR_THD);
   inc_thread_count();
@@ -1473,6 +1475,8 @@ void THD::init(void)
 #endif //EMBEDDED_LIBRARY
 
   apc_target.init(&LOCK_thd_data);
+  bundle_command.str= NULL;
+  bundle_command.length= 0;
   DBUG_VOID_RETURN;
 }
 
@@ -1582,6 +1586,13 @@ void THD::cleanup(void)
   DBUG_ENTER("THD::cleanup");
   DBUG_ASSERT(cleanup_done == 0);
 
+  if (bundle_command.str)
+  {
+    my_free(bundle_command.str);
+    bundle_command.str= 0;
+    bundle_command.length= 0;
+  }
+
   killed= KILL_CONNECTION;
 #ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
   if (transaction.xid_state.xa_state == XA_PREPARED)
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 994a161..e0dbd16 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -4263,6 +4263,8 @@ class THD :public Statement,
     current_linfo= 0;
     mysql_mutex_unlock(&LOCK_thread_count);
   }
+  /* Auth packet bundle packet */
+  LEX_STRING bundle_command;
 };
 
 inline void add_to_active_threads(THD *thd)
diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc
index fe6a101..9712b15 100644
--- a/sql/sql_connect.cc
+++ b/sql/sql_connect.cc
@@ -1294,6 +1294,8 @@ void do_handle_one_connection(CONNECT *connect)
 {
   ulonglong thr_create_utime= microsecond_interval_timer();
   THD *thd;
+  bool close_conn= false;
+
   if (connect->scheduler->init_new_connection_thread() ||
       !(thd= connect->create_thd(NULL)))
   {
@@ -1346,13 +1348,31 @@ void do_handle_one_connection(CONNECT *connect)
     {
       create_user= FALSE;
       goto end_thread;
-    }      
+    }
 
-    while (thd_is_connection_alive(thd))
+    if (thd->bundle_command.str)
     {
+      thd->bundle_command.str[thd->bundle_command.length]= '\0'; /* safety */
+
+      enum enum_server_command command=
+        fetch_command(thd, thd->bundle_command.str);
+
+      close_conn= dispatch_command(command, thd, thd->bundle_command.str + 1,
+                                   (uint) (thd->bundle_command.length - 1),
+                                   FALSE, FALSE);
+
       mysql_audit_release(thd);
-      if (do_command(thd))
-	break;
+    }
+
+    if (!close_conn)
+    {
+      while (thd_is_connection_alive(thd))
+      {
+        DBUG_ASSERT(thd->bundle_command.str == NULL);
+        mysql_audit_release(thd);
+        if (do_command(thd))
+          break;
+      }
     }
     end_connection(thd);
 
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index f4ac33d..3c58391 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -1131,7 +1131,7 @@ void cleanup_items(Item *item)
   DBUG_VOID_RETURN;
 }
 
-static enum enum_server_command fetch_command(THD *thd, char *packet)
+enum enum_server_command fetch_command(THD *thd, char *packet)
 {
   enum enum_server_command
     command= (enum enum_server_command) (uchar) packet[0];
@@ -1338,25 +1338,6 @@ bool do_command(THD *thd)
 
   command= fetch_command(thd, packet);
 
-#ifdef WITH_WSREP
-  /*
-    Bail out if DB snapshot has not been installed.
-  */
-  if (!(server_command_flags[command] & CF_SKIP_WSREP_CHECK) &&
-      !wsrep_node_is_ready(thd))
-  {
-    thd->protocol->end_statement();
-
-    /* Performance Schema Interface instrumentation end. */
-    MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
-    thd->m_statement_psi= NULL;
-    thd->m_digest= NULL;
-
-    return_value= FALSE;
-    goto out;
-  }
-#endif
-
   /* Restore read timeout value */
   my_net_set_read_timeout(net, thd->variables.net_read_timeout);
 
@@ -1549,6 +1530,24 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
                        command_name[command].str :
                        "<?>")));
   bool drop_more_results= 0;
+#ifdef WITH_WSREP
+  /*
+    Bail out if DB snapshot has not been installed.
+  */
+  if (!(server_command_flags[command] & CF_SKIP_WSREP_CHECK) &&
+      !wsrep_node_is_ready(thd))
+  {
+    thd->protocol->end_statement();
+
+    /* Performance Schema Interface instrumentation end. */
+    MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
+    thd->m_statement_psi= NULL;
+    thd->m_digest= NULL;
+
+    DBUG_RETURN (FALSE);
+  }
+#endif
+
 
   if (!is_com_multi)
     inc_thread_running();
diff --git a/sql/sql_parse.h b/sql/sql_parse.h
index ad29bb2..10e4a12 100644
--- a/sql/sql_parse.h
+++ b/sql/sql_parse.h
@@ -101,6 +101,7 @@ pthread_handler_t handle_bootstrap(void *arg);
 int mysql_execute_command(THD *thd);
 bool do_command(THD *thd);
 void do_handle_bootstrap(THD *thd);
+enum enum_server_command fetch_command(THD *thd, char *packet);
 bool dispatch_command(enum enum_server_command command, THD *thd,
 		      char* packet, uint packet_length,
                       bool is_com_multi, bool is_next_command);
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index 2308f42..b5f0e91 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -23,6 +23,7 @@
 #include <sql_audit.h>
 #include <debug_sync.h>
 #include <threadpool.h>
+#include "sql_parse.h"
 
 
 /* Threadpool parameters */
@@ -46,7 +47,6 @@ static int   threadpool_process_request(THD *thd);
 static THD*  threadpool_add_connection(CONNECT *connect, void *scheduler_data);
 
 extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
-extern bool do_command(THD*);
 
 static inline TP_connection *get_TP_connection(THD *thd)
 {
@@ -255,6 +255,20 @@ static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
       }
     }
   }
+  if (thd && thd->bundle_command.str)
+  {
+    thd->bundle_command.str[thd->bundle_command.length]= '\0'; /* safety */
+    enum enum_server_command command=
+      fetch_command(thd, thd->bundle_command.str);
+
+    /* it is not a real error, just QUIT */
+    error= dispatch_command(command, thd, thd->bundle_command.str + 1,
+                                 (uint) (thd->bundle_command.length - 1),
+                                 FALSE, FALSE);
+    net_flush(&thd->net);
+    mysql_audit_release(thd);
+  }
+
   if (error)
   {
     threadpool_remove_connection(thd);


More information about the commits mailing list