[PATCH 0/4] MDEV-31273, Pre-compute binlog checksums outside of LOCK_log
Hi Monty,

Here's the implementation of MDEV-31273, pre-compute binlog checksums.

The main patch to review is the last one, number 4. It is the actual
implementation of binlog checksum pre-computation, and the only patch that
changes the behaviour of the code.

Most of the work (and most of the changes) is cleanup of the old checksum
code that doesn't change the functionality but removes a lot of complex and
hard-to-modify logic (and, I think, actually fixes a bug or two). I have
kept this cleanup separate in the first 3 patches to make it easier to
review and to keep it from getting mixed up with the implementation of the
new functionality.

With this patch series, the binlog checksum is calculated when events are
written into the stmt/trx caches. Later, when writing the binlog file under
LOCK_log, only a direct copy of the bytes is done, which should improve
binlog scalability with checksums enabled.

The patch series is also available on github:

  https://github.com/MariaDB/server/commits/knielsen_mdev31273

 - Kristian.
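As an illustration only (this is not code from the patch series), the idea
described above can be modelled in a few lines of C++. Everything here is
hypothetical: TrxCache, BinlogFile and crc32_bytes() are invented names, the
mutex merely stands in for LOCK_log, and the model ignores event headers and
everything else the real write path has to handle.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

using Bytes = std::vector<uint8_t>;

// Bitwise CRC-32 (reflected polynomial 0xEDB88320). Used only as a stand-in
// for whatever checksum routine the server actually calls.
inline uint32_t crc32_bytes(const uint8_t *p, size_t n)
{
  uint32_t crc= 0xFFFFFFFFu;
  for (size_t i= 0; i < n; i++)
  {
    crc^= p[i];
    for (int bit= 0; bit < 8; bit++)
      crc= (crc & 1u) ? (crc >> 1) ^ 0xEDB88320u : (crc >> 1);
  }
  return crc ^ 0xFFFFFFFFu;
}

// Hypothetical per-transaction cache: each event is stored together with
// its checksum, so the checksum cost is paid by the writing thread alone.
struct TrxCache
{
  Bytes buf;

  void append_event(const Bytes &event)
  {
    assert(!event.empty());
    buf.insert(buf.end(), event.begin(), event.end());
    uint32_t crc= crc32_bytes(event.data(), event.size());
    for (int i= 0; i < 4; i++)                 // little-endian, 4 bytes
      buf.push_back(static_cast<uint8_t>(crc >> (8 * i)));
  }
};

// Hypothetical binlog file: the critical section is a plain byte copy,
// with no checksum computation while the lock is held.
struct BinlogFile
{
  std::mutex lock_log;                         // stands in for LOCK_log
  Bytes data;

  void commit(const TrxCache &cache)
  {
    std::lock_guard<std::mutex> guard(lock_log);
    data.insert(data.end(), cache.buf.begin(), cache.buf.end());
  }
};
```

In this model several threads can call append_event() on their own caches
concurrently, and only commit() serialises on the shared lock.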
Kristian Nielsen (4):
  MDEV-31273: Replace Log_event::writer with function parameter
  MDEV-31273: Eliminate Log_event::checksum_alg
  MDEV-31273: Refactor MYSQL_BIN_LOG::write_cache()
  MDEV-31273: Precompute binlog checksums

 include/my_atomic.h                           |  41 +-
 include/my_sys.h                              |   2 +
 .../main/mysqlbinlog_row_compressed.result    |  48 +-
 .../main/mysqlbinlog_row_minimal.result       |  48 +-
 .../main/mysqlbinlog_stmt_compressed.result   |  16 +-
 mysql-test/main/mysqld--help.result           |   7 +
 .../suite/binlog/include/binlog_ioerr.inc     |   3 +
 mysql-test/suite/binlog/r/binlog_ioerr.result |   2 +
 .../r/binlog_mysqlbinlog_raw_flush.result     |   1 +
 mysql-test/suite/binlog/t/binlog_killed.test  |   2 +-
 .../t/binlog_mysqlbinlog_raw_flush.test       |   2 +
 .../t/binlog_table_map_optional_metadata.test |   4 +-
 .../binlog_encryption/binlog_ioerr.result     |   2 +
 .../suite/rpl/r/rpl_checksum_cache.result     |  43 +-
 .../suite/rpl/t/rpl_checksum_cache.test       |  98 +++-
 .../r/sysvars_server_notembedded.result       |  10 +
 mysys/mf_iocache2.c                           |  34 ++
 sql/log.cc                                    | 468 ++++++++++--------
 sql/log.h                                     |  14 +-
 sql/log_event.cc                              |  27 +-
 sql/log_event.h                               | 158 +++---
 sql/log_event_client.cc                       |  22 +-
 sql/log_event_old.cc                          |  14 +-
 sql/log_event_old.h                           |   4 +-
 sql/log_event_server.cc                       | 395 ++++++---------
 sql/mysqld.cc                                 |   1 +
 sql/mysqld.h                                  |   1 +
 sql/privilege.h                               |   3 +
 sql/slave.cc                                  |  60 +--
 sql/sql_repl.cc                               |   2 +-
 sql/sys_vars.cc                               |  13 +
 sql/wsrep_binlog.cc                           |   6 +-
 sql/wsrep_mysqld.cc                           |  12 +-
 33 files changed, 922 insertions(+), 641 deletions(-)

-- 
2.30.2
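The first patch listed above replaces the Log_event::writer member with a
writer passed as a function parameter. A reduced sketch of that pattern
follows, for illustration only: Writer, Event and TextEvent are hypothetical
stand-ins, not the server's classes, and only the "false means success"
return convention is borrowed from the code in the patch.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical, much-reduced stand-in for a log event writer: it only
// collects the bytes handed to it.
struct Writer
{
  std::string out;

  bool write_data(const char *buf, size_t len)
  {
    out.append(buf, len);
    return false;                              // false == success
  }
};

// The event keeps no pointer to a writer. Each write call receives the
// writer explicitly and forwards it to the virtual helpers.
struct Event
{
  virtual ~Event() = default;

  virtual bool write_data_header(Writer *) { return false; }
  virtual bool write_data_body(Writer *) { return false; }

  virtual bool write(Writer *writer)
  {
    return write_data_header(writer) || write_data_body(writer);
  }
};

// Hypothetical event type whose body is a piece of text.
struct TextEvent : Event
{
  std::string text;

  explicit TextEvent(std::string t) : text(std::move(t)) {}

  bool write_data_body(Writer *writer) override
  {
    return writer->write_data(text.data(), text.size());
  }
};
```

Because the event holds no writer state, the same event object can be
written to two different writers back to back without any set-up or reset
step in between.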
This is a preparatory patch for precomputing binlog checksums outside of
holding LOCK_log, no functional changes.

Replace Log_event::writer with just passing the writer object as a function
parameter to Log_event::write().

Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
---
 sql/log_event.h         | 106 ++++++++--------
 sql/log_event_old.cc    |  14 +--
 sql/log_event_old.h     |   4 +-
 sql/log_event_server.cc | 273 ++++++++++++++++++++--------------------
 4 files changed, 197 insertions(+), 200 deletions(-)

diff --git a/sql/log_event.h b/sql/log_event.h
index 67e06d70d8f..acdedb606c1 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -1294,8 +1294,6 @@ class Log_event
   */
   ulong slave_exec_mode;
 
-  Log_event_writer *writer;
-
 #ifdef MYSQL_SERVER
   THD* thd;
 
@@ -1433,24 +1431,26 @@ class Log_event
   static void operator delete(void*, void*) { }
 
 #ifdef MYSQL_SERVER
-  bool write_header(size_t event_data_length);
-  bool write_data(const uchar *buf, size_t data_length)
+  bool write_header(Log_event_writer *writer, size_t event_data_length);
+  bool write_data(Log_event_writer *writer, const uchar *buf, size_t data_length)
   { return writer->write_data(buf, data_length); }
-  bool write_data(const char *buf, size_t data_length)
-  { return write_data((uchar*)buf, data_length); }
-  bool write_footer()
+  bool write_data(Log_event_writer *writer, const char *buf, size_t data_length)
+  { return write_data(writer, (uchar*)buf, data_length); }
+  bool write_footer(Log_event_writer *writer)
   { return writer->write_footer(); }
 
   my_bool need_checksum();
 
-  virtual bool write()
+  virtual bool write(Log_event_writer *writer)
   {
-    return write_header(get_data_size()) || write_data_header() ||
-           write_data_body() || write_footer();
+    return write_header(writer, get_data_size()) ||
+           write_data_header(writer) ||
+           write_data_body(writer) ||
+           write_footer(writer);
   }
-  virtual bool write_data_header()
+  virtual bool write_data_header(Log_event_writer *writer)
   { return 0; }
-  virtual bool write_data_body()
+ virtual bool write_data_body(Log_event_writer *writer) { return 0; } /* Return start of query time or current time */ @@ -2208,8 +2208,8 @@ class Query_log_event: public Log_event static int begin_event(String *packet, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg); #ifdef MYSQL_SERVER - bool write(); - virtual bool write_post_header_for_derived() { return FALSE; } + bool write(Log_event_writer *writer); + virtual bool write_post_header_for_derived(Log_event_writer *writer) { return FALSE; } #endif bool is_valid() const { return query != 0; } @@ -2289,7 +2289,7 @@ class Query_compressed_log_event:public Query_log_event{ ulong query_length, bool using_trans, bool direct, bool suppress_use, int error); - virtual bool write(); + virtual bool write(Log_event_writer *writer); #endif }; @@ -2622,8 +2622,8 @@ class Load_log_event: public Log_event return sql_ex.new_format() ? NEW_LOAD_EVENT: LOAD_EVENT; } #ifdef MYSQL_SERVER - bool write_data_header(); - bool write_data_body(); + bool write_data_header(Log_event_writer *writer); + bool write_data_body(Log_event_writer *writer); #endif bool is_valid() const { return table_name != 0; } int get_data_size() @@ -2711,7 +2711,7 @@ class Start_log_event_v3: public Log_event my_off_t get_header_len(my_off_t l __attribute__((unused))) { return LOG_EVENT_MINIMAL_HEADER_LEN; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #endif bool is_valid() const { return server_version[0] != 0; } int get_data_size() @@ -2760,14 +2760,14 @@ class Start_encryption_log_event : public Log_event memcpy(nonce, nonce_arg, BINLOG_NONCE_LENGTH); } - bool write_data_body() + bool write_data_body(Log_event_writer *writer) { uchar scheme_buf= crypto_scheme; uchar key_version_buf[BINLOG_KEY_VERSION_LENGTH]; int4store(key_version_buf, key_version); - return write_data(&scheme_buf, sizeof(scheme_buf)) || - write_data(key_version_buf, sizeof(key_version_buf)) || - write_data(nonce, BINLOG_NONCE_LENGTH); + return 
write_data(writer, &scheme_buf, sizeof(scheme_buf)) || + write_data(writer, key_version_buf, sizeof(key_version_buf)) || + write_data(writer, nonce, BINLOG_NONCE_LENGTH); } #else bool print(FILE* file, PRINT_EVENT_INFO* print_event_info); @@ -2888,7 +2888,7 @@ class Format_description_log_event: public Start_log_event_v3 } Log_event_type get_type_code() { return FORMAT_DESCRIPTION_EVENT;} #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #endif bool header_is_valid() const { @@ -3001,7 +3001,7 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg, const char* get_var_type_name(); int get_data_size() { return 9; /* sizeof(type) + sizeof(val) */;} #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #endif bool is_valid() const { return 1; } bool is_part_of_group() { return 1; } @@ -3081,7 +3081,7 @@ class Rand_log_event: public Log_event Log_event_type get_type_code() { return RAND_EVENT;} int get_data_size() { return 16; /* sizeof(ulonglong) * 2*/ } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #endif bool is_valid() const { return 1; } bool is_part_of_group() { return 1; } @@ -3161,7 +3161,7 @@ class Xid_log_event: public Xid_apply_log_event Log_event_type get_type_code() { return XID_EVENT;} int get_data_size() { return sizeof(xid); } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #endif private: @@ -3311,7 +3311,7 @@ class XA_prepare_log_event: public Xid_apply_log_event } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #endif private: @@ -3380,7 +3380,7 @@ class User_var_log_event: public Log_event ~User_var_log_event() = default; Log_event_type get_type_code() { return USER_VAR_EVENT;} #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); /* Getter and setter for deferred User-event. 
Returns true if the event is not applied directly @@ -3532,7 +3532,7 @@ class Rotate_log_event: public Log_event int get_data_size() { return ident_len + ROTATE_HEADER_LEN;} bool is_valid() const { return new_log_ident != 0; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #endif private: @@ -3566,7 +3566,7 @@ class Binlog_checkpoint_log_event: public Log_event int get_data_size() { return binlog_file_len + BINLOG_CHECKPOINT_HEADER_LEN;} bool is_valid() const { return binlog_file_name != 0; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); enum_skip_reason do_shall_skip(rpl_group_info *rgi); #endif }; @@ -3731,7 +3731,7 @@ class Gtid_log_event: public Log_event } bool is_valid() const { return seq_no != 0; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); static int make_compatible_event(String *packet, bool *need_dummy_event, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg); static bool peek(const uchar *event_start, size_t event_len, @@ -3849,7 +3849,7 @@ class Gtid_list_log_event: public Log_event bool is_valid() const { return list != NULL; } #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) bool to_packet(String *packet); - bool write(); + bool write(Log_event_writer *writer); virtual int do_apply_event(rpl_group_info *rgi); enum_skip_reason do_shall_skip(rpl_group_info *rgi); #endif @@ -3920,13 +3920,13 @@ class Create_file_log_event: public Load_log_event } bool is_valid() const { return inited_from_old || block != 0; } #ifdef MYSQL_SERVER - bool write_data_header(); - bool write_data_body(); + bool write_data_header(Log_event_writer *writer); + bool write_data_body(Log_event_writer *writer); /* Cut out Create_file extensions and write it as Load event - used on the slave */ - bool write_base(); + bool write_base(Log_event_writer *writer); #endif private: @@ -3980,7 +3980,7 @@ class Append_block_log_event: public Log_event int get_data_size() { return 
block_len + APPEND_BLOCK_HEADER_LEN ;} bool is_valid() const { return block != 0; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); const char* get_db() { return db; } #endif @@ -4021,7 +4021,7 @@ class Delete_file_log_event: public Log_event int get_data_size() { return DELETE_FILE_HEADER_LEN ;} bool is_valid() const { return file_id != 0; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); const char* get_db() { return db; } #endif @@ -4061,7 +4061,7 @@ class Execute_load_log_event: public Log_event int get_data_size() { return EXEC_LOAD_HEADER_LEN ;} bool is_valid() const { return file_id != 0; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); const char* get_db() { return db; } #endif @@ -4161,7 +4161,7 @@ class Execute_load_query_log_event: public Query_log_event ulong get_post_header_size_for_derived(); #ifdef MYSQL_SERVER - bool write_post_header_for_derived(); + bool write_post_header_for_derived(Log_event_writer *writer); #endif private: @@ -4229,8 +4229,8 @@ class Annotate_rows_log_event: public Log_event virtual bool is_part_of_group() { return 1; } #ifndef MYSQL_CLIENT - virtual bool write_data_header(); - virtual bool write_data_body(); + virtual bool write_data_header(Log_event_writer *writer); + virtual bool write_data_body(Log_event_writer *writer); #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -4895,8 +4895,8 @@ class Table_map_log_event : public Log_event virtual int get_data_size() { return (uint) m_data_size; } #ifdef MYSQL_SERVER virtual int save_field_metadata(); - virtual bool write_data_header(); - virtual bool write_data_body(); + virtual bool write_data_header(Log_event_writer *writer); + virtual bool write_data_body(Log_event_writer *writer); virtual const char *get_db() { return m_dbnam; } #endif @@ -5154,9 +5154,9 @@ class Rows_log_event : public Log_event #endif #ifdef MYSQL_SERVER - virtual bool write_data_header(); - virtual bool 
write_data_body(); - virtual bool write_compressed(); + virtual bool write_data_header(Log_event_writer *writer); + virtual bool write_data_body(Log_event_writer *writer); + virtual bool write_compressed(Log_event_writer *writer); virtual const char *get_db() { return m_table->s->db.str; } #endif /* @@ -5468,7 +5468,7 @@ class Write_rows_compressed_log_event : public Write_rows_log_event #if defined(MYSQL_SERVER) Write_rows_compressed_log_event(THD*, TABLE*, ulong table_id, bool is_transactional); - virtual bool write(); + virtual bool write(Log_event_writer *writer); #endif #ifdef HAVE_REPLICATION Write_rows_compressed_log_event(const uchar *buf, uint event_len, @@ -5556,7 +5556,7 @@ class Update_rows_compressed_log_event : public Update_rows_log_event #if defined(MYSQL_SERVER) Update_rows_compressed_log_event(THD*, TABLE*, ulong table_id, bool is_transactional); - virtual bool write(); + virtual bool write(Log_event_writer *writer); #endif #ifdef HAVE_REPLICATION Update_rows_compressed_log_event(const uchar *buf, uint event_len, @@ -5640,7 +5640,7 @@ class Delete_rows_compressed_log_event : public Delete_rows_log_event public: #if defined(MYSQL_SERVER) Delete_rows_compressed_log_event(THD*, TABLE*, ulong, bool is_transactional); - virtual bool write(); + virtual bool write(Log_event_writer *writer); #endif #ifdef HAVE_REPLICATION Delete_rows_compressed_log_event(const uchar *buf, uint event_len, @@ -5733,8 +5733,8 @@ class Incident_log_event : public Log_event { #ifdef MYSQL_SERVER void pack_info(Protocol*); - virtual bool write_data_header(); - virtual bool write_data_body(); + virtual bool write_data_header(Log_event_writer *writer); + virtual bool write_data_body(Log_event_writer *writer); #endif Incident_log_event(const uchar *buf, uint event_len, @@ -5871,9 +5871,7 @@ class Heartbeat_log_event: public Log_event inline int Log_event_writer::write(Log_event *ev) { - ev->writer= this; - int res= ev->write(); - IF_DBUG(ev->writer= 0,); // writer must be set 
before every Log_event::write + int res= ev->write(this); add_status(ev->logged_status()); return res; } diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index 1990103598e..b320d6357bc 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -1777,7 +1777,7 @@ Old_rows_log_event::do_update_pos(rpl_group_info *rgi) #ifndef MYSQL_CLIENT -bool Old_rows_log_event::write_data_header() +bool Old_rows_log_event::write_data_header(Log_event_writer *writer) { uchar buf[ROWS_HEADER_LEN]; // No need to init the buffer @@ -1789,15 +1789,15 @@ bool Old_rows_log_event::write_data_header() { int4store(buf + 0, m_table_id); int2store(buf + 4, m_flags); - return write_data(buf, 6); + return write_data(writer, buf, 6); }); int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id); int2store(buf + RW_FLAGS_OFFSET, m_flags); - return write_data(buf, ROWS_HEADER_LEN); + return write_data(writer, buf, ROWS_HEADER_LEN); } -bool Old_rows_log_event::write_data_body() +bool Old_rows_log_event::write_data_body(Log_event_writer *writer) { /* Note that this should be the number of *bits*, not the number of @@ -1814,12 +1814,12 @@ bool Old_rows_log_event::write_data_body() DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf)); - res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf)); + res= res || write_data(writer, sbuf, (size_t) (sbuf_end - sbuf)); DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); - res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols)); + res= res || write_data(writer, (uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols)); DBUG_DUMP("rows", m_rows_buf, data_size); - res= res || write_data(m_rows_buf, (size_t) data_size); + res= res || write_data(writer, m_rows_buf, (size_t) data_size); return res; diff --git a/sql/log_event_old.h b/sql/log_event_old.h index e5aaacec209..9f92c66aba4 100644 --- a/sql/log_event_old.h +++ b/sql/log_event_old.h @@ -134,8 +134,8 @@ 
class Old_rows_log_event : public Log_event ulong get_table_id() const { return m_table_id; } #ifndef MYSQL_CLIENT - virtual bool write_data_header(); - virtual bool write_data_body(); + virtual bool write_data_header(Log_event_writer *writer); + virtual bool write_data_body(Log_event_writer *writer); virtual const char *get_db() { return m_table->s->db.str; } #endif /* diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 09f9418f01e..5d07a6d853e 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -950,7 +950,7 @@ int Log_event_writer::write_footer() Log_event::write_header() */ -bool Log_event::write_header(size_t event_data_length) +bool Log_event::write_header(Log_event_writer *writer, size_t event_data_length) { uchar header[LOG_EVENT_HEADER_LEN]; ulong now; @@ -1108,7 +1108,7 @@ static void store_str_with_code_and_len(uchar **dst, const char *src, will print! */ -bool Query_log_event::write() +bool Query_log_event::write(Log_event_writer *writer) { uchar buf[QUERY_HEADER_LEN + MAX_SIZE_LOG_EVENT_STATUS]; uchar *start, *start_of_status; @@ -1361,16 +1361,16 @@ bool Query_log_event::write() event_length= ((uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len); - return write_header(event_length) || - write_data(buf, QUERY_HEADER_LEN) || - write_post_header_for_derived() || - write_data(start_of_status, (uint) status_vars_len) || - write_data(db, db_len + 1) || - write_data(query, q_len) || - write_footer(); + return write_header(writer, event_length) || + write_data(writer, buf, QUERY_HEADER_LEN) || + write_post_header_for_derived(writer) || + write_data(writer, start_of_status, (uint) status_vars_len) || + write_data(writer, db, db_len + 1) || + write_data(writer, query, q_len) || + write_footer(writer); } -bool Query_compressed_log_event::write() +bool Query_compressed_log_event::write(Log_event_writer *writer) { uchar *buffer; uint32 alloc_size, compressed_size; @@ -1389,7 +1389,7 @@ bool 
Query_compressed_log_event::write() uint32 q_len_tmp= q_len; query= (char*) buffer; q_len= compressed_size; - ret= Query_log_event::write(); + ret= Query_log_event::write(writer); query= query_tmp; q_len= q_len_tmp; } @@ -2506,7 +2506,7 @@ void Start_log_event_v3::pack_info(Protocol *protocol) #endif -bool Start_log_event_v3::write() +bool Start_log_event_v3::write(Log_event_writer *writer) { char buff[START_V3_HEADER_LEN]; int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); @@ -2514,9 +2514,9 @@ bool Start_log_event_v3::write() if (!dont_set_created) created= get_time(); // this sets when and when_sec_part as a side effect int4store(buff + ST_CREATED_OFFSET,created); - return write_header(sizeof(buff)) || - write_data(buff, sizeof(buff)) || - write_footer(); + return write_header(writer, sizeof(buff)) || + write_data(writer, buff, sizeof(buff)) || + write_footer(writer); } @@ -2606,7 +2606,7 @@ int Start_log_event_v3::do_apply_event(rpl_group_info *rgi) Format_description_log_event methods ****************************************************************************/ -bool Format_description_log_event::write() +bool Format_description_log_event::write(Log_event_writer *writer) { bool ret; bool no_checksum; @@ -2654,11 +2654,11 @@ bool Format_description_log_event::write() { checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway } - ret= write_header(rec_size) || - write_data(buff, sizeof(buff)) || - write_data(post_header_len, number_of_event_types) || - write_data(&checksum_byte, sizeof(checksum_byte)) || - write_footer(); + ret= write_header(writer, rec_size) || + write_data(writer, buff, sizeof(buff)) || + write_data(writer, post_header_len, number_of_event_types) || + write_data(writer, &checksum_byte, sizeof(checksum_byte)) || + write_footer(writer); if (no_checksum) checksum_alg= BINLOG_CHECKSUM_ALG_OFF; return ret; @@ -2933,7 +2933,7 @@ void Load_log_event::pack_info(Protocol *protocol) #endif /* defined(HAVE_REPLICATION) */ -bool 
Load_log_event::write_data_header() +bool Load_log_event::write_data_header(Log_event_writer *writer) { char buf[LOAD_HEADER_LEN]; int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id); @@ -2942,23 +2942,23 @@ bool Load_log_event::write_data_header() buf[L_TBL_LEN_OFFSET] = (char)table_name_len; buf[L_DB_LEN_OFFSET] = (char)db_len; int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); - return write_data(buf, LOAD_HEADER_LEN) != 0; + return write_data(writer, buf, LOAD_HEADER_LEN) != 0; } -bool Load_log_event::write_data_body() +bool Load_log_event::write_data_body(Log_event_writer *writer) { if (sql_ex.write_data(writer)) return 1; if (num_fields && fields && field_lens) { - if (write_data(field_lens, num_fields) || - write_data(fields, field_block_len)) + if (write_data(writer, field_lens, num_fields) || + write_data(writer, fields, field_block_len)) return 1; } - return (write_data(table_name, table_name_len + 1) || - write_data(db, db_len + 1) || - write_data(fname, fname_len)); + return (write_data(writer, table_name, table_name_len + 1) || + write_data(writer, db, db_len + 1) || + write_data(writer, fname, fname_len)); } @@ -3412,14 +3412,14 @@ Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg, } -bool Rotate_log_event::write() +bool Rotate_log_event::write(Log_event_writer *writer) { char buf[ROTATE_HEADER_LEN]; int8store(buf + R_POS_OFFSET, pos); - return (write_header(ROTATE_HEADER_LEN + ident_len) || - write_data(buf, ROTATE_HEADER_LEN) || - write_data(new_log_ident, (uint) ident_len) || - write_footer()); + return (write_header(writer, ROTATE_HEADER_LEN + ident_len) || + write_data(writer, buf, ROTATE_HEADER_LEN) || + write_data(writer, new_log_ident, (uint) ident_len) || + write_footer(writer)); } @@ -3569,14 +3569,14 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event( } -bool Binlog_checkpoint_log_event::write() +bool Binlog_checkpoint_log_event::write(Log_event_writer *writer) { uchar buf[BINLOG_CHECKPOINT_HEADER_LEN]; int4store(buf, 
binlog_file_len); - return write_header(BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) || - write_data(buf, BINLOG_CHECKPOINT_HEADER_LEN) || - write_data(binlog_file_name, binlog_file_len) || - write_footer(); + return write_header(writer, BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) || + write_data(writer, buf, BINLOG_CHECKPOINT_HEADER_LEN) || + write_data(writer, binlog_file_name, binlog_file_len) || + write_footer(writer); } @@ -3702,7 +3702,7 @@ Gtid_log_event::peek(const uchar *event_start, size_t event_len, bool -Gtid_log_event::write() +Gtid_log_event::write(Log_event_writer *writer) { uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+4]; size_t write_len= 13; @@ -3750,9 +3750,9 @@ Gtid_log_event::write() bzero(buf+write_len, GTID_HEADER_LEN-write_len); write_len= GTID_HEADER_LEN; } - return write_header(write_len) || - write_data(buf, write_len) || - write_footer(); + return write_header(writer, write_len) || + write_data(writer, buf, write_len) || + write_footer(writer); } @@ -4040,7 +4040,7 @@ Gtid_list_log_event::to_packet(String *packet) bool -Gtid_list_log_event::write() +Gtid_list_log_event::write(Log_event_writer *writer) { char buf[128]; String packet(buf, sizeof(buf), system_charset_info); @@ -4048,9 +4048,9 @@ Gtid_list_log_event::write() packet.length(0); if (to_packet(&packet)) return true; - return write_header(get_data_size()) || - write_data(packet.ptr(), packet.length()) || - write_footer(); + return write_header(writer, get_data_size()) || + write_data(writer, packet.ptr(), packet.length()) || + write_footer(writer); } @@ -4144,14 +4144,14 @@ void Intvar_log_event::pack_info(Protocol *protocol) #endif -bool Intvar_log_event::write() +bool Intvar_log_event::write(Log_event_writer *writer) { uchar buf[9]; buf[I_TYPE_OFFSET]= (uchar) type; int8store(buf + I_VAL_OFFSET, val); - return write_header(sizeof(buf)) || - write_data(buf, sizeof(buf)) || - write_footer(); + return write_header(writer, sizeof(buf)) || + 
write_data(writer, buf, sizeof(buf)) || + write_footer(writer); } @@ -4223,14 +4223,14 @@ void Rand_log_event::pack_info(Protocol *protocol) #endif -bool Rand_log_event::write() +bool Rand_log_event::write(Log_event_writer *writer) { uchar buf[16]; int8store(buf + RAND_SEED1_OFFSET, seed1); int8store(buf + RAND_SEED2_OFFSET, seed2); - return write_header(sizeof(buf)) || - write_data(buf, sizeof(buf)) || - write_footer(); + return write_header(writer, sizeof(buf)) || + write_data(writer, buf, sizeof(buf)) || + write_footer(writer); } @@ -4479,12 +4479,12 @@ int Xid_log_event::do_commit() #endif -bool Xid_log_event::write() +bool Xid_log_event::write(Log_event_writer *writer) { DBUG_EXECUTE_IF("do_not_write_xid", return 0;); - return write_header(sizeof(xid)) || - write_data((uchar*)&xid, sizeof(xid)) || - write_footer(); + return write_header(writer, sizeof(xid)) || + write_data(writer, (uchar*)&xid, sizeof(xid)) || + write_footer(writer); } /************************************************************************** @@ -4529,7 +4529,7 @@ int XA_prepare_log_event::do_commit() #endif // HAVE_REPLICATION -bool XA_prepare_log_event::write() +bool XA_prepare_log_event::write(Log_event_writer *writer) { uchar data[1 + 4 + 4 + 4]= {one_phase,}; uint8 one_phase_byte= one_phase; @@ -4540,14 +4540,14 @@ bool XA_prepare_log_event::write() DBUG_ASSERT(xid_subheader_no_data == sizeof(data) - 1); - return write_header(sizeof(one_phase_byte) + xid_subheader_no_data + + return write_header(writer, sizeof(one_phase_byte) + xid_subheader_no_data + static_cast<XID*>(xid)->gtrid_length + static_cast<XID*>(xid)->bqual_length) || - write_data(data, sizeof(data)) || - write_data((uchar*) static_cast<XID*>(xid)->data, + write_data(writer, data, sizeof(data)) || + write_data(writer, (uchar*) static_cast<XID*>(xid)->data, static_cast<XID*>(xid)->gtrid_length + static_cast<XID*>(xid)->bqual_length) || - write_footer(); + write_footer(writer); } @@ -4671,7 +4671,7 @@ void 
User_var_log_event::pack_info(Protocol* protocol) #endif // HAVE_REPLICATION -bool User_var_log_event::write() +bool User_var_log_event::write(Log_event_writer *writer) { char buf[UV_NAME_LEN_SIZE]; char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + @@ -4726,13 +4726,13 @@ bool User_var_log_event::write() /* Length of the whole event */ event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len; - return write_header(event_length) || - write_data(buf, sizeof(buf)) || - write_data(name, name_len) || - write_data(buf1, buf1_length) || - write_data(pos, val_len) || - write_data(&flags, unsigned_len) || - write_footer(); + return write_header(writer, event_length) || + write_data(writer, buf, sizeof(buf)) || + write_data(writer, name, name_len) || + write_data(writer, buf1, buf1_length) || + write_data(writer, pos, val_len) || + write_data(writer, &flags, unsigned_len) || + write_footer(writer); } @@ -4951,13 +4951,13 @@ Create_file_log_event(THD* thd_arg, sql_exchange* ex, Create_file_log_event::write_data_body() */ -bool Create_file_log_event::write_data_body() +bool Create_file_log_event::write_data_body(Log_event_writer *writer) { bool res; - if ((res= Load_log_event::write_data_body()) || fake_base) + if ((res= Load_log_event::write_data_body(writer)) || fake_base) return res; - return write_data("", 1) || - write_data(block, block_len); + return write_data(writer, "", 1) || + write_data(writer, block, block_len); } @@ -4965,14 +4965,14 @@ bool Create_file_log_event::write_data_body() Create_file_log_event::write_data_header() */ -bool Create_file_log_event::write_data_header() +bool Create_file_log_event::write_data_header(Log_event_writer *writer) { bool res; uchar buf[CREATE_FILE_HEADER_LEN]; - if ((res= Load_log_event::write_data_header()) || fake_base) + if ((res= Load_log_event::write_data_header(writer)) || fake_base) return res; int4store(buf + CF_FILE_ID_OFFSET, file_id); - return write_data(buf, CREATE_FILE_HEADER_LEN) != 0; + return 
write_data(writer, buf, CREATE_FILE_HEADER_LEN) != 0; } @@ -4980,11 +4980,11 @@ bool Create_file_log_event::write_data_header() Create_file_log_event::write_base() */ -bool Create_file_log_event::write_base() +bool Create_file_log_event::write_base(Log_event_writer *writer) { bool res; fake_base= 1; // pretend we are Load event - res= write(); + res= write(writer); fake_base= 0; return res; } @@ -5051,8 +5051,7 @@ int Create_file_log_event::do_apply_event(rpl_group_info *rgi) // a trick to avoid allocating another buffer fname= fname_buf; fname_len= (uint) (strmov(ext, ".data") - fname); - writer= &lew; - if (write_base()) + if (write_base(&lew)) { strmov(ext, ".info"); // to have it right in the error message rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(), @@ -5110,14 +5109,14 @@ Append_block_log_event::Append_block_log_event(THD *thd_arg, } -bool Append_block_log_event::write() +bool Append_block_log_event::write(Log_event_writer *writer) { uchar buf[APPEND_BLOCK_HEADER_LEN]; int4store(buf + AB_FILE_ID_OFFSET, file_id); - return write_header(APPEND_BLOCK_HEADER_LEN + block_len) || - write_data(buf, APPEND_BLOCK_HEADER_LEN) || - write_data(block, block_len) || - write_footer(); + return write_header(writer, APPEND_BLOCK_HEADER_LEN + block_len) || + write_data(writer, buf, APPEND_BLOCK_HEADER_LEN) || + write_data(writer, block, block_len) || + write_footer(writer); } @@ -5220,13 +5219,13 @@ Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg, } -bool Delete_file_log_event::write() +bool Delete_file_log_event::write(Log_event_writer *writer) { uchar buf[DELETE_FILE_HEADER_LEN]; int4store(buf + DF_FILE_ID_OFFSET, file_id); - return write_header(sizeof(buf)) || - write_data(buf, sizeof(buf)) || - write_footer(); + return write_header(writer, sizeof(buf)) || + write_data(writer, buf, sizeof(buf)) || + write_footer(writer); } @@ -5268,13 +5267,13 @@ Execute_load_log_event::Execute_load_log_event(THD *thd_arg, } -bool 
Execute_load_log_event::write() +bool Execute_load_log_event::write(Log_event_writer *writer) { uchar buf[EXEC_LOAD_HEADER_LEN]; int4store(buf + EL_FILE_ID_OFFSET, file_id); - return write_header(sizeof(buf)) || - write_data(buf, sizeof(buf)) || - write_footer(); + return write_header(writer, sizeof(buf)) || + write_data(writer, buf, sizeof(buf)) || + write_footer(writer); } @@ -5433,14 +5432,14 @@ Execute_load_query_log_event(THD *thd_arg, const char* query_arg, bool -Execute_load_query_log_event::write_post_header_for_derived() +Execute_load_query_log_event::write_post_header_for_derived(Log_event_writer *writer) { uchar buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN]; int4store(buf, file_id); int4store(buf + 4, fn_pos_start); int4store(buf + 4 + 4, fn_pos_end); *(buf + 4 + 4 + 4)= (uchar) dup_handling; - return write_data(buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); + return write_data(writer, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); } @@ -6405,7 +6404,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi) #endif /* defined(HAVE_REPLICATION) */ -bool Rows_log_event::write_data_header() +bool Rows_log_event::write_data_header(Log_event_writer *writer) { uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer DBUG_ASSERT(m_table_id != ~0ULL); @@ -6413,14 +6412,14 @@ bool Rows_log_event::write_data_header() { int4store(buf + 0, m_table_id); int2store(buf + 4, m_flags); - return (write_data(buf, 6)); + return (write_data(writer, buf, 6)); }); int6store(buf + RW_MAPID_OFFSET, m_table_id); int2store(buf + RW_FLAGS_OFFSET, m_flags); - return write_data(buf, ROWS_HEADER_LEN); + return write_data(writer, buf, ROWS_HEADER_LEN); } -bool Rows_log_event::write_data_body() +bool Rows_log_event::write_data_body(Log_event_writer *writer) { /* Note that this should be the number of *bits*, not the number of @@ -6433,10 +6432,10 @@ bool Rows_log_event::write_data_body() DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end 
- sbuf)); - res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf)); + res= res || write_data(writer, sbuf, (size_t) (sbuf_end - sbuf)); DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); - res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols)); + res= res || write_data(writer, (uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols)); /* TODO[refactor write]: Remove the "down cast" here (and elsewhere). */ @@ -6444,17 +6443,17 @@ bool Rows_log_event::write_data_body() { DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap, no_bytes_in_map(&m_cols_ai)); - res= res || write_data((uchar*)m_cols_ai.bitmap, + res= res || write_data(writer, (uchar*)m_cols_ai.bitmap, no_bytes_in_map(&m_cols_ai)); } DBUG_DUMP("rows", m_rows_buf, data_size); - res= res || write_data(m_rows_buf, (size_t) data_size); + res= res || write_data(writer, m_rows_buf, (size_t) data_size); return res; } -bool Rows_log_event::write_compressed() +bool Rows_log_event::write_compressed(Log_event_writer *writer) { uchar *m_rows_buf_tmp= m_rows_buf; uchar *m_rows_cur_tmp= m_rows_cur; @@ -6468,7 +6467,7 @@ bool Rows_log_event::write_compressed() (uint32)(m_rows_cur_tmp - m_rows_buf_tmp), &comlen)) { m_rows_cur= comlen + m_rows_buf; - ret= Log_event::write(); + ret= Log_event::write(writer); } my_safe_afree(m_rows_buf, alloc_size); m_rows_buf= m_rows_buf_tmp; @@ -6510,15 +6509,15 @@ Annotate_rows_log_event::Annotate_rows_log_event(THD *thd, } -bool Annotate_rows_log_event::write_data_header() +bool Annotate_rows_log_event::write_data_header(Log_event_writer *writer) { return 0; } -bool Annotate_rows_log_event::write_data_body() +bool Annotate_rows_log_event::write_data_body(Log_event_writer *writer) { - return write_data(m_query_txt, m_query_len); + return write_data(writer, m_query_txt, m_query_len); } @@ -6964,7 +6963,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi) #endif /* defined(HAVE_REPLICATION) */ -bool Table_map_log_event::write_data_header() +bool 
Table_map_log_event::write_data_header(Log_event_writer *writer) { DBUG_ASSERT(m_table_id != ~0ULL); uchar buf[TABLE_MAP_HEADER_LEN]; @@ -6972,14 +6971,14 @@ bool Table_map_log_event::write_data_header() { int4store(buf + 0, m_table_id); int2store(buf + 4, m_flags); - return (write_data(buf, 6)); + return (write_data(writer, buf, 6)); }); int6store(buf + TM_MAPID_OFFSET, m_table_id); int2store(buf + TM_FLAGS_OFFSET, m_flags); - return write_data(buf, TABLE_MAP_HEADER_LEN); + return write_data(writer, buf, TABLE_MAP_HEADER_LEN); } -bool Table_map_log_event::write_data_body() +bool Table_map_log_event::write_data_body(Log_event_writer *writer) { DBUG_ASSERT(m_dbnam != NULL); DBUG_ASSERT(m_tblnam != NULL); @@ -7000,17 +6999,17 @@ bool Table_map_log_event::write_data_body() uchar mbuf[MAX_INT_WIDTH]; uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size); - return write_data(dbuf, sizeof(dbuf)) || - write_data(m_dbnam, m_dblen+1) || - write_data(tbuf, sizeof(tbuf)) || - write_data(m_tblnam, m_tbllen+1) || - write_data(cbuf, (size_t) (cbuf_end - cbuf)) || - write_data(m_coltype, m_colcnt) || - write_data(mbuf, (size_t) (mbuf_end - mbuf)) || - write_data(m_field_metadata, m_field_metadata_size), - write_data(m_null_bits, (m_colcnt + 7) / 8) || - write_data((const uchar*) m_metadata_buf.ptr(), - m_metadata_buf.length()); + return write_data(writer, dbuf, sizeof(dbuf)) || + write_data(writer, m_dbnam, m_dblen+1) || + write_data(writer, tbuf, sizeof(tbuf)) || + write_data(writer, m_tblnam, m_tbllen+1) || + write_data(writer, cbuf, (size_t) (cbuf_end - cbuf)) || + write_data(writer, m_coltype, m_colcnt) || + write_data(writer, mbuf, (size_t) (mbuf_end - mbuf)) || + write_data(writer, m_field_metadata, m_field_metadata_size), + write_data(writer, m_null_bits, (m_colcnt + 7) / 8) || + write_data(writer, (const uchar*) m_metadata_buf.ptr(), + m_metadata_buf.length()); } /** @@ -7438,9 +7437,9 @@ Write_rows_compressed_log_event::Write_rows_compressed_log_event( 
m_type = WRITE_ROWS_COMPRESSED_EVENT_V1; } -bool Write_rows_compressed_log_event::write() +bool Write_rows_compressed_log_event::write(Log_event_writer *writer) { - return Rows_log_event::write_compressed(); + return Rows_log_event::write_compressed(writer); } @@ -8553,9 +8552,9 @@ Delete_rows_compressed_log_event::Delete_rows_compressed_log_event( m_type= DELETE_ROWS_COMPRESSED_EVENT_V1; } -bool Delete_rows_compressed_log_event::write() +bool Delete_rows_compressed_log_event::write(Log_event_writer *writer) { - return Rows_log_event::write_compressed(); + return Rows_log_event::write_compressed(writer); } @@ -8702,9 +8701,9 @@ Update_rows_compressed_log_event::Update_rows_compressed_log_event(THD *thd_arg, m_type = UPDATE_ROWS_COMPRESSED_EVENT_V1; } -bool Update_rows_compressed_log_event::write() +bool Update_rows_compressed_log_event::write(Log_event_writer *writer) { - return Rows_log_event::write_compressed(); + return Rows_log_event::write_compressed(writer); } void Update_rows_log_event::init(MY_BITMAP const *cols) @@ -8993,23 +8992,23 @@ int Incident_log_event::do_apply_event(rpl_group_info *rgi) bool -Incident_log_event::write_data_header() +Incident_log_event::write_data_header(Log_event_writer *writer) { DBUG_ENTER("Incident_log_event::write_data_header"); DBUG_PRINT("enter", ("m_incident: %d", m_incident)); uchar buf[sizeof(int16)]; int2store(buf, (int16) m_incident); - DBUG_RETURN(write_data(buf, sizeof(buf))); + DBUG_RETURN(write_data(writer, buf, sizeof(buf))); } bool -Incident_log_event::write_data_body() +Incident_log_event::write_data_body(Log_event_writer *writer) { uchar tmp[1]; DBUG_ENTER("Incident_log_event::write_data_body"); tmp[0]= (uchar) m_message.length; - DBUG_RETURN(write_data(tmp, sizeof(tmp)) || - write_data(m_message.str, m_message.length)); + DBUG_RETURN(write_data(writer, tmp, sizeof(tmp)) || + write_data(writer, m_message.str, m_message.length)); } -- 2.30.2
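The mechanical change repeated throughout the diff above (every write method takes the writer as an argument instead of reading it from a member of the event) can be sketched in isolation. This is a minimal illustration, not code from the patch: SketchWriter, SketchEvent and write_twice are hypothetical names, and the real Log_event_writer also handles encryption and checksums, which are left out here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for Log_event_writer: it only appends bytes to a buffer.
struct SketchWriter {
  std::vector<uint8_t> out;
  // Returns false on success, following the convention of write_data() in the diff.
  bool write_data(const uint8_t *data, size_t len) {
    out.insert(out.end(), data, data + len);
    return false;
  }
};

// Hypothetical event with a single 4-byte field, loosely modelled on
// Execute_load_log_event::write().
struct SketchEvent {
  uint32_t file_id;

  // The destination is an argument; the event stores no writer pointer.
  bool write(SketchWriter *writer) const {
    assert(writer != nullptr);
    uint8_t buf[4];
    for (int i = 0; i < 4; i++)
      buf[i] = static_cast<uint8_t>(file_id >> (8 * i));  // little-endian, like int4store
    return writer->write_data(buf, sizeof(buf));
  }
};

// Write the same const event to two independent writers.
inline bool write_twice(const SketchEvent &ev, SketchWriter *a, SketchWriter *b) {
  return ev.write(a) || ev.write(b);
}
```

Since the event holds no pointer to its destination, the same object can be serialized to two different writers without being modified between the calls, which is presumably what makes the later patches in the series easier to write.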
This is a preparatory commit for pre-computing checksums without holding LOCK_log; there are no functional changes. The checksum algorithm used (if any) when writing an event does not belong in the event; it is a property of the log being written to. Instead, decide the checksum algorithm when constructing the Log_event_writer object, and store it there. Introduce a client-only Log_event::read_checksum_alg to be able to print the checksum read, and a Format_description_log_event::used_checksum_alg, which is the checksum algorithm (if any) to use when reading events from a log. Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org> --- .../r/binlog_mysqlbinlog_raw_flush.result | 1 + .../t/binlog_mysqlbinlog_raw_flush.test | 2 + sql/log.cc | 104 +++++++++------- sql/log.h | 12 +- sql/log_event.cc | 27 +++-- sql/log_event.h | 52 +++++--- sql/log_event_client.cc | 22 ++-- sql/log_event_server.cc | 114 ++++-------------- sql/slave.cc | 60 ++++----- sql/sql_repl.cc | 2 +- sql/wsrep_binlog.cc | 6 +- sql/wsrep_mysqld.cc | 12 +- 12 files changed, 202 insertions(+), 212 deletions(-) diff --git a/mysql-test/suite/binlog/r/binlog_mysqlbinlog_raw_flush.result b/mysql-test/suite/binlog/r/binlog_mysqlbinlog_raw_flush.result index 294e96e5997..d697788047f 100644 --- a/mysql-test/suite/binlog/r/binlog_mysqlbinlog_raw_flush.result +++ b/mysql-test/suite/binlog/r/binlog_mysqlbinlog_raw_flush.result @@ -1,3 +1,4 @@ +RESET MASTER; # # MDEV-30698 Cover missing test cases for mariadb-binlog options # --raw [and] --flashback diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_raw_flush.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_raw_flush.test index 252a8577b6c..7d0a96fc087 100644 --- a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_raw_flush.test +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_raw_flush.test @@ -21,6 +21,8 @@ --source include/linux.inc --source include/have_log_bin.inc +RESET MASTER; + --echo # --echo # MDEV-30698 Cover missing test cases for
mariadb-binlog options --echo # --raw [and] --flashback diff --git a/sql/log.cc b/sql/log.cc index ab7c9e8ba0d..bcb89e79dd0 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -3886,32 +3886,34 @@ bool MYSQL_BIN_LOG::open(const char *log_name, In 4.x we put Start event only in the first binlog. But from 5.0 we want a Start event even if this is not the very first binlog. */ - Format_description_log_event s(BINLOG_VERSION); - /* - don't set LOG_EVENT_BINLOG_IN_USE_F for SEQ_READ_APPEND io_cache - as we won't be able to reset it later - */ - if (io_cache_type == WRITE_CACHE) - s.flags |= LOG_EVENT_BINLOG_IN_USE_F; - + enum enum_binlog_checksum_alg checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; if (is_relay_log) { if (relay_log_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF) relay_log_checksum_alg= opt_slave_sql_verify_checksum ? (enum_binlog_checksum_alg) binlog_checksum_options : BINLOG_CHECKSUM_ALG_OFF; - s.checksum_alg= relay_log_checksum_alg; - s.set_relay_log_event(); + checksum_alg= relay_log_checksum_alg; } else - s.checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options; + checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options; + DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); + + Format_description_log_event s(BINLOG_VERSION, NULL, checksum_alg); + if (is_relay_log) + s.set_relay_log_event(); + /* + don't set LOG_EVENT_BINLOG_IN_USE_F for SEQ_READ_APPEND io_cache + as we won't be able to reset it later + */ + if (io_cache_type == WRITE_CACHE) + s.flags |= LOG_EVENT_BINLOG_IN_USE_F; crypto.scheme = 0; - DBUG_ASSERT(s.checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); if (!s.is_valid()) goto err; s.dont_set_created= null_created_arg; - if (write_event(&s)) + if (write_event(&s, checksum_alg)) goto err; bytes_written+= s.data_written; @@ -3930,8 +3932,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, goto err; Start_encryption_log_event sele(1, key_version, crypto.nonce); - sele.checksum_alg= s.checksum_alg; - if (write_event(&sele)) + if (write_event(&sele, 
checksum_alg)) goto err; // Start_encryption_log_event is written, enable the encryption @@ -4057,7 +4058,8 @@ bool MYSQL_BIN_LOG::open(const char *log_name, /* Don't set log_pos in event header */ description_event_for_queue->set_artificial_event(); - if (write_event(description_event_for_queue)) + if (write_event(description_event_for_queue, + description_event_for_queue->used_checksum_alg)) goto err; bytes_written+= description_event_for_queue->data_written; } @@ -5500,17 +5502,19 @@ int MYSQL_BIN_LOG::new_file_impl() */ Rotate_log_event r(new_name + dirname_length(new_name), 0, LOG_EVENT_OFFSET, is_relay_log ? Rotate_log_event::RELAY_LOG : 0); + enum enum_binlog_checksum_alg checksum_alg = BINLOG_CHECKSUM_ALG_UNDEF; /* The current relay-log's closing Rotate event must have checksum value computed with an algorithm of the last relay-logged FD event. */ if (is_relay_log) - r.checksum_alg= relay_log_checksum_alg; - DBUG_ASSERT(!is_relay_log || - relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); + checksum_alg= relay_log_checksum_alg; + else + checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options; + DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); if ((DBUG_IF("fault_injection_new_file_rotate_event") && (error= close_on_error= TRUE)) || - (error= write_event(&r))) + (error= write_event(&r, checksum_alg))) { DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno= 2;); close_on_error= TRUE; @@ -5627,10 +5631,22 @@ int MYSQL_BIN_LOG::new_file_impl() DBUG_RETURN(error); } -bool MYSQL_BIN_LOG::write_event(Log_event *ev, binlog_cache_data *cache_data, +bool MYSQL_BIN_LOG::write_event(Log_event *ev, binlog_cache_data *data, IO_CACHE *file) { - Log_event_writer writer(file, 0, &crypto); + return write_event(ev, ev->select_checksum_alg(), data, file); +} + +bool MYSQL_BIN_LOG::write_event(Log_event *ev) +{ + return write_event(ev, ev->select_checksum_alg(), 0, &log_file); +} + +bool MYSQL_BIN_LOG::write_event(Log_event *ev, + enum 
enum_binlog_checksum_alg checksum_alg, + binlog_cache_data *cache_data, IO_CACHE *file) +{ + Log_event_writer writer(file, 0, checksum_alg, &crypto); if (crypto.scheme && file == &log_file) { writer.ctx= alloca(crypto.ctx_size); @@ -5641,17 +5657,19 @@ bool MYSQL_BIN_LOG::write_event(Log_event *ev, binlog_cache_data *cache_data, return writer.write(ev); } -bool MYSQL_BIN_LOG::append(Log_event *ev) +bool MYSQL_BIN_LOG::append(Log_event *ev, + enum enum_binlog_checksum_alg checksum_alg) { bool res; mysql_mutex_lock(&LOCK_log); - res= append_no_lock(ev); + res= append_no_lock(ev, checksum_alg); mysql_mutex_unlock(&LOCK_log); return res; } -bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev) +bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev, + enum enum_binlog_checksum_alg checksum_alg) { bool error = 0; DBUG_ENTER("MYSQL_BIN_LOG::append"); @@ -5659,7 +5677,7 @@ bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev) mysql_mutex_assert_owner(&LOCK_log); DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); - if (write_event(ev)) + if (write_event(ev, checksum_alg)) { error=1; goto err; @@ -6051,7 +6069,8 @@ THD::binlog_start_trans_and_stmt() uchar *buf= 0; size_t len= 0; IO_CACHE tmp_io_cache; - Log_event_writer writer(&tmp_io_cache, 0); + // Replicated events in writeset doesn't have checksum + Log_event_writer writer(&tmp_io_cache, 0, BINLOG_CHECKSUM_ALG_OFF, NULL); if(!open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, 128, MYF(MY_WME))) { @@ -6066,8 +6085,6 @@ THD::binlog_start_trans_and_stmt() } Gtid_log_event gtid_event(this, seqno, domain_id, true, LOG_EVENT_SUPPRESS_USE_F, true, 0); - // Replicated events in writeset doesn't have checksum - gtid_event.checksum_alg= BINLOG_CHECKSUM_ALG_OFF; gtid_event.server_id= server_id; writer.write(&gtid_event); wsrep_write_cache_buf(&tmp_io_cache, &buf, &len); @@ -6270,7 +6287,7 @@ bool THD::binlog_write_table_map(TABLE *table, bool with_annotate) binlog_cache_data *cache_data= (cache_mngr-> get_binlog_cache_data(is_transactional));
IO_CACHE *file= &cache_data->cache_log; - Log_event_writer writer(file, cache_data); + Log_event_writer writer(file, cache_data, the_event.select_checksum_alg(), NULL); if (with_annotate) if (binlog_write_annotated_row(&writer)) @@ -6424,7 +6441,8 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, if (Rows_log_event* pending= cache_data->pending()) { - Log_event_writer writer(&cache_data->cache_log, cache_data); + Log_event_writer writer(&cache_data->cache_log, cache_data, + pending->select_checksum_alg(), NULL); /* Write pending event to the cache. @@ -7472,11 +7490,13 @@ class CacheWriter: public Log_event_writer public: size_t remains; - CacheWriter(THD *thd_arg, IO_CACHE *file_arg, bool do_checksum, + CacheWriter(THD *thd_arg, IO_CACHE *file_arg, + enum enum_binlog_checksum_alg checksum_alg, Binlog_crypt_data *cr) - : Log_event_writer(file_arg, 0, cr), remains(0), thd(thd_arg), + : Log_event_writer(file_arg, 0, checksum_alg, cr), remains(0), thd(thd_arg), first(true) - { checksum_len= do_checksum ? BINLOG_CHECKSUM_LEN : 0; } + { + } ~CacheWriter() { status_var_add(thd->status_var.binlog_bytes_written, bytes_written); } @@ -7527,7 +7547,9 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) size_t val; size_t end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t uchar header[LOG_EVENT_HEADER_LEN]; - CacheWriter writer(thd, &log_file, binlog_checksum_options, &crypto); + CacheWriter writer(thd, &log_file, + (enum_binlog_checksum_alg)binlog_checksum_options, + &crypto); if (crypto.scheme) { @@ -9070,11 +9092,11 @@ void MYSQL_BIN_LOG::close(uint exiting) { Stop_log_event s; // the checksumming rule for relay-log case is similar to Rotate - s.checksum_alg= is_relay_log ? relay_log_checksum_alg - : (enum_binlog_checksum_alg)binlog_checksum_options; - DBUG_ASSERT(!is_relay_log || - relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); - write_event(&s); + enum enum_binlog_checksum_alg checksum_alg= is_relay_log ? 
+ relay_log_checksum_alg : + (enum_binlog_checksum_alg)binlog_checksum_options; + DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); + write_event(&s, checksum_alg); bytes_written+= s.data_written; flush_io_cache(&log_file); update_binlog_end_pos(); @@ -11242,7 +11264,7 @@ bool Recovery_context::decide_or_assess(xid_recovery_member *member, int round, if (truncate_gtid.seq_no == 0 /* was reset or never set */ || (truncate_set_in_1st && round == 2 /* reevaluted at round turn */)) { - if (set_truncate_coord(linfo, round, fdle->checksum_alg)) + if (set_truncate_coord(linfo, round, fdle->used_checksum_alg)) return true; } else diff --git a/sql/log.h b/sql/log.h index c20f0fe5a57..f02b20c12bf 100644 --- a/sql/log.h +++ b/sql/log.h @@ -829,12 +829,18 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG void stop_union_events(THD *thd); bool is_query_in_union(THD *thd, query_id_t query_id_param); + bool write_event(Log_event *ev, enum enum_binlog_checksum_alg checksum_alg, + binlog_cache_data *data, IO_CACHE *file); bool write_event(Log_event *ev, binlog_cache_data *data, IO_CACHE *file); - bool write_event(Log_event *ev) { return write_event(ev, 0, &log_file); } + bool write_event(Log_event *ev, enum enum_binlog_checksum_alg checksum_alg) + { + return write_event(ev, checksum_alg, 0, &log_file); + } + bool write_event(Log_event *ev); bool write_event_buffer(uchar* buf,uint len); - bool append(Log_event* ev); - bool append_no_lock(Log_event* ev); + bool append(Log_event* ev, enum enum_binlog_checksum_alg checksum_alg); + bool append_no_lock(Log_event* ev, enum enum_binlog_checksum_alg checksum_alg); void mark_xids_active(ulong cookie, uint xid_count); void mark_xid_done(ulong cookie, bool write_checkpoint); diff --git a/sql/log_event.cc b/sql/log_event.cc index 5e255646528..7825e25086f 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -718,8 +718,7 @@ const char* Log_event::get_type_str() Log_event::Log_event(const uchar *buf, const 
Format_description_log_event* description_event) - :temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE), - checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) + :temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE) { #ifndef MYSQL_CLIENT thd= 0; @@ -1017,7 +1016,8 @@ Log_event* Log_event::read_log_event(const uchar *buf, uint event_len, uint event_type= buf[EVENT_TYPE_OFFSET]; // all following START events in the current file are without checksum if (event_type == START_EVENT_V3) - (const_cast< Format_description_log_event *>(fdle))->checksum_alg= BINLOG_CHECKSUM_ALG_OFF; + (const_cast< Format_description_log_event *>(fdle))->used_checksum_alg= + BINLOG_CHECKSUM_ALG_OFF; /* CRC verification by SQL and Show-Binlog-Events master side. The caller has to provide @fdle->checksum_alg to @@ -1038,7 +1038,7 @@ Log_event* Log_event::read_log_event(const uchar *buf, uint event_len, Notice, a pre-checksum FD version forces alg := BINLOG_CHECKSUM_ALG_UNDEF. */ alg= (event_type != FORMAT_DESCRIPTION_EVENT) ? 
- fdle->checksum_alg : get_checksum_alg(buf, event_len); + fdle->used_checksum_alg : get_checksum_alg(buf, event_len); // Emulate the corruption during reading an event DBUG_EXECUTE_IF("corrupt_read_log_event_char", if (event_type != FORMAT_DESCRIPTION_EVENT) @@ -1258,11 +1258,10 @@ Log_event* Log_event::read_log_event(const uchar *buf, uint event_len, if (ev) { - ev->checksum_alg= alg; #ifdef MYSQL_CLIENT - if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF && - ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) - ev->crc= uint4korr(buf + (event_len)); + ev->read_checksum_alg= alg; + if (alg != BINLOG_CHECKSUM_ALG_OFF && alg != BINLOG_CHECKSUM_ALG_UNDEF) + ev->read_checksum_value= uint4korr(buf + (event_len)); #endif } @@ -2039,8 +2038,10 @@ Start_log_event_v3::Start_log_event_v3(const uchar *buf, uint event_len, */ Format_description_log_event:: -Format_description_log_event(uint8 binlog_ver, const char* server_ver) - :Start_log_event_v3(), event_type_permutation(0) +Format_description_log_event(uint8 binlog_ver, const char* server_ver, + enum enum_binlog_checksum_alg checksum_alg) + :Start_log_event_v3(), event_type_permutation(0), + used_checksum_alg(checksum_alg) { binlog_version= binlog_ver; switch (binlog_ver) { @@ -2205,7 +2206,6 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) } calc_server_version_split(); deduct_options_written_to_bin_log(); - checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; reset_crypto(); } @@ -2236,6 +2236,7 @@ Format_description_log_event(const uchar *buf, uint event_len, common_header_len(0), post_header_len(NULL), event_type_permutation(0) { DBUG_ENTER("Format_description_log_event::Format_description_log_event(char*,...)"); + used_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; if (!Start_log_event_v3::is_valid()) DBUG_VOID_RETURN; /* sanity check */ buf+= LOG_EVENT_MINIMAL_HEADER_LEN; @@ -2257,11 +2258,11 @@ Format_description_log_event(const uchar *buf, uint event_len, { /* the last bytes are the checksum alg desc and value 
(or value's room) */ number_of_event_types -= BINLOG_CHECKSUM_ALG_DESC_LEN; - checksum_alg= (enum_binlog_checksum_alg)post_header_len[number_of_event_types]; + used_checksum_alg= (enum_binlog_checksum_alg)post_header_len[number_of_event_types]; } else { - checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; + used_checksum_alg= BINLOG_CHECKSUM_ALG_OFF; } deduct_options_written_to_bin_log(); reset_crypto(); diff --git a/sql/log_event.h b/sql/log_event.h index acdedb606c1..33f689c9330 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -991,6 +991,13 @@ class Log_event_writer public: ulonglong bytes_written; void *ctx; ///< Encryption context or 0 if no encryption is needed + /* + The length of a checksum written at the end of the event, if any. + Currently this is always either 0, when checksums are disabled, or + BINLOG_CHECKSUM_LEN when using BINLOG_CHECKSUM_ALG_CRC32. + (If we ever add another checksum algorithm, we will need to instead store + here the algorithm to use instead of just the length). + */ uint checksum_len; int write(Log_event *ev); int write_header(uchar *pos, size_t len); @@ -1001,11 +1008,29 @@ class Log_event_writer void set_incident(); void set_encrypted_writer() { encrypt_or_write= &Log_event_writer::encrypt_and_write; } + /* + Set a specific checksum setting. Used to ensure that + Format_description_log_event is always written with a checksum. + */ + enum enum_binlog_checksum_alg set_checksum_alg(enum enum_binlog_checksum_alg alg) + { + /* Must be adapted to store the actual algorithm if we add another. */ + enum enum_binlog_checksum_alg orig= + (checksum_len ? BINLOG_CHECKSUM_ALG_CRC32 : BINLOG_CHECKSUM_ALG_OFF); + checksum_len= + (alg != BINLOG_CHECKSUM_ALG_OFF && alg != BINLOG_CHECKSUM_ALG_UNDEF) ? 
+ BINLOG_CHECKSUM_LEN : 0; + return orig; + } Log_event_writer(IO_CACHE *file_arg, binlog_cache_data *cache_data_arg, - Binlog_crypt_data *cr= 0) + enum enum_binlog_checksum_alg checksum_alg, + Binlog_crypt_data *cr) :encrypt_or_write(&Log_event_writer::write_internal), bytes_written(0), ctx(0), + checksum_len(( checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ? + BINLOG_CHECKSUM_LEN : 0), file(file_arg), cache_data(cache_data_arg), crypto(cr) { } private: @@ -1323,7 +1348,9 @@ class Log_event } #else Log_event() : temp_buf(0), when(0), flags(0) {} - ha_checksum crc; + /* The checksum algorithm used (if any) when the event was read. */ + enum enum_binlog_checksum_alg read_checksum_alg; + ha_checksum read_checksum_value; /* print*() functions are used by mysqlbinlog */ virtual bool print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0; bool print_timestamp(IO_CACHE* file, time_t *ts = 0); @@ -1405,16 +1432,6 @@ class Log_event static int read_log_event(IO_CACHE* file, String* packet, const Format_description_log_event *fdle, enum enum_binlog_checksum_alg checksum_alg_arg); - /* - The value is set by caller of FD constructor and - Log_event::write_header() for the rest. - In the FD case it's propagated into the last byte - of post_header_len[] at FD::write(). - On the slave side the value is assigned from post_header_len[last] - of the last seen FD event. 
- */ - enum enum_binlog_checksum_alg checksum_alg; - static void *operator new(size_t size) { extern PSI_memory_key key_memory_log_event; @@ -1439,7 +1456,7 @@ class Log_event bool write_footer(Log_event_writer *writer) { return writer->write_footer(); } - my_bool need_checksum(); + enum enum_binlog_checksum_alg select_checksum_alg(); virtual bool write(Log_event_writer *writer) { @@ -2877,8 +2894,15 @@ class Format_description_log_event: public Start_log_event_v3 master_version_split server_version_split; const uint8 *event_type_permutation; uint32 options_written_to_bin_log; + /* + The checksum algorithm used in the binlog or relaylog following this + Format_description_event. Or BINLOG_CHECKSUM_ALG_UNDEF for a + Format_description_event which is not part of a binlog or relaylog file. + */ + enum enum_binlog_checksum_alg used_checksum_alg; - Format_description_log_event(uint8 binlog_ver, const char* server_ver=0); + Format_description_log_event(uint8 binlog_ver, const char* server_ver= 0, + enum enum_binlog_checksum_alg checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF); Format_description_log_event(const uchar *buf, uint event_len, const Format_description_log_event *description_event); diff --git a/sql/log_event_client.cc b/sql/log_event_client.cc index 4ae8bffcad7..a4d3ffae085 100644 --- a/sql/log_event_client.cc +++ b/sql/log_event_client.cc @@ -346,14 +346,14 @@ bool Log_event::print_header(IO_CACHE* file, /* print the checksum */ - if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF && - checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + if (read_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + read_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) { char checksum_buf[BINLOG_CHECKSUM_LEN * 2 + 4]; // to fit to "%p " size_t const bytes_written= - my_snprintf(checksum_buf, sizeof(checksum_buf), "0x%08x ", crc); + my_snprintf(checksum_buf, sizeof(checksum_buf), "0x%08x ", read_checksum_value); if (my_b_printf(file, "%s ", get_type(&binlog_checksum_typelib, - checksum_alg)) || + 
read_checksum_alg)) || my_b_printf(file, checksum_buf, bytes_written)) goto err; } @@ -1604,8 +1604,8 @@ bool Log_event::print_base64(IO_CACHE* file, uint tmp_size= size; Rows_log_event *ev= NULL; Log_event_type ev_type = (enum Log_event_type) ptr[EVENT_TYPE_OFFSET]; - if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF && - checksum_alg != BINLOG_CHECKSUM_ALG_OFF) + if (read_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF && + read_checksum_alg != BINLOG_CHECKSUM_ALG_OFF) tmp_size-= BINLOG_CHECKSUM_LEN; // checksum is displayed through the header switch (ev_type) { case WRITE_ROWS_EVENT: @@ -1672,8 +1672,8 @@ bool Log_event::print_base64(IO_CACHE* file, Rows_log_event *ev= NULL; Log_event_type et= (Log_event_type) ptr[EVENT_TYPE_OFFSET]; - if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF && - checksum_alg != BINLOG_CHECKSUM_ALG_OFF) + if (read_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF && + read_checksum_alg != BINLOG_CHECKSUM_ALG_OFF) size-= BINLOG_CHECKSUM_LEN; // checksum is displayed through the header switch (et) @@ -3680,7 +3680,7 @@ bool Write_rows_compressed_log_event::print(FILE *file, ulong len; bool is_malloc = false; if(!row_log_event_uncompress(glob_description_event, - checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + read_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, temp_buf, UINT_MAX32, NULL, 0, &is_malloc, &new_buf, &len)) { @@ -3717,7 +3717,7 @@ bool Delete_rows_compressed_log_event::print(FILE *file, ulong len; bool is_malloc = false; if(!row_log_event_uncompress(glob_description_event, - checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + read_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, temp_buf, UINT_MAX32, NULL, 0, &is_malloc, &new_buf, &len)) { @@ -3754,7 +3754,7 @@ Update_rows_compressed_log_event::print(FILE *file, ulong len; bool is_malloc= false; if(!row_log_event_uncompress(glob_description_event, - checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + read_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, temp_buf, UINT_MAX32, NULL, 0, &is_malloc, &new_buf, &len)) { diff --git 
a/sql/log_event_server.cc b/sql/log_event_server.cc index 5d07a6d853e..bee594291d6 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -574,8 +574,7 @@ int append_query_string(CHARSET_INFO *csinfo, String *to, **************************************************************************/ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) - :log_pos(0), temp_buf(0), exec_time(0), thd(thd_arg), - checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) + :log_pos(0), temp_buf(0), exec_time(0), thd(thd_arg) { server_id= thd->variables.server_id; when= thd->start_time; @@ -599,7 +598,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) Log_event::Log_event() :temp_buf(0), exec_time(0), flags(0), cache_type(EVENT_INVALID_CACHE), - thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) + thd(0) { server_id= global_system_variables.server_id; /* @@ -737,82 +736,17 @@ void Log_event::init_show_field_list(THD *thd, List<Item>* field_list) } /** - A decider of whether to trigger checksum computation or not. - To be invoked in Log_event::write() stack. - The decision is positive - - S,M) if it's been marked for checksumming with @c checksum_alg - - M) otherwise, if @@global.binlog_checksum is not NONE and the event is - directly written to the binlog file. - The to-be-cached event decides at @c write_cache() time. - - Otherwise the decision is negative. - - @note A side effect of the method is altering Log_event::checksum_alg - it the latter was undefined at calling. - - @return true Checksum should be used. Log_event::checksum_alg is set. - @return false No checksum + Select if and how to write checksum for an event written to the binlog. + It returns the actively configured binlog checksum option, unless the event + is being written to a cache (in which case the checksum, if any, is added + later when the cache is copied to the real binlog). 
*/ - -my_bool Log_event::need_checksum() +enum enum_binlog_checksum_alg Log_event::select_checksum_alg() { - my_bool ret; - DBUG_ENTER("Log_event::need_checksum"); - - /* - few callers of Log_event::write - (incl FD::write, FD constructing code on the slave side, Rotate relay log - and Stop event) - provides their checksum alg preference through Log_event::checksum_alg. - */ - if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) - ret= checksum_alg != BINLOG_CHECKSUM_ALG_OFF; + if (cache_type == Log_event::EVENT_NO_CACHE) + return (enum_binlog_checksum_alg)binlog_checksum_options; else - { - ret= binlog_checksum_options && cache_type == Log_event::EVENT_NO_CACHE; - checksum_alg= ret ? (enum_binlog_checksum_alg)binlog_checksum_options - : BINLOG_CHECKSUM_ALG_OFF; - } - /* - FD calls the methods before data_written has been calculated. - The following invariant claims if the current is not the first - call (and therefore data_written is not zero) then `ret' must be - TRUE. It may not be null because FD is always checksummed. - */ - - DBUG_ASSERT(get_type_code() != FORMAT_DESCRIPTION_EVENT || ret || - data_written == 0); - - DBUG_ASSERT(!ret || - ((checksum_alg == binlog_checksum_options || - /* - Stop event closes the relay-log and its checksum alg - preference is set by the caller can be different - from the server's binlog_checksum_options. - */ - get_type_code() == STOP_EVENT || - /* - Rotate:s can be checksummed regardless of the server's - binlog_checksum_options. That applies to both - the local RL's Rotate and the master's Rotate - which IO thread instantiates via queue_binlog_ver_3_event. 
- */ - get_type_code() == ROTATE_EVENT || - get_type_code() == START_ENCRYPTION_EVENT || - /* FD is always checksummed */ - get_type_code() == FORMAT_DESCRIPTION_EVENT) && - checksum_alg != BINLOG_CHECKSUM_ALG_OFF)); - - DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); - - DBUG_ASSERT(((get_type_code() != ROTATE_EVENT && - get_type_code() != STOP_EVENT) || - get_type_code() != FORMAT_DESCRIPTION_EVENT) || - cache_type == Log_event::EVENT_NO_CACHE); - - DBUG_RETURN(ret); + return BINLOG_CHECKSUM_ALG_OFF; } int Log_event_writer::write_internal(const uchar *pos, size_t len) @@ -959,8 +893,6 @@ bool Log_event::write_header(Log_event_writer *writer, size_t event_data_length) (longlong) writer->pos(), event_data_length, (int) get_type_code())); - writer->checksum_len= need_checksum() ? BINLOG_CHECKSUM_LEN : 0; - /* Store number of bytes that will be written by this event */ data_written= event_data_length + sizeof(header) + writer->checksum_len; @@ -2609,7 +2541,6 @@ int Start_log_event_v3::do_apply_event(rpl_group_info *rgi) bool Format_description_log_event::write(Log_event_writer *writer) { bool ret; - bool no_checksum; /* We don't call Start_log_event_v3::write() because this would make 2 my_b_safe_write(). @@ -2632,11 +2563,9 @@ bool Format_description_log_event::write(Log_event_writer *writer) FD_queue checksum_alg value. */ compile_time_assert(BINLOG_CHECKSUM_ALG_DESC_LEN == 1); -#ifdef DBUG_ASSERT_EXISTS - data_written= 0; // to prepare for need_checksum assert -#endif - uint8 checksum_byte= (uint8) - (need_checksum() ? checksum_alg : BINLOG_CHECKSUM_ALG_OFF); + uint8 checksum_byte= (uint8) (used_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF ? + used_checksum_alg : BINLOG_CHECKSUM_ALG_OFF); + DBUG_ASSERT(used_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); /* FD of checksum-aware server is always checksum-equipped, (V) is in, regardless of @@global.binlog_checksum policy. 
@@ -2650,17 +2579,16 @@ bool Format_description_log_event::write(Log_event_writer *writer) 1 + 4 bytes bigger comparing to the former FD. */ - if ((no_checksum= (checksum_alg == BINLOG_CHECKSUM_ALG_OFF))) - { - checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway - } + enum enum_binlog_checksum_alg old_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; + if (checksum_byte == BINLOG_CHECKSUM_ALG_OFF) + old_checksum_alg= writer->set_checksum_alg(BINLOG_CHECKSUM_ALG_CRC32); ret= write_header(writer, rec_size) || write_data(writer, buff, sizeof(buff)) || write_data(writer, post_header_len, number_of_event_types) || write_data(writer, &checksum_byte, sizeof(checksum_byte)) || write_footer(writer); - if (no_checksum) - checksum_alg= BINLOG_CHECKSUM_ALG_OFF; + if (checksum_byte == BINLOG_CHECKSUM_ALG_OFF) + writer->set_checksum_alg(old_checksum_alg); return ret; } @@ -5025,9 +4953,11 @@ int Create_file_log_event::do_apply_event(rpl_group_info *rgi) char *ext; int fd = -1; IO_CACHE file; - Log_event_writer lew(&file, 0); - int error = 1; Relay_log_info const *rli= rgi->rli; + enum enum_binlog_checksum_alg checksum_alg= + rli->relay_log.description_event_for_exec->used_checksum_alg; + Log_event_writer lew(&file, 0, checksum_alg, NULL); + int error = 1; THD_STAGE_INFO(thd, stage_making_temp_file_create_before_load_data); bzero((char*)&file, sizeof(file)); diff --git a/sql/slave.cc b/sql/slave.cc index 30ddb4949e8..08ac2e65779 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1795,11 +1795,13 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) break; case 3: mi->rli.relay_log.description_event_for_queue= new - Format_description_log_event(1, mysql->server_version); + Format_description_log_event(1, mysql->server_version, + mi->rli.relay_log.relay_log_checksum_alg); break; case 4: mi->rli.relay_log.description_event_for_queue= new - Format_description_log_event(3, mysql->server_version); + Format_description_log_event(3, mysql->server_version, 
+ mi->rli.relay_log.relay_log_checksum_alg); break; default: /* @@ -1811,7 +1813,8 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) master is 3.23, 4.0, etc. */ mi->rli.relay_log.description_event_for_queue= new - Format_description_log_event(4, mysql->server_version); + Format_description_log_event(4, mysql->server_version, + mi->rli.relay_log.relay_log_checksum_alg); break; } } @@ -1870,10 +1873,8 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) until it has received a new FD_m. */ - mi->rli.relay_log.description_event_for_queue->checksum_alg= - mi->rli.relay_log.relay_log_checksum_alg; - DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg != + DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->used_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); DBUG_ASSERT(mi->rli.relay_log.relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); @@ -2762,7 +2763,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi) { DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); rev->server_id= 0; // don't be ignored by slave SQL thread - if (unlikely(rli->relay_log.append(rev))) + if (unlikely(rli->relay_log.append(rev, rli->relay_log.relay_log_checksum_alg))) mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), "failed to write a Rotate event" @@ -2775,7 +2776,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi) DBUG_PRINT("info",("writing a Gtid_list event to track down ignored events")); glev->server_id= 0; // don't be ignored by slave SQL thread glev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos - if (unlikely(rli->relay_log.append(glev))) + if (unlikely(rli->relay_log.append(glev, rli->relay_log.relay_log_checksum_alg))) mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), "failed to write a Gtid_list 
event to the relay log, " @@ -5961,7 +5962,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) break; Execute_load_log_event xev(thd,0,0); xev.log_pos = cev->log_pos; - if (unlikely(mi->rli.relay_log.append(&xev))) + if (unlikely(mi->rli.relay_log.append(&xev, mi->rli.relay_log.relay_log_checksum_alg))) { mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), @@ -5975,7 +5976,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) { cev->block = net->read_pos; cev->block_len = num_bytes; - if (unlikely(mi->rli.relay_log.append(cev))) + if (unlikely(mi->rli.relay_log.append(cev, mi->rli.relay_log.relay_log_checksum_alg))) { mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), @@ -5990,7 +5991,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) aev.block = net->read_pos; aev.block_len = num_bytes; aev.log_pos = cev->log_pos; - if (unlikely(mi->rli.relay_log.append(&aev))) + if (unlikely(mi->rli.relay_log.append(&aev, mi->rli.relay_log.relay_log_checksum_alg))) { mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), @@ -6060,15 +6061,13 @@ static int process_io_rotate(Master_info *mi, Rotate_log_event *rev) */ if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4) { - DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg == + DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->used_checksum_alg == mi->rli.relay_log.relay_log_checksum_alg); delete mi->rli.relay_log.description_event_for_queue; /* start from format 3 (MySQL 4.0) again */ mi->rli.relay_log.description_event_for_queue= new - Format_description_log_event(3); - mi->rli.relay_log.description_event_for_queue->checksum_alg= - mi->rli.relay_log.relay_log_checksum_alg; + Format_description_log_event(3, NULL, 
mi->rli.relay_log.relay_log_checksum_alg); } /* Rotate the relay log makes binlog format detection easier (at next slave @@ -6176,13 +6175,15 @@ static int queue_binlog_ver_1_event(Master_info *mi, const uchar *buf, } if (likely(!ignore_event)) { + enum enum_binlog_checksum_alg checksum_alg= + mi->rli.relay_log.description_event_for_queue->used_checksum_alg; if (ev->log_pos) /* Don't do it for fake Rotate events (see comment in Log_event::Log_event(const char* buf...) in log_event.cc). */ ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */ - if (unlikely(rli->relay_log.append(ev))) + if (unlikely(rli->relay_log.append(ev, checksum_alg))) { delete ev; mysql_mutex_unlock(&mi->data_lock); @@ -6208,6 +6209,7 @@ static int queue_binlog_ver_3_event(Master_info *mi, const uchar *buf, ulong inc_pos; char *tmp_buf = 0; Relay_log_info *rli= &mi->rli; + enum enum_binlog_checksum_alg checksum_alg; DBUG_ENTER("queue_binlog_ver_3_event"); /* read_log_event() will adjust log_pos to be end_log_pos */ @@ -6240,7 +6242,8 @@ static int queue_binlog_ver_3_event(Master_info *mi, const uchar *buf, break; } - if (unlikely(rli->relay_log.append(ev))) + checksum_alg= mi->rli.relay_log.description_event_for_queue->used_checksum_alg; + if (unlikely(rli->relay_log.append(ev, checksum_alg))) { delete ev; mysql_mutex_unlock(&mi->data_lock); @@ -6342,7 +6345,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) { // checksum behaviour is similar to the pre-checksum FD handling mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF; - mi->rli.relay_log.description_event_for_queue->checksum_alg= + mi->rli.relay_log.description_event_for_queue->used_checksum_alg= mi->rli.relay_log.relay_log_checksum_alg= checksum_alg= BINLOG_CHECKSUM_ALG_OFF; } @@ -6471,8 +6474,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) We detect this case by noticing a change of server_id and in this case likewise rollback the partially 
received event group. */ - Format_description_log_event fdle(4); - fdle.checksum_alg= checksum_alg; + Format_description_log_event fdle(4, NULL, checksum_alg); /* Possible crash is flagged in being created FD' common header @@ -6504,7 +6506,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) rev.new_log_ident); } mysql_mutex_lock(log_lock); - if (likely(!rli->relay_log.write_event(&fdle) && + if (likely(!rli->relay_log.write_event(&fdle, checksum_alg) && !rli->relay_log.flush_and_sync(NULL))) { rli->relay_log.harvest_bytes_written(&rli->log_space_total); @@ -6553,7 +6555,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) event_len - BINLOG_CHECKSUM_LEN); int4store(&rot_buf[event_len - BINLOG_CHECKSUM_LEN], rot_crc); DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET])); - DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg == + DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->used_checksum_alg == mi->rli.relay_log.relay_log_checksum_alg); /* the first one */ DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF); @@ -6573,7 +6575,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) int4store(&rot_buf[EVENT_LEN_OFFSET], uint4korr(&rot_buf[EVENT_LEN_OFFSET]) - BINLOG_CHECKSUM_LEN); DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET])); - DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg == + DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->used_checksum_alg == mi->rli.relay_log.relay_log_checksum_alg); /* the first one */ DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF); @@ -6613,11 +6615,11 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) tmp->copy_crypto_data(mi->rli.relay_log.description_event_for_queue); delete mi->rli.relay_log.description_event_for_queue; mi->rli.relay_log.description_event_for_queue= tmp; - if (tmp->checksum_alg == 
BINLOG_CHECKSUM_ALG_UNDEF) - tmp->checksum_alg= BINLOG_CHECKSUM_ALG_OFF; + if (tmp->used_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF) + tmp->used_checksum_alg= BINLOG_CHECKSUM_ALG_OFF; /* installing new value of checksum Alg for relay log */ - mi->rli.relay_log.relay_log_checksum_alg= tmp->checksum_alg; + mi->rli.relay_log.relay_log_checksum_alg= tmp->used_checksum_alg; /* Do not queue any format description event that we receive after a @@ -7182,7 +7184,8 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) rli->relay_log.description_event_for_queue->created= 0; rli->relay_log.description_event_for_queue->set_artificial_event(); if (rli->relay_log.append_no_lock - (rli->relay_log.description_event_for_queue)) + (rli->relay_log.description_event_for_queue, + rli->relay_log.relay_log_checksum_alg)) error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; else rli->relay_log.harvest_bytes_written(&rli->log_space_total); @@ -7195,7 +7198,8 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) */ Rotate_log_event fake_rev(mi->master_log_name, 0, mi->master_log_pos, 0); fake_rev.server_id= mi->master_id; - if (rli->relay_log.append_no_lock(&fake_rev)) + if (rli->relay_log.append_no_lock(&fake_rev, + rli->relay_log.relay_log_checksum_alg)) error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; else rli->relay_log.harvest_bytes_written(&rli->log_space_total); diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 8bde0f3bd53..69c4c9e889d 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -4267,7 +4267,7 @@ bool mysql_show_binlog_events(THD* thd) if (lex_mi->pos > BIN_LOG_HEADER_SIZE) { - checksum_alg= description_event->checksum_alg; + checksum_alg= description_event->used_checksum_alg; /* Validate user given position using checksum */ if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF && checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc index 5e1fa137fed..f71b1245bcc 100644 --- a/sql/wsrep_binlog.cc +++ 
b/sql/wsrep_binlog.cc @@ -236,7 +236,9 @@ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf, File file; IO_CACHE cache; - Log_event_writer writer(&cache, 0); + enum enum_binlog_checksum_alg checksum_alg= + (enum_binlog_checksum_alg) binlog_checksum_options; + Log_event_writer writer(&cache, 0, checksum_alg, NULL); Format_description_log_event *ev= 0; longlong thd_trx_seqno= (long long)wsrep_thd_trx_seqno(thd); @@ -288,7 +290,7 @@ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf, to the dump file). */ ev= (thd->wsrep_applier) ? wsrep_get_apply_format(thd) : - (new Format_description_log_event(4)); + (new Format_description_log_event(4, NULL, checksum_alg)); if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) || flush_io_cache(&cache)) diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 91abf1dffe8..0073f846ab3 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -2150,16 +2150,16 @@ int wsrep_to_buf_helper( THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len) { IO_CACHE tmp_io_cache; - Log_event_writer writer(&tmp_io_cache, 0); + enum enum_binlog_checksum_alg current_binlog_check_alg= + (enum_binlog_checksum_alg) binlog_checksum_options; + Log_event_writer writer(&tmp_io_cache, NULL, current_binlog_check_alg, NULL); if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, 65536, MYF(MY_WME))) return 1; int ret(0); - enum enum_binlog_checksum_alg current_binlog_check_alg= - (enum_binlog_checksum_alg) binlog_checksum_options; - Format_description_log_event *tmp_fd= new Format_description_log_event(4); - tmp_fd->checksum_alg= current_binlog_check_alg; + Format_description_log_event *tmp_fd= + new Format_description_log_event(4, NULL, current_binlog_check_alg); writer.write(tmp_fd); delete tmp_fd; @@ -2208,7 +2208,6 @@ int wsrep_to_buf_helper( Query_log_event ev(thd, thd->wsrep_TOI_pre_query, thd->wsrep_TOI_pre_query_len, FALSE, FALSE, FALSE, 0); - ev.checksum_alg= 
current_binlog_check_alg; if (writer.write(&ev)) ret= 1; } @@ -2217,7 +2216,6 @@ int wsrep_to_buf_helper( /* WSREP GTID mode, we need to change server_id */ if (wsrep_gtid_mode && !thd->variables.gtid_seq_no) ev.server_id= wsrep_gtid_server.server_id; - ev.checksum_alg= current_binlog_check_alg; if (!ret && writer.write(&ev)) ret= 1; if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1; close_cached_file(&tmp_io_cache); -- 2.30.2
Preparatory patch for pre-computing binlog checksums outside of holding LOCK_log. The existing code for MYSQL_BIN_LOG::write_cache() was needlessly complex and very hard to understand and modify for handling the new case where pre-computed checksums are already present in the IO_CACHE. Greatly simplify the logic by replacing the (implicit) state machine with direct code that pulls the events one by one from the IO_CACHE. This removes a lot of state flags and avoids duplicate code for handling full vs. split headers. This also removes the need for the CacheWriter class. As a bonus, this fixes the bug that CacheWriter::write() was completely ignoring write errors. No other functional changes are done with this patch, only code cleanup. Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org> --- sql/log.cc | 237 ++++++++++++++++++++--------------------------------- 1 file changed, 89 insertions(+), 148 deletions(-) diff --git a/sql/log.cc b/sql/log.cc index bcb89e79dd0..1ab90389a37 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -7485,40 +7485,6 @@ uint MYSQL_BIN_LOG::next_file_id() return res; } -class CacheWriter: public Log_event_writer -{ -public: - size_t remains; - - CacheWriter(THD *thd_arg, IO_CACHE *file_arg, - enum enum_binlog_checksum_alg checksum_alg, - Binlog_crypt_data *cr) - : Log_event_writer(file_arg, 0, checksum_alg, cr), remains(0), thd(thd_arg), - first(true) - { - } - - ~CacheWriter() - { status_var_add(thd->status_var.binlog_bytes_written, bytes_written); } - - int write(uchar* pos, size_t len) - { - DBUG_ENTER("CacheWriter::write"); - if (first) - write_header(pos, len); - else - write_data(pos, len); - - remains -= len; - if ((first= !remains)) - write_footer(); - DBUG_RETURN(0); - } -private: - THD *thd; - bool first; -}; - /* Write the contents of a cache to the binary log. 
@@ -7543,13 +7509,16 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) mysql_mutex_assert_owner(&LOCK_log); if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) DBUG_RETURN(ER_ERROR_ON_WRITE); - size_t length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; - size_t val; + /* Amount of remaining bytes in the IO_CACHE read buffer. */ + size_t length= my_b_bytes_in_cache(cache); + size_t group; size_t end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t - uchar header[LOG_EVENT_HEADER_LEN]; - CacheWriter writer(thd, &log_file, - (enum_binlog_checksum_alg)binlog_checksum_options, - &crypto); + uchar header_buf[LOG_EVENT_HEADER_LEN]; + Log_event_writer writer(&log_file, 0, + (enum_binlog_checksum_alg)binlog_checksum_options, + &crypto); + uint checksum_len= writer.checksum_len; + int err= 0; if (crypto.scheme) { @@ -7575,128 +7544,100 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) */ group= (size_t)my_b_tell(&log_file); - hdr_offs= carry= 0; - - do + for (;;) { /* - if we only got a partial header in the last iteration, - get the other half now and process a full header. + Empty cache at an event boundary means we are done (but empty cache + elsewhere is an error). 
*/ - if (unlikely(carry > 0)) - { - DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN); - size_t tail= LOG_EVENT_HEADER_LEN - carry; - - /* assemble both halves */ - memcpy(&header[carry], (char *)cache->read_pos, tail); - - uint32 len= uint4korr(header + EVENT_LEN_OFFSET); - writer.remains= len; - - /* fix end_log_pos */ - end_log_pos_inc += writer.checksum_len; - val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc; - int4store(header + LOG_POS_OFFSET, val); - - /* fix len */ - len+= writer.checksum_len; - int4store(header + EVENT_LEN_OFFSET, len); - - if (writer.write(header, LOG_EVENT_HEADER_LEN)) - DBUG_RETURN(ER_ERROR_ON_WRITE); + if (length == 0 && + (length= my_b_fill(cache)) == 0) + break; - cache->read_pos+= tail; - length-= tail; - carry= 0; + DBUG_EXECUTE_IF("fail_binlog_write_1", + { + errno= 28; + goto error_in_write; + }); - /* next event header at ... */ - hdr_offs= len - LOG_EVENT_HEADER_LEN - writer.checksum_len; + /* Get a full header, using local buffer if split in the IO_CACHE. */ + uchar *header; + if (likely(length > LOG_EVENT_HEADER_LEN)) + { + header= cache->read_pos; + cache->read_pos+= LOG_EVENT_HEADER_LEN; + length-= LOG_EVENT_HEADER_LEN; } - - /* if there is anything to write, process it. */ - - if (likely(length > 0)) + else { - DBUG_EXECUTE_IF("fail_binlog_write_1", - errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE);); - /* - process all event-headers in this (partial) cache. - if next header is beyond current read-buffer, - we'll get it later (though not necessarily in the - very next iteration, just "eventually"). 
- */ - - if (hdr_offs >= length) + size_t sofar= length; + size_t remain= LOG_EVENT_HEADER_LEN - sofar; + header= &header_buf[0]; + memcpy(header, cache->read_pos, sofar); + cache->read_pos+= sofar; + do { - if (writer.write(cache->read_pos, length)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - } - - while (hdr_offs < length) + length= my_b_fill(cache); + if (!length) + goto error_in_read; + size_t chunk= std::min(length, remain); + memcpy(header + sofar, cache->read_pos, chunk); + sofar+= chunk; + remain-= chunk; + cache->read_pos+= chunk; + length-= chunk; + } while (unlikely(remain > 0)); + } + + /* Adjust the length and end_log_pos appropriately. */ + uint ev_len= uint4korr(header + EVENT_LEN_OFFSET); // netto len + DBUG_ASSERT(ev_len >= LOG_EVENT_HEADER_LEN); + if (unlikely(ev_len < LOG_EVENT_HEADER_LEN)) + goto error_in_read; + int4store(header + EVENT_LEN_OFFSET, ev_len + checksum_len); + end_log_pos_inc += checksum_len; + size_t val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc; + int4store(header + LOG_POS_OFFSET, val); + + /* Write the header to the binlog. */ + if (writer.write_header(header, LOG_EVENT_HEADER_LEN)) + goto error_in_write; + ev_len-= LOG_EVENT_HEADER_LEN; + + /* Write the rest of the event. */ + while (ev_len > 0) + { + if (length == 0) { - /* - finish off with remains of the last event that crawls - from previous into the current buffer - */ - if (writer.remains != 0) - { - if (writer.write(cache->read_pos, hdr_offs)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - } - - /* - partial header only? save what we can get, process once - we get the rest. 
- */ - if (hdr_offs + LOG_EVENT_HEADER_LEN > length) - { - carry= length - hdr_offs; - memcpy(header, (char *)cache->read_pos + hdr_offs, carry); - length= hdr_offs; - } - else - { - /* we've got a full event-header, and it came in one piece */ - uchar *ev= (uchar *)cache->read_pos + hdr_offs; - uint ev_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len - uchar *log_pos= ev + LOG_POS_OFFSET; - - end_log_pos_inc += writer.checksum_len; - /* fix end_log_pos */ - val= uint4korr(log_pos) + group + end_log_pos_inc; - int4store(log_pos, val); - - /* fix length */ - int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len); - - writer.remains= ev_len; - if (writer.write(ev, MY_MIN(ev_len, length - hdr_offs))) - DBUG_RETURN(ER_ERROR_ON_WRITE); + length= my_b_fill(cache); + if (!length) + goto error_in_read; + } + uint chunk= std::min(ev_len, (uint)length); + if (writer.write_data(cache->read_pos, chunk)) + goto error_in_write; + cache->read_pos+= chunk; + length-= chunk; + ev_len-= chunk; + } - /* next event header at ... */ - hdr_offs += ev_len; // incr by the netto len + if (writer.write_footer()) + goto error_in_write; - DBUG_ASSERT(!writer.checksum_len || writer.remains == 0 || hdr_offs >= length); - } - } + } + goto end; // All OK - /* - Adjust hdr_offs. Note that it may still point beyond the segment - read in the next iteration; if the current event is very long, - it may take a couple of read-iterations (and subsequent adjustments - of hdr_offs) for it to point into the then-current segment. 
- If we have a split header (!carry), hdr_offs will be set at the - beginning of the next iteration, overwriting the value we set here: - */ - hdr_offs -= length; - } - } while ((length= my_b_fill(cache))); +error_in_write: + err= ER_ERROR_ON_WRITE; + goto end; - DBUG_ASSERT(carry == 0); - DBUG_ASSERT(!writer.checksum_len || writer.remains == 0); +error_in_read: + err= ER_ERROR_ON_READ; + goto end; - DBUG_RETURN(0); // All OK +end: + status_var_add(thd->status_var.binlog_bytes_written, writer.bytes_written); + DBUG_RETURN(err); } /* -- 2.30.2
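For review purposes, the per-event header adjustment done by the new loop can be sketched in isolation as below. This is only an illustration, not the patch code: it works on a plain in-memory buffer instead of an IO_CACHE, `fixup_headers`, `load4` and `store4` are hypothetical helpers, and the offsets (9, 13, 19) are assumed to match the server's EVENT_LEN_OFFSET, LOG_POS_OFFSET and LOG_EVENT_HEADER_LEN. It does not append any checksum bytes; it only shows how the length and end_log_pos fields are adjusted.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Assumed to mirror the server's common event header layout.
static const size_t EVENT_LEN_OFFSET = 9;
static const size_t LOG_POS_OFFSET = 13;
static const size_t LOG_EVENT_HEADER_LEN = 19;

// Little-endian 4-byte load/store, standing in for uint4korr()/int4store().
static uint32_t load4(const unsigned char *p)
{
  return (uint32_t) p[0] | ((uint32_t) p[1] << 8) |
         ((uint32_t) p[2] << 16) | ((uint32_t) p[3] << 24);
}

static void store4(unsigned char *p, uint32_t v)
{
  p[0] = (unsigned char) v;
  p[1] = (unsigned char) (v >> 8);
  p[2] = (unsigned char) (v >> 16);
  p[3] = (unsigned char) (v >> 24);
}

// Hypothetical helper: walk the events in buf one by one and rewrite each
// header the way the loop in write_cache() does. `group` is the binlog file
// offset at which the event group will start. Returns false if an event
// header is truncated or carries an impossible length.
static bool fixup_headers(std::vector<unsigned char> &buf, uint32_t group,
                          uint32_t checksum_len)
{
  uint32_t end_log_pos_inc = 0;
  size_t pos = 0;
  while (pos < buf.size())
  {
    if (buf.size() - pos < LOG_EVENT_HEADER_LEN)
      return false;                       // split header with nothing following
    unsigned char *header = &buf[pos];
    uint32_t ev_len = load4(header + EVENT_LEN_OFFSET);   // length without checksum
    if (ev_len < LOG_EVENT_HEADER_LEN || ev_len > buf.size() - pos)
      return false;
    // The event grows by the checksum, and so does every later end_log_pos.
    store4(header + EVENT_LEN_OFFSET, ev_len + checksum_len);
    end_log_pos_inc += checksum_len;
    store4(header + LOG_POS_OFFSET,
           load4(header + LOG_POS_OFFSET) + group + end_log_pos_inc);
    pos += ev_len;                        // next header starts right after this event
  }
  return true;
}
```

The cache-relative end_log_pos of each event is shifted by the group start plus one checksum length for every event written so far, which is what `end_log_pos_inc` accumulates in the patch.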
Hi! On Fri, Aug 25, 2023 at 10:16 AM Kristian Nielsen <knielsen@knielsen-hq.org> wrote:
Preparatory patch for pre-computing binlog checksums outside of holding LOCK_log.
The existing code for MYSQL_BIN_LOG::write_cache() was needlessly complex and very hard to understand and modify for handling the new case where pre-computed checksums are already present in the IO_CACHE.
MDEV-31273: Refactor MYSQL_BIN_LOG::write_cache()
if (likely(length > LOG_EVENT_HEADER_LEN)) { header= cache->read_pos; cache->read_pos+= LOG_EVENT_HEADER_LEN; length-= LOG_EVENT_HEADER_LEN; } else { size_t sofar= length; size_t remain= LOG_EVENT_HEADER_LEN - sofar; header= &header_buf[0]; memcpy(header, cache->read_pos, sofar); cache->read_pos+= sofar;
do { length= my_b_fill(cache); if (!length) goto error_in_read; size_t chunk= std::min(length, remain); memcpy(header + sofar, cache->read_pos, chunk); sofar+= chunk; remain-= chunk; cache->read_pos+= chunk; length-= chunk; } while (unlikely(remain > 0)); }
The above can be replaced with:

  if (my_b_read(cache, header, LOG_EVENT_HEADER_LEN))
    goto error_in_read;

It is almost exactly as efficient as the above (one extra if) and avoids using cache internals. Note that you do not have to call my_b_fill() if you use my_b_read(). my_b_read() will return 0 if it was able to read all data. In case of end of file, my_b_read() will return 1 and info->error will be 0.

If needed, you can also find out how much data is left to read from the IO_CACHE:

  left_data_to_read= (cache->end_of_file - my_b_tell(cache))

(I can make an inline function of that if needed).

/* Write the rest of the event. */
while (ev_len > 0) { if (length == 0) length= my_b_fill(cache); if (!length) goto error_in_read;
-> while (ev_len > 0) { if (length == 0) { if (!(length= my_b_fill(cache))) goto error_in_read; } <cut> uint chunk= std::min(ev_len, (uint)length);

I would have preferred to have MY_MIN() used (like the rest of the code). (not critical)

Regards, Monty
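To illustrate the "read exactly N bytes or fail" contract described in the review, here is a small self-contained model. It is not the mysys code: `MiniCache` and `mini_read` are hypothetical stand-ins for IO_CACHE and my_b_read(), kept only to show that the caller no longer needs to handle a header split across refills itself.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical stand-in for IO_CACHE: exposes `data` through a read window
// that is refilled at most `block` bytes at a time.
struct MiniCache
{
  std::vector<unsigned char> data;
  size_t block;
  size_t read_pos;   // next unread byte inside the current window
  size_t read_end;   // end of the current window

  MiniCache(const std::vector<unsigned char> &d, size_t b)
    : data(d), block(b), read_pos(0), read_end(0) {}

  // Load the next window; returns the number of bytes now available.
  size_t fill()
  {
    read_pos = read_end;
    read_end = std::min(read_end + block, data.size());
    return read_end - read_pos;
  }
};

// Model of the my_b_read() contract as described in the review: returns 0
// when all n bytes were copied to dst, 1 when the data ran out first.
static int mini_read(MiniCache &c, unsigned char *dst, size_t n)
{
  while (n > 0)
  {
    size_t avail = c.read_end - c.read_pos;
    if (avail == 0 && (avail = c.fill()) == 0)
      return 1;                                  // end of data before n bytes
    size_t chunk = std::min(avail, n);
    std::memcpy(dst, c.data.data() + c.read_pos, chunk);
    c.read_pos += chunk;
    dst += chunk;
    n -= chunk;
  }
  return 0;
}
```

With a helper like this the header read in the loop becomes a single call plus an error check, and the split-header case is handled inside the reader rather than in write_cache().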
Compute binlog checksums (when enabled) already when writing events into the statement or transaction caches, where before it was done when the caches were copied to the real binlog file. This moves the checksum computation outside of holding LOCK_log, improving scalability.

At stmt/trx cache write time, the final end_log_pos values are not known, so with this patch these will be set to 0. Events that are written directly to the binlog file (not through stmt/trx cache) keep the correct end_log_pos value. The GTID and COMMIT/XID events at the start and end of event groups are written directly, so the zero end_log_pos is only for events in the middle of event groups, which does not negatively affect replication.

An option --binlog-legacy-event-pos, off by default, is provided to disable this behavior to provide backwards compatibility with any external applications that might rely on end_log_pos in events in the middle of event groups.

Checksums cannot be pre-computed when binlog encryption is enabled, as encryption relies on correct end_log_pos to provide part of the nonce/IV.

Checksum pre-computation is also disabled for WSREP/Galera.

The current --binlog-checksum configuration is saved in binlog_cache_data at transaction start and used to pre-compute checksums in cache, if applicable. When the cache is later copied to the binlog, a check is made if the saved value still matches the configured global value; if so, the events are block-copied directly into the binlog file. If --binlog-checksum was changed during the transaction, events are re-written to the binlog file one-by-one and the checksums recomputed/discarded as appropriate.
Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org> --- include/my_atomic.h | 41 ++++- include/my_sys.h | 2 + .../main/mysqlbinlog_row_compressed.result | 48 ++--- .../main/mysqlbinlog_row_minimal.result | 48 ++--- .../main/mysqlbinlog_stmt_compressed.result | 16 +- mysql-test/main/mysqld--help.result | 7 + .../suite/binlog/include/binlog_ioerr.inc | 3 + mysql-test/suite/binlog/r/binlog_ioerr.result | 2 + mysql-test/suite/binlog/t/binlog_killed.test | 2 +- .../t/binlog_table_map_optional_metadata.test | 4 +- .../binlog_encryption/binlog_ioerr.result | 2 + .../suite/rpl/r/rpl_checksum_cache.result | 43 ++++- .../suite/rpl/t/rpl_checksum_cache.test | 98 +++++++++- .../r/sysvars_server_notembedded.result | 10 + mysys/mf_iocache2.c | 34 ++++ sql/log.cc | 173 ++++++++++++++---- sql/log.h | 2 +- sql/log_event.h | 2 +- sql/log_event_server.cc | 24 +-- sql/mysqld.cc | 1 + sql/mysqld.h | 1 + sql/privilege.h | 3 + sql/sys_vars.cc | 13 ++ 23 files changed, 466 insertions(+), 113 deletions(-) diff --git a/include/my_atomic.h b/include/my_atomic.h index 270134a6caf..01e9170cb15 100644 --- a/include/my_atomic.h +++ b/include/my_atomic.h @@ -62,8 +62,8 @@ Order must be one of MY_MEMORY_ORDER_RELAXED, MY_MEMORY_ORDER_RELEASE, MY_MEMORY_ORDER_SEQ_CST. - '#' is substituted by a size suffix - 8, 16, 32, 64, or ptr - (e.g. my_atomic_add8, my_atomic_fas32, my_atomic_casptr). + '#' is substituted by a size suffix - 8, 16, 32, 64, ptr, or ul (for unsigned + long) (e.g. my_atomic_add8, my_atomic_fas32, my_atomic_casptr). The first version orders memory accesses according to MY_MEMORY_ORDER_SEQ_CST, the second version (with _explicit suffix) orders memory accesses according to @@ -153,4 +153,41 @@ #define my_atomic_casptr_strong_explicit(P, E, D, S, F) \ my_atomic_casptr((P), (E), (D)) #endif + +/* Convenience macros since ulong is 32 or 64 bit depending on platform. 
*/ +#if SIZEOF_LONG == 4 +#define my_atomic_storeul(P, D) my_atomic_store32((int32 volatile *)(P), (D)) +#define my_atomic_storeul_explicit(P, D, O) \ + my_atomic_store32_explicit((int32 volatile *)(P), (D), (O)) +#define my_atomic_loadul(P) my_atomic_load32((int32 volatile *)(P)) +#define my_atomic_loadul_explicit(P, O) \ + my_atomic_load32_explicit((int32 volatile *)(P), (O)) +#define my_atomic_fasul(P, D) my_atomic_fas32((int32 volatile *)(P), (D)) +#define my_atomic_fasul_explict(P, D, O) \ + my_atomic_fas32_explicit((int32 volatile *)(P), (D), (O)) +#define my_atomic_addul(P, A) my_atomic_add32((int32 volatile *)(P), (A)) +#define my_atomic_addul_explict(P, A, O) \ + my_atomic_add32_explicit((int32 volatile *)(P), (A), (O)) +#define my_atomic_casul(P, E, D) \ + my_atomic_cas32((int32 volatile *)(P), (E), (D)) +#define my_atomic_casul_weak_explicit(P, E, D, S, F) \ + my_atomic_cas32_weak_explicit((int32 volatile *)(P), (E), (D), (S), (F)) +#define my_atomic_casul_strong_explicit(P, E, D, S, F) \ + my_atomic_cas32_strong_explicit((int32 volatile *)(P), (E), (D), (S), (F)) +#elif SIZEOF_LONG == 8 +#define my_atomic_storeul(P, D) my_atomic_store64((P), (D)) +#define my_atomic_storeul_explicit(P, D, O) my_atomic_store64_explicit((P), (D), (O)) +#define my_atomic_loadul(P) my_atomic_load64((P)) +#define my_atomic_loadul_explicit(P, O) my_atomic_load64_explicit((P), (O)) +#define my_atomic_fasul(P, D) my_atomic_fas64((P), (D)) +#define my_atomic_fasul_explict(P, D, O) my_atomic_fas64_explicit((P), (D), (O)) +#define my_atomic_addul(P, A) my_atomic_add64((P), (A)) +#define my_atomic_addul_explict(P, A, O) my_atomic_add64_explicit((P), (A), (O)) +#define my_atomic_casul(P, E, D) my_atomic_cas64((P), (E), (D)) +#define my_atomic_casul_weak_explicit(P, E, D, S, F) \ + my_atomic_cas64_weak_explicit((P), (E), (D), (S), (F)) +#define my_atomic_casul_strong_explicit(P, E, D, S, F) \ + my_atomic_cas64_strong_explicit((P), (E), (D), (S), (F)) +#endif + #endif /* 
MY_ATOMIC_INCLUDED */ diff --git a/include/my_sys.h b/include/my_sys.h index 2d1dbb7b2bf..145d9fb2603 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -599,6 +599,8 @@ static inline size_t my_b_bytes_in_cache(const IO_CACHE *info) int my_b_copy_to_file (IO_CACHE *cache, FILE *file, size_t count); int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file); +int my_b_copy_to_cache(IO_CACHE *from_cache, IO_CACHE *to_cache, size_t count); +int my_b_copy_all_to_cache(IO_CACHE *from_cache, IO_CACHE *to_cache); my_off_t my_b_append_tell(IO_CACHE* info); my_off_t my_b_safe_tell(IO_CACHE* info); /* picks the correct tell() */ diff --git a/mysql-test/main/mysqlbinlog_row_compressed.result b/mysql-test/main/mysqlbinlog_row_compressed.result index 2cf652655e0..96a0ed61a71 100644 --- a/mysql-test/main/mysqlbinlog_row_compressed.result +++ b/mysql-test/main/mysqlbinlog_row_compressed.result @@ -57,11 +57,11 @@ START TRANSACTION /*!*/; # at 787 # at 861 -#<date> server id 1 end_log_pos 861 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, "") -#<date> server id 1 end_log_pos 917 CRC32 XXX Table_map: `test`.`t1` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num # at 917 -#<date> server id 1 end_log_pos 985 CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F ### INSERT INTO `test`.`t1` ### SET ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -86,11 +86,11 @@ START TRANSACTION /*!*/; # at 1100 # at 1176 -#<date> server id 1 end_log_pos 1176 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL) -#<date> server id 1 end_log_pos 1232 CRC32 XXX Table_map: `test`.`t1` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: 
`test`.`t1` mapped to number num # at 1232 -#<date> server id 1 end_log_pos 1299 CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F ### INSERT INTO `test`.`t1` ### SET ### @1=11 /* INT meta=0 nullable=0 is_null=0 */ @@ -115,11 +115,11 @@ START TRANSACTION /*!*/; # at 1414 # at 1492 -#<date> server id 1 end_log_pos 1492 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A") -#<date> server id 1 end_log_pos 1548 CRC32 XXX Table_map: `test`.`t1` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num # at 1548 -#<date> server id 1 end_log_pos 1614 CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F ### INSERT INTO `test`.`t1` ### SET ### @1=12 /* INT meta=0 nullable=0 is_null=0 */ @@ -144,11 +144,11 @@ START TRANSACTION /*!*/; # at 1729 # at 1804 -#<date> server id 1 end_log_pos 1804 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A") -#<date> server id 1 end_log_pos 1860 CRC32 XXX Table_map: `test`.`t1` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num # at 1860 -#<date> server id 1 end_log_pos 1927 CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F ### INSERT INTO `test`.`t1` ### SET ### @1=13 /* INT meta=0 nullable=0 is_null=0 */ @@ -173,11 +173,11 @@ START TRANSACTION /*!*/; # at 2042 # at 2096 -#<date> server id 1 end_log_pos 2096 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> INSERT INTO t2 SELECT * FROM t1 
-#<date> server id 1 end_log_pos 2152 CRC32 XXX Table_map: `test`.`t2` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t2` mapped to number num # at 2152 -#<date> server id 1 end_log_pos 2243 CRC32 XXX Write_compressed_rows: table id 33 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Write_compressed_rows: table id 33 flags: STMT_END_F ### INSERT INTO `test`.`t2` ### SET ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -235,11 +235,11 @@ START TRANSACTION /*!*/; # at 2358 # at 2424 -#<date> server id 1 end_log_pos 2424 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL -#<date> server id 1 end_log_pos 2480 CRC32 XXX Table_map: `test`.`t2` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t2` mapped to number num # at 2480 -#<date> server id 1 end_log_pos 2579 CRC32 XXX Update_compressed_rows: table id 33 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Update_compressed_rows: table id 33 flags: STMT_END_F ### UPDATE `test`.`t2` ### WHERE ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -316,11 +316,11 @@ START TRANSACTION /*!*/; # at 2694 # at 2731 -#<date> server id 1 end_log_pos 2731 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> DELETE FROM t1 -#<date> server id 1 end_log_pos 2787 CRC32 XXX Table_map: `test`.`t1` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num # at 2787 -#<date> server id 1 end_log_pos 2879 CRC32 XXX Delete_compressed_rows: table id 32 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Delete_compressed_rows: table id 32 flags: STMT_END_F ### DELETE FROM `test`.`t1` ### WHERE ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -378,11 +378,11 @@ START TRANSACTION /*!*/; # at 2994 # at 3031 -#<date> server id 1 end_log_pos 3031 CRC32 XXX 
Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> DELETE FROM t2 -#<date> server id 1 end_log_pos 3087 CRC32 XXX Table_map: `test`.`t2` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t2` mapped to number num # at 3087 -#<date> server id 1 end_log_pos 3172 CRC32 XXX Delete_compressed_rows: table id 33 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Delete_compressed_rows: table id 33 flags: STMT_END_F ### DELETE FROM `test`.`t2` ### WHERE ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ diff --git a/mysql-test/main/mysqlbinlog_row_minimal.result b/mysql-test/main/mysqlbinlog_row_minimal.result index 69aa91a8903..6871d75a985 100644 --- a/mysql-test/main/mysqlbinlog_row_minimal.result +++ b/mysql-test/main/mysqlbinlog_row_minimal.result @@ -55,11 +55,11 @@ START TRANSACTION /*!*/; # at 834 # at 908 -#<date> server id 1 end_log_pos 908 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, "") -#<date> server id 1 end_log_pos 964 CRC32 XXX Table_map: `test`.`t1` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num # at 964 -#<date> server id 1 end_log_pos 1033 CRC32 XXX Write_rows: table id 32 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Write_rows: table id 32 flags: STMT_END_F ### INSERT INTO `test`.`t1` ### SET ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -84,11 +84,11 @@ START TRANSACTION /*!*/; # at 1148 # at 1224 -#<date> server id 1 end_log_pos 1224 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL) -#<date> server id 1 end_log_pos 1280 CRC32 XXX Table_map: `test`.`t1` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num # at 1280 -#<date> server id 1 end_log_pos 1348 CRC32 
XXX Write_rows: table id 32 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Write_rows: table id 32 flags: STMT_END_F ### INSERT INTO `test`.`t1` ### SET ### @1=11 /* INT meta=0 nullable=0 is_null=0 */ @@ -113,11 +113,11 @@ START TRANSACTION /*!*/; # at 1463 # at 1541 -#<date> server id 1 end_log_pos 1541 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A") -#<date> server id 1 end_log_pos 1597 CRC32 XXX Table_map: `test`.`t1` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num # at 1597 -#<date> server id 1 end_log_pos 1664 CRC32 XXX Write_rows: table id 32 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Write_rows: table id 32 flags: STMT_END_F ### INSERT INTO `test`.`t1` ### SET ### @1=12 /* INT meta=0 nullable=0 is_null=0 */ @@ -142,11 +142,11 @@ START TRANSACTION /*!*/; # at 1779 # at 1854 -#<date> server id 1 end_log_pos 1854 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A") -#<date> server id 1 end_log_pos 1910 CRC32 XXX Table_map: `test`.`t1` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num # at 1910 -#<date> server id 1 end_log_pos 1980 CRC32 XXX Write_rows: table id 32 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Write_rows: table id 32 flags: STMT_END_F ### INSERT INTO `test`.`t1` ### SET ### @1=13 /* INT meta=0 nullable=0 is_null=0 */ @@ -171,11 +171,11 @@ START TRANSACTION /*!*/; # at 2095 # at 2149 -#<date> server id 1 end_log_pos 2149 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> INSERT INTO t2 SELECT * FROM t1 -#<date> server id 1 end_log_pos 2205 CRC32 XXX Table_map: `test`.`t2` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: 
`test`.`t2` mapped to number num # at 2205 -#<date> server id 1 end_log_pos 2372 CRC32 XXX Write_rows: table id 33 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Write_rows: table id 33 flags: STMT_END_F ### INSERT INTO `test`.`t2` ### SET ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -233,11 +233,11 @@ START TRANSACTION /*!*/; # at 2487 # at 2553 -#<date> server id 1 end_log_pos 2553 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL -#<date> server id 1 end_log_pos 2609 CRC32 XXX Table_map: `test`.`t2` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t2` mapped to number num # at 2609 -#<date> server id 1 end_log_pos 2675 CRC32 XXX Update_rows: table id 33 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Update_rows: table id 33 flags: STMT_END_F ### UPDATE `test`.`t2` ### WHERE ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -266,11 +266,11 @@ START TRANSACTION /*!*/; # at 2790 # at 2827 -#<date> server id 1 end_log_pos 2827 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> DELETE FROM t1 -#<date> server id 1 end_log_pos 2883 CRC32 XXX Table_map: `test`.`t1` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num # at 2883 -#<date> server id 1 end_log_pos 2937 CRC32 XXX Delete_rows: table id 32 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Delete_rows: table id 32 flags: STMT_END_F ### DELETE FROM `test`.`t1` ### WHERE ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -296,11 +296,11 @@ START TRANSACTION /*!*/; # at 3052 # at 3089 -#<date> server id 1 end_log_pos 3089 CRC32 XXX Annotate_rows: +#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows: #Q> DELETE FROM t2 -#<date> server id 1 end_log_pos 3145 CRC32 XXX Table_map: `test`.`t2` mapped to number num +#<date> server id 1 end_log_pos 0 CRC32 
XXX Table_map: `test`.`t2` mapped to number num # at 3145 -#<date> server id 1 end_log_pos 3199 CRC32 XXX Delete_rows: table id 33 flags: STMT_END_F +#<date> server id 1 end_log_pos 0 CRC32 XXX Delete_rows: table id 33 flags: STMT_END_F ### DELETE FROM `test`.`t2` ### WHERE ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ diff --git a/mysql-test/main/mysqlbinlog_stmt_compressed.result b/mysql-test/main/mysqlbinlog_stmt_compressed.result index c0d26f3f9df..6321e74127f 100644 --- a/mysql-test/main/mysqlbinlog_stmt_compressed.result +++ b/mysql-test/main/mysqlbinlog_stmt_compressed.result @@ -56,7 +56,7 @@ CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMIN START TRANSACTION /*!*/; # at 787 -#<date> server id 1 end_log_pos 915 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> +#<date> server id 1 end_log_pos 0 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> SET TIMESTAMP=X/*!*/; INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, "") /*!*/; @@ -71,7 +71,7 @@ COMMIT START TRANSACTION /*!*/; # at 1030 -#<date> server id 1 end_log_pos 1158 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> +#<date> server id 1 end_log_pos 0 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> SET TIMESTAMP=X/*!*/; INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL) /*!*/; @@ -86,7 +86,7 @@ COMMIT START TRANSACTION /*!*/; # at 1273 -#<date> server id 1 end_log_pos 1403 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> +#<date> server id 1 end_log_pos 0 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> SET TIMESTAMP=X/*!*/; INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A") /*!*/; @@ -101,7 +101,7 @@ COMMIT START TRANSACTION /*!*/; # at 1518 -#<date> server id 1 end_log_pos 1645 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> +#<date> server id 1 end_log_pos 0 CRC32 XXX Query_compressed 
thread_id=5 exec_time=x error_code=0 xid=<xid> SET TIMESTAMP=X/*!*/; INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A") /*!*/; @@ -116,7 +116,7 @@ COMMIT START TRANSACTION /*!*/; # at 1760 -#<date> server id 1 end_log_pos 1868 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> +#<date> server id 1 end_log_pos 0 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> SET TIMESTAMP=X/*!*/; INSERT INTO t2 SELECT * FROM t1 /*!*/; @@ -131,7 +131,7 @@ COMMIT START TRANSACTION /*!*/; # at 1983 -#<date> server id 1 end_log_pos 2100 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> +#<date> server id 1 end_log_pos 0 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> SET TIMESTAMP=X/*!*/; UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL /*!*/; @@ -146,7 +146,7 @@ COMMIT START TRANSACTION /*!*/; # at 2215 -#<date> server id 1 end_log_pos 2306 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> +#<date> server id 1 end_log_pos 0 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> SET TIMESTAMP=X/*!*/; DELETE FROM t1 /*!*/; @@ -161,7 +161,7 @@ COMMIT START TRANSACTION /*!*/; # at 2421 -#<date> server id 1 end_log_pos 2512 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> +#<date> server id 1 end_log_pos 0 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid> SET TIMESTAMP=X/*!*/; DELETE FROM t2 /*!*/; diff --git a/mysql-test/main/mysqld--help.result b/mysql-test/main/mysqld--help.result index de0a8310ec1..999e0212d77 100644 --- a/mysql-test/main/mysqld--help.result +++ b/mysql-test/main/mysqld--help.result @@ -101,6 +101,12 @@ The following specify which files/extra groups are read (specified before remain --binlog-ignore-db=name Tells the master that updates to the given database should not be logged to the binary log. 
+ --binlog-legacy-event-pos + Fill in the end_log_pos field of _all_ events in the + binlog, even when doing so costs performance. Can be used + in case some old application needs it for backwards + compatibility. Setting this option can hurt binlog + scalability. --binlog-optimize-thread-scheduling Run fast part of group commit in a single thread, to optimize kernel thread scheduling. On by default. Disable @@ -1526,6 +1532,7 @@ binlog-direct-non-transactional-updates FALSE binlog-expire-logs-seconds 0 binlog-file-cache-size 16384 binlog-format MIXED +binlog-legacy-event-pos FALSE binlog-optimize-thread-scheduling TRUE binlog-row-event-max-size 8192 binlog-row-image FULL diff --git a/mysql-test/suite/binlog/include/binlog_ioerr.inc b/mysql-test/suite/binlog/include/binlog_ioerr.inc index da6fb5ac727..b710eccc64b 100644 --- a/mysql-test/suite/binlog/include/binlog_ioerr.inc +++ b/mysql-test/suite/binlog/include/binlog_ioerr.inc @@ -17,11 +17,14 @@ CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb; INSERT INTO t1 VALUES(0); SET @saved_dbug = @@SESSION.debug_dbug; SET SESSION debug_dbug='+d,fail_binlog_write_1'; +# The error injection is in the "legacy" code path. 
+SET GLOBAL binlog_legacy_event_pos= 1; --error ER_ERROR_ON_WRITE INSERT INTO t1 VALUES(1); --error ER_ERROR_ON_WRITE INSERT INTO t1 VALUES(2); SET SESSION debug_dbug=@saved_dbug; +SET GLOBAL binlog_legacy_event_pos= 0; INSERT INTO t1 VALUES(3); SELECT * FROM t1; diff --git a/mysql-test/suite/binlog/r/binlog_ioerr.result b/mysql-test/suite/binlog/r/binlog_ioerr.result index e4f00a017ba..aa4042d3f6f 100644 --- a/mysql-test/suite/binlog/r/binlog_ioerr.result +++ b/mysql-test/suite/binlog/r/binlog_ioerr.result @@ -4,11 +4,13 @@ CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb; INSERT INTO t1 VALUES(0); SET @saved_dbug = @@SESSION.debug_dbug; SET SESSION debug_dbug='+d,fail_binlog_write_1'; +SET GLOBAL binlog_legacy_event_pos= 1; INSERT INTO t1 VALUES(1); ERROR HY000: Error writing file 'master-bin' (errno: 28 "No space left on device") INSERT INTO t1 VALUES(2); ERROR HY000: Error writing file 'master-bin' (errno: 28 "No space left on device") SET SESSION debug_dbug=@saved_dbug; +SET GLOBAL binlog_legacy_event_pos= 0; INSERT INTO t1 VALUES(3); SELECT * FROM t1; a diff --git a/mysql-test/suite/binlog/t/binlog_killed.test b/mysql-test/suite/binlog/t/binlog_killed.test index 7c3a262d2c1..271da705c82 100644 --- a/mysql-test/suite/binlog/t/binlog_killed.test +++ b/mysql-test/suite/binlog/t/binlog_killed.test @@ -67,7 +67,7 @@ let $rows= `select count(*) from t2 /* must be 2 or 0 */`; let $MYSQLD_DATADIR= `select @@datadir`; --let $binlog_killed_pos=query_get_value(SHOW BINLOG EVENTS, Pos, 6) ---let $binlog_killed_end_log_pos=query_get_value(SHOW BINLOG EVENTS, End_log_pos, 6) +--let $binlog_killed_end_log_pos=query_get_value(SHOW BINLOG EVENTS, Pos, 7) --exec $MYSQL_BINLOG --force-if-open --start-position=$binlog_killed_pos --stop-position=$binlog_killed_end_log_pos $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/kill_query_calling_sp.binlog --replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR --disable_result_log diff --git 
a/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata.test b/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata.test index 4577c6c1de1..ea4397306f3 100644 --- a/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata.test +++ b/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata.test @@ -275,7 +275,7 @@ INSERT INTO t1 VALUES(2, "b"); # The invalid metadata will case assertion failure on Write_rows_log_event # So we need to stop mysqlbinlog before reading Write_rows_log_event. ---let $stop_position= query_get_value(SHOW BINLOG EVENTS FROM $start_pos LIMIT 3, End_log_pos, 3) +--let $stop_position= query_get_value(SHOW BINLOG EVENTS FROM $start_pos LIMIT 4, Pos, 4) --source include/print_optional_metadata.inc --echo # @@ -291,7 +291,7 @@ INSERT INTO t1(c_point) VALUES(ST_PointFromText('POINT(10 10)')); # The invalid metadata will case assertion failure on Write_rows_log_event # So we need to stop mysqlbinlog before reading Write_rows_log_event. ---let $stop_position= query_get_value(SHOW BINLOG EVENTS FROM $start_pos LIMIT 3, End_log_pos, 3) +--let $stop_position= query_get_value(SHOW BINLOG EVENTS FROM $start_pos LIMIT 4, Pos, 4) --source include/print_optional_metadata.inc DROP TABLE t1; diff --git a/mysql-test/suite/binlog_encryption/binlog_ioerr.result b/mysql-test/suite/binlog_encryption/binlog_ioerr.result index 2823b7050c3..146bc50c964 100644 --- a/mysql-test/suite/binlog_encryption/binlog_ioerr.result +++ b/mysql-test/suite/binlog_encryption/binlog_ioerr.result @@ -4,11 +4,13 @@ CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb; INSERT INTO t1 VALUES(0); SET @saved_dbug = @@SESSION.debug_dbug; SET SESSION debug_dbug='+d,fail_binlog_write_1'; +SET GLOBAL binlog_legacy_event_pos= 1; INSERT INTO t1 VALUES(1); ERROR HY000: Error writing file 'master-bin' (errno: 28 "No space left on device") INSERT INTO t1 VALUES(2); ERROR HY000: Error writing file 'master-bin' (errno: 28 "No space left on device") SET SESSION 
debug_dbug=@saved_dbug; +SET GLOBAL binlog_legacy_event_pos= 0; INSERT INTO t1 VALUES(3); SELECT * FROM t1; a diff --git a/mysql-test/suite/rpl/r/rpl_checksum_cache.result b/mysql-test/suite/rpl/r/rpl_checksum_cache.result index e8f221cc181..b908c546ce3 100644 --- a/mysql-test/suite/rpl/r/rpl_checksum_cache.result +++ b/mysql-test/suite/rpl/r/rpl_checksum_cache.result @@ -121,13 +121,54 @@ connection slave; include/diff_tables.inc [master:test.t1, slave:test.t1] include/diff_tables.inc [master:test.t2, slave:test.t2] include/diff_tables.inc [master:test.t3, slave:test.t3] +*** Test switching checksum algorithm while ongoing transactions have pre-computed checksum in their binlog cache *** +connection master; +CREATE TABLE t4 (a INT, b INT, c VARCHAR(1024), PRIMARY KEY (a,b)) ENGINE=InnoDB; +BEGIN; +INSERT INTO t4 VALUES (1, 1, "small, pre-computed checksums"); +connection server_1; +BEGIN; +INSERT INTO t4 VALUES (2, 1, "big, pre-computed checksums"); +set @@global.binlog_checksum = NONE; +connection master; +INSERT INTO t4 VALUES (1, 2, "done"); +COMMIT; +connection server_1; +INSERT INTO t4 VALUES (2, 22, "done"); +COMMIT; +connection master; +BEGIN; +INSERT INTO t4 VALUES (3, 1, "small, no pre-computed checksums"); +connection server_1; +BEGIN; +INSERT INTO t4 VALUES (4, 1, "big, no pre-computed checksums"); +set @@global.binlog_checksum = CRC32; +connection master; +INSERT INTO t4 VALUES (3, 2, "done"); +COMMIT; +connection server_1; +INSERT INTO t4 VALUES (4, 22, "done"); +COMMIT; +connection slave; +*** Test the --binlog-legacy-event-pos option. 
+connection master; +FLUSH BINARY LOGS; +BEGIN; +INSERT INTO t4 VALUES (5, 1, "Zero end_log_pos"); +COMMIT; +set @@global.binlog_legacy_event_pos= 1; +BEGIN; +INSERT INTO t4 VALUES (6, 1, "Non-zero end_log_pos"); +COMMIT; +set @@global.binlog_legacy_event_pos= 0; +connection slave; connection master; begin; delete from t1; delete from t2; delete from t3; commit; -drop table t1, t2, t3; +drop table t1, t2, t3, t4; set @@global.binlog_cache_size = @save_binlog_cache_size; set @@global.binlog_checksum = @save_binlog_checksum; set @@global.master_verify_checksum = @save_master_verify_checksum; diff --git a/mysql-test/suite/rpl/t/rpl_checksum_cache.test b/mysql-test/suite/rpl/t/rpl_checksum_cache.test index e04f618b81e..173af8c1d0b 100644 --- a/mysql-test/suite/rpl/t/rpl_checksum_cache.test +++ b/mysql-test/suite/rpl/t/rpl_checksum_cache.test @@ -243,6 +243,102 @@ let $diff_tables=master:test.t3, slave:test.t3; source include/diff_tables.inc; +--echo *** Test switching checksum algorithm while ongoing transactions have pre-computed checksum in their binlog cache *** + +--connection master +CREATE TABLE t4 (a INT, b INT, c VARCHAR(1024), PRIMARY KEY (a,b)) ENGINE=InnoDB; + +# Create a couple transactions that will precompute checksums but commit +# without them. + +BEGIN; +INSERT INTO t4 VALUES (1, 1, "small, pre-computed checksums"); + +--connection server_1 +BEGIN; +INSERT INTO t4 VALUES (2, 1, "big, pre-computed checksums"); +--let $i= 20 +--disable_query_log +while ($i) { + eval INSERT INTO t4 VALUES (2, 22-$i, REPEAT("x", FLOOR(RAND()*100) + 831)); + dec $i; +} +--enable_query_log + +# Disable checksums dynamically, so MYSQL_BIN_LOG::write_cache() will have +# to drop the pre-computed checksums. +set @@global.binlog_checksum = NONE; + +--connection master +INSERT INTO t4 VALUES (1, 2, "done"); +COMMIT; +--connection server_1 +INSERT INTO t4 VALUES (2, 22, "done"); +COMMIT; + +# Create a couple transactions that will not precompute checksums but commit +# with them. 
+ +--connection master +BEGIN; +INSERT INTO t4 VALUES (3, 1, "small, no pre-computed checksums"); + +--connection server_1 +BEGIN; +INSERT INTO t4 VALUES (4, 1, "big, no pre-computed checksums"); +--let $i= 20 +--disable_query_log +while ($i) { + eval INSERT INTO t4 VALUES (4, 22-$i, REPEAT("x", FLOOR(RAND()*100) + 853)); + dec $i; +} +--enable_query_log + +# Enable checksums dynamically, so MYSQL_BIN_LOG::write_cache() will have +# to recompute the checksums. +set @@global.binlog_checksum = CRC32; + +--connection master +INSERT INTO t4 VALUES (3, 2, "done"); +COMMIT; +--connection server_1 +INSERT INTO t4 VALUES (4, 22, "done"); +COMMIT; + +sync_slave_with_master; + + +--echo *** Test the --binlog-legacy-event-pos option. +--connection master +FLUSH BINARY LOGS; +--source include/wait_for_binlog_checkpoint.inc + +--let $query_file= query_get_value(SHOW MASTER STATUS, File, 1) +--let $query_pos= query_get_value(SHOW MASTER STATUS, Position, 1) +BEGIN; +INSERT INTO t4 VALUES (5, 1, "Zero end_log_pos"); +COMMIT; +--let $end_log_pos= query_get_value(SHOW BINLOG EVENTS IN "$query_file" FROM $query_pos LIMIT 3, End_log_pos, 2) +if ($end_log_pos!=0) { + eval SHOW BINLOG EVENTS IN "$query_file"; + --die Wrong End_log_pos=$end_log_pos, expected zero. +} + +set @@global.binlog_legacy_event_pos= 1; +--let $query_pos= query_get_value(SHOW MASTER STATUS, Position, 1) +BEGIN; +INSERT INTO t4 VALUES (6, 1, "Non-zero end_log_pos"); +COMMIT; +--let $end_log_pos= query_get_value(SHOW BINLOG EVENTS IN "$query_file" FROM $query_pos LIMIT 3, End_log_pos, 2) +if ($end_log_pos==0) { + eval SHOW BINLOG EVENTS IN "$query_file"; + --die Wrong End_log_pos=$end_log_pos, expected non-zero. 
+} +set @@global.binlog_legacy_event_pos= 0; + +sync_slave_with_master; + + connection master; begin; @@ -251,7 +347,7 @@ delete from t2; delete from t3; commit; -drop table t1, t2, t3; +drop table t1, t2, t3, t4; set @@global.binlog_cache_size = @save_binlog_cache_size; set @@global.binlog_checksum = @save_binlog_checksum; set @@global.master_verify_checksum = @save_master_verify_checksum; diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result index d1a6d85f861..6d43640bf65 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result +++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result @@ -442,6 +442,16 @@ NUMERIC_BLOCK_SIZE NULL ENUM_VALUE_LIST MIXED,STATEMENT,ROW READ_ONLY NO COMMAND_LINE_ARGUMENT REQUIRED +VARIABLE_NAME BINLOG_LEGACY_EVENT_POS +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE BOOLEAN +VARIABLE_COMMENT Fill in the end_log_pos field of _all_ events in the binlog, even when doing so costs performance. Can be used in case some old application needs it for backwards compatibility. Setting this option can hurt binlog scalability. +NUMERIC_MIN_VALUE NULL +NUMERIC_MAX_VALUE NULL +NUMERIC_BLOCK_SIZE NULL +ENUM_VALUE_LIST OFF,ON +READ_ONLY NO +COMMAND_LINE_ARGUMENT OPTIONAL VARIABLE_NAME BINLOG_OPTIMIZE_THREAD_SCHEDULING VARIABLE_SCOPE GLOBAL VARIABLE_TYPE BOOLEAN diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c index 4622b68646e..5d2863bca1c 100644 --- a/mysys/mf_iocache2.c +++ b/mysys/mf_iocache2.c @@ -74,6 +74,40 @@ int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file) DBUG_RETURN(my_b_copy_to_file(cache, file, SIZE_T_MAX)); } +/** + Similar to above my_b_copy_to_file(), but destination is another IO_CACHE. 
+*/ +int +my_b_copy_to_cache(IO_CACHE *from_cache, IO_CACHE *to_cache, + size_t count) +{ + size_t curr_write, bytes_in_cache; + DBUG_ENTER("my_b_copy_to_cache"); + + bytes_in_cache= my_b_bytes_in_cache(from_cache); + do + { + curr_write= MY_MIN(bytes_in_cache, count); + if (my_b_write(to_cache, from_cache->read_pos, curr_write)) + DBUG_RETURN(1); + + from_cache->read_pos += curr_write; + count -= curr_write; + } while (count && (bytes_in_cache= my_b_fill(from_cache))); + if(from_cache->error == -1) + DBUG_RETURN(1); + DBUG_RETURN(0); +} + +int my_b_copy_all_to_cache(IO_CACHE *from_cache, IO_CACHE *to_cache) +{ + DBUG_ENTER("my_b_copy_all_to_cache"); + /* Reinit the cache to read from the beginning of the cache */ + if (reinit_io_cache(from_cache, READ_CACHE, 0L, FALSE, FALSE)) + DBUG_RETURN(1); + DBUG_RETURN(my_b_copy_to_cache(from_cache, to_cache, SIZE_T_MAX)); +} + my_off_t my_b_append_tell(IO_CACHE* info) { /* diff --git a/sql/log.cc b/sql/log.cc index 1ab90389a37..dfed8265a69 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -107,6 +107,15 @@ static const LEX_CSTRING write_error_msg= { STRING_WITH_LEN("error writing to the binary log") }; static my_bool opt_optimize_thread_scheduling= TRUE; +/* + Access to binlog_checksum_options is protected by LOCK_log, since the + checksum option used must be consistent across an entire binlog file, + and log rotation is needed whenever it is changed. + + As an exception, event checksums are precomputed using a non-locked read + of binlog_checksum_options. Thus updates to this variable must be atomic, + with relaxed semantics. 
+*/ ulong binlog_checksum_options; #ifndef DBUG_OFF ulong opt_binlog_dbug_fsync_sleep= 0; @@ -275,12 +284,22 @@ void make_default_log_name(char **out, const char* log_ext, bool once) class binlog_cache_data { public: - binlog_cache_data(): m_pending(0), status(0), - before_stmt_pos(MY_OFF_T_UNDEF), - incident(FALSE), + binlog_cache_data(bool precompute_checksums_) : m_pending(0), status(0), + before_stmt_pos(MY_OFF_T_UNDEF), incident(FALSE), + precompute_checksums(precompute_checksums_), saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0), ptr_binlog_cache_disk_use(0) - { } + { + /* + Read the current checksum setting. We will use this setting to decide + whether to pre-compute checksums in the cache. Then when writing the cache + to the actual binlog, another check will be made and checksums recomputed + in the unlikely case that the setting changed meanwhile. + */ + checksum_opt= !precompute_checksums_ ? (uchar)BINLOG_CHECKSUM_ALG_OFF : + (uchar)my_atomic_loadul_explicit(&binlog_checksum_options, + MY_MEMORY_ORDER_RELAXED); + } ~binlog_cache_data() { @@ -332,6 +351,9 @@ class binlog_cache_data bool truncate_file= (cache_log.file != -1 && my_b_write_tell(&cache_log) > CACHE_FILE_TRUNC_SIZE); truncate(0,1); // Forget what's in cache + checksum_opt= !precompute_checksums ? (uchar)BINLOG_CHECKSUM_ALG_OFF : + (uchar)my_atomic_loadul_explicit(&binlog_checksum_options, + MY_MEMORY_ORDER_RELAXED); if (!cache_was_empty) compute_statistics(); if (truncate_file) @@ -435,6 +457,17 @@ class binlog_cache_data */ bool incident; + /* Whether the caller requested precomputing checksums. */ + bool precompute_checksums; + +public: + /* + The algorithm (if any) used to pre-compute checksums in the cache. + Initialized from binlog_checksum_options when the cache is reset. + */ + uchar checksum_opt; + +private: /** This function computes binlog cache and disk usage. 
*/ @@ -508,6 +541,37 @@ void Log_event_writer::set_incident() } +/** + Select if and how to write checksum for an event written to the binlog. + + - When writing directly to the binlog, the user-configured checksum option + is used. + - When writing to a transaction or statement cache, we have + binlog_cache_data that contains the checksum option to use (pre-computed + checksums). + - Otherwise, no checksum used. +*/ +enum enum_binlog_checksum_alg +Log_event::select_checksum_alg(const binlog_cache_data *data) +{ + if (cache_type == Log_event::EVENT_NO_CACHE) + { + DBUG_ASSERT(!data); + /* + When we're selecting the checksum algorithm to write directly to the + actual binlog, we must be holding the LOCK_log, otherwise the checksum + configuration could change just after we read it. + */ + mysql_mutex_assert_owner(mysql_bin_log.get_log_lock()); + return (enum enum_binlog_checksum_alg)binlog_checksum_options; + } + else if (data) + return (enum enum_binlog_checksum_alg)data->checksum_opt; + else + return BINLOG_CHECKSUM_ALG_OFF; +} + + class binlog_cache_mngr { public: binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size, @@ -515,8 +579,10 @@ class binlog_cache_mngr { ulong *param_ptr_binlog_stmt_cache_use, ulong *param_ptr_binlog_stmt_cache_disk_use, ulong *param_ptr_binlog_cache_use, - ulong *param_ptr_binlog_cache_disk_use) - : last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0) + ulong *param_ptr_binlog_cache_disk_use, + bool precompute_checksums) + : stmt_cache(precompute_checksums), trx_cache(precompute_checksums), + last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0) { stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size, param_ptr_binlog_stmt_cache_use, @@ -5558,7 +5624,8 @@ int MYSQL_BIN_LOG::new_file_impl() { DBUG_ASSERT(!is_relay_log); DBUG_ASSERT(binlog_checksum_options != checksum_alg_reset); - binlog_checksum_options= checksum_alg_reset; + my_atomic_storeul_explicit(&binlog_checksum_options, checksum_alg_reset, + 
MY_MEMORY_ORDER_RELAXED); } /* Note that at this point, log_state != LOG_CLOSED @@ -5634,19 +5701,19 @@ int MYSQL_BIN_LOG::new_file_impl() bool MYSQL_BIN_LOG::write_event(Log_event *ev, binlog_cache_data *data, IO_CACHE *file) { - return write_event(ev, ev->select_checksum_alg(), data, file); + return write_event(ev, ev->select_checksum_alg(data), data, file); } bool MYSQL_BIN_LOG::write_event(Log_event *ev) { - return write_event(ev, ev->select_checksum_alg(), 0, &log_file); + return write_event(ev, ev->select_checksum_alg(NULL), 0, &log_file); } bool MYSQL_BIN_LOG::write_event(Log_event *ev, enum enum_binlog_checksum_alg checksum_alg, binlog_cache_data *cache_data, IO_CACHE *file) { - Log_event_writer writer(file, 0, checksum_alg, &crypto); + Log_event_writer writer(file, cache_data, checksum_alg, &crypto); if (crypto.scheme && file == &log_file) { writer.ctx= alloca(crypto.ctx_size); @@ -5953,13 +6020,22 @@ binlog_cache_mngr *THD::binlog_setup_trx_data() } thd_set_ha_data(this, binlog_hton, cache_mngr); + /* + Don't attempt to precompute checksums if: + - Disabled by user request, --binlog-legacy-event-pos + - Binlog is encrypted, cannot use precomputed checksums + - WSREP/Galera. 
+ */ + bool precompute_checksums= + !WSREP_NNULL(this) && !encrypt_binlog && !opt_binlog_legacy_event_pos; cache_mngr= new (cache_mngr) binlog_cache_mngr(max_binlog_stmt_cache_size, max_binlog_cache_size, &binlog_stmt_cache_use, &binlog_stmt_cache_disk_use, &binlog_cache_use, - &binlog_cache_disk_use); + &binlog_cache_disk_use, + precompute_checksums); DBUG_RETURN(cache_mngr); } @@ -6287,7 +6363,8 @@ bool THD::binlog_write_table_map(TABLE *table, bool with_annotate) binlog_cache_data *cache_data= (cache_mngr-> get_binlog_cache_data(is_transactional)); IO_CACHE *file= &cache_data->cache_log; - Log_event_writer writer(file, cache_data, the_event.select_checksum_alg(), NULL); + Log_event_writer writer(file, cache_data, + the_event.select_checksum_alg(cache_data), NULL); if (with_annotate) if (binlog_write_annotated_row(&writer)) @@ -6442,7 +6519,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, if (Rows_log_event* pending= cache_data->pending()) { Log_event_writer writer(&cache_data->cache_log, cache_data, - pending->select_checksum_alg(), NULL); + pending->select_checksum_alg(cache_data), NULL); /* Write pending event to the cache. @@ -7502,22 +7579,37 @@ uint MYSQL_BIN_LOG::next_file_id() events prior to fill in the binlog cache. */ -int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) +int MYSQL_BIN_LOG::write_cache(THD *thd, binlog_cache_data *cache_data) { DBUG_ENTER("MYSQL_BIN_LOG::write_cache"); - + IO_CACHE *cache= &cache_data->cache_log; mysql_mutex_assert_owner(&LOCK_log); + + /* + If possible, just copy the cache over byte-by-byte with pre-computed + checksums. + */ + if (likely(binlog_checksum_options == cache_data->checksum_opt) && + likely(!crypto.scheme) && + likely(!opt_binlog_legacy_event_pos)) + { + int res= my_b_copy_all_to_cache(cache, &log_file); + status_var_add(thd->status_var.binlog_bytes_written, my_b_tell(cache)); + DBUG_RETURN(res ? 
ER_ERROR_ON_WRITE : 0); + } + if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) DBUG_RETURN(ER_ERROR_ON_WRITE); /* Amount of remaining bytes in the IO_CACHE read buffer. */ size_t length= my_b_bytes_in_cache(cache); size_t group; - size_t end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t uchar header_buf[LOG_EVENT_HEADER_LEN]; Log_event_writer writer(&log_file, 0, (enum_binlog_checksum_alg)binlog_checksum_options, &crypto); uint checksum_len= writer.checksum_len; + bool precomputed_checksums= (cache_data->checksum_opt != BINLOG_CHECKSUM_ALG_OFF); + uint old_checksum_len= precomputed_checksums ? BINLOG_CHECKSUM_LEN : 0; int err= 0; if (crypto.scheme) @@ -7591,13 +7683,13 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) /* Adjust the length and end_log_pos appropriately. */ uint ev_len= uint4korr(header + EVENT_LEN_OFFSET); // netto len - DBUG_ASSERT(ev_len >= LOG_EVENT_HEADER_LEN); - if (unlikely(ev_len < LOG_EVENT_HEADER_LEN)) + DBUG_ASSERT(ev_len >= LOG_EVENT_HEADER_LEN + old_checksum_len); + if (unlikely(ev_len < LOG_EVENT_HEADER_LEN + old_checksum_len)) goto error_in_read; - int4store(header + EVENT_LEN_OFFSET, ev_len + checksum_len); - end_log_pos_inc += checksum_len; - size_t val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc; - int4store(header + LOG_POS_OFFSET, val); + uint new_len= ev_len - old_checksum_len + checksum_len; + int4store(header + EVENT_LEN_OFFSET, new_len); + group+= new_len; + int4store(header + LOG_POS_OFFSET, group); /* Write the header to the binlog. */ if (writer.write_header(header, LOG_EVENT_HEADER_LEN)) @@ -7614,8 +7706,18 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) goto error_in_read; } uint chunk= std::min(ev_len, (uint)length); - if (writer.write_data(cache->read_pos, chunk)) - goto error_in_write; + /* + Any old precomputed checksum must _not_ be written here. Instead, it + must be discarded; the new checksum, if needed, is written by + writer.write_footer(). 
+ */ + if (ev_len > old_checksum_len) + { + uint bytes_to_skip= + old_checksum_len - std::min(old_checksum_len, ev_len - chunk); + if (writer.write_data(cache->read_pos, chunk - bytes_to_skip)) + goto error_in_write; + } cache->read_pos+= chunk; length-= chunk; ev_len-= chunk; @@ -8745,7 +8847,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, DBUG_RETURN(ER_ERROR_ON_WRITE); if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && - write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE))) + write_cache(entry->thd, mngr->get_binlog_cache_data(FALSE))) { entry->error_cache= &mngr->stmt_cache.cache_log; DBUG_RETURN(ER_ERROR_ON_WRITE); @@ -8756,7 +8858,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, DBUG_EXECUTE_IF("crash_before_writing_xid", { if ((write_cache(entry->thd, - mngr->get_binlog_cache_log(TRUE)))) + mngr->get_binlog_cache_data(TRUE)))) DBUG_PRINT("info", ("error writing binlog cache")); else flush_and_sync(0); @@ -8765,7 +8867,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, DBUG_SUICIDE(); }); - if (write_cache(entry->thd, mngr->get_binlog_cache_log(TRUE))) + if (write_cache(entry->thd, mngr->get_binlog_cache_data(TRUE))) { entry->error_cache= &mngr->trx_cache.cache_log; DBUG_RETURN(ER_ERROR_ON_WRITE); @@ -11383,6 +11485,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, char binlog_checkpoint_name[FN_REFLEN]; bool binlog_checkpoint_found; IO_CACHE log; + IO_CACHE *cur_log; File file= -1; const char *errmsg; #ifdef HAVE_REPLICATION @@ -11429,12 +11532,16 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, */ binlog_checkpoint_found= false; + cur_log= first_log; for (round= 1;;) { - while ((ev= Log_event::read_log_event(round == 1 ? 
first_log : &log, - fdle, opt_master_verify_checksum)) + while ((ev= Log_event::read_log_event(cur_log, fdle, + opt_master_verify_checksum)) && ev->is_valid()) { +#ifdef HAVE_REPLICATION + my_off_t end_pos= my_b_tell(cur_log); +#endif enum Log_event_type typ= ev->get_type_code(); switch (typ) { @@ -11451,7 +11558,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, member->decided_to_commit= true; } #else - if (ctx.decide_or_assess(member, round, fdle, linfo, ev->log_pos)) + if (ctx.decide_or_assess(member, round, fdle, linfo, end_pos)) goto err2; #endif } @@ -11552,11 +11659,12 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, goto err2; ctx.last_gtid_valid= false; } - ctx.prev_event_pos= ev->log_pos; + ctx.prev_event_pos= end_pos; #endif delete ev; ev= NULL; } // end of while + cur_log= &log; /* If the last binlog checkpoint event points to an older log, we have to @@ -11813,7 +11921,8 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var, } else { - binlog_checksum_options= value; + my_atomic_storeul_explicit(&binlog_checksum_options, value, + MY_MEMORY_ORDER_RELAXED); } DBUG_ASSERT(binlog_checksum_options == value); mysql_bin_log.checksum_alg_reset= BINLOG_CHECKSUM_ALG_UNDEF; diff --git a/sql/log.h b/sql/log.h index f02b20c12bf..91c406a71aa 100644 --- a/sql/log.h +++ b/sql/log.h @@ -821,7 +821,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG bool write_incident_already_locked(THD *thd); bool write_incident(THD *thd); void write_binlog_checkpoint_event_already_locked(const char *name, uint len); - int write_cache(THD *thd, IO_CACHE *cache); + int write_cache(THD *thd, binlog_cache_data *cache_data); void set_write_error(THD *thd, bool is_transactional); bool check_write_error(THD *thd); diff --git a/sql/log_event.h b/sql/log_event.h index 33f689c9330..5abc5fa0caf 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -1456,7 +1456,7 @@ class Log_event bool write_footer(Log_event_writer 
*writer) { return writer->write_footer(); } - enum enum_binlog_checksum_alg select_checksum_alg(); + enum enum_binlog_checksum_alg select_checksum_alg(const binlog_cache_data *data); virtual bool write(Log_event_writer *writer) { diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index bee594291d6..468d28c389c 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -735,20 +735,6 @@ void Log_event::init_show_field_list(THD *thd, List<Item>* field_list) mem_root); } -/** - Select if and how to write checksum for an event written to the binlog. - It returns the actively configured binlog checksum option, unless the event - is being written to a cache (in which case the checksum, if any, is added - later when the cache is copied to the real binlog). -*/ -enum enum_binlog_checksum_alg Log_event::select_checksum_alg() -{ - if (cache_type == Log_event::EVENT_NO_CACHE) - return (enum_binlog_checksum_alg)binlog_checksum_options; - else - return BINLOG_CHECKSUM_ALG_OFF; -} - int Log_event_writer::write_internal(const uchar *pos, size_t len) { DBUG_ASSERT(!ctx || encrypt_or_write == &Log_event_writer::encrypt_and_write); @@ -901,11 +887,17 @@ bool Log_event::write_header(Log_event_writer *writer, size_t event_data_length) change the position */ - if (is_artificial_event()) + if (is_artificial_event() || + cache_type == Log_event::EVENT_STMT_CACHE || + cache_type == Log_event::EVENT_TRANSACTIONAL_CACHE) { /* Artificial events are automatically generated and do not exist in master's binary log, so log_pos should be set to 0. + + Events written through transaction or statement cache have log_pos set + to 0 so that they can be copied directly to the binlog without having + to compute the real end_log_pos. 
*/ log_pos= 0; } @@ -4937,7 +4929,7 @@ void Create_file_log_event::pack_info(Protocol *protocol) /** Create_file_log_event::do_apply_event() - Constructor for Create_file_log_event to intantiate an event + Constructor for Create_file_log_event to instantiate an event from the relay log on the slave. @retval diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 99717a2c058..0ddefe81856 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -762,6 +762,7 @@ char *relay_log_info_file, *report_user, *report_password, *report_host; char *opt_relay_logname = 0, *opt_relaylog_index_name=0; char *opt_logname, *opt_slow_logname, *opt_bin_logname; char *opt_binlog_index_name=0; +my_bool opt_binlog_legacy_event_pos= FALSE; diff --git a/sql/mysqld.h b/sql/mysqld.h index e99d5cb300c..113bc9112cb 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -173,6 +173,7 @@ extern ulong delay_key_write_options; extern char *opt_logname, *opt_slow_logname, *opt_bin_logname, *opt_relay_logname; extern char *opt_binlog_index_name; +extern my_bool opt_binlog_legacy_event_pos; extern char *opt_backup_history_logname, *opt_backup_progress_logname, *opt_backup_settings_name; extern const char *log_output_str; diff --git a/sql/privilege.h b/sql/privilege.h index 8e9b9a3748e..7356181975d 100644 --- a/sql/privilege.h +++ b/sql/privilege.h @@ -362,6 +362,9 @@ constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_BINLOG_COMMIT_WAIT_USEC= constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_BINLOG_ROW_METADATA= SUPER_ACL | BINLOG_ADMIN_ACL; +constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_BINLOG_LEGACY_EVENT_POS= + SUPER_ACL | BINLOG_ADMIN_ACL; + constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_EXPIRE_LOGS_DAYS= SUPER_ACL | BINLOG_ADMIN_ACL; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 6142c0bf077..eef6cd34043 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -3518,6 +3518,19 @@ Sys_master_verify_checksum( GLOBAL_VAR(opt_master_verify_checksum), CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + +static 
Sys_var_on_access_global<Sys_var_mybool, + PRIV_SET_SYSTEM_GLOBAL_VAR_BINLOG_LEGACY_EVENT_POS> +Sys_binlog_legacy_event_pos( + "binlog_legacy_event_pos", + "Fill in the end_log_pos field of _all_ events in the binlog, even when " + "doing so costs performance. Can be used in case some old application needs " + "it for backwards compatibility. Setting this option can hurt binlog " + "scalability.", + GLOBAL_VAR(opt_binlog_legacy_event_pos), CMD_LINE(OPT_ARG), + DEFAULT(FALSE)); + + /* These names must match RPL_SKIP_XXX #defines in slave.h. */ static const char *replicate_events_marked_for_skip_names[]= { "REPLICATE", "FILTER_ON_SLAVE", "FILTER_ON_MASTER", 0 -- 2.30.2
On Fri, Aug 25, 2023 at 10:16 AM Kristian Nielsen via commits <commits@lists.mariadb.org> wrote:
Compute binlog checksums (when enabled) already when writing events into the statement or transaction caches, where before it was done when the caches are copied to the real binlog file. This moves the checksum computation outside of holding LOCK_log, improving scalability.
At stmt/trx cache write time, the final end_log_pos values are not known, so with this patch these will be set to 0.
[...]
Checksums cannot be pre-computed when binlog encryption is enabled, as encryption relies on correct end_log_pos to provide part of the nonce/IV. Checksum pre-computation is also disabled for WSREP/Galera.
Have you thought about a format change that would address these issues?

In MDEV-14425
https://github.com/MariaDB/server/commit/685d958e38b825ad9829be311f26729cccf...
I redesigned the InnoDB log file format in a way that allows checksums to be computed before the final position of the records (the log sequence number, or LSN) is known. For encryption, an additional 64-bit nonce will be written; this currently is the LSN at the time the encryption was scheduled. Tablespace identifiers, page numbers and file names are not encrypted, because these were never fully encrypted in innodb_encrypt_tables=ON either. This allows the metadata to be used as initialization vectors, and also avoids the need for backup tools to know any encryption keys.

Thanks to MDEV-27774, not only does InnoDB compute checksums without holding any mutex, but multiple threads can also copy their mini-transactions to the shared log_sys.buf while holding a shared log_sys.latch. An exclusive latch will only be needed during a log checkpoint, when we need to guarantee that all changes up to a particular LSN have been written to the log buffer. This would not be possible without the format change that was implemented in MDEV-14425.
+/* Convenience macros since ulong is 32 or 64 bit depending on platform. */
+#if SIZEOF_LONG == 4
+#define my_atomic_storeul(P, D) my_atomic_store32((int32 volatile *)(P), (D))
In all currently supported MariaDB Server versions, C++11 is available. Is there a particular reason why you would not use std::atomic<size_t> here?

Marko
--
Marko Mäkelä, Lead Developer InnoDB
MariaDB plc
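For illustration, the std::atomic alternative raised above might look roughly like the following minimal sketch. It is not taken from the patch: checksum_options_sketch, store_checksum_option() and load_checksum_option() are hypothetical names standing in for binlog_checksum_options and the my_atomic_storeul_explicit() call, and unsigned long is assumed here only because the server variable is a ulong.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the server's global binlog_checksum_options.
// std::atomic<unsigned long> has the right width on every platform, so no
// SIZEOF_LONG-dependent macro is needed.
static std::atomic<unsigned long> checksum_options_sketch{0};

// Publish a new checksum setting. Relaxed ordering corresponds to the
// MY_MEMORY_ORDER_RELAXED argument used in the patch.
inline void store_checksum_option(unsigned long value)
{
  checksum_options_sketch.store(value, std::memory_order_relaxed);
}

// Read the current setting without taking any lock.
inline unsigned long load_checksum_option()
{
  return checksum_options_sketch.load(std::memory_order_relaxed);
}
```

A caller would use store_checksum_option(1) where the patch stores the new value, and load_checksum_option() wherever the option is read outside LOCK_log.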
Hi!

Review of MDEV-31273: Precompute binlog checksums

On Fri, Aug 25, 2023 at 10:16 AM Kristian Nielsen <knielsen@knielsen-hq.org> wrote:
Compute binlog checksums (when enabled) already when writing events into the statement or transaction caches, where before it was done when the caches are copied to the real binlog file. This moves the checksum computation outside of holding LOCK_log, improving scalability.
  DBUG_RETURN(my_b_copy_to_cache(from_cache, to_cache, SIZE_T_MAX));

You could use from_cache->end_of_file instead of SIZE_T_MAX

<cut>

  uchar checksum_opt;

Wouldn't it be better to have this as an "enum_binlog_checksum_alg" to avoid some casts?
  }
  else if (data)
    return (enum enum_binlog_checksum_alg)data->checksum_opt;
  else
    return BINLOG_CHECKSUM_ALG_OFF;
}
You can remove the else's

<cut>

                   ulong *param_ptr_binlog_cache_disk_use,
                   bool precompute_checksums)
    : stmt_cache(precompute_checksums), trx_cache(precompute_checksums),
      last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0)
  {
    stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size,
                                     param_ptr_binlog_stmt_cache_use,

This code was a bit confusing, as we first initialize stmt_cache and trx_cache with checksum and then we initialize them again in the next two lines. I understand why and this does not have to be changed, but it does look a bit strange. The other option would be to have set_binlog_cache_info() take checksum as an argument.

<cut>

This one confused me a bit:

  Log_event_writer writer(file, 0, checksum_alg, &crypto);
  Log_event_writer writer(file, cache_data, checksum_alg, &crypto);

Why this change? It may at least affect calls to:

void Log_event_writer::add_status(enum_logged_status status)
{
  if (likely(cache_data))

Fix (after discussions on slack):
Remove the following lines in
MYSQL_BIN_LOG::write_event(Log_event *ev, binlog_cache_data *cache_data,

  if (cache_data)
    cache_data->add_status(ev->logged_status());

<cut>
    Don't attempt to precompute checksums if:
      - Disabled by user request, --binlog-legacy-event-pos
      - Binlog is encrypted, cannot use precomputed checksums
      - WSREP/Galera.
Why Galera? Would be good to explain this in the commit comment. <cut>
  /*
    If possible, just copy the cache over byte-by-byte with pre-computed
    checksums.
  */
  if (likely(binlog_checksum_options == cache_data->checksum_opt) &&
      likely(!crypto.scheme) &&
      likely(!opt_binlog_legacy_event_pos))
  {
    int res= my_b_copy_all_to_cache(cache, &log_file);
    status_var_add(thd->status_var.binlog_bytes_written, my_b_tell(cache));
    DBUG_RETURN(res ? ER_ERROR_ON_WRITE : 0);
  }
I was just wondering if this could sometimes be optimized to write directly to the real binlog file without another cache in between.

<cut>

  bool precomputed_checksums= (cache_data->checksum_opt != BINLOG_CHECKSUM_ALG_OFF);
  uint old_checksum_len= precomputed_checksums ? BINLOG_CHECKSUM_LEN : 0;

Why two variables when one can as easily do:

  uint old_checksum_len= ((cache_data->checksum_opt != BINLOG_CHECKSUM_ALG_OFF) ?
                          BINLOG_CHECKSUM_LEN : 0);

In the current write_cache() code, 'group' is a bad name. Please rename 'group' to 'log_file_pos'.
By the way, great that you removed the 'end_log_pos_inc' variable, whose name was also very confusing given how it was used!
      /*
        Any old precomputed checksum must _not_ be written here. Instead, it
        must be discarded; the new checksum, if needed, is written by
        writer.write_footer().
      */
      if (ev_len > old_checksum_len)
      {
        uint bytes_to_skip=
          old_checksum_len - std::min(old_checksum_len, ev_len - chunk);
        if (writer.write_data(cache->read_pos, chunk - bytes_to_skip))
          goto error_in_write;
      }
The above is likely wrong, as for long events we would execute the inner part multiple times, while there is only one checksum. As the checksum is last in the event, why not just do 'ev_len-= old_checksum_len' before the loop to copy the event, and then disregard 'old_checksum_len' bytes from the cache at the end of the loop?

Anyway, to find bugs like this, we need to have a test case with events that are bigger than the IO_CACHE size for the cache. Setting binlog_file_cache_size to 8192 (min value) should make it easy to test this.

<cut>
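The suggestion above (shrink the event length once before the copy loop, then discard the checksum bytes afterwards) can be sketched in isolation as follows. This is only an illustration under stated assumptions, not the server code: copy_event_without_checksum() is a hypothetical helper, a std::string stands in for the IO_CACHE contents, and buf_size stands in for the amount of data available per refill of the cache read buffer.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper, not part of the patch: copies one event of ev_len
// bytes from `src`, at most buf_size bytes per iteration, and drops the
// trailing old_checksum_len bytes instead of writing them.
inline std::string copy_event_without_checksum(const std::string &src,
                                               size_t ev_len,
                                               size_t old_checksum_len,
                                               size_t buf_size)
{
  assert(buf_size > 0 && ev_len <= src.size() && old_checksum_len <= ev_len);
  std::string out;
  size_t pos= 0;
  // Shrink the length once, before the loop: only payload bytes are copied,
  // no matter how many chunks the event is split into.
  size_t payload_left= ev_len - old_checksum_len;
  while (payload_left > 0)
  {
    size_t chunk= std::min(payload_left, buf_size);
    out.append(src, pos, chunk);
    pos+= chunk;
    payload_left-= chunk;
  }
  // The old checksum is last in the event; skip over it in the source.
  pos+= old_checksum_len;
  assert(pos == ev_len);
  return out;
}
```

Calling it with a small buf_size relative to ev_len exercises the multi-chunk case, which is the situation the proposed test with a small binlog_file_cache_size is meant to cover.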
    binlog_checksum_options= value;
    my_atomic_storeul_explicit(&binlog_checksum_options, value,
                               MY_MEMORY_ORDER_RELAXED);

Atomic is not needed as we have a lock on binlog.
......

Something totally different. I noticed in MYSQL_BIN_LOG::write_cache():

  group= (size_t)my_b_tell(&log_file);
  val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc;
  int4store(header + LOG_POS_OFFSET, val);

The first log entry is probably ok, as we have done a rotate() event before. However the cache can have a lot of log_events (as part of a transaction). The end_log_pos after the first event can be wrong as it may not fit into 4 bytes.

My understanding is that this is not a problem anymore as we are not using end_log_pos anymore.

However I still wonder if rotate() should not only consider the current log file size but also the size of all events we plan to write to the log and do a rotate if the total new log file size > 4G.

Regards,
Monty
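The 4-byte concern above can be made concrete with a small sketch. Both functions are hypothetical illustrations rather than server code: stored_end_log_pos() models what a 4-byte store keeps of a larger file offset, and would_overflow_end_log_pos() is one possible shape for the pre-write check that the rotate() idea implies, assuming the total size of the pending events is known.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of storing a file offset in a 4-byte end_log_pos
// field: only the low 32 bits survive.
inline uint32_t stored_end_log_pos(uint64_t pos)
{
  return static_cast<uint32_t>(pos);
}

// Hypothetical pre-write check: would the end position after appending
// pending_bytes to a file of current_size bytes no longer fit in 4 bytes?
inline bool would_overflow_end_log_pos(uint64_t current_size,
                                       uint64_t pending_bytes)
{
  return current_size + pending_bytes > UINT32_MAX;
}
```

For example, an offset of 4 GiB plus 5 bytes would be stored as 5, which is why a check of this kind would have to run before the cache is copied rather than after.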
participants (3)
- Kristian Nielsen
- Marko Mäkelä
- Michael Widenius