diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 390fdfa..ef00f9c 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -39,7 +39,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, slave_running(0), slave_run_id(0), sync_counter(0), heartbeat_period(0), received_heartbeats(0), master_id(0), using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0), - gtid_reconnect_event_skip_count(0), gtid_event_seen(false) + gtid_reconnect_event_skip_count(0), gtid_event_seen(false), + last_gtid_is_standalone(false) { host[0] = 0; user[0] = 0; password[0] = 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; @@ -168,6 +169,7 @@ void init_master_log_pos(Master_info* mi) mi->events_queued_since_last_gtid= 0; mi->gtid_reconnect_event_skip_count= 0; mi->gtid_event_seen= false; + mi->last_gtid_is_standalone= false; /* Intentionally init ssl_verify_server_cert to 0, no option available */ mi->ssl_verify_server_cert= 0; diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index 57f0f57..4aaa256 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -167,6 +167,11 @@ class Master_info : public Slave_reporting_capability uint64 gtid_reconnect_event_skip_count; /* gtid_event_seen is false until we receive first GTID event from master. */ bool gtid_event_seen; + /* + False if last GTID event wasn't standalone, i.e. it was an equivalent of the + BEGIN statement. True otherwise. 
+ */ + bool last_gtid_is_standalone; }; int init_master_info(Master_info* mi, const char* master_info_fname, const char* slave_info_fname, diff --git a/sql/slave.cc b/sql/slave.cc index 9d3a172..f97567f 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -5153,11 +5153,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) case GTID_EVENT: { - uchar dummy_flag; + uchar gtid_flags; if (Gtid_log_event::peek(buf, event_len, checksum_alg, &event_gtid.domain_id, &event_gtid.server_id, - &event_gtid.seq_no, &dummy_flag, + &event_gtid.seq_no, &gtid_flags, rli->relay_log.description_event_for_queue)) { error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; @@ -5217,6 +5217,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->events_queued_since_last_gtid= 0; } mi->last_queued_gtid= event_gtid; + mi->last_gtid_is_standalone= (gtid_flags & Gtid_log_event::FL_STANDALONE) != 0; ++mi->events_queued_since_last_gtid; inc_pos= event_len; } @@ -5298,6 +5299,15 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) else rli->relay_log.harvest_bytes_written(&rli->log_space_total); } + else + { + Rotate_log_event fake_rev(mi->master_log_name, 0, mi->master_log_pos, 0); + fake_rev.server_id= mi->master_id; + if (rli->relay_log.append_no_lock(&fake_rev)) + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + else + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } } else if ((s_id == global_system_variables.server_id && @@ -5369,6 +5379,16 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } mysql_mutex_unlock(log_lock); + if (!error && mi->using_gtid != Master_info::USE_GTID_NO && + (buf[EVENT_TYPE_OFFSET] == XID_EVENT || + (buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + (mi->last_gtid_is_standalone || + Query_log_event::peek_is_commit_rollback(buf, event_len, checksum_alg))))) + { + mi->gtid_current_pos.update(&mi->last_queued_gtid); + mi->events_queued_since_last_gtid= 0; + } + skip_relay_logging: err: