revision-id: 8fa609b4f2525dce0fad79a5842caa4174aeb0d5 (mariadb-10.4.1-103-g8fa609b) parent(s): 8aae31cf494678b6253031c627566e50bc666920 committer: Alexey Botchkov timestamp: 2019-02-18 02:36:19 +0400 message: MDEV-7974 backport fix for mysql bug#12161 (XA and binlog). XA transactions now are kept persistent after prepare. XA_prepare_log_event implemented, and XA transactions are logged as XA transactions. --- mysql-test/suite/rpl/r/rpl_xa.result | 31 +++ mysql-test/suite/rpl/t/rpl_xa.test | 26 ++ sql/handler.cc | 9 + sql/handler.h | 10 + sql/log.cc | 115 ++++++-- sql/log.h | 10 + sql/log_event.cc | 502 +++++++++++++++++++++++++++++------ sql/log_event.h | 116 +++++++- sql/sql_class.cc | 18 +- sql/sql_class.h | 20 +- sql/sql_connect.cc | 1 + sql/transaction.cc | 89 ++++++- sql/transaction.h | 1 + 13 files changed, 826 insertions(+), 122 deletions(-) diff --git a/mysql-test/suite/rpl/r/rpl_xa.result b/mysql-test/suite/rpl/r/rpl_xa.result new file mode 100644 index 0000000..58c1b09 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_xa.result @@ -0,0 +1,31 @@ +include/master-slave.inc +[connection master] +create table t1 (a int, b int) engine=InnoDB; +xa start 't'; +insert into t1 values(1, 2); +xa end 't'; +xa prepare 't'; +xa commit 't'; +select * from t1; +a b +1 2 +connection slave; +select * from t1; +a b +1 2 +connection master; +xa start 't'; +insert into t1 values(3, 4); +xa end 't'; +xa prepare 't'; +xa rollback 't'; +select * from t1; +a b +1 2 +connection slave; +select * from t1; +a b +1 2 +connection master; +drop table t1; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_xa.test b/mysql-test/suite/rpl/t/rpl_xa.test new file mode 100644 index 0000000..f93d787 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_xa.test @@ -0,0 +1,26 @@ +source include/have_innodb.inc; +source include/master-slave.inc; + +create table t1 (a int, b int) engine=InnoDB; +xa start 't'; +insert into t1 values(1, 2); +xa end 't'; +xa prepare 't'; +xa commit 't'; +select * from 
t1; +sync_slave_with_master; +select * from t1; +connection master; + +xa start 't'; +insert into t1 values(3, 4); +xa end 't'; +xa prepare 't'; +xa rollback 't'; +select * from t1; +sync_slave_with_master; +select * from t1; + +connection master; +drop table t1; +source include/rpl_end.inc; diff --git a/sql/handler.cc b/sql/handler.cc index 001055c..3b2a3e0 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1214,6 +1214,9 @@ static int prepare_or_error(handlerton *ht, THD *thd, bool all) } +/*static inline */int +binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr, + bool all, my_xid xid); /** @retval 0 ok @@ -1225,6 +1228,7 @@ int ha_prepare(THD *thd) int error=0, all=1; THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; Ha_trx_info *ha_info= trans->ha_list; + DBUG_ENTER("ha_prepare"); if (ha_info) @@ -1250,6 +1254,11 @@ int ha_prepare(THD *thd) } } + if (unlikely(tc_log->log_prepare(thd, all))) + { + ha_rollback_trans(thd, all); + error=1; + } } DBUG_RETURN(error); diff --git a/sql/handler.h b/sql/handler.h index fc6246c..613c1c3 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -810,6 +810,16 @@ struct xid_t { long gtrid_length; long bqual_length; char data[XIDDATASIZE]; // not \0-terminated ! + /* + The size of the string containing serialized Xid representation + is computed as a sum of + eight as the number of formatting symbols (X'',X'',) + plus 2 x XIDDATASIZE (2 due to hex format), + plus space for decimal digits of XID::formatID, + plus one for 0x0. 
+ */ + static const uint ser_buf_size= + 8 + 2 * XIDDATASIZE + 4 * sizeof(long) + 1; xid_t() {} /* Remove gcc warning */ bool eq(struct xid_t *xid) diff --git a/sql/log.cc b/sql/log.cc index a56117a..316b871 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -87,6 +87,9 @@ static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton, static int binlog_commit(handlerton *hton, THD *thd, bool all); static int binlog_rollback(handlerton *hton, THD *thd, bool all); static int binlog_prepare(handlerton *hton, THD *thd, bool all); +static int binlog_xa_recover(handlerton *hton, XID *xid_list, uint len); +static int binlog_commit_by_xid(handlerton *hton, XID *xid); +static int binlog_rollback_by_xid(handlerton *hton, XID *xid); static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd); static const LEX_CSTRING write_error_msg= @@ -1688,6 +1691,9 @@ int binlog_init(void *p) binlog_hton->commit= binlog_commit; binlog_hton->rollback= binlog_rollback; binlog_hton->prepare= binlog_prepare; + binlog_hton->recover= binlog_xa_recover; + binlog_hton->commit_by_xid = binlog_commit_by_xid; + binlog_hton->rollback_by_xid = binlog_rollback_by_xid; binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot; binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN; return 0; @@ -1883,23 +1889,16 @@ static inline int binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr, bool all, my_xid xid) { - if (xid) - { - Xid_log_event end_evt(thd, xid, TRUE); - return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); - } - else + /* Mask XA COMMIT ... ONE PHASE as plain BEGIN ... COMMIT */ + if (!xid) { - /* - Empty xid occurs in XA COMMIT ... ONE PHASE. - In this case, we do not have a MySQL xid for the transaction, and the - external XA transaction coordinator will have to handle recovery if - needed. So we end the transaction with a plain COMMIT query event. 
- */ - Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), - TRUE, TRUE, TRUE, 0); - return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); + DBUG_ASSERT(thd->transaction.xid_state.xa_state == XA_IDLE && + thd->lex->xa_opt == XA_ONE_PHASE); + xid= thd->query_id; } + + Xid_log_event end_evt(thd, xid, TRUE); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); } /** @@ -1961,11 +1960,77 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) do nothing. just pretend we can do 2pc, so that MySQL won't switch to 1pc. - real work will be done in MYSQL_BIN_LOG::log_and_order() + real work is done in MYSQL_BIN_LOG::log_and_order() */ return 0; } + +static int serialize_xid(XID *xid, char *buf) +{ + size_t size; + buf[0]= '\''; + memcpy(buf+1, xid->data, xid->gtrid_length); + size= xid->gtrid_length + 2; + buf[size-1]= '\''; + if (xid->bqual_length == 0 && xid->formatID == 1) + return size; + + memcpy(buf+size, ", '", 3); + memcpy(buf+size+3, xid->data+xid->gtrid_length, xid->bqual_length); + size+= 3 + xid->bqual_length; + buf[size]= '\''; + size++; + if (xid->formatID != 1) + size+= sprintf(buf+size, ", %ld", xid->formatID); + return size; +} + + +static int binlog_xa_recover(handlerton *hton __attribute__((unused)), + XID *xid_list __attribute__((unused)), + uint len __attribute__((unused))) +{ + /* Does nothing. 
*/ + return 0; +} + + +static int binlog_commit_by_xid(handlerton *hton, XID *xid) +{ + THD *thd= current_thd; + const size_t xc_len= sizeof("XA COMMIT ") - 1; // do not count trailing 0 + char buf[xc_len + xid_t::ser_buf_size]; + size_t buflen; + binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data(); + + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT); + + if (!cache_mngr) + return 1; + + memcpy(buf, "XA COMMIT ", xc_len); + buflen= xc_len + serialize_xid(xid, buf+xc_len); + Query_log_event qe(thd, buf, buflen, TRUE, FALSE, FALSE, 0); + return binlog_flush_cache(thd, cache_mngr, &qe, TRUE, TRUE, TRUE); +} + + +static int binlog_rollback_by_xid(handlerton *hton, XID *xid) +{ + THD *thd= current_thd; + const size_t xr_len= sizeof("XA ROLLBACK ") - 1; // do not count trailing 0 + char buf[xr_len + xid_t::ser_buf_size]; + size_t buflen; + + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK); + memcpy(buf, "XA ROLLBACK ", xr_len); + buflen= xr_len + serialize_xid(xid, buf+xr_len); + Query_log_event qe(thd, buf, buflen, FALSE, TRUE, TRUE, 0); + return mysql_bin_log.write_event(&qe); +} + + /* We flush the cache wrapped in a beging/rollback if: . 
aborting a single or multi-statement transaction and; @@ -9809,6 +9874,24 @@ int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) DBUG_RETURN(BINLOG_COOKIE_GET_ERROR_FLAG(cookie)); } + +int TC_LOG_BINLOG::log_prepare(THD *thd, bool all) +{ + XID *xid= &thd->transaction.xid_state.xid; + XA_prepare_log_event end_evt(thd, xid, FALSE); + binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data(); + + if (!cache_mngr) + { + WSREP_DEBUG("Skipping empty log_xid: %s", thd->query()); + return 0; + } + + cache_mngr->using_xa= FALSE; + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); +} + + void TC_LOG_BINLOG::commit_checkpoint_notify(void *cookie) { diff --git a/sql/log.h b/sql/log.h index 7dfdb36..92fdf95 100644 --- a/sql/log.h +++ b/sql/log.h @@ -61,6 +61,7 @@ class TC_LOG bool need_prepare_ordered, bool need_commit_ordered) = 0; virtual int unlog(ulong cookie, my_xid xid)=0; + virtual int log_prepare(THD *thd, bool all)= 0; virtual void commit_checkpoint_notify(void *cookie)= 0; protected: @@ -115,6 +116,10 @@ class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging return 1; } int unlog(ulong cookie, my_xid xid) { return 0; } + int log_prepare(THD *thd, bool all) + { + return 0; + } void commit_checkpoint_notify(void *cookie) { DBUG_ASSERT(0); }; }; @@ -198,6 +203,10 @@ class TC_LOG_MMAP: public TC_LOG int log_and_order(THD *thd, my_xid xid, bool all, bool need_prepare_ordered, bool need_commit_ordered); int unlog(ulong cookie, my_xid xid); + int log_prepare(THD *thd, bool all) + { + return 0; + } void commit_checkpoint_notify(void *cookie); int recover(); @@ -698,6 +707,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG int log_and_order(THD *thd, my_xid xid, bool all, bool need_prepare_ordered, bool need_commit_ordered); int unlog(ulong cookie, my_xid xid); + int log_prepare(THD *thd, bool all); void commit_checkpoint_notify(void *cookie); int recover(LOG_INFO *linfo, const char *last_log_name, IO_CACHE *first_log, 
Format_description_log_event *fdle, bool do_xa); diff --git a/sql/log_event.cc b/sql/log_event.cc index 7a0d0be..486f8f5 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -2139,6 +2139,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case XID_EVENT: ev = new Xid_log_event(buf, fdle); break; + case XA_PREPARE_LOG_EVENT: + ev = new XA_prepare_log_event(buf, fdle); + break; case RAND_EVENT: ev = new Rand_log_event(buf, fdle); break; @@ -2190,7 +2193,6 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case PREVIOUS_GTIDS_LOG_EVENT: case TRANSACTION_CONTEXT_EVENT: case VIEW_CHANGE_EVENT: - case XA_PREPARE_LOG_EVENT: ev= new Ignorable_log_event(buf, fdle, get_type_str((Log_event_type) event_type)); break; @@ -4418,6 +4420,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, size_t que case SQLCOM_RELEASE_SAVEPOINT: case SQLCOM_ROLLBACK_TO_SAVEPOINT: case SQLCOM_SAVEPOINT: + case SQLCOM_XA_END: use_cache= trx_cache= TRUE; break; default: @@ -6222,6 +6225,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[USER_VAR_EVENT-1]= USER_VAR_HEADER_LEN; post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN; post_header_len[XID_EVENT-1]= XID_HEADER_LEN; + post_header_len[XA_PREPARE_LOG_EVENT-1]= XA_PREPARE_HEADER_LEN; post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= BEGIN_LOAD_QUERY_HEADER_LEN; post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN; /* @@ -7874,7 +7878,7 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, buf+= 8; domain_id= uint4korr(buf); buf+= 4; - flags2= *buf; + flags2= *(buf++); if (flags2 & FL_GROUP_COMMIT_ID) { if (event_len < (uint)header_size + GTID_HEADER_LEN + 2) @@ -7882,8 +7886,22 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, seq_no= 0; // So is_valid() returns false return; } - ++buf; commit_id= uint8korr(buf); + buf+= 8; + } + if (flags2 & FL_XA_TRANSACTION) + { + 
xid.formatID= (long) buf[0]; + xid.gtrid_length= (long) buf[1]; + xid.bqual_length= (long) buf[2]; + + buf+= 3; + if (xid.formatID >= 0) + { + long data_length= xid.bqual_length + xid.gtrid_length; + memcpy(xid.data, buf, data_length); + buf+= data_length; + } } } @@ -7914,6 +7932,12 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, /* Preserve any DDL or WAITED flag in the slave's binlog. */ if (thd_arg->rgi_slave) flags2|= (thd_arg->rgi_slave->gtid_ev_flags2 & (FL_DDL|FL_WAITED)); + if (thd->transaction.xid_state.xa_state == XA_IDLE && + thd->lex->xa_opt != XA_ONE_PHASE) + { + flags2|= FL_XA_TRANSACTION; + xid= thd->transaction.xid_state.xid; + } } @@ -7956,7 +7980,7 @@ Gtid_log_event::peek(const char *event_start, size_t event_len, bool Gtid_log_event::write() { - uchar buf[GTID_HEADER_LEN+2]; + uchar buf[GTID_HEADER_LEN+2+sizeof(XID)]; size_t write_len; int8store(buf, seq_no); @@ -7968,8 +7992,25 @@ Gtid_log_event::write() write_len= GTID_HEADER_LEN + 2; } else + write_len= 13; + + if (flags2 & FL_XA_TRANSACTION) { - bzero(buf+13, GTID_HEADER_LEN-13); + buf[write_len]= (uchar) ((char) xid.formatID); + buf[write_len+1]= (uchar) xid.gtrid_length; + buf[write_len+2]= (uchar) xid.bqual_length; + write_len+= 3; + if (xid.formatID >= 0) + { + long data_length= xid.bqual_length + xid.gtrid_length; + memcpy(buf+write_len, xid.data, data_length); + write_len+= data_length; + } + } + + if (write_len < GTID_HEADER_LEN) + { + bzero(buf+write_len, GTID_HEADER_LEN-write_len); write_len= GTID_HEADER_LEN; } return write_header(write_len) || @@ -8012,7 +8053,7 @@ Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event, void Gtid_log_event::pack_info(Protocol *protocol) { - char buf[6+5+10+1+10+1+20+1+4+20+1]; + char buf[6+5+10+1+10+1+20+1+4+20+1+5+128]; char *p; p = strmov(buf, (flags2 & FL_STANDALONE ? 
"GTID " : "BEGIN GTID ")); p= longlong10_to_str(domain_id, p, 10); @@ -8026,6 +8067,12 @@ Gtid_log_event::pack_info(Protocol *protocol) p= longlong10_to_str(commit_id, p, 10); } + if (flags2 & FL_XA_TRANSACTION) + { + p= strmov(p, " XID :"); + p= strnmov(p, xid.data, xid.bqual_length + xid.gtrid_length); + } + protocol->store(buf, p-buf, &my_charset_bin); } @@ -8079,11 +8126,25 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi) thd->lex->sql_command= SQLCOM_BEGIN; thd->is_slave_error= 0; status_var_increment(thd->status_var.com_stat[thd->lex->sql_command]); - if (trans_begin(thd, 0)) + if (flags2 & FL_XA_TRANSACTION) { - DBUG_PRINT("error", ("trans_begin() failed")); - thd->is_slave_error= 1; + thd->lex->xid= &xid; + thd->lex->xa_opt= XA_NONE; + if (trans_xa_start(thd)) + { + DBUG_PRINT("error", ("trans_xa_start() failed")); + thd->is_slave_error= 1; + } + } + else + { + if (trans_begin(thd, 0)) + { + DBUG_PRINT("error", ("trans_begin() failed")); + thd->is_slave_error= 1; + } } + thd->update_stats(); if (likely(!thd->is_slave_error)) @@ -8202,9 +8263,29 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) buf, print_event_info->delimiter)) goto err; } - if (!(flags2 & FL_STANDALONE)) - if (my_b_printf(&cache, is_flashback ? "COMMIT\n%s\n" : "BEGIN\n%s\n", print_event_info->delimiter)) + if ((flags2 & FL_XA_TRANSACTION) && !is_flashback) + { + my_b_write_string(&cache, "XA START '"); + my_b_write(&cache, (uchar *) xid.data, xid.gtrid_length); + my_b_write_string(&cache, "'"); + if (xid.bqual_length > 0 || xid.formatID != 1) + { + my_b_write_string(&cache, ", '"); + my_b_write(&cache, (uchar *) xid.data+xid.gtrid_length, xid.bqual_length); + my_b_write_string(&cache, "'"); + if (xid.formatID != 1) + if (my_b_printf(&cache, ", %d", xid.formatID)) + goto err; + } + if (my_b_printf(&cache, "%s\n", print_event_info->delimiter)) goto err; + } + else if (!(flags2 & FL_STANDALONE)) + { + if (my_b_printf(&cache, is_flashback ? 
"COMMIT\n%s\n" : "BEGIN\n%s\n", + print_event_info->delimiter)) + goto err; + } return cache.flush_data(); err: @@ -8825,80 +8906,20 @@ bool slave_execute_deferred_events(THD *thd) /************************************************************************** - Xid_log_event methods + Xid_apply_log_event methods **************************************************************************/ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Xid_log_event::pack_info(Protocol *protocol) -{ - char buf[128], *pos; - pos= strmov(buf, "COMMIT /* xid="); - pos= longlong10_to_str(xid, pos, 10); - pos= strmov(pos, " */"); - protocol->store(buf, (uint) (pos-buf), &my_charset_bin); -} -#endif - -/** - @note - It's ok not to use int8store here, - as long as xid_t::set(ulonglong) and - xid_t::get_my_xid doesn't do it either. - We don't care about actual values of xids as long as - identical numbers compare identically -*/ -Xid_log_event:: -Xid_log_event(const char* buf, - const Format_description_log_event *description_event) - :Log_event(buf, description_event) +int Xid_apply_log_event::record_gtid(const rpl_gtid *gtid, uint64 sub_id, + void **out_hton) { - /* The Post-Header is empty. The Variable Data part begins immediately. 
*/ - buf+= description_event->common_header_len + - description_event->post_header_len[XID_EVENT-1]; - memcpy((char*) &xid, buf, sizeof(xid)); + return rpl_global_gtid_slave_state->record_gtid(thd, gtid, sub_id, true, + false, out_hton); } -#ifndef MYSQL_CLIENT -bool Xid_log_event::write() -{ - DBUG_EXECUTE_IF("do_not_write_xid", return 0;); - return write_header(sizeof(xid)) || - write_data((uchar*)&xid, sizeof(xid)) || - write_footer(); -} -#endif - - -#ifdef MYSQL_CLIENT -bool Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) -{ - Write_on_release_cache cache(&print_event_info->head_cache, file, - Write_on_release_cache::FLUSH_F, this); - - if (!print_event_info->short_form) - { - char buf[64]; - longlong10_to_str(xid, buf, 10); - - if (print_header(&cache, print_event_info, FALSE) || - my_b_printf(&cache, "\tXid = %s\n", buf)) - goto err; - } - if (my_b_printf(&cache, is_flashback ? "BEGIN%s\n" : "COMMIT%s\n", - print_event_info->delimiter)) - goto err; - - return cache.flush_data(); -err: - return 1; -} -#endif /* MYSQL_CLIENT */ - - -#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Xid_log_event::do_apply_event(rpl_group_info *rgi) +int Xid_apply_log_event::do_apply_event(rpl_group_info *rgi) { bool res; int err; @@ -8929,8 +8950,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) rgi->gtid_pending= false; gtid= rgi->current_gtid; - err= rpl_global_gtid_slave_state->record_gtid(thd, >id, sub_id, true, - false, &hton); + err= record_gtid(>id, sub_id, &hton); if (unlikely(err)) { int ec= thd->get_stmt_da()->sql_errno(); @@ -8959,8 +8979,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) general_log_print(thd, COM_QUERY, "COMMIT /* implicit, from Xid_log_event */"); thd->variables.option_bits&= ~OPTION_GTID_BEGIN; - res= trans_commit(thd); /* Automatically rolls back on error. 
*/ - thd->mdl_context.release_transactional_locks(); + res= do_commit(); if (likely(!res) && sub_id) rpl_global_gtid_slave_state->update_state_hash(sub_id, >id, hton, rgi); @@ -8973,10 +8992,11 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) return res; } + Log_event::enum_skip_reason -Xid_log_event::do_shall_skip(rpl_group_info *rgi) +Xid_apply_log_event::do_shall_skip(rpl_group_info *rgi) { - DBUG_ENTER("Xid_log_event::do_shall_skip"); + DBUG_ENTER("Xid_apply_log_event::do_shall_skip"); if (rgi->rli->slave_skip_counter > 0) { DBUG_ASSERT(!rgi->rli->get_flag(Relay_log_info::IN_TRANSACTION)); @@ -9000,6 +9020,321 @@ Xid_log_event::do_shall_skip(rpl_group_info *rgi) #endif DBUG_RETURN(Log_event::do_shall_skip(rgi)); } + +#endif /*MYSQL_SERVER*/ + + +/************************************************************************** + Xid_log_event methods +**************************************************************************/ + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +void Xid_log_event::pack_info(Protocol *protocol) +{ + char buf[128], *pos; + pos= strmov(buf, "COMMIT /* xid="); + pos= longlong10_to_str(xid, pos, 10); + pos= strmov(pos, " */"); + protocol->store(buf, (uint) (pos-buf), &my_charset_bin); +} +#endif + +/** + @note + It's ok not to use int8store here, + as long as xid_t::set(ulonglong) and + xid_t::get_my_xid doesn't do it either. + We don't care about actual values of xids as long as + identical numbers compare identically +*/ + +Xid_log_event:: +Xid_log_event(const char* buf, + const Format_description_log_event *description_event) + :Xid_apply_log_event(buf, description_event) +{ + /* The Post-Header is empty. The Variable Data part begins immediately. 
*/ + buf+= description_event->common_header_len + + description_event->post_header_len[XID_EVENT-1]; + memcpy((char*) &xid, buf, sizeof(xid)); +} + + +#ifndef MYSQL_CLIENT +bool Xid_log_event::write() +{ + DBUG_EXECUTE_IF("do_not_write_xid", return 0;); + return write_header(sizeof(xid)) || + write_data((uchar*)&xid, sizeof(xid)) || + write_footer(); +} +#endif + + +#ifdef MYSQL_CLIENT +bool Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) +{ + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F, this); + + if (!print_event_info->short_form) + { + char buf[64]; + longlong10_to_str(xid, buf, 10); + + if (print_header(&cache, print_event_info, FALSE) || + my_b_printf(&cache, "\tXid = %s\n", buf)) + goto err; + } + if (my_b_printf(&cache, is_flashback ? "BEGIN%s\n" : "COMMIT%s\n", + print_event_info->delimiter)) + goto err; + + return cache.flush_data(); +err: + return 1; +} +#endif /* MYSQL_CLIENT */ + + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +int Xid_log_event::do_commit() +{ + bool res; + res= trans_commit(thd); /* Automatically rolls back on error. */ + thd->mdl_context.release_transactional_locks(); + return res; +} +#endif /* !MYSQL_CLIENT */ + + +#ifdef TODO7974 +/** + Function serializes XID which is characterized by by four last arguments + of the function. + Serialized XID is presented in valid hex format and is returned to + the caller in a buffer pointed by the first argument. + The buffer size provived by the caller must be not less than + 8 + 2 * XIDDATASIZE + 4 * sizeof(XID::formatID) + 1, see + XID::serialize_xid() that is a caller and plugin.h for XID declaration. 
+ + @param buf pointer to a buffer allocated for storing serialized data + + @return the value of the buffer pointer +*/ + +char *XA_prepare_log_event::event_xid_t::serialize(char *buf) const +{ + int i; + char *c= buf; + /* + Build a string like following pattern: + X'hex11hex12...hex1m',X'hex21hex22...hex2n',11 + and store it into buf. + Here hex1i and hex2k are hexadecimals representing XID's internal + raw bytes (1 <= i <= m, 1 <= k <= n), and `m' and `n' even numbers + half of which corresponding to the lengths of XID's components. + */ + c[0]= 'X'; + c[1]= '\''; + c+= 2; + for (i= 0; i < gtrid_length; i++) + { + c[0]=_dig_vec_lower[((uchar*) data)[i] >> 4]; + c[1]=_dig_vec_lower[((uchar*) data)[i] & 0x0f]; + c+= 2; + } + c[0]= '\''; + c[1]= ','; + c[2]= 'X'; + c[3]= '\''; + c+= 4; + + for (; i < gtrid_length + bqual_length; i++) + { + c[0]=_dig_vec_lower[((uchar*) data)[i] >> 4]; + c[1]=_dig_vec_lower[((uchar*) data)[i] & 0x0f]; + c+= 2; + } + c[0]= '\''; + sprintf(c+1, ",%lu", formatID); + + return buf; +} +#endif /*TODO7974*/ + +char *XA_prepare_log_event::event_xid_t::serialize(char *buf) const +{ + char *c= buf; + + c[0]= '\''; + memcpy(c+1, data, gtrid_length); + c[gtrid_length+1]= '\''; + c+= gtrid_length + 2; + + if (bqual_length) + { + c[0]= ','; + c[1]= '\''; + memcpy(c+2, data+gtrid_length, bqual_length); + c[bqual_length+2]= '\''; + c+= bqual_length+3; + } + + if (formatID != 1) + sprintf(c, ",%lu", formatID); + else + c[0]=0; + + return buf; +} + + +/************************************************************************** + XA_prepare_log_event methods +**************************************************************************/ +/** + @note + It's ok not to use int8store here, + as long as xid_t::set(ulonglong) and + xid_t::get_n_xid doesn't do it either. 
+ We don't care about actual values of xids as long as + identical numbers compare identically +*/ + +XA_prepare_log_event:: +XA_prepare_log_event(const char* buf, + const Format_description_log_event *description_event) + :Xid_apply_log_event(buf, description_event) +{ + uint32 temp= 0; + uint8 temp_byte; + + buf+= description_event->common_header_len + + description_event->post_header_len[XA_PREPARE_LOG_EVENT-1]; + memcpy(&temp_byte, buf, 1); + one_phase= (bool) temp_byte; + buf += sizeof(temp_byte); + memcpy(&temp, buf, sizeof(temp)); + m_xid.formatID= le32toh(temp); + buf += sizeof(temp); + memcpy(&temp, buf, sizeof(temp)); + m_xid.gtrid_length= le32toh(temp); + buf += sizeof(temp); + memcpy(&temp, buf, sizeof(temp)); + m_xid.bqual_length= le32toh(temp); + buf += sizeof(temp); + memcpy(m_xid.data, buf, m_xid.gtrid_length + m_xid.bqual_length); + + xid= NULL; +} + + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +void XA_prepare_log_event::pack_info(Protocol *protocol) +{ + char buf[ser_buf_size]; + char query[sizeof("XA COMMIT ONE PHASE") + 1 + sizeof(buf)]; + + /* RHS of the following assert is unknown to client sources */ + compile_time_assert(ser_buf_size == XID::ser_buf_size); + m_xid.serialize(buf); + sprintf(query, + (one_phase ? 
"XA COMMIT %s ONE PHASE" : "XA PREPARE %s"), + buf); + + protocol->store(query, strlen(query), &my_charset_bin); +} +#endif + + +#ifndef MYSQL_CLIENT +bool XA_prepare_log_event::write() +{ + uchar data[1 + 4 + 4 + 4]; + uint8 one_phase_byte= one_phase; + + data[0]= one_phase; + int4store(data+1, static_cast<XID*>(xid)->formatID); + int4store(data+(1+4), static_cast<XID*>(xid)->gtrid_length); + int4store(data+(1+4+4), static_cast<XID*>(xid)->bqual_length); + + DBUG_ASSERT(xid_bufs_size == sizeof(data) - 1); + + return write_header(sizeof(one_phase_byte) + xid_bufs_size + + static_cast<XID*>(xid)->gtrid_length + + static_cast<XID*>(xid)->bqual_length) || + write_data(data, sizeof(data)) || + write_data((uchar*) static_cast<XID*>(xid)->data, + static_cast<XID*>(xid)->gtrid_length + + static_cast<XID*>(xid)->bqual_length) || + write_footer(); +} +#endif + + +#ifdef MYSQL_CLIENT +bool XA_prepare_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) +{ + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F, this); + char buf[ser_buf_size]; + + m_xid.serialize(buf); + + if (!print_event_info->short_form) + { + print_header(&cache, print_event_info, FALSE); + if (my_b_printf(&cache, "\tXID = %s\n", buf)) + goto error; + } + + if (my_b_printf(&cache, "XA PREPARE %s\n%s\n", + buf, print_event_info->delimiter)) + goto error; + + return cache.flush_data(); +error: + return TRUE; +} +#endif /* MYSQL_CLIENT */ + + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +int XA_prepare_log_event::record_gtid(const rpl_gtid *gtid, uint64 sub_id, + void **out_hton) +{ + int err; + xa_states c_state= thd->transaction.xid_state.xa_state; + thd->transaction.xid_state.xa_state= XA_ACTIVE; + err= rpl_global_gtid_slave_state->record_gtid(thd, gtid, sub_id, true, + false, out_hton); + thd->transaction.xid_state.xa_state= c_state; + return err; +} + + +int XA_prepare_log_event::do_commit() +{ + int res; + xid_t xid; + 
xid.set(m_xid.formatID, + m_xid.data, m_xid.gtrid_length, + m_xid.data + m_xid.gtrid_length, m_xid.bqual_length); + + thd->lex->xid= &xid; + if (!one_phase) + { + res= trans_xa_prepare(thd); + } + else + { + res= trans_xa_commit(thd); + thd->mdl_context.release_transactional_locks(); + } + + return res; +} #endif /* !MYSQL_CLIENT */ @@ -14789,7 +15124,6 @@ bool event_that_should_be_ignored(const char *buf) event_type == PREVIOUS_GTIDS_LOG_EVENT || event_type == TRANSACTION_CONTEXT_EVENT || event_type == VIEW_CHANGE_EVENT || - event_type == XA_PREPARE_LOG_EVENT || (uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F)) return 1; return 0; diff --git a/sql/log_event.h b/sql/log_event.h index 38a40c9..f2f784b 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -217,6 +217,7 @@ class String; #define GTID_HEADER_LEN 19 #define GTID_LIST_HEADER_LEN 4 #define START_ENCRYPTION_HEADER_LEN 0 +#define XA_PREPARE_HEADER_LEN 0 /* Max number of possible extra bytes in a replication event compared to a @@ -3016,6 +3017,30 @@ class Rand_log_event: public Log_event #endif }; + +class Xid_apply_log_event: public Log_event +{ +public: +#ifdef MYSQL_SERVER + Xid_apply_log_event(THD* thd_arg): + Log_event(thd_arg, 0, TRUE) {} +#endif + Xid_apply_log_event(const char* buf, + const Format_description_log_event *description_event): + Log_event(buf, description_event) {} + + ~Xid_apply_log_event() {} + bool is_valid() const { return 1; } +private: +#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) + virtual int do_commit()= 0; + virtual int record_gtid(const rpl_gtid *gtid, uint64 sub_id, void **out_hton); + virtual int do_apply_event(rpl_group_info *rgi); + enum_skip_reason do_shall_skip(rpl_group_info *rgi); +#endif +}; + + /** @class Xid_log_event @@ -3028,14 +3053,14 @@ class Rand_log_event: public Log_event typedef ulonglong my_xid; // this line is the same as in handler.h #endif -class Xid_log_event: public Log_event +class Xid_log_event: public Xid_apply_log_event { - 
public: - my_xid xid; +public: + my_xid xid; #ifdef MYSQL_SERVER Xid_log_event(THD* thd_arg, my_xid x, bool direct): - Log_event(thd_arg, 0, TRUE), xid(x) + Xid_apply_log_event(thd_arg) { if (direct) cache_type= Log_event::EVENT_NO_CACHE; @@ -3055,15 +3080,85 @@ class Xid_log_event: public Log_event #ifdef MYSQL_SERVER bool write(); #endif - bool is_valid() const { return 1; } private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(rpl_group_info *rgi); - enum_skip_reason do_shall_skip(rpl_group_info *rgi); + int do_commit(); #endif }; + +/** + @class XA_prepare_log_event + + Similar to Xid_log_event except that + - it is specific to XA transaction + - it carries out the prepare logics rather than the final committing + when @c one_phase member is off. + From the groupping perspective the event finalizes the current "prepare" group + started with XA START Query-log-event. + When @c one_phase is false Commit of Rollback for XA transaction are + logged separately to the prepare-group events so being a groups of + their own. +*/ + +class XA_prepare_log_event: public Xid_apply_log_event +{ +protected: + /* The event_xid_t members were copied from handler.h */ + struct event_xid_t + { + long formatID; + long gtrid_length; + long bqual_length; + char data[MYSQL_XIDDATASIZE]; // not \0-terminated ! + char *serialize(char *buf) const; + }; + + /* size of serialization buffer is explained in $MYSQL/sql/xa.h. 
*/ + static const uint ser_buf_size= + 8 + 2 * MYSQL_XIDDATASIZE + 4 * sizeof(long) + 1; + + /* Total size of buffers to hold serialized members of XID struct */ + static const int xid_bufs_size= 12; + event_xid_t m_xid; + void *xid; + bool one_phase; + +public: +#ifdef MYSQL_SERVER + XA_prepare_log_event(THD* thd_arg, XID *xid_arg, bool one_phase_arg): + Xid_apply_log_event(thd_arg), xid(xid_arg), one_phase(one_phase_arg) + { + cache_type= Log_event::EVENT_NO_CACHE; + } +#ifdef HAVE_REPLICATION + void pack_info(Protocol* protocol); +#endif /* HAVE_REPLICATION */ +#else + bool print(FILE* file, PRINT_EVENT_INFO* print_event_info); +#endif + XA_prepare_log_event(const char* buf, + const Format_description_log_event *description_event); + ~XA_prepare_log_event() {} + Log_event_type get_type_code() { return XA_PREPARE_LOG_EVENT; } + int get_data_size() + { + return xid_bufs_size + m_xid.gtrid_length + m_xid.bqual_length; + } + +#ifdef MYSQL_SERVER + bool write(); +#endif + +private: +#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) + int record_gtid(const rpl_gtid *gtid, uint64 sub_id, void **out_hton); + int do_commit(); +#endif +}; + + /** @class User_var_log_event @@ -3376,6 +3471,11 @@ class Gtid_log_event: public Log_event uint64 seq_no; uint64 commit_id; uint32 domain_id; +#ifdef MYSQL_SERVER + XID xid; +#else + struct st_mysql_xid xid; +#endif uchar flags2; /* Flags2. */ @@ -3404,6 +3504,8 @@ class Gtid_log_event: public Log_event static const uchar FL_WAITED= 16; /* FL_DDL is set for event group containing DDL. */ static const uchar FL_DDL= 32; + /* FL_XA_TRANSACTION is set for XA transaction. 
*/ + static const uchar FL_XA_TRANSACTION= 64; #ifdef MYSQL_SERVER Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone, diff --git a/sql/sql_class.cc b/sql/sql_class.cc index fa2f866..cc75da9 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1461,12 +1461,19 @@ void THD::cleanup(void) DBUG_ASSERT(cleanup_done == 0); set_killed(KILL_CONNECTION); -#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE if (transaction.xid_state.xa_state == XA_PREPARED) { -#error xid_state in the cache should be replaced by the allocated value + trans_detach(this); + transaction.xid_state.xa_state= XA_NOTR; + transaction.xid_state.rm_error= 0; + } + else + { + transaction.xid_state.xa_state= XA_NOTR; + transaction.xid_state.rm_error= 0; + trans_rollback(this); + xid_cache_delete(this, &transaction.xid_state); } -#endif mysql_ha_cleanup(this); locked_tables_list.unlock_locked_tables(this); @@ -1474,11 +1481,6 @@ void THD::cleanup(void) delete_dynamic(&user_var_events); close_temporary_tables(); - transaction.xid_state.xa_state= XA_NOTR; - transaction.xid_state.rm_error= 0; - trans_rollback(this); - xid_cache_delete(this, &transaction.xid_state); - DBUG_ASSERT(open_tables == NULL); /* If the thread was in the middle of an ongoing transaction (rolled diff --git a/sql/sql_class.h b/sql/sql_class.h index 69fabee..76befcb 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1255,6 +1255,18 @@ typedef struct st_xid_state { /* Error reported by the Resource Manager (RM) to the Transaction Manager. */ uint rm_error; XID_cache_element *xid_cache_element; + /* + Binary logging status. + It is set to TRUE at XA PREPARE if the transaction was written + to the binlog. + Naturally FALSE means the transaction was not written to + the binlog. Happens if the transaction did not modify anything + or binlogging was turned off. In that case we shouldn't binlog + the subsequent XA COMMIT/ROLLBACK. + The recovered transaction after server restart sets it to TRUE always. 
+ That can cause inconsistencies (should be fixed?). + */ + bool is_binlogged; /** Check that XA transaction has an uncommitted work. Report an error @@ -1278,6 +1290,12 @@ typedef struct st_xid_state { } return false; } + + void reset() + { + xid.null(); + is_binlogged= FALSE; + } } XID_STATE; void xid_cache_init(void); @@ -2603,7 +2621,7 @@ class THD :public Statement, then. */ if (!xid_state.rm_error) - xid_state.xid.null(); + xid_state.reset(); free_root(&mem_root,MYF(MY_KEEP_PREALLOC)); DBUG_VOID_RETURN; } diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc index b48070b..3e4a067 100644 --- a/sql/sql_connect.cc +++ b/sql/sql_connect.cc @@ -1414,6 +1414,7 @@ void do_handle_one_connection(CONNECT *connect) #endif end_thread: close_connection(thd); + thd->get_stmt_da()->reset_diagnostics_area(); if (thd->userstat_running) update_global_user_stats(thd, create_user, time(NULL)); diff --git a/sql/transaction.cc b/sql/transaction.cc index 13614d3..447a06b 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -790,6 +790,44 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name) /** + Detach the current XA transaction. + + @param thd Current thread + + @retval FALSE Success + @retval TRUE Failure +*/ + +bool trans_detach(THD *thd) +{ + XID_STATE *xid_s= &thd->transaction.xid_state; + Ha_trx_info *ha_info, *ha_info_next; + + DBUG_ENTER("trans_detach"); + +// DBUG_ASSERT(xid_s->xa_state == XA_PREPARED && +// xid_cache_search(thd, &xid_s->xid)); + + xid_cache_delete(thd, xid_s); + if (xid_cache_insert(&xid_s->xid, XA_PREPARED)) + DBUG_RETURN(TRUE); + + for (ha_info= thd->transaction.all.ha_list; + ha_info; + ha_info= ha_info_next) + { + ha_info_next= ha_info->next(); + ha_info->reset(); /* keep it conveniently zero-filled */ + } + + thd->transaction.all.ha_list= 0; + thd->transaction.all.no_2pc= 0; + + DBUG_RETURN(FALSE); +} + + +/** Starts an XA transaction with the given xid value. 
@param thd Current thread @@ -862,7 +900,15 @@ bool trans_xa_end(THD *thd) else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) my_error(ER_XAER_NOTA, MYF(0)); else if (!xa_trans_rolled_back(&thd->transaction.xid_state)) - thd->transaction.xid_state.xa_state= XA_IDLE; + { + if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) && + thd->binlog_query(THD::STMT_QUERY_TYPE, + thd->query(), thd->query_length(), + TRUE, FALSE, FALSE, 0)) + my_error(ER_XAER_RMERR, MYF(0)); + else + thd->transaction.xid_state.xa_state= XA_IDLE; + } DBUG_RETURN(thd->is_error() || thd->transaction.xid_state.xa_state != XA_IDLE); @@ -928,6 +974,12 @@ bool trans_xa_commit(THD *thd) res= !xs; if (res) my_error(ER_XAER_NOTA, MYF(0)); + else if (thd->in_multi_stmt_transaction_mode()) + { + my_error(ER_XAER_RMFAIL, MYF(0), + xa_state_names[thd->transaction.xid_state.xa_state]); + res= TRUE; + } else { res= xa_trans_rolled_back(xs); @@ -978,8 +1030,16 @@ bool trans_xa_commit(THD *thd) { DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock"); - res= MY_TEST(ha_commit_one_phase(thd, 1)); - if (res) + if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) + { + res= thd->binlog_query(THD::THD::STMT_QUERY_TYPE, + thd->query(), thd->query_length(), + FALSE, FALSE, FALSE, 0); + } + else + res= 0; + + if (res || (res= MY_TEST(ha_commit_one_phase(thd, 1)))) my_error(ER_XAER_RMERR, MYF(0)); } } @@ -1032,19 +1092,36 @@ bool trans_xa_rollback(THD *thd) else { xa_trans_rolled_back(xs); - ha_commit_or_rollback_by_xid(thd->lex->xid, 0); + if (ha_commit_or_rollback_by_xid(thd->lex->xid, 0) == 0 && + (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())) + thd->binlog_query(THD::THD::STMT_QUERY_TYPE, + thd->query(), thd->query_length(), + FALSE, FALSE, FALSE, 0); xid_cache_delete(thd, xs); } DBUG_RETURN(thd->get_stmt_da()->is_error()); } - if (xa_state != XA_IDLE && xa_state != XA_PREPARED && xa_state != XA_ROLLBACK_ONLY) + if (xa_state != XA_IDLE && xa_state != XA_PREPARED && + xa_state != 
XA_ROLLBACK_ONLY) { my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); DBUG_RETURN(TRUE); } - res= xa_trans_force_rollback(thd); + if(xa_state == XA_PREPARED && + (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())) + { + res= thd->binlog_query(THD::THD::STMT_QUERY_TYPE, + thd->query(), thd->query_length(), + FALSE, FALSE, FALSE, 0); + } + else + res= 0; + + res= res || xa_trans_force_rollback(thd); + if (res || (res= MY_TEST(xa_trans_force_rollback(thd)))) + my_error(ER_XAER_RMERR, MYF(0)); thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); thd->transaction.all.reset(); diff --git a/sql/transaction.h b/sql/transaction.h index 7e34693..f228cc6 100644 --- a/sql/transaction.h +++ b/sql/transaction.h @@ -29,6 +29,7 @@ bool trans_commit(THD *thd); bool trans_commit_implicit(THD *thd); bool trans_rollback(THD *thd); bool trans_rollback_implicit(THD *thd); +bool trans_detach(THD *thd); bool trans_commit_stmt(THD *thd); bool trans_rollback_stmt(THD *thd);