[PATCH] MDEV-31949 parallel slave xa Round-Robin distribution
From: Andrei <andrei.elkin@mariadb.com>

The XA-Prepare group of events

  XA START xid
  ...
  XA END xid
  XA PREPARE xid

and its XA-"complete" terminator, XA COMMIT or XA ROLLBACK, are now
distributed Round-Robin across the slave parallel workers.
The former hash-based policy was proven to contribute to execution
latency by creating a big queue of binlog-ordered transactions waiting
to commit, many times larger than the size of the worker pool.

Acronyms and notations used below:

  XAP := XA-Prepare event, or the whole prepared XA group of events
  XAC := XA-"complete", which is a solitary group of events
  |W| := the size of the slave worker pool
  Subscripts like `_k' denote order in a corresponding sequence
  (e.g. a binlog file).

KEY CHANGES:

The parallel slave
------------------
The driver thread now maintains a list of the XAP:s currently in
processing. Its purpose is to avoid "wild" parallel execution of XA:s
with duplicate xids (unlikely, but that's the user's right).
The list is arranged as a sliding window of size 2*|W| to account for
the possibility that XAP_k -> XAP_k+2|W|-1 is the largest (in the
group-of-events count sense) dependency.

Say k=1, and |W|, the number of Workers, is 4. As transactions are
distributed Round-Robin, it's possible to have T^*_1 -> T^*_8 as the
largest dependency ('*' marks the dependents) at runtime. This can be
seen from the worker queues in the picture below, where the Q_i worker
queues develop downward:

  Q1    Q2    Q3    Q4
  1^*   2     3     4
  5     6     7     8^*

Worker #1 has been assigned T_1 and T_5. Worker #4 can take on its T_8
while T_1 is still at the beginning of its processing, that is, even
before the XA START of T_1 has run.

XA related
----------
XID_cache_element is extended with two pointers to resolve two types of
dependencies: the duplicate xid XAP_k -> XAP_k+i, and the ordinary
completion of a prepared XA, XAP_k -> XAC_k+j.
The former is handled by a wait-for-xid protocol conducted by
xid_cache_delete() and xid_cache_insert_maybe_wait().
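The Round-Robin assignment, the 2*|W| window and the wait-for-xid
protocol described above can be illustrated with a minimal,
single-process C++ model. It is a sketch under stated assumptions, not
the server code: RoundRobinPicker, XapWindow and XidRegistry are
hypothetical names, the window is modeled purely by XAP count (the
patch tracks XAP:s "currently in processing"), and only the general
shape of xid_cache_insert_maybe_wait()/xid_cache_delete() (block while
the xid is held, wake waiters on release) is mirrored.

```cpp
// Hypothetical model of the distribution policy; none of these class
// names exist in the server.
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <set>
#include <string>

// Round-Robin choice of a worker for each group of events (XAP or XAC).
class RoundRobinPicker {
 public:
  explicit RoundRobinPicker(std::size_t n_workers) : n_(n_workers) {
    assert(n_workers > 0);  // avoid modulo by zero in next()
  }
  std::size_t next() {
    std::size_t w = cur_;
    cur_ = (cur_ + 1) % n_;
    return w;
  }

 private:
  std::size_t n_;
  std::size_t cur_ = 0;
};

// Driver-side window over the xids of the last 2*|W| scheduled XAPs.
// add() returns true when the new XAP reuses an xid still in the window,
// i.e. when it must be made dependent instead of running in parallel.
class XapWindow {
 public:
  explicit XapWindow(std::size_t n_workers) : cap_(2 * n_workers) {}
  bool add(const std::string& xid) {
    bool dup = std::find(win_.begin(), win_.end(), xid) != win_.end();
    win_.push_back(xid);
    if (win_.size() > cap_) win_.pop_front();  // slide the window
    return dup;
  }

 private:
  std::size_t cap_;
  std::deque<std::string> win_;
};

// Wait-for-xid: an insert blocks while another XAP holds the same xid;
// remove() releases it and wakes every waiter to re-check.
class XidRegistry {
 public:
  void insert_maybe_wait(const std::string& xid) {
    std::unique_lock<std::mutex> lk(m_);
    cv_.wait(lk, [&] { return held_.count(xid) == 0; });
    held_.insert(xid);
  }
  bool try_insert(const std::string& xid) {
    std::lock_guard<std::mutex> lk(m_);
    return held_.insert(xid).second;
  }
  void remove(const std::string& xid) {
    {
      std::lock_guard<std::mutex> lk(m_);
      held_.erase(xid);
    }
    cv_.notify_all();
  }
  bool contains(const std::string& xid) {
    std::lock_guard<std::mutex> lk(m_);
    return held_.count(xid) != 0;
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::set<std::string> held_;
};
```

With |W| = 4 the model's window holds 8 xids, so an 8th XAP reusing the
1st XAP's xid is still flagged, which corresponds to the T_1 -> T_8 case
above; a 2*|W| window is the smallest one that covers that distance.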
The XAP_k -> XAC_k+j dependency is handled analogously by
xid_cache_search_maybe_wait() and slave_applier_reset_xa_trans().
An XA-"complete" is allowed to go forward before its XAP parent has
released the xid (all recovery concerns are covered in MDEV-21496 and
MDEV-21777). Yet the XAC waits for it at a critical point of execution,
namely when it "completes" the work in the Engine.

CAVEAT: the storage/innobase/trx/trx0undo.cc changes are due to the
possibly fixed MDEV-32144. TODO: to be verified.

Thanks to Brandon Nesterenko at mariadb.com for the initial review and a
lot of creative efforts to advance this work!
---
 mysql-test/include/show_binlog_events2.inc | 11 +
 .../binlog/r/binlog_xa_prepared_bugs.result | 53 +
 .../binlog/t/binlog_xa_prepared_bugs.test | 30 +
 .../rpl/include/rpl_xa_concurrent_2pc.inc | 442 ++++++++
 .../suite/rpl/r/rpl_xa_concurrent_2pc.result | 953 ++++++++++++++++++
 .../rpl/r/rpl_xa_empty_transaction.result | 51 +
 .../rpl/r/rpl_xa_prepare_gtid_fail.result | 2 +-
 .../suite/rpl/t/rpl_xa_concurrent_2pc.test | 111 ++
 .../suite/rpl/t/rpl_xa_empty_transaction.test | 89 ++
 .../suite/rpl/t/rpl_xa_prepare_gtid_fail.test | 2 +-
 sql/handler.cc | 23 +-
 sql/log.cc | 219 +++-
 sql/log_event_server.cc | 17 +-
 sql/mysqld.cc | 4 +-
 sql/mysqld.h | 4 +-
 sql/rpl_parallel.cc | 128 ++-
 sql/rpl_parallel.h | 16 +
 sql/rpl_rli.cc | 6 +-
 sql/rpl_rli.h | 8 +
 sql/sql_array.h | 4 +-
 sql/sql_class.cc | 2 +
 sql/sql_class.h | 9 +
 sql/xa.cc | 333 +++++-
 storage/innobase/trx/trx0undo.cc | 3 +-
 24 files changed, 2419 insertions(+), 101 deletions(-)
 create mode 100644 mysql-test/suite/binlog/r/binlog_xa_prepared_bugs.result
 create mode 100644 mysql-test/suite/binlog/t/binlog_xa_prepared_bugs.test
 create mode 100644 mysql-test/suite/rpl/include/rpl_xa_concurrent_2pc.inc
 create mode 100644 mysql-test/suite/rpl/r/rpl_xa_concurrent_2pc.result
 create mode 100644 mysql-test/suite/rpl/t/rpl_xa_concurrent_2pc.test

diff --git a/mysql-test/include/show_binlog_events2.inc b/mysql-test/include/show_binlog_events2.inc
index 84c62cced66..416514faea4 100644
--- a/mysql-test/include/show_binlog_events2.inc
+++ b/mysql-test/include/show_binlog_events2.inc
@@ -1,3 +1,9 @@
+# ==== Usage ====
+#
+# [--let $binlog_file= [<FILENAME> | LAST]]
+# [--let $binlog_start= <POSITION> ]
+# [--let $filter_cid= [0 | 1]]
+
 if ($binlog_start)
 {
   --let $_binlog_start=$binlog_start
@@ -14,4 +20,9 @@ if ($binlog_file)
 --replace_result "$_from_binlog_start" "from <binlog_start>" $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
 --replace_column 2 # 5 #
 --replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ /file_id=[0-9]+/file_id=#/ /GTID [0-9]+-[0-9]+-[0-9]+/GTID #-#-#/
+if ($filter_cid)
+{
+--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ /file_id=[0-9]+/file_id=#/ /GTID [0-9]+-[0-9]+-[0-9]+/GTID #-#-#/ / cid=[0-9]+//
+
+}
 --eval show binlog events $_in_binlog_file from $_binlog_start
diff --git a/mysql-test/suite/binlog/r/binlog_xa_prepared_bugs.result b/mysql-test/suite/binlog/r/binlog_xa_prepared_bugs.result
new file mode 100644
index 00000000000..35b7accfb24
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_xa_prepared_bugs.result
@@ -0,0 +1,53 @@
+CREATE TABLE ta (c INT KEY) engine=Aria;
+XA START 'xid_a';
+INSERT INTO ta VALUES (1);
+XA END 'xid_a';
+XA PREPARE 'xid_a';
+Warnings:
+Warning 1030 Got error 131 "Command not supported by the engine" from storage engine Aria
+LOAD INDEX INTO CACHE c KEY(PRIMARY);
+Table Op Msg_type Msg_text
+test.c preload_keys Error XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
+test.c preload_keys Error XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
+test.c preload_keys error Corrupt
+Warnings:
+Warning 1196 Some non-transactional changed tables couldn't be rolled back
+XA ROLLBACK 'xid_a';
+CREATE TABLE ti (c INT KEY) engine=Innodb;
+XA START 'xid_i';
+INSERT INTO ti VALUES (1);
+XA END 'xid_i';
+XA PREPARE 'xid_i';
+LOAD INDEX INTO CACHE c KEY(PRIMARY);
+Table Op Msg_type Msg_text
+test.c preload_keys Error XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
+test.c preload_keys Error XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
+test.c preload_keys error Corrupt
+XA COMMIT 'xid_i';
+SELECT * FROM ti;
+c
+include/show_binlog_events.inc
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # use `test`; CREATE TABLE ta (c INT KEY) engine=Aria
+master-bin.000001 # Gtid # # BEGIN GTID #-#-#
+master-bin.000001 # Annotate_rows # # INSERT INTO ta VALUES (1)
+master-bin.000001 # Table_map # # table_id: # (test.ta)
+master-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
+master-bin.000001 # Query # # COMMIT
+master-bin.000001 # Gtid # # XA START X'7869645f61',X'',1 GTID #-#-#
+master-bin.000001 # Query # # XA END X'7869645f61',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'7869645f61',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA ROLLBACK X'7869645f61',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # use `test`; CREATE TABLE ti (c INT KEY) engine=Innodb
+master-bin.000001 # Gtid # # XA START X'7869645f69',X'',1 GTID #-#-#
+master-bin.000001 # Annotate_rows # # INSERT INTO ti VALUES (1)
+master-bin.000001 # Table_map # # table_id: # (test.ti)
+master-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
+master-bin.000001 # Query # # XA END X'7869645f69',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'7869645f69',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA ROLLBACK X'7869645f69',X'',1
+drop table ta,ti;
diff --git a/mysql-test/suite/binlog/t/binlog_xa_prepared_bugs.test b/mysql-test/suite/binlog/t/binlog_xa_prepared_bugs.test
new file mode 100644
index 00000000000..3a5bb15968e
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_xa_prepared_bugs.test
@@ -0,0 +1,30 @@
+--source include/have_binlog_format_row.inc
+--source include/have_innodb.inc
+
+CREATE TABLE ta (c INT KEY) engine=Aria;
+XA START 'xid_a';
+INSERT INTO ta VALUES (1);
+XA END 'xid_a';
+XA PREPARE 'xid_a';
+
+#--error ER_XAER_RMFAIL
+LOAD INDEX INTO CACHE c KEY(PRIMARY);
+
+XA ROLLBACK 'xid_a';
+
+CREATE TABLE ti (c INT KEY) engine=Innodb;
+XA START 'xid_i';
+INSERT INTO ti VALUES (1);
+XA END 'xid_i';
+XA PREPARE 'xid_i';
+
+# --error ER_XAER_RMFAIL
+LOAD INDEX INTO CACHE c KEY(PRIMARY);
+
+XA COMMIT 'xid_i';
+SELECT * FROM ti;
+
+#
+--source include/show_binlog_events.inc
+
+drop table ta,ti;
diff --git a/mysql-test/suite/rpl/include/rpl_xa_concurrent_2pc.inc b/mysql-test/suite/rpl/include/rpl_xa_concurrent_2pc.inc
new file mode 100644
index 00000000000..d1f1868d2c9
--- /dev/null
+++ b/mysql-test/suite/rpl/include/rpl_xa_concurrent_2pc.inc
@@ -0,0 +1,442 @@
+#
+# Helper file to run the 1-4(a,b) test cases for rpl_xa_concurrent_2pc,
+# with either XA COMMIT or XA ROLLBACK used to complete XA transactions.
+#
+# Parameters
+# $xa_complete_sym (string) : COMMIT or ROLLBACK, the action used to complete
+# a prepared XA transaction
+#
+
+if (!$xa_complete_sym)
+{
+  die MTR variable xa_complete_sym not specified, must be either COMMIT or ROLLBACK;
+}
+
+--let $is_xac= 0
+--let $is_xar= 0
+
+if (`SELECT strcmp("COMMIT", "$xa_complete_sym") = 0`)
+{
+  --let $is_xac= 1
+}
+
+if (`SELECT strcmp("ROLLBACK", "$xa_complete_sym") = 0`)
+{
+  --let $is_xar= 1
+}
+
+if (`SELECT !$is_xar && !$is_xac`)
+{
+  die MTR variable xa_complete_sym invalid, must be either COMMIT or ROLLBACK;
+}
+
+
+--echo #
+--echo # Initialize test data
+--connection slave
+--source include/stop_slave.inc
+RESET SLAVE;
+set @@global.gtid_slave_pos= "";
+
+if ($is_xac)
+{
+--connection slave
+RESET MASTER;
+
+--connection master
+RESET MASTER;
+}
+
+--connection master
+create table t1 (a int primary key, b int) engine=innodb;
+
+# Slave locks this row before updates to pause transaction progress
+--let $hold_row= -1
+--let $t1_ctr= 0
+--eval insert into t1 values ($hold_row, 0)
+
+--source include/save_master_gtid.inc
+
+--connection slave
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+--source include/stop_slave.inc
+set @save_debug= @@GLOBAL.debug_dbug;
+set @save_par_thds= @@GLOBAL.slave_parallel_threads;
+set @save_par_mode= @@GLOBAL.slave_parallel_mode;
+set @@GLOBAL.slave_parallel_threads= 4;
+set @@GLOBAL.slave_parallel_mode= optimistic;
+
+set statement sql_log_bin=0 for call mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+
+
+--echo #
+--echo # Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE
+--echo # and XA $xa_complete_sym run concurrently. That is, the
+--echo # XA $xa_complete_sym will wait at group commit until the XA PREPARE
+--echo # binlogs, and then it will wait again until the XA PREPARE finishes
+--echo # preparing in all engines. At this point, the XA $xa_complete_sym will
+--echo # run to completion.
+--connection master
+# For worker thread to hold XAP at dequeue time via debug_sync through
+# `hold_worker_on_schedule`.
+set @@session.gtid_seq_no= 100;
+XA START 'x';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+--source include/save_master_gtid.inc
+
+--connection slave
+# For worker to stop at dequeue event time and after binlogging XA PREPARE
+set @@global.debug_dbug= "+d,hold_worker_on_schedule,stop_after_binlog_prepare";
+--source include/start_slave.inc
+
+--echo # Waiting for XAP to pause when it is pulled from the queue
+set debug_sync= "now wait_for reached_pause";
+
+--echo # Before the XA PREPARE executes, the XA $xa_complete_sym should wait in group commit..
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior transaction to commit"
+--source include/wait_condition.inc
+--echo # ..done
+
+
+--echo # Execute the XA PREPARE
+set debug_sync= "now signal continue_worker";
+
+--echo # Wait for XA PREPARE to have binlogged, but hold it before it prepares in engines
+set debug_sync= "now wait_for xa_prepare_binlogged";
+
+--echo # The XA $xa_complete_sym should move on from binlog to wait for the XA PREPARE to complete in engines
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction"
+--source include/wait_condition.inc
+--echo # ..done
+
+--echo # Signal the XAP to complete in engines (which will automatically signal XAC)
+set debug_sync= "now signal continue_xap";
+
+--source include/sync_with_master_gtid.inc
+
+--let $diff_tables=master:test.t1, slave:test.t1
+--source include/diff_tables.inc
+
+--connection slave
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+
+
+--echo #
+--echo # Test Case 2: If two XA $xa_complete_sym transactions have different
+--echo # XIDs, ensure both phases of both transactions all execute concurrently.
+--echo #
+
+--echo # Ensure slave is stopped
+--connection slave
+--source include/wait_for_slave_to_stop.inc
+
+# Stop both XAP after their binlogging and before their engine changing
+set @@global.debug_dbug= "+d,stop_before_binlog_prepare,stop_after_binlog_prepare";
+
+--connection master
+XA START 'x1';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x1';
+XA PREPARE 'x1';
+--eval XA $xa_complete_sym 'x1'
+
+XA START 'x2';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x2';
+XA PREPARE 'x2';
+--eval XA $xa_complete_sym 'x2'
+--source include/save_master_gtid.inc
+
+--connection slave
+--source include/start_slave.inc
+
+# This stage is necessary to avoid an XAP_1 <-register-> XAC_2 race, in which
+# XAC_2 may stay stuck in the WFPT2C state below until XAP_1 has finished.
+# Prove that the workers' states are as follows:
+--let $count_wait= 2
+--let $wait_condition=SELECT count(*) = $count_wait FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior transaction to commit"
+--source include/wait_condition.inc
+
+--let $count_wait= 2
+--let $wait_condition=SELECT count(*) = $count_wait FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal binlog_xap";
+
+# Wait for the two XAP:s to arrive at their stations (XAP_1 at the next one, after its binlogging is done)
+--let $wait_condition=SELECT count(*) >= 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal binlog_xap";
+
+--echo # Ensuring both phases of both transactions all execute concurrently
+# Waiting for both XA "COMPLETE"s to binlog proves this, as they would not pass
+# group commit if their preceding XA PREPAREs had not also binlogged
+--let $count_xa_wait_workers= 2
+
+--let $wait_condition=SELECT count(*) = $count_xa_wait_workers FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction"
+--source include/wait_condition.inc
+--echo # ..done
+
+--echo # Verify XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+
+--echo # Signal the XAPs to complete in engines (which will automatically signal XACs)
+set debug_sync= "now signal continue_xap";
+
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal continue_xap";
+
+--source include/sync_with_master_gtid.inc
+
+--let $diff_tables=master:test.t1, slave:test.t1
+--source include/diff_tables.inc
+
+--connection slave
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+
+
+--echo #
+--echo # Test Case 3: Two current 2-phase XA transactions with matching XIDs
+--echo # should run one after the other, while each transaction still allows
+--echo # its XA PREPARE and XA $xa_complete_sym to run concurrently
+
+--echo # Ensure slave is stopped
+--connection slave
+--source include/wait_for_slave_to_stop.inc
+
+# Stop both XAP after their binlogging and before their engine changing
+set @@global.debug_dbug= "+d,stop_before_binlog_prepare,stop_after_binlog_prepare,stop_after_binlog_cor_by_xid";
+
+--connection master
+XA START 'x';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+
+XA START 'x';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+--source include/save_master_gtid.inc
+
+--connection slave
+--source include/start_slave.inc
+
+# This stage is necessary to avoid an XAP_1 <-register-> XAC_2 race, in which
+# XAC_2 may stay stuck in the WFPT2C state below until XAP_1 has finished.
+--let $count_wait= 3
+--let $wait_condition=SELECT count(*) = $count_wait FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior transaction to commit"
+
+--source include/wait_condition.inc
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal binlog_xap";
+
+--echo # Verify first XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+
+--echo # Ensure first XA transaction is running concurrently
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction" AND info LIKE "XA $xa_complete_sym%"
+--source include/wait_condition.inc
+
+--echo # Ensure second XA transaction's XAP waits for the first transaction
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction" AND info LIKE "XA START%"
+--source include/wait_condition.inc
+
+--echo # Signal first XA PREPARE to complete
+set debug_sync= "now signal continue_xap";
+
+--echo # Wait for first XA $xa_complete_sym to binlog
+set debug_sync= "now wait_for xa_cor_binlogged";
+
+--echo # Ensure second XA PREPARE doesn't begin yet because the XAC hadn't released its XID
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction"
+--source include/wait_condition.inc
+
+--echo # Signal first XA $xa_complete_sym to complete
+set debug_sync= "now signal continue_xa_cor";
+
+--echo # Wait for second XA PREPARE to binlogged
+--echo # First pass through binlog_xap
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now wait_for xa_prepare_binlogged";
+
+--echo # Ensure second XA $xa_complete_sym is concurrent with XAP
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction"
+--source include/wait_condition.inc
+
+--echo # Signal second XA transaction to complete
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now wait_for xa_cor_binlogged";
+set debug_sync= "now signal continue_xa_cor";
+
+--source include/sync_with_master_gtid.inc
+
+--let $diff_tables=master:test.t1, slave:test.t1
+--source include/diff_tables.inc
+
+--connection slave
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+
+
+--echo #
+--echo # Test Case 4 (Error Case): If an XA PREPARE errors while its
+--echo # XA $xa_complete_sym is waiting on it, both phases should rollback
+--echo # successfully. Note this tests both:
+--echo # a) XA $xa_complete_sym is waiting in group commit (first phase
+--echo # times out in DMLs)
+--echo # b) XA $xa_complete_sym is waiting in group commit, with another XAP
+--echo # with a duplicate XID waiting on it.
+
+--echo # Case a)
+--echo # Ensure slave is stopped
+--connection slave
+--source include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+
+--connection master
+XA START 'x';
+--eval update t1 set b=b+1 where a=$hold_row
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+--source include/save_master_gtid.inc
+
+--connection slave1
+BEGIN;
+--eval select * from t1 where a=$hold_row for update;
+
+--connection slave
+--source include/start_slave.inc
+
+--let $slave_sql_errno= 1205
+--source include/wait_for_slave_sql_error.inc
+
+--connection slave1
+ROLLBACK;
+
+--connection slave
+# Stop the IO thread too
+--source include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+
+--echo # Ensure on slave restart, we can re-execute the XA transaction
+--source include/start_slave.inc
+--source include/save_master_gtid.inc
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+
+
+--echo # Case b)
+--echo # Ensure slave is stopped
+--connection slave
+--source include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+
+--connection master
+XA START 'x';
+--eval update t1 set b=b+1 where a=$hold_row
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+
+XA START 'x';
+--eval insert into t1 values ($t1_ctr, 0)
+--let $new_row_idx= $t1_ctr
+--inc $t1_ctr
+XA END 'x';
+XA PREPARE 'x';
+--source include/save_master_gtid.inc
+--eval XA $xa_complete_sym 'x'
+
+--connection slave1
+BEGIN;
+--eval select * from t1 where a=$hold_row for update;
+
+--connection slave
+--source include/start_slave.inc
+
+--let $slave_sql_errno= 1205
+--source include/wait_for_slave_sql_error.inc
+
+--connection slave1
+ROLLBACK;
+
+--echo # There should not be any prepared rows seen by XA RECOVER
+XA RECOVER;
+
+--echo # Ensuring data from second XAP isn't visible..
+if (`select count(*) from t1 where a=$new_row_idx`)
+{
+  --die Failed, row exists
+}
+--echo # ..done
+
+--connection slave
+--source include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+
+--echo # Ensure on slave restart, we can re-execute the XA transaction
+--source include/start_slave.inc
+--source include/save_master_gtid.inc
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+--source include/start_slave.inc
+
+--echo # Ensuring data from second XAP is visible..
+if ($is_xac)
+{
+  --let $expected_row_count= 1
+}
+if ($is_xar)
+{
+  --let $expected_row_count= 0
+}
+if (`select count(*) != $expected_row_count from t1 where a=$new_row_idx`)
+{
+  --die Failed, XA $xa_complete_sym was not observed
+}
+--echo # ..done
+
+--echo #
+--echo # Cleanup
+--connection master
+DROP TABLE t1;
+--source include/save_master_gtid.inc
+--let $binlog_file=query_get_value(SHOW MASTER STATUS, File, 1)
+--source include/show_binlog_events.inc
+
+--connection slave
+--source include/sync_with_master_gtid.inc
+--let $binlog_file=query_get_value(SHOW MASTER STATUS, File, 1)
+--let $filter_cid=1
+--source include/show_binlog_events2.inc
+
+--source include/stop_slave.inc
+set @@GLOBAL.slave_parallel_threads= @save_par_thds;
+set @@GLOBAL.slave_parallel_mode= @save_par_mode;
+--source include/start_slave.inc
diff --git a/mysql-test/suite/rpl/r/rpl_xa_concurrent_2pc.result b/mysql-test/suite/rpl/r/rpl_xa_concurrent_2pc.result
new file mode 100644
index 00000000000..d30943b23da
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_xa_concurrent_2pc.result
@@ -0,0 +1,953 @@
+include/master-slave.inc
+[connection master]
+#
+# Initialize test data
+connection slave;
+include/stop_slave.inc
+RESET SLAVE;
+set @@global.gtid_slave_pos= "";
+connection slave;
+RESET MASTER;
+connection master;
+RESET MASTER;
+connection master;
+create table t1 (a int primary key, b int) engine=innodb;
+insert into t1 values (-1, 0);
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+include/stop_slave.inc
+set @save_debug= @@GLOBAL.debug_dbug;
+set @save_par_thds= @@GLOBAL.slave_parallel_threads;
+set @save_par_mode= @@GLOBAL.slave_parallel_mode;
+set @@GLOBAL.slave_parallel_threads= 4;
+set @@GLOBAL.slave_parallel_mode= optimistic;
+set statement sql_log_bin=0 for call mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+#
+# Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE
+# and XA COMMIT run concurrently. That is, the
+# XA COMMIT will wait at group commit until the XA PREPARE
+# binlogs, and then it will wait again until the XA PREPARE finishes
+# preparing in all engines. At this point, the XA COMMIT will
+# run to completion.
+connection master;
+set @@session.gtid_seq_no= 100;
+XA START 'x';
+insert into t1 values (0, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+include/save_master_gtid.inc
+connection slave;
+set @@global.debug_dbug= "+d,hold_worker_on_schedule,stop_after_binlog_prepare";
+include/start_slave.inc
+# Waiting for XAP to pause when it is pulled from the queue
+set debug_sync= "now wait_for reached_pause";
+# Before the XA PREPARE executes, the XA COMMIT should wait in group commit..
+# ..done
+# Execute the XA PREPARE
+set debug_sync= "now signal continue_worker";
+# Wait for XA PREPARE to have binlogged, but hold it before it prepares in engines
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# The XA COMMIT should move on from binlog to wait for the XA PREPARE to complete in engines
+# ..done
+# Signal the XAP to complete in engines (which will automatically signal XAC)
+set debug_sync= "now signal continue_xap";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 2: If two XA COMMIT transactions have different
+# XIDs, ensure both phases of both transactions all execute concurrently.
+#
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @@global.debug_dbug= "+d,stop_before_binlog_prepare,stop_after_binlog_prepare";
+connection master;
+XA START 'x1';
+insert into t1 values (1, 0);
+XA END 'x1';
+XA PREPARE 'x1';
+XA COMMIT 'x1';
+XA START 'x2';
+insert into t1 values (2, 0);
+XA END 'x2';
+XA PREPARE 'x2';
+XA COMMIT 'x2';
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now signal binlog_xap";
+# Ensuring both phases of both transactions all execute concurrently
+# ..done
+# Verify XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Signal the XAPs to complete in engines (which will automatically signal XACs)
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now signal continue_xap";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 3: Two current 2-phase XA transactions with matching XIDs
+# should run one after the other, while each transaction still allows
+# its XA PREPARE and XA COMMIT to run concurrently
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @@global.debug_dbug= "+d,stop_before_binlog_prepare,stop_after_binlog_prepare,stop_after_binlog_cor_by_xid";
+connection master;
+XA START 'x';
+insert into t1 values (3, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+XA START 'x';
+insert into t1 values (4, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+set debug_sync= "now signal binlog_xap";
+# Verify first XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Ensure first XA transaction is running concurrently
+# Ensure second XA transaction's XAP waits for the first transaction
+# Signal first XA PREPARE to complete
+set debug_sync= "now signal continue_xap";
+# Wait for first XA COMMIT to binlog
+set debug_sync= "now wait_for xa_cor_binlogged";
+# Ensure second XA PREPARE doesn't begin yet because the XAC hadn't released its XID
+# Signal first XA COMMIT to complete
+set debug_sync= "now signal continue_xa_cor";
+# Wait for second XA PREPARE to binlogged
+# First pass through binlog_xap
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Ensure second XA COMMIT is concurrent with XAP
+# Signal second XA transaction to complete
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now wait_for xa_cor_binlogged";
+set debug_sync= "now signal continue_xa_cor";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 4 (Error Case): If an XA PREPARE errors while its
+# XA COMMIT is waiting on it, both phases should rollback
+# successfully. Note this tests both:
+# a) XA COMMIT is waiting in group commit (first phase
+# times out in DMLs)
+# b) XA COMMIT is waiting in group commit, with another XAP
+# with a duplicate XID waiting on it.
+# Case a)
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+connection master;
+XA START 'x';
+update t1 set b=b+1 where a=-1;
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+include/save_master_gtid.inc
+connection slave1;
+BEGIN;
+select * from t1 where a=-1 for update;;
+a b
+-1 0
+connection slave;
+include/start_slave.inc
+include/wait_for_slave_sql_error.inc [errno=1205]
+connection slave1;
+ROLLBACK;
+connection slave;
+include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+# Ensure on slave restart, we can re-execute the XA transaction
+include/start_slave.inc
+include/save_master_gtid.inc
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+# Case b)
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+connection master;
+XA START 'x';
+update t1 set b=b+1 where a=-1;
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+XA START 'x';
+insert into t1 values (5, 0);
+XA END 'x';
+XA PREPARE 'x';
+include/save_master_gtid.inc
+XA COMMIT 'x';
+connection slave1;
+BEGIN;
+select * from t1 where a=-1 for update;;
+a b
+-1 1
+connection slave;
+include/start_slave.inc
+include/wait_for_slave_sql_error.inc [errno=1205]
+connection slave1;
+ROLLBACK;
+# There should not be any prepared rows seen by XA RECOVER
+XA RECOVER;
+formatID gtrid_length bqual_length data
+# Ensuring data from second XAP isn't visible..
+# ..done
+connection slave;
+include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+# Ensure on slave restart, we can re-execute the XA transaction
+include/start_slave.inc
+include/save_master_gtid.inc
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+include/start_slave.inc
+# Ensuring data from second XAP is visible..
+# ..done
+#
+# Cleanup
+connection master;
+DROP TABLE t1;
+include/save_master_gtid.inc
+include/show_binlog_events.inc
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # use `test`; create table t1 (a int primary key, b int) engine=innodb
+master-bin.000001 # Gtid # # BEGIN GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (-1, 0)
+master-bin.000001 # Xid # # COMMIT /* XID */
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (0, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'7831',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (1, 0)
+master-bin.000001 # Query # # XA END X'7831',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'7831',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'7831',X'',1
+master-bin.000001 # Gtid # # XA START X'7832',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (2, 0)
+master-bin.000001 # Query # # XA END X'7832',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'7832',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'7832',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (3, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (4, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; update t1 set b=b+1 where a=-1
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; update t1 set b=b+1 where a=-1
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (5, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # use `test`; DROP TABLE `t1` /* generated by server */
+connection slave;
+include/sync_with_master_gtid.inc
+show binlog events in 'slave-bin.000001' from <binlog_start>;
+Log_name Pos Event_type Server_id End_log_pos Info
+slave-bin.000001 # Gtid_list 2 # []
+slave-bin.000001 # Binlog_checkpoint 2 # slave-bin.000001
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; create table t1 (a int primary key, b int) engine=innodb
+slave-bin.000001 # Gtid 1 # BEGIN GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (-1, 0)
+slave-bin.000001 # Xid 1 # COMMIT /* XID */
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (0, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'7831',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (1, 0)
+slave-bin.000001 # Query 1 # XA END X'7831',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7831',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'7831',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'7832',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (2, 0)
+slave-bin.000001 # Query 1 # XA END X'7832',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7832',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'7832',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (3, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (4, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1 +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1 +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (5, 0) +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */ +include/stop_slave.inc +set @@GLOBAL.slave_parallel_threads= @save_par_thds; +set @@GLOBAL.slave_parallel_mode= @save_par_mode; +include/start_slave.inc +# +# Initialize test data +connection slave; +include/stop_slave.inc +RESET SLAVE; +set @@global.gtid_slave_pos= ""; +connection master; +create table t1 (a int primary key, b int) engine=innodb; +insert into t1 values (-1, 0); +include/save_master_gtid.inc +connection slave; +include/start_slave.inc +include/sync_with_master_gtid.inc +include/stop_slave.inc +set @save_debug= @@GLOBAL.debug_dbug; +set @save_par_thds= @@GLOBAL.slave_parallel_threads; +set @save_par_mode= @@GLOBAL.slave_parallel_mode; +set @@GLOBAL.slave_parallel_threads= 4; +set 
@@GLOBAL.slave_parallel_mode= optimistic; +set statement sql_log_bin=0 for call mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends"); +# +# Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE +# and XA ROLLBACK run concurrently. That is, the +# XA ROLLBACK will wait at group commit until the XA PREPARE +# binlogs, and then it will wait again until the XA PREPARE finishes +# preparing in all engines. At this point, the XA ROLLBACK will +# run to completion. +connection master; +set @@session.gtid_seq_no= 100; +XA START 'x'; +insert into t1 values (0, 0); +XA END 'x'; +XA PREPARE 'x'; +XA ROLLBACK 'x'; +include/save_master_gtid.inc +connection slave; +set @@global.debug_dbug= "+d,hold_worker_on_schedule,stop_after_binlog_prepare"; +include/start_slave.inc +# Waiting for XAP to pause when it is pulled from the queue +set debug_sync= "now wait_for reached_pause"; +# Before the XA PREPARE executes, the XA ROLLBACK should wait in group commit.. +# ..done +# Execute the XA PREPARE +set debug_sync= "now signal continue_worker"; +# Wait for XA PREPARE to have binlogged, but hold it before it prepares in engines +set debug_sync= "now wait_for xa_prepare_binlogged"; +# The XA ROLLBACK should move on from binlog to wait for the XA PREPARE to complete in engines +# ..done +# Signal the XAP to complete in engines (which will automatically signal XAC) +set debug_sync= "now signal continue_xap"; +include/sync_with_master_gtid.inc +include/diff_tables.inc [master:test.t1, slave:test.t1] +connection slave; +include/stop_slave.inc +set @@global.debug_dbug= @save_debug; +# +# Test Case 2: If two XA ROLLBACK transactions have different +# XIDs, ensure both phases of both transactions all execute concurrently. 
+# +# Ensure slave is stopped +connection slave; +include/wait_for_slave_to_stop.inc +set @@global.debug_dbug= "+d,stop_before_binlog_prepare,stop_after_binlog_prepare"; +connection master; +XA START 'x1'; +insert into t1 values (1, 0); +XA END 'x1'; +XA PREPARE 'x1'; +XA ROLLBACK 'x1'; +XA START 'x2'; +insert into t1 values (2, 0); +XA END 'x2'; +XA PREPARE 'x2'; +XA ROLLBACK 'x2'; +include/save_master_gtid.inc +connection slave; +include/start_slave.inc +set debug_sync= "now signal binlog_xap"; +set debug_sync= "now signal binlog_xap"; +# Ensuring both phases of both transactions all execute concurrently +# ..done +# Verify XA PREPARE has binlogged +set debug_sync= "now wait_for xa_prepare_binlogged"; +# Signal the XAPs to complete in engines (which will automatically signal XACs) +set debug_sync= "now signal continue_xap"; +set debug_sync= "now signal continue_xap"; +include/sync_with_master_gtid.inc +include/diff_tables.inc [master:test.t1, slave:test.t1] +connection slave; +include/stop_slave.inc +set @@global.debug_dbug= @save_debug; +# +# Test Case 3: Two current 2-phase XA transactions with matching XIDs +# should run one after the other, while each transaction still allows +# its XA PREPARE and XA ROLLBACK to run concurrently +# Ensure slave is stopped +connection slave; +include/wait_for_slave_to_stop.inc +set @@global.debug_dbug= "+d,stop_before_binlog_prepare,stop_after_binlog_prepare,stop_after_binlog_cor_by_xid"; +connection master; +XA START 'x'; +insert into t1 values (3, 0); +XA END 'x'; +XA PREPARE 'x'; +XA ROLLBACK 'x'; +XA START 'x'; +insert into t1 values (4, 0); +XA END 'x'; +XA PREPARE 'x'; +XA ROLLBACK 'x'; +include/save_master_gtid.inc +connection slave; +include/start_slave.inc +set debug_sync= "now signal binlog_xap"; +# Verify first XA PREPARE has binlogged +set debug_sync= "now wait_for xa_prepare_binlogged"; +# Ensure first XA transaction is running concurrently +# Ensure second XA transaction's XAP waits for the first transaction +# 
Signal first XA PREPARE to complete +set debug_sync= "now signal continue_xap"; +# Wait for first XA ROLLBACK to binlog +set debug_sync= "now wait_for xa_cor_binlogged"; +# Ensure second XA PREPARE doesn't begin yet because the XAC hadn't released its XID +# Signal first XA ROLLBACK to complete +set debug_sync= "now signal continue_xa_cor"; +# Wait for second XA PREPARE to binlogged +# First pass through binlog_xap +set debug_sync= "now signal binlog_xap"; +set debug_sync= "now wait_for xa_prepare_binlogged"; +# Ensure second XA ROLLBACK is concurrent with XAP +# Signal second XA transaction to complete +set debug_sync= "now signal continue_xap"; +set debug_sync= "now wait_for xa_cor_binlogged"; +set debug_sync= "now signal continue_xa_cor"; +include/sync_with_master_gtid.inc +include/diff_tables.inc [master:test.t1, slave:test.t1] +connection slave; +include/stop_slave.inc +set @@global.debug_dbug= @save_debug; +# +# Test Case 4 (Error Case): If an XA PREPARE errors while its +# XA ROLLBACK is waiting on it, both phases should rollback +# successfully. Note this tests both: +# a) XA ROLLBACK is waiting in group commit (first phase +# times out in DMLs) +# b) XA ROLLBACK is waiting in group commit, with another XAP +# with a duplicate XID waiting on it. 
+# Case a) +# Ensure slave is stopped +connection slave; +include/wait_for_slave_to_stop.inc +set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout; +set @save_trans_retries= @@GLOBAL.slave_transaction_retries; +set @@global.innodb_lock_wait_timeout= 1; +set @@global.slave_transaction_retries= 0; +connection master; +XA START 'x'; +update t1 set b=b+1 where a=-1; +XA END 'x'; +XA PREPARE 'x'; +XA ROLLBACK 'x'; +include/save_master_gtid.inc +connection slave1; +BEGIN; +select * from t1 where a=-1 for update;; +a b +-1 0 +connection slave; +include/start_slave.inc +include/wait_for_slave_sql_error.inc [errno=1205] +connection slave1; +ROLLBACK; +connection slave; +include/stop_slave_io.inc +set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout; +set @@global.slave_transaction_retries= @save_trans_retries; +# Ensure on slave restart, we can re-execute the XA transaction +include/start_slave.inc +include/save_master_gtid.inc +include/stop_slave.inc +set @@global.debug_dbug= @save_debug; +# Case b) +# Ensure slave is stopped +connection slave; +include/wait_for_slave_to_stop.inc +set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout; +set @save_trans_retries= @@GLOBAL.slave_transaction_retries; +set @@global.innodb_lock_wait_timeout= 1; +set @@global.slave_transaction_retries= 0; +connection master; +XA START 'x'; +update t1 set b=b+1 where a=-1; +XA END 'x'; +XA PREPARE 'x'; +XA ROLLBACK 'x'; +XA START 'x'; +insert into t1 values (5, 0); +XA END 'x'; +XA PREPARE 'x'; +include/save_master_gtid.inc +XA ROLLBACK 'x'; +connection slave1; +BEGIN; +select * from t1 where a=-1 for update;; +a b +-1 0 +connection slave; +include/start_slave.inc +include/wait_for_slave_sql_error.inc [errno=1205] +connection slave1; +ROLLBACK; +# There should not be any prepared rows seen by XA RECOVER +XA RECOVER; +formatID gtrid_length bqual_length data +# Ensuring data from second XAP isn't visible.. 
+# ..done +connection slave; +include/stop_slave_io.inc +set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout; +set @@global.slave_transaction_retries= @save_trans_retries; +# Ensure on slave restart, we can re-execute the XA transaction +include/start_slave.inc +include/save_master_gtid.inc +include/stop_slave.inc +set @@global.debug_dbug= @save_debug; +include/start_slave.inc +# Ensuring data from second XAP is visible.. +# ..done +# +# Cleanup +connection master; +DROP TABLE t1; +include/save_master_gtid.inc +include/show_binlog_events.inc +Log_name Pos Event_type Server_id End_log_pos Info +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # use `test`; create table t1 (a int primary key, b int) engine=innodb +master-bin.000001 # Gtid # # BEGIN GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (-1, 0) +master-bin.000001 # Xid # # COMMIT /* XID */ +master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (0, 0) +master-bin.000001 # Query # # XA END X'78',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA COMMIT X'78',X'',1 +master-bin.000001 # Gtid # # XA START X'7831',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (1, 0) +master-bin.000001 # Query # # XA END X'7831',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'7831',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA COMMIT X'7831',X'',1 +master-bin.000001 # Gtid # # XA START X'7832',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (2, 0) +master-bin.000001 # Query # # XA END X'7832',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'7832',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA COMMIT X'7832',X'',1 +master-bin.000001 # Gtid # # XA START X'78',X'',1 
GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (3, 0) +master-bin.000001 # Query # # XA END X'78',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA COMMIT X'78',X'',1 +master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (4, 0) +master-bin.000001 # Query # # XA END X'78',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA COMMIT X'78',X'',1 +master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; update t1 set b=b+1 where a=-1 +master-bin.000001 # Query # # XA END X'78',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA COMMIT X'78',X'',1 +master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; update t1 set b=b+1 where a=-1 +master-bin.000001 # Query # # XA END X'78',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA COMMIT X'78',X'',1 +master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (5, 0) +master-bin.000001 # Query # # XA END X'78',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA COMMIT X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # use `test`; DROP TABLE `t1` /* generated by server */ +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # use `test`; create table t1 (a int primary key, b int) engine=innodb +master-bin.000001 # Gtid # # BEGIN GTID #-#-# +master-bin.000001 # Query # # use `test`; 
insert into t1 values (-1, 0) +master-bin.000001 # Xid # # COMMIT /* XID */ +master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (0, 0) +master-bin.000001 # Query # # XA END X'78',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA ROLLBACK X'78',X'',1 +master-bin.000001 # Gtid # # XA START X'7831',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (1, 0) +master-bin.000001 # Query # # XA END X'7831',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'7831',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA ROLLBACK X'7831',X'',1 +master-bin.000001 # Gtid # # XA START X'7832',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (2, 0) +master-bin.000001 # Query # # XA END X'7832',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'7832',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA ROLLBACK X'7832',X'',1 +master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (3, 0) +master-bin.000001 # Query # # XA END X'78',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA ROLLBACK X'78',X'',1 +master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (4, 0) +master-bin.000001 # Query # # XA END X'78',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA ROLLBACK X'78',X'',1 +master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; update t1 set b=b+1 where a=-1 +master-bin.000001 # Query # # XA END X'78',X'',1 +master-bin.000001 # 
XA_prepare # # XA PREPARE X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA ROLLBACK X'78',X'',1 +master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; update t1 set b=b+1 where a=-1 +master-bin.000001 # Query # # XA END X'78',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA ROLLBACK X'78',X'',1 +master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-# +master-bin.000001 # Query # # use `test`; insert into t1 values (5, 0) +master-bin.000001 # Query # # XA END X'78',X'',1 +master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # XA ROLLBACK X'78',X'',1 +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # use `test`; DROP TABLE `t1` /* generated by server */ +connection slave; +include/sync_with_master_gtid.inc +show binlog events in 'slave-bin.000001' from <binlog_start>; +Log_name Pos Event_type Server_id End_log_pos Info +slave-bin.000001 # Gtid_list 2 # [] +slave-bin.000001 # Binlog_checkpoint 2 # slave-bin.000001 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; create table t1 (a int primary key, b int) engine=innodb +slave-bin.000001 # Gtid 1 # BEGIN GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (-1, 0) +slave-bin.000001 # Xid 1 # COMMIT /* XID */ +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (0, 0) +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'7831',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (1, 0) +slave-bin.000001 # Query 1 
# XA END X'7831',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7831',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'7831',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'7832',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (2, 0) +slave-bin.000001 # Query 1 # XA END X'7832',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7832',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'7832',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (3, 0) +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (4, 0) +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1 +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1 +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (5, 0) +slave-bin.000001 # 
Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */ +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; create table t1 (a int primary key, b int) engine=innodb +slave-bin.000001 # Gtid 1 # BEGIN GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (-1, 0) +slave-bin.000001 # Xid 1 # COMMIT /* XID */ +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (0, 0) +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'7831',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (1, 0) +slave-bin.000001 # Query 1 # XA END X'7831',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7831',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'7831',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'7832',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (2, 0) +slave-bin.000001 # Query 1 # XA END X'7832',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7832',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'7832',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (3, 0) +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START 
X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (4, 0) +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1 +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1 +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (5, 0) +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */ +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; create table t1 (a int primary key, b int) engine=innodb +slave-bin.000001 # Gtid 1 # BEGIN GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (-1, 0) +slave-bin.000001 # Xid 1 # COMMIT /* XID */ +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (0, 0) +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID 
#-#-# +slave-bin.000001 # Query 1 # XA ROLLBACK X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'7831',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (1, 0) +slave-bin.000001 # Query 1 # XA END X'7831',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7831',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA ROLLBACK X'7831',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'7832',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (2, 0) +slave-bin.000001 # Query 1 # XA END X'7832',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7832',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA ROLLBACK X'7832',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (3, 0) +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA ROLLBACK X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (4, 0) +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA ROLLBACK X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1 +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA ROLLBACK X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1 +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 
+slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA ROLLBACK X'78',X'',1 +slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; insert into t1 values (5, 0) +slave-bin.000001 # Query 1 # XA END X'78',X'',1 +slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # XA ROLLBACK X'78',X'',1 +slave-bin.000001 # Gtid 1 # GTID #-#-# +slave-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */ +include/stop_slave.inc +set @@GLOBAL.slave_parallel_threads= @save_par_thds; +set @@GLOBAL.slave_parallel_mode= @save_par_mode; +include/start_slave.inc +# +# Test Case 5: If an XAP is skipped by the replica (e.g. by incorrectly +# setting gtid_slave_pos), and only its XAC/XAR is tried to execute, the +# replica should report ER_XAER_NOTA. +connection master; +create table t1 (a int) engine=innodb; +include/save_master_gtid.inc +connection slave; +include/sync_with_master_gtid.inc +call mtr.add_suppression("XAER_NOTA: Unknown XID"); +include/stop_slave.inc +change master to master_use_gtid = slave_pos; +connection master; +xa start '1'; +insert into t1 set a=1; +xa end '1'; +xa prepare '1'; +xa rollback '1'; +insert into t1 set a=2; +include/save_master_gtid.inc +connection slave; +set @save_gtid_slave_pos= @@global.gtid_slave_pos; +SELECT CONCAT(domain_id,"-",server_id,"-", seq_no + 1) +into @gtid_skip +FROM mysql.gtid_slave_pos +WHERE seq_no = (SELECT DISTINCT max(seq_no) FROM mysql.gtid_slave_pos) limit 1; +set @@global.gtid_slave_pos = @gtid_skip; +start slave; +include/wait_for_slave_sql_error.inc [errno=1397] +select count(*) = 2 % 2 as 'must be true' from t1;; +must be true +1 +include/stop_slave.inc +set @@global.gtid_slave_pos = @save_gtid_slave_pos; +show warnings; +Level Code Message +Warning 1947 Specified GTID <value> conflicts with the binary log which contains a more recent GTID <value>. 
If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos +include/start_slave.inc +include/sync_with_master_gtid.inc +include/stop_slave.inc +change master to master_use_gtid = slave_pos; +connection master; +xa start '1'; +insert into t1 set a=1; +xa end '1'; +xa prepare '1'; +xa commit '1'; +insert into t1 set a=2; +include/save_master_gtid.inc +connection slave; +set @save_gtid_slave_pos= @@global.gtid_slave_pos; +SELECT CONCAT(domain_id,"-",server_id,"-", seq_no + 1) +into @gtid_skip +FROM mysql.gtid_slave_pos +WHERE seq_no = (SELECT DISTINCT max(seq_no) FROM mysql.gtid_slave_pos) limit 1; +set @@global.gtid_slave_pos = @gtid_skip; +start slave; +include/wait_for_slave_sql_error.inc [errno=1397] +select count(*) = 1 % 2 as 'must be true' from t1;; +must be true +1 +include/stop_slave.inc +set @@global.gtid_slave_pos = @save_gtid_slave_pos; +show warnings; +Level Code Message +include/start_slave.inc +include/sync_with_master_gtid.inc +connection master; +drop table t1; +connection slave; +include/rpl_end.inc +# End of rpl_xa_concurrent_2pc.test diff --git a/mysql-test/suite/rpl/r/rpl_xa_empty_transaction.result b/mysql-test/suite/rpl/r/rpl_xa_empty_transaction.result index f3ea53c219a..92a820d6753 100644 --- a/mysql-test/suite/rpl/r/rpl_xa_empty_transaction.result +++ b/mysql-test/suite/rpl/r/rpl_xa_empty_transaction.result @@ -1165,5 +1165,56 @@ connection server_1; set @@binlog_format = @sav_binlog_format; set @@global.binlog_format = @sav_binlog_format; connection server_1; +create table t_not_in_binlog (a int) engine=innodb; +flush logs; +include/save_master_gtid.inc +connect con1,localhost,root,,; +call mtr.add_suppression("XAER_NOTA: Unknown XID"); +SET sql_log_bin=0; +XA START 'a'; +insert into t_not_in_binlog set a=1; +XA END 'a'; +XA PREPARE 'a'; +disconnect con1; +connection server_1; +xa recover; +formatID gtrid_length bqual_length data +1 1 0 a +XA ROLLBACK 'a'; +drop table t_not_in_binlog; 
+include/save_master_gtid.inc +connection server_2; +XAER_NOTA: Unknown XID +include/wait_for_slave_sql_error.inc [errno=1397] +connect con2,127.0.0.1,root,,test,$SERVER_MYPORT_2,; +SET sql_log_bin=0; +XA START 'a'; +insert into t_not_in_binlog set a=1; +XA END 'a'; +XA PREPARE 'a'; +disconnect con2; +connection server_2; +xa recover; +formatID gtrid_length bqual_length data +1 1 0 a +include/start_slave.inc +include/sync_with_master_gtid.inc +connection server_3; +XAER_NOTA: Unknown XID +include/wait_for_slave_sql_error.inc [errno=1397] +connect con3,127.0.0.1,root,,test,$SERVER_MYPORT_3,; +SET sql_log_bin=0; +XA START 'a'; +insert into t_not_in_binlog set a=1; +XA END 'a'; +XA PREPARE 'a'; +disconnect con3; +connection server_3; +xa recover; +formatID gtrid_length bqual_length data +1 1 0 a +include/start_slave.inc +include/sync_with_master_gtid.inc +connection server_1; include/rpl_end.inc # End of rpl_xa_empty_transaction.test diff --git a/mysql-test/suite/rpl/r/rpl_xa_prepare_gtid_fail.result b/mysql-test/suite/rpl/r/rpl_xa_prepare_gtid_fail.result index dd0d132471e..a3f5414b9da 100644 --- a/mysql-test/suite/rpl/r/rpl_xa_prepare_gtid_fail.result +++ b/mysql-test/suite/rpl/r/rpl_xa_prepare_gtid_fail.result @@ -42,7 +42,7 @@ connection master; drop table t1; connection slave; # TODO: Remove after fixing MDEV-21777 -set @@global.gtid_slave_pos= "0-1-100"; +set @@global.gtid_slave_pos= "0-1-101"; set @@global.slave_parallel_threads= @save_par_thds; set @@global.gtid_strict_mode= @save_strict_mode; set @@global.innodb_lock_wait_timeout= @save_innodb_lock_wait_timeout; diff --git a/mysql-test/suite/rpl/t/rpl_xa_concurrent_2pc.test b/mysql-test/suite/rpl/t/rpl_xa_concurrent_2pc.test new file mode 100644 index 00000000000..d762114ff26 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_xa_concurrent_2pc.test @@ -0,0 +1,111 @@ +# +# This test ensures that two-phase XA transactions have their first and +# second phases parallelized for both XA COMMIT and XA ROLLBACK. 
It ensures the +# following behaviors: +# +# Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE and +# XA COMMIT/ROLLBACK run concurrently. That is, the XA COMMIT/ROLLBACK will +# wait at group commit until the XA PREPARE binlogs, and then it will wait +# again until the XA PREPARE finishes preparing in all engines. At this point, +# the XA COMMIT/ROLLBACK will run to completion. +# +# Test Case 2: If two XA transactions have different XIDs and XA COMMIT ends +# a transaction, ensure both phases of both transactions can all execute +# concurrently. +# +# Test Case 3: Two concurrent 2-phase XA transactions with matching XIDs should +# run one after the other, while each transaction still allows both phases of +# its own transaction to run concurrently. +# +# Test Case 4: Error Case. If an XAP errors while its XAC/R is waiting on it, +# both the XAP and XAC/R should roll back successfully. Note this tests both: +# a) XAC/R is waiting in group commit (first phase times out in DMLs) +# b) XAC/R is waiting in group commit, with another XAP with a duplicate XID +# waiting on it. +# +# Test Case 5: If an XAP is skipped by the replica (e.g. by incorrectly +# setting gtid_slave_pos), and only its XAC/XAR is tried to execute, the +# replica should report ER_XAER_NOTA. +# +# +# References: +# MDEV-31949: slow parallel replication of user xa +# +--source include/have_debug.inc +--source include/have_innodb.inc +--source include/have_binlog_format_mixed.inc +--source include/master-slave.inc + +--let $xa_complete_sym= COMMIT +--source include/rpl_xa_concurrent_2pc.inc + +--let $xa_complete_sym= ROLLBACK +--source include/rpl_xa_concurrent_2pc.inc + + +--echo # +--echo # Test Case 5: If an XAP is skipped by the replica (e.g. by incorrectly +--echo # setting gtid_slave_pos), and only its XAC/XAR is tried to execute, the +--echo # replica should report ER_XAER_NOTA.
+ +--connection master +create table t1 (a int) engine=innodb; +--source include/save_master_gtid.inc + +--connection slave +--source include/sync_with_master_gtid.inc +call mtr.add_suppression("XAER_NOTA: Unknown XID"); + +--let $i=2 +while ($i) +{ + --source include/stop_slave.inc + --replace_regex /[0-9]*-[0-9]*-[0-9]*/<value>/ + change master to master_use_gtid = slave_pos; + + --connection master + --let $complete=rollback + if ($i == 1) + { + --let $complete=commit + } + xa start '1'; insert into t1 set a=1; xa end '1'; xa prepare '1'; + --eval xa $complete '1' + insert into t1 set a=2; + --source include/save_master_gtid.inc + + --connection slave + + # reposition the slave to skip one transaction from master + set @save_gtid_slave_pos= @@global.gtid_slave_pos; + SELECT CONCAT(domain_id,"-",server_id,"-", seq_no + 1) + into @gtid_skip + FROM mysql.gtid_slave_pos + WHERE seq_no = (SELECT DISTINCT max(seq_no) FROM mysql.gtid_slave_pos) limit 1; + set @@global.gtid_slave_pos = @gtid_skip; + + start slave; + let $slave_sql_errno= 1397; # ER_XAER_NOTA + source include/wait_for_slave_sql_error.inc; + --eval select count(*) = $i % 2 as 'must be true' from t1; + --source include/stop_slave.inc + + --disable_warnings + set @@global.gtid_slave_pos = @save_gtid_slave_pos; + --enable_warnings + --replace_regex /[0-9]*-[0-9]*-[0-9]*/<value>/ + show warnings; + --source include/start_slave.inc + --source include/sync_with_master_gtid.inc + + --dec $i +} + +# MDEV-31949 cleanup +--connection master +drop table t1; + +--sync_slave_with_master + +--source include/rpl_end.inc +--echo # End of rpl_xa_concurrent_2pc.test diff --git a/mysql-test/suite/rpl/t/rpl_xa_empty_transaction.test b/mysql-test/suite/rpl/t/rpl_xa_empty_transaction.test index 61cc0621d5a..f43af653ace 100644 --- a/mysql-test/suite/rpl/t/rpl_xa_empty_transaction.test +++ b/mysql-test/suite/rpl/t/rpl_xa_empty_transaction.test @@ -167,6 +167,95 @@ set @@global.binlog_format = row; set @@binlog_format = 
@sav_binlog_format; set @@global.binlog_format = @sav_binlog_format; + +# MDEV-32257 dangling XA-rollback in binlog from empty XA +# create a case where XA ROLLBACK gets to the binlog while its XAP does not, and +# try to replicate it. +# The expected result is that both slaves error out. +--connection server_1 +--let $binlog_start = query_get_value(SHOW MASTER STATUS, Position, 1) +--let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1) +create table t_not_in_binlog (a int) engine=innodb; +flush logs; +--source include/save_master_gtid.inc +--let $binlog_file=query_get_value(SHOW MASTER STATUS, File, 1) + +# External connection XID access after disconnect is subject to race. +# "(" open parenthesis to remember # of connections before ... +--source include/count_sessions.inc + +--connect(con1,localhost,root,,) +call mtr.add_suppression("XAER_NOTA: Unknown XID"); + +SET sql_log_bin=0; +XA START 'a'; +insert into t_not_in_binlog set a=1; +XA END 'a'; +XA PREPARE 'a'; +--disconnect con1 + +--connection server_1 +# .. ")" close parenthesis, to wait until con1 fully releases access to xid. +--source include/wait_until_count_sessions.inc +xa recover; +# +# replicate the orphan XAR to servers 2 and 3, expect the error first, +# and then compensate for it.
+ +--error 0 +XA ROLLBACK 'a'; +# cleanup at once +drop table t_not_in_binlog; +--source include/save_master_gtid.inc + +--connection server_2 +--echo XAER_NOTA: Unknown XID +--let $slave_sql_errno= 1397 +--source include/wait_for_slave_sql_error.inc + +# "(" +--source include/count_sessions.inc + +--connect (con2,127.0.0.1,root,,test,$SERVER_MYPORT_2,) +SET sql_log_bin=0; +XA START 'a'; +insert into t_not_in_binlog set a=1; +XA END 'a'; +XA PREPARE 'a'; +--disconnect con2 + +--connection server_2 +# ")" +--source include/wait_until_count_sessions.inc + +xa recover; +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc + +--connection server_3 +--echo XAER_NOTA: Unknown XID +--let $slave_sql_errno= 1397 +--source include/wait_for_slave_sql_error.inc + +# "(" +--source include/count_sessions.inc + +--connect (con3,127.0.0.1,root,,test,$SERVER_MYPORT_3,) +SET sql_log_bin=0; +XA START 'a'; +insert into t_not_in_binlog set a=1; +XA END 'a'; +XA PREPARE 'a'; +--disconnect con3 + +--connection server_3 +# ")" +--source include/wait_until_count_sessions.inc + +xa recover; +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc + # # Cleanup --connection server_1 diff --git a/mysql-test/suite/rpl/t/rpl_xa_prepare_gtid_fail.test b/mysql-test/suite/rpl/t/rpl_xa_prepare_gtid_fail.test index aa1b088ed23..72589953ac0 100644 --- a/mysql-test/suite/rpl/t/rpl_xa_prepare_gtid_fail.test +++ b/mysql-test/suite/rpl/t/rpl_xa_prepare_gtid_fail.test @@ -56,8 +56,8 @@ xa start '1'; update t1 set b=b+10 where a=1; xa end '1'; xa prepare '1'; ---let $new_gtid= `SELECT @@global.gtid_binlog_pos` xa commit '1'; +--let $new_gtid= `SELECT @@global.gtid_binlog_pos` --source include/save_master_gtid.inc --connection slave1 diff --git a/sql/handler.cc b/sql/handler.cc index 1ea1818749c..7fa78e4d9b2 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -2180,13 +2180,13 @@ int ha_rollback_trans(THD *thd, bool all) rollback without signalling following 
transactions. And in release builds, we explicitly do the signalling before rolling back. */ - DBUG_ASSERT( - !(thd->rgi_slave && - !thd->rgi_slave->worker_error && - thd->rgi_slave->did_mark_start_commit) || - (thd->transaction->xid_state.is_explicit_XA() || - (thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_PREPARED_XA))); - + DBUG_ASSERT(!(thd->rgi_slave && + !thd->rgi_slave->worker_error && + thd->rgi_slave->did_mark_start_commit) || + (thd->transaction->xid_state.is_explicit_XA() || + (thd->rgi_slave->gtid_ev_flags2 & + (Gtid_log_event::FL_PREPARED_XA | + Gtid_log_event::FL_COMPLETED_XA)))); if (thd->rgi_slave && !thd->rgi_slave->worker_error && thd->rgi_slave->did_mark_start_commit) @@ -2343,6 +2343,15 @@ int ha_commit_or_rollback_by_xid(XID *xid, bool commit) else binlog_rollback_by_xid(binlog_hton, xid); +#ifdef ENABLED_DEBUG_SYNC + DBUG_EXECUTE_IF( + "stop_after_binlog_cor_by_xid", + DBUG_ASSERT(!debug_sync_set_action( + current_thd, + STRING_WITH_LEN( + "now SIGNAL xa_cor_binlogged WAIT_FOR continue_xa_cor")));); +#endif + plugin_foreach(NULL, commit ? 
xacommit_handlerton : xarollback_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &xaop); diff --git a/sql/log.cc b/sql/log.cc index eef8d86e4da..78a9419fb15 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -101,6 +101,8 @@ static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, Log_event *end_ev, bool all, bool using_stmt, bool using_trx, bool is_ro_1pc); +XID_cache_element *xid_cache_search_maybe_wait(THD *thd); + static const LEX_CSTRING write_error_msg= { STRING_WITH_LEN("error writing to the binary log") }; @@ -1751,7 +1753,8 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, if ((using_stmt && !cache_mngr->stmt_cache.empty()) || (using_trx && !cache_mngr->trx_cache.empty()) || - thd->transaction->xid_state.is_explicit_XA()) + (thd->transaction->xid_state.is_explicit_XA() || + (thd->rgi_slave && thd->rgi_slave->is_async_xac))) { if (using_stmt && thd->binlog_flush_pending_rows_event(TRUE, FALSE)) DBUG_RETURN(1); @@ -1858,11 +1861,19 @@ binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr, if (thd->lex->sql_command == SQLCOM_XA_COMMIT && thd->lex->xa_opt != XA_ONE_PHASE) { - DBUG_ASSERT(thd->transaction->xid_state.is_explicit_XA()); + bool is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac); + DBUG_ASSERT(thd->transaction->xid_state.is_explicit_XA() || is_async_xac); DBUG_ASSERT(thd->transaction->xid_state.get_state_code() == - XA_PREPARED); - - buflen= serialize_with_xid(thd->transaction->xid_state.get_xid(), + XA_PREPARED || is_async_xac); + DBUG_ASSERT(is_async_xac || + thd->lex->xid->eq(thd->transaction->xid_state.get_xid())); + /* + While xid_state.get_xid() is a robust method to access `xid` + it can't be used on slave by the asynchronously running XA-"complete". + In the latter case thd->lex->xid is safely accessible. + */ + buflen= serialize_with_xid(is_async_xac? 
thd->lex->xid : + thd->transaction->xid_state.get_xid(), buf, query, q_len); } Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0); @@ -1888,13 +1899,19 @@ binlog_rollback_flush_trx_cache(THD *thd, bool all, const size_t q_len= sizeof(query) - 1; // do not count trailing 0 char buf[q_len + ser_buf_size]= "ROLLBACK"; size_t buflen= sizeof("ROLLBACK") - 1; + bool is_async_xac= false; - if (thd->transaction->xid_state.is_explicit_XA()) + if (thd->transaction->xid_state.is_explicit_XA() || + (is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac))) { /* for not prepared use plain ROLLBACK */ - if (thd->transaction->xid_state.get_state_code() == XA_PREPARED) - buflen= serialize_with_xid(thd->transaction->xid_state.get_xid(), + if (thd->transaction->xid_state.get_state_code() == XA_PREPARED || + is_async_xac) + { + buflen= serialize_with_xid(is_async_xac? thd->lex->xid : + thd->transaction->xid_state.get_xid(), buf, query, q_len); + } } Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0); @@ -1985,11 +2002,71 @@ inline bool is_preparing_xa(THD *thd) static int binlog_prepare(handlerton *hton, THD *thd, bool all) { + int rc; + /* Do nothing unless the transaction is a user XA. */ - return is_preparing_xa(thd) ? binlog_commit(thd, all, FALSE) : 0; + if (is_preparing_xa(thd)) + { + DBUG_EXECUTE_IF( + "stop_before_binlog_prepare", + DBUG_ASSERT(!debug_sync_set_action( + thd, STRING_WITH_LEN("now WAIT_FOR binlog_xap")));); + + rc= binlog_commit(thd, all, FALSE); + +#ifdef ENABLED_DEBUG_SYNC + DBUG_EXECUTE_IF( + "stop_after_binlog_prepare", + DBUG_ASSERT(!debug_sync_set_action( + thd, + STRING_WITH_LEN( + "now SIGNAL xa_prepare_binlogged WAIT_FOR continue_xap")));); +#endif + } + else + { + rc= 0; + } + + return rc; } +/** + @c acquire_xid lets the slave worker's THD take control over an xid record in the system + xid cache. The implicitly provided @c xid corresponds to an asynchronously + handled XA-"complete".
+ + @param thd the thread handler + @return false as success, true otherwise +*/ +static bool acquire_xid(THD *thd) +{ + bool rc= false; + + if (thd->rgi_slave && thd->rgi_slave->is_async_xac && + thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_COMPLETED_XA) + { + XID_STATE &xid_state= thd->transaction->xid_state; + + auto xs= xid_cache_search_maybe_wait(thd); + xid_state.xid_cache_element= xs; + if (!xs) + { + DBUG_ASSERT(thd->is_killed()); + + rpl_gtid *gtid= &thd->rgi_slave->current_gtid; + my_error(ER_XAER_RMERR, MYF(0)); + sql_print_error("XA COMMIT of GTID %u-%u-%llu could not complete " + "after having been logged into binary log", + gtid->domain_id, gtid->server_id, gtid->seq_no); + rc= true; + } + } + + return rc; +} + int binlog_commit_by_xid(handlerton *hton, XID *xid) { int rc= 0; @@ -1997,28 +2074,38 @@ int binlog_commit_by_xid(handlerton *hton, XID *xid) if (thd->is_current_stmt_binlog_disabled()) { - return thd->wait_for_prior_commit(); + rc= thd->wait_for_prior_commit(); } + else + { + /* the asserted state can't be reachable with xa commit */ + DBUG_ASSERT(!thd->get_stmt_da()->is_error() || + thd->get_stmt_da()->sql_errno() != ER_XA_RBROLLBACK); + /* + This is a recovered user xa transaction commit. + Create a "temporary" binlog transaction to write the commit record + into binlog. + */ + THD_TRANS trans; + trans.ha_list= NULL; - /* the asserted state can't be reachable with xa commit */ - DBUG_ASSERT(!thd->get_stmt_da()->is_error() || - thd->get_stmt_da()->sql_errno() != ER_XA_RBROLLBACK); - /* - This is a recovered user xa transaction commit. - Create a "temporary" binlog transaction to write the commit record - into binlog.
- */ - THD_TRANS trans; - trans.ha_list= NULL; - - thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton); - thd->ha_data[binlog_hton->slot].ha_info[1].set_trx_read_write(); - (void) thd->binlog_setup_trx_data(); + thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton); + thd->ha_data[binlog_hton->slot].ha_info[1].set_trx_read_write(); + (void) thd->binlog_setup_trx_data(); - DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT); + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT); - rc= binlog_commit(thd, TRUE, FALSE); - thd->ha_data[binlog_hton->slot].ha_info[1].reset(); + rc= binlog_commit(thd, TRUE, FALSE); + thd->ha_data[binlog_hton->slot].ha_info[1].reset(); + } + if (!rc) + { + rc= acquire_xid(thd); + } + if (thd->is_current_stmt_binlog_disabled()) + { + thd->wakeup_subsequent_commits(rc); + } return rc; } @@ -2031,33 +2118,54 @@ int binlog_rollback_by_xid(handlerton *hton, XID *xid) if (thd->is_current_stmt_binlog_disabled()) { - return thd->wait_for_prior_commit(); + rc= thd->wait_for_prior_commit(); } + else if (thd->get_stmt_da()->is_error() && + thd->get_stmt_da()->sql_errno() == ER_XA_RBROLLBACK) + rc= true; + else + { + THD_TRANS trans; + trans.ha_list= NULL; - if (thd->get_stmt_da()->is_error() && - thd->get_stmt_da()->sql_errno() == ER_XA_RBROLLBACK) - return rc; - - THD_TRANS trans; - trans.ha_list= NULL; - - thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton); - thd->ha_data[hton->slot].ha_info[1].set_trx_read_write(); - (void) thd->binlog_setup_trx_data(); + thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton); + thd->ha_data[hton->slot].ha_info[1].set_trx_read_write(); + (void) thd->binlog_setup_trx_data(); - DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK || - (thd->transaction->xid_state.get_state_code() == XA_ROLLBACK_ONLY)); + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK || + (thd->transaction->xid_state.get_state_code() == XA_ROLLBACK_ONLY)); - rc= binlog_rollback(hton, thd, TRUE); - 
thd->ha_data[hton->slot].ha_info[1].reset(); + rc= binlog_rollback(hton, thd, TRUE); + thd->ha_data[hton->slot].ha_info[1].reset(); + } + if (!rc) + { + rc= acquire_xid(thd); + } + if (thd->is_current_stmt_binlog_disabled()) + { + thd->wakeup_subsequent_commits(rc); + } return rc; } - +/** + @param thd thread handler + @return true when thd carries an XA transaction in prepared state, + or the XA transaction is being completed by + asynchronously running "COMPLETE" by slave parallel thread; + false otherwise +*/ inline bool is_prepared_xa(THD *thd) { - return thd->transaction->xid_state.is_explicit_XA() && + bool is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac); + DBUG_ASSERT(!is_async_xac || + thd->lex->sql_command == SQLCOM_XA_ROLLBACK || + thd->lex->sql_command == SQLCOM_XA_COMMIT); + + return is_async_xac ? true : + thd->transaction->xid_state.is_explicit_XA() && thd->transaction->xid_state.get_state_code() == XA_PREPARED; } @@ -2185,7 +2293,9 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc) } if (cache_mngr->trx_cache.empty() && - (thd->transaction->xid_state.get_state_code() != XA_PREPARED || + ((thd->transaction->xid_state.get_state_code() != XA_PREPARED && + !(thd->rgi_slave && thd->rgi_slave->is_parallel_exec && + thd->lex->sql_command == SQLCOM_XA_COMMIT)) || !(thd->ha_data[binlog_hton->slot].ha_info[1].is_started() && thd->ha_data[binlog_hton->slot].ha_info[1].is_trx_read_write()))) { @@ -2279,7 +2389,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) } if (!cache_mngr->trx_cache.has_incident() && cache_mngr->trx_cache.empty() && - (thd->transaction->xid_state.get_state_code() != XA_PREPARED || + ((thd->transaction->xid_state.get_state_code() != XA_PREPARED && + !(thd->rgi_slave && thd->rgi_slave->is_parallel_exec && + thd->lex->sql_command == SQLCOM_XA_ROLLBACK)) || !(thd->ha_data[binlog_hton->slot].ha_info[1].is_started() && thd->ha_data[binlog_hton->slot].ha_info[1].is_trx_read_write()))) { @@ -8382,7 +8494,9 @@ 
MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty() || - current->thd->transaction->xid_state.is_explicit_XA()); + (current->thd->transaction->xid_state.is_explicit_XA() || + (current->thd->rgi_slave && + current->thd->rgi_slave->is_async_xac))); if (unlikely((current->error= write_transaction_or_stmt(current, commit_id)))) @@ -10510,13 +10624,20 @@ int TC_LOG_BINLOG::unlog_xa_prepare(THD *thd, bool all) binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data(); int cookie= 0; + int rc= 0; + + if (thd->rgi_slave && thd->is_current_stmt_binlog_disabled()) + { + rc= thd->wait_for_prior_commit(); + if (rc == 0) + thd->wakeup_subsequent_commits(rc); + return rc; + } if (!cache_mngr->need_unlog) { Ha_trx_info *ha_info; uint rw_count= ha_count_rw_all(thd, &ha_info); - bool rc= false; - /* This transaction has not been binlogged as indicated by need_unlog. Such exceptional cases include transactions with no effect to engines, diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 92ff401a260..230a7a4667f 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -3314,16 +3314,22 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, XID_STATE &xid_state= thd->transaction->xid_state; if (is_transactional) { - if (xid_state.is_explicit_XA() && - (thd->lex->sql_command == SQLCOM_XA_PREPARE || - xid_state.get_state_code() == XA_PREPARED)) + bool is_async_xac= false; + if ((xid_state.is_explicit_XA() && + (thd->lex->sql_command == SQLCOM_XA_PREPARE || + xid_state.get_state_code() == XA_PREPARED)) || + (is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac))) { DBUG_ASSERT(!(thd->lex->sql_command == SQLCOM_XA_COMMIT && thd->lex->xa_opt == XA_ONE_PHASE)); + DBUG_ASSERT(!is_async_xac || + thd->lex->sql_command == SQLCOM_XA_ROLLBACK || + thd->lex->sql_command == SQLCOM_XA_COMMIT); flags2|= thd->lex->sql_command == SQLCOM_XA_PREPARE ? 
FL_PREPARED_XA : FL_COMPLETED_XA; - xid.set(xid_state.get_xid()); + xid.set(is_async_xac? thd->lex->xid : + thd->transaction->xid_state.get_xid()); } /* count non-zero extra recoverable engines; total = extra + 1 */ if (has_xid) @@ -4172,9 +4178,6 @@ int XA_prepare_log_event::do_commit() thd->lex->xid= &xid; if (!one_phase) { - if ((res= thd->wait_for_prior_commit())) - return res; - thd->lex->sql_command= SQLCOM_XA_PREPARE; res= trans_xa_prepare(thd); } diff --git a/sql/mysqld.cc b/sql/mysqld.cc index c0dd56ab3d5..0f76dcc2351 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -1055,7 +1055,7 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_COND_queue_busy; PSI_cond_key key_RELAYLOG_COND_relay_log_updated, key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready, - key_COND_wait_commit; + key_COND_wait_commit, key_COND_wait_commit_dep; PSI_cond_key key_RELAYLOG_COND_queue_busy; PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread, @@ -1083,6 +1083,7 @@ static PSI_cond_info all_server_conds[]= { &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0}, { &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0}, { &key_COND_wait_commit, "wait_for_commit::COND_wait_commit", 0}, + { &key_COND_wait_commit_dep, "wait_for_commit::COND_wait_commit_dep", 0}, { &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 0}, { &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL}, { &key_COND_server_started, "COND_server_started", PSI_FLAG_GLOBAL}, @@ -9224,6 +9225,7 @@ PSI_stage_info stage_waiting_for_deadlock_kill= { 0, "Waiting for parallel repli PSI_stage_info stage_starting= { 0, "starting", 0}; PSI_stage_info stage_waiting_for_flush= { 0, "Waiting for non trans tables to be flushed", 0}; PSI_stage_info stage_waiting_for_ddl= { 0, "Waiting for DDLs", 0}; +PSI_stage_info stage_waiting_for_prior_xa_transaction= { 0, "Waiting for prior xa transaction", 0}; PSI_memory_key key_memory_DATE_TIME_FORMAT;
PSI_memory_key key_memory_DDL_LOG_MEMORY_ENTRY; diff --git a/sql/mysqld.h b/sql/mysqld.h index fc8afa06638..4245a1a5e10 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -376,7 +376,7 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond, key_COND_start_thread; extern PSI_cond_key key_RELAYLOG_COND_relay_log_updated, key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready, - key_COND_wait_commit; + key_COND_wait_commit, key_COND_wait_commit_dep; extern PSI_cond_key key_RELAYLOG_COND_queue_busy; extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue, @@ -679,7 +679,7 @@ extern PSI_stage_info stage_slave_background_process_request; extern PSI_stage_info stage_slave_background_wait_request; extern PSI_stage_info stage_waiting_for_deadlock_kill; extern PSI_stage_info stage_starting; - +extern PSI_stage_info stage_waiting_for_prior_xa_transaction; #ifdef HAVE_PSI_STATEMENT_INTERFACE /** Statement instrumentation keys (sql). diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index c26263401b8..2653e294590 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -9,6 +9,8 @@ #ifdef WITH_WSREP #include "wsrep_trans_observer.h" #endif +#include <algorithm> +using std::max; /* Code for optional parallel execution of replicated events on the slave. 
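The rpl_parallel.cc hunks below implement two mechanisms. First, workers are chosen Round-Robin for every GTID; XA groups are no longer hashed by xid. Second, the driver thread keeps a circular window of recent XA-prepare xid hashes and uses it to decide whether an XA event group must lose its parallel flag. The sketch below models both under stated assumptions and is not the server code. `Xid`, `XapWindow`, `next_worker` and `check_and_record` are hypothetical names. The patch itself uses `XID`, `rpl_parallel_entry::concurrent_xaps_window`, `handle_xa_prepera_duplicate_xid()` and `my_hash_sort()`. The staleness test mirrors the patch's `queued_event_diff >= e->rpl_thread_max` check.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the server's XID; only its key bytes matter here.
struct Xid {
  std::string key;
};

namespace std {
// Hash an Xid by its key bytes (the patch hashes XID::key() with my_hash_sort()).
template <> struct hash<Xid> {
  std::size_t operator()(const Xid &x) const noexcept {
    return std::hash<std::string>{}(x.key);
  }
};
}  // namespace std

// Round-Robin worker choice: advance the index for every new event group.
std::size_t next_worker(std::size_t &idx, std::size_t workers) {
  if (++idx >= workers) idx = 0;
  return idx;
}

// Circular window of (xid hash, queued-group count) for recent XA-prepares.
class XapWindow {
 public:
  explicit XapWindow(std::size_t workers)
      : workers_(workers), cells_(std::max<std::size_t>(2, 2 * workers)) {
    assert(workers > 0);
  }

  // Returns true when `xid` may collide with an XA-prepare still in flight,
  // i.e. the caller should clear the "allow parallel" flag of the group.
  // `queued` is the number of event groups queued so far in this domain.
  bool check_and_record(const Xid &xid, bool is_xap, std::uint64_t queued) {
    const std::size_t cap = cells_.size();
    std::size_t i = lhs_;
    // Squeeze from the left: entries queued >= |W| groups ago are stale.
    for (; i != rhs_; i = (i + 1) % cap) {
      if (queued - cells_[i].second < workers_) break;
      lhs_ = (i + 1) % cap;
    }
    const std::size_t h = std::hash<Xid>{}(xid);
    bool dup = false;
    for (; i != rhs_; i = (i + 1) % cap) {
      if (cells_[i].first == h) { dup = true; break; }
    }
    if (is_xap) {  // only XA-prepares enter the window
      cells_[rhs_] = {h, queued};
      rhs_ = (rhs_ + 1) % cap;
      if (rhs_ == lhs_) lhs_ = (lhs_ + 1) % cap;  // full: drop the oldest
    }
    return dup;
  }

 private:
  std::size_t workers_;
  std::vector<std::pair<std::size_t, std::uint64_t>> cells_;
  std::size_t lhs_ = 0, rhs_ = 0;  // lhs_ == rhs_ means the window is empty
};
```

The check compares only hashes, so a collision costs parallelism rather than correctness: the affected group simply waits for prior commits. The capacity of max(2, 2*|W|) follows the `init()` call that the patch adds to `rpl_parallel::find()`.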
@@ -760,7 +762,8 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi) return; err_code= thd->get_stmt_da()->sql_errno(); if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC && - err_code != ER_PRIOR_COMMIT_FAILED) || + (err_code != ER_PRIOR_COMMIT_FAILED && + err_code != ER_XAER_NOTA)) || ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) && rgi->killed_for_retry)) { @@ -2364,16 +2367,9 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, idx= rpl_thread_idx; if (gtid_ev) { - if (gtid_ev->flags2 & - (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) - idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), - gtid_ev->xid.key_length()) % rpl_thread_max; - else - { - ++idx; - if (idx >= rpl_thread_max) - idx= 0; - } + ++idx; + if (idx >= rpl_thread_max) + idx= 0; rpl_thread_idx= idx; } thr= rpl_threads[idx]; @@ -2467,6 +2463,7 @@ free_rpl_parallel_entry(void *element) } mysql_cond_destroy(&e->COND_parallel_entry); mysql_mutex_destroy(&e->LOCK_parallel_entry); + e->concurrent_xaps_window.~Dynamic_array(); my_free(e); } @@ -2521,6 +2518,19 @@ rpl_parallel::find(uint32 domain_id) e->domain_id= domain_id; e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; e->pause_sub_id= (uint64)ULONGLONG_MAX; + + e->concurrent_xaps_window.init((PSI_memory_key) PSI_INSTRUMENT_ME, + max((decltype(e->rpl_thread_max)) 2, + 2*e->rpl_thread_max)); + e->cxap_lhs= e->cxap_rhs= 0; + + /* + 0 initialize each element + */ + for (size_t i= 0; i < e->concurrent_xaps_window.max_size(); i++) + { + e->concurrent_xaps_window.at(i)= {0, 0}; + } mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL); @@ -2798,6 +2808,90 @@ abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread, mysql_cond_signal(&cur_thread->COND_rpl_thread); } +/** + Check the concurrency status of @c xid with ones in progress. 
+ Any new @c xid of XA-prepare (@c is_xap is true then) is appended to + a sliding window designed as a circular buffer. The return value is computed + by searching the window. + + @param e parallel entry pointer + @param xid a pointer to the xid of either XA-prepare or XA-"complete" + @param is_xap + true when xid belongs to XA-prepare + @return true when there exists a duplicate xid hash value, + false otherwise. +*/ +static bool +handle_xa_prepera_duplicate_xid(rpl_parallel_entry *e, XID *xid, bool is_xap) +{ + DBUG_ASSERT(e->current_group_info || + (e->count_queued_event_groups == 0 && + e->cxap_lhs == e->cxap_rhs && e->cxap_lhs == 0)); + DBUG_ASSERT(xid); + DBUG_ASSERT(!xid->is_null()); + DBUG_ASSERT(xid->key()); + DBUG_ASSERT(xid->key_length()); + + uint64 curr_event_count= e->count_queued_event_groups; + uint32 i; + bool rc= false; + /* + We've seen XAPs before, so move the LHS up to a relevant spot. + LHS = RHS indicates the empty buffer (which implies RHS is the exclusive "edge" + of the window). + Otherwise RHS always points to a free cell, at least one of which must + exist at this point. + While transaction distribution is Round-robin, potential conflicts with + the current input xid can come only from + the preceding 2*|W| - 1 xids, the 2*|W|th in the past is safe. + */ + for (i= e->cxap_lhs; i != e->cxap_rhs; + i= (i+1) % (e->concurrent_xaps_window.max_size())) + { + uint64 old_event_count= e->concurrent_xaps_window.at(i).second; + uint64 queued_event_diff= curr_event_count - old_event_count; + if (queued_event_diff >= e->rpl_thread_max) + { + /* + Squeeze the window from the left + as this XAP can't run in parallel with us.
+ */ + e->cxap_lhs= (i+1) % (e->concurrent_xaps_window.max_size()); + } + else + { + // new LHS is determined + DBUG_ASSERT(e->cxap_lhs != e->cxap_rhs); + break; + } + } + + std::size_t xid_hash= std::hash<XID>{}(*xid); + for (; i != e->cxap_rhs; i= (i+1) % (e->concurrent_xaps_window.max_size())) + { + std::size_t old_xid_hash= e->concurrent_xaps_window.at(i).first; + if (old_xid_hash == xid_hash) + { + rc= true; + break; + } + } + + // Add the XAP to the sliding window + if (is_xap) + { + e->concurrent_xaps_window.at(e->cxap_rhs).first= xid_hash; + e->concurrent_xaps_window.at(e->cxap_rhs).second= curr_event_count; + e->cxap_rhs= (e->cxap_rhs + 1) % (e->concurrent_xaps_window.max_size()); + if (e->cxap_rhs == e->cxap_lhs) + { + // the entire array is full therefore the lhs has become stale + e->cxap_lhs= (e->cxap_lhs + 1) % (e->concurrent_xaps_window.max_size()); + } + } + + return rc; +} /* do_event() is executed by the sql_driver_thd thread. @@ -3046,6 +3140,18 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, new_gco= true; force_switch_flag= 0; gco= e->current_gco; + /* + Take care of duplicate xids in XA-prepare, XA-"complete" should not + race its XA-prepare parent either. When the current transaction's xid + was seen and its transaction may still be in process this event group + gets flagged to wait for prior commits at the start of execution. 
+ */ + if ((gtid_flags & (Gtid_log_event::FL_PREPARED_XA | + Gtid_log_event::FL_COMPLETED_XA)) && + handle_xa_prepera_duplicate_xid(e, &gtid_ev->xid, + gtid_flags & + Gtid_log_event::FL_PREPARED_XA)) + gtid_flags &= ~Gtid_log_event::FL_ALLOW_PARALLEL; if (likely(gco)) { uint8 flags= gco->flags; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 0b1d3cf9d80..b76d6749f6d 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -324,6 +324,14 @@ struct rpl_parallel_thread_pool { void release_thread(rpl_parallel_thread *rpt); }; +template <> +struct std::hash<XID> +{ + std::size_t operator()(const XID& xid) const + { + return my_hash_sort(&my_charset_bin, xid.key(), xid.key_length()); + } +}; struct rpl_parallel_entry { mysql_mutex_t LOCK_parallel_entry; @@ -419,6 +427,14 @@ struct rpl_parallel_entry { /* The group_commit_orderer object for the events currently being queued. */ group_commit_orderer *current_gco; + /* + Circular buffer of size 2*rpl_thread_max (at least 2) holding xid hashes of + XA-prepare groups of events which may be processed concurrently. + See how handle_xa_prepera_duplicate_xid operates on it.
+ */ + Dynamic_array<std::pair<std::size_t, uint64>> concurrent_xaps_window; + uint32 cxap_lhs, cxap_rhs; + rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage, Gtid_log_event *gtid_ev); diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 375fb1d1c58..fdeb0d67872 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -2155,12 +2155,14 @@ rpl_group_info::reinit(Relay_log_info *rli) gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL; speculation= SPECULATE_NO; commit_orderer.reinit(); + is_async_xac= false; } rpl_group_info::rpl_group_info(Relay_log_info *rli) : thd(0), wait_commit_sub_id(0), wait_commit_group_info(0), parallel_entry(0), - deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) + deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false), + is_async_xac(false) { reinit(rli); bzero(&current_gtid, sizeof(current_gtid)); @@ -2291,7 +2293,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error) if (thd->transaction->xid_state.is_explicit_XA() && thd->transaction->xid_state.get_state_code() != XA_PREPARED) xa_trans_force_rollback(thd); - + is_async_xac= false; thd->release_transactional_locks(); if (thd == rli->sql_driver_thd) diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index cc807852bf2..06eef910de4 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -835,6 +835,14 @@ struct rpl_group_info }; uchar killed_for_retry; + /* + When true, indicates that the user xa transaction is going to + be completed (with COMMIT or ROLLBACK) by the worker thread, + *while* another worker is still preparing it. Once the latter is done, + the xid will be acquired and the flag gets reset.
+ */ + bool is_async_xac; + rpl_group_info(Relay_log_info *rli_); ~rpl_group_info(); void reinit(Relay_log_info *rli); diff --git a/sql/sql_array.h b/sql/sql_array.h index 8610e971016..c79c0c257a0 100644 --- a/sql/sql_array.h +++ b/sql/sql_array.h @@ -137,7 +137,7 @@ template <class Elem> class Dynamic_array */ Elem& at(size_t idx) { - DBUG_ASSERT(idx < array.elements); + DBUG_ASSERT(idx < max_size()); return *(((Elem*)array.buffer) + idx); } /// Const variant of at(), which cannot change data @@ -172,6 +172,8 @@ template <class Elem> class Dynamic_array size_t size() const { return array.elements; } + size_t max_size() const { return array.max_element; } + const Elem *end() const { return back() + 1; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 1b78f88bd3c..04338831cd7 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -7960,6 +7960,7 @@ wait_for_commit::wait_for_commit() { mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0); + mysql_cond_init(key_COND_wait_commit_dep, &COND_wait_commit_dep, 0); reinit(); } @@ -7989,6 +7990,7 @@ wait_for_commit::~wait_for_commit() mysql_mutex_destroy(&LOCK_wait_commit); mysql_cond_destroy(&COND_wait_commit); + mysql_cond_destroy(&COND_wait_commit_dep); } diff --git a/sql/sql_class.h b/sql/sql_class.h index e6580d2432c..72ca39c7710 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2367,6 +2367,15 @@ struct wait_for_commit event group is fully done. */ bool wakeup_blocked; + /* + The condition variable serves as part of the facilities that handle + additional commit-time dependencies between groups of replication events, + e.g. XA-Prepare -> XA-Commit or XA-Prepare -> XA-Prepare, each pair sharing the same xid.
+ */ + mysql_cond_t COND_wait_commit_dep; +#ifndef DBUG_OFF + bool debug_done; +#endif void register_wait_for_prior_commit(wait_for_commit *waitee); int wait_for_prior_commit(THD *thd, bool allow_kill=true) diff --git a/sql/xa.cc b/sql/xa.cc index 9df9da7acf1..3d1a7a2360a 100644 --- a/sql/xa.cc +++ b/sql/xa.cc @@ -22,7 +22,7 @@ #include "my_cpu.h" #include <pfs_transaction_provider.h> #include <mysql/psi/mysql_transaction.h> - +#include "rpl_rli.h" // rpl_group_info static bool slave_applier_reset_xa_trans(THD *thd); /*************************************************************************** @@ -79,6 +79,10 @@ class XID_cache_element uint rm_error; enum xa_states xa_state; XID xid; + /* parallel slave worker waiters. `c` stands for complete, `p` prepare */ + std::atomic<wait_for_commit *> c_waiter; // set by asynch run xa-"complete" + std::atomic<wait_for_commit *> p_waiter; // set by duplicate xid xa-start + bool is_set(int32_t flag) { return m_state.load(std::memory_order_relaxed) & flag; } void set(int32_t flag) @@ -134,6 +138,7 @@ class XID_cache_element element->rm_error= 0; element->xa_state= new_element->xa_state; element->xid.set(new_element->xid); + element->c_waiter= element->p_waiter= NULL; new_element->xid_cache_element= element; } static void lf_alloc_constructor(uchar *ptr) @@ -243,7 +248,7 @@ void xid_cache_free() Find recovered XA transaction by XID. */ -static XID_cache_element *xid_cache_search(THD *thd, XID *xid) +XID_cache_element *xid_cache_search(THD *thd, XID *xid) { DBUG_ASSERT(thd->xid_hash_pins); XID_cache_element *element= @@ -254,16 +259,221 @@ static XID_cache_element *xid_cache_search(THD *thd, XID *xid) /* The element can be removed from lf_hash by other thread, but element->acquire_recovered() will return false in this case. 
*/ if (!element->acquire_recovered()) + { element= 0; + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec) + { + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT || + thd->lex->sql_command == SQLCOM_XA_ROLLBACK); + thd->rgi_slave->is_async_xac= true; + } + } lf_hash_search_unpin(thd->xid_hash_pins); /* Once the element is acquired (i.e. got the ACQUIRED bit) by this thread, only this thread can delete it. The deletion happens in xid_cache_delete(). See also the XID_cache_element documentation. */ DEBUG_SYNC(thd, "xa_after_search"); } + else if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec) + { + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT || + thd->lex->sql_command == SQLCOM_XA_ROLLBACK); + } + return element; } +const int SPIN_MAX= 20; +/** + The function keeps trying to insert an xid into the system hash until it + succeeds or the thread is killed. A retry can only be caused by an earlier + transaction with a duplicate xid. + Analogously to @c xid_cache_search_maybe_wait, it expects that the duplicate + xid in the way will eventually be deleted from the hash. + + @param thd thread handler + @return false on success, + true otherwise.
+*/ +bool xid_cache_insert_maybe_wait(THD* thd) +{ + int i= 0; + bool rc; + + do + { + if ((rc= xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid))) + ut_delay(1 + i++); + } + while (rc && i < SPIN_MAX); + + if (rc) + { + wait_for_commit *waiter= NULL; + XID *xid= thd->lex->xid; + XID_cache_element *element= + (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins, + xid->key(), xid->key_length()); + if (element) + { + PSI_stage_info old_stage; + wait_for_commit *exp= NULL; waiter= thd->wait_for_commit_ptr; // no shadowing: the outer `waiter` is checked below +#ifndef DBUG_OFF + waiter->debug_done= false; +#endif + + while (unlikely(!element-> + p_waiter.compare_exchange_weak(exp, waiter, + std::memory_order_acq_rel))) + { + if (exp) + { + DBUG_ASSERT(exp != waiter); + waiter= NULL; // notifier is seen + + break; + } + else + { + (void) LF_BACKOFF(); + } + } + lf_hash_search_unpin(thd->xid_hash_pins); + + if (waiter) // notifier was not seen + { + mysql_mutex_lock(&waiter->LOCK_wait_commit); + thd->ENTER_COND(&waiter->COND_wait_commit_dep, &waiter->LOCK_wait_commit, + &stage_waiting_for_prior_xa_transaction, + &old_stage); + if ((element= + (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins, + xid->key(), xid->key_length()))) + { + lf_hash_search_unpin(thd->xid_hash_pins); + mysql_cond_wait(&waiter->COND_wait_commit_dep, + &waiter->LOCK_wait_commit); + + DBUG_ASSERT(waiter->debug_done || thd->check_killed(1)); + } + thd->EXIT_COND(&old_stage); + } + } + if (!(rc= thd->check_killed(1))) + { + // (element && waiter = NULL) indicates the duplicate xid is coming + do + rc= xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid); + while (rc && element && !waiter && (ut_delay(1), true)); + } + } + + return rc; +} + +/** + XA-"complete" run by parallel slave gets access to its xid.
+ Analogously to @c xid_cache_insert_maybe_wait, it expects that its xid, + supplied through the THD argument, will soon be released for acquisition + (the parent XAP has already woken up the transactions before this one). + + @param thd thread handler + @return XID_cache_element pointer, or NULL when the search is interrupted + by kill. +*/ +XID_cache_element * xid_cache_search_maybe_wait(THD* thd) +{ + if (thd->fix_xid_hash_pins()) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return NULL; + } + + XID_cache_element *xs; + XID *xid= thd->lex->xid; + int i= 0; + do + { + if (!(xs= xid_cache_search(thd, thd->lex->xid))) + ut_delay(1 + i++); + } + while (!xs && i < SPIN_MAX); + + if (!xs) + { + XID_cache_element *element= + (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins, + xid->key(), xid->key_length()); + if (element) + { + lf_hash_search_unpin(thd->xid_hash_pins); + if (!element->acquire_recovered()) + { + wait_for_commit *exp= NULL, *waiter= thd->wait_for_commit_ptr; + bool waiter_done= true; // assumption + + + /* + Register itself as a waiter for the xid owner while handling the race + with the owner over marking the xid element. If the element is found + marked by someone other than us, the xid has already been released. + */ + while (unlikely(!element-> + c_waiter.
+ compare_exchange_weak(exp, waiter, + std::memory_order_acq_rel))) + { + if (exp) + { + DBUG_ASSERT(exp != waiter); + waiter= NULL; // notifier is seen + + break; + } + else + { + (void) LF_BACKOFF(); + } + } + + if (waiter) // notifier was not seen + { + PSI_stage_info old_stage; + mysql_mutex_lock(&waiter->LOCK_wait_commit); + thd->ENTER_COND(&waiter->COND_wait_commit_dep, &waiter->LOCK_wait_commit, + &stage_waiting_for_prior_xa_transaction, + &old_stage); + if (element->c_waiter.load(std::memory_order_relaxed) && + likely(!thd->check_killed(1))) + mysql_cond_wait(&waiter->COND_wait_commit_dep, + &waiter->LOCK_wait_commit); + + if (element->c_waiter.load(std::memory_order_relaxed)) + { + waiter_done= false; + DBUG_ASSERT(thd->check_killed(1)); + } + thd->EXIT_COND(&old_stage); + } + + if (waiter_done && + likely(element->is_set(XID_cache_element::RECOVERED | + XID_cache_element::ACQUIRED))) + xs= element; + else + goto end; + } + else + { + xs= element; + } + } + } + +end: + return xs; +} + bool xid_cache_insert(XID *xid) { @@ -302,7 +512,8 @@ bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID *xid) xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED); break; case 1: - my_error(ER_XAER_DUPID, MYF(0)); + if (!(thd->rgi_slave && thd->rgi_slave->is_parallel_exec)) + my_error(ER_XAER_DUPID, MYF(0)); } return res; } @@ -311,9 +522,39 @@ bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID *xid) static void xid_cache_delete(THD *thd, XID_cache_element *&element) { DBUG_ASSERT(thd->xid_hash_pins); + element->mark_uninitialized(); + wait_for_commit *waiter= NULL; + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec) + { + wait_for_commit *notifier= &thd->rgi_slave->commit_orderer; + while (unlikely(!element-> + p_waiter.compare_exchange_weak(waiter, notifier, + std::memory_order_acq_rel))) + { + if (waiter) + { + DBUG_ASSERT(notifier != waiter); + + break; + } + else + { + (void) LF_BACKOFF(); + } + } + } lf_hash_delete(&xid_cache, 
+ thd->xid_hash_pins, element->xid.key(), element->xid.key_length()); + if (waiter) + { + mysql_mutex_lock(&waiter->LOCK_wait_commit); +#ifndef DBUG_OFF + waiter->debug_done= true; +#endif + mysql_cond_signal(&waiter->COND_wait_commit_dep); + mysql_mutex_unlock(&waiter->LOCK_wait_commit); + } } @@ -456,7 +697,23 @@ bool trans_xa_start(THD *thd) else if (!trans_begin(thd)) { MYSQL_SET_TRANSACTION_XID(thd->m_transaction_psi, thd->lex->xid, XA_ACTIVE); - if (xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid)) + + bool parallel_slave_xap_status= true; // `true` presumes ordinary XA START. + if (thd->rgi_slave && + thd->rgi_slave->is_parallel_exec) + { + DBUG_ASSERT(thd->rgi_slave->gtid_ev_flags2 | + Gtid_log_event::FL_PREPARED_XA); + /* + Normally the status flips to `false` right below, which designates + that the xid insert is done. + A wait may be incurred when the xid is a duplicate. + */ + parallel_slave_xap_status= xid_cache_insert_maybe_wait(thd); + } + + if (parallel_slave_xap_status && + xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid)) { trans_rollback(thd); DBUG_RETURN(true); @@ -602,12 +859,19 @@ bool trans_xa_commit(THD *thd) my_error(ER_OUT_OF_RESOURCES, MYF(0)); DBUG_RETURN(TRUE); } + DBUG_ASSERT(!thd->rgi_slave || !thd->rgi_slave->is_async_xac); - if (auto xs= xid_cache_search(thd, thd->lex->xid)) + /* + A parallel slave may fail to acquire the xid here, in which case + @c is_async_xac is @c true and the xid is acquired later. + */ + XID_cache_element *xs; + if ((xs= xid_cache_search(thd, thd->lex->xid)) || + (thd->rgi_slave && thd->rgi_slave->is_async_xac)) { bool xid_deleted= false; MDL_request mdl_request; - bool rw_trans= (xs->rm_error != ER_XA_RBROLLBACK); + bool rw_trans= (xs && xs->rm_error != ER_XA_RBROLLBACK); if (rw_trans && thd->is_read_only_ctx()) { @@ -615,8 +879,7 @@ bool trans_xa_commit(THD *thd) res= 1; goto _end_external_xid; } - - res= xa_trans_rolled_back(xs); + res= xs ?
xa_trans_rolled_back(xs) : 0; /* Acquire metadata lock which will ensure that COMMIT is blocked by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in @@ -645,7 +908,7 @@ bool trans_xa_commit(THD *thd) } DBUG_ASSERT(!xid_state.xid_cache_element); - xid_state.xid_cache_element= xs; + xid_state.xid_cache_element= xs; // may be NULL on parallel slave ha_commit_or_rollback_by_xid(thd->lex->xid, !res); if (!res && thd->is_error()) { @@ -654,13 +917,16 @@ bool trans_xa_commit(THD *thd) res= true; goto _end_external_xid; } - xid_cache_delete(thd, xs); + DBUG_ASSERT(xs || (thd->rgi_slave && thd->rgi_slave->is_async_xac && + xid_state.xid_cache_element)); + + xid_cache_delete(thd, xid_state.xid_cache_element); xid_deleted= true; _end_external_xid: xid_state.xid_cache_element= 0; res= res || thd->is_error(); - if (!xid_deleted) + if (!xid_deleted && xs) xs->acquired_to_recovered(); if (mdl_request.ticket) { @@ -790,12 +1056,14 @@ bool trans_xa_rollback(THD *thd) DBUG_RETURN(TRUE); } - if (auto xs= xid_cache_search(thd, thd->lex->xid)) + XID_cache_element *xs; + if ((xs= xid_cache_search(thd, thd->lex->xid)) || + (thd->rgi_slave && thd->rgi_slave->is_async_xac)) { bool res; bool xid_deleted= false; MDL_request mdl_request; - bool rw_trans= (xs->rm_error != ER_XA_RBROLLBACK); + bool rw_trans= (xs && xs->rm_error != ER_XA_RBROLLBACK); if (rw_trans && thd->is_read_only_ctx()) { @@ -822,7 +1090,7 @@ bool trans_xa_rollback(THD *thd) { thd->backup_commit_lock= &mdl_request; } - res= xa_trans_rolled_back(xs); + res= xs ? 
xa_trans_rolled_back(xs) : 0; DBUG_ASSERT(!xid_state.xid_cache_element); xid_state.xid_cache_element= xs; @@ -831,12 +1099,15 @@ bool trans_xa_rollback(THD *thd) { goto _end_external_xid; } - xid_cache_delete(thd, xs); + DBUG_ASSERT(xs || (thd->rgi_slave && thd->rgi_slave->is_async_xac && + xid_state.xid_cache_element)); + + xid_cache_delete(thd, xid_state.xid_cache_element); xid_deleted= true; _end_external_xid: xid_state.xid_cache_element= 0; - if (!xid_deleted) + if (!xid_deleted && xs) xs->acquired_to_recovered(); if (mdl_request.ticket) { @@ -1146,7 +1417,7 @@ static bool slave_applier_reset_xa_trans(THD *thd) { thd->transaction->xid_state.set_error(ER_XA_RBROLLBACK); } - thd->transaction->xid_state.xid_cache_element->acquired_to_recovered(); + auto element= thd->transaction->xid_state.xid_cache_element; thd->transaction->xid_state.xid_cache_element= 0; for (Ha_trx_info *ha_info= thd->transaction->all.ha_list, *ha_info_next; @@ -1158,6 +1429,34 @@ static bool slave_applier_reset_xa_trans(THD *thd) thd->transaction->all.ha_list= 0; ha_close_connection(thd); + element->acquired_to_recovered(); + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec) + { + /* make the xid available to a possible (xa-"complete") waiter */ + wait_for_commit *xac_waiter= NULL, + *notifier= &thd->rgi_slave->commit_orderer; + while (unlikely(!element-> + c_waiter.compare_exchange_weak(xac_waiter, notifier, + std::memory_order_acq_rel))) + { + if (xac_waiter) + { + break; + } + else + { + (void) LF_BACKOFF(); + } + } + if (xac_waiter) + { + // unmark and signal + mysql_mutex_lock(&xac_waiter->LOCK_wait_commit); + element->c_waiter.store(NULL, std::memory_order_relaxed); + mysql_cond_signal(&xac_waiter->COND_wait_commit_dep); + mysql_mutex_unlock(&xac_waiter->LOCK_wait_commit); + } + } thd->transaction->cleanup(); thd->transaction->all.reset(); diff --git a/storage/innobase/trx/trx0undo.cc b/storage/innobase/trx/trx0undo.cc index daf27822085..53ce685fa35 100644 --- 
a/storage/innobase/trx/trx0undo.cc +++ b/storage/innobase/trx/trx0undo.cc @@ -639,8 +639,7 @@ static void trx_undo_write_xid(buf_block_t *block, uint16_t offset, static_cast<uint32_t>(xid.bqual_length)); const ulint xid_length= static_cast<ulint>(xid.gtrid_length + xid.bqual_length); - mtr->memcpy(*block, &block->page.frame[offset + TRX_UNDO_XA_XID], - xid.data, xid_length); + mtr->memcpy<mtr_t::MAYBE_NOP>(*block, &block->page.frame[offset + TRX_UNDO_XA_XID], xid.data, xid_length); if (UNIV_LIKELY(xid_length < XIDDATASIZE)) mtr->memset(block, offset + TRX_UNDO_XA_XID + xid_length, XIDDATASIZE - xid_length, 0); -- 2.30.2
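The waiter/notifier handshake on XID_cache_element::c_waiter and p_waiter (driven by xid_cache_insert_maybe_wait(), xid_cache_search_maybe_wait(), xid_cache_delete() and slave_applier_reset_xa_trans() above) can be illustrated in isolation. What follows is a minimal, simplified sketch and not code from this patch: `Waiter` and `Slot` are hypothetical stand-ins for wait_for_commit and the XID_cache_element waiter pointers, a `done` flag plays the role of the re-checks the patch performs on the xid hash and on c_waiter, and compare_exchange_strong() replaces the compare_exchange_weak() plus LF_BACKOFF() retry loop. Both sides try to install their own pointer into an initially NULL slot, and whoever arrives second learns the outcome from the pointer it finds there.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for wait_for_commit: one per worker.
struct Waiter
{
  std::mutex lock;
  std::condition_variable cond;
  bool done= false;            // set by the notifier while holding `lock`
};

// Hypothetical stand-in for XID_cache_element::c_waiter / p_waiter.
struct Slot
{
  std::atomic<Waiter *> mark{nullptr};
};

// Waiter side. Returns true if the caller actually had to block.
bool wait_for_release(Slot &slot, Waiter *self)
{
  Waiter *expected= nullptr;
  if (!slot.mark.compare_exchange_strong(expected, self,
                                         std::memory_order_acq_rel))
    return false;              // notifier marked the slot first: xid is free
  std::unique_lock<std::mutex> guard(self->lock);
  // The predicate is checked before sleeping, so a notification that
  // arrives between the CAS above and this point is not lost.
  self->cond.wait(guard, [self] { return self->done; });
  return true;
}

// Notifier side: called by the xid owner once it has released the xid.
void release(Slot &slot, Waiter *self)
{
  Waiter *expected= nullptr;
  if (slot.mark.compare_exchange_strong(expected, self,
                                        std::memory_order_acq_rel))
    return;                    // no waiter yet; a later one will see our mark
  Waiter *w= expected;         // a waiter registered first: wake it up
  assert(w != self);
  {
    std::lock_guard<std::mutex> guard(w->lock);
    w->done= true;
  }
  w->cond.notify_one();
}

// Usage: the notifier runs first, so the waiter never blocks.
bool demo_notifier_first()
{
  Slot slot;
  Waiter waiter, notifier;
  release(slot, &notifier);
  return wait_for_release(slot, &waiter);
}

// Usage: the waiter registers first and is woken up by the notifier.
bool demo_waiter_first()
{
  Slot slot;
  Waiter waiter, notifier;
  bool blocked= false;
  std::thread t([&] { blocked= wait_for_release(slot, &waiter); });
  while (slot.mark.load(std::memory_order_acquire) != &waiter)
    std::this_thread::yield(); // wait until the waiter has published itself
  release(slot, &notifier);
  t.join();
  return blocked;
}
```

Because the notifier sets `done` under the waiter's mutex and the waiter tests it under the same mutex before sleeping, the window between a successful waiter CAS and the actual wait cannot swallow the wakeup.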