Hi, Andrei! Looks better! There are no major problems, but see comments below. There're few suggestions how to simplify the code. On Nov 05, Andrei Elkin wrote:
revision-id: d282f5c55609469cd74d7390f70c7d922c778711 (mariadb-10.1.35-93-gd282f5c5560) parent(s): 2a576f71c5d3c7aacef564e5b1251f83bde48f51 author: Andrei Elkin <andrei.elkin@mariadb.com> committer: Andrei Elkin <andrei.elkin@mariadb.com> timestamp: 2018-10-21 23:42:00 +0300 message:
MDEV-10963 Fragmented BINLOG query
diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test new file mode 100644 index 00000000000..bdf41c94c76 --- /dev/null +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test @@ -0,0 +1,50 @@ +--source include/have_debug.inc +--source include/have_log_bin.inc +--source include/have_binlog_format_row.inc
you don't need to include have_log_bin, if you include have_binlog_format_row.
+ +--let $MYSQLD_DATADIR= `select @@datadir` +--let $max_size=1024 + +CREATE TABLE t (a TEXT); +# events of interest are guaranteed to stay in 000001 log +RESET MASTER; +--eval INSERT INTO t SET a=repeat('a', $max_size)
eh? why did you do it with let/eval instead of a simple sql statement? you don't use $max_size anywhere else.
+SELECT a from t into @a; +FLUSH LOGS; +DELETE FROM t; + +--exec $MYSQL_BINLOG --debug-binlog-row-event-max-encoded-size=256 $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql + +--let $assert_text= BINLOG is fragmented +--let $assert_select= BINLOG @binlog_fragment_0, @binlog_fragment_1 +--let $assert_count= 1 +--let $assert_file= $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql +--source include/assert_grep.inc
no, please, use search_pattern_in_file.inc instead.
+ +--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql + +SELECT a LIKE @a as 'true' FROM t; +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL';
that makes no sense, @binlog_fragment_0 and _1 were set in a separate client session. You cannot test whether they were cleared or not there, by looking at the values here
+ +# improper syntax error +--echo BINLOG number-of-fragments must be exactly two +--error ER_PARSE_ERROR +BINLOG @binlog_fragment; +--error ER_PARSE_ERROR +BINLOG @binlog_fragment, @binlog_fragment, @binlog_fragment; + +# corrupted fragments error check (to the expected error code notice, +# the same error code occurs in a similar unfragmented case) +SET @binlog_fragment_0='012345'; +SET @binlog_fragment_1='012345'; +--error ER_SYNTAX_ERROR +BINLOG @binlog_fragment_0, @binlog_fragment_1; + +# Not existing fragment is not allowed +SET @binlog_fragment_0='012345'; +--error ER_WRONG_TYPE_FOR_VAR +BINLOG @binlog_fragment_0, @binlog_fragment_not_exist; + +--echo # Cleanup +--remove_file $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql +DROP TABLE t; diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c --- a/mysys/mf_iocache2.c +++ b/mysys/mf_iocache2.c @@ -22,52 +22,53 @@ #include <stdarg.h> #include <m_ctype.h>
-/* - Copy contents of an IO_CACHE to a file. - - SYNOPSIS - my_b_copy_to_file() - cache IO_CACHE to copy from - file File to copy to - - DESCRIPTION - Copy the contents of the cache to the file. The cache will be - re-inited to a read cache and will read from the beginning of the - cache. - - If a failure to write fully occurs, the cache is only copied - partially. +/** + Copy the cache to the file. Copying can be constrained to @c count + number of bytes when the parameter is less than SIZE_T_MAX. The + cache will be optionally re-inited to a read cache and will read + from the beginning of the cache. If a failure to write fully + occurs, the cache is only copied partially.
TODO - Make this function solid by handling partial reads from the cache - in a correct manner: it should be atomic. - - RETURN VALUE - 0 All OK - 1 An error occurred + Make this function solid by handling partial reads from the cache + in a correct manner: it should be atomic. + + @param cache IO_CACHE to copy from + @param file File to copy to + @param do_reinit whether to turn the cache to read mode + @param count the copied size or the max of the type + when the whole cache is to be copied. + @return + 0 All OK + 1 An error occurred */ int -my_b_copy_to_file(IO_CACHE *cache, FILE *file) +my_b_copy_to_file(IO_CACHE *cache, FILE *file, + my_bool do_reinit, + size_t count) { - size_t bytes_in_cache; + size_t curr_write, bytes_in_cache; DBUG_ENTER("my_b_copy_to_file");
/* Reinit the cache to read from the beginning of the cache */ - if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE)) + if (do_reinit && reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
generally, when there's a function that is always called with a constant (compile-time) argument, I prefer to split the code compile-time too, if it isn't too much trouble. In this case it would mean a new function like int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file) { if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE) return 1; return my_b_copy_to_file(cache, file, SIZE_T_MAX); } and all old code will be changed to use my_b_copy_all_to_file(). Old my_b_copy_to_file() won't need to do reinit_io_cache() anymore and your code will use it directly.
DBUG_RETURN(1); bytes_in_cache= my_b_bytes_in_cache(cache); do { - if (my_fwrite(file, cache->read_pos, bytes_in_cache, + curr_write= MY_MIN(bytes_in_cache, count); + if (my_fwrite(file, cache->read_pos, curr_write, MYF(MY_WME | MY_NABP)) == (size_t) -1) DBUG_RETURN(1); - } while ((bytes_in_cache= my_b_fill(cache))); + + cache->read_pos += curr_write; + count -= curr_write; + } while (count && (bytes_in_cache= my_b_fill(cache))); if(cache->error == -1) DBUG_RETURN(1); DBUG_RETURN(0); }
- my_off_t my_b_append_tell(IO_CACHE* info) { /* diff --git a/sql/log_event.cc b/sql/log_event.cc index e07b7002398..aeca794f0cd 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -10474,12 +10488,151 @@ void Rows_log_event::pack_info(Protocol *protocol) #endif
#ifdef MYSQL_CLIENT +/** + Print an event "body" cache to @c file possibly in multiple fragements. + Each fragement is optionally per @c do_wrap to procude an SQL statement. + + @param file a file to print to + @param body the "body" IO_CACHE of event + @param do_wrap whether to wrap base64-encoded strings with + SQL cover. + The function signals on any error through setting @c body->error to -1. +*/ +void copy_cache_to_file_wrapped(FILE *file, + IO_CACHE *body, + bool do_wrap, + const char *delimiter) +{ + uint n_frag= 1; + const char* before_frag= NULL; + char* after_frag= NULL; + char* after_last= NULL; + /* + 2 fragments can always represent near 1GB row-based + base64-encoded event as two strings each of size less than + max(max_allowed_packet). Greater number of fragments does not + save from potential need to tweak (increase) @@max_allowed_packet + before to process the fragments. So 2 is safe and enough. + */ + const char fmt_last_frag2[]= + "\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n"; + const char fmt_before_frag[]= "\nSET /* ONE_SHOT */ @binlog_fragment_%d ='\n";
this ONE_SHOT is confusing, even if in a comment. Better not to do it :)
+ /* + Buffer to hold computed formatted strings according to specifiers. + The sizes may depend on an actual fragment number size in terms of decimal + signs so its maximum is estimated (not precisely yet safely) below. + */ + char buf[sizeof(fmt_before_frag) + sizeof(fmt_last_frag2) + + ((sizeof(n_frag) * 8)/3 + 1) // max of decimal index + + sizeof(PRINT_EVENT_INFO::max_delimiter_len) + 3]; // delim, \n and 0
sizeof(max_delimiter_len) ? it's sizeof(uint), right? Did you mean sizeof(PRINT_EVENT_INFO::delimiter) or simply PRINT_EVENT_INFO::max_delimiter_len without sizeof?
+ + if (do_wrap) + { + after_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME)); + sprintf(after_frag, "'%s\n", delimiter); + if (my_b_tell(body) > opt_binlog_rows_event_max_encoded_size) + n_frag= 2; + if (n_frag > 1) + { + before_frag= fmt_before_frag; + after_last= (char*) my_malloc(sizeof(buf), MYF(MY_WME)); + sprintf(after_last, fmt_last_frag2, (char*) delimiter); + } + else + { + before_frag= "\nBINLOG '\n"; // single "fragment" + } + } + + size_t total_size= my_b_tell(body), total_written= 0; + size_t frag_size= total_size / n_frag + 1, curr_size; + + if (reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE)) + { + body->error= -1; + goto err; + } + + for (uint i= 0; i < n_frag; i++, total_written += curr_size) + { + curr_size= i < n_frag - 1 ? frag_size : total_size - total_written; + + DBUG_ASSERT(i < n_frag - 1 || curr_size <= frag_size); + + if (before_frag) + { + sprintf(buf, before_frag, i); + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP)); + } + if (my_b_copy_to_file(body, file, FALSE, curr_size)) + { + body->error= -1; + goto err; + } + if (after_frag) + { + sprintf(buf, after_frag, NULL); + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP)); + } + }
Hmm, dunno. I suspect you can do it three times shorter and five times easier to read if you wouldn't try to generalize it for a arbitrary number of fragments with arbitrary prefixes and suffixes. Just if (my_b_tell(body) < opt_binlog_rows_event_max_encoded_size - margin) { my_fprintf(file, "BINLOG '"); my_b_copy_to_file(body, file); my_fprintf(file, "'%s\n", delimiter); } else { my_fprintf(file, "SET @binlog_fragment_0='"); my_b_copy_to_file(body, file, opt_binlog_rows_event_max_encoded_size); my_fprintf(file, "'%s\nSET @binlog_fragment_1='", delimiter); my_b_copy_to_file(body, file, SIZE_T_MAX); my_fprintf(file, "'%s\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n", delimiter, delimiter); } See?
+ + if (after_last) + { + sprintf(buf, after_last, n_frag); + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP)); + } + reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE); + +err: + my_free(after_frag); + my_free(after_last); +} + +/** + The function invokes base64 encoder to run on the current + event string and store the result into two caches. + When the event ends the current statement the caches are is copied into + the argument file. + Copying is also concerned how to wrap the event, specifically to produce + a valid SQL syntax. + When the encoded data size is within max(MAX_ALLOWED_PACKET) + a regular BINLOG query is composed. Otherwise it is build as fragmented + + SET @binlog_fragment_0='...'; + SET @binlog_fragment_1='...'; + BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1); + + where fragments are represented by a sequence of "indexed" user + variables. + Two more statements are composed as well + + SET @binlog_fragment_0=NULL; + SET @binlog_fragment_1=NULL; + + to promptly release memory.
No, they aren't
+ + NOTE.
@note
+ If any changes made don't forget to duplicate them to + Old_rows_log_event as long as it's supported. + + @param file pointer to IO_CACHE + @param print_event_info pointer to print_event_info specializing + what out of and how to print the event + @param name the name of a table that the event operates on + + The function signals on any error of cache access through setting + that cache's @c error to -1. +*/ void Rows_log_event::print_helper(FILE *file, PRINT_EVENT_INFO *print_event_info, char const *const name) { IO_CACHE *const head= &print_event_info->head_cache; IO_CACHE *const body= &print_event_info->body_cache; + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && + !print_event_info->short_form; + if (!print_event_info->short_form) { bool const last_stmt_event= get_flags(STMT_END_F); diff --git a/sql/log_event.h b/sql/log_event.h index 90900f63533..28277e659d2 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -749,6 +749,7 @@ typedef struct st_print_event_info that was printed. We cache these so that we don't have to print them if they are unchanged. */ + static const uint max_delimiter_len= 16;
why did you introduce this max_delimiter_len, if all you use is sizeof(delimiter) anyway? (and even that is not needed)
// TODO: have the last catalog here ?? char db[FN_REFLEN+1]; // TODO: make this a LEX_STRING when thd->db is bool flags2_inited; @@ -798,7 +799,7 @@ typedef struct st_print_event_info bool printed_fd_event; my_off_t hexdump_from; uint8 common_header_len; - char delimiter[16]; + char delimiter[max_delimiter_len];
uint verbose; table_mapping m_table_map; diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 91cf038907e..b4e3342d8f3 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -28,6 +28,70 @@ // START_EVENT_V3, // Log_event_type, // Log_event + +/** + Copy fragments into the standard placeholder thd->lex->comment.str. + + Compute the size of the (still) encoded total, + allocate and then copy fragments one after another. + The size can exceed max(max_allowed_packet) which is not a + problem as no String instance is created off this char array. + + @param thd THD handle + @return + 0 at success, + -1 otherwise. +*/ +int binlog_defragment(THD *thd) +{ + user_var_entry *entry[2]; + LEX_STRING name[2]= { thd->lex->comment, thd->lex->ident }; + + /* compute the total size */ + thd->lex->comment.str= NULL; + thd->lex->comment.length= 0; + for (uint k= 0; k < 2; k++) + { + entry[k]= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str, + name[k].length); + if (!entry[k] || entry[k]->type != STRING_RESULT) + { + my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str); + return -1; + } + thd->lex->comment.length += entry[k]->length; + } + + thd->lex->comment.str= // to be freed by the caller + (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME)); + if (!thd->lex->comment.str) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + return -1; + } + + /* fragments are merged into allocated buf while the user var:s get reset */ + size_t gathered_length= 0; + for (uint k=0; k < 2; k++) + { + memcpy(thd->lex->comment.str + gathered_length, entry[k]->value, entry[k]->length); + gathered_length += entry[k]->length; + if (update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0)) + { + my_printf_error(ER_WRONG_TYPE_FOR_VAR, + "%s: BINLOG fragment user " + "variable '%s' could not be unset", MYF(0), + ER_THD(thd, ER_WRONG_TYPE_FOR_VAR), entry[k]->value); + }
I don't see how update_hash(entry[k], true, ...) can ever fail, so there's no need to pretend that it can.
+ } + + DBUG_ASSERT(gathered_length == thd->lex->comment.length); + + return 0; +} + + /** Execute a BINLOG statement.
@@ -119,6 +175,23 @@ void mysql_client_binlog_statement(THD* thd) rli->sql_driver_thd= thd; rli->no_storage= TRUE;
+ if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str)) + if (binlog_defragment(thd)) + goto end; + + if (!(coded_len= thd->lex->comment.length)) + { + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + decoded_len= base64_needed_decoded_length(coded_len); + if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + goto end; + } +
Technically, it should be possible to decode base64 in-place and avoid allocating a second 3GB buffer. But let's not do it in this MDEV :)
for (char const *strptr= thd->lex->comment.str ; strptr < thd->lex->comment.str + thd->lex->comment.length ; ) {
Regards, Sergei Chief Architect MariaDB and security@mariadb.org