Sergei, hallo.
Hi, Andrei!
Looks better!
There's never a limit to improvements, though :-). Thanks for checking and for the insightful comments, as usual! I am publishing an updated patch that covers all of them. I reply to your notes and comment on a few of your questions below. Could you also please clarify one point, which is
-my_b_copy_to_file(IO_CACHE *cache, FILE *file)
+my_b_copy_to_file(IO_CACHE *cache, FILE *file,
+                  my_bool do_reinit,
+                  size_t count)
 {
-  size_t bytes_in_cache;
+  size_t curr_write, bytes_in_cache;
   DBUG_ENTER("my_b_copy_to_file");
   /* Reinit the cache to read from the beginning of the cache */
-  if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
+  if (do_reinit && reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
generally, when there's a function that is always called with a constant (compile-time) argument, I prefer to split the code compile-time too, if it isn't too much trouble. In this case it would mean a new function like
int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file)
{
  if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
    return 1;
  return my_b_copy_to_file(cache, file, SIZE_T_MAX);
}
why is that the preference, specifically? I looked around to find something like https://en.wikipedia.org/wiki/Compile_time_function_execution ... but I somewhat doubt you meant that. Or did you really? Now to my acknowledgments and comments...
There are no major problems, but see comments below. There are a few suggestions on how to simplify the code.
On Nov 05, Andrei Elkin wrote:
revision-id: d282f5c55609469cd74d7390f70c7d922c778711 (mariadb-10.1.35-93-gd282f5c5560)
parent(s): 2a576f71c5d3c7aacef564e5b1251f83bde48f51
author: Andrei Elkin <andrei.elkin@mariadb.com>
committer: Andrei Elkin <andrei.elkin@mariadb.com>
timestamp: 2018-10-21 23:42:00 +0300
message:
MDEV-10963 Fragmented BINLOG query
diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test new file mode 100644 index 00000000000..bdf41c94c76 --- /dev/null +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test @@ -0,0 +1,50 @@ +--source include/have_debug.inc +--source include/have_log_bin.inc +--source include/have_binlog_format_row.inc
you don't need to include have_log_bin, if you include have_binlog_format_row.
ack
+ +--let $MYSQLD_DATADIR= `select @@datadir` +--let $max_size=1024 + +CREATE TABLE t (a TEXT); +# events of interest are guaranteed to stay in 000001 log +RESET MASTER; +--eval INSERT INTO t SET a=repeat('a', $max_size)
eh? why did you do it with let/eval instead of a simple sql statement? you don't use $max_size anywhere else.
Inadvertently left over from a previous patch.
+SELECT a from t into @a; +FLUSH LOGS; +DELETE FROM t; + +--exec $MYSQL_BINLOG --debug-binlog-row-event-max-encoded-size=256 $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql + +--let $assert_text= BINLOG is fragmented +--let $assert_select= BINLOG @binlog_fragment_0, @binlog_fragment_1 +--let $assert_count= 1 +--let $assert_file= $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql +--source include/assert_grep.inc
no, please, use search_pattern_in_file.inc instead.
ack
+ +--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql + +SELECT a LIKE @a as 'true' FROM t; +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL';
that makes no sense, @binlog_fragment_0 and _1 were set in a separate client session. You cannot test whether they were cleared in that session by looking at their values here
You caught me here! :-). In the new patch I relocate the check to the other test file.
+ +# improper syntax error +--echo BINLOG number-of-fragments must be exactly two +--error ER_PARSE_ERROR +BINLOG @binlog_fragment; +--error ER_PARSE_ERROR +BINLOG @binlog_fragment, @binlog_fragment, @binlog_fragment; + +# corrupted fragments error check (to the expected error code notice, +# the same error code occurs in a similar unfragmented case) +SET @binlog_fragment_0='012345'; +SET @binlog_fragment_1='012345'; +--error ER_SYNTAX_ERROR +BINLOG @binlog_fragment_0, @binlog_fragment_1; + +# Not existing fragment is not allowed +SET @binlog_fragment_0='012345'; +--error ER_WRONG_TYPE_FOR_VAR +BINLOG @binlog_fragment_0, @binlog_fragment_not_exist; + +--echo # Cleanup +--remove_file $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql +DROP TABLE t; diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c --- a/mysys/mf_iocache2.c +++ b/mysys/mf_iocache2.c @@ -22,52 +22,53 @@ #include <stdarg.h> #include <m_ctype.h>
-/* - Copy contents of an IO_CACHE to a file. - - SYNOPSIS - my_b_copy_to_file() - cache IO_CACHE to copy from - file File to copy to - - DESCRIPTION - Copy the contents of the cache to the file. The cache will be - re-inited to a read cache and will read from the beginning of the - cache. - - If a failure to write fully occurs, the cache is only copied - partially. +/** + Copy the cache to the file. Copying can be constrained to @c count + number of bytes when the parameter is less than SIZE_T_MAX. The + cache will be optionally re-inited to a read cache and will read + from the beginning of the cache. If a failure to write fully + occurs, the cache is only copied partially.
TODO - Make this function solid by handling partial reads from the cache - in a correct manner: it should be atomic. - - RETURN VALUE - 0 All OK - 1 An error occurred + Make this function solid by handling partial reads from the cache + in a correct manner: it should be atomic. + + @param cache IO_CACHE to copy from + @param file File to copy to + @param do_reinit whether to turn the cache to read mode + @param count the copied size or the max of the type + when the whole cache is to be copied. + @return + 0 All OK + 1 An error occurred */ int -my_b_copy_to_file(IO_CACHE *cache, FILE *file) +my_b_copy_to_file(IO_CACHE *cache, FILE *file, + my_bool do_reinit, + size_t count) { - size_t bytes_in_cache; + size_t curr_write, bytes_in_cache; DBUG_ENTER("my_b_copy_to_file");
/* Reinit the cache to read from the beginning of the cache */ - if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE)) + if (do_reinit && reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
generally, when there's a function that is always called with a constant (compile-time) argument, I prefer to split the code compile-time too, if it isn't too much trouble. In this case it would mean a new function like
int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file)
{
  if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
    return 1;
  return my_b_copy_to_file(cache, file, SIZE_T_MAX);
}
and all old code will be changed to use my_b_copy_all_to_file(). Old my_b_copy_to_file() won't need to do reinit_io_cache() anymore and your code will use it directly.
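For what it's worth, the split can be sketched with toy stand-ins (plain buffers instead of IO_CACHE; all names below are hypothetical, not the server's API): a "partial" primitive with no rewind logic of its own, plus an "all" wrapper that rewinds once and delegates with SIZE_MAX, so the constant argument is resolved at compile time rather than tested on every call.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Toy stand-in for IO_CACHE: the point is the calling convention,
   not the I/O machinery. */
struct toy_cache { const char *data; size_t pos, len; };

/* "Partial" primitive: copies up to count bytes from the current
   position, with no rewind logic of its own. */
static size_t toy_copy_to_buf(struct toy_cache *c, char *out, size_t count)
{
  size_t avail = c->len - c->pos;
  size_t n = count < avail ? count : avail;
  memcpy(out, c->data + c->pos, n);
  c->pos += n;
  return n;
}

/* "All" wrapper: performs the rewind (the reinit_io_cache() analogue)
   exactly once, then delegates with SIZE_MAX -- the primitive needs
   no per-call do_reinit flag. */
static size_t toy_copy_all_to_buf(struct toy_cache *c, char *out)
{
  c->pos = 0;
  return toy_copy_to_buf(c, out, SIZE_MAX);
}
```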
DBUG_RETURN(1); bytes_in_cache= my_b_bytes_in_cache(cache); do { - if (my_fwrite(file, cache->read_pos, bytes_in_cache, + curr_write= MY_MIN(bytes_in_cache, count); + if (my_fwrite(file, cache->read_pos, curr_write, MYF(MY_WME | MY_NABP)) == (size_t) -1) DBUG_RETURN(1); - } while ((bytes_in_cache= my_b_fill(cache))); + + cache->read_pos += curr_write; + count -= curr_write; + } while (count && (bytes_in_cache= my_b_fill(cache))); if(cache->error == -1) DBUG_RETURN(1); DBUG_RETURN(0); }
- my_off_t my_b_append_tell(IO_CACHE* info) { /* diff --git a/sql/log_event.cc b/sql/log_event.cc index e07b7002398..aeca794f0cd 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -10474,12 +10488,151 @@ void Rows_log_event::pack_info(Protocol *protocol) #endif
#ifdef MYSQL_CLIENT
+/**
+  Print an event "body" cache to @c file possibly in multiple fragments.
+  Each fragment is optionally wrapped per @c do_wrap to produce an SQL statement.
+
+  @param file      a file to print to
+  @param body      the "body" IO_CACHE of event
+  @param do_wrap   whether to wrap base64-encoded strings with
+                   SQL cover.
+  The function signals on any error through setting @c body->error to -1.
+*/
+void copy_cache_to_file_wrapped(FILE *file,
+                                IO_CACHE *body,
+                                bool do_wrap,
+                                const char *delimiter)
+{
+  uint n_frag= 1;
+  const char* before_frag= NULL;
+  char* after_frag= NULL;
+  char* after_last= NULL;
+  /*
+    2 fragments can always represent near 1GB row-based
+    base64-encoded event as two strings each of size less than
+    max(max_allowed_packet). Greater number of fragments does not
+    save from potential need to tweak (increase) @@max_allowed_packet
+    before to process the fragments. So 2 is safe and enough.
+  */
+  const char fmt_last_frag2[]=
+    "\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n";
+  const char fmt_before_frag[]= "\nSET /* ONE_SHOT */ @binlog_fragment_%d ='\n";
this ONE_SHOT is confusing, even if in a comment. Better not to do it :)
ack
+ /* + Buffer to hold computed formatted strings according to specifiers. + The sizes may depend on an actual fragment number size in terms of decimal + signs so its maximum is estimated (not precisely yet safely) below. + */ + char buf[sizeof(fmt_before_frag) + sizeof(fmt_last_frag2) + + ((sizeof(n_frag) * 8)/3 + 1) // max of decimal index + + sizeof(PRINT_EVENT_INFO::max_delimiter_len) + 3]; // delim, \n and 0
sizeof(max_delimiter_len) ? it's sizeof(uint), right? Did you mean
sizeof(PRINT_EVENT_INFO::delimiter)
or simply
PRINT_EVENT_INFO::max_delimiter_len
without sizeof?
Indeed!
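A side note on the (sizeof(n_frag) * 8)/3 + 1 term in the buffer sizing above: it is the usual safe over-estimate of how many decimal digits an n-byte unsigned integer can need (8n bits, each worth about log10(2) ≈ 0.301 decimal digits). A minimal sanity check, with a hypothetical helper name:

```c
#include <assert.h>
#include <stddef.h>

/* Safe upper bound on decimal digits needed to print an unsigned
   integer occupying nbytes: 8*nbytes bits, each worth roughly
   log10(2) ~= 0.301 decimal digits, so (8*nbytes)/3 + 1 suffices. */
static unsigned max_decimal_digits(size_t nbytes)
{
  return (unsigned)((nbytes * 8) / 3 + 1);
}
```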
+ + if (do_wrap) + { + after_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME)); + sprintf(after_frag, "'%s\n", delimiter); + if (my_b_tell(body) > opt_binlog_rows_event_max_encoded_size) + n_frag= 2; + if (n_frag > 1) + { + before_frag= fmt_before_frag; + after_last= (char*) my_malloc(sizeof(buf), MYF(MY_WME)); + sprintf(after_last, fmt_last_frag2, (char*) delimiter); + } + else + { + before_frag= "\nBINLOG '\n"; // single "fragment" + } + } + + size_t total_size= my_b_tell(body), total_written= 0; + size_t frag_size= total_size / n_frag + 1, curr_size; + + if (reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE)) + { + body->error= -1; + goto err; + } + + for (uint i= 0; i < n_frag; i++, total_written += curr_size) + { + curr_size= i < n_frag - 1 ? frag_size : total_size - total_written; + + DBUG_ASSERT(i < n_frag - 1 || curr_size <= frag_size); + + if (before_frag) + { + sprintf(buf, before_frag, i); + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP)); + } + if (my_b_copy_to_file(body, file, FALSE, curr_size)) + { + body->error= -1; + goto err; + } + if (after_frag) + { + sprintf(buf, after_frag, NULL); + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP)); + } + }
Hmm, dunno. I suspect you can do it three times shorter and five times easier to read if you didn't try to generalize it for an arbitrary number of fragments with arbitrary prefixes and suffixes. Just
if (my_b_tell(body) < opt_binlog_rows_event_max_encoded_size - margin)
{
  my_fprintf(file, "BINLOG '");
  my_b_copy_to_file(body, file);
  my_fprintf(file, "'%s\n", delimiter);
}
else
{
  my_fprintf(file, "SET @binlog_fragment_0='");
  my_b_copy_to_file(body, file, opt_binlog_rows_event_max_encoded_size);
  my_fprintf(file, "'%s\nSET @binlog_fragment_1='", delimiter);
  my_b_copy_to_file(body, file, SIZE_T_MAX);
  my_fprintf(file, "'%s\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n",
             delimiter, delimiter);
}
See?
Very true. I did not consider this simple method, just "historically" having started with the idea of an arbitrary number of fragments. As a result there is no loop in the new patch.
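The two-branch shape can be exercised against a plain string to see the statements it generates; this is only a toy sketch (snprintf into a buffer instead of IO_CACHE/FILE, illustrative names, and it assumes the body fits in at most two fragments):

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Toy version of the two-branch wrapping: a short event becomes one
   BINLOG statement, a long one exactly two SET fragments plus the
   BINLOG that consumes them. */
static int wrap_binlog(char *out, size_t outsz, const char *body,
                       size_t max_frag, const char *delimiter)
{
  size_t len = strlen(body);
  if (len <= max_frag)
    return snprintf(out, outsz, "BINLOG '%s'%s\n", body, delimiter);
  /* two fragments suffice as long as len <= 2*max_frag, mirroring
     the "2 is safe and enough" size argument in the patch */
  return snprintf(out, outsz,
                  "SET @binlog_fragment_0='%.*s'%s\n"
                  "SET @binlog_fragment_1='%s'%s\n"
                  "BINLOG @binlog_fragment_0, @binlog_fragment_1%s\n",
                  (int) max_frag, body, delimiter,
                  body + max_frag, delimiter, delimiter);
}
```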
+
+  if (after_last)
+  {
+    sprintf(buf, after_last, n_frag);
+    my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
+  }
+  reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE);
+
+err:
+  my_free(after_frag);
+  my_free(after_last);
+}
+
+/**
+  The function invokes base64 encoder to run on the current
+  event string and store the result into two caches.
+  When the event ends the current statement the caches are copied into
+  the argument file.
+  Copying is also concerned with how to wrap the event, specifically to produce
+  a valid SQL syntax.
+  When the encoded data size is within max(MAX_ALLOWED_PACKET)
+  a regular BINLOG query is composed. Otherwise it is built as fragmented
+
+    SET @binlog_fragment_0='...';
+    SET @binlog_fragment_1='...';
+    BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
+
+  where fragments are represented by a sequence of "indexed" user
+  variables.
+  Two more statements are composed as well
+
+    SET @binlog_fragment_0=NULL;
+    SET @binlog_fragment_1=NULL;
+
+  to promptly release memory.
No, they aren't
ack
+ + NOTE.
@note
+ If any changes made don't forget to duplicate them to + Old_rows_log_event as long as it's supported. + + @param file pointer to IO_CACHE + @param print_event_info pointer to print_event_info specializing + what out of and how to print the event + @param name the name of a table that the event operates on + + The function signals on any error of cache access through setting + that cache's @c error to -1. +*/ void Rows_log_event::print_helper(FILE *file, PRINT_EVENT_INFO *print_event_info, char const *const name) { IO_CACHE *const head= &print_event_info->head_cache; IO_CACHE *const body= &print_event_info->body_cache; + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && + !print_event_info->short_form; + if (!print_event_info->short_form) { bool const last_stmt_event= get_flags(STMT_END_F); diff --git a/sql/log_event.h b/sql/log_event.h index 90900f63533..28277e659d2 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -749,6 +749,7 @@ typedef struct st_print_event_info that was printed. We cache these so that we don't have to print them if they are unchanged. */ + static const uint max_delimiter_len= 16;
why did you introduce this max_delimiter_len, if all you use is sizeof(delimiter) anyway? (and even that is not needed)
I use the newly introduced constant in the new patch; it is renamed, though, to hint at its 'size' rather than 'length' semantics.
// TODO: have the last catalog here ?? char db[FN_REFLEN+1]; // TODO: make this a LEX_STRING when thd->db is bool flags2_inited; @@ -798,7 +799,7 @@ typedef struct st_print_event_info bool printed_fd_event; my_off_t hexdump_from; uint8 common_header_len; - char delimiter[16]; + char delimiter[max_delimiter_len];
uint verbose; table_mapping m_table_map; diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 91cf038907e..b4e3342d8f3 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -28,6 +28,70 @@ // START_EVENT_V3, // Log_event_type, // Log_event + +/** + Copy fragments into the standard placeholder thd->lex->comment.str. + + Compute the size of the (still) encoded total, + allocate and then copy fragments one after another. + The size can exceed max(max_allowed_packet) which is not a + problem as no String instance is created off this char array. + + @param thd THD handle + @return + 0 at success, + -1 otherwise. +*/ +int binlog_defragment(THD *thd) +{ + user_var_entry *entry[2]; + LEX_STRING name[2]= { thd->lex->comment, thd->lex->ident }; + + /* compute the total size */ + thd->lex->comment.str= NULL; + thd->lex->comment.length= 0; + for (uint k= 0; k < 2; k++) + { + entry[k]= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str, + name[k].length); + if (!entry[k] || entry[k]->type != STRING_RESULT) + { + my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str); + return -1; + } + thd->lex->comment.length += entry[k]->length; + } + + thd->lex->comment.str= // to be freed by the caller + (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME)); + if (!thd->lex->comment.str) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + return -1; + } + + /* fragments are merged into allocated buf while the user var:s get reset */ + size_t gathered_length= 0; + for (uint k=0; k < 2; k++) + { + memcpy(thd->lex->comment.str + gathered_length, entry[k]->value, entry[k]->length); + gathered_length += entry[k]->length; + if (update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0)) + { + my_printf_error(ER_WRONG_TYPE_FOR_VAR, + "%s: BINLOG fragment user " + "variable '%s' could not be unset", MYF(0), + ER_THD(thd, ER_WRONG_TYPE_FOR_VAR), entry[k]->value); + }
I don't see how update_hash(entry[k], true, ...) can ever fail, so there's no need to pretend that it can.
Right. I did not look into its branches. The 'true' one does not error out.
+ } + + DBUG_ASSERT(gathered_length == thd->lex->comment.length); + + return 0; +} + + /** Execute a BINLOG statement.
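The merge step of binlog_defragment() boils down to sizing, allocating, and concatenating. A self-contained toy version (plain buffers and a hypothetical name, with no user-variable hash or error reporting):

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Concatenate two fragments into one freshly malloc'ed buffer;
   the caller owns (and frees) the result, mirroring the
   "to be freed by the caller" comment in the patch. */
static char *toy_defragment(const char *frag0, size_t len0,
                            const char *frag1, size_t len1,
                            size_t *total)
{
  char *buf = malloc(len0 + len1);
  if (!buf)
    return NULL;                /* OOM maps to ER_OUTOFMEMORY there */
  memcpy(buf, frag0, len0);
  memcpy(buf + len0, frag1, len1);
  *total = len0 + len1;
  return buf;
}
```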
@@ -119,6 +175,23 @@ void mysql_client_binlog_statement(THD* thd) rli->sql_driver_thd= thd; rli->no_storage= TRUE;
+ if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str)) + if (binlog_defragment(thd)) + goto end; + + if (!(coded_len= thd->lex->comment.length)) + { + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + decoded_len= base64_needed_decoded_length(coded_len); + if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + goto end; + } +
Technically, it should be possible to decode base64 in-place and avoid allocating a second 3GB buffer. But let's not do it in this MDEV :)
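(As an aside: base64 yields at most 3 decoded bytes per 4 encoded characters, so the decoded form is always smaller than the encoded one, which is what makes an in-place decode possible in principle. A rough bound, as an illustrative formula rather than the server's base64_needed_decoded_length():)

```c
#include <assert.h>
#include <stddef.h>

/* Decoded base64 size: every 4 encoded chars yield at most 3 bytes;
   the +1 group absorbs any unpadded tail. The result stays below
   coded_len for non-trivial inputs, hence in-place decoding is
   feasible in theory. */
static size_t toy_needed_decoded_length(size_t coded_len)
{
  return (coded_len / 4 + 1) * 3;
}
```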
In my defense I can only repeat the no-limit argument :-). A huge bunch of thanks! Andrei
for (char const *strptr= thd->lex->comment.str ; strptr < thd->lex->comment.str + thd->lex->comment.length ; ) {
Regards,
Sergei
Chief Architect MariaDB
and security@mariadb.org
_______________________________________________ Mailing list: https://launchpad.net/~maria-developers Post to : maria-developers@lists.launchpad.net Unsubscribe : https://launchpad.net/~maria-developers More help : https://help.launchpad.net/ListHelp