[Commits] Rev 2904: Merge MWL#116, PBXT part into mariadb-5.2-rpl in http://bazaar.launchpad.net/~maria-captains/maria/5.2

knielsen at knielsen-hq.org knielsen at knielsen-hq.org
Mon Jan 31 16:41:07 EET 2011


At http://bazaar.launchpad.net/~maria-captains/maria/5.2

------------------------------------------------------------
revno: 2904 [merge]
revision-id: knielsen at knielsen-hq.org-20110131144058-5mfqbb5brqu51z36
parent: knielsen at knielsen-hq.org-20110127190800-w4mcx0hpo3mnkkkr
parent: knielsen at knielsen-hq.org-20101015134206-8d42qdv6oekjun2k
committer: knielsen at knielsen-hq.org
branch nick: work-5.2-rpl
timestamp: Mon 2011-01-31 15:40:58 +0100
message:
  Merge MWL#116, PBXT part into mariadb-5.2-rpl
  This makes PBXT implement the commit_ordered() method, so that cross-engine
  START TRANSACTION WITH CONSISTENT SNAPSHOT works actually consistent with
  XtraDB.
  Also mark the version number to show this is the -rpl feature preview.
added:
  tests/consistent_snapshot.pl   consistent_snapshot.-20101015134153-zfvs6km9203jgxi8-1
modified:
  configure.in                   sp1f-configure.in-19700101030959-mgdpoxtnh2ewmvusvfpkreuhwvffkcjw
  storage/pbxt/src/ha_pbxt.cc    ha_pbxt.cc-20090326121724-x683v32twzr3fi0y-30
  storage/pbxt/src/thread_xt.h   thread_xt.h-20090326121724-x683v32twzr3fi0y-68
  storage/pbxt/src/xaction_xt.cc xaction_xt.cc-20090326121724-x683v32twzr3fi0y-74
  storage/pbxt/src/xaction_xt.h  xaction_xt.h-20090326121724-x683v32twzr3fi0y-75
=== modified file 'configure.in'
--- a/configure.in	2010-12-13 13:51:47 +0000
+++ b/configure.in	2011-01-31 14:40:58 +0000
@@ -12,7 +12,7 @@ dnl
 dnl When changing the major version number please also check the switch
 dnl statement in mysqlbinlog::check_master_version().  You may also need
 dnl to update version.c in ndb.
-AC_INIT([MariaDB Server], [5.2.5-MariaDB], [], [mysql])
+AC_INIT([MariaDB Server], [5.2.4-MariaDB-rpl], [], [mysql])
 
 AC_CONFIG_SRCDIR([sql/mysqld.cc])
 AC_CANONICAL_SYSTEM

=== modified file 'storage/pbxt/src/ha_pbxt.cc'
--- a/storage/pbxt/src/ha_pbxt.cc	2010-11-30 21:11:03 +0000
+++ b/storage/pbxt/src/ha_pbxt.cc	2011-01-31 14:40:58 +0000
@@ -108,6 +108,9 @@ static int		pbxt_end(void *p);
 static int              pbxt_panic(handlerton *hton, enum ha_panic_function flag);
 static void             pbxt_drop_database(handlerton *hton, char *path);
 static int              pbxt_close_connection(handlerton *hton, THD* thd);
+#ifdef MARIADB_BASE_VERSION
+static void             pbxt_commit_ordered(handlerton *hton, THD *thd, bool all);
+#endif
 static int              pbxt_commit(handlerton *hton, THD *thd, bool all);
 static int              pbxt_rollback(handlerton *hton, THD *thd, bool all);
 static int              pbxt_prepare(handlerton *hton, THD *thd, bool all);
@@ -1147,6 +1150,9 @@ static int pbxt_init(void *p)
                 pbxt_hton->state = SHOW_OPTION_YES;
                 pbxt_hton->db_type = DB_TYPE_PBXT; // Wow! I have my own!
                 pbxt_hton->close_connection = pbxt_close_connection; /* close_connection, cleanup thread related data. */
+#ifdef MARIADB_BASE_VERSION
+                pbxt_hton->commit_ordered = pbxt_commit_ordered;
+#endif
                 pbxt_hton->commit = pbxt_commit; /* commit */
                 pbxt_hton->rollback = pbxt_rollback; /* rollback */
                 if (pbxt_support_xa) {
@@ -1484,6 +1490,29 @@ static int pbxt_start_consistent_snapsho
         return err;
 }
 
