sujatha <sujatha.sivakumar@mariadb.com> writes:

Hi Sujatha,
@sql/sql_class.h Moved 'wait_for_prior_commit(THD *thd)' method inside sql_class.cc
@sql/sql_class.cc Added code to check for 'stop_on_error_sub_id' for event groups which get skipped and don't have any preceding group to wait for.
This looks like the wrong fix. The wait_for_commit mechanism is a low-level API; it should not deal with things like stop_on_error_sub_id.
1,2,3 are scheduled in parallel.
Since the above is true, 'skip_event_group=true' is set. The code then simply calls 'wait_for_prior_commit' to wake up all waiters. Group '2' didn't have any waitee and its execution is skipped, hence its wakeup_error=0. It sends a positive wakeup signal to '3', which commits. This results in a missed transaction, i.e. 33 is missed.
I think this is the real problem. 2 is stopping because of an error, so it must not call wakeup_subsequent_commits() without propagating the error; doing so breaks the whole semantics of wait_for_commit.

Maybe it's enough just to also set rgi->worker_error here, when skip_event_group is set due to an error in an earlier group?

    if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
      skip_event_group= true;

Then wakeup_subsequent_commits() will correctly inform the following transaction about the error, so it doesn't try to commit on its own.

There is already similar code in the retry_event_group() function:

    if (entry->stop_on_error_sub_id == (uint64) ULONGLONG_MAX ||
        rgi->gtid_sub_id < entry->stop_on_error_sub_id)
      ...
    else
    {
      err= rgi->worker_error= 1;

Then all of the changes to sql_class.h and sql_class.cc can be omitted.

Also, this fix doesn't seem to belong with the other MDEV-18648 changes; it is unrelated to what the default parallel replication mode is. So please do it in a separate commit.

Hope this helps,

 - Kristian.
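To make the suggestion above concrete, here is a toy model of the 1,2,3 scenario. It is only a sketch under stated assumptions: ToyEntry, ToyGroup, simulate(), apply_fails and past_skip_check are made-up names for illustration, not server code, and the groups are stepped through one after another in commit order instead of running in real worker threads. The only thing it tries to show is that a skipped group which wakes its follower with error 0 lets the follower commit, while setting worker_error at the skip point stops it.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

// Toy model only: these types imitate a few fields named in the review.
// They are NOT the server's rpl_parallel_entry / rpl_group_info.
constexpr uint64_t NO_STOP = std::numeric_limits<uint64_t>::max();

struct ToyEntry {
  uint64_t stop_on_error_sub_id;  // NO_STOP until some group fails
};

struct ToyGroup {
  uint64_t gtid_sub_id;         // position of this group in commit order
  uint64_t wait_commit_sub_id;  // group it must commit after (0 = none)
  bool apply_fails;             // assumption: applying this group hits an error
  bool past_skip_check;         // assumption: already executing when the error hit
  int worker_error;             // error this group passes on to its follower
};

// Returns the sub_ids that end up committed.
// propagate_on_skip == false: a skipped group wakes its follower with 0.
// propagate_on_skip == true:  a skipped group sets worker_error first.
std::vector<uint64_t> simulate(bool propagate_on_skip) {
  ToyEntry entry{NO_STOP};
  std::vector<ToyGroup> groups = {
      {1, 0, true, false, 0},   // group 1 fails while applying
      {2, 1, false, false, 0},  // group 2 reaches the skip check after that
      {3, 2, false, true, 0},   // group 3 was already running, waits for 2
  };
  std::vector<uint64_t> committed;
  int wakeup_error = 0;  // what the previous group handed to its follower

  for (ToyGroup &g : groups) {
    assert(g.wait_commit_sub_id < g.gtid_sub_id);  // commit order invariant
    bool skip_event_group = !g.past_skip_check &&
                            entry.stop_on_error_sub_id <= g.wait_commit_sub_id;
    if (skip_event_group) {
      // The skipped group has no waitee here, so it sees no incoming error.
      if (propagate_on_skip)
        g.worker_error = 1;
    } else if (g.apply_fails) {
      entry.stop_on_error_sub_id = g.gtid_sub_id;
      g.worker_error = 1;
    } else if (wakeup_error != 0) {
      g.worker_error = wakeup_error;  // prior commit failed, do not commit
    } else {
      committed.push_back(g.gtid_sub_id);
    }
    wakeup_error = g.worker_error;  // stands in for wakeup_subsequent_commits()
  }
  return committed;
}
```

Under these assumptions, simulate(false) leaves only group 3 committed (the missed-transaction case), while simulate(true) commits nothing after the failure in group 1.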
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 4eab241232b..5ba9c5fe456 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -7365,6 +7365,33 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
 }
 
 
+int wait_for_commit::wait_for_prior_commit(THD *thd)
+{
+  /*
+    Quick inline check, to avoid function call and locking in the common case
+    where no wakeup is registered, or a registered wait was already signalled.
+  */
+  if (waitee)
+    return wait_for_prior_commit2(thd);
+  else
+  {
+    if (wakeup_error)
+      my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+    else
+    {
+      rpl_group_info* rgi= thd->rgi_slave;
+      if (rgi && rgi->is_parallel_exec &&
+          rgi->parallel_entry->stop_on_error_sub_id < (uint64)ULONGLONG_MAX &&
+          rgi->gtid_sub_id >= rgi->parallel_entry->stop_on_error_sub_id)
+      {
+        my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+        wakeup_error= ER_PRIOR_COMMIT_FAILED;
+      }
+    }
+    return wakeup_error;
+  }
+}
+
 /*
   Wait for commit of another transaction to complete, as already registered
   with register_wait_for_prior_commit(). If the commit already completed,
@@ -7387,6 +7414,17 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
   {
     if (wakeup_error)
       my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+    else
+    {
+      rpl_group_info* rgi= thd->rgi_slave;
+      if (rgi && rgi->is_parallel_exec &&
+          rgi->parallel_entry->stop_on_error_sub_id < (uint64)ULONGLONG_MAX &&
+          rgi->gtid_sub_id >= rgi->parallel_entry->stop_on_error_sub_id)
+      {
+        my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+        wakeup_error= ER_PRIOR_COMMIT_FAILED;
+      }
+    }
     goto end;
   }
   /*
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 72cb8148895..0a0a1aa9fa1 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2062,21 +2062,6 @@ struct wait_for_commit
   bool commit_started;
 
   void register_wait_for_prior_commit(wait_for_commit *waitee);
-  int wait_for_prior_commit(THD *thd)
-  {
-    /*
-      Quick inline check, to avoid function call and locking in the common case
-      where no wakeup is registered, or a registered wait was already signalled.
-    */
-    if (waitee)
-      return wait_for_prior_commit2(thd);
-    else
-    {
-      if (wakeup_error)
-        my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
-      return wakeup_error;
-    }
-  }
   void wakeup_subsequent_commits(int wakeup_error_arg)
   {
     /*
@@ -2123,6 +2108,7 @@ struct wait_for_commit
 
   void wakeup(int wakeup_error);
 
+  int wait_for_prior_commit(THD *thd);
   int wait_for_prior_commit2(THD *thd);
   void wakeup_subsequent_commits2(int wakeup_error);
   void unregister_wait_for_prior_commit2();