At http://bazaar.launchpad.net/~maria-captains/maria/10.0
------------------------------------------------------------
revno: 4391
revision-id: knielsen@knielsen-hq.org-20141007142037-y85xq099ltdiqno6
parent: knielsen@knielsen-hq.org-20141003094323-nosx0y7vv0kjv20e
committer: Kristian Nielsen <knielsen@knielsen-hq.org>
branch nick: work-10.0-mdev6676
timestamp: Tue 2014-10-07 16:20:37 +0200
message:
MDEV-6676: Speculative parallel replication.
Better syntax for configuring the parallel mode. It is now a CHANGE MASTER
option instead of a system variable, which makes it possible to configure it
differently for each master in multi-source replication.
The domain-based parallel replication mode can now also be disabled.
=== modified file 'mysql-test/include/check-testcase.test'
--- a/mysql-test/include/check-testcase.test 2013-12-16 12:02:21 +0000
+++ b/mysql-test/include/check-testcase.test 2014-10-07 14:20:37 +0000
@@ -64,6 +64,7 @@ if ($tmp)
--echo Master_SSL_Crlpath #
--echo Using_Gtid No
--echo Gtid_IO_Pos #
+ --echo Parallel_Mode domain,groupcommit
}
if (!$tmp) {
# Note: after WL#5177, fields 13-18 shall not be filtered-out.
=== modified file 'mysql-test/r/mysqld--help.result'
--- a/mysql-test/r/mysqld--help.result 2014-09-19 13:25:37 +0000
+++ b/mysql-test/r/mysqld--help.result 2014-10-07 14:20:37 +0000
@@ -915,17 +915,6 @@
parallel replication thread when reading ahead in the
relay log looking for opportunities for parallel
replication. Only used when --slave-parallel-threads > 0.
- --slave-parallel-mode=name
- Controls what transactions are applied in parallel when
- using --slave-parallel-threads. Syntax:
- slave_paralle_mode=value[,value...], where "value" could
- be one or more of: "domain", to apply different
- replication domains in parallel; "groupcommit", to apply
- in parallel transactions that group-committed together on
- the master; "transactional", to optimistically try to
- apply all transactional DML in parallel; and "waiting" to
- extend "transactional" to even transactions that had to
- wait on the master.
--slave-parallel-threads=#
If non-zero, number of threads to spawn to apply in
parallel events on the slave that were group-committed on
@@ -1320,7 +1309,6 @@ slave-exec-mode STRICT
slave-max-allowed-packet 1073741824
slave-net-timeout 3600
slave-parallel-max-queued 131072
-slave-parallel-mode domain,groupcommit
slave-parallel-threads 0
slave-skip-errors (No default value)
slave-sql-verify-checksum TRUE
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel.result 2014-08-15 09:31:13 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result 2014-10-07 14:20:37 +0000
@@ -1,5 +1,5 @@
-include/rpl_init.inc [topology=1->2]
-SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+include/master-slave.inc
+[connection master]
SET GLOBAL slave_parallel_threads=10;
ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first
include/stop_slave.inc
@@ -923,8 +923,79 @@ SELECT * FROM t2 WHERE a >= 30 ORDER BY
32
33
34
+*** MDEV-6676 - test syntax of CHANGE MASTER TO PARALLEL_MODE=xxx ***
+Parallel_Mode = 'domain,groupcommit'
include/stop_slave.inc
-SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+CHANGE MASTER TO parallel_mode=(,domain,groupcommit);
+ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near 'domain,groupcommit)' at line 1
+CHANGE MASTER TO parallel_mode=(domain,groupcommit,transactional);
+ERROR HY000: Invalid use of 'transactional' option for PARALLEL_MODE in CHANGE MASTER TO
+CHANGE MASTER TO parallel_mode=(waiting,domain,transactional,waiting);
+ERROR HY000: Invalid use of 'waiting' option for PARALLEL_MODE in CHANGE MASTER TO
+CHANGE MASTER TO parallel_mode=(domain,domain);
+ERROR HY000: Invalid use of 'domain' option for PARALLEL_MODE in CHANGE MASTER TO
+CHANGE MASTER TO parallel_mode=(waiting,transactional,domain);
+Parallel_Mode = 'domain,transactional,waiting'
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+Parallel_Mode = 'domain,groupcommit'
+*** MDEV-6676 - test that empty parallel_mode does not replicate in parallel ***
+INSERT INTO t2 VALUES (40);
+include/save_master_gtid.inc
+CHANGE MASTER TO parallel_mode=();
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,slave_crash_if_parallel_apply";
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t2 WHERE a >= 40 ORDER BY a;
+a
+40
+include/stop_slave.inc
+SET GLOBAL debug_dbug=@old_dbug;
+*** MDEV-6676 - test disabling domain-based parallel replication ***
+SET gtid_domain_id = 1;
+INSERT INTO t2 VALUES (41);
+INSERT INTO t2 VALUES (42);
+INSERT INTO t2 VALUES (43);
+INSERT INTO t2 VALUES (44);
+INSERT INTO t2 VALUES (45);
+INSERT INTO t2 VALUES (46);
+DELETE FROM t2 WHERE a >= 41;
+SET gtid_domain_id = 2;
+INSERT INTO t2 VALUES (41);
+INSERT INTO t2 VALUES (42);
+INSERT INTO t2 VALUES (43);
+INSERT INTO t2 VALUES (44);
+INSERT INTO t2 VALUES (45);
+INSERT INTO t2 VALUES (46);
+SET gtid_domain_id = 0;
+include/save_master_gtid.inc
+CHANGE MASTER TO parallel_mode=(groupcommit);
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t2 WHERE a >= 40 ORDER BY a;
+a
+40
+41
+42
+43
+44
+45
+46
+include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+include/start_slave.inc
+*** MDEV-6676 - test that parallel mode is saved correctly in master.info across server restart ***
+Parallel_Mode = 'domain,groupcommit'
+include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(transactional,waiting);
+Parallel_Mode = 'transactional,waiting'
+include/start_slave.inc
+include/rpl_restart_server.inc [server_number=2]
+Parallel_Mode = 'transactional,waiting'
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+include/start_slave.inc
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=0;
include/start_slave.inc
SET DEBUG_SYNC= 'RESET';
DROP function foo;
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_optimistic.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result 2014-09-24 19:25:10 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result 2014-10-07 14:20:37 +0000
@@ -2,11 +2,9 @@ include/rpl_init.inc [topology=1->2]
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
-SET @old_mode=@@GLOBAL.slave_parallel_mode;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
-SET GLOBAL slave_parallel_mode="domain,transactional";
-CHANGE MASTER TO master_use_gtid=slave_pos;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional);
INSERT INTO t1 VALUES(1,1);
BEGIN;
INSERT INTO t1 VALUES(2,1);
@@ -288,8 +286,8 @@ SET GLOBAL binlog_format= @old_format;
SET GLOBAL tx_isolation= @old_isolation;
include/start_slave.inc
include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
-SET GLOBAL slave_parallel_mode=@old_mode;
include/start_slave.inc
DROP TABLE t1, t2, t3;
include/rpl_end.inc
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result 2014-09-24 19:25:10 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result 2014-10-07 14:20:37 +0000
@@ -6,12 +6,10 @@ INSERT INTO t1 VALUES (1,0), (2,0), (3,0
INSERT INTO t2 VALUES (1,0), (2,0);
SET @old_isolation= @@GLOBAL.tx_isolation;
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
-SET @old_mode=@@GLOBAL.slave_parallel_mode;
include/stop_slave.inc
SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET GLOBAL slave_parallel_threads=10;
-SET GLOBAL slave_parallel_mode="domain,transactional,waiting";
-CHANGE MASTER TO master_use_gtid=slave_pos;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting);
*** Test that we replicate correctly when using READ COMMITTED and --log-slave-updates=0 on the slave ***
INSERT INTO t1 SELECT 4, COUNT(*) FROM t2;
INSERT INTO t2 SELECT 4, COUNT(*) FROM t1;
@@ -78,8 +76,8 @@ a b
10 10
include/stop_slave.inc
SET GLOBAL tx_isolation= @old_isolation;
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
-SET GLOBAL slave_parallel_mode=@old_mode;
include/start_slave.inc
DROP TABLE t1, t2;
include/rpl_end.inc
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test 2014-08-15 09:31:13 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test 2014-10-07 14:20:37 +0000
@@ -1,13 +1,12 @@
--source include/have_innodb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc
---let $rpl_topology=1->2
---source include/rpl_init.inc
+--source include/master-slave.inc
# Test various aspects of parallel replication.
--connection server_2
-SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+--let $old_parallel_threads=`SELECT @@GLOBAL.slave_parallel_threads`
--error ER_SLAVE_MUST_STOP
SET GLOBAL slave_parallel_threads=10;
--source include/stop_slave.inc
@@ -1466,9 +1465,100 @@ SET sql_slave_skip_counter= 1;
SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
+--echo *** MDEV-6676 - test syntax of CHANGE MASTER TO PARALLEL_MODE=xxx ***
+--connection server_2
+
+--let $status_items= Parallel_Mode
+--source include/show_slave_status.inc
+--source include/stop_slave.inc
+--error ER_PARSE_ERROR
+CHANGE MASTER TO parallel_mode=(,domain,groupcommit);
+--error ER_INVALID_SLAVE_PARALLEL_MODE
+CHANGE MASTER TO parallel_mode=(domain,groupcommit,transactional);
+--error ER_INVALID_SLAVE_PARALLEL_MODE
+CHANGE MASTER TO parallel_mode=(waiting,domain,transactional,waiting);
+--error ER_INVALID_SLAVE_PARALLEL_MODE
+CHANGE MASTER TO parallel_mode=(domain,domain);
+CHANGE MASTER TO parallel_mode=(waiting,transactional,domain);
+--let $status_items= Parallel_Mode
+--source include/show_slave_status.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+--let $status_items= Parallel_Mode
+--source include/show_slave_status.inc
+
+
+--echo *** MDEV-6676 - test that empty parallel_mode does not replicate in parallel ***
+--connection server_1
+INSERT INTO t2 VALUES (40);
+--source include/save_master_gtid.inc
+
+--connection server_2
+CHANGE MASTER TO parallel_mode=();
+# Test that we do not use parallel apply, by injecting an unconditional
+# crash in the parallel apply code.
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,slave_crash_if_parallel_apply";
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t2 WHERE a >= 40 ORDER BY a;
+--source include/stop_slave.inc
+SET GLOBAL debug_dbug=@old_dbug;
+
+
+--echo *** MDEV-6676 - test disabling domain-based parallel replication ***
+--connection server_1
+# Let's do a bunch of transactions that will conflict if run out-of-order in
+# domain-based parallel replication mode.
+SET gtid_domain_id = 1;
+INSERT INTO t2 VALUES (41);
+INSERT INTO t2 VALUES (42);
+INSERT INTO t2 VALUES (43);
+INSERT INTO t2 VALUES (44);
+INSERT INTO t2 VALUES (45);
+INSERT INTO t2 VALUES (46);
+DELETE FROM t2 WHERE a >= 41;
+SET gtid_domain_id = 2;
+INSERT INTO t2 VALUES (41);
+INSERT INTO t2 VALUES (42);
+INSERT INTO t2 VALUES (43);
+INSERT INTO t2 VALUES (44);
+INSERT INTO t2 VALUES (45);
+INSERT INTO t2 VALUES (46);
+SET gtid_domain_id = 0;
+--source include/save_master_gtid.inc
+--connection server_2
+CHANGE MASTER TO parallel_mode=(groupcommit);
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t2 WHERE a >= 40 ORDER BY a;
+--source include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+--source include/start_slave.inc
+
+
+--echo *** MDEV-6676 - test that parallel mode is saved correctly in master.info across server restart ***
+--connection server_2
+--let $status_items= Parallel_Mode
+--source include/show_slave_status.inc
+--source include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(transactional,waiting);
+--let $status_items= Parallel_Mode
+--source include/show_slave_status.inc
+--source include/start_slave.inc
+
+--let $rpl_server_number= 2
+--source include/rpl_restart_server.inc
+
+--connection server_2
+--let $status_items= Parallel_Mode
+--source include/show_slave_status.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+--source include/start_slave.inc
+
+
--connection server_2
--source include/stop_slave.inc
-SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+eval SET GLOBAL slave_parallel_threads=$old_parallel_threads;
--source include/start_slave.inc
SET DEBUG_SYNC= 'RESET';
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_optimistic.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test 2014-09-24 19:25:10 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test 2014-10-07 14:20:37 +0000
@@ -11,11 +11,9 @@ CREATE TABLE t1 (a int PRIMARY KEY, b IN
--connection server_2
--sync_with_master
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
-SET @old_mode=@@GLOBAL.slave_parallel_mode;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
-SET GLOBAL slave_parallel_mode="domain,transactional";
-CHANGE MASTER TO master_use_gtid=slave_pos;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional);
--connection server_1
@@ -309,8 +307,8 @@ SET GLOBAL tx_isolation= @old_isolation;
--connection server_2
--source include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
-SET GLOBAL slave_parallel_mode=@old_mode;
--source include/start_slave.inc
--connection server_1
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test 2014-09-24 19:25:10 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test 2014-10-07 14:20:37 +0000
@@ -16,12 +16,10 @@ INSERT INTO t2 VALUES (1,0), (2,0);
--sync_with_master
SET @old_isolation= @@GLOBAL.tx_isolation;
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
-SET @old_mode=@@GLOBAL.slave_parallel_mode;
--source include/stop_slave.inc
SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET GLOBAL slave_parallel_threads=10;
-SET GLOBAL slave_parallel_mode="domain,transactional,waiting";
-CHANGE MASTER TO master_use_gtid=slave_pos;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting);
--echo *** Test that we replicate correctly when using READ COMMITTED and --log-slave-updates=0 on the slave ***
@@ -63,8 +61,8 @@ SELECT * FROM t2 ORDER BY a;
--connection server_2
--source include/stop_slave.inc
SET GLOBAL tx_isolation= @old_isolation;
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
-SET GLOBAL slave_parallel_mode=@old_mode;
--source include/start_slave.inc
--connection server_1
=== modified file 'sql/lex.h'
--- a/sql/lex.h 2014-02-02 09:00:36 +0000
+++ b/sql/lex.h 2014-10-07 14:20:37 +0000
@@ -194,6 +194,7 @@ static SYMBOL symbols[] = {
{ "DISTINCTROW", SYM(DISTINCT)}, /* Access likes this */
{ "DIV", SYM(DIV_SYM)},
{ "DO", SYM(DO_SYM)},
+ { "DOMAIN", SYM(DOMAIN_SYM)},
{ "DOUBLE", SYM(DOUBLE_SYM)},
{ "DROP", SYM(DROP)},
{ "DUAL", SYM(DUAL_SYM)},
@@ -257,6 +258,7 @@ static SYMBOL symbols[] = {
{ "GRANT", SYM(GRANT)},
{ "GRANTS", SYM(GRANTS)},
{ "GROUP", SYM(GROUP_SYM)},
+ { "GROUPCOMMIT", SYM(GROUPCOMMIT_SYM)},
{ "HANDLER", SYM(HANDLER_SYM)},
{ "HARD", SYM(HARD_SYM)},
{ "HASH", SYM(HASH_SYM)},
@@ -429,6 +431,7 @@ static SYMBOL symbols[] = {
{ "PACK_KEYS", SYM(PACK_KEYS_SYM)},
{ "PAGE", SYM(PAGE_SYM)},
{ "PAGE_CHECKSUM", SYM(PAGE_CHECKSUM_SYM)},
+ { "PARALLEL_MODE", SYM(PARALLEL_MODE_SYM)},
{ "PARSER", SYM(PARSER_SYM)},
{ "PARSE_VCOL_EXPR", SYM(PARSE_VCOL_EXPR_SYM)},
{ "PARTIAL", SYM(PARTIAL)},
@@ -650,6 +653,7 @@ static SYMBOL symbols[] = {
{ "VIEW", SYM(VIEW_SYM)},
{ "VIRTUAL", SYM(VIRTUAL_SYM)},
{ "WAIT", SYM(WAIT_SYM)},
+ { "WAITING", SYM(WAITING_SYM)},
{ "WARNINGS", SYM(WARNINGS)},
{ "WEEK", SYM(WEEK_SYM)},
{ "WEIGHT_STRING", SYM(WEIGHT_STRING_SYM)},
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2014-09-25 13:47:58 +0000
+++ b/sql/log_event.cc 2014-10-07 14:20:37 +0000
@@ -12728,7 +12728,7 @@ bool rpl_get_position_info(const char **
return FALSE;
#else
const Relay_log_info *rli= &(active_mi->rli);
- if (opt_slave_parallel_threads == 0)
+ if (!rli->mi->using_parallel())
{
*log_file_name= rli->group_master_log_name;
*log_pos= rli->group_master_log_pos +
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2014-09-26 10:37:19 +0000
+++ b/sql/mysqld.cc 2014-10-07 14:20:37 +0000
@@ -562,8 +562,6 @@ ulong stored_program_cache_size= 0;
ulong opt_slave_parallel_threads= 0;
ulong opt_slave_domain_parallel_threads= 0;
-ulonglong opt_slave_parallel_mode=
- SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_GROUPCOMMIT;
ulong opt_binlog_commit_wait_count= 0;
ulong opt_binlog_commit_wait_usec= 0;
ulong opt_slave_parallel_max_queued= 131072;
=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h 2014-09-26 10:37:19 +0000
+++ b/sql/mysqld.h 2014-10-07 14:20:37 +0000
@@ -187,7 +187,6 @@ extern ulong stored_program_cache_size;
extern ulong opt_slave_parallel_threads;
extern ulong opt_slave_domain_parallel_threads;
extern ulong opt_slave_parallel_max_queued;
-extern ulonglong opt_slave_parallel_mode;
extern ulong opt_binlog_commit_wait_count;
extern ulong opt_binlog_commit_wait_usec;
extern my_bool opt_gtid_ignore_duplicates;
=== modified file 'sql/rpl_mi.cc'
--- a/sql/rpl_mi.cc 2014-09-02 12:07:01 +0000
+++ b/sql/rpl_mi.cc 2014-10-07 14:20:37 +0000
@@ -40,7 +40,8 @@ Master_info::Master_info(LEX_STRING *con
heartbeat_period(0), received_heartbeats(0), master_id(0),
prev_master_id(0),
using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0),
- gtid_reconnect_event_skip_count(0), gtid_event_seen(false)
+ gtid_reconnect_event_skip_count(0), gtid_event_seen(false),
+ parallel_mode(SLAVE_PARALLEL_DOMAIN|SLAVE_PARALLEL_GROUPCOMMIT)
{
host[0] = 0; user[0] = 0; password[0] = 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
@@ -178,6 +179,7 @@ void init_master_log_pos(Master_info* mi
mi->events_queued_since_last_gtid= 0;
mi->gtid_reconnect_event_skip_count= 0;
mi->gtid_event_seen= false;
+ mi->parallel_mode= SLAVE_PARALLEL_DOMAIN|SLAVE_PARALLEL_GROUPCOMMIT;
/* Intentionally init ssl_verify_server_cert to 0, no option available */
mi->ssl_verify_server_cert= 0;
@@ -514,6 +516,22 @@ file '%s')", fname);
else
mi->using_gtid= Master_info::USE_GTID_NO;
}
+ else if (!strncmp(buf, STRING_WITH_LEN("parallel_mode=")))
+ {
+ int val= atoi(buf + (sizeof("parallel_mode=") - 1));
+ mi->parallel_mode= val & (SLAVE_PARALLEL_DOMAIN |
+ SLAVE_PARALLEL_GROUPCOMMIT |
+ SLAVE_PARALLEL_TRX |
+ SLAVE_PARALLEL_WAITING);
+ }
+ else if (!strncmp(buf, STRING_WITH_LEN("END_MARKER")))
+ {
+ /*
+ Guard against extra left-overs at the end of file, in case a later
+ update causes the file to shrink compared to earlier contents.
+ */
+ break;
+ }
}
}
}
@@ -657,7 +675,9 @@ int flush_master_info(Master_info* mi,
my_b_printf(file,
"%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n"
"\n\n\n\n\n\n\n\n\n\n\n"
- "using_gtid=%d\n",
+ "using_gtid=%d\n"
+ "parallel_mode=%u\n"
+ "END_MARKER\n",
LINES_IN_MASTER_INFO,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
mi->host, mi->user,
@@ -666,7 +686,7 @@ int flush_master_info(Master_info* mi,
mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert,
heartbeat_buf, "", ignore_server_ids_buf,
"", 0,
- mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid);
+ mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid, mi->parallel_mode);
my_free(ignore_server_ids_buf);
err= flush_io_cache(file);
if (sync_masterinfo_period && !err &&
=== modified file 'sql/rpl_mi.h'
--- a/sql/rpl_mi.h 2014-09-02 12:07:01 +0000
+++ b/sql/rpl_mi.h 2014-10-07 14:20:37 +0000
@@ -75,6 +75,10 @@ class Master_info : public Slave_reporti
return connection_name.str == 0;
}
static const char *using_gtid_astext(enum enum_using_gtid arg);
+ bool using_parallel()
+ {
+ return opt_slave_parallel_threads > 0 && parallel_mode != 0;
+ }
/* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN+6]; /* Room for multi-*/
@@ -178,6 +182,12 @@ class Master_info : public Slave_reporti
uint64 gtid_reconnect_event_skip_count;
/* gtid_event_seen is false until we receive first GTID event from master. */
bool gtid_event_seen;
+ /*
+ The parallel replication modes, if any. A combination (binary OR) of any of
+ SLAVE_PARALLEL_DOMAIN, SLAVE_PARALLEL_GROUPCOMMIT, SLAVE_PARALLEL_TRX, and
+ SLAVE_PARALLEL_WAITING.
+ */
+ uint32 parallel_mode;
};
int init_master_info(Master_info* mi, const char* master_info_fname,
const char* slave_info_fname,
=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc 2014-10-03 09:43:23 +0000
+++ b/sql/rpl_parallel.cc 2014-10-07 14:20:37 +0000
@@ -1891,6 +1891,7 @@ rpl_parallel::do_event(rpl_group_info *s
bool did_enter_cond= false;
PSI_stage_info old_stage;
+ DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE(););
/* Handle master log name change, seen in Rotate_log_event. */
typ= ev->get_type_code();
if (unlikely(typ == ROTATE_EVENT))
@@ -1971,7 +1972,8 @@ rpl_parallel::do_event(rpl_group_info *s
if (typ == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
+ uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
+ !(rli->mi->parallel_mode & SLAVE_PARALLEL_DOMAIN) ?
0 : gtid_ev->domain_id);
if (!(e= find(domain_id)))
{
@@ -2012,7 +2014,7 @@ rpl_parallel::do_event(rpl_group_info *s
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
bool new_gco;
- ulonglong mode= opt_slave_parallel_mode;
+ ulonglong mode= rli->mi->parallel_mode;
uchar gtid_flags= gtid_ev->flags2;
group_commit_orderer *gco;
uint8 force_switch_flag;
=== modified file 'sql/rpl_parallel.h'
--- a/sql/rpl_parallel.h 2014-10-03 09:43:23 +0000
+++ b/sql/rpl_parallel.h 2014-10-07 14:20:37 +0000
@@ -12,7 +12,11 @@ class Relay_log_info;
struct inuse_relaylog;
-/* Bit masks for the values in --slave-parallel-mode. */
+/*
+ Bit masks for the values of PARALLEL_MODE in CHANGE MASTER TO.
+ Note that these values cannot be changed - they are stored in master.info,
+ so need to be possible to read back in a different version of the server.
+*/
#define SLAVE_PARALLEL_DOMAIN (1ULL << 0)
#define SLAVE_PARALLEL_GROUPCOMMIT (1ULL << 1)
#define SLAVE_PARALLEL_TRX (1ULL << 2)
=== modified file 'sql/share/errmsg-utf8.txt'
--- a/sql/share/errmsg-utf8.txt 2014-07-08 17:38:26 +0000
+++ b/sql/share/errmsg-utf8.txt 2014-10-07 14:20:37 +0000
@@ -7111,3 +7111,5 @@ ER_SLAVE_SKIP_NOT_IN_GTID
eng "When using GTID, @@sql_slave_skip_counter can not be used. Instead, setting @@gtid_slave_pos explicitly can be used to skip to after a given GTID position."
ER_TABLE_DEFINITION_TOO_BIG
eng "The definition for table %`s is too big"
+ER_INVALID_SLAVE_PARALLEL_MODE
+ eng "Invalid use of '%s' option for PARALLEL_MODE in CHANGE MASTER TO"
=== modified file 'sql/slave.cc'
--- a/sql/slave.cc 2014-09-02 12:07:01 +0000
+++ b/sql/slave.cc 2014-10-07 14:20:37 +0000
@@ -625,8 +625,7 @@ int terminate_slave_threads(Master_info*
if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
{
DBUG_PRINT("info",("Terminating SQL thread"));
- if (opt_slave_parallel_threads > 0 &&
- mi->rli.abort_slave && mi->rli.stop_for_until)
+ if (mi->using_parallel() && mi->rli.abort_slave && mi->rli.stop_for_until)
{
mi->rli.stop_for_until= false;
mi->rli.parallel.stop_during_until();
@@ -2576,6 +2575,8 @@ static bool send_show_master_info_header
field_list.push_back(new Item_empty_string("Using_Gtid",
sizeof("Current_Pos")-1));
field_list.push_back(new Item_empty_string("Gtid_IO_Pos", 30));
+ field_list.push_back(new Item_empty_string("Parallel_Mode",
+ sizeof("domain,groupcommit,transactional,waiting")-1));
if (full)
{
field_list.push_back(new Item_return_int("Retried_transactions",
@@ -2708,8 +2709,7 @@ static bool send_show_master_info_data(T
else
{
idle= mi->rli.sql_thread_caught_up;
- if (opt_slave_parallel_threads > 0 && idle &&
- !mi->rli.parallel.workers_idle())
+ if (mi->using_parallel() && idle && !mi->rli.parallel.workers_idle())
idle= false;
}
if (idle)
@@ -2786,13 +2786,34 @@ static bool send_show_master_info_data(T
protocol->store(mi->ssl_ca, &my_charset_bin);
// Master_Ssl_Crlpath
protocol->store(mi->ssl_capath, &my_charset_bin);
+ // Using_Gtid
protocol->store(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin);
+ // Gtid_IO_Pos
{
char buff[30];
String tmp(buff, sizeof(buff), system_charset_info);
mi->gtid_current_pos.to_string(&tmp);
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
}
+ // Parallel_Mode
+ {
+ /* Note how sizeof("domain") has room for "domain," due to trailing 0. */
+ char buf[sizeof("domain") + sizeof("groupcommit") +
+ sizeof("transactional") + sizeof("waiting") + 1];
+ char *p= buf;
+ uint32 mode= mi->parallel_mode;
+ if (mode & SLAVE_PARALLEL_DOMAIN)
+ p= strmov(p, "domain,");
+ if (mode & SLAVE_PARALLEL_GROUPCOMMIT)
+ p= strmov(p, "groupcommit,");
+ if (mode & SLAVE_PARALLEL_TRX)
+ p= strmov(p, "transactional,");
+ if (mode & SLAVE_PARALLEL_WAITING)
+ p= strmov(p, "waiting,");
+ if (p != buf)
+ --p; // Discard last ','
+ protocol->store(buf, p-buf, &my_charset_bin);
+ }
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
@@ -3521,7 +3542,7 @@ static int exec_relay_log_event(THD* thd
update_state_of_relay_log(rli, ev);
- if (opt_slave_parallel_threads > 0)
+ if (rli->mi->using_parallel())
{
int res= rli->parallel.do_event(serial_rgi, ev, event_size);
if (res >= 0)
@@ -4667,7 +4688,7 @@ log '%s' at position %s, relay log '%s'
}
}
- if (opt_slave_parallel_threads > 0)
+ if (mi->using_parallel())
rli->parallel.wait_for_done(thd, rli);
/* Thread stopped. Print the current replication position to the log */
@@ -4693,7 +4714,7 @@ log '%s' at position %s, relay log '%s'
(We want the first one to be before the printout of stop position to
get the correct position printed.)
*/
- if (opt_slave_parallel_threads > 0)
+ if (mi->using_parallel())
rli->parallel.wait_for_done(thd, rli);
/*
@@ -6314,7 +6335,7 @@ static Log_event* next_event(rpl_group_i
llstr(my_b_tell(cur_log),llbuf1),
llstr(rli->event_relay_log_pos,llbuf2)));
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(opt_slave_parallel_threads > 0 ||
+ DBUG_ASSERT(rli->mi->using_parallel() ||
my_b_tell(cur_log) == rli->event_relay_log_pos);
}
#endif
=== modified file 'sql/sql_lex.h'
--- a/sql/sql_lex.h 2014-08-07 16:06:56 +0000
+++ b/sql/sql_lex.h 2014-10-07 14:20:37 +0000
@@ -218,11 +218,18 @@ struct LEX_MASTER_INFO
uint port, connect_retry;
float heartbeat_period;
/*
+ Modes of parallel replication enabled, if any. A combination (binary OR) of
+ any of SLAVE_PARALLEL_DOMAIN, SLAVE_PARALLEL_GROUPCOMMIT,
+ SLAVE_PARALLEL_TRX, and SLAVE_PARALLEL_WAITING.
+ */
+ uint32 repl_parallel_mode;
+ /*
Enum is used for making it possible to detect if the user
changed variable or if it should be left at old value
*/
enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE}
- ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt;
+ ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt,
+ repl_parallel_mode_opt;
enum {
LEX_GTID_UNCHANGED, LEX_GTID_NO, LEX_GTID_CURRENT_POS, LEX_GTID_SLAVE_POS
} use_gtid_opt;
@@ -241,10 +248,11 @@ struct LEX_MASTER_INFO
pos= relay_log_pos= server_id= port= connect_retry= 0;
heartbeat_period= 0;
ssl= ssl_verify_server_cert= heartbeat_opt=
- repl_ignore_server_ids_opt= LEX_MI_UNCHANGED;
+ repl_ignore_server_ids_opt= repl_parallel_mode_opt= LEX_MI_UNCHANGED;
gtid_pos_str.length= 0;
gtid_pos_str.str= NULL;
use_gtid_opt= LEX_GTID_UNCHANGED;
+ repl_parallel_mode= 0;
}
};
=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc 2014-08-07 16:06:56 +0000
+++ b/sql/sql_repl.cc 2014-10-07 14:20:37 +0000
@@ -3434,6 +3434,9 @@ bool change_master(THD* thd, Master_info
lex_mi->relay_log_name || lex_mi->relay_log_pos)
mi->using_gtid= Master_info::USE_GTID_NO;
+ if (lex_mi->repl_parallel_mode_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+ mi->parallel_mode= lex_mi->repl_parallel_mode;
+
/*
If user did specify neither host nor port nor any log name nor any log
pos, i.e. he specified only user/password/master_connect_retry, he probably
=== modified file 'sql/sql_yacc.yy'
--- a/sql/sql_yacc.yy 2014-09-03 14:24:31 +0000
+++ b/sql/sql_yacc.yy 2014-10-07 14:20:37 +0000
@@ -1130,6 +1130,7 @@ bool my_yyoverflow(short **a, YYSTYPE **
%token DIV_SYM
%token DOUBLE_SYM /* SQL-2003-R */
%token DO_SYM
+%token DOMAIN_SYM
%token DROP /* SQL-2003-R */
%token DUAL_SYM
%token DUMPFILE
@@ -1194,6 +1195,7 @@ bool my_yyoverflow(short **a, YYSTYPE **
%token GRANT /* SQL-2003-R */
%token GRANTS
%token GROUP_SYM /* SQL-2003-R */
+%token GROUPCOMMIT_SYM
%token GROUP_CONCAT_SYM
%token GT_SYM /* OPERATOR */
%token HANDLER_SYM
@@ -1376,6 +1378,7 @@ bool my_yyoverflow(short **a, YYSTYPE **
%token PACK_KEYS_SYM
%token PAGE_SYM
%token PAGE_CHECKSUM_SYM
+%token PARALLEL_MODE_SYM
%token PARAM_MARKER
%token PARSER_SYM
%token PARSE_VCOL_EXPR_SYM
@@ -1604,6 +1607,7 @@ bool my_yyoverflow(short **a, YYSTYPE **
%token VIEW_SYM /* SQL-2003-N */
%token VIRTUAL_SYM
%token WAIT_SYM
+%token WAITING_SYM
%token WARNINGS
%token WEEK_SYM
%token WEIGHT_STRING_SYM
@@ -2244,6 +2248,14 @@ rule: <-- starts at col 1
{
Lex->mi.repl_ignore_server_ids_opt= LEX_MASTER_INFO::LEX_MI_ENABLE;
}
+ | PARALLEL_MODE_SYM EQ '(' ')'
+ {
+ Lex->mi.repl_parallel_mode_opt= LEX_MASTER_INFO::LEX_MI_DISABLE;
+ }
+ | PARALLEL_MODE_SYM EQ '(' parallel_mode_list ')'
+ {
+ Lex->mi.repl_parallel_mode_opt= LEX_MASTER_INFO::LEX_MI_ENABLE;
+ }
|
master_file_def
;
@@ -2260,6 +2272,52 @@ rule: <-- starts at col 1
insert_dynamic(&Lex->mi.repl_ignore_server_ids, (uchar*) &($1));
}
+parallel_mode_list:
+ parallel_mode_option
+ | parallel_mode_list ',' parallel_mode_option
+ ;
+
+parallel_mode_option:
+ DOMAIN_SYM
+ {
+ if (Lex->mi.repl_parallel_mode & SLAVE_PARALLEL_DOMAIN)
+ {
+ my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "domain");
+ MYSQL_YYABORT;
+ }
+ Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_DOMAIN;
+ }
+ | GROUPCOMMIT_SYM
+ {
+ if (Lex->mi.repl_parallel_mode &
+ (SLAVE_PARALLEL_GROUPCOMMIT|SLAVE_PARALLEL_TRX))
+ {
+ my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "groupcommit");
+ MYSQL_YYABORT;
+ }
+ Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_GROUPCOMMIT;
+ }
+ | TRANSACTIONAL_SYM
+ {
+ if (Lex->mi.repl_parallel_mode &
+ (SLAVE_PARALLEL_GROUPCOMMIT|SLAVE_PARALLEL_TRX))
+ {
+ my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "transactional");
+ MYSQL_YYABORT;
+ }
+ Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_TRX;
+ }
+ | WAITING_SYM
+ {
+ if (Lex->mi.repl_parallel_mode & SLAVE_PARALLEL_WAITING)
+ {
+ my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "waiting");
+ MYSQL_YYABORT;
+ }
+ Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_WAITING;
+ }
+
+
master_file_def:
MASTER_LOG_FILE_SYM EQ TEXT_STRING_sys
{
@@ -14098,6 +14156,7 @@ user: user_maybe_role
| DISABLE_SYM {}
| DISCARD {}
| DISK_SYM {}
+ | DOMAIN_SYM {}
| DUMPFILE {}
| DUPLICATE_SYM {}
| DYNAMIC_SYM {}
@@ -14131,6 +14190,7 @@ user: user_maybe_role
| GET_FORMAT {}
| GRANTS {}
| GLOBAL_SYM {}
+ | GROUPCOMMIT_SYM {}
| HASH_SYM {}
| HARD_SYM {}
| HOSTS_SYM {}
@@ -14221,6 +14281,7 @@ user: user_maybe_role
| ONLY_SYM {}
| PACK_KEYS_SYM {}
| PAGE_SYM {}
+ | PARALLEL_MODE_SYM {}
| PARTIAL {}
| PARTITIONING_SYM {}
| PARTITIONS_SYM {}
@@ -14336,6 +14397,7 @@ user: user_maybe_role
| VALUE_SYM {}
| WARNINGS {}
| WAIT_SYM {}
+ | WAITING_SYM {}
| WEEK_SYM {}
| WEIGHT_STRING_SYM {}
| WORK_SYM {}
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2014-09-19 13:25:37 +0000
+++ b/sql/sys_vars.cc 2014-10-07 14:20:37 +0000
@@ -1834,31 +1834,11 @@ static Sys_var_ulong Sys_slave_parallel_
VALID_RANGE(0,2147483647), DEFAULT(131072), BLOCK_SIZE(1));
-static const char *slave_parallel_mode_names[] = {
- "domain", "groupcommit", "transactional", "waiting", NULL
-};
-
-static Sys_var_set Sys_slave_parallel_mode(
- "slave_parallel_mode",
- "Controls what transactions are applied in parallel when using "
- "--slave-parallel-threads. Syntax: slave_paralle_mode=value[,value...], "
- "where \"value\" could be one or more of: \"domain\", to apply "
- "different replication domains in parallel; \"groupcommit\", to apply "
- "in parallel transactions that group-committed together on the master; "
- "\"transactional\", to optimistically try to apply all transactional "
- "DML in parallel; and \"waiting\" to extend \"transactional\" to "
- "even transactions that had to wait on the master.",
- GLOBAL_VAR(opt_slave_parallel_mode), CMD_LINE(REQUIRED_ARG),
- slave_parallel_mode_names,
- DEFAULT(SLAVE_PARALLEL_DOMAIN |
- SLAVE_PARALLEL_GROUPCOMMIT));
-
-
static Sys_var_bit Sys_replicate_allow_parallel(
"replicate_allow_parallel",
"If set when a transaction is written to the binlog, that transaction "
- "is allowed to replicate in parallel on a slave where "
- "slave_parallel_mode is set to \"transactional\". Can be cleared for "
+ "is allowed to replicate in parallel on a slave where parallel_mode "
+ "is set to \"transactional\" (in CHANGE MASTER). Can be cleared for "
"transactions that are likely to cause a conflict if replicated in "
"parallel, to avoid unnecessary rollback and retry.",
SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_ALLOW_PARALLEL,
_______________________________________________
commits mailing list
commits@mariadb.org
https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits