From: Brandon Nesterenko <brandon.nesterenko@mariadb.com> When using semi-sync replication with rpl_semi_sync_master_wait_point=AFTER_COMMIT, the performance of the primary can degrade significantly compared to AFTER_SYNC for workloads with many concurrent users executing transactions. This is because all connections on the primary share the same condition variable/mutex pair, so any time an ACK is received from a replica, all waiting connections are awoken to check whether the ACK was for their own transaction, a check which is done in mutual exclusion. This patch changes this such that each waiting THD uses its own condition variable, and the ACK receiver thread signals only those connections whose transactions have been ACKed. Additionally: 1) At master shutdown (when waiting for slaves), instead of the main loop individually waiting for each ACK, await_slave_reply() (renamed to await_all_slave_replies()) now awaits the ACKs for all queued transactions within one invocation. 2) The THD's awaiting-semi-sync-ACK flag (set via set_thd_awaiting_semisync_ack()) is now set at binlogging time, to ensure transactions which have been binlogged and queued up to await an ACK are not killed, and are still waited on. 3) Repl_semi_sync_master::commit_trx() no longer loops to await its specific ACK. It waits once, and either fails from timeout or receives its ACK. 
Reviewed By: ============ <TODO> --- .../r/rpl_semi_sync_cond_var_per_thd.result | 28 ++ .../rpl/t/rpl_semi_sync_cond_var_per_thd.test | 1 + sql/mysqld.cc | 14 +- sql/semisync_master.cc | 321 +++++++++++------- sql/semisync_master.h | 51 ++- sql/sql_class.h | 14 +- 6 files changed, 283 insertions(+), 146 deletions(-) create mode 100644 mysql-test/suite/rpl/r/rpl_semi_sync_cond_var_per_thd.result diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_cond_var_per_thd.result b/mysql-test/suite/rpl/r/rpl_semi_sync_cond_var_per_thd.result new file mode 100644 index 00000000000..65c87e94c94 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_semi_sync_cond_var_per_thd.result @@ -0,0 +1,28 @@ +include/master-slave.inc +[connection master] +connection master; +call mtr.add_suppression("Got an error reading communication packets"); +set @save_bgc_count= @@global.binlog_commit_wait_count; +set @save_bgc_usec= @@global.binlog_commit_wait_usec; +set @@global.binlog_commit_wait_count=3; +set @@global.binlog_commit_wait_usec=10000000; +# Ensure semi-sync is on +connection slave; +connection master; +# Create three transactions to binlog group commit together +connection master; +create table t1 (a int); +connection server_1; +create table t2 (a int); +connection default; +create table t3 (a int); +connection master; +connection server_1; +connection default; +# +# Cleanup +connection master; +set @@global.binlog_commit_wait_count=@save_bgc_count; +set @@global.binlog_commit_wait_usec=@save_bgc_usec; +drop table t1, t2, t3; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_cond_var_per_thd.test b/mysql-test/suite/rpl/t/rpl_semi_sync_cond_var_per_thd.test index f8fa0a99d9c..ebe15a4ca32 100644 --- a/mysql-test/suite/rpl/t/rpl_semi_sync_cond_var_per_thd.test +++ b/mysql-test/suite/rpl/t/rpl_semi_sync_cond_var_per_thd.test @@ -22,6 +22,7 @@ --source include/master-slave.inc --connection master +call mtr.add_suppression("Got an error reading communication packets"); set 
@save_bgc_count= @@global.binlog_commit_wait_count; set @save_bgc_usec= @@global.binlog_commit_wait_usec; set @@global.binlog_commit_wait_count=3; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index e224871795e..b315edc091c 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -1750,18 +1750,12 @@ static void close_connections(void) /* If we are waiting on any ACKs, delay killing the thread until either an ACK is received or the timeout is hit. - - Allow at max the number of sessions to await a timeout; however, if all - ACKs have been received in less iterations, then quit early */ - if (shutdown_wait_for_slaves && repl_semisync_master.get_master_enabled()) + if (shutdown_wait_for_slaves && repl_semisync_master.get_master_enabled() && + repl_semisync_master.sync_get_master_wait_sessions()) { - int waiting_threads= repl_semisync_master.sync_get_master_wait_sessions(); - if (waiting_threads) - sql_print_information("Delaying shutdown to await semi-sync ACK"); - - while (waiting_threads-- > 0) - repl_semisync_master.await_slave_reply(); + sql_print_information("Delaying shutdown to await semi-sync ACK"); + repl_semisync_master.await_all_slave_replies(); } DBUG_EXECUTE_IF("delay_shutdown_phase_2_after_semisync_wait", diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc index 0eaf0f0e0e2..f421e924a61 100644 --- a/sql/semisync_master.cc +++ b/sql/semisync_master.cc @@ -502,16 +502,33 @@ void Repl_semi_sync_master::unlock() void Repl_semi_sync_master::cond_broadcast() { - mysql_cond_broadcast(&COND_binlog_send); + while (!wait_queue.empty()) + { + semisync_wait_trx_t next_waiter= wait_queue.front(); + + /* + Signal the transaction to wake up, and remove it from the queue because + semi-sync is being disabled and the ACK_Receiver thread will no longer + wake these threads. 
+ */ + DBUG_ASSERT(next_waiter.thd); + mysql_cond_signal(&next_waiter.thd->COND_wakeup_ready); + wait_queue.pop(); + } } -int Repl_semi_sync_master::cond_timewait(struct timespec *wait_time) +int Repl_semi_sync_master::cond_timewait(THD *thd, struct timespec *wait_time) { int wait_res; DBUG_ENTER("Repl_semi_sync_master::cond_timewait()"); - wait_res= mysql_cond_timedwait(&COND_binlog_send, + /* + All connection threads share the mutex (LOCK_binlog) to keep consistent + with the Active_tranx cache, but each thread has its own condition variable + on which it waits. + */ + wait_res= mysql_cond_timedwait(&thd->COND_wakeup_ready, &LOCK_binlog, wait_time); DBUG_RETURN(wait_res); @@ -695,12 +712,36 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id, l_end: unlock(); + /* + Signal waiting threads which the slave has received transactions for + */ if (can_release_threads) { - DBUG_PRINT("semisync", ("%s: signal all waiting threads.", - "Repl_semi_sync_master::report_reply_binlog")); + lock(); + while (!wait_queue.empty()) + { + semisync_wait_trx_t next_waiter= wait_queue.front(); - cond_broadcast(); + cmp= Active_tranx::compare(m_reply_file_name, m_reply_file_pos, + next_waiter.binlog_name, next_waiter.binlog_pos); + if (cmp >= 0) + { + DBUG_PRINT("semisync", ("%s: signal thread %llu.", + "Repl_semi_sync_master::report_reply_binlog", + next_waiter.thd->thread_id)); + mysql_cond_signal(&next_waiter.thd->COND_wakeup_ready); + wait_queue.pop(); + } + else + { + /* + This thread is ahead of the current ACK binlog coord; quit looping + because it and all later queued transactions won't be signalled. 
*/ + break; + } + } + unlock(); } DBUG_RETURN(0); @@ -768,6 +809,7 @@ int Repl_semi_sync_master::report_binlog_update(THD* thd, const char *log_file, if (get_master_enabled()) { Trans_binlog_info *log_info; + THD *thd_to_cond_wait; if (!(log_info= thd->semisync_info)) { @@ -779,7 +821,20 @@ int Repl_semi_sync_master::report_binlog_update(THD* thd, const char *log_file, strcpy(log_info->log_file, log_file + dirname_length(log_file)); log_info->log_pos = log_pos; - return write_tranx_in_binlog(log_info->log_file, log_pos); + /* + The THD that waits for the ACK depends on the wait point. With + AFTER_COMMIT (after storage engine commit), each user connection thread + (thd) performs its own wait for the semi-sync ACK. + With AFTER_SYNC, the binlog leader thread (current_thd) performs the + semi-sync waits on behalf of all transactions in its group commit. + Note that under binlog group commit, thd is still the user connection + thread that owns this transaction, not the leader. + */ + thd_to_cond_wait= ((wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT) + ? thd + : current_thd); + return write_tranx_in_binlog(thd_to_cond_wait, log_info->log_file, + log_pos); } return 0; @@ -852,12 +907,11 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, lock(); /* This must be called after acquired the lock */ - THD_ENTER_COND(thd, &COND_binlog_send, &LOCK_binlog, - & stage_waiting_for_semi_sync_ack_from_slave, - & old_stage); + THD_ENTER_COND(thd, &thd->COND_wakeup_ready, &LOCK_binlog, + &stage_waiting_for_semi_sync_ack_from_slave, &old_stage); /* This is the real check inside the mutex.
*/ - if (!get_master_enabled() || !is_on()) + if (!get_master_enabled() || !is_on() || thd_killed(thd)) goto l_end; DBUG_PRINT("semisync", ("%s: wait pos (%s, %lu), repl(%d)", @@ -865,129 +919,115 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, trx_wait_binlog_name, (ulong)trx_wait_binlog_pos, (int)is_on())); - while (is_on() && !thd_killed(thd)) - { - /* We have to check these again as things may have changed */ - if (!rpl_semi_sync_master_clients && !rpl_semi_sync_master_wait_no_slave) - { - aborted= 1; - break; - } - if (m_reply_file_name_inited) + if (m_reply_file_name_inited) + { + int cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos, + trx_wait_binlog_name, + trx_wait_binlog_pos); + if (cmp >= 0) { - int cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos, - trx_wait_binlog_name, - trx_wait_binlog_pos); - if (cmp >= 0) - { - /* We have already sent the relevant binlog to the slave: no need to - * wait here. - */ - DBUG_PRINT("semisync", ("%s: Binlog reply is ahead (%s, %lu),", - "Repl_semi_sync_master::commit_trx", - m_reply_file_name, - (ulong)m_reply_file_pos)); - success= 1; - break; - } + /* We have already sent the relevant binlog to the slave: no need to + * wait here. + */ + DBUG_PRINT("semisync", ("%s: Binlog reply is ahead (%s, %lu),", + "Repl_semi_sync_master::commit_trx", + m_reply_file_name, + (ulong)m_reply_file_pos)); + success= 1; + goto l_end; } + } - /* Let us update the info about the minimum binlog position of waiting - * threads. - */ - if (m_wait_file_name_inited) - { - int cmp = Active_tranx::compare(trx_wait_binlog_name, - trx_wait_binlog_pos, - m_wait_file_name, m_wait_file_pos); - if (cmp <= 0) - { - /* This thd has a lower position, let's update the minimum info. 
*/ - strmake_buf(m_wait_file_name, trx_wait_binlog_name); - m_wait_file_pos = trx_wait_binlog_pos; - - rpl_semi_sync_master_wait_pos_backtraverse++; - DBUG_PRINT("semisync", ("%s: move back wait position (%s, %lu),", - "Repl_semi_sync_master::commit_trx", - m_wait_file_name, (ulong)m_wait_file_pos)); - } - } - else - { + /* Let us update the info about the minimum binlog position of waiting + * threads. + */ + if (m_wait_file_name_inited) + { + int cmp = Active_tranx::compare(trx_wait_binlog_name, + trx_wait_binlog_pos, + m_wait_file_name, m_wait_file_pos); + if (cmp <= 0) + { + /* This thd has a lower position, let's update the minimum info. */ strmake_buf(m_wait_file_name, trx_wait_binlog_name); m_wait_file_pos = trx_wait_binlog_pos; - m_wait_file_name_inited = true; - DBUG_PRINT("semisync", ("%s: init wait position (%s, %lu),", + rpl_semi_sync_master_wait_pos_backtraverse++; + DBUG_PRINT("semisync", ("%s: move back wait position (%s, %lu),", "Repl_semi_sync_master::commit_trx", m_wait_file_name, (ulong)m_wait_file_pos)); } + } + else + { + strmake_buf(m_wait_file_name, trx_wait_binlog_name); + m_wait_file_pos = trx_wait_binlog_pos; + m_wait_file_name_inited = true; - /* In semi-synchronous replication, we wait until the binlog-dump - * thread has received the reply on the relevant binlog segment from the - * replication slave. - * - * Let us suspend this thread to wait on the condition; - * when replication has progressed far enough, we will release - * these waiting threads. - */ - rpl_semi_sync_master_wait_sessions++; - - /* We keep track of when this thread is awaiting an ack to ensure it is - * not killed while awaiting an ACK if a shutdown is issued. 
*/ - set_thd_awaiting_semisync_ack(thd, TRUE); - - DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)", + DBUG_PRINT("semisync", ("%s: init wait position (%s, %lu),", "Repl_semi_sync_master::commit_trx", - m_wait_timeout, m_wait_file_name, (ulong)m_wait_file_pos)); + m_wait_file_name, (ulong)m_wait_file_pos)); + } - create_timeout(&abstime, &start_ts); - wait_result = cond_timewait(&abstime); + /* In semi-synchronous replication, we wait until the binlog-dump + * thread has received the reply on the relevant binlog segment from the + * replication slave. + * + * Let us suspend this thread to wait on the condition; + * when replication has progressed far enough, we will release + * these waiting threads. + */ + rpl_semi_sync_master_wait_sessions++; - set_thd_awaiting_semisync_ack(thd, FALSE); - rpl_semi_sync_master_wait_sessions--; + DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)", + "Repl_semi_sync_master::commit_trx", + m_wait_timeout, + m_wait_file_name, (ulong)m_wait_file_pos)); + + create_timeout(&abstime, &start_ts); + wait_result = cond_timewait(thd, &abstime); + set_thd_awaiting_semisync_ack(thd, FALSE); + rpl_semi_sync_master_wait_sessions--; + + if (wait_result != 0) + { + /* This is a real wait timeout. */ + sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), " + "semi-sync up to file %s, position %lu.", + trx_wait_binlog_name, (ulong)trx_wait_binlog_pos, + m_reply_file_name, (ulong)m_reply_file_pos); + rpl_semi_sync_master_wait_timeouts++; + + /* switch semi-sync off */ + switch_off(); + } + else + { + int wait_time; - if (wait_result != 0) + wait_time = get_wait_time(start_ts); + if (wait_time < 0) { - /* This is a real wait timeout.
*/ - sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), " - "semi-sync up to file %s, position %lu.", - trx_wait_binlog_name, (ulong)trx_wait_binlog_pos, - m_reply_file_name, (ulong)m_reply_file_pos); - rpl_semi_sync_master_wait_timeouts++; - - /* switch semi-sync off */ - switch_off(); + DBUG_PRINT("semisync", ("Replication semi-sync getWaitTime fail at " + "wait position (%s, %lu)", + trx_wait_binlog_name, + (ulong)trx_wait_binlog_pos)); + rpl_semi_sync_master_timefunc_fails++; } else { - int wait_time; - - wait_time = get_wait_time(start_ts); - if (wait_time < 0) - { - DBUG_PRINT("semisync", ("Replication semi-sync getWaitTime fail at " - "wait position (%s, %lu)", - trx_wait_binlog_name, - (ulong)trx_wait_binlog_pos)); - rpl_semi_sync_master_timefunc_fails++; - } - else - { - rpl_semi_sync_master_trx_wait_num++; - rpl_semi_sync_master_trx_wait_time += wait_time; - /* - Assert we have either recieved our ACK; or have timed out and are - awoken in an off state. - */ - DBUG_ASSERT(!get_master_enabled() || !is_on() || thd->is_killed() || - 0 <= Active_tranx::compare( - m_reply_file_name, m_reply_file_pos, - trx_wait_binlog_name, trx_wait_binlog_pos)); - } + rpl_semi_sync_master_trx_wait_num++; + rpl_semi_sync_master_trx_wait_time += wait_time; + success= 1; + /* + Assert we have either recieved our ACK; or have timed out and are + awoken in an off state. 
+ */ + DBUG_ASSERT(!get_master_enabled() || !is_on() || thd->is_killed() || + 0 <= Active_tranx::compare( + m_reply_file_name, m_reply_file_pos, + trx_wait_binlog_name, trx_wait_binlog_pos)); } } @@ -1198,7 +1238,8 @@ int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet, DBUG_RETURN(0); } -int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name, +int Repl_semi_sync_master::write_tranx_in_binlog(THD *thd, + const char *log_file_name, my_off_t log_file_pos) { int result = 0; @@ -1253,6 +1294,8 @@ int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name, } else { + wait_queue.push({log_file_name, log_file_pos, thd}); + set_thd_awaiting_semisync_ack(thd, TRUE); rpl_semi_sync_master_request_ack++; } } @@ -1370,22 +1413,48 @@ void Repl_semi_sync_master::set_export_stats() unlock(); } -void Repl_semi_sync_master::await_slave_reply() +void Repl_semi_sync_master::await_all_slave_replies() { struct timespec abstime; + int wait_result; + semisync_wait_trx_t front; - DBUG_ENTER("Repl_semi_sync_master::::await_slave_reply"); - lock(); + DBUG_ENTER("Repl_semi_sync_master::::await_all_slave_replies"); - /* Just return if there is nothing to wait for */ - if (!rpl_semi_sync_master_wait_sessions) - goto end; - - create_timeout(&abstime, NULL); - cond_timewait(&abstime); + /* + Wait for all elements in the wait_queue to have received acks; or timeout. + If it is a timeout, the connection thread should attempt to turn off + semi-sync and broadcast to all other waiting threads to move on. + + The lock is taken per iteration rather than over the whole loop to allow + more opportunity for the user threads to handle the ACK and remove itself + from the wait_queue (we don't remove it in await_all_slave_replies). 
+ + Note that here, we don't need to compare the binlog coordinates of + wait_queue.front() to Active_tranx, because the ACK_Receiver removes all + acknowledged (and preceding in-binlog-order) transactions from the + wait_queue with the lock. So its presence in wait_queue indicates it is + currently waiting, or about to start waiting. So it will eventually either + be signalled by the ACK_Receiver, or in the timeout case, the thread which + is handling the corresponding timeout. + */ + while (TRUE) + { + lock(); + if (wait_queue.empty() || !get_master_enabled() || !is_on()) + { + unlock(); + break; + } + front= wait_queue.front(); + create_timeout(&abstime, NULL); + wait_result= cond_timewait(front.thd, &abstime); + unlock(); -end: - unlock(); + // Check for timeout + if (wait_result != 0) + break; + } DBUG_VOID_RETURN; } diff --git a/sql/semisync_master.h b/sql/semisync_master.h index 99f46869354..6de0333d4ce 100644 --- a/sql/semisync_master.h +++ b/sql/semisync_master.h @@ -21,6 +21,7 @@ #include "semisync.h" #include "semisync_master_ack_receiver.h" +#include <queue> #ifdef HAVE_PSI_INTERFACE extern PSI_mutex_key key_LOCK_rpl_semi_sync_master_enabled; @@ -361,6 +362,16 @@ class Active_tranx }; +/* + Element in Repl_semi_sync_master::wait_queue to preserve the state of a + transaction waiting for an ACK. +*/ +typedef struct _semisync_wait_trx { + const char *binlog_name; + my_off_t binlog_pos; + THD *thd; +} semisync_wait_trx_t; + /** The extension class for the master of semi-synchronous replication */ @@ -431,10 +442,27 @@ class Repl_semi_sync_master /*Waiting for ACK before/after innodb commit*/ ulong m_wait_point; + /* + Transactions, queued in binlog order, which have not yet received an ACK. + To ensure transactions are waited on chronologically, enque happens at + binlogging time by the binlogging thread (leader thread if using binlog + group commit). 
+ + Deque is done by either 1) the ACK_Receiver thread when signalling an + acknowledged transaction, or 2) in the event of a timeout, the thread which + is handling the timeout when switching off semi-sync. + + Each element consists of the THD performing the wait (if using wait_point + AFTER_SYNC and binlog group commit, this is the leader thread; otherwise it + is the user connection thread executing the transaction), and its binlog + coordinate that must be ACKed. + */ + std::queue<semisync_wait_trx_t> wait_queue; + void lock(); void unlock(); void cond_broadcast(); - int cond_timewait(struct timespec *wait_time); + int cond_timewait(THD *thd, struct timespec *wait_time); /* Is semi-sync replication on? */ bool is_on() { @@ -482,10 +510,11 @@ class Repl_semi_sync_master void create_timeout(struct timespec *out, struct timespec *start_arg); /* - Blocks the calling thread until the ack_receiver either receives an ACK - or times out (from rpl_semi_sync_master_timeout) + Blocks the calling thread until the ack_receiver either receives ACKs for + all transactions awaiting ACKs, or times out (from + rpl_semi_sync_master_timeout) */ - void await_slave_reply(); + void await_all_slave_replies(); /*set the ACK point, after binlog sync or after transaction commit*/ void set_wait_point(unsigned long ack_point) @@ -609,13 +638,19 @@ class Repl_semi_sync_master * semi-sync is on * * Input: (the transaction events' ending binlog position) + * THD - (IN) thread that will wait for an ACK. This can be the + * binlog leader thread when using wait_point + * AFTER_SYNC with binlog group commit. In all other + * cases, this is the user thread executing the + * transaction. 
* log_file_name - (IN) transaction ending position's file name * log_file_pos - (IN) transaction ending position's file offset * * Return: * 0: success; non-zero: error */ - int write_tranx_in_binlog(const char* log_file_name, my_off_t log_file_pos); + int write_tranx_in_binlog(THD *thd, const char *log_file_name, + my_off_t log_file_pos); /* Read the slave's reply so that we know how much progress the slave makes * on receive replication events. @@ -634,9 +669,9 @@ class Repl_semi_sync_master int before_reset_master(); /* - Determines if the given thread is currently awaiting a semisync_ack. Note - that the thread's value is protected by this class's LOCK_binlog, so this - function (indirectly) provides safe access. + Determines if the given thread is currently awaiting (or queued to await) a + semisync_ack. Note that the thread's value is protected by this class's + LOCK_binlog, so this function (indirectly) provides safe access. */ my_bool is_thd_awaiting_semisync_ack(THD *thd) { diff --git a/sql/sql_class.h b/sql/sql_class.h index 4c6005416e1..35d72d61d48 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -5318,8 +5318,18 @@ class THD: public THD_count, /* this must be first */ Flag, mutex and condition for a thread to wait for a signal from another thread. - Currently used to wait for group commit to complete, can also be used for - other purposes. + Currently used to wait for group commit to complete, and COND_wakeup_ready + is used for threads to wait on semi-sync ACKs (though is protected by + Repl_semi_sync_master::LOCK_binlog). Note the following relationships + between these two use-cases when using + rpl_semi_sync_master_wait_point=AFTER_SYNC during group commit: + 1) Non-leader threads use COND_wakeup_ready to wait for the leader thread + to complete binlog commit. + 2) The leader thread uses COND_wakeup_ready to await ACKs from the + replica before signalling the non-leader threads to wake up. 
+ + With wait_point=AFTER_COMMIT, there is no overlap as binlogging has + finished, so COND_wakeup_ready is safe to re-use. */ bool wakeup_ready; mysql_mutex_t LOCK_wakeup_ready; -- 2.30.2