[Commits] 93e746a: MDEV-7793 - Race condition between XA COMMIT/ROLLBACK and disconnect

svoj at mariadb.org svoj at mariadb.org
Tue Mar 17 17:55:13 EET 2015


revision-id: 93e746afb2d1a23cb933291ad736a20858b5ac3e
parent(s): ccc7297fe94af1129c717f91d31fa075d54a0371
committer: Sergey Vojtovich
branch nick: mariadb
timestamp: 2015-03-17 19:49:04 +0400
message:

MDEV-7793 - Race condition between XA COMMIT/ROLLBACK and disconnect

XA COMMIT/ROLLBACK of XA transaction owned by different thread may access
freed memory if that thread disconnects at the same time.

Also concurrent XA COMMIT/ROLLBACK of recovered XA transaction were not
serialized properly.

---
 mysql-test/r/xa_sync.result |  24 +++++++++++
 mysql-test/t/xa_sync.test   |  46 ++++++++++++++++++++
 sql/sql_class.cc            | 102 +++++++++++++++++++++++++++-----------------
 sql/sql_class.h             |   1 -
 sql/transaction.cc          |  15 ++-----
 storage/spider/spd_trx.cc   |   1 -
 6 files changed, 136 insertions(+), 53 deletions(-)

diff --git a/mysql-test/r/xa_sync.result b/mysql-test/r/xa_sync.result
new file mode 100644
index 0000000..769a13d
--- /dev/null
+++ b/mysql-test/r/xa_sync.result
@@ -0,0 +1,24 @@
+#
+# MDEV-7793 - Race condition between XA COMMIT/ROLLBACK and disconnect
+#
+# Note that this test is meaningful only with valgrind.
+XA START 'xatest';
+XA END 'xatest';
+XA PREPARE 'xatest';
+SET debug_sync='xa_after_search SIGNAL parked WAIT_FOR go';
+XA COMMIT 'xatest';
+SET debug_sync='now WAIT_FOR parked';
+# Waiting for thread to get deleted
+SET debug_sync='now SIGNAL go';
+ERROR XAE04: XAER_NOTA: Unknown XID
+SET debug_sync='RESET';
+XA START 'xatest';
+XA END 'xatest';
+XA PREPARE 'xatest';
+SET debug_sync='xa_after_search SIGNAL parked WAIT_FOR go';
+XA ROLLBACK 'xatest';
+SET debug_sync='now WAIT_FOR parked';
+# Waiting for thread to get deleted
+SET debug_sync='now SIGNAL go';
+ERROR XAE04: XAER_NOTA: Unknown XID
+SET debug_sync='RESET';
diff --git a/mysql-test/t/xa_sync.test b/mysql-test/t/xa_sync.test
new file mode 100644
index 0000000..8a995c4
--- /dev/null
+++ b/mysql-test/t/xa_sync.test
@@ -0,0 +1,46 @@
+--source include/have_debug_sync.inc
+
+--echo #
+--echo # MDEV-7793 - Race condition between XA COMMIT/ROLLBACK and disconnect
+--echo #
+--echo # Note that this test is meaningful only with valgrind.
+let $op= XA COMMIT 'xatest';
+let $i= 2;
+while ($i)
+{
+  connect con1, localhost, root;
+  connect con2, localhost, root;
+
+  connection con1;
+  XA START 'xatest';
+  XA END 'xatest';
+  XA PREPARE 'xatest';
+
+  connection con2;
+  SET debug_sync='xa_after_search SIGNAL parked WAIT_FOR go';
+  send_eval $op;
+
+  connection default;
+  SET debug_sync='now WAIT_FOR parked';
+  disconnect con1;
+  disable_query_log;
+  echo # Waiting for thread to get deleted;
+  while (`SELECT VARIABLE_VALUE!=2 FROM INFORMATION_SCHEMA.SESSION_STATUS WHERE VARIABLE_NAME='max_used_connections'`)
+  {
+    real_sleep 0.1;
+    FLUSH STATUS;
+  }
+  enable_query_log;
+  SET debug_sync='now SIGNAL go';
+
+  connection con2;
+  --error ER_XAER_NOTA
+  reap;
+  disconnect con2;
+
+  connection default;
+  SET debug_sync='RESET';
+
+  let $op= XA ROLLBACK 'xatest';
+  dec $i;
+}
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 3071ba6..f18231d 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1540,7 +1540,6 @@ void THD::init_for_queries()
                       variables.trans_alloc_block_size,
                       variables.trans_prealloc_size);
   transaction.xid_state.xid.null();
-  transaction.xid_state.in_thd=1;
 }
 
 