+#ifdef MARIADB_BASE_VERSION
+/*
+ * Quickly commit the transaction to memory and make it visible to others.
+ * The remaining part of commit will happen later, in pbxt_commit().
+ */
+static void pbxt_commit_ordered(handlerton *hton, THD *thd, bool all)
+{
+        XTThreadPtr     self;
+
+        if ((self = (XTThreadPtr) *thd_ha_data(thd, hton))) {
+                XT_PRINT2(self, "%s pbxt_commit_ordered all=%d\n", all ? "END CONN XACT" : "END STAT", all);
+
+                if (self->st_xact_data) {
+                        if (all || self->st_auto_commit) {
+                                self->st_commit_ordered = TRUE;
+                                self->st_writer = self->st_xact_writer;
+                                self->st_delayed_error= !xt_xn_commit_fast(self, self->st_writer);
+                        }
+                }
+        }
+}
+#endif
+
 /*
  * Commit the PBXT transaction of the given thread.
  * thd is the MySQL thread structure.
@@ -1512,7 +1541,13 @@ static int pbxt_commit(handlerton *hton,
                         if (all || self->st_auto_commit) {
                                 XT_PRINT0(self, "xt_xn_commit in pbxt_commit\n");
 
-                                if (!xt_xn_commit(self))
+                                if (self->st_commit_ordered) {
+                                        self->st_commit_ordered = FALSE;
+                                        err = !xt_xn_commit_slow(self, self->st_writer) || self->st_delayed_error;
+                                } else {
+                                        err = !xt_xn_commit(self);
+                                }
+                                if (err)
                                         err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
                         }
                 }
@@ -6064,7 +6099,7 @@ static MYSQL_SYSVAR_INT(max_threads, pbx
         NULL, NULL, 0, 0, 20000, 1);
 #endif
 
-#ifndef DEBUG
+#if !defined(DEBUG) || defined(MARIADB_BASE_VERSION)
 static MYSQL_SYSVAR_BOOL(support_xa, pbxt_support_xa,
         PLUGIN_VAR_OPCMDARG,
         "Enable PBXT support for the XA two-phase commit, default is enabled",

=== modified file 'storage/pbxt/src/thread_xt.h'
--- a/storage/pbxt/src/thread_xt.h	2010-05-05 10:59:57 +0000
+++ b/storage/pbxt/src/thread_xt.h	2010-10-15 13:42:06 +0000
@@ -299,6 +299,9 @@ typedef struct XTThread {
         xtBool                                  st_stat_ended;                                  /* TRUE if the statement was ended. */
         xtBool                                  st_stat_trans;                                  /* TRUE if a statement transaction is running (started on UPDATE). */
         xtBool                                  st_stat_modify;                                 /* TRUE if the statement is an INSERT/UPDATE/DELETE */
+        xtBool                                  st_commit_ordered;                              /* TRUE if we have run commit_ordered() */
+        xtBool                                  st_delayed_error;                               /* TRUE if we got an error in commit_ordered() */
+        xtBool                                  st_writer;                                              /* Copy of thread->st_xact_writer (which is clobbered by xlog_append()) */
 #ifdef XT_IMPLEMENT_NO_ACTION
         XTBasicListRec                  st_restrict_list;                               /* These records have been deleted and should have no reference. */
 #endif

=== modified file 'storage/pbxt/src/xaction_xt.cc'
--- a/storage/pbxt/src/xaction_xt.cc	2010-09-28 13:05:45 +0000
+++ b/storage/pbxt/src/xaction_xt.cc	2010-10-15 13:42:06 +0000
@@ -1287,27 +1287,61 @@ xtPublic xtBool xt_xn_begin(XTThreadPtr 
         return OK;
 }
 
