[Maria-developers] bzr commit into MariaDB 5.1, with Maria 1.5:maria branch (knielsen:2849)
#At lp:maria 2849 knielsen@knielsen-hq.org 2010-05-26 Preliminary commit of group commit patch. added: mysql-test/suite/binlog/r/binlog_ioerr.result mysql-test/suite/binlog/t/binlog_ioerr.test modified: sql/handler.cc sql/handler.h sql/log.cc sql/log.h sql/log_event.h sql/sql_class.cc sql/sql_class.h sql/sql_load.cc sql/table.cc sql/table.h storage/xtradb/handler/ha_innodb.cc === added file 'mysql-test/suite/binlog/r/binlog_ioerr.result' --- a/mysql-test/suite/binlog/r/binlog_ioerr.result 1970-01-01 00:00:00 +0000 +++ b/mysql-test/suite/binlog/r/binlog_ioerr.result 2010-05-26 08:16:18 +0000 @@ -0,0 +1,28 @@ +CALL mtr.add_suppression("Error writing file 'master-bin'"); +RESET MASTER; +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb; +INSERT INTO t1 VALUES(0); +SET SESSION debug='+d,fail_binlog_write_1'; +INSERT INTO t1 VALUES(1); +ERROR HY000: Error writing file 'master-bin' (errno: 22) +INSERT INTO t1 VALUES(2); +ERROR HY000: Error writing file 'master-bin' (errno: 22) +SET SESSION debug=''; +INSERT INTO t1 VALUES(3); +SELECT * FROM t1; +a +0 +3 +SHOW BINLOG EVENTS; +Log_name Pos Event_type Server_id End_log_pos Info +BINLOG POS Format_desc 1 ENDPOS Server ver: #, Binlog ver: # +BINLOG POS Query 1 ENDPOS use `test`; CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb +BINLOG POS Query 1 ENDPOS BEGIN +BINLOG POS Query 1 ENDPOS use `test`; INSERT INTO t1 VALUES(0) +BINLOG POS Xid 1 ENDPOS COMMIT /* XID */ +BINLOG POS Query 1 ENDPOS BEGIN +BINLOG POS Query 1 ENDPOS BEGIN +BINLOG POS Query 1 ENDPOS BEGIN +BINLOG POS Query 1 ENDPOS use `test`; INSERT INTO t1 VALUES(3) +BINLOG POS Xid 1 ENDPOS COMMIT /* XID */ +DROP TABLE t1; === added file 'mysql-test/suite/binlog/t/binlog_ioerr.test' --- a/mysql-test/suite/binlog/t/binlog_ioerr.test 1970-01-01 00:00:00 +0000 +++ b/mysql-test/suite/binlog/t/binlog_ioerr.test 2010-05-26 08:16:18 +0000 @@ -0,0 +1,29 @@ +source include/have_debug.inc; +source include/have_innodb.inc; +source include/have_log_bin.inc; +source 
include/have_binlog_format_mixed_or_statement.inc; + +CALL mtr.add_suppression("Error writing file 'master-bin'"); + +RESET MASTER; + +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb; +INSERT INTO t1 VALUES(0); +SET SESSION debug='+d,fail_binlog_write_1'; +--error ER_ERROR_ON_WRITE +INSERT INTO t1 VALUES(1); +--error ER_ERROR_ON_WRITE +INSERT INTO t1 VALUES(2); +SET SESSION debug=''; +INSERT INTO t1 VALUES(3); +SELECT * FROM t1; + +# Actually the output from this currently shows a bug. +# The injected IO error leaves partially written transactions in the binlog in +# the form of stray "BEGIN" events. +# These should disappear from the output if binlog error handling is improved. +--replace_regex /\/\* xid=.* \*\//\/* XID *\// /Server ver: .*, Binlog ver: .*/Server ver: #, Binlog ver: #/ /table_id: [0-9]+/table_id: #/ +--replace_column 1 BINLOG 2 POS 5 ENDPOS +SHOW BINLOG EVENTS; + +DROP TABLE t1; === modified file 'sql/handler.cc' --- a/sql/handler.cc 2010-04-06 22:47:08 +0000 +++ b/sql/handler.cc 2010-05-26 08:16:18 +0000 @@ -76,6 +76,7 @@ TYPELIB tx_isolation_typelib= {array_ele static TYPELIB known_extensions= {0,"known_exts", NULL, NULL}; uint known_extensions_id= 0; +static int commit_one_phase_2(THD *thd, bool all, bool do_commit_ordered); static plugin_ref ha_default_plugin(THD *thd) @@ -544,6 +545,26 @@ err: DBUG_RETURN(1); } +/* + This is a queue of THDs waiting for being group committed with + tc_log->group_log_xid(). +*/ +static THD *group_commit_queue; +/* + This mutex protects the group_commit_queue on platforms without native + atomic operations. + */ +static pthread_mutex_t LOCK_group_commit_queue; +/* This mutex is used to serialize calls to handler prepare_ordered methods. */ +static pthread_mutex_t LOCK_prepare_ordered; +/* This mutex is used to serialize calls to handler commit_ordered methods. */ +static pthread_mutex_t LOCK_commit_ordered; +/* This mutex is used to serialize calls to group_log_xid(). 
*/ +static pthread_mutex_t LOCK_group_commit; +static pthread_cond_t COND_group_commit; + +static bool mutexes_inited= FALSE; + int ha_init() { int error= 0; @@ -557,6 +578,19 @@ int ha_init() */ opt_using_transactions= total_ha>(ulong)opt_bin_log; savepoint_alloc_size+= sizeof(SAVEPOINT); + + group_commit_queue= NULL; + my_pthread_mutex_init(&LOCK_group_commit_queue, MY_MUTEX_INIT_FAST, + "LOCK_group_commit_queue", MYF(0)); + my_pthread_mutex_init(&LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW, + "LOCK_prepare_ordered", MYF(0)); + my_pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_SLOW, + "LOCK_commit_ordered", MYF(0)); + my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW, + "LOCK_group_commit", MYF(0)); + pthread_cond_init(&COND_group_commit, 0); + mutexes_inited= TRUE; + DBUG_RETURN(error); } @@ -574,6 +608,15 @@ int ha_end() if (ha_finish_errors()) error= 1; + if (mutexes_inited) + { + pthread_mutex_destroy(&LOCK_group_commit_queue); + pthread_mutex_destroy(&LOCK_prepare_ordered); + pthread_mutex_destroy(&LOCK_commit_ordered); + pthread_mutex_destroy(&LOCK_group_commit); + mutexes_inited= FALSE; + } + DBUG_RETURN(error); } @@ -1053,6 +1096,108 @@ ha_check_and_coalesce_trx_read_only(THD return rw_ha_count; } +/* + Atomically enqueue a THD at the head of the queue of threads waiting to + group commit, and return the previous head of the queue. 
+*/ +static THD * +enqueue_atomic(THD *thd) +{ + my_atomic_rwlock_wrlock(&LOCK_group_commit_queue); + thd->next_commit_ordered= group_commit_queue; + while (!my_atomic_casptr((void **)(&group_commit_queue), + (void **)(&thd->next_commit_ordered), + thd)) + ; + my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue); + return thd->next_commit_ordered; +} + +static THD * +atomic_grab_reverse_queue() +{ + my_atomic_rwlock_wrlock(&LOCK_group_commit_queue); + THD *queue= group_commit_queue; + while (!my_atomic_casptr((void **)(&group_commit_queue), + (void **)(&queue), + NULL)) + ; + my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue); + + /* + Since we enqueue at the head, the queue is actually in reverse order. + So reverse it back into correct commit order before returning. + */ + THD *prev= NULL; + while (queue) + { + THD *next= queue->next_commit_ordered; + queue->next_commit_ordered= prev; + prev= queue; + queue= next; + } + + return prev; +} + +static void +call_commit_ordered(Ha_trx_info *ha_info, THD *thd, bool all) +{ + for (; ha_info; ha_info= ha_info->next()) + { + handlerton *ht= ha_info->ht(); + if (!ht->commit_ordered) + continue; + ht->commit_ordered(ht, thd, all); + } +} + +static void +group_commit_wait_for_wakeup(THD *thd) +{ + pthread_mutex_lock(&thd->LOCK_commit_ordered); + while (!thd->group_commit_ready) + pthread_cond_wait(&thd->COND_commit_ordered, + &thd->LOCK_commit_ordered); + pthread_mutex_unlock(&thd->LOCK_commit_ordered); +} + +static void +group_commit_wakeup_other(THD *other_thd) +{ + pthread_mutex_lock(&other_thd->LOCK_commit_ordered); + other_thd->group_commit_ready= TRUE; + pthread_cond_signal(&other_thd->COND_commit_ordered); + pthread_mutex_unlock(&other_thd->LOCK_commit_ordered); +} + +static bool group_commit_queue_busy= 0; + +static void +group_commit_mark_queue_idle() +{ + pthread_mutex_lock(&LOCK_group_commit); + group_commit_queue_busy= FALSE; + pthread_cond_signal(&COND_group_commit); + pthread_mutex_unlock(&LOCK_group_commit); 
+} + +static void +group_commit_mark_queue_busy() +{ + safe_mutex_assert_owner(&LOCK_group_commit); + group_commit_queue_busy= TRUE; +} + +static void +group_commit_wait_queue_idle() +{ + /* Wait for any existing queue run to finish. */ + safe_mutex_assert_owner(&LOCK_group_commit); + while (group_commit_queue_busy) + pthread_cond_wait(&COND_group_commit, &LOCK_group_commit); +} + /** @retval @@ -1070,7 +1215,7 @@ ha_check_and_coalesce_trx_read_only(THD */ int ha_commit_trans(THD *thd, bool all) { - int error= 0, cookie= 0; + int error= 0; /* 'all' means that this is either an explicit commit issued by user, or an implicit commit issued by a DDL. @@ -1085,7 +1230,10 @@ int ha_commit_trans(THD *thd, bool all) */ bool is_real_trans= all || thd->transaction.all.ha_list == 0; Ha_trx_info *ha_info= trans->ha_list; - my_xid xid= thd->transaction.xid_state.xid.get_my_xid(); + bool need_prepare_ordered, need_commit_ordered; + bool need_enqueue; + bool is_group_commit_leader; + my_xid xid; DBUG_ENTER("ha_commit_trans"); /* @@ -1118,85 +1266,277 @@ int ha_commit_trans(THD *thd, bool all) DBUG_RETURN(2); } #ifdef USING_TRANSACTIONS - if (ha_info) + if (!ha_info) { - uint rw_ha_count; - bool rw_trans; + /* Free resources and perform other cleanup even for 'empty' transactions. 
*/ + if (is_real_trans) + thd->transaction.cleanup(); + DBUG_RETURN(0); + } - DBUG_EXECUTE_IF("crash_commit_before", abort();); + DBUG_EXECUTE_IF("crash_commit_before", abort();); - /* Close all cursors that can not survive COMMIT */ - if (is_real_trans) /* not a statement commit */ - thd->stmt_map.close_transient_cursors(); + /* Close all cursors that can not survive COMMIT */ + if (is_real_trans) /* not a statement commit */ + thd->stmt_map.close_transient_cursors(); - rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all); - /* rw_trans is TRUE when we in a transaction changing data */ - rw_trans= is_real_trans && (rw_ha_count > 0); + uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all); + /* rw_trans is TRUE when we in a transaction changing data */ + bool rw_trans= is_real_trans && (rw_ha_count > 0); - if (rw_trans && - wait_if_global_read_lock(thd, 0, 0)) - { - ha_rollback_trans(thd, all); - DBUG_RETURN(1); - } + if (rw_trans && + wait_if_global_read_lock(thd, 0, 0)) + { + ha_rollback_trans(thd, all); + DBUG_RETURN(1); + } + + if (rw_trans && + opt_readonly && + !(thd->security_ctx->master_access & SUPER_ACL) && + !thd->slave_thread) + { + my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only"); + goto err; + } - if (rw_trans && - opt_readonly && - !(thd->security_ctx->master_access & SUPER_ACL) && - !thd->slave_thread) + if (trans->no_2pc || (rw_ha_count <= 1)) + { + error= ha_commit_one_phase(thd, all); + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); + goto end; + } + + need_prepare_ordered= FALSE; + need_commit_ordered= FALSE; + xid= thd->transaction.xid_state.xid.get_my_xid(); + + for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) + { + int err; + handlerton *ht= hi->ht(); + /* + Do not call two-phase commit if this particular + transaction is read-only. This allows for simpler + implementation in engines that are always read-only. + */ + if (! 
hi->is_trx_read_write()) + continue; + /* + Sic: we know that prepare() is not NULL since otherwise + trans->no_2pc would have been set. + */ + if ((err= ht->prepare(ht, thd, all))) + my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); + status_var_increment(thd->status_var.ha_prepare_count); + + if (err) + goto err; + + if (ht->prepare_ordered) + need_prepare_ordered= TRUE; + if (ht->commit_ordered) + need_commit_ordered= TRUE; + } + DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_ABORT();); + + if (!is_real_trans) + { + error= commit_one_phase_2(thd, all, FALSE); + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); + goto end; + } + + /* + We can optimise away some of the thread synchronisation that may not be + needed. + + If need_prepare_ordered, then we need to take LOCK_prepare_ordered. + + If (xid && use_group_log_xid), then we need to enqueue (and this must + be done under LOCK_prepare_ordered if we take that lock). + + Similarly, if (need_prepare_ordered && need_commit_ordered), then we + need to enqueue under the LOCK_prepare_ordered. + + If (xid && use_group_log_xid), then we need to take LOCK_group_commit. + + If need_commit_ordered, then we need to take LOCK_commit_ordered. + + Cases not covered by above can be skipped to optimise things a bit. + */ + need_enqueue= (xid && tc_log->use_group_log_xid) || + (need_prepare_ordered && need_commit_ordered); + + thd->group_commit_ready= FALSE; + thd->group_commit_all= all; + if (need_prepare_ordered) + { + pthread_mutex_lock(&LOCK_prepare_ordered); + + for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) { - my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only"); - ha_rollback_trans(thd, all); - error= 1; - goto end; + int err; + handlerton *ht= hi->ht(); + if (! 
hi->is_trx_read_write()) + continue; + if (ht->prepare_ordered && (err= ht->prepare_ordered(ht, thd, all))) + { + my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); + pthread_mutex_unlock(&LOCK_prepare_ordered); + goto err; + } } + } + if (need_enqueue) + { + THD *previous_queue= enqueue_atomic(thd); + is_group_commit_leader= (previous_queue == NULL); + } + if (need_prepare_ordered) + pthread_mutex_unlock(&LOCK_prepare_ordered); - if (!trans->no_2pc && (rw_ha_count > 1)) + int cookie; + if (tc_log->use_group_log_xid) + { + if (is_group_commit_leader) { - for (; ha_info && !error; ha_info= ha_info->next()) + pthread_mutex_lock(&LOCK_group_commit); + group_commit_wait_queue_idle(); + + THD *queue= atomic_grab_reverse_queue(); + /* The first in the queue is the leader. */ + DBUG_ASSERT(queue == thd); + + /* + This will set individual error codes in each thd->xid_error, as + well as set thd->xid_cookie for later unlog() call. + */ + tc_log->group_log_xid(queue); + + /* + Call commit_ordered methods for all transactions in the queue + (that did not get an error in group_log_xid()). + + We do this under an additional global LOCK_commit_ordered; this is + so that transactions that do not need 2-phase commit do not have + to wait for the potentially long duration of LOCK_group_commit. + */ + if (need_commit_ordered) { - int err; - handlerton *ht= ha_info->ht(); - /* - Do not call two-phase commit if this particular - transaction is read-only. This allows for simpler - implementation in engines that are always read-only. - */ - if (! ha_info->is_trx_read_write()) - continue; - /* - Sic: we know that prepare() is not NULL since otherwise - trans->no_2pc would have been set. 
- */ - if ((err= ht->prepare(ht, thd, all))) + pthread_mutex_lock(&LOCK_commit_ordered); + for (THD *thd2= queue; thd2 != NULL; thd2= thd2->next_commit_ordered) { - my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); - error= 1; + if (!queue->xid_error) + call_commit_ordered(ha_info, thd2, thd2->group_commit_all); } - status_var_increment(thd->status_var.ha_prepare_count); + pthread_mutex_unlock(&LOCK_commit_ordered); } - DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_ABORT();); - if (error || (is_real_trans && xid && - (error= !(cookie= tc_log->log_xid(thd, xid))))) - { - ha_rollback_trans(thd, all); - error= 1; - goto end; - } - DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_ABORT();); + pthread_mutex_unlock(&LOCK_group_commit); + + /* Wake up everyone except ourself. */ + while ((queue= queue->next_commit_ordered) != NULL) + group_commit_wakeup_other(queue); + } + else + { + /* If not leader, just wait until leader wakes us up. */ + group_commit_wait_for_wakeup(thd); + } + + /* + Now that we're back in our own thread context, do any delayed error + reporting. + */ + if (thd->xid_error) + { + tc_log->xid_delayed_error(thd); + goto err; + } + cookie= thd->xid_cookie; + /* The cookie must be non-zero in the non-error case. */ + DBUG_ASSERT(cookie); + } + else + { + if (xid) + cookie= tc_log->log_xid(thd, xid); + + if (!need_enqueue) + { + error= commit_one_phase_2(thd, all, TRUE); + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); + goto end; + } + + /* + We only get here to do correctly sequenced prepare_ordered and + commit_ordered() calls. + + In this case, we need to wait for the previous in queue to finish + commit_ordered before us to get the correct sequence. + */ + DBUG_ASSERT(need_prepare_ordered && need_commit_ordered); + + if (is_group_commit_leader) + { + pthread_mutex_lock(&LOCK_group_commit); + group_commit_wait_queue_idle(); + THD *queue= atomic_grab_reverse_queue(); + /* + Mark the queue busy while we bounce it from one thread to the + next. 
+ */ + group_commit_mark_queue_busy(); + pthread_mutex_unlock(&LOCK_group_commit); + + /* The first in the queue is the leader. */ + DBUG_ASSERT(queue == thd); } - error=ha_commit_one_phase(thd, all) ? (cookie ? 2 : 1) : 0; - DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_ABORT();); + else + { + /* If not leader, just wait until previous thread wakes us up. */ + group_commit_wait_for_wakeup(thd); + } + + /* Only run commit_ordered() if log_xid was successful. */ if (cookie) - tc_log->unlog(cookie, xid); - DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); -end: - if (rw_trans) - start_waiting_global_read_lock(thd); + { + pthread_mutex_lock(&LOCK_commit_ordered); + call_commit_ordered(ha_info, thd, all); + pthread_mutex_unlock(&LOCK_commit_ordered); + } + + THD *next= thd->next_commit_ordered; + if (next) + group_commit_wakeup_other(next); + else + group_commit_mark_queue_idle(); + + if (!cookie) + goto err; } - /* Free resources and perform other cleanup even for 'empty' transactions. */ - else if (is_real_trans) - thd->transaction.cleanup(); + + DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_ABORT();); + + error= commit_one_phase_2(thd, all, FALSE) ? 2 : 0; + + DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_ABORT();); + DBUG_ASSERT(cookie); + tc_log->unlog(cookie, xid); + + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); + goto end; + + /* Come here if error and we need to rollback. */ +err: + if (!error) + error= 1; + ha_rollback_trans(thd, all); + +end: + if (rw_trans) + start_waiting_global_read_lock(thd); #endif /* USING_TRANSACTIONS */ DBUG_RETURN(error); } @@ -1207,6 +1547,17 @@ end: */ int ha_commit_one_phase(THD *thd, bool all) { + /* + When we come here, we did not call handler commit_ordered() methods in + ha_commit_trans() 2-phase commit, so we pass TRUE to do it in + commit_one_phase_2(). 
+ */ + return commit_one_phase_2(thd, all, TRUE); +} + +static int +commit_one_phase_2(THD *thd, bool all, bool do_commit_ordered) +{ int error=0; THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; /* @@ -1218,10 +1569,40 @@ int ha_commit_one_phase(THD *thd, bool a */ bool is_real_trans=all || thd->transaction.all.ha_list == 0; Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; - DBUG_ENTER("ha_commit_one_phase"); + DBUG_ENTER("commit_one_phase_2"); #ifdef USING_TRANSACTIONS if (ha_info) { + if (is_real_trans && do_commit_ordered) + { + /* + If we did not do it already, call any commit_ordered() method. + + Even though we do not need to keep any ordering with other threads + (as there is no prepare or log_xid for this commit), we still need to + do this under the LOCK_commit_ordered mutex to not run in parallel + with other commit_ordered calls. + */ + + bool locked= FALSE; + + for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) + { + handlerton *ht= hi->ht(); + if (ht->commit_ordered) + { + if (!locked) + { + pthread_mutex_lock(&LOCK_commit_ordered); + locked= 1; + } + ht->commit_ordered(ht, thd, all); + } + } + if (locked) + pthread_mutex_unlock(&LOCK_commit_ordered); + } + for (; ha_info; ha_info= ha_info_next) { int err; === modified file 'sql/handler.h' --- a/sql/handler.h 2010-01-14 16:51:00 +0000 +++ b/sql/handler.h 2010-05-26 08:16:18 +0000 @@ -656,9 +656,81 @@ struct handlerton NOTE 'all' is also false in auto-commit mode where 'end of statement' and 'real commit' mean the same event. */ - int (*commit)(handlerton *hton, THD *thd, bool all); + int (*commit)(handlerton *hton, THD *thd, bool all); + /* + The commit_ordered() method is called prior to the commit() method, after + the transaction manager has decided to commit (not rollback) the + transaction. + + The calls to commit_ordered() in multiple parallel transactions is + guaranteed to happen in the same order in every participating + handler. 
This can be used to ensure the same commit order among multiple + handlers (eg. in table handler and binlog). So if transaction T1 calls + into commit_ordered() of handler A before T2, then T1 will also call + commit_ordered() of handler B before T2. + + The intension is that commit_ordered() should do the minimal amount of + work that needs to happen in consistent commit order among handlers. To + preserve ordering, calls need to be serialised on a global mutex, so + doing any time-consuming or blocking operations in commit_ordered() will + limit scalability. + + Handlers can rely on commit_ordered() calls being serialised (no two + calls can run in parallel, so no extra locking on the handler part is + required to ensure this). + + Note that commit_ordered() can be called from a different thread than the + one handling the transaction! So it can not do anything that depends on + thread local storage, in particular it can not call my_error() and + friends (instead it can store the error code and delay the call to + my_error() to the commit() method). + + Similarly, since commit_ordered() returns void, any return error code + must be saved and returned from the commit() method instead. + + commit_ordered() is called only when actually committing a transaction + (autocommit or not), not when ending a statement in the middle of a + transaction. + + The commit_ordered method is optional, and can be left unset if not + needed in a particular handler. + */ + void (*commit_ordered)(handlerton *hton, THD *thd, bool all); int (*rollback)(handlerton *hton, THD *thd, bool all); int (*prepare)(handlerton *hton, THD *thd, bool all); + /* + The prepare_ordered method is optional. If set, it will be called after + successful prepare() in all handlers participating in 2-phase commit. + + The calls to prepare_ordered() among multiple parallel transactions are + ordered consistently with calls to commit_ordered(). 
This means that + calls to prepare_ordered() effectively define the commit order, and that + each handler will see the same sequence of transactions calling into + prepare_ordered() and commit_ordered(). + + Thus, prepare_ordered() can be used to define commit order for handlers + that need to do this in the prepare step (like binlog). It can also be + used to release transactions locks early in an order consistent with the + order transactions will be eventually committed. + + Like commit_ordered(), prepare_ordered() calls are serialised to maintain + ordering, so the intension is that they should execute fast, with only + the minimal amount of work needed to define commit order. Handlers can + rely on this serialisation, and do not need to do any extra locking to + avoid two prepare_ordered() calls running in parallel. + + Unlike commit_ordered(), prepare_ordered() _is_ guaranteed to be called + in the context of the thread handling the rest of the transaction. + + Note that for user-level XA SQL commands, no consistent ordering among + prepare_ordered() and commit_ordered() is guaranteed (as that would + require blocking all other commits for an indefinite time). + + prepare_ordered() is called only when actually committing a transaction + (autocommit or not), not when ending a statement in the middle of a + transaction. 
+ */ + int (*prepare_ordered)(handlerton *hton, THD *thd, bool all); int (*recover)(handlerton *hton, XID *xid_list, uint len); int (*commit_by_xid)(handlerton *hton, XID *xid); int (*rollback_by_xid)(handlerton *hton, XID *xid); === modified file 'sql/log.cc' --- a/sql/log.cc 2010-04-06 22:47:08 +0000 +++ b/sql/log.cc 2010-05-26 08:16:18 +0000 @@ -154,9 +154,12 @@ class binlog_trx_data { public: binlog_trx_data() : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0), - before_stmt_pos(MY_OFF_T_UNDEF) + before_stmt_pos(MY_OFF_T_UNDEF), using_xa(0) { trans_log.end_of_file= max_binlog_cache_size; + (void) my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW, + "LOCK_group_commit", MYF(0)); + (void) pthread_cond_init(&COND_group_commit, 0); } ~binlog_trx_data() @@ -208,11 +211,12 @@ public: completely. */ void reset() { - if (!empty()) + if (trans_log.type != WRITE_CACHE || !empty()) truncate(0); before_stmt_pos= MY_OFF_T_UNDEF; incident= FALSE; trans_log.end_of_file= max_binlog_cache_size; + using_xa= FALSE; DBUG_ASSERT(empty()); } @@ -257,6 +261,41 @@ public: Binlog position before the start of the current statement. */ my_off_t before_stmt_pos; + + /* 0 or error when writing to binlog; set during group commit. */ + int error; + /* If error != 0, value of errno (for my_error() reporting). */ + int commit_errno; + /* Link for queueing transactions up for group commit to binlog. */ + binlog_trx_data *next; + /* + Flag set true when group commit for this transaction is finished; used + with pthread_cond_wait() to wait until commit is done. + This flag is protected by LOCK_group_commit. + */ + bool done; + /* + Flag set if this transaction is the group commit leader that will handle + the actual writing to the binlog. + This flag is protected by LOCK_group_commit. + */ + bool group_commit_leader; + /* + Flag set true if this transaction is committed with log_xid() as part of + XA, false if not. 
+ */ + bool using_xa; + /* + Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be + written during group commit. The incident_event is only valid if + has_incident() is true. + */ + Log_event *begin_event; + Log_event *end_event; + Log_event *incident_event; + /* Mutex and condition for wakeup after group commit. */ + pthread_mutex_t LOCK_group_commit; + pthread_cond_t COND_group_commit; }; handlerton *binlog_hton; @@ -1391,117 +1430,188 @@ static int binlog_close_connection(handl return 0; } +/* Helper functions for binlog_flush_trx_cache(). */ +static int +binlog_flush_trx_cache_prepare(THD *thd) +{ + if (thd->binlog_flush_pending_rows_event(TRUE)) + return 1; + return 0; +} + +static void +binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data) +{ + IO_CACHE *trans_log= &trx_data->trans_log; + + trx_data->reset(); + + statistic_increment(binlog_cache_use, &LOCK_status); + if (trans_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + trans_log->disk_writes= 0; + } +} + /* - End a transaction. + End a transaction, writing events to the binary log. SYNOPSIS - binlog_end_trans() + binlog_flush_trx_cache() thd The thread whose transaction should be ended trx_data Pointer to the transaction data to use - end_ev The end event to use, or NULL - all True if the entire transaction should be ended, false if - only the statement transaction should be ended. + end_ev The end event to use (COMMIT, ROLLBACK, or commit XID) DESCRIPTION End the currently open transaction. The transaction can be either - a real transaction (if 'all' is true) or a statement transaction - (if 'all' is false). + a real transaction or a statement transaction. - If 'end_ev' is NULL, the transaction is a rollback of only - transactional tables, so the transaction cache will be truncated - to either just before the last opened statement transaction (if - 'all' is false), or reset completely (if 'all' is true). 
+ This can be to commit a transaction, with a COMMIT query event or an XA + commit XID event. But it can also be to rollback a transaction with a + ROLLBACK query event, used for rolling back transactions which also + contain updates to non-transactional tables. */ static int -binlog_end_trans(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev, bool all) +binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev) { - DBUG_ENTER("binlog_end_trans"); - int error=0; - IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_PRINT("enter", ("transaction: %s end_ev: 0x%lx", - all ? "all" : "stmt", (long) end_ev)); + DBUG_ENTER("binlog_flush_trx_cache"); DBUG_PRINT("info", ("thd->options={ %s%s}", FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), FLAGSTR(thd->options, OPTION_BEGIN))); + if (binlog_flush_trx_cache_prepare(thd)) + DBUG_RETURN(1); + /* - NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of - only transactional tables. If the transaction contain changes to - any non-transactiona tables, we need write the transaction and log - a ROLLBACK last. - */ - if (end_ev != NULL) - { - if (thd->binlog_flush_pending_rows_event(TRUE)) - DBUG_RETURN(1); - /* - Doing a commit or a rollback including non-transactional tables, - i.e., ending a transaction where we might write the transaction - cache to the binary log. - - We can always end the statement when ending a transaction since - transactions are not allowed inside stored functions. If they - were, we would have to ensure that we're not ending a statement - inside a stored function. - */ - error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev, - trx_data->has_incident()); - trx_data->reset(); + Doing a commit or a rollback including non-transactional tables, + i.e., ending a transaction where we might write the transaction + cache to the binary log. + + We can always end the statement when ending a transaction since + transactions are not allowed inside stored functions. 
If they + were, we would have to ensure that we're not ending a statement + inside a stored function. + */ + int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, end_ev); - /* - We need to step the table map version after writing the - transaction cache to disk. - */ - mysql_bin_log.update_table_map_version(); - statistic_increment(binlog_cache_use, &LOCK_status); - if (trans_log->disk_writes != 0) - { - statistic_increment(binlog_cache_disk_use, &LOCK_status); - trans_log->disk_writes= 0; - } - } - else - { - /* - If rolling back an entire transaction or a single statement not - inside a transaction, we reset the transaction cache. + binlog_flush_trx_cache_finish(thd, trx_data); - If rolling back a statement in a transaction, we truncate the - transaction cache to remove the statement. - */ - thd->binlog_remove_pending_rows_event(TRUE); - if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) - { - if (trx_data->has_incident()) - error= mysql_bin_log.write_incident(thd, TRUE); - trx_data->reset(); - } - else // ...statement - trx_data->truncate(trx_data->before_stmt_pos); + DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); + DBUG_RETURN(error); +} - /* - We need to step the table map version on a rollback to ensure - that a new table map event is generated instead of the one that - was written to the thrown-away transaction cache. - */ - mysql_bin_log.update_table_map_version(); +/* + Discard a transaction, ie. ROLLBACK with only transactional table updates. + + SYNOPSIS + binlog_truncate_trx_cache() + + thd The thread whose transaction should be ended + trx_data Pointer to the transaction data to use + all True if the entire transaction should be ended, false if + only the statement transaction should be ended. + + DESCRIPTION + + Rollback (and end) a transaction that only modifies transactional + tables. The transaction can be either a real transaction (if 'all' is + true) or a statement transaction (if 'all' is false). 
+ + The transaction cache will be truncated to either just before the last + opened statement transaction (if 'all' is false), or reset completely (if + 'all' is true). + */ +static int +binlog_truncate_trx_cache(THD *thd, binlog_trx_data *trx_data, bool all) +{ + DBUG_ENTER("binlog_truncate_trx_cache"); + int error= 0; + DBUG_PRINT("enter", ("transaction: %s", all ? "all" : "stmt")); + DBUG_PRINT("info", ("thd->options={ %s%s}", + FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->options, OPTION_BEGIN))); + + /* + ROLLBACK with nothing to replicate: i.e., rollback of only transactional + tables. + */ + + /* + If rolling back an entire transaction or a single statement not + inside a transaction, we reset the transaction cache. + + If rolling back a statement in a transaction, we truncate the + transaction cache to remove the statement. + */ + thd->binlog_remove_pending_rows_event(TRUE); + if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + { + if (trx_data->has_incident()) + error= mysql_bin_log.write_incident(thd); + trx_data->reset(); } + else // ...statement + trx_data->truncate(trx_data->before_stmt_pos); DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); DBUG_RETURN(error); } +static LEX_STRING const write_error_msg= + { C_STRING_WITH_LEN("error writing to the binary log") }; + static int binlog_prepare(handlerton *hton, THD *thd, bool all) { /* - do nothing. - just pretend we can do 2pc, so that MySQL won't - switch to 1pc. - real work will be done in MYSQL_BIN_LOG::log_xid() + If this prepare is for a single statement in the middle of a transactions, + not the actual transaction commit, then we do nothing. The real work is + only done later, in the prepare for making persistent changes. 
*/ + if (!all && (thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + return 0; + + binlog_trx_data *trx_data= + (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + + trx_data->using_xa= TRUE; + + if (binlog_flush_trx_cache_prepare(thd)) + return 1; + + my_xid xid= thd->transaction.xid_state.xid.get_my_xid(); + if (!xid) + { + /* Skip logging this transaction, marked by setting end_event to NULL. */ + trx_data->end_event= NULL; + return 0; + } + + /* + Allocate the extra events that will be logged to the binlog in binlog group + commit. Use placement new to allocate them on the THD memroot, as they need + to remain live until log_xid() returns. + */ + size_t needed_size= sizeof(Query_log_event) + sizeof(Xid_log_event); + if (trx_data->has_incident()) + needed_size+= sizeof(Incident_log_event); + uchar *mem= (uchar *)thd->alloc(needed_size); + if (!mem) + return 1; + + trx_data->begin_event= new ((void *)mem) + Query_log_event(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + mem+= sizeof(Query_log_event); + + trx_data->end_event= new ((void *)mem) Xid_log_event(thd, xid); + + if (trx_data->has_incident()) + trx_data->incident_event= new ((void *)(mem + sizeof(Xid_log_event))) + Incident_log_event(thd, INCIDENT_LOST_EVENTS, write_error_msg); + return 0; } @@ -1525,11 +1635,11 @@ static int binlog_commit(handlerton *hto binlog_trx_data *const trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - if (trx_data->empty()) + if (trx_data->using_xa) { // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid() - trx_data->reset(); - DBUG_RETURN(0); + binlog_flush_trx_cache_finish(thd, trx_data); + DBUG_RETURN(error); } /* @@ -1556,8 +1666,8 @@ static int binlog_commit(handlerton *hto !stmt_has_updated_trans_table(thd) && thd->transaction.stmt.modified_non_trans_table)) { - Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); - error= binlog_end_trans(thd, trx_data, &qev, all); + Query_log_event end_ev(thd, 
STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); + error= binlog_flush_trx_cache(thd, trx_data, &end_ev); } trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0; @@ -1621,7 +1731,7 @@ static int binlog_rollback(handlerton *h (thd->options & OPTION_KEEP_LOG)) && mysql_bin_log.check_write_error(thd)) trx_data->set_incident(); - error= binlog_end_trans(thd, trx_data, 0, all); + error= binlog_truncate_trx_cache(thd, trx_data, all); } else { @@ -1641,8 +1751,8 @@ static int binlog_rollback(handlerton *h thd->current_stmt_binlog_row_based) || ((thd->options & OPTION_KEEP_LOG))) { - Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); - error= binlog_end_trans(thd, trx_data, &qev, all); + Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); + error= binlog_flush_trx_cache(thd, trx_data, &end_ev); } /* Otherwise, we simply truncate the cache as there is no change on @@ -1650,7 +1760,7 @@ static int binlog_rollback(handlerton *h */ else if ((all && !thd->transaction.all.modified_non_trans_table) || (!all && !thd->transaction.stmt.modified_non_trans_table)) - error= binlog_end_trans(thd, trx_data, 0, all); + error= binlog_truncate_trx_cache(thd, trx_data, all); } if (!all) trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt rollback @@ -2464,7 +2574,7 @@ const char *MYSQL_LOG::generate_name(con MYSQL_BIN_LOG::MYSQL_BIN_LOG() :bytes_written(0), prepared_xids(0), file_id(1), open_count(1), - need_start_event(TRUE), m_table_map_version(0), + need_start_event(TRUE), is_relay_log(0), description_event_for_exec(0), description_event_for_queue(0) { @@ -2477,6 +2587,7 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG() index_file_name[0] = 0; bzero((char*) &index_file, sizeof(index_file)); bzero((char*) &purge_index_file, sizeof(purge_index_file)); + use_group_log_xid= TRUE; } /* this is called only once */ @@ -2492,6 +2603,7 @@ void MYSQL_BIN_LOG::cleanup() delete description_event_for_exec; (void) pthread_mutex_destroy(&LOCK_log); 
(void) pthread_mutex_destroy(&LOCK_index); + (void) pthread_mutex_destroy(&LOCK_queue); (void) pthread_cond_destroy(&update_cond); } DBUG_VOID_RETURN; @@ -2520,6 +2632,8 @@ void MYSQL_BIN_LOG::init_pthread_objects */ (void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index", MYF_NO_DEADLOCK_DETECTION); + (void) my_pthread_mutex_init(&LOCK_queue, MY_MUTEX_INIT_FAST, "LOCK_queue", + MYF(0)); (void) pthread_cond_init(&update_cond, 0); } @@ -4113,7 +4227,6 @@ int THD::binlog_write_table_map(TABLE *t DBUG_RETURN(error); binlog_table_maps++; - table->s->table_map_version= mysql_bin_log.table_map_version(); DBUG_RETURN(0); } @@ -4194,64 +4307,41 @@ MYSQL_BIN_LOG::flush_and_set_pending_row if (Rows_log_event* pending= trx_data->pending()) { - IO_CACHE *file= &log_file; - /* Decide if we should write to the log file directly or to the transaction log. */ if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log)) - file= &trx_data->trans_log; - - /* - If we are writing to the log file directly, we could avoid - locking the log. This does not work since we need to step the - m_table_map_version below, and that change has to be protected - by the LOCK_log mutex. - */ - pthread_mutex_lock(&LOCK_log); - - /* - Write pending event to log file or transaction cache - */ - if (pending->write(file)) { - pthread_mutex_unlock(&LOCK_log); - set_write_error(thd); - DBUG_RETURN(1); + /* Write to transaction log/cache. */ + if (pending->write(&trx_data->trans_log)) + { + set_write_error(thd); + DBUG_RETURN(1); + } } - - /* - We step the table map version if we are writing an event - representing the end of a statement. We do this regardless of - wheather we write to the transaction cache or to directly to the - file. - - In an ideal world, we could avoid stepping the table map version - if we were writing to a transaction cache, since we could then - reuse the table map that was written earlier in the transaction - cache. 
This does not work since STMT_END_F implies closing all - table mappings on the slave side. - - TODO: Find a solution so that table maps does not have to be - written several times within a transaction. - */ - if (pending->get_flags(Rows_log_event::STMT_END_F)) - ++m_table_map_version; - - delete pending; - - if (file == &log_file) + else { + /* Write directly to log file. */ + pthread_mutex_lock(&LOCK_log); + if (pending->write(&log_file)) + { + pthread_mutex_unlock(&LOCK_log); + set_write_error(thd); + DBUG_RETURN(1); + } + error= flush_and_sync(); if (!error) { signal_update(); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } + + pthread_mutex_unlock(&LOCK_log); } - pthread_mutex_unlock(&LOCK_log); + delete pending; } thd->binlog_set_pending_rows_event(event); @@ -4450,9 +4540,6 @@ err: set_write_error(thd); } - if (event_info->flags & LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F) - ++m_table_map_version; - pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -4575,18 +4662,14 @@ uint MYSQL_BIN_LOG::next_file_id() SYNOPSIS write_cache() cache Cache to write to the binary log - lock_log True if the LOCK_log mutex should be aquired, false otherwise - sync_log True if the log should be flushed and sync:ed DESCRIPTION Write the contents of the cache to the binary log. The cache will be reset as a READ_CACHE to be able to read the contents from it. */ -int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log) +int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache) { - Mutex_sentry sentry(lock_log ? 
&LOCK_log : NULL); - if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) return ER_ERROR_ON_WRITE; uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; @@ -4697,6 +4780,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE } /* Write data to the binary log file */ + DBUG_EXECUTE_IF("fail_binlog_write_1", return ER_ERROR_ON_WRITE;); if (my_b_write(&log_file, cache->read_pos, length)) return ER_ERROR_ON_WRITE; cache->read_pos=cache->read_end; // Mark buffer used up @@ -4704,9 +4788,6 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE DBUG_ASSERT(carry == 0); - if (sync_log) - flush_and_sync(); - return 0; // All OK } @@ -4739,26 +4820,22 @@ int query_error_code(THD *thd, bool not_ return error; } -bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock) +bool MYSQL_BIN_LOG::write_incident(THD *thd) { uint error= 0; DBUG_ENTER("MYSQL_BIN_LOG::write_incident"); - LEX_STRING const write_error_msg= - { C_STRING_WITH_LEN("error writing to the binary log") }; Incident incident= INCIDENT_LOST_EVENTS; Incident_log_event ev(thd, incident, write_error_msg); - if (lock) - pthread_mutex_lock(&LOCK_log); + + pthread_mutex_lock(&LOCK_log); error= ev.write(&log_file); - if (lock) + if (!error && !(error= flush_and_sync())) { - if (!error && !(error= flush_and_sync())) - { - signal_update(); - rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); - } - pthread_mutex_unlock(&LOCK_log); + signal_update(); + rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } + pthread_mutex_unlock(&LOCK_log); + DBUG_RETURN(error); } @@ -4786,103 +4863,364 @@ bool MYSQL_BIN_LOG::write_incident(THD * 'cache' needs to be reinitialized after this functions returns. 
*/ -bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, - bool incident) +bool +MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev) { - DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)"); + DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); + + /* + Create the necessary events here, where we have the correct THD (and + thread context). + + Due to group commit the actual writing to binlog may happen in a different + thread. + */ + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + trx_data->begin_event= &qinfo; + trx_data->end_event= end_ev; + if (trx_data->has_incident()) + { + Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg); + trx_data->incident_event= &inc_ev; + DBUG_RETURN(write_transaction_to_binlog_events(trx_data)); + } + else + { + trx_data->incident_event= NULL; + DBUG_RETURN(write_transaction_to_binlog_events(trx_data)); + } +} + +bool +MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data) +{ + /* + To facilitate group commit for the binlog, we first queue up ourselves in + the group commit queue. Then the first thread to enter the queue waits for + the LOCK_log mutex, and commits for everyone in the queue once it gets the + lock. Any other threads in the queue just wait for the first one to finish + the commit and wake them up. + */ + + pthread_mutex_lock(&trx_data->LOCK_group_commit); + const binlog_trx_data *orig_queue= atomic_enqueue_trx(trx_data); + + if (orig_queue != NULL) + { + trx_data->group_commit_leader= FALSE; + trx_data->done= FALSE; + trx_group_commit_participant(trx_data); + } + else + { + trx_data->group_commit_leader= TRUE; + pthread_mutex_unlock(&trx_data->LOCK_group_commit); + trx_group_commit_leader(NULL); + } + + return trx_group_commit_finish(trx_data); +} + +/* + Participate as secondary transaction in group commit. 
+ + Another thread is already waiting to obtain the LOCK_log, and should include + this thread in the group commit once the log is obtained. So here we put + ourselves in the queue and wait to be signalled that the group commit is done. + + Note that this function must be called with the trx_data->LOCK_group_commit + locked; the mutex will be released before return. +*/ +void +MYSQL_BIN_LOG::trx_group_commit_participant(binlog_trx_data *trx_data) +{ + safe_mutex_assert_owner(&trx_data->LOCK_group_commit); + + /* Wait until trx_data.done == true and woken up by the leader. */ + while (!trx_data->done) + pthread_cond_wait(&trx_data->COND_group_commit, + &trx_data->LOCK_group_commit); + pthread_mutex_unlock(&trx_data->LOCK_group_commit); +} + +bool +MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data) +{ + if (trx_data->error) + { + switch (trx_data->error) + { + case ER_ERROR_ON_WRITE: + my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, trx_data->commit_errno); + break; + case ER_ERROR_ON_READ: + my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), + trx_data->trans_log.file_name, trx_data->commit_errno); + break; + default: + /* + There are not (and should not be) any errors thrown not covered above. + But just in case one is added later without updating the above switch + statement, include a catch-all. + */ + my_printf_error(trx_data->error, + "Error writing transaction to binary log: %d", + MYF(ME_NOREFRESH), trx_data->error); + } + + /* + Since we return error, this transaction XID will not be committed, so + we need to mark it as not needed for recovery (unlog() is not called + for a transaction if log_xid() fails). + */ + if (trx_data->end_event->get_type_code() == XID_EVENT) + mark_xid_done(); + + return 1; + } + + return 0; +} + +/* + Do binlog group commit as the lead thread. 
+ + This must be called when this thread/transaction is queued at the start of + the group_commit_queue.
It will wait to obtain the LOCK_log mutex, then group + commit all the transactions in the queue (more may have entered while waiting + for LOCK_log). After commit is done, all other threads in the queue will be + signalled. + + */ +void +MYSQL_BIN_LOG::trx_group_commit_leader(THD *first_thd) +{ + uint xid_count= 0; + uint write_count= 0; + + /* First, put anything from group_log_xid into the queue. */ + binlog_trx_data *full_queue= NULL; + binlog_trx_data **next_ptr= &full_queue; + for (THD *thd= first_thd; thd; thd= thd->next_commit_ordered) + { + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + + /* Skip log_xid for transactions without xid, marked by NULL end_event. */ + if (!trx_data->end_event) + continue; + + trx_data->error= 0; + *next_ptr= trx_data; + next_ptr= &(trx_data->next); + } + + /* + Next, lock the LOCK_log(), and once we get it, add any additional writes + that queued up while we were waiting. + + Note that if some writer not going through log_xid() comes in and gets the + LOCK_log before us, they will not be able to include us in their group + commit (and they are not able to handle ensuring same commit order between + us and participating transactional storage engines anyway). + + On the other hand, when we get the LOCK_log, we will be able to include + any non-transactional writes that queued up in our group commit. This + should hopefully not be too big of a problem, as group commit is most + important for the transactional case anyway when durability (fsync) is + enabled. + */ VOID(pthread_mutex_lock(&LOCK_log)); - /* NULL would represent nothing to replicate after ROLLBACK */ - DBUG_ASSERT(commit_event != NULL); + /* + As the queue is in reverse order of entering, reverse the queue as we add + it to the existing one. Note that there is no ordering defined between + transactional and non-transactional commits.
+ */ + binlog_trx_data *current= atomic_grab_trx_queue(); + binlog_trx_data *xtra_queue= NULL; + while (current) + { + current->error= 0; + binlog_trx_data *next= current->next; + current->next= xtra_queue; + xtra_queue= current; + current= next; + } + *next_ptr= xtra_queue; + /* + Now we have in full_queue the list of transactions to be committed in + order. + */ DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { /* - We only bother to write to the binary log if there is anything - to write. - */ - if (my_b_tell(cache) > 0) + Commit every transaction in the queue. + + Note that we are doing this in a different thread than the one running + the transaction! So we are limited in the operations we can do. In + particular, we cannot call my_error() on behalf of a transaction, as + that obtains the THD from thread local storage. Instead, we must set + current->error and let the thread do the error reporting itself once + we wake it up. + */ + for (current= full_queue; current != NULL; current= current->next) { - /* - Log "BEGIN" at the beginning of every transaction. Here, a - transaction is either a BEGIN..COMMIT block or a single - statement in autocommit mode. - */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + IO_CACHE *cache= &current->trans_log; /* - Now this Query_log_event has artificial log_pos 0. It must be - adjusted to reflect the real position in the log. Not doing it - would confuse the slave: it would prevent this one from - knowing where he is in the master's binlog, which would result - in wrong positions being shown to the user, MASTER_POS_WAIT - undue waiting etc. + We only bother to write to the binary log if there is anything + to write.
*/ - if (qinfo.write(&log_file)) - goto err; - - DBUG_EXECUTE_IF("crash_before_writing_xid", - { - if ((write_error= write_cache(cache, false, true))) - DBUG_PRINT("info", ("error writing binlog cache: %d", - write_error)); - DBUG_PRINT("info", ("crashing before writing xid")); - abort(); - }); - - if ((write_error= write_cache(cache, false, false))) - goto err; + if (my_b_tell(cache) > 0) + { + current->error= write_transaction(current); + if (current->error) + current->commit_errno= errno; - if (commit_event && commit_event->write(&log_file)) - goto err; + write_count++; + } - if (incident && write_incident(thd, FALSE)) - goto err; + if (current->end_event->get_type_code() == XID_EVENT) + xid_count++; + } + if (write_count > 0) + { if (flush_and_sync()) - goto err; - DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_ABORT();); - if (cache->error) // Error on read { - sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno); - write_error=1; // Don't give more errors - goto err; + for (current= full_queue; current != NULL; current= current->next) + { + if (!current->error) + { + current->error= ER_ERROR_ON_WRITE; + current->commit_errno= errno; + } + } + } + else + { + signal_update(); } - signal_update(); } /* - if commit_event is Xid_log_event, increase the number of + if any commit_events are Xid_log_event, increase the number of prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated if there're prepared xids in it - see the comment in new_file() for an explanation. - If the commit_event is not Xid_log_event (then it's a Query_log_event) - rotate binlog, if necessary. + If no Xid_log_events (then it's all Query_log_event) rotate binlog, + if necessary. 
*/ - if (commit_event && commit_event->get_type_code() == XID_EVENT) + if (xid_count > 0) { - pthread_mutex_lock(&LOCK_prep_xids); - prepared_xids++; - pthread_mutex_unlock(&LOCK_prep_xids); + mark_xids_active(xid_count); } else rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } + VOID(pthread_mutex_unlock(&LOCK_log)); - DBUG_RETURN(0); + /* + Signal those that are not part of group_log_xid, and are not group leaders + running the queue. -err: - if (!write_error) + Since a group leader runs the queue itself if a group_log_xid does not get + to do it first, such leader threads do not need to wait or be woken up. + */ + for (current= xtra_queue; current != NULL; current= current->next) { - write_error= 1; - sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); + /* + Note that we need to take LOCK_group_commit even in the case of a leader! + + Otherwise there is a race between setting and testing the + group_commit_leader flag. + */ + pthread_mutex_lock(&current->LOCK_group_commit); + if (!current->group_commit_leader) + { + current->done= true; + pthread_cond_signal(&current->COND_group_commit); + } + pthread_mutex_unlock(&current->LOCK_group_commit); } - VOID(pthread_mutex_unlock(&LOCK_log)); - DBUG_RETURN(1); } +int +MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data) +{ + IO_CACHE *cache= &trx_data->trans_log; + /* + Log "BEGIN" at the beginning of every transaction. Here, a transaction is + either a BEGIN..COMMIT block or a single statement in autocommit mode. The + event was constructed in write_transaction_to_binlog(), in the thread + running the transaction. + + Now this Query_log_event has artificial log_pos 0. It must be + adjusted to reflect the real position in the log. Not doing it + would confuse the slave: it would prevent this one from + knowing where he is in the master's binlog, which would result + in wrong positions being shown to the user, MASTER_POS_WAIT + undue waiting etc.
+ */ + if (trx_data->begin_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + + DBUG_EXECUTE_IF("crash_before_writing_xid", + { + if ((write_cache(cache))) + DBUG_PRINT("info", ("error writing binlog cache")); + else + flush_and_sync(); + + DBUG_PRINT("info", ("crashing before writing xid")); + abort(); + }); + + if (write_cache(cache)) + return ER_ERROR_ON_WRITE; + + if (trx_data->end_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + + if (trx_data->has_incident() && trx_data->incident_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + + if (cache->error) // Error on read + return ER_ERROR_ON_READ; + + return 0; +} + +binlog_trx_data * +MYSQL_BIN_LOG::atomic_enqueue_trx(binlog_trx_data *trx_data) +{ + my_atomic_rwlock_wrlock(&LOCK_queue); + trx_data->next= group_commit_queue; + while (!my_atomic_casptr((void **)(&group_commit_queue), + (void **)(&trx_data->next), + trx_data)) + ; + my_atomic_rwlock_wrunlock(&LOCK_queue); + return trx_data->next; +} + +binlog_trx_data * +MYSQL_BIN_LOG::atomic_grab_trx_queue() +{ + my_atomic_rwlock_wrlock(&LOCK_queue); + binlog_trx_data *queue= group_commit_queue; + while (!my_atomic_casptr((void **)(&group_commit_queue), + (void **)(&queue), + NULL)) + ; + my_atomic_rwlock_wrunlock(&LOCK_queue); + return queue; +} /** Wait until we get a signal that the binary log has been updated. @@ -5879,9 +6217,6 @@ void TC_LOG_BINLOG::close() } /** - @todo - group commit - @retval 0 error @retval @@ -5889,19 +6224,83 @@ void TC_LOG_BINLOG::close() */ int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid) { - DBUG_ENTER("TC_LOG_BINLOG::log"); - Xid_log_event xle(thd, xid); - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + int error; + DBUG_ENTER("TC_LOG_BINLOG::log_xid"); + + thd->next_commit_ordered= 0; + group_log_xid(thd); + if (thd->xid_error) + error= xid_delayed_error(thd); + else + error= 0; + /* - We always commit the entire transaction when writing an XID. 
Also - note that the return value is inverted. - */ - DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE)); + Note that the return value is inverted: zero on failure, private non-zero + 'cookie' on success. + */ + DBUG_RETURN(!error); } -void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +/* + Do a binlog log_xid() for a group of transactions, linked through + thd->next_commit_ordered. +*/ +void +TC_LOG_BINLOG::group_log_xid(THD *first_thd) +{ + DBUG_ENTER("TC_LOG_BINLOG::group_log_xid"); + trx_group_commit_leader(first_thd); + for (THD *thd= first_thd; thd; thd= thd->next_commit_ordered) + { + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + thd->xid_error= trx_data->error; + thd->xid_cookie= !trx_data->error; + } + DBUG_VOID_RETURN; +} + +int +TC_LOG_BINLOG::xid_delayed_error(THD *thd) { + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + return trx_group_commit_finish(trx_data); +} + +/* + After an XID is logged, we need to hold on to the current binlog file until + it is fully committed in the storage engine. The reason is that crash + recovery only looks at the latest binlog, so we must make sure there are no + outstanding prepared (but not committed) transactions before rotating the + binlog. + + To handle this, we keep a count of outstanding XIDs. This function is used + to increase this count when committing one or more transactions to the + binary log. +*/ +void +TC_LOG_BINLOG::mark_xids_active(uint xid_count) +{ + DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active"); + DBUG_PRINT("info", ("xid_count=%u", xid_count)); + pthread_mutex_lock(&LOCK_prep_xids); + prepared_xids+= xid_count; + pthread_mutex_unlock(&LOCK_prep_xids); + DBUG_VOID_RETURN; +} + +/* + Once an XID is committed, it is safe to rotate the binary log, as it can no + longer be needed during crash recovery. + + This function is called to mark an XID this way. 
It needs to decrease the + count of pending XIDs, and signal the log rotator thread when it reaches zero. +*/ +void +TC_LOG_BINLOG::mark_xid_done() +{ + DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done"); pthread_mutex_lock(&LOCK_prep_xids); DBUG_ASSERT(prepared_xids > 0); if (--prepared_xids == 0) { @@ -5909,7 +6308,16 @@ void TC_LOG_BINLOG::unlog(ulong cookie, pthread_cond_signal(&COND_prep_xids); } pthread_mutex_unlock(&LOCK_prep_xids); - rotate_and_purge(0); // as ::write() did not rotate + DBUG_VOID_RETURN; +} + +void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +{ + DBUG_ENTER("TC_LOG_BINLOG::unlog"); + if (xid) + mark_xid_done(); + rotate_and_purge(0); // as ::write_transaction_to_binlog() did not rotate + DBUG_VOID_RETURN; } int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle) === modified file 'sql/log.h' --- a/sql/log.h 2009-12-04 14:40:42 +0000 +++ b/sql/log.h 2010-05-26 08:16:18 +0000 @@ -28,13 +28,49 @@ class TC_LOG { public: int using_heuristic_recover(); - TC_LOG() {} + /* True if we should use group_log_xid(), false to use log_xid(). */ + bool use_group_log_xid; + + TC_LOG() : use_group_log_xid(0) {} virtual ~TC_LOG() {} virtual int open(const char *opt_name)=0; virtual void close()=0; virtual int log_xid(THD *thd, my_xid xid)=0; virtual void unlog(ulong cookie, my_xid xid)=0; + /* + If use_group_log_xid is true, then this method is used instead of + log_xid() to do logging of a group of transactions all at once. + + The transactions will be linked through THD::next_commit_ordered. + + Additionally, when this method is used instead of log_xid(), the order in + which handler->prepare_ordered() and handler->commit_ordered() are called + is guaranteed to be the same as the order of calls and THD list elements + for group_log_xid(). + + This can be used to efficiently implement group commit that at the same + time preserves the order of commits among handlers and TC (eg. to get same + commit order in InnoDB and binary log). 
+ + For TCs that do not need this, it can be preferable to use plain log_xid() + instead, as it allows threads to run log_xid() in parallel with each + other. In contrast, group_log_xid() runs under a global mutex, so it is + guaranteed that only one call into it will be active at once. + + Since this call handles multiple threads/THDs at once, my_error() (and + other code that relies on thread local storage) cannot be used in this + method. Instead, in case of error, thd->xid_error should be set to the + error code, and xid_delayed_error() will be called later in the correct + thread context to actually report the error. + + In the success case, this method must set thd->xid_cookie for each thread + to the cookie that is normally returned from log_xid() (which must be + non-zero in the non-error case). + */ + virtual void group_log_xid(THD *first_thd) { DBUG_ASSERT(FALSE); } + /* Error reporting for group_log_xid(). */ + virtual int xid_delayed_error(THD *thd) { DBUG_ASSERT(FALSE); return 0; } }; class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging @@ -227,12 +263,19 @@ private: time_t last_time; }; +class binlog_trx_data; class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG { private: /* LOCK_log and LOCK_index are inited by init_pthread_objects() */ pthread_mutex_t LOCK_index; pthread_mutex_t LOCK_prep_xids; + /* + Mutex to protect the queue of transactions waiting to participate in group + commit. (Only used on platforms without native atomic operations). + */ + pthread_mutex_t LOCK_queue; + pthread_cond_t COND_prep_xids; pthread_cond_t update_cond; ulonglong bytes_written; @@ -271,8 +314,8 @@ class MYSQL_BIN_LOG: public TC_LOG, priv In 5.0 it's 0 for relay logs too! */ bool no_auto_events; - - ulonglong m_table_map_version; + /* Queue of transactions queued up to participate in group commit.
*/ + binlog_trx_data *group_commit_queue; int write_to_file(IO_CACHE *cache); /* @@ -282,6 +325,14 @@ class MYSQL_BIN_LOG: public TC_LOG, priv */ void new_file_without_locking(); void new_file_impl(bool need_lock); + int write_transaction(binlog_trx_data *trx_data); + bool write_transaction_to_binlog_events(binlog_trx_data *trx_data); + void trx_group_commit_participant(binlog_trx_data *trx_data); + void trx_group_commit_leader(THD *first_thd); + binlog_trx_data *atomic_enqueue_trx(binlog_trx_data *trx_data); + binlog_trx_data *atomic_grab_trx_queue(); + void mark_xid_done(); + void mark_xids_active(uint xid_count); public: MYSQL_LOG::generate_name; @@ -311,17 +362,11 @@ public: int open(const char *opt_name); void close(); int log_xid(THD *thd, my_xid xid); + int xid_delayed_error(THD *thd); + void group_log_xid(THD *first_thd); void unlog(ulong cookie, my_xid xid); int recover(IO_CACHE *log, Format_description_log_event *fdle); #if !defined(MYSQL_CLIENT) - bool is_table_mapped(TABLE *table) const - { - return table->s->table_map_version == table_map_version(); - } - - ulonglong table_map_version() const { return m_table_map_version; } - void update_table_map_version() { ++m_table_map_version; } - int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event); int remove_pending_rows_event(THD *thd); @@ -362,10 +407,12 @@ public: void new_file(); bool write(Log_event* event_info); // binary log write - bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event, bool incident); - bool write_incident(THD *thd, bool lock); + bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev); + bool trx_group_commit_finish(binlog_trx_data *trx_data); + bool write_incident(THD *thd); - int write_cache(IO_CACHE *cache, bool lock_log, bool flush_and_sync); + int write_cache(IO_CACHE *cache); void set_write_error(THD *thd); bool check_write_error(THD *thd); === modified file 'sql/log_event.h' --- a/sql/log_event.h 2010-03-04 08:03:07 
+0000 +++ b/sql/log_event.h 2010-05-26 08:16:18 +0000 @@ -463,10 +463,9 @@ struct sql_ex_info #define LOG_EVENT_SUPPRESS_USE_F 0x8 /* - The table map version internal to the log should be increased after - the event has been written to the binary log. + This used to be LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F, but is now unused. */ -#define LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F 0x10 +#define LOG_EVENT_UNUSED1_F 0x10 /** @def LOG_EVENT_ARTIFICIAL_F === modified file 'sql/sql_class.cc' --- a/sql/sql_class.cc 2010-01-15 15:27:55 +0000 +++ b/sql/sql_class.cc 2010-05-26 08:16:18 +0000 @@ -673,6 +673,8 @@ THD::THD() active_vio = 0; #endif pthread_mutex_init(&LOCK_thd_data, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_FAST); + pthread_cond_init(&COND_commit_ordered, 0); /* Variables with default values */ proc_info="login"; @@ -3773,7 +3775,6 @@ int THD::binlog_flush_pending_rows_event if (stmt_end) { pending->set_flags(Rows_log_event::STMT_END_F); - pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F; binlog_table_maps= 0; } @@ -3901,7 +3902,6 @@ int THD::binlog_query(THD::enum_binlog_q { Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use, errcode); - qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F; /* Binlog table maps will be irrelevant after a Query_log_event (they are just removed on the slave side) so after the query === modified file 'sql/sql_class.h' --- a/sql/sql_class.h 2010-03-30 12:36:49 +0000 +++ b/sql/sql_class.h 2010-05-26 08:16:18 +0000 @@ -1438,6 +1438,21 @@ public: /* container for handler's private per-connection data */ Ha_data ha_data[MAX_HA]; + /* Mutex and condition for waking up threads after group commit. */ + pthread_mutex_t LOCK_commit_ordered; + pthread_cond_t COND_commit_ordered; + bool group_commit_ready; + /* Pointer for linking THDs into queue waiting for group commit. 
*/ + THD *next_commit_ordered; + /* + The "all" parameter of commit(), to communicate it to the thread that + calls commit_ordered(). + */ + bool group_commit_all; + /* Set by TC_LOG::group_log_xid(), to return per-thd error and cookie. */ + int xid_error; + int xid_cookie; + #ifndef MYSQL_CLIENT int binlog_setup_trx_data(); === modified file 'sql/sql_load.cc' --- a/sql/sql_load.cc 2010-03-04 08:03:07 +0000 +++ b/sql/sql_load.cc 2010-05-26 08:16:18 +0000 @@ -516,7 +516,6 @@ int mysql_load(THD *thd,sql_exchange *ex else { Delete_file_log_event d(thd, db, transactional_table); - d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F; (void) mysql_bin_log.write(&d); } } @@ -698,7 +697,6 @@ static bool write_execute_load_query_log (duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE : (ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR), transactional_table, FALSE, errcode); - e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F; return mysql_bin_log.write(&e); } === modified file 'sql/table.cc' --- a/sql/table.cc 2010-03-10 10:32:14 +0000 +++ b/sql/table.cc 2010-05-26 08:16:18 +0000 @@ -297,13 +297,6 @@ TABLE_SHARE *alloc_table_share(TABLE_LIS share->version= refresh_version; /* - This constant is used to mark that no table map version has been - assigned. No arithmetic is done on the value: it will be - overwritten with a value taken from MYSQL_BIN_LOG. - */ - share->table_map_version= ~(ulonglong)0; - - /* Since alloc_table_share() can be called without any locking (for example, ha_create_table... functions), we do not assign a table map id here. Instead we assign a value that is not used @@ -367,10 +360,9 @@ void init_tmp_table_share(THD *thd, TABL share->frm_version= FRM_VER_TRUE_VARCHAR; /* - Temporary tables are not replicated, but we set up these fields + Temporary tables are not replicated, but we set up this field anyway to be able to catch errors.
*/ - share->table_map_version= ~(ulonglong)0; share->cached_row_logging_check= -1; /* === modified file 'sql/table.h' --- a/sql/table.h 2010-02-10 19:06:24 +0000 +++ b/sql/table.h 2010-05-26 08:16:18 +0000 @@ -433,7 +433,6 @@ typedef struct st_table_share bool waiting_on_cond; /* Protection against free */ bool deleting; /* going to delete this table */ ulong table_map_id; /* for row-based replication */ - ulonglong table_map_version; /* Cache for row-based replication table share checks that does not === modified file 'storage/xtradb/handler/ha_innodb.cc' --- a/storage/xtradb/handler/ha_innodb.cc 2010-01-15 21:12:30 +0000 +++ b/storage/xtradb/handler/ha_innodb.cc 2010-05-26 08:16:18 +0000 @@ -138,8 +138,6 @@ bool check_global_access(THD *thd, ulong /** to protect innobase_open_files */ static pthread_mutex_t innobase_share_mutex; -/** to force correct commit order in binlog */ -static pthread_mutex_t prepare_commit_mutex; static ulong commit_threads = 0; static pthread_mutex_t commit_threads_m; static pthread_cond_t commit_cond; @@ -239,6 +237,7 @@ static const char* innobase_change_buffe static INNOBASE_SHARE *get_share(const char *table_name); static void free_share(INNOBASE_SHARE *share); static int innobase_close_connection(handlerton *hton, THD* thd); +static void innobase_commit_ordered(handlerton *hton, THD* thd, bool all); static int innobase_commit(handlerton *hton, THD* thd, bool all); static int innobase_rollback(handlerton *hton, THD* thd, bool all); static int innobase_rollback_to_savepoint(handlerton *hton, THD* thd, @@ -1356,7 +1355,6 @@ innobase_trx_init( trx_t* trx) /*!< in/out: InnoDB transaction handle */ { DBUG_ENTER("innobase_trx_init"); - DBUG_ASSERT(EQ_CURRENT_THD(thd)); DBUG_ASSERT(thd == trx->mysql_thd); trx->check_foreigns = !thd_test_options( @@ -1416,8 +1414,6 @@ check_trx_exists( { trx_t*& trx = thd_to_trx(thd); - ut_ad(EQ_CURRENT_THD(thd)); - if (trx == NULL) { trx = innobase_trx_allocate(thd); } else if (UNIV_UNLIKELY(trx->magic_n != 
TRX_MAGIC_N)) { @@ -2024,6 +2020,7 @@ innobase_init( innobase_hton->savepoint_set=innobase_savepoint; innobase_hton->savepoint_rollback=innobase_rollback_to_savepoint; innobase_hton->savepoint_release=innobase_release_savepoint; + innobase_hton->commit_ordered=innobase_commit_ordered; innobase_hton->commit=innobase_commit; innobase_hton->rollback=innobase_rollback; innobase_hton->prepare=innobase_xa_prepare; @@ -2492,7 +2489,6 @@ skip_overwrite: innobase_open_tables = hash_create(200); pthread_mutex_init(&innobase_share_mutex, MY_MUTEX_INIT_FAST); - pthread_mutex_init(&prepare_commit_mutex, MY_MUTEX_INIT_FAST); pthread_mutex_init(&commit_threads_m, MY_MUTEX_INIT_FAST); pthread_mutex_init(&commit_cond_m, MY_MUTEX_INIT_FAST); pthread_mutex_init(&analyze_mutex, MY_MUTEX_INIT_FAST); @@ -2547,7 +2543,6 @@ innobase_end( my_free(internal_innobase_data_file_path, MYF(MY_ALLOW_ZERO_PTR)); pthread_mutex_destroy(&innobase_share_mutex); - pthread_mutex_destroy(&prepare_commit_mutex); pthread_mutex_destroy(&commit_threads_m); pthread_mutex_destroy(&commit_cond_m); pthread_mutex_destroy(&analyze_mutex); @@ -2681,6 +2676,101 @@ innobase_start_trx_and_assign_read_view( } /*****************************************************************//** +Perform the first, fast part of InnoDB commit. + +Doing it in this call ensures that we get the same commit order here +as in binlog and any other participating transactional storage engines. + +Note that we want to do as little as really needed here, as we run +under a global mutex. The expensive fsync() is done later, in +innobase_commit(), without a lock so group commit can take place. + +Note also that this method can be called from a different thread than +the one handling the rest of the transaction. 
*/ +static +void +innobase_commit_ordered( +/*============*/ + handlerton *hton, /*!< in: Innodb handlerton */ + THD* thd, /*!< in: MySQL thread handle of the user for whom + the transaction should be committed */ + bool all) /*!< in: TRUE - commit transaction + FALSE - the current SQL statement ended */ +{ + trx_t* trx; + DBUG_ENTER("innobase_commit_ordered"); + DBUG_ASSERT(hton == innodb_hton_ptr); + + trx = check_trx_exists(thd); + + if (trx->active_trans == 0 + && trx->conc_state != TRX_NOT_STARTED) { + /* We do not throw an error here; instead we will catch this error + again in innobase_commit() and report it from there. */ + DBUG_VOID_RETURN; + } + /* Since we will reserve the kernel mutex, we have to release + the search system latch first to obey the latching order. */ + + if (trx->has_search_latch) { + trx_search_latch_release_if_reserved(trx); + } + + /* commit_ordered is only called when committing the whole transaction + (or an SQL statement when autocommit is on). */ + DBUG_ASSERT(all || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))); + + /* We need current binlog position for ibbackup to work. + Note, the position is current because commit_ordered is guaranteed + to be called in the same sequence as writing to binlog. */ + +retry: + if (innobase_commit_concurrency > 0) { + pthread_mutex_lock(&commit_cond_m); + commit_threads++; + + if (commit_threads > innobase_commit_concurrency) { + commit_threads--; + pthread_cond_wait(&commit_cond, + &commit_cond_m); + pthread_mutex_unlock(&commit_cond_m); + goto retry; + } + else { + pthread_mutex_unlock(&commit_cond_m); + } + } + + /* The following calls to read the MySQL binary log + file name and the position return consistent results: + 1) We use commit_ordered() to get the same commit order + in InnoDB as in binary log.
+ 2) A MySQL log file rotation cannot happen because + MySQL protects against this by having a counter of + transactions in prepared state and it only allows + a rotation when the counter drops to zero. See + LOCK_prep_xids and COND_prep_xids in log.cc. */ + trx->mysql_log_file_name = mysql_bin_log_file_name(); + trx->mysql_log_offset = (ib_int64_t) mysql_bin_log_file_pos(); + + /* Don't do write + flush right now. For group commit + to work we want to do the flush in the innobase_commit() + method, which runs without holding any locks. */ + trx->flush_log_later = TRUE; + innobase_commit_low(trx); + trx->flush_log_later = FALSE; + + if (innobase_commit_concurrency > 0) { + pthread_mutex_lock(&commit_cond_m); + commit_threads--; + pthread_cond_signal(&commit_cond); + pthread_mutex_unlock(&commit_cond_m); + } + + DBUG_VOID_RETURN; +} + +/*****************************************************************//** Commits a transaction in an InnoDB database or marks an SQL statement ended. @return 0 */ @@ -2702,13 +2792,6 @@ innobase_commit( trx = check_trx_exists(thd); - /* Since we will reserve the kernel mutex, we have to release - the search system latch first to obey the latching order. */ - - if (trx->has_search_latch) { - trx_search_latch_release_if_reserved(trx); - } - /* The flag trx->active_trans is set to 1 in 1. ::external_lock(), @@ -2736,62 +2819,8 @@ innobase_commit( /* We were instructed to commit the whole transaction, or this is an SQL statement end and autocommit is on */ - /* We need current binlog position for ibbackup to work. 
- Note, the position is current because of - prepare_commit_mutex */ -retry: - if (innobase_commit_concurrency > 0) { - pthread_mutex_lock(&commit_cond_m); - commit_threads++; - - if (commit_threads > innobase_commit_concurrency) { - commit_threads--; - pthread_cond_wait(&commit_cond, - &commit_cond_m); - pthread_mutex_unlock(&commit_cond_m); - goto retry; - } - else { - pthread_mutex_unlock(&commit_cond_m); - } - } - - /* The following calls to read the MySQL binary log - file name and the position return consistent results: - 1) Other InnoDB transactions cannot intervene between - these calls as we are holding prepare_commit_mutex. - 2) Binary logging of other engines is not relevant - to InnoDB as all InnoDB requires is that committing - InnoDB transactions appear in the same order in the - MySQL binary log as they appear in InnoDB logs. - 3) A MySQL log file rotation cannot happen because - MySQL protects against this by having a counter of - transactions in prepared state and it only allows - a rotation when the counter drops to zero. See - LOCK_prep_xids and COND_prep_xids in log.cc. */ - trx->mysql_log_file_name = mysql_bin_log_file_name(); - trx->mysql_log_offset = (ib_int64_t) mysql_bin_log_file_pos(); - - /* Don't do write + flush right now. For group commit - to work we want to do the flush after releasing the - prepare_commit_mutex. */ - trx->flush_log_later = TRUE; - innobase_commit_low(trx); - trx->flush_log_later = FALSE; - - if (innobase_commit_concurrency > 0) { - pthread_mutex_lock(&commit_cond_m); - commit_threads--; - pthread_cond_signal(&commit_cond); - pthread_mutex_unlock(&commit_cond_m); - } - - if (trx->active_trans == 2) { - - pthread_mutex_unlock(&prepare_commit_mutex); - } - - /* Now do a write + flush of logs. */ + /* We did the first part already in innobase_commit_ordered(), + Now finish by doing a write + flush of logs. 
*/ trx_commit_complete_for_mysql(trx); trx->active_trans = 0; @@ -4621,6 +4650,7 @@ no_commit: no need to re-acquire locks on it. */ /* Altering to InnoDB format */ + innobase_commit_ordered(ht, user_thd, 1); innobase_commit(ht, user_thd, 1); /* Note that this transaction is still active. */ prebuilt->trx->active_trans = 1; @@ -4637,6 +4667,7 @@ no_commit: /* Commit the transaction. This will release the table locks, so they have to be acquired again. */ + innobase_commit_ordered(ht, user_thd, 1); innobase_commit(ht, user_thd, 1); /* Note that this transaction is still active. */ prebuilt->trx->active_trans = 1; @@ -8339,6 +8370,7 @@ ha_innobase::external_lock( if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) { if (trx->active_trans != 0) { + innobase_commit_ordered(ht, thd, TRUE); innobase_commit(ht, thd, TRUE); } } else { @@ -9448,36 +9480,6 @@ innobase_xa_prepare( srv_active_wake_master_thread(); - if (thd_sql_command(thd) != SQLCOM_XA_PREPARE && - (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) - { - if (srv_enable_unsafe_group_commit && !THDVAR(thd, support_xa)) { - /* choose group commit rather than binlog order */ - return(error); - } - - /* For ibbackup to work the order of transactions in binlog - and InnoDB must be the same. Consider the situation - - thread1> prepare; write to binlog; ... - <context switch> - thread2> prepare; write to binlog; commit - thread1> ... commit - - To ensure this will not happen we're taking the mutex on - prepare, and releasing it on commit. - - Note: only do it for normal commits, done via ha_commit_trans. - If 2pc protocol is executed by external transaction - coordinator, it will be just a regular MySQL client - executing XA PREPARE and XA COMMIT commands. - In this case we cannot know how many minutes or hours - will be between XA PREPARE and XA COMMIT, and we don't want - to block for undefined period of time. 
*/ - pthread_mutex_lock(&prepare_commit_mutex); - trx->active_trans = 2; - } - return(error); } @@ -10669,11 +10671,6 @@ static MYSQL_SYSVAR_ENUM(adaptive_checkp "Enable/Disable flushing along modified age. (none, reflex, [estimate])", NULL, innodb_adaptive_checkpoint_update, 2, &adaptive_checkpoint_typelib); -static MYSQL_SYSVAR_ULONG(enable_unsafe_group_commit, srv_enable_unsafe_group_commit, - PLUGIN_VAR_RQCMDARG, - "Enable/Disable unsafe group commit when support_xa=OFF and use with binlog or other XA storage engine.", - NULL, NULL, 0, 0, 1, 0); - static MYSQL_SYSVAR_ULONG(expand_import, srv_expand_import, PLUGIN_VAR_RQCMDARG, "Enable/Disable converting automatically *.ibd files when import tablespace.", @@ -10763,7 +10760,6 @@ static struct st_mysql_sys_var* innobase MYSQL_SYSVAR(flush_neighbor_pages), MYSQL_SYSVAR(read_ahead), MYSQL_SYSVAR(adaptive_checkpoint), - MYSQL_SYSVAR(enable_unsafe_group_commit), MYSQL_SYSVAR(expand_import), MYSQL_SYSVAR(extra_rsegments), MYSQL_SYSVAR(dict_size_limit),
participants (1)
-
knielsen@knielsen-hq.org