@@ -5182,83 +5181,101 @@ void mark_transaction_to_rollback(THD *thd, bool all)
 class XID_cache_element
 {
   /*
-    bits 1..30 are reference counter
-    bit 31 is UNINITIALIZED flag
+    m_state is used to prevent elements from being deleted while XA RECOVER
+    iterates xid cache and to prevent recovered elments from being acquired by
+    multiple threads.
+
+    bits 1..29 are reference counter
+    bit 30 is RECOVERED flag
+    bit 31 is ACQUIRED flag (thread owns this xid)
     bit 32 is unused
 
-    Newly allocated and deleted elements have UNINITIALIZED flag set.
+    Newly allocated and deleted elements have m_state set to 0.
 
     On lock() m_state is atomically incremented. It also creates load-ACQUIRE
     memory barrier to make sure m_state is actually updated before furhter
-    memory accesses. Attempting to lock UNINITIALIED element returns failure
-    and further accesses to element memory are forbidden.
+    memory accesses. Attempting to lock an element that has neither ACQUIRED
+    nor RECOVERED flag set returns failure and further accesses to element
+    memory are forbidden.
 
     On unlock() m_state is decremented. It also creates store-RELEASE memory
     barrier to make sure m_state is actually updated after preceding memory
     accesses.
 
-    UNINITIALIZED flag is cleared upon successful insert.
+    ACQUIRED flag is set when thread registers it's xid or when thread acquires
+    recovered xid.
 
-    UNINITIALIZED flag is set before delete in a spin loop, after last reference
-    is released.
+    RECOVERED flag is set for elements found during crash recovery.
 
-    Currently m_state is only used to prevent elements from being deleted while
-    XA RECOVER iterates xid cache.
+    ACQUIRED and RECOVERED flags are cleared before element is deleted from
+    hash in a spin loop, after last reference is released.
   */
   int32 m_state;
-  static const int32 UNINITIALIZED= 1 << 30;
 public:
+  static const int32 ACQUIRED= 1 << 30;
+  static const int32 RECOVERED= 1 << 29;
   XID_STATE *m_xid_state;
-  bool lock()
+  bool is_set(int32 flag)
+  { return my_atomic_load32_explicit(&m_state, MY_MEMORY_ORDER_RELAXED) & flag; }
+  void set(int32 flag)
   {
-    if (my_atomic_add32_explicit(&m_state, 1,
-                                 MY_MEMORY_ORDER_ACQUIRE) & UNINITIALIZED)
-    {
-      unlock();
-      return false;
-    }
-    return true;
+    DBUG_ASSERT(!is_set(ACQUIRED | RECOVERED));
+    my_atomic_add32_explicit(&m_state, flag, MY_MEMORY_ORDER_RELAXED);
   }
-  void unlock()
+  bool lock()
   {
-    my_atomic_add32_explicit(&m_state, -1, MY_MEMORY_ORDER_RELEASE);
+    int32 old= my_atomic_add32_explicit(&m_state, 1, MY_MEMORY_ORDER_ACQUIRE);
+    if (old & (ACQUIRED | RECOVERED))
+      return true;
+    unlock();
+    return false;
   }
+  void unlock()
+  { my_atomic_add32_explicit(&m_state, -1, MY_MEMORY_ORDER_RELEASE); }
   void mark_uninitialized()
   {
-    int32 old= 0;
-    while (!my_atomic_cas32_weak_explicit(&m_state, &old, UNINITIALIZED,
+    int32 old= ACQUIRED;
+    while (!my_atomic_cas32_weak_explicit(&m_state, &old, 0,
                                           MY_MEMORY_ORDER_RELAXED,
                                           MY_MEMORY_ORDER_RELAXED))
     {
-      old= 0;
+      old&= ACQUIRED | RECOVERED;
       (void) LF_BACKOFF;
     }
   }
-  void mark_initialized()
+  bool mark_acquired()
   {
-    DBUG_ASSERT(m_state & UNINITIALIZED);
-    my_atomic_add32_explicit(&m_state, -UNINITIALIZED, MY_MEMORY_ORDER_RELAXED);
+    int32 old= RECOVERED;
+    while (!my_atomic_cas32_weak_explicit(&m_state, &old, ACQUIRED | RECOVERED,
+                                          MY_MEMORY_ORDER_RELAXED,
+                                          MY_MEMORY_ORDER_RELAXED))
+    {
+      if (!(old & RECOVERED) || (old & ACQUIRED))
+        return false;
+      old= RECOVERED;
+      (void) LF_BACKOFF;
+    }
+    return true;
   }
   static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)),
                                   XID_cache_element *element,
                                   XID_STATE *xid_state)
   {
+    DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED));
     element->m_xid_state= xid_state;
     xid_state->xid_cache_element= element;
   }
   static void lf_alloc_constructor(uchar *ptr)
   {
     XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
-    element->m_state= UNINITIALIZED;
+    element->m_state= 0;
   }
   static void lf_alloc_destructor(uchar *ptr)
   {
     XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
-    if (element->m_state != UNINITIALIZED)
-    {
-      DBUG_ASSERT(!element->m_xid_state->in_thd);
+    DBUG_ASSERT(!element->is_set(ACQUIRED));
+    if (element->is_set(RECOVERED))
       my_free(element->m_xid_state);
-    }
   }
   static uchar *key(const XID_cache_element *element, size_t *length,
                     my_bool not_used __attribute__((unused)))
