Hi, Andrei!

First, when I run it I get:

  Warning: 112 bytes lost at 0x7f212f26a670, allocated by T@0 at sql/sql_binlog.cc:68, sql/sql_binlog.cc:181, sql/sql_parse.cc:5625, sql/sql_parse.cc:7465, sql/sql_parse.cc:1497, sql/sql_parse.cc:1124, sql/sql_connect.cc:1330, sql/sql_connect.cc:1243

so you have a memory leak with your thd->lex->comment.str.

Second, BINLOG CONCAT() is totally possible; inventing a new keyword defeats the whole point of using a familiar and intuitive syntax. I've attached a simple patch that implements it.

On the other hand, I just thought that the CONCAT() syntax might not be so intuitive after all, as, strictly speaking, the result of CONCAT() cannot be larger than max_allowed_packet, so it might be confusing to use CONCAT here. Maybe just use your idea of

  BINLOG @a, @b;

with no CONCAT or anything else at all?

Also, I'm wondering whether BINLOG (in this last form) could automatically clear its variables @a and @b, without an explicit SET @a=NULL. It's a bit weird to have such a side effect, but it would help to save memory: the event size is over 3GB, right? So you'll have it split in two variables. Then you create a concatenated copy (that's over 3GB more), then you decode it (already more than 9GB in memory), and then it's executed, where copies of huge blob values are created again, maybe more than once. Freeing @a, @b, and the concatenated copy as soon as possible would save over 6GB of RAM.

Note that we cannot auto-reset the variables in the syntax with CONCAT(), DEFRAGMENT() or whatever, because functions cannot do that to their arguments.

On Oct 18, Andrei Elkin wrote:
revision-id: c850799967c561d08242987269d94af6ae4c7c5e (mariadb-10.1.35-59-gc850799967c)
parent(s): d3a8b5aa9cee24a6397662e30df2e915f45460e0
author: Andrei Elkin <andrei.elkin@mariadb.com>
committer: Andrei Elkin <andrei.elkin@mariadb.com>
timestamp: 2018-09-21 18:48:06 +0300
message:
MDEV-10963 Fragmented BINLOG query
The problem was originally stated in http://bugs.mysql.com/bug.php?id=82212. The size of a base64-encoded Rows_log_event exceeds its vanilla byte representation by a factor of 4/3. When a binlogged event's size is about 1GB, mysqlbinlog generates a BINLOG query that can't be sent out due to its size.
It is fixed by fragmenting the BINLOG argument C-string into (approximate) halves when the base64-encoded event exceeds 1GB. In that case mysqlbinlog puts out
  SET @binlog_fragment_0='base64-encoded-fragment_0';
  SET @binlog_fragment_1='base64-encoded-fragment_1';
  BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
to represent a big BINLOG 'base64-encoded-"total"'. Two more statements are composed to promptly release memory:

  SET @binlog_fragment_0=NULL;
  SET @binlog_fragment_1=NULL;
Two fragments are enough, though the client and server may still need to tweak their @@max_allowed_packet to accommodate the fragment size (which they would have to do anyway with a greater number of fragments, should that be desired).
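As a side note on the arithmetic here: base64 emits 4 output characters per 3 input bytes, which is where the 4/3 expansion above comes from. A quick standalone check of that arithmetic (ignoring the newlines mysqlbinlog inserts into the encoded string):

```c
#include <assert.h>
#include <stddef.h>

/* Size of the base64 encoding of n input bytes (with '=' padding,
   no line breaks): 4 output characters per 3-byte input group,
   rounded up. */
static size_t base64_encoded_size(size_t n)
{
  return ((n + 2) / 3) * 4;
}
```

So a 1GB row event encodes to roughly 1.33GB of text, which is what pushes a single-statement BINLOG over any practical max_allowed_packet.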
On the lower level the following changes are made:
Log_event::print_base64() still calls the encoder and stores the encoded data into a cache, but now *without* doing any formatting. The latter is left for the time when the cache is copied to an output file (e.g. mysqlbinlog output). The no-formatting behavior is also reflected in the changed meaning of the last argument, which now specifies whether to cache the encoded data.
my_b_copy_to_file() is turned into my_b_copy_to_file_frag(), which accepts format-specifier arguments to build a syntactically correct BINLOG query in both the fragmented (n_frag > 1) and non-fragmented (n_frag == 1) cases.
Rows_log_event::print_helper() decides whether to fragment, prepares the respective format specifiers, and invokes the cache-to-file copying function, which is now
copy_cache_frag_to_file_and_reinit(), which replaces the original copy_event_cache_to_file_and_reinit() to pass the extra arguments to the successor of my_b_copy_to_file().

my_b_copy_to_file_frag() replaces the former pure copier and now also wraps the encoded data per the format specifiers. With its 'n_frag' argument as 1 and the rest of the args NULL, it works as the original function.
diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
new file mode 100644
index 00000000000..5318f8acec6
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
@@ -0,0 +1,43 @@
+--source include/have_debug.inc
+--source include/have_log_bin.inc
+--source include/have_binlog_format_row.inc
+
+--let $MYSQLD_DATADIR= `select @@datadir`
+--let $max_size=1024
+
+CREATE TABLE t (a TEXT);
+# events of interest are guaranteed to stay in 000001 log
+RESET MASTER;
+--eval INSERT INTO t SET a=repeat('a', $max_size)
+SELECT a from t into @a;
+FLUSH LOGS;
+DELETE FROM t;
+
+--exec $MYSQL_BINLOG --debug-binlog-row-event-max-encoded-size=256 $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
+
+--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
how do you know the event was split, if you aren't looking into the mysqlbinlog.sql file? maybe your code doesn't work at all, or it was removed by some incorrect merge? the test will pass anyway...
+
+SELECT a LIKE @a as 'true' FROM t;
+SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL';
+
+# improper syntax error
+--echo BINLOG number-of-fragments must be exactly two
+--error ER_PARSE_ERROR
+BINLOG DEFRAGMENT(@binlog_fragment);
+--error ER_PARSE_ERROR
+BINLOG DEFRAGMENT(@binlog_fragment, @binlog_fragment, @binlog_fragment);
+
+# corrupted fragments error check (to the expected error code notice,
+# the same error code occurs in a similar unfragmented case)
+SET @binlog_fragment_0='012345';
+SET @binlog_fragment_1='012345';
+--error ER_SYNTAX_ERROR
+BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
+
+# Not existing fragment is not allowed
+SET @binlog_fragment_0='012345';
+--error ER_BASE64_DECODE_ERROR
+BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_not_exist);
+
+--echo # Cleanup
+DROP TABLE t;
diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c
index 2499094037d..6b7ff8a7568
--- a/mysys/mf_iocache2.c
+++ b/mysys/mf_iocache2.c
@@ -22,13 +22,23 @@
 #include <stdarg.h>
 #include <m_ctype.h>
-/*
+/**
   Copy contents of an IO_CACHE to a file.
   SYNOPSIS
-    my_b_copy_to_file()
-    cache    IO_CACHE to copy from
-    file     File to copy to
+    my_b_copy_to_file_frag
+
+    cache            IO_CACHE to copy from
+    file             File to copy to
+    n_frag           # of fragments
+
+    Other arguments represent format strings to enable wrapping
+    of the fragments and their total, including
+
+    before_frag      before a fragment
+    after_frag       after a fragment
+    after_last_frag  after all the fragments
+    final_per_frag   in the end per each fragment
if you edit a comment, change it to Doxygen syntax. Especially since you start it with Doxygen's /**, it is simply invalid to have SYNOPSIS and DESCRIPTION inside.
   DESCRIPTION
     Copy the contents of the cache to the file. The cache will be
@@ -38,33 +48,120 @@
     If a failure to write fully occurs, the cache is only copied
     partially.
-  TODO
-    Make this function solid by handling partial reads from the cache
-    in a correct manner: it should be atomic.
+    The copying is made in as many steps as the number of fragments
+    specified by the parameter 'n_frag'. Each step is wrapped with
+    writing the 'before_frag' and 'after_frag' formatted strings to
+    the file, unless the parameters are NULL. In the end, optionally,
+    first the 'after_last_frag' string is appended to 'file', followed
+    by 'final_per_frag' per each fragment.
   RETURN VALUE
     0    All OK
     1    An error occurred
+
+  TODO
+    Make this function solid by handling partial reads from the cache
+    in a correct manner: it should be atomic.
 */
 int
-my_b_copy_to_file(IO_CACHE *cache, FILE *file)
+my_b_copy_to_file_frag(IO_CACHE *cache, FILE *file,
+                       uint n_frag,
+                       const char* before_frag,
+                       const char* after_frag,
+                       const char* after_last,
+                       const char* final_per_frag,
+                       char* buf)
 {
-  size_t bytes_in_cache;
-  DBUG_ENTER("my_b_copy_to_file");
+  size_t bytes_in_cache;          // block, may have short size in the last one
+  size_t written_off_last_block;  // consumed part of the block by last fragment
+  size_t total_size= my_b_tell(cache);
+  size_t frag_size= total_size / n_frag + 1;
+  size_t total_written= 0;
+  size_t frag_written;            // bytes collected in the current fragment
+  uint i;
+
+  DBUG_ENTER("my_b_copy_to_file_frag");
+
+  DBUG_ASSERT(cache->type == WRITE_CACHE);
   /* Reinit the cache to read from the beginning of the cache */
   if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
     DBUG_RETURN(1);
-  bytes_in_cache= my_b_bytes_in_cache(cache);
-  do
-  {
-    if (my_fwrite(file, cache->read_pos, bytes_in_cache,
-                  MYF(MY_WME | MY_NABP)) == (size_t) -1)
-      DBUG_RETURN(1);
-  } while ((bytes_in_cache= my_b_fill(cache)));
-  if(cache->error == -1)
-    DBUG_RETURN(1);
-  DBUG_RETURN(0);
+
+  for (i= 0, written_off_last_block= 0, bytes_in_cache= my_b_bytes_in_cache(cache);
+       i < n_frag;
+       i++, total_written += frag_written)
+  {
+    frag_written= 0;
+    if (before_frag)
+    {
+      sprintf(buf, before_frag, i);
+      my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
+    }
+    do
+    {
+      /*
+        Either the current block is the last (L) in making the
+        current fragment (and possibly has some extra not to fit (LG) into
+        the fragment), or (I) the current (whole then) block is
+        intermediate.
+      */
+      size_t block_to_write= (frag_written + bytes_in_cache >= frag_size) ?
+        frag_size - frag_written : bytes_in_cache;
+
+      DBUG_ASSERT(n_frag != 1 ||
+                  (block_to_write == bytes_in_cache &&
+                   written_off_last_block == 0));
+
+      if (my_fwrite(file, cache->read_pos + written_off_last_block,
+                    block_to_write,
+                    MYF(MY_WME | MY_NABP)) == (size_t) -1)
+        /* no cache->error is set here */
+        DBUG_RETURN(1);
+
+      frag_written += block_to_write;
+      if (frag_written == frag_size)  // (L)
+      {
+        DBUG_ASSERT(block_to_write <= bytes_in_cache);
+        written_off_last_block= block_to_write;
+        bytes_in_cache -= written_off_last_block;  // (LG) when bytes > 0
+        /*
+          Nothing should be left in cache at the end of the
+          last fragment construction.
+        */
+        DBUG_ASSERT(i != n_frag - 1 || bytes_in_cache == 0);
+
+        break;
+      }
+      else
+      {
+        written_off_last_block= 0;  // (I)
+      }
+    } while ((bytes_in_cache= my_b_fill(cache)));
+
+    if (after_frag)
+    {
+      sprintf(buf, after_frag, NULL);
+      my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
+    }
+  }
+
+  DBUG_ASSERT(total_written == total_size);  // output == input
+
+  if (after_last)
+  {
+    sprintf(buf, after_last, n_frag);
+    my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
+  }
+
+  for (i= 0; final_per_frag && i < n_frag; i++)
+  {
+    sprintf(buf, final_per_frag, i);
+    my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
+  }
+
+  DBUG_RETURN(cache->error == -1);
this is a big and very specialized function with non-standard indentation. Please move it from mysys to mysqlbinlog.cc and fix the indentation.
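While reviewing the splitting logic: the frag_size= total_size / n_frag + 1 formula itself is fine, the round-up guarantees that n_frag fragments always cover the whole cache. A standalone sketch of just that invariant (frag_budget is my name, not in the patch):

```c
#include <assert.h>
#include <stddef.h>

/* Per-fragment byte budget as computed in the patch: integer division
   rounded up by adding one, so that
   n_frag * frag_budget(total, n_frag) >= total always holds and the
   last fragment is never left short. */
static size_t frag_budget(size_t total_size, unsigned n_frag)
{
  return total_size / n_frag + 1;
}
```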
}
diff --git a/sql/log_event.cc b/sql/log_event.cc
index e1912ad4620..0c39200148f
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -10474,12 +10482,108 @@ void Rows_log_event::pack_info(Protocol *protocol)
 #endif
 #ifdef MYSQL_CLIENT
+void copy_cache_to_file_wrapped(FILE *file,
+                                PRINT_EVENT_INFO *print_event_info,
+                                IO_CACHE *body,
+                                bool do_print_encoded)
+{
+  uint n_frag= 1;
+  const char* before_frag= NULL;
+  char* after_frag= NULL;
+  char* after_last= NULL;
+  char* final_per_frag= NULL;
+  /*
+    2 fragments can always represent near 1GB row-based
+    base64-encoded event as two strings each of size less than
+    max(max_allowed_packet). Greater number of fragments does not
+    save from potential need to tweak (increase) @@max_allowed_packet
+    before to process the fragments. So 2 is safe and enough.
+  */
+  const char fmt_last_frag2[]=
+    "\nBINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1)%s\n";
+  const char fmt_last_per_frag[]= "\nSET @binlog_fragment_%%d = NULL%s\n";
this is a completely unnecessary "generalization": print simply

  "SET @binlog_fragment_0=NULL, @binlog_fragment_1=NULL%s\n"

in particular because your "BINLOG DEFRAGMENT" statement is not generalized either. (Or move this to the cleanup inside BINLOG, as discussed above.)
+  const char fmt_before_frag[]= "\nSET @binlog_fragment_%d ='\n";
+  /*
+    Buffer to pass to copy_cache_frag_to_file_and_reinit to
+    compute formatted strings according to specifiers.
+    The sizes may depend on an actual fragment number size in terms of decimal
+    signs so its maximum is estimated (not precisely yet safely) below.
+  */
+  char buf[(sizeof(fmt_last_frag2) + sizeof(fmt_last_per_frag)) +
+           ((sizeof(n_frag) * 8)/3 + 1)                 // decimal index
+           + sizeof(print_event_info->delimiter) + 3];  // delim, \n and 0.
+
+  if (do_print_encoded)
+  {
+    after_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
+    sprintf(after_frag, "'%s\n", print_event_info->delimiter);
+    if (my_b_tell(body) > opt_binlog_rows_event_max_encoded_size)
+      n_frag= 2;
+    if (n_frag > 1)
+    {
+      before_frag= fmt_before_frag;
+      after_last= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
+      sprintf(after_last, fmt_last_frag2, (char*) print_event_info->delimiter);
+      final_per_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
+      sprintf(final_per_frag, fmt_last_per_frag,
+              (char*) print_event_info->delimiter);
+    }
+    else
+    {
+      before_frag= "\nBINLOG '\n";
+    }
+  }
+  if (copy_cache_frag_to_file_and_reinit(body, file, n_frag,
+                                         before_frag, after_frag,
+                                         after_last, final_per_frag, buf))
1. Get rid of this copy_cache_frag_to_file_and_reinit function; it's not helping.
2. Instead of pushing all this complex logic of printing fragments with before/after/in-between/start/final strings, it'd be *much* simpler to do it here and just extend my_b_copy_to_file() to print at most N bytes.
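To illustrate, the interface I mean is roughly this (a sketch only, with a plain buffer and a position pointer standing in for IO_CACHE; copy_to_file_n is a made-up name). The caller then prints the SET/BINLOG wrapping itself between successive calls:

```c
#include <assert.h>
#include <stdio.h>

/* Copy at most max_bytes from an in-memory "cache" (a plain buffer
   here, *pos tracking the read position) to file.  Returns the number
   of bytes written, or (size_t) -1 on a write error. */
static size_t copy_to_file_n(const char *cache, size_t cache_len,
                             size_t *pos, FILE *file, size_t max_bytes)
{
  size_t avail= cache_len - *pos;
  size_t n= avail < max_bytes ? avail : max_bytes;
  if (fwrite(cache + *pos, 1, n, file) != n)
    return (size_t) -1;
  *pos += n;
  return n;
}
```

With that, the fragmented case is just two calls with a frag_size limit, and the non-fragmented case is one call with no limit.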
+  {
+    body->error= -1;
+    goto err;
+  }
+
+err:
+  my_free(after_frag);
+  my_free(after_last);
+  my_free(final_per_frag);
+}
+
+/*
+  The function invokes base64 encoder to run on the current
+  event string and store the result into two caches.
+  When the event ends the current statement the caches are copied into
+  the argument file.
+  Copying is also concerned how to wrap the event, specifically to produce
+  a valid SQL syntax.
+  When the encoded data size is within max(MAX_ALLOWED_PACKET)
+  a regular BINLOG query is composed. Otherwise it is built as fragmented
+
+    SET @binlog_fragment_0='...';
+    SET @binlog_fragment_1='...';
+    BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
+
+  where fragments are represented by a sequence of "indexed" user
+  variables.
+  Two more statements are composed as well
+
+    SET @binlog_fragment_0=NULL;
+    SET @binlog_fragment_1=NULL;
+
+  to promptly release memory.
+
+  NOTE.
+  If any changes are made don't forget to duplicate them to
+  Old_rows_log_event as long as it's supported.
+*/
 void Rows_log_event::print_helper(FILE *file,
                                   PRINT_EVENT_INFO *print_event_info,
                                   char const *const name)
 {
   IO_CACHE *const head= &print_event_info->head_cache;
   IO_CACHE *const body= &print_event_info->body_cache;
+  bool do_print_encoded=
+    print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
+    !print_event_info->short_form;
+
   if (!print_event_info->short_form)
   {
     bool const last_stmt_event= get_flags(STMT_END_F);
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index 91cf038907e..b15d72e036a
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -28,6 +28,73 @@
                                 // START_EVENT_V3,
                                 // Log_event_type,
                                 // Log_event
+
+/*
+  Copy fragments into the standard placeholder thd->lex->comment.str.
+
+  Compute the size of the (still) encoded total,
+  allocate and then copy fragments one after another.
+  The size can exceed max(max_allowed_packet) which is not a
+  problem as no String instance is created off this char array.
+
+  Return 0 at success, -1 otherwise.
+*/
+int binlog_defragment(THD *thd)
+{
+  LEX_STRING *curr_frag_name;
+
+  thd->lex->comment.length= 0;
+  thd->lex->comment.str= NULL;
+  /* compute the total size */
+  for (uint i= 0; i < thd->lex->fragmented_binlog_event.n_frag; i++)
+  {
+    user_var_entry *entry;
+
+    curr_frag_name= &thd->lex->fragmented_binlog_event.frag_name[i];
+    entry=
+      (user_var_entry*) my_hash_search(&thd->user_vars,
+                                       (uchar*) curr_frag_name->str,
+                                       curr_frag_name->length);
+    if (!entry || entry->type != STRING_RESULT)
+    {
+      my_printf_error(ER_BASE64_DECODE_ERROR,
+                      "%s: BINLOG fragment user "
+                      "variable '%s' has unexpectedly no value", MYF(0),
+                      ER_THD(thd, ER_BASE64_DECODE_ERROR), curr_frag_name->str);
+      return -1;
+    }
+    thd->lex->comment.length += entry->length;
+  }
+  thd->lex->comment.str=               // to be freed by the caller
+    (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME));
+  if (!thd->lex->comment.str)
+  {
+    my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
+    return -1;
+  }
+
+  /* fragments are merged into allocated buf */
+  size_t gathered_length= 0;
+  for (uint i= 0; i < thd->lex->fragmented_binlog_event.n_frag; i++)
+  {
+    user_var_entry *entry;
+
+    curr_frag_name= &thd->lex->fragmented_binlog_event.frag_name[i];
+    entry=
+      (user_var_entry*) my_hash_search(&thd->user_vars,
+                                       (uchar*) curr_frag_name->str,
+                                       curr_frag_name->length);
+    memcpy(thd->lex->comment.str + gathered_length,
+           entry->value, entry->length);
+
+    gathered_length += entry->length;
+  }
+  DBUG_ASSERT(gathered_length == thd->lex->comment.length);
ok.

1. BINLOG uses LEX::comment to avoid increasing sizeof(LEX) for a rarely used statement. So don't increase it for a much-more-rarely used event overflow case. That is, remove fragmented_binlog_event. You can use a pair of LEX_STRINGs in LEX (e.g. comment/ident) or (less hackish) a List<LEX_STRING>, which are plenty in LEX.

2. Don't look up the variables twice; remember them in the first loop. You can unroll it too, there are only two variables anyway. E.g. the second loop can be replaced by just

  memcpy(thd->lex->comment.str, var[0]->value, var[0]->length);
  memcpy(thd->lex->comment.str + var[0]->length, var[1]->value, var[1]->length);

or even shorter, if you start your function with

  LEX_STRING *out= &thd->lex->comment;

3. Not "unexpectedly no value". ER_WRONG_TYPE_FOR_VAR looks more appropriate there.
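To spell out the two-memcpy merge: with both entries remembered from the validation loop, the whole second loop collapses to this. A compilable sketch, with plain buffers standing in for user_var_entry (defragment2 is a made-up name; the caller frees *out):

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Merge two fragment values into one freshly allocated buffer with
   exactly two memcpy() calls; returns 0 on success, -1 on OOM. */
static int defragment2(const char *v0, size_t l0,
                       const char *v1, size_t l1,
                       char **out, size_t *out_len)
{
  *out_len= l0 + l1;
  if (!(*out= malloc(*out_len)))
    return -1;
  memcpy(*out, v0, l0);
  memcpy(*out + l0, v1, l1);
  return 0;
}
```

No loops, no second hash lookup, and the length bookkeeping becomes trivially checkable.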
+
+  return 0;
+}
+
+
 /**
   Execute a BINLOG statement.
Regards, Sergei Chief Architect MariaDB and security@mariadb.org