[PATCH] MDEV-26632: GTID master switch when slave position is filtered on new master
by Kristian Nielsen 26 Oct '23
by Kristian Nielsen 26 Oct '23
26 Oct '23
If an intermediate slave S1 has replication filters enabled, its
@@gtid_slave_pos may contain a GTID that is filtered and doesn't propagate
to lower-level slaves with S1 as master.
If S1 is later demoted to a slave of one of those lower-level slaves, it may
attempt to connect at the filtered position, which does not exist in the new
master's binlog. This is normally disallowed in --gtid-strict-mode. But if
--gtid-ignore-duplicates is enabled, we should allow it, since in this case we
can trust the GTID sequence numbers across different server ids. So we know
that the next GTID in the new master's binlog is the correct one to continue
from after the filtered slave GTID position.
This allows advanced users to use replication filtering in topologies like
this and still run with --gtid-strict-mode enabled.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
mysql-test/suite/rpl/r/rpl_mdev26632.result | 78 ++++++++++++++
mysql-test/suite/rpl/t/rpl_mdev26632.cnf | 28 +++++
mysql-test/suite/rpl/t/rpl_mdev26632.test | 109 ++++++++++++++++++++
sql/sql_repl.cc | 8 +-
4 files changed, 222 insertions(+), 1 deletion(-)
create mode 100644 mysql-test/suite/rpl/r/rpl_mdev26632.result
create mode 100644 mysql-test/suite/rpl/t/rpl_mdev26632.cnf
create mode 100644 mysql-test/suite/rpl/t/rpl_mdev26632.test
diff --git a/mysql-test/suite/rpl/r/rpl_mdev26632.result b/mysql-test/suite/rpl/r/rpl_mdev26632.result
new file mode 100644
index 00000000000..84080b94de8
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_mdev26632.result
@@ -0,0 +1,78 @@
+include/rpl_init.inc [topology=1->2->3]
+*** Test GTID master switch in a topology with filtered events.
+*** With --gtid-ignore-duplicate and --gtid-strict-mode, should allow
+*** GTID connect at a GTID position that is filtered on the new master.
+connection server_1;
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1,1);
+CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t3 VALUES (1,1);
+INSERT INTO t1 VALUES (2,1);
+INSERT INTO t3 VALUES (2,1);
+include/save_master_gtid.inc
+connection server_2;
+CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t2 VALUES (1,2);
+include/sync_with_master_gtid.inc
+include/save_master_gtid.inc
+connection server_3;
+include/sync_with_master_gtid.inc
+*** Promote 3 as new master, demote 2 as slave of 3.
+*** GTID position of 2 in domain 0 is filtered on 3.
+connection server_2;
+include/stop_slave.inc
+connection server_3;
+include/stop_slave.inc
+CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_1,
+MASTER_USE_GTID=SLAVE_POS;
+connection server_2;
+CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_3,
+MASTER_USE_GTID=SLAVE_POS;
+include/start_slave.inc
+connection server_3;
+include/start_slave.inc
+connection server_1;
+INSERT INTO t1 VALUES (3,1);
+INSERT INTO t3 VALUES (3,1);
+include/save_master_gtid.inc
+connection server_3;
+INSERT INTO t2 VALUES (2,2);
+include/sync_with_master_gtid.inc
+include/save_master_gtid.inc
+connection server_2;
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a b
+1 1
+2 1
+3 1
+SELECT * FROM t3 ORDER BY a;
+ERROR 42S02: Table 'test.t3' doesn't exist
+SELECT * FROM t2 ORDER BY a;
+a b
+1 2
+2 2
+*** Restore original topology.
+connection server_3;
+include/stop_slave.inc
+connection server_2;
+include/stop_slave.inc
+CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_1,
+MASTER_USE_GTID=SLAVE_POS;
+include/start_slave.inc
+connection server_3;
+CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_2,
+MASTER_USE_GTID=SLAVE_POS;
+include/start_slave.inc
+connection server_1;
+DROP TABLE t1;
+DROP TABLE t3;
+include/save_master_gtid.inc
+connection server_2;
+DROP TABLE t2;
+include/sync_with_master_gtid.inc
+include/save_master_gtid.inc
+connection server_3;
+include/sync_with_master_gtid.inc
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_mdev26632.cnf b/mysql-test/suite/rpl/t/rpl_mdev26632.cnf
new file mode 100644
index 00000000000..5eda3ad0725
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_mdev26632.cnf
@@ -0,0 +1,28 @@
+!include ../my.cnf
+
+[mysqld.1]
+log-slave-updates
+loose-innodb
+gtid-domain-id=1
+gtid-strict-mode=1
+gtid-ignore-duplicates=1
+
+[mysqld.2]
+log-slave-updates
+loose-innodb
+gtid-domain-id=0
+replicate-ignore-table=test.t3
+gtid-strict-mode=1
+gtid-ignore-duplicates=1
+
+[mysqld.3]
+log-slave-updates
+loose-innodb
+gtid-domain-id=0
+replicate-ignore-table=test.t3
+gtid-strict-mode=1
+gtid-ignore-duplicates=1
+
+[ENV]
+SERVER_MYPORT_3= @mysqld.3.port
+SERVER_MYSOCK_3= @mysqld.3.socket
diff --git a/mysql-test/suite/rpl/t/rpl_mdev26632.test b/mysql-test/suite/rpl/t/rpl_mdev26632.test
new file mode 100644
index 00000000000..842bae8234c
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_mdev26632.test
@@ -0,0 +1,109 @@
+--source include/have_innodb.inc
+--source include/have_binlog_format_mixed.inc
+# MDEV-26632: topology 1->2->3; servers 2 and 3 ignore test.t3 (see .cnf).
+--let $rpl_topology=1->2->3
+--source include/rpl_init.inc
+
+--echo *** Test GTID master switch in a topology with filtered events.
+--echo *** With --gtid-ignore-duplicate and --gtid-strict-mode, should allow
+--echo *** GTID connect at a GTID position that is filtered on the new master.
+# server_1 writes t1 (replicated) and t3 (filtered on servers 2 and 3).
+--connection server_1
+# Ending with a t3 event leaves server_2's slave pos at a filtered GTID.
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1,1);
+CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t3 VALUES (1,1);
+INSERT INTO t1 VALUES (2,1);
+INSERT INTO t3 VALUES (2,1);
+--source include/save_master_gtid.inc
+
+--connection server_2
+CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t2 VALUES (1,2);
+# Sync server_2 with server_1, then save server_2's position for server_3.
+--let $slave_timeout= 10
+--source include/sync_with_master_gtid.inc
+--source include/save_master_gtid.inc
+
+--connection server_3
+--source include/sync_with_master_gtid.inc
+
+--echo *** Promote 3 as new master, demote 2 as slave of 3.
+--echo *** GTID position of 2 in domain 0 is filtered on 3.
+# NOTE(review): t3 events come from server_1 (domain 1), not domain 0; confirm.
+--connection server_2
+--source include/stop_slave.inc
+
+--connection server_3
+--source include/stop_slave.inc
+--replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1
+eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_1,
+ MASTER_USE_GTID=SLAVE_POS;
+# server_2 now connects to server_3 at a position server_3 never binlogged.
+--connection server_2
+--replace_result $SERVER_MYPORT_3 SERVER_MYPORT_3
+eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_3,
+ MASTER_USE_GTID=SLAVE_POS;
+--source include/start_slave.inc
+
+--connection server_3
+--source include/start_slave.inc
+# New events from server_1; the t3 INSERT is now filtered on server_3.
+--connection server_1
+INSERT INTO t1 VALUES (3,1);
+INSERT INTO t3 VALUES (3,1);
+--source include/save_master_gtid.inc
+
+--connection server_3
+INSERT INTO t2 VALUES (2,2);
+# server_3 syncs with server_1, then server_2 syncs with server_3.
+--source include/sync_with_master_gtid.inc
+--source include/save_master_gtid.inc
+
+--connection server_2
+--source include/sync_with_master_gtid.inc
+# server_2 gets the t1 and t2 changes via server_3; t3 stays filtered.
+SELECT * FROM t1 ORDER BY a;
+# t3 must not exist on server_2 (1146 = no such table): it is filtered.
+--error 1146
+SELECT * FROM t3 ORDER BY a;
+SELECT * FROM t2 ORDER BY a;
+
+
+--echo *** Restore original topology.
+
+--connection server_3
+--source include/stop_slave.inc
+
+--connection server_2
+--source include/stop_slave.inc
+--replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1
+eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_1,
+ MASTER_USE_GTID=SLAVE_POS;
+--source include/start_slave.inc
+
+--connection server_3
+--replace_result $SERVER_MYPORT_2 SERVER_MYPORT_2
+eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_2,
+ MASTER_USE_GTID=SLAVE_POS;
+--source include/start_slave.inc
+
+
+# Cleanup
+
+--connection server_1
+DROP TABLE t1;
+DROP TABLE t3;
+--source include/save_master_gtid.inc
+
+--connection server_2
+DROP TABLE t2;
+--source include/sync_with_master_gtid.inc
+--source include/save_master_gtid.inc
+
+--connection server_3
+--source include/sync_with_master_gtid.inc
+
+--source include/rpl_end.inc
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 0d2e61f7f59..e3b6d5fb7f3 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -1823,13 +1823,19 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
{
if (info->slave_gtid_strict_mode &&
event_gtid.seq_no > gtid->seq_no &&
- !(gtid_entry->flags & slave_connection_state::START_OWN_SLAVE_POS))
+ !(gtid_entry->flags & slave_connection_state::START_OWN_SLAVE_POS) &&
+ !info->slave_gtid_ignore_duplicates)
{
/*
In strict mode, it is an error if the slave requests to start
in a "hole" in the master's binlog: a GTID that does not
exist, even though both the prior and subsequent seq_no exists
for same domain_id and server_id.
+
+ But in --gtid-ignore-duplicates this is relaxed, as this
+ implies that we trust the sequence numbers between different
+ server_id. Thus, we want to allow the slave to connect at the
+ "hole", which could eg. be a filtered event.
*/
info->error= ER_GTID_START_FROM_BINLOG_HOLE;
*error_gtid= *gtid;
--
2.30.2
1
0
[PATCH] MDEV-27436: binlog corruption (/tmp no space left on device at the same moment)
by Kristian Nielsen 26 Oct '23
by Kristian Nielsen 26 Oct '23
26 Oct '23
This commit fixes several bugs in error handling around disk full when
writing the statement/transaction binlog caches:
1. If the error occurs during a non-transactional statement, the code
attempts to binlog the partially executed statement (as it cannot roll
back). The stmt_cache->error was still set from the disk full error. This
caused MYSQL_BIN_LOG::write_cache() to get an error while trying to read the
cache to copy it to the binlog. This was then wrongly interpreted as a disk
full error writing to the binlog file. As a result, a partial event group
containing just a GTID event (no query or commit) was binlogged. Fixed by
checking if an error is set in the statement cache, and if so binlog an
INCIDENT event instead of a corrupt event group.
2. For LOAD DATA LOCAL INFILE, if a disk full error occurred while writing to
the statement cache, the code would attempt to abort and read-and-discard
any remaining data sent by the client. However, the discard code would
continue trying to write data to the statement cache, and wrongly interpret
another disk full error as end-of-file from the client. This left unread data
on the client connection, corrupting the communication for the next
command, and again caused a corrupt/incomplete event to be
binlogged. Fixed by restoring the default read function before reading any
remaining data from the client connection.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
.../rpl_binlog_cache_disk_full_loaddata.test | 45 +++++++
.../rpl/t/rpl_binlog_cache_disk_full_row.test | 59 +++++++++
sql/log.cc | 121 ++++++++++++------
sql/sql_load.cc | 4 +
sql/sql_repl.cc | 34 ++++-
5 files changed, 224 insertions(+), 39 deletions(-)
create mode 100644 mysql-test/suite/rpl/t/rpl_binlog_cache_disk_full_loaddata.test
create mode 100644 mysql-test/suite/rpl/t/rpl_binlog_cache_disk_full_row.test
diff --git a/mysql-test/suite/rpl/t/rpl_binlog_cache_disk_full_loaddata.test b/mysql-test/suite/rpl/t/rpl_binlog_cache_disk_full_loaddata.test
new file mode 100644
index 00000000000..be4399a52ac
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_binlog_cache_disk_full_loaddata.test
@@ -0,0 +1,45 @@
+--source include/have_binlog_format_statement.inc
+--source include/have_debug.inc
+--source include/master-slave.inc
+# MDEV-27436: disk full while LOAD DATA writes to the binlog stmt cache.
+--connection master
+# Use a minimal stmt cache size so that the LOAD DATA spills it to disk.
+SET @save_binlog_stmt_cache_size= @@GLOBAL.binlog_stmt_cache_size;
+SET GLOBAL binlog_stmt_cache_size= 4096;
+
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=MyISAM;
+# MyISAM is non-transactional: a failed LOAD DATA cannot be rolled back.
+FLUSH STATUS;
+SHOW STATUS LIKE "binlog_stmt_cache%";
+SET @old_dbug= @@SESSION.debug_dbug;
+SET SESSION debug_dbug="+d,load_data_binlog_cache_error";
+--error 3
+LOAD DATA CONCURRENT LOCAL INFILE 'std_data/bug30435_5k.txt'
+ REPLACE INTO TABLE t1 (a);
+SET SESSION debug_dbug= @old_dbug;
+SHOW STATUS LIKE "binlog_stmt_cache%";
+# The actual number of rows left after the disk full error may change as
+# binlog event sizes are modified. So here we just test that the LOAD DATA
+# that got the disk full error left a partial update (some, not all, rows).
+SELECT IF(COUNT(*) > 0 AND COUNT(*) < 5000,
+ "ok",
+ CONCAT("ERROR! Row count ", COUNT(*), " not as expected for partially executed query"))
+ AS check_result
+ FROM t1;
+
+--save_master_pos
+# Expect the slave to stop with error 1590 on the master's INCIDENT event.
+--connection slave
+--let $slave_sql_errno= 1590
+--source include/wait_for_slave_sql_error_and_skip.inc
+
+--sync_with_master
+SELECT COUNT(*) FROM t1;
+
+# Cleanup
+
+--connection master
+SET GLOBAL binlog_stmt_cache_size= @save_binlog_stmt_cache_size;
+DROP TABLE t1;
+
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_binlog_cache_disk_full_row.test b/mysql-test/suite/rpl/t/rpl_binlog_cache_disk_full_row.test
new file mode 100644
index 00000000000..eb67eca5071
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_binlog_cache_disk_full_row.test
@@ -0,0 +1,59 @@
+--source include/have_binlog_format_row.inc
+--source include/have_debug.inc
+--source include/master-slave.inc
+# MDEV-27436: disk full while flushing a pending row event to the cache.
+--connection master
+# Use a minimal stmt cache size so that the row events spill it to disk.
+SET @save_binlog_stmt_cache_size= @@GLOBAL.binlog_stmt_cache_size;
+SET GLOBAL binlog_stmt_cache_size= 4096;
+# MyISAM is non-transactional: a failed INSERT..SELECT is not rolled back.
+CREATE TABLE t1 (a INT PRIMARY KEY, b VARCHAR(255)) ENGINE=MyISAM;
+
+FLUSH STATUS;
+SHOW STATUS LIKE "binlog_stmt_cache%";
+INSERT INTO t1 VALUES (0, CONCAT("?", "-", REPEAT("x", 200)));
+INSERT INTO t1 SELECT a+1, CONCAT(a, "-", REPEAT("x", 200)) FROM t1;
+INSERT INTO t1 SELECT a+2, CONCAT(a, "-", REPEAT("x", 200)) FROM t1;
+INSERT INTO t1 SELECT a+4, CONCAT(a, "-", REPEAT("x", 200)) FROM t1;
+INSERT INTO t1 SELECT a+8, CONCAT(a, "-", REPEAT("x", 200)) FROM t1;
+INSERT INTO t1 SELECT a+16, CONCAT(a, "-", REPEAT("x", 200)) FROM t1;
+INSERT INTO t1 SELECT a+32, CONCAT(a, "-", REPEAT("x", 200)) FROM t1;
+INSERT INTO t1 SELECT a+64, CONCAT(a, "-", REPEAT("x", 200)) FROM t1;
+INSERT INTO t1 SELECT a+128, CONCAT(a, "-", REPEAT("x", 200)) FROM t1;
+SHOW STATUS LIKE "binlog_stmt_cache%";
+# t1 now holds 256 rows; each INSERT..SELECT above doubled the row count.
+SET @old_dbug= @@SESSION.debug_dbug;
+SET SESSION debug_dbug="+d,simulate_disk_full_at_flush_pending";
+--error 3
+INSERT INTO t1 SELECT a+256, CONCAT(a, "-", REPEAT("x", 200)) FROM t1;
+SET SESSION debug_dbug= @old_dbug;
+SHOW STATUS LIKE "binlog_stmt_cache%";
+# The actual number of rows left after the disk full error may change as
+# binlog event sizes are modified. So here we just test that we get partial
+# update from the last INSERT..SELECT that gets disk full error.
+SELECT IF(COUNT(*) > 256 AND COUNT(*) < 512,
+ "ok",
+ CONCAT("ERROR! Row count ", COUNT(*), " not as expected for partially executed query"))
+ AS check_result
+ FROM t1;
+
+# A random extra event that helped show the bug that a partial event
+# group was binlogged.
+ALTER TABLE t1 COMMENT '<mumble>';
+
+--save_master_pos
+# Expect the slave to stop with error 1590 on the master's INCIDENT event.
+--connection slave
+--let $slave_sql_errno= 1590
+--source include/wait_for_slave_sql_error_and_skip.inc
+
+--sync_with_master
+SELECT COUNT(*) FROM t1;
+
+# Cleanup
+
+--connection master
+SET GLOBAL binlog_stmt_cache_size= @save_binlog_stmt_cache_size;
+DROP TABLE t1;
+
+--source include/rpl_end.inc
diff --git a/sql/log.cc b/sql/log.cc
index e7292064747..8c93f9adf41 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6005,8 +6005,17 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
/*
Write pending event to the cache.
*/
+#ifndef DBUG_OFF
+ bool clear_dbug= false;
+#endif
DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending",
- {DBUG_SET("+d,simulate_file_write_error");});
+ {
+ if (my_b_tell(&cache_data->cache_log) > 10000)
+ {
+ DBUG_SET("+d,simulate_file_write_error");
+ clear_dbug= true;
+ }
+ });
if (writer.write(pending))
{
set_write_error(thd, is_transactional);
@@ -6016,9 +6025,17 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
delete pending;
cache_data->set_pending(NULL);
DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending",
- {DBUG_SET("-d,simulate_file_write_error");});
+ {
+ if (clear_dbug)
+ DBUG_SET("-d,simulate_file_write_error");
+ });
DBUG_RETURN(1);
}
+ DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending",
+ {
+ if (clear_dbug)
+ DBUG_SET("-d,simulate_file_write_error");
+ });
delete pending;
}
@@ -8337,51 +8354,83 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
binlog_cache_mngr *mngr= entry->cache_mngr;
DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_or_stmt");
- if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
+ bool do_stmt= entry->using_stmt_cache && !mngr->stmt_cache.empty();
+ bool do_trx= entry->using_trx_cache && !mngr->trx_cache.empty();
+ IO_CACHE *stmt_cache= mngr->get_binlog_cache_log(FALSE);
+ IO_CACHE *trx_cache= mngr->get_binlog_cache_log(TRUE);
- if (entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
- write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE)))
+ if (likely(!( (do_stmt && stmt_cache->error) ||
+ (do_trx && trx_cache->error) )))
{
- entry->error_cache= &mngr->stmt_cache.cache_log;
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- }
+ if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id))
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
- if (entry->using_trx_cache && !mngr->trx_cache.empty())
- {
- DBUG_EXECUTE_IF("crash_before_writing_xid",
+ if (do_stmt &&
+ write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE)))
+ {
+ entry->error_cache= &mngr->stmt_cache.cache_log;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
+ }
+
+ if (do_trx)
+ {
+ DBUG_EXECUTE_IF("crash_before_writing_xid",
+ {
+ if ((write_cache(entry->thd,
+ mngr->get_binlog_cache_log(TRUE))))
+ DBUG_PRINT("info", ("error writing binlog cache"));
+ else
+ flush_and_sync(0);
+
+ DBUG_PRINT("info", ("crashing before writing xid"));
+ DBUG_SUICIDE();
+ });
+
+ if (write_cache(entry->thd, mngr->get_binlog_cache_log(TRUE)))
+ {
+ entry->error_cache= &mngr->trx_cache.cache_log;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
+ }
+ }
+
+ DBUG_EXECUTE_IF("inject_error_writing_xid",
{
- if ((write_cache(entry->thd,
- mngr->get_binlog_cache_log(TRUE))))
- DBUG_PRINT("info", ("error writing binlog cache"));
- else
- flush_and_sync(0);
-
- DBUG_PRINT("info", ("crashing before writing xid"));
- DBUG_SUICIDE();
+ entry->error_cache= NULL;
+ errno= 28;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
});
- if (write_cache(entry->thd, mngr->get_binlog_cache_log(TRUE)))
+ if (write_event(entry->end_event))
{
- entry->error_cache= &mngr->trx_cache.cache_log;
+ entry->error_cache= NULL;
DBUG_RETURN(ER_ERROR_ON_WRITE);
}
+ status_var_add(entry->thd->status_var.binlog_bytes_written,
+ entry->end_event->data_written);
}
+ else
+ {
+ /*
+ If writing the IO_CACHE caused an error, we musn't flush it to the main
+ binlog, it's probably corrupt/truncated.
- DBUG_EXECUTE_IF("inject_error_writing_xid",
- {
- entry->error_cache= NULL;
- errno= 28;
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- });
+ We clear the error (otherwise it would be interpreted as an error
+ _reading_ the IO_CACHE).
- if (write_event(entry->end_event))
- {
- entry->error_cache= NULL;
- DBUG_RETURN(ER_ERROR_ON_WRITE);
+ And generate an incident event, if one wasn't set already.
+ */
+ stmt_cache->error= trx_cache->error= 0;
+ if (!entry->incident_event)
+ {
+ Incident_log_event inc_ev(entry->thd, INCIDENT_LOST_EVENTS,
+ &write_error_msg);
+ if (write_event(&inc_ev))
+ {
+ entry->error_cache= NULL;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
+ }
+ }
}
- status_var_add(entry->thd->status_var.binlog_bytes_written,
- entry->end_event->data_written);
if (entry->incident_event)
{
@@ -8392,12 +8441,12 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
}
}
- if (unlikely(mngr->get_binlog_cache_log(FALSE)->error))
+ if (unlikely(do_stmt && stmt_cache->error))
{
entry->error_cache= &mngr->stmt_cache.cache_log;
DBUG_RETURN(ER_ERROR_ON_WRITE);
}
- if (unlikely(mngr->get_binlog_cache_log(TRUE)->error)) // Error on read
+ if (unlikely(do_trx && trx_cache->error)) // Error on read
{
entry->error_cache= &mngr->trx_cache.cache_log;
DBUG_RETURN(ER_ERROR_ON_WRITE);
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index 8264286a022..cc4361b0472 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -253,6 +253,10 @@ class READ_INFO: public Load_data_param
*/
void skip_data_till_eof()
{
+#ifndef EMBEDDED_LIBRARY
+ if (mysql_bin_log.is_open())
+ cache.read_function= cache.real_read_function;
+#endif
while (GET != my_b_EOF)
;
}
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index d9b93742195..0d2e61f7f59 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -4499,6 +4499,10 @@ int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count)
/* buffer contains position where we started last read */
uchar* buffer= (uchar*) my_b_get_buffer_start(file);
uint max_event_size= lf_info->thd->variables.max_allowed_packet;
+ int res;
+#ifndef DBUG_OFF
+ bool did_dbug_inject= false;
+#endif
if (lf_info->thd->is_current_stmt_binlog_format_row())
goto ret;
@@ -4506,6 +4510,19 @@ int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count)
lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
goto ret;
+ DBUG_EXECUTE_IF("load_data_binlog_cache_error",
+ {
+ /*
+ Simulate "disk full" error in the middle of writing to
+ the binlog cache.
+ */
+ if (lf_info->last_pos_in_file >= 2*4096)
+ {
+ DBUG_SET("+d,simulate_file_write_error");
+ did_dbug_inject= true;
+ }
+ };);
+
for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0;
buffer += MY_MIN(block_len, max_event_size),
block_len -= MY_MIN(block_len, max_event_size))
@@ -4517,7 +4534,10 @@ int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count)
MY_MIN(block_len, max_event_size),
lf_info->log_delayed);
if (mysql_bin_log.write(&a))
- DBUG_RETURN(1);
+ {
+ res= 1;
+ goto err;
+ }
}
else
{
@@ -4526,12 +4546,20 @@ int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count)
MY_MIN(block_len, max_event_size),
lf_info->log_delayed);
if (mysql_bin_log.write(&b))
- DBUG_RETURN(1);
+ {
+ res= 1;
+ goto err;
+ }
lf_info->wrote_create_file= 1;
}
}
ret:
- int res= Buffer ? lf_info->real_read_function(file, Buffer, Count) : 0;
+ res= Buffer ? lf_info->real_read_function(file, Buffer, Count) : 0;
+err:
+#ifndef DBUG_OFF
+ if (did_dbug_inject)
+ DBUG_SET("-d,simulate_file_write_error");
+#endif
DBUG_RETURN(res);
}
--
2.30.2
1
0
22 Oct '23
From: Andrei <andrei.elkin(a)mariadb.com>
XA-Prepare group of events
XA START xid
...
XA END xid
XA PREPARE xid
and its XA-"complete" terminator
XA COMMIT or
XA ROLLBACK
are now distributed round-robin across the slave parallel workers.
The former hash-based policy was shown to contribute to execution
latency by creating a large queue of binlog-ordered transactions
waiting to commit - many times larger than the size of the worker
pool.
Acronyms and notations used below:
XAP := XA-Prepare event or the whole prepared XA group of events
XAC := XA-"complete", which is a solitary group of events
|W| := the size of the slave worker pool
Subscripts like `_k' denote order in a corresponding sequence
(e.g binlog file).
KEY CHANGES:
The parallel slave
------------------
driver thread now maintains a list of the XAPs currently
being processed. Its purpose is to avoid uncontrolled parallel execution of
XA transactions with duplicate xids (unlikely, but the user is allowed to do that).
The list is arranged as a sliding window of size 2*|W| to account for
the largest possible dependency (counted in groups of events),
XAP_k -> XAP_k+2|W|-1.
For example, let k=1 and let |W|, the number of workers, be 4. As
transactions are distributed round-robin, T^*_1 -> T^*_8 is the largest
possible dependency at runtime ('*' marks the dependent transactions),
since 1 + 2*4 - 1 = 8. This can be seen from the worker queues in the picture below.
Let Q_i worker queues develop downward:
Q1 ... Q4
1^* 2 3 4
5 6 7 8^*
Worker #1 has been assigned T_1 and T_5.
Worker #4 can take on its T_8 while T_1 is still at the
beginning of its processing, even before the XA START of that XAP.
XA related
----------
XID_cache_element is extended with two pointers to resolve
two types of dependencies: the duplicate xid XAP_k -> XAP_k+i
and the ordinary completion on the prepare XAP_k -> XAC_k+j.
The former is handled by a wait-for-xid protocol conducted by
xid_cache_delete() and xid_cache_insert_maybe_wait().
The latter is handled analogously by xid_cache_search_maybe_wait() and
slave_applier_reset_xa_trans().
An XA-"complete" is allowed to proceed before its XAP parent
has released the xid (all recovery concerns are covered in MDEV-21496,
MDEV-21777).
However, the XAC waits for its parent at a critical point of execution,
namely when "completing" the work in the engine.
CAVEAT: the storage/innobase/trx/trx0undo.cc changes are due to MDEV-32144,
which may already be fixed;
TODO: verify.
Thanks to Brandon Nesterenko at mariadb.com for the initial review and
a lot of creative effort in advancing this work!
---
mysql-test/include/show_binlog_events2.inc | 11 +
.../binlog/r/binlog_xa_prepared_bugs.result | 53 +
.../binlog/t/binlog_xa_prepared_bugs.test | 30 +
.../rpl/include/rpl_xa_concurrent_2pc.inc | 442 ++++++++
.../suite/rpl/r/rpl_xa_concurrent_2pc.result | 953 ++++++++++++++++++
.../rpl/r/rpl_xa_empty_transaction.result | 51 +
.../rpl/r/rpl_xa_prepare_gtid_fail.result | 2 +-
.../suite/rpl/t/rpl_xa_concurrent_2pc.test | 111 ++
.../suite/rpl/t/rpl_xa_empty_transaction.test | 89 ++
.../suite/rpl/t/rpl_xa_prepare_gtid_fail.test | 2 +-
sql/handler.cc | 23 +-
sql/log.cc | 219 +++-
sql/log_event_server.cc | 17 +-
sql/mysqld.cc | 4 +-
sql/mysqld.h | 4 +-
sql/rpl_parallel.cc | 128 ++-
sql/rpl_parallel.h | 16 +
sql/rpl_rli.cc | 6 +-
sql/rpl_rli.h | 8 +
sql/sql_array.h | 4 +-
sql/sql_class.cc | 2 +
sql/sql_class.h | 9 +
sql/xa.cc | 333 +++++-
storage/innobase/trx/trx0undo.cc | 3 +-
24 files changed, 2419 insertions(+), 101 deletions(-)
create mode 100644 mysql-test/suite/binlog/r/binlog_xa_prepared_bugs.result
create mode 100644 mysql-test/suite/binlog/t/binlog_xa_prepared_bugs.test
create mode 100644 mysql-test/suite/rpl/include/rpl_xa_concurrent_2pc.inc
create mode 100644 mysql-test/suite/rpl/r/rpl_xa_concurrent_2pc.result
create mode 100644 mysql-test/suite/rpl/t/rpl_xa_concurrent_2pc.test
diff --git a/mysql-test/include/show_binlog_events2.inc b/mysql-test/include/show_binlog_events2.inc
index 84c62cced66..416514faea4 100644
--- a/mysql-test/include/show_binlog_events2.inc
+++ b/mysql-test/include/show_binlog_events2.inc
@@ -1,3 +1,9 @@
+# ==== Usage ====
+#
+# [--let $binlog_file= [<FILENAME> | LAST]]
+# [--let $binlog_start= <POSITION> ]
+# [--let $filter_cid= [0 | 1]
+
if ($binlog_start)
{
--let $_binlog_start=$binlog_start
@@ -14,4 +20,9 @@ if ($binlog_file)
--replace_result "$_from_binlog_start" "from <binlog_start>" $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
--replace_column 2 # 5 #
--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ /file_id=[0-9]+/file_id=#/ /GTID [0-9]+-[0-9]+-[0-9]+/GTID #-#-#/
+if ($filter_cid)
+{
+--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ /file_id=[0-9]+/file_id=#/ /GTID [0-9]+-[0-9]+-[0-9]+/GTID #-#-#/ / cid=[0-9]+//
+
+}
--eval show binlog events $_in_binlog_file from $_binlog_start
diff --git a/mysql-test/suite/binlog/r/binlog_xa_prepared_bugs.result b/mysql-test/suite/binlog/r/binlog_xa_prepared_bugs.result
new file mode 100644
index 00000000000..35b7accfb24
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_xa_prepared_bugs.result
@@ -0,0 +1,53 @@
+CREATE TABLE ta (c INT KEY) engine=Aria;
+XA START 'xid_a';
+INSERT INTO ta VALUES (1);
+XA END 'xid_a';
+XA PREPARE 'xid_a';
+Warnings:
+Warning 1030 Got error 131 "Command not supported by the engine" from storage engine Aria
+LOAD INDEX INTO CACHE c KEY(PRIMARY);
+Table Op Msg_type Msg_text
+test.c preload_keys Error XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
+test.c preload_keys Error XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
+test.c preload_keys error Corrupt
+Warnings:
+Warning 1196 Some non-transactional changed tables couldn't be rolled back
+XA ROLLBACK 'xid_a';
+CREATE TABLE ti (c INT KEY) engine=Innodb;
+XA START 'xid_i';
+INSERT INTO ti VALUES (1);
+XA END 'xid_i';
+XA PREPARE 'xid_i';
+LOAD INDEX INTO CACHE c KEY(PRIMARY);
+Table Op Msg_type Msg_text
+test.c preload_keys Error XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
+test.c preload_keys Error XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
+test.c preload_keys error Corrupt
+XA COMMIT 'xid_i';
+SELECT * FROM ti;
+c
+include/show_binlog_events.inc
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # use `test`; CREATE TABLE ta (c INT KEY) engine=Aria
+master-bin.000001 # Gtid # # BEGIN GTID #-#-#
+master-bin.000001 # Annotate_rows # # INSERT INTO ta VALUES (1)
+master-bin.000001 # Table_map # # table_id: # (test.ta)
+master-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
+master-bin.000001 # Query # # COMMIT
+master-bin.000001 # Gtid # # XA START X'7869645f61',X'',1 GTID #-#-#
+master-bin.000001 # Query # # XA END X'7869645f61',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'7869645f61',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA ROLLBACK X'7869645f61',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # use `test`; CREATE TABLE ti (c INT KEY) engine=Innodb
+master-bin.000001 # Gtid # # XA START X'7869645f69',X'',1 GTID #-#-#
+master-bin.000001 # Annotate_rows # # INSERT INTO ti VALUES (1)
+master-bin.000001 # Table_map # # table_id: # (test.ti)
+master-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
+master-bin.000001 # Query # # XA END X'7869645f69',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'7869645f69',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA ROLLBACK X'7869645f69',X'',1
+drop table ta,ti;
diff --git a/mysql-test/suite/binlog/t/binlog_xa_prepared_bugs.test b/mysql-test/suite/binlog/t/binlog_xa_prepared_bugs.test
new file mode 100644
index 00000000000..3a5bb15968e
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_xa_prepared_bugs.test
@@ -0,0 +1,30 @@
+--source include/have_binlog_format_row.inc
+--source include/have_innodb.inc
+# Statements issued while an XA transaction is in the PREPARED state.
+CREATE TABLE ta (c INT KEY) engine=Aria;
+XA START 'xid_a';
+INSERT INTO ta VALUES (1);
+XA END 'xid_a';
+XA PREPARE 'xid_a';
+# NOTE(review): 'c' is a column, not a table; was 'ta'/'ti' intended?
+#--error ER_XAER_RMFAIL
+LOAD INDEX INTO CACHE c KEY(PRIMARY);
+
+XA ROLLBACK 'xid_a';
+# Same scenario with a transactional (InnoDB) table, completed by XA COMMIT.
+CREATE TABLE ti (c INT KEY) engine=Innodb;
+XA START 'xid_i';
+INSERT INTO ti VALUES (1);
+XA END 'xid_i';
+XA PREPARE 'xid_i';
+
+# --error ER_XAER_RMFAIL
+LOAD INDEX INTO CACHE c KEY(PRIMARY);
+
+XA COMMIT 'xid_i';
+SELECT * FROM ti;
+# NOTE(review): .result shows ti empty and XA ROLLBACK binlogged for xid_i.
+# Show how both XA transactions were written to the binlog.
+--source include/show_binlog_events.inc
+
+drop table ta,ti;
diff --git a/mysql-test/suite/rpl/include/rpl_xa_concurrent_2pc.inc b/mysql-test/suite/rpl/include/rpl_xa_concurrent_2pc.inc
new file mode 100644
index 00000000000..d1f1868d2c9
--- /dev/null
+++ b/mysql-test/suite/rpl/include/rpl_xa_concurrent_2pc.inc
@@ -0,0 +1,442 @@
+#
+# Shared body of rpl_xa_concurrent_2pc: runs test cases 1-4(a,b) with each
+# prepared XA transaction completed by either XA COMMIT or XA ROLLBACK.
+#
+# Parameters
+#   $xa_complete_sym (string) : COMMIT or ROLLBACK, the statement used to
+#                               complete a prepared XA transaction; it is
+#                               also substituted into --echo output
+#
+# Sets
+#   $is_xac (0/1) : 1 when $xa_complete_sym is COMMIT
+#   $is_xar (0/1) : 1 when $xa_complete_sym is ROLLBACK
+#
+
+if (!$xa_complete_sym)
+{
+  --die MTR variable xa_complete_sym not specified, must be either COMMIT or ROLLBACK
+}
+
+# Each flag is the 0/1 result of comparing the parameter with one keyword
+--let $is_xac= `SELECT strcmp("COMMIT", "$xa_complete_sym") = 0`
+--let $is_xar= `SELECT strcmp("ROLLBACK", "$xa_complete_sym") = 0`
+
+# Neither flag set: the parameter matched no supported keyword
+if (!$is_xac)
+{
+  if (!$is_xar)
+  {
+    --die MTR variable xa_complete_sym invalid, must be either COMMIT or ROLLBACK
+  }
+}
+
+# Common setup: reset replication state and create t1 with its initial row
+--echo #
+--echo # Initialize test data
+--connection slave
+--source include/stop_slave.inc
+RESET SLAVE;
+set @@global.gtid_slave_pos= "";
+# Binary logs are reset only when completing with COMMIT (NOTE(review): presumably the first pass - confirm)
+if ($is_xac)
+{
+--connection slave
+RESET MASTER;
+
+--connection master
+RESET MASTER;
+}
+
+--connection master
+create table t1 (a int primary key, b int) engine=innodb;
+# $hold_row: key of the row that test case 4 locks on the slave (via slave1)
+# so that the replicated update of it times out; $t1_ctr: next unused key
+--let $hold_row= -1
+--let $t1_ctr= 0
+--eval insert into t1 values ($hold_row, 0)
+# Replicate the initial row before the parallel-replication settings below apply
+--source include/save_master_gtid.inc
+
+--connection slave
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+--source include/stop_slave.inc
+set @save_debug= @@GLOBAL.debug_dbug;
+set @save_par_thds= @@GLOBAL.slave_parallel_threads;
+set @save_par_mode= @@GLOBAL.slave_parallel_mode;
+set @@GLOBAL.slave_parallel_threads= 4;
+set @@GLOBAL.slave_parallel_mode= optimistic;
+# Presumably logged when test case 4 makes an XA PREPARE fail and aborts its dependent XA completion
+set statement sql_log_bin=0 for call mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+# @save_debug is restored after each test case, @save_par_* in the Cleanup section
+
+--echo #
+--echo # Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE
+--echo # and XA $xa_complete_sym run concurrently. That is, the
+--echo # XA $xa_complete_sym will wait at group commit until the XA PREPARE
+--echo # binlogs, and then it will wait again until the XA PREPARE finishes
+--echo # preparing in all engines. At this point, the XA $xa_complete_sym will
+--echo # run to completion.
+--connection master
+# gtid_seq_no 100 lets the slave's hold_worker_on_schedule debug point pick
+# out this XA PREPARE (NOTE(review): assumed to match on seq_no 100 - confirm)
+set @@session.gtid_seq_no= 100;
+XA START 'x';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+--source include/save_master_gtid.inc
+
+--connection slave
+# Pause the XAP worker when it is dequeued, and again after it binlogs XA PREPARE
+set @@global.debug_dbug= "+d,hold_worker_on_schedule,stop_after_binlog_prepare";
+--source include/start_slave.inc
+# Sync points: reached_pause/continue_worker (dequeue), xa_prepare_binlogged/continue_xap (after binlog)
+--echo # Waiting for XAP to pause when it is pulled from the queue
+set debug_sync= "now wait_for reached_pause";
+
+--echo # Before the XA PREPARE executes, the XA $xa_complete_sym should wait in group commit..
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior transaction to commit"
+--source include/wait_condition.inc
+--echo # ..done
+
+
+--echo # Execute the XA PREPARE
+set debug_sync= "now signal continue_worker";
+
+--echo # Wait for XA PREPARE to have binlogged, but hold it before it prepares in engines
+set debug_sync= "now wait_for xa_prepare_binlogged";
+
+--echo # The XA $xa_complete_sym should move on from binlog to wait for the XA PREPARE to complete in engines
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction"
+--source include/wait_condition.inc
+--echo # ..done
+
+--echo # Signal the XAP to complete in engines (which will automatically signal XAC)
+set debug_sync= "now signal continue_xap";
+# With both phases released the slave can reach the master's saved GTID position
+--source include/sync_with_master_gtid.inc
+
+--let $diff_tables=master:test.t1, slave:test.t1
+--source include/diff_tables.inc
+# Leave the slave stopped, with the original debug_dbug, for the next test case
+--connection slave
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+
+
+--echo #
+--echo # Test Case 2: If two XA $xa_complete_sym transactions have different
+--echo # XIDs, ensure both phases of both transactions all execute concurrently.
+--echo #
+
+--echo # Ensure slave is stopped
+--connection slave
+--source include/wait_for_slave_to_stop.inc
+# Pause each XAP twice: before it binlogs (presumably released by binlog_xap),
+# and after it binlogs but before it prepares in engines (until continue_xap)
+set @@global.debug_dbug= "+d,stop_before_binlog_prepare,stop_after_binlog_prepare";
+
+--connection master
+XA START 'x1';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x1';
+XA PREPARE 'x1';
+--eval XA $xa_complete_sym 'x1'
+
+XA START 'x2';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x2';
+XA PREPARE 'x2';
+--eval XA $xa_complete_sym 'x2'
+--source include/save_master_gtid.inc
+
+--connection slave
+--source include/start_slave.inc
+# Wait until all four workers are parked before signalling anything. This avoids
+# a race between XAP_1 registering and XAC_2, where XAC_2 could stay in the
+# "Waiting for prior transaction to commit" state until XAP_1 had finished.
+# Expected: both XA completions wait for prior commit, both XAPs at a sync point.
+--let $count_wait= 2
+--let $wait_condition=SELECT count(*) = $count_wait FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior transaction to commit"
+--source include/wait_condition.inc
+
+--let $count_wait= 2
+--let $wait_condition=SELECT count(*) = $count_wait FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal binlog_xap";
+# Wait until a worker is at a debug sync point again (the released XAP moves on
+# to its post-binlog pause), then release the other XAP
+--let $wait_condition=SELECT count(*) >= 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal binlog_xap";
+
+--echo # Ensuring both phases of both transactions all execute concurrently
+# Both XA completions reaching "Waiting for prior xa transaction" proves this:
+# they cannot get past group commit unless their XA PREPAREs have binlogged too
+--let $count_xa_wait_workers= 2
+
+--let $wait_condition=SELECT count(*) = $count_xa_wait_workers FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction"
+--source include/wait_condition.inc
+--echo # ..done
+
+--echo # Verify XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+
+--echo # Signal the XAPs to complete in engines (which will automatically signal XACs)
+set debug_sync= "now signal continue_xap";
+# Release the second XAP once exactly one worker is left at a sync point
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal continue_xap";
+
+--source include/sync_with_master_gtid.inc
+
+--let $diff_tables=master:test.t1, slave:test.t1
+--source include/diff_tables.inc
+
+--connection slave
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+
+
+--echo #
+--echo # Test Case 3: Two current 2-phase XA transactions with matching XIDs
+--echo # should run one after the other, while each transaction still allows
+--echo # its XA PREPARE and XA $xa_complete_sym to run concurrently
+# NOTE(review): "current" in the echo above presumably means "concurrent"; fixing it also requires updating the .result
+--echo # Ensure slave is stopped
+--connection slave
+--source include/wait_for_slave_to_stop.inc
+# Pause each XAP before and after binlogging (binlog_xap / continue_xap), and
+# each XA completion after binlogging (xa_cor_binlogged / continue_xa_cor)
+set @@global.debug_dbug= "+d,stop_before_binlog_prepare,stop_after_binlog_prepare,stop_after_binlog_cor_by_xid";
+
+--connection master
+XA START 'x';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+
+XA START 'x';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+--source include/save_master_gtid.inc
+
+--connection slave
+--source include/start_slave.inc
+# Wait until every worker is parked before signalling, to avoid a race between
+# XAP_1 registering and XAC_2, where XAC_2 could stay in the "Waiting for prior
+# transaction to commit" state until XAP_1 had finished.
+--let $count_wait= 3
+--let $wait_condition=SELECT count(*) = $count_wait FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior transaction to commit"
+# Presumably every worker except XAP_1, which waits at its before-binlog sync point
+--source include/wait_condition.inc
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal binlog_xap";
+
+--echo # Verify first XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# XAP_1 is now held after binlogging, until continue_xap
+--echo # Ensure first XA transaction is running concurrently
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction" AND info LIKE "XA $xa_complete_sym%"
+--source include/wait_condition.inc
+
+--echo # Ensure second XA transaction's XAP waits for the first transaction
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction" AND info LIKE "XA START%"
+--source include/wait_condition.inc
+
+--echo # Signal first XA PREPARE to complete
+set debug_sync= "now signal continue_xap";
+
+--echo # Wait for first XA $xa_complete_sym to binlog
+set debug_sync= "now wait_for xa_cor_binlogged";
+
+--echo # Ensure second XA PREPARE doesn't begin yet because the XAC hadn't released its XID
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction"
+--source include/wait_condition.inc
+
+--echo # Signal first XA $xa_complete_sym to complete
+set debug_sync= "now signal continue_xa_cor";
+# NOTE(review): "to binlogged" in the echo below is a typo; fixing it also requires updating the .result
+--echo # Wait for second XA PREPARE to binlogged
+--echo # First pass through binlog_xap
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now wait_for xa_prepare_binlogged";
+
+--echo # Ensure second XA $xa_complete_sym is concurrent with XAP
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction"
+--source include/wait_condition.inc
+
+--echo # Signal second XA transaction to complete
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now wait_for xa_cor_binlogged";
+set debug_sync= "now signal continue_xa_cor";
+
+--source include/sync_with_master_gtid.inc
+
+--let $diff_tables=master:test.t1, slave:test.t1
+--source include/diff_tables.inc
+
+--connection slave
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+
+
+--echo #
+--echo # Test Case 4 (Error Case): If an XA PREPARE errors while its
+--echo # XA $xa_complete_sym is waiting on it, both phases should rollback
+--echo # successfully. Note this tests both:
+--echo # a) XA $xa_complete_sym is waiting in group commit (first phase
+--echo # times out in DMLs)
+--echo # b) XA $xa_complete_sym is waiting in group commit, with another XAP
+--echo # with a duplicate XID waiting on it.
+# The XAP is made to fail by a lock on $hold_row held from slave1 (error 1205)
+--echo # Case a)
+--echo # Ensure slave is stopped
+--connection slave
+--source include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+# A 1 second lock wait timeout and no retries make the blocked XAP fail promptly
+--connection master
+XA START 'x';
+--eval update t1 set b=b+1 where a=$hold_row
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+--source include/save_master_gtid.inc
+# Lock $hold_row on the slave so that the replicated update has to wait for it
+--connection slave1
+BEGIN;
+--eval select * from t1 where a=$hold_row for update;
+# NOTE(review): the trailing ';' above is part of the query text (shows as ';;' in the .result)
+--connection slave
+--source include/start_slave.inc
+
+--let $slave_sql_errno= 1205
+--source include/wait_for_slave_sql_error.inc
+
+--connection slave1
+ROLLBACK;
+# With the row lock released, a restarted slave can apply the update
+--connection slave
+# The SQL thread stopped on the error; stop the IO thread as well
+--source include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+# NOTE(review): save_master_gtid.inc below only records a position and does not wait for the retry; sync_with_master_gtid.inc was presumably intended (needs a .result update)
+--echo # Ensure on slave restart, we can re-execute the XA transaction
+--source include/start_slave.inc
+--source include/save_master_gtid.inc
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+# NOTE(review): without that wait, the b=1 row read in case b) may depend on timing
+
+--echo # Case b)
+--echo # Ensure slave is stopped
+--connection slave
+--source include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+# First XA transaction updates $hold_row, which slave1 locks below
+--connection master
+XA START 'x';
+--eval update t1 set b=b+1 where a=$hold_row
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+# Second XA transaction reuses XID 'x', so its XAP has to wait for the first one
+XA START 'x';
+--eval insert into t1 values ($t1_ctr, 0)
+--let $new_row_idx= $t1_ctr
+--inc $t1_ctr
+XA END 'x';
+XA PREPARE 'x';
+--source include/save_master_gtid.inc
+--eval XA $xa_complete_sym 'x'
+# NOTE(review): the position saved by save_master_gtid.inc above precedes this XA completion, so it does not cover it
+--connection slave1
+BEGIN;
+--eval select * from t1 where a=$hold_row for update;
+
+--connection slave
+--source include/start_slave.inc
+
+--let $slave_sql_errno= 1205
+--source include/wait_for_slave_sql_error.inc
+
+--connection slave1
+ROLLBACK;
+
+--echo # There should not be any prepared rows seen by XA RECOVER
+XA RECOVER;
+
+--echo # Ensuring data from second XAP isn't visible..
+if (`select count(*) from t1 where a=$new_row_idx`)
+{
+ --die Failed, row exists
+}
+--echo # ..done
+
+--connection slave
+--source include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+# NOTE(review): as in case a), save_master_gtid.inc below does not wait for the retry
+--echo # Ensure on slave restart, we can re-execute the XA transaction
+--source include/start_slave.inc
+--source include/save_master_gtid.inc
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+--source include/start_slave.inc
+# NOTE(review): nothing waits for the slave to apply the XA completion before this check, so it may race
+--echo # Ensuring data from second XAP is visible..
+if ($is_xac)
+{
+ --let $expected_row_count= 1
+}
+if ($is_xar)
+{
+ --let $expected_row_count= 0
+}
+if (`select count(*) != $expected_row_count from t1 where a=$new_row_idx`)
+{
+ --die Failed, XA $xa_complete_sym was not observed
+}
+--echo # ..done
+
+--echo #
+--echo # Cleanup
+--connection master
+DROP TABLE t1;
+--source include/save_master_gtid.inc
+--let $binlog_file=query_get_value(SHOW MASTER STATUS, File, 1)
+--source include/show_binlog_events.inc
+# List the slave's binlog too, once it has applied everything up to the DROP
+--connection slave
+--source include/sync_with_master_gtid.inc
+--let $binlog_file=query_get_value(SHOW MASTER STATUS, File, 1)
+--let $filter_cid=1
+--source include/show_binlog_events2.inc
+# Restore the parallel replication settings saved in the initialization section
+--source include/stop_slave.inc
+set @@GLOBAL.slave_parallel_threads= @save_par_thds;
+set @@GLOBAL.slave_parallel_mode= @save_par_mode;
+--source include/start_slave.inc
diff --git a/mysql-test/suite/rpl/r/rpl_xa_concurrent_2pc.result b/mysql-test/suite/rpl/r/rpl_xa_concurrent_2pc.result
new file mode 100644
index 00000000000..d30943b23da
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_xa_concurrent_2pc.result
@@ -0,0 +1,953 @@
+include/master-slave.inc
+[connection master]
+#
+# Initialize test data
+connection slave;
+include/stop_slave.inc
+RESET SLAVE;
+set @@global.gtid_slave_pos= "";
+connection slave;
+RESET MASTER;
+connection master;
+RESET MASTER;
+connection master;
+create table t1 (a int primary key, b int) engine=innodb;
+insert into t1 values (-1, 0);
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+include/stop_slave.inc
+set @save_debug= @@GLOBAL.debug_dbug;
+set @save_par_thds= @@GLOBAL.slave_parallel_threads;
+set @save_par_mode= @@GLOBAL.slave_parallel_mode;
+set @@GLOBAL.slave_parallel_threads= 4;
+set @@GLOBAL.slave_parallel_mode= optimistic;
+set statement sql_log_bin=0 for call mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+#
+# Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE
+# and XA COMMIT run concurrently. That is, the
+# XA COMMIT will wait at group commit until the XA PREPARE
+# binlogs, and then it will wait again until the XA PREPARE finishes
+# preparing in all engines. At this point, the XA COMMIT will
+# run to completion.
+connection master;
+set @@session.gtid_seq_no= 100;
+XA START 'x';
+insert into t1 values (0, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+include/save_master_gtid.inc
+connection slave;
+set @@global.debug_dbug= "+d,hold_worker_on_schedule,stop_after_binlog_prepare";
+include/start_slave.inc
+# Waiting for XAP to pause when it is pulled from the queue
+set debug_sync= "now wait_for reached_pause";
+# Before the XA PREPARE executes, the XA COMMIT should wait in group commit..
+# ..done
+# Execute the XA PREPARE
+set debug_sync= "now signal continue_worker";
+# Wait for XA PREPARE to have binlogged, but hold it before it prepares in engines
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# The XA COMMIT should move on from binlog to wait for the XA PREPARE to complete in engines
+# ..done
+# Signal the XAP to complete in engines (which will automatically signal XAC)
+set debug_sync= "now signal continue_xap";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 2: If two XA COMMIT transactions have different
+# XIDs, ensure both phases of both transactions all execute concurrently.
+#
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @@global.debug_dbug= "+d,stop_before_binlog_prepare,stop_after_binlog_prepare";
+connection master;
+XA START 'x1';
+insert into t1 values (1, 0);
+XA END 'x1';
+XA PREPARE 'x1';
+XA COMMIT 'x1';
+XA START 'x2';
+insert into t1 values (2, 0);
+XA END 'x2';
+XA PREPARE 'x2';
+XA COMMIT 'x2';
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now signal binlog_xap";
+# Ensuring both phases of both transactions all execute concurrently
+# ..done
+# Verify XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Signal the XAPs to complete in engines (which will automatically signal XACs)
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now signal continue_xap";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 3: Two current 2-phase XA transactions with matching XIDs
+# should run one after the other, while each transaction still allows
+# its XA PREPARE and XA COMMIT to run concurrently
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @@global.debug_dbug= "+d,stop_before_binlog_prepare,stop_after_binlog_prepare,stop_after_binlog_cor_by_xid";
+connection master;
+XA START 'x';
+insert into t1 values (3, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+XA START 'x';
+insert into t1 values (4, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+set debug_sync= "now signal binlog_xap";
+# Verify first XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Ensure first XA transaction is running concurrently
+# Ensure second XA transaction's XAP waits for the first transaction
+# Signal first XA PREPARE to complete
+set debug_sync= "now signal continue_xap";
+# Wait for first XA COMMIT to binlog
+set debug_sync= "now wait_for xa_cor_binlogged";
+# Ensure second XA PREPARE doesn't begin yet because the XAC hadn't released its XID
+# Signal first XA COMMIT to complete
+set debug_sync= "now signal continue_xa_cor";
+# Wait for second XA PREPARE to binlogged
+# First pass through binlog_xap
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Ensure second XA COMMIT is concurrent with XAP
+# Signal second XA transaction to complete
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now wait_for xa_cor_binlogged";
+set debug_sync= "now signal continue_xa_cor";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 4 (Error Case): If an XA PREPARE errors while its
+# XA COMMIT is waiting on it, both phases should rollback
+# successfully. Note this tests both:
+# a) XA COMMIT is waiting in group commit (first phase
+# times out in DMLs)
+# b) XA COMMIT is waiting in group commit, with another XAP
+# with a duplicate XID waiting on it.
+# Case a)
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+connection master;
+XA START 'x';
+update t1 set b=b+1 where a=-1;
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+include/save_master_gtid.inc
+connection slave1;
+BEGIN;
+select * from t1 where a=-1 for update;;
+a b
+-1 0
+connection slave;
+include/start_slave.inc
+include/wait_for_slave_sql_error.inc [errno=1205]
+connection slave1;
+ROLLBACK;
+connection slave;
+include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+# Ensure on slave restart, we can re-execute the XA transaction
+include/start_slave.inc
+include/save_master_gtid.inc
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+# Case b)
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+connection master;
+XA START 'x';
+update t1 set b=b+1 where a=-1;
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+XA START 'x';
+insert into t1 values (5, 0);
+XA END 'x';
+XA PREPARE 'x';
+include/save_master_gtid.inc
+XA COMMIT 'x';
+connection slave1;
+BEGIN;
+select * from t1 where a=-1 for update;;
+a b
+-1 1
+connection slave;
+include/start_slave.inc
+include/wait_for_slave_sql_error.inc [errno=1205]
+connection slave1;
+ROLLBACK;
+# There should not be any prepared rows seen by XA RECOVER
+XA RECOVER;
+formatID gtrid_length bqual_length data
+# Ensuring data from second XAP isn't visible..
+# ..done
+connection slave;
+include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+# Ensure on slave restart, we can re-execute the XA transaction
+include/start_slave.inc
+include/save_master_gtid.inc
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+include/start_slave.inc
+# Ensuring data from second XAP is visible..
+# ..done
+#
+# Cleanup
+connection master;
+DROP TABLE t1;
+include/save_master_gtid.inc
+include/show_binlog_events.inc
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # use `test`; create table t1 (a int primary key, b int) engine=innodb
+master-bin.000001 # Gtid # # BEGIN GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (-1, 0)
+master-bin.000001 # Xid # # COMMIT /* XID */
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (0, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'7831',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (1, 0)
+master-bin.000001 # Query # # XA END X'7831',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'7831',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'7831',X'',1
+master-bin.000001 # Gtid # # XA START X'7832',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (2, 0)
+master-bin.000001 # Query # # XA END X'7832',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'7832',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'7832',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (3, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (4, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; update t1 set b=b+1 where a=-1
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; update t1 set b=b+1 where a=-1
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (5, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # use `test`; DROP TABLE `t1` /* generated by server */
+connection slave;
+include/sync_with_master_gtid.inc
+show binlog events in 'slave-bin.000001' from <binlog_start>;
+Log_name Pos Event_type Server_id End_log_pos Info
+slave-bin.000001 # Gtid_list 2 # []
+slave-bin.000001 # Binlog_checkpoint 2 # slave-bin.000001
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; create table t1 (a int primary key, b int) engine=innodb
+slave-bin.000001 # Gtid 1 # BEGIN GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (-1, 0)
+slave-bin.000001 # Xid 1 # COMMIT /* XID */
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (0, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'7831',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (1, 0)
+slave-bin.000001 # Query 1 # XA END X'7831',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7831',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'7831',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'7832',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (2, 0)
+slave-bin.000001 # Query 1 # XA END X'7832',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7832',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'7832',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (3, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (4, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (5, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */
+include/stop_slave.inc
+set @@GLOBAL.slave_parallel_threads= @save_par_thds;
+set @@GLOBAL.slave_parallel_mode= @save_par_mode;
+include/start_slave.inc
+#
+# Initialize test data
+connection slave;
+include/stop_slave.inc
+RESET SLAVE;
+set @@global.gtid_slave_pos= "";
+connection master;
+create table t1 (a int primary key, b int) engine=innodb;
+insert into t1 values (-1, 0);
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+include/stop_slave.inc
+set @save_debug= @@GLOBAL.debug_dbug;
+set @save_par_thds= @@GLOBAL.slave_parallel_threads;
+set @save_par_mode= @@GLOBAL.slave_parallel_mode;
+set @@GLOBAL.slave_parallel_threads= 4;
+set @@GLOBAL.slave_parallel_mode= optimistic;
+set statement sql_log_bin=0 for call mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+#
+# Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE
+# and XA ROLLBACK run concurrently. That is, the
+# XA ROLLBACK will wait at group commit until the XA PREPARE
+# binlogs, and then it will wait again until the XA PREPARE finishes
+# preparing in all engines. At this point, the XA ROLLBACK will
+# run to completion.
+connection master;
+set @@session.gtid_seq_no= 100;
+XA START 'x';
+insert into t1 values (0, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA ROLLBACK 'x';
+include/save_master_gtid.inc
+connection slave;
+set @@global.debug_dbug= "+d,hold_worker_on_schedule,stop_after_binlog_prepare";
+include/start_slave.inc
+# Waiting for XAP to pause when it is pulled from the queue
+set debug_sync= "now wait_for reached_pause";
+# Before the XA PREPARE executes, the XA ROLLBACK should wait in group commit..
+# ..done
+# Execute the XA PREPARE
+set debug_sync= "now signal continue_worker";
+# Wait for XA PREPARE to have binlogged, but hold it before it prepares in engines
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# The XA ROLLBACK should move on from binlog to wait for the XA PREPARE to complete in engines
+# ..done
+# Signal the XAP to complete in engines (which will automatically signal XAC)
+set debug_sync= "now signal continue_xap";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 2: If two XA ROLLBACK transactions have different
+# XIDs, ensure both phases of both transactions all execute concurrently.
+#
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @@global.debug_dbug= "+d,stop_before_binlog_prepare,stop_after_binlog_prepare";
+connection master;
+XA START 'x1';
+insert into t1 values (1, 0);
+XA END 'x1';
+XA PREPARE 'x1';
+XA ROLLBACK 'x1';
+XA START 'x2';
+insert into t1 values (2, 0);
+XA END 'x2';
+XA PREPARE 'x2';
+XA ROLLBACK 'x2';
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now signal binlog_xap";
+# Ensuring both phases of both transactions all execute concurrently
+# ..done
+# Verify XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Signal the XAPs to complete in engines (which will automatically signal XACs)
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now signal continue_xap";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 3: Two current 2-phase XA transactions with matching XIDs
+# should run one after the other, while each transaction still allows
+# its XA PREPARE and XA ROLLBACK to run concurrently
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @@global.debug_dbug= "+d,stop_before_binlog_prepare,stop_after_binlog_prepare,stop_after_binlog_cor_by_xid";
+connection master;
+XA START 'x';
+insert into t1 values (3, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA ROLLBACK 'x';
+XA START 'x';
+insert into t1 values (4, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA ROLLBACK 'x';
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+set debug_sync= "now signal binlog_xap";
+# Verify first XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Ensure first XA transaction is running concurrently
+# Ensure second XA transaction's XAP waits for the first transaction
+# Signal first XA PREPARE to complete
+set debug_sync= "now signal continue_xap";
+# Wait for first XA ROLLBACK to binlog
+set debug_sync= "now wait_for xa_cor_binlogged";
+# Ensure second XA PREPARE doesn't begin yet because the XAC hadn't released its XID
+# Signal first XA ROLLBACK to complete
+set debug_sync= "now signal continue_xa_cor";
+# Wait for second XA PREPARE to binlogged
+# First pass through binlog_xap
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Ensure second XA ROLLBACK is concurrent with XAP
+# Signal second XA transaction to complete
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now wait_for xa_cor_binlogged";
+set debug_sync= "now signal continue_xa_cor";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 4 (Error Case): If an XA PREPARE errors while its
+# XA ROLLBACK is waiting on it, both phases should rollback
+# successfully. Note this tests both:
+# a) XA ROLLBACK is waiting in group commit (first phase
+# times out in DMLs)
+# b) XA ROLLBACK is waiting in group commit, with another XAP
+# with a duplicate XID waiting on it.
+# Case a)
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+connection master;
+XA START 'x';
+update t1 set b=b+1 where a=-1;
+XA END 'x';
+XA PREPARE 'x';
+XA ROLLBACK 'x';
+include/save_master_gtid.inc
+connection slave1;
+BEGIN;
+select * from t1 where a=-1 for update;;
+a b
+-1 0
+connection slave;
+include/start_slave.inc
+include/wait_for_slave_sql_error.inc [errno=1205]
+connection slave1;
+ROLLBACK;
+connection slave;
+include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+# Ensure on slave restart, we can re-execute the XA transaction
+include/start_slave.inc
+include/save_master_gtid.inc
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+# Case b)
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+connection master;
+XA START 'x';
+update t1 set b=b+1 where a=-1;
+XA END 'x';
+XA PREPARE 'x';
+XA ROLLBACK 'x';
+XA START 'x';
+insert into t1 values (5, 0);
+XA END 'x';
+XA PREPARE 'x';
+include/save_master_gtid.inc
+XA ROLLBACK 'x';
+connection slave1;
+BEGIN;
+select * from t1 where a=-1 for update;;
+a b
+-1 0
+connection slave;
+include/start_slave.inc
+include/wait_for_slave_sql_error.inc [errno=1205]
+connection slave1;
+ROLLBACK;
+# There should not be any prepared rows seen by XA RECOVER
+XA RECOVER;
+formatID gtrid_length bqual_length data
+# Ensuring data from second XAP isn't visible..
+# ..done
+connection slave;
+include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+# Ensure on slave restart, we can re-execute the XA transaction
+include/start_slave.inc
+include/save_master_gtid.inc
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+include/start_slave.inc
+# Ensuring data from second XAP is visible..
+# ..done
+#
+# Cleanup
+connection master;
+DROP TABLE t1;
+include/save_master_gtid.inc
+include/show_binlog_events.inc
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # use `test`; create table t1 (a int primary key, b int) engine=innodb
+master-bin.000001 # Gtid # # BEGIN GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (-1, 0)
+master-bin.000001 # Xid # # COMMIT /* XID */
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (0, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'7831',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (1, 0)
+master-bin.000001 # Query # # XA END X'7831',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'7831',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'7831',X'',1
+master-bin.000001 # Gtid # # XA START X'7832',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (2, 0)
+master-bin.000001 # Query # # XA END X'7832',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'7832',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'7832',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (3, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (4, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; update t1 set b=b+1 where a=-1
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; update t1 set b=b+1 where a=-1
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (5, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA COMMIT X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # use `test`; DROP TABLE `t1` /* generated by server */
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # use `test`; create table t1 (a int primary key, b int) engine=innodb
+master-bin.000001 # Gtid # # BEGIN GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (-1, 0)
+master-bin.000001 # Xid # # COMMIT /* XID */
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (0, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA ROLLBACK X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'7831',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (1, 0)
+master-bin.000001 # Query # # XA END X'7831',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'7831',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA ROLLBACK X'7831',X'',1
+master-bin.000001 # Gtid # # XA START X'7832',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (2, 0)
+master-bin.000001 # Query # # XA END X'7832',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'7832',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA ROLLBACK X'7832',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (3, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA ROLLBACK X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (4, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA ROLLBACK X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; update t1 set b=b+1 where a=-1
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA ROLLBACK X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; update t1 set b=b+1 where a=-1
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA ROLLBACK X'78',X'',1
+master-bin.000001 # Gtid # # XA START X'78',X'',1 GTID #-#-#
+master-bin.000001 # Query # # use `test`; insert into t1 values (5, 0)
+master-bin.000001 # Query # # XA END X'78',X'',1
+master-bin.000001 # XA_prepare # # XA PREPARE X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # XA ROLLBACK X'78',X'',1
+master-bin.000001 # Gtid # # GTID #-#-#
+master-bin.000001 # Query # # use `test`; DROP TABLE `t1` /* generated by server */
+connection slave;
+include/sync_with_master_gtid.inc
+show binlog events in 'slave-bin.000001' from <binlog_start>;
+Log_name Pos Event_type Server_id End_log_pos Info
+slave-bin.000001 # Gtid_list 2 # []
+slave-bin.000001 # Binlog_checkpoint 2 # slave-bin.000001
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; create table t1 (a int primary key, b int) engine=innodb
+slave-bin.000001 # Gtid 1 # BEGIN GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (-1, 0)
+slave-bin.000001 # Xid 1 # COMMIT /* XID */
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (0, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'7831',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (1, 0)
+slave-bin.000001 # Query 1 # XA END X'7831',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7831',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'7831',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'7832',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (2, 0)
+slave-bin.000001 # Query 1 # XA END X'7832',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7832',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'7832',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (3, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (4, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (5, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; create table t1 (a int primary key, b int) engine=innodb
+slave-bin.000001 # Gtid 1 # BEGIN GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (-1, 0)
+slave-bin.000001 # Xid 1 # COMMIT /* XID */
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (0, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'7831',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (1, 0)
+slave-bin.000001 # Query 1 # XA END X'7831',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7831',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'7831',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'7832',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (2, 0)
+slave-bin.000001 # Query 1 # XA END X'7832',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7832',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'7832',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (3, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (4, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (5, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA COMMIT X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; create table t1 (a int primary key, b int) engine=innodb
+slave-bin.000001 # Gtid 1 # BEGIN GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (-1, 0)
+slave-bin.000001 # Xid 1 # COMMIT /* XID */
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (0, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA ROLLBACK X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'7831',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (1, 0)
+slave-bin.000001 # Query 1 # XA END X'7831',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7831',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA ROLLBACK X'7831',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'7832',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (2, 0)
+slave-bin.000001 # Query 1 # XA END X'7832',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'7832',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA ROLLBACK X'7832',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (3, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA ROLLBACK X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (4, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA ROLLBACK X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA ROLLBACK X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; update t1 set b=b+1 where a=-1
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA ROLLBACK X'78',X'',1
+slave-bin.000001 # Gtid 1 # XA START X'78',X'',1 GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; insert into t1 values (5, 0)
+slave-bin.000001 # Query 1 # XA END X'78',X'',1
+slave-bin.000001 # XA_prepare 1 # XA PREPARE X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # XA ROLLBACK X'78',X'',1
+slave-bin.000001 # Gtid 1 # GTID #-#-#
+slave-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */
+include/stop_slave.inc
+set @@GLOBAL.slave_parallel_threads= @save_par_thds;
+set @@GLOBAL.slave_parallel_mode= @save_par_mode;
+include/start_slave.inc
+#
+# Test Case 5: If an XAP is skipped by the replica (e.g. by incorrectly
+# setting gtid_slave_pos), and only its XAC/XAR is tried to execute, the
+# replica should report ER_XAER_NOTA.
+connection master;
+create table t1 (a int) engine=innodb;
+include/save_master_gtid.inc
+connection slave;
+include/sync_with_master_gtid.inc
+call mtr.add_suppression("XAER_NOTA: Unknown XID");
+include/stop_slave.inc
+change master to master_use_gtid = slave_pos;
+connection master;
+xa start '1';
+insert into t1 set a=1;
+xa end '1';
+xa prepare '1';
+xa rollback '1';
+insert into t1 set a=2;
+include/save_master_gtid.inc
+connection slave;
+set @save_gtid_slave_pos= @@global.gtid_slave_pos;
+SELECT CONCAT(domain_id,"-",server_id,"-", seq_no + 1)
+into @gtid_skip
+FROM mysql.gtid_slave_pos
+WHERE seq_no = (SELECT DISTINCT max(seq_no) FROM mysql.gtid_slave_pos) limit 1;
+set @@global.gtid_slave_pos = @gtid_skip;
+start slave;
+include/wait_for_slave_sql_error.inc [errno=1397]
+select count(*) = 2 % 2 as 'must be true' from t1;;
+must be true
+1
+include/stop_slave.inc
+set @@global.gtid_slave_pos = @save_gtid_slave_pos;
+show warnings;
+Level Code Message
+Warning 1947 Specified GTID <value> conflicts with the binary log which contains a more recent GTID <value>. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+include/stop_slave.inc
+change master to master_use_gtid = slave_pos;
+connection master;
+xa start '1';
+insert into t1 set a=1;
+xa end '1';
+xa prepare '1';
+xa commit '1';
+insert into t1 set a=2;
+include/save_master_gtid.inc
+connection slave;
+set @save_gtid_slave_pos= @@global.gtid_slave_pos;
+SELECT CONCAT(domain_id,"-",server_id,"-", seq_no + 1)
+into @gtid_skip
+FROM mysql.gtid_slave_pos
+WHERE seq_no = (SELECT DISTINCT max(seq_no) FROM mysql.gtid_slave_pos) limit 1;
+set @@global.gtid_slave_pos = @gtid_skip;
+start slave;
+include/wait_for_slave_sql_error.inc [errno=1397]
+select count(*) = 1 % 2 as 'must be true' from t1;;
+must be true
+1
+include/stop_slave.inc
+set @@global.gtid_slave_pos = @save_gtid_slave_pos;
+show warnings;
+Level Code Message
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+connection master;
+drop table t1;
+connection slave;
+include/rpl_end.inc
+# End of rpl_xa_concurrent_2pc.test
diff --git a/mysql-test/suite/rpl/r/rpl_xa_empty_transaction.result b/mysql-test/suite/rpl/r/rpl_xa_empty_transaction.result
index f3ea53c219a..92a820d6753 100644
--- a/mysql-test/suite/rpl/r/rpl_xa_empty_transaction.result
+++ b/mysql-test/suite/rpl/r/rpl_xa_empty_transaction.result
@@ -1165,5 +1165,56 @@ connection server_1;
set @@binlog_format = @sav_binlog_format;
set @@global.binlog_format = @sav_binlog_format;
connection server_1;
+create table t_not_in_binlog (a int) engine=innodb;
+flush logs;
+include/save_master_gtid.inc
+connect con1,localhost,root,,;
+call mtr.add_suppression("XAER_NOTA: Unknown XID");
+SET sql_log_bin=0;
+XA START 'a';
+insert into t_not_in_binlog set a=1;
+XA END 'a';
+XA PREPARE 'a';
+disconnect con1;
+connection server_1;
+xa recover;
+formatID gtrid_length bqual_length data
+1 1 0 a
+XA ROLLBACK 'a';
+drop table t_not_in_binlog;
+include/save_master_gtid.inc
+connection server_2;
+XAER_NOTA: Unknown XID
+include/wait_for_slave_sql_error.inc [errno=1397]
+connect con2,127.0.0.1,root,,test,$SERVER_MYPORT_2,;
+SET sql_log_bin=0;
+XA START 'a';
+insert into t_not_in_binlog set a=1;
+XA END 'a';
+XA PREPARE 'a';
+disconnect con2;
+connection server_2;
+xa recover;
+formatID gtrid_length bqual_length data
+1 1 0 a
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+connection server_3;
+XAER_NOTA: Unknown XID
+include/wait_for_slave_sql_error.inc [errno=1397]
+connect con3,127.0.0.1,root,,test,$SERVER_MYPORT_3,;
+SET sql_log_bin=0;
+XA START 'a';
+insert into t_not_in_binlog set a=1;
+XA END 'a';
+XA PREPARE 'a';
+disconnect con3;
+connection server_3;
+xa recover;
+formatID gtrid_length bqual_length data
+1 1 0 a
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+connection server_1;
include/rpl_end.inc
# End of rpl_xa_empty_transaction.test
diff --git a/mysql-test/suite/rpl/r/rpl_xa_prepare_gtid_fail.result b/mysql-test/suite/rpl/r/rpl_xa_prepare_gtid_fail.result
index dd0d132471e..a3f5414b9da 100644
--- a/mysql-test/suite/rpl/r/rpl_xa_prepare_gtid_fail.result
+++ b/mysql-test/suite/rpl/r/rpl_xa_prepare_gtid_fail.result
@@ -42,7 +42,7 @@ connection master;
drop table t1;
connection slave;
# TODO: Remove after fixing MDEV-21777
-set @@global.gtid_slave_pos= "0-1-100";
+set @@global.gtid_slave_pos= "0-1-101";
set @@global.slave_parallel_threads= @save_par_thds;
set @@global.gtid_strict_mode= @save_strict_mode;
set @@global.innodb_lock_wait_timeout= @save_innodb_lock_wait_timeout;
diff --git a/mysql-test/suite/rpl/t/rpl_xa_concurrent_2pc.test b/mysql-test/suite/rpl/t/rpl_xa_concurrent_2pc.test
new file mode 100644
index 00000000000..d762114ff26
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_xa_concurrent_2pc.test
@@ -0,0 +1,111 @@
+#
+# This test ensures that two-phase XA transactions have their first and
+# second phases parallelized for both XA COMMIT and XA ROLLBACK. It ensures the
+# following behaviors (XAP/XAC/XAR = XA PREPARE/COMMIT/ROLLBACK, XAC/R = either):
+#
+# Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE and
+# XA COMMIT/ROLLBACK run concurrently. That is, the XA COMMIT/ROLLBACK will
+# wait at group commit until the XA PREPARE binlogs, and then it will wait
+# again until the XA PREPARE finishes preparing in all engines. At this point,
+# the XA COMMIT/ROLLBACK will run to completion.
+#
+# Test Case 2: If two XA transactions have different XIDs, ensure both phases
+# of both transactions can all execute concurrently, whether the transactions
+# end with XA COMMIT or with XA ROLLBACK.
+#
+# Test Case 3: Two concurrent 2-phase XA transactions with matching XIDs should
+# run one after the other, while each transaction still allows both phases of
+# its own transaction to run concurrently.
+#
+# Test Case 4: Error Case. If an XAP errors while its XAC/R is waiting on it,
+# both the XAP and XAC/R should roll back successfully. Note this tests both:
+# a) XAC/R is waiting in group commit (first phase times out in DMLs)
+# b) XAC/R is waiting in group commit, with another XAP with a duplicate XID
+# waiting on it.
+#
+# Test Case 5: If an XAP is skipped by the replica (e.g. by incorrectly
+# setting gtid_slave_pos), and only its XAC/XAR is executed, the
+# replica should report ER_XAER_NOTA.
+#
+#
+# References:
+# MDEV-31949: slow parallel replication of user xa
+#
+--source include/have_debug.inc
+--source include/have_innodb.inc
+--source include/have_binlog_format_mixed.inc
+--source include/master-slave.inc
+# Run Test Cases 1-4 (from the .inc file) with XA COMMIT, then with XA ROLLBACK.
+--let $xa_complete_sym= COMMIT
+--source include/rpl_xa_concurrent_2pc.inc
+
+--let $xa_complete_sym= ROLLBACK
+--source include/rpl_xa_concurrent_2pc.inc
+
+
+--echo #
+--echo # Test Case 5: If an XAP is skipped by the replica (e.g. by incorrectly
+--echo # setting gtid_slave_pos), and only its XAC/XAR is tried to execute, the
+--echo # replica should report ER_XAER_NOTA.
+
+--connection master
+create table t1 (a int) engine=innodb;
+--source include/save_master_gtid.inc
+
+--connection slave
+--source include/sync_with_master_gtid.inc
+call mtr.add_suppression("XAER_NOTA: Unknown XID");
+# Two rounds: $i=2 ends the XA transaction with rollback, $i=1 with commit.
+--let $i=2
+while ($i)
+{
+ --source include/stop_slave.inc
+ --replace_regex /[0-9]*-[0-9]*-[0-9]*/<value>/
+ change master to master_use_gtid = slave_pos;
+ # Master: a complete XA transaction followed by a plain insert.
+ --connection master
+ --let $complete=rollback
+ if ($i == 1)
+ {
+ --let $complete=commit
+ }
+ xa start '1'; insert into t1 set a=1; xa end '1'; xa prepare '1';
+ --eval xa $complete '1'
+ insert into t1 set a=2;
+ --source include/save_master_gtid.inc
+
+ --connection slave
+ # Reposition the slave one GTID past its last applied one (seq_no + 1). This
+ # skips the XA PREPARE so that replication resumes at its XA COMMIT/ROLLBACK.
+ set @save_gtid_slave_pos= @@global.gtid_slave_pos;
+ SELECT CONCAT(domain_id,"-",server_id,"-", seq_no + 1)
+ into @gtid_skip
+ FROM mysql.gtid_slave_pos
+ WHERE seq_no = (SELECT DISTINCT max(seq_no) FROM mysql.gtid_slave_pos) limit 1;
+ set @@global.gtid_slave_pos = @gtid_skip;
+ # After the error t1 holds only rows of earlier rounds, 0 for $i=2, 1 for $i=1.
+ start slave;
+ let $slave_sql_errno= 1397; # ER_XAER_NOTA
+ source include/wait_for_slave_sql_error.inc;
+ --eval select count(*) = $i % 2 as 'must be true' from t1;
+ --source include/stop_slave.inc
+ # Restore the saved position and let the slave catch up normally.
+ --disable_warnings
+ set @@global.gtid_slave_pos = @save_gtid_slave_pos;
+ --enable_warnings
+ --replace_regex /[0-9]*-[0-9]*-[0-9]*/<value>/
+ show warnings;
+ --source include/start_slave.inc
+ --source include/sync_with_master_gtid.inc
+
+ --dec $i
+}
+
+# MDEV-31949 cleanup
+--connection master
+drop table t1;
+
+--sync_slave_with_master
+
+--source include/rpl_end.inc
+--echo # End of rpl_xa_concurrent_2pc.test
diff --git a/mysql-test/suite/rpl/t/rpl_xa_empty_transaction.test b/mysql-test/suite/rpl/t/rpl_xa_empty_transaction.test
index 61cc0621d5a..f43af653ace 100644
--- a/mysql-test/suite/rpl/t/rpl_xa_empty_transaction.test
+++ b/mysql-test/suite/rpl/t/rpl_xa_empty_transaction.test
@@ -167,6 +167,95 @@ set @@global.binlog_format = row;
set @@binlog_format = @sav_binlog_format;
set @@global.binlog_format = @sav_binlog_format;
+
+# MDEV-32257 dangling XA-rollback in binlog from empty XA
+# Create a case where XA ROLLBACK gets into the binlog while its XAP does
+# not, and try to replicate it.
+# The expected result is that both slaves error out.
+--connection server_1
+--let $binlog_start = query_get_value(SHOW MASTER STATUS, Position, 1)
+--let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1)
+create table t_not_in_binlog (a int) engine=innodb;
+flush logs;
+--source include/save_master_gtid.inc
+--let $binlog_file=query_get_value(SHOW MASTER STATUS, File, 1)
+
+# External connection XID access after disconnect is subject to race.
+# "(" open parenthesis to remember # of connection before ...
+--source include/count_sessions.inc
+
+--connect(con1,localhost,root,,)
+call mtr.add_suppression("XAER_NOTA: Unknown XID");
+
+SET sql_log_bin=0;
+XA START 'a';
+insert into t_not_in_binlog set a=1;
+XA END 'a';
+XA PREPARE 'a';
+--disconnect con1
+
+--connection server_1
+# .. ")" close parenthesis, to wait until con1 fully releases access to xid.
+--source include/wait_until_count_sessions.inc
+xa recover;
+#
+# Replicate the orphan XAR to servers 2 and 3 and expect the error first;
+# after that, compensate for it.
+
+--error 0
+XA ROLLBACK 'a';
+# cleanup at once
+drop table t_not_in_binlog;
+--source include/save_master_gtid.inc
+
+--connection server_2
+--echo XAER_NOTA: Unknown XID
+--let $slave_sql_errno= 1397
+--source include/wait_for_slave_sql_error.inc
+
+# "("
+--source include/count_sessions.inc
+
+--connect (con2,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+SET sql_log_bin=0;
+XA START 'a';
+insert into t_not_in_binlog set a=1;
+XA END 'a';
+XA PREPARE 'a';
+--disconnect con2
+
+--connection server_2
+# ")"
+--source include/wait_until_count_sessions.inc
+
+xa recover;
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+
+--connection server_3
+--echo XAER_NOTA: Unknown XID
+--let $slave_sql_errno= 1397
+--source include/wait_for_slave_sql_error.inc
+
+# "("
+--source include/count_sessions.inc
+
+--connect (con3,127.0.0.1,root,,test,$SERVER_MYPORT_3,)
+SET sql_log_bin=0;
+XA START 'a';
+insert into t_not_in_binlog set a=1;
+XA END 'a';
+XA PREPARE 'a';
+--disconnect con3
+
+--connection server_3
+# ")"
+--source include/wait_until_count_sessions.inc
+
+xa recover;
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+
#
# Cleanup
--connection server_1
diff --git a/mysql-test/suite/rpl/t/rpl_xa_prepare_gtid_fail.test b/mysql-test/suite/rpl/t/rpl_xa_prepare_gtid_fail.test
index aa1b088ed23..72589953ac0 100644
--- a/mysql-test/suite/rpl/t/rpl_xa_prepare_gtid_fail.test
+++ b/mysql-test/suite/rpl/t/rpl_xa_prepare_gtid_fail.test
@@ -56,8 +56,8 @@ xa start '1';
update t1 set b=b+10 where a=1;
xa end '1';
xa prepare '1';
---let $new_gtid= `SELECT @@global.gtid_binlog_pos`
xa commit '1';
+--let $new_gtid= `SELECT @@global.gtid_binlog_pos`
--source include/save_master_gtid.inc
--connection slave1
diff --git a/sql/handler.cc b/sql/handler.cc
index 1ea1818749c..7fa78e4d9b2 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -2180,13 +2180,13 @@ int ha_rollback_trans(THD *thd, bool all)
rollback without signalling following transactions. And in release
builds, we explicitly do the signalling before rolling back.
*/
- DBUG_ASSERT(
- !(thd->rgi_slave &&
- !thd->rgi_slave->worker_error &&
- thd->rgi_slave->did_mark_start_commit) ||
- (thd->transaction->xid_state.is_explicit_XA() ||
- (thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_PREPARED_XA)));
-
+ DBUG_ASSERT(!(thd->rgi_slave &&
+ !thd->rgi_slave->worker_error &&
+ thd->rgi_slave->did_mark_start_commit) ||
+ (thd->transaction->xid_state.is_explicit_XA() ||
+ (thd->rgi_slave->gtid_ev_flags2 &
+ (Gtid_log_event::FL_PREPARED_XA |
+ Gtid_log_event::FL_COMPLETED_XA))));
if (thd->rgi_slave &&
!thd->rgi_slave->worker_error &&
thd->rgi_slave->did_mark_start_commit)
@@ -2343,6 +2343,15 @@ int ha_commit_or_rollback_by_xid(XID *xid, bool commit)
else
binlog_rollback_by_xid(binlog_hton, xid);
+#ifdef ENABLED_DEBUG_SYNC
+ DBUG_EXECUTE_IF(
+ "stop_after_binlog_cor_by_xid",
+ DBUG_ASSERT(!debug_sync_set_action(
+ current_thd,
+ STRING_WITH_LEN(
+ "now SIGNAL xa_cor_binlogged WAIT_FOR continue_xa_cor"))););
+#endif
+
plugin_foreach(NULL, commit ? xacommit_handlerton : xarollback_handlerton,
MYSQL_STORAGE_ENGINE_PLUGIN, &xaop);
diff --git a/sql/log.cc b/sql/log.cc
index eef8d86e4da..78a9419fb15 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -101,6 +101,8 @@ static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
Log_event *end_ev, bool all, bool using_stmt,
bool using_trx, bool is_ro_1pc);
+XID_cache_element *xid_cache_search_maybe_wait(THD *thd);
+
static const LEX_CSTRING write_error_msg=
{ STRING_WITH_LEN("error writing to the binary log") };
@@ -1751,7 +1753,8 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
if ((using_stmt && !cache_mngr->stmt_cache.empty()) ||
(using_trx && !cache_mngr->trx_cache.empty()) ||
- thd->transaction->xid_state.is_explicit_XA())
+ (thd->transaction->xid_state.is_explicit_XA() ||
+ (thd->rgi_slave && thd->rgi_slave->is_async_xac)))
{
if (using_stmt && thd->binlog_flush_pending_rows_event(TRUE, FALSE))
DBUG_RETURN(1);
@@ -1858,11 +1861,19 @@ binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr,
if (thd->lex->sql_command == SQLCOM_XA_COMMIT &&
thd->lex->xa_opt != XA_ONE_PHASE)
{
- DBUG_ASSERT(thd->transaction->xid_state.is_explicit_XA());
+ bool is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac);
+ DBUG_ASSERT(thd->transaction->xid_state.is_explicit_XA() || is_async_xac);
DBUG_ASSERT(thd->transaction->xid_state.get_state_code() ==
- XA_PREPARED);
-
- buflen= serialize_with_xid(thd->transaction->xid_state.get_xid(),
+ XA_PREPARED || is_async_xac);
+ DBUG_ASSERT(is_async_xac ||
+ thd->lex->xid->eq(thd->transaction->xid_state.get_xid()));
+ /*
+ While xid_state.get_xid() is a robust method to access `xid`
+ it can't be used on slave by the asynchronously running XA-"complete".
+ In the latter case thd->lex->xid is safely accessible.
+ */
+ buflen= serialize_with_xid(is_async_xac? thd->lex->xid :
+ thd->transaction->xid_state.get_xid(),
buf, query, q_len);
}
Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0);
@@ -1888,13 +1899,19 @@ binlog_rollback_flush_trx_cache(THD *thd, bool all,
const size_t q_len= sizeof(query) - 1; // do not count trailing 0
char buf[q_len + ser_buf_size]= "ROLLBACK";
size_t buflen= sizeof("ROLLBACK") - 1;
+ bool is_async_xac= false;
- if (thd->transaction->xid_state.is_explicit_XA())
+ if (thd->transaction->xid_state.is_explicit_XA() ||
+ (is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac)))
{
/* for not prepared use plain ROLLBACK */
- if (thd->transaction->xid_state.get_state_code() == XA_PREPARED)
- buflen= serialize_with_xid(thd->transaction->xid_state.get_xid(),
+ if (thd->transaction->xid_state.get_state_code() == XA_PREPARED ||
+ is_async_xac)
+ {
+ buflen= serialize_with_xid(is_async_xac? thd->lex->xid :
+ thd->transaction->xid_state.get_xid(),
buf, query, q_len);
+ }
}
Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0);
@@ -1985,11 +2002,71 @@ inline bool is_preparing_xa(THD *thd)
static int binlog_prepare(handlerton *hton, THD *thd, bool all)
{
+ int rc;
+
/* Do nothing unless the transaction is a user XA. */
- return is_preparing_xa(thd) ? binlog_commit(thd, all, FALSE) : 0;
+ if (is_preparing_xa(thd))
+ {
+#ifdef ENABLED_DEBUG_SYNC /* guarded like the stop_after_binlog_prepare hook */
+ DBUG_EXECUTE_IF("stop_before_binlog_prepare",
+ DBUG_ASSERT(!debug_sync_set_action(thd, STRING_WITH_LEN("now WAIT_FOR binlog_xap"))););
+#endif
+
+ rc= binlog_commit(thd, all, FALSE); /* binlog the user XA PREPARE */
+
+#ifdef ENABLED_DEBUG_SYNC
+ DBUG_EXECUTE_IF(
+ "stop_after_binlog_prepare",
+ DBUG_ASSERT(!debug_sync_set_action(
+ thd,
+ STRING_WITH_LEN(
+ "now SIGNAL xa_prepare_binlogged WAIT_FOR continue_xap"))););
+#endif
+ }
+ else
+ {
+ rc= 0; /* not a user XA prepare: nothing to binlog here */
+ }
+
+ return rc;
}
+/**
+ @c acquire_xid makes the slave worker's THD take ownership of the xid record
+ in the system xid cache. The xid is implicitly provided and belongs to an
+ XA-"complete" (COMMIT or ROLLBACK) that is being handled asynchronously.
+
+ @param thd the thread handler
+ @return false on success (including when there is nothing to acquire), true otherwise
+*/
+static bool acquire_xid(THD *thd)
+{
+ bool rc= false;
+
+ if (thd->rgi_slave && thd->rgi_slave->is_async_xac &&
+ (thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_COMPLETED_XA))
+ {
+ XID_STATE &xid_state= thd->transaction->xid_state;
+
+ auto xs= xid_cache_search_maybe_wait(thd); // presumably waits for the prior XA-prepare to release the xid - TODO confirm
+ xid_state.xid_cache_element= xs;
+ if (!xs)
+ {
+ DBUG_ASSERT(thd->is_killed()); // NULL is only expected when the worker got killed
+ const char *op= thd->lex->sql_command == SQLCOM_XA_COMMIT ? "COMMIT" : "ROLLBACK";
+ rpl_gtid *gtid= &thd->rgi_slave->current_gtid;
+ my_error(ER_XAER_RMERR, MYF(0));
+ sql_print_error("XA %s of GTID %u-%u-%llu could not complete "
+ "after having been logged into binary log", op,
+ gtid->domain_id, gtid->server_id, (ulonglong) gtid->seq_no);
+ rc= true;
+ }
+ }
+
+ return rc;
+}
+
int binlog_commit_by_xid(handlerton *hton, XID *xid)
{
int rc= 0;
@@ -1997,28 +2074,38 @@ int binlog_commit_by_xid(handlerton *hton, XID *xid)
if (thd->is_current_stmt_binlog_disabled())
{
- return thd->wait_for_prior_commit();
+ rc= thd->wait_for_prior_commit();
}
+ else
+ {
+ /* the asserted state can't be reachable with xa commit */
+ DBUG_ASSERT(!thd->get_stmt_da()->is_error() ||
+ thd->get_stmt_da()->sql_errno() != ER_XA_RBROLLBACK);
+ /*
+ This is a recovered user xa transaction commit.
+ Create a "temporary" binlog transaction to write the commit record
+ into binlog.
+ */
+ THD_TRANS trans;
+ trans.ha_list= NULL;
- /* the asserted state can't be reachable with xa commit */
- DBUG_ASSERT(!thd->get_stmt_da()->is_error() ||
- thd->get_stmt_da()->sql_errno() != ER_XA_RBROLLBACK);
- /*
- This is a recovered user xa transaction commit.
- Create a "temporary" binlog transaction to write the commit record
- into binlog.
- */
- THD_TRANS trans;
- trans.ha_list= NULL;
-
- thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton);
- thd->ha_data[binlog_hton->slot].ha_info[1].set_trx_read_write();
- (void) thd->binlog_setup_trx_data();
+ thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton);
+ thd->ha_data[binlog_hton->slot].ha_info[1].set_trx_read_write();
+ (void) thd->binlog_setup_trx_data();
- DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT);
+ DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT);
- rc= binlog_commit(thd, TRUE, FALSE);
- thd->ha_data[binlog_hton->slot].ha_info[1].reset();
+ rc= binlog_commit(thd, TRUE, FALSE);
+ thd->ha_data[binlog_hton->slot].ha_info[1].reset();
+ }
+ if (!rc)
+ {
+ rc= acquire_xid(thd);
+ }
+ if (thd->is_current_stmt_binlog_disabled())
+ {
+ thd->wakeup_subsequent_commits(rc);
+ }
return rc;
}
@@ -2031,33 +2118,54 @@ int binlog_rollback_by_xid(handlerton *hton, XID *xid)
if (thd->is_current_stmt_binlog_disabled())
{
- return thd->wait_for_prior_commit();
+ rc= thd->wait_for_prior_commit();
}
+ else if (thd->get_stmt_da()->is_error() &&
+ thd->get_stmt_da()->sql_errno() == ER_XA_RBROLLBACK)
+ rc= true;
+ else
+ {
+ THD_TRANS trans;
+ trans.ha_list= NULL;
- if (thd->get_stmt_da()->is_error() &&
- thd->get_stmt_da()->sql_errno() == ER_XA_RBROLLBACK)
- return rc;
-
- THD_TRANS trans;
- trans.ha_list= NULL;
-
- thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton);
- thd->ha_data[hton->slot].ha_info[1].set_trx_read_write();
- (void) thd->binlog_setup_trx_data();
+ thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton);
+ thd->ha_data[hton->slot].ha_info[1].set_trx_read_write();
+ (void) thd->binlog_setup_trx_data();
- DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK ||
- (thd->transaction->xid_state.get_state_code() == XA_ROLLBACK_ONLY));
+ DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK ||
+ (thd->transaction->xid_state.get_state_code() == XA_ROLLBACK_ONLY));
- rc= binlog_rollback(hton, thd, TRUE);
- thd->ha_data[hton->slot].ha_info[1].reset();
+ rc= binlog_rollback(hton, thd, TRUE);
+ thd->ha_data[hton->slot].ha_info[1].reset();
+ }
+ if (!rc)
+ {
+ rc= acquire_xid(thd);
+ }
+ if (thd->is_current_stmt_binlog_disabled())
+ {
+ thd->wakeup_subsequent_commits(rc);
+ }
return rc;
}
-
+/**
+ @param thd thread handler
+ @return true when thd carries an XA transaction in prepared state,
+ or the XA transaction is being completed by an XA COMMIT/ROLLBACK
+ running asynchronously in a parallel slave worker (is_async_xac);
+ false otherwise
+*/
inline bool is_prepared_xa(THD *thd)
{
- return thd->transaction->xid_state.is_explicit_XA() &&
+ bool is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac);
+ DBUG_ASSERT(!is_async_xac ||
+ thd->lex->sql_command == SQLCOM_XA_ROLLBACK ||
+ thd->lex->sql_command == SQLCOM_XA_COMMIT);
+
+ return is_async_xac ? true :
+ thd->transaction->xid_state.is_explicit_XA() &&
thd->transaction->xid_state.get_state_code() == XA_PREPARED;
}
@@ -2185,7 +2293,9 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc)
}
if (cache_mngr->trx_cache.empty() &&
- (thd->transaction->xid_state.get_state_code() != XA_PREPARED ||
+ ((thd->transaction->xid_state.get_state_code() != XA_PREPARED &&
+ !(thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
+ thd->lex->sql_command == SQLCOM_XA_COMMIT)) ||
!(thd->ha_data[binlog_hton->slot].ha_info[1].is_started() &&
thd->ha_data[binlog_hton->slot].ha_info[1].is_trx_read_write())))
{
@@ -2279,7 +2389,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
}
if (!cache_mngr->trx_cache.has_incident() && cache_mngr->trx_cache.empty() &&
- (thd->transaction->xid_state.get_state_code() != XA_PREPARED ||
+ ((thd->transaction->xid_state.get_state_code() != XA_PREPARED &&
+ !(thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
+ thd->lex->sql_command == SQLCOM_XA_ROLLBACK)) ||
!(thd->ha_data[binlog_hton->slot].ha_info[1].is_started() &&
thd->ha_data[binlog_hton->slot].ha_info[1].is_trx_read_write())))
{
@@ -8382,7 +8494,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
DBUG_ASSERT(!cache_mngr->stmt_cache.empty() ||
!cache_mngr->trx_cache.empty() ||
- current->thd->transaction->xid_state.is_explicit_XA());
+ (current->thd->transaction->xid_state.is_explicit_XA() ||
+ (current->thd->rgi_slave &&
+ current->thd->rgi_slave->is_async_xac)));
if (unlikely((current->error= write_transaction_or_stmt(current,
commit_id))))
@@ -10510,13 +10624,20 @@ int TC_LOG_BINLOG::unlog_xa_prepare(THD *thd, bool all)
binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
int cookie= 0;
+ int rc= 0;
+
+ if (thd->rgi_slave && thd->is_current_stmt_binlog_disabled())
+ {
+ rc= thd->wait_for_prior_commit();
+ if (rc == 0)
+ thd->wakeup_subsequent_commits(rc);
+ return rc;
+ }
if (!cache_mngr->need_unlog)
{
Ha_trx_info *ha_info;
uint rw_count= ha_count_rw_all(thd, &ha_info);
- bool rc= false;
-
/*
This transaction has not been binlogged as indicated by need_unlog.
Such exceptional cases include transactions with no effect to engines,
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 92ff401a260..230a7a4667f 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -3314,16 +3314,22 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
XID_STATE &xid_state= thd->transaction->xid_state;
if (is_transactional)
{
- if (xid_state.is_explicit_XA() &&
- (thd->lex->sql_command == SQLCOM_XA_PREPARE ||
- xid_state.get_state_code() == XA_PREPARED))
+ bool is_async_xac= false;
+ if ((xid_state.is_explicit_XA() &&
+ (thd->lex->sql_command == SQLCOM_XA_PREPARE ||
+ xid_state.get_state_code() == XA_PREPARED)) ||
+ (is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac)))
{
DBUG_ASSERT(!(thd->lex->sql_command == SQLCOM_XA_COMMIT &&
thd->lex->xa_opt == XA_ONE_PHASE));
+ DBUG_ASSERT(!is_async_xac ||
+ thd->lex->sql_command == SQLCOM_XA_ROLLBACK ||
+ thd->lex->sql_command == SQLCOM_XA_COMMIT);
flags2|= thd->lex->sql_command == SQLCOM_XA_PREPARE ?
FL_PREPARED_XA : FL_COMPLETED_XA;
- xid.set(xid_state.get_xid());
+ xid.set(is_async_xac? thd->lex->xid :
+ thd->transaction->xid_state.get_xid());
}
/* count non-zero extra recoverable engines; total = extra + 1 */
if (has_xid)
@@ -4172,9 +4178,6 @@ int XA_prepare_log_event::do_commit()
thd->lex->xid= &xid;
if (!one_phase)
{
- if ((res= thd->wait_for_prior_commit()))
- return res;
-
thd->lex->sql_command= SQLCOM_XA_PREPARE;
res= trans_xa_prepare(thd);
}
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index c0dd56ab3d5..0f76dcc2351 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -1055,7 +1055,7 @@ PSI_cond_key key_BINLOG_COND_xid_list,
key_BINLOG_COND_queue_busy;
PSI_cond_key key_RELAYLOG_COND_relay_log_updated,
key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready,
- key_COND_wait_commit;
+ key_COND_wait_commit, key_COND_wait_commit_dep;
PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
@@ -1083,6 +1083,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0},
{ &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0},
{ &key_COND_wait_commit, "wait_for_commit::COND_wait_commit", 0},
+ { &key_COND_wait_commit_dep, "wait_for_commit::COND_wait_commit_dep", 0},
{ &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 0},
{ &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL},
{ &key_COND_server_started, "COND_server_started", PSI_FLAG_GLOBAL},
@@ -9224,6 +9225,7 @@ PSI_stage_info stage_waiting_for_deadlock_kill= { 0, "Waiting for parallel repli
PSI_stage_info stage_starting= { 0, "starting", 0};
PSI_stage_info stage_waiting_for_flush= { 0, "Waiting for non trans tables to be flushed", 0};
PSI_stage_info stage_waiting_for_ddl= { 0, "Waiting for DDLs", 0};
+PSI_stage_info stage_waiting_for_prior_xa_transaction= { 0, "Waiting for prior xa transaction", 0};
PSI_memory_key key_memory_DATE_TIME_FORMAT;
PSI_memory_key key_memory_DDL_LOG_MEMORY_ENTRY;
diff --git a/sql/mysqld.h b/sql/mysqld.h
index fc8afa06638..4245a1a5e10 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -376,7 +376,7 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_COND_start_thread;
extern PSI_cond_key key_RELAYLOG_COND_relay_log_updated,
key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready,
- key_COND_wait_commit;
+ key_COND_wait_commit, key_COND_wait_commit_dep;
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue,
@@ -679,7 +679,7 @@ extern PSI_stage_info stage_slave_background_process_request;
extern PSI_stage_info stage_slave_background_wait_request;
extern PSI_stage_info stage_waiting_for_deadlock_kill;
extern PSI_stage_info stage_starting;
-
+extern PSI_stage_info stage_waiting_for_prior_xa_transaction;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
/**
Statement instrumentation keys (sql).
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index c26263401b8..2653e294590 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -9,6 +9,8 @@
#ifdef WITH_WSREP
#include "wsrep_trans_observer.h"
#endif
+#include <algorithm>
+using std::max;
/*
Code for optional parallel execution of replicated events on the slave.
@@ -760,7 +762,8 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
return;
err_code= thd->get_stmt_da()->sql_errno();
if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
- err_code != ER_PRIOR_COMMIT_FAILED) ||
+ (err_code != ER_PRIOR_COMMIT_FAILED &&
+ err_code != ER_XAER_NOTA)) ||
((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
rgi->killed_for_retry))
{
@@ -2364,16 +2367,9 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
idx= rpl_thread_idx;
if (gtid_ev)
{
- if (gtid_ev->flags2 &
- (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
- idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
- gtid_ev->xid.key_length()) % rpl_thread_max;
- else
- {
- ++idx;
- if (idx >= rpl_thread_max)
- idx= 0;
- }
+ ++idx;
+ if (idx >= rpl_thread_max)
+ idx= 0;
rpl_thread_idx= idx;
}
thr= rpl_threads[idx];
@@ -2467,6 +2463,7 @@ free_rpl_parallel_entry(void *element)
}
mysql_cond_destroy(&e->COND_parallel_entry);
mysql_mutex_destroy(&e->LOCK_parallel_entry);
+ e->concurrent_xaps_window.~Dynamic_array();
my_free(e);
}
@@ -2521,6 +2518,19 @@ rpl_parallel::find(uint32 domain_id)
e->domain_id= domain_id;
e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
e->pause_sub_id= (uint64)ULONGLONG_MAX;
+
+ e->concurrent_xaps_window.init((PSI_memory_key) PSI_INSTRUMENT_ME,
+ max((decltype(e->rpl_thread_max)) 2,
+ 2*e->rpl_thread_max));
+ e->cxap_lhs= e->cxap_rhs= 0;
+
+ /*
+ 0 initialize each element
+ */
+ for (size_t i= 0; i < e->concurrent_xaps_window.max_size(); i++)
+ {
+ e->concurrent_xaps_window.at(i)= {0, 0};
+ }
mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry,
MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL);
@@ -2798,6 +2808,90 @@ abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
mysql_cond_signal(&cur_thread->COND_rpl_thread);
}
+/**
+ Check whether @c xid may conflict with a recently queued XA-prepare.
+ The xid hash of every new XA-prepare (@c is_xap is true then) is appended
+ to a sliding window implemented as a circular buffer; the result is
+ computed by searching that window for an equal xid hash.
+
+ @param e parallel entry pointer
+ @param xid a pointer to the xid of either XA-prepare or XA-"complete"
+ @param is_xap
+ true when xid belongs to XA-prepare
+ @return true when the window holds an equal xid hash (possible duplicate),
+ false otherwise.
+*/
+static bool
+handle_xa_prepera_duplicate_xid(rpl_parallel_entry *e, XID *xid, bool is_xap)
+{
+ DBUG_ASSERT(e->current_group_info ||
+ (e->count_queued_event_groups == 0 &&
+ e->cxap_lhs == e->cxap_rhs && e->cxap_lhs == 0));
+ DBUG_ASSERT(xid);
+ DBUG_ASSERT(!xid->is_null());
+ DBUG_ASSERT(xid->key());
+ DBUG_ASSERT(xid->key_length());
+
+ uint64 curr_event_count= e->count_queued_event_groups;
+ uint32 i;
+ bool rc= false;
+ /*
+ Advance LHS past the XAPs that can no longer run in parallel with us.
+ LHS = RHS indicates the empty buffer (which implies RHS is the exclusive
+ "edge" of the window).
+ Otherwise RHS always points to a free cell, at least one of which must
+ exist at this point.
+ As transaction distribution is round-robin, an XAP queued at least
+ rpl_thread_max event groups ago is considered safe (see the check below).
+ NOTE(review): the buffer holds 2*rpl_thread_max cells; confirm the bound.
+ */
+ for (i= e->cxap_lhs; i != e->cxap_rhs;
+ i= (i+1) % (e->concurrent_xaps_window.max_size()))
+ {
+ uint64 old_event_count= e->concurrent_xaps_window.at(i).second;
+ uint64 queued_event_diff= curr_event_count - old_event_count;
+ if (queued_event_diff >= e->rpl_thread_max)
+ {
+ /*
+ Squeeze the window from the left
+ as this XAP can't run in parallel with us.
+ */
+ e->cxap_lhs= (i+1) % (e->concurrent_xaps_window.max_size());
+ }
+ else
+ {
+ // new LHS is determined
+ DBUG_ASSERT(e->cxap_lhs != e->cxap_rhs);
+ break;
+ }
+ }
+ // Search the rest of the window [i, RHS) for an XAP with an equal xid hash
+ std::size_t xid_hash= std::hash<XID>{}(*xid);
+ for (; i != e->cxap_rhs; i= (i+1) % (e->concurrent_xaps_window.max_size()))
+ {
+ std::size_t old_xid_hash= e->concurrent_xaps_window.at(i).first;
+ if (old_xid_hash == xid_hash)
+ {
+ rc= true;
+ break;
+ }
+ }
+
+ // Add the XAP to the sliding window
+ if (is_xap)
+ {
+ e->concurrent_xaps_window.at(e->cxap_rhs).first= xid_hash;
+ e->concurrent_xaps_window.at(e->cxap_rhs).second= curr_event_count;
+ e->cxap_rhs= (e->cxap_rhs + 1) % (e->concurrent_xaps_window.max_size());
+ if (e->cxap_rhs == e->cxap_lhs)
+ {
+ // the buffer is full (RHS caught up with LHS): drop the oldest entry
+ e->cxap_lhs= (e->cxap_lhs + 1) % (e->concurrent_xaps_window.max_size());
+ }
+ }
+
+ return rc;
+}
/*
do_event() is executed by the sql_driver_thd thread.
@@ -3046,6 +3140,18 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
new_gco= true;
force_switch_flag= 0;
gco= e->current_gco;
+ /*
+ Take care of duplicate xids in XA-prepare, XA-"complete" should not
+ race its XA-prepare parent either. When the current transaction's xid
+ was seen and its transaction may still be in process this event group
+ gets flagged to wait for prior commits at the start of execution.
+ */
+ if ((gtid_flags & (Gtid_log_event::FL_PREPARED_XA |
+ Gtid_log_event::FL_COMPLETED_XA)) &&
+ handle_xa_prepera_duplicate_xid(e, >id_ev->xid,
+ gtid_flags &
+ Gtid_log_event::FL_PREPARED_XA))
+ gtid_flags &= ~Gtid_log_event::FL_ALLOW_PARALLEL;
if (likely(gco))
{
uint8 flags= gco->flags;
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 0b1d3cf9d80..b76d6749f6d 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -324,6 +324,14 @@ struct rpl_parallel_thread_pool {
void release_thread(rpl_parallel_thread *rpt);
};
+namespace std { /* specialize inside std: older GCC rejects qualified std::hash<> here */
+template <> struct hash<XID>
+{
+ std::size_t operator()(const XID& xid) const // binary-collation hash of the xid key
+ {
+ return my_hash_sort(&my_charset_bin, xid.key(), xid.key_length());
+ }
+}; } // namespace std
struct rpl_parallel_entry {
mysql_mutex_t LOCK_parallel_entry;
@@ -419,6 +427,14 @@ struct rpl_parallel_entry {
/* The group_commit_orderer object for the events currently being queued. */
group_commit_orderer *current_gco;
+ /*
+ Circular buffer of max(2, 2*rpl_thread_max) (xid hash, count_queued_event_groups)
+ pairs, one per XA-prepare event group that may still run concurrently.
+ See how handle_xa_prepera_duplicate_xid operates on it.
+ */
+ Dynamic_array<std::pair<std::size_t, uint64>> concurrent_xaps_window;
+ uint32 cxap_lhs, cxap_rhs; // [lhs, rhs) bounds; lhs == rhs means empty
+
rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
PSI_stage_info *old_stage,
Gtid_log_event *gtid_ev);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 375fb1d1c58..fdeb0d67872 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -2155,12 +2155,14 @@ rpl_group_info::reinit(Relay_log_info *rli)
gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
speculation= SPECULATE_NO;
commit_orderer.reinit();
+ is_async_xac= false;
}
rpl_group_info::rpl_group_info(Relay_log_info *rli)
: thd(0), wait_commit_sub_id(0),
wait_commit_group_info(0), parallel_entry(0),
- deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
+ deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
+ is_async_xac(false)
{
reinit(rli);
bzero(¤t_gtid, sizeof(current_gtid));
@@ -2291,7 +2293,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
if (thd->transaction->xid_state.is_explicit_XA() &&
thd->transaction->xid_state.get_state_code() != XA_PREPARED)
xa_trans_force_rollback(thd);
-
+ is_async_xac= false;
thd->release_transactional_locks();
if (thd == rli->sql_driver_thd)
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index cc807852bf2..06eef910de4 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -835,6 +835,14 @@ struct rpl_group_info
};
uchar killed_for_retry;
+ /*
+ When true indicates that the user xa transaction is going to
+ complete (with COMMIT or ROLLBACK) by the worker thread,
+ *while* another worker is still preparing it. Once the latter is done
+ the xid will be acquired and the flag gets reset.
+ */
+ bool is_async_xac;
+
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
void reinit(Relay_log_info *rli);
diff --git a/sql/sql_array.h b/sql/sql_array.h
index 8610e971016..c79c0c257a0 100644
--- a/sql/sql_array.h
+++ b/sql/sql_array.h
@@ -137,7 +137,7 @@ template <class Elem> class Dynamic_array
*/
Elem& at(size_t idx)
{
- DBUG_ASSERT(idx < array.elements);
+ DBUG_ASSERT(idx < max_size());
return *(((Elem*)array.buffer) + idx);
}
/// Const variant of at(), which cannot change data
@@ -172,6 +172,8 @@ template <class Elem> class Dynamic_array
size_t size() const { return array.elements; }
+ size_t max_size() const { return array.max_element; }
+
const Elem *end() const
{
return back() + 1;
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 1b78f88bd3c..04338831cd7 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -7960,6 +7960,7 @@ wait_for_commit::wait_for_commit()
{
mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
+ mysql_cond_init(key_COND_wait_commit_dep, &COND_wait_commit_dep, 0);
reinit();
}
@@ -7989,6 +7990,7 @@ wait_for_commit::~wait_for_commit()
mysql_mutex_destroy(&LOCK_wait_commit);
mysql_cond_destroy(&COND_wait_commit);
+ mysql_cond_destroy(&COND_wait_commit_dep);
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index e6580d2432c..72ca39c7710 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2367,6 +2367,15 @@ struct wait_for_commit
event group is fully done.
*/
bool wakeup_blocked;
+ /*
+ This condition variable is part of the facilities that handle additional
+ commit-time dependencies between groups of replication events, e.g.
+ XA-Prepare -> XA-Commit, or XA-Prepare -> XA-Prepare, all with the same xid.
+ */
+ mysql_cond_t COND_wait_commit_dep;
+#ifndef DBUG_OFF
+ bool debug_done; // debug-only; reset to false before a dependency wait (presumably set when done - TODO confirm)
+#endif
void register_wait_for_prior_commit(wait_for_commit *waitee);
int wait_for_prior_commit(THD *thd, bool allow_kill=true)
diff --git a/sql/xa.cc b/sql/xa.cc
index 9df9da7acf1..3d1a7a2360a 100644
--- a/sql/xa.cc
+++ b/sql/xa.cc
@@ -22,7 +22,7 @@
#include "my_cpu.h"
#include <pfs_transaction_provider.h>
#include <mysql/psi/mysql_transaction.h>
-
+#include "rpl_rli.h" // rpl_group_info
static bool slave_applier_reset_xa_trans(THD *thd);
/***************************************************************************
@@ -79,6 +79,10 @@ class XID_cache_element
uint rm_error;
enum xa_states xa_state;
XID xid;
+ /* parallel slave worker waiters. `c` stands for complete, `p` prepare */
+ std::atomic<wait_for_commit *> c_waiter; // set by asynch run xa-"complete"
+ std::atomic<wait_for_commit *> p_waiter; // set by duplicate xid xa-start
+
bool is_set(int32_t flag)
{ return m_state.load(std::memory_order_relaxed) & flag; }
void set(int32_t flag)
@@ -134,6 +138,7 @@ class XID_cache_element
element->rm_error= 0;
element->xa_state= new_element->xa_state;
element->xid.set(new_element->xid);
+ element->c_waiter= element->p_waiter= NULL;
new_element->xid_cache_element= element;
}
static void lf_alloc_constructor(uchar *ptr)
@@ -243,7 +248,7 @@ void xid_cache_free()
Find recovered XA transaction by XID.
*/
-static XID_cache_element *xid_cache_search(THD *thd, XID *xid)
+XID_cache_element *xid_cache_search(THD *thd, XID *xid)
{
DBUG_ASSERT(thd->xid_hash_pins);
XID_cache_element *element=
@@ -254,16 +259,221 @@ static XID_cache_element *xid_cache_search(THD *thd, XID *xid)
/* The element can be removed from lf_hash by other thread, but
element->acquire_recovered() will return false in this case. */
if (!element->acquire_recovered())
+    {
element= 0;
+      if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec)
+      {
+        DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT ||
+                    thd->lex->sql_command == SQLCOM_XA_ROLLBACK);
+        thd->rgi_slave->is_async_xac= true; // the xid is to be acquired later
+      }
+    }
lf_hash_search_unpin(thd->xid_hash_pins);
/* Once the element is acquired (i.e. got the ACQUIRED bit) by this thread,
only this thread can delete it. The deletion happens in xid_cache_delete().
See also the XID_cache_element documentation. */
DEBUG_SYNC(thd, "xa_after_search");
}
+  else if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec) // sanity check
+  {
+    DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT ||
+                thd->lex->sql_command == SQLCOM_XA_ROLLBACK);
+  }
+
return element;
}
+const int SPIN_MAX= 20;
+/**
+  Insert the xid of the current XA START into the xid cache, retrying until
+  it succeeds. A retry can only be caused by an earlier transaction holding
+  a duplicate xid. As in @c xid_cache_search_maybe_wait, that duplicate is
+  expected to be deleted from the hash eventually: spin first, then wait to
+  be signalled by its deleter, then spin again until the insert succeeds.
+
+  @param thd thread handler
+  @return false as success,
+          true  otherwise (e.g. when killed while waiting).
+*/
+bool xid_cache_insert_maybe_wait(THD* thd)
+{
+  int i= 0;
+  bool rc;
+
+  do
+  {
+    if ((rc= xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid)))
+      ut_delay(1 + i++);
+  }
+  while (rc && i < SPIN_MAX);
+
+  if (rc)
+  {
+    /* Presumably the duplicate is still there: look it up to wait for it. */
+    XID *xid= thd->lex->xid;
+    XID_cache_element *element=
+      (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
+                                          xid->key(), xid->key_length());
+    if (element)
+    {
+      PSI_stage_info old_stage;
+      wait_for_commit *exp= NULL, *waiter= thd->wait_for_commit_ptr;
+#ifndef DBUG_OFF
+      waiter->debug_done= false;
+#endif
+
+      while (unlikely(!element->
+                      p_waiter.compare_exchange_weak(exp, waiter,
+                                                     std::memory_order_acq_rel)))
+      {
+        if (exp)
+        {
+          DBUG_ASSERT(exp != waiter);
+          waiter= NULL; // notifier is seen
+          break;
+        }
+        else
+        {
+          (void) LF_BACKOFF();
+        }
+      }
+      lf_hash_search_unpin(thd->xid_hash_pins);
+
+      if (waiter) // notifier was not seen
+      {
+        mysql_mutex_lock(&waiter->LOCK_wait_commit);
+        thd->ENTER_COND(&waiter->COND_wait_commit_dep, &waiter->LOCK_wait_commit,
+                        &stage_waiting_for_prior_xa_transaction,
+                        &old_stage);
+        if ((element=
+             (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
+                                                 xid->key(), xid->key_length())))
+        {
+          lf_hash_search_unpin(thd->xid_hash_pins);
+          mysql_cond_wait(&waiter->COND_wait_commit_dep,
+                          &waiter->LOCK_wait_commit);
+
+          DBUG_ASSERT(waiter->debug_done || thd->check_killed(1));
+        }
+        thd->EXIT_COND(&old_stage);
+      }
+    }
+    if (!(rc= thd->check_killed(1)))
+    {
+      /* element != NULL: the duplicate was present at the last lookup and is
+         expected to be deleted shortly; spin until the insert succeeds. */
+      do
+        rc= xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid);
+      while (rc && element && (ut_delay(1), true));
+    }
+  }
+
+  return rc;
+}
+
+/**
+  Let an XA COMMIT/ROLLBACK ("complete") run by the parallel slave acquire
+  its xid (thd->lex->xid). As in @c xid_cache_insert_maybe_wait, the xid is
+  expected to be released for acquisition soon: the parent XAP has already
+  woken up the transactions before the current one.
+
+  @param thd thread handler
+  @return XID_cache_element pointer, or NULL when the xid is not found or
+          cannot be taken (e.g. kill interrupted the wait), or on OOM.
+*/
+XID_cache_element * xid_cache_search_maybe_wait(THD* thd)
+{
+  if (thd->fix_xid_hash_pins())
+  {
+    my_error(ER_OUT_OF_RESOURCES, MYF(0));
+    return NULL;
+  }
+
+  XID_cache_element *xs;
+  XID *xid= thd->lex->xid;
+  int i= 0;
+  do
+  {
+    if (!(xs= xid_cache_search(thd, xid)))
+      ut_delay(1 + i++);
+  }
+  while (!xs && i < SPIN_MAX);
+
+  if (!xs)
+  {
+    XID_cache_element *element=
+      (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
+                                          xid->key(), xid->key_length());
+    if (element)
+    {
+      bool acquired= element->acquire_recovered(); // while still pinned
+      lf_hash_search_unpin(thd->xid_hash_pins);
+      if (!acquired)
+      {
+        wait_for_commit *exp= NULL, *waiter= thd->wait_for_commit_ptr;
+        bool waiter_done= true; // assumption
+
+        /*
+          Register as the waiter for the xid owner, racing with the owner
+          to mark the xid element. If the element turns out to be marked
+          by someone else, the xid has already been released.
+        */
+        while (unlikely(!element->
+                        c_waiter.
+                        compare_exchange_weak(exp, waiter,
+                                              std::memory_order_acq_rel)))
+        {
+          if (exp)
+          {
+            DBUG_ASSERT(exp != waiter);
+            waiter= NULL; // notifier is seen
+
+            break;
+          }
+          else
+          {
+            (void) LF_BACKOFF();
+          }
+        }
+
+        if (waiter) // notifier was not seen
+        {
+          PSI_stage_info old_stage;
+          mysql_mutex_lock(&waiter->LOCK_wait_commit);
+          thd->ENTER_COND(&waiter->COND_wait_commit_dep, &waiter->LOCK_wait_commit,
+                          &stage_waiting_for_prior_xa_transaction,
+                          &old_stage);
+          while (element->c_waiter.load(std::memory_order_relaxed) &&
+                 likely(!thd->check_killed(1)))
+            mysql_cond_wait(&waiter->COND_wait_commit_dep,
+                            &waiter->LOCK_wait_commit);
+          // the loop above tolerates spurious wake-ups; only kill stops it
+          if (element->c_waiter.load(std::memory_order_relaxed))
+          {
+            waiter_done= false;
+            DBUG_ASSERT(thd->check_killed(1));
+          }
+          thd->EXIT_COND(&old_stage);
+        }
+
+        if (waiter_done &&
+            likely(element->is_set(XID_cache_element::RECOVERED |
+                                   XID_cache_element::ACQUIRED)))
+          xs= element;
+        else
+          goto end;
+      }
+      else
+      {
+        xs= element;
+      }
+    }
+  }
+
+end:
+  return xs;
+}
+
bool xid_cache_insert(XID *xid)
{
@@ -302,7 +512,8 @@ bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID *xid)
xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED);
break;
case 1:
-    my_error(ER_XAER_DUPID, MYF(0));
+    if (!(thd->rgi_slave && thd->rgi_slave->is_parallel_exec)) // parallel: retried
+      my_error(ER_XAER_DUPID, MYF(0));
}
return res;
}
@@ -311,9 +522,39 @@ bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID *xid)
static void xid_cache_delete(THD *thd, XID_cache_element *&element)
{
DBUG_ASSERT(thd->xid_hash_pins);
+
element->mark_uninitialized();
+  /*
+    Mark the element as being deleted for a possible duplicate-xid waiter,
+    whoever deletes it, or that waiter is never signalled. The marker is
+    only compared by the waiter, never dereferenced.
+  */
+  wait_for_commit *waiter= NULL;
+  wait_for_commit *notifier= thd->rgi_slave ?
+    &thd->rgi_slave->commit_orderer :
+    reinterpret_cast<wait_for_commit *>(element);
+  while (unlikely(!element->
+                  p_waiter.compare_exchange_weak(waiter, notifier,
+                                                 std::memory_order_acq_rel)))
+  {
+    if (waiter)
+    {
+      DBUG_ASSERT(notifier != waiter);
+      break;
+    }
+    (void) LF_BACKOFF();
+  }
lf_hash_delete(&xid_cache, thd->xid_hash_pins,
element->xid.key(), element->xid.key_length());
+  if (waiter)
+  {
+    mysql_mutex_lock(&waiter->LOCK_wait_commit);
+#ifndef DBUG_OFF
+    waiter->debug_done= true;
+#endif
+    mysql_cond_signal(&waiter->COND_wait_commit_dep);
+    mysql_mutex_unlock(&waiter->LOCK_wait_commit);
+  }
}
@@ -456,7 +697,23 @@ bool trans_xa_start(THD *thd)
else if (!trans_begin(thd))
{
MYSQL_SET_TRANSACTION_XID(thd->m_transaction_psi, thd->lex->xid, XA_ACTIVE);
-    if (xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid))
+
+    bool xid_insert_failed; // true when the xid could not be inserted
+    if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec)
+    {
+      /* XA START never belongs to an XA COMMIT/ROLLBACK event group. */
+      DBUG_ASSERT(!(thd->rgi_slave->gtid_ev_flags2 &
+                    Gtid_log_event::FL_COMPLETED_XA));
+      /* Possibly waits for an earlier transaction with a duplicate xid. */
+      xid_insert_failed= xid_cache_insert_maybe_wait(thd);
+      /* No silent retry; report a failure that has no error set yet. */
+      if (xid_insert_failed && !thd->is_error() && !thd->check_killed())
+        my_error(ER_XAER_DUPID, MYF(0));
+    }
+    else
+      xid_insert_failed=
+        xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid);
+    if (xid_insert_failed)
{
trans_rollback(thd);
DBUG_RETURN(true);
@@ -602,12 +859,19 @@ bool trans_xa_commit(THD *thd)
my_error(ER_OUT_OF_RESOURCES, MYF(0));
DBUG_RETURN(TRUE);
}
+  DBUG_ASSERT(!thd->rgi_slave || !thd->rgi_slave->is_async_xac);
-  if (auto xs= xid_cache_search(thd, thd->lex->xid))
+  /*
+    Parallel slave may fail to acquire the xid here, in which case
+    @c is_async_xac is set to @c true and the xid is acquired later.
+  */
+  XID_cache_element *xs;
+  if ((xs= xid_cache_search(thd, thd->lex->xid)) ||
+      (thd->rgi_slave && thd->rgi_slave->is_async_xac))
{
bool xid_deleted= false;
MDL_request mdl_request;
-    bool rw_trans= (xs->rm_error != ER_XA_RBROLLBACK);
+    bool rw_trans= (xs && xs->rm_error != ER_XA_RBROLLBACK);
if (rw_trans && thd->is_read_only_ctx())
{
@@ -615,8 +879,7 @@ bool trans_xa_commit(THD *thd)
res= 1;
goto _end_external_xid;
}
-
-    res= xa_trans_rolled_back(xs);
+    res= xs ? xa_trans_rolled_back(xs) : 0;
/*
Acquire metadata lock which will ensure that COMMIT is blocked
by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
@@ -645,7 +908,7 @@ bool trans_xa_commit(THD *thd)
}
DBUG_ASSERT(!xid_state.xid_cache_element);
-    xid_state.xid_cache_element= xs;
+    xid_state.xid_cache_element= xs; // may be NULL on parallel slave
ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
if (!res && thd->is_error())
{
@@ -654,13 +917,16 @@ bool trans_xa_commit(THD *thd)
res= true;
goto _end_external_xid;
}
-    xid_cache_delete(thd, xs);
+    DBUG_ASSERT(xs || (thd->rgi_slave && thd->rgi_slave->is_async_xac &&
+                       xid_state.xid_cache_element));
+
+    xid_cache_delete(thd, xid_state.xid_cache_element);
xid_deleted= true;
_end_external_xid:
xid_state.xid_cache_element= 0;
res= res || thd->is_error();
-    if (!xid_deleted)
+    if (!xid_deleted && xs) // NOTE(review): with xs == NULL nothing is released here; confirm
xs->acquired_to_recovered();
if (mdl_request.ticket)
{
@@ -790,12 +1056,14 @@ bool trans_xa_rollback(THD *thd)
DBUG_RETURN(TRUE);
}
-  if (auto xs= xid_cache_search(thd, thd->lex->xid))
+  XID_cache_element *xs;
+  if ((xs= xid_cache_search(thd, thd->lex->xid)) ||
+      (thd->rgi_slave && thd->rgi_slave->is_async_xac)) // xid acquired later
{
bool res;
bool xid_deleted= false;
MDL_request mdl_request;
-    bool rw_trans= (xs->rm_error != ER_XA_RBROLLBACK);
+    bool rw_trans= (xs && xs->rm_error != ER_XA_RBROLLBACK);
if (rw_trans && thd->is_read_only_ctx())
{
@@ -822,7 +1090,7 @@ bool trans_xa_rollback(THD *thd)
{
thd->backup_commit_lock= &mdl_request;
}
-    res= xa_trans_rolled_back(xs);
+    res= xs ? xa_trans_rolled_back(xs) : 0; // xs is NULL only if is_async_xac
DBUG_ASSERT(!xid_state.xid_cache_element);
xid_state.xid_cache_element= xs;
@@ -831,12 +1099,15 @@ bool trans_xa_rollback(THD *thd)
{
goto _end_external_xid;
}
-    xid_cache_delete(thd, xs);
+    DBUG_ASSERT(xs || (thd->rgi_slave && thd->rgi_slave->is_async_xac &&
+                       xid_state.xid_cache_element));
+
+    xid_cache_delete(thd, xid_state.xid_cache_element);
xid_deleted= true;
_end_external_xid:
xid_state.xid_cache_element= 0;
-    if (!xid_deleted)
+    if (!xid_deleted && xs)
xs->acquired_to_recovered();
if (mdl_request.ticket)
{
@@ -1146,7 +1417,7 @@ static bool slave_applier_reset_xa_trans(THD *thd)
{
thd->transaction->xid_state.set_error(ER_XA_RBROLLBACK);
}
-  thd->transaction->xid_state.xid_cache_element->acquired_to_recovered();
+  auto element= thd->transaction->xid_state.xid_cache_element;
thd->transaction->xid_state.xid_cache_element= 0;
for (Ha_trx_info *ha_info= thd->transaction->all.ha_list, *ha_info_next;
@@ -1158,6 +1429,34 @@ static bool slave_applier_reset_xa_trans(THD *thd)
thd->transaction->all.ha_list= 0;
ha_close_connection(thd);
+  element->acquired_to_recovered(); // NOTE(review): element is used below after release; confirm it cannot be deleted meanwhile
+  if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec)
+  {
+    /* Hand the xid over to a possible XA-"complete" waiter (c_waiter). */
+    wait_for_commit *xac_waiter= NULL,
+      *notifier= &thd->rgi_slave->commit_orderer;
+    while (unlikely(!element->
+                    c_waiter.compare_exchange_weak(xac_waiter, notifier,
+                                                   std::memory_order_acq_rel)))
+    {
+      if (xac_waiter) // a waiter registered first
+      {
+        break;
+      }
+      else
+      {
+        (void) LF_BACKOFF();
+      }
+    }
+    if (xac_waiter)
+    {
+      // clear the waiter's mark under its mutex, then wake it up
+      mysql_mutex_lock(&xac_waiter->LOCK_wait_commit);
+      element->c_waiter.store(NULL, std::memory_order_relaxed);
+      mysql_cond_signal(&xac_waiter->COND_wait_commit_dep);
+      mysql_mutex_unlock(&xac_waiter->LOCK_wait_commit);
+    }
+  }
thd->transaction->cleanup();
thd->transaction->all.reset();
diff --git a/storage/innobase/trx/trx0undo.cc b/storage/innobase/trx/trx0undo.cc
index daf27822085..53ce685fa35 100644
--- a/storage/innobase/trx/trx0undo.cc
+++ b/storage/innobase/trx/trx0undo.cc
@@ -639,8 +639,9 @@ static void trx_undo_write_xid(buf_block_t *block, uint16_t offset,
static_cast<uint32_t>(xid.bqual_length));
const ulint xid_length= static_cast<ulint>(xid.gtrid_length
+ xid.bqual_length);
-  mtr->memcpy(*block, &block->page.frame[offset + TRX_UNDO_XA_XID],
-              xid.data, xid_length);
+  mtr->memcpy<mtr_t::MAYBE_NOP>(*block,
+                                &block->page.frame[offset + TRX_UNDO_XA_XID],
+                                xid.data, xid_length);
if (UNIV_LIKELY(xid_length < XIDDATASIZE))
mtr->memset(block, offset + TRX_UNDO_XA_XID + xid_length,
XIDDATASIZE - xid_length, 0);
--
2.30.2
1
0
[PATCH 0/4] MDEV-31273, Pre-compute binlog checksums outside of LOCK_log
by Kristian Nielsen 18 Oct '23
by Kristian Nielsen 18 Oct '23
18 Oct '23
Hi Monty,
Here's the implementation of MDEV-31273, pre-compute binlog checksums.
The main patch to review is the last one, number 4. This is the actual
implementation of binlog checksum pre-computation, and the only patch
that changes the behaviour of the code.
Most of the work (and most of the changes) are cleanups of the old checksum
code that don't change the functionality but remove a lot of complex and
hard-to-modify logic (and I think actually fix a bug or two). I have kept
this cleanup separate in the first 3 patches to make it easier to review and
not get mixed up with the actual implementation of the new functionality.
With this patch series, calculation of binlog checksum will happen when
writing events into the stmt/trx caches. Later, when writing the binlog file
under LOCK_log, only a direct copy of the bytes is done, which should
improve binlog scalability with checksums enabled.
The patch series is also available on github:
https://github.com/MariaDB/server/commits/knielsen_mdev31273
- Kristian.
Kristian Nielsen (4):
MDEV-31273: Replace Log_event::writer with function parameter
MDEV-31273: Eliminate Log_event::checksum_alg
MDEV-31273: Refactor MYSQL_BIN_LOG::write_cache()
MDEV-31273: Precompute binlog checksums
include/my_atomic.h | 41 +-
include/my_sys.h | 2 +
.../main/mysqlbinlog_row_compressed.result | 48 +-
.../main/mysqlbinlog_row_minimal.result | 48 +-
.../main/mysqlbinlog_stmt_compressed.result | 16 +-
mysql-test/main/mysqld--help.result | 7 +
.../suite/binlog/include/binlog_ioerr.inc | 3 +
mysql-test/suite/binlog/r/binlog_ioerr.result | 2 +
.../r/binlog_mysqlbinlog_raw_flush.result | 1 +
mysql-test/suite/binlog/t/binlog_killed.test | 2 +-
.../t/binlog_mysqlbinlog_raw_flush.test | 2 +
.../t/binlog_table_map_optional_metadata.test | 4 +-
.../binlog_encryption/binlog_ioerr.result | 2 +
.../suite/rpl/r/rpl_checksum_cache.result | 43 +-
.../suite/rpl/t/rpl_checksum_cache.test | 98 +++-
.../r/sysvars_server_notembedded.result | 10 +
mysys/mf_iocache2.c | 34 ++
sql/log.cc | 468 ++++++++++--------
sql/log.h | 14 +-
sql/log_event.cc | 27 +-
sql/log_event.h | 158 +++---
sql/log_event_client.cc | 22 +-
sql/log_event_old.cc | 14 +-
sql/log_event_old.h | 4 +-
sql/log_event_server.cc | 395 ++++++---------
sql/mysqld.cc | 1 +
sql/mysqld.h | 1 +
sql/privilege.h | 3 +
sql/slave.cc | 60 +--
sql/sql_repl.cc | 2 +-
sql/sys_vars.cc | 13 +
sql/wsrep_binlog.cc | 6 +-
sql/wsrep_mysqld.cc | 12 +-
33 files changed, 922 insertions(+), 641 deletions(-)
--
2.30.2
3
7