Re: [Maria-developers] [Commits] Rev 4391: MDEV-6676: Speculative parallel replication. in http://bazaar.launchpad.net/~maria-captains/maria/10.0
How about also adding a mode 'auto', which currently can be the same as 'transactional', but in the future could be extended to flip the actual mode back and forth based on statistics?

/Jonas

On Tue, Oct 7, 2014 at 4:20 PM, <knielsen@knielsen-hq.org> wrote:
At http://bazaar.launchpad.net/~maria-captains/maria/10.0
------------------------------------------------------------
revno: 4391
revision-id: knielsen@knielsen-hq.org-20141007142037-y85xq099ltdiqno6
parent: knielsen@knielsen-hq.org-20141003094323-nosx0y7vv0kjv20e
committer: Kristian Nielsen <knielsen@knielsen-hq.org>
branch nick: work-10.0-mdev6676
timestamp: Tue 2014-10-07 16:20:37 +0200
message:
MDEV-6676: Speculative parallel replication.
Better syntax for configuring the parallel mode. Now it is a CHANGE MASTER option instead of a system variable, which makes it possible to configure differently for different multi-source replication masters.
Now also the domain-based replication mode can be disabled.

=== modified file 'mysql-test/include/check-testcase.test'
--- a/mysql-test/include/check-testcase.test 2013-12-16 12:02:21 +0000
+++ b/mysql-test/include/check-testcase.test 2014-10-07 14:20:37 +0000
@@ -64,6 +64,7 @@ if ($tmp)
   --echo Master_SSL_Crlpath #
   --echo Using_Gtid No
   --echo Gtid_IO_Pos #
+  --echo Parallel_Mode domain,groupcommit
 }
 if (!$tmp) {
   # Note: after WL#5177, fields 13-18 shall not be filtered-out.
=== modified file 'mysql-test/r/mysqld--help.result' --- a/mysql-test/r/mysqld--help.result 2014-09-19 13:25:37 +0000 +++ b/mysql-test/r/mysqld--help.result 2014-10-07 14:20:37 +0000 @@ -915,17 +915,6 @@ parallel replication thread when reading ahead in the relay log looking for opportunities for parallel replication. Only used when --slave-parallel-threads > 0. - --slave-parallel-mode=name - Controls what transactions are applied in parallel when - using --slave-parallel-threads. Syntax: - slave_paralle_mode=value[,value...], where "value" could - be one or more of: "domain", to apply different - replication domains in parallel; "groupcommit", to apply - in parallel transactions that group-committed together on - the master; "transactional", to optimistically try to - apply all transactional DML in parallel; and "waiting" to - extend "transactional" to even transactions that had to - wait on the master. --slave-parallel-threads=# If non-zero, number of threads to spawn to apply in parallel events on the slave that were group-committed on @@ -1320,7 +1309,6 @@ slave-exec-mode STRICT slave-max-allowed-packet 1073741824 slave-net-timeout 3600 slave-parallel-max-queued 131072 -slave-parallel-mode domain,groupcommit slave-parallel-threads 0 slave-skip-errors (No default value) slave-sql-verify-checksum TRUE
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel.result' --- a/mysql-test/suite/rpl/r/rpl_parallel.result 2014-08-15 09:31:13 +0000 +++ b/mysql-test/suite/rpl/r/rpl_parallel.result 2014-10-07 14:20:37 +0000 @@ -1,5 +1,5 @@ -include/rpl_init.inc [topology=1->2] -SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +include/master-slave.inc +[connection master] SET GLOBAL slave_parallel_threads=10; ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first include/stop_slave.inc @@ -923,8 +923,79 @@ SELECT * FROM t2 WHERE a >= 30 ORDER BY 32 33 34 +*** MDEV-6676 - test syntax of CHANGE MASTER TO PARALLEL_MODE=xxx *** +Parallel_Mode = 'domain,groupcommit' include/stop_slave.inc -SET GLOBAL slave_parallel_threads=@old_parallel_threads; +CHANGE MASTER TO parallel_mode=(,domain,groupcommit); +ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near 'domain,groupcommit)' at line 1 +CHANGE MASTER TO parallel_mode=(domain,groupcommit,transactional); +ERROR HY000: Invalid use of 'transactional' option for PARALLEL_MODE in CHANGE MASTER TO +CHANGE MASTER TO parallel_mode=(waiting,domain,transactional,waiting); +ERROR HY000: Invalid use of 'waiting' option for PARALLEL_MODE in CHANGE MASTER TO +CHANGE MASTER TO parallel_mode=(domain,domain); +ERROR HY000: Invalid use of 'domain' option for PARALLEL_MODE in CHANGE MASTER TO +CHANGE MASTER TO parallel_mode=(waiting,transactional,domain); +Parallel_Mode = 'domain,transactional,waiting' +CHANGE MASTER TO parallel_mode=(domain,groupcommit); +Parallel_Mode = 'domain,groupcommit' +*** MDEV-6676 - test that empty parallel_mode does not replicate in parallel *** +INSERT INTO t2 VALUES (40); +include/save_master_gtid.inc +CHANGE MASTER TO parallel_mode=(); +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,slave_crash_if_parallel_apply"; +include/start_slave.inc 
+include/sync_with_master_gtid.inc +SELECT * FROM t2 WHERE a >= 40 ORDER BY a; +a +40 +include/stop_slave.inc +SET GLOBAL debug_dbug=@old_dbug; +*** MDEV-6676 - test disabling domain-based parallel replication *** +SET gtid_domain_id = 1; +INSERT INTO t2 VALUES (41); +INSERT INTO t2 VALUES (42); +INSERT INTO t2 VALUES (43); +INSERT INTO t2 VALUES (44); +INSERT INTO t2 VALUES (45); +INSERT INTO t2 VALUES (46); +DELETE FROM t2 WHERE a >= 41; +SET gtid_domain_id = 2; +INSERT INTO t2 VALUES (41); +INSERT INTO t2 VALUES (42); +INSERT INTO t2 VALUES (43); +INSERT INTO t2 VALUES (44); +INSERT INTO t2 VALUES (45); +INSERT INTO t2 VALUES (46); +SET gtid_domain_id = 0; +include/save_master_gtid.inc +CHANGE MASTER TO parallel_mode=(groupcommit); +include/start_slave.inc +include/sync_with_master_gtid.inc +SELECT * FROM t2 WHERE a >= 40 ORDER BY a; +a +40 +41 +42 +43 +44 +45 +46 +include/stop_slave.inc +CHANGE MASTER TO parallel_mode=(domain,groupcommit); +include/start_slave.inc +*** MDEV-6676 - test that parallel mode is saved correctly in master.info across server restart *** +Parallel_Mode = 'domain,groupcommit' +include/stop_slave.inc +CHANGE MASTER TO parallel_mode=(transactional,waiting); +Parallel_Mode = 'transactional,waiting' +include/start_slave.inc +include/rpl_restart_server.inc [server_number=2] +Parallel_Mode = 'transactional,waiting' +CHANGE MASTER TO parallel_mode=(domain,groupcommit); +include/start_slave.inc +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=0; include/start_slave.inc SET DEBUG_SYNC= 'RESET'; DROP function foo;
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_optimistic.result' --- a/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result 2014-09-24 19:25:10 +0000 +++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result 2014-10-07 14:20:37 +0000 @@ -2,11 +2,9 @@ include/rpl_init.inc [topology=1->2] ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; -SET @old_mode=@@GLOBAL.slave_parallel_mode; include/stop_slave.inc SET GLOBAL slave_parallel_threads=10; -SET GLOBAL slave_parallel_mode="domain,transactional"; -CHANGE MASTER TO master_use_gtid=slave_pos; +CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional); INSERT INTO t1 VALUES(1,1); BEGIN; INSERT INTO t1 VALUES(2,1); @@ -288,8 +286,8 @@ SET GLOBAL binlog_format= @old_format; SET GLOBAL tx_isolation= @old_isolation; include/start_slave.inc include/stop_slave.inc +CHANGE MASTER TO parallel_mode=(domain,groupcommit); SET GLOBAL slave_parallel_threads=@old_parallel_threads; -SET GLOBAL slave_parallel_mode=@old_mode; include/start_slave.inc DROP TABLE t1, t2, t3; include/rpl_end.inc
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result' --- a/mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result 2014-09-24 19:25:10 +0000 +++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result 2014-10-07 14:20:37 +0000 @@ -6,12 +6,10 @@ INSERT INTO t1 VALUES (1,0), (2,0), (3,0 INSERT INTO t2 VALUES (1,0), (2,0); SET @old_isolation= @@GLOBAL.tx_isolation; SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; -SET @old_mode=@@GLOBAL.slave_parallel_mode; include/stop_slave.inc SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED; SET GLOBAL slave_parallel_threads=10; -SET GLOBAL slave_parallel_mode="domain,transactional,waiting"; -CHANGE MASTER TO master_use_gtid=slave_pos; +CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting); *** Test that we replicate correctly when using READ COMMITTED and --log-slave-updates=0 on the slave *** INSERT INTO t1 SELECT 4, COUNT(*) FROM t2; INSERT INTO t2 SELECT 4, COUNT(*) FROM t1; @@ -78,8 +76,8 @@ a b 10 10 include/stop_slave.inc SET GLOBAL tx_isolation= @old_isolation; +CHANGE MASTER TO parallel_mode=(domain,groupcommit); SET GLOBAL slave_parallel_threads=@old_parallel_threads; -SET GLOBAL slave_parallel_mode=@old_mode; include/start_slave.inc DROP TABLE t1, t2; include/rpl_end.inc
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test' --- a/mysql-test/suite/rpl/t/rpl_parallel.test 2014-08-15 09:31:13 +0000 +++ b/mysql-test/suite/rpl/t/rpl_parallel.test 2014-10-07 14:20:37 +0000 @@ -1,13 +1,12 @@ --source include/have_innodb.inc --source include/have_debug.inc --source include/have_debug_sync.inc ---let $rpl_topology=1->2 ---source include/rpl_init.inc +--source include/master-slave.inc
# Test various aspects of parallel replication.
--connection server_2 -SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +--let $old_parallel_threads=`SELECT @@GLOBAL.slave_parallel_threads` --error ER_SLAVE_MUST_STOP SET GLOBAL slave_parallel_threads=10; --source include/stop_slave.inc @@ -1466,9 +1465,100 @@ SET sql_slave_skip_counter= 1; SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
+--echo *** MDEV-6676 - test syntax of CHANGE MASTER TO PARALLEL_MODE=xxx *** +--connection server_2 + +--let $status_items= Parallel_Mode +--source include/show_slave_status.inc +--source include/stop_slave.inc +--error ER_PARSE_ERROR +CHANGE MASTER TO parallel_mode=(,domain,groupcommit); +--error ER_INVALID_SLAVE_PARALLEL_MODE +CHANGE MASTER TO parallel_mode=(domain,groupcommit,transactional); +--error ER_INVALID_SLAVE_PARALLEL_MODE +CHANGE MASTER TO parallel_mode=(waiting,domain,transactional,waiting); +--error ER_INVALID_SLAVE_PARALLEL_MODE +CHANGE MASTER TO parallel_mode=(domain,domain); +CHANGE MASTER TO parallel_mode=(waiting,transactional,domain); +--let $status_items= Parallel_Mode +--source include/show_slave_status.inc +CHANGE MASTER TO parallel_mode=(domain,groupcommit); +--let $status_items= Parallel_Mode +--source include/show_slave_status.inc + + +--echo *** MDEV-6676 - test that empty parallel_mode does not replicate in parallel *** +--connection server_1 +INSERT INTO t2 VALUES (40); +--source include/save_master_gtid.inc + +--connection server_2 +CHANGE MASTER TO parallel_mode=(); +# Test that we do not use parallel apply, by injecting an unconditional +# crash in the parallel apply code. +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,slave_crash_if_parallel_apply"; +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc +SELECT * FROM t2 WHERE a >= 40 ORDER BY a; +--source include/stop_slave.inc +SET GLOBAL debug_dbug=@old_dbug; + + +--echo *** MDEV-6676 - test disabling domain-based parallel replication *** +--connection server_1 +# Let's do a bunch of transactions that will conflict if run out-of-order in +# domain-based parallel replication mode. 
+SET gtid_domain_id = 1; +INSERT INTO t2 VALUES (41); +INSERT INTO t2 VALUES (42); +INSERT INTO t2 VALUES (43); +INSERT INTO t2 VALUES (44); +INSERT INTO t2 VALUES (45); +INSERT INTO t2 VALUES (46); +DELETE FROM t2 WHERE a >= 41; +SET gtid_domain_id = 2; +INSERT INTO t2 VALUES (41); +INSERT INTO t2 VALUES (42); +INSERT INTO t2 VALUES (43); +INSERT INTO t2 VALUES (44); +INSERT INTO t2 VALUES (45); +INSERT INTO t2 VALUES (46); +SET gtid_domain_id = 0; +--source include/save_master_gtid.inc +--connection server_2 +CHANGE MASTER TO parallel_mode=(groupcommit); +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc +SELECT * FROM t2 WHERE a >= 40 ORDER BY a; +--source include/stop_slave.inc +CHANGE MASTER TO parallel_mode=(domain,groupcommit); +--source include/start_slave.inc + + +--echo *** MDEV-6676 - test that parallel mode is saved correctly in master.info across server restart *** +--connection server_2 +--let $status_items= Parallel_Mode +--source include/show_slave_status.inc +--source include/stop_slave.inc +CHANGE MASTER TO parallel_mode=(transactional,waiting); +--let $status_items= Parallel_Mode +--source include/show_slave_status.inc +--source include/start_slave.inc + +--let $rpl_server_number= 2 +--source include/rpl_restart_server.inc + +--connection server_2 +--let $status_items= Parallel_Mode +--source include/show_slave_status.inc +CHANGE MASTER TO parallel_mode=(domain,groupcommit); +--source include/start_slave.inc + + --connection server_2 --source include/stop_slave.inc -SET GLOBAL slave_parallel_threads=@old_parallel_threads; +eval SET GLOBAL slave_parallel_threads=$old_parallel_threads; --source include/start_slave.inc SET DEBUG_SYNC= 'RESET';
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_optimistic.test' --- a/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test 2014-09-24 19:25:10 +0000 +++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test 2014-10-07 14:20:37 +0000 @@ -11,11 +11,9 @@ CREATE TABLE t1 (a int PRIMARY KEY, b IN --connection server_2 --sync_with_master SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; -SET @old_mode=@@GLOBAL.slave_parallel_mode; --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=10; -SET GLOBAL slave_parallel_mode="domain,transactional"; -CHANGE MASTER TO master_use_gtid=slave_pos; +CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional);
--connection server_1 @@ -309,8 +307,8 @@ SET GLOBAL tx_isolation= @old_isolation;
--connection server_2 --source include/stop_slave.inc +CHANGE MASTER TO parallel_mode=(domain,groupcommit); SET GLOBAL slave_parallel_threads=@old_parallel_threads; -SET GLOBAL slave_parallel_mode=@old_mode; --source include/start_slave.inc
--connection server_1
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test' --- a/mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test 2014-09-24 19:25:10 +0000 +++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test 2014-10-07 14:20:37 +0000 @@ -16,12 +16,10 @@ INSERT INTO t2 VALUES (1,0), (2,0); --sync_with_master SET @old_isolation= @@GLOBAL.tx_isolation; SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; -SET @old_mode=@@GLOBAL.slave_parallel_mode; --source include/stop_slave.inc SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED; SET GLOBAL slave_parallel_threads=10; -SET GLOBAL slave_parallel_mode="domain,transactional,waiting"; -CHANGE MASTER TO master_use_gtid=slave_pos; +CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting);
--echo *** Test that we replicate correctly when using READ COMMITTED and --log-slave-updates=0 on the slave *** @@ -63,8 +61,8 @@ SELECT * FROM t2 ORDER BY a; --connection server_2 --source include/stop_slave.inc SET GLOBAL tx_isolation= @old_isolation; +CHANGE MASTER TO parallel_mode=(domain,groupcommit); SET GLOBAL slave_parallel_threads=@old_parallel_threads; -SET GLOBAL slave_parallel_mode=@old_mode; --source include/start_slave.inc
--connection server_1
=== modified file 'sql/lex.h'
--- a/sql/lex.h 2014-02-02 09:00:36 +0000
+++ b/sql/lex.h 2014-10-07 14:20:37 +0000
@@ -194,6 +194,7 @@ static SYMBOL symbols[] = {
   { "DISTINCTROW", SYM(DISTINCT)}, /* Access likes this */
   { "DIV", SYM(DIV_SYM)},
   { "DO", SYM(DO_SYM)},
+  { "DOMAIN", SYM(DOMAIN_SYM)},
   { "DOUBLE", SYM(DOUBLE_SYM)},
   { "DROP", SYM(DROP)},
   { "DUAL", SYM(DUAL_SYM)},
@@ -257,6 +258,7 @@ static SYMBOL symbols[] = {
   { "GRANT", SYM(GRANT)},
   { "GRANTS", SYM(GRANTS)},
   { "GROUP", SYM(GROUP_SYM)},
+  { "GROUPCOMMIT", SYM(GROUPCOMMIT_SYM)},
   { "HANDLER", SYM(HANDLER_SYM)},
   { "HARD", SYM(HARD_SYM)},
   { "HASH", SYM(HASH_SYM)},
@@ -429,6 +431,7 @@ static SYMBOL symbols[] = {
   { "PACK_KEYS", SYM(PACK_KEYS_SYM)},
   { "PAGE", SYM(PAGE_SYM)},
   { "PAGE_CHECKSUM", SYM(PAGE_CHECKSUM_SYM)},
+  { "PARALLEL_MODE", SYM(PARALLEL_MODE_SYM)},
   { "PARSER", SYM(PARSER_SYM)},
   { "PARSE_VCOL_EXPR", SYM(PARSE_VCOL_EXPR_SYM)},
   { "PARTIAL", SYM(PARTIAL)},
@@ -650,6 +653,7 @@ static SYMBOL symbols[] = {
   { "VIEW", SYM(VIEW_SYM)},
   { "VIRTUAL", SYM(VIRTUAL_SYM)},
   { "WAIT", SYM(WAIT_SYM)},
+  { "WAITING", SYM(WAITING_SYM)},
   { "WARNINGS", SYM(WARNINGS)},
   { "WEEK", SYM(WEEK_SYM)},
   { "WEIGHT_STRING", SYM(WEIGHT_STRING_SYM)},
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2014-09-25 13:47:58 +0000
+++ b/sql/log_event.cc 2014-10-07 14:20:37 +0000
@@ -12728,7 +12728,7 @@ bool rpl_get_position_info(const char **
   return FALSE;
 #else
   const Relay_log_info *rli= &(active_mi->rli);
-  if (opt_slave_parallel_threads == 0)
+  if (!rli->mi->using_parallel())
   {
     *log_file_name= rli->group_master_log_name;
     *log_pos= rli->group_master_log_pos +

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2014-09-26 10:37:19 +0000
+++ b/sql/mysqld.cc 2014-10-07 14:20:37 +0000
@@ -562,8 +562,6 @@ ulong stored_program_cache_size= 0;
 
 ulong opt_slave_parallel_threads= 0;
 ulong opt_slave_domain_parallel_threads= 0;
-ulonglong opt_slave_parallel_mode=
-  SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_GROUPCOMMIT;
 ulong opt_binlog_commit_wait_count= 0;
 ulong opt_binlog_commit_wait_usec= 0;
 ulong opt_slave_parallel_max_queued= 131072;

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h 2014-09-26 10:37:19 +0000
+++ b/sql/mysqld.h 2014-10-07 14:20:37 +0000
@@ -187,7 +187,6 @@ extern ulong stored_program_cache_size;
 extern ulong opt_slave_parallel_threads;
 extern ulong opt_slave_domain_parallel_threads;
 extern ulong opt_slave_parallel_max_queued;
-extern ulonglong opt_slave_parallel_mode;
 extern ulong opt_binlog_commit_wait_count;
 extern ulong opt_binlog_commit_wait_usec;
 extern my_bool opt_gtid_ignore_duplicates;
=== modified file 'sql/rpl_mi.cc' --- a/sql/rpl_mi.cc 2014-09-02 12:07:01 +0000 +++ b/sql/rpl_mi.cc 2014-10-07 14:20:37 +0000 @@ -40,7 +40,8 @@ Master_info::Master_info(LEX_STRING *con heartbeat_period(0), received_heartbeats(0), master_id(0), prev_master_id(0), using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0), - gtid_reconnect_event_skip_count(0), gtid_event_seen(false) + gtid_reconnect_event_skip_count(0), gtid_event_seen(false), + parallel_mode(SLAVE_PARALLEL_DOMAIN|SLAVE_PARALLEL_GROUPCOMMIT) { host[0] = 0; user[0] = 0; password[0] = 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; @@ -178,6 +179,7 @@ void init_master_log_pos(Master_info* mi mi->events_queued_since_last_gtid= 0; mi->gtid_reconnect_event_skip_count= 0; mi->gtid_event_seen= false; + mi->parallel_mode= SLAVE_PARALLEL_DOMAIN|SLAVE_PARALLEL_GROUPCOMMIT;
/* Intentionally init ssl_verify_server_cert to 0, no option available */ mi->ssl_verify_server_cert= 0; @@ -514,6 +516,22 @@ file '%s')", fname); else mi->using_gtid= Master_info::USE_GTID_NO; } + else if (!strncmp(buf, STRING_WITH_LEN("parallel_mode="))) + { + int val= atoi(buf + (sizeof("parallel_mode=") - 1)); + mi->parallel_mode= val & (SLAVE_PARALLEL_DOMAIN | + SLAVE_PARALLEL_GROUPCOMMIT | + SLAVE_PARALLEL_TRX | + SLAVE_PARALLEL_WAITING); + } + else if (!strncmp(buf, STRING_WITH_LEN("END_MARKER"))) + { + /* + Guard agaist extra left-overs at the end of file, in case a later + update causes the file to shrink compared to earlier contents. + */ + break; + } } } } @@ -657,7 +675,9 @@ int flush_master_info(Master_info* mi, my_b_printf(file,
"%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n" "\n\n\n\n\n\n\n\n\n\n\n" - "using_gtid=%d\n", + "using_gtid=%d\n" + "parallel_mode=%u\n" + "END_MARKER\n", LINES_IN_MASTER_INFO, mi->master_log_name, llstr(mi->master_log_pos, lbuf), mi->host, mi->user, @@ -666,7 +686,7 @@ int flush_master_info(Master_info* mi, mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert, heartbeat_buf, "", ignore_server_ids_buf, "", 0, - mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid); + mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid, mi->parallel_mode); my_free(ignore_server_ids_buf); err= flush_io_cache(file); if (sync_masterinfo_period && !err &&
=== modified file 'sql/rpl_mi.h'
--- a/sql/rpl_mi.h 2014-09-02 12:07:01 +0000
+++ b/sql/rpl_mi.h 2014-10-07 14:20:37 +0000
@@ -75,6 +75,10 @@ class Master_info : public Slave_reporti
     return connection_name.str == 0;
   }
   static const char *using_gtid_astext(enum enum_using_gtid arg);
+  bool using_parallel()
+  {
+    return opt_slave_parallel_threads > 0 && parallel_mode != 0;
+  }
 
   /* the variables below are needed because we can change masters on the fly */
   char master_log_name[FN_REFLEN+6]; /* Room for multi-*/
@@ -178,6 +182,12 @@ class Master_info : public Slave_reporti
   uint64 gtid_reconnect_event_skip_count;
   /* gtid_event_seen is false until we receive first GTID event from master. */
   bool gtid_event_seen;
+  /*
+    The parallel replication modes, if any. A combination (binary OR) of any of
+    SLAVE_PARALLEL_DOMAIN, SLAVE_PARALLEL_GROUPCOMMIT, SLAVE_PARALLEL_TRX, and
+    SLAVE_PARALLEL_WAITING.
+  */
+  uint32 parallel_mode;
 };
 int init_master_info(Master_info* mi, const char* master_info_fname,
                      const char* slave_info_fname,
=== modified file 'sql/rpl_parallel.cc' --- a/sql/rpl_parallel.cc 2014-10-03 09:43:23 +0000 +++ b/sql/rpl_parallel.cc 2014-10-07 14:20:37 +0000 @@ -1891,6 +1891,7 @@ rpl_parallel::do_event(rpl_group_info *s bool did_enter_cond= false; PSI_stage_info old_stage;
+ DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE();); /* Handle master log name change, seen in Rotate_log_event. */ typ= ev->get_type_code(); if (unlikely(typ == ROTATE_EVENT)) @@ -1971,7 +1972,8 @@ rpl_parallel::do_event(rpl_group_info *s if (typ == GTID_EVENT) { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); - uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? + uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO || + !(rli->mi->parallel_mode & SLAVE_PARALLEL_DOMAIN) ? 0 : gtid_ev->domain_id); if (!(e= find(domain_id))) { @@ -2012,7 +2014,7 @@ rpl_parallel::do_event(rpl_group_info *s { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); bool new_gco; - ulonglong mode= opt_slave_parallel_mode; + ulonglong mode= rli->mi->parallel_mode; uchar gtid_flags= gtid_ev->flags2; group_commit_orderer *gco; uint8 force_switch_flag;
=== modified file 'sql/rpl_parallel.h'
--- a/sql/rpl_parallel.h 2014-10-03 09:43:23 +0000
+++ b/sql/rpl_parallel.h 2014-10-07 14:20:37 +0000
@@ -12,7 +12,11 @@ class Relay_log_info;
 struct inuse_relaylog;
 
 
-/* Bit masks for the values in --slave-parallel-mode. */
+/*
+  Bit masks for the values in --slave-parallel-mode.
+  Note that these values cannot be changed - they are stored in master.info,
+  so need to be possible to read back in a different version of the server.
+*/
 #define SLAVE_PARALLEL_DOMAIN (1ULL << 0)
 #define SLAVE_PARALLEL_GROUPCOMMIT (1ULL << 1)
 #define SLAVE_PARALLEL_TRX (1ULL << 2)
=== modified file 'sql/share/errmsg-utf8.txt'
--- a/sql/share/errmsg-utf8.txt 2014-07-08 17:38:26 +0000
+++ b/sql/share/errmsg-utf8.txt 2014-10-07 14:20:37 +0000
@@ -7111,3 +7111,5 @@ ER_SLAVE_SKIP_NOT_IN_GTID
         eng "When using GTID, @@sql_slave_skip_counter can not be used. Instead, setting @@gtid_slave_pos explicitly can be used to skip to after a given GTID position."
 ER_TABLE_DEFINITION_TOO_BIG
         eng "The definition for table %`s is too big"
+ER_INVALID_SLAVE_PARALLEL_MODE
+        eng "Invalid use of '%s' option for PARALLEL_MODE in CHANGE MASTER TO"
=== modified file 'sql/slave.cc' --- a/sql/slave.cc 2014-09-02 12:07:01 +0000 +++ b/sql/slave.cc 2014-10-07 14:20:37 +0000 @@ -625,8 +625,7 @@ int terminate_slave_threads(Master_info* if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) { DBUG_PRINT("info",("Terminating SQL thread")); - if (opt_slave_parallel_threads > 0 && - mi->rli.abort_slave && mi->rli.stop_for_until) + if (mi->using_parallel() && mi->rli.abort_slave && mi->rli.stop_for_until) { mi->rli.stop_for_until= false; mi->rli.parallel.stop_during_until(); @@ -2576,6 +2575,8 @@ static bool send_show_master_info_header field_list.push_back(new Item_empty_string("Using_Gtid", sizeof("Current_Pos")-1)); field_list.push_back(new Item_empty_string("Gtid_IO_Pos", 30)); + field_list.push_back(new Item_empty_string("Parallel_Mode", + sizeof("domain,groupcommit,transactional,waiting")-1)); if (full) { field_list.push_back(new Item_return_int("Retried_transactions", @@ -2708,8 +2709,7 @@ static bool send_show_master_info_data(T else { idle= mi->rli.sql_thread_caught_up; - if (opt_slave_parallel_threads > 0 && idle && - !mi->rli.parallel.workers_idle()) + if (mi->using_parallel() && idle && !mi->rli.parallel.workers_idle()) idle= false; } if (idle) @@ -2786,13 +2786,34 @@ static bool send_show_master_info_data(T protocol->store(mi->ssl_ca, &my_charset_bin); // Master_Ssl_Crlpath protocol->store(mi->ssl_capath, &my_charset_bin); + // Using_Gtid protocol->store(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin); + // Gtid_IO_Pos { char buff[30]; String tmp(buff, sizeof(buff), system_charset_info); mi->gtid_current_pos.to_string(&tmp); protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin); } + // Parallel_Mode + { + /* Note how sizeof("domain") has room for "domain," due to traling 0. 
*/ + char buf[sizeof("domain") + sizeof("groupcommit") + + sizeof("transactional") + sizeof("waiting") + 1]; + char *p= buf; + uint32 mode= mi->parallel_mode; + if (mode & SLAVE_PARALLEL_DOMAIN) + p= strmov(p, "domain,"); + if (mode & SLAVE_PARALLEL_GROUPCOMMIT) + p= strmov(p, "groupcommit,"); + if (mode & SLAVE_PARALLEL_TRX) + p= strmov(p, "transactional,"); + if (mode & SLAVE_PARALLEL_WAITING) + p= strmov(p, "waiting,"); + if (p != buf) + --p; // Discard last ',' + protocol->store(buf, p-buf, &my_charset_bin); + } if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -3521,7 +3542,7 @@ static int exec_relay_log_event(THD* thd
update_state_of_relay_log(rli, ev);
- if (opt_slave_parallel_threads > 0) + if (rli->mi->using_parallel()) { int res= rli->parallel.do_event(serial_rgi, ev, event_size); if (res >= 0) @@ -4667,7 +4688,7 @@ log '%s' at position %s, relay log '%s' } }
- if (opt_slave_parallel_threads > 0) + if (mi->using_parallel()) rli->parallel.wait_for_done(thd, rli);
/* Thread stopped. Print the current replication position to the log */ @@ -4693,7 +4714,7 @@ log '%s' at position %s, relay log '%s' (We want the first one to be before the printout of stop position to get the correct position printed.) */ - if (opt_slave_parallel_threads > 0) + if (mi->using_parallel()) rli->parallel.wait_for_done(thd, rli);
/* @@ -6314,7 +6335,7 @@ static Log_event* next_event(rpl_group_i llstr(my_b_tell(cur_log),llbuf1), llstr(rli->event_relay_log_pos,llbuf2))); DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(opt_slave_parallel_threads > 0 || + DBUG_ASSERT(rli->mi->using_parallel() || my_b_tell(cur_log) == rli->event_relay_log_pos); } #endif
=== modified file 'sql/sql_lex.h' --- a/sql/sql_lex.h 2014-08-07 16:06:56 +0000 +++ b/sql/sql_lex.h 2014-10-07 14:20:37 +0000 @@ -218,11 +218,18 @@ struct LEX_MASTER_INFO uint port, connect_retry; float heartbeat_period; /* + Modes of parallel replication enabled, if any. A combination (binary OR) of + any of SLAVE_PARALLEL_DOMAIN, SLAVE_PARALLEL_GROUPCOMMIT, + SLAVE_PARALLEL_TRX, and SLAVE_PARALLEL_WAITING. + */ + uint32 repl_parallel_mode; + /* Enum is used for making it possible to detect if the user changed variable or if it should be left at old value */ enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE} - ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt; + ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt, + repl_parallel_mode_opt; enum { LEX_GTID_UNCHANGED, LEX_GTID_NO, LEX_GTID_CURRENT_POS, LEX_GTID_SLAVE_POS } use_gtid_opt; @@ -241,10 +248,11 @@ struct LEX_MASTER_INFO pos= relay_log_pos= server_id= port= connect_retry= 0; heartbeat_period= 0; ssl= ssl_verify_server_cert= heartbeat_opt= - repl_ignore_server_ids_opt= LEX_MI_UNCHANGED; + repl_ignore_server_ids_opt= repl_parallel_mode_opt= LEX_MI_UNCHANGED; gtid_pos_str.length= 0; gtid_pos_str.str= NULL; use_gtid_opt= LEX_GTID_UNCHANGED; + repl_parallel_mode= 0; } };
=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc 2014-08-07 16:06:56 +0000
+++ b/sql/sql_repl.cc 2014-10-07 14:20:37 +0000
@@ -3434,6 +3434,9 @@ bool change_master(THD* thd, Master_info
       lex_mi->relay_log_name || lex_mi->relay_log_pos)
     mi->using_gtid= Master_info::USE_GTID_NO;
 
+  if (lex_mi->repl_parallel_mode_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+    mi->parallel_mode= lex_mi->repl_parallel_mode;
+
   /*
     If user did specify neither host nor port nor any log name nor any log
     pos, i.e. he specified only user/password/master_connect_retry, he probably
=== modified file 'sql/sql_yacc.yy' --- a/sql/sql_yacc.yy 2014-09-03 14:24:31 +0000 +++ b/sql/sql_yacc.yy 2014-10-07 14:20:37 +0000 @@ -1130,6 +1130,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** %token DIV_SYM %token DOUBLE_SYM /* SQL-2003-R */ %token DO_SYM +%token DOMAIN_SYM %token DROP /* SQL-2003-R */ %token DUAL_SYM %token DUMPFILE @@ -1194,6 +1195,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** %token GRANT /* SQL-2003-R */ %token GRANTS %token GROUP_SYM /* SQL-2003-R */ +%token GROUPCOMMIT_SYM %token GROUP_CONCAT_SYM %token GT_SYM /* OPERATOR */ %token HANDLER_SYM @@ -1376,6 +1378,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** %token PACK_KEYS_SYM %token PAGE_SYM %token PAGE_CHECKSUM_SYM +%token PARALLEL_MODE_SYM %token PARAM_MARKER %token PARSER_SYM %token PARSE_VCOL_EXPR_SYM @@ -1604,6 +1607,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** %token VIEW_SYM /* SQL-2003-N */ %token VIRTUAL_SYM %token WAIT_SYM +%token WAITING_SYM %token WARNINGS %token WEEK_SYM %token WEIGHT_STRING_SYM @@ -2244,6 +2248,14 @@ rule: <-- starts at col 1 { Lex->mi.repl_ignore_server_ids_opt= LEX_MASTER_INFO::LEX_MI_ENABLE; } + | PARALLEL_MODE_SYM EQ '(' ')' + { + Lex->mi.repl_parallel_mode_opt= LEX_MASTER_INFO::LEX_MI_DISABLE; + } + | PARALLEL_MODE_SYM EQ '(' parallel_mode_list ')' + { + Lex->mi.repl_parallel_mode_opt= LEX_MASTER_INFO::LEX_MI_ENABLE; + } | master_file_def ; @@ -2260,6 +2272,52 @@ rule: <-- starts at col 1 insert_dynamic(&Lex->mi.repl_ignore_server_ids, (uchar*) &($1)); }
+parallel_mode_list: + parallel_mode_option + | parallel_mode_list ',' parallel_mode_option + ; + +parallel_mode_option: + DOMAIN_SYM + { + if (Lex->mi.repl_parallel_mode & SLAVE_PARALLEL_DOMAIN) + { + my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "domain"); + MYSQL_YYABORT; + } + Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_DOMAIN; + } + | GROUPCOMMIT_SYM + { + if (Lex->mi.repl_parallel_mode & + (SLAVE_PARALLEL_GROUPCOMMIT|SLAVE_PARALLEL_TRX)) + { + my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "groupcommit"); + MYSQL_YYABORT; + } + Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_GROUPCOMMIT; + } + | TRANSACTIONAL_SYM + { + if (Lex->mi.repl_parallel_mode & + (SLAVE_PARALLEL_GROUPCOMMIT|SLAVE_PARALLEL_TRX)) + { + my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "transactional"); + MYSQL_YYABORT; + } + Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_TRX; + } + | WAITING_SYM + { + if (Lex->mi.repl_parallel_mode & SLAVE_PARALLEL_WAITING) + { + my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "waiting"); + MYSQL_YYABORT; + } + Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_WAITING; + } + + master_file_def: MASTER_LOG_FILE_SYM EQ TEXT_STRING_sys { @@ -14098,6 +14156,7 @@ user: user_maybe_role | DISABLE_SYM {} | DISCARD {} | DISK_SYM {} + | DOMAIN_SYM {} | DUMPFILE {} | DUPLICATE_SYM {} | DYNAMIC_SYM {} @@ -14131,6 +14190,7 @@ user: user_maybe_role | GET_FORMAT {} | GRANTS {} | GLOBAL_SYM {} + | GROUPCOMMIT_SYM {} | HASH_SYM {} | HARD_SYM {} | HOSTS_SYM {} @@ -14221,6 +14281,7 @@ user: user_maybe_role | ONLY_SYM {} | PACK_KEYS_SYM {} | PAGE_SYM {} + | PARALLEL_MODE_SYM {} | PARTIAL {} | PARTITIONING_SYM {} | PARTITIONS_SYM {} @@ -14336,6 +14397,7 @@ user: user_maybe_role | VALUE_SYM {} | WARNINGS {} | WAIT_SYM {} + | WAITING_SYM {} | WEEK_SYM {} | WEIGHT_STRING_SYM {} | WORK_SYM {}
=== modified file 'sql/sys_vars.cc' --- a/sql/sys_vars.cc 2014-09-19 13:25:37 +0000 +++ b/sql/sys_vars.cc 2014-10-07 14:20:37 +0000 @@ -1834,31 +1834,11 @@ static Sys_var_ulong Sys_slave_parallel_ VALID_RANGE(0,2147483647), DEFAULT(131072), BLOCK_SIZE(1));
-static const char *slave_parallel_mode_names[] = { - "domain", "groupcommit", "transactional", "waiting", NULL -}; - -static Sys_var_set Sys_slave_parallel_mode( - "slave_parallel_mode", - "Controls what transactions are applied in parallel when using " - "--slave-parallel-threads. Syntax: slave_paralle_mode=value[,value...], " - "where \"value\" could be one or more of: \"domain\", to apply " - "different replication domains in parallel; \"groupcommit\", to apply " - "in parallel transactions that group-committed together on the master; " - "\"transactional\", to optimistically try to apply all transactional " - "DML in parallel; and \"waiting\" to extend \"transactional\" to " - "even transactions that had to wait on the master.", - GLOBAL_VAR(opt_slave_parallel_mode), CMD_LINE(REQUIRED_ARG), - slave_parallel_mode_names, - DEFAULT(SLAVE_PARALLEL_DOMAIN | - SLAVE_PARALLEL_GROUPCOMMIT)); - - static Sys_var_bit Sys_replicate_allow_parallel( "replicate_allow_parallel", "If set when a transaction is written to the binlog, that transaction " - "is allowed to replicate in parallel on a slave where " - "slave_parallel_mode is set to \"transactional\". Can be cleared for " + "is allowed to replicate in parallel on a slave where parallel_mode " + "is set to \"transactional\" (in CHANGE MASTER). Can be cleared for " "transactions that are likely to cause a conflict if replicated in " "parallel, to avoid unnecessary rollback and retry.", SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_ALLOW_PARALLEL,
_______________________________________________ commits mailing list commits@mariadb.org https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits
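For readers following the patch, the option checks in the sql_yacc.yy hunk and the Parallel_Mode formatting in sql/slave.cc can be sketched as standalone C++. This is only a minimal sketch under stated assumptions, not the server code: `ParseResult`, `parse_parallel_mode` and `format_parallel_mode` are hypothetical names, and the value used for SLAVE_PARALLEL_WAITING (bit 3) is assumed, since its #define falls outside the quoted hunk. The rules themselves are those visible in the grammar: an option may not be repeated, and "groupcommit" and "transactional" exclude each other.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// The first three bit values are as in sql/rpl_parallel.h in the patch.
// The value for WAITING is an assumption (its #define is not in the hunk).
constexpr uint32_t SLAVE_PARALLEL_DOMAIN      = 1u << 0;
constexpr uint32_t SLAVE_PARALLEL_GROUPCOMMIT = 1u << 1;
constexpr uint32_t SLAVE_PARALLEL_TRX         = 1u << 2;
constexpr uint32_t SLAVE_PARALLEL_WAITING     = 1u << 3;  // assumed
constexpr uint32_t SLAVE_PARALLEL_ALL =
  SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_GROUPCOMMIT |
  SLAVE_PARALLEL_TRX | SLAVE_PARALLEL_WAITING;

// Hypothetical result type. On failure, bad_option names the offending word,
// like the '%s' in ER_INVALID_SLAVE_PARALLEL_MODE.
struct ParseResult {
  bool ok;
  uint32_t mode;
  std::string bad_option;
};

// Apply the per-option checks from the grammar actions, in list order.
ParseResult parse_parallel_mode(const std::vector<std::string> &options)
{
  uint32_t mode = 0;
  for (const std::string &opt : options)
  {
    uint32_t bit = 0, conflicts = 0;
    if (opt == "domain")
    { bit = SLAVE_PARALLEL_DOMAIN; conflicts = bit; }
    else if (opt == "groupcommit")
    { bit = SLAVE_PARALLEL_GROUPCOMMIT; conflicts = bit | SLAVE_PARALLEL_TRX; }
    else if (opt == "transactional")
    { bit = SLAVE_PARALLEL_TRX; conflicts = bit | SLAVE_PARALLEL_GROUPCOMMIT; }
    else if (opt == "waiting")
    { bit = SLAVE_PARALLEL_WAITING; conflicts = bit; }
    else
      return {false, 0, opt};  // in the server this would be a parse error
    if (mode & conflicts)
      return {false, 0, opt};  // duplicate or mutually exclusive option
    mode |= bit;
  }
  return {true, mode, ""};
}

// Render the bit mask in fixed order, as the Parallel_Mode column does.
std::string format_parallel_mode(uint32_t mode)
{
  assert((mode & ~SLAVE_PARALLEL_ALL) == 0);
  std::string out;
  if (mode & SLAVE_PARALLEL_DOMAIN)      out += "domain,";
  if (mode & SLAVE_PARALLEL_GROUPCOMMIT) out += "groupcommit,";
  if (mode & SLAVE_PARALLEL_TRX)         out += "transactional,";
  if (mode & SLAVE_PARALLEL_WAITING)     out += "waiting,";
  if (!out.empty())
    out.pop_back();  // discard the last ','
  return out;
}
```

With these rules, the list (waiting,transactional,domain) from rpl_parallel.test is accepted and formats as 'domain,transactional,waiting', while (domain,domain) is rejected on 'domain', matching the expected results in the patch.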
Jonas Oreland <jonaso@google.com> writes:
> How about also adding a mode 'auto', which currently can be the same as 'transactional', but in the future could be extended to flip the actual mode back and forth based on statistics?
Hm. I like the idea that the user could set parallel_mode=auto, and then the code would make a best effort to replicate in parallel in the best way possible. Only if that turns out not to be sufficient would the user need to read up on the more detailed options and try to configure them better manually.

So as you say, "auto" could work like "transactional" in the first version, since it should be safe in the sense that it is always correct. And then we could refine it with some statistics. For example, we can monitor retries/transaction, and if it becomes > 0.05 or whatever, we go to "groupcommit" mode for a while. Or likewise if we see a lot (say >10%) of DDL or MyISAM.

I guess "auto" should be mutually exclusive with "groupcommit", "transactional", and "waiting", while "domain" is kind of orthogonal, and not safe to enable automatically.

Thanks for the suggestion, I will keep it in mind and hopefully implement it. Probably even the two heuristics mentioned above could be implemented easily.

 - Kristian.
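The two heuristics mentioned above could look roughly like the following. This is only a sketch under assumptions, not anything that exists in the tree: `WindowStats`, `EffectiveMode` and `choose_auto_mode` are hypothetical names, the counters are assumed to be collected over some recent window of applied transactions, and the thresholds are the rough figures from the mail (0.05 retries per transaction, 10% DDL or MyISAM). Staying in "groupcommit" mode "for a while" is left out.

```cpp
#include <cassert>
#include <cstdint>

// What "auto" would currently behave like. "domain" is orthogonal and is
// deliberately not chosen automatically here.
enum class EffectiveMode { Transactional, GroupCommit };

// Hypothetical counters over a recent window of applied transactions.
struct WindowStats {
  uint64_t transactions;       // transactions applied in the window
  uint64_t retries;            // retries caused by conflicts
  uint64_t non_transactional;  // transactions with DDL or MyISAM changes
};

// Rough thresholds taken from the mail; both are tunable guesses.
constexpr double kMaxRetriesPerTrx  = 0.05;
constexpr double kMaxNonTrxFraction = 0.10;

EffectiveMode choose_auto_mode(const WindowStats &s)
{
  assert(s.non_transactional <= s.transactions);
  if (s.transactions == 0)
    return EffectiveMode::Transactional;  // no data yet: start optimistic
  const double n = static_cast<double>(s.transactions);
  if (static_cast<double>(s.retries) / n > kMaxRetriesPerTrx)
    return EffectiveMode::GroupCommit;    // too many rollbacks and retries
  if (static_cast<double>(s.non_transactional) / n > kMaxNonTrxFraction)
    return EffectiveMode::GroupCommit;    // too much DDL or MyISAM
  return EffectiveMode::Transactional;
}
```

For example, a window of 100 transactions with 6 retries would fall back to group-commit based parallelism, while 1 retry in 100 would stay in the optimistic transactional mode.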