How about also adding a mode = 'auto'?
For now it could simply be equivalent to 'transactional',
but in the future it could be extended to switch the actual
modes back and forth based on statistics.

/Jonas

On Tue, Oct 7, 2014 at 4:20 PM, <knielsen@knielsen-hq.org> wrote:
At http://bazaar.launchpad.net/~maria-captains/maria/10.0

------------------------------------------------------------
revno: 4391
revision-id: knielsen@knielsen-hq.org-20141007142037-y85xq099ltdiqno6
parent: knielsen@knielsen-hq.org-20141003094323-nosx0y7vv0kjv20e
committer: Kristian Nielsen <knielsen@knielsen-hq.org>
branch nick: work-10.0-mdev6676
timestamp: Tue 2014-10-07 16:20:37 +0200
message:
  MDEV-6676: Speculative parallel replication.

  Better syntax for configuring the parallel mode. Now it is a CHANGE MASTER
  option instead of a system variable, which makes it possible to configure
  differently for different multi-source replication masters.

  Now also the domain-based replication mode can be disabled.
=== modified file 'mysql-test/include/check-testcase.test'
--- a/mysql-test/include/check-testcase.test    2013-12-16 12:02:21 +0000
+++ b/mysql-test/include/check-testcase.test    2014-10-07 14:20:37 +0000
@@ -64,6 +64,7 @@ if ($tmp)
   --echo Master_SSL_Crlpath     #
   --echo Using_Gtid     No
   --echo Gtid_IO_Pos    #
+  --echo Parallel_Mode  domain,groupcommit
 }
 if (!$tmp) {
   # Note: after WL#5177, fields 13-18 shall not be filtered-out.

=== modified file 'mysql-test/r/mysqld--help.result'
--- a/mysql-test/r/mysqld--help.result  2014-09-19 13:25:37 +0000
+++ b/mysql-test/r/mysqld--help.result  2014-10-07 14:20:37 +0000
@@ -915,17 +915,6 @@
  parallel replication thread when reading ahead in the
  relay log looking for opportunities for parallel
  replication. Only used when --slave-parallel-threads > 0.
- --slave-parallel-mode=name
- Controls what transactions are applied in parallel when
- using --slave-parallel-threads. Syntax:
- slave_paralle_mode=value[,value...], where "value" could
- be one or more of: "domain", to apply different
- replication domains in parallel; "groupcommit", to apply
- in parallel transactions that group-committed together on
- the master; "transactional", to optimistically try to
- apply all transactional DML in parallel; and "waiting" to
- extend "transactional" to even transactions that had to
- wait on the master.
  --slave-parallel-threads=#
  If non-zero, number of threads to spawn to apply in
  parallel events on the slave that were group-committed on
@@ -1320,7 +1309,6 @@ slave-exec-mode STRICT
 slave-max-allowed-packet 1073741824
 slave-net-timeout 3600
 slave-parallel-max-queued 131072
-slave-parallel-mode domain,groupcommit
 slave-parallel-threads 0
 slave-skip-errors (No default value)
 slave-sql-verify-checksum TRUE

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel.result        2014-08-15 09:31:13 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result        2014-10-07 14:20:37 +0000
@@ -1,5 +1,5 @@
-include/rpl_init.inc [topology=1->2]
-SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+include/master-slave.inc
+[connection master]
 SET GLOBAL slave_parallel_threads=10;
 ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first
 include/stop_slave.inc
@@ -923,8 +923,79 @@ SELECT * FROM t2 WHERE a >= 30 ORDER BY
 32
 33
 34
+*** MDEV-6676 - test syntax of CHANGE MASTER TO PARALLEL_MODE=xxx ***
+Parallel_Mode = 'domain,groupcommit'
 include/stop_slave.inc
-SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+CHANGE MASTER TO parallel_mode=(,domain,groupcommit);
+ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near 'domain,groupcommit)' at line 1
+CHANGE MASTER TO parallel_mode=(domain,groupcommit,transactional);
+ERROR HY000: Invalid use of 'transactional' option for PARALLEL_MODE in CHANGE MASTER TO
+CHANGE MASTER TO parallel_mode=(waiting,domain,transactional,waiting);
+ERROR HY000: Invalid use of 'waiting' option for PARALLEL_MODE in CHANGE MASTER TO
+CHANGE MASTER TO parallel_mode=(domain,domain);
+ERROR HY000: Invalid use of 'domain' option for PARALLEL_MODE in CHANGE MASTER TO
+CHANGE MASTER TO parallel_mode=(waiting,transactional,domain);
+Parallel_Mode = 'domain,transactional,waiting'
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+Parallel_Mode = 'domain,groupcommit'
+*** MDEV-6676 - test that empty parallel_mode does not replicate in parallel ***
+INSERT INTO t2 VALUES (40);
+include/save_master_gtid.inc
+CHANGE MASTER TO parallel_mode=();
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,slave_crash_if_parallel_apply";
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t2 WHERE a >= 40 ORDER BY a;
+a
+40
+include/stop_slave.inc
+SET GLOBAL debug_dbug=@old_dbug;
+*** MDEV-6676 - test disabling domain-based parallel replication ***
+SET gtid_domain_id = 1;
+INSERT INTO t2 VALUES (41);
+INSERT INTO t2 VALUES (42);
+INSERT INTO t2 VALUES (43);
+INSERT INTO t2 VALUES (44);
+INSERT INTO t2 VALUES (45);
+INSERT INTO t2 VALUES (46);
+DELETE FROM t2 WHERE a >= 41;
+SET gtid_domain_id = 2;
+INSERT INTO t2 VALUES (41);
+INSERT INTO t2 VALUES (42);
+INSERT INTO t2 VALUES (43);
+INSERT INTO t2 VALUES (44);
+INSERT INTO t2 VALUES (45);
+INSERT INTO t2 VALUES (46);
+SET gtid_domain_id = 0;
+include/save_master_gtid.inc
+CHANGE MASTER TO parallel_mode=(groupcommit);
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t2 WHERE a >= 40 ORDER BY a;
+a
+40
+41
+42
+43
+44
+45
+46
+include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+include/start_slave.inc
+*** MDEV-6676 - test that parallel mode is saved correctly in master.info across server restart ***
+Parallel_Mode = 'domain,groupcommit'
+include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(transactional,waiting);
+Parallel_Mode = 'transactional,waiting'
+include/start_slave.inc
+include/rpl_restart_server.inc [server_number=2]
+Parallel_Mode = 'transactional,waiting'
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+include/start_slave.inc
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=0;
 include/start_slave.inc
 SET DEBUG_SYNC= 'RESET';
 DROP function foo;

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_optimistic.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result     2014-09-24 19:25:10 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result     2014-10-07 14:20:37 +0000
@@ -2,11 +2,9 @@ include/rpl_init.inc [topology=1->2]
 ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
 CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
 SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
-SET @old_mode=@@GLOBAL.slave_parallel_mode;
 include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=10;
-SET GLOBAL slave_parallel_mode="domain,transactional";
-CHANGE MASTER TO master_use_gtid=slave_pos;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional);
 INSERT INTO t1 VALUES(1,1);
 BEGIN;
 INSERT INTO t1 VALUES(2,1);
