Keep track of each recently active XID, recording which worker it was queued on. If an XID might still be active, choose the same worker to queue event groups that refer to the same XID to avoid conflicts. Otherwise, schedule the XID freely in the next round-robin slot. This way, XA PREPARE can normally be scheduled without restrictions (unless duplicate XID transactions come close together). This improves scheduling and parallelism over the old method, where the worker thread to schedule XA PREPARE on was fixed based on a hash value of the XID. XA COMMIT will normally be scheduled on the same worker as XA PREPARE, but can be a different one if the XA PREPARE is far back in the event history. Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org> --- sql/rpl_parallel.cc | 126 ++++++++++++++++++++++++++++++++++++++++---- sql/rpl_parallel.h | 35 ++++++++++++ 2 files changed, 150 insertions(+), 11 deletions(-) diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 5be700a700f..7c6fa184259 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -2324,6 +2324,75 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli) } } + +/* + Check when we have done a complete round of scheduling for workers + 0, 1, ..., (rpl_thread_max-1), in this order. + This often occurs every rpl_thread_max event group, but XA XID dependency + restrictions can cause insertion of extra out-of-order worker scheduling + in-between the normal round-robin scheduling. +*/ +void +rpl_parallel_entry::check_scheduling_generation(sched_bucket *cur) +{ + uint32 idx= cur - rpl_threads; + DBUG_ASSERT(cur >= rpl_threads); + DBUG_ASSERT(cur < rpl_threads + rpl_thread_max); + if (idx == current_generation_idx) + { + ++idx; + if (idx >= rpl_thread_max) + { + /* A new generation; all workers have been scheduled at least once. 
*/ + idx= 0; + ++current_generation; + } + current_generation_idx= idx; + } +} + + +rpl_parallel_entry::sched_bucket * +rpl_parallel_entry::check_xa_xid_dependency(xid_t *xid) +{ + uint64 cur_gen= current_generation; + my_off_t i= 0; + while (i < maybe_active_xid.elements) + { + /* + Purge no longer active XID from the list: + + - In generation N, XID might have been scheduled for worker W. + - Events in generation (N+1) might run freely in parallel with W. + - Events in generation (N+2) will have done wait_for_prior_commit for + the event group with XID (or a later one), but the XID might still be + active for a bit longer after wakeup_prior_commit(). + - Events in generation (N+3) will have done wait_for_prior_commit() for + an event in W _after_ the XID, so are sure not to see the XID active. + + Therefore, XID can be safely scheduled to a different worker in + generation (N+3) when last prior use was in generation N (or earlier). + */ + xid_active_generation *a= + dynamic_element(&maybe_active_xid, i, xid_active_generation *); + if (a->generation + 3 <= cur_gen) + { + *a= *((xid_active_generation *)pop_dynamic(&maybe_active_xid)); + continue; + } + if (xid->eq(&a->xid)) + { + /* Update the last used generation and return the match. */ + a->generation= cur_gen; + return a->thr; + } + ++i; + } + /* No matching XID conflicts. */ + return nullptr; +} + + /* Obtain a worker thread that we can queue an event to. @@ -2368,17 +2437,36 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, if (gtid_ev->flags2 & (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) { - /* - For XA COMMIT/ROLLBACK, choose the same bucket as the XA PREPARE, - overriding the round-robin scheduling. 
- */ - uint32 idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), - gtid_ev->xid.key_length()) % rpl_thread_max; - rpl_threads[idx].unlink(); - thread_sched_fifo->append(rpl_threads + idx); + if ((cur_thr= check_xa_xid_dependency(&gtid_ev->xid))) + { + /* + A previously scheduled event group with the same XID might still be + active in a worker, so schedule this event group in the same worker + to avoid a conflict. + */ + cur_thr->unlink(); + thread_sched_fifo->append(cur_thr); + } + else + { + /* Record this XID now active. */ + xid_active_generation *a= + (xid_active_generation *)alloc_dynamic(&maybe_active_xid); + if (!a) + return NULL; + a->thr= cur_thr= thread_sched_fifo->head(); + a->generation= current_generation; + a->xid.set(&gtid_ev->xid); + } } + else + cur_thr= thread_sched_fifo->head(); + + check_scheduling_generation(cur_thr); } - cur_thr= thread_sched_fifo->head(); + else + cur_thr= thread_sched_fifo->head(); + thr= cur_thr->thr; if (thr) @@ -2469,6 +2557,7 @@ free_rpl_parallel_entry(void *element) dealloc_gco(e->current_gco); e->current_gco= prev_gco; } + delete_dynamic(&e->maybe_active_xid); mysql_cond_destroy(&e->COND_parallel_entry); mysql_mutex_destroy(&e->LOCK_parallel_entry); my_free(e); @@ -2522,11 +2611,26 @@ rpl_parallel::find(uint32 domain_id) my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p))); return NULL; } + /* Initialize a FIFO of scheduled worker threads. */ e->thread_sched_fifo = new (fifo) I_List<rpl_parallel_entry::sched_bucket>; - for (ulong i= 0; i < count; ++i) - e->thread_sched_fifo->push_back(::new (p+i) rpl_parallel_entry::sched_bucket); + /* + (We cycle the FIFO _before_ allocating next entry in + rpl_parallel_entry::choose_thread(). So initialize the FIFO with the + highest element at the front, just so that the first event group gets + scheduled on entry 0).
+ */ + e->thread_sched_fifo-> + push_back(::new (p+count-1) rpl_parallel_entry::sched_bucket); + for (ulong i= 0; i < count-1; ++i) + e->thread_sched_fifo-> + push_back(::new (p+i) rpl_parallel_entry::sched_bucket); e->rpl_threads= p; e->rpl_thread_max= count; + e->current_generation = 0; + e->current_generation_idx = 0; + init_dynamic_array2(PSI_INSTRUMENT_ME, &e->maybe_active_xid, + sizeof(rpl_parallel_entry::xid_active_generation), + 0, count, 0, MYF(0)); e->domain_id= domain_id; e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; e->pause_sub_id= (uint64)ULONGLONG_MAX; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 9ba86453d06..a0100fd6446 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -326,10 +326,26 @@ struct rpl_parallel_thread_pool { struct rpl_parallel_entry { + /* + A small struct to put worker thread references into a FIFO (using an + I_List) for round-robin scheduling. + */ struct sched_bucket : public ilink { sched_bucket() : thr(nullptr) { } rpl_parallel_thread *thr; }; + /* + A struct to keep track of into which "generation" an XA XID was last + scheduled. A "generation" means that we know that every worker thread + slot in the rpl_parallel_entry was scheduled at least once. When more + than two generations have passed, we can safely reuse the XID in a + different worker. + */ + struct xid_active_generation { + uint64 generation; + sched_bucket *thr; + xid_t xid; + }; mysql_mutex_t LOCK_parallel_entry; mysql_cond_t COND_parallel_entry; @@ -373,6 +389,23 @@ struct rpl_parallel_entry { sched_bucket *rpl_threads; I_List<sched_bucket> *thread_sched_fifo; uint32 rpl_thread_max; + /* + Keep track of all XA XIDs that may still be active in a worker thread. + The elements are of type xid_active_generation. + */ + DYNAMIC_ARRAY maybe_active_xid; + /* + Keeping track of the current scheduling generation. + + A new generation means that every worker thread in the rpl_threads array + has been scheduled at least one event group.
+ + When we have scheduled to slot current_generation_idx= 0, 1, ..., N-1 in this + order, we know that (at least) one generation has passed. + */ + uint64 current_generation; + uint32 current_generation_idx; + /* The sub_id of the last transaction to commit within this domain_id. Must be accessed under LOCK_parallel_entry protection. @@ -426,6 +459,8 @@ struct rpl_parallel_entry { /* The group_commit_orderer object for the events currently being queued. */ group_commit_orderer *current_gco; + void check_scheduling_generation(sched_bucket *cur); + sched_bucket *check_xa_xid_dependency(xid_t *xid); rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage, Gtid_log_event *gtid_ev); -- 2.30.2