[PATCH] MDEV-33551: Semi-sync Wait Point AFTER_COMMIT Slow on Workloads with Heavy Concurrency
From: Brandon Nesterenko <brandon.nesterenko@mariadb.com> When using semi-sync replication with rpl_semi_sync_master_wait_point=AFTER_COMMIT, the performance of the primary can be significantly reduced compared to AFTER_SYNC's performance for workloads with many concurrent users executing transactions. This is because all connections on the primary share the same condition variable/mutex pair (COND_binlog_send/LOCK_binlog), so any time an ACK is received from a replica, all waiting connections are awoken to check whether the ACK was for their own transaction, which is done in mutual exclusion. This patch changes this such that each waiting THD uses its own condition variable, and the ACK receiver thread only signals the connections whose transactions have been ACKed. That is, the THD::COND_wakeup_ready condition variable is re-used for this purpose (still protected by Repl_semi_sync_master::LOCK_binlog), and the Active_tranx queue nodes are extended to hold the waiting thread, so it can be signalled once ACKed. Additionally: 1) At master shutdown (when waiting for slaves), instead of the main loop calling await_slave_reply() once per waiting session, await_slave_reply() (renamed to await_all_slave_replies()) now awaits the ACKs of all pending transactions within one invocation. 2) THD::is_awaiting_semisync_ack is now set at binlogging time rather than when the thread begins waiting, to ensure transactions which have been binlogged and queued up to await an ACK are not killed, and are still waited on. Review note: the DBUG_ASSERT added in the previous commit is changed to a printed warning (emitted under the testing_cond_var_per_thd debug point), and the .test uses assert_grep to ensure the warning is absent. 
Reviewed By: ============ Kristian Nielsen <knielsen@knielsen-hq.org> --- .../r/rpl_semi_sync_cond_var_per_thd.result | 32 +++ .../rpl/t/rpl_semi_sync_cond_var_per_thd.test | 10 + sql/log.cc | 12 +- sql/mysqld.cc | 14 +- sql/semisync_master.cc | 241 +++++++++++------- sql/semisync_master.h | 122 +++++++-- sql/sql_class.h | 14 +- 7 files changed, 311 insertions(+), 134 deletions(-) create mode 100644 mysql-test/suite/rpl/r/rpl_semi_sync_cond_var_per_thd.result diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_cond_var_per_thd.result b/mysql-test/suite/rpl/r/rpl_semi_sync_cond_var_per_thd.result new file mode 100644 index 00000000000..08f601447d5 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_semi_sync_cond_var_per_thd.result @@ -0,0 +1,32 @@ +include/master-slave.inc +[connection master] +connection master; +call mtr.add_suppression("Got an error reading communication packets"); +set @save_bgc_count= @@global.binlog_commit_wait_count; +set @save_bgc_usec= @@global.binlog_commit_wait_usec; +set @save_debug_dbug= @@global.debug_dbug; +set @@global.binlog_commit_wait_count=3; +set @@global.binlog_commit_wait_usec=10000000; +set @@global.debug_dbug="+d,testing_cond_var_per_thd"; +# Ensure semi-sync is on +connection slave; +connection master; +# Create three transactions to binlog group commit together +connection master; +create table t1 (a int); +connection server_1; +create table t2 (a int); +connection default; +create table t3 (a int); +connection master; +connection server_1; +connection default; +include/assert_grep.inc [Check that there is no 'Thread awaiting semi-sync ACK was awoken before its ACK' warning in error log.] 
+# +# Cleanup +connection master; +set @@global.binlog_commit_wait_count=@save_bgc_count; +set @@global.binlog_commit_wait_usec=@save_bgc_usec; +set @@global.debug_dbug=@save_debug_dbug; +drop table t1, t2, t3; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_cond_var_per_thd.test b/mysql-test/suite/rpl/t/rpl_semi_sync_cond_var_per_thd.test index f8fa0a99d9c..2d9a6d13048 100644 --- a/mysql-test/suite/rpl/t/rpl_semi_sync_cond_var_per_thd.test +++ b/mysql-test/suite/rpl/t/rpl_semi_sync_cond_var_per_thd.test @@ -22,10 +22,13 @@ --source include/master-slave.inc --connection master +call mtr.add_suppression("Got an error reading communication packets"); set @save_bgc_count= @@global.binlog_commit_wait_count; set @save_bgc_usec= @@global.binlog_commit_wait_usec; +set @save_debug_dbug= @@global.debug_dbug; set @@global.binlog_commit_wait_count=3; set @@global.binlog_commit_wait_usec=10000000; +set @@global.debug_dbug="+d,testing_cond_var_per_thd"; --echo # Ensure semi-sync is on --connection slave @@ -53,12 +56,19 @@ source include/wait_for_status_var.inc; --connection default --reap +--let $assert_text= Check that there is no 'Thread awaiting semi-sync ACK was awoken before its ACK' warning in error log. 
+--let $assert_select=Thread awaiting semi-sync ACK was awoken before its ACK +--let $assert_file= $MYSQLTEST_VARDIR/log/mysqld.1.err +--let $assert_count= 0 +--let $assert_only_after=CURRENT_TEST +--source include/assert_grep.inc --echo # --echo # Cleanup --connection master set @@global.binlog_commit_wait_count=@save_bgc_count; set @@global.binlog_commit_wait_usec=@save_bgc_usec; +set @@global.debug_dbug=@save_debug_dbug; drop table t1, t2, t3; --source include/rpl_end.inc diff --git a/sql/log.cc b/sql/log.cc index ce5970b6a03..6df6ee7a03c 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6870,8 +6870,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); #ifdef HAVE_REPLICATION - if (repl_semisync_master.report_binlog_update(thd, log_file_name, - file->pos_in_file)) + if (repl_semisync_master.report_binlog_update( + thd, thd, log_file_name, file->pos_in_file)) { sql_print_error("Failed to run 'after_flush' hooks"); error= 1; @@ -8467,7 +8467,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) #ifdef HAVE_REPLICATION if (likely(!current->error) && unlikely(repl_semisync_master. 
- report_binlog_update(current->thd, + report_binlog_update(current->thd, leader->thd, current->cache_mngr-> last_commit_pos_file, current->cache_mngr-> @@ -8549,17 +8549,19 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) mysql_mutex_assert_not_owner(&LOCK_commit_ordered); bool first __attribute__((unused))= true; - bool last __attribute__((unused)); + bool last; for (current= queue; current != NULL; current= current->next) { last= current->next == NULL; #ifdef HAVE_REPLICATION + if (likely(!current->error)) current->error= repl_semisync_master.wait_after_sync(current->cache_mngr-> last_commit_pos_file, current->cache_mngr-> - last_commit_pos_offset); + last_commit_pos_offset, + last); #endif first= false; } diff --git a/sql/mysqld.cc b/sql/mysqld.cc index e224871795e..b315edc091c 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -1750,18 +1750,12 @@ static void close_connections(void) /* If we are waiting on any ACKs, delay killing the thread until either an ACK is received or the timeout is hit. 
- - Allow at max the number of sessions to await a timeout; however, if all - ACKs have been received in less iterations, then quit early */ - if (shutdown_wait_for_slaves && repl_semisync_master.get_master_enabled()) + if (shutdown_wait_for_slaves && repl_semisync_master.get_master_enabled() && + repl_semisync_master.sync_get_master_wait_sessions()) { - int waiting_threads= repl_semisync_master.sync_get_master_wait_sessions(); - if (waiting_threads) - sql_print_information("Delaying shutdown to await semi-sync ACK"); - - while (waiting_threads-- > 0) - repl_semisync_master.await_slave_reply(); + sql_print_information("Delaying shutdown to await semi-sync ACK"); + repl_semisync_master.await_all_slave_replies(); } DBUG_EXECUTE_IF("delay_shutdown_phase_2_after_semisync_wait", diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc index 0eaf0f0e0e2..411b3dfc84e 100644 --- a/sql/semisync_master.cc +++ b/sql/semisync_master.cc @@ -68,6 +68,14 @@ static ulonglong timespec_to_usec(const struct timespec *ts) return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND; } +int signal_waiting_transaction(THD *waiting_thd, const char *binlog_file, + my_off_t binlog_pos) +{ + if (waiting_thd->is_awaiting_semisync_ack) + mysql_cond_signal(&waiting_thd->COND_wakeup_ready); + return 0; +} + /******************************************************************************* * * <Active_tranx> class : manage all active transaction nodes @@ -142,7 +150,18 @@ int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1, return 0; } -int Active_tranx::insert_tranx_node(const char *log_file_name, +int Active_tranx::for_front(active_tranx_action action) +{ + /* + This should be called after validating !Active_tranx::is_empty() while + holding LOCK_binlog + */ + DBUG_ASSERT(m_trx_front); + return action(m_trx_front->thd, m_trx_front->log_name, m_trx_front->log_pos); +} + +int Active_tranx::insert_tranx_node(THD *thd_to_wait, + const char *log_file_name, 
my_off_t log_file_pos) { Tranx_node *ins_node; @@ -165,6 +184,7 @@ int Active_tranx::insert_tranx_node(const char *log_file_name, strncpy(ins_node->log_name, log_file_name, FN_REFLEN-1); ins_node->log_name[FN_REFLEN-1] = 0; /* make sure it ends properly */ ins_node->log_pos = log_file_pos; + ins_node->thd= thd_to_wait; if (!m_trx_front) { @@ -232,28 +252,22 @@ bool Active_tranx::is_tranx_end_pos(const char *log_file_name, DBUG_RETURN(entry != NULL); } -void Active_tranx::clear_active_tranx_nodes(const char *log_file_name, - my_off_t log_file_pos) +void Active_tranx::clear_active_tranx_nodes( + const char *log_file_name, my_off_t log_file_pos, + active_tranx_action pre_delete_hook) { Tranx_node *new_front; DBUG_ENTER("Active_tranx::::clear_active_tranx_nodes"); - if (log_file_name != NULL) + new_front= m_trx_front; + while (new_front) { - new_front = m_trx_front; - - while (new_front) - { - if (compare(new_front, log_file_name, log_file_pos) > 0) - break; - new_front = new_front->next; - } - } - else - { - /* If log_file_name is NULL, clear everything. 
*/ - new_front = NULL; + if ((log_file_name != NULL) && + compare(new_front, log_file_name, log_file_pos) > 0) + break; + pre_delete_hook(new_front->thd, new_front->log_name, new_front->log_pos); + new_front = new_front->next; } if (new_front == NULL) @@ -500,19 +514,31 @@ void Repl_semi_sync_master::unlock() mysql_mutex_unlock(&LOCK_binlog); } -void Repl_semi_sync_master::cond_broadcast() -{ - mysql_cond_broadcast(&COND_binlog_send); -} - -int Repl_semi_sync_master::cond_timewait(struct timespec *wait_time) +int Repl_semi_sync_master::cond_timewait(THD *thd, struct timespec *_wait_time) { int wait_res; + struct timespec gen_wait_time; + struct timespec *real_wait_time; DBUG_ENTER("Repl_semi_sync_master::cond_timewait()"); - wait_res= mysql_cond_timedwait(&COND_binlog_send, - &LOCK_binlog, wait_time); + if (_wait_time == NULL) + { + create_timeout(&gen_wait_time, NULL); + real_wait_time= &gen_wait_time; + } + else + { + real_wait_time= _wait_time; + } + + /* + All connection threads share the mutex (LOCK_binlog) to keep consistent + with the Active_tranx cache, but each thread has its own condition variable + on which it waits. + */ + wait_res= mysql_cond_timedwait(&thd->COND_wakeup_ready, &LOCK_binlog, + real_wait_time); DBUG_RETURN(wait_res); } @@ -533,7 +559,8 @@ void Repl_semi_sync_master::remove_slave() Signal transactions waiting in commit_trx() that they do not have to wait anymore. */ - cond_broadcast(); + m_active_tranxs->clear_active_tranx_nodes(NULL, NULL, + signal_waiting_transaction); } unlock(); } @@ -616,7 +643,6 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id, my_off_t log_file_pos) { int cmp; - bool can_release_threads = false; bool need_copy_send_pos = true; DBUG_ENTER("Repl_semi_sync_master::report_reply_binlog"); @@ -668,45 +694,26 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id, /* Remove all active transaction nodes before this point. 
*/ DBUG_ASSERT(m_active_tranxs != NULL); - m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos); + m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos, + signal_waiting_transaction); + m_wait_file_name_inited= false; DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)", "Repl_semi_sync_master::report_reply_binlog", log_file_name, (ulong)log_file_pos)); } - if (rpl_semi_sync_master_wait_sessions > 0) - { - /* Let us check if some of the waiting threads doing a trx - * commit can now proceed. - */ - cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos, - m_wait_file_name, m_wait_file_pos); - if (cmp >= 0) - { - /* Yes, at least one waiting thread can now proceed: - * let us release all waiting threads with a broadcast - */ - can_release_threads = true; - m_wait_file_name_inited = false; - } - } l_end: unlock(); - if (can_release_threads) - { - DBUG_PRINT("semisync", ("%s: signal all waiting threads.", - "Repl_semi_sync_master::report_reply_binlog")); - - cond_broadcast(); - } DBUG_RETURN(0); } -int Repl_semi_sync_master::wait_after_sync(const char *log_file, my_off_t log_pos) +int Repl_semi_sync_master::wait_after_sync(const char *log_file, + my_off_t log_pos, + bool unmark_thd_awaiting_ack) { if (!get_master_enabled()) return 0; @@ -714,7 +721,8 @@ int Repl_semi_sync_master::wait_after_sync(const char *log_file, my_off_t log_po int ret= 0; if(log_pos && wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC) - ret= commit_trx(log_file + dirname_length(log_file), log_pos); + ret= commit_trx(log_file + dirname_length(log_file), log_pos, + unmark_thd_awaiting_ack); return ret; } @@ -762,24 +770,27 @@ int Repl_semi_sync_master::wait_after_rollback(THD *thd, bool all) /** The method runs after flush to binary log is done. 
*/ -int Repl_semi_sync_master::report_binlog_update(THD* thd, const char *log_file, +int Repl_semi_sync_master::report_binlog_update(THD *trans_thd, + THD *waiter_thd, + const char *log_file, my_off_t log_pos) { if (get_master_enabled()) { Trans_binlog_info *log_info; - if (!(log_info= thd->semisync_info)) + if (!(log_info= trans_thd->semisync_info)) { if(!(log_info= (Trans_binlog_info*)my_malloc(PSI_INSTRUMENT_ME, sizeof(Trans_binlog_info), MYF(0)))) return 1; - thd->semisync_info= log_info; + trans_thd->semisync_info= log_info; } strcpy(log_info->log_file, log_file + dirname_length(log_file)); log_info->log_pos = log_pos; - return write_tranx_in_binlog(log_info->log_file, log_pos); + return write_tranx_in_binlog(waiter_thd, log_info->log_file, + log_pos); } return 0; @@ -825,8 +836,9 @@ void Repl_semi_sync_master::dump_end(THD* thd) ack_receiver.remove_slave(thd); } -int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, - my_off_t trx_wait_binlog_pos) +int Repl_semi_sync_master::commit_trx(const char *trx_wait_binlog_name, + my_off_t trx_wait_binlog_pos, + bool unmark_thd_awaiting_ack) { bool success= 0; DBUG_ENTER("Repl_semi_sync_master::commit_trx"); @@ -852,9 +864,8 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, lock(); /* This must be called after acquired the lock */ - THD_ENTER_COND(thd, &COND_binlog_send, &LOCK_binlog, - & stage_waiting_for_semi_sync_ack_from_slave, - & old_stage); + THD_ENTER_COND(thd, &thd->COND_wakeup_ready, &LOCK_binlog, + &stage_waiting_for_semi_sync_ack_from_slave, &old_stage); /* This is the real check inside the mutex. */ if (!get_master_enabled() || !is_on()) @@ -902,7 +913,7 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, trx_wait_binlog_pos, m_wait_file_name, m_wait_file_pos); if (cmp <= 0) - { + { /* This thd has a lower position, let's update the minimum info. 
*/ strmake_buf(m_wait_file_name, trx_wait_binlog_name); m_wait_file_pos = trx_wait_binlog_pos; @@ -934,20 +945,13 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, */ rpl_semi_sync_master_wait_sessions++; - /* We keep track of when this thread is awaiting an ack to ensure it is - * not killed while awaiting an ACK if a shutdown is issued. - */ - set_thd_awaiting_semisync_ack(thd, TRUE); - DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)", "Repl_semi_sync_master::commit_trx", m_wait_timeout, m_wait_file_name, (ulong)m_wait_file_pos)); create_timeout(&abstime, &start_ts); - wait_result = cond_timewait(&abstime); - - set_thd_awaiting_semisync_ack(thd, FALSE); + wait_result = cond_timewait(thd, &abstime); rpl_semi_sync_master_wait_sessions--; if (wait_result != 0) @@ -979,14 +983,29 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, { rpl_semi_sync_master_trx_wait_num++; rpl_semi_sync_master_trx_wait_time += wait_time; - /* - Assert we have either recieved our ACK; or have timed out and are - awoken in an off state. - */ - DBUG_ASSERT(!get_master_enabled() || !is_on() || thd->is_killed() || - 0 <= Active_tranx::compare( - m_reply_file_name, m_reply_file_pos, - trx_wait_binlog_name, trx_wait_binlog_pos)); + + DBUG_EXECUTE_IF("testing_cond_var_per_thd", { + /* + DBUG log warning to ensure we have either recieved our ACK; or + have timed out and are awoken in an off state. Test + rpl.rpl_semi_sync_cond_var_per_thd scans the logs to ensure this + warning is not present. + */ + bool valid_wakeup= + (!get_master_enabled() || !is_on() || thd->is_killed() || + 0 <= Active_tranx::compare( + m_reply_file_name, m_reply_file_pos, + trx_wait_binlog_name, trx_wait_binlog_pos)); + if (!valid_wakeup) + { + sql_print_warning( + "Thread awaiting semi-sync ACK was awoken before its " + "ACK. 
THD (%llu), Wait coord: (%s, %llu), ACK coord: (%s, " + "%llu)", + thd->thread_id, trx_wait_binlog_name, trx_wait_binlog_pos, + m_reply_file_name, m_reply_file_pos); + } + }); } } } @@ -997,7 +1016,8 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, m_active_tranxs may be NULL if someone disabled semi sync during cond_timewait() */ - DBUG_ASSERT(thd_killed(thd) || !m_active_tranxs || aborted || + DBUG_ASSERT(thd_killed(thd) || !m_active_tranxs || + m_active_tranxs->is_empty() || aborted || !m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name, trx_wait_binlog_pos)); @@ -1008,6 +1028,9 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, else rpl_semi_sync_master_no_transactions++; + if (unmark_thd_awaiting_ack) + set_thd_awaiting_semisync_ack(thd, FALSE); + /* The lock held will be released by thd_exit_cond, so no need to call unlock() here */ THD_EXIT_COND(thd, &old_stage); @@ -1038,20 +1061,22 @@ void Repl_semi_sync_master::switch_off() { DBUG_ENTER("Repl_semi_sync_master::switch_off"); + /* Clear the active transaction list. */ + if (m_active_tranxs) + { + m_active_tranxs->clear_active_tranx_nodes(NULL, 0, + signal_waiting_transaction); + } if (m_state) { m_state = false; - /* Clear the active transaction list. 
*/ - DBUG_ASSERT(m_active_tranxs != NULL); - m_active_tranxs->clear_active_tranx_nodes(NULL, 0); rpl_semi_sync_master_off_times++; m_wait_file_name_inited = false; m_reply_file_name_inited = false; sql_print_information("Semi-sync replication switched OFF."); } - cond_broadcast(); /* wake up all waiting threads */ DBUG_VOID_RETURN; } @@ -1198,7 +1223,8 @@ int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet, DBUG_RETURN(0); } -int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name, +int Repl_semi_sync_master::write_tranx_in_binlog(THD *thd, + const char *log_file_name, my_off_t log_file_pos) { int result = 0; @@ -1241,7 +1267,7 @@ int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name, if (is_on()) { DBUG_ASSERT(m_active_tranxs != NULL); - if(m_active_tranxs->insert_tranx_node(log_file_name, log_file_pos)) + if(m_active_tranxs->insert_tranx_node(thd, log_file_name, log_file_pos)) { /* if insert tranx_node failed, print a warning message @@ -1253,6 +1279,7 @@ int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name, } else { + set_thd_awaiting_semisync_ack(thd, TRUE); rpl_semi_sync_master_request_ack++; } } @@ -1370,22 +1397,40 @@ void Repl_semi_sync_master::set_export_stats() unlock(); } -void Repl_semi_sync_master::await_slave_reply() +void Repl_semi_sync_master::await_all_slave_replies() { - struct timespec abstime; + int wait_result; - DBUG_ENTER("Repl_semi_sync_master::::await_slave_reply"); - lock(); - - /* Just return if there is nothing to wait for */ - if (!rpl_semi_sync_master_wait_sessions) - goto end; + DBUG_ENTER("Repl_semi_sync_master::::await_all_slave_replies"); - create_timeout(&abstime, NULL); - cond_timewait(&abstime); + /* + Wait for all transactions that need ACKS to have received them; or timeout. + If it is a timeout, the connection thread should attempt to turn off + semi-sync and broadcast to all other waiting threads to move on. 
-end: - unlock(); + The lock is taken per iteration rather than over the whole loop to allow + more opportunity for the user threads to handle their ACKs. + */ + while (TRUE) + { + lock(); + if (m_active_tranxs->is_empty() || !get_master_enabled() || !is_on()) + { + unlock(); + break; + } + wait_result= m_active_tranxs->for_front( + [](THD *thd, const char *binlog_file, my_off_t binlog_pos) -> int { + return (thd->is_awaiting_semisync_ack) + ? repl_semisync_master.cond_timewait(thd, NULL) + : 0; + }); + unlock(); + + // Check for timeout + if (wait_result != 0) + break; + } DBUG_VOID_RETURN; } diff --git a/sql/semisync_master.h b/sql/semisync_master.h index 99f46869354..1eb690b572b 100644 --- a/sql/semisync_master.h +++ b/sql/semisync_master.h @@ -31,6 +31,7 @@ extern PSI_cond_key key_COND_binlog_send; struct Tranx_node { char log_name[FN_REFLEN]; my_off_t log_pos; + THD *thd; /* The thread awaiting an ACK */ struct Tranx_node *next; /* the next node in the sorted list */ struct Tranx_node *hash_next; /* the next node during hash collision */ }; @@ -288,6 +289,18 @@ class Tranx_node_allocator } }; +/** + Function pointer type to run on the contents of an Active_tranx node. + + Return 0 for success, 1 for error. + + Note Repl_semi_sync_master::LOCK_binlog is not guaranteed to be held for + its invocation. See the context in which it is called to know. +*/ + +typedef int (*active_tranx_action)(THD *trx_thd, const char *log_file_name, + my_off_t trx_log_file_pos); + /** This class manages memory for active transaction list. @@ -338,29 +351,60 @@ class Active_tranx * Return: * 0: success; non-zero: error */ - int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos); + int insert_tranx_node(THD *thd_to_wait, const char *log_file_name, + my_off_t log_file_pos); /* Clear the active transaction nodes until(inclusive) the specified * position. * If log_file_name is NULL, everything will be cleared: the sorted * list and the hash table will be reset to empty. 
+ * + * The pre_delete_hook parameter is a function pointer that will be invoked + * for each Active_tranx node, in order, from m_trx_front to log_file_name, + * e.g. to signal their wakeup condition. Repl_semi_sync_binlog::LOCK_binlog + * is held while this is invoked. */ void clear_active_tranx_nodes(const char *log_file_name, - my_off_t log_file_pos); + my_off_t log_file_pos, + active_tranx_action pre_delete_hook); /* Given a position, check to see whether the position is an active * transaction's ending position by probing the hash table. */ bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos); + /* Invoke action (function parameter) on only the first element in the list + * of active transactions. + * + * Note this function does not explicitly lock + * Repl_semi_sync_master::LOCK_binlog. + */ + int for_front(active_tranx_action action); + /* Given two binlog positions, compare which one is bigger based on * (file_name, file_position). */ static int compare(const char *log_file_name1, my_off_t log_file_pos1, const char *log_file_name2, my_off_t log_file_pos2); + + /* Check if there are no transactions actively awaiting ACKs. Returns true + * if the internal linked list has no entries, false otherwise. + */ + bool is_empty() { return m_trx_front == NULL; } + }; +/* + Element in Repl_semi_sync_master::wait_queue to preserve the state of a + transaction waiting for an ACK. +*/ +typedef struct _semisync_wait_trx { + const char *binlog_name; + my_off_t binlog_pos; + THD *thd; +} semisync_wait_trx_t; + /** The extension class for the master of semi-synchronous replication */ @@ -433,8 +477,11 @@ class Repl_semi_sync_master void lock(); void unlock(); - void cond_broadcast(); - int cond_timewait(struct timespec *wait_time); + + /* Do a cond wait on thd->LOCK_wakup_ready. If wait_time is NULL, timeout + * will use m_wait_timeout. + */ + int cond_timewait(THD *thd, struct timespec *wait_time); /* Is semi-sync replication on? 
*/ bool is_on() { @@ -482,10 +529,11 @@ class Repl_semi_sync_master void create_timeout(struct timespec *out, struct timespec *start_arg); /* - Blocks the calling thread until the ack_receiver either receives an ACK - or times out (from rpl_semi_sync_master_timeout) + Blocks the calling thread until the ack_receiver either receives ACKs for + all transactions awaiting ACKs, or times out (from + rpl_semi_sync_master_timeout) */ - void await_slave_reply(); + void await_all_slave_replies(); /*set the ACK point, after binlog sync or after transaction commit*/ void set_wait_point(unsigned long ack_point) @@ -544,26 +592,56 @@ class Repl_semi_sync_master * all other transaction would not wait either. * * Input: (the transaction events' ending binlog position) - * trx_wait_binlog_name - (IN) ending position's file name - * trx_wait_binlog_pos - (IN) ending position's file offset + * trx_wait_binlog_name - (IN) ending position's file name + * trx_wait_binlog_pos - (IN) ending position's file offset + * unmark_thd_awaiting_ack - (IN) Whether or not to unmark + * thd->is_awaiting_semisync_ack after receiving + * an ACK from the replica. If using wait_point + * AFTER_SYNC with binlog_group_commit, only the + * last transaction written to binlog in the + * group should negate the boolean, because the + * same thread (i.e. leader thread) will wait for + * all transaction ACKs. Negating it too early + * would break SHUTDOWN WAIT FOR ALL SLAVES, as + * that is the condition tested to bypass killing + * threads in phase 1. In all other situations, + * the boolean should be negated immediately. 
* * Return: * 0: success; non-zero: error */ int commit_trx(const char* trx_wait_binlog_name, - my_off_t trx_wait_binlog_pos); + my_off_t trx_wait_binlog_pos, + bool unmark_thd_awaiting_ack=TRUE); - /*Wait for ACK after writing/sync binlog to file*/ - int wait_after_sync(const char* log_file, my_off_t log_pos); + /* Wait for ACK after writing/sync binlog to file + * For details on parameters, see commit_trx() function declaration comment. + */ + int wait_after_sync(const char *log_file, my_off_t log_pos, + bool unmark_thd_awaiting_ack= TRUE); /*Wait for ACK after commting the transaction*/ int wait_after_commit(THD* thd, bool all); /*Wait after the transaction is rollback*/ int wait_after_rollback(THD *thd, bool all); - /*Store the current binlog position in m_active_tranxs. This position should - * be acked by slave*/ - int report_binlog_update(THD *thd, const char *log_file,my_off_t log_pos); + /* Store the current binlog position in m_active_tranxs. This position should + * be acked by slave. + * + * Inputs: + * trans_thd Thread of the transaction which is executing the + * transaction. + * waiter_thd Thread that will wait for the ACK from the replica, + * which depends on the semi-sync wait point. If AFTER_SYNC, + * and also using binlog group commit, this will be the leader + * thread of the binlog commit. Otherwise, it is the thread that + * is executing the transaction, i.e. the same as trans_thd. + * log_file Name of the binlog file that the transaction is written into + * log_pos Offset within the binlog file that the transaction is written + * at + */ + int report_binlog_update(THD *trans_thd, THD *waiter_thd, + const char *log_file, my_off_t log_pos); int dump_start(THD* thd, const char *log_file, @@ -609,13 +687,19 @@ class Repl_semi_sync_master * semi-sync is on * * Input: (the transaction events' ending binlog position) + * THD - (IN) thread that will wait for an ACK. 
This can be the + * binlog leader thread when using wait_point + * AFTER_SYNC with binlog group commit. In all other + * cases, this is the user thread executing the + * transaction. * log_file_name - (IN) transaction ending position's file name * log_file_pos - (IN) transaction ending position's file offset * * Return: * 0: success; non-zero: error */ - int write_tranx_in_binlog(const char* log_file_name, my_off_t log_file_pos); + int write_tranx_in_binlog(THD *thd, const char *log_file_name, + my_off_t log_file_pos); /* Read the slave's reply so that we know how much progress the slave makes * on receive replication events. @@ -634,9 +718,9 @@ class Repl_semi_sync_master int before_reset_master(); /* - Determines if the given thread is currently awaiting a semisync_ack. Note - that the thread's value is protected by this class's LOCK_binlog, so this - function (indirectly) provides safe access. + Determines if the given thread is currently awaiting (or queued to await) a + semisync_ack. Note that the thread's value is protected by this class's + LOCK_binlog, so this function (indirectly) provides safe access. */ my_bool is_thd_awaiting_semisync_ack(THD *thd) { diff --git a/sql/sql_class.h b/sql/sql_class.h index 4c6005416e1..35d72d61d48 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -5318,8 +5318,18 @@ class THD: public THD_count, /* this must be first */ Flag, mutex and condition for a thread to wait for a signal from another thread. - Currently used to wait for group commit to complete, can also be used for - other purposes. + Currently used to wait for group commit to complete, and COND_wakeup_ready + is used for threads to wait on semi-sync ACKs (though is protected by + Repl_semi_sync_master::LOCK_binlog). Note the following relationships + between these two use-cases when using + rpl_semi_sync_master_wait_point=AFTER_SYNC during group commit: + 1) Non-leader threads use COND_wakeup_ready to wait for the leader thread + to complete binlog commit. 
+ 2) The leader thread uses COND_wakeup_ready to await ACKs from the + replica before signalling the non-leader threads to wake up. + + With wait_point=AFTER_COMMIT, there is no overlap as binlogging has + finished, so COND_wakeup_ready is safe to re-use. */ bool wakeup_ready; mysql_mutex_t LOCK_wakeup_ready; -- 2.30.2
participants (1)
-
Kristian Nielsen