@@ -288,8 +286,8 @@ SET GLOBAL binlog_format= @old_format;
 SET GLOBAL tx_isolation= @old_isolation;
 include/start_slave.inc
 include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
 SET GLOBAL slave_parallel_threads=@old_parallel_threads;
-SET GLOBAL slave_parallel_mode=@old_mode;
 include/start_slave.inc
 DROP TABLE t1, t2, t3;
 include/rpl_end.inc

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result    2014-09-24 19:25:10 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result    2014-10-07 14:20:37 +0000
@@ -6,12 +6,10 @@ INSERT INTO t1 VALUES (1,0), (2,0), (3,0
 INSERT INTO t2 VALUES (1,0), (2,0);
 SET @old_isolation= @@GLOBAL.tx_isolation;
 SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
-SET @old_mode=@@GLOBAL.slave_parallel_mode;
 include/stop_slave.inc
 SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;
 SET GLOBAL slave_parallel_threads=10;
-SET GLOBAL slave_parallel_mode="domain,transactional,waiting";
-CHANGE MASTER TO master_use_gtid=slave_pos;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting);
 *** Test that we replicate correctly when using READ COMMITTED and --log-slave-updates=0 on the slave ***
 INSERT INTO t1 SELECT 4, COUNT(*) FROM t2;
 INSERT INTO t2 SELECT 4, COUNT(*) FROM t1;
@@ -78,8 +76,8 @@ a     b
 10      10
 include/stop_slave.inc
 SET GLOBAL tx_isolation= @old_isolation;
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
 SET GLOBAL slave_parallel_threads=@old_parallel_threads;
-SET GLOBAL slave_parallel_mode=@old_mode;
 include/start_slave.inc
 DROP TABLE t1, t2;
 include/rpl_end.inc

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test  2014-08-15 09:31:13 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test  2014-10-07 14:20:37 +0000
@@ -1,13 +1,12 @@
 --source include/have_innodb.inc
 --source include/have_debug.inc
 --source include/have_debug_sync.inc
---let $rpl_topology=1->2
---source include/rpl_init.inc
+--source include/master-slave.inc

 # Test various aspects of parallel replication.

 --connection server_2
-SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+--let $old_parallel_threads=`SELECT @@GLOBAL.slave_parallel_threads`
 --error ER_SLAVE_MUST_STOP
 SET GLOBAL slave_parallel_threads=10;
 --source include/stop_slave.inc
@@ -1466,9 +1465,100 @@ SET sql_slave_skip_counter= 1;
 SELECT * FROM t2 WHERE a >= 30 ORDER BY a;


