[Maria-developers] Review requested for MDEV-7818
I need someone to review my below fix for MDEV-7818. This involves the FLUSH TABLES WITH READ LOCK command and associated metadata locking, something that I know little about. Monty suggested the basic semantics - that all currently running parallel replication transactions would be allowed to complete before granting the global read lock. But it really needs a review of the actual implementation, by someone who has an understanding of the FTWRL semantics and implementation. Thanks, - Kristian. Kristian Nielsen <knielsen@knielsen-hq.org> writes:
revision-id: 0e609daea4796fafca94f7adf796addfc506168b parent(s): 22b6c297dc0de6b176238ae820bddb5e70f2c8ab committer: Kristian Nielsen branch nick: mariadb timestamp: 2015-06-09 16:01:53 +0200 message:
MDEV-7818: Deadlock occurring with parallel replication and FTWRL
Problem is that FLUSH TABLES WITH READ LOCK first blocks threads from starting new commits, then waits for running commits to complete. But in-order parallel replication needs commits to happen in a particular order, so this can easily deadlock.
To fix this problem, this patch introduces a way to temporarily pause the parallel replication worker threads. Before starting FTWRL, we let all worker threads complete in-progress transactions, and then wait. Then we proceed to take the global read lock. Once the lock is obtained, we unpause the worker threads. Now commits are blocked from starting by the global read lock, so the deadlock will no longer occur.
--- sql/mysqld.cc | 3 + sql/mysqld.h | 3 + sql/rpl_parallel.cc | 230 ++++++++++++++++++++++++++++++++++++++++++++++++++-- sql/rpl_parallel.h | 27 ++++-- sql/sql_parse.cc | 13 +++ 5 files changed, 262 insertions(+), 14 deletions(-)
diff --git a/sql/mysqld.cc b/sql/mysqld.cc index e05c0b6..af8ae2c 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -9514,6 +9514,9 @@ PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit= { 0, "Waiting for prior transaction to start commit before starting next transaction", 0}; PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0}; PSI_stage_info stage_waiting_for_workers_idle= { 0, "Waiting for worker threads to be idle", 0}; +PSI_stage_info stage_waiting_for_ftwrl= { 0, "Waiting due to global read lock", 0}; +PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause= { 0, "Waiting for worker threads to pause for global read lock", 0}; +PSI_stage_info stage_waiting_for_rpl_thread_pool= { 0, "Waiting while replication worker thread pool is busy", 0}; PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0}; PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0}; PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0}; diff --git a/sql/mysqld.h b/sql/mysqld.h index 156e7f9..8c953c1 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -454,6 +454,9 @@ extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit; extern PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit; extern PSI_stage_info stage_waiting_for_room_in_worker_thread; extern PSI_stage_info stage_waiting_for_workers_idle; +extern PSI_stage_info stage_waiting_for_ftwrl; +extern PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause; +extern PSI_stage_info stage_waiting_for_rpl_thread_pool; extern PSI_stage_info stage_master_gtid_wait_primary; extern PSI_stage_info stage_master_gtid_wait; extern PSI_stage_info stage_gtid_wait_other_connection; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 738d0e5..d90507a 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -280,6 +280,8 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco, rpl_parallel_entry *entry= rgi->parallel_entry; uint64 wait_count;
+ mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + if (!gco->installed) { group_commit_orderer *prev_gco= gco->prev_gco; @@ -336,6 +338,159 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco, }
+static void +do_ftwrl_wait(rpl_group_info *rgi, + bool *did_enter_cond, PSI_stage_info *old_stage) +{ + THD *thd= rgi->thd; + rpl_parallel_entry *entry= rgi->parallel_entry; + uint64 sub_id= rgi->gtid_sub_id; + + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + + if (unlikely(entry->pause_sub_id > 0) && sub_id > entry->pause_sub_id) + { + thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry, + &stage_waiting_for_ftwrl, old_stage); + *did_enter_cond= true; + do + { + if (entry->force_abort || rgi->worker_error) + break; + if (thd->check_killed()) + { + thd->send_kill_message(); + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, 1); + break; + } + mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry); + } while (entry->pause_sub_id > 0 && sub_id > entry->pause_sub_id); + } + + if (sub_id > entry->largest_started_sub_id) + entry->largest_started_sub_id= sub_id; +} + + +static void +pool_mark_busy(rpl_parallel_thread_pool *pool) +{ + mysql_mutex_assert_owner(&pool->LOCK_rpl_thread_pool); + DBUG_ASSERT(!pool->busy); + pool->busy= true; +} + + +static void +pool_mark_not_busy(rpl_parallel_thread_pool *pool) +{ + mysql_mutex_assert_owner(&pool->LOCK_rpl_thread_pool); + DBUG_ASSERT(pool->busy); + pool->busy= false; + mysql_cond_broadcast(&pool->COND_rpl_thread_pool); +} + + +void +rpl_unpause_for_ftwrl(THD *thd) +{ + uint32 i; + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; + + DBUG_ASSERT(pool->busy); + + for (i= 0; i < pool->count; ++i) + { + rpl_parallel_entry *e; + rpl_parallel_thread *rpt= pool->threads[i]; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (!rpt->current_owner) + { + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + continue; + } + e= rpt->current_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + e->pause_sub_id= 0; + mysql_cond_broadcast(&e->COND_parallel_entry); + } + + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + pool_mark_not_busy(pool); + mysql_cond_broadcast(&pool->COND_rpl_thread_pool); + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); +} + + +/* + . + + Note: in case of error return, unpause_for_ftwrl() must _not_ be called. +*/ +int +rpl_pause_for_ftwrl(THD *thd) +{ + uint32 i; + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; + int err= 0; + + /* + While the count_pending_pause_for_ftwrl counter is non-zero, the pool + cannot be shutdown/resized, so threads are guaranteed to not disappear. + + This is required to safely be able to access the individual threads below. + (We cannot lock an individual thread while holding LOCK_rpl_thread_pool, + as this can deadlock against release_thread()). + */ + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + pool_mark_busy(pool); + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + + for (i= 0; i < pool->count; ++i) + { + PSI_stage_info old_stage; + rpl_parallel_entry *e; + rpl_parallel_thread *rpt= pool->threads[i]; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (!rpt->current_owner) + { + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + continue; + } + e= rpt->current_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + ++e->need_sub_id_signal; + if (!e->pause_sub_id) + e->pause_sub_id= e->largest_started_sub_id; + thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, + &stage_waiting_for_ftwrl_threads_to_pause, &old_stage); + while (e->last_committed_sub_id < e->pause_sub_id && !err) + { + if (thd->check_killed()) + { + thd->send_kill_message(); + err= 1; + break; + } + mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); + }; + --e->need_sub_id_signal; + thd->EXIT_COND(&old_stage); + if (err) + break; + } + + if (err) + rpl_unpause_for_ftwrl(thd); + return err; +} + + #ifndef DBUG_OFF static int dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) @@ -856,6 +1011,9 @@ handle_rpl_parallel_thread(void *arg)
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) skip_event_group= true; + if (likely(!skip_event_group)) + do_ftwrl_wait(rgi, &did_enter_cond, &old_stage); + register_wait_for_prior_event_group_commit(rgi, entry);
unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry, @@ -1058,6 +1216,47 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, rpl_parallel_thread **new_list= NULL; rpl_parallel_thread *new_free_list= NULL; rpl_parallel_thread *rpt_array= NULL; + THD *thd; + PSI_stage_info old_stage; + int res; + + /* + Wait here while the queue is busy. This is done to make FLUSH TABLES WITH + READ LOCK work correctly, without incuring extra locking penalties in + normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the + thread pool, and for this we need to make sure the pool will not go away + during the operation. The LOCK_rpl_thread_pool is not suitable for + this. It is taken by release_thread() while holding LOCK_rpl_thread; so it + must be released before locking any LOCK_rpl_thread lock, or a deadlock + can occur. + + So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and + pool size changes with this condition wait. + */ + thd= current_thd; + res= 0; + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + if (thd) + thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool, + &stage_waiting_for_rpl_thread_pool, &old_stage); + while (pool->busy) + { + if (thd->check_killed()) + { + thd->send_kill_message(); + res= 1; + break; + } + mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool); + } + if (!res) + pool_mark_busy(pool); + if (thd) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + if (res) + return res;
/* Allocate the new list of threads up-front. @@ -1106,7 +1305,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, */ for (i= 0; i < pool->count; ++i) { - rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL); + rpl_parallel_thread *rpt; + + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + while ((rpt= pool->free_list) == NULL) + mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool); + pool->free_list= rpt->next; + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->stop= true; mysql_cond_signal(&rpt->COND_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); @@ -1157,7 +1363,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, }
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); - mysql_cond_broadcast(&pool->COND_rpl_thread_pool); + pool_mark_not_busy(pool); mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
return 0; @@ -1182,6 +1388,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, } my_free(new_list); } + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + pool_mark_not_busy(pool); + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); return 1; }
@@ -1445,7 +1654,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
rpl_parallel_thread_pool::rpl_parallel_thread_pool() - : count(0), threads(0), free_list(0), inited(false) + : threads(0), free_list(0), count(0), inited(false), busy(false) { }
@@ -1453,9 +1662,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool() int rpl_parallel_thread_pool::init(uint32 size) { - count= 0; threads= NULL; free_list= NULL; + count= 0; + busy= false;
mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool, MY_MUTEX_INIT_SLOW); @@ -1496,8 +1706,14 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner, rpl_parallel_thread *rpt;
mysql_mutex_lock(&LOCK_rpl_thread_pool); - while ((rpt= free_list) == NULL) + for (;;) + { + while (unlikely(busy)) + mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); + if ((rpt= free_list) != NULL) + break; mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); + } free_list= rpt->next; mysql_mutex_unlock(&LOCK_rpl_thread_pool); mysql_mutex_lock(&rpt->LOCK_rpl_thread); @@ -1908,7 +2124,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); mysql_mutex_lock(&e->LOCK_parallel_entry); - e->need_sub_id_signal= true; + ++e->need_sub_id_signal; thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, &stage_waiting_for_workers_idle, &old_stage); while (e->current_sub_id > e->last_committed_sub_id) @@ -1921,7 +2137,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd) } mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); } - e->need_sub_id_signal= false; + --e->need_sub_id_signal; thd->EXIT_COND(&old_stage); if (err) return err; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 04f8706..77a4005 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -199,12 +199,18 @@ struct rpl_parallel_thread {
struct rpl_parallel_thread_pool { - uint32 count; struct rpl_parallel_thread **threads; struct rpl_parallel_thread *free_list; mysql_mutex_t LOCK_rpl_thread_pool; mysql_cond_t COND_rpl_thread_pool; + uint32 count; bool inited; + /* + While FTWRL runs, this counter is incremented to make SQL thread or + STOP/START slave not try to start new activity while that operation + is in progress. + */ + bool busy;
rpl_parallel_thread_pool(); int init(uint32 size); @@ -219,6 +225,12 @@ struct rpl_parallel_entry { mysql_mutex_t LOCK_parallel_entry; mysql_cond_t COND_parallel_entry; uint32 domain_id; + /* + Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show + that they are waiting, so that finish_event_group knows to signal them + when last_committed_sub_id is increased. + */ + uint32 need_sub_id_signal; uint64 last_commit_id; bool active; /* @@ -228,12 +240,6 @@ struct rpl_parallel_entry { */ bool force_abort; /* - Set in wait_for_workers_idle() to show that it is waiting, so that - finish_event_group knows to signal it when last_committed_sub_id is - increased. - */ - bool need_sub_id_signal; - /* At STOP SLAVE (force_abort=true), we do not want to process all events in the queue (which could unnecessarily delay stop, if a lot of events happen to be queued). The stop_count provides a safe point at which to stop, so @@ -291,6 +297,11 @@ struct rpl_parallel_entry { The value is ULONGLONG_MAX when no error occured. */ uint64 stop_on_error_sub_id; + /* + During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than + this value must not start, but wait until the global read lock is released. + */ + uint64 pause_sub_id; /* Total count of event groups queued so far. */ uint64 count_queued_event_groups; /* @@ -331,5 +342,7 @@ extern struct rpl_parallel_thread_pool global_rpl_thread_pool; extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool); extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool); extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid); +extern int rpl_pause_for_ftwrl(THD *thd); +extern void rpl_unpause_for_ftwrl(THD *thd);
#endif /* RPL_PARALLEL_H */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 5635e9a..ca6b11e 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -4271,6 +4271,17 @@ case SQLCOM_PREPARE: break; }
+ if (lex->type & REFRESH_READ_LOCK) + { + /* + We need to pause any parallel replication slave workers during FLUSH + TABLES WITH READ LOCK. Otherwise we might cause a deadlock, as + worker threads eun run in arbitrary order but need to commit in a + specific given order. + */ + if (rpl_pause_for_ftwrl(thd)) + goto error; + } /* reload_acl_and_cache() will tell us if we are allowed to write to the binlog or not. @@ -4301,6 +4312,8 @@ case SQLCOM_PREPARE: if (!res) my_ok(thd); } + if (lex->type & REFRESH_READ_LOCK) + rpl_unpause_for_ftwrl(thd);
break; } _______________________________________________ commits mailing list commits@mariadb.org https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits
participants (1)
-
Kristian Nielsen