Preparatory patch for pre-computing binlog checksums outside of holding LOCK_log. The existing code for MYSQL_BIN_LOG::write_cache() was needlessly complex and very hard to understand and modify for handling the new case where pre-computed checksums are already present in the IO_CACHE. Greatly simplify the logic by replacing the (implicit) state machine with direct code that pulls the events one by one from the IO_CACHE. This removes a lot of state flags and avoids duplicate code for handling full vs. split headers. This also removes the need for the CacheWriter class. As a bonus, this fixes the bug that CacheWriter::write() was completely ignoring write errors. No other functional changes are done with this patch, only code cleanup. Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org> --- sql/log.cc | 237 ++++++++++++++++++++--------------------------------- 1 file changed, 89 insertions(+), 148 deletions(-) diff --git a/sql/log.cc b/sql/log.cc index bcb89e79dd0..1ab90389a37 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -7485,40 +7485,6 @@ uint MYSQL_BIN_LOG::next_file_id() return res; } -class CacheWriter: public Log_event_writer -{ -public: - size_t remains; - - CacheWriter(THD *thd_arg, IO_CACHE *file_arg, - enum enum_binlog_checksum_alg checksum_alg, - Binlog_crypt_data *cr) - : Log_event_writer(file_arg, 0, checksum_alg, cr), remains(0), thd(thd_arg), - first(true) - { - } - - ~CacheWriter() - { status_var_add(thd->status_var.binlog_bytes_written, bytes_written); } - - int write(uchar* pos, size_t len) - { - DBUG_ENTER("CacheWriter::write"); - if (first) - write_header(pos, len); - else - write_data(pos, len); - - remains -= len; - if ((first= !remains)) - write_footer(); - DBUG_RETURN(0); - } -private: - THD *thd; - bool first; -}; - /* Write the contents of a cache to the binary log. @@ -7543,13 +7509,16 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) mysql_mutex_assert_owner(&LOCK_log); if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) DBUG_RETURN(ER_ERROR_ON_WRITE); - size_t length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; - size_t val; + /* Amount of remaining bytes in the IO_CACHE read buffer. */ + size_t length= my_b_bytes_in_cache(cache); + size_t group; size_t end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t - uchar header[LOG_EVENT_HEADER_LEN]; - CacheWriter writer(thd, &log_file, - (enum_binlog_checksum_alg)binlog_checksum_options, - &crypto); + uchar header_buf[LOG_EVENT_HEADER_LEN]; + Log_event_writer writer(&log_file, 0, + (enum_binlog_checksum_alg)binlog_checksum_options, + &crypto); + uint checksum_len= writer.checksum_len; + int err= 0; if (crypto.scheme) { @@ -7575,128 +7544,100 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) */ group= (size_t)my_b_tell(&log_file); - hdr_offs= carry= 0; - - do + for (;;) { /* - if we only got a partial header in the last iteration, - get the other half now and process a full header. + Empty cache at an event boundary means we are done (but empty cache + elsewhere is an error). */ - if (unlikely(carry > 0)) - { - DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN); - size_t tail= LOG_EVENT_HEADER_LEN - carry; - - /* assemble both halves */ - memcpy(&header[carry], (char *)cache->read_pos, tail); - - uint32 len= uint4korr(header + EVENT_LEN_OFFSET); - writer.remains= len; - - /* fix end_log_pos */ - end_log_pos_inc += writer.checksum_len; - val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc; - int4store(header + LOG_POS_OFFSET, val); - - /* fix len */ - len+= writer.checksum_len; - int4store(header + EVENT_LEN_OFFSET, len); - - if (writer.write(header, LOG_EVENT_HEADER_LEN)) - DBUG_RETURN(ER_ERROR_ON_WRITE); + if (length == 0 && + (length= my_b_fill(cache)) == 0) + break; - cache->read_pos+= tail; - length-= tail; - carry= 0; + DBUG_EXECUTE_IF("fail_binlog_write_1", + { + errno= 28; + goto error_in_write; + }); - /* next event header at ... */ - hdr_offs= len - LOG_EVENT_HEADER_LEN - writer.checksum_len; + /* Get a full header, using local buffer if split in the IO_CACHE. */ + uchar *header; + if (likely(length > LOG_EVENT_HEADER_LEN)) + { + header= cache->read_pos; + cache->read_pos+= LOG_EVENT_HEADER_LEN; + length-= LOG_EVENT_HEADER_LEN; } - - /* if there is anything to write, process it. */ - - if (likely(length > 0)) + else { - DBUG_EXECUTE_IF("fail_binlog_write_1", - errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE);); - /* - process all event-headers in this (partial) cache. - if next header is beyond current read-buffer, - we'll get it later (though not necessarily in the - very next iteration, just "eventually"). - */ - - if (hdr_offs >= length) + size_t sofar= length; + size_t remain= LOG_EVENT_HEADER_LEN - sofar; + header= &header_buf[0]; + memcpy(header, cache->read_pos, sofar); + cache->read_pos+= sofar; + do { - if (writer.write(cache->read_pos, length)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - } - - while (hdr_offs < length) + length= my_b_fill(cache); + if (!length) + goto error_in_read; + size_t chunk= std::min(length, remain); + memcpy(header + sofar, cache->read_pos, chunk); + sofar+= chunk; + remain-= chunk; + cache->read_pos+= chunk; + length-= chunk; + } while (unlikely(remain > 0)); + } + + /* Adjust the length and end_log_pos appropriately. */ + uint ev_len= uint4korr(header + EVENT_LEN_OFFSET); // netto len + DBUG_ASSERT(ev_len >= LOG_EVENT_HEADER_LEN); + if (unlikely(ev_len < LOG_EVENT_HEADER_LEN)) + goto error_in_read; + int4store(header + EVENT_LEN_OFFSET, ev_len + checksum_len); + end_log_pos_inc += checksum_len; + size_t val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc; + int4store(header + LOG_POS_OFFSET, val); + + /* Write the header to the binlog. */ + if (writer.write_header(header, LOG_EVENT_HEADER_LEN)) + goto error_in_write; + ev_len-= LOG_EVENT_HEADER_LEN; + + /* Write the rest of the event. */ + while (ev_len > 0) + { + if (length == 0) { - /* - finish off with remains of the last event that crawls - from previous into the current buffer - */ - if (writer.remains != 0) - { - if (writer.write(cache->read_pos, hdr_offs)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - } - - /* - partial header only? save what we can get, process once - we get the rest. - */ - if (hdr_offs + LOG_EVENT_HEADER_LEN > length) - { - carry= length - hdr_offs; - memcpy(header, (char *)cache->read_pos + hdr_offs, carry); - length= hdr_offs; - } - else - { - /* we've got a full event-header, and it came in one piece */ - uchar *ev= (uchar *)cache->read_pos + hdr_offs; - uint ev_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len - uchar *log_pos= ev + LOG_POS_OFFSET; - - end_log_pos_inc += writer.checksum_len; - /* fix end_log_pos */ - val= uint4korr(log_pos) + group + end_log_pos_inc; - int4store(log_pos, val); - - /* fix length */ - int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len); - - writer.remains= ev_len; - if (writer.write(ev, MY_MIN(ev_len, length - hdr_offs))) - DBUG_RETURN(ER_ERROR_ON_WRITE); + length= my_b_fill(cache); + if (!length) + goto error_in_read; + } + uint chunk= std::min(ev_len, (uint)length); + if (writer.write_data(cache->read_pos, chunk)) + goto error_in_write; + cache->read_pos+= chunk; + length-= chunk; + ev_len-= chunk; + } - /* next event header at ... */ - hdr_offs += ev_len; // incr by the netto len + if (writer.write_footer()) + goto error_in_write; - DBUG_ASSERT(!writer.checksum_len || writer.remains == 0 || hdr_offs >= length); - } - } + } + goto end; // All OK - /* - Adjust hdr_offs. Note that it may still point beyond the segment - read in the next iteration; if the current event is very long, - it may take a couple of read-iterations (and subsequent adjustments - of hdr_offs) for it to point into the then-current segment. - If we have a split header (!carry), hdr_offs will be set at the - beginning of the next iteration, overwriting the value we set here: - */ - hdr_offs -= length; - } - } while ((length= my_b_fill(cache))); +error_in_write: + err= ER_ERROR_ON_WRITE; + goto end; - DBUG_ASSERT(carry == 0); - DBUG_ASSERT(!writer.checksum_len || writer.remains == 0); +error_in_read: + err= ER_ERROR_ON_READ; + goto end; - DBUG_RETURN(0); // All OK +end: + status_var_add(thd->status_var.binlog_bytes_written, writer.bytes_written); + DBUG_RETURN(err); } /* -- 2.30.2