+--echo *** MDEV-6676 - test syntax of CHANGE MASTER TO PARALLEL_MODE=xxx ***
+--connection server_2
+
+--let $status_items= Parallel_Mode
+--source include/show_slave_status.inc
+--source include/stop_slave.inc
+--error ER_PARSE_ERROR
+CHANGE MASTER TO parallel_mode=(,domain,groupcommit);
+--error ER_INVALID_SLAVE_PARALLEL_MODE
+CHANGE MASTER TO parallel_mode=(domain,groupcommit,transactional);
+--error ER_INVALID_SLAVE_PARALLEL_MODE
+CHANGE MASTER TO parallel_mode=(waiting,domain,transactional,waiting);
+--error ER_INVALID_SLAVE_PARALLEL_MODE
+CHANGE MASTER TO parallel_mode=(domain,domain);
+CHANGE MASTER TO parallel_mode=(waiting,transactional,domain);
+--let $status_items= Parallel_Mode
+--source include/show_slave_status.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+--let $status_items= Parallel_Mode
+--source include/show_slave_status.inc
+
+
+--echo *** MDEV-6676 - test that empty parallel_mode does not replicate in parallel ***
+--connection server_1
+INSERT INTO t2 VALUES (40);
+--source include/save_master_gtid.inc
+
+--connection server_2
+CHANGE MASTER TO parallel_mode=();
+# Test that we do not use parallel apply, by injecting an unconditional
+# crash in the parallel apply code.
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,slave_crash_if_parallel_apply";
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t2 WHERE a >= 40 ORDER BY a;
+--source include/stop_slave.inc
+SET GLOBAL debug_dbug=@old_dbug;
+
+
+--echo *** MDEV-6676 - test disabling domain-based parallel replication ***
+--connection server_1
+# Let's do a bunch of transactions that will conflict if run out-of-order in
+# domain-based parallel replication mode.
+SET gtid_domain_id = 1;
+INSERT INTO t2 VALUES (41);
+INSERT INTO t2 VALUES (42);
+INSERT INTO t2 VALUES (43);
+INSERT INTO t2 VALUES (44);
+INSERT INTO t2 VALUES (45);
+INSERT INTO t2 VALUES (46);
+DELETE FROM t2 WHERE a >= 41;
+SET gtid_domain_id = 2;
+INSERT INTO t2 VALUES (41);
+INSERT INTO t2 VALUES (42);
+INSERT INTO t2 VALUES (43);
+INSERT INTO t2 VALUES (44);
+INSERT INTO t2 VALUES (45);
+INSERT INTO t2 VALUES (46);
+SET gtid_domain_id = 0;
+--source include/save_master_gtid.inc
+--connection server_2
+CHANGE MASTER TO parallel_mode=(groupcommit);
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t2 WHERE a >= 40 ORDER BY a;
+--source include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+--source include/start_slave.inc
+
+
+--echo *** MDEV-6676 - test that parallel mode is saved correctly in master.info across server restart ***
+--connection server_2
+--let $status_items= Parallel_Mode
+--source include/show_slave_status.inc
+--source include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(transactional,waiting);
+--let $status_items= Parallel_Mode
+--source include/show_slave_status.inc
+--source include/start_slave.inc
+
+--let $rpl_server_number= 2
+--source include/rpl_restart_server.inc
+
+--connection server_2
+--let $status_items= Parallel_Mode
+--source include/show_slave_status.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
+--source include/start_slave.inc
+
+
 --connection server_2
 --source include/stop_slave.inc
-SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+eval SET GLOBAL slave_parallel_threads=$old_parallel_threads;
 --source include/start_slave.inc
 SET DEBUG_SYNC= 'RESET';


=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_optimistic.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test       2014-09-24 19:25:10 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test       2014-10-07 14:20:37 +0000
@@ -11,11 +11,9 @@ CREATE TABLE t1 (a int PRIMARY KEY, b IN
 --connection server_2
 --sync_with_master
 SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
-SET @old_mode=@@GLOBAL.slave_parallel_mode;
 --source include/stop_slave.inc
 SET GLOBAL slave_parallel_threads=10;
-SET GLOBAL slave_parallel_mode="domain,transactional";
-CHANGE MASTER TO master_use_gtid=slave_pos;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional);


 --connection server_1
@@ -309,8 +307,8 @@ SET GLOBAL tx_isolation= @old_isolation;

 --connection server_2
 --source include/stop_slave.inc
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
 SET GLOBAL slave_parallel_threads=@old_parallel_threads;
-SET GLOBAL slave_parallel_mode=@old_mode;
 --source include/start_slave.inc

 --connection server_1

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test      2014-09-24 19:25:10 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test      2014-10-07 14:20:37 +0000
@@ -16,12 +16,10 @@ INSERT INTO t2 VALUES (1,0), (2,0);
 --sync_with_master
 SET @old_isolation= @@GLOBAL.tx_isolation;
 SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
-SET @old_mode=@@GLOBAL.slave_parallel_mode;
 --source include/stop_slave.inc
 SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;
 SET GLOBAL slave_parallel_threads=10;
-SET GLOBAL slave_parallel_mode="domain,transactional,waiting";
-CHANGE MASTER TO master_use_gtid=slave_pos;
+CHANGE MASTER TO master_use_gtid=slave_pos, parallel_mode=(domain,transactional,waiting);


 --echo *** Test that we replicate correctly when using READ COMMITTED and --log-slave-updates=0 on the slave ***
@@ -63,8 +61,8 @@ SELECT * FROM t2 ORDER BY a;
 --connection server_2
 --source include/stop_slave.inc
 SET GLOBAL tx_isolation= @old_isolation;
+CHANGE MASTER TO parallel_mode=(domain,groupcommit);
 SET GLOBAL slave_parallel_threads=@old_parallel_threads;
-SET GLOBAL slave_parallel_mode=@old_mode;
 --source include/start_slave.inc

 --connection server_1

