[PATCH 0/2] Suggestion for smaller fix to XA parallel replication performance, MDEV-31949
As was discussed recently off-list, there is a desire to find a minimal fix to the performance issues in 10.6 with parallel replication of user XA due to non-optimal scheduling (MDEV-31949). How about something like this two-patch series? It _only_ modifies the scheduling of event groups to worker threads, and only for XA transactions, and is quite a bit simpler than the other proposed patches for MDEV-31949.

The current 10.6 code for parallel replication of user XA needs to avoid applying in parallel two different event groups that have the same XA XID. Typically these are the XA PREPARE and the corresponding XA COMMIT/ROLLBACK, but one of them could also be another transaction that uses a duplicate XID. The method currently used in 10.6 takes a hash of the XID and assigns the transaction to worker number (hash MOD N) amongst the N workers. This is sub-optimal, since there is a high chance that amongst N transactions, several of them will map to the same worker. Thus fewer than N transactions will be able to replicate in parallel and group commit together.

The second patch in the series improves this by keeping track of recently active XIDs and which worker they were scheduled on. A subsequent event group with the same XID is scheduled on the same worker. If the XID is different, the event group is scheduled normally on the next free worker in round-robin fashion. This permits most XA PREPAREs to be scheduled without dependency restrictions, same as normal non-XA transactions. Only if the user submits multiple XA transactions close together using the same duplicate XID will there be dependency restrictions.

The XA COMMIT will normally be scheduled on the same worker as the XA PREPARE, unless the two events are far apart in the replication stream. This is mostly unavoidable in the current XA replication design, since the XA PREPARE and XA COMMIT of a single transaction cannot group-commit together.

The first patch is a refactor/preparation patch. It rewrites the scheduling to use an explicit FIFO for scheduling the threads instead of a simple `i := (i+1) % N` cyclic counter. This allows the second patch to combine explicit scheduling of some transactions with round-robin scheduling of the rest.

Patches also available on github:

https://github.com/MariaDB/server/commits/knielsen_xa_sched_minimal_fix
https://github.com/MariaDB/server/commit/66d6cce96f831b638812490844d75423178...
https://github.com/MariaDB/server/commit/3a32fb546c111e4627cad7c66bcc089bc11...

This is just a quick proof-of-concept I did today, but it seems to work, and it is small and simple and suitable for a targeted fix and for cherry-picking.

Brandon and/or Andrei, maybe you could try your benchmarks with this patch and see if it also solves the performance issues you were seeing? Or if there's something else required for XA that I'm missing?

 - Kristian.

Kristian Nielsen (2):
  Refactor parallel replication round-robin scheduling to use explicit FIFO
  More precise dependency tracking of XA XID in parallel replication

 sql/rpl_parallel.cc | 158 ++++++++++++++++++++++++++++++++++++++------
 sql/rpl_parallel.h  |  48 +++++++++++++-
 2 files changed, 181 insertions(+), 25 deletions(-)

--
2.30.2
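To make the effect of the current hash-based assignment concrete, here is a small standalone sketch (not part of the patches; the worker count 32 is just an example value). With N transactions hashed onto N workers, only around a 1 - 1/e ~ 63% fraction of the workers is expected to be hit, which is the "fewer than N transactions replicate in parallel" problem described above.

#include <cstdio>
#include <random>
#include <set>

/*
  Estimate how many of N workers actually get used when N transactions are
  assigned by hashing the XID modulo N (the current 10.6 behaviour for XA).
  With effectively random XIDs this is the classic balls-into-bins problem,
  so roughly N*(1 - 1/e) of the workers are hit; the rest sit idle.
*/
int main()
{
  std::mt19937_64 rng(42);
  const unsigned N= 32;              /* e.g. --slave-parallel-threads=32 */
  const unsigned rounds= 10000;
  double total= 0;
  for (unsigned r= 0; r < rounds; r++)
  {
    std::set<unsigned> used;
    for (unsigned i= 0; i < N; i++)
      used.insert((unsigned)(rng() % N));   /* stand-in for hash(XID) % N */
    total+= used.size();
  }
  printf("avg distinct workers used: %.1f of %u (~%.0f%%)\n",
         total / rounds, N, 100.0 * total / (rounds * N));
  return 0;
}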
When choosing the scheduling bucket for the next event group in rpl_parallel_entry::choose_thread(), use an explicit FIFO for the round-robin selection instead of a simple cyclic counter i := (i+1) % N. This allows to schedule XA COMMIT/ROLLBACK dependencies explicitly without changing the round-robin scheduling of other event groups. Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org> --- sql/rpl_parallel.cc | 54 +++++++++++++++++++++++++++------------------ sql/rpl_parallel.h | 13 ++++++++--- 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 6d6b78054f9..5be700a700f 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -2356,33 +2356,38 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage, Gtid_log_event *gtid_ev) { - uint32 idx; + sched_bucket *cur_thr; Relay_log_info *rli= rgi->rli; rpl_parallel_thread *thr; - idx= rpl_thread_idx; if (gtid_ev) { + /* New event group; cycle the thread scheduling buckets round-robin. */ + thread_sched_fifo->push_back(thread_sched_fifo->get()); + if (gtid_ev->flags2 & (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) - idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), - gtid_ev->xid.key_length()) % rpl_thread_max; - else { - ++idx; - if (idx >= rpl_thread_max) - idx= 0; + /* + For XA COMMIT/ROLLBACK, choose the same bucket as the XA PREPARE, + overriding the round-robin scheduling. + */ + uint32 idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), + gtid_ev->xid.key_length()) % rpl_thread_max; + rpl_threads[idx].unlink(); + thread_sched_fifo->append(rpl_threads + idx); } - rpl_thread_idx= idx; } - thr= rpl_threads[idx]; + cur_thr= thread_sched_fifo->head(); + + thr= cur_thr->thr; if (thr) { *did_enter_cond= false; mysql_mutex_lock(&thr->LOCK_rpl_thread); for (;;) { - if (thr->current_owner != &rpl_threads[idx]) + if (thr->current_owner != &cur_thr->thr) { /* The worker thread became idle, and returned to the free list and @@ -2448,8 +2453,8 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, } } if (!thr) - rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx], - this); + cur_thr->thr= thr= + global_rpl_thread_pool.get_thread(&cur_thr->thr, this); return thr; } @@ -2506,15 +2511,20 @@ rpl_parallel::find(uint32 domain_id) ulong count= opt_slave_domain_parallel_threads; if (count == 0 || count > opt_slave_parallel_threads) count= opt_slave_parallel_threads; - rpl_parallel_thread **p; + rpl_parallel_entry::sched_bucket *p; + I_List<rpl_parallel_entry::sched_bucket> *fifo; if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME|MY_ZEROFILL), &e, sizeof(*e), &p, count*sizeof(*p), + &fifo, sizeof(*fifo), NULL)) { my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p))); return NULL; } + e->thread_sched_fifo = new (fifo) I_List<rpl_parallel_entry::sched_bucket>; + for (ulong i= 0; i < count; ++i) + e->thread_sched_fifo->push_back(::new (p+i) rpl_parallel_entry::sched_bucket); e->rpl_threads= p; e->rpl_thread_max= count; e->domain_id= domain_id; @@ -2580,10 +2590,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) mysql_mutex_unlock(&e->LOCK_parallel_entry); for (j= 0; j < e->rpl_thread_max; ++j) { - if ((rpt= e->rpl_threads[j])) + if ((rpt= e->rpl_threads[j].thr)) { mysql_mutex_lock(&rpt->LOCK_rpl_thread); - if (rpt->current_owner == &e->rpl_threads[j]) + if (rpt->current_owner == &e->rpl_threads[j].thr) 
mysql_cond_signal(&rpt->COND_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); } @@ -2603,10 +2613,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); for (j= 0; j < e->rpl_thread_max; ++j) { - if ((rpt= e->rpl_threads[j])) + if ((rpt= e->rpl_threads[j].thr)) { mysql_mutex_lock(&rpt->LOCK_rpl_thread); - while (rpt->current_owner == &e->rpl_threads[j]) + while (rpt->current_owner == &e->rpl_threads[j].thr) mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); } @@ -2653,7 +2663,7 @@ int rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, Format_description_log_event *fdev) { - uint32 idx; + sched_bucket *cur_thr; rpl_parallel_thread *thr; rpl_parallel_thread::queued_event *qev; Relay_log_info *rli= rgi->rli; @@ -2668,12 +2678,12 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, Thus there is no need for the full complexity of choose_thread(). We only need to check if we have a current worker thread, and queue for it if so. */ - idx= rpl_thread_idx; - thr= rpl_threads[idx]; + cur_thr= thread_sched_fifo->head(); + thr= cur_thr->thr; if (!thr) return 0; mysql_mutex_lock(&thr->LOCK_rpl_thread); - if (thr->current_owner != &rpl_threads[idx]) + if (thr->current_owner != &cur_thr->thr) { /* No active worker thread, so no need to queue the master restart. */ mysql_mutex_unlock(&thr->LOCK_rpl_thread); diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index bcce54bc0ec..9ba86453d06 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -326,6 +326,11 @@ struct rpl_parallel_thread_pool { struct rpl_parallel_entry { + struct sched_bucket : public ilink { + sched_bucket() : thr(nullptr) { } + rpl_parallel_thread *thr; + }; + mysql_mutex_t LOCK_parallel_entry; mysql_cond_t COND_parallel_entry; uint32 domain_id; @@ -355,17 +360,19 @@ struct rpl_parallel_entry { uint64 stop_sub_id; /* - Cyclic array recording the last rpl_thread_max worker threads that we + Array recording the last rpl_thread_max worker threads that we queued event for. This is used to limit how many workers a single domain can occupy (--slave-domain-parallel-threads). + The array is structured as a FIFO using an I_List thread_sched_fifo. + Note that workers are never explicitly deleted from the array. Instead, we need to check (under LOCK_rpl_thread) that the thread still belongs to us before re-using (rpl_thread::current_owner). */ - rpl_parallel_thread **rpl_threads; + sched_bucket *rpl_threads; + I_List<sched_bucket> *thread_sched_fifo; uint32 rpl_thread_max; - uint32 rpl_thread_idx; /* The sub_id of the last transaction to commit within this domain_id. Must be accessed under LOCK_parallel_entry protection. -- 2.30.2
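As a rough standalone model of what this FIFO refactoring enables (std::list stands in for the server's intrusive I_List and the names are simplified, so this is a sketch of the idea rather than the actual server code): plain round-robin becomes "cycle the FIFO and use the new front", while an explicit choice becomes "unlink that bucket and move it to the front", which leaves the relative round-robin order of all other buckets untouched.

#include <cstdio>
#include <list>

/* Worker buckets; front() is the bucket chosen for the current event group. */
static std::list<int> fifo;

static int schedule_round_robin()
{
  fifo.push_back(fifo.front());      /* cycle: previous bucket goes last */
  fifo.pop_front();
  return fifo.front();               /* new front is the chosen bucket */
}

static int schedule_explicit(int bucket)
{
  fifo.push_back(fifo.front());      /* still cycle for the new event group */
  fifo.pop_front();
  fifo.remove(bucket);               /* unlink() + append() in the patch */
  fifo.push_front(bucket);
  return fifo.front();
}

int main()
{
  const int N= 4;
  fifo.push_back(N - 1);             /* seed so the first group lands on bucket 0 */
  for (int i= 0; i < N - 1; i++)
    fifo.push_back(i);

  for (int ev= 0; ev < 6; ev++)
  {
    int b= (ev == 3) ? schedule_explicit(0)   /* pretend an XA dependency */
                     : schedule_round_robin();
    printf("event group %d -> bucket %d\n", ev, b);
  }
  return 0;
}

Running this gives buckets 0, 1, 2, 0, 3, 1: the forced choice at event 3 does not disturb the round-robin turn of the remaining buckets.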
Keep track of each recently active XID, recording which worker it was queued on. If an XID might still be active, choose the same worker to queue event groups that refer to the same XID to avoid conflicts. Otherwise, schedule the XID freely in the next round-robin slot. This way, XA PREPARE can normally be scheduled without restrictions (unless duplicate XID transactions come close together). This improves scheduling and parallelism over the old method, where the worker thread to schedule XA PREPARE on was fixed based on a hash value of the XID. XA COMMIT will normally be scheduled on the same worker as XA PREPARE, but can be a different one if the XA PREPARE is far back in the event history. Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org> --- sql/rpl_parallel.cc | 126 ++++++++++++++++++++++++++++++++++++++++---- sql/rpl_parallel.h | 35 ++++++++++++ 2 files changed, 150 insertions(+), 11 deletions(-) diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 5be700a700f..7c6fa184259 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -2324,6 +2324,75 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli) } } + +/* + Check when we have done a complete round of scheduling for workers + 0, 1, ..., (rpl_thread_max-1), in this order. + This often occurs every rpl_thread_max event group, but XA XID dependency + restrictions can cause insertion of extra out-of-order worker scheduling + in-between the normal round-robin scheduling. +*/ +void +rpl_parallel_entry::check_scheduling_generation(sched_bucket *cur) +{ + uint32 idx= cur - rpl_threads; + DBUG_ASSERT(cur >= rpl_threads); + DBUG_ASSERT(cur < rpl_threads + rpl_thread_max); + if (idx == current_generation_idx) + { + ++idx; + if (idx >= rpl_thread_max) + { + /* A new generation; all workers have been scheduled at least once. */ + idx= 0; + ++current_generation; + } + current_generation_idx= idx; + } +} + + +rpl_parallel_entry::sched_bucket * +rpl_parallel_entry::check_xa_xid_dependency(xid_t *xid) +{ + uint64 cur_gen= current_generation; + my_off_t i= 0; + while (i < maybe_active_xid.elements) + { + /* + Purge no longer active XID from the list: + + - In generation N, XID might have been scheduled for worker W. + - Events in generation (N+1) might run freely in parallel with W. + - Events in generation (N+2) will have done wait_for_prior_commit for + the event group with XID (or a later one), but the XID might still be + active for a bit longer after wakeup_prior_commit(). + - Events in generation (N+3) will have done wait_for_prior_commit() for + an event in W _after_ the XID, so are sure not to see the XID active. + + Therefore, XID can be safely scheduled to a different worker in + generation (N+3) when last prior use was in generation N (or earlier). + */ + xid_active_generation *a= + dynamic_element(&maybe_active_xid, i, xid_active_generation *); + if (a->generation + 3 <= cur_gen) + { + *a= *((xid_active_generation *)pop_dynamic(&maybe_active_xid)); + continue; + } + if (xid->eq(&a->xid)) + { + /* Update the last used generation and return the match. */ + a->generation= cur_gen; + return a->thr; + } + ++i; + } + /* No matching XID conflicts. */ + return nullptr; +} + + /* Obtain a worker thread that we can queue an event to. 
@@ -2368,17 +2437,36 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, if (gtid_ev->flags2 & (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) { - /* - For XA COMMIT/ROLLBACK, choose the same bucket as the XA PREPARE, - overriding the round-robin scheduling. - */ - uint32 idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), - gtid_ev->xid.key_length()) % rpl_thread_max; - rpl_threads[idx].unlink(); - thread_sched_fifo->append(rpl_threads + idx); + if ((cur_thr= check_xa_xid_dependency(&gtid_ev->xid))) + { + /* + A previously scheduled event group with the same XID might still be + active in a worker, so schedule this event group in the same worker + to avoid a conflict. + */ + cur_thr->unlink(); + thread_sched_fifo->append(cur_thr); + } + else + { + /* Record this XID now active. */ + xid_active_generation *a= + (xid_active_generation *)alloc_dynamic(&maybe_active_xid); + if (!a) + return NULL; + a->thr= cur_thr= thread_sched_fifo->head(); + a->generation= current_generation; + a->xid.set(&gtid_ev->xid); + } } + else + cur_thr= thread_sched_fifo->head(); + + check_scheduling_generation(cur_thr); } - cur_thr= thread_sched_fifo->head(); + else + cur_thr= thread_sched_fifo->head(); + thr= cur_thr->thr; if (thr) @@ -2469,6 +2557,7 @@ free_rpl_parallel_entry(void *element) dealloc_gco(e->current_gco); e->current_gco= prev_gco; } + delete_dynamic(&e->maybe_active_xid); mysql_cond_destroy(&e->COND_parallel_entry); mysql_mutex_destroy(&e->LOCK_parallel_entry); my_free(e); @@ -2522,11 +2611,26 @@ rpl_parallel::find(uint32 domain_id) my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p))); return NULL; } + /* Initialize a FIFO of scheduled worker threads. */ e->thread_sched_fifo = new (fifo) I_List<rpl_parallel_entry::sched_bucket>; - for (ulong i= 0; i < count; ++i) - e->thread_sched_fifo->push_back(::new (p+i) rpl_parallel_entry::sched_bucket); + /* + (We cycle the FIFO _before_ allocating next entry in + rpl_parallel_entry::choose_thread(). So initialize the FIFO with the + highest element at the front, just so that the first event group gets + scheduled on entry 0). + */ + e->thread_sched_fifo-> + push_back(::new (p+count-1) rpl_parallel_entry::sched_bucket); + for (ulong i= 0; i < count-1; ++i) + e->thread_sched_fifo-> + push_back(::new (p+i) rpl_parallel_entry::sched_bucket); e->rpl_threads= p; e->rpl_thread_max= count; + e->current_generation = 0; + e->current_generation_idx = 0; + init_dynamic_array2(PSI_INSTRUMENT_ME, &e->maybe_active_xid, + sizeof(rpl_parallel_entry::xid_active_generation), + 0, count, 0, MYF(0)); e->domain_id= domain_id; e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; e->pause_sub_id= (uint64)ULONGLONG_MAX; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 9ba86453d06..a0100fd6446 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -326,10 +326,26 @@ struct rpl_parallel_thread_pool { struct rpl_parallel_entry { + /* + A small struct to put worker thread references into a FIFO (using an + I_List) for round-robin scheduling. + */ struct sched_bucket : public ilink { sched_bucket() : thr(nullptr) { } rpl_parallel_thread *thr; }; + /* + A struct to keep track of into which "generation" an XA XID was last + scheduled. A "generation" means that we know that every worker thread + slot in the rpl_parallel_entry was scheduled at least once. When more + than two generations have passed, we can safely reuse the XID in a + different worker. 
+ */ + struct xid_active_generation { + uint64 generation; + sched_bucket *thr; + xid_t xid; + }; mysql_mutex_t LOCK_parallel_entry; mysql_cond_t COND_parallel_entry; @@ -373,6 +389,23 @@ struct rpl_parallel_entry { sched_bucket *rpl_threads; I_List<sched_bucket> *thread_sched_fifo; uint32 rpl_thread_max; + /* + Keep track of all XA XIDs that may still be active in a worker thread. + The elements are of type xid_active_generation. + */ + DYNAMIC_ARRAY maybe_active_xid; + /* + Keeping track of the current scheduling generation. + + A new generation means that every worker thread in the rpl_threads array + have been scheduled at least one event group. + + When we have scheduled to slot current_generation_idx= 0, 1, ..., N-1 in this + order, we know that (at least) one generation has passed. + */ + uint64 current_generation; + uint32 current_generation_idx; + /* The sub_id of the last transaction to commit within this domain_id. Must be accessed under LOCK_parallel_entry protection. @@ -426,6 +459,8 @@ struct rpl_parallel_entry { /* The group_commit_orderer object for the events currently being queued. */ group_commit_orderer *current_gco; + void check_scheduling_generation(sched_bucket *cur); + sched_bucket *check_xa_xid_dependency(xid_t *xid); rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage, Gtid_log_event *gtid_ev); -- 2.30.2
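For illustration, here is a simplified standalone model of the XID dependency tracking above (std::vector and std::string stand in for DYNAMIC_ARRAY and the XID type, and the generation bookkeeping is reduced to "bump after every N scheduled groups", so this is a sketch of the idea rather than the actual server code). An XID seen again while it may still be active is pinned to the bucket it used before; anything else goes to the next round-robin slot and is recorded.

#include <cstdio>
#include <string>
#include <vector>

struct active_xid
{
  std::string xid;
  unsigned long long generation;
  int bucket;
};

static std::vector<active_xid> maybe_active;
static unsigned long long cur_generation= 0;
static unsigned scheduled_in_generation= 0;
static const int N= 4;                              /* number of worker buckets */
static int next_rr= 0;

static int schedule(const std::string &xid)
{
  int bucket= -1;
  /* Purge entries that can no longer be active; otherwise look for a match. */
  for (size_t i= 0; i < maybe_active.size(); )
  {
    if (maybe_active[i].generation + 3 <= cur_generation)
    {
      maybe_active[i]= maybe_active.back();
      maybe_active.pop_back();
      continue;
    }
    if (maybe_active[i].xid == xid)
    {
      maybe_active[i].generation= cur_generation;   /* refresh last use */
      bucket= maybe_active[i].bucket;               /* pin to the same bucket */
    }
    ++i;
  }
  if (bucket < 0)
  {
    bucket= next_rr;                                /* free round-robin slot */
    next_rr= (next_rr + 1) % N;
    maybe_active.push_back({xid, cur_generation, bucket});
  }
  if (++scheduled_in_generation >= (unsigned)N)     /* crude generation bump */
  {
    scheduled_in_generation= 0;
    ++cur_generation;
  }
  return bucket;
}

int main()
{
  const char *stream[]= { "xid1", "xid2", "xid3", "xid4",    /* XA PREPAREs */
                          "xid1", "xid2", "xid3", "xid4" };  /* XA COMMITs  */
  for (const char *x : stream)
    printf("%s -> bucket %d\n", x, schedule(x));
  return 0;
}

The four XA PREPAREs spread over all four buckets, and each XA COMMIT is pinned to the bucket its PREPARE used, which is the scheduling behaviour the commit message describes.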
Kristian Nielsen via developers <developers@lists.mariadb.org> writes:
> The XA COMMIT will normally be scheduled on the same worker as the XA PREPARE, unless the two events are far apart in the replication stream. This is mostly unavoidable in the current XA replication design, since the XA PREPARE and XA COMMIT of a single transaction cannot group-commit together.
After some discussions with Andrei, things are a bit more complex than this.

On a (parallel) slave where transactions are generally small and fsync is costly, the ability to group-commit multiple transactions together is important for performance. And if XA PREPARE and the matching XA COMMIT cannot group-commit together, this becomes a problem in a simple serial stream of prepares-and-commits:

  XA PREPARE 't1'
  XA COMMIT 't1'
  XA PREPARE 't2'
  XA COMMIT 't2'
  XA PREPARE 't3'
  XA COMMIT 't3'
  ...

If XA PREPARE 't1' cannot group-commit together with XA COMMIT 't1', then the maximum group commit size will be 2. This is because transactions must binlog in the same order on the slave as on the master. This limitation exists in my suggested "minimal" patch.

In the more complex MDEV-31949 patches, this limitation is partially lifted, from what I understood from Andrei. The XA PREPARE 't1' can group-commit _to the binlog_ together with XA COMMIT 't1'. This allows large group commits to the binlog. Inside the engine (InnoDB), the XA PREPARE 't1' and XA COMMIT 't1' still cannot group-commit together.

Much of the extra complexity in the proposed MDEV-31949 patches comes from this: being able to group-commit the XA PREPARE and XA COMMIT together in the binlog, but have the XA COMMIT wait for the XA PREPARE before committing in the engine. I think the point is that XA PREPAREs do not commit in-order inside the engine, so InnoDB can still have large group-commits (just the groups will be a different mix of transactions than in the binlog).

Based on all this, I expect my minimal patch to have reduced performance (compared to the more complex patches) in benchmarks where the XA PREPARE and XA COMMIT appear close together in the binlog, and the cost of fsync is significant compared to the cost of running the transaction.

 - Kristian.
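To make the "maximum group commit size will be 2" point concrete, here is a small standalone toy model (not server code): with in-order binlogging a group commit is a contiguous run of the binlog stream, and a new group must start as soon as an event's XID is already present in the current group.

#include <cstdio>
#include <string>
#include <utility>
#include <vector>

int main()
{
  /* The serial prepare-and-commit stream from the example above. */
  std::vector<std::pair<std::string,int>> binlog;
  for (int i= 1; i <= 4; i++)
  {
    binlog.push_back({"XA PREPARE", i});
    binlog.push_back({"XA COMMIT", i});
  }

  /* Greedily form groups; PREPARE and COMMIT of one XID cannot share a group. */
  std::vector<std::vector<size_t>> groups{{}};
  std::vector<int> xids_in_group;
  for (size_t i= 0; i < binlog.size(); i++)
  {
    bool conflict= false;
    for (int x : xids_in_group)
      conflict|= (x == binlog[i].second);
    if (conflict)
    {
      groups.push_back({});
      xids_in_group.clear();
    }
    groups.back().push_back(i);
    xids_in_group.push_back(binlog[i].second);
  }

  for (size_t g= 0; g < groups.size(); g++)
  {
    printf("group %zu:", g);
    for (size_t i : groups[g])
      printf("  %s 't%d'", binlog[i].first.c_str(), binlog[i].second);
    printf("\n");
  }
  return 0;
}

The output is {P1}, {C1 P2}, {C2 P3}, {C3 P4}, {C4}: no group ever grows beyond two events, no matter how many workers are available.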