@@ -5303,18 +5320,25 @@ void xid_cache_free()
 }
 
 
+/**
+  Find recovered XA transaction by XID.
+*/
+
 XID_STATE *xid_cache_search(THD *thd, XID *xid)
 {
+  XID_STATE *xs= 0;
   DBUG_ASSERT(thd->xid_hash_pins);
   XID_cache_element *element=
     (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
                                         xid->key(), xid->key_length());
   if (element)
   {
+    if (element->mark_acquired())
+      xs= element->m_xid_state;
     lf_hash_search_unpin(thd->xid_hash_pins);
-    return element->m_xid_state;
+    DEBUG_SYNC(thd, "xa_after_search");
   }
-  return 0;
+  return xs;
 }
 
 
@@ -5331,13 +5355,12 @@ bool xid_cache_insert(XID *xid, enum xa_states xa_state)
   {
     xs->xa_state=xa_state;
     xs->xid.set(xid);
-    xs->in_thd=0;
     xs->rm_error=0;
 
     if ((res= lf_hash_insert(&xid_cache, pins, xs)))
       my_free(xs);
     else
-      xs->xid_cache_element->mark_initialized();
+      xs->xid_cache_element->set(XID_cache_element::RECOVERED);
     if (res == 1)
       res= 0;
   }
@@ -5355,7 +5378,7 @@ bool xid_cache_insert(THD *thd, XID_STATE *xid_state)
   switch (res)
   {
   case 0:
-    xid_state->xid_cache_element->mark_initialized();
+    xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED);
     break;
   case 1:
     my_error(ER_XAER_DUPID, MYF(0));
@@ -5370,12 +5393,13 @@ void xid_cache_delete(THD *thd, XID_STATE *xid_state)
 {
   if (xid_state->xid_cache_element)
   {
+    bool recovered= xid_state->xid_cache_element->is_set(XID_cache_element::RECOVERED);
     DBUG_ASSERT(thd->xid_hash_pins);
     xid_state->xid_cache_element->mark_uninitialized();
     lf_hash_delete(&xid_cache, thd->xid_hash_pins,
                    xid_state->xid.key(), xid_state->xid.key_length());
     xid_state->xid_cache_element= 0;
-    if (!xid_state->in_thd)
+    if (recovered)
       my_free(xid_state);
   }
 }
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 44e4be7..9d303ef 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1130,7 +1130,6 @@ struct st_savepoint {
   /* For now, this is only used to catch duplicated external xids */
   XID  xid;                           // transaction identifier
   enum xa_states xa_state;            // used by external XA only
-  bool in_thd;
   /* Error reported by the Resource Manager (RM) to the Transaction Manager. */
   uint rm_error;
   XID_cache_element *xid_cache_element;
diff --git a/sql/transaction.cc b/sql/transaction.cc
index d6ef160..47d83a4 100644
--- a/sql/transaction.cc
+++ b/sql/transaction.cc
@@ -835,18 +835,9 @@ bool trans_xa_commit(THD *thd)
       my_error(ER_OUT_OF_RESOURCES, MYF(0));
       DBUG_RETURN(TRUE);
     }
-    /*
-      xid_state.in_thd is always true beside of xa recovery procedure.
-      Note, that there is no race condition here between xid_cache_search
-      and xid_cache_delete, since we always delete our own XID
-      (thd->lex->xid == thd->transaction.xid_state.xid).
-      The only case when thd->lex->xid != thd->transaction.xid_state.xid
-      and xid_state->in_thd == 0 is in the function
-      xa_cache_insert(XID, xa_states), which is called before starting
-      client connections, and thus is always single-threaded.
-    */
+
     XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
-    res= !xs || xs->in_thd;
+    res= !xs;
     if (res)
       my_error(ER_XAER_NOTA, MYF(0));
     else
@@ -947,7 +938,7 @@ bool trans_xa_rollback(THD *thd)
     }
 
     XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
-    if (!xs || xs->in_thd)
+    if (!xs)
       my_error(ER_XAER_NOTA, MYF(0));
     else
     {
diff --git a/storage/spider/spd_trx.cc b/storage/spider/spd_trx.cc
index 1b02bb8..1a0c519 100644
--- a/storage/spider/spd_trx.cc
+++ b/storage/spider/spd_trx.cc
@@ -1872,7 +1872,6 @@ int spider_internal_start_trx(
 
       trx->internal_xid_state.xa_state = XA_ACTIVE;
       trx->internal_xid_state.xid.set(&trx->xid);
-      trx->internal_xid_state.in_thd = 1;
       if ((error_num = spider_xa_lock(&trx->internal_xid_state)))
       {
         if (error_num == ER_SPIDER_XA_LOCKED_NUM)


More information about the commits mailing list