how about also adding a mode = 'auto' which currently can be == transaction, but in future could be extended to based on statistics flip actual modes back and forth ? /Jonas On Tue, Oct 7, 2014 at 4:20 PM, <knielsen@knielsen-hq.org> wrote:
At http://bazaar.launchpad.net/~maria-captains/maria/10.0
------------------------------------------------------------ revno: 4391 revision-id: knielsen@knielsen-hq.org-20141007142037-y85xq099ltdiqno6 parent: knielsen@knielsen-hq.org-20141003094323-nosx0y7vv0kjv20e committer: Kristian Nielsen <knielsen@knielsen-hq.org> branch nick: work-10.0-mdev6676 timestamp: Tue 2014-10-07 16:20:37 +0200 message: MDEV-6676: Speculative parallel replication.
Better syntax for configuring the parallel mode. Now it is a CHANGE MASTER option instead of a system variable, which makes it possible to configure differently for different multi-source replication masters.
Now also the domain-based replication mode can be disabled. === modified file 'mysql-test/include/check-testcase.test' --- a/mysql-test/include/check-testcase.test 2013-12-16 12:02:21 +0000 +++ b/mysql-test/include/check-testcase.test 2014-10-07 14:20:37 +0000 @@ -64,6 +64,7 @@ if ($tmp) --echo Master_SSL_Crlpath # --echo Using_Gtid No --echo Gtid_IO_Pos # + --echo Parallel_Mode domain,groupcommit } if (!$tmp) { # Note: after WL#5177, fields 13-18 shall not be filtered-out.
=== modified file 'mysql-test/r/mysqld--help.result' --- a/mysql-test/r/mysqld--help.result 2014-09-19 13:25:37 +0000 +++ b/mysql-test/r/mysqld--help.result 2014-10-07 14:20:37 +0000 @@ -915,17 +915,6 @@ parallel replication thread when reading ahead in the relay log looking for opportunities for parallel replication. Only used when --slave-parallel-threads > 0. - --slave-parallel-mode=name - Controls what transactions are applied in parallel when - using --slave-parallel-threads. Syntax: - slave_paralle_mode=value[,value...], where "value" could - be one or more of: "domain", to apply different - replication domains in parallel; "groupcommit", to apply - in parallel transactions that group-committed together on - the master; "transactional", to optimistically try to - apply all transactional DML in parallel; and "waiting" to - extend "transactional" to even transactions that had to - wait on the master. --slave-parallel-threads=# If non-zero, number of threads to spawn to apply in parallel events on the slave that were group-committed on @@ -1320,7 +1309,6 @@ slave-exec-mode STRICT slave-max-allowed-packet 1073741824 slave-net-timeout 3600 slave-parallel-max-queued 131072 -slave-parallel-mode domain,groupcommit slave-parallel-threads 0 slave-skip-errors (No default value) slave-sql-verify-checksum TRUE
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel.result' --- a/mysql-test/suite/rpl/r/rpl_parallel.result 2014-08-15 09:31:13 +0000 +++ b/mysql-test/suite/rpl/r/rpl_parallel.result 2014-10-07 14:20:37 +0000 @@ -1,5 +1,5 @@ -include/rpl_init.inc [topology=1->2] -SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +include/master-slave.inc +[connection master] SET GLOBAL slave_parallel_threads=10; ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first include/stop_slave.inc @@ -923,8 +923,79 @@ SELECT * FROM t2 WHERE a >= 30 ORDER BY 32 33 34 +*** MDEV-6676 - test syntax of CHANGE MASTER TO PARALLEL_MODE=xxx *** +Parallel_Mode = 'domain,groupcommit' include/stop_slave.inc -SET GLOBAL slave_parallel_threads=@old_parallel_threads; +CHANGE MASTER TO parallel_mode=(,domain,groupcommit); +ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near 'domain,groupcommit)' at line 1 +CHANGE MASTER TO parallel_mode=(domain,groupcommit,transactional); +ERROR HY000: Invalid use of 'transactional' option for PARALLEL_MODE in CHANGE MASTER TO +CHANGE MASTER TO parallel_mode=(waiting,domain,transactional,waiting); +ERROR HY000: Invalid use of 'waiting' option for PARALLEL_MODE in CHANGE MASTER TO +CHANGE MASTER TO parallel_mode=(domain,domain); +ERROR HY000: Invalid use of 'domain' option for PARALLEL_MODE in CHANGE MASTER TO +CHANGE MASTER TO parallel_mode=(waiting,transactional,domain); +Parallel_Mode = 'domain,transactional,waiting' +CHANGE MASTER TO parallel_mode=(domain,groupcommit); +Parallel_Mode = 'domain,groupcommit' +*** MDEV-6676 - test that empty parallel_mode does not replicate in parallel *** +INSERT INTO t2 VALUES (40); +include/save_master_gtid.inc +CHANGE MASTER TO parallel_mode=(); +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,slave_crash_if_parallel_apply"; +include/start_slave.inc +include/sync_with_master_gtid.inc +SELECT * FROM t2 WHERE a >= 40 ORDER BY a; +a +40 +include/stop_slave.inc +SET GLOBAL debug_dbug=@old_dbug; +*** MDEV-6676 - test disabling domain-based parallel replication *** +SET gtid_domain_id = 1; +INSERT INTO t2 VALUES (41); +INSERT INTO t2 VALUES (42); +INSERT INTO t2 VALUES (43); +INSERT INTO t2 VALUES (44); +INSERT INTO t2 VALUES (45); +INSERT INTO t2 VALUES (46); +DELETE FROM t2 WHERE a >= 41; +SET gtid_domain_id = 2; +INSERT INTO t2 VALUES (41); +INSERT INTO t2 VALUES (42); +INSERT INTO t2 VALUES (43); +INSERT INTO t2 VALUES (44); +INSERT INTO t2 VALUES (45); +INSERT INTO t2 VALUES (46); +SET gtid_domain_id = 0; +include/save_master_gtid.inc +CHANGE MASTER TO parallel_mode=(groupcommit); +include/start_slave.inc +include/sync_with_master_gtid.inc +SELECT * FROM t2 WHERE a >= 40 ORDER BY a; +a +40 +41 +42 +43 +44 +45 +46 +include/stop_slave.inc +CHANGE MASTER TO parallel_mode=(domain,groupcommit); +include/start_slave.inc +*** MDEV-6676 - test that parallel mode is saved correctly in master.info across server restart *** +Parallel_Mode = 'domain,groupcommit' +include/stop_slave.inc +CHANGE MASTER TO parallel_mode=(transactional,waiting); +Parallel_Mode = 'transactional,waiting' +include/start_slave.inc +include/rpl_restart_server.inc [server_number=2] +Parallel_Mode = 'transactional,waiting' +CHANGE MASTER TO parallel_mode=(domain,groupcommit); +include/start_slave.inc +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=0; include/start_slave.inc SET DEBUG_SYNC= 'RESET'; DROP function foo;
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_optimistic.result' --- a/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result 2014-09-24 19:25:10 +0000 +++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result 2014-10-07 14:20:37 +0000 @@ -2,11 +2,9 @@ include/rpl_init.inc [topology=1->2] ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; -SET @old_mode=@@GLOBAL.slave_parallel_mode; include/stop_slave.inc SET GLOBAL slave_parallel_threads=10; -SET GLOBAL slave_parallel_mode="domain,transactional"; -CHANGE MASTER TO master_use_gtid=slave_pos; +CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional); INSERT INTO t1 VALUES(1,1); BEGIN; INSERT INTO t1 VALUES(2,1); @@ -288,8 +286,8 @@ SET GLOBAL binlog_format= @old_format; SET GLOBAL tx_isolation= @old_isolation; include/start_slave.inc include/stop_slave.inc +CHANGE MASTER TO parallel_mode=(domain,groupcommit); SET GLOBAL slave_parallel_threads=@old_parallel_threads; -SET GLOBAL slave_parallel_mode=@old_mode; include/start_slave.inc DROP TABLE t1, t2, t3; include/rpl_end.inc
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result' --- a/mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result 2014-09-24 19:25:10 +0000 +++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result 2014-10-07 14:20:37 +0000 @@ -6,12 +6,10 @@ INSERT INTO t1 VALUES (1,0), (2,0), (3,0 INSERT INTO t2 VALUES (1,0), (2,0); SET @old_isolation= @@GLOBAL.tx_isolation; SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; -SET @old_mode=@@GLOBAL.slave_parallel_mode; include/stop_slave.inc SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED; SET GLOBAL slave_parallel_threads=10; -SET GLOBAL slave_parallel_mode="domain,transactional,waiting"; -CHANGE MASTER TO master_use_gtid=slave_pos; +CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting); *** Test that we replicate correctly when using READ COMMITTED and --log-slave-updates=0 on the slave *** INSERT INTO t1 SELECT 4, COUNT(*) FROM t2; INSERT INTO t2 SELECT 4, COUNT(*) FROM t1; @@ -78,8 +76,8 @@ a b 10 10 include/stop_slave.inc SET GLOBAL tx_isolation= @old_isolation; +CHANGE MASTER TO parallel_mode=(domain,groupcommit); SET GLOBAL slave_parallel_threads=@old_parallel_threads; -SET GLOBAL slave_parallel_mode=@old_mode; include/start_slave.inc DROP TABLE t1, t2; include/rpl_end.inc
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test' --- a/mysql-test/suite/rpl/t/rpl_parallel.test 2014-08-15 09:31:13 +0000 +++ b/mysql-test/suite/rpl/t/rpl_parallel.test 2014-10-07 14:20:37 +0000 @@ -1,13 +1,12 @@ --source include/have_innodb.inc --source include/have_debug.inc --source include/have_debug_sync.inc ---let $rpl_topology=1->2 ---source include/rpl_init.inc +--source include/master-slave.inc
# Test various aspects of parallel replication.
--connection server_2 -SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +--let $old_parallel_threads=`SELECT @@GLOBAL.slave_parallel_threads` --error ER_SLAVE_MUST_STOP SET GLOBAL slave_parallel_threads=10; --source include/stop_slave.inc @@ -1466,9 +1465,100 @@ SET sql_slave_skip_counter= 1; SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
+--echo *** MDEV-6676 - test syntax of CHANGE MASTER TO PARALLEL_MODE=xxx *** +--connection server_2 + +--let $status_items= Parallel_Mode +--source include/show_slave_status.inc +--source include/stop_slave.inc +--error ER_PARSE_ERROR +CHANGE MASTER TO parallel_mode=(,domain,groupcommit); +--error ER_INVALID_SLAVE_PARALLEL_MODE +CHANGE MASTER TO parallel_mode=(domain,groupcommit,transactional); +--error ER_INVALID_SLAVE_PARALLEL_MODE +CHANGE MASTER TO parallel_mode=(waiting,domain,transactional,waiting); +--error ER_INVALID_SLAVE_PARALLEL_MODE +CHANGE MASTER TO parallel_mode=(domain,domain); +CHANGE MASTER TO parallel_mode=(waiting,transactional,domain); +--let $status_items= Parallel_Mode +--source include/show_slave_status.inc +CHANGE MASTER TO parallel_mode=(domain,groupcommit); +--let $status_items= Parallel_Mode +--source include/show_slave_status.inc + + +--echo *** MDEV-6676 - test that empty parallel_mode does not replicate in parallel *** +--connection server_1 +INSERT INTO t2 VALUES (40); +--source include/save_master_gtid.inc + +--connection server_2 +CHANGE MASTER TO parallel_mode=(); +# Test that we do not use parallel apply, by injecting an unconditional +# crash in the parallel apply code. +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,slave_crash_if_parallel_apply"; +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc +SELECT * FROM t2 WHERE a >= 40 ORDER BY a; +--source include/stop_slave.inc +SET GLOBAL debug_dbug=@old_dbug; + + +--echo *** MDEV-6676 - test disabling domain-based parallel replication *** +--connection server_1 +# Let's do a bunch of transactions that will conflict if run out-of-order in +# domain-based parallel replication mode. +SET gtid_domain_id = 1; +INSERT INTO t2 VALUES (41); +INSERT INTO t2 VALUES (42); +INSERT INTO t2 VALUES (43); +INSERT INTO t2 VALUES (44); +INSERT INTO t2 VALUES (45); +INSERT INTO t2 VALUES (46); +DELETE FROM t2 WHERE a >= 41; +SET gtid_domain_id = 2; +INSERT INTO t2 VALUES (41); +INSERT INTO t2 VALUES (42); +INSERT INTO t2 VALUES (43); +INSERT INTO t2 VALUES (44); +INSERT INTO t2 VALUES (45); +INSERT INTO t2 VALUES (46); +SET gtid_domain_id = 0; +--source include/save_master_gtid.inc +--connection server_2 +CHANGE MASTER TO parallel_mode=(groupcommit); +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc +SELECT * FROM t2 WHERE a >= 40 ORDER BY a; +--source include/stop_slave.inc +CHANGE MASTER TO parallel_mode=(domain,groupcommit); +--source include/start_slave.inc + + +--echo *** MDEV-6676 - test that parallel mode is saved correctly in master.info across server restart *** +--connection server_2 +--let $status_items= Parallel_Mode +--source include/show_slave_status.inc +--source include/stop_slave.inc +CHANGE MASTER TO parallel_mode=(transactional,waiting); +--let $status_items= Parallel_Mode +--source include/show_slave_status.inc +--source include/start_slave.inc + +--let $rpl_server_number= 2 +--source include/rpl_restart_server.inc + +--connection server_2 +--let $status_items= Parallel_Mode +--source include/show_slave_status.inc +CHANGE MASTER TO parallel_mode=(domain,groupcommit); +--source include/start_slave.inc + + --connection server_2 --source include/stop_slave.inc -SET GLOBAL slave_parallel_threads=@old_parallel_threads; +eval SET GLOBAL slave_parallel_threads=$old_parallel_threads; --source include/start_slave.inc SET DEBUG_SYNC= 'RESET';
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_optimistic.test' --- a/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test 2014-09-24 19:25:10 +0000 +++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test 2014-10-07 14:20:37 +0000 @@ -11,11 +11,9 @@ CREATE TABLE t1 (a int PRIMARY KEY, b IN --connection server_2 --sync_with_master SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; -SET @old_mode=@@GLOBAL.slave_parallel_mode; --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=10; -SET GLOBAL slave_parallel_mode="domain,transactional"; -CHANGE MASTER TO master_use_gtid=slave_pos; +CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional);
--connection server_1 @@ -309,8 +307,8 @@ SET GLOBAL tx_isolation= @old_isolation;
--connection server_2 --source include/stop_slave.inc +CHANGE MASTER TO parallel_mode=(domain,groupcommit); SET GLOBAL slave_parallel_threads=@old_parallel_threads; -SET GLOBAL slave_parallel_mode=@old_mode; --source include/start_slave.inc
--connection server_1
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test' --- a/mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test 2014-09-24 19:25:10 +0000 +++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test 2014-10-07 14:20:37 +0000 @@ -16,12 +16,10 @@ INSERT INTO t2 VALUES (1,0), (2,0); --sync_with_master SET @old_isolation= @@GLOBAL.tx_isolation; SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; -SET @old_mode=@@GLOBAL.slave_parallel_mode; --source include/stop_slave.inc SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED; SET GLOBAL slave_parallel_threads=10; -SET GLOBAL slave_parallel_mode="domain,transactional,waiting"; -CHANGE MASTER TO master_use_gtid=slave_pos; +CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting);
--echo *** Test that we replicate correctly when using READ COMMITTED and --log-slave-updates=0 on the slave *** @@ -63,8 +61,8 @@ SELECT * FROM t2 ORDER BY a; --connection server_2 --source include/stop_slave.inc SET GLOBAL tx_isolation= @old_isolation; +CHANGE MASTER TO parallel_mode=(domain,groupcommit); SET GLOBAL slave_parallel_threads=@old_parallel_threads; -SET GLOBAL slave_parallel_mode=@old_mode; --source include/start_slave.inc
--connection server_1
=== modified file 'sql/lex.h' --- a/sql/lex.h 2014-02-02 09:00:36 +0000 +++ b/sql/lex.h 2014-10-07 14:20:37 +0000 @@ -194,6 +194,7 @@ static SYMBOL symbols[] = { { "DISTINCTROW", SYM(DISTINCT)}, /* Access likes this */ { "DIV", SYM(DIV_SYM)}, { "DO", SYM(DO_SYM)}, + { "DOMAIN", SYM(DOMAIN_SYM)}, { "DOUBLE", SYM(DOUBLE_SYM)}, { "DROP", SYM(DROP)}, { "DUAL", SYM(DUAL_SYM)}, @@ -257,6 +258,7 @@ static SYMBOL symbols[] = { { "GRANT", SYM(GRANT)}, { "GRANTS", SYM(GRANTS)}, { "GROUP", SYM(GROUP_SYM)}, + { "GROUPCOMMIT", SYM(GROUPCOMMIT_SYM)}, { "HANDLER", SYM(HANDLER_SYM)}, { "HARD", SYM(HARD_SYM)}, { "HASH", SYM(HASH_SYM)}, @@ -429,6 +431,7 @@ static SYMBOL symbols[] = { { "PACK_KEYS", SYM(PACK_KEYS_SYM)}, { "PAGE", SYM(PAGE_SYM)}, { "PAGE_CHECKSUM", SYM(PAGE_CHECKSUM_SYM)}, + { "PARALLEL_MODE", SYM(PARALLEL_MODE_SYM)}, { "PARSER", SYM(PARSER_SYM)}, { "PARSE_VCOL_EXPR", SYM(PARSE_VCOL_EXPR_SYM)}, { "PARTIAL", SYM(PARTIAL)}, @@ -650,6 +653,7 @@ static SYMBOL symbols[] = { { "VIEW", SYM(VIEW_SYM)}, { "VIRTUAL", SYM(VIRTUAL_SYM)}, { "WAIT", SYM(WAIT_SYM)}, + { "WAITING", SYM(WAITING_SYM)}, { "WARNINGS", SYM(WARNINGS)}, { "WEEK", SYM(WEEK_SYM)}, { "WEIGHT_STRING", SYM(WEIGHT_STRING_SYM)},
=== modified file 'sql/log_event.cc' --- a/sql/log_event.cc 2014-09-25 13:47:58 +0000 +++ b/sql/log_event.cc 2014-10-07 14:20:37 +0000 @@ -12728,7 +12728,7 @@ bool rpl_get_position_info(const char ** return FALSE; #else const Relay_log_info *rli= &(active_mi->rli); - if (opt_slave_parallel_threads == 0) + if (!rli->mi->using_parallel()) { *log_file_name= rli->group_master_log_name; *log_pos= rli->group_master_log_pos +
=== modified file 'sql/mysqld.cc' --- a/sql/mysqld.cc 2014-09-26 10:37:19 +0000 +++ b/sql/mysqld.cc 2014-10-07 14:20:37 +0000 @@ -562,8 +562,6 @@ ulong stored_program_cache_size= 0;
ulong opt_slave_parallel_threads= 0; ulong opt_slave_domain_parallel_threads= 0; -ulonglong opt_slave_parallel_mode= - SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_GROUPCOMMIT; ulong opt_binlog_commit_wait_count= 0; ulong opt_binlog_commit_wait_usec= 0; ulong opt_slave_parallel_max_queued= 131072;
=== modified file 'sql/mysqld.h' --- a/sql/mysqld.h 2014-09-26 10:37:19 +0000 +++ b/sql/mysqld.h 2014-10-07 14:20:37 +0000 @@ -187,7 +187,6 @@ extern ulong stored_program_cache_size; extern ulong opt_slave_parallel_threads; extern ulong opt_slave_domain_parallel_threads; extern ulong opt_slave_parallel_max_queued; -extern ulonglong opt_slave_parallel_mode; extern ulong opt_binlog_commit_wait_count; extern ulong opt_binlog_commit_wait_usec; extern my_bool opt_gtid_ignore_duplicates;
=== modified file 'sql/rpl_mi.cc' --- a/sql/rpl_mi.cc 2014-09-02 12:07:01 +0000 +++ b/sql/rpl_mi.cc 2014-10-07 14:20:37 +0000 @@ -40,7 +40,8 @@ Master_info::Master_info(LEX_STRING *con heartbeat_period(0), received_heartbeats(0), master_id(0), prev_master_id(0), using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0), - gtid_reconnect_event_skip_count(0), gtid_event_seen(false) + gtid_reconnect_event_skip_count(0), gtid_event_seen(false), + parallel_mode(SLAVE_PARALLEL_DOMAIN|SLAVE_PARALLEL_GROUPCOMMIT) { host[0] = 0; user[0] = 0; password[0] = 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; @@ -178,6 +179,7 @@ void init_master_log_pos(Master_info* mi mi->events_queued_since_last_gtid= 0; mi->gtid_reconnect_event_skip_count= 0; mi->gtid_event_seen= false; + mi->parallel_mode= SLAVE_PARALLEL_DOMAIN|SLAVE_PARALLEL_GROUPCOMMIT;
/* Intentionally init ssl_verify_server_cert to 0, no option available */ mi->ssl_verify_server_cert= 0; @@ -514,6 +516,22 @@ file '%s')", fname); else mi->using_gtid= Master_info::USE_GTID_NO; } + else if (!strncmp(buf, STRING_WITH_LEN("parallel_mode="))) + { + int val= atoi(buf + (sizeof("parallel_mode=") - 1)); + mi->parallel_mode= val & (SLAVE_PARALLEL_DOMAIN | + SLAVE_PARALLEL_GROUPCOMMIT | + SLAVE_PARALLEL_TRX | + SLAVE_PARALLEL_WAITING); + } + else if (!strncmp(buf, STRING_WITH_LEN("END_MARKER"))) + { + /* + Guard agaist extra left-overs at the end of file, in case a later + update causes the file to shrink compared to earlier contents. + */ + break; + } } } } @@ -657,7 +675,9 @@ int flush_master_info(Master_info* mi, my_b_printf(file,
"%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n" "\n\n\n\n\n\n\n\n\n\n\n" - "using_gtid=%d\n", + "using_gtid=%d\n" + "parallel_mode=%u\n" + "END_MARKER\n", LINES_IN_MASTER_INFO, mi->master_log_name, llstr(mi->master_log_pos, lbuf), mi->host, mi->user, @@ -666,7 +686,7 @@ int flush_master_info(Master_info* mi, mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert, heartbeat_buf, "", ignore_server_ids_buf, "", 0, - mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid); + mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid, mi->parallel_mode); my_free(ignore_server_ids_buf); err= flush_io_cache(file); if (sync_masterinfo_period && !err &&
=== modified file 'sql/rpl_mi.h' --- a/sql/rpl_mi.h 2014-09-02 12:07:01 +0000 +++ b/sql/rpl_mi.h 2014-10-07 14:20:37 +0000 @@ -75,6 +75,10 @@ class Master_info : public Slave_reporti return connection_name.str == 0; } static const char *using_gtid_astext(enum enum_using_gtid arg); + bool using_parallel() + { + return opt_slave_parallel_threads > 0 && parallel_mode != 0; + }
/* the variables below are needed because we can change masters on the fly */ char master_log_name[FN_REFLEN+6]; /* Room for multi-*/ @@ -178,6 +182,12 @@ class Master_info : public Slave_reporti uint64 gtid_reconnect_event_skip_count; /* gtid_event_seen is false until we receive first GTID event from master. */ bool gtid_event_seen; + /* + The parallel replication modes, if any. A combination (binary OR) of any of + SLAVE_PARALLEL_DOMAIN, SLAVE_PARALLEL_GROUPCOMMIT, SLAVE_PARALLEL_TRX, and + SLAVE_PARALLEL_WAITING. + */ + uint32 parallel_mode; }; int init_master_info(Master_info* mi, const char* master_info_fname, const char* slave_info_fname,
=== modified file 'sql/rpl_parallel.cc' --- a/sql/rpl_parallel.cc 2014-10-03 09:43:23 +0000 +++ b/sql/rpl_parallel.cc 2014-10-07 14:20:37 +0000 @@ -1891,6 +1891,7 @@ rpl_parallel::do_event(rpl_group_info *s bool did_enter_cond= false; PSI_stage_info old_stage;
+ DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE();); /* Handle master log name change, seen in Rotate_log_event. */ typ= ev->get_type_code(); if (unlikely(typ == ROTATE_EVENT)) @@ -1971,7 +1972,8 @@ rpl_parallel::do_event(rpl_group_info *s if (typ == GTID_EVENT) { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); - uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? + uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO || + !(rli->mi->parallel_mode & SLAVE_PARALLEL_DOMAIN) ? 0 : gtid_ev->domain_id); if (!(e= find(domain_id))) { @@ -2012,7 +2014,7 @@ rpl_parallel::do_event(rpl_group_info *s { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); bool new_gco; - ulonglong mode= opt_slave_parallel_mode; + ulonglong mode= rli->mi->parallel_mode; uchar gtid_flags= gtid_ev->flags2; group_commit_orderer *gco; uint8 force_switch_flag;
=== modified file 'sql/rpl_parallel.h' --- a/sql/rpl_parallel.h 2014-10-03 09:43:23 +0000 +++ b/sql/rpl_parallel.h 2014-10-07 14:20:37 +0000 @@ -12,7 +12,11 @@ class Relay_log_info; struct inuse_relaylog;
-/* Bit masks for the values in --slave-parallel-mode. */ +/* + Bit masks for the values in --slave-parallel-mode. + Note that these values cannot be changed - they are stored in master.info, + so need to be possible to read back in a different version of the server. +*/ #define SLAVE_PARALLEL_DOMAIN (1ULL << 0) #define SLAVE_PARALLEL_GROUPCOMMIT (1ULL << 1) #define SLAVE_PARALLEL_TRX (1ULL << 2)
=== modified file 'sql/share/errmsg-utf8.txt' --- a/sql/share/errmsg-utf8.txt 2014-07-08 17:38:26 +0000 +++ b/sql/share/errmsg-utf8.txt 2014-10-07 14:20:37 +0000 @@ -7111,3 +7111,5 @@ ER_SLAVE_SKIP_NOT_IN_GTID eng "When using GTID, @@sql_slave_skip_counter can not be used. Instead, setting @@gtid_slave_pos explicitly can be used to skip to after a given GTID position." ER_TABLE_DEFINITION_TOO_BIG eng "The definition for table %`s is too big" +ER_INVALID_SLAVE_PARALLEL_MODE + eng "Invalid use of '%s' option for PARALLEL_MODE in CHANGE MASTER TO"
=== modified file 'sql/slave.cc' --- a/sql/slave.cc 2014-09-02 12:07:01 +0000 +++ b/sql/slave.cc 2014-10-07 14:20:37 +0000 @@ -625,8 +625,7 @@ int terminate_slave_threads(Master_info* if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) { DBUG_PRINT("info",("Terminating SQL thread")); - if (opt_slave_parallel_threads > 0 && - mi->rli.abort_slave && mi->rli.stop_for_until) + if (mi->using_parallel() && mi->rli.abort_slave && mi->rli.stop_for_until) { mi->rli.stop_for_until= false; mi->rli.parallel.stop_during_until(); @@ -2576,6 +2575,8 @@ static bool send_show_master_info_header field_list.push_back(new Item_empty_string("Using_Gtid", sizeof("Current_Pos")-1)); field_list.push_back(new Item_empty_string("Gtid_IO_Pos", 30)); + field_list.push_back(new Item_empty_string("Parallel_Mode", + sizeof("domain,groupcommit,transactional,waiting")-1)); if (full) { field_list.push_back(new Item_return_int("Retried_transactions", @@ -2708,8 +2709,7 @@ static bool send_show_master_info_data(T else { idle= mi->rli.sql_thread_caught_up; - if (opt_slave_parallel_threads > 0 && idle && - !mi->rli.parallel.workers_idle()) + if (mi->using_parallel() && idle && !mi->rli.parallel.workers_idle()) idle= false; } if (idle) @@ -2786,13 +2786,34 @@ static bool send_show_master_info_data(T protocol->store(mi->ssl_ca, &my_charset_bin); // Master_Ssl_Crlpath protocol->store(mi->ssl_capath, &my_charset_bin); + // Using_Gtid protocol->store(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin); + // Gtid_IO_Pos { char buff[30]; String tmp(buff, sizeof(buff), system_charset_info); mi->gtid_current_pos.to_string(&tmp); protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin); } + // Parallel_Mode + { + /* Note how sizeof("domain") has room for "domain," due to traling 0. */ + char buf[sizeof("domain") + sizeof("groupcommit") + + sizeof("transactional") + sizeof("waiting") + 1]; + char *p= buf; + uint32 mode= mi->parallel_mode; + if (mode & SLAVE_PARALLEL_DOMAIN) + p= strmov(p, "domain,"); + if (mode & SLAVE_PARALLEL_GROUPCOMMIT) + p= strmov(p, "groupcommit,"); + if (mode & SLAVE_PARALLEL_TRX) + p= strmov(p, "transactional,"); + if (mode & SLAVE_PARALLEL_WAITING) + p= strmov(p, "waiting,"); + if (p != buf) + --p; // Discard last ',' + protocol->store(buf, p-buf, &my_charset_bin); + } if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -3521,7 +3542,7 @@ static int exec_relay_log_event(THD* thd
update_state_of_relay_log(rli, ev);
- if (opt_slave_parallel_threads > 0) + if (rli->mi->using_parallel()) { int res= rli->parallel.do_event(serial_rgi, ev, event_size); if (res >= 0) @@ -4667,7 +4688,7 @@ log '%s' at position %s, relay log '%s' } }
- if (opt_slave_parallel_threads > 0) + if (mi->using_parallel()) rli->parallel.wait_for_done(thd, rli);
/* Thread stopped. Print the current replication position to the log */ @@ -4693,7 +4714,7 @@ log '%s' at position %s, relay log '%s' (We want the first one to be before the printout of stop position to get the correct position printed.) */ - if (opt_slave_parallel_threads > 0) + if (mi->using_parallel()) rli->parallel.wait_for_done(thd, rli);
/* @@ -6314,7 +6335,7 @@ static Log_event* next_event(rpl_group_i llstr(my_b_tell(cur_log),llbuf1), llstr(rli->event_relay_log_pos,llbuf2))); DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(opt_slave_parallel_threads > 0 || + DBUG_ASSERT(rli->mi->using_parallel() || my_b_tell(cur_log) == rli->event_relay_log_pos); } #endif
=== modified file 'sql/sql_lex.h' --- a/sql/sql_lex.h 2014-08-07 16:06:56 +0000 +++ b/sql/sql_lex.h 2014-10-07 14:20:37 +0000 @@ -218,11 +218,18 @@ struct LEX_MASTER_INFO uint port, connect_retry; float heartbeat_period; /* + Modes of parallel replication enabled, if any. A combination (binary OR) of + any of SLAVE_PARALLEL_DOMAIN, SLAVE_PARALLEL_GROUPCOMMIT, + SLAVE_PARALLEL_TRX, and SLAVE_PARALLEL_WAITING. + */ + uint32 repl_parallel_mode; + /* Enum is used for making it possible to detect if the user changed variable or if it should be left at old value */ enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE} - ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt; + ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt, + repl_parallel_mode_opt; enum { LEX_GTID_UNCHANGED, LEX_GTID_NO, LEX_GTID_CURRENT_POS, LEX_GTID_SLAVE_POS } use_gtid_opt; @@ -241,10 +248,11 @@ struct LEX_MASTER_INFO pos= relay_log_pos= server_id= port= connect_retry= 0; heartbeat_period= 0; ssl= ssl_verify_server_cert= heartbeat_opt= - repl_ignore_server_ids_opt= LEX_MI_UNCHANGED; + repl_ignore_server_ids_opt= repl_parallel_mode_opt= LEX_MI_UNCHANGED; gtid_pos_str.length= 0; gtid_pos_str.str= NULL; use_gtid_opt= LEX_GTID_UNCHANGED; + repl_parallel_mode= 0; } };
=== modified file 'sql/sql_repl.cc' --- a/sql/sql_repl.cc 2014-08-07 16:06:56 +0000 +++ b/sql/sql_repl.cc 2014-10-07 14:20:37 +0000 @@ -3434,6 +3434,9 @@ bool change_master(THD* thd, Master_info lex_mi->relay_log_name || lex_mi->relay_log_pos) mi->using_gtid= Master_info::USE_GTID_NO;
+ if (lex_mi->repl_parallel_mode_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED) + mi->parallel_mode= lex_mi->repl_parallel_mode; + /* If user did specify neither host nor port nor any log name nor any log pos, i.e. he specified only user/password/master_connect_retry, he probably
=== modified file 'sql/sql_yacc.yy' --- a/sql/sql_yacc.yy 2014-09-03 14:24:31 +0000 +++ b/sql/sql_yacc.yy 2014-10-07 14:20:37 +0000 @@ -1130,6 +1130,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** %token DIV_SYM %token DOUBLE_SYM /* SQL-2003-R */ %token DO_SYM +%token DOMAIN_SYM %token DROP /* SQL-2003-R */ %token DUAL_SYM %token DUMPFILE @@ -1194,6 +1195,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** %token GRANT /* SQL-2003-R */ %token GRANTS %token GROUP_SYM /* SQL-2003-R */ +%token GROUPCOMMIT_SYM %token GROUP_CONCAT_SYM %token GT_SYM /* OPERATOR */ %token HANDLER_SYM @@ -1376,6 +1378,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** %token PACK_KEYS_SYM %token PAGE_SYM %token PAGE_CHECKSUM_SYM +%token PARALLEL_MODE_SYM %token PARAM_MARKER %token PARSER_SYM %token PARSE_VCOL_EXPR_SYM @@ -1604,6 +1607,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** %token VIEW_SYM /* SQL-2003-N */ %token VIRTUAL_SYM %token WAIT_SYM +%token WAITING_SYM %token WARNINGS %token WEEK_SYM %token WEIGHT_STRING_SYM @@ -2244,6 +2248,14 @@ rule: <-- starts at col 1 { Lex->mi.repl_ignore_server_ids_opt= LEX_MASTER_INFO::LEX_MI_ENABLE; } + | PARALLEL_MODE_SYM EQ '(' ')' + { + Lex->mi.repl_parallel_mode_opt= LEX_MASTER_INFO::LEX_MI_DISABLE; + } + | PARALLEL_MODE_SYM EQ '(' parallel_mode_list ')' + { + Lex->mi.repl_parallel_mode_opt= LEX_MASTER_INFO::LEX_MI_ENABLE; + } | master_file_def ; @@ -2260,6 +2272,52 @@ rule: <-- starts at col 1 insert_dynamic(&Lex->mi.repl_ignore_server_ids, (uchar*) &($1)); }
+parallel_mode_list: + parallel_mode_option + | parallel_mode_list ',' parallel_mode_option + ; + +parallel_mode_option: + DOMAIN_SYM + { + if (Lex->mi.repl_parallel_mode & SLAVE_PARALLEL_DOMAIN) + { + my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "domain"); + MYSQL_YYABORT; + } + Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_DOMAIN; + } + | GROUPCOMMIT_SYM + { + if (Lex->mi.repl_parallel_mode & + (SLAVE_PARALLEL_GROUPCOMMIT|SLAVE_PARALLEL_TRX)) + { + my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "groupcommit"); + MYSQL_YYABORT; + } + Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_GROUPCOMMIT; + } + | TRANSACTIONAL_SYM + { + if (Lex->mi.repl_parallel_mode & + (SLAVE_PARALLEL_GROUPCOMMIT|SLAVE_PARALLEL_TRX)) + { + my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "transactional"); + MYSQL_YYABORT; + } + Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_TRX; + } + | WAITING_SYM + { + if (Lex->mi.repl_parallel_mode & SLAVE_PARALLEL_WAITING) + { + my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "waiting"); + MYSQL_YYABORT; + } + Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_WAITING; + } + + master_file_def: MASTER_LOG_FILE_SYM EQ TEXT_STRING_sys { @@ -14098,6 +14156,7 @@ user: user_maybe_role | DISABLE_SYM {} | DISCARD {} | DISK_SYM {} + | DOMAIN_SYM {} | DUMPFILE {} | DUPLICATE_SYM {} | DYNAMIC_SYM {} @@ -14131,6 +14190,7 @@ user: user_maybe_role | GET_FORMAT {} | GRANTS {} | GLOBAL_SYM {} + | GROUPCOMMIT_SYM {} | HASH_SYM {} | HARD_SYM {} | HOSTS_SYM {} @@ -14221,6 +14281,7 @@ user: user_maybe_role | ONLY_SYM {} | PACK_KEYS_SYM {} | PAGE_SYM {} + | PARALLEL_MODE_SYM {} | PARTIAL {} | PARTITIONING_SYM {} | PARTITIONS_SYM {} @@ -14336,6 +14397,7 @@ user: user_maybe_role | VALUE_SYM {} | WARNINGS {} | WAIT_SYM {} + | WAITING_SYM {} | WEEK_SYM {} | WEIGHT_STRING_SYM {} | WORK_SYM {}
=== modified file 'sql/sys_vars.cc' --- a/sql/sys_vars.cc 2014-09-19 13:25:37 +0000 +++ b/sql/sys_vars.cc 2014-10-07 14:20:37 +0000 @@ -1834,31 +1834,11 @@ static Sys_var_ulong Sys_slave_parallel_ VALID_RANGE(0,2147483647), DEFAULT(131072), BLOCK_SIZE(1));
-static const char *slave_parallel_mode_names[] = { - "domain", "groupcommit", "transactional", "waiting", NULL -}; - -static Sys_var_set Sys_slave_parallel_mode( - "slave_parallel_mode", - "Controls what transactions are applied in parallel when using " - "--slave-parallel-threads. Syntax: slave_paralle_mode=value[,value...], " - "where \"value\" could be one or more of: \"domain\", to apply " - "different replication domains in parallel; \"groupcommit\", to apply " - "in parallel transactions that group-committed together on the master; " - "\"transactional\", to optimistically try to apply all transactional " - "DML in parallel; and \"waiting\" to extend \"transactional\" to " - "even transactions that had to wait on the master.", - GLOBAL_VAR(opt_slave_parallel_mode), CMD_LINE(REQUIRED_ARG), - slave_parallel_mode_names, - DEFAULT(SLAVE_PARALLEL_DOMAIN | - SLAVE_PARALLEL_GROUPCOMMIT)); - - static Sys_var_bit Sys_replicate_allow_parallel( "replicate_allow_parallel", "If set when a transaction is written to the binlog, that transaction " - "is allowed to replicate in parallel on a slave where " - "slave_parallel_mode is set to \"transactional\". Can be cleared for " + "is allowed to replicate in parallel on a slave where parallel_mode " + "is set to \"transactional\" (in CHANGE MASTER). Can be cleared for " "transactions that are likely to cause a conflict if replicated in " "parallel, to avoid unnecessary rollback and retry.", SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_ALLOW_PARALLEL,
_______________________________________________ commits mailing list commits@mariadb.org https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits