When choosing the scheduling bucket for the next event group in rpl_parallel_entry::choose_thread(), use an explicit FIFO for the round-robin selection instead of a simple cyclic counter i := (i+1) % N. This makes it possible to schedule XA COMMIT/ROLLBACK dependencies explicitly without changing the round-robin scheduling of other event groups. Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org> --- sql/rpl_parallel.cc | 54 +++++++++++++++++++++++++++------------------ sql/rpl_parallel.h | 13 ++++++++--- 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 6d6b78054f9..5be700a700f 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -2356,33 +2356,38 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage, Gtid_log_event *gtid_ev) { - uint32 idx; + sched_bucket *cur_thr; Relay_log_info *rli= rgi->rli; rpl_parallel_thread *thr; - idx= rpl_thread_idx; if (gtid_ev) { + /* New event group; cycle the thread scheduling buckets round-robin. */ + thread_sched_fifo->push_back(thread_sched_fifo->get()); + if (gtid_ev->flags2 & (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) - idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), - gtid_ev->xid.key_length()) % rpl_thread_max; - else { - ++idx; - if (idx >= rpl_thread_max) - idx= 0; + /* + For XA COMMIT/ROLLBACK, choose the same bucket as the XA PREPARE, + overriding the round-robin scheduling. 
+ */ + uint32 idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), + gtid_ev->xid.key_length()) % rpl_thread_max; + rpl_threads[idx].unlink(); + thread_sched_fifo->append(rpl_threads + idx); } - rpl_thread_idx= idx; } - thr= rpl_threads[idx]; + cur_thr= thread_sched_fifo->head(); + + thr= cur_thr->thr; if (thr) { *did_enter_cond= false; mysql_mutex_lock(&thr->LOCK_rpl_thread); for (;;) { - if (thr->current_owner != &rpl_threads[idx]) + if (thr->current_owner != &cur_thr->thr) { /* The worker thread became idle, and returned to the free list and @@ -2448,8 +2453,8 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, } } if (!thr) - rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx], - this); + cur_thr->thr= thr= + global_rpl_thread_pool.get_thread(&cur_thr->thr, this); return thr; } @@ -2506,15 +2511,20 @@ rpl_parallel::find(uint32 domain_id) ulong count= opt_slave_domain_parallel_threads; if (count == 0 || count > opt_slave_parallel_threads) count= opt_slave_parallel_threads; - rpl_parallel_thread **p; + rpl_parallel_entry::sched_bucket *p; + I_List<rpl_parallel_entry::sched_bucket> *fifo; if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME|MY_ZEROFILL), &e, sizeof(*e), &p, count*sizeof(*p), + &fifo, sizeof(*fifo), NULL)) { my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p))); return NULL; } + e->thread_sched_fifo = new (fifo) I_List<rpl_parallel_entry::sched_bucket>; + for (ulong i= 0; i < count; ++i) + e->thread_sched_fifo->push_back(::new (p+i) rpl_parallel_entry::sched_bucket); e->rpl_threads= p; e->rpl_thread_max= count; e->domain_id= domain_id; @@ -2580,10 +2590,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) mysql_mutex_unlock(&e->LOCK_parallel_entry); for (j= 0; j < e->rpl_thread_max; ++j) { - if ((rpt= e->rpl_threads[j])) + if ((rpt= e->rpl_threads[j].thr)) { mysql_mutex_lock(&rpt->LOCK_rpl_thread); - if (rpt->current_owner == &e->rpl_threads[j]) + if (rpt->current_owner 
== &e->rpl_threads[j].thr) mysql_cond_signal(&rpt->COND_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); } @@ -2603,10 +2613,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); for (j= 0; j < e->rpl_thread_max; ++j) { - if ((rpt= e->rpl_threads[j])) + if ((rpt= e->rpl_threads[j].thr)) { mysql_mutex_lock(&rpt->LOCK_rpl_thread); - while (rpt->current_owner == &e->rpl_threads[j]) + while (rpt->current_owner == &e->rpl_threads[j].thr) mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); } @@ -2653,7 +2663,7 @@ int rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, Format_description_log_event *fdev) { - uint32 idx; + sched_bucket *cur_thr; rpl_parallel_thread *thr; rpl_parallel_thread::queued_event *qev; Relay_log_info *rli= rgi->rli; @@ -2668,12 +2678,12 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, Thus there is no need for the full complexity of choose_thread(). We only need to check if we have a current worker thread, and queue for it if so. */ - idx= rpl_thread_idx; - thr= rpl_threads[idx]; + cur_thr= thread_sched_fifo->head(); + thr= cur_thr->thr; if (!thr) return 0; mysql_mutex_lock(&thr->LOCK_rpl_thread); - if (thr->current_owner != &rpl_threads[idx]) + if (thr->current_owner != &cur_thr->thr) { /* No active worker thread, so no need to queue the master restart. 
*/ mysql_mutex_unlock(&thr->LOCK_rpl_thread); diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index bcce54bc0ec..9ba86453d06 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -326,6 +326,11 @@ struct rpl_parallel_thread_pool { struct rpl_parallel_entry { + struct sched_bucket : public ilink { + sched_bucket() : thr(nullptr) { } + rpl_parallel_thread *thr; + }; + mysql_mutex_t LOCK_parallel_entry; mysql_cond_t COND_parallel_entry; uint32 domain_id; @@ -355,17 +360,19 @@ struct rpl_parallel_entry { uint64 stop_sub_id; /* - Cyclic array recording the last rpl_thread_max worker threads that we + Array recording the last rpl_thread_max worker threads that we queued event for. This is used to limit how many workers a single domain can occupy (--slave-domain-parallel-threads). + The array is structured as a FIFO using an I_List thread_sched_fifo. + Note that workers are never explicitly deleted from the array. Instead, we need to check (under LOCK_rpl_thread) that the thread still belongs to us before re-using (rpl_thread::current_owner). */ - rpl_parallel_thread **rpl_threads; + sched_bucket *rpl_threads; + I_List<sched_bucket> *thread_sched_fifo; uint32 rpl_thread_max; - uint32 rpl_thread_idx; /* The sub_id of the last transaction to commit within this domain_id. Must be accessed under LOCK_parallel_entry protection. -- 2.30.2