Kristian Nielsen <knielsen(a)knielsen-hq.org> writes:
Hi Andrei,
> commit 96bd9e6b780f3738b5008b89aae6b0b086f15943
> Author: Andrei <andrei.elkin(a)mariadb.com>
> Date: Sat Aug 19 19:49:25 2023 +0300
> XA-Prepare group of events
>
> XA START xid
> ...
> XA END xid
> XA PREPARE xid
>
> and its XA-"complete" terminator
>
> XA COMMIT or
> XA ROLLBACK
>
> are made distributed Round-Robin across slave parallel workers.
> The former hash-based policy was proven to attribute to execution
> latency through creating a big - many times larger than the size
> of the worker pool - queue of binlog-ordered transactions
> to commit.
>
> Acronyms and notations used below:
>
> XAP := XA-Prepare event or the whole prepared XA group of events
> XAC := XA-"complete", which is a solitary group of events
> |W| := the size of the slave worker pool
> Subscripts like `_k' denote order in a corresponding sequence
> (e.g binlog file).
Here is my review of this patch.
> The list is arranged as a sliding window with the size of 2*|W| to account
> a possibility of XAP_k -> XAP_k+2|W|-1 the largest (in the group-of-events
> count sense) dependency.
I spent a lot of effort trying to understand why the factor 2 in the size of
2*|W| occurs. Since each transaction must wait_for_prior_commit for the
prior transaction, there should never be more than |W| transactions active.
In your example with |W|=4, you say that
> Worker #4 can take on its T_8 when T_1 is yet at the beginning of its processing
But that does not seem possible. T_8 cannot start until T_1 (and 2..4) have
done wakeup_subsequent_commits(), which surely shouldn't happen "at the
beginning of its processing", but only at the end.
But I think I now understand. The problem is that the XA code calls
wakeup_subsequent_commits _before_ updating the XID hash (in XA PREPARE) /
deleting from the XID hash (in XA COMMIT/ROLLBACK). Right?
This I think is the fundamental issue to address. The wait_for_prior_commit
mechanism is to ensure that the prior transaction has completed its commit,
which means the state of the transaction is "committed" in memory. Thus, the
XID hash, which records information about the status of the (XA)
transactions, must also be updated before wakeup_subsequent_commits() may be done.
I can see that the XA PREPARE/COMMIT/ROLLBACK already uses the
trx_group_commit_leader() code. So what you should do is arrange for the
update of / deletion from the XID hash to happen inside there, just before
wakeup_subsequent_commits() is called:
if (current->cache_mngr->using_xa && likely(!current->error) &&
DBUG_EVALUATE_IF("skip_commit_ordered", 0, 1))
{
mysql_mutex_lock(&current->thd->LOCK_thd_data);
run_commit_ordered(current->thd, current->all);
mysql_mutex_unlock(&current->thd->LOCK_thd_data);
}
current->thd->wakeup_subsequent_commits(current->error);
I checked the code: the XID hash update/delete currently happens shortly
after that point, so there should be no problem moving it in there.
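To make the proposed ordering concrete, here is a toy model in plain C++ (not server code) of the per-transaction loop in trx_group_commit_leader(). The types `xa_op`, `queued_trx` and `group_commit_model`, and the use of a `std::unordered_set` as the XID hash, are hypothetical stand-ins; the only point illustrated is the order of the steps: commit, then insert/delete the XID entry, then wake up successors, so that anything woken up already sees the new XA state.

```cpp
#include <cassert>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical toy model; none of these types exist in the server.
enum class xa_op { NONE, PREPARE, COMPLETE };  // COMPLETE: XA COMMIT/ROLLBACK

struct queued_trx
{
  std::string xid;  // empty for non-XA transactions
  xa_op op;
  bool error;
};

struct group_commit_model
{
  std::unordered_set<std::string> xid_hash;  // stand-in for the XID cache
  std::vector<std::string> trace;            // order of the steps taken

  void process_queue(const std::vector<queued_trx> &queue)
  {
    for (const queued_trx &t : queue)
    {
      assert(t.op == xa_op::NONE || !t.xid.empty());
      if (!t.error)
      {
        trace.push_back("commit_ordered:" + t.xid);
        // Proposed placement: publish the new XA state in memory *before*
        // any dependent event group can be woken up.
        if (t.op == xa_op::PREPARE)
          xid_hash.insert(t.xid);
        else if (t.op == xa_op::COMPLETE)
          xid_hash.erase(t.xid);
      }
      // Stand-in for wakeup_subsequent_commits(): record what a woken-up
      // successor would observe in the XID hash at this point.
      trace.push_back("wakeup:" + t.xid +
                      (xid_hash.count(t.xid) ? ":present" : ":absent"));
    }
  }
};
```

With this order, a successor woken after an XA PREPARE always finds the XID, and one woken after the XA COMMIT never does, which is exactly the property the separate wait mechanism in the patch tries to emulate.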
[Incidentally, I also noticed that the XA PREPARE/COMMIT does not use the
commit_ordered mechanism. While this seems (I think) unrelated to this
patch, I think it is something you need to look into. The commit_ordered
mechanism is central to ensuring that InnoDB and binlog commit in the same
order, and I don't see how this is guaranteed for the current XA code (and
I suspect this might also mean that binlog recovery will be broken in some
cases).]
Once the XID hash update/delete is moved as described, we have the very nice
property that after an XA PREPARE/COMMIT/ROLLBACK T_i has done
wakeup_subsequent_commits(), it is safe for a following T_j that refers to
the same XID to replicate.
This means that you can now use the existing wait_for_prior_commit mechanism
to handle the dependencies between XA event groups with the same XID. Thus,
there is no need to introduce a separate (and very complicated) wait
mechanism of xid_cache_insert_maybe_wait() and
xid_cache_search_maybe_wait(), solely for parallel replication of user XA.
Thus, in the SQL driver thread, using your sliding window (which I believe
need only be of size |W|), when "T1: XA COMMIT <xid>" is followed by
"T2: XA PREPARE <xid>" with the same XID, you can mark that T2 must do a
wait_for_prior_commit(T1) before running. This is simple; it can use the
existing mechanism for that, based on entry->last_committed_sub_id. Just like
rgi->wait_commit_sub_id and rgi->wait_commit_group_info, we can introduce e.g.:
rgi->pre_dependency_sub_id
rgi->pre_dependency_group_info
and then do a
wait_for_prior_commit(&rgi->pre_dependency_group_info->commit_orderer)
if (rgi->pre_dependency_sub_id > entry->last_committed_sub_id)
Similarly, for "T3: XA PREPARE <xid>" followed by "T4: XA COMMIT <xid>", the
sliding window can record that T4 should do a wait_for_prior_commit(T3)
before running. Then T4 will be sure that the XA transaction is ready.
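The dependency calculation in the SQL driver thread could look roughly like the following sketch. It is an illustration under assumptions: `xa_dependency_window` is a hypothetical class, sub_ids are assumed to be consecutive and event groups assigned round-robin, and `std::hash<std::string>` stands in for whatever XID hash the server would use (e.g. my_hash_sort()). Slot `sub_id % |W|` holds the most recent event group with that residue, so only the last |W| groups are ever compared; anything older has already committed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Hypothetical sketch of the SQL driver thread's dependency window.
// A slot with sub_id == 0 is empty or belongs to a non-XA event group.
class xa_dependency_window
{
  struct slot { std::size_t xid_hash; std::uint64_t sub_id; };
  std::vector<slot> slots;

public:
  explicit xa_dependency_window(std::size_t n_workers)
    : slots(n_workers, slot{0, 0})
  {
    assert(n_workers > 0);
  }

  // Returns the sub_id this event group must wait for (0 if none), then
  // records the group in the window. Pass an empty xid for non-XA groups.
  std::uint64_t schedule(std::uint64_t sub_id, const std::string &xid)
  {
    std::uint64_t dep= 0;
    std::size_t h= std::hash<std::string>{}(xid);
    if (!xid.empty())
      for (const slot &s : slots)
        if (s.sub_id != 0 && s.xid_hash == h && s.sub_id > dep)
          dep= s.sub_id;  // most recent earlier group with the same XID
    // Overwrite the slot of the group |W| positions back.
    slots[sub_id % slots.size()]= slot{h, xid.empty() ? 0 : sub_id};
    return dep;
  }
};
```

A hash collision can only produce a spurious dependency (an unnecessary wait on an earlier group), never a missed one, so comparing hashes instead of full XIDs is safe for correctness.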
This will also be a good preparation for later introducing more of this kind
of pre-calculated dependencies to the parallel replication scheduling, along
the lines of the "balanced applier" that you have written about previously.
These ideas I believe have tremendous potential, and handling this in a
generic way is a very big improvement. I'm not suggesting to implement more
than what is necessary now for XA, but keep it in mind that this dependency
calculation in the SQL driver thread can be used for other than user XA in
the future. You can also name the newly introduced dependency fields
appropriately (i.e. with generic names not specific to XA, when applicable).
This will greatly simplify the patch, I believe; and more importantly it
will integrate the XA-required scheduling in a clean and generic way in the
parallel replication code.
Following are more detailed comments on the patch:
> - DBUG_ASSERT(
> - !(thd->rgi_slave &&
> - !thd->rgi_slave->worker_error &&
> - thd->rgi_slave->did_mark_start_commit) ||
> - (thd->transaction->xid_state.is_explicit_XA() ||
> - (thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_PREPARED_XA)));
> -
> + DBUG_ASSERT(!(thd->rgi_slave &&
> + !thd->rgi_slave->worker_error &&
> + thd->rgi_slave->did_mark_start_commit) ||
> + (thd->transaction->xid_state.is_explicit_XA() ||
> + (thd->rgi_slave->gtid_ev_flags2 &
> + (Gtid_log_event::FL_PREPARED_XA |
> + Gtid_log_event::FL_COMPLETED_XA))));
> if (thd->rgi_slave &&
> !thd->rgi_slave->worker_error &&
> thd->rgi_slave->did_mark_start_commit)
> thd->rgi_slave->unmark_start_commit();
Why is it allowed to rollback here while did_mark_start_commit is true for
XA PREPARE or XA COMMIT?
The comment above should be extended to explain this.
And why does the following if () statement then still do an emergency
unmark_start_commit(), when the condition is not caught by the assertion?
It looks like something is wrong in the earlier code, the intention here is
that it should never be necessary to unmark_start_commit() here, and if it
is, then it is a bug flagged by the assertion.
> @@ -1751,7 +1753,8 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
>
> if ((using_stmt && !cache_mngr->stmt_cache.empty()) ||
> (using_trx && !cache_mngr->trx_cache.empty()) ||
> - thd->transaction->xid_state.is_explicit_XA())
> + (thd->transaction->xid_state.is_explicit_XA() ||
> + (thd->rgi_slave && thd->rgi_slave->is_async_xac)))
Why is this extra condition thd->rgi_slave->is_async_xac necessary?
There are many of these conditions spread around the code in the patch.
I think perhaps it is because the transaction is somehow not "connected" to
the THD? Because it is in the XID cache? But the situation must be similar
on the master, if the XA COMMIT happens in a different transaction from XA
PREPARE.
So this should be done the same way on the slave, so that the XA transaction
gets connected to the THD of the worker thread processing it, and so that
these extra rgi_slave->is_async_xac conditions are not needed. It is very
fragile to have such conditions spread around the code; it will be
impossible to avoid bugs caused by forgetting such an extra condition in one
place or another.
> + /*
> + While xid_state.get_xid() is a robust method to access `xid`
> + it can't be used on slave by the asynchronously running XA-"complete".
> + In the latter case thd->lex->xid is safely accessible.
> + */
> + buflen= serialize_with_xid(is_async_xac? thd->lex->xid :
> + thd->transaction->xid_state.get_xid(),
Same here, it should be possible to use the same xid_state in the slave
thread as on the master, so we avoid special cases all over the code for the
slave threads.
> +static bool acquire_xid(THD *thd)
> +{
> + bool rc= false;
> +
> + if (thd->rgi_slave && thd->rgi_slave->is_async_xac &&
> + thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_COMPLETED_XA)
> + {
> + XID_STATE &xid_state= thd->transaction->xid_state;
This function actually seems to do part of "XA transaction gets connected to
the THD". But it's called from binlog_commit_by_xid() and
binlog_rollback_by_xid(), which means it has to have yet another of these
special-condition checks for thd->rgi_slave->is_async_xac etc.
Instead, call this from the code that applies the "XA COMMIT" event (e.g. in
log_event_server.cc), and make sure it fully connects the XA transaction to
the THD. Then you get rid of the checks for is_async_xac in a lot of
places, and also avoid polluting the code that runs the original transaction
on the master with replication details.
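A toy illustration of that split, with hypothetical types standing in for THD, XID_STATE and the XID cache (none of this is the real server API): the applier of the XA COMMIT event attaches the prepared transaction to the worker's THD once, and the generic code afterwards reads only `thd.xid_state`, with no is_async_xac-style checks.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical toy types mirroring only the roles of XID_STATE, THD and
// the XID cache.
struct xid_state_model
{
  std::string xid;  // empty: no XA transaction attached
  bool prepared= false;
};

struct thd_model
{
  xid_state_model xid_state;
};

using xid_cache_model= std::map<std::string, xid_state_model>;

// Generic code (binlogging, GTID event construction, ...): reads only
// thd.xid_state and has no replication-specific special cases.
std::string xid_for_binlog(const thd_model &thd)
{
  return thd.xid_state.xid;
}

// Replication-specific: called by the applier of the XA COMMIT/ROLLBACK
// event before it enters the generic commit code.
bool attach_prepared_xa(thd_model &thd, const xid_cache_model &cache,
                        const std::string &xid)
{
  assert(thd.xid_state.xid.empty());  // worker has nothing attached yet
  auto it= cache.find(xid);
  if (it == cache.end() || !it->second.prepared)
    return false;                     // caller reports e.g. ER_XAER_NOTA
  thd.xid_state= it->second;          // generic code now sees the XA trx
  return true;
}
```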
> + rc= binlog_commit(thd, TRUE, FALSE);
> + thd->ha_data[binlog_hton->slot].ha_info[1].reset();
> + }
> + if (!rc)
> + {
> + rc= acquire_xid(thd);
> + }
> + if (thd->is_current_stmt_binlog_disabled())
> + {
> + thd->wakeup_subsequent_commits(rc);
> + }
So IIUC, here we first binlog the XA COMMIT and then
wakeup_subsequent_commits(). But the engine commit only happens afterwards,
in ha_commit_or_rollback_by_xid.
Why wakeup_subsequent_commits() here? It seems too early, the transaction is
not yet committed in the engine, how do you avoid that the commits in the
engine will happen in the wrong order? And then mariabackup might take a
backup with T2 committed but not T1, leaving no valid replication position
from which to provision a slave.
And also, is this skipping the binlog transaction coordinator and two-phase
commit with the engines? And in particular, commit_ordered()? Then again, it
seems we have the problem that the engine can commit in the opposite order
from the binlog. I wonder if this will also affect crash recovery if we
crash with different commit order in binlog and engine?
So I think wakeup_subsequent_commits() here is wrong, should be removed.
> @@ -2185,7 +2293,9 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc)
> }
>
> if (cache_mngr->trx_cache.empty() &&
> - (thd->transaction->xid_state.get_state_code() != XA_PREPARED ||
> + ((thd->transaction->xid_state.get_state_code() != XA_PREPARED &&
> + !(thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
> + thd->lex->sql_command == SQLCOM_XA_COMMIT)) ||
As explained above, this condition (and the similar one in binlog_rollback())
should be avoided.
> @@ -10510,13 +10624,20 @@ int TC_LOG_BINLOG::unlog_xa_prepare(THD *thd, bool all)
>
> binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
> int cookie= 0;
> + int rc= 0;
> +
> + if (thd->rgi_slave && thd->is_current_stmt_binlog_disabled())
> + {
> + rc= thd->wait_for_prior_commit();
> + if (rc == 0)
> + thd->wakeup_subsequent_commits(rc);
> + return rc;
> + }
Why is this necessary?
If it is really necessary, then put it in the code that applies the XA
PREPARE event, not as a random special case in generic code.
> - bool rc= false;
> -
>
> rc= write_empty_xa_prepare(thd, cache_mngr); // normally gains need_unlog
>
> static bool write_empty_xa_prepare(THD *thd, binlog_cache_mngr *cache_mngr)
> {
> return binlog_commit_flush_xa_prepare(thd, true, cache_mngr);
> }
Smaller point: since you're changing the type of `rc` to int (which is
correct), also change the return type of write_empty_xa_prepare() from bool
to int, as binlog_commit_flush_xa_prepare() and unlog_xa_prepare() both
return int.
> @@ -3314,16 +3314,22 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
> XID_STATE &xid_state= thd->transaction->xid_state;
> if (is_transactional)
> {
> - if (xid_state.is_explicit_XA() &&
> - (thd->lex->sql_command == SQLCOM_XA_PREPARE ||
> - xid_state.get_state_code() == XA_PREPARED))
> + bool is_async_xac= false;
> + if ((xid_state.is_explicit_XA() &&
> + (thd->lex->sql_command == SQLCOM_XA_PREPARE ||
> + xid_state.get_state_code() == XA_PREPARED)) ||
> + (is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac)))
Again, this is the generic GTID event constructor; it shouldn't need this
kind of logic. It's very strange that a simple class constructor returns
something different depending on which thread it runs in!
Hopefully this is no longer necessary once the XA transaction gets properly
connected to the THD. Otherwise, it should be handled by passing in a
suitable flag from the caller, or by the caller making the required
modifications after construction.
> +using std::max;
Please don't. Writing std::max() explicitly is not long, and it makes clear
which `max` implementation is used.
> @@ -760,7 +762,8 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
> return;
> err_code= thd->get_stmt_da()->sql_errno();
> if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
> - err_code != ER_PRIOR_COMMIT_FAILED) ||
> + (err_code != ER_PRIOR_COMMIT_FAILED &&
> + err_code != ER_XAER_NOTA)) ||
Why? What is special about the ER_XAER_NOTA error?
Is it really safe to speculatively run the XA COMMIT in parallel, and then
retry it for any other error than ER_XAER_NOTA ?
> @@ -2467,6 +2463,7 @@ free_rpl_parallel_entry(void *element)
> }
> mysql_cond_destroy(&e->COND_parallel_entry);
> mysql_mutex_destroy(&e->LOCK_parallel_entry);
> + e->concurrent_xaps_window.~Dynamic_array();
> my_free(e);
No, let's not do this: calling a destructor explicitly on a member of a POD
struct that is managed by my_malloc() / my_free().
Why not just make concurrent_xaps_window a pointer to the Dynamic_array and
manage with new / delete? Then you can also allocate it lazily, so it will
not need to be allocated at all for the 99.9xx% of users who are not
replicating user XA transactions.
The size of the sliding window is fixed anyway (until slave_parallel_threads
is changed), so why use Dynamic_array at all, why not just allocate a plain
array of the right size?
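For example, a sketch under assumptions (all names hypothetical, with std::calloc()/std::free() standing in for a zero-filling my_malloc() and my_free()): the entry keeps only a pointer and a size, both zero after allocation, and the plain array is created with new[] on the first user XA event group and released with delete[] before the entry's memory is freed. No constructor or destructor of the entry itself is ever needed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <new>

// Hypothetical slot layout.
struct xa_window_slot
{
  std::size_t xid_hash;
  std::uint64_t sub_id;
};

// POD stand-in for rpl_parallel_entry: allocated zero-filled with a C
// allocator, so no constructor or destructor ever runs for it.
struct parallel_entry_model
{
  xa_window_slot *xa_window;  // nullptr until the first user XA event group
  std::uint32_t xa_window_size;
};

// Lazily allocate the window; reallocate (discarding the contents) if the
// worker pool size changed. Slots are value-initialised, i.e. zero.
bool ensure_xa_window(parallel_entry_model *e, std::uint32_t n_workers)
{
  assert(n_workers > 0);
  if (e->xa_window && e->xa_window_size == n_workers)
    return true;
  delete[] e->xa_window;
  e->xa_window= new (std::nothrow) xa_window_slot[n_workers]();
  e->xa_window_size= e->xa_window ? n_workers : 0;
  return e->xa_window != nullptr;
}

void free_parallel_entry_model(parallel_entry_model *e)
{
  delete[] e->xa_window;  // no-op when never allocated
  std::free(e);
}
```

Reallocating on a size change throws away the old window; that assumes no event groups are in flight at that point.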
> +template <>
> +struct std::hash<XID>
> +{
> + std::size_t operator()(const XID& xid) const
> + {
> + return my_hash_sort(&my_charset_bin, xid.key(), xid.key_length());
I don't understand, what is the purpose of introducing this function object?
Why not just call my_hash_sort() directly (seems it's only used in one
place) and avoid this code-obfuscation?
> + Dynamic_array<std::pair<std::size_t, uint32>> concurrent_xaps_window;
This is dangerous as Dynamic_array has a destructor but rpl_parallel_entry
is a POD where we don't call any constructor/destructor. Should instead use
a pointer to an object constructed with new (or not use Dynamic_array at all
as suggested above).
> + /*
> + When true indicates that the user xa transaction is going to
> + complete (with COMMIT or ROLLBACK) by the worker thread,
> + *while* another worker is still preparing it. Once the latter is done
> + the xid will be acquired and the flag gets reset.
> + */
> + bool is_async_xac;
> +
I don't understand this comment. Surely it is not possible to *complete*,
with XA COMMIT, a transaction before the corresponding XA PREPARE has
completed (possibly in another worker thread)?
Do you mean that it is possible for a worker thread W2 to *start* applying
the XA COMMIT speculatively, before another worker W1 has completed the XA
PREPARE?
But I think this is not needed: as discussed at the start, the XA COMMIT
worker can just do a wait_for_prior_commit() on the event group of its
XA PREPARE, and then the flag becomes unnecessary.
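A minimal model of that wait. As a simplification, a single condition variable guarded by one mutex stands in for the per-group wait_for_commit objects that wait_for_prior_commit() actually uses; `entry_model`, `mark_committed()` and `wait_for_pre_dependency()` are hypothetical names. Because event groups commit in sub_id order, `pre_dependency_sub_id <= last_committed_sub_id` is enough to know that the XA PREPARE has committed (and, with the XID hash update moved into group commit, that its XID is in the hash).

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical stand-in for the per-domain parallel entry.
struct entry_model
{
  std::mutex lock;
  std::condition_variable cond;
  std::uint64_t last_committed_sub_id= 0;

  // Called once an event group has committed (XID hash already updated).
  void mark_committed(std::uint64_t sub_id)
  {
    std::lock_guard<std::mutex> guard(lock);
    assert(sub_id > last_committed_sub_id);  // groups commit in sub_id order
    last_committed_sub_id= sub_id;
    cond.notify_all();
  }

  // Block the XA COMMIT worker until its XA PREPARE (pre_dependency_sub_id)
  // has committed; 0 means "no dependency" and never blocks.
  void wait_for_pre_dependency(std::uint64_t pre_dependency_sub_id)
  {
    std::unique_lock<std::mutex> guard(lock);
    cond.wait(guard, [&] {
      return pre_dependency_sub_id <= last_committed_sub_id;
    });
  }
};
```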
> @@ -137,7 +137,7 @@ template <class Elem> class Dynamic_array
> */
> Elem& at(size_t idx)
> {
> - DBUG_ASSERT(idx < array.elements);
> + DBUG_ASSERT(idx < max_size());
> return *(((Elem*)array.buffer) + idx);
No, this cannot possibly be right. This is a patch about XA replication, why
would it suddenly allow accessing deleted elements of any Dynamic_array used
in the server?!?
Surely you can allocate the elements you need for your sliding window. In
fact, since the window is fixed size anyway, why use Dynamic_array at all?
> @@ -2367,6 +2367,15 @@ struct wait_for_commit
> event group is fully done.
> */
> bool wakeup_blocked;
> + /*
> + The condition variable servers as a part of facilities to handle various
> + commit time additional dependency between groups of replication events, e.g
> + XA-Prepare -> XA-Commit, or XA-Prepare -> XA-Prepare all with the same xid.
> + */
> + mysql_cond_t COND_wait_commit_dep;
No, this doesn't belong in struct wait_for_commit.
wait_for_commit is a low-level mechanism for ordering commits, it should not
need any knowledge even of replication, certainly not of user-XA
replication.
The COND_wait_commit_dep isn't even used in any function related to
wait_for_commit or even sql_class.cc, it's only used in sql/xa.cc.
[But as discussed at the start, we can use the existing
wait_for_prior_commit instead of introducing this new mechanism just for
replicating user XA.]
> -static XID_cache_element *xid_cache_search(THD *thd, XID *xid)
> +XID_cache_element *xid_cache_search(THD *thd, XID *xid)
Why remove the `static`? The function is not used outside of sql/xa.cc.
> @@ -254,16 +259,221 @@ static XID_cache_element *xid_cache_search(THD *thd, XID *xid)
> + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec)
> + {
> + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT ||
> + thd->lex->sql_command == SQLCOM_XA_ROLLBACK);
> + thd->rgi_slave->is_async_xac= true;
Here again we have some logic that relates to replication being inserted
somewhat randomly in low-level code, requiring this
thd->rgi_slave->is_parallel_exec condition.
Instead of doing this, the caller higher up should handle this depending on
the return of xid_cache_search(). This way, code paths that are not related
to replication don't get affected by the replication-specific logic.
> +bool xid_cache_insert_maybe_wait(THD* thd)
> +{
This new wait mechanism just for parallel replication of user XA seems very
complex: a spin loop, extra locking and condition variables, and locking on
a lock-free hash. As explained at the start, let's instead avoid introducing
a new mechanism and use the existing wait_for_prior_commit, which is central
to parallel replication and highly optimized for this.
IIUC, the real problem here is that after wakeup_subsequent_commits() of XA
COMMIT, the XID is still in the xid_hash. This is because of this code in
trans_xa_commit():
ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
if (!res && thd->is_error())
{
// hton completion error retains xs/xid in the cache,
// unless there had been already one as reflected by `res`.
res= true;
goto _end_external_xid;
}
xid_cache_delete(thd, xs);
xid_deleted= true;
ha_commit_or_rollback_by_xid() ends up in queue_for_group_commit() which
will do the wakeup_subsequent_commits. But the xid_cache_delete() happens
just after that. This is too late. The wakeup_subsequent_commits() must
not occur until after the transaction is fully committed in memory.
So instead, move the xid_cache_delete() call so it happens inside the group
commit code. Then when wakeup_subsequent_commits() is done after XA COMMIT,
the XID entry will be gone. It would go here in
MYSQL_BIN_LOG::trx_group_commit_leader():
if (current->cache_mngr->using_xa && likely(!current->error) &&
DBUG_EVALUATE_IF("skip_commit_ordered", 0, 1))
{
mysql_mutex_lock(&current->thd->LOCK_thd_data);
run_commit_ordered(current->thd, current->all);
mysql_mutex_unlock(&current->thd->LOCK_thd_data);
}
current->thd->wakeup_subsequent_commits(current->error);
I noticed that commit_ordered is not done for user XA transactions in
replication. The direct reason for that seems to be that they do not use
log_and_order(), which sets cache_mngr->using_xa.
Can you explain why the XA PREPARE and XA COMMIT is not following the
commit_ordered protocol, and what mechanism exists instead to ensure that
everything still works correctly? For example that commit order in binlog
and InnoDB will be the same, and that binlog recovery works?
And finally, just for the record, I still do not agree with the way
replication of user XA has been changed (aka MDEV-32020), and this patch
does not fix the underlying problem. But changing it to use the
wait_for_prior_commit() mechanism as explained at the start will improve the
integration with the parallel replication code and be a step towards solving
also some of the other underlying issues.
- Kristian.