18 Oct 2010, 4:57 p.m.
Hi Kristian,

On Oct 15, 2010, at 4:07 PM, Kristian Nielsen wrote:

> Kristian Nielsen <knielsen@knielsen-hq.org> writes:
>
>> Ok, thanks a lot for the advice, I will give it another shot.
>
> Thanks to your help, I got it working! It was _really_ nice to see that the
> new API applies well to PBXT also.

Wow! That's great.

> As a bonus, we now get START TRANSACTION WITH CONSISTENT SNAPSHOT actually be
> consistent! In MySQL, this does not really do much except start a transaction
> in all engines, it certainly does not ensure any consistency between engines.
> With this change, it becomes consistent, I added a small Perl test program
> tests/consistent_snapshot.pl that shows this. I think this is particularly
> useful for backups;

That's cool.

> I plan to add a way to get the corresponding binlog
> position, so START TRANSACTION WITH CONSISTENT SNAPSHOT can be used to make a
> fully consistent and non-blocking backup (current mysqldump needs FLUSH TABLES
> WITH READ LOCK, which is not really non-blocking).
>
> I hope you can take a look at the patch (attached) when you get some time and
> let me know what you think, and if you see any mistakes. I did it a little
> differently from what we discussed, as I wanted to minimise the amount of work
> done while holding the global mutex around commit_ordered().

OK, I will check it out when I have time.

> I also pushed the patch here, in case you want to see or run the full code:
>
> lp:~maria-captains/maria/mariadb-5.1-mwl116-pbxt
>
> It passes the test suite, but I did at one point see this in the log, which I
> am not sure what means, maybe you can help?
>
> void XTTabCache::xt_tc_release_page(XTOpenFile*, XTTabCachePage*, XTThread*)(tabcache_xt.cc:409) page->tcp_lock_count > 0

Hmmm. Not so good.

> Finally a couple of questions:
>
>>>> In particular this, flushing the data log (is this flush to disk?):
>>>>
>>>>     if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
>>>>         ok = FALSE;
>>>>         status = XT_LOG_ENT_ABORT;
>>>>     }
>>>
>>> Yes, this is a flush to disk.
>>>
>>> This could be done in the slow part (obviously this would be ideal).
>>>
>>> If we do not flush the data log, then there is a chance that such a
>>> commit transaction is incomplete, because the associated data log data
>>> has not been committed.
>
> This is done in commit, but I could not see where similar data log flush is
> done in prepare(). It seems prepare() mostly adds a "prepare" record and
> flushes the transaction log.

Yes, this is all it does..

> Is it correct that no data log flush happens in prepare? If so, don't we have
> the same problem?

Oops, that looks like a bug... Prepare should also flush the data log. Well done! :)

> Suppose we prepare() in PBXT and write (and flush) the transaction into the
> binary log. Then we crash. When the server comes back up, it will try to
> recover the transaction inside PBXT, but that will not be possible if the data
> log was lost due to no flush, right?

Rollback would be possible, but commit may not be possible. Right.

> Final question:
>
> In commit() we call xt_tab_restrict_rows(). It seems to be delayed checking
> for defered foreign key constraints or something like that? If it is, then
> shouldn't it be done in prepare() (it's wrong to rollback with error in
> commit() after successful prepare)? I see the #ifdef XT_IMPLEMENT_NO_ACTION
> around the call, so I suppose this code is not actually used, but I just
> wondered ...

Yup. Right again, on all counts! :) There was/is a bug in MySQL that prevents me from activating this code.

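To make the prepare()/crash scenario above concrete, here is a minimal toy model in C++. It is only a sketch under stated assumptions: ToyXact, toy_prepare(), toy_binlog_write() and toy_recover() are hypothetical names invented for illustration and are not PBXT or MariaDB functions. It assumes the usual recovery rule that a prepared transaction found in the binary log must be committed, and one that is not found there is rolled back.

```cpp
#include <cassert>

// Toy model of the durable state of one transaction.
// All names are hypothetical; this is not PBXT's real data structure.
struct ToyXact {
    bool data_log_durable = false;  // row data flushed to the data log on disk
    bool prepare_durable  = false;  // "prepare" record flushed to the transaction log
    bool binlogged        = false;  // transaction written and flushed to the binary log
};

enum class Recovery { Forget, Rollback, Commit, Stuck };

// prepare(): optionally flush the data log, then flush the prepare record.
// flush_data_log = false models the behaviour identified as a bug above.
void toy_prepare(ToyXact &x, bool flush_data_log) {
    if (flush_data_log)
        x.data_log_durable = true;
    x.prepare_durable = true;
}

// The server writes and flushes the transaction to the binary log.
void toy_binlog_write(ToyXact &x) {
    x.binlogged = true;
}

// What recovery can do after a crash, using only the durable state.
Recovery toy_recover(const ToyXact &x) {
    if (!x.prepare_durable)
        return Recovery::Forget;    // never prepared, nothing to resolve
    if (!x.binlogged)
        return Recovery::Rollback;  // rollback is always possible
    // The binary log says "commit", so the engine needs the row data.
    return x.data_log_durable ? Recovery::Commit : Recovery::Stuck;
}
```

With the data log flushed in prepare, a crash after the binlog write recovers to Commit. Without that flush the same crash ends in Stuck: the binary log requires a commit that the engine may no longer be able to perform, which is the case where only rollback would still be possible.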
Best regards,

Paul

> ------------------------------------------------------------
> revno: 2852
> committer: knielsen@knielsen-hq.org
> branch nick: work-5.1-pbxt-commit-ordered
> timestamp: Fri 2010-10-15 15:42:06 +0200
> message:
>   MWL#116: Efficient group commit: PBXT part
>
>   Implement the commit_ordered() API in PBXT, getting consistent commit ordering
>   with other engines and binlog.
>
>   Make pbxt_support_xa default in MariaDB debug build (as the bug that causes
>   assert in MySQL is fixed in MariaDB).
> diff:
> === modified file 'storage/pbxt/src/ha_pbxt.cc'
> --- storage/pbxt/src/ha_pbxt.cc 2010-09-28 13:05:45 +0000
> +++ storage/pbxt/src/ha_pbxt.cc 2010-10-15 13:42:06 +0000
> @@ -108,6 +108,9 @@
>  static int pbxt_panic(handlerton *hton, enum ha_panic_function flag);
>  static void pbxt_drop_database(handlerton *hton, char *path);
>  static int pbxt_close_connection(handlerton *hton, THD* thd);
> +#ifdef MARIADB_BASE_VERSION
> +static void pbxt_commit_ordered(handlerton *hton, THD *thd, bool all);
> +#endif
>  static int pbxt_commit(handlerton *hton, THD *thd, bool all);
>  static int pbxt_rollback(handlerton *hton, THD *thd, bool all);
>  static int pbxt_prepare(handlerton *hton, THD *thd, bool all);
> @@ -1147,6 +1150,9 @@
>      pbxt_hton->state = SHOW_OPTION_YES;
>      pbxt_hton->db_type = DB_TYPE_PBXT; // Wow! I have my own!
>      pbxt_hton->close_connection = pbxt_close_connection; /* close_connection, cleanup thread related data. */
> +#ifdef MARIADB_BASE_VERSION
> +    pbxt_hton->commit_ordered = pbxt_commit_ordered;
> +#endif
>      pbxt_hton->commit = pbxt_commit; /* commit */
>      pbxt_hton->rollback = pbxt_rollback; /* rollback */
>      if (pbxt_support_xa) {
> @@ -1484,6 +1490,29 @@
>      return err;
>  }
>
> +#ifdef MARIADB_BASE_VERSION
> +/*
> + * Quickly commit the transaction to memory and make it visible to others.
> + * The remaining part of commit will happen later, in pbxt_commit().
> + */
> +static void pbxt_commit_ordered(handlerton *hton, THD *thd, bool all)
> +{
> +    XTThreadPtr self;
> +
> +    if ((self = (XTThreadPtr) *thd_ha_data(thd, hton))) {
> +        XT_PRINT2(self, "%s pbxt_commit_ordered all=%d\n", all ? "END CONN XACT" : "END STAT", all);
> +
> +        if (self->st_xact_data) {
> +            if (all || self->st_auto_commit) {
> +                self->st_commit_ordered = TRUE;
> +                self->st_writer = self->st_xact_writer;
> +                self->st_delayed_error= !xt_xn_commit_fast(self, self->st_writer);
> +            }
> +        }
> +    }
> +}
> +#endif
> +
>  /*
>   * Commit the PBXT transaction of the given thread.
>   * thd is the MySQL thread structure.
> @@ -1512,7 +1541,13 @@
>          if (all || self->st_auto_commit) {
>              XT_PRINT0(self, "xt_xn_commit in pbxt_commit\n");
>
> -            if (!xt_xn_commit(self))
> +            if (self->st_commit_ordered) {
> +                self->st_commit_ordered = FALSE;
> +                err = !xt_xn_commit_slow(self, self->st_writer) || self->st_delayed_error;
> +            } else {
> +                err = !xt_xn_commit(self);
> +            }
> +            if (err)
>                  err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
>          }
>      }
> @@ -6064,7 +6099,7 @@
>          NULL, NULL, 0, 0, 20000, 1);
>  #endif
>
> -#ifndef DEBUG
> +#if !defined(DEBUG) || defined(MARIADB_BASE_VERSION)
>  static MYSQL_SYSVAR_BOOL(support_xa, pbxt_support_xa,
>      PLUGIN_VAR_OPCMDARG,
>      "Enable PBXT support for the XA two-phase commit, default is enabled",
>
> === modified file 'storage/pbxt/src/thread_xt.h'
> --- storage/pbxt/src/thread_xt.h 2010-05-05 10:59:57 +0000
> +++ storage/pbxt/src/thread_xt.h 2010-10-15 13:42:06 +0000
> @@ -299,6 +299,9 @@
>      xtBool st_stat_ended;      /* TRUE if the statement was ended. */
>      xtBool st_stat_trans;      /* TRUE if a statement transaction is running (started on UPDATE). */
>      xtBool st_stat_modify;     /* TRUE if the statement is an INSERT/UPDATE/DELETE */
> +    xtBool st_commit_ordered;  /* TRUE if we have run commit_ordered() */
> +    xtBool st_delayed_error;   /* TRUE if we got an error in commit_ordered() */
> +    xtBool st_writer;          /* Copy of thread->st_xact_writer (which is clobbered by xlog_append()) */
>  #ifdef XT_IMPLEMENT_NO_ACTION
>      XTBasicListRec st_restrict_list; /* These records have been deleted and should have no reference. */
>  #endif
>
> === modified file 'storage/pbxt/src/xaction_xt.cc'
> --- storage/pbxt/src/xaction_xt.cc 2010-09-28 13:05:45 +0000
> +++ storage/pbxt/src/xaction_xt.cc 2010-10-15 13:42:06 +0000
> @@ -1287,27 +1287,61 @@
>      return OK;
>  }
>
> -static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
> +static void xn_end_release_locks(XTThreadPtr thread)
> +{
> +    XTXactDataPtr xact = thread->st_xact_data;
> +    XTDatabaseHPtr db = thread->st_database;
> +    ASSERT_NS(xact);
> +
> +    /* {REMOVE-LOCKS} Drop locks if you have any: */
> +    thread->st_lock_list.xt_remove_all_locks(db, thread);
> +
> +    /* Do this afterwards to make sure the sweeper
> +     * does not cleanup transactions start cleaning up
> +     * before any transactions that were waiting for
> +     * this transaction have completed!
> +     */
> +    xact->xd_end_xn_id = db->db_xn_curr_id;
> +
> +    /* Now you can sweep! */
> +    xact->xd_flags |= XT_XN_XAC_SWEEP;
> +}
> +
> +/* The commit is split into two phases: one "fast" for MariaDB commit_ordered(),
> + * and one "slow" for commit(). When not using internal 2pc, there is only one
> + * call combining both phases.
> + */
> +
> +enum {
> +    XN_END_PHASE_FAST = 1,
> +    XN_END_PHASE_SLOW = 2,
> +    XN_END_PHASE_BOTH = 3
> +};
> +
> +static xtBool xn_end_xact(XTThreadPtr thread, u_int status, xtBool writer, int phase)
>  {
>      XTXactDataPtr xact;
>      xtBool ok = TRUE;
> +    xtBool err;
>
>      ASSERT_NS(thread->st_xact_data);
>      if ((xact = thread->st_xact_data)) {
>          XTDatabaseHPtr db = thread->st_database;
>          xtXactID xn_id = xact->xd_start_xn_id;
> -        xtBool writer;
>
> -        if ((writer = thread->st_xact_writer)) {
> +        if (writer) {
>              /* The transaction wrote something: */
>              XTXactEndEntryDRec entry;
>              xtWord4 sum;
>
> -            sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
> -            entry.xe_status_1 = status;
> -            entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
> -            XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
> -            XT_SET_DISK_4(entry.xe_not_used_4, 0);
> +            if (phase & XN_END_PHASE_FAST)
> +            {
> +                sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
> +                entry.xe_status_1 = status;
> +                entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
> +                XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
> +                XT_SET_DISK_4(entry.xe_not_used_4, 0);
> +            }
>
>  #ifdef XT_IMPLEMENT_NO_ACTION
>              /* This will check any resticts that have been delayed to the end of the statement. */
> @@ -1319,20 +1353,35 @@
>              }
>  #endif
>
> -            /* Flush the data log: */
> -            if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
> +            /* Flush the data log (in the "fast" case we already did it in prepare: */
> +            if ((phase & XN_END_PHASE_SLOW) && !thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
>                  ok = FALSE;
>                  status = XT_LOG_ENT_ABORT;
>              }
>
>              /* Write and flush the transaction log: */
> -            if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit)) {
> +            if (phase == XN_END_PHASE_FAST) {
> +                /* Fast phase, delay any write or flush to later. */
> +                err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, XT_XLOG_NO_WRITE_NO_FLUSH);
> +            } else if (phase == XN_END_PHASE_SLOW) {
> +                /* We already appended the commit record in the fast phase.
> +                 * Now just call with empty record to ensure we write/flush
> +                 * the log as needed for this commit.
> +                 */
> +                err = !xt_xlog_log_data(thread, 0, NULL, xt_db_flush_log_at_trx_commit);
> +            } else /* phase == XN_END_PHASE_BOTH */ {
> +                /* Both phases at once, append commit record and write/flush normally. */
> +                ASSERT_NS(phase == XN_END_PHASE_BOTH);
> +                err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit);
> +            }
> +
> +            if (err) {
>                  ok = FALSE;
>                  status = XT_LOG_ENT_ABORT;
>                  /* Make sure this is done, if we failed to log
>                   * the transction end!
>                   */
> -                if (thread->st_xact_writer) {
> +                if (writer) {
>                      /* Adjust this in case of error, but don't forget
>                       * to lock!
>                       */
> @@ -1347,46 +1396,46 @@
>                  }
>              }
>
> -            /* Setting this flag completes the transaction,
> -             * Do this before we release the locks, because
> -             * the unlocked transactions expect the
> -             * transaction they are waiting for to be
> -             * gone!
> +            if (phase & XN_END_PHASE_FAST) {
> +                /* Setting this flag completes the transaction,
> +                 * Do this before we release the locks, because
> +                 * the unlocked transactions expect the
> +                 * transaction they are waiting for to be
> +                 * gone!
> +                 */
> +                xact->xd_end_time = ++db->db_xn_end_time;
> +                if (status == XT_LOG_ENT_COMMIT) {
> +                    thread->st_statistics.st_commits++;
> +                    xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
> +                }
> +                else {
> +                    thread->st_statistics.st_rollbacks++;
> +                    xact->xd_flags |= XT_XN_XAC_ENDED;
> +                }
> +            }
> +
> +            /* Be as fast as possible in the "fast" path, as we want to be as
> +             * fast as possible here (we will release slow locks immediately
> +             * after in the "slow" part).
> +             * ToDo: If we ran the fast part, the slow part could release locks
> +             * _before_ fsync(), rather than after.
>               */
> -            xact->xd_end_time = ++db->db_xn_end_time;
> -            if (status == XT_LOG_ENT_COMMIT) {
> -                thread->st_statistics.st_commits++;
> +            if (!(phase & XN_END_PHASE_SLOW))
> +                return ok;
> +
> +            xn_end_release_locks(thread);
> +        }
> +        else {
> +            /* Read-only transaction can be removed, immediately */
> +            if (phase & XN_END_PHASE_FAST) {
> +                xact->xd_end_time = ++db->db_xn_end_time;
>                  xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
> -            }
> -            else {
> -                thread->st_statistics.st_rollbacks++;
> -                xact->xd_flags |= XT_XN_XAC_ENDED;
> -            }
> -
> -            /* {REMOVE-LOCKS} Drop locks is you have any: */
> -            thread->st_lock_list.xt_remove_all_locks(db, thread);
> -
> -            /* Do this afterwards to make sure the sweeper
> -             * does not cleanup transactions start cleaning up
> -             * before any transactions that were waiting for
> -             * this transaction have completed!
> -             */
> -            xact->xd_end_xn_id = db->db_xn_curr_id;
> -
> -            /* Now you can sweep! */
> -            xact->xd_flags |= XT_XN_XAC_SWEEP;
> -        }
> -        else {
> -            /* Read-only transaction can be removed, immediately */
> -            xact->xd_end_time = ++db->db_xn_end_time;
> -            xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
> -
> -            /* Drop locks is you have any: */
> -            thread->st_lock_list.xt_remove_all_locks(db, thread);
> -
> -            xact->xd_end_xn_id = db->db_xn_curr_id;
> -
> -            xact->xd_flags |= XT_XN_XAC_SWEEP;
> +
> +                if (!(phase & XN_END_PHASE_SLOW))
> +                    return ok;
> +            }
> +
> +            xn_end_release_locks(thread);
>
>              if (xt_xn_delete_xact(db, xn_id, thread)) {
>                  if (db->db_xn_min_ram_id == xn_id)
> @@ -1478,12 +1527,22 @@
>
>  xtPublic xtBool xt_xn_commit(XTThreadPtr thread)
>  {
> -    return xn_end_xact(thread, XT_LOG_ENT_COMMIT);
> +    return xn_end_xact(thread, XT_LOG_ENT_COMMIT, thread->st_xact_writer, XN_END_PHASE_BOTH);
> +}
> +
> +xtPublic xtBool xt_xn_commit_fast(XTThreadPtr thread, xtBool writer)
> +{
> +    return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer, XN_END_PHASE_FAST);
> +}
> +
> +xtPublic xtBool xt_xn_commit_slow(XTThreadPtr thread, xtBool writer)
> +{
> +    return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer, XN_END_PHASE_SLOW);
>  }
>
>  xtPublic xtBool xt_xn_rollback(XTThreadPtr thread)
>  {
> -    return xn_end_xact(thread, XT_LOG_ENT_ABORT);
> +    return xn_end_xact(thread, XT_LOG_ENT_ABORT, thread->st_xact_writer, XN_END_PHASE_BOTH);
>  }
>
>  xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)
>
> === modified file 'storage/pbxt/src/xaction_xt.h'
> --- storage/pbxt/src/xaction_xt.h 2010-05-05 10:59:57 +0000
> +++ storage/pbxt/src/xaction_xt.h 2010-10-15 13:42:06 +0000
> @@ -193,6 +193,8 @@
>
>  xtBool xt_xn_begin(struct XTThread *self);
>  xtBool xt_xn_commit(struct XTThread *self);
> +xtBool xt_xn_commit_fast(struct XTThread *self, xtBool writer);
> +xtBool xt_xn_commit_slow(struct XTThread *self, xtBool writer);
>  xtBool xt_xn_rollback(struct XTThread *self);
>  xtBool xt_xn_log_tab_id(struct XTThread *self, xtTableID tab_id);
>  int xt_xn_status(struct XTOpenTable *ot, xtXactID xn_id, xtRecordID rec_id);
>
> === added file 'tests/consistent_snapshot.pl'
> --- tests/consistent_snapshot.pl 1970-01-01 00:00:00 +0000
> +++ tests/consistent_snapshot.pl 2010-10-15 13:42:06 +0000
> @@ -0,0 +1,107 @@
> +#! /usr/bin/perl
> +
> +# Test START TRANSACTION WITH CONSISTENT SNAPSHOT.
> +# With MWL#116, this is implemented so it is actually consistent.
> +
> +use strict;
> +use warnings;
> +
> +use DBI;
> +
> +my $UPDATERS= 10;
> +my $READERS= 5;
> +
> +my $ROWS= 50;
> +my $DURATION= 20;
> +
> +my $stop_time= time() + $DURATION;
> +
> +sub my_connect {
> +  my $dbh= DBI->connect("dbi:mysql:mysql_socket=/tmp/mysql.sock;database=test",
> +                        "root", undef, { RaiseError=>1, PrintError=>0, AutoCommit=>0});
> +  $dbh->do("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ");
> +  $dbh->do("SET SESSION autocommit = 0");
> +  return $dbh;
> +}
> +
> +sub my_setup {
> +  my $dbh= my_connect();
> +
> +  $dbh->do("DROP TABLE IF EXISTS test_consistent_snapshot1, test_consistent_snapshot2");
> +  $dbh->do(<<TABLE);
> +CREATE TABLE test_consistent_snapshot1 (
> +  a INT PRIMARY KEY,
> +  b INT NOT NULL
> +) ENGINE=InnoDB
> +TABLE
> +  $dbh->do(<<TABLE);
> +CREATE TABLE test_consistent_snapshot2(
> +  a INT PRIMARY KEY,
> +  b INT NOT NULL
> +) ENGINE=PBXT
> +TABLE
> +
> +  for (my $i= 0; $i < $ROWS; $i++) {
> +    my $value= int(rand()*1000);
> +    $dbh->do("INSERT INTO test_consistent_snapshot1 VALUES (?, ?)", undef,
> +             $i, $value);
> +    $dbh->do("INSERT INTO test_consistent_snapshot2 VALUES (?, ?)", undef,
> +             $i, -$value);
> +  }
> +  $dbh->commit();
> +  $dbh->disconnect();
> +}
> +
> +sub my_updater {
> +  my $dbh= my_connect();
> +
> +  while (time() < $stop_time) {
> +    my $i1= int(rand()*$ROWS);
> +    my $i2= int(rand()*$ROWS);
> +    my $v= int(rand()*99)-49;
> +    $dbh->do("UPDATE test_consistent_snapshot1 SET b = b + ? WHERE a = ?",
> +             undef, $v, $i1);
> +    $dbh->do("UPDATE test_consistent_snapshot2 SET b = b - ? WHERE a = ?",
> +             undef, $v, $i2);
> +    $dbh->commit();
> +  }
> +
> +  $dbh->disconnect();
> +  exit(0);
> +}
> +
> +sub my_reader {
> +  my $dbh= my_connect();
> +
> +  my $iteration= 0;
> +  while (time() < $stop_time) {
> +    $dbh->do("START TRANSACTION WITH CONSISTENT SNAPSHOT");
> +    my $s1= $dbh->selectrow_arrayref("SELECT SUM(b) FROM test_consistent_snapshot1");
> +    $s1= $s1->[0];
> +    my $s2= $dbh->selectrow_arrayref("SELECT SUM(b) FROM test_consistent_snapshot2");
> +    $s2= $s2->[0];
> +    $dbh->commit();
> +    if ($s1 + $s2 != 0) {
> +      print STDERR "Found inconsistency, s1=$s1 s2=$s2 iteration=$iteration\n";
> +      last;
> +    }
> +    ++$iteration;
> +  }
> +
> +  $dbh->disconnect();
> +  exit(0);
> +}
> +
> +my_setup();
> +
> +for (1 .. $UPDATERS) {
> +  fork() || my_updater();
> +}
> +
> +for (1 .. $READERS) {
> +  fork() || my_reader();
> +}
> +
> +waitpid(-1, 0) for (1 .. ($UPDATERS + $READERS));
> +
> +print "All checks done\n";
> _______________________________________________
> Mailing list: https://launchpad.net/~maria-developers
> Post to     : maria-developers@lists.launchpad.net
> Unsubscribe : https://launchpad.net/~maria-developers
> More help   : https://help.launchpad.net/ListHelp

--
Paul McCullagh
PrimeBase Technologies
www.primebase.org
www.blobstreaming.org
pbxt.blogspot.com