Kristian Nielsen <knielsen@knielsen-hq.org> writes:

Hi Andrei,

commit 96bd9e6b780f3738b5008b89aae6b0b086f15943
Author: Andrei <andrei.elkin@mariadb.com>
Date: Sat Aug 19 19:49:25 2023 +0300
XA-Prepare group of events
XA START xid ... XA END xid XA PREPARE xid
and its XA-"complete" terminator
XA COMMIT or XA ROLLBACK
are made distributed Round-Robin across slave parallel workers. The former hash-based policy was proven to attribute to execution latency through creating a big - many times larger than the size of the worker pool - queue of binlog-ordered transactions to commit.
Acronyms and notations used below:
XAP := XA-Prepare event or the whole prepared XA group of events
XAC := XA-"complete", which is a solitary group of events
|W| := the size of the slave worker pool
Subscripts like `_k' denote order in a corresponding sequence (e.g binlog file).
Here is my review of this patch.
The list is arranged as a sliding window with the size of 2*|W| to account a possibility of XAP_k -> XAP_k+2|W|-1 the largest (in the group-of-events count sense) dependency.
I spent a lot of effort trying to understand why the factor 2 in the size of 2*|W| occurs. Since each transaction must wait_for_prior_commit for the prior transaction, there should never be more than |W| transactions active. In your example with |W|=4, you say that
Worker #4 can take on its T_8 when T_1 is yet at the beginning of its processing
But that does not seem possible. T_8 cannot start until T_1 (and 2..4) have done wakeup_subsequent_commits(), which surely shouldn't happen "at the beginning of its processing", but only at the end.

But I think I now understand. The problem is that the XA code calls wakeup_subsequent_commits() _before_ updating the XID hash (in XA PREPARE) / deleting from the XID hash (in XA COMMIT/ROLLBACK). Right?

This I think is the fundamental issue to address. The wait_for_prior_commit mechanism is to ensure that the prior transaction has completed its commit, which means the state of the transaction is "committed" in memory. Thus, the XID hash, which records information about the status of the (XA) transactions, must also be updated before wakeup_subsequent_commits() may be done.

I can see that the XA PREPARE/COMMIT/ROLLBACK already uses the trx_group_commit_leader() code. So what you should do is to arrange for the update of / delete from the XID hash to happen inside there, just before wakeup_subsequent_commits() gets called:

    if (current->cache_mngr->using_xa && likely(!current->error) &&
        DBUG_EVALUATE_IF("skip_commit_ordered", 0, 1))
    {
      mysql_mutex_lock(&current->thd->LOCK_thd_data);
      run_commit_ordered(current->thd, current->all);
      mysql_mutex_unlock(&current->thd->LOCK_thd_data);
    }
    current->thd->wakeup_subsequent_commits(current->error);

I checked the code, the XID hash update/delete currently happens shortly after that in the code, so there should be no problems moving it in there.

[Incidentally, I also noticed that the XA PREPARE/COMMIT does not use the commit_ordered mechanism. While this seems (I think) unrelated to this patch, I think it is something you need to look into. The commit_ordered mechanism is central to ensuring that InnoDB and binlog commit in the same order, and I don't see how this is guaranteed for the current XA code (and I suspect this might also mean that binlog recovery will be broken in some cases).]
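To illustrate the ordering requirement above (XID hash updated before wakeup_subsequent_commits()), here is a toy single-threaded model. It is only a sketch: ToyXidHash, toy_xa_commit() and follower_sees_stale_xid() are hypothetical names, not server code, and the callback merely stands in for what the next event group would observe at the moment it is woken up.

```cpp
#include <cassert>
#include <functional>
#include <set>
#include <string>

// Toy stand-in for the server's XID hash: the set of prepared XIDs.
using ToyXidHash = std::set<std::string>;

// Models the tail of an XA COMMIT. `wakeup` stands in for
// wakeup_subsequent_commits(): whatever it observes is what the next
// event group would observe when it starts running.
inline void toy_xa_commit(ToyXidHash &hash, const std::string &xid,
                          bool delete_before_wakeup,
                          const std::function<void()> &wakeup)
{
  if (delete_before_wakeup)
    hash.erase(xid);   // proposed order: in-memory state is final first
  wakeup();
  if (!delete_before_wakeup)
    hash.erase(xid);   // order criticized above: follower may see a stale entry
}

// Returns true if a follower woken by the commit still sees `xid` as prepared.
inline bool follower_sees_stale_xid(bool delete_before_wakeup)
{
  ToyXidHash hash{"xid1"};
  bool stale= false;
  toy_xa_commit(hash, "xid1", delete_before_wakeup,
                [&] { stale= hash.count("xid1") != 0; });
  return stale;
}
```

With the delete done before the wakeup, a following "XA PREPARE" of the same XID can never collide with a leftover entry; with the delete after the wakeup, it can.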
Once the XID hash update/delete is moved as described, we have the very nice property that after an XA PREPARE/COMMIT/ROLLBACK T_i has done wakeup_subsequent_commits(), it is safe for a following T_j that refers to the same XID to replicate.

This means that you can now use the existing wait_for_prior_commit mechanism to handle the dependencies between XA event groups with the same XID. Thus, there is no need to introduce a separate (and very complicated) wait mechanism of xid_cache_insert_maybe_wait() and xid_cache_search_maybe_wait(), solely for parallel replication of user XA.

Thus, in the SQL driver thread, using your sliding window (which need only be of size |W| I believe), you can mark a "T1: XA COMMIT <xid>" followed by "T2: XA PREPARE <xid>" of a duplicate XID, so that T2 must do a wait_for_prior_commit(T1) before running. This is simple, it can use the existing mechanisms for that, using entry->last_committed_sub_id. Just like rgi->wait_commit_sub_id and rgi->wait_commit_group_info, we can introduce eg.:

    rgi->pre_dependency_sub_id
    rgi->pre_dependency_group_info

and then do a wait_for_prior_commit(&rgi->pre_dependency_group_info->commit_orderer) if rgi->pre_dependency_sub_id > entry->last_committed_sub_id.

Similarly, the sliding window can record for "T3: XA PREPARE" and "T4: XA COMMIT", that T4 should do a wait_for_prior_commit(T3) before running. Then T4 will be sure that the XA transaction is ready.

This will also be a good preparation for later introducing more of this kind of pre-calculated dependencies to the parallel replication scheduling, along the lines of the "balanced applier" that you have written about previously. These ideas I believe have tremendous potential, and handling this in a generic way is a very big improvement. I'm not suggesting to implement more than what is necessary now for XA, but keep in mind that this dependency calculation in the SQL driver thread can be used for other things than user XA in the future. You can also name the newly introduced dependency fields appropriately (ie. with generic names not specific to XA when applicable).

This will greatly simplify the patch, I believe; and more importantly it will integrate the XA-required scheduling in a clean and generic way in the parallel replication code.

Following are more detailed comments on the patch:
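As a rough illustration of the dependency calculation suggested above, the following toy sketch shows a fixed-size window kept by the driver thread that returns, for each XA event group, the sub_id of the most recent earlier group with the same XID. Everything here is an assumption for illustration: ToyXaWindow, schedule() and toy_must_wait() are hypothetical, XIDs are plain strings, and sub_id 0 is taken to mean "no dependency". The real code would of course use rpl_group_info and entry->last_committed_sub_id.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

struct ToyWindowSlot
{
  std::string xid;
  uint64_t sub_id= 0;          // 0 means the slot is unused
};

// Ring buffer of the last `workers` XA event groups (workers must be > 0).
class ToyXaWindow
{
  std::vector<ToyWindowSlot> slots;
  size_t next= 0;
public:
  explicit ToyXaWindow(size_t workers) : slots(workers) {}

  // Called in binlog order for each XA event group. Returns the sub_id of
  // the most recent group in the window with the same XID (the value one
  // would store in a field like pre_dependency_sub_id), or 0 if none.
  uint64_t schedule(const std::string &xid, uint64_t sub_id)
  {
    uint64_t dep= 0;
    for (const ToyWindowSlot &s : slots)
      if (s.sub_id > dep && s.xid == xid)
        dep= s.sub_id;
    slots[next].xid= xid;      // overwrite the oldest slot
    slots[next].sub_id= sub_id;
    next= (next + 1) % slots.size();
    return dep;
  }
};

// Worker side: a wait is only needed while the dependency is uncommitted.
inline bool toy_must_wait(uint64_t pre_dependency_sub_id,
                          uint64_t last_committed_sub_id)
{
  return pre_dependency_sub_id > last_committed_sub_id;
}
```

For example, scheduling XIDs "a", "b", "a" with sub_ids 1, 2, 3 yields dependencies 0, 0, 1; the third group would then wait on group 1 only if that group has not yet committed.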
-  DBUG_ASSERT(
-    !(thd->rgi_slave &&
-      !thd->rgi_slave->worker_error &&
-      thd->rgi_slave->did_mark_start_commit) ||
-    (thd->transaction->xid_state.is_explicit_XA() ||
-     (thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_PREPARED_XA)));
-
+  DBUG_ASSERT(!(thd->rgi_slave &&
+                !thd->rgi_slave->worker_error &&
+                thd->rgi_slave->did_mark_start_commit) ||
+              (thd->transaction->xid_state.is_explicit_XA() ||
+               (thd->rgi_slave->gtid_ev_flags2 &
+                (Gtid_log_event::FL_PREPARED_XA |
+                 Gtid_log_event::FL_COMPLETED_XA))));
   if (thd->rgi_slave &&
       !thd->rgi_slave->worker_error &&
       thd->rgi_slave->did_mark_start_commit)
     thd->rgi_slave->unmark_start_commit();
Why is it allowed to rollback here while did_mark_start_commit is true for XA PREPARE or XA COMMIT? The comment above should be extended to explain this. And why does the following if () statement then still do an emergency unmark_start_commit(), when the condition is not caught by the assertion? It looks like something is wrong in the earlier code, the intention here is that it should never be necessary to unmark_start_commit() here, and if it is, then it is a bug flagged by the assertion.
@@ -1751,7 +1753,8 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
   if ((using_stmt && !cache_mngr->stmt_cache.empty()) ||
       (using_trx && !cache_mngr->trx_cache.empty()) ||
-      thd->transaction->xid_state.is_explicit_XA())
+      (thd->transaction->xid_state.is_explicit_XA() ||
+       (thd->rgi_slave && thd->rgi_slave->is_async_xac)))

Why is this extra condition thd->rgi_slave->is_async_xac necessary? There are many of these conditions spread around the code in the patch. I think perhaps it is because the transaction is somehow not "connected" to the THD? Because it is in the XID cache?

But the situation must be similar on the master, if the XA COMMIT happens in a different transaction from XA PREPARE. So this should be done the same way on the slave, so that the XA transaction gets connected to the THD of the worker thread processing it, and so that these extra rgi_slave->is_async_xac conditions are not needed.

It is very fragile to have such conditions spread around the code; it will be impossible to avoid bugs due to forgetting such an extra condition in one place or another.
+  /*
+    While xid_state.get_xid() is a robust method to access `xid`
+    it can't be used on slave by the asynchronously running XA-"complete".
+    In the latter case thd->lex->xid is safely accessible.
+  */
+  buflen= serialize_with_xid(is_async_xac? thd->lex->xid :
+                             thd->transaction->xid_state.get_xid(),
Same here, it should be possible to use the same xid_state in the slave thread as on the master, so we avoid special cases all over the code for the slave threads.
+static bool acquire_xid(THD *thd)
+{
+  bool rc= false;
+
+  if (thd->rgi_slave && thd->rgi_slave->is_async_xac &&
+      thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_COMPLETED_XA)
+  {
+    XID_STATE &xid_state= thd->transaction->xid_state;

This function actually seems to do part of "XA transaction gets connected to the THD". But it's called from binlog_commit_by_xid() and binlog_rollback_by_xid(), which means it has to have yet another of these special-condition checks for thd->rgi_slave->is_async_xac etc.

Instead, call this from the code that applies the "XA COMMIT" event (eg. in log_event_server.cc), and make sure it fully connects the XA transaction to the THD. Then you get rid of the checks for is_async_xac in a lot of places, and also avoid polluting with replication details the code that runs the original transaction on the master.
+    rc= binlog_commit(thd, TRUE, FALSE);
+    thd->ha_data[binlog_hton->slot].ha_info[1].reset();
+  }
+  if (!rc)
+  {
+    rc= acquire_xid(thd);
+  }
+  if (thd->is_current_stmt_binlog_disabled())
+  {
+    thd->wakeup_subsequent_commits(rc);
+  }
So IIUC, here we first binlog the XA COMMIT and then wakeup_subsequent_commits(). But the engine commit only happens afterwards, in ha_commit_or_rollback_by_xid. Why wakeup_subsequent_commits() here? It seems too early, the transaction is not yet committed in the engine, how do you avoid that the commits in the engine will happen in the wrong order? And then a mariabackup might take a backup with T2 committed but T1 not and not have a valid replication position to provision a slave. And also, is this skipping the binlog transaction coordinator and two-phase commit with the engines? And in particular, commit_ordered()? Then again, it seems we have the problem that the engine can commit in the opposite order from the binlog. I wonder if this will also affect crash recovery if we crash with different commit order in binlog and engine? So I think wakeup_subsequent_commits() here is wrong, should be removed.
@@ -2185,7 +2293,9 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc) }
   if (cache_mngr->trx_cache.empty() &&
-      (thd->transaction->xid_state.get_state_code() != XA_PREPARED ||
+      ((thd->transaction->xid_state.get_state_code() != XA_PREPARED &&
+        !(thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
+          thd->lex->sql_command == SQLCOM_XA_COMMIT)) ||

As explained above, this condition (and the similar one in binlog_rollback()) should be avoided.
@@ -10510,13 +10624,20 @@ int TC_LOG_BINLOG::unlog_xa_prepare(THD *thd, bool all)
   binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
   int cookie= 0;
+  int rc= 0;
+
+  if (thd->rgi_slave && thd->is_current_stmt_binlog_disabled())
+  {
+    rc= thd->wait_for_prior_commit();
+    if (rc == 0)
+      thd->wakeup_subsequent_commits(rc);
+    return rc;
+  }
Why is this necessary? If it is really necessary, then put it in the code that applies the XA PREPARE event, not as a random special case in generic code.
-  bool rc= false;
-
   rc= write_empty_xa_prepare(thd, cache_mngr); // normally gains need_unlog

  static bool write_empty_xa_prepare(THD *thd, binlog_cache_mngr *cache_mngr)
  {
    return binlog_commit_flush_xa_prepare(thd, true, cache_mngr);
  }
Smaller point: since you're fixing the type of `rc` to be int (which is correct), also fix the return type of write_empty_xa_prepare() to be int and not bool - as binlog_commit_flush_xa_prepare() as well as unlog_xa_prepare() return int, not bool.
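A small standalone illustration of why the return type matters: passing an int error code through a bool-returning wrapper collapses every non-zero code to 1. The functions below are hypothetical stand-ins (not the server functions), and the error value 5 is arbitrary.

```cpp
#include <cassert>

// Stand-in for an int-returning function such as the flush call.
inline int toy_flush(int err) { return err; }

// Wrapper declared bool: any non-zero error code is converted to true.
inline bool toy_wrapper_bool(int err) { return toy_flush(err); }

// Wrapper declared int: the error code is passed through unchanged.
inline int toy_wrapper_int(int err) { return toy_flush(err); }

// What a caller storing the result in `int rc` ends up with.
inline int toy_rc_via_bool(int err) { int rc= toy_wrapper_bool(err); return rc; }
inline int toy_rc_via_int(int err)  { int rc= toy_wrapper_int(err);  return rc; }
```

So with `int rc` in the caller, the bool wrapper still works as a success/failure flag, but the specific code is lost before the caller ever sees it.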
@@ -3314,16 +3314,22 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
   XID_STATE &xid_state= thd->transaction->xid_state;
   if (is_transactional)
   {
-    if (xid_state.is_explicit_XA() &&
-        (thd->lex->sql_command == SQLCOM_XA_PREPARE ||
-         xid_state.get_state_code() == XA_PREPARED))
+    bool is_async_xac= false;
+    if ((xid_state.is_explicit_XA() &&
+         (thd->lex->sql_command == SQLCOM_XA_PREPARE ||
+          xid_state.get_state_code() == XA_PREPARED)) ||
+        (is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac)))

Again, this is the generic GTID event constructor, it shouldn't need this kind of logic. It's very strange that a simple class constructor returns something different depending on which thread it runs in!

Hopefully this is no longer necessary after the XA transaction gets properly connected to the THD. But else, it should be handled by passing in a suitable flag from the caller, or by the caller setting the required modifications after construction.
+using std::max;
Please don't. Using std::max() explicitly is not long and makes it clear which `max` implementation is used.
@@ -760,7 +762,8 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
     return;
   err_code= thd->get_stmt_da()->sql_errno();
   if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
-       err_code != ER_PRIOR_COMMIT_FAILED) ||
+       (err_code != ER_PRIOR_COMMIT_FAILED &&
+        err_code != ER_XAER_NOTA)) ||
Why? What is special about the ER_XAER_NOTA error? Is it really safe to speculatively run the XA COMMIT in parallel, and then retry it for any other error than ER_XAER_NOTA ?
@@ -2467,6 +2463,7 @@ free_rpl_parallel_entry(void *element)
   }
   mysql_cond_destroy(&e->COND_parallel_entry);
   mysql_mutex_destroy(&e->LOCK_parallel_entry);
+  e->concurrent_xaps_window.~Dynamic_array();
   my_free(e);
No, let's not do this, calling a destructor explicitly in a POD managed by my_malloc() / my_free(). Why not just make concurrent_xaps_window a pointer to the Dynamic_array and manage with new / delete? Then you can also allocate it lazily, so it will not need to be allocated at all for the 99.9xx% of users who are not replicating user XA transactions. The size of the sliding window is fixed anyway (until slave_parallel_threads is changed), so why use Dynamic_array at all, why not just allocate a plain array of the right size?
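A minimal sketch of the alternative suggested above, assuming a malloc-managed POD entry that holds only a pointer to a plain fixed-size array, allocated lazily on first use. ToyParallelEntry and the toy_entry_*() helpers are hypothetical stand-ins for rpl_parallel_entry and its allocation code; std::calloc/std::free stand in for my_malloc()/my_free().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <new>
#include <utility>

typedef std::pair<std::size_t, uint32_t> ToyXapSlot;

// Stand-in for the POD entry: no constructor/destructor is ever run on it.
struct ToyParallelEntry
{
  ToyXapSlot *xaps_window;        // NULL until the first user XA is seen
  std::size_t xaps_window_size;
};

inline ToyParallelEntry *toy_entry_create()
{
  // Zero-filled allocation, so xaps_window starts out as NULL.
  return static_cast<ToyParallelEntry *>(
      std::calloc(1, sizeof(ToyParallelEntry)));
}

// Allocate the window on first use; later calls return the same array.
inline ToyXapSlot *toy_entry_window(ToyParallelEntry *e, std::size_t workers)
{
  if (!e->xaps_window)
  {
    e->xaps_window= new (std::nothrow) ToyXapSlot[workers]();
    e->xaps_window_size= e->xaps_window ? workers : 0;
  }
  return e->xaps_window;
}

inline void toy_entry_free(ToyParallelEntry *e)
{
  delete[] e->xaps_window;        // safe on NULL; no explicit destructor call
  std::free(e);
}
```

Users who never replicate user XA never pay for the window, and the free path needs no explicit destructor call on a member of a POD.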
+template <>
+struct std::hash<XID>
+{
+  std::size_t operator()(const XID& xid) const
+  {
+    return my_hash_sort(&my_charset_bin, xid.key(), xid.key_length());
I don't understand, what is the purpose of introducing this function object? Why not just call my_hash_sort() directly (seems it's only used in one place) and avoid this code-obfuscation?
+ Dynamic_array<std::pair<std::size_t, uint32>> concurrent_xaps_window;
This is dangerous as Dynamic_array has a destructor but rpl_parallel_entry is a POD where we don't call any constructor/destructor. Should instead use a pointer to an object constructed with new (or not use Dynamic_array at all as suggested above).
+  /*
+    When true indicates that the user xa transaction is going to
+    complete (with COMMIT or ROLLBACK) by the worker thread,
+    *while* another worker is still preparing it. Once the latter is done
+    the xid will be acquired and the flag gets reset.
+  */
+  bool is_async_xac;
+
I don't understand this comment. Surely it is not possible to *complete*, with XA COMMIT, a transaction before the corresponding XA PREPARE has completed (possibly in another worker thread)? Do you mean that it is possible for a worker thread W2 to *start* applying the XA COMMIT speculatively, before another worker W1 has completed the XA PREPARE? But I think this is not necessary, see below the discussion that the XA COMMIT worker can just do a wait_for_prior_commit() on the event group of its XA PREPARE. Then this should not be necessary.
@@ -137,7 +137,7 @@ template <class Elem> class Dynamic_array
   */
   Elem& at(size_t idx)
   {
-    DBUG_ASSERT(idx < array.elements);
+    DBUG_ASSERT(idx < max_size());
     return *(((Elem*)array.buffer) + idx);

No, this cannot possibly be right. This is a patch about XA replication, why would it suddenly allow accessing deleted elements of any Dynamic_array used in the server?!? Surely you can allocate the elements you need for your sliding window. In fact, since the window is fixed size anyway, why use Dynamic_array at all?

@@ -2367,6 +2367,15 @@ struct wait_for_commit
     event group is fully done.
   */
   bool wakeup_blocked;
+  /*
+    The condition variable servers as a part of facilities to handle various
+    commit time additional dependency between groups of replication events, e.g
+    XA-Prepare -> XA-Commit, or XA-Prepare -> XA-Prepare all with the same xid.
+  */
+  mysql_cond_t COND_wait_commit_dep;

No, this doesn't belong in struct wait_for_commit. wait_for_commit is a low-level mechanism for ordering commits, it should not need any knowledge even of replication, certainly not of user-XA replication. The COND_wait_commit_dep isn't even used in any function related to wait_for_commit or even sql_class.cc, it's only used in sql/xa.cc.

[But as discussed at the start, we can use the existing wait_for_prior_commit instead of introducing this new mechanism just for replicating user XA.]

-static XID_cache_element *xid_cache_search(THD *thd, XID *xid)
+XID_cache_element *xid_cache_search(THD *thd, XID *xid)
Why remove the `static`? The function is not used outside of sql/xa.cc
@@ -254,16 +259,221 @@ static XID_cache_element *xid_cache_search(THD *thd, XID *xid)
+  if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec)
+  {
+    DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT ||
+                thd->lex->sql_command == SQLCOM_XA_ROLLBACK);
+    thd->rgi_slave->is_async_xac= true;
Here again we have some logic that relates to replication being inserted somewhat randomly in low-level code, requiring this thd->rgi_slave->is_parallel_exec condition. Instead of doing this, the caller higher up should handle this depending on the return of xid_cache_search(). This way, code paths that are not related to replication don't get affected by the replication-specific logic.
+bool xid_cache_insert_maybe_wait(THD* thd)
+{

This new wait mechanism just for parallel replication of user XA seems very complex. Spin loop, and extra locking and condition variables and locking on a lock-free hash. As explained at the start, let's instead avoid introducing a new mechanism and use the existing wait_for_prior_commit, which is central to parallel replication and highly optimized for this.

IIUC, the real problem here is that after wakeup_subsequent_commits() of XA COMMIT, the XID is still in the xid_hash. This is because of this code in trans_xa_commit():

    ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
    if (!res && thd->is_error())
    {
      // hton completion error retains xs/xid in the cache,
      // unless there had been already one as reflected by `res`.
      res= true;
      goto _end_external_xid;
    }
    xid_cache_delete(thd, xs);
    xid_deleted= true;

ha_commit_or_rollback_by_xid() ends up in queue_for_group_commit() which will do the wakeup_subsequent_commits(). But the xid_cache_delete() happens just after that. This is too late. The wakeup_subsequent_commits() must not occur until after the transaction is fully committed in memory.

So instead, move the xid_cache_delete() call so it happens inside the group commit code. Then when wakeup_subsequent_commits() is done after XA COMMIT, the XID entry will be gone. It would go here in MYSQL_BIN_LOG::trx_group_commit_leader():

    if (current->cache_mngr->using_xa && likely(!current->error) &&
        DBUG_EVALUATE_IF("skip_commit_ordered", 0, 1))
    {
      mysql_mutex_lock(&current->thd->LOCK_thd_data);
      run_commit_ordered(current->thd, current->all);
      mysql_mutex_unlock(&current->thd->LOCK_thd_data);
    }
    current->thd->wakeup_subsequent_commits(current->error);

I noticed that commit_ordered is not done for user XA transactions in replication. The direct reason for that seems to be that they do not use log_and_order(), which sets cache_mngr->using_xa.

Can you explain why the XA PREPARE and XA COMMIT are not following the commit_ordered protocol, and what mechanism exists instead to ensure that everything still works correctly? For example that commit order in binlog and InnoDB will be the same, and that binlog recovery works?

And finally, just for the record, I still do not agree with the way replication of user XA has been changed (aka MDEV-32020), and this patch does not fix the underlying problem. But changing it to use the wait_for_prior_commit() mechanism as explained at the start will improve the integration with the parallel replication code and be a step towards solving also some of the other underlying issues.

 - Kristian.