Re: [Maria-developers] MDEV-8112: PATCH: no slave left behind
Hi Jonas, Thanks for the patch! I read through the patch. I am not that familiar with the semi-sync plugin code, but it looks reasonable. However, the patch causes a test failure in sys_vars.all_vars. This is because not all newly introduced variables have corresponding test cases in mysql-test/suite/sys_vars/t/*_basic.test. Also, do you have some documentation for the feature? (There is a bit in the Jira entry, but, for example, the new variable rpl_semi_sync_master_slave_lag_heartbeat_frequency_us does not seem to be described at all.) If you can fix the above two points (and if Buildbot does not point out more test suite issues), then I will push it into the MariaDB tree. Thanks, - Kristian.
The patch has been ported to 10.1. You can use it under the new (also called "3-clause") BSD license. I have done internal benchmarks, but you are most welcome to do some too.
DESCRIPTION: no slave left behind
This patch implements master throttling based on slave lag, a.k.a. "no slave left behind". The core feature works as follows: 1) the semi-sync reply is amended to also report back the SQL-thread position (a.k.a. the exec position); 2) transactions are not removed from the "active-transaction-list" in the semi-sync master plugin until at least one slave has reported that it has executed the transaction, so the slave lag can be estimated from how long the oldest transaction has been lingering in the active-transaction-list; 3) client threads are forced to wait before commit until the slave lag has decreased to an acceptable value.
the following variables are introduced on master:
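rpl_semi_sync_master_max_slave_lag (global; in seconds) rpl_semi_sync_master_slave_lag_wait_timeout (session) rpl_semi_sync_master_slave_lag_heartbeat_frequency_us (default 500000, i.e. 0.5 s; when a slave that reports its exec position connects, the master lowers that connection's master_heartbeat_period to this interval if it is unset or larger, and heartbeats sent to such a slave request both the IO and exec position, so the slave-lag estimate keeps being updated even when no transactions are replicated)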
the following status variables are introduced on master:
rpl_semi_sync_master_slave_lag_wait_sessions rpl_semi_sync_master_estimated_slave_lag (in microseconds) rpl_semi_sync_master_trx_slave_lag_wait_time rpl_semi_sync_master_trx_slave_lag_wait_num rpl_semi_sync_master_avg_trx_slave_lag_wait_time
the following variables are introduced on slave:
rpl_semi_sync_slave_lag_enabled (global)
In addition, two optimizations that decrease the overhead of semi-sync are introduced. 1) When the master is about to send a transaction to a slave, it checks whether that transaction should be semi-synced. Rather than semi-syncing every transaction (as is done currently), the code skips semi-syncing a transaction if newer transactions have already been committed. Since this could delay semi-syncing indefinitely, a cap is set using two new master variables:
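rpl_semi_sync_master_max_unacked_event_bytes (global; an ack is requested once the slave has been sent this many binlog bytes, within the same binlog file, since the last requested ack) rpl_semi_sync_master_max_unacked_event_count (global; an ack is requested at least every this many transactions; the default of 0 requests an ack for every transaction). An ack is also always requested for the newest transaction in the active-transaction-list and at the start of a new binlog file. 2) rpl_semi_sync_master_group_commit, which makes the semi-sync plugin semi-sync only the last transaction in a group commit.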
diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result b/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result new file mode 100644 index 0000000..f3545a4 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result @@ -0,0 +1,150 @@ +include/master-slave.inc +[connection master] +set global rpl_semi_sync_master_enabled = 1; +set global rpl_semi_sync_master_max_slave_lag = 10; +show variables like 'rpl_semi_sync_master_%slave_lag%'; +Variable_name Value +rpl_semi_sync_master_max_slave_lag 10 +rpl_semi_sync_master_slave_lag_heartbeat_frequency_us 500000 +rpl_semi_sync_master_slave_lag_wait_timeout 50 +include/stop_slave.inc +set global rpl_semi_sync_slave_enabled = 1; +set global rpl_semi_sync_slave_lag_enabled = 1; +include/start_slave.inc +# create non-root user for testing READ_ONLY +grant SELECT, INSERT on *.* to test@localhost; +CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8)) +ENGINE=innodb; +# +# Check basic behaviour +# +INSERT INTO t1 (f) VALUES ('1'),('2'),('3'); +# Now wait for slave lag to decrease to 0 +# [ on slave ] +STOP SLAVE SQL_THREAD; +# [ on master ] +INSERT INTO t1 (f) VALUES ('4'),('5'),('6'); +# Now wait for slave lag to increase to > 0 +# [ on slave ] +START SLAVE SQL_THREAD; +# [ on master ] +# Now wait for slave lag to decrease to 0 +# [ on slave ] +STOP SLAVE SQL_THREAD; +# [ on master ] +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5; +# First transaction should succeed. slave_lag is zero when it commits +INSERT INTO t1 (f) VALUES ('7'),('8'),('9'); +# Now wait for slave lag to increase to > 10s +# Check that estimated_slave_lag is > 10s +SELECT VARIABLE_VALUE > 10000000 as should_be_1 +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag'; +should_be_1 +1 +# Second transaction should now fail. 
slave_lag is >10s when it commits +INSERT INTO t1 (f) VALUES ('a'),('b'),('c'); +ERROR HY000: Slave-lag timeout +# [ on slave ] +START SLAVE SQL_THREAD; +# [ on master ] +# Now wait for slave lag to decrease to < 10s +# And now it should succeed again +INSERT INTO t1 (f) VALUES ('d'),('e'),('f'); +SELECT * +FROM t1 +ORDER BY 1; +i f +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 +13 d +14 e +15 f +# [ on slave ] +SELECT * +FROM t1 +ORDER BY 1; +i f +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 +13 d +14 e +15 f +# +# Test interaction with READ_ONLY +# +# [ on slave ] +STOP SLAVE SQL_THREAD; +# [ on master ] +INSERT INTO t1 (f) VALUES ('g'),('h'),('i'); +# Now wait for slave lag to increase to > 10s +# [ on con1 ] +BEGIN; +INSERT INTO t1 (f) VALUES ('g'),('h'),('i'); +# [ on master ] +set global read_only = 1; +# [ on con1 ] +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5; +# read-only is check *before* slave lag +COMMIT; +ERROR HY000: The MariaDB server is running with the --read-only option so it cannot execute this statement +# [ on slave ] +START SLAVE SQL_THREAD; +# [ on master ] +# Now wait for slave lag to decrease to 0 +set global read_only = 0; +# +# check slave_lag > 0 but less than rpl_semi_sync_master_max_slave_lag +# +# [ on slave ] +STOP SLAVE SQL_THREAD; +# [ on master ] +INSERT INTO t1 (f) VALUES ('j'),('k'),('l'); +# Now wait for slave lag to increase to > 0 +# Capture rpl_semi_sync_master_tx_slave_lag_waits before transaction +select @count_before := VARIABLE_VALUE +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits'; +# Set maximum allowed slave lag to 24h +set global rpl_semi_sync_master_max_slave_lag = 86400; +INSERT INTO t1 (f) VALUES ('m'),('n'),('o'); +# Capture rpl_semi_sync_master_tx_slave_lag_waits after transaction +select @count_after := VARIABLE_VALUE +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits'; +# There should have 
been no wait, since maximum allowed is very high +select @count_before = @count_after as should_be_1; +should_be_1 +1 +# [ on slave ] +START SLAVE SQL_THREAD; +# [ on master ] +# Now wait for slave lag to decrease to 0 +# +# Clean up +# +# [ on master ] +set global rpl_semi_sync_master_max_slave_lag = default; +set session rpl_semi_sync_master_slave_lag_wait_timeout = default; +DROP USER test@localhost; +include/stop_slave.inc +set global rpl_semi_sync_slave_enabled = 0; +set global rpl_semi_sync_slave_lag_enabled = default; +set global rpl_semi_sync_master_enabled = 0; +include/start_slave.inc +DROP TABLE t1; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test b/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test new file mode 100644 index 0000000..f9bdbd9 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test @@ -0,0 +1,252 @@ +source include/have_semisync.inc; +source include/not_embedded.inc; +source include/have_innodb.inc; +source include/master-slave.inc; + +connection master; +set global rpl_semi_sync_master_enabled = 1; +set global rpl_semi_sync_master_max_slave_lag = 10; + +show variables like 'rpl_semi_sync_master_%slave_lag%'; + +connection slave; +source include/stop_slave.inc; +set global rpl_semi_sync_slave_enabled = 1; +set global rpl_semi_sync_slave_lag_enabled = 1; + +source include/start_slave.inc; + +connection master; + +--echo # create non-root user for testing READ_ONLY +grant SELECT, INSERT on *.* to test@localhost; +connect (con1,localhost,test,,test); + +CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8)) +ENGINE=innodb; + +--echo # +--echo # Check basic behaviour +--echo # +INSERT INTO t1 (f) VALUES ('1'),('2'),('3'); + +--echo # Now wait for slave lag to decrease to 0 +let $wait_condition= SELECT VARIABLE_VALUE = 0 +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag'; +--source include/wait_condition.inc + +--echo 
# [ on slave ] +connection slave; +STOP SLAVE SQL_THREAD; + +--echo # [ on master ] +connection master; +INSERT INTO t1 (f) VALUES ('4'),('5'),('6'); + +--echo # Now wait for slave lag to increase to > 0 +let $wait_condition= SELECT VARIABLE_VALUE > 0 +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag'; +--source include/wait_condition.inc + +--echo # [ on slave ] +connection slave; +START SLAVE SQL_THREAD; + +--echo # [ on master ] +connection master; + +--echo # Now wait for slave lag to decrease to 0 +let $wait_condition= SELECT VARIABLE_VALUE = 0 +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag'; +--source include/wait_condition.inc + +--echo # [ on slave ] +connection slave; +STOP SLAVE SQL_THREAD; + +--echo # [ on master ] +connection master; + +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5; + +--echo # First transaction should succeed. slave_lag is zero when it commits +INSERT INTO t1 (f) VALUES ('7'),('8'),('9'); + +--echo # Now wait for slave lag to increase to > 10s +let $wait_condition= SELECT VARIABLE_VALUE > 10000000 +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag'; +--source include/wait_condition.inc + +--echo # Check that estimated_slave_lag is > 10s +SELECT VARIABLE_VALUE > 10000000 as should_be_1 +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag'; + +--echo # Second transaction should now fail. 
slave_lag is >10s when it commits +--error ER_ERROR_DURING_COMMIT +INSERT INTO t1 (f) VALUES ('a'),('b'),('c'); + +--echo # [ on slave ] +connection slave; +START SLAVE SQL_THREAD; + +--echo # [ on master ] +connection master; + +--echo # Now wait for slave lag to decrease to < 10s +let $wait_condition= SELECT VARIABLE_VALUE < 10000000 +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag'; +--source include/wait_condition.inc + +--echo # And now it should succeed again +INSERT INTO t1 (f) VALUES ('d'),('e'),('f'); + +SELECT * +FROM t1 +ORDER BY 1; + +sync_slave_with_master; + +--echo # [ on slave ] +connection slave; + +SELECT * +FROM t1 +ORDER BY 1; + +--echo # +--echo # Test interaction with READ_ONLY +--echo # + +--echo # [ on slave ] +connection slave; +STOP SLAVE SQL_THREAD; + +--echo # [ on master ] +connection master; + +INSERT INTO t1 (f) VALUES ('g'),('h'),('i'); + +--echo # Now wait for slave lag to increase to > 10s +let $wait_condition= SELECT VARIABLE_VALUE > 10000000 +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag'; +--source include/wait_condition.inc + +connection con1; +--echo # [ on con1 ] +BEGIN; +INSERT INTO t1 (f) VALUES ('g'),('h'),('i'); + +connection master; +--echo # [ on master ] +set global read_only = 1; + +connection con1; +--echo # [ on con1 ] +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5; + +--echo # read-only is check *before* slave lag +--error ER_OPTION_PREVENTS_STATEMENT +COMMIT; + +disconnect con1; + +--echo # [ on slave ] +connection slave; +START SLAVE SQL_THREAD; + +connection master; +--echo # [ on master ] + +--echo # Now wait for slave lag to decrease to 0 +let $wait_condition= SELECT VARIABLE_VALUE = 0 +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag'; +--source include/wait_condition.inc + +set global read_only = 0; + +--echo # +--echo # check 
slave_lag > 0 but less than rpl_semi_sync_master_max_slave_lag +--echo # + +--echo # [ on slave ] +connection slave; +STOP SLAVE SQL_THREAD; + +connection master; +--echo # [ on master ] +INSERT INTO t1 (f) VALUES ('j'),('k'),('l'); + +--echo # Now wait for slave lag to increase to > 0 +let $wait_condition= SELECT VARIABLE_VALUE > 0 +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag'; +--source include/wait_condition.inc + +--echo # Capture rpl_semi_sync_master_tx_slave_lag_waits before transaction +--disable_result_log +select @count_before := VARIABLE_VALUE +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits'; +--enable_result_log + +--echo # Set maximum allowed slave lag to 24h +set global rpl_semi_sync_master_max_slave_lag = 86400; # 24h + +INSERT INTO t1 (f) VALUES ('m'),('n'),('o'); + +--echo # Capture rpl_semi_sync_master_tx_slave_lag_waits after transaction +--disable_result_log +select @count_after := VARIABLE_VALUE +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits'; +--enable_result_log + +--echo # There should have been no wait, since maximum allowed is very high +select @count_before = @count_after as should_be_1; + +--echo # [ on slave ] +connection slave; +START SLAVE SQL_THREAD; + +connection master; +--echo # [ on master ] + +--echo # Now wait for slave lag to decrease to 0 +let $wait_condition= SELECT VARIABLE_VALUE = 0 +FROM INFORMATION_SCHEMA.GLOBAL_STATUS +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag'; +--source include/wait_condition.inc + +--echo # +--echo # Clean up +--echo # +connection master; +--echo # [ on master ] +set global rpl_semi_sync_master_max_slave_lag = default; +set session rpl_semi_sync_master_slave_lag_wait_timeout = default; +DROP USER test@localhost; + +connection slave; +source include/stop_slave.inc; +set global rpl_semi_sync_slave_enabled = 0; +set 
global rpl_semi_sync_slave_lag_enabled = default; + +connection master; +set global rpl_semi_sync_master_enabled = 0; + +connection slave; +source include/start_slave.inc; + +connection master; + +DROP TABLE t1; +sync_slave_with_master; +--source include/rpl_end.inc diff --git a/plugin/semisync/semisync.cc b/plugin/semisync/semisync.cc index 4a80360..fc02b9d 100644 --- a/plugin/semisync/semisync.cc +++ b/plugin/semisync/semisync.cc @@ -20,6 +20,7 @@
const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef; const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01; +const unsigned char ReplSemiSyncBase::kPacketFlagSyncAndReport = 0x02;
const unsigned long Trace::kTraceGeneral = 0x0001; @@ -29,3 +30,6 @@ const unsigned long Trace::kTraceFunction = 0x0040;
const unsigned char ReplSemiSyncBase::kSyncHeader[2] = {ReplSemiSyncBase::kPacketMagicNum, 0}; + +const char* const ReplSemiSyncBase::kRplSemiSyncSlaveReportExec = + "rpl_semi_sync_slave_report_exec"; diff --git a/plugin/semisync/semisync.h b/plugin/semisync/semisync.h index 2857729..78faba9 100644 --- a/plugin/semisync/semisync.h +++ b/plugin/semisync/semisync.h @@ -75,13 +75,23 @@ class ReplSemiSyncBase
/* Constants in network packet header. */ static const unsigned char kPacketMagicNum; + /* this event should be semisync acked */ static const unsigned char kPacketFlagSync; + /* this event should be semisync acked including the current SQL position */ + static const unsigned char kPacketFlagSyncAndReport; + + /* user variable for enabling exec-pos reporting */ + static const char* const kRplSemiSyncSlaveReportExec; };
/* The layout of a semisync slave reply packet: 1 byte for the magic num 8 bytes for the binlog positon - n bytes for the binlog filename, terminated with a '\0' + n bytes for the binlog filename, NOT terminated with a '\0' + [ optionally ] + 1 byte == 0 + 8 bytes for the sql-thread position + n bytes for the sql-thread filename, terminated with a '\0' */ #define REPLY_MAGIC_NUM_LEN 1 #define REPLY_BINLOG_POS_LEN 8 diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc index c88c162..3ee6e26 100644 --- a/plugin/semisync/semisync_master.cc +++ b/plugin/semisync/semisync_master.cc @@ -22,6 +22,9 @@ #define TIME_MILLION 1000000 #define TIME_BILLION 1000000000
+/* thd_key for per slave thread state */ +static MYSQL_THD_KEY_T thd_key; + /* This indicates whether semi-synchronous replication is enabled. */ char rpl_semi_sync_master_enabled; unsigned long rpl_semi_sync_master_wait_point = @@ -45,6 +48,18 @@ unsigned long long rpl_semi_sync_master_net_wait_time = 0; unsigned long long rpl_semi_sync_master_trx_wait_time = 0; char rpl_semi_sync_master_wait_no_slave = 1;
+unsigned long rpl_semi_sync_master_max_unacked_event_count = 0; +unsigned long rpl_semi_sync_master_max_unacked_event_bytes = 4096; + +unsigned long rpl_semi_sync_master_slave_lag_clients = 0; +unsigned long long rpl_semi_sync_master_estimated_slave_lag = 0; +unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us = 500000; +unsigned long rpl_semi_sync_master_max_slave_lag = 0; +unsigned long rpl_semi_sync_master_slave_lag_wait_sessions = 0; + +unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0; +unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num = 0; +unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
static int getWaitTime(const struct timespec& start_ts);
@@ -150,6 +165,15 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name, ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ ins_node->log_pos_ = log_file_pos;
+ { + /** + * set trans commit time + * this is called when writing into binlog, which is not + * exactly right, but close enough for our purposes + */ + ins_node->tranx_commit_time_us = my_hrtime().val; + } + if (!trx_front_) { /* The list is empty. */ @@ -193,12 +217,11 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name, return function_exit(kWho, result); }
-bool ActiveTranx::is_tranx_end_pos(const char *log_file_name, - my_off_t log_file_pos) +TranxNode* ActiveTranx::lookup_tranx_end_pos(const char *log_file_name, + my_off_t log_file_pos) { - const char *kWho = "ActiveTranx::is_tranx_end_pos"; + const char *kWho = "ActiveTranx::lookup_tranx_end_pos"; function_enter(kWho); - unsigned int hash_val = get_hash_value(log_file_name, log_file_pos); TranxNode *entry = trx_htb_[hash_val];
@@ -211,38 +234,24 @@ bool ActiveTranx::is_tranx_end_pos(const char *log_file_name, }
if (trace_level_ & kTraceDetail) - sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho, - log_file_name, (unsigned long)log_file_pos, hash_val); + sql_print_information("%s: probe (%s, %lu)", kWho, + log_file_name, (unsigned long)log_file_pos);
function_exit(kWho, (entry != NULL)); - return (entry != NULL); + return entry; }
-int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, - my_off_t log_file_pos) +int ActiveTranx::clear_active_tranx_nodes() { - const char *kWho = "ActiveTranx::::clear_active_tranx_nodes"; - TranxNode *new_front; + set_new_front(NULL); + return 0; +}
+void ActiveTranx::set_new_front(TranxNode *new_front) +{ + const char *kWho = "ActiveTranx::set_new_front"; function_enter(kWho);
- if (log_file_name != NULL) - { - new_front = trx_front_; - - while (new_front) - { - if (compare(new_front, log_file_name, log_file_pos) > 0) - break; - new_front = new_front->next_; - } - } - else - { - /* If log_file_name is NULL, clear everything. */ - new_front = NULL; - } - if (new_front == NULL) { /* No active transaction nodes after the call. */ @@ -257,7 +266,6 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, trx_front_ = NULL; trx_rear_ = NULL; } - if (trace_level_ & kTraceDetail) sql_print_information("%s: cleared all nodes", kWho); } @@ -291,14 +299,40 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
trx_front_ = new_front; allocator_.free_nodes_before(trx_front_); - if (trace_level_ & kTraceDetail) sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)", kWho, n_frees, trx_front_->log_name_, (unsigned long)trx_front_->log_pos_); } + function_exit(kWho, 0); +}
- return function_exit(kWho, 0); +bool ActiveTranx::prune_active_tranx_nodes( + LogPosPtr pos, + ulonglong *oldest_tranx_commit_time_us) +{ + TranxNode *old_front = trx_front_; + TranxNode *new_front; + + new_front = trx_front_; + while (new_front) + { + if (compare(new_front, pos.file_name, pos.file_pos) > 0) + break; + new_front = new_front->next_; + } + + set_new_front(new_front); + + if (oldest_tranx_commit_time_us) + { + if (trx_front_ == NULL) + *oldest_tranx_commit_time_us = 0; + else + *oldest_tranx_commit_time_us = trx_front_->tranx_commit_time_us; + } + + return ! (old_front == trx_front_); }
@@ -334,7 +368,8 @@ ReplSemiSyncMaster::ReplSemiSyncMaster() wait_file_pos_(0), master_enabled_(false), wait_timeout_(0L), - state_(0) + state_(0), + oldest_unapplied_tranx_commit_time_us_(0) { strcpy(reply_file_name_, ""); strcpy(wait_file_name_, ""); @@ -362,11 +397,19 @@ int ReplSemiSyncMaster::initObject() mysql_cond_init(key_ss_cond_COND_binlog_send_, &COND_binlog_send_, NULL);
+ /* Mutex initialization can only be done after MY_INIT(). */ + mysql_mutex_init(key_ss_mutex_LOCK_slave_lag_, + &LOCK_slave_lag_, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_ss_cond_COND_slave_lag_, + &COND_slave_lag_, NULL); + if (rpl_semi_sync_master_enabled) result = enableMaster(); else result = disableMaster();
+ thd_key_create(&thd_key); + return result; }
@@ -437,6 +480,8 @@ void ReplSemiSyncMaster::cleanup() { mysql_mutex_destroy(&LOCK_binlog_); mysql_cond_destroy(&COND_binlog_send_); + mysql_mutex_destroy(&LOCK_slave_lag_); + mysql_cond_destroy(&COND_slave_lag_); init_done_= 0; }
@@ -473,7 +518,34 @@ void ReplSemiSyncMaster::add_slave() { lock(); rpl_semi_sync_master_clients++; + if (has_semi_sync_slave_lag()) + rpl_semi_sync_master_slave_lag_clients++; unlock(); + + if (has_semi_sync_slave_lag()) + { + int null_val = 0; + longlong new_val = + rpl_semi_sync_master_slave_lag_heartbeat_frequency_us * 1000; + longlong old_val = new_val + 1; + + get_user_var_int("master_heartbeat_period", &old_val, &null_val); + if (old_val > new_val || null_val) + { + /* if there no old value or it's bigger than what we want */ + int res = set_user_var_int("master_heartbeat_period",new_val, &old_val); + if (res == -1) + { + sql_print_error( + "Repl_semi_sync::failed to set master_heartbeat_period"); + } + } + } + + /** + * create per slave-state and store it in thread-local-storage */ + ReplSemiSyncMasterPerSlaveState *state = new ReplSemiSyncMasterPerSlaveState; + thd_setspecific(current_thd, thd_key, state); }
void ReplSemiSyncMaster::remove_slave() @@ -492,7 +564,31 @@ void ReplSemiSyncMaster::remove_slave() rpl_semi_sync_master_clients == 0) switch_off(); } + + bool no_slave_lag_clients = false; + if (has_semi_sync_slave_lag()) + { + if (--rpl_semi_sync_master_slave_lag_clients == 0) + { + no_slave_lag_clients = true; + } + } + unlock(); + + ReplSemiSyncMasterPerSlaveState *state = + (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd, thd_key); + thd_setspecific(current_thd, thd_key, NULL); + + if (state != NULL) + { + delete state; + } + + if (no_slave_lag_clients) + { + wake_slave_lag_waiters(0); + } }
bool ReplSemiSyncMaster::is_semi_sync_slave() @@ -503,14 +599,115 @@ bool ReplSemiSyncMaster::is_semi_sync_slave() return val; }
+bool ReplSemiSyncMaster::has_semi_sync_slave_lag() +{ + int null_value; + long long val= 0; + get_user_var_int(kRplSemiSyncSlaveReportExec, &val, &null_value); + return val; +} + +int ReplSemiSyncMaster::checkSyncReq(const LogPosPtr *log_pos) +{ + if (log_pos == NULL) + { + /* heartbeat events does not have logpos (since they are not actually + * stored in the binlog). + */ + if (!has_semi_sync_slave_lag()) + { + /* don't semi-sync them if we haven't enabled slave-lag handling */ + return 0; + } + else + { + /* else ask for both IO and exec position */ + return 2; + } + } + + /** + * check if this log-pos is a candidate for semi-syncing event + */ + TranxNode *entry = active_tranxs_->lookup_tranx_end_pos(log_pos->file_name, + log_pos->file_pos); + + if (entry == NULL) + return 0; + + ReplSemiSyncMasterPerSlaveState *state = + (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd, + thd_key); + do + { + state->unacked_event_count_++; + + if (active_tranxs_->is_rear(entry)) + { + /* always ask for ack on last event in tranx list */ + break; + } + + if (state->unacked_event_count_ >= + rpl_semi_sync_master_max_unacked_event_count) + { + /* enough events passed that it's time for another ack */ + break; + } + + if (!state->sync_req_pos_.IsInited()) + { + /* first event => time for ack */ + break; + } + + if (strcmp(log_pos->file_name, state->sync_req_pos_.file_name) != 0) + { + /* new file => time for ack */ + break; + } + + if (log_pos->file_pos >= (state->sync_req_pos_.file_pos + + rpl_semi_sync_master_max_unacked_event_bytes)) + { + /* enough bytes => time for ack */ + break; + } + + /* we skip asking for semi-sync ack on this event */ + return 0; + + } while (0); + + /* keep track on when we last asked for semi-sync-ack */ + state->unacked_event_count_ = 0; + state->sync_req_pos_.Assign(log_pos); + + /** + * check if this slave can report back exec position + */ + if (!has_semi_sync_slave_lag()) + { + /* slave can't report back SQL position */ + return 1; 
+ } + + /* ask for both IO and SQL position */ + return 2; +} + int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, const char *log_file_name, - my_off_t log_file_pos) + my_off_t log_file_pos, + const LogPos *exec_pos) { const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog"; int cmp; bool can_release_threads = false; bool need_copy_send_pos = true; + bool pruned_trx_list = false; + ulonglong oldest_tranx_commit_time_us = 0; +
if (!(getMasterEnabled())) return 0; @@ -559,15 +756,29 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, reply_file_pos_ = log_file_pos; reply_file_name_inited_ = true;
- /* Remove all active transaction nodes before this point. */ - assert(active_tranxs_ != NULL); - active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos); - if (trace_level_ & kTraceDetail) sql_print_information("%s: Got reply at (%s, %lu)", kWho, log_file_name, (unsigned long)log_file_pos); }
+ assert(active_tranxs_ != NULL); + if (exec_pos != NULL) + { + /* prune using exec_pos */ + LogPosPtr ptr(*exec_pos); + pruned_trx_list = active_tranxs_->prune_active_tranx_nodes( + ptr, &oldest_tranx_commit_time_us); + } + else if (rpl_semi_sync_master_slave_lag_clients == 0 && need_copy_send_pos) + { + /** + * if we don't have any slaves that can do exec_pos reporting, + * prune by IO position as "plain old semi sync" + */ + LogPosPtr ptr(log_file_name, log_file_pos); + active_tranxs_->prune_active_tranx_nodes(ptr, NULL); + } + if (rpl_semi_sync_master_wait_sessions > 0) { /* Let us check if some of the waiting threads doing a trx @@ -596,6 +807,15 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, cond_broadcast(); }
+ if (pruned_trx_list) + { + /** + * if we did prune trx list, it might be that we should wake up + * threads waiting for slave-lag to decrease + */ + wake_slave_lag_waiters(oldest_tranx_commit_time_us); + } + return function_exit(kWho, 0); }
@@ -743,14 +963,6 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, } }
- /* - At this point, the binlog file and position of this transaction - must have been removed from ActiveTranx. - */ - assert(thd_killed(NULL) || - !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name, - trx_wait_binlog_pos)); - l_end: /* Update the status counter. */ if (is_on()) @@ -794,7 +1006,7 @@ int ReplSemiSyncMaster::switch_off()
/* Clear the active transaction list. */ assert(active_tranxs_ != NULL); - result = active_tranxs_->clear_active_tranx_nodes(NULL, 0); + result = active_tranxs_->clear_active_tranx_nodes();
rpl_semi_sync_master_off_times++; wait_file_name_inited_ = false; @@ -884,7 +1096,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, { const char *kWho = "ReplSemiSyncMaster::updateSyncHeader"; int cmp = 0; - bool sync = false; + int sync = 0;
/* If the semi-sync master is not enabled, or the slave is not a semi-sync * target, do not request replies from the slave. @@ -905,6 +1117,13 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, /* semi-sync is ON */ /* sync= false; No sync unless a transaction is involved. */
+ if (log_file_name == NULL) + { + /* this is heartbeat, request io_pos and exec_pos */ + sync = checkSyncReq(0); + goto l_end; + } + if (reply_file_name_inited_) { cmp = ActiveTranx::compare(log_file_name, log_file_pos, @@ -933,12 +1152,12 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, */ if (cmp >= 0) { - /* + /* * We only wait if the event is a transaction's ending event. */ assert(active_tranxs_ != NULL); - sync = active_tranxs_->is_tranx_end_pos(log_file_name, - log_file_pos); + LogPosPtr pos(log_file_name, log_file_pos); + sync = checkSyncReq(&pos); } } else @@ -951,7 +1170,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, } else { - sync = true; + sync = 1; } }
@@ -966,10 +1185,14 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, /* We do not need to clear sync flag because we set it to 0 when we * reserve the packet header. */ - if (sync) + if (sync == 1) { (packet)[2] = kPacketFlagSync; } + else if (sync == 2) + { + (packet)[2] = kPacketFlagSyncAndReport; + }
return function_exit(kWho, 0); } @@ -1018,7 +1241,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, if (is_on()) { assert(active_tranxs_ != NULL); - if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)) + bool empty = active_tranxs_->is_empty(); + if (active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)) { /* if insert tranx_node failed, print a warning message @@ -1028,6 +1252,14 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, log_file_name, (ulong)log_file_pos); switch_off(); } + else if (empty && rpl_semi_sync_master_slave_lag_clients > 0) + { + /* if the list of transactions was empty, + * we need to init the oldest_tranx_commit_time_us + */ + oldest_unapplied_tranx_commit_time_us_ = + active_tranxs_->get_oldest_tranx_commit_time_us(); + } }
l_end: @@ -1037,10 +1269,10 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, }
int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id, - const char *event_buf) + const char *event_buf_) { const char *kWho = "ReplSemiSyncMaster::readSlaveReply"; - const unsigned char *packet; + const unsigned char *packet, *packet_start; char log_file_name[FN_REFLEN]; my_off_t log_file_pos; ulong log_file_len = 0; @@ -1048,12 +1280,15 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id, int result = -1; struct timespec start_ts; ulong trc_level = trace_level_; + const unsigned char *event_buf = (const unsigned char*)event_buf_; + bool exec_pos_present = false; // is SQL exec pos present in reply + LogPos exec_pos; // position of SQL thread LINT_INIT_STRUCT(start_ts);
function_enter(kWho);
- assert((unsigned char)event_buf[1] == kPacketMagicNum); - if ((unsigned char)event_buf[2] != kPacketFlagSync) + assert(event_buf[1] == kPacketMagicNum); + if ((event_buf[2] & (kPacketFlagSync | kPacketFlagSyncAndReport)) == 0) { /* current event does not require reply */ result = 0; @@ -1111,28 +1346,60 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id, goto l_end; }
- packet = net->read_pos; + packet_start = packet = net->read_pos; if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum) { sql_print_error("Read semi-sync reply magic number error"); goto l_end; }
+  /* we determine if this semisync ack contains a sql-thread exec-pos
+   * by checking if last byte == 0, since the packet then contains
+   * \0-terminated filenames */
+  exec_pos_present = packet[packet_len - 1] == 0;
+
   log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
-  log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
+  if (exec_pos_present == false)
+  {
+    log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
+  }
+  else
+  {
+    log_file_len = strnlen((char*)packet + REPLY_BINLOG_NAME_OFFSET,
+                           MY_MIN((ulong)FN_REFLEN,
+                                  packet_len - REPLY_BINLOG_NAME_OFFSET));
+  }
   if (log_file_len >= FN_REFLEN)
   {
     sql_print_error("Read semi-sync reply binlog file length too large");
     goto l_end;
   }
-  strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
+  packet+= REPLY_BINLOG_NAME_OFFSET;
+
+  strncpy(log_file_name, (const char*)packet, log_file_len);
   log_file_name[log_file_len] = 0;
 
+  if (exec_pos_present)
+  {
+    /* skip the io-thread file name and its terminating '\0' */
+    packet += log_file_len + 1;
+
+    /* remaining bytes: 8-byte exec position, exec file name, and the
+     * trailing '\0' (guaranteed by exec_pos_present) */
+    ulong exec_left = (ulong)((packet_start + packet_len) - packet);
+    if (exec_left <= 8 + 1)
+    {
+      sql_print_error("Read semi-sync reply binlog. "
+                      "Packet too short to contain exec-position!");
+      goto l_end;
+    }
+
+    /* the name length comes from the peer: it must be checked against
+     * the FN_REFLEN buffer before copying, never used as a copy bound */
+    ulong exec_name_len = exec_left - 8 - 1;
+    if (exec_name_len >= FN_REFLEN)
+    {
+      sql_print_error("Read semi-sync reply exec binlog file length too large");
+      goto l_end;
+    }
+
+    exec_pos.file_pos = uint8korr(packet);
+    packet += 8;
+    memcpy(exec_pos.file_name, packet, exec_name_len);
+    exec_pos.file_name[exec_name_len] = 0;
+  }
+
   if (trc_level & kTraceDetail)
     sql_print_information("%s: Got reply (%s, %lu)", kWho,
                           log_file_name, (ulong)log_file_pos);
- result = reportReplyBinlog(server_id, log_file_name, log_file_pos); + result = reportReplyBinlog(server_id, log_file_name, log_file_pos, + exec_pos_present ? &exec_pos : NULL);
l_end: return function_exit(kWho, result); @@ -1154,6 +1421,16 @@ int ReplSemiSyncMaster::resetMaster() wait_file_name_inited_ = false; reply_file_name_inited_ = false; commit_file_name_inited_ = false; + if (active_tranxs_ != NULL) + { + /** + * make sure to empty transaction hash/list + * with slave-lag reporting this container does + * not have to be empty even if no transaction is + * currently running + */ + active_tranxs_->clear_active_tranx_nodes(); + }
rpl_semi_sync_master_yes_transactions = 0; rpl_semi_sync_master_no_transactions = 0; @@ -1168,6 +1445,13 @@ int ReplSemiSyncMaster::resetMaster()
unlock();
+ mysql_mutex_lock(&LOCK_slave_lag_); + rpl_semi_sync_master_slave_lag_wait_sessions = 0; + oldest_unapplied_tranx_commit_time_us_ = 0; + rpl_semi_sync_master_trx_slave_lag_wait_num = 0; + rpl_semi_sync_master_trx_slave_lag_wait_time = 0; + mysql_mutex_unlock(&LOCK_slave_lag_); + return function_exit(kWho, result); }
@@ -1186,6 +1470,29 @@ void ReplSemiSyncMaster::setExportStats() ((double)rpl_semi_sync_master_net_wait_num)) : 0);
unlock(); + + if (oldest_unapplied_tranx_commit_time_us_ != 0) + { + rpl_semi_sync_master_estimated_slave_lag = my_hrtime().val - + oldest_unapplied_tranx_commit_time_us_; + } + else + { + rpl_semi_sync_master_estimated_slave_lag = 0; + } + + mysql_mutex_lock(&LOCK_slave_lag_); + if (rpl_semi_sync_master_trx_slave_lag_wait_num) + { + rpl_semi_sync_master_avg_trx_slave_lag_wait_time = + (unsigned long)((double)rpl_semi_sync_master_trx_slave_lag_wait_time / + (double)rpl_semi_sync_master_trx_slave_lag_wait_num); + } + else + { + rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0; + } + mysql_mutex_unlock(&LOCK_slave_lag_); }
/* Get the waiting time given the wait's staring time. @@ -1213,3 +1520,117 @@ static int getWaitTime(const struct timespec& start_ts)
return (int)(end_usecs - start_usecs);
 }
+
+void ReplSemiSyncMaster::wake_slave_lag_waiters(
+  ulonglong oldest_unapplied_tranx_commit_time_us)
+{
+  mysql_mutex_lock(&LOCK_slave_lag_);
+  oldest_unapplied_tranx_commit_time_us_ =
+    oldest_unapplied_tranx_commit_time_us;
+
+  if (rpl_semi_sync_master_slave_lag_wait_sessions > 0)
+  {
+    mysql_cond_broadcast(&COND_slave_lag_);
+  }
+  mysql_mutex_unlock(&LOCK_slave_lag_);
+}
+
+/*
+  Block the calling thread until the estimated slave lag is at most
+  rpl_semi_sync_master_max_slave_lag seconds, or until timeout_sec
+  seconds have passed.
+
+  The lag is measured against the moment this function was entered:
+    lag = start_time - commit time of the oldest transaction that has
+          not been applied on any slave.
+
+  Returns 0 when the commit may proceed. That covers these cases:
+    - the lag is acceptable,
+    - lag waiting is disabled,
+    - there is no lag-reporting slave,
+    - semi-sync was switched off while waiting.
+  Returns 1, after setting an error for the client, when the thread was
+  killed or the wait expired while the slave was still lagging.
+*/
+int ReplSemiSyncMaster::wait_slave_lag(ulong timeout_sec)
+{
+  int error = 0;
+  PSI_stage_info old_stage;
+
+  /* slave lag waiting not enabled, return directly */
+  if (rpl_semi_sync_master_max_slave_lag == 0)
+    return 0;
+
+  /* there is no slave that can report slave lag, return directly */
+  if (rpl_semi_sync_master_slave_lag_clients == 0)
+    return 0;
+
+  /* compute start_time and end_time */
+  struct timespec end_time;
+  set_timespec(end_time, 0);
+  ulonglong start_time_us = timespec_to_usec(&end_time);
+  end_time.tv_sec += timeout_sec;
+
+  mysql_mutex_lock(&LOCK_slave_lag_);
+
+  if (oldest_unapplied_tranx_commit_time_us_ == 0)
+  {
+    /* no slave lag, at least one slave is up to date */
+    mysql_mutex_unlock(&LOCK_slave_lag_);
+    return 0;
+  }
+
+  if (rpl_semi_sync_master_max_slave_lag == 0)
+  {
+    /* slave lag waiting not enabled */
+    mysql_mutex_unlock(&LOCK_slave_lag_);
+    return 0;
+  }
+
+  /* This must be called after acquired the lock */
+  THD_ENTER_COND(NULL, &COND_slave_lag_, &LOCK_slave_lag_,
+                 &stage_waiting_for_semi_sync_slave_lag,
+                 &old_stage);
+
+  bool waited = false;
+  /* Set only when the cond wait itself expired while the slave was still
+   * behind. Leaving the loop for any other reason (slave caught up,
+   * semi-sync switched off, waiting disabled) must not report a timeout. */
+  bool timed_out = false;
+  while (oldest_unapplied_tranx_commit_time_us_ != 0)
+  {
+    /* check kill_level after THD_ENTER_COND but *before* cond_wait
+     * to avoid missing kills */
+    if (! (getMasterEnabled() && is_on() &&
+           thd_kill_level(NULL) == THD_IS_NOT_KILLED))
+      break;
+
+    /* Re-read on every round so a changed setting takes effect on wakeup.
+     * A value of 0 means lag waiting was disabled meanwhile. */
+    ulonglong max_lag_sec = rpl_semi_sync_master_max_slave_lag;
+    if (max_lag_sec == 0)
+      break;
+    /* seconds -> microseconds, saturating instead of wrapping */
+    ulonglong max_lag = max_lag_sec > ULONGLONG_MAX / 1000000 ?
+                        ULONGLONG_MAX : max_lag_sec * 1000000;
+
+    /* The oldest unapplied transaction may have committed after we started
+     * waiting: once the list drains, a newer transaction becomes the oldest.
+     * That is no lag for this waiter, and the unsigned subtraction must not
+     * wrap around. */
+    ulonglong oldest = oldest_unapplied_tranx_commit_time_us_;
+    ulonglong lag = start_time_us > oldest ? start_time_us - oldest : 0;
+    if (lag <= max_lag)
+      break;
+
+    waited = true;
+    rpl_semi_sync_master_slave_lag_wait_sessions++;
+    int wait_result = mysql_cond_timedwait(&COND_slave_lag_, &LOCK_slave_lag_,
+                                           &end_time);
+    rpl_semi_sync_master_slave_lag_wait_sessions--;
+
+    if (wait_result != 0)
+    {
+      timed_out = true;
+      break;
+    }
+
+    /* a kill is detected and reported after the loop */
+    if (thd_kill_level(NULL) != THD_IS_NOT_KILLED)
+      break;
+  }
+
+  if (thd_kill_level(NULL) != THD_IS_NOT_KILLED)
+  {
+    /* Return error to client. */
+    error = 1;
+    my_printf_error(ER_ERROR_DURING_COMMIT,
+                    "Killed while waiting for replication semi-sync slave-lag.",
+                    MYF(0));
+  }
+  else if (timed_out)
+  {
+    /* Return error to client. */
+    error = 1;
+    my_printf_error(ER_ERROR_DURING_COMMIT,
+                    "Slave-lag timeout",
+                    MYF(0));
+  }
+
+  if (waited)
+  {
+    rpl_semi_sync_master_trx_slave_lag_wait_num++;
+    rpl_semi_sync_master_trx_slave_lag_wait_time +=
+      (my_hrtime().val - start_time_us);
+  }
+
+  /* The lock held will be released by thd_exit_cond, so no need to
+     call unlock() here */
+  THD_EXIT_COND(NULL, & old_stage);
+
+  return error;
+}
diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h
index d9dc4ce..e5c0de6 100644
--- a/plugin/semisync/semisync_master.h
+++ b/plugin/semisync/semisync_master.h
@@ -24,17 +24,101 @@
 #ifdef HAVE_PSI_INTERFACE
 extern PSI_mutex_key key_ss_mutex_LOCK_binlog_;
 extern PSI_cond_key key_ss_cond_COND_binlog_send_;
+
+extern PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
+extern PSI_cond_key key_ss_cond_COND_slave_lag_;
 #endif
extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave; +extern PSI_stage_info stage_waiting_for_semi_sync_slave_lag;
struct TranxNode { char log_name_[FN_REFLEN]; - my_off_t log_pos_; + my_off_t log_pos_; + ulonglong tranx_commit_time_us; struct TranxNode *next_; /* the next node in the sorted list */ struct TranxNode *hash_next_; /* the next node during hash collision */ };
+struct LogPos;
+
+/*
+  A binlog position (file name, offset) that borrows its file name.
+  file_name points at storage owned elsewhere, so that storage must
+  outlive this object. A NULL file_name means "not initialized".
+*/
+struct LogPosPtr {
+  LogPosPtr() : file_pos(0) { Uninit(); }
+  LogPosPtr(const char *name, my_off_t pos) : file_name(name), file_pos(pos) {}
+  explicit LogPosPtr(const LogPos& pos) { Assign(&pos); }
+
+  const char *file_name;
+  my_off_t file_pos;
+
+  LogPosPtr& Assign(const LogPosPtr *src) {
+    file_name = src->file_name;
+    file_pos = src->file_pos;
+    return *this;
+  }
+
+  /* Borrow src's name; an uninitialized src yields an uninitialized ptr. */
+  LogPosPtr& Assign(const LogPos *src);
+
+  void Uninit() { file_name = NULL; }
+  bool IsInited() const { return file_name != NULL; }
+};
+
+/*
+  A binlog position that owns a copy of its file name.
+  An empty file_name means "not initialized".
+*/
+struct LogPos {
+  char file_name[FN_REFLEN];
+  my_off_t file_pos;
+
+  LogPos() : file_pos(0) { Uninit(); }
+
+  /* Borrowing view of this position (valid while *this is alive). */
+  LogPosPtr ToLogPosPtr() const {
+    return IsInited() ? LogPosPtr(file_name, file_pos) : LogPosPtr();
+  }
+
+  /* Copy src into this object. The copy is bounded by the size of
+   * file_name: an over-long name is truncated instead of overflowing the
+   * buffer. memmove keeps self-assignment (src aliasing file_name) well
+   * defined. */
+  LogPos& Assign(const LogPosPtr *src) {
+    if (src->IsInited()) {
+      size_t len = strnlen(src->file_name, sizeof(file_name) - 1);
+      memmove(file_name, src->file_name, len);
+      file_name[len] = 0;
+      file_pos = src->file_pos;
+    } else {
+      Uninit();
+    }
+    return *this;
+  }
+
+  LogPos& Assign(const LogPos* src) {
+    LogPosPtr p = src->ToLogPosPtr();
+    Assign(&p);
+    return *this;
+  }
+
+  void Uninit() { file_name[0] = 0; }
+  bool IsInited() const { return file_name[0] != 0; }
+};
+
+inline LogPosPtr& LogPosPtr::Assign(const LogPos* src) {
+  LogPosPtr p = src->ToLogPosPtr();
+  Assign(&p);
+  return *this;
+}
+
+/*
+  Order two positions by file name, then by offset.
+  Returns <0, 0 or >0, like strcmp().
+  Both positions must be initialized: file_name is passed to strcmp().
+*/
+inline int CompareLogPos(const LogPosPtr *pos1, const LogPosPtr *pos2) {
+  int cmp = strcmp(pos1->file_name, pos2->file_name);
+
+  if (cmp != 0)
+    return cmp;
+
+  if (pos1->file_pos > pos2->file_pos)
+    return 1;
+  else if (pos1->file_pos < pos2->file_pos)
+    return -1;
+  else
+    return 0;
+}
+
 /**
   @class TranxNodeAllocator
@@ -329,10 +413,14 @@ class ActiveTranx node2->log_name_, node2->log_pos_); }
+ void set_new_front(TranxNode* new_front); + public: ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level); ~ActiveTranx();
+ bool is_empty() const { return trx_front_ == NULL; } + /* Insert an active transaction node with the specified position. * * Return: @@ -340,21 +428,42 @@ class ActiveTranx */ int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
- /* Clear the active transaction nodes until(inclusive) the specified - * position. - * If log_file_name is NULL, everything will be cleared: the sorted + /* Clear the active transaction + * Everything will be cleared: the sorted * list and the hash table will be reset to empty. - * + * * Return: - * 0: success; non-zero: error + * 0 success; non-zero: error + */ + int clear_active_tranx_nodes(); + + /* Prune the active transaction nodes until the specified + * position (inclusive). + * + * Return: + * true if any transaction was removed + * false if list was left unchanged */ - int clear_active_tranx_nodes(const char *log_file_name, - my_off_t log_file_pos); + bool prune_active_tranx_nodes(LogPosPtr logpos, + ulonglong *oldest_tranx_commit_time_us);
- /* Given a position, check to see whether the position is an active - * transaction's ending position by probing the hash table. + /* Lookup a transaction's ending position by probing the hash table. + * + * return entry if found or NULL otherwise */ - bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos); + TranxNode* lookup_tranx_end_pos(const char *log_file_name, + my_off_t log_file_pos); + + + /* Check if an entry is rear (i.e last) */ + bool is_rear(TranxNode* entry) const { return entry == trx_rear_; } + + /** + * return timestamp of oldest transaction in list + */ + ulonglong get_oldest_tranx_commit_time_us() const { + return trx_front_->tranx_commit_time_us; + }
/* Given two binlog positions, compare which one is bigger based on * (file_name, file_position). @@ -365,6 +474,24 @@ class ActiveTranx };
/** + * State that semisync master keeps per slave + */ +struct ReplSemiSyncMasterPerSlaveState +{ + ReplSemiSyncMasterPerSlaveState() : unacked_event_count_(0) {} + + /** + * No of events that has not been semi-sync acked + */ + unsigned unacked_event_count_; + + /** + * Position of last event that was semisync'ed + */ + LogPos sync_req_pos_; +}; + +/** The extension class for the master of semi-synchronous replication */ class ReplSemiSyncMaster @@ -432,6 +559,16 @@ class ReplSemiSyncMaster
bool state_; /* whether semi-sync is switched */
+ /* This cond variable is signaled when slave lag has decreased */ + mysql_cond_t COND_slave_lag_; + + /* Mutex that protects oldest_unapplied_tranx_commit_time_us */ + mysql_mutex_t LOCK_slave_lag_; + + /* this is commit time of oldest transaction that has not been applied + * on any slave */ + ulonglong oldest_unapplied_tranx_commit_time_us_; + void lock(); void unlock(); void cond_broadcast(); @@ -493,6 +630,9 @@ class ReplSemiSyncMaster /* Is the slave servered by the thread requested semi-sync */ bool is_semi_sync_slave();
+ /* Does this slave have slave lag reporting capabilities */ + bool has_semi_sync_slave_lag(); + /* In semi-sync replication, reports up to which binlog position we have * received replies from the slave indicating that it already get the events. * @@ -501,13 +641,15 @@ class ReplSemiSyncMaster * log_file_name - (IN) binlog file name * end_offset - (IN) the offset in the binlog file up to which we have * the replies from the slave + * exec_position - (IN) position of SQL thread or NULL if not present * * Return: * 0: success; non-zero: error */ int reportReplyBinlog(uint32 server_id, const char* log_file_name, - my_off_t end_offset); + my_off_t end_offset, + const LogPos *exec_position);
/* Commit a transaction in the final step. This function is called from * InnoDB before returning from the low commit. If semi-sync is switch on, @@ -540,6 +682,16 @@ class ReplSemiSyncMaster */ int reserveSyncHeader(unsigned char *header, unsigned long size);
+ /* + * check if an event should be semi synced and optionally + * if it should report back position of SQL thread on slave + * + * return 0 - no semi sync + * 1 - semi sync + * 2 - semi sync and report exec position + */ + int checkSyncReq(const LogPosPtr *log_pos); + /* Update the sync bit in the packet header to indicate to the slave whether * the master will wait for the reply of the event. If semi-sync is switched * off and we detect that the slave is catching up, we switch semi-sync on. @@ -592,6 +744,21 @@ class ReplSemiSyncMaster * go off for that. */ int resetMaster(); + + /** + * wake potential slave-lag waiters + * called by binlog dump-thread(s) + */ + void wake_slave_lag_waiters(ulonglong oldest_unapplied_tranx_commit_time_us); + + /** + * wait for slave lag to get below threshold + * called by user-thread(s) + * + * return 0 - success + * 1 - timeout + */ + int wait_slave_lag(ulong max_wait_time_sec); };
enum rpl_semi_sync_master_wait_point_t { @@ -621,6 +788,17 @@ extern unsigned long long rpl_semi_sync_master_trx_wait_num; extern unsigned long long rpl_semi_sync_master_net_wait_time; extern unsigned long long rpl_semi_sync_master_trx_wait_time;
+extern unsigned long rpl_semi_sync_master_max_unacked_event_count; +extern unsigned long rpl_semi_sync_master_max_unacked_event_bytes; +extern unsigned long rpl_semi_sync_master_max_slave_lag; +extern unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us; +extern unsigned long rpl_semi_sync_master_slave_lag_wait_sessions; +extern unsigned long long rpl_semi_sync_master_estimated_slave_lag; + +extern unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time; +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num; +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time; + /* This indicates whether we should keep waiting if no semi-sync slave is available. diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc index 7bb0eea..3282158 100644 --- a/plugin/semisync/semisync_master_plugin.cc +++ b/plugin/semisync/semisync_master_plugin.cc @@ -21,6 +21,11 @@
static ReplSemiSyncMaster repl_semisync;
+// forward declaration +static inline ulong get_slave_lag_wait_timeout(THD* thd); + +static char rpl_semi_sync_master_group_commit = 0; + C_MODE_START
int repl_semi_report_binlog_update(Binlog_storage_param *param, @@ -31,6 +36,13 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
if (repl_semisync.getMasterEnabled()) { + if (rpl_semi_sync_master_group_commit && + ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0)) + { + /** there are transactions more coming... */ + return 0; + } + /* Let us store the binlog file name and the position, so that we know how long to wait for the binlog to the replicated to @@ -43,8 +55,11 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param, return error; }
-int repl_semi_request_commit(Trans_param *param)
+/*
+  before_commit transaction observer.
+
+  Blocks the committing thread in ReplSemiSyncMaster::wait_slave_lag()
+  until the estimated slave lag is acceptable. The maximum wait is the
+  session's slave_lag_wait_timeout, in seconds.
+
+  The verdict is reported through *error: a non-zero *error makes the
+  server abort the commit and return the error to the client. The return
+  value stays 0, so the server does not write an additional error-log
+  entry; a non-zero return would do so.
+*/
+int repl_semi_before_commit(Trans_param *param, int *error)
 {
+  *error = repl_semisync.wait_slave_lag(
+    get_slave_lag_wait_timeout(current_thd));
+
   return 0;
 }
@@ -53,6 +68,14 @@ int repl_semi_report_binlog_sync(Binlog_storage_param *param, my_off_t log_pos, uint32 flags) { int error= 0; + + if (rpl_semi_sync_master_group_commit && + ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0)) + { + /** there are transactions more coming... */ + return 0; + } + if (rpl_semi_sync_master_wait_point == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC) { @@ -100,7 +123,7 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param, Let's assume this semi-sync slave has already received all binlog events before the filename and position it requests. */ - repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos); + repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos, NULL); } sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)", semi_sync_slave ? "semi-sync" : "asynchronous", @@ -242,15 +265,72 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level, &fix_rpl_semi_sync_master_trace_level, // update 32, 0, ~0UL, 1);
+static MYSQL_SYSVAR_ULONG(max_unacked_event_count, + rpl_semi_sync_master_max_unacked_event_count, + PLUGIN_VAR_OPCMDARG, + "Maximum unacked replication events", + NULL, // check + NULL, // update + rpl_semi_sync_master_max_unacked_event_count, 0, ~0UL, 1); + +static MYSQL_SYSVAR_ULONG(max_unacked_event_bytes, + rpl_semi_sync_master_max_unacked_event_bytes, + PLUGIN_VAR_OPCMDARG, + "Maximum unacked replication bytes", + NULL, // check + NULL, // update + rpl_semi_sync_master_max_unacked_event_bytes, 0, ~0UL, 1); + +static MYSQL_SYSVAR_ULONG(max_slave_lag, rpl_semi_sync_master_max_slave_lag, + PLUGIN_VAR_OPCMDARG, + "Maximum allowed lag of fastest semi-sync slave (in seconds), " + "checked before commit.", + NULL, // check + NULL, // update + rpl_semi_sync_master_max_slave_lag, 0, ~0UL, 1); + +static MYSQL_THDVAR_ULONG(slave_lag_wait_timeout, + PLUGIN_VAR_RQCMDARG, + "Timeout in seconds a rw-transaction may wait for max slave lag before " + "being rolled back.", + NULL, NULL, 50, 1, 1024 * 1024 * 1024, 0); + +static MYSQL_SYSVAR_ULONG( + slave_lag_heartbeat_frequency_us, + rpl_semi_sync_master_slave_lag_heartbeat_frequency_us, + PLUGIN_VAR_RQCMDARG, + "Heartbeat frequency when slave-lag is enabled (in microseconds).", + NULL, // check + NULL, // update + 500000, /* 500 ms */ + 1, ~0UL, 1); + +static MYSQL_SYSVAR_BOOL(group_commit, rpl_semi_sync_master_group_commit, + PLUGIN_VAR_OPCMDARG, + "Group commit for semi sync", + NULL, // check + NULL, + 0); + static SYS_VAR* semi_sync_master_system_vars[]= { MYSQL_SYSVAR(enabled), MYSQL_SYSVAR(wait_point), MYSQL_SYSVAR(timeout), MYSQL_SYSVAR(wait_no_slave), MYSQL_SYSVAR(trace_level), + MYSQL_SYSVAR(max_unacked_event_count), + MYSQL_SYSVAR(max_unacked_event_bytes), + MYSQL_SYSVAR(max_slave_lag), + MYSQL_SYSVAR(slave_lag_wait_timeout), + MYSQL_SYSVAR(slave_lag_heartbeat_frequency_us), + MYSQL_SYSVAR(group_commit), NULL, };
+static inline ulong get_slave_lag_wait_timeout(THD* thd) +{ + return THDVAR(thd, slave_lag_wait_timeout); +}
static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd, SYS_VAR *var, @@ -297,6 +377,7 @@ Trans_observer trans_observer = {
repl_semi_report_commit, // after_commit repl_semi_report_rollback, // after_rollback + repl_semi_before_commit, // before commit };
Binlog_storage_observer storage_observer = { @@ -339,7 +420,11 @@ DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG) DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG) DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG) DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG) - +DEF_SHOW_FUNC(slave_lag_wait_sessions, SHOW_LONG) +DEF_SHOW_FUNC(estimated_slave_lag, SHOW_LONGLONG) +DEF_SHOW_FUNC(trx_slave_lag_wait_time, SHOW_LONGLONG) +DEF_SHOW_FUNC(trx_slave_lag_wait_num, SHOW_LONGLONG) +DEF_SHOW_FUNC(avg_trx_slave_lag_wait_time, SHOW_LONG)
/* plugin status variables */ static SHOW_VAR semi_sync_master_status_vars[]= { @@ -385,32 +470,55 @@ static SHOW_VAR semi_sync_master_status_vars[]= { {"Rpl_semi_sync_master_net_avg_wait_time", (char*) &SHOW_FNAME(avg_net_wait_time), SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_slave_lag_wait_sessions", + (char*) &SHOW_FNAME(slave_lag_wait_sessions), + SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_estimated_slave_lag", + (char*) &SHOW_FNAME(estimated_slave_lag), + SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_tx_slave_lag_wait_time", + (char*) &SHOW_FNAME(trx_slave_lag_wait_time), + SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_tx_slave_lag_waits", + (char*) &SHOW_FNAME(trx_slave_lag_wait_num), + SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_tx_avg_slave_lag_wait_time", + (char*) &SHOW_FNAME(avg_trx_slave_lag_wait_time), + SHOW_SIMPLE_FUNC}, {NULL, NULL, SHOW_LONG}, };
#ifdef HAVE_PSI_INTERFACE PSI_mutex_key key_ss_mutex_LOCK_binlog_; +PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
static PSI_mutex_info all_semisync_mutexes[]= { - { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0} + { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0 }, + { &key_ss_mutex_LOCK_slave_lag_, "LOCK_slave_lag_", 0 } };
PSI_cond_key key_ss_cond_COND_binlog_send_; +PSI_cond_key key_ss_cond_COND_slave_lag_;
static PSI_cond_info all_semisync_conds[]= { - { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0} + { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0 }, + { &key_ss_cond_COND_slave_lag_, "COND_slave_lag_", 0 } }; #endif /* HAVE_PSI_INTERFACE */
PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave= { 0, "Waiting for semi-sync ACK from slave", 0};
+PSI_stage_info stage_waiting_for_semi_sync_slave_lag= +{ 0, "Waiting for semi-sync slave lag", 0}; + #ifdef HAVE_PSI_INTERFACE PSI_stage_info *all_semisync_stages[]= { - & stage_waiting_for_semi_sync_ack_from_slave + & stage_waiting_for_semi_sync_ack_from_slave, + & stage_waiting_for_semi_sync_slave_lag };
static void init_semisync_psi_keys(void) @@ -492,4 +600,3 @@ maria_declare_plugin(semisync_master) MariaDB_PLUGIN_MATURITY_GAMMA } maria_declare_plugin_end; - diff --git a/plugin/semisync/semisync_slave.cc b/plugin/semisync/semisync_slave.cc index 5f98472..839e0cc 100644 --- a/plugin/semisync/semisync_slave.cc +++ b/plugin/semisync/semisync_slave.cc @@ -20,6 +20,7 @@ char rpl_semi_sync_slave_enabled; char rpl_semi_sync_slave_status= 0; unsigned long rpl_semi_sync_slave_trace_level; +char rpl_semi_sync_slave_lag_enabled= 0;
int ReplSemiSyncSlave::initObject() { @@ -42,7 +43,7 @@ int ReplSemiSyncSlave::initObject()
int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header, unsigned long total_len, - bool *need_reply, + unsigned char *need_reply, const char **payload, unsigned long *payload_len) { @@ -52,7 +53,7 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
if ((unsigned char)(header[0]) == kPacketMagicNum) { - *need_reply = (header[1] & kPacketFlagSync); + *need_reply = (header[1] & (kPacketFlagSync | kPacketFlagSyncAndReport)); *payload_len = total_len - 2; *payload = header + 2;
@@ -95,16 +96,20 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param) return 0; }
-int ReplSemiSyncSlave::slaveReply(MYSQL *mysql, - const char *binlog_filename, - my_off_t binlog_filepos) +int ReplSemiSyncSlave::slaveReply(unsigned char header_byte, + MYSQL *mysql, + const char *binlog_filename, + my_off_t binlog_filepos, + Master_info * mi) { const char *kWho = "ReplSemiSyncSlave::slaveReply"; NET *net= &mysql->net; - uchar reply_buffer[REPLY_MAGIC_NUM_LEN - + REPLY_BINLOG_POS_LEN - + REPLY_BINLOG_NAME_LEN]; + uchar reply_buffer[REPLY_MAGIC_NUM_LEN + + 2 * ( REPLY_BINLOG_POS_LEN + + REPLY_BINLOG_NAME_LEN + + /* '\0' */ 1) ]; int reply_res, name_len = strlen(binlog_filename); + int msg_len = name_len + REPLY_BINLOG_NAME_OFFSET;
function_enter(kWho);
@@ -119,10 +124,29 @@ int ReplSemiSyncSlave::slaveReply(MYSQL *mysql, sql_print_information("%s: reply (%s, %lu)", kWho, binlog_filename, (ulong)binlog_filepos);
+ if (header_byte & kPacketFlagSyncAndReport) + { + /** + * master requests that we also report back SQL-thread position + */ + + // where to store sql filename/position + char *bufptr = (char*)reply_buffer + msg_len; + bufptr[0] = 0; // '\0' terminate previous filename + bufptr++; + + my_off_t sql_file_pos; + // get file/position and store the filename directly info bufptr+8 + size_t name_len2 = get_master_log_pos(mi, bufptr + 8, &sql_file_pos); + int8store(bufptr, sql_file_pos); // store position + + msg_len += /* '\0' */ 1 + /* position */ 8 + name_len2 + /* '\0' */ 1; + } + net_clear(net, 0); /* Send the reply. */ - reply_res = my_net_write(net, reply_buffer, - name_len + REPLY_BINLOG_NAME_OFFSET); + reply_res = my_net_write(net, reply_buffer, msg_len); + if (!reply_res) { reply_res = net_flush(net); diff --git a/plugin/semisync/semisync_slave.h b/plugin/semisync/semisync_slave.h index 1bf8cf3..c91847d 100644 --- a/plugin/semisync/semisync_slave.h +++ b/plugin/semisync/semisync_slave.h @@ -60,23 +60,30 @@ class ReplSemiSyncSlave * Return: * 0: success; non-zero: error */ - int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply, + int slaveReadSyncHeader(const char *header, unsigned long total_len, + unsigned char *need_reply_byte, const char **payload, unsigned long *payload_len);
/* A slave replies to the master indicating its replication process. It * indicates that the slave has received all events before the specified * binlog position. - * + * * Input: + * need_reply_byte - (IN) the header byte * mysql - (IN) the mysql network connection * binlog_filename - (IN) the reply point's binlog file name * binlog_filepos - (IN) the reply point's binlog file offset + * master_info - (IN) the master info struct so that we can get more + * info if needed * * Return: * 0: success; non-zero: error */ - int slaveReply(MYSQL *mysql, const char *binlog_filename, - my_off_t binlog_filepos); + int slaveReply(unsigned char need_reply_byte, + MYSQL *mysql, + const char *binlog_filename, + my_off_t binlog_filepos, + Master_info* master_info);
int slaveStart(Binlog_relay_IO_param *param); int slaveStop(Binlog_relay_IO_param *param); @@ -93,5 +100,6 @@ class ReplSemiSyncSlave extern char rpl_semi_sync_slave_enabled; extern unsigned long rpl_semi_sync_slave_trace_level; extern char rpl_semi_sync_slave_status; +extern char rpl_semi_sync_slave_lag_enabled;
#endif /* SEMISYNC_SLAVE_H */ diff --git a/plugin/semisync/semisync_slave_plugin.cc b/plugin/semisync/semisync_slave_plugin.cc index 572ead2..0bf03be 100644 --- a/plugin/semisync/semisync_slave_plugin.cc +++ b/plugin/semisync/semisync_slave_plugin.cc @@ -28,7 +28,7 @@ static ReplSemiSyncSlave repl_semisync; event read is the last event of a transaction. And the value is checked in repl_semi_slave_queue_event. */ -bool semi_sync_need_reply= false; +unsigned char semi_sync_need_reply= 0;
C_MODE_START
@@ -81,6 +81,23 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param, return 1; } mysql_free_result(mysql_store_result(mysql)); + + if (rpl_semi_sync_slave_lag_enabled) + { + char buf[100]; + /* + Tell master that we can do exec-position reporting + */ + snprintf(buf, sizeof(buf), "SET @%s= 1", + ReplSemiSyncBase::kRplSemiSyncSlaveReportExec); + if (mysql_real_query(mysql, buf, strlen(buf))) + { + sql_print_error("query: %s on master failed", buf); + return 1; + } + mysql_free_result(mysql_store_result(mysql)); + } + rpl_semi_sync_slave_status= 1; return 0; } @@ -110,9 +127,11 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param, should not cause the slave IO thread to stop, and the error messages are already reported. */ - (void) repl_semisync.slaveReply(param->mysql, + (void) repl_semisync.slaveReply(semi_sync_need_reply, + param->mysql, param->master_log_name, - param->master_log_pos); + param->master_log_pos, + param->mi); } return 0; } @@ -164,9 +183,17 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level, &fix_rpl_semi_sync_trace_level, // update 32, 0, ~0UL, 1);
+static MYSQL_SYSVAR_BOOL(lag_enabled, rpl_semi_sync_slave_lag_enabled, + PLUGIN_VAR_OPCMDARG, + "Enable semi-synchronous replication slave lag reporting. ", + NULL, // check + NULL, // update + 0); + static SYS_VAR* semi_sync_slave_system_vars[]= { MYSQL_SYSVAR(enabled), MYSQL_SYSVAR(trace_level), + MYSQL_SYSVAR(lag_enabled), NULL, };
@@ -230,4 +257,3 @@ maria_declare_plugin(semisync_slave) MariaDB_PLUGIN_MATURITY_GAMMA } maria_declare_plugin_end; - diff --git a/sql/handler.cc b/sql/handler.cc index 3ca9ec3..3e6cd65 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1364,12 +1364,30 @@ int ha_commit_trans(THD *thd, bool all) uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all); /* rw_trans is TRUE when we in a transaction changing data */ bool rw_trans= is_real_trans && (rw_ha_count > 0); + bool mdl_request_initialized= false; MDL_request mdl_request; DBUG_PRINT("info", ("is_real_trans: %d rw_trans: %d rw_ha_count: %d", is_real_trans, rw_trans, rw_ha_count));
if (rw_trans) { + /* check READ-ONLY just before before_commit hook to decrease likelihood + * of having threads hanging waiting for slave-lag only to be aborted + * due to read-only. + */ + if (opt_readonly && + !(thd->security_ctx->master_access & SUPER_ACL) && + !thd->slave_thread) + { + my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only"); + goto err; + } + + if (RUN_HOOK(transaction, before_commit, (thd))) /* may block (e.g. slave-lag wait); non-zero aborts the commit */ + { + goto err; + } + /* Acquire a metadata lock which will ensure that COMMIT is blocked by an active FLUSH TABLES WITH READ LOCK (and vice versa: @@ -1378,6 +1396,7 @@ int ha_commit_trans(THD *thd, bool all) We allow the owner of FTWRL to COMMIT; we assume that it knows what it does. */ + mdl_request_initialized= true; mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);
@@ -1486,7 +1505,7 @@ int ha_commit_trans(THD *thd, bool all) ha_rollback_trans(thd, all);
end: - if (rw_trans && mdl_request.ticket) + if (rw_trans && mdl_request_initialized && mdl_request.ticket) { /* We do not always immediately release transactional locks diff --git a/sql/replication.h b/sql/replication.h index 4731c22..309bdb4 100644 --- a/sql/replication.h +++ b/sql/replication.h @@ -110,6 +110,21 @@ typedef struct Trans_observer { @retval 1 Failure */ int (*after_rollback)(Trans_param *param); + + /** + This callback is called before transaction commit. + If the function sets *error to a non-zero value, the transaction will + not be committed and the commit fails with an error. + + @note *error!=0 and return code 0 shall be used by plugin to signal + that transaction should be aborted. + If returning non-zero transaction will also be aborted and an error + will be printed to error log. + + @retval 0 Success + @retval non-zero error + */ + int (*before_commit)(Trans_param *param, int *error); } Trans_observer;
/** @@ -294,6 +309,8 @@ enum Binlog_relay_IO_flags { };
+class Master_info; /* forward declaration; only pointers are used below. NOTE(review): 'class' is C++-only while this header carries __cplusplus guards - confirm no C includer */ + /** Replication binlog relay IO observer parameter */ @@ -309,8 +326,20 @@ typedef struct Binlog_relay_IO_param { my_off_t master_log_pos;
MYSQL *mysql; /* the connection to master */ + + Master_info * mi; /* master info handle */ } Binlog_relay_IO_param;
+ +/* get the master log position (file name and offset) recorded in + * mi->rli and store it in filename_buf/filepos + * return length of filename (excluding \0) + * + * note: filename_buf must hold at least FN_REFLEN bytes + */ +size_t get_master_log_pos(const Master_info *mi, + char *filename_buf, my_off_t *filepos); + /** Observes and extends the service of slave IO thread. */ @@ -561,7 +590,21 @@ int get_user_var_str(const char *name, char *value, unsigned long len, unsigned int precision, int *null_value);
- + +/** + Set or replace the value of user variable to a long long + + @param name user variable name, without the leading '@' + @param value the value + @param old_value pointer to where old value will be stored (or NULL) + + @retval 0 Success, no prior value found + @retval 1 Success, prior value found (stored in *old_value if non-NULL) + @retval -1 Failure (statement could not be formatted or executed) +*/ +int set_user_var_int(const char *name, + long long int value, + long long int *old_value);
#ifdef __cplusplus } diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc index 3962600..209a809 100644 --- a/sql/rpl_handler.cc +++ b/sql/rpl_handler.cc @@ -23,6 +23,7 @@ #include "rpl_filter.h" #include <my_dir.h> #include "rpl_handler.h" +#include "sql_prepare.h" /* Ed_connection, used by set_user_var_int() */
Trans_delegate *transaction_delegate; Binlog_storage_delegate *binlog_storage_delegate; @@ -88,6 +89,42 @@ int get_user_var_str(const char *name, char *value, return 0; }
+/* Set user variable @name to value by executing "SET @name=value" on an
+   Ed_connection; the return contract is documented in replication.h. */
+int set_user_var_int(const char *name,
+                     long long int value,
+                     long long int *old_value)
+{
+  THD* thd= current_thd;
+  bool null_val;
+  user_var_entry *entry=
+    (user_var_entry*) my_hash_search(&thd->user_vars,
+                                     (uchar*) name, strlen(name));
+  /* Capture the previous value (if any) before it is overwritten */
+  if (entry != NULL && old_value != NULL)
+    *old_value= entry->val_int(&null_val);
+
+  /* Format first: no Ed_connection is needed if formatting fails */
+  char buf[256];
+  int res= snprintf(buf, sizeof(buf), "SET @%s=%lld", name, value);
+  if (/* error */ res < 0 ||
+      /* truncated; cast avoids a signed/unsigned comparison */
+      (size_t) res >= sizeof(buf))
+  {
+    return -1;
+  }
+
+  LEX_STRING str;
+  lex_string_set(&str, buf);
+  Ed_connection con(thd);
+  if (con.execute_direct(str))
+  {
+    return -1;
+  }
+
+  return entry == NULL ? 0 : 1;
+}
+
 int delegates_init()
 {
   static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
@@ -249,6 +286,17 @@ int Trans_delegate::after_rollback(THD *thd, bool all)
   return ret;
 }

+int Trans_delegate::before_commit(THD *thd)
+{
+  int ret= 0, error= 0;
+  Trans_param param= Trans_param(); /* zero flags, log_file and log_pos */
+  FOREACH_OBSERVER(ret, before_commit, thd, (&param, &error));
+  /* abort on *error != 0 or on observer failure; never abort silently */
+  if ((error || ret) && !thd->is_error())
+    my_error(ER_UNKNOWN_ERROR, MYF(0));
+  return error ? error : ret;
+}
+
 int Binlog_storage_delegate::after_flush(THD *thd,
                                          const char *log_file,
                                          my_off_t log_pos,
@@ -374,17 +422,19 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,

 int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
                                                 String *packet,
-                                                const char *log_file,
+                                                const char *log_file_path,
                                                 my_off_t log_pos)
 {
   Binlog_transmit_param param;
   param.flags= flags;

   int ret= 0;
+  const char* log_file_name= log_file_path != NULL ?
+    log_file_path + dirname_length(log_file_path) : NULL; /* NULL for heartbeats */
   FOREACH_OBSERVER(ret, before_send_event, false,
                    (&param, (uchar *)packet->c_ptr(),
                     packet->length(),
-                    log_file+dirname_length(log_file), log_pos));
+                    log_file_name, log_pos));
   return ret;
 }

@@ -414,6 +464,7 @@ int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
 void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
                                           Master_info *mi)
 {
+  param->mi = mi;
   param->mysql= mi->mysql;
   param->user= mi->user;
   param->host= mi->host;
@@ -540,6 +591,20 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
 {
   return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
 }
+
+/* Copy mi->rli.group_master_log_{name,pos} while holding rli.data_lock */
+size_t get_master_log_pos(const Master_info* mi,
+                          char *filename_buf, my_off_t *filepos)
+{
+  mysql_mutex_t *mutex= &mi->rli.data_lock;
+  /* strmake() writes at most FN_REFLEN bytes and always NUL-terminates */
+  mysql_mutex_lock(mutex);
+  *filepos= mi->rli.group_master_log_pos;
+  strmake(filename_buf, mi->rli.group_master_log_name, FN_REFLEN - 1);
+  mysql_mutex_unlock(mutex);
+  return strlen(filename_buf);
+}
+
 #else
 int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
 {
@@ -560,4 +625,13 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
 {
   return 0;
 }
+
+size_t get_master_log_pos(const Master_info* mi,
+                          char *filename_buf, my_off_t *filepos)
+{
+  *filepos= 0;
+  filename_buf[0]= 0;
+  return 0;
+}
+
 #endif /* HAVE_REPLICATION */
diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h
index afcfd9d..5119ee4 100644
--- a/sql/rpl_handler.h
+++ b/sql/rpl_handler.h
@@ -142,7 +142,7 @@ class Trans_delegate
   :public Delegate {
 public:
   typedef Trans_observer Observer;
-  int before_commit(THD *thd, bool all);
+  int before_commit(THD *thd);
   int before_rollback(THD *thd, bool all);
   int after_commit(THD *thd, bool all);
   int after_rollback(THD *thd, bool all);
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 56300c6..d60a122 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -154,7 +154,9 @@ class Relay_log_info : public Slave_reporting_capability
     standard lock acquisition order to avoid deadlocks:
     run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
   */
-  mysql_mutex_t data_lock, run_lock;
+  mutable mysql_mutex_t data_lock; /* locked via const Master_info* in get_master_log_pos() */
+  mysql_mutex_t run_lock;
+
   /*
     start_cond is broadcast when SQL thread is started
     stop_cond - when stopped
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index d9ae6ca..9662058 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -818,6 +818,13 @@ static int send_heartbeat_event(binlog_send_info *info,
     packet->append(b, sizeof(b));
   }

+ if (RUN_HOOK(binlog_transmit, before_send_event, + (info->thd, info->flags, packet, 0, 0))) /* heartbeat: no binlog file name or position (0, 0) */ + { + info->error= ER_UNKNOWN_ERROR; + DBUG_RETURN(-1); + } + if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) || net_flush(net)) { @@ -825,6 +832,13 @@ static int send_heartbeat_event(binlog_send_info *info, DBUG_RETURN(-1); }
+ if (RUN_HOOK(binlog_transmit, after_send_event, + (info->thd, info->flags, packet))) /* observers see the heartbeat after it has been written and flushed */ + { + info->error= ER_UNKNOWN_ERROR; + DBUG_RETURN(-1); + } + DBUG_RETURN(0); }
participants (1)
-
Kristian Nielsen