[Maria-developers] a Query_log_event charset bug in parallel replication
Hi,

If the character set changes between different Query_log_events, worker threads may apply them with the wrong character set. The code that leads to this problem is in Query_log_event::do_apply_event:

  if (charset_inited)
  {
    if (rli->cached_charset_compare(charset))
    {
      /* Verify that we support the charsets found in the event. */
      if (!(thd->variables.character_set_client=
            get_charset(uint2korr(charset), MYF(MY_WME))) ||
          !(thd->variables.collation_connection=
            get_charset(uint2korr(charset+2), MYF(MY_WME))) ||
          !(thd->variables.collation_server=
            get_charset(uint2korr(charset+4), MYF(MY_WME))))
      {

There is a charset[6] in rli, which caches the charset of the last Query_log_event in serial replication. In parallel replication this goes wrong: every worker thread can read and set rli->charset[6], so it is no longer the charset of any particular worker thread's last Query_log_event, yet it still affects every worker thread's thd->variables.character_set_* settings.

2014-04-17
nanyi607rao
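The interleaving described above can be sketched with a small single-threaded model. This is only an illustration under stated assumptions: SharedCacheModel, ThdModel, apply_event and second_worker_gets_utf8 are hypothetical names, not server code, and the six-byte charset values are example numbers rather than values taken from a real binlog.

```cpp
#include <cassert>
#include <cstring>

// Hypothetical stand-in for the cache that lives in rli and is shared by
// all worker threads. All zeroes means "invalidated".
struct SharedCacheModel {
  char cached_charset[6] = {0, 0, 0, 0, 0, 0};

  // Returns true (and refreshes the cache) when the event charset differs
  // from the cached one, mirroring the compare-and-update behaviour.
  bool cached_charset_compare(const char *charset) {
    if (std::memcmp(cached_charset, charset, sizeof(cached_charset)) != 0) {
      std::memcpy(cached_charset, charset, sizeof(cached_charset));
      return true;
    }
    return false;
  }
};

// Hypothetical stand-in for the per-thread session charset settings.
struct ThdModel {
  char session_charset[6] = {0, 0, 0, 0, 0, 0};
};

// Apply one event: the session charset is only touched on a cache miss.
void apply_event(ThdModel &thd, SharedCacheModel &cache, const char *charset) {
  if (cache.cached_charset_compare(charset))
    std::memcpy(thd.session_charset, charset, sizeof(thd.session_charset));
}

// Returns true if worker B ends up with the charset its last event asked for.
bool second_worker_gets_utf8() {
  const char utf8[6]   = {33, 0, 33, 0, 33, 0};  // example values (assumption)
  const char latin1[6] = {8, 0, 8, 0, 8, 0};     // example values (assumption)

  SharedCacheModel rli_cache;
  ThdModel worker_a, worker_b;

  apply_event(worker_b, rli_cache, latin1);  // B: latin1, cache = latin1
  apply_event(worker_a, rli_cache, utf8);    // A: utf8,   cache = utf8
  apply_event(worker_b, rli_cache, utf8);    // cache hit, B is not updated

  return std::memcmp(worker_b.session_charset, utf8, 6) == 0;
}
```

In this model second_worker_gets_utf8() returns false: worker B still has latin1 in its session when it applies the utf8 event, because worker A already put utf8 into the shared cache.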
Hi nanyi607rao,

Sorry for the delayed response, in part due to the Easter holidays.

"nanyi607rao" <nanyi607rao@gmail.com> writes:
> If character_set in different Query_log_events changed, worker threads may
> apply them with wrong character_set. the codes leading this problem is in
> Query_log_event::do_apply_event, that is:
>
>   if (charset_inited)
>   {
>     if (rli->cached_charset_compare(charset))
>     {
>       /* Verify that we support the charsets found in the event. */
>       if (!(thd->variables.character_set_client=
>             get_charset(uint2korr(charset), MYF(MY_WME))) ||
>           !(thd->variables.collation_connection=
>             get_charset(uint2korr(charset+2), MYF(MY_WME))) ||
>           !(thd->variables.collation_server=
>             get_charset(uint2korr(charset+4), MYF(MY_WME))))
>       {
>
> There is a charset[6] in rli, which cached last Query_log_event's charset in
> serial replication. But in parallel replication it would lead mistake, because
> every worker thread can read and set rli->charset[6], so rli->charset[6] isn't
> any worker threads' last Query_log_event charset. but rli->charset[6] can
> affect every worker threads' thd->variables.character_set_* setting.
Right, I see. Probably the cached_charset just needs to be moved into the rpl_group_info struct.

I have filed a bug for this that I will fix:

    https://mariadb.atlassian.net/browse/MDEV-6156

Thanks a lot for finding this. As usual, your input on parallel replication is most useful; keep it coming!

 - Kristian.
Kristian Nielsen <knielsen@knielsen-hq.org> writes:
> I have filed a bug for this that I will fix:

I've now pushed a fix for this bug to 10.0. Thanks again for your help!

 - Kristian.
Hi Kristian,
> Right, I see. Probably the cached_charset just needs to be moved into the
> rpl_group_info struct.
> I have filed a bug for this that I will fix:
I'm afraid there are still some cases that would go wrong if cached_charset is moved into the rpl_group_info struct. A worker thread can keep many rpl_group_info structs in its rgi_free_list, and those rgis can be reused many times.

Consider this case: a worker thread executes three transactions, trans1, trans2 and trans3.

  trans1's charset is utf8 and it uses rgi1
  trans2's charset is latin1 and it uses rgi2
  trans3's charset is utf8 and it uses rgi1

When the worker thread starts to execute trans3, rgi1->cached_charset == utf8, but thd->variables.character_set_* are latin1. They will not be changed to utf8, because rgi->cached_charset_compare returns 0.

Thanks
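The trans1/trans2/trans3 case can be replayed with a small model in which the cache lives in each reusable group-info object. This is a sketch under assumptions: RgiModel, ThdModel, apply_event and trans3_runs_with_utf8_per_rgi are hypothetical names, and the charset byte values are examples only.

```cpp
#include <cassert>
#include <cstring>

// Hypothetical stand-in for rpl_group_info carrying its own charset cache.
struct RgiModel {
  char cached_charset[6] = {0, 0, 0, 0, 0, 0};  // all zeroes = invalidated

  // Returns true and refreshes the cache when the charset differs.
  bool cached_charset_compare(const char *charset) {
    if (std::memcmp(cached_charset, charset, sizeof(cached_charset)) != 0) {
      std::memcpy(cached_charset, charset, sizeof(cached_charset));
      return true;
    }
    return false;
  }
};

// Hypothetical stand-in for the worker thread's session charset settings.
struct ThdModel {
  char session_charset[6] = {0, 0, 0, 0, 0, 0};
};

// The session charset is only updated when the rgi's cache reports a change.
void apply_event(ThdModel &thd, RgiModel &rgi, const char *charset) {
  if (rgi.cached_charset_compare(charset))
    std::memcpy(thd.session_charset, charset, sizeof(thd.session_charset));
}

// Returns true if trans3 is executed with utf8 active in the session.
bool trans3_runs_with_utf8_per_rgi() {
  const char utf8[6]   = {33, 0, 33, 0, 33, 0};  // example values (assumption)
  const char latin1[6] = {8, 0, 8, 0, 8, 0};     // example values (assumption)

  ThdModel thd;          // one worker thread
  RgiModel rgi1, rgi2;   // two reusable group-info objects

  apply_event(thd, rgi1, utf8);    // trans1: session = utf8,   rgi1 = utf8
  apply_event(thd, rgi2, latin1);  // trans2: session = latin1, rgi2 = latin1
  apply_event(thd, rgi1, utf8);    // trans3: rgi1 already holds utf8, skipped

  return std::memcmp(thd.session_charset, utf8, 6) == 0;
}
```

Here trans3_runs_with_utf8_per_rgi() returns false: the session is still latin1 when trans3 runs, because the cache being consulted describes rgi1's history rather than the thread's current state.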
"nanyi607rao" <nanyi607rao@gmail.com> writes:
> > I have filed a bug for this that I will fix:
>
> I'm afraid there is still some cases would lead mistake if move
> cached_charset into rpl_group_info struct.
Yes, you are absolutely right, the patch I pushed for this is completely wrong, just as you explained :-(

Sorry about this, I will try to come up with a better fix.

Thanks,

 - Kristian.
nanyi607rao,

Kristian Nielsen <knielsen@knielsen-hq.org> writes:

> Yes, you are absolutely right, the patch I pushed for this is completely
> wrong, just as you explained :-(
>
> Sorry about this, I will try to come up with a better fix.
What do you think about the below patch? It puts the cached_charset into the THD, which seems to be a better place, since it describes which charset is currently active in the THD.

(I also introduced THD::system_thread_info, as I did not want to add even more data to the THD, which is already far too big; that's why the patch is a bit bigger).

 - Kristian.

=== modified file 'sql/log_event.cc'
--- sql/log_event.cc 2014-04-23 14:06:06 +0000
+++ sql/log_event.cc 2014-04-24 11:00:52 +0000
@@ -4153,7 +4153,8 @@ int Query_log_event::do_apply_event(rpl_
                         (sql_mode & ~(ulong) MODE_NO_DIR_IN_CREATE));
       if (charset_inited)
       {
-        if (rgi->cached_charset_compare(charset))
+        rpl_sql_thread_info *sql_info= thd->system_thread_info.rpl_sql_info;
+        if (sql_info->cached_charset_compare(charset))
         {
           /* Verify that we support the charsets found in the event. */
           if (!(thd->variables.character_set_client=

=== modified file 'sql/rpl_mi.h'
--- sql/rpl_mi.h 2013-11-20 11:05:39 +0000
+++ sql/rpl_mi.h 2014-04-24 11:01:57 +0000
@@ -216,6 +216,16 @@ class Master_info_index
   bool stop_all_slaves(THD *thd);
 };
 
+
+/*
+  The class rpl_io_thread_info is the THD::system_thread_info for the IO thread.
+*/
+class rpl_io_thread_info
+{
+public:
+};
+
+
 bool check_master_connection_name(LEX_STRING *name);
 void create_logfile_name_with_suffix(char *res_file_name, size_t length,
                                      const char *info_file,

=== modified file 'sql/rpl_parallel.cc'
--- sql/rpl_parallel.cc 2014-04-09 12:42:46 +0000
+++ sql/rpl_parallel.cc 2014-04-24 11:40:29 +0000
@@ -33,7 +33,7 @@ rpt_handle_event(rpl_parallel_thread::qu
   THD *thd= rgi->thd;
 
   thd->rgi_slave= rgi;
-  thd->rpl_filter = rli->mi->rpl_filter;
+  thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
 
   /* ToDo: Access to thd, and what about rli, split out a parallel part? */
   mysql_mutex_lock(&rli->data_lock);
@@ -212,6 +212,7 @@ handle_rpl_parallel_thread(void *arg)
   rpl_parallel_thread::queued_event *qevs_to_free;
   rpl_group_info *rgis_to_free;
   group_commit_orderer *gcos_to_free;
+  rpl_sql_thread_info sql_info(NULL);
   size_t total_event_size;
   int err;
 
@@ -242,6 +243,7 @@ handle_rpl_parallel_thread(void *arg)
   thd_proc_info(thd, "Waiting for work from main SQL threads");
   thd->set_time();
   thd->variables.lock_wait_timeout= LONG_TIMEOUT;
+  thd->system_thread_info.rpl_sql_info= &sql_info;
   /*
     For now, we need to run the replication parallel worker threads in
     READ COMMITTED. This is needed because gap locks are not symmetric.

=== modified file 'sql/rpl_rli.cc'
--- sql/rpl_rli.cc 2014-04-23 14:06:06 +0000
+++ sql/rpl_rli.cc 2014-04-24 12:28:41 +0000
@@ -1479,7 +1479,6 @@ rpl_group_info::rpl_group_info(Relay_log
     deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
 {
   reinit(rli);
-  cached_charset_invalidate();
   bzero(&current_gtid, sizeof(current_gtid));
   mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
                    MY_MUTEX_INIT_FAST);
@@ -1562,29 +1561,6 @@ delete_or_keep_event_post_apply(rpl_grou
 }
 
 
-void rpl_group_info::cached_charset_invalidate()
-{
-  DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
-
-  /* Full of zeroes means uninitialized. */
-  bzero(cached_charset, sizeof(cached_charset));
-  DBUG_VOID_RETURN;
-}
-
-
-bool rpl_group_info::cached_charset_compare(char *charset) const
-{
-  DBUG_ENTER("rpl_group_info::cached_charset_compare");
-
-  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
-  {
-    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
-    DBUG_RETURN(1);
-  }
-  DBUG_RETURN(0);
-}
-
-
 void rpl_group_info::cleanup_context(THD *thd, bool error)
 {
   DBUG_ENTER("Relay_log_info::cleanup_context");
@@ -1769,4 +1745,33 @@ rpl_group_info::mark_start_commit()
 }
 
 
+rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
+  : rpl_filter(filter)
+{
+  cached_charset_invalidate();
+}
+
+
+void rpl_sql_thread_info::cached_charset_invalidate()
+{
+  DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
+
+  /* Full of zeroes means uninitialized. */
+  bzero(cached_charset, sizeof(cached_charset));
+  DBUG_VOID_RETURN;
+}
+
+
+bool rpl_sql_thread_info::cached_charset_compare(char *charset) const
+{
+  DBUG_ENTER("rpl_group_info::cached_charset_compare");
+
+  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
+  {
+    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
+    DBUG_RETURN(1);
+  }
+  DBUG_RETURN(0);
+}
+
 #endif

=== modified file 'sql/rpl_rli.h'
--- sql/rpl_rli.h 2014-04-23 14:06:06 +0000
+++ sql/rpl_rli.h 2014-04-24 12:28:48 +0000
@@ -26,6 +26,7 @@
 
 struct RPL_TABLE_LIST;
 class Master_info;
+class Rpl_filter;
 
 
 /****************************************************************************
@@ -536,8 +537,6 @@ struct rpl_group_info
   mysql_mutex_t sleep_lock;
   mysql_cond_t sleep_cond;
 
-  char cached_charset[6];
-
   /*
     trans_retries varies between 0 to slave_transaction_retries and counts how
     many times the slave has retried the present transaction; gets reset to 0
@@ -671,15 +670,6 @@ struct rpl_group_info
     return false;
   }
 
-  /*
-    Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
-    the thread save 3 get_charset() per Query_log_event if the charset is not
-    changing from event to event (common situation).
-    When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
-  */
-  void cached_charset_invalidate();
-  bool cached_charset_compare(char *charset) const;
-
   void clear_tables_to_lock();
   void cleanup_context(THD *, bool);
   void slave_close_thread_tables(THD *);
@@ -727,6 +717,30 @@ struct rpl_group_info
 };
 
 
+/*
+  The class rpl_sql_thread_info is the THD::system_thread_info for an SQL
+  thread; this is either the driver SQL thread or a worker thread for parallel
+  replication.
+*/
+class rpl_sql_thread_info
+{
+public:
+  char cached_charset[6];
+  Rpl_filter* rpl_filter;
+
+  rpl_sql_thread_info(Rpl_filter *filter);
+
+  /*
+    Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
+    the thread save 3 get_charset() per Query_log_event if the charset is not
+    changing from event to event (common situation).
+    When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
+  */
+  void cached_charset_invalidate();
+  bool cached_charset_compare(char *charset) const;
+};
+
+
 // Defined in rpl_rli.cc
 int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
 

=== modified file 'sql/slave.cc'
--- sql/slave.cc 2014-04-23 14:06:06 +0000
+++ sql/slave.cc 2014-04-24 11:39:30 +0000
@@ -2891,7 +2891,7 @@ void set_slave_thread_default_charset(TH
     global_system_variables.collation_server;
   thd->update_charset();
 
-  rgi->cached_charset_invalidate();
+  thd->system_thread_info.rpl_sql_info->cached_charset_invalidate();
   DBUG_VOID_RETURN;
 }
 
@@ -3768,6 +3768,7 @@ pthread_handler_t handle_slave_io(void *
   uint retry_count;
   bool suppress_warnings;
   int ret;
+  rpl_io_thread_info io_info;
 #ifndef DBUG_OFF
   uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
 #endif
@@ -3801,6 +3802,7 @@ pthread_handler_t handle_slave_io(void *
     sql_print_error("Failed during slave I/O thread initialization");
     goto err_during_init;
   }
+  thd->system_thread_info.rpl_io_info= &io_info;
   mysql_mutex_lock(&LOCK_thread_count);
   threads.append(thd);
   mysql_mutex_unlock(&LOCK_thread_count);
@@ -4367,6 +4369,7 @@ pthread_handler_t handle_slave_sql(void
   Relay_log_info* rli = &mi->rli;
   const char *errmsg;
   rpl_group_info *serial_rgi;
+  rpl_sql_thread_info sql_info(mi->rpl_filter);
 
   // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
   my_thread_init();
@@ -4378,7 +4381,7 @@ pthread_handler_t handle_slave_sql(void
   serial_rgi= new rpl_group_info(rli);
   thd = new THD; // note that contructor of THD uses DBUG_ !
   thd->thread_stack = (char*)&thd; // remember where our stack is
-  thd->rpl_filter = mi->rpl_filter;
+  thd->system_thread_info.rpl_sql_info= &sql_info;
 
   DBUG_ASSERT(rli->inited);
   DBUG_ASSERT(rli->mi == mi);
@@ -4676,7 +4679,7 @@ log '%s' at position %s, relay log '%s'
   mysql_cond_broadcast(&rli->data_cond);
   rli->ignore_log_space_limit= 0; /* don't need any lock */
   /* we die so won't remember charset - re-update them on next thread start */
-  serial_rgi->cached_charset_invalidate();
+  thd->system_thread_info.rpl_sql_info->cached_charset_invalidate();
 
   /*
     TODO: see if we can do this conditionally in next_event() instead

=== modified file 'sql/sql_acl.cc'
--- sql/sql_acl.cc 2014-03-29 10:33:25 +0000
+++ sql/sql_acl.cc 2014-04-24 12:02:04 +0000
@@ -38,6 +38,7 @@
 #include "records.h"              // READ_RECORD, read_record_info,
                                   // init_read_record, end_read_record
 #include "rpl_filter.h"           // rpl_filter
+#include "rpl_rli.h"
 #include <m_ctype.h>
 #include <stdarg.h>
 #include "sp_head.h"
@@ -2558,7 +2559,7 @@ bool change_password(THD *thd, const cha
 {
   TABLE_LIST tables;
   TABLE *table;
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   /* Buffer should be extended when password length is extended. */
   char buff[512];
   ulong query_length;
@@ -2580,7 +2581,8 @@ bool change_password(THD *thd, const cha
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into
@@ -5393,7 +5395,7 @@ int mysql_table_grant(THD *thd, TABLE_LI
   TABLE_LIST tables[3];
   bool create_new_users=0;
   char *db_name, *table_name;
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   DBUG_ENTER("mysql_table_grant");
 
   if (!initialized)
@@ -5483,7 +5485,8 @@ int mysql_table_grant(THD *thd, TABLE_LI
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into
@@ -5670,7 +5673,7 @@ bool mysql_routine_grant(THD *thd, TABLE
   TABLE_LIST tables[2];
   bool create_new_users=0, result=0;
   char *db_name, *table_name;
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   DBUG_ENTER("mysql_routine_grant");
 
   if (!initialized)
@@ -5705,7 +5708,8 @@ bool mysql_routine_grant(THD *thd, TABLE
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into
@@ -6141,7 +6145,7 @@ bool mysql_grant(THD *thd, const char *d
   char tmp_db[SAFE_NAME_LEN+1];
   bool create_new_users=0;
   TABLE_LIST tables[2];
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   DBUG_ENTER("mysql_grant");
 
   if (!initialized)
@@ -6190,7 +6194,8 @@ bool mysql_grant(THD *thd, const char *d
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into
@@ -8223,7 +8228,7 @@ void get_mqh(const char *user, const cha
 #define GRANT_TABLES 7
 static int open_grant_tables(THD *thd, TABLE_LIST *tables)
 {
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   DBUG_ENTER("open_grant_tables");
 
   if (!initialized)
@@ -8267,7 +8272,8 @@ static int open_grant_tables(THD *thd, T
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into

=== modified file 'sql/sql_class.h'
--- sql/sql_class.h 2014-04-23 06:57:25 +0000
+++ sql/sql_class.h 2014-04-24 12:28:29 +0000
@@ -72,6 +72,8 @@ class Parser_state;
 class Rows_log_event;
 class Sroutine_hash_entry;
 class user_var_entry;
+class rpl_io_thread_info;
+class rpl_sql_thread_info;
 
 enum enum_ha_read_modes { RFIRST, RNEXT, RPREV, RLAST, RKEY, RNEXT_SAME };
 enum enum_duplicates { DUP_ERROR, DUP_REPLACE, DUP_UPDATE };
@@ -1810,8 +1812,10 @@ class THD :public Statement,
   /* Slave applier execution context */
   rpl_group_info* rgi_slave;
 
-  /* Used to SLAVE SQL thread */
-  Rpl_filter* rpl_filter;
+  union {
+    rpl_io_thread_info *rpl_io_info;
+    rpl_sql_thread_info *rpl_sql_info;
+  } system_thread_info;
 
   void reset_for_next_command();
   /*

=== modified file 'sql/sql_parse.cc'
--- sql/sql_parse.cc 2014-03-29 10:33:25 +0000
+++ sql/sql_parse.cc 2014-04-24 11:54:18 +0000
@@ -171,8 +171,9 @@ const char *xa_state_names[]={
 */
 inline bool all_tables_not_ok(THD *thd, TABLE_LIST *tables)
 {
-  return thd->rpl_filter->is_on() && tables && !thd->spcont &&
-         !thd->rpl_filter->tables_ok(thd->db, tables);
+  Rpl_filter *rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+  return rpl_filter->is_on() && tables && !thd->spcont &&
+         !rpl_filter->tables_ok(thd->db, tables);
 }
 #endif
 
@@ -2233,7 +2234,7 @@ mysql_execute_command(THD *thd)
   /* have table map for update for multi-update statement (BUG#37051) */
   bool have_table_map_for_update= FALSE;
   /* */
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
 #endif
   DBUG_ENTER("mysql_execute_command");
 
@@ -3885,12 +3886,15 @@ mysql_execute_command(THD *thd)
       above was not called. So we have to check rules again here.
     */
 #ifdef HAVE_REPLICATION
-    if (thd->slave_thread &&
-        (!rpl_filter->db_ok(lex->name.str) ||
-         !rpl_filter->db_ok_with_wild_table(lex->name.str)))
+    if (thd->slave_thread)
     {
-      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-      break;
+      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+      if (!rpl_filter->db_ok(lex->name.str) ||
+          !rpl_filter->db_ok_with_wild_table(lex->name.str))
+      {
+        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+        break;
+      }
     }
 #endif
     if (check_access(thd, CREATE_ACL, lex->name.str, NULL, NULL, 1, 0))
@@ -3913,12 +3917,15 @@ mysql_execute_command(THD *thd)
       above was not called. So we have to check rules again here.
     */
 #ifdef HAVE_REPLICATION
-    if (thd->slave_thread &&
-        (!rpl_filter->db_ok(lex->name.str) ||
-         !rpl_filter->db_ok_with_wild_table(lex->name.str)))
+    if (thd->slave_thread)
     {
-      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-      break;
+      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+      if (!rpl_filter->db_ok(lex->name.str) ||
+          !rpl_filter->db_ok_with_wild_table(lex->name.str))
+      {
+        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+        break;
+      }
     }
 #endif
     if (check_access(thd, DROP_ACL, lex->name.str, NULL, NULL, 1, 0))
@@ -3930,13 +3937,16 @@ mysql_execute_command(THD *thd)
   {
     LEX_STRING *db= & lex->name;
 #ifdef HAVE_REPLICATION
-    if (thd->slave_thread &&
-        (!rpl_filter->db_ok(db->str) ||
-         !rpl_filter->db_ok_with_wild_table(db->str)))
+    if (thd->slave_thread)
     {
-      res= 1;
-      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-      break;
+      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+      if (!rpl_filter->db_ok(db->str) ||
+          !rpl_filter->db_ok_with_wild_table(db->str))
+      {
+        res= 1;
+        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+        break;
+      }
     }
 #endif
     if (check_db_name(db))
@@ -3973,12 +3983,15 @@ mysql_execute_command(THD *thd)
       above was not called. So we have to check rules again here.
     */
 #ifdef HAVE_REPLICATION
-    if (thd->slave_thread &&
-        (!rpl_filter->db_ok(db->str) ||
-         !rpl_filter->db_ok_with_wild_table(db->str)))
+    if (thd->slave_thread)
     {
-      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-      break;
+      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+      if (!rpl_filter->db_ok(db->str) ||
+          !rpl_filter->db_ok_with_wild_table(db->str))
+      {
+        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+        break;
+      }
     }
 #endif
     if (check_access(thd, ALTER_ACL, db->str, NULL, NULL, 1, 0))
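To see why keeping the cache with the thread helps, the trans1/trans2/trans3 case from earlier in the thread can be replayed against a per-thread cache. This is a rough sketch, not the patch itself: SqlThreadInfoModel and ThdModel are hypothetical stand-ins for rpl_sql_thread_info and THD, apply_event and trans3_runs_with_utf8_per_thread are made-up helpers, and the charset byte values are examples only.

```cpp
#include <cassert>
#include <cstring>

// Hypothetical model of the per-thread info; only the charset cache is kept.
struct SqlThreadInfoModel {
  char cached_charset[6] = {0, 0, 0, 0, 0, 0};  // all zeroes = invalidated

  // Returns true and refreshes the cache when the charset differs.
  bool cached_charset_compare(const char *charset) {
    if (std::memcmp(cached_charset, charset, sizeof(cached_charset)) != 0) {
      std::memcpy(cached_charset, charset, sizeof(cached_charset));
      return true;
    }
    return false;
  }
};

// Hypothetical stand-in for THD: the session charset plus a pointer to the
// thread's own info object, so cache and session state change together.
struct ThdModel {
  char session_charset[6] = {0, 0, 0, 0, 0, 0};
  SqlThreadInfoModel *sql_info = nullptr;
};

// The cache consulted is always the one belonging to the applying thread.
void apply_event(ThdModel &thd, const char *charset) {
  if (thd.sql_info->cached_charset_compare(charset))
    std::memcpy(thd.session_charset, charset, sizeof(thd.session_charset));
}

// Returns true if trans3 is executed with utf8 active in the session.
bool trans3_runs_with_utf8_per_thread() {
  const char utf8[6]   = {33, 0, 33, 0, 33, 0};  // example values (assumption)
  const char latin1[6] = {8, 0, 8, 0, 8, 0};     // example values (assumption)

  SqlThreadInfoModel sql_info;  // lives as long as the worker thread
  ThdModel thd;
  thd.sql_info = &sql_info;

  apply_event(thd, utf8);    // trans1: cache and session become utf8
  apply_event(thd, latin1);  // trans2: cache and session become latin1
  apply_event(thd, utf8);    // trans3: cache miss, session back to utf8

  return std::memcmp(thd.session_charset, utf8, 6) == 0;
}
```

In this model trans3_runs_with_utf8_per_thread() returns true, because the cache always describes the last charset applied by this thread, regardless of which rgi a transaction happens to use.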
Kristian,
> What do you think about the below patch? It puts the cached_charset into the
> THD, which seems to be a better place, since it describes which charset is
> currently active in the THD.
>
> (I also introduced THD::system_thread_info, as I did not want to add even
> more data to the THD, which is already far too big; that's why the patch is
> a bit bigger).
It seems most reasonable to put cached_charset into the THD, and your patch is very smart and clean. Thanks!