-static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
+static void xn_end_release_locks(XTThreadPtr thread)
+{
+        XTXactDataPtr   xact = thread->st_xact_data;
+        XTDatabaseHPtr  db = thread->st_database;
+        ASSERT_NS(xact);
+
+        /* {REMOVE-LOCKS} Drop locks if you have any: */
+        thread->st_lock_list.xt_remove_all_locks(db, thread);
+
+        /* Do this afterwards to make sure the sweeper
+         * does not cleanup transactions start cleaning up
+         * before any transactions that were waiting for
+         * this transaction have completed!
+         */
+        xact->xd_end_xn_id = db->db_xn_curr_id;
+
+        /* Now you can sweep! */
+        xact->xd_flags |= XT_XN_XAC_SWEEP;
+}
+
+/* The commit is split into two phases: one "fast" for MariaDB commit_ordered(),
+ * and one "slow" for commit(). When not using internal 2pc, there is only one
+ * call combining both phases.
+ */
+
+enum {
+        XN_END_PHASE_FAST = 1,
+        XN_END_PHASE_SLOW = 2,
+        XN_END_PHASE_BOTH = 3
+};
+
+static xtBool xn_end_xact(XTThreadPtr thread, u_int status, xtBool writer, int phase)
 {
         XTXactDataPtr   xact;
         xtBool                  ok = TRUE;
+        xtBool                  err;
 
         ASSERT_NS(thread->st_xact_data);
         if ((xact = thread->st_xact_data)) {
                 XTDatabaseHPtr  db = thread->st_database;
                 xtXactID                xn_id = xact->xd_start_xn_id;
-                xtBool                  writer;
                 
-                if ((writer = thread->st_xact_writer)) {
+                if (writer) {
                         /* The transaction wrote something: */
                         XTXactEndEntryDRec      entry;
                         xtWord4                         sum;
 
-                        sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
-                        entry.xe_status_1 = status;
-                        entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
-                        XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
-                        XT_SET_DISK_4(entry.xe_not_used_4, 0);
+                        if (phase & XN_END_PHASE_FAST)
+                        {
+                                sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
+                                entry.xe_status_1 = status;
+                                entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
+                                XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
+                                XT_SET_DISK_4(entry.xe_not_used_4, 0);
+                        }
 
 #ifdef XT_IMPLEMENT_NO_ACTION
                         /* This will check any resticts that have been delayed to the end of the statement. */
@@ -1319,20 +1353,35 @@ static xtBool xn_end_xact(XTThreadPtr th
                         }
 #endif
 
-                        /* Flush the data log: */
-                        if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
+                        /* Flush the data log (in the "fast" case we already did it in prepare: */
+                        if ((phase & XN_END_PHASE_SLOW) && !thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
                                 ok = FALSE;
                                 status = XT_LOG_ENT_ABORT;
                         }
 
                         /* Write and flush the transaction log: */
-                        if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit)) {
+                        if (phase == XN_END_PHASE_FAST) {
+                                /* Fast phase, delay any write or flush to later. */
+                                err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, XT_XLOG_NO_WRITE_NO_FLUSH);
+                        } else if (phase == XN_END_PHASE_SLOW) {
+                                /* We already appended the commit record in the fast phase.
+                                 * Now just call with empty record to ensure we write/flush
+                                 * the log as needed for this commit.
+                                 */
+                                err = !xt_xlog_log_data(thread, 0, NULL, xt_db_flush_log_at_trx_commit);
+                        } else /* phase == XN_END_PHASE_BOTH */ {
+                                /* Both phases at once, append commit record and write/flush normally. */
+                                ASSERT_NS(phase == XN_END_PHASE_BOTH);
+                                err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit);
+                        }
+
+                        if (err) {
                                 ok = FALSE;
                                 status = XT_LOG_ENT_ABORT;
                                 /* Make sure this is done, if we failed to log
                                  * the transction end!
                                  */
-                                if (thread->st_xact_writer) {
+                                if (writer) {
                                         /* Adjust this in case of error, but don't forget
                                          * to lock!
                                          */
@@ -1347,46 +1396,46 @@ static xtBool xn_end_xact(XTThreadPtr th
                                 }
                         }
 
-                        /* Setting this flag completes the transaction,
-                         * Do this before we release the locks, because
-                         * the unlocked transactions expect the
-                         * transaction they are waiting for to be
-                         * gone!
-                         */
-                        xact->xd_end_time = ++db->db_xn_end_time;
-                        if (status == XT_LOG_ENT_COMMIT) {
-                                thread->st_statistics.st_commits++;
-                                xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
-                        }
-                        else {
-                                thread->st_statistics.st_rollbacks++;
-                                xact->xd_flags |= XT_XN_XAC_ENDED;
+                        if (phase & XN_END_PHASE_FAST) {
+                                /* Setting this flag completes the transaction,
+                                 * Do this before we release the locks, because
+                                 * the unlocked transactions expect the
+                                 * transaction they are waiting for to be
+                                 * gone!
+                                 */
+                                xact->xd_end_time = ++db->db_xn_end_time;
+                                if (status == XT_LOG_ENT_COMMIT) {
+                                        thread->st_statistics.st_commits++;
+                                        xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
+                                }
+                                else {
+                                        thread->st_statistics.st_rollbacks++;
+                                        xact->xd_flags |= XT_XN_XAC_ENDED;
+                                }
                         }
 
-                        /* {REMOVE-LOCKS} Drop locks is you have any: */
-                        thread->st_lock_list.xt_remove_all_locks(db, thread);
-
-                        /* Do this afterwards to make sure the sweeper
-                         * does not cleanup transactions start cleaning up
-                         * before any transactions that were waiting for
-                         * this transaction have completed!
+                        /* Be as fast as possible in the "fast" path, as we want to be as
+                         * fast as possible here (we will release slow locks immediately
+                         * after in the "slow" part).
+                         * ToDo: If we ran the fast part, the slow part could release locks
+                         * _before_ fsync(), rather than after.
                          */
-                        xact->xd_end_xn_id = db->db_xn_curr_id;
+                        if (!(phase & XN_END_PHASE_SLOW))
+                                return ok;
 
-                        /* Now you can sweep! */
-                        xact->xd_flags |= XT_XN_XAC_SWEEP;
+                        xn_end_release_locks(thread);
                 }
                 else {
                         /* Read-only transaction can be removed, immediately */
-                        xact->xd_end_time = ++db->db_xn_end_time;
-                        xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
-
-                        /* Drop locks is you have any: */
-                        thread->st_lock_list.xt_remove_all_locks(db, thread);
+                        if (phase & XN_END_PHASE_FAST) {
+                                xact->xd_end_time = ++db->db_xn_end_time;
+                                xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
 
-                        xact->xd_end_xn_id = db->db_xn_curr_id;
+                                if (!(phase & XN_END_PHASE_SLOW))
+                                        return ok;
+                        }
 
-                        xact->xd_flags |= XT_XN_XAC_SWEEP;
+                        xn_end_release_locks(thread);
 
                         if (xt_xn_delete_xact(db, xn_id, thread)) {
                                 if (db->db_xn_min_ram_id == xn_id)
@@ -1478,12 +1527,22 @@ static xtBool xn_end_xact(XTThreadPtr th
 
 xtPublic xtBool xt_xn_commit(XTThreadPtr thread)
 {
-        return xn_end_xact(thread, XT_LOG_ENT_COMMIT);
+        return xn_end_xact(thread, XT_LOG_ENT_COMMIT, thread->st_xact_writer, XN_END_PHASE_BOTH);
+}
+
+xtPublic xtBool xt_xn_commit_fast(XTThreadPtr thread, xtBool writer)
+{
+        return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer, XN_END_PHASE_FAST);
+}
+
+xtPublic xtBool xt_xn_commit_slow(XTThreadPtr thread, xtBool writer)
+{
+        return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer, XN_END_PHASE_SLOW);
 }
 
 xtPublic xtBool xt_xn_rollback(XTThreadPtr thread)
 {
-        return xn_end_xact(thread, XT_LOG_ENT_ABORT);
+        return xn_end_xact(thread, XT_LOG_ENT_ABORT, thread->st_xact_writer, XN_END_PHASE_BOTH);
 }
 
 xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)

=== modified file 'storage/pbxt/src/xaction_xt.h'
--- a/storage/pbxt/src/xaction_xt.h	2010-05-05 10:59:57 +0000
+++ b/storage/pbxt/src/xaction_xt.h	2010-10-15 13:42:06 +0000
@@ -193,6 +193,8 @@ void			xt_wakeup_sweeper(struct XTDataba
 
 xtBool                  xt_xn_begin(struct XTThread *self);
 xtBool                  xt_xn_commit(struct XTThread *self);
+xtBool                  xt_xn_commit_fast(struct XTThread *self, xtBool writer);
+xtBool                  xt_xn_commit_slow(struct XTThread *self, xtBool writer);
 xtBool                  xt_xn_rollback(struct XTThread *self);
 xtBool                  xt_xn_log_tab_id(struct XTThread *self, xtTableID tab_id);
 int                             xt_xn_status(struct XTOpenTable *ot, xtXactID xn_id, xtRecordID rec_id);

=== added file 'tests/consistent_snapshot.pl'
--- a/tests/consistent_snapshot.pl	1970-01-01 00:00:00 +0000
+++ b/tests/consistent_snapshot.pl	2010-10-15 13:42:06 +0000
@@ -0,0 +1,107 @@
+#! /usr/bin/perl
+
+# Test START TRANSACTION WITH CONSISTENT SNAPSHOT.
+# With MWL#116, this is implemented so it is actually consistent.
+
+use strict;
+use warnings;
+
+use DBI;
+
+my $UPDATERS= 10;
+my $READERS= 5;
+
+my $ROWS= 50;
+my $DURATION= 20;
+
+my $stop_time= time() + $DURATION;
+
+sub my_connect {
+  my $dbh= DBI->connect("dbi:mysql:mysql_socket=/tmp/mysql.sock;database=test",
+                        "root", undef, { RaiseError=>1, PrintError=>0, AutoCommit=>0});
+  $dbh->do("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ");
+  $dbh->do("SET SESSION autocommit = 0");
+  return $dbh;
+}
+
+sub my_setup {
+  my $dbh= my_connect();
+
+  $dbh->do("DROP TABLE IF EXISTS test_consistent_snapshot1, test_consistent_snapshot2");
+  $dbh->do(<<TABLE);
+CREATE TABLE test_consistent_snapshot1 (
+  a INT PRIMARY KEY,
+  b INT NOT NULL
+) ENGINE=InnoDB
+TABLE
+  $dbh->do(<<TABLE);
+CREATE TABLE test_consistent_snapshot2(
+  a INT PRIMARY KEY,
+  b INT NOT NULL
+) ENGINE=PBXT
+TABLE
+
+  for (my $i= 0; $i < $ROWS; $i++) {
+    my $value= int(rand()*1000);
+    $dbh->do("INSERT INTO test_consistent_snapshot1 VALUES (?, ?)", undef,
+             $i, $value);
+    $dbh->do("INSERT INTO test_consistent_snapshot2 VALUES (?, ?)", undef,
+             $i, -$value);
+  }
+  $dbh->commit();
+  $dbh->disconnect();
+}
+
+sub my_updater {
+  my $dbh= my_connect();
+
+  while (time() < $stop_time) {
+    my $i1= int(rand()*$ROWS);
+    my $i2= int(rand()*$ROWS);
+    my $v= int(rand()*99)-49;
+    $dbh->do("UPDATE test_consistent_snapshot1 SET b = b + ? WHERE a = ?",
+             undef, $v, $i1);
+    $dbh->do("UPDATE test_consistent_snapshot2 SET b = b - ? WHERE a = ?",
+             undef, $v, $i2);
+    $dbh->commit();
+  }
+
+  $dbh->disconnect();
+  exit(0);
+}
+
+sub my_reader {
+  my $dbh= my_connect();
+
+  my $iteration= 0;
+  while (time() < $stop_time) {
+    $dbh->do("START TRANSACTION WITH CONSISTENT SNAPSHOT");
+    my $s1= $dbh->selectrow_arrayref("SELECT SUM(b) FROM test_consistent_snapshot1");
+    $s1= $s1->[0];
+    my $s2= $dbh->selectrow_arrayref("SELECT SUM(b) FROM test_consistent_snapshot2");
+    $s2= $s2->[0];
+    $dbh->commit();
+    if ($s1 + $s2 != 0) {
+      print STDERR "Found inconsistency, s1=$s1 s2=$s2 iteration=$iteration\n";
+      last;
+    }
+    ++$iteration;
+  }
+
+  $dbh->disconnect();
+  exit(0);
+}
+
+my_setup();
+
+for (1 .. $UPDATERS) {
+  fork() || my_updater();
+}
+
+for (1 .. $READERS) {
+  fork() || my_reader();
+}
+
+waitpid(-1, 0) for (1 .. ($UPDATERS + $READERS));
+
+print "All checks done\n";



More information about the commits mailing list