=== modified file 'sql/lex.h'
--- a/sql/lex.h 2014-02-02 09:00:36 +0000
+++ b/sql/lex.h 2014-10-07 14:20:37 +0000
@@ -194,6 +194,7 @@ static SYMBOL symbols[] = {
   { "DISTINCTROW",      SYM(DISTINCT)}, /* Access likes this */
   { "DIV",              SYM(DIV_SYM)},
   { "DO",               SYM(DO_SYM)},
+  { "DOMAIN",           SYM(DOMAIN_SYM)},
   { "DOUBLE",           SYM(DOUBLE_SYM)},
   { "DROP",             SYM(DROP)},
   { "DUAL",             SYM(DUAL_SYM)},
@@ -257,6 +258,7 @@ static SYMBOL symbols[] = {
   { "GRANT",            SYM(GRANT)},
   { "GRANTS",           SYM(GRANTS)},
   { "GROUP",            SYM(GROUP_SYM)},
+  { "GROUPCOMMIT",      SYM(GROUPCOMMIT_SYM)},
   { "HANDLER",          SYM(HANDLER_SYM)},
   { "HARD",             SYM(HARD_SYM)},
   { "HASH",             SYM(HASH_SYM)},
@@ -429,6 +431,7 @@ static SYMBOL symbols[] = {
   { "PACK_KEYS",        SYM(PACK_KEYS_SYM)},
   { "PAGE",             SYM(PAGE_SYM)},
   { "PAGE_CHECKSUM",    SYM(PAGE_CHECKSUM_SYM)},
+  { "PARALLEL_MODE",    SYM(PARALLEL_MODE_SYM)},
   { "PARSER",           SYM(PARSER_SYM)},
   { "PARSE_VCOL_EXPR",  SYM(PARSE_VCOL_EXPR_SYM)},
   { "PARTIAL",          SYM(PARTIAL)},
@@ -650,6 +653,7 @@ static SYMBOL symbols[] = {
   { "VIEW",             SYM(VIEW_SYM)},
   { "VIRTUAL",          SYM(VIRTUAL_SYM)},
   { "WAIT",             SYM(WAIT_SYM)},
+  { "WAITING",          SYM(WAITING_SYM)},
   { "WARNINGS",         SYM(WARNINGS)},
   { "WEEK",             SYM(WEEK_SYM)},
   { "WEIGHT_STRING",    SYM(WEIGHT_STRING_SYM)},

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc  2014-09-25 13:47:58 +0000
+++ b/sql/log_event.cc  2014-10-07 14:20:37 +0000
@@ -12728,7 +12728,7 @@ bool rpl_get_position_info(const char **
   return FALSE;
 #else
   const Relay_log_info *rli= &(active_mi->rli);
-  if (opt_slave_parallel_threads == 0)
+  if (!rli->mi->using_parallel())
   {
     *log_file_name= rli->group_master_log_name;
     *log_pos= rli->group_master_log_pos +

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc     2014-09-26 10:37:19 +0000
+++ b/sql/mysqld.cc     2014-10-07 14:20:37 +0000
@@ -562,8 +562,6 @@ ulong stored_program_cache_size= 0;

 ulong opt_slave_parallel_threads= 0;
 ulong opt_slave_domain_parallel_threads= 0;
-ulonglong opt_slave_parallel_mode=
-  SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_GROUPCOMMIT;
 ulong opt_binlog_commit_wait_count= 0;
 ulong opt_binlog_commit_wait_usec= 0;
 ulong opt_slave_parallel_max_queued= 131072;

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h      2014-09-26 10:37:19 +0000
+++ b/sql/mysqld.h      2014-10-07 14:20:37 +0000
@@ -187,7 +187,6 @@ extern ulong stored_program_cache_size;
 extern ulong opt_slave_parallel_threads;
 extern ulong opt_slave_domain_parallel_threads;
 extern ulong opt_slave_parallel_max_queued;
-extern ulonglong opt_slave_parallel_mode;
 extern ulong opt_binlog_commit_wait_count;
 extern ulong opt_binlog_commit_wait_usec;
 extern my_bool opt_gtid_ignore_duplicates;

=== modified file 'sql/rpl_mi.cc'
--- a/sql/rpl_mi.cc     2014-09-02 12:07:01 +0000
+++ b/sql/rpl_mi.cc     2014-10-07 14:20:37 +0000
@@ -40,7 +40,8 @@ Master_info::Master_info(LEX_STRING *con
    heartbeat_period(0), received_heartbeats(0), master_id(0),
    prev_master_id(0),
    using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0),
-   gtid_reconnect_event_skip_count(0), gtid_event_seen(false)
+   gtid_reconnect_event_skip_count(0), gtid_event_seen(false),
+   parallel_mode(SLAVE_PARALLEL_DOMAIN|SLAVE_PARALLEL_GROUPCOMMIT)
 {
   host[0] = 0; user[0] = 0; password[0] = 0;
   ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
@@ -178,6 +179,7 @@ void init_master_log_pos(Master_info* mi
   mi->events_queued_since_last_gtid= 0;
   mi->gtid_reconnect_event_skip_count= 0;
   mi->gtid_event_seen= false;
+  mi->parallel_mode= SLAVE_PARALLEL_DOMAIN|SLAVE_PARALLEL_GROUPCOMMIT;

   /* Intentionally init ssl_verify_server_cert to 0, no option available  */
   mi->ssl_verify_server_cert= 0;
@@ -514,6 +516,22 @@ file '%s')", fname);
             else
               mi->using_gtid= Master_info::USE_GTID_NO;
           }
+          else if (!strncmp(buf, STRING_WITH_LEN("parallel_mode=")))
+          {
+            int val= atoi(buf + (sizeof("parallel_mode=") - 1));
+            mi->parallel_mode= val & (SLAVE_PARALLEL_DOMAIN |
+                                      SLAVE_PARALLEL_GROUPCOMMIT |
+                                      SLAVE_PARALLEL_TRX |
+                                      SLAVE_PARALLEL_WAITING);
+          }
+          else if (!strncmp(buf, STRING_WITH_LEN("END_MARKER")))
+          {
+            /*
+              Guard against extra left-overs at the end of file, in case a later
+              update causes the file to shrink compared to earlier contents.
+            */
+            break;
+          }
         }
       }
     }
@@ -657,7 +675,9 @@ int flush_master_info(Master_info* mi,
   my_b_printf(file,
               "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n"
               "\n\n\n\n\n\n\n\n\n\n\n"
-              "using_gtid=%d\n",
+              "using_gtid=%d\n"
+              "parallel_mode=%u\n"
+              "END_MARKER\n",
               LINES_IN_MASTER_INFO,
               mi->master_log_name, llstr(mi->master_log_pos, lbuf),
               mi->host, mi->user,
@@ -666,7 +686,7 @@ int flush_master_info(Master_info* mi,
               mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert,
               heartbeat_buf, "", ignore_server_ids_buf,
               "", 0,
-              mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid);
+              mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid, mi->parallel_mode);
   my_free(ignore_server_ids_buf);
   err= flush_io_cache(file);
   if (sync_masterinfo_period && !err &&

=== modified file 'sql/rpl_mi.h'
--- a/sql/rpl_mi.h      2014-09-02 12:07:01 +0000
+++ b/sql/rpl_mi.h      2014-10-07 14:20:37 +0000
@@ -75,6 +75,10 @@ class Master_info : public Slave_reporti
     return connection_name.str == 0;
   }
   static const char *using_gtid_astext(enum enum_using_gtid arg);
+  bool using_parallel()
+  {
+    return opt_slave_parallel_threads > 0 && parallel_mode != 0;
+  }

   /* the variables below are needed because we can change masters on the fly */
   char master_log_name[FN_REFLEN+6]; /* Room for multi-*/
@@ -178,6 +182,12 @@ class Master_info : public Slave_reporti
   uint64 gtid_reconnect_event_skip_count;
   /* gtid_event_seen is false until we receive first GTID event from master. */
   bool gtid_event_seen;
+  /*
+    The parallel replication modes, if any. A combination (binary OR) of any of
+    SLAVE_PARALLEL_DOMAIN, SLAVE_PARALLEL_GROUPCOMMIT, SLAVE_PARALLEL_TRX, and
+    SLAVE_PARALLEL_WAITING.
+  */
+  uint32 parallel_mode;
 };
 int init_master_info(Master_info* mi, const char* master_info_fname,
                      const char* slave_info_fname,

=== modified file 'sql/rpl_parallel.cc'
--- a/sql/rpl_parallel.cc       2014-10-03 09:43:23 +0000
+++ b/sql/rpl_parallel.cc       2014-10-07 14:20:37 +0000
@@ -1891,6 +1891,7 @@ rpl_parallel::do_event(rpl_group_info *s
   bool did_enter_cond= false;
   PSI_stage_info old_stage;

+  DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE(););
   /* Handle master log name change, seen in Rotate_log_event. */
   typ= ev->get_type_code();
   if (unlikely(typ == ROTATE_EVENT))
@@ -1971,7 +1972,8 @@ rpl_parallel::do_event(rpl_group_info *s
   if (typ == GTID_EVENT)
   {
     Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
-    uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
+    uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
+                       !(rli->mi->parallel_mode & SLAVE_PARALLEL_DOMAIN) ?
                        0 : gtid_ev->domain_id);
     if (!(e= find(domain_id)))
     {
@@ -2012,7 +2014,7 @@ rpl_parallel::do_event(rpl_group_info *s
   {
     Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
     bool new_gco;
-    ulonglong mode= opt_slave_parallel_mode;
+    ulonglong mode= rli->mi->parallel_mode;
     uchar gtid_flags= gtid_ev->flags2;
     group_commit_orderer *gco;
     uint8 force_switch_flag;

=== modified file 'sql/rpl_parallel.h'
--- a/sql/rpl_parallel.h        2014-10-03 09:43:23 +0000
+++ b/sql/rpl_parallel.h        2014-10-07 14:20:37 +0000
@@ -12,7 +12,11 @@ class Relay_log_info;
 struct inuse_relaylog;


-/* Bit masks for the values in --slave-parallel-mode. */
+/*
+  Bit masks for the values in --slave-parallel-mode.
+  Note that these values cannot be changed - they are stored in master.info,
+  so they must remain readable by a different version of the server.
+*/
 #define SLAVE_PARALLEL_DOMAIN      (1ULL << 0)
 #define SLAVE_PARALLEL_GROUPCOMMIT (1ULL << 1)
 #define SLAVE_PARALLEL_TRX         (1ULL << 2)

=== modified file 'sql/share/errmsg-utf8.txt'
--- a/sql/share/errmsg-utf8.txt 2014-07-08 17:38:26 +0000
+++ b/sql/share/errmsg-utf8.txt 2014-10-07 14:20:37 +0000
@@ -7111,3 +7111,5 @@ ER_SLAVE_SKIP_NOT_IN_GTID
         eng "When using GTID, @@sql_slave_skip_counter can not be used. Instead, setting @@gtid_slave_pos explicitly can be used to skip to after a given GTID position."
 ER_TABLE_DEFINITION_TOO_BIG
         eng "The definition for table %`s is too big"
+ER_INVALID_SLAVE_PARALLEL_MODE
+        eng "Invalid use of '%s' option for PARALLEL_MODE in CHANGE MASTER TO"

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc      2014-09-02 12:07:01 +0000
+++ b/sql/slave.cc      2014-10-07 14:20:37 +0000
@@ -625,8 +625,7 @@ int terminate_slave_threads(Master_info*
   if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
   {
     DBUG_PRINT("info",("Terminating SQL thread"));
-    if (opt_slave_parallel_threads > 0 &&
-        mi->rli.abort_slave && mi->rli.stop_for_until)
+    if (mi->using_parallel() && mi->rli.abort_slave && mi->rli.stop_for_until)
     {
       mi->rli.stop_for_until= false;
       mi->rli.parallel.stop_during_until();
@@ -2576,6 +2575,8 @@ static bool send_show_master_info_header
   field_list.push_back(new Item_empty_string("Using_Gtid",
                                              sizeof("Current_Pos")-1));
   field_list.push_back(new Item_empty_string("Gtid_IO_Pos", 30));
+  field_list.push_back(new Item_empty_string("Parallel_Mode",
+                   sizeof("domain,groupcommit,transactional,waiting")-1));
   if (full)
   {
     field_list.push_back(new Item_return_int("Retried_transactions",
@@ -2708,8 +2709,7 @@ static bool send_show_master_info_data(T
       else
       {
         idle= mi->rli.sql_thread_caught_up;
-        if (opt_slave_parallel_threads > 0 && idle &&
-            !mi->rli.parallel.workers_idle())
+        if (mi->using_parallel() && idle && !mi->rli.parallel.workers_idle())
           idle= false;
       }
       if (idle)
@@ -2786,13 +2786,34 @@ static bool send_show_master_info_data(T
     protocol->store(mi->ssl_ca, &my_charset_bin);
     // Master_Ssl_Crlpath
     protocol->store(mi->ssl_capath, &my_charset_bin);
+    // Using_Gtid
     protocol->store(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin);
+    // Gtid_IO_Pos
     {
       char buff[30];
       String tmp(buff, sizeof(buff), system_charset_info);
       mi->gtid_current_pos.to_string(&tmp);
       protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
     }
+    // Parallel_Mode
+    {
+      /* Note how sizeof("domain") has room for "domain," due to trailing 0. */
+      char buf[sizeof("domain") + sizeof("groupcommit") +
+               sizeof("transactional") + sizeof("waiting") + 1];
+      char *p= buf;
+      uint32 mode= mi->parallel_mode;
+      if (mode & SLAVE_PARALLEL_DOMAIN)
+        p= strmov(p, "domain,");
+      if (mode & SLAVE_PARALLEL_GROUPCOMMIT)
+        p= strmov(p, "groupcommit,");
+      if (mode & SLAVE_PARALLEL_TRX)
+        p= strmov(p, "transactional,");
+      if (mode & SLAVE_PARALLEL_WAITING)
+        p= strmov(p, "waiting,");
+      if (p != buf)
+        --p;                                    // Discard last ','
+      protocol->store(buf, p-buf, &my_charset_bin);
+    }
     if (full)
     {
       protocol->store((uint32)    mi->rli.retried_trans);
@@ -3521,7 +3542,7 @@ static int exec_relay_log_event(THD* thd

     update_state_of_relay_log(rli, ev);

-    if (opt_slave_parallel_threads > 0)
+    if (rli->mi->using_parallel())
     {
       int res= rli->parallel.do_event(serial_rgi, ev, event_size);
       if (res >= 0)
@@ -4667,7 +4688,7 @@ log '%s' at position %s, relay log '%s'
     }
   }

-  if (opt_slave_parallel_threads > 0)
+  if (mi->using_parallel())
     rli->parallel.wait_for_done(thd, rli);

   /* Thread stopped. Print the current replication position to the log */
@@ -4693,7 +4714,7 @@ log '%s' at position %s, relay log '%s'
     (We want the first one to be before the printout of stop position to
     get the correct position printed.)
   */
-  if (opt_slave_parallel_threads > 0)
+  if (mi->using_parallel())
     rli->parallel.wait_for_done(thd, rli);

   /*
@@ -6314,7 +6335,7 @@ static Log_event* next_event(rpl_group_i
                           llstr(my_b_tell(cur_log),llbuf1),
                           llstr(rli->event_relay_log_pos,llbuf2)));
       DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
-      DBUG_ASSERT(opt_slave_parallel_threads > 0 ||
+      DBUG_ASSERT(rli->mi->using_parallel() ||
                   my_b_tell(cur_log) == rli->event_relay_log_pos);
     }
 #endif

=== modified file 'sql/sql_lex.h'
--- a/sql/sql_lex.h     2014-08-07 16:06:56 +0000
+++ b/sql/sql_lex.h     2014-10-07 14:20:37 +0000
@@ -218,11 +218,18 @@ struct LEX_MASTER_INFO
   uint port, connect_retry;
   float heartbeat_period;
   /*
+    Modes of parallel replication enabled, if any. A combination (binary OR) of
+    any of SLAVE_PARALLEL_DOMAIN, SLAVE_PARALLEL_GROUPCOMMIT,
+    SLAVE_PARALLEL_TRX, and SLAVE_PARALLEL_WAITING.
+  */
+  uint32 repl_parallel_mode;
+  /*
     Enum is used for making it possible to detect if the user
     changed variable or if it should be left at old value
    */
   enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE}
-    ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt;
+    ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt,
+    repl_parallel_mode_opt;
   enum {
     LEX_GTID_UNCHANGED, LEX_GTID_NO, LEX_GTID_CURRENT_POS, LEX_GTID_SLAVE_POS
   } use_gtid_opt;
@@ -241,10 +248,11 @@ struct LEX_MASTER_INFO
     pos= relay_log_pos= server_id= port= connect_retry= 0;
     heartbeat_period= 0;
     ssl= ssl_verify_server_cert= heartbeat_opt=
-      repl_ignore_server_ids_opt= LEX_MI_UNCHANGED;
+      repl_ignore_server_ids_opt= repl_parallel_mode_opt= LEX_MI_UNCHANGED;
     gtid_pos_str.length= 0;
     gtid_pos_str.str= NULL;
     use_gtid_opt= LEX_GTID_UNCHANGED;
+    repl_parallel_mode= 0;
   }
 };


=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc   2014-08-07 16:06:56 +0000
+++ b/sql/sql_repl.cc   2014-10-07 14:20:37 +0000
@@ -3434,6 +3434,9 @@ bool change_master(THD* thd, Master_info
            lex_mi->relay_log_name || lex_mi->relay_log_pos)
     mi->using_gtid= Master_info::USE_GTID_NO;

+  if (lex_mi->repl_parallel_mode_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+    mi->parallel_mode= lex_mi->repl_parallel_mode;
+
   /*
     If user did specify neither host nor port nor any log name nor any log
     pos, i.e. he specified only user/password/master_connect_retry, he probably

=== modified file 'sql/sql_yacc.yy'
--- a/sql/sql_yacc.yy   2014-09-03 14:24:31 +0000
+++ b/sql/sql_yacc.yy   2014-10-07 14:20:37 +0000
@@ -1130,6 +1130,7 @@ bool my_yyoverflow(short **a, YYSTYPE **
 %token  DIV_SYM
 %token  DOUBLE_SYM                    /* SQL-2003-R */
 %token  DO_SYM
+%token  DOMAIN_SYM
 %token  DROP                          /* SQL-2003-R */
 %token  DUAL_SYM
 %token  DUMPFILE
@@ -1194,6 +1195,7 @@ bool my_yyoverflow(short **a, YYSTYPE **
 %token  GRANT                         /* SQL-2003-R */
 %token  GRANTS
 %token  GROUP_SYM                     /* SQL-2003-R */
+%token  GROUPCOMMIT_SYM
 %token  GROUP_CONCAT_SYM
 %token  GT_SYM                        /* OPERATOR */
 %token  HANDLER_SYM
@@ -1376,6 +1378,7 @@ bool my_yyoverflow(short **a, YYSTYPE **
 %token  PACK_KEYS_SYM
 %token  PAGE_SYM
 %token  PAGE_CHECKSUM_SYM
+%token  PARALLEL_MODE_SYM
 %token  PARAM_MARKER
 %token  PARSER_SYM
 %token  PARSE_VCOL_EXPR_SYM
@@ -1604,6 +1607,7 @@ bool my_yyoverflow(short **a, YYSTYPE **
 %token  VIEW_SYM                      /* SQL-2003-N */
 %token  VIRTUAL_SYM
 %token  WAIT_SYM
+%token  WAITING_SYM
 %token  WARNINGS
 %token  WEEK_SYM
 %token  WEIGHT_STRING_SYM
@@ -2244,6 +2248,14 @@ rule: <-- starts at col 1
           {
             Lex->mi.repl_ignore_server_ids_opt= LEX_MASTER_INFO::LEX_MI_ENABLE;
            }
+        | PARALLEL_MODE_SYM EQ '(' ')'
+          {
+            Lex->mi.repl_parallel_mode_opt= LEX_MASTER_INFO::LEX_MI_DISABLE;
+          }
+        | PARALLEL_MODE_SYM EQ '(' parallel_mode_list ')'
+          {
+            Lex->mi.repl_parallel_mode_opt= LEX_MASTER_INFO::LEX_MI_ENABLE;
+          }
         |
         master_file_def
         ;
@@ -2260,6 +2272,52 @@ rule: <-- starts at col 1
             insert_dynamic(&Lex->mi.repl_ignore_server_ids, (uchar*) &($1));
           }

+parallel_mode_list:    /* one or more parallel_mode_option items separated by ',' */
+            parallel_mode_option
+          | parallel_mode_list ',' parallel_mode_option
+          ;
+
+parallel_mode_option:  /* one mode keyword; each action ORs a flag into Lex->mi.repl_parallel_mode */
+            DOMAIN_SYM
+            {
+              if (Lex->mi.repl_parallel_mode & SLAVE_PARALLEL_DOMAIN)
+              { /* DOMAIN bit is already set: report the error and MYSQL_YYABORT */
+                my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "domain");
+                MYSQL_YYABORT;
+              }
+              Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_DOMAIN;
+            }
+          | GROUPCOMMIT_SYM
+            {
+              if (Lex->mi.repl_parallel_mode &
+                  (SLAVE_PARALLEL_GROUPCOMMIT|SLAVE_PARALLEL_TRX))
+              { /* rejected if either GROUPCOMMIT or TRX is already set */
+                my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "groupcommit");
+                MYSQL_YYABORT;
+              }
+              Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_GROUPCOMMIT;
+            }
+          | TRANSACTIONAL_SYM
+            {
+              if (Lex->mi.repl_parallel_mode &
+                  (SLAVE_PARALLEL_GROUPCOMMIT|SLAVE_PARALLEL_TRX))
+              { /* same mask as for groupcommit: the two cannot both be given */
+                my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "transactional");
+                MYSQL_YYABORT;
+              }
+              Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_TRX;
+            }
+          | WAITING_SYM
+            {
+              if (Lex->mi.repl_parallel_mode & SLAVE_PARALLEL_WAITING)
+              { /* WAITING bit is already set: report the error and MYSQL_YYABORT */
+                my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "waiting");
+                MYSQL_YYABORT;
+              }
+              Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_WAITING;
+            }
+
+
 master_file_def:
           MASTER_LOG_FILE_SYM EQ TEXT_STRING_sys
           {
@@ -14098,6 +14156,7 @@ user: user_maybe_role
         | DISABLE_SYM              {}
         | DISCARD                  {}
         | DISK_SYM                 {}
+        | DOMAIN_SYM               {}
         | DUMPFILE                 {}
         | DUPLICATE_SYM            {}
         | DYNAMIC_SYM              {}
@@ -14131,6 +14190,7 @@ user: user_maybe_role
         | GET_FORMAT               {}
         | GRANTS                   {}
         | GLOBAL_SYM               {}
+        | GROUPCOMMIT_SYM         {}
         | HASH_SYM                 {}
         | HARD_SYM                 {}
         | HOSTS_SYM                {}
@@ -14221,6 +14281,7 @@ user: user_maybe_role
         | ONLY_SYM                 {}
         | PACK_KEYS_SYM            {}
         | PAGE_SYM                 {}
+        | PARALLEL_MODE_SYM        {}
         | PARTIAL                  {}
         | PARTITIONING_SYM         {}
         | PARTITIONS_SYM           {}
@@ -14336,6 +14397,7 @@ user: user_maybe_role
         | VALUE_SYM                {}
         | WARNINGS                 {}
         | WAIT_SYM                 {}
+        | WAITING_SYM              {}
         | WEEK_SYM                 {}
         | WEIGHT_STRING_SYM        {}
         | WORK_SYM                 {}

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc   2014-09-19 13:25:37 +0000
+++ b/sql/sys_vars.cc   2014-10-07 14:20:37 +0000
@@ -1834,31 +1834,11 @@ static Sys_var_ulong Sys_slave_parallel_
        VALID_RANGE(0,2147483647), DEFAULT(131072), BLOCK_SIZE(1));


-static const char *slave_parallel_mode_names[] = {
-  "domain", "groupcommit", "transactional", "waiting", NULL
-};
-
-static Sys_var_set Sys_slave_parallel_mode(
-       "slave_parallel_mode",
-       "Controls what transactions are applied in parallel when using "
-       "--slave-parallel-threads. Syntax: slave_paralle_mode=value[,value...], "
-       "where \"value\" could be one or more of: \"domain\", to apply "
-       "different replication domains in parallel; \"groupcommit\", to apply "
-       "in parallel transactions that group-committed together on the master; "
-       "\"transactional\", to optimistically try to apply all transactional "
-       "DML in parallel; and \"waiting\" to extend \"transactional\" to "
-       "even transactions that had to wait on the master.",
-       GLOBAL_VAR(opt_slave_parallel_mode), CMD_LINE(REQUIRED_ARG),
-       slave_parallel_mode_names,
-       DEFAULT(SLAVE_PARALLEL_DOMAIN |
-               SLAVE_PARALLEL_GROUPCOMMIT));
-
-
 static Sys_var_bit Sys_replicate_allow_parallel(
        "replicate_allow_parallel",
        "If set when a transaction is written to the binlog, that transaction "
-       "is allowed to replicate in parallel on a slave where "
-       "slave_parallel_mode is set to \"transactional\". Can be cleared for "
+       "is allowed to replicate in parallel on a slave where parallel_mode "
+       "is set to \"transactional\" (in CHANGE MASTER). Can be cleared for "
        "transactions that are likely to cause a conflict if replicated in "
        "parallel, to avoid unnecessary rollback and retry.",
        SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_ALLOW_PARALLEL,

_______________________________________________
commits mailing list
commits@mariadb.org
https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits