
29 Nov '24
The FLUSH TABLES WITH READ LOCK statement briefly set the state (in PROCESSLIST) to
"Waiting while replication worker thread pool is busy", even if there was
nothing to wait for. This is somewhat confusing on a server that might not
even have any replication configured, let alone replication workers.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
sql/rpl_parallel.cc | 29 +++++++++++++++++------------
1 file changed, 17 insertions(+), 12 deletions(-)
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index b5e1bb96c1e..4d3f12e4116 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -551,6 +551,7 @@ pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
{
PSI_stage_info old_stage;
int res= 0;
+ bool did_enter_cond= false;
/*
Wait here while the queue is busy. This is done to make FLUSH TABLES WITH
@@ -567,24 +568,28 @@ pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
*/
DBUG_EXECUTE_IF("mark_busy_mdev_22370",my_sleep(1000000););
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
- if (thd)
- {
- thd->set_time_for_next_stage();
- thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
- &stage_waiting_for_rpl_thread_pool, &old_stage);
- }
- while (pool->busy)
+ if (pool->busy)
{
- if (thd && unlikely(thd->check_killed()))
+ if (thd)
{
- res= 1;
- break;
+ thd->set_time_for_next_stage();
+ thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
+ &stage_waiting_for_rpl_thread_pool, &old_stage);
+ did_enter_cond= true;
}
- mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
+ do
+ {
+ if (thd && unlikely(thd->check_killed()))
+ {
+ res= 1;
+ break;
+ }
+ mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
+ } while (pool->busy);
}
if (!res)
pool->busy= true;
- if (thd)
+ if (did_enter_cond)
thd->EXIT_COND(&old_stage);
else
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
--
2.39.5
1
0

[PATCH] MDEV-31756: WAIT/NOWAIT in DDL makes binary logs difficult or impossible to replay
by Kristian Nielsen 29 Nov '24
by Kristian Nielsen 29 Nov '24
29 Nov '24
Remove any WAIT <n> or NOWAIT option from the query before binlogging DDL.
Otherwise applying the event with mysqlbinlog | mysql may fail on NOWAIT
if there is temporarily a lock on the table during the apply (while no such
lock was there when the original query ran and was binlogged). Such locks
can occur even without user control, since InnoDB background tasks can
sometimes hold such locks for a short while.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
.../suite/binlog/r/binlog_ddl_nowait.result | 64 +++++++++++++++
.../suite/binlog/t/binlog_ddl_nowait.test | 77 +++++++++++++++++++
sql/sql_lex.cc | 26 +++++--
sql/sql_lex.h | 19 +++++
sql/sql_table.cc | 57 +++++++++++++-
sql/sql_yacc.yy | 62 ++++++++++++---
6 files changed, 286 insertions(+), 19 deletions(-)
create mode 100644 mysql-test/suite/binlog/r/binlog_ddl_nowait.result
create mode 100644 mysql-test/suite/binlog/t/binlog_ddl_nowait.test
diff --git a/mysql-test/suite/binlog/r/binlog_ddl_nowait.result b/mysql-test/suite/binlog/r/binlog_ddl_nowait.result
new file mode 100644
index 00000000000..86712fb533a
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_ddl_nowait.result
@@ -0,0 +1,64 @@
+*** MDEV-31756: WAIT/NOWAIT in DDL makes binary logs difficult or impossible to replay
+connection default;
+RESET MASTER;
+CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
+ALTER TABLE t1 WAIT +0.0 ADD b INT;
+ALTER TABLE t1 WAIT +4.3 ADD c INT;
+INSERT INTO t1(a,b) VALUES (1,1), (2,1), (3,2), (4,3), (5, 5), (6, 8), (7, 13);
+CREATE INDEX b_idx ON t1(b) WAIT 0;
+OPTIMIZE TABLE t1 NOWAIT;
+Table Op Msg_type Msg_text
+test.t1 optimize status OK
+TRUNCATE TABLE t1 NOWAIT;
+DROP INDEX b_idx ON t1 WAIT 0;
+RENAME TABLE t1 NOWAIT TO t2;
+DROP TABLE t2 WAIT 1;
+connection default;
+FLUSH BINARY LOGS;
+SHOW CREATE TABLE t1;
+Table Create Table
+t1 CREATE TABLE `t1` (
+ `a` int(11) NOT NULL,
+ PRIMARY KEY (`a`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_swedish_ci
+connect con1,localhost,root,,;
+INSERT INTO t1 VALUES (0*SLEEP(0.2));
+connection default;
+SHOW CREATE TABLE t1;
+Table Create Table
+t1 CREATE TABLE `t1` (
+ `a` int(11) NOT NULL,
+ `b` int(11) DEFAULT NULL,
+ `c` int(11) DEFAULT NULL,
+ PRIMARY KEY (`a`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_swedish_ci
+connection con1;
+INSERT INTO t1(a) VALUES (0*SLEEP(0.2) + 100);
+connection default;
+Table Op Msg_type Msg_text
+test.t1 optimize status OK
+SHOW CREATE TABLE t1;
+Table Create Table
+t1 CREATE TABLE `t1` (
+ `a` int(11) NOT NULL,
+ `b` int(11) DEFAULT NULL,
+ `c` int(11) DEFAULT NULL,
+ PRIMARY KEY (`a`),
+ KEY `b_idx` (`b`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_swedish_ci
+connection con1;
+INSERT INTO t1(a) VALUES (0*SLEEP(0.2) + 200);
+connection default;
+SHOW CREATE TABLE t1;
+Table Create Table
+t1 CREATE TABLE `t1` (
+ `a` int(11) NOT NULL,
+ `b` int(11) DEFAULT NULL,
+ `c` int(11) DEFAULT NULL,
+ PRIMARY KEY (`a`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_swedish_ci
+connection con1;
+disconnect con1;
+connection default;
+NOT FOUND /WAIT/ in mdev31756.text
+DROP TABLE t1;
diff --git a/mysql-test/suite/binlog/t/binlog_ddl_nowait.test b/mysql-test/suite/binlog/t/binlog_ddl_nowait.test
new file mode 100644
index 00000000000..b2b5a4ec2b6
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_ddl_nowait.test
@@ -0,0 +1,77 @@
+# DDL binlogging is the same regardless of binlog format
+--source include/have_binlog_format_mixed.inc
+
+--echo *** MDEV-31756: WAIT/NOWAIT in DDL makes binary logs difficult or impossible to replay
+
+--connection default
+--let $datadir= `select @@datadir`
+RESET MASTER;
+
+--let $file= query_get_value(SHOW MASTER STATUS, File, 1)
+--let $pos0= query_get_value(SHOW MASTER STATUS, Position, 1)
+CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
+--let $pos1= query_get_value(SHOW MASTER STATUS, Position, 1)
+ALTER TABLE t1 WAIT +0.0 ADD b INT;
+ALTER TABLE t1 WAIT +4.3 ADD c INT;
+INSERT INTO t1(a,b) VALUES (1,1), (2,1), (3,2), (4,3), (5, 5), (6, 8), (7, 13);
+--let $pos2= query_get_value(SHOW MASTER STATUS, Position, 1)
+CREATE INDEX b_idx ON t1(b) WAIT 0;
+OPTIMIZE TABLE t1 NOWAIT;
+TRUNCATE TABLE t1 NOWAIT;
+--let $pos3= query_get_value(SHOW MASTER STATUS, Position, 1)
+DROP INDEX b_idx ON t1 WAIT 0;
+--let $pos4= query_get_value(SHOW MASTER STATUS, Position, 1)
+RENAME TABLE t1 NOWAIT TO t2;
+DROP TABLE t2 WAIT 1;
+
+
+# Test that we can mysqlbinlog|mysql the DDL even though it has to wait for
+# the table lock for a short while.
+#
+# This is very prone to races of course, but that is ok. The case we want to
+# test is when mysqlbinlog|mysql runs while the table lock is held, and that
+# will be the case "most" of the time. If we race and the lock is not held,
+# it just means the test is ineffective, it will still pass.
+
+--connection default
+FLUSH BINARY LOGS;
+--exec $MYSQL_BINLOG --start-position=$pos0 --stop-position=$pos1 $datadir/master-bin.000001 | $MYSQL test
+SHOW CREATE TABLE t1;
+
+connect (con1,localhost,root,,);
+send INSERT INTO t1 VALUES (0*SLEEP(0.2));
+
+--connection default
+--exec $MYSQL_BINLOG --start-position=$pos1 --stop-position=$pos2 $datadir/master-bin.000001 | $MYSQL test
+SHOW CREATE TABLE t1;
+
+--connection con1
+reap;
+send INSERT INTO t1(a) VALUES (0*SLEEP(0.2) + 100);
+
+--connection default
+--exec $MYSQL_BINLOG --start-position=$pos2 --stop-position=$pos3 $datadir/master-bin.000001 | $MYSQL test
+SHOW CREATE TABLE t1;
+
+--connection con1
+reap;
+send INSERT INTO t1(a) VALUES (0*SLEEP(0.2) + 200);
+
+--connection default
+--exec $MYSQL_BINLOG --start-position=$pos3 --stop-position=$pos4 $datadir/master-bin.000001 | $MYSQL test
+SHOW CREATE TABLE t1;
+
+--connection con1
+reap;
+--disconnect con1
+--connection default
+
+# Check that there is no WAIT <n> or NOWAIT in binlogged queries.
+--exec $MYSQL_BINLOG $datadir/master-bin.000001 >$MYSQL_TMP_DIR/mdev31756.text
+--let SEARCH_FILE= $MYSQL_TMP_DIR/mdev31756.text
+--let SEARCH_PATTERN= WAIT
+--let SEARCH_ABORT= FOUND
+--source include/search_pattern_in_file.inc
+--remove_file $MYSQL_TMP_DIR/mdev31756.text
+
+DROP TABLE t1;
diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc
index 0131068a530..75ec0d9dffe 100644
--- a/sql/sql_lex.cc
+++ b/sql/sql_lex.cc
@@ -1557,6 +1557,19 @@ LEX_CSTRING Lex_input_stream::get_token(uint skip, uint length)
}
+Lex_string_with_pos_st
+Lex_input_stream::get_token_with_pos(uint skip, uint length)
+{
+ LEX_CSTRING tmp;
+ Lex_string_with_pos_st out;
+ out.start_pos= m_tok_start + skip - get_buf();
+ tmp= get_token(skip, length);
+ out.str= tmp.str;
+ out.length= tmp.length;
+ return out;
+}
+
+
static size_t
my_unescape(CHARSET_INFO *cs, char *to, const char *str, const char *end,
int sep, bool backslash_escapes)
@@ -2193,7 +2206,7 @@ int Lex_input_stream::lex_one_token(YYSTYPE *yylval, THD *thd)
if ((yyLength() >= 3) && !ident_map[c])
{
/* skip '0x' */
- yylval->lex_str= get_token(2, yyLength() - 2);
+ yylval->lex_str_with_pos= get_token_with_pos(2, yyLength() - 2);
return (HEX_NUM);
}
yyUnget();
@@ -2233,7 +2246,7 @@ int Lex_input_stream::lex_one_token(YYSTYPE *yylval, THD *thd)
{
yySkip();
while (my_isdigit(cs, yyGet())) ;
- yylval->lex_str= get_token(0, yyLength());
+ yylval->lex_str_with_pos= get_token_with_pos(0, yyLength());
return(FLOAT_NUM);
}
}
@@ -2272,8 +2285,9 @@ int Lex_input_stream::lex_one_token(YYSTYPE *yylval, THD *thd)
- the number is either not followed by a dot at all, or
- the number is followed by a double dot as in: FOR i IN 1..10
*/
- yylval->lex_str= get_token(0, yyLength());
- return int_token(yylval->lex_str.str, (uint) yylval->lex_str.length);
+ yylval->lex_str_with_pos= get_token_with_pos(0, yyLength());
+ return int_token(yylval->lex_str_with_pos.str,
+ (uint) yylval->lex_str_with_pos.length);
}
// fall through
case MY_LEX_REAL: // Incomplete real number
@@ -2287,10 +2301,10 @@ int Lex_input_stream::lex_one_token(YYSTYPE *yylval, THD *thd)
if (!my_isdigit(cs, c))
return ABORT_SYM; // No digit after sign
while (my_isdigit(cs, yyGet())) ;
- yylval->lex_str= get_token(0, yyLength());
+ yylval->lex_str_with_pos= get_token_with_pos(0, yyLength());
return(FLOAT_NUM);
}
- yylval->lex_str= get_token(0, yyLength());
+ yylval->lex_str_with_pos= get_token_with_pos(0, yyLength());
return(DECIMAL_NUM);
case MY_LEX_HEX_NUMBER: // Found x'hexstring'
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 780ad3d9e16..0c17480bf3b 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -103,6 +103,12 @@ struct Lex_string_with_metadata_st: public LEX_CSTRING
};
+struct Lex_string_with_pos_st: public LEX_CSTRING
+{
+ uint start_pos;
+};
+
+
/*
Used to store identifiers in the client character set.
Points to a query fragment.
@@ -2863,6 +2869,7 @@ class Lex_input_stream
int find_keyword(Lex_ident_cli_st *str, uint len, bool function) const;
int find_keyword_qualified_special_func(Lex_ident_cli_st *str, uint len) const;
LEX_CSTRING get_token(uint skip, uint length);
+ Lex_string_with_pos_st get_token_with_pos(uint skip, uint length);
int scan_ident_start(THD *thd, Lex_ident_cli_st *str);
int scan_ident_middle(THD *thd, Lex_ident_cli_st *str,
CHARSET_INFO **cs, my_lex_states *);
@@ -3540,6 +3547,12 @@ struct LEX: public Query_tables_list
*/
uint table_count_update;
+ /*
+ Used to remember a character position during parsing, eg. the end of
+ ulong_num for ddl_wait_nowait_end_offset.
+ */
+ uint last_lex_end_pos;
+
uint8 describe;
/*
A flag that indicates what kinds of derived tables are present in the
@@ -3585,15 +3598,21 @@ struct LEX: public Query_tables_list
keyword_delayed_begin_offset is the offset to the beginning of the DELAYED
keyword in INSERT DELAYED statement. keyword_delayed_end_offset is the
offset to the character right after the DELAYED keyword.
+
+ Similarly, ddl_wait_nowait_begin_offset and ddl_wait_nowait_end_offset mark
+ the start and end of the "NOWAIT" or "WAIT <number>" (including last
+ character of <number>) option for ALTER TABLE and similar statements.
*/
union {
const char *stmt_definition_begin;
uint keyword_delayed_begin_offset;
+ uint ddl_wait_nowait_begin_offset;
};
union {
const char *stmt_definition_end;
uint keyword_delayed_end_offset;
+ uint ddl_wait_nowait_end_offset;
};
/**
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index 748ae999087..a007f7689e6 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -996,6 +996,23 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
}
+/**
+ Create a new query string for DDL with the WAIT <n> or NOWAIT option removed.
+ This option should not be in the binlog, as a query that succeeded on the
+ master _must_ also succeed on the slave, even if it needs to wait.
+*/
+static int
+create_stmt_without_nowait(THD *thd, String *buf)
+{
+ if (buf->append(thd->query(), thd->query_length()) ||
+ buf->replace(thd->lex->ddl_wait_nowait_begin_offset,
+ thd->lex->ddl_wait_nowait_end_offset -
+ thd->lex->ddl_wait_nowait_begin_offset, NULL, 0))
+ return 1;
+ return 0;
+}
+
+
/*
SYNOPSIS
write_bin_log()
@@ -1035,9 +1052,43 @@ int write_bin_log(THD *thd, bool clear_error,
}
else
errcode= query_error_code(thd, TRUE);
- error= thd->binlog_query(THD::STMT_QUERY_TYPE,
- query, query_length, is_trans, FALSE, FALSE,
- errcode) > 0;
+
+ /* Remove any NOWAIT or WAIT <n> from DDL. */
+ String log_query;
+ switch(thd->lex->sql_command)
+ {
+ case SQLCOM_TRUNCATE:
+ case SQLCOM_DROP_INDEX:
+ case SQLCOM_RENAME_TABLE:
+ case SQLCOM_OPTIMIZE:
+ case SQLCOM_ALTER_TABLE:
+ case SQLCOM_CREATE_INDEX:
+ /* DROP TABLE is binlogged with a rewritten query, so omitted here. */
+ if (thd->lex->ddl_wait_nowait_begin_offset > 0)
+ {
+ if (create_stmt_without_nowait(thd, &log_query))
+ {
+ sql_print_error("Event Error: An error occurred while creating query "
+ "string for DDL with NOWAIT/WAIT removed, before "
+ "writing it into binary log.");
+ error= 1;
+ }
+ else
+ {
+ query= log_query.c_ptr();
+ query_length= log_query.length();
+ }
+ }
+ break;
+
+ default:
+ ; /* Nothing */
+ }
+
+ if (!error)
+ error= thd->binlog_query(THD::STMT_QUERY_TYPE,
+ query, query_length, is_trans, FALSE, FALSE,
+ errcode) > 0;
thd_proc_info(thd, 0);
}
return error;
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 85717713325..75be19cb4c8 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -201,6 +201,7 @@ void _CONCAT_UNDERSCORED(turn_parser_debug_on,yyparse)()
/* structs */
LEX_CSTRING lex_str;
+ Lex_string_with_pos_st lex_str_with_pos;
Lex_ident_cli_st kwd;
Lex_ident_cli_st ident_cli;
Lex_ident_sys_st ident_sys;
@@ -1316,9 +1317,8 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t *yystacksize);
%right INTO
%type <lex_str>
- DECIMAL_NUM FLOAT_NUM NUM LONG_NUM
- HEX_NUM HEX_STRING
- LEX_HOSTNAME ULONGLONG_NUM field_ident select_alias ident_or_text
+ HEX_STRING
+ LEX_HOSTNAME field_ident select_alias ident_or_text
TEXT_STRING_sys TEXT_STRING_literal
key_cache_name
sp_opt_label BIN_NUM TEXT_STRING_filesystem
@@ -1326,6 +1326,9 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t *yystacksize);
sp_block_label sp_control_label opt_place opt_db
udt_name
+%type <lex_str_with_pos>
+ DECIMAL_NUM FLOAT_NUM NUM LONG_NUM HEX_NUM ULONGLONG_NUM
+
%type <ident_sys>
IDENT_sys
ident_func
@@ -12747,12 +12750,41 @@ int_num:
;
ulong_num:
- opt_plus NUM { int error; $$= (ulong) my_strtoll10($2.str, (char**) 0, &error); }
- | HEX_NUM { $$= strtoul($1.str, (char**) 0, 16); }
- | opt_plus LONG_NUM { int error; $$= (ulong) my_strtoll10($2.str, (char**) 0, &error); }
- | opt_plus ULONGLONG_NUM { int error; $$= (ulong) my_strtoll10($2.str, (char**) 0, &error); }
- | opt_plus DECIMAL_NUM { int error; $$= (ulong) my_strtoll10($2.str, (char**) 0, &error); }
- | opt_plus FLOAT_NUM { int error; $$= (ulong) my_strtoll10($2.str, (char**) 0, &error); }
+ opt_plus NUM
+ {
+ int error;
+ $$= (ulong) my_strtoll10($2.str, (char**) 0, &error);
+ Lex->last_lex_end_pos= $2.start_pos + (uint)$2.length;
+ }
+ | HEX_NUM
+ {
+ $$= strtoul($1.str, (char**) 0, 16);
+ Lex->last_lex_end_pos= $1.start_pos + (uint)$1.length;
+ }
+ | opt_plus LONG_NUM
+ {
+ int error;
+ $$= (ulong) my_strtoll10($2.str, (char**) 0, &error);
+ Lex->last_lex_end_pos= $2.start_pos + (uint)$2.length;
+ }
+ | opt_plus ULONGLONG_NUM
+ {
+ int error;
+ $$= (ulong) my_strtoll10($2.str, (char**) 0, &error);
+ Lex->last_lex_end_pos= $2.start_pos + (uint)$2.length;
+ }
+ | opt_plus DECIMAL_NUM
+ {
+ int error;
+ $$= (ulong) my_strtoll10($2.str, (char**) 0, &error);
+ Lex->last_lex_end_pos= $2.start_pos + (uint)$2.length;
+ }
+ | opt_plus FLOAT_NUM
+ {
+ int error;
+ $$= (ulong) my_strtoll10($2.str, (char**) 0, &error);
+ Lex->last_lex_end_pos= $2.start_pos + (uint)$2.length;
+ }
;
real_ulong_num:
@@ -17116,15 +17148,25 @@ lock:
opt_lock_wait_timeout:
/* empty */
- {}
+ {
+ LEX *lex= Lex;
+ lex->ddl_wait_nowait_begin_offset= 0;
+ lex->ddl_wait_nowait_end_offset= 0;
+ }
| WAIT_SYM ulong_num
{
+ LEX *lex= Lex;
+ lex->ddl_wait_nowait_begin_offset= (uint)($1.pos() - thd->query());
+ lex->ddl_wait_nowait_end_offset= lex->last_lex_end_pos;
if (unlikely(set_statement_var_if_exists(thd, STRING_WITH_LEN("lock_wait_timeout"), $2)) ||
unlikely(set_statement_var_if_exists(thd, STRING_WITH_LEN("innodb_lock_wait_timeout"), $2)))
MYSQL_YYABORT;
}
| NOWAIT_SYM
{
+ LEX *lex= Lex;
+ lex->ddl_wait_nowait_begin_offset= (uint)($1.pos() - thd->query());
+ lex->ddl_wait_nowait_end_offset= (uint)($1.end() - thd->query());
if (unlikely(set_statement_var_if_exists(thd, STRING_WITH_LEN("lock_wait_timeout"), 0)) ||
unlikely(set_statement_var_if_exists(thd, STRING_WITH_LEN("innodb_lock_wait_timeout"), 0)))
MYSQL_YYABORT;
--
2.39.5
1
0

[PATCH] MDEV-31761: mariadb-binlog prints fractional timestamp part incorrectly
by Kristian Nielsen 28 Nov '24
by Kristian Nielsen 28 Nov '24
28 Nov '24
Fractional part < 100000 microseconds was printed without leading zeros,
causing such timestamps to be applied incorrectly in mariadb-binlog | mysql.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
mysql-test/main/mysqlbinlog.result | 26 ++++++++++++++++++++++++
mysql-test/main/mysqlbinlog.test | 32 ++++++++++++++++++++++++++++++
sql/log_event_client.cc | 7 +++++--
3 files changed, 63 insertions(+), 2 deletions(-)
diff --git a/mysql-test/main/mysqlbinlog.result b/mysql-test/main/mysqlbinlog.result
index c6d9ef97229..76ded3d31f8 100644
--- a/mysql-test/main/mysqlbinlog.result
+++ b/mysql-test/main/mysqlbinlog.result
@@ -1286,3 +1286,29 @@ ERROR: Bad syntax in rewrite-db: empty FROM db
ERROR: Bad syntax in rewrite-db: empty FROM db
+#
+# MDEV-31761: Timestamp is written into binary log incorrectly
+#
+SET SESSION binlog_format= MIXED;
+RESET MASTER;
+CREATE TABLE t (a INT,
+b TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6));
+set SESSION timestamp= 1689978980.012345;
+INSERT INTO t (a) VALUES (1);
+SELECT * from t;
+a b
+1 2023-07-22 00:36:20.012345
+FLUSH BINARY LOGS;
+SET SESSION timestamp= 1689978980.567890;
+SET SESSION binlog_format= ROW;
+UPDATE t SET a = 2;
+FLUSH BINARY LOGS;
+SET SESSION binlog_format= STATEMENT;
+DROP TABLE t;
+SELECT * FROM t;
+a b
+1 2023-07-22 00:36:20.012345
+SELECT * FROM t;
+a b
+2 2023-07-22 00:36:20.567890
+DROP TABLE t;
diff --git a/mysql-test/main/mysqlbinlog.test b/mysql-test/main/mysqlbinlog.test
index 22a85393a35..1747d55d29d 100644
--- a/mysql-test/main/mysqlbinlog.test
+++ b/mysql-test/main/mysqlbinlog.test
@@ -637,3 +637,35 @@ FLUSH LOGS;
--exec $MYSQL_BINLOG --rewrite-db=" ->" --short-form $MYSQLD_DATADIR/master-bin.000001 2>&1
--exec $MYSQL_BINLOG --rewrite-db=" test -> foo " --short-form $MYSQLD_DATADIR/master-bin.000001 > /dev/null 2> $MYSQLTEST_VARDIR/tmp/mysqlbinlog.warn
+
+
+--echo #
+--echo # MDEV-31761: Timestamp is written into binary log incorrectly
+--echo #
+
+SET SESSION binlog_format= MIXED;
+
+RESET MASTER;
+CREATE TABLE t (a INT,
+ b TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6));
+set SESSION timestamp= 1689978980.012345;
+INSERT INTO t (a) VALUES (1);
+SELECT * from t;
+FLUSH BINARY LOGS;
+SET SESSION timestamp= 1689978980.567890;
+SET SESSION binlog_format= ROW;
+UPDATE t SET a = 2;
+FLUSH BINARY LOGS;
+SET SESSION binlog_format= STATEMENT;
+
+# Replay to see that timestamps are applied correctly.
+# The bug was that leading zeros on the fractional part were not included in
+# the mysqlbinlog output, so 1689978980.012345 was applied as 1689978980.12345.
+
+DROP TABLE t;
+--let $datadir= `select @@datadir`
+--exec $MYSQL_BINLOG $datadir/master-bin.000001 | $MYSQL test
+SELECT * FROM t;
+--exec $MYSQL_BINLOG $datadir/master-bin.000002 | $MYSQL test
+SELECT * FROM t;
+DROP TABLE t;
diff --git a/sql/log_event_client.cc b/sql/log_event_client.cc
index 11fabbbca39..720cc5ab611 100644
--- a/sql/log_event_client.cc
+++ b/sql/log_event_client.cc
@@ -1851,8 +1851,11 @@ bool Query_log_event::print_query_header(IO_CACHE* file,
end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
if (when_sec_part && when_sec_part <= TIME_MAX_SECOND_PART)
{
- *end++= '.';
- end=int10_to_str(when_sec_part, end, 10);
+ char buff2[1 + 6 + 1];
+ /* Ensure values < 100000 are printed with leading zeros, MDEV-31761. */
+ snprintf(buff2, sizeof(buff2), ".%06lu", when_sec_part);
+ DBUG_ASSERT(strlen(buff2) == 1 + 6);
+ end= strmov(end, buff2);
}
end= strmov(end, print_event_info->delimiter);
*end++='\n';
--
2.39.5
1
0

28 Nov '24
If both do_gco_wait() and do_ftwrl_wait() had to wait, the state was not restored correctly.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
sql/rpl_parallel.cc | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index e59c1de7a7c..553557638f6 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -500,7 +500,8 @@ do_ftwrl_wait(rpl_group_info *rgi,
{
thd->set_time_for_next_stage();
thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
- &stage_waiting_for_ftwrl, old_stage);
+ &stage_waiting_for_ftwrl,
+ (*did_enter_cond ? nullptr : old_stage));
*did_enter_cond= true;
do
{
--
2.39.5
1
0

[PATCH] MDEV-34049: Parallel access to temptable in different domain_id in parallel replication
by Kristian Nielsen 25 Oct '24
by Kristian Nielsen 25 Oct '24
25 Oct '24
Disallow changing @@gtid_domain_id while a temporary table is open in
STATEMENT or MIXED binlog mode. Otherwise, a slave may try to replicate
events referring to the same temporary table in parallel, using domain-based
out-of-order parallel replication. This is not valid, temporary tables are
only available for use within a single thread at a time.
Use an existing error code that's somewhat close to the real issue
(ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO), to not add a
new error code in a GA release. When this is merged to the next GA release,
we could optionally introduce a new and more precise error code for an
attempt to change the domain_id while temporary tables are open.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
.../suite/rpl/r/rpl_gtid_errorhandling.result | 36 ++++++++++++++++---
.../suite/rpl/t/rpl_gtid_errorhandling.test | 31 ++++++++++++++++
sql/sys_vars.cc | 27 ++++++++++++--
3 files changed, 87 insertions(+), 7 deletions(-)
diff --git a/mysql-test/suite/rpl/r/rpl_gtid_errorhandling.result b/mysql-test/suite/rpl/r/rpl_gtid_errorhandling.result
index 54156685806..5e26bdb0032 100644
--- a/mysql-test/suite/rpl/r/rpl_gtid_errorhandling.result
+++ b/mysql-test/suite/rpl/r/rpl_gtid_errorhandling.result
@@ -60,6 +60,34 @@ ROLLBACK;
SELECT * FROM t1 ORDER BY a;
a
1
+SET @old_mode= @@SESSION.binlog_format;
+SET SESSION binlog_format= row;
+SET SESSION gtid_domain_id= 200;
+CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
+SET SESSION gtid_domain_id= 0;
+BEGIN;
+INSERT INTO t2 VALUES (200);
+INSERT INTO t1 SELECT * FROM t2;
+COMMIT;
+SET SESSION gtid_domain_id= 201;
+SET SESSION gtid_domain_id= 0;
+DELETE FROM t1 WHERE a=200;
+SET SESSION gtid_domain_id= 202;
+DROP TEMPORARY TABLE t2;
+SET SESSION binlog_format= mixed;
+SET SESSION gtid_domain_id= 0;
+CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t2 VALUES (1);
+SET SESSION gtid_domain_id= 0;
+SET SESSION gtid_domain_id= 204;
+ERROR HY000: Cannot modify @@session.gtid_domain_id or @@session.gtid_seq_no inside a transaction
+SET SESSION binlog_format=statement;
+INSERT INTO t2 VALUES (2);
+SET SESSION gtid_domain_id= 205;
+ERROR HY000: Cannot modify @@session.gtid_domain_id or @@session.gtid_seq_no inside a transaction
+DROP TEMPORARY TABLE t2;
+SET SESSION gtid_domain_id= @old_domain;
+SET SESSION binlog_format= @old_mode;
*** Test requesting an explicit GTID position that conflicts with newer GTIDs of our own in the binlog. ***
connection slave;
include/stop_slave.inc
@@ -83,16 +111,16 @@ ERROR 25000: You are not allowed to execute this command in a transaction
ROLLBACK;
SET GLOBAL gtid_strict_mode= 1;
SET GLOBAL gtid_slave_pos = "0-1-1";
-ERROR HY000: Specified GTID 0-1-1 conflicts with the binary log which contains a more recent GTID 0-2-11. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos
+ERROR HY000: Specified GTID 0-1-1 conflicts with the binary log which contains a more recent GTID 0-2-17. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos
SET GLOBAL gtid_slave_pos = "";
-ERROR HY000: Specified value for @@gtid_slave_pos contains no value for replication domain 0. This conflicts with the binary log which contains GTID 0-2-11. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos
+ERROR HY000: Specified value for @@gtid_slave_pos contains no value for replication domain 0. This conflicts with the binary log which contains GTID 0-2-17. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos
SET GLOBAL gtid_strict_mode= 0;
SET GLOBAL gtid_slave_pos = "0-1-1";
Warnings:
-Warning 1947 Specified GTID 0-1-1 conflicts with the binary log which contains a more recent GTID 0-2-11. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos
+Warning 1947 Specified GTID 0-1-1 conflicts with the binary log which contains a more recent GTID 0-2-17. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos
SET GLOBAL gtid_slave_pos = "";
Warnings:
-Warning 1948 Specified value for @@gtid_slave_pos contains no value for replication domain 0. This conflicts with the binary log which contains GTID 0-2-11. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos
+Warning 1948 Specified value for @@gtid_slave_pos contains no value for replication domain 0. This conflicts with the binary log which contains GTID 0-2-17. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos
RESET MASTER;
SET GLOBAL gtid_slave_pos = "0-1-1";
START SLAVE;
diff --git a/mysql-test/suite/rpl/t/rpl_gtid_errorhandling.test b/mysql-test/suite/rpl/t/rpl_gtid_errorhandling.test
index eec7a275e03..e3ca6df1a6e 100644
--- a/mysql-test/suite/rpl/t/rpl_gtid_errorhandling.test
+++ b/mysql-test/suite/rpl/t/rpl_gtid_errorhandling.test
@@ -68,6 +68,37 @@ SELECT * FROM t1 ORDER BY a;
ROLLBACK;
SELECT * FROM t1 ORDER BY a;
+# MDEV-34049: Parallel access to temptable in different domain_id in parallel replication
+SET @old_mode= @@SESSION.binlog_format;
+SET SESSION binlog_format= row;
+SET SESSION gtid_domain_id= 200;
+CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
+SET SESSION gtid_domain_id= 0;
+BEGIN;
+INSERT INTO t2 VALUES (200);
+INSERT INTO t1 SELECT * FROM t2;
+COMMIT;
+SET SESSION gtid_domain_id= 201;
+SET SESSION gtid_domain_id= 0;
+DELETE FROM t1 WHERE a=200;
+SET SESSION gtid_domain_id= 202;
+DROP TEMPORARY TABLE t2;
+
+SET SESSION binlog_format= mixed;
+SET SESSION gtid_domain_id= 0;
+CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t2 VALUES (1);
+SET SESSION gtid_domain_id= 0;
+--error ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO
+SET SESSION gtid_domain_id= 204;
+SET SESSION binlog_format=statement;
+INSERT INTO t2 VALUES (2);
+--error ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO
+SET SESSION gtid_domain_id= 205;
+DROP TEMPORARY TABLE t2;
+SET SESSION gtid_domain_id= @old_domain;
+SET SESSION binlog_format= @old_mode;
+
--echo *** Test requesting an explicit GTID position that conflicts with newer GTIDs of our own in the binlog. ***
--connection slave
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 115bbdf499b..2ef1e404f85 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1808,12 +1808,33 @@ Sys_pseudo_thread_id(
static bool
check_gtid_domain_id(sys_var *self, THD *thd, set_var *var)
{
- if (var->type != OPT_GLOBAL &&
- error_if_in_trans_or_substatement(thd,
+ if (var->type != OPT_GLOBAL)
+ {
+ if (error_if_in_trans_or_substatement(thd,
ER_STORED_FUNCTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO,
ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO))
return true;
-
+ /*
+ All binlogged statements on a temporary table must be binlogged in the
+ same domain_id; it is not safe to run them in parallel in different
+ domains, temporary table must be exclusive to a single thread.
+ In row-based binlogging, temporary tables do not end up in the binlog,
+ so there is no such issue.
+
+ ToDo: When merging to next (non-GA) release, introduce a more specific
+ error that describes that the problem is changing gtid_domain_id with
+ open temporary tables in statement/mixed binlogging mode; it is not
+ really due to doing it inside a "transaction".
+ */
+ if (thd->has_thd_temporary_tables() &&
+ !thd->is_current_stmt_binlog_format_row() &&
+ var->save_result.ulonglong_value != thd->variables.gtid_domain_id)
+ {
+ my_error(ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO,
+ MYF(0));
+ return true;
+ }
+ }
return false;
}
--
2.39.5
1
0

[PATCH] MDEV-29744: Fix incorrect locking order of LOCK_log/LOCK_commit_ordered and LOCK_global_system_variables
by Kristian Nielsen 25 Oct '24
by Kristian Nielsen 25 Oct '24
25 Oct '24
The LOCK_global_system_variables must not be held when taking mutexes
such as LOCK_commit_ordered and LOCK_log, as this causes inconsistent
mutex locking order that can theoretically cause the server to
deadlock.
To avoid this, temporarily release LOCK_global_system_variables in two
system variable update functions, like it is done in many other
places.
Enforce the correct locking order at server startup, to more easily
catch (in debug builds) any remaining wrong orders that may be hidden
elsewhere in the code.
Note that when this is merged to 11.4, similar unlock/lock of
LOCK_global_system_variables must be added in update_binlog_space_limit()
as is done in binlog_checksum_update() and fix_max_binlog_size(), as this
is a new function added in 11.4 that also needs the same fix. Tests will
fail with wrong mutex order until this is done.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
fixup_locking_order
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
sql/log.cc | 5 +++++
sql/sys_vars.cc | 2 ++
2 files changed, 7 insertions(+)
diff --git a/sql/log.cc b/sql/log.cc
index 512be2e2a6d..ecb177a34fb 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -3595,6 +3595,9 @@ void MYSQL_BIN_LOG::init_pthread_objects()
mysql_mutex_init(m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos,
MY_MUTEX_INIT_SLOW);
+
+ /* Fix correct mutex order to catch violations quicker (MDEV-35197). */
+ mysql_mutex_record_order(&LOCK_log, &LOCK_global_system_variables);
}
@@ -11753,6 +11756,7 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
bool check_purge= false;
ulong UNINIT_VAR(prev_binlog_id);
+ mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(mysql_bin_log.get_log_lock());
if(mysql_bin_log.is_open())
{
@@ -11771,6 +11775,7 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
mysql_mutex_unlock(mysql_bin_log.get_log_lock());
if (check_purge)
mysql_bin_log.checkpoint_and_purge(prev_binlog_id);
+ mysql_mutex_lock(&LOCK_global_system_variables);
}
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index cb35386f883..115bbdf499b 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1660,7 +1660,9 @@ Sys_max_binlog_stmt_cache_size(
static bool fix_max_binlog_size(sys_var *self, THD *thd, enum_var_type type)
{
+ mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_bin_log.set_max_size(max_binlog_size);
+ mysql_mutex_lock(&LOCK_global_system_variables);
return false;
}
static Sys_var_on_access_global<Sys_var_ulong,
--
2.39.5
1
0

[PATCH] MDEV-35197: Fix incorrect locking order of LOCK_log/LOCK_commit_ordered and LOCK_global_system_variables
by Kristian Nielsen 22 Oct '24
by Kristian Nielsen 22 Oct '24
22 Oct '24
The LOCK_global_system_variables must not be held when taking mutexes
such as LOCK_commit_ordered and LOCK_log, as this causes inconsistent
mutex locking order that can theoretically cause the server to
deadlock.
To avoid this, temporarily release LOCK_global_system_variables in two
system variable update functions, like it is done in many other
places.
Enforce the correct locking order at server startup, to more easily
catch (in debug builds) any remaining wrong orders that may be hidden
elsewhere in the code.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
sql/log.cc | 8 ++++++++
sql/sys_vars.cc | 2 ++
2 files changed, 10 insertions(+)
diff --git a/sql/log.cc b/sql/log.cc
index 512be2e2a6d..70fe33f9272 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -3595,6 +3595,12 @@ void MYSQL_BIN_LOG::init_pthread_objects()
mysql_mutex_init(m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos,
MY_MUTEX_INIT_SLOW);
+
+ /* Fix correct mutex order to catch violations quicker (MDEV-35197). */
+ mysql_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_global_system_variables);
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ mysql_mutex_unlock(&LOCK_log);
}
@@ -11753,6 +11759,7 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
bool check_purge= false;
ulong UNINIT_VAR(prev_binlog_id);
+ mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(mysql_bin_log.get_log_lock());
if(mysql_bin_log.is_open())
{
@@ -11771,6 +11778,7 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
mysql_mutex_unlock(mysql_bin_log.get_log_lock());
if (check_purge)
mysql_bin_log.checkpoint_and_purge(prev_binlog_id);
+ mysql_mutex_lock(&LOCK_global_system_variables);
}
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index cb35386f883..115bbdf499b 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1660,7 +1660,9 @@ Sys_max_binlog_stmt_cache_size(
static bool fix_max_binlog_size(sys_var *self, THD *thd, enum_var_type type)
{
+ mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_bin_log.set_max_size(max_binlog_size);
+ mysql_mutex_lock(&LOCK_global_system_variables);
return false;
}
static Sys_var_on_access_global<Sys_var_ulong,
--
2.39.2
1
0

21 Oct '24
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
cmake/mariadb_connector_c.cmake | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/cmake/mariadb_connector_c.cmake b/cmake/mariadb_connector_c.cmake
index 9b4f25498fd..aff69b3ba17 100644
--- a/cmake/mariadb_connector_c.cmake
+++ b/cmake/mariadb_connector_c.cmake
@@ -32,6 +32,10 @@ ELSE()
SET(CONC_INSTALL_LAYOUT "DEFAULT")
ENDIF()
+IF(WITH_BOOST_CONTEXT)
+ SET(CONC_WITH_BOOST_CONTEXT ON)
+ENDIF()
+
SET(PLUGIN_INSTALL_DIR ${INSTALL_PLUGINDIR})
SET(MARIADB_UNIX_ADDR ${MYSQL_UNIX_ADDR})
--
2.39.2
1
0

16 Oct '24
Follow-up from previous patch, which introduced catalog in the GTID event.
With catalog in GTID, there is no longer a need to use the catalog in
Query_log_event or Table_map_log_event. In fact, since catalog must never
change in the middle of a transaction, it is better to _only_ set the
catalog from the GTID event and not use any catalog info from other
events.
To save space, remove the previously introduced catalog from
Table_map_log_event. (The catalog in Query_log_event is kept, since it has
been there for a long time, so as not to change event formats.)
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
sql/log_event.cc | 102 ++++++++++-----------------------------
sql/log_event.h | 23 ++-------
sql/log_event_client.cc | 51 ++++++++------------
sql/log_event_server.cc | 104 +++++++++++++++++++++++-----------------
4 files changed, 110 insertions(+), 170 deletions(-)
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 004010e9d99..e8bb22b0ecb 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -1376,7 +1376,7 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
Log_event_type event_type)
:Log_event(buf, description_event), data_buf(0),
catalog(0), db(NullS), query(NullS),
- status_vars_len(0),
+ catalog_len(0), status_vars_len(0),
flags2_inited(0), sql_mode_inited(0), charset_inited(0), flags2(0),
auto_increment_increment(1), auto_increment_offset(1),
time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
@@ -1387,12 +1387,11 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
uint8 common_header_len, post_header_len;
Log_event::Byte *start;
const Log_event::Byte *end;
+ bool catalog_nz= 1;
DBUG_ENTER("Query_log_event::Query_log_event(char*,...)");
memset(&user, 0, sizeof(user));
memset(&host, 0, sizeof(host));
- catalog_name.str= 0;
- catalog_name.length= 0;
common_header_len= description_event->common_header_len;
post_header_len= description_event->post_header_len[event_type-1];
DBUG_PRINT("info",("event_len: %u common_header_len: %d post_header_len: %d",
@@ -1461,16 +1460,13 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
}
case Q_CATALOG_NZ_CODE:
{
- uint length;
DBUG_PRINT("info", ("case Q_CATALOG_NZ_CODE; pos:%p; end:%p",
pos, end));
- if (get_str_len_and_pointer(&pos, &catalog_name.str, &length, end))
+ if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
{
query= 0;
DBUG_VOID_RETURN;
}
- catalog_name.length= length;
- break;
}
break;
case Q_AUTO_INCREMENT:
@@ -1500,10 +1496,11 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
{
CHECK_SPACE(pos, end, 1);
- if ((catalog_name.length= *pos))
- catalog_name.str= (char*) pos+1; // Will be copied later
- CHECK_SPACE(pos, end, catalog_name.length + 2);
- pos+= catalog_name.length+2; // leap over end 0
+ if ((catalog_len= *pos))
+ catalog= (char*) pos+1; // Will be copied later
+ CHECK_SPACE(pos, end, catalog_len + 2);
+ pos+= catalog_len+2; // leap over end 0
+ catalog_nz= 0; // catalog has end 0 in event
}
break;
case Q_LC_TIME_NAMES_CODE:
@@ -1618,7 +1615,7 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
#if !defined(MYSQL_CLIENT) && defined(HAVE_QUERY_CACHE)
if (!(start= data_buf= (Log_event::Byte*) my_malloc(PSI_INSTRUMENT_ME,
- catalog_name.length + 1
+ catalog_len + 1
+ time_zone_len + 1
+ user.length + 1
+ host.length + 1
@@ -1630,7 +1627,7 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
MYF(MY_WME))))
#else
if (!(start= data_buf= (Log_event::Byte*) my_malloc(PSI_INSTRUMENT_ME,
- catalog_name.length + 1
+ catalog_len + 1
+ time_zone_len + 1
+ user.length + 1
+ host.length + 1
@@ -1638,36 +1635,22 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
MYF(MY_WME))))
#endif
DBUG_VOID_RETURN;
-
- /*
- Ensure we can support old replication clients that are using 'std' as catalog name
- This is also needed to support old mtr test that uses copies of old replication logs that
- still are using 'std'.
- */
- if (catalog_name.length == 3 and memcmp(catalog_name.str, "std", 3) == 0)
- catalog_name.str= "def";
-
-#ifndef MYSQL_CLIENT
- if (catalog_name.length) // If catalog was given
+ if (catalog_len) // If catalog is given
{
- if (!(catalog= get_catalog(&catalog_name, 1)))
+ /**
+ @todo we should clean up and do only copy_str_and_move; it
+ works for both cases. Then we can remove the catalog_nz
+ flag. /sven
+ */
+ if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
+ copy_str_and_move(&catalog, &start, catalog_len);
+ else
{
- if (!user.str)
- user.str= "";
- if (!host.str)
- host.str= "";
- my_error(ER_ACCESS_NO_SUCH_CATALOG, MYF(ME_ERROR_LOG),
- user.str, host.str, catalog_name.length, catalog_name.str);
- query= 0;
- DBUG_VOID_RETURN;
+ memcpy(start, catalog, catalog_len+1); // copy end 0
+ catalog= (const char *)start;
+ start+= catalog_len+1;
}
}
- else
- catalog= default_catalog();
-#endif
-
- copy_str_and_move(&catalog_name.str, &start, catalog_name.length);
-
if (time_zone_len)
copy_str_and_move(&time_zone_str, &start, time_zone_len);
@@ -2456,13 +2439,14 @@ Gtid_log_event::Gtid_log_event(const uchar *buf, uint event_len,
return;
}
uint32_t cat_len= *buf++;
- if (unlikely(cat_len > MAX_CATALOG_NAME) ||
+ if (unlikely(cat_len >= MAX_CATALOG_NAME) ||
unlikely(buf - buf_0 + cat_len) >= event_len)
{
seq_no= 0;
return;
}
memcpy(cat_name_buf, buf, cat_len);
+ cat_name_buf[cat_len]= '\0';
cat_name_int.str= cat_name_buf;
cat_name_int.length= cat_len;
cat_name= &cat_name_int;
@@ -3341,7 +3325,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
#ifndef MYSQL_CLIENT
m_table(NULL),
#endif
- m_catalog(NULL),
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
m_colcnt(0), m_coltype(0),
m_memory(NULL), m_table_id(ULONGLONG_MAX), m_flags(0),
@@ -3350,7 +3333,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
m_optional_metadata_len(0), m_optional_metadata(NULL)
{
unsigned int bytes_read= 0;
- LEX_CSTRING catalog= {"",0};
uint8 common_header_len= description_event->common_header_len;
uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");
@@ -3359,9 +3341,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
"post_header_len: %d",
event_len, common_header_len, post_header_len));
- m_catnam.str="";
- m_catnam.length= 0;
-
/*
Don't print debug messages when running valgrind since they can
trigger false warnings.
@@ -3396,35 +3375,7 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
m_flags= uint2korr(post_start);
/* Read the variable part of the event */
- const uchar *vpart= buf + common_header_len + post_header_len;
-
- if (m_flags & TM_BIT_HAS_CATALOG_F)
- {
- size_t catalog_len= *vpart;
- const char *catalog_name= (const char*) vpart + 1;
-
- catalog.str= catalog_name;
- catalog.length= catalog_len;
- if (vpart + catalog_len + 2 > buf + event_len)
- DBUG_VOID_RETURN;
-
-#ifndef MYSQL_CLIENT
- if (!(m_catalog= get_catalog(&catalog, 1)))
- {
- my_error(ER_ACCESS_NO_SUCH_CATALOG, MYF(ME_ERROR_LOG),
- "replication", "localhost",
- catalog_len, catalog_name);
- DBUG_VOID_RETURN;
- }
-#endif
- vpart+= catalog_len+2;
- }
-#ifndef MYSQL_CLIENT
- else
- {
- m_catalog= default_catalog();
- }
-#endif /* MYSQL_CLIENT */
+ const uchar *const vpart= buf + common_header_len + post_header_len;
/* Extract the length of the various parts from the buffer */
uchar const *const ptr_dblen= (uchar const*)vpart + 0;
@@ -3449,7 +3400,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
/* Allocate mem for all fields in one go. If fails, caught in is_valid() */
m_memory= (uchar*) my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME),
- &m_catnam.str, (uint) catalog.length+1,
&m_dbnam, (uint) m_dblen + 1,
&m_tblnam, (uint) m_tbllen + 1,
&m_coltype, (uint) m_colcnt,
@@ -3458,8 +3408,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
if (m_memory)
{
/* Copy the different parts into their memory */
- memcpy(const_cast<char*>(m_catnam.str), catalog.str, catalog.length+1);
- m_catnam.length= catalog.length;
strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
diff --git a/sql/log_event.h b/sql/log_event.h
index 2f866efca7c..3099d693c8b 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -838,7 +838,7 @@ typedef struct st_print_event_info
them if they are unchanged.
*/
char db[FN_REFLEN+1]; // TODO: make this a LEX_STRING when thd->db is
- const SQL_CATALOG *catalog;
+ char catalog_name[MAX_CATALOG_NAME];
char charset[6]; // 3 variables, each of them storable in 2 bytes
char time_zone_str[MAX_TIME_ZONE_NAME_LENGTH];
char delimiter[16];
@@ -2082,8 +2082,7 @@ class Query_log_event: public Log_event
protected:
Log_event::Byte* data_buf;
public:
- const SQL_CATALOG *catalog;
- LEX_CSTRING catalog_name;
+ const char *catalog;
const char *db;
const char *query;
/*
@@ -2107,6 +2106,8 @@ class Query_log_event: public Log_event
concerned) from here.
*/
+ uint catalog_len; // <= 255 char; 0 means uninited
+
/*
We want to be able to store a variable number of N-bit status vars:
(generally N=32; but N=64 for SQL_MODE) a user may want to log the number
@@ -3791,19 +3792,8 @@ class Annotate_rows_log_event: public Log_event
<td>Has the following bit values</tdd>
<td>TM_BIT_LEN_EXACT_F= 1 << 0</td>
<td>TM_BIT_HAS_TRIGGERS=1 << 14</td>
- <td>TM_BIT_HAS_CATALOG= 1 << 15</td>
</tr>
- <tr>
- <td>catalog_name</td>
- <td>one byte string length, followed by null-terminated string</td>
- <td>The name of the catalog in which the table resides. The name
- is represented as a one byte unsigned integer representing the
- number of bytes in the name, followed by length bytes containing
- the database name, followed by a terminating 0 byte. (Note the
- redundancy in the representation of the length.)
- This entry only exists if bit TM_BIT_HAS_CATALOG is set.</td>
- </tr>
</table>
The Body has the following components:
@@ -4362,8 +4352,7 @@ class Table_map_log_event : public Log_event
TM_NO_FLAGS = 0U,
TM_BIT_LEN_EXACT_F = (1U << 0),
// MariaDB flags (we starts from the other end)
- TM_BIT_HAS_TRIGGERS_F= (1U << 14),
- TM_BIT_HAS_CATALOG_F= (1U << 15),
+ TM_BIT_HAS_TRIGGERS_F= (1U << 14)
};
flag_set get_flags(flag_set flag) const { return m_flags & flag; }
@@ -4468,8 +4457,6 @@ class Table_map_log_event : public Log_event
class Default_charset_iterator;
class Column_charset_iterator;
#endif
- const SQL_CATALOG *m_catalog;
- LEX_CSTRING m_catnam; // Used by mysqlbinlog
char const *m_dbnam;
size_t m_dblen;
char const *m_tblnam;
diff --git a/sql/log_event_client.cc b/sql/log_event_client.cc
index 9315fdcf32b..b73dcd5c02c 100644
--- a/sql/log_event_client.cc
+++ b/sql/log_event_client.cc
@@ -1849,14 +1849,6 @@ bool Query_log_event::print_query_header(IO_CACHE* file,
goto err;
}
}
- if (print_event_info->catalog != catalog)
- {
- print_event_info->catalog= catalog;
- if (my_b_printf(file, "/*!110600 SET CATALOG `%.*s` */%s\n",
- catalog_name.length, catalog_name.str,
- print_event_info->delimiter))
- goto err;
- }
end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
if (when_sec_part && when_sec_part <= TIME_MAX_SECOND_PART)
@@ -3017,11 +3009,9 @@ int Table_map_log_event::rewrite_db(const char *new_db, size_t new_len,
// m_dbnam resides in m_memory together with m_tblnam and m_coltype
uchar* memory= m_memory;
char const* tblnam= m_tblnam;
- char const* catnam= m_catnam.str;
uchar* coltype= m_coltype;
m_memory= (uchar*) my_multi_malloc(PSI_NOT_INSTRUMENTED, MYF(MY_WME),
- &m_catnam.str, (uint) m_catnam.length,
&m_dbnam, (uint) m_dblen + 1,
&m_tblnam, (uint) m_tbllen + 1,
&m_coltype, (uint) m_colcnt,
@@ -3030,13 +3020,11 @@ int Table_map_log_event::rewrite_db(const char *new_db, size_t new_len,
if (!m_memory)
{
sql_print_error("Table_map_log_event::rewrite_db: "
- "failed to allocate new m_memory (%d, %d + %d + %d bytes required)",
- (int) m_catnam.length + 1,+ m_dblen + 1, m_tbllen + 1,
- m_colcnt);
+ "failed to allocate new m_memory (%d + %d + %d bytes required)",
+ m_dblen + 1, m_tbllen + 1, m_colcnt);
DBUG_RETURN(-1);
}
- memcpy((void*)m_catnam.str, catnam, m_catnam.length+1);
memcpy((void*)m_dbnam, new_db, m_dblen + 1);
memcpy((void*)m_tblnam, tblnam, m_tbllen + 1);
memcpy(m_coltype, coltype, m_colcnt);
@@ -3051,26 +3039,13 @@ bool Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
if (!print_event_info->short_form)
{
char llbuff[22];
- char options[80], *ptr;
-
- ptr= options;
- if (m_flags & (TM_BIT_HAS_TRIGGERS_F | TM_BIT_HAS_CATALOG_F))
- {
- ptr= strmov(ptr, " (has ");
- if (m_flags & TM_BIT_HAS_TRIGGERS_F)
- ptr= strmov(ptr, "triggers,");
- if (m_flags & TM_BIT_HAS_CATALOG_F)
- ptr= strmov(ptr, "catalog,");
- ptr[-1]= ')';
- ptr++;
- }
- *ptr= 0;
print_header(&print_event_info->head_cache, print_event_info, TRUE);
if (my_b_printf(&print_event_info->head_cache,
"\tTable_map: %`s.%`s mapped to number %s%s\n",
m_dbnam, m_tblnam, ullstr(m_table_id, llbuff),
- options))
+ ((m_flags & TM_BIT_HAS_TRIGGERS_F) ?
+ " (has triggers)" : "")))
goto err;
}
if (!print_event_info->short_form || print_event_info->print_row_count)
@@ -3653,8 +3628,8 @@ st_print_event_info::st_print_event_info()
bzero(db, sizeof(db));
bzero(charset, sizeof(charset));
bzero(time_zone_str, sizeof(time_zone_str));
- // Set catalog to impossible value to ensure that catalog is updated later!
- catalog= (SQL_CATALOG *) 1;
+ catalog_name[0]= '\0';
+ catalog_length= 0;
delimiter[0]= ';';
delimiter[1]= 0;
flags2_inited= 0;
@@ -3725,6 +3700,7 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
Write_on_release_cache::FLUSH_F, this);
char buf[21];
char buf2[21];
+ bool different;
if (!print_event_info->short_form && !is_flashback)
{
@@ -3797,6 +3773,19 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
buf, print_event_info->delimiter))
goto err;
}
+
+ different= print_event_info->catalog_length != cat_name->length ||
+ memcmp(print_event_info->catalog_name, cat_name->str, cat_name->length);
+ if (different)
+ {
+ print_event_info->catalog_length= cat_name->length;
+ memcpy(print_event_info->catalog_name, cat_name->str, cat_name->length);
+ if (my_b_printf(&cache, "/*!110600 SET CATALOG `%.*s` */%s\n",
+ cat_name->length, cat_name->str,
+ print_event_info->delimiter))
+ goto err;
+ }
+
if ((flags2 & FL_PREPARED_XA) && !is_flashback)
{
my_b_write_string(&cache, "XA START ");
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 22ad0a8b4a6..d3a5f616476 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -995,21 +995,13 @@ void Query_log_event::pack_info(Protocol *protocol)
{
char buf_mem[1024];
String buf(buf_mem, sizeof(buf_mem), system_charset_info);
- buf.real_alloc(30 + ((catalog ? catalog->name.length : 0 )+ db_len)*2 + q_len);
- if (!(flags & LOG_EVENT_SUPPRESS_USE_F))
+ buf.real_alloc(9 + db_len + q_len);
+ if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
+ && db && db_len)
{
- if (catalog)
- {
- buf.append(STRING_WITH_LEN("SET CATALOG "));
- append_identifier(protocol->thd, &buf, catalog->name.str, catalog->name.length);
- buf.append(STRING_WITH_LEN("; "));
- }
- if (db && db_len)
- {
- buf.append(STRING_WITH_LEN("use "));
- append_identifier(protocol->thd, &buf, db, db_len);
- buf.append(STRING_WITH_LEN("; "));
- }
+ buf.append(STRING_WITH_LEN("use "));
+ append_identifier(protocol->thd, &buf, db, db_len);
+ buf.append(STRING_WITH_LEN("; "));
}
DBUG_ASSERT(!flags2 || flags2_inited);
@@ -1139,7 +1131,7 @@ bool Query_log_event::write()
int8store(start, (ulonglong)sql_mode);
start+= 8;
}
- store_str_with_code_and_len(&start, catalog->name.str, catalog->name.length,
+ store_str_with_code_and_len(&start, catalog, catalog_len,
(uint) Q_CATALOG_NZ_CODE);
/*
In 5.0.x where x<4 masters we used to store the end zero here. This was
@@ -1383,7 +1375,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
? LOG_EVENT_THREAD_SPECIFIC_F : 0) |
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
using_trans),
- data_buf(0), catalog(thd_arg->catalog), query(query_arg),
+ data_buf(0), catalog(thd_arg->catalog->name.str), query(query_arg),
q_len((uint32) query_length),
thread_id(thd_arg->thread_id),
/* save the original thread id; we already know the server id */
@@ -1422,6 +1414,11 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
end_time= my_time(0);
exec_time = (ulong) (end_time - thd_arg->start_time);
+ /**
+ @todo this means that if we have no catalog, then it is replicated
+ as an existing catalog of length zero. is that safe? /sven
+ */
+ catalog_len = (catalog) ? (uint32) strlen(catalog) : 0;
if (!(db= thd->db.str))
db= "";
@@ -1855,9 +1852,13 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
bool skip_error_check= false;
DBUG_ENTER("Query_log_event::do_apply_event");
- DBUG_ASSERT(catalog);
- if (thd->catalog != catalog)
- thd->change_catalog(const_cast<SQL_CATALOG*>(catalog));
+ /*
+ Colleagues: please never free(thd->catalog) in MySQL. This would
+ lead to bugs as here thd->catalog is a part of an alloced block,
+ not an entire alloced block (see
+ Query_log_event::do_apply_event()). Same for thd->db. Thank
+ you.
+ */
rgi->start_alter_ev= this;
size_t valid_len= Well_formed_prefix(system_charset_info,
@@ -2348,11 +2349,6 @@ Query_log_event::do_shall_skip(rpl_group_info *rgi)
DBUG_ASSERT(query && q_len > 0);
DBUG_ASSERT(thd == rgi->thd);
- /* Set thd to point to the current catalog */
- DBUG_ASSERT(catalog);
- if (thd->catalog != catalog)
- thd->change_catalog(const_cast<SQL_CATALOG*>(catalog));
-
/*
An event skipped due to @@skip_replication must not be counted towards the
number of events to be skipped due to @@sql_slave_skip_counter.
@@ -3157,9 +3153,22 @@ Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
void
Gtid_log_event::pack_info(Protocol *protocol)
{
- char buf[6+5+10+1+10+1+20+1+4+20+1+ ser_buf_size+5 /* sprintf */];
- char *p;
- p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " :
+ static constexpr int catalog_needed= 12 + 2*MAX_CATALOG_NAME+2 + 2;
+ char buf[catalog_needed+6+5+10+1+10+1+20+1+4+20+1+
+ ser_buf_size+5 /* sprintf */];
+ char *p= buf;
+ if (flags_extra & FL_CATALOG)
+ {
+ char buf_mem[2*MAX_CATALOG_NAME+2];
+ String buf2(buf_mem, sizeof(buf_mem), system_charset_info);
+ append_identifier(protocol->thd, &buf2, cat_name->str, cat_name->length);
+ size_t buf2_len= std::min(buf2.length(), (uint32)(2*MAX_CATALOG_NAME+2));
+ DBUG_ASSERT(buf2_len == buf2.length());
+ p= strmov(p, "SET CATALOG ");
+ memcpy(p, buf2.ptr(), buf2_len);
+ p= strmov(p + buf2_len, "; ");
+ }
+ p = strmov(p, (flags2 & FL_STANDALONE ? "GTID " :
flags2 & FL_PREPARED_XA ? "XA START " : "BEGIN GTID "));
if (flags2 & FL_PREPARED_XA)
{
@@ -3215,6 +3224,29 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
return 1;
}
+ if (using_catalogs)
+ {
+ if (flags_extra & FL_CATALOG)
+ {
+ if (!thd->catalog || cmp(cat_name, &thd->catalog->name))
+ {
+ SQL_CATALOG *catalog= get_catalog(cat_name, 1);
+ if (!catalog)
+ {
+ my_error(ER_NO_SUCH_CATALOG, MYF(ME_ERROR_LOG),
+ cat_name->length, cat_name->str);
+ return 1;
+ }
+ thd->change_catalog(catalog);
+ }
+ }
+ else
+ {
+ if (thd->catalog != default_catalog())
+ thd->change_catalog(default_catalog());
+ }
+ }
+
DBUG_ASSERT((bits & OPTION_GTID_BEGIN) == 0);
Master_info *mi=rgi->rli->mi;
@@ -5689,8 +5721,6 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
bool is_transactional)
: Log_event(thd, 0, is_transactional),
m_table(tbl),
- m_catalog(thd->catalog),
- m_catnam(empty_clex_str),
m_dbnam(tbl->s->db.str),
m_dblen(m_dbnam ? tbl->s->db.length : 0),
m_tblnam(tbl->s->table_name.str),
@@ -5736,11 +5766,6 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
if (tbl->triggers)
m_flags|= TM_BIT_HAS_TRIGGERS_F;
- if (m_catalog != default_catalog())
- {
- m_flags|= TM_BIT_HAS_CATALOG_F;
- m_data_size+= m_catalog->name.length+2;
- }
/* If malloc fails, caught in is_valid() */
if ((m_memory= (uchar*) my_malloc(PSI_INSTRUMENT_ME, m_colcnt, MYF(MY_WME))))
@@ -5910,8 +5935,6 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
/* Step the query id to mark what columns that are actually used. */
thd->set_query_id(next_query_id());
- if (thd->catalog != m_catalog)
- thd->change_catalog(const_cast<SQL_CATALOG*>(m_catalog));
if (!(memory= my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME),
&table_list, (uint) sizeof(RPL_TABLE_LIST),
@@ -5938,7 +5961,7 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
LEX_CSTRING tmp_db_name= {db_mem, db_mem_length };
LEX_CSTRING tmp_tbl_name= {tname_mem, tname_mem_length };
- table_list->init_one_table(m_catalog, &tmp_db_name, &tmp_tbl_name, 0,
+ table_list->init_one_table(thd->catalog, &tmp_db_name, &tmp_tbl_name, 0,
TL_WRITE);
table_list->table_id= DBUG_IF("inject_tblmap_same_id_maps_diff_table") ? 0 : m_table_id;
table_list->updating= 1;
@@ -6079,13 +6102,6 @@ bool Table_map_log_event::write_data_body()
uchar mbuf[MAX_INT_WIDTH];
uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
- if (m_flags & TM_BIT_HAS_CATALOG_F)
- {
- uchar const len[]= { (uchar) m_catalog->name.length };
- if (write_data(len, 1) ||
- write_data(m_catalog->name.str, m_catalog->name.length+1))
- return 1;
- }
return write_data(dbuf, sizeof(dbuf)) ||
write_data(m_dbnam, m_dblen+1) ||
write_data(tbuf, sizeof(tbuf)) ||
--
2.39.2
1
0

[PATCH] Catalogs: replicate single catalog from catalog-enabled master
by Kristian Nielsen 16 Oct '24
by Kristian Nielsen 16 Oct '24
16 Oct '24
This implements the basics to do replication from a single catalog on a
catalog-enabled master ("the cloud") to a (not necessarily catalog-enabled)
slave ("on-premise"). Such a slave must only see its own events, i.e. events
from the catalog of the connecting user.
Changes done:
- Extend the GTID event with the catalog (a follow-up patch will change
replication slave-side to use this when applying events).
- In the dump thread, when a catalog user does a binlog dump (e.g. a
connecting slave), filter events so only events from that catalog are
sent.
- Extend the server version of the connector library to be able to connect
to a specific catalog.
- Add a Master_catalog option to CHANGE MASTER to make a slave connect to a
user in a specific catalog.
Open issues:
- Possibly there needs to be a way for the catalog superuser to control
which catalogs allow the COM_BINLOG_DUMP command.
- In GTID replication, how the single-catalog slave should maintain the GTID
position in domains that it does not receive events from. One option is
to update the GTID position (e.g. with the GTID_LIST events) in the other
domains; this way the slave will have the correct global GTID position,
but it will not be possible to use multi-source to replicate two catalogs
individually to the same server. Another option is to binlog each catalog
in their own domain(s) and do master-side filtering on domain_id; then
the master will ignore irrelevant domains for a connecting catalog slave
(and not report "position too old" in those domains), and the slave will
only have its own domains in its GTID position.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
include/mysql.h | 7 +-
include/sql_common.h | 3 +-
mysql-test/suite/rpl/r/rpl_catalogs.result | 137 +++++++++++++++++++++
mysql-test/suite/rpl/t/rpl_catalogs.test | 119 ++++++++++++++++++
sql-common/client.c | 36 +++++-
sql/lex.h | 1 +
sql/lex_string.h | 2 +-
sql/log_event.cc | 20 +++
sql/log_event.h | 13 +-
sql/log_event_server.cc | 88 ++++++++++++-
sql/privilege.h | 7 +-
sql/rpl_mi.cc | 15 ++-
sql/rpl_mi.h | 1 +
sql/slave.cc | 3 +
sql/sql_lex.h | 4 +-
sql/sql_repl.cc | 37 ++++++
sql/sql_yacc.yy | 6 +
17 files changed, 483 insertions(+), 16 deletions(-)
create mode 100644 mysql-test/suite/rpl/r/rpl_catalogs.result
create mode 100644 mysql-test/suite/rpl/t/rpl_catalogs.test
diff --git a/include/mysql.h b/include/mysql.h
index a66dcc7bd02..44ca2099694 100644
--- a/include/mysql.h
+++ b/include/mysql.h
@@ -189,14 +189,15 @@ enum mysql_option
/* MariaDB options */
MYSQL_PROGRESS_CALLBACK=5999,
MYSQL_OPT_NONBLOCK,
- MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY
+ MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY,
+ MARIADB_OPT_CATALOG=7029
};
/**
@todo remove the "extension", move st_mysql_options completely
out of mysql.h
*/
-struct st_mysql_options_extention;
+struct st_mysql_options_extension;
struct st_mysql_options {
unsigned int connect_timeout, read_timeout, write_timeout;
@@ -231,7 +232,7 @@ struct st_mysql_options {
void (*local_infile_end)(void *);
int (*local_infile_error)(void *, char *, unsigned int);
void *local_infile_userdata;
- struct st_mysql_options_extention *extension;
+ struct st_mysql_options_extension *extension;
};
enum mysql_status
diff --git a/include/sql_common.h b/include/sql_common.h
index ad5ab7e19af..c261ee9fe94 100644
--- a/include/sql_common.h
+++ b/include/sql_common.h
@@ -28,7 +28,7 @@ extern const char *cant_connect_sqlstate;
extern const char *not_error_sqlstate;
-struct st_mysql_options_extention {
+struct st_mysql_options_extension {
char *plugin_dir;
char *default_auth;
char *ssl_crl; /* PEM CRL file */
@@ -41,6 +41,7 @@ struct st_mysql_options_extention {
uint proc_info_length);
HASH connection_attributes;
size_t connection_attributes_length;
+ char *catalog;
};
typedef struct st_mysql_methods
diff --git a/mysql-test/suite/rpl/r/rpl_catalogs.result b/mysql-test/suite/rpl/r/rpl_catalogs.result
new file mode 100644
index 00000000000..4b25fd20e6c
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_catalogs.result
@@ -0,0 +1,137 @@
+include/master-slave.inc
+[connection master]
+*** Create some catalogs to work with.
+connection master_config;
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+connection slave_config;
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+*** Replicate a few events full-catalogs -> full-catalogs
+connection master_config;
+USE CATALOG foo;
+CREATE DATABASE db1;
+use db1;
+CREATE TABLE t1 (a INT PRIMARY KEY);
+INSERT INTO t1 VALUES (1);
+USE CATALOG baz;
+CREATE DATABASE db3;
+use db3;
+CREATE TABLE t3 (a INT PRIMARY KEY);
+INSERT INTO t3 VALUES (1);
+sync_slave_with_master slave;
+connection slave_config;
+USE CATALOG foo;
+use db1;
+SELECT * FROM t1 ORDER BY a;
+a
+1
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+a
+1
+*** Create a normal slave user to replicate single catalog
+connection slave_config;
+include/stop_slave.inc
+CHANGE MASTER TO master_user='rpl_bar', master_password='bar_pw',
+master_catalog="bar";
+connection master_config;
+USE CATALOG bar;
+CREATE USER rpl_bar@localhost;
+GRANT replication slave, select ON *.* TO rpl_bar@localhost IDENTIFIED BY "bar_pw";
+CREATE DATABASE db2;
+use db2;
+CREATE TABLE t2 (a INT PRIMARY KEY, b INT);
+INSERT INTO t2(a) VALUES (1), (2), (3), (4), (5);
+INSERT INTO t2 VALUES (6, 0), (7, 1), (8, 2);
+USE CATALOG foo;
+INSERT INTO db1.t1 VALUES (10), (20), (30);
+USE CATALOG baz;
+INSERT INTO db3.t3 VALUES (10), (20), (30);
+USE CATALOG bar;
+use db2;
+UPDATE t2 SET b=a*a WHERE b IS NULL;
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+a b
+1 1
+2 4
+3 9
+4 16
+5 25
+6 0
+7 1
+8 2
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+a
+1
+10
+20
+30
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+a
+1
+10
+20
+30
+USE CATALOG def;
+use test;
+connect con1,localhost,rpl_bar,bar_pw,bar.db2;
+SELECT * FROM t2 ORDER BY a;
+a b
+1 1
+2 4
+3 9
+4 16
+5 25
+6 0
+7 1
+8 2
+disconnect con1;
+connection master;
+save_master_pos;
+connection slave_config;
+include/start_slave.inc
+connection slave;
+sync_with_master;
+connection slave_config;
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+a b
+1 1
+2 4
+3 9
+4 16
+5 25
+6 0
+7 1
+8 2
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+a
+1
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+a
+1
+*** Clean up.
+connection slave_config;
+include/stop_slave.inc
+CHANGE MASTER TO master_user='root', master_password='', master_catalog='';
+include/start_slave.inc
+connection master_config;
+USE CATALOG def;
+use test;
+DROP CATALOG foo;
+DROP CATALOG bar;
+DROP CATALOG baz;
+connection master;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_catalogs.test b/mysql-test/suite/rpl/t/rpl_catalogs.test
new file mode 100644
index 00000000000..939e68b920d
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_catalogs.test
@@ -0,0 +1,119 @@
+--source include/have_catalogs.inc
+--source include/master-slave.inc
+
+--echo *** Create some catalogs to work with.
+--connection master_config
+# ToDo: It seems CREATE CATALOG cannot replicate currently.
+# So don't binlog it for now.
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+
+--connection slave_config
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+
+
+--echo *** Replicate a few events full-catalogs -> full-catalogs
+
+--connection master_config
+USE CATALOG foo;
+CREATE DATABASE db1;
+use db1;
+CREATE TABLE t1 (a INT PRIMARY KEY);
+INSERT INTO t1 VALUES (1);
+
+USE CATALOG baz;
+CREATE DATABASE db3;
+use db3;
+CREATE TABLE t3 (a INT PRIMARY KEY);
+INSERT INTO t3 VALUES (1);
+
+--sync_slave_with_master slave
+
+--connection slave_config
+USE CATALOG foo;
+use db1;
+SELECT * FROM t1 ORDER BY a;
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+
+
+--echo *** Create a normal slave user to replicate single catalog
+--connection slave_config
+--source include/stop_slave.inc
+CHANGE MASTER TO master_user='rpl_bar', master_password='bar_pw',
+ master_catalog="bar";
+
+--connection master_config
+USE CATALOG bar;
+CREATE USER rpl_bar@localhost;
+GRANT replication slave, select ON *.* TO rpl_bar@localhost IDENTIFIED BY "bar_pw";
+
+CREATE DATABASE db2;
+use db2;
+CREATE TABLE t2 (a INT PRIMARY KEY, b INT);
+INSERT INTO t2(a) VALUES (1), (2), (3), (4), (5);
+INSERT INTO t2 VALUES (6, 0), (7, 1), (8, 2);
+
+# Do something in other catalogs, see that it is not replicated.
+USE CATALOG foo;
+INSERT INTO db1.t1 VALUES (10), (20), (30);
+USE CATALOG baz;
+INSERT INTO db3.t3 VALUES (10), (20), (30);
+
+USE CATALOG bar;
+use db2;
+UPDATE t2 SET b=a*a WHERE b IS NULL;
+
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+
+USE CATALOG def;
+use test;
+
+connect(con1,localhost,rpl_bar,bar_pw,bar.db2);
+SELECT * FROM t2 ORDER BY a;
+--disconnect con1
+
+--connection master
+--save_master_pos
+
+--connection slave_config
+--source include/start_slave.inc
+--connection slave
+--sync_with_master
+
+--connection slave_config
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+
+--echo *** Clean up.
+
+--connection slave_config
+--source include/stop_slave.inc
+CHANGE MASTER TO master_user='root', master_password='', master_catalog='';
+--source include/start_slave.inc
+
+--connection master_config
+USE CATALOG def;
+use test;
+DROP CATALOG foo;
+DROP CATALOG bar;
+DROP CATALOG baz;
+
+--connection master
+--source include/rpl_end.inc
diff --git a/sql-common/client.c b/sql-common/client.c
index ba804ce2a7c..711f8733c9f 100644
--- a/sql-common/client.c
+++ b/sql-common/client.c
@@ -99,6 +99,7 @@ extern my_bool using_catalogs;
#define CONNECT_TIMEOUT 0
+#define MAX_CATALOG_NAME 65 /* Including terminating '\0' */
#include "client_settings.h"
#include <ssl_compat.h>
@@ -836,9 +837,9 @@ static int add_init_command(struct st_mysql_options *options, const char *cmd)
#define ALLOCATE_EXTENSIONS(OPTS) \
- (OPTS)->extension= (struct st_mysql_options_extention *) \
+ (OPTS)->extension= (struct st_mysql_options_extension *) \
my_malloc(key_memory_mysql_options, \
- sizeof(struct st_mysql_options_extention), \
+ sizeof(struct st_mysql_options_extension), \
MYF(MY_WME | MY_ZEROFILL)) \
@@ -2089,7 +2090,8 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
see end= buff+32 below, fixed size of the packet is 32 bytes.
+9 because data is a length encoded binary where meta data size is max 9.
*/
- buff_size= 33 + USERNAME_LENGTH + data_len + 9 + NAME_LEN + NAME_LEN + connect_attrs_len + 9;
+ buff_size= 33 + USERNAME_LENGTH + data_len + 9 + NAME_LEN + NAME_LEN +
+ connect_attrs_len + 9 + MAX_CATALOG_NAME;
buff= my_alloca(buff_size);
mysql->client_flag|= mysql->options.client_flag;
@@ -2122,10 +2124,19 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
if (mysql->client_flag & CLIENT_PROTOCOL_41)
{
/* 4.1 server and 4.1 client has a 32 byte option flag */
+ if (!(mysql->server_capabilities & CLIENT_MYSQL))
+ mysql->client_flag&= ~CLIENT_MYSQL;
int4store(buff,mysql->client_flag);
int4store(buff+4, net->max_packet_size);
buff[8]= (char) mysql->charset->number;
bzero(buff+9, 32-9);
+ if (!(mysql->server_capabilities & CLIENT_MYSQL))
+ { /* options.extension is allocated lazily and may still be NULL here */
+   /* ToDo: Should this check if the server has the catalog capability? */
+   uint client_extended_cap= (mysql->options.extension && mysql->options.extension->catalog) ?
+     (uint)((MARIADB_CLIENT_CONNECT_CATALOG) >> 32) : (uint)0;
+   int4store(buff + 28, client_extended_cap);
+ }
end= buff+32;
}
else
@@ -2274,6 +2285,17 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
end= (char *) send_client_connect_attrs(mysql, (uchar *) end);
+ /* Add catalog (options.extension is allocated lazily and may be NULL) */
+ if (mysql->options.extension && mysql->options.extension->catalog)
+ {
+   size_t len= strlen(mysql->options.extension->catalog);
+   if (len >= MAX_CATALOG_NAME)
+     len= MAX_CATALOG_NAME - 1; /* buff_size reserved MAX_CATALOG_NAME bytes */
+   end= (char*)write_length_encoded_string4(
+     (uchar*)end, buff_size - (end - buff),
+     (uchar *)mysql->options.extension->catalog, len);
+ }
+
/* Write authentication package */
if (my_net_write(net, (uchar*) buff, (size_t) (end-buff)) || net_flush(net))
{
@@ -3332,6 +3354,7 @@ static void mysql_close_free_options(MYSQL *mysql)
my_free(mysql->options.extension->plugin_dir);
my_free(mysql->options.extension->default_auth);
my_hash_free(&mysql->options.extension->connection_attributes);
+ my_free(mysql->options.extension->catalog);
my_free(mysql->options.extension);
}
bzero((char*) &mysql->options,sizeof(mysql->options));
@@ -3850,9 +3873,9 @@ mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg)
break;
case MYSQL_PROGRESS_CALLBACK:
if (!mysql->options.extension)
- mysql->options.extension= (struct st_mysql_options_extention *)
+ mysql->options.extension= (struct st_mysql_options_extension *)
my_malloc(key_memory_mysql_options,
- sizeof(struct st_mysql_options_extention),
+ sizeof(struct st_mysql_options_extension),
MYF(MY_WME | MY_ZEROFILL));
if (mysql->options.extension)
mysql->options.extension->report_progress=
@@ -3918,6 +3941,9 @@ mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg)
}
}
break;
+ case MARIADB_OPT_CATALOG:
+ EXTENSION_SET_STRING(&mysql->options, catalog, arg);
+ break;
case MYSQL_SHARED_MEMORY_BASE_NAME:
default:
DBUG_RETURN(1);
diff --git a/sql/lex.h b/sql/lex.h
index bf0223ce544..9771c0e39ba 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -367,6 +367,7 @@ SYMBOL symbols[] = {
{ "LOOP", SYM(LOOP_SYM)},
{ "LOW_PRIORITY", SYM(LOW_PRIORITY)},
{ "MASTER", SYM(MASTER_SYM)},
+ { "MASTER_CATALOG", SYM(MASTER_CATALOG_SYM)},
{ "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)},
{ "MASTER_DELAY", SYM(MASTER_DELAY_SYM)},
{ "MASTER_GTID_POS", SYM(MASTER_GTID_POS_SYM)},
diff --git a/sql/lex_string.h b/sql/lex_string.h
index e7a732346c4..f49e567c64c 100644
--- a/sql/lex_string.h
+++ b/sql/lex_string.h
@@ -71,7 +71,7 @@ static inline bool lex_string_cmp(CHARSET_INFO *charset, const LEX_CSTRING *a,
}
/*
- Compare to LEX_CSTRING's and return 0 if equal
+ Compare two LEX_CSTRING's and return 0 if equal
*/
static inline bool cmp(const LEX_CSTRING *a, const LEX_CSTRING *b)
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 992054c7d17..004010e9d99 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -2448,6 +2448,26 @@ Gtid_log_event::Gtid_log_event(const uchar *buf, uint event_len,
sa_seq_no= uint8korr(buf);
buf+= 8;
}
+ if (flags_extra & FL_CATALOG) /* <length byte><name> follows */
+ {
+   if (unlikely((size_t)(buf - buf_0) >= event_len)) /* no length byte left */
+   {
+     seq_no= 0;
+     return;
+   }
+   uint32_t cat_len= *buf++;
+   if (unlikely(cat_len > MAX_CATALOG_NAME ||
+                (size_t)(buf - buf_0) + cat_len > event_len)) /* name may end exactly at event end */
+   {
+     seq_no= 0;
+     return;
+   }
+   memcpy(cat_name_buf, buf, cat_len);
+   cat_name_int.str= cat_name_buf;
+   cat_name_int.length= cat_len;
+   cat_name= &cat_name_int; /* points at this event's own buffer */
+   buf+= cat_len;
+ }
}
/*
the strict '<' part of the assert corresponds to extra zero-padded
diff --git a/sql/log_event.h b/sql/log_event.h
index bd318f147d7..2f866efca7c 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3250,13 +3250,16 @@ class Gtid_log_event: public Log_event
public:
uint64 seq_no;
uint64 commit_id;
- uint32 domain_id;
uint64 sa_seq_no; // start alter identifier for CA/RA
+ const LEX_CSTRING *cat_name; // Points either to catalog object or own buffer
#ifdef MYSQL_SERVER
event_xid_t xid;
#else
event_mysql_xid_t xid;
#endif
+ LEX_CSTRING cat_name_int;
+ uint32 domain_id;
+ char cat_name_buf[MAX_CATALOG_NAME];
uchar flags2;
/*
More flags area placed after the regular flags2's area. The type
@@ -3309,11 +3312,15 @@ class Gtid_log_event: public Log_event
FL_EXTRA_MULTI_ENGINE_E1 is set for event group comprising a transaction
involving multiple storage engines. No flag and extra data are added
to the event when the transaction involves only one engine.
+
+ FL_CATALOG is set when a catalog name is included in the GTID (happens
+ when not the default catalog).
*/
static const uchar FL_EXTRA_MULTI_ENGINE_E1= 1;
static const uchar FL_START_ALTER_E1= 2;
static const uchar FL_COMMIT_ALTER_E1= 4;
static const uchar FL_ROLLBACK_ALTER_E1= 8;
+ static const uchar FL_CATALOG= 16;
#ifdef MYSQL_SERVER
Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,
@@ -3346,6 +3353,10 @@ class Gtid_log_event: public Log_event
enum enum_binlog_checksum_alg checksum_alg,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
uchar *flags2, const Format_description_log_event *fdev);
+ static bool peek_catalog(const uchar *event_start, size_t event_len,
+ const Format_description_log_event *fdev,
+ enum enum_binlog_checksum_alg checksum_alg,
+ uchar *out_flags2, LEX_CSTRING *out_catname);
#endif
};
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index ff3a5bda8e5..22ad0a8b4a6 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -2936,6 +2936,13 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
if (extra_engines > 0)
flags_extra|= FL_EXTRA_MULTI_ENGINE_E1;
}
+ const SQL_CATALOG *cat= thd_arg->catalog;
+ if (cat != default_catalog())
+ {
+ flags_extra|= FL_CATALOG;
+ cat_name= &cat->name;
+ }
+
if (thd->get_binlog_flags_for_alter())
{
flags_extra |= thd->get_binlog_flags_for_alter();
@@ -2982,10 +2989,80 @@ Gtid_log_event::peek(const uchar *event_start, size_t event_len,
}
+/*
+ Obtain the catalog (if any) in the GTID (without constructing the full
+ object).
+
+ This is a separate function from Gtid_log_event::peek(), since this function
+ needs to do a lot of parsing of flags etc. to know where the catalog is, and
+ this overhead is not wanted in the often-used Gtid_log_event::peek(). But if
+ more peek-functionality would be needed in the future, it could make sense to
+ add it to this function which already has the parsing overhead.
+
+ Returns true if error (malformed or short event), false if ok. Returns the
+ name of the default catalog if catalog is not included explicitly in the GTID.
+
+ Note that the returned out_catname will point into the passed-in packet
+ memory, so will only be valid as long as the packet memory is!
+*/
+bool
+Gtid_log_event::peek_catalog(const uchar *event_start, size_t event_len,
+ const Format_description_log_event *fdev,
+ enum enum_binlog_checksum_alg checksum_alg,
+ uchar *out_flags2, LEX_CSTRING *out_catname)
+{
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) /* exclude trailing checksum bytes from event_len */
+ {
+ if (event_len > BINLOG_CHECKSUM_LEN)
+ event_len-= BINLOG_CHECKSUM_LEN;
+ else
+ event_len= 0;
+ }
+ else
+ DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
+ /* Need at least the common header plus the fixed-size GTID header. */
+ if (event_len < (uint32)fdev->common_header_len + GTID_HEADER_LEN)
+ return true;
+ const uchar *p= event_start + fdev->common_header_len;
+ const uchar *p_end= event_start + event_len;
+ uchar flags2= *out_flags2= p[12];
+ p+= 13; /* seq_no, domain_id, and flags2. */
+ if (flags2 & FL_GROUP_COMMIT_ID)
+ p+= 8; /* 8 extra bytes are present when FL_GROUP_COMMIT_ID is set */
+ if (flags2 & (FL_PREPARED_XA | FL_COMPLETED_XA))
+ {
+ if (p + 6 > p_end)
+ return true;
+ p+= 6 + p[4] + p[5]; /* 6 fixed bytes; bytes 4 and 5 give the lengths of the variable parts */
+ }
+ uchar flags_extra;
+ if (p >= p_end || !((flags_extra= *p) & FL_CATALOG)) /* no flags_extra byte, or no catalog flag */
+ {
+ *out_catname= default_catalog_name;
+ return false;
+ }
+ ++p;
+ /* Skip the optional fields that are stored before the catalog name. */
+ if (flags_extra & FL_EXTRA_MULTI_ENGINE_E1)
+ ++p;
+ if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1))
+ p+= 8;
+ /* One length byte, then the name; the last name byte must lie before p_end. */
+ uchar cat_len;
+ if (p >= p_end || (p + (cat_len= *p)) >= p_end)
+ return true;
+ out_catname->str= (const char *)p+1; /* points into the caller's packet, not a copy */
+ out_catname->length= cat_len;
+
+ return false;
+}
+
+
bool
Gtid_log_event::write()
{
- uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+4];
+ uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+1+8+MAX_CATALOG_NAME];
size_t write_len= 13;
int8store(buf, seq_no);
@@ -3026,6 +3103,15 @@ Gtid_log_event::write()
write_len+= 8;
}
+ if (flags_extra & FL_CATALOG)
+ {
+ uint32_t cat_len= std::min(cat_name->length, (size_t)(MAX_CATALOG_NAME-1));
+ DBUG_ASSERT(cat_name->length <= MAX_CATALOG_NAME-1);
+ buf[write_len++]= cat_len;
+ memcpy(buf + write_len, cat_name->str, cat_len);
+ write_len+= cat_len;
+ }
+
if (write_len < GTID_HEADER_LEN)
{
bzero(buf+write_len, GTID_HEADER_LEN-write_len);
diff --git a/sql/privilege.h b/sql/privilege.h
index 953ffb177c1..16d7f91f27d 100644
--- a/sql/privilege.h
+++ b/sql/privilege.h
@@ -305,7 +305,12 @@ constexpr privilege_t CATALOG_ACLS=
CATALOG_ACL |
SHUTDOWN_ACL |
CREATE_TABLESPACE_ACL |
- REPL_SLAVE_ACL |
+ /*
+ ToDo: REPL_SLAVE_ACL is needed to be able to replicate from a single
+ catalog to an on-premise slave. However, we may need a way for the catalog
+ superuser to control replication access for a catalog.
+ */
+// REPL_SLAVE_ACL |
BINLOG_ADMIN_ACL |
BINLOG_MONITOR_ACL |
// BINLOG_REPLAY_ACL |
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 3c698f27a19..6a68254052a 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -47,6 +47,7 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg,
{
char *tmp;
host[0] = 0; user[0] = 0; password[0] = 0;
+ catalog_name[0]= 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
ssl_cipher[0]= 0; ssl_key[0]= 0;
ssl_crl[0]= 0; ssl_crlpath[0]= 0;
@@ -644,6 +645,17 @@ file '%s')", fname);
}
seen_ignore_domain_ids= true;
}
+ else if (got_eq && !strcmp(buf, "master_catalog"))
+ {
+   /* Read the value into mi->catalog_name; "" looks like the default. */
+   if (init_strvar_from_file(mi->catalog_name,
+                             sizeof(mi->catalog_name),
+                             &mi->file, ""))
+   {
+     sql_print_error("Failed to initialize master info master_catalog");
+     goto errwithmsg;
+   }
+ }
else if (!got_eq && !strcmp(buf, "END_MARKER"))
{
/*
@@ -817,6 +829,7 @@ int flush_master_info(Master_info* mi,
"using_gtid=%d\n"
"do_domain_ids=%s\n"
"ignore_domain_ids=%s\n"
+ "master_catalog=%s\n"
"END_MARKER\n",
LINES_IN_MASTER_INFO,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
@@ -827,7 +840,7 @@ int flush_master_info(Master_info* mi,
heartbeat_buf, "", ignore_server_ids_buf,
"", 0,
mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid,
- do_domain_ids_buf, ignore_domain_ids_buf);
+ do_domain_ids_buf, ignore_domain_ids_buf, mi->catalog_name);
err= flush_io_cache(file);
if (sync_masterinfo_period && !err &&
++(mi->sync_counter) >= sync_masterinfo_period)
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 6058b7fb34c..98f26a4d050 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -213,6 +213,7 @@ class Master_info : public Slave_reporting_capability
/* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN+6]; /* Room for multi-*/
char host[HOSTNAME_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1];
+ char catalog_name[MAX_CATALOG_NAME];
char user[USERNAME_LENGTH+1];
char password[MAX_PASSWORD_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1];
LEX_CSTRING connection_name; /* User supplied connection name */
diff --git a/sql/slave.cc b/sql/slave.cc
index 28b183fbd69..02f58777732 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -7140,6 +7140,9 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
if (opt_plugin_dir_ptr && *opt_plugin_dir_ptr)
mysql_options(mysql, MYSQL_PLUGIN_DIR, opt_plugin_dir_ptr);
+ if (mi->catalog_name[0])
+ mysql_options(mysql, MARIADB_OPT_CATALOG, mi->catalog_name);
+
/* we disallow empty users */
if (mi->user[0] == 0)
{
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index adcf45aea93..2d540c948a2 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -487,7 +487,7 @@ struct LEX_MASTER_INFO
DYNAMIC_ARRAY repl_ignore_server_ids;
DYNAMIC_ARRAY repl_do_domain_ids;
DYNAMIC_ARRAY repl_ignore_domain_ids;
- const char *host, *user, *password, *log_file_name;
+ const char *host, *catalog, *user, *password, *log_file_name;
const char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher;
const char *ssl_crl, *ssl_crlpath;
const char *relay_log_name;
@@ -533,7 +533,7 @@ struct LEX_MASTER_INFO
delete_dynamic(&repl_ignore_domain_ids);
}
- host= user= password= log_file_name= ssl_key= ssl_cert= ssl_ca=
+ host= catalog= user= password= log_file_name= ssl_key= ssl_cert= ssl_ca=
ssl_capath= ssl_cipher= ssl_crl= ssl_crlpath= relay_log_name= NULL;
pos= relay_log_pos= server_id= port= connect_retry= 0;
heartbeat_period= 0;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index dc27ab9ff8b..c3c6eadc0b3 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -129,6 +129,7 @@ struct binlog_send_info {
slave_connection_state *until_gtid_state;
slave_connection_state until_gtid_state_obj;
Format_description_log_event *fdev;
+ const SQL_CATALOG *catalog_filter;
int mariadb_slave_capability;
enum_gtid_skip_type gtid_skip_group;
enum_gtid_until_state gtid_until_group;
@@ -167,6 +168,7 @@ struct binlog_send_info {
char *lfn)
: thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
+ catalog_filter(NULL),
gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
slave_gtid_strict_mode(false), send_fake_gtid_list(false),
@@ -459,6 +461,17 @@ inline void fix_checksum(enum_binlog_checksum_alg checksum_alg, String *packet,
}
+static const SQL_CATALOG *get_catalog_filter(THD *thd)
+{
+  /* NULL == no filter: catalogs off, or CATALOG_ACL user in default catalog. */
+  if (using_catalogs &&
+      !((thd->security_ctx->master_access & CATALOG_ACL) &&
+        thd->catalog == default_catalog()))
+    return thd->catalog;
+  return nullptr;
+}
+
+
static user_var_entry * get_binlog_checksum_uservar(THD * thd)
{
LEX_CSTRING name= { STRING_WITH_LEN("master_binlog_checksum")};
@@ -1751,6 +1764,26 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
+ if (info->catalog_filter && event_type == GTID_EVENT)
+ {
+ uchar flags2;
+ LEX_CSTRING cat_name;
+ if (ev_offset > len || Gtid_log_event::peek_catalog(
+ (uchar*) packet->ptr()+ev_offset, len - ev_offset,
+ info->fdev, current_checksum_alg, &flags2, &cat_name))
+ {
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed to read Gtid_log_event: corrupt binlog";
+ }
+
+ if (cmp(&info->catalog_filter->name, &cat_name))
+ {
+ /* Skip this event group as it doesn't match the user's catalog. */
+ info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ }
+ }
+
/* Skip GTID event groups until we reach slave position within a domain_id. */
if (event_type == GTID_EVENT && info->using_gtid_state)
{
@@ -2128,6 +2161,7 @@ static int init_binlog_sender(binlog_send_info *info,
/** init last pos */
info->last_pos= *pos;
+ info->catalog_filter= get_catalog_filter(thd);
info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
@@ -3729,6 +3763,9 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
if (get_string_parameter(mi->host, lex_mi->host, sizeof(mi->host)-1,
"MASTER_HOST", system_charset_info) ||
+ get_string_parameter(mi->catalog_name, lex_mi->catalog,
+ sizeof(mi->catalog_name)-1, "MASTER_CATALOG",
+ system_charset_info) ||
get_string_parameter(mi->user, lex_mi->user, sizeof(mi->user)-1,
"MASTER_USER", system_charset_info) ||
get_string_parameter(mi->password, lex_mi->password,
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 98e18d32e0f..8e3c93cbe51 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -923,6 +923,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t *yystacksize);
%token <kwd> LOCKS_SYM
%token <kwd> LOGFILE_SYM
%token <kwd> LOGS_SYM
+%token <kwd> MASTER_CATALOG_SYM
%token <kwd> MASTER_CONNECT_RETRY_SYM
%token <kwd> MASTER_DELAY_SYM
%token <kwd> MASTER_GTID_POS_SYM
@@ -2126,6 +2127,10 @@ master_def:
{
Lex->mi.host = $3.str;
}
+ | MASTER_CATALOG_SYM '=' TEXT_STRING_sys
+ {
+ Lex->mi.catalog = $3.str;
+ }
| MASTER_USER_SYM '=' TEXT_STRING_sys
{
Lex->mi.user = $3.str;
@@ -15988,6 +15993,7 @@ keyword_sp_var_and_label:
| MASTER_HEARTBEAT_PERIOD_SYM
| MASTER_GTID_POS_SYM
| MASTER_HOST_SYM
+ | MASTER_CATALOG_SYM
| MASTER_PORT_SYM
| MASTER_LOG_FILE_SYM
| MASTER_LOG_POS_SYM
--
2.39.2
1
0