[Commits] 9b453c1ebf4: MDEV-10963 Fragmented BINLOG query
revision-id: 9b453c1ebf47a20ea1436de3284f8810d0e64c4c (mariadb-10.1.34-8-g9b453c1ebf4) parent(s): c09a8b5b36edb494e2bcc93074c06e26cd9f2b92 author: Andrei Elkin committer: Andrei Elkin timestamp: 2018-09-07 20:36:16 +0300 message: MDEV-10963 Fragmented BINLOG query The problem was originally stated in http://bugs.mysql.com/bug.php?id=82212 The size of a base64-encoded Rows_log_event exceeds its vanilla byte representation by a factor of 4/3. When a binlogged event size is about 1GB mysqlbinlog generates a BINLOG query that can't be sent out due to its size. It is fixed by fragmenting the BINLOG argument C-string into (approximate) halves when the base64-encoded event is over 1GB in size. In such a case mysqlbinlog puts out SET @binlog_fragment_0='base64-encoded-fragment_0'; SET @binlog_fragment_1='base64-encoded-fragment_1'; BINLOG 2, 'binlog_fragment'; to represent a big BINLOG 'base64-encoded-"total"'. Two more statements are composed to promptly release memory: SET @binlog_fragment_0=NULL; SET @binlog_fragment_1=NULL; Two fragments are enough, though the client and server may still need to tweak their @@max_allowed_packet to accommodate the fragment size (which they would have to do anyway with a greater number of fragments, should that be desired). On the lower level the following changes are made: Log_event::print_base64() still calls the encoder and stores the encoded data into a cache, but now *without* doing any formatting. The latter is left for the time when the cache is copied to an output file (e.g. mysqlbinlog output). The no-formatting behavior is also reflected by the change in the meaning of the last argument, which now specifies whether to cache the encoded data. my_b_copy_to_file() is turned into my_b_copy_to_file_frag() which accepts format specifier arguments to build a syntactically proper BINLOG query in both the fragmented (n_frag > 1) and non-fragmented (n_frag == 1) cases. 
Rows_log_event::print_helper() takes the decision whether to fragment, prepares the respective format specifiers and invokes the cache-to-file copying function, which is now copy_cache_frag_to_file_and_reinit(). It replaces the original copy_event_cache_to_file_and_reinit() in order to pass extra arguments to my_b_copy_to_file_frag(), the successor of my_b_copy_to_file(). my_b_copy_to_file_frag() replaces the former pure copier; with its 'n_frag' argument as 1 and the rest of the args NULL it works as the original function. --- client/mysqlbinlog.cc | 33 ++++- include/my_sys.h | 10 +- .../suite/binlog/r/binlog_base64_flag.result | 15 +++ .../binlog/r/binlog_mysqlbinlog_row_frag.result | 27 ++++ mysql-test/suite/binlog/t/binlog_base64_flag.test | 21 ++++ .../binlog/t/binlog_mysqlbinlog_row_frag.test | 45 +++++++ mysys/mf_iocache2.c | 135 +++++++++++++++++--- sql/log_event.cc | 136 ++++++++++++++++++--- sql/log_event.h | 28 ++++- sql/log_event_old.cc | 70 ++++++++++- sql/sql_binlog.cc | 105 ++++++++++++++-- sql/sql_lex.cc | 3 +- sql/sql_lex.h | 1 + sql/sql_yacc.yy | 13 +- unittest/sql/mf_iocache-t.cc | 52 +++++++- 15 files changed, 633 insertions(+), 61 deletions(-) diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index 9753125dd67..f0689631cfb 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -56,7 +56,13 @@ Rpl_filter *binlog_filter= 0; #define BIN_LOG_HEADER_SIZE 4 #define PROBE_HEADER_LEN (EVENT_LEN_OFFSET+4) - +/* + 2 fragments can always represent near 1GB row-based base64-encoded event as + two strings each of size less than max(max_allowed_packet). + Bigger number of fragments does not safe from potential need to tune (increase) + @@max_allowed_packet before to process the fragments. So 2 is safe and enough. 
+*/ +#define BINLOG_ROWS_EVENT_ENCODED_FRAGMENTS 2 #define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_LOCAL_FILES) @@ -71,6 +77,9 @@ ulong bytes_sent = 0L, bytes_received = 0L; ulong mysqld_net_retry_count = 10L; ulong open_files_limit; ulong opt_binlog_rows_event_max_size; +#ifndef DBUG_OFF +ulong opt_binlog_rows_event_max_encoded_size; +#endif uint test_flags = 0; static uint opt_protocol= 0; static FILE *result_file; @@ -813,7 +822,14 @@ write_event_header_and_base64(Log_event *ev, FILE *result_file, /* Write header and base64 output to cache */ ev->print_header(head, print_event_info, FALSE); - ev->print_base64(body, print_event_info, FALSE); + + /* the assert states the only current use case for the function */ + DBUG_ASSERT(print_event_info->base64_output_mode == + BASE64_OUTPUT_ALWAYS); + + ev->print_base64(body, print_event_info, + print_event_info->base64_output_mode != + BASE64_OUTPUT_DECODE_ROWS); /* Read data from cache and write to result file */ if (copy_event_cache_to_file_and_reinit(head, result_file) || @@ -852,7 +868,9 @@ static bool print_base64(PRINT_EVENT_INFO *print_event_info, Log_event *ev) return 1; } ev->print(result_file, print_event_info); - return print_event_info->head_cache.error == -1; + return + print_event_info->head_cache.error == -1 || + print_event_info->body_cache.error == -1; } @@ -1472,6 +1490,15 @@ that may lead to an endless loop.", "This value must be a multiple of 256.", &opt_binlog_rows_event_max_size, &opt_binlog_rows_event_max_size, 0, GET_ULONG, REQUIRED_ARG, UINT_MAX, 256, ULONG_MAX, 0, 256, 0}, +#ifndef DBUG_OFF + {"binlog-row-event-max-encoded-size", 0, + "The maximum size of base64-encoded rows-event in one BINLOG pseudo-query " + "instance. 
When the computed actual size exceeds the limit " + "the BINLOG's argument string is fragmented in two.", + &opt_binlog_rows_event_max_encoded_size, + &opt_binlog_rows_event_max_encoded_size, 0, + GET_ULONG, REQUIRED_ARG, UINT_MAX/4, 256, ULONG_MAX, 0, 256, 0}, +#endif {"verify-binlog-checksum", 'c', "Verify checksum binlog events.", (uchar**) &opt_verify_binlog_checksum, (uchar**) &opt_verify_binlog_checksum, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, diff --git a/include/my_sys.h b/include/my_sys.h index 110a2ee9af3..1d0088e6698 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -602,7 +602,15 @@ static inline size_t my_b_bytes_in_cache(const IO_CACHE *info) return *info->current_end - *info->current_pos; } -int my_b_copy_to_file(IO_CACHE *cache, FILE *file); +int +my_b_copy_to_file_frag(IO_CACHE *cache, FILE *file, + uint n_frag, + const char* before_frag, + const char* after_frag, + const char* after_last, + const char* after_last_per_frag, + char* buf); + my_off_t my_b_append_tell(IO_CACHE* info); my_off_t my_b_safe_tell(IO_CACHE* info); /* picks the correct tell() */ int my_b_pread(IO_CACHE *info, uchar *Buffer, size_t Count, my_off_t pos); diff --git a/mysql-test/suite/binlog/r/binlog_base64_flag.result b/mysql-test/suite/binlog/r/binlog_base64_flag.result index d13e13c97b0..12b369680c6 100644 --- a/mysql-test/suite/binlog/r/binlog_base64_flag.result +++ b/mysql-test/suite/binlog/r/binlog_base64_flag.result @@ -28,6 +28,21 @@ a 1 1 3 +DELETE FROM t1 WHERE a=3; +BINLOG ' +ODdYRw8BAAAAZgAAAGoAAAABAAQANS4xLjIzLXJjLWRlYnVnLWxvZwAAAAAAAAAAAAAAAAAAAAAA +AAAAAAAAAAAAAAAAAAA4N1hHEzgNAAgAEgAEBAQEEgAAUwAEGggAAAAICAgC +'; +SET @binlog_format_0=' +TFtYRxMBAAAAKQAAAH8BAAAAABAAAAAAAAAABHRlc3QAAnQxAAEDAAE= +TFtYRxcBAAAAIgAAAKEBAAAQABAAAAAAAAEAAf/+AwAAAA== +'; +BINLOG 1, 'binlog_format'; +select * from t1; +a +1 +1 +3 ==== Test --base64-output=never on a binlog with row events ==== /*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/; /*!40019 SET 
@@session.max_insert_delayed_threads=0*/; diff --git a/mysql-test/suite/binlog/r/binlog_mysqlbinlog_row_frag.result b/mysql-test/suite/binlog/r/binlog_mysqlbinlog_row_frag.result new file mode 100644 index 00000000000..d4037ff7983 --- /dev/null +++ b/mysql-test/suite/binlog/r/binlog_mysqlbinlog_row_frag.result @@ -0,0 +1,27 @@ +CREATE TABLE t (a TEXT); +RESET MASTER; +INSERT INTO t SET a=repeat('a', 1024); +SELECT a from t into @a; +FLUSH LOGS; +DELETE FROM t; +SELECT a LIKE @a as 'true' FROM t; +true +1 +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL'; +@binlog_fragment_0 NULL +NULL NULL +BINLOG number-of-fragments must be greater than 0 +BINLOG 0, 'binlog_fragment'; +ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near ''binlog_fragment'' at line 1 +BINLOG -1, 'binlog_fragment'; +ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near '-1, 'binlog_fragment'' at line 1 +SET @binlog_fragment_0='012345'; +SET @binlog_fragment_2='012345'; +BINLOG 2, 'binlog_fragment'; +ERROR HY000: Decoding of base64 string failed: BINLOG fragment user variable 'binlog_fragment_1' is unexpectedly empty +SET @binlog_fragment_0='012345'; +SET @binlog_fragment_1='012345'; +BINLOG 2, 'binlog_fragment'; +ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use +# Cleanup +DROP TABLE t; diff --git a/mysql-test/suite/binlog/t/binlog_base64_flag.test b/mysql-test/suite/binlog/t/binlog_base64_flag.test index f8333315088..a1da28a8892 100644 --- a/mysql-test/suite/binlog/t/binlog_base64_flag.test +++ b/mysql-test/suite/binlog/t/binlog_base64_flag.test @@ -67,6 +67,27 @@ TFtYRxcBAAAAIgAAAKEBAAAQABAAAAAAAAEAAf/+AwAAAA== # The above line should succeed and 3 should be in the table select * from t1; +# The same as above 
with one-fragment BINLOG to prove +# BINLOG 'base64-encoded-data' is equivalent to the pair of +# SET @uservar='base64-encoded-data'; +# BINLOG 1, @uservar; +DELETE FROM t1 WHERE a=3; +# This is a binlog statement containing a Format_description_log_event +# from the same version as the Table_map and Write_rows_log_event. +BINLOG ' +ODdYRw8BAAAAZgAAAGoAAAABAAQANS4xLjIzLXJjLWRlYnVnLWxvZwAAAAAAAAAAAAAAAAAAAAAA +AAAAAAAAAAAAAAAAAAA4N1hHEzgNAAgAEgAEBAQEEgAAUwAEGggAAAAICAgC +'; + +# This is a Table_map_log_event+Write_rows_log_event corresponding to: +# INSERT INTO TABLE test.t1 VALUES (3) +SET @binlog_format_0=' +TFtYRxMBAAAAKQAAAH8BAAAAABAAAAAAAAAABHRlc3QAAnQxAAEDAAE= +TFtYRxcBAAAAIgAAAKEBAAAQABAAAAAAAAEAAf/+AwAAAA== +'; +BINLOG 1, 'binlog_format'; +# The above line should succeed and 3 should be in the table +select * from t1; # Test that mysqlbinlog stops with an error message when the # --base64-output=never flag is used on a binlog with base64 events. diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test new file mode 100644 index 00000000000..81f4ddad1c1 --- /dev/null +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test @@ -0,0 +1,45 @@ +--source include/have_debug.inc +--source include/have_log_bin.inc +--source include/have_binlog_format_row.inc + +--let $MYSQLD_DATADIR= `select @@datadir` +--let $max_size=1024 + +CREATE TABLE t (a TEXT); +# events of interest are guaranteed to stay in 000001 log +RESET MASTER; +--eval INSERT INTO t SET a=repeat('a', $max_size) +SELECT a from t into @a; +FLUSH LOGS; +DELETE FROM t; + +--exec $MYSQL_BINLOG --binlog-row-event-max-encoded-size=256 $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql + +--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql + +SELECT a LIKE @a as 'true' FROM t; +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL'; + +# improper syntax error +--echo BINLOG number-of-fragments must 
be greater than 0 +--error ER_PARSE_ERROR +BINLOG 0, 'binlog_fragment'; +--error ER_PARSE_ERROR +BINLOG -1, 'binlog_fragment'; + +# lost fragment error check +SET @binlog_fragment_0='012345'; +SET @binlog_fragment_2='012345'; +--error ER_BASE64_DECODE_ERROR +BINLOG 2, 'binlog_fragment'; + +# corrupted fragments error check (to the expected error code notice, +# the same error code occurs in a similar unfragmented case) +SET @binlog_fragment_0='012345'; +SET @binlog_fragment_1='012345'; +--error ER_SYNTAX_ERROR +BINLOG 2, 'binlog_fragment'; + +--echo # Cleanup +DROP TABLE t; + diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c index 2499094037d..ffc5458ed15 100644 --- a/mysys/mf_iocache2.c +++ b/mysys/mf_iocache2.c @@ -22,13 +22,23 @@ #include <stdarg.h> #include <m_ctype.h> -/* +/** Copy contents of an IO_CACHE to a file. SYNOPSIS - my_b_copy_to_file() - cache IO_CACHE to copy from - file File to copy to + my_b_copy_to_file_frag + + cache IO_CACHE to copy from + file File to copy to + n_frag # of fragments + + Other arguments represent format strings to enable wrapping + of the fragments and total, including + + before_frag before a fragment + after_frag after a fragment + after_last_frag after all the fragments + after_last_per_frag as the last items per each fragment DESCRIPTION Copy the contents of the cache to the file. The cache will be @@ -38,33 +48,120 @@ If a failure to write fully occurs, the cache is only copied partially. - TODO - Make this function solid by handling partial reads from the cache - in a correct manner: it should be atomic. + The copying is made in so many steps as the number of fragments as + specified by the parameter 'n_frag'. Each step is wrapped with + writing to the file 'before_frag' and 'after_frag' formated + strings, unless the parameters are NULL. In the end, optionally, + first 'after_last_frag' string is appended to 'file' followed by + 'after_last_per_frag' per each fragment. + final item. 
RETURN VALUE 0 All OK 1 An error occurred + + TODO + Make this function solid by handling partial reads from the cache + in a correct manner: it should be atomic. */ int -my_b_copy_to_file(IO_CACHE *cache, FILE *file) +my_b_copy_to_file_frag(IO_CACHE *cache, FILE *file, + uint n_frag, + const char* before_frag, + const char* after_frag, + const char* after_last, + const char* after_last_per_frag, + char* buf) { - size_t bytes_in_cache; - DBUG_ENTER("my_b_copy_to_file"); + size_t bytes_in_cache; // block, may have short size in the last one + size_t written_off_last_block; // consumed part of the block by last fragment + size_t total_size= my_b_tell(cache); + size_t frag_size= total_size / n_frag + 1; + size_t total_written= 0; + size_t frag_written; // bytes collected in the current fragment + uint i; + + DBUG_ENTER("my_b_copy_to_file_frag"); + + DBUG_ASSERT(cache->type == WRITE_CACHE); /* Reinit the cache to read from the beginning of the cache */ if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE)) DBUG_RETURN(1); - bytes_in_cache= my_b_bytes_in_cache(cache); - do + + for (i= 0, written_off_last_block= 0, bytes_in_cache= my_b_bytes_in_cache(cache); + i < n_frag; + i++, total_written += frag_written) { - if (my_fwrite(file, cache->read_pos, bytes_in_cache, - MYF(MY_WME | MY_NABP)) == (size_t) -1) - DBUG_RETURN(1); - } while ((bytes_in_cache= my_b_fill(cache))); - if(cache->error == -1) - DBUG_RETURN(1); - DBUG_RETURN(0); + frag_written= 0; + if (before_frag) + { + sprintf(buf, before_frag, i); + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP)); + } + do + { + /* + Either the current block is the last (L) in making the + current fragment and possibly has some extra not to fit (LG) into + the fragment, or (I) the current (whole then) block is + intermediate. + */ + size_t block_to_write= (frag_written + bytes_in_cache >= frag_size) ? 
+ frag_size - frag_written : bytes_in_cache; + + DBUG_ASSERT(n_frag != 1 || + (block_to_write == bytes_in_cache && + written_off_last_block == 0)); + + if (my_fwrite(file, cache->read_pos + written_off_last_block, + block_to_write, + MYF(MY_WME | MY_NABP)) == (size_t) -1) + /* no cache->error is set here */ + DBUG_RETURN(1); + + frag_written += block_to_write; + if (frag_written == frag_size) // (L) + { + DBUG_ASSERT(block_to_write <= bytes_in_cache); + written_off_last_block= block_to_write; + bytes_in_cache -= written_off_last_block; // (LG) when bytes>0 + /* + Nothing should be left in cache at the end of the + last fragment composition. + */ + DBUG_ASSERT(i != n_frag - 1 || bytes_in_cache == 0); + + break; + } + else + { + written_off_last_block= 0; // (I) + } + } while ((bytes_in_cache= my_b_fill(cache))); + + if (after_frag) + { + sprintf(buf, after_frag, NULL); + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP)); + } + } + + DBUG_ASSERT(total_written == total_size); // output == input + + if (after_last) + { + sprintf(buf, after_last, n_frag); + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP)); + } + + for (i= 0; after_last_per_frag && i < n_frag ; i++) + { + sprintf(buf, after_last_per_frag, i); + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP)); + } + + DBUG_RETURN(cache->error == -1); } diff --git a/sql/log_event.cc b/sql/log_event.cc index 3ac7ac5a20f..319f34e214d 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -2762,9 +2762,16 @@ void free_table_map_log_event(Table_map_log_event *event) delete event; } +/* + Encode the event, optionally per 'do_print_encoded_base64' store the result + into the argument cache; optionally per 'verbose' print into the cache + a verbose represenation of the event. + Note, no extra wrapping is done to the encoded data, like procuding a BINLOG + query. It's left for a routine that extracts from the cache. 
+*/ void Log_event::print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, - bool more) + bool do_print_encoded_base64) { const uchar *ptr= (const uchar *)temp_buf; uint32 size= uint4korr(ptr + EVENT_LEN_OFFSET); @@ -2783,17 +2790,9 @@ void Log_event::print_base64(IO_CACHE* file, DBUG_ASSERT(0); } - if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) - { - if (my_b_tell(file) == 0) - my_b_write_string(file, "\nBINLOG '\n"); - + if (do_print_encoded_base64) my_b_printf(file, "%s\n", tmp_str); - if (!more) - my_b_printf(file, "'%s\n", print_event_info->delimiter); - } - if (print_event_info->verbose) { Rows_log_event *ev= NULL; @@ -4833,9 +4832,17 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER && !print_event_info->short_form) { - if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) + /* BINLOG is matched with the delimiter below on the same level */ + bool do_print_encoded_base64= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS; + if (do_print_encoded_base64) my_b_printf(&cache, "BINLOG '\n"); - print_base64(&cache, print_event_info, FALSE); + + print_base64(&cache, print_event_info, do_print_encoded_base64); + + if (do_print_encoded_base64) + my_b_printf(&cache, "'%s\n", print_event_info->delimiter); + print_event_info->printed_fd_event= TRUE; } DBUG_VOID_RETURN; @@ -10479,12 +10486,48 @@ void Rows_log_event::pack_info(Protocol *protocol) #endif #ifdef MYSQL_CLIENT +/* + The function invokes base64 encoder to run on the current + event string and store the result into two caches. + When the event ends the current statement the caches are is copied into + the argument file. + Copying is also concerned how to wrap the event, specifically to produce + a valid SQL syntax. + When the encoded data size is within max(MAX_ALLOWED_PACKET) + a regular BINLOG query is composed. 
Otherwise it is build as fragmented + + BINLOG number_of_fragments,'user_var_name_prefix' + + where fragments are represented by a sequence of "indexed" user + variables. E.g when the above variable's name prefix is + 'binlog_fragment' and the number of fragments is 2 the fragmented + version is as the following: + + SET @binlog_fragment_0='...'; + SET @binlog_fragment_1='...'; + BINLOG 2, 'binlog_fragment'; + + Two more statements are composed as well + + SET @binlog_fragment_0=NULL; + SET @binlog_fragment_1=NULL; + + to promptly release memory. + + NOTE. + If any changes made don't forget to duplicate them to + Old_rows_log_event as long as it's supported. +*/ void Rows_log_event::print_helper(FILE *file, PRINT_EVENT_INFO *print_event_info, char const *const name) { IO_CACHE *const head= &print_event_info->head_cache; IO_CACHE *const body= &print_event_info->body_cache; + bool do_print_encoded_base64= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && + !print_event_info->short_form; + if (!print_event_info->short_form) { bool const last_stmt_event= get_flags(STMT_END_F); @@ -10492,13 +10535,72 @@ void Rows_log_event::print_helper(FILE *file, my_b_printf(head, "\t%s: table id %lu%s\n", name, m_table_id, last_stmt_event ? 
" flags: STMT_END_F" : ""); - print_base64(body, print_event_info, !last_stmt_event); + print_base64(body, print_event_info, do_print_encoded_base64); } if (get_flags(STMT_END_F)) { - copy_event_cache_to_file_and_reinit(head, file); - copy_event_cache_to_file_and_reinit(body, file); + uint n_frag= 1; + const char* before_frag= NULL; + char* after_frag= NULL; + char* after_last= NULL; + char* after_last_per_frag= NULL; + const char fmt_last_frag[]= "\nBINLOG %%d, 'binlog_fragment'%s\n"; + const char fmt_last_per_frag[]= "\nSET @binlog_fragment_%%d = NULL%s\n"; + const char fmt_before_frag[]= "\nSET @binlog_fragment_%d ='\n"; + /* + Buffer to pass to copy_cache_frag_to_file_and_reinit to + compute formatted strings according to specifiers. + The sizes may depend on an actual fragment number size in terms of decimal + signs so its maximum is estimated (not precisely yet safely) below. + */ + char buf[(sizeof(fmt_last_frag) + sizeof(fmt_last_per_frag)) + + ((sizeof(n_frag) * 8)/3 + 1) // decimal index + + sizeof(print_event_info->delimiter + 3)]; // delim, \n and 0. 
+ if (copy_event_cache_to_file_and_reinit(head, file)) + { + head->error= -1; + return; + } + + if (do_print_encoded_base64) + { + after_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME)); + sprintf(after_frag, "'%s\n", print_event_info->delimiter); + if (my_b_tell(body) > +#ifndef DBUG_OFF + opt_binlog_rows_event_max_encoded_size +#else + MAX_MAX_ALLOWED_PACKET +#endif + ) + n_frag= BINLOG_ROWS_EVENT_ENCODED_FRAGMENTS; + if (n_frag > 1) + { + before_frag= fmt_before_frag; + after_last= (char*) my_malloc(sizeof(buf), MYF(MY_WME)); + sprintf(after_last, fmt_last_frag, (char*) print_event_info->delimiter); + after_last_per_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME)); + sprintf(after_last_per_frag, fmt_last_per_frag, + (char*) print_event_info->delimiter); + } + else + { + before_frag= "\nBINLOG '\n"; + } + } + if (copy_cache_frag_to_file_and_reinit(body, file, n_frag, + before_frag, after_frag, + after_last, after_last_per_frag, buf)) + { + body->error= -1; + goto err; + } + +err: + my_free(after_frag); + my_free(after_last); + my_free(after_last_per_frag); } } #endif @@ -11357,7 +11459,9 @@ void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) m_dbnam, m_tblnam, m_table_id, ((m_flags & TM_BIT_HAS_TRIGGERS_F) ? 
" (has triggers)" : "")); - print_base64(&print_event_info->body_cache, print_event_info, TRUE); + print_base64(&print_event_info->body_cache, print_event_info, + print_event_info->base64_output_mode != + BASE64_OUTPUT_DECODE_ROWS); copy_event_cache_to_file_and_reinit(&print_event_info->head_cache, file); } } diff --git a/sql/log_event.h b/sql/log_event.h index 90900f63533..ac675bcb5da 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -1157,7 +1157,7 @@ class Log_event void print_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, bool is_more); void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, - bool is_more); + bool do_print_encoded_base64); #endif /* read_log_event() functions read an event from a binlog or relay @@ -4895,11 +4895,33 @@ class Ignorable_log_event : public Log_event { static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache, FILE *file) { - return - my_b_copy_to_file(cache, file) || + return + my_b_copy_to_file_frag(cache, file, 1, NULL, NULL, NULL, NULL, NULL) || reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE); } + +/** + Copying of 'cache' content to 'file' in steps of the number of + fragments as specified by 'n_frag'. Other arguments enables wrapping + of the fragments and total. See more in my_b_copy_to_file_frag() header comments. 
+*/ +inline bool copy_cache_frag_to_file_and_reinit(IO_CACHE *cache, + FILE *file, + uint n_frag, + const char* before_frag, + const char* after_frag, + const char* after_last, + const char* after_last_per_frag, + char* buf) +{ + return + my_b_copy_to_file_frag(cache, file, n_frag, before_frag, after_frag, + after_last, after_last_per_frag, buf) || + reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE); +} + + #ifdef MYSQL_SERVER /***************************************************************************** diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index d2b4470bbf9..eb119cdc8bc 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -1850,12 +1850,17 @@ void Old_rows_log_event::pack_info(Protocol *protocol) #ifdef MYSQL_CLIENT +/* Method duplicates Rows_log_event's one */ void Old_rows_log_event::print_helper(FILE *file, PRINT_EVENT_INFO *print_event_info, char const *const name) { IO_CACHE *const head= &print_event_info->head_cache; IO_CACHE *const body= &print_event_info->body_cache; + bool do_print_encoded_base64= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && + !print_event_info->short_form; + if (!print_event_info->short_form) { bool const last_stmt_event= get_flags(STMT_END_F); @@ -1863,13 +1868,72 @@ void Old_rows_log_event::print_helper(FILE *file, my_b_printf(head, "\t%s: table id %lu%s\n", name, m_table_id, last_stmt_event ? 
" flags: STMT_END_F" : ""); - print_base64(body, print_event_info, !last_stmt_event); + print_base64(body, print_event_info, do_print_encoded_base64); } if (get_flags(STMT_END_F)) { - copy_event_cache_to_file_and_reinit(head, file); - copy_event_cache_to_file_and_reinit(body, file); + uint n_frag= 1; + const char* before_frag= NULL; + char* after_frag= NULL; + char* after_last= NULL; + char* after_last_per_frag= NULL; + const char fmt_last_frag[]= "\nBINLOG %%d, 'binlog_fragment'%s\n"; + const char fmt_last_per_frag[]= "\nSET @binlog_fragment_%%d = NULL%s\n"; + const char fmt_before_frag[]= "\nSET @binlog_fragment_%d ='\n"; + /* + Buffer to pass to copy_cache_frag_to_file_and_reinit to + compute formatted strings according to specifiers. + The sizes may depend on an actual fragment number size in terms of decimal + signs so its maximum is estimated (not precisely yet safely) below. + */ + char buf[(sizeof(fmt_last_frag) + sizeof(fmt_last_per_frag)) + + ((sizeof(n_frag) * 8)/3 + 1) // decimal index + + sizeof(print_event_info->delimiter + 3)]; // delim, \n and 0. 
+ if (copy_event_cache_to_file_and_reinit(head, file)) + { + head->error= -1; + return; + } + + if (do_print_encoded_base64) + { + after_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME)); + sprintf(after_frag, "'%s\n", print_event_info->delimiter); + if (my_b_tell(body) > +#ifndef DBUG_OFF + opt_binlog_rows_event_max_encoded_size +#else + MAX_MAX_ALLOWED_PACKET +#endif + ) + n_frag= BINLOG_ROWS_EVENT_ENCODED_FRAGMENTS; + if (n_frag > 1) + { + before_frag= fmt_before_frag; + after_last= (char*) my_malloc(sizeof(buf), MYF(MY_WME)); + sprintf(after_last, fmt_last_frag, (char*) print_event_info->delimiter); + after_last_per_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME)); + sprintf(after_last_per_frag, fmt_last_per_frag, + (char*) print_event_info->delimiter); + } + else + { + before_frag= "\nBINLOG '\n"; + } + } + if (copy_cache_frag_to_file_and_reinit(body, file, n_frag, + before_frag, after_frag, + after_last, after_last_per_frag, buf)) + { + body->error= -1; + goto err; + } + +err: + my_free(after_frag); + my_free(after_last); + my_free(after_last_per_frag); } } #endif diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 91cf038907e..37789c5fe8f 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -53,14 +53,6 @@ void mysql_client_binlog_statement(THD* thd) if (check_global_access(thd, SUPER_ACL)) DBUG_VOID_RETURN; - size_t coded_len= thd->lex->comment.length; - if (!coded_len) - { - my_error(ER_SYNTAX_ERROR, MYF(0)); - DBUG_VOID_RETURN; - } - size_t decoded_len= base64_needed_decoded_length(coded_len); - /* option_bits will be changed when applying the event. But we don't expect it be changed permanently after BINLOG statement, so backup it first. 
@@ -81,7 +73,9 @@ void mysql_client_binlog_statement(THD* thd) int err; Relay_log_info *rli; rpl_group_info *rgi; - + char *buf= NULL; + size_t coded_len= 0, decoded_len= 0; + const char* name_buf; rli= thd->rli_fake; if (!rli) { @@ -102,15 +96,12 @@ void mysql_client_binlog_statement(THD* thd) rgi->thd= thd; const char *error= 0; - char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME)); Log_event *ev = 0; /* Out of memory check */ - if (!(rli && - rli->relay_log.description_event_for_exec && - buf)) + if (!(rli && rli->relay_log.description_event_for_exec)) { my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); /* needed 1 bytes */ goto end; @@ -119,6 +110,88 @@ void mysql_client_binlog_statement(THD* thd) rli->sql_driver_thd= thd; rli->no_storage= TRUE; + if (thd->lex->fragmented_binlog_event > 0) + { + /* + Copy fragments into the standard placeholder thd->lex->comment.str + and compute the size of the (still) encoded total. + The size can exceed max(max_allowed_packet) which is not a + problem as no String instance is created off this char array. + */ + const char *name_fmt= "%s_%d"; + name_buf= (char *) my_malloc(thd->lex->ident.length /* %s */ + 1 /* _ */ + + (sizeof(thd->lex-> + fragmented_binlog_event) * + (8/3 + 1)) /* %d */ + 1 /* 0 */, MYF(MY_WME)); + if (!name_buf) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + goto end; + } + + // In the first loop the total length is computed only. 
+ thd->lex->comment.length= 0; + thd->lex->comment.str= NULL; + for (uint i= 0; i < thd->lex->fragmented_binlog_event; i++) + { + user_var_entry *entry; + + sprintf((char*) name_buf, name_fmt, thd->lex->ident.str, i); + entry= + (user_var_entry*) my_hash_search(&thd->user_vars, + (uchar*) name_buf, + strlen(name_buf)); + if (!entry || entry->type != STRING_RESULT) + { + my_printf_error(ER_BASE64_DECODE_ERROR, + "%s: BINLOG fragment user " + "variable '%s' is unexpectedly empty", MYF(0), + ER_THD(thd, ER_BASE64_DECODE_ERROR), name_buf); + goto end; + } + thd->lex->comment.length += entry->length; + } + + thd->lex->comment.str= (char *) my_malloc(thd->lex->comment.length, + MYF(MY_WME)); + if (!thd->lex->comment.str) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + goto end; + } + + // In the 2nd the user var values are merged into allocated buf + size_t gathered_length= 0; + for (uint i= 0; i < thd->lex->fragmented_binlog_event; i++) + { + user_var_entry *entry; + + sprintf((char*) name_buf, name_fmt, thd->lex->ident.str, i); + entry= + (user_var_entry*) my_hash_search(&thd->user_vars, + (uchar*) name_buf, + strlen(name_buf)); + memcpy(thd->lex->comment.str + gathered_length, + entry->value, entry->length); + + gathered_length += entry->length; + } + DBUG_ASSERT(gathered_length == thd->lex->comment.length); + } + + if (!(coded_len= thd->lex->comment.length)) + { + my_error(ER_SYNTAX_ERROR, MYF(0)); + DBUG_VOID_RETURN; + } + + decoded_len= base64_needed_decoded_length(coded_len); + if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + goto end; + } + for (char const *strptr= thd->lex->comment.str ; strptr < thd->lex->comment.str + thd->lex->comment.length ; ) { @@ -272,6 +345,12 @@ void mysql_client_binlog_statement(THD* thd) my_ok(thd); end: + if (thd->lex->fragmented_binlog_event > 0) + { + my_free(thd->lex->comment.str); + thd->lex->fragmented_binlog_event= 0; + my_free((char*) name_buf); + } 
thd->variables.option_bits= thd_options; rgi->slave_close_thread_tables(thd); my_free(buf); diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc index 085ad1a4b3b..551183667bf 100644 --- a/sql/sql_lex.cc +++ b/sql/sql_lex.cc @@ -2874,7 +2874,8 @@ LEX::LEX() : explain(NULL), result(0), arena_for_set_stmt(0), mem_root_for_set_stmt(0), option_type(OPT_DEFAULT), context_analysis_only(0), sphead(0), - is_lex_started(0), limit_rows_examined_cnt(ULONGLONG_MAX) + is_lex_started(0), limit_rows_examined_cnt(ULONGLONG_MAX), + fragmented_binlog_event(0) { init_dynamic_array2(&plugins, sizeof(plugin_ref), plugins_static_buffer, diff --git a/sql/sql_lex.h b/sql/sql_lex.h index 3b47b1d25c9..1aff4f9f2de 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -2747,6 +2747,7 @@ struct LEX: public Query_tables_list */ Item *limit_rows_examined; ulonglong limit_rows_examined_cnt; + uint fragmented_binlog_event; /** Holds a set of domain_ids for deletion at FLUSH..DELETE_DOMAIN_ID */ diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index fcfc63439cb..3f9593e1b02 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -7951,7 +7951,18 @@ binlog_base64_event: Lex->sql_command = SQLCOM_BINLOG_BASE64_EVENT; Lex->comment= $2; } - ; + | + BINLOG_SYM NUM_literal ',' TEXT_STRING_sys + { + Lex->sql_command = SQLCOM_BINLOG_BASE64_EVENT; + if ((Lex->fragmented_binlog_event= $2->val_int()) <= 0) + { + my_parse_error(thd, ER_SYNTAX_ERROR); + MYSQL_YYABORT; + } + Lex->ident= $4; + } + ; check_view_or_table: table_or_tables table_list opt_mi_check_type diff --git a/unittest/sql/mf_iocache-t.cc b/unittest/sql/mf_iocache-t.cc index 1b04f8eb0d3..89c5de8b0df 100644 --- a/unittest/sql/mf_iocache-t.cc +++ b/unittest/sql/mf_iocache-t.cc @@ -253,10 +253,58 @@ void mdev10259() } +void mdev10963() +{ + int res; + int n_frag_max= 16; + int n_checks= 8; + uchar buf[1024 * 512]; + FILE *file; + myf my_flags= MYF(MY_WME); + const char *file_name="cache.log"; + + memset(buf, FILL, sizeof(buf)); + diag("MDEV-10963 Fragmented BINLOG 
query"); + + init_io_cache_encryption(); + + /* copying source */ + res= open_cached_file(&info, 0, 0, CACHE_SIZE, 0); + ok(res == 0, "open_cached_file" INFO_TAIL); + res= my_b_write(&info, buf, sizeof(buf)); + ulong saved_pos= my_b_tell(&info); + ok(res == 0 && saved_pos == sizeof(buf), "cache is filled"); + + /* destination */ + file= my_fopen(file_name, O_WRONLY | O_TRUNC | O_CREAT, my_flags); + ok(my_fileno(file) > 0, "opened file fd = %d", my_fileno(file)); + + /* + Verify copying with random fragment numbers which cover cases + when the fragment size is less than the cache read buffer size. + */ + for (; n_checks; n_checks--) + { + int c_frag= rand() % n_frag_max + 1; + + res= my_b_copy_to_file_frag(&info, file, c_frag, + NULL, NULL, NULL, NULL, NULL); + ok(res == 0, "write to file" INFO_TAIL); + ok(my_ftell(file, my_flags) == sizeof(buf), "file written in %d fragments", c_frag); + res= reinit_io_cache(&info, WRITE_CACHE, saved_pos, 0, 0); + ok(res == 0 && my_b_tell(&info) == sizeof(buf), "write cache is filled back"); + + rewind(file); + my_chsize(my_fileno(file), 0, 0, my_flags); + } + close_cached_file(&info); + my_fclose(file, my_flags); +} + int main(int argc __attribute__((unused)),char *argv[]) { MY_INIT(argv[0]); - plan(46); + plan(73); /* temp files with and without encryption */ encrypt_tmp_files= 1; @@ -272,6 +320,8 @@ int main(int argc __attribute__((unused)),char *argv[]) mdev10259(); encrypt_tmp_files= 0; + mdev10963(); + my_end(0); return exit_status(); }
participants (1)
-
andrei.elkin@pp.inet.fi