[PATCH] Catalogs: replicate single catalog from catalog-enabled master
This implements the basics to do replication from a single catalog on a catalog-enabled master ("the cloud") to a (not necessarily catalog-enabled) slave ("on-premise"). Such slave must only see its own events, eg. events from the catalog of the connecting user. Changes done: - Extend the GTID event with the catalog (a follow-up patch will change replication slave-side to use this when applying events). - In the dump thread, when a catalog user does a binlog dump (eg. connecting slave), filter events so only events from that catalog are sent. - Extend the server version of the connector library to be able to connect to a specific catalog. - Add a Master_catalog option to CHANGE MASTER to make a slave connect to a user in a specific catalog. Open issues: - Possibly there needs to be a way for the catalog superuser to control which catalogs allow the COM_BINLOG_DUMP command. - In GTID replication, how in the single-catalog slave to maintain the GTID position in domains that it does not receive events from. One option is to update the GTID position (eg. with the GTID_LIST events) in the other domains; this way the slave will have the correct global GTID position, but it will not be possible to use multi-source to replicate two catalogs individually to the same server. Another option is to binlog each catalog in their own domain(s) and do master-side filtering on domain_id; then the master will ignore irrelevant domains for a connecting catalog slave (and not report "position too old" in those domains), and the slave will only have its own domains in its GTID position. Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org> --- include/mysql.h | 7 +- include/sql_common.h | 3 +- mysql-test/suite/rpl/r/rpl_catalogs.result | 137 +++++++++++++++++++++ mysql-test/suite/rpl/t/rpl_catalogs.test | 119 ++++++++++++++++++ sql-common/client.c | 36 +++++- sql/lex.h | 1 + sql/lex_string.h | 2 +- sql/log_event.cc | 20 +++ sql/log_event.h | 13 +- sql/log_event_server.cc | 88 ++++++++++++- sql/privilege.h | 7 +- sql/rpl_mi.cc | 15 ++- sql/rpl_mi.h | 1 + sql/slave.cc | 3 + sql/sql_lex.h | 4 +- sql/sql_repl.cc | 37 ++++++ sql/sql_yacc.yy | 6 + 17 files changed, 483 insertions(+), 16 deletions(-) create mode 100644 mysql-test/suite/rpl/r/rpl_catalogs.result create mode 100644 mysql-test/suite/rpl/t/rpl_catalogs.test diff --git a/include/mysql.h b/include/mysql.h index a66dcc7bd02..44ca2099694 100644 --- a/include/mysql.h +++ b/include/mysql.h @@ -189,14 +189,15 @@ enum mysql_option /* MariaDB options */ MYSQL_PROGRESS_CALLBACK=5999, MYSQL_OPT_NONBLOCK, - MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY + MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, + MARIADB_OPT_CATALOG=7029 }; /** @todo remove the "extension", move st_mysql_options completely out of mysql.h */ -struct st_mysql_options_extention; +struct st_mysql_options_extension; struct st_mysql_options { unsigned int connect_timeout, read_timeout, write_timeout; @@ -231,7 +232,7 @@ struct st_mysql_options { void (*local_infile_end)(void *); int (*local_infile_error)(void *, char *, unsigned int); void *local_infile_userdata; - struct st_mysql_options_extention *extension; + struct st_mysql_options_extension *extension; }; enum mysql_status diff --git a/include/sql_common.h b/include/sql_common.h index ad5ab7e19af..c261ee9fe94 100644 --- a/include/sql_common.h +++ b/include/sql_common.h @@ -28,7 +28,7 @@ extern const char *cant_connect_sqlstate; extern const char *not_error_sqlstate; -struct st_mysql_options_extention { +struct st_mysql_options_extension { char *plugin_dir; char *default_auth; char *ssl_crl; /* PEM CRL file */ @@ -41,6 +41,7 @@ struct st_mysql_options_extention { uint proc_info_length); HASH connection_attributes; size_t connection_attributes_length; + char *catalog; }; typedef struct st_mysql_methods diff --git a/mysql-test/suite/rpl/r/rpl_catalogs.result b/mysql-test/suite/rpl/r/rpl_catalogs.result new file mode 100644 index 00000000000..4b25fd20e6c --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_catalogs.result @@ -0,0 +1,137 @@ +include/master-slave.inc +[connection master] +*** Create some catalogs to work with. +connection master_config; +SET SESSION sql_log_bin= 0; +CREATE CATALOG foo; +CREATE CATALOG bar; +CREATE CATALOG baz; +SET SESSION sql_log_bin= 1; +connection slave_config; +SET SESSION sql_log_bin= 0; +CREATE CATALOG foo; +CREATE CATALOG bar; +CREATE CATALOG baz; +SET SESSION sql_log_bin= 1; +*** Replicate a few events full-catalogs -> full-catalogs +connection master_config; +USE CATALOG foo; +CREATE DATABASE db1; +use db1; +CREATE TABLE t1 (a INT PRIMARY KEY); +INSERT INTO t1 VALUES (1); +USE CATALOG baz; +CREATE DATABASE db3; +use db3; +CREATE TABLE t3 (a INT PRIMARY KEY); +INSERT INTO t3 VALUES (1); +sync_slave_with_master slave; +connection slave_config; +USE CATALOG foo; +use db1; +SELECT * FROM t1 ORDER BY a; +a +1 +USE CATALOG baz; +SELECT * FROM db3.t3 ORDER BY a; +a +1 +*** Create a normal slave user to replicate single catalog +connection slave_config; +include/stop_slave.inc +CHANGE MASTER TO master_user='rpl_bar', master_password='bar_pw', +master_catalog="bar"; +connection master_config; +USE CATALOG bar; +CREATE USER rpl_bar@localhost; +GRANT replication slave, select ON *.* TO rpl_bar@localhost IDENTIFIED BY "bar_pw"; +CREATE DATABASE db2; +use db2; +CREATE TABLE t2 (a INT PRIMARY KEY, b INT); +INSERT INTO t2(a) VALUES (1), (2), (3), (4), (5); +INSERT INTO t2 VALUES (6, 0), (7, 1), (8, 2); +USE CATALOG foo; +INSERT INTO db1.t1 VALUES (10), (20), (30); +USE CATALOG baz; +INSERT INTO db3.t3 VALUES (10), (20), (30); +USE CATALOG bar; +use db2; +UPDATE t2 SET b=a*a WHERE b IS NULL; +USE CATALOG bar; +SELECT * FROM db2.t2 ORDER BY a; +a b +1 1 +2 4 +3 9 +4 16 +5 25 +6 0 +7 1 +8 2 +USE CATALOG foo; +SELECT * FROM db1.t1 ORDER BY a; +a +1 +10 +20 +30 +USE CATALOG baz; +SELECT * FROM db3.t3 ORDER BY a; +a +1 +10 +20 +30 +USE CATALOG def; +use test; +connect con1,localhost,rpl_bar,bar_pw,bar.db2; +SELECT * FROM t2 ORDER BY a; +a b +1 1 +2 4 +3 9 +4 16 +5 25 +6 0 +7 1 +8 2 +disconnect con1; +connection master; +save_master_pos; +connection slave_config; +include/start_slave.inc +connection slave; +sync_with_master; +connection slave_config; +USE CATALOG bar; +SELECT * FROM db2.t2 ORDER BY a; +a b +1 1 +2 4 +3 9 +4 16 +5 25 +6 0 +7 1 +8 2 +USE CATALOG foo; +SELECT * FROM db1.t1 ORDER BY a; +a +1 +USE CATALOG baz; +SELECT * FROM db3.t3 ORDER BY a; +a +1 +*** Clean up. +connection slave_config; +include/stop_slave.inc +CHANGE MASTER TO master_user='root', master_password='', master_catalog=''; +include/start_slave.inc +connection master_config; +USE CATALOG def; +use test; +DROP CATALOG foo; +DROP CATALOG bar; +DROP CATALOG baz; +connection master; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_catalogs.test b/mysql-test/suite/rpl/t/rpl_catalogs.test new file mode 100644 index 00000000000..939e68b920d --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_catalogs.test @@ -0,0 +1,119 @@ +--source include/have_catalogs.inc +--source include/master-slave.inc + +--echo *** Create some catalogs to work with. +--connection master_config +# ToDo: It seems CREATE CATALOG cannot replicate currently. +# So don't binlog it for now. +SET SESSION sql_log_bin= 0; +CREATE CATALOG foo; +CREATE CATALOG bar; +CREATE CATALOG baz; +SET SESSION sql_log_bin= 1; + +--connection slave_config +SET SESSION sql_log_bin= 0; +CREATE CATALOG foo; +CREATE CATALOG bar; +CREATE CATALOG baz; +SET SESSION sql_log_bin= 1; + + +--echo *** Replicate a few events full-catalogs -> full-catalogs + +--connection master_config +USE CATALOG foo; +CREATE DATABASE db1; +use db1; +CREATE TABLE t1 (a INT PRIMARY KEY); +INSERT INTO t1 VALUES (1); + +USE CATALOG baz; +CREATE DATABASE db3; +use db3; +CREATE TABLE t3 (a INT PRIMARY KEY); +INSERT INTO t3 VALUES (1); + +--sync_slave_with_master slave + +--connection slave_config +USE CATALOG foo; +use db1; +SELECT * FROM t1 ORDER BY a; +USE CATALOG baz; +SELECT * FROM db3.t3 ORDER BY a; + + +--echo *** Create a normal slave user to replicate single catalog +--connection slave_config +--source include/stop_slave.inc +CHANGE MASTER TO master_user='rpl_bar', master_password='bar_pw', + master_catalog="bar"; + +--connection master_config +USE CATALOG bar; +CREATE USER rpl_bar@localhost; +GRANT replication slave, select ON *.* TO rpl_bar@localhost IDENTIFIED BY "bar_pw"; + +CREATE DATABASE db2; +use db2; +CREATE TABLE t2 (a INT PRIMARY KEY, b INT); +INSERT INTO t2(a) VALUES (1), (2), (3), (4), (5); +INSERT INTO t2 VALUES (6, 0), (7, 1), (8, 2); + +# Do something in other catalogs, see that it is not replicated. +USE CATALOG foo; +INSERT INTO db1.t1 VALUES (10), (20), (30); +USE CATALOG baz; +INSERT INTO db3.t3 VALUES (10), (20), (30); + +USE CATALOG bar; +use db2; +UPDATE t2 SET b=a*a WHERE b IS NULL; + +USE CATALOG bar; +SELECT * FROM db2.t2 ORDER BY a; +USE CATALOG foo; +SELECT * FROM db1.t1 ORDER BY a; +USE CATALOG baz; +SELECT * FROM db3.t3 ORDER BY a; + +USE CATALOG def; +use test; + +connect(con1,localhost,rpl_bar,bar_pw,bar.db2); +SELECT * FROM t2 ORDER BY a; +--disconnect con1 + +--connection master +--save_master_pos + +--connection slave_config +--source include/start_slave.inc +--connection slave +--sync_with_master + +--connection slave_config +USE CATALOG bar; +SELECT * FROM db2.t2 ORDER BY a; +USE CATALOG foo; +SELECT * FROM db1.t1 ORDER BY a; +USE CATALOG baz; +SELECT * FROM db3.t3 ORDER BY a; + +--echo *** Clean up. + +--connection slave_config +--source include/stop_slave.inc +CHANGE MASTER TO master_user='root', master_password='', master_catalog=''; +--source include/start_slave.inc + +--connection master_config +USE CATALOG def; +use test; +DROP CATALOG foo; +DROP CATALOG bar; +DROP CATALOG baz; + +--connection master +--source include/rpl_end.inc diff --git a/sql-common/client.c b/sql-common/client.c index ba804ce2a7c..711f8733c9f 100644 --- a/sql-common/client.c +++ b/sql-common/client.c @@ -99,6 +99,7 @@ extern my_bool using_catalogs; #define CONNECT_TIMEOUT 0 +#define MAX_CATALOG_NAME 65 /* Including terminating '\0' */ #include "client_settings.h" #include <ssl_compat.h> @@ -836,9 +837,9 @@ static int add_init_command(struct st_mysql_options *options, const char *cmd) #define ALLOCATE_EXTENSIONS(OPTS) \ - (OPTS)->extension= (struct st_mysql_options_extention *) \ + (OPTS)->extension= (struct st_mysql_options_extension *) \ my_malloc(key_memory_mysql_options, \ - sizeof(struct st_mysql_options_extention), \ + sizeof(struct st_mysql_options_extension), \ MYF(MY_WME | MY_ZEROFILL)) \ @@ -2089,7 +2090,8 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio, see end= buff+32 below, fixed size of the packet is 32 bytes. +9 because data is a length encoded binary where meta data size is max 9. */ - buff_size= 33 + USERNAME_LENGTH + data_len + 9 + NAME_LEN + NAME_LEN + connect_attrs_len + 9; + buff_size= 33 + USERNAME_LENGTH + data_len + 9 + NAME_LEN + NAME_LEN + + connect_attrs_len + 9 + MAX_CATALOG_NAME; buff= my_alloca(buff_size); mysql->client_flag|= mysql->options.client_flag; @@ -2122,10 +2124,19 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio, if (mysql->client_flag & CLIENT_PROTOCOL_41) { /* 4.1 server and 4.1 client has a 32 byte option flag */ + if (!(mysql->server_capabilities & CLIENT_MYSQL)) + mysql->client_flag&= ~CLIENT_MYSQL; int4store(buff,mysql->client_flag); int4store(buff+4, net->max_packet_size); buff[8]= (char) mysql->charset->number; bzero(buff+9, 32-9); + if (!(mysql->server_capabilities & CLIENT_MYSQL)) + { + /* ToDo: Should this check if the server has the catalog capability? */ + uint client_extended_cap= mysql->options.extension->catalog ? + (uint)((MARIADB_CLIENT_CONNECT_CATALOG) >> 32) : (uint)0; + int4store(buff + 28, client_extended_cap); + } end= buff+32; } else @@ -2274,6 +2285,17 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio, end= (char *) send_client_connect_attrs(mysql, (uchar *) end); + /* Add catalog */ + if (mysql->options.extension->catalog) + { + size_t len= strlen(mysql->options.extension->catalog); + if (len >= MAX_CATALOG_NAME) + len= MAX_CATALOG_NAME - 1; + end= (char*)write_length_encoded_string4( + (uchar*)end, buff_size - (end - buff), + (uchar *)mysql->options.extension->catalog, len); + } + /* Write authentication package */ if (my_net_write(net, (uchar*) buff, (size_t) (end-buff)) || net_flush(net)) { @@ -3332,6 +3354,7 @@ static void mysql_close_free_options(MYSQL *mysql) my_free(mysql->options.extension->plugin_dir); my_free(mysql->options.extension->default_auth); my_hash_free(&mysql->options.extension->connection_attributes); + my_free(mysql->options.extension->catalog); my_free(mysql->options.extension); } bzero((char*) &mysql->options,sizeof(mysql->options)); @@ -3850,9 +3873,9 @@ mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg) break; case MYSQL_PROGRESS_CALLBACK: if (!mysql->options.extension) - mysql->options.extension= (struct st_mysql_options_extention *) + mysql->options.extension= (struct st_mysql_options_extension *) my_malloc(key_memory_mysql_options, - sizeof(struct st_mysql_options_extention), + sizeof(struct st_mysql_options_extension), MYF(MY_WME | MY_ZEROFILL)); if (mysql->options.extension) mysql->options.extension->report_progress= @@ -3918,6 +3941,9 @@ mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg) } } break; + case MARIADB_OPT_CATALOG: + EXTENSION_SET_STRING(&mysql->options, catalog, arg); + break; case MYSQL_SHARED_MEMORY_BASE_NAME: default: DBUG_RETURN(1); diff --git a/sql/lex.h b/sql/lex.h index bf0223ce544..9771c0e39ba 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -367,6 +367,7 @@ SYMBOL symbols[] = { { "LOOP", SYM(LOOP_SYM)}, { "LOW_PRIORITY", SYM(LOW_PRIORITY)}, { "MASTER", SYM(MASTER_SYM)}, + { "MASTER_CATALOG", SYM(MASTER_CATALOG_SYM)}, { "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)}, { "MASTER_DELAY", SYM(MASTER_DELAY_SYM)}, { "MASTER_GTID_POS", SYM(MASTER_GTID_POS_SYM)}, diff --git a/sql/lex_string.h b/sql/lex_string.h index e7a732346c4..f49e567c64c 100644 --- a/sql/lex_string.h +++ b/sql/lex_string.h @@ -71,7 +71,7 @@ static inline bool lex_string_cmp(CHARSET_INFO *charset, const LEX_CSTRING *a, } /* - Compare to LEX_CSTRING's and return 0 if equal + Compare two LEX_CSTRING's and return 0 if equal */ static inline bool cmp(const LEX_CSTRING *a, const LEX_CSTRING *b) diff --git a/sql/log_event.cc b/sql/log_event.cc index 992054c7d17..004010e9d99 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -2448,6 +2448,26 @@ Gtid_log_event::Gtid_log_event(const uchar *buf, uint event_len, sa_seq_no= uint8korr(buf); buf+= 8; } + if (flags_extra & FL_CATALOG) + { + if (unlikely(buf - buf_0) >= event_len) + { + seq_no= 0; + return; + } + uint32_t cat_len= *buf++; + if (unlikely(cat_len > MAX_CATALOG_NAME) || + unlikely(buf - buf_0 + cat_len) >= event_len) + { + seq_no= 0; + return; + } + memcpy(cat_name_buf, buf, cat_len); + cat_name_int.str= cat_name_buf; + cat_name_int.length= cat_len; + cat_name= &cat_name_int; + buf+= cat_len; + } } /* the strict '<' part of the assert corresponds to extra zero-padded diff --git a/sql/log_event.h b/sql/log_event.h index bd318f147d7..2f866efca7c 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -3250,13 +3250,16 @@ class Gtid_log_event: public Log_event public: uint64 seq_no; uint64 commit_id; - uint32 domain_id; uint64 sa_seq_no; // start alter identifier for CA/RA + const LEX_CSTRING *cat_name; // Points either to catalog object or own buffer #ifdef MYSQL_SERVER event_xid_t xid; #else event_mysql_xid_t xid; #endif + LEX_CSTRING cat_name_int; + uint32 domain_id; + char cat_name_buf[MAX_CATALOG_NAME]; uchar flags2; /* More flags area placed after the regular flags2's area. The type @@ -3309,11 +3312,15 @@ class Gtid_log_event: public Log_event FL_EXTRA_MULTI_ENGINE_E1 is set for event group comprising a transaction involving multiple storage engines. No flag and extra data are added to the event when the transaction involves only one engine. + + FL_CATALOG is set when a catalog name is included in the GTID (happens + when not the default catalog). */ static const uchar FL_EXTRA_MULTI_ENGINE_E1= 1; static const uchar FL_START_ALTER_E1= 2; static const uchar FL_COMMIT_ALTER_E1= 4; static const uchar FL_ROLLBACK_ALTER_E1= 8; + static const uchar FL_CATALOG= 16; #ifdef MYSQL_SERVER Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone, @@ -3346,6 +3353,10 @@ class Gtid_log_event: public Log_event enum enum_binlog_checksum_alg checksum_alg, uint32 *domain_id, uint32 *server_id, uint64 *seq_no, uchar *flags2, const Format_description_log_event *fdev); + static bool peek_catalog(const uchar *event_start, size_t event_len, + const Format_description_log_event *fdev, + enum enum_binlog_checksum_alg checksum_alg, + uchar *out_flags2, LEX_CSTRING *out_catname); #endif }; diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index ff3a5bda8e5..22ad0a8b4a6 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -2936,6 +2936,13 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, if (extra_engines > 0) flags_extra|= FL_EXTRA_MULTI_ENGINE_E1; } + const SQL_CATALOG *cat= thd_arg->catalog; + if (cat != default_catalog()) + { + flags_extra|= FL_CATALOG; + cat_name= &cat->name; + } + if (thd->get_binlog_flags_for_alter()) { flags_extra |= thd->get_binlog_flags_for_alter(); @@ -2982,10 +2989,80 @@ Gtid_log_event::peek(const uchar *event_start, size_t event_len, } +/* + Obtain the catalog (if any) in the GTID (without constructing the full + object). + + This is a separate function from Gtid_log_event::peek(), since this function + needs to do a lot of parsing of flags etc. to know where the catalog is, and + this overhead is not wanted in the often-used Gtid_log_event::peek(). But if + more peek-functionality would be needed in the future, it could make sense to + add it to this function which already has the parsing overhead. + + Returns true if error (malformed or short event), false if ok. Returns the + name of the default catalog if catalog is not included explicitly in the GTID. + + Note that the returned out_catname will point into the passed-in packet + memory, so will only be valid as long as the packet memory is! +*/ +bool +Gtid_log_event::peek_catalog(const uchar *event_start, size_t event_len, + const Format_description_log_event *fdev, + enum enum_binlog_checksum_alg checksum_alg, + uchar *out_flags2, LEX_CSTRING *out_catname) +{ + if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) + { + if (event_len > BINLOG_CHECKSUM_LEN) + event_len-= BINLOG_CHECKSUM_LEN; + else + event_len= 0; + } + else + DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + checksum_alg == BINLOG_CHECKSUM_ALG_OFF); + + if (event_len < (uint32)fdev->common_header_len + GTID_HEADER_LEN) + return true; + const uchar *p= event_start + fdev->common_header_len; + const uchar *p_end= event_start + event_len; + uchar flags2= *out_flags2= p[12]; + p+= 13; /* seq_no, domain_id, and flags2. */ + if (flags2 & FL_GROUP_COMMIT_ID) + p+= 8; + if (flags2 & (FL_PREPARED_XA | FL_COMPLETED_XA)) + { + if (p + 6 > p_end) + return true; + p+= 6 + p[4] + p[5]; + } + uchar flags_extra; + if (p >= p_end || !((flags_extra= *p) & FL_CATALOG)) + { + *out_catname= default_catalog_name; + return false; + } + ++p; + + if (flags_extra & FL_EXTRA_MULTI_ENGINE_E1) + ++p; + if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1)) + p+= 8; + + uchar cat_len; + if (p >= p_end || (p + (cat_len= *p)) >= p_end) + return true; + out_catname->str= (const char *)p+1; + out_catname->length= cat_len; + + return false; +} + + bool Gtid_log_event::write() { - uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+4]; + uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+1+8+MAX_CATALOG_NAME]; size_t write_len= 13; int8store(buf, seq_no); @@ -3026,6 +3103,15 @@ Gtid_log_event::write() write_len+= 8; } + if (flags_extra & FL_CATALOG) + { + uint32_t cat_len= std::min(cat_name->length, (size_t)(MAX_CATALOG_NAME-1)); + DBUG_ASSERT(cat_name->length <= MAX_CATALOG_NAME-1); + buf[write_len++]= cat_len; + memcpy(buf + write_len, cat_name->str, cat_len); + write_len+= cat_len; + } + if (write_len < GTID_HEADER_LEN) { bzero(buf+write_len, GTID_HEADER_LEN-write_len); diff --git a/sql/privilege.h b/sql/privilege.h index 953ffb177c1..16d7f91f27d 100644 --- a/sql/privilege.h +++ b/sql/privilege.h @@ -305,7 +305,12 @@ constexpr privilege_t CATALOG_ACLS= CATALOG_ACL | SHUTDOWN_ACL | CREATE_TABLESPACE_ACL | - REPL_SLAVE_ACL | + /* + ToDo: REPL_SLAVE_ACL is needed to be able to replicate from a single + catalog to an on-premise slave. However, we may need a way for the catalog + superuser to control replication access for a catalog. + */ +// REPL_SLAVE_ACL | BINLOG_ADMIN_ACL | BINLOG_MONITOR_ACL | // BINLOG_REPLAY_ACL | diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 3c698f27a19..6a68254052a 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -47,6 +47,7 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg, { char *tmp; host[0] = 0; user[0] = 0; password[0] = 0; + catalog_name[0]= 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; ssl_cipher[0]= 0; ssl_key[0]= 0; ssl_crl[0]= 0; ssl_crlpath[0]= 0; @@ -644,6 +645,17 @@ file '%s')", fname); } seen_ignore_domain_ids= true; } + else if (got_eq && !strcmp(buf, "master_catalog")) + { + if (init_strvar_from_file(mi->catalog_name, + sizeof(mi->catalog_name), + &mi->file, "")) + + { + sql_print_error("Failed to initialize master info do_domain_ids"); + goto errwithmsg; + } + } else if (!got_eq && !strcmp(buf, "END_MARKER")) { /* @@ -817,6 +829,7 @@ int flush_master_info(Master_info* mi, "using_gtid=%d\n" "do_domain_ids=%s\n" "ignore_domain_ids=%s\n" + "master_catalog=%s\n" "END_MARKER\n", LINES_IN_MASTER_INFO, mi->master_log_name, llstr(mi->master_log_pos, lbuf), @@ -827,7 +840,7 @@ int flush_master_info(Master_info* mi, heartbeat_buf, "", ignore_server_ids_buf, "", 0, mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid, - do_domain_ids_buf, ignore_domain_ids_buf); + do_domain_ids_buf, ignore_domain_ids_buf, mi->catalog_name); err= flush_io_cache(file); if (sync_masterinfo_period && !err && ++(mi->sync_counter) >= sync_masterinfo_period) diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index 6058b7fb34c..98f26a4d050 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -213,6 +213,7 @@ class Master_info : public Slave_reporting_capability /* the variables below are needed because we can change masters on the fly */ char master_log_name[FN_REFLEN+6]; /* Room for multi-*/ char host[HOSTNAME_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1]; + char catalog_name[MAX_CATALOG_NAME]; char user[USERNAME_LENGTH+1]; char password[MAX_PASSWORD_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1]; LEX_CSTRING connection_name; /* User supplied connection name */ diff --git a/sql/slave.cc b/sql/slave.cc index 28b183fbd69..02f58777732 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -7140,6 +7140,9 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, if (opt_plugin_dir_ptr && *opt_plugin_dir_ptr) mysql_options(mysql, MYSQL_PLUGIN_DIR, opt_plugin_dir_ptr); + if (mi->catalog_name[0]) + mysql_options(mysql, MARIADB_OPT_CATALOG, mi->catalog_name); + /* we disallow empty users */ if (mi->user[0] == 0) { diff --git a/sql/sql_lex.h b/sql/sql_lex.h index adcf45aea93..2d540c948a2 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -487,7 +487,7 @@ struct LEX_MASTER_INFO DYNAMIC_ARRAY repl_ignore_server_ids; DYNAMIC_ARRAY repl_do_domain_ids; DYNAMIC_ARRAY repl_ignore_domain_ids; - const char *host, *user, *password, *log_file_name; + const char *host, *catalog, *user, *password, *log_file_name; const char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher; const char *ssl_crl, *ssl_crlpath; const char *relay_log_name; @@ -533,7 +533,7 @@ struct LEX_MASTER_INFO delete_dynamic(&repl_ignore_domain_ids); } - host= user= password= log_file_name= ssl_key= ssl_cert= ssl_ca= + host= catalog= user= password= log_file_name= ssl_key= ssl_cert= ssl_ca= ssl_capath= ssl_cipher= ssl_crl= ssl_crlpath= relay_log_name= NULL; pos= relay_log_pos= server_id= port= connect_retry= 0; heartbeat_period= 0; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index dc27ab9ff8b..c3c6eadc0b3 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -129,6 +129,7 @@ struct binlog_send_info { slave_connection_state *until_gtid_state; slave_connection_state until_gtid_state_obj; Format_description_log_event *fdev; + const SQL_CATALOG *catalog_filter; int mariadb_slave_capability; enum_gtid_skip_type gtid_skip_group; enum_gtid_until_state gtid_until_group; @@ -167,6 +168,7 @@ struct binlog_send_info { char *lfn) : thd(thd_arg), net(&thd_arg->net), packet(packet_arg), log_file_name(lfn), until_gtid_state(NULL), fdev(NULL), + catalog_filter(NULL), gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE), flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), slave_gtid_strict_mode(false), send_fake_gtid_list(false), @@ -459,6 +461,17 @@ inline void fix_checksum(enum_binlog_checksum_alg checksum_alg, String *packet, } +static const SQL_CATALOG *get_catalog_filter(THD *thd) +{ + if (!using_catalogs) + return nullptr; + if ((thd->security_ctx->master_access & CATALOG_ACL) && + thd->catalog == default_catalog()) + return nullptr; + return thd->catalog; +} + + static user_var_entry * get_binlog_checksum_uservar(THD * thd) { LEX_CSTRING name= { STRING_WITH_LEN("master_binlog_checksum")}; @@ -1751,6 +1764,26 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, } } + if (info->catalog_filter && event_type == GTID_EVENT) + { + uchar flags2; + LEX_CSTRING cat_name; + if (ev_offset > len || Gtid_log_event::peek_catalog( + (uchar*) packet->ptr()+ev_offset, len - ev_offset, + info->fdev, current_checksum_alg, &flags2, &cat_name)) + { + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return "Failed to read Gtid_log_event: corrupt binlog"; + } + + if (cmp(&info->catalog_filter->name, &cat_name)) + { + /* Skip this event group as it doesn't match the user's catalog. */ + info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + } + } + /* Skip GTID event groups until we reach slave position within a domain_id. */ if (event_type == GTID_EVENT && info->using_gtid_state) { @@ -2128,6 +2161,7 @@ static int init_binlog_sender(binlog_send_info *info, /** init last pos */ info->last_pos= *pos; + info->catalog_filter= get_catalog_filter(thd); info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd); info->mariadb_slave_capability= get_mariadb_slave_capability(thd); info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); @@ -3729,6 +3763,9 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) if (get_string_parameter(mi->host, lex_mi->host, sizeof(mi->host)-1, "MASTER_HOST", system_charset_info) || + get_string_parameter(mi->catalog_name, lex_mi->catalog, + sizeof(mi->catalog_name)-1, "MASTER_CATALOG", + system_charset_info) || get_string_parameter(mi->user, lex_mi->user, sizeof(mi->user)-1, "MASTER_USER", system_charset_info) || get_string_parameter(mi->password, lex_mi->password, diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 98e18d32e0f..8e3c93cbe51 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -923,6 +923,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t *yystacksize); %token <kwd> LOCKS_SYM %token <kwd> LOGFILE_SYM %token <kwd> LOGS_SYM +%token <kwd> MASTER_CATALOG_SYM %token <kwd> MASTER_CONNECT_RETRY_SYM %token <kwd> MASTER_DELAY_SYM %token <kwd> MASTER_GTID_POS_SYM @@ -2126,6 +2127,10 @@ master_def: { Lex->mi.host = $3.str; } + | MASTER_CATALOG_SYM '=' TEXT_STRING_sys + { + Lex->mi.catalog = $3.str; + } | MASTER_USER_SYM '=' TEXT_STRING_sys { Lex->mi.user = $3.str; @@ -15988,6 +15993,7 @@ keyword_sp_var_and_label: | MASTER_HEARTBEAT_PERIOD_SYM | MASTER_GTID_POS_SYM | MASTER_HOST_SYM + | MASTER_CATALOG_SYM | MASTER_PORT_SYM | MASTER_LOG_FILE_SYM | MASTER_LOG_POS_SYM -- 2.39.2
participants (1)
-
Kristian Nielsen