developers
[Maria-developers] Step#6: MDEV-7950 Item_func::type() takes 0.26% in OLTP RO
by Alexander Barkov 19 May '15
Hi Sergey,
Please review the next iteration for MDEV-7950.
This one splits the function add_key_fields() into a method in Item.
This change removes about 3 virtual calls to item->type(), as well as some
virtual calls to item_func->functype(), and adds one virtual call to
item->add_key_fields() instead.
Thanks.
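To illustrate the trade described above, here is a minimal sketch of replacing type-tag checks with a single virtual call. It is not taken from the patch: the classes Item_field_sketch, Item_cond_and_sketch and the KeyFieldList container are hypothetical stand-ins, and the real add_key_fields() in the server takes different arguments.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-ins for the server's Item hierarchy.
// None of these definitions are copied from the MariaDB source.
struct KeyFieldList {
  std::vector<std::string> fields;
};

class Item {
public:
  virtual ~Item() = default;
  // Default behaviour: an item contributes no key fields.
  // One virtual call here replaces the caller's checks of type()/functype().
  virtual void add_key_fields(KeyFieldList &) const {}
};

class Item_field_sketch : public Item {
  std::string name_;
public:
  explicit Item_field_sketch(std::string name) : name_(std::move(name)) {}
  // A field reference contributes itself.
  void add_key_fields(KeyFieldList &out) const override {
    out.fields.push_back(name_);
  }
};

class Item_cond_and_sketch : public Item {
  std::vector<const Item *> args_;
public:
  explicit Item_cond_and_sketch(std::vector<const Item *> args)
      : args_(std::move(args)) {}
  // An AND condition simply forwards to each argument.
  void add_key_fields(KeyFieldList &out) const override {
    for (const Item *arg : args_)
      arg->add_key_fields(out);
  }
};
```

The caller no longer needs to inspect what kind of item it holds; each class decides for itself what to contribute.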
Re: [Maria-developers] [Commits] 8b30c0f: MDEV-7949: Item_field::used_tables() takes 0.29% in OLTP RO
by Sergey Vojtovich 19 May '15
Hi Sanja,
There are 3 bug fixes and 4 revisions concerning these lines:
https://bugs.mysql.com/bug.php?id=16716
git: 1991a87d831bb48b31989c622e35acda11df7fef
git: ca22a81b1c84ce81e1e9ea2c3ace7be1848027d8 (source for all used_tables())
https://bugs.mysql.com/bug.php?id=36488
git: 66367aeea83a0cd7f583a0194651a235b63975a9
https://bugs.mysql.com/bug.php?id=46815
git: 10406ae65871de074e807e626f9ede686e9321d4
If we don't have these bugs, why not revert all the code introduced by these
patches?
Or are we not affected by the need to use used_tables() as an indication of
a prepared statement? If so, why?
Thanks,
Sergey
On Sun, May 17, 2015 at 03:10:48PM +0200, sanja(a)mariadb.com wrote:
> revision-id: 8b30c0f26c7b35fc73429f85aa35eb3614d60bd8
> parent(s): b2db8e85422a455a10cd28ef7e44977182528f70
> committer: Oleksandr Byelkin
> branch nick: server
> timestamp: 2015-05-17 15:10:45 +0200
> message:
>
> MDEV-7949: Item_field::used_tables() takes 0.29% in OLTP RO
>
> Part 2: removed hack workaround for bug we do not have.
>
> ---
> sql/item_strfunc.cc | 7 +++----
> 1 file changed, 3 insertions(+), 4 deletions(-)
>
> diff --git a/sql/item_strfunc.cc b/sql/item_strfunc.cc
> index 4bf8dd5..7b9169e 100644
> --- a/sql/item_strfunc.cc
> +++ b/sql/item_strfunc.cc
> @@ -626,8 +626,7 @@ String *Item_func_concat::val_str(String *str)
> if (!(res=args[0]->val_str(str)))
> goto null;
> use_as_buff= &tmp_value;
> - /* Item_subselect in --ps-protocol mode will state it as a non-const */
> - is_const= args[0]->const_item() || !args[0]->used_tables();
> + is_const= args[0]->const_item();
> for (i=1 ; i < arg_count ; i++)
> {
> if (res->length() == 0)
> @@ -639,7 +638,7 @@ String *Item_func_concat::val_str(String *str)
> non-empty argument. Because of this we need is_const to be
> evaluated only for it.
> */
> - is_const= args[i]->const_item() || !args[i]->used_tables();
> + is_const= args[i]->const_item();
> }
> else
> {
> @@ -975,7 +974,7 @@ String *Item_func_concat_ws::val_str(String *str)
> for (i=1; i < arg_count; i++)
> if ((res= args[i]->val_str(str)))
> {
> - is_const= args[i]->const_item() || !args[i]->used_tables();
> + is_const= args[i]->const_item();
> break;
> }
>
[Maria-developers] MDEV-7937: Enforce SSL when --ssl client option is used
by Vicențiu Ciorbaru 18 May '15
Hi Sergei!
I've done some work on this issue. I've read MySQL's implementation of this
and have looked at our implementation. They have done a bit of refactoring,
introducing an enforce_ssl flag, as well as changing the C interface a bit,
to allow setting this flag programmatically.
I've created a patch here that changes the minimum amount possible, in
order to implement what MDEV-7937 requires. That being said, I agree with
(most of) MySQL's refactoring in this case. They've moved all the
SSL-related connection code into its own separate function, before actually
calling send_client_reply_packet.
I can work towards implementing things the way MySQL does, but since I saw
that you've actually done most of the work in that area of the code, I
figured I'd ask for your input on it.
There are two more things that I'm not sure of:
1. Specifying --ssl as a command line parameter to the mysql client is not
enough to enforce ssl and the client's code in this case just ignores the
option. We need to provide at least one of the additional ones like
--ssl-key or --ssl-ca. My patch will not cause the client to report an
error in this case. Is this acceptable behaviour or not?
2. Do we want mysql's enforce_ssl feature?
Regards,
Vicențiu
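As a rough illustration of the two open questions above, the decision a client has to make can be sketched as below. This is only a sketch under assumptions: decide_ssl, SslDecision and kCapSsl are hypothetical names invented here, and neither the MariaDB patch nor MySQL's enforce_ssl implementation is reproduced.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical capability bit advertised by the server in its handshake.
// The name and value are assumptions made for this sketch.
constexpr std::uint32_t kCapSsl = 1u << 11;

enum class SslDecision { kPlaintext, kUseSsl, kRefuse };

// ssl_requested: the user asked for SSL (e.g. passed SSL options).
// enforce_ssl:   the user wants the connection to fail rather than
//                silently fall back to plaintext.
// server_caps:   capability bits received from the server.
SslDecision decide_ssl(bool ssl_requested, bool enforce_ssl,
                       std::uint32_t server_caps) {
  const bool server_has_ssl = (server_caps & kCapSsl) != 0;
  if (!ssl_requested)
    return SslDecision::kPlaintext;
  if (server_has_ssl)
    return SslDecision::kUseSsl;
  // SSL was requested but the server cannot do it.
  return enforce_ssl ? SslDecision::kRefuse : SslDecision::kPlaintext;
}
```

Without the enforce flag the client falls back to plaintext when the server lacks SSL; with it, the connection attempt is refused instead.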
18 May '15
Hi Jonas,
Thanks for the patch!
I read through the patch. I am not that familiar with the semi-sync plugin
code, but it looks reasonable.
But the patch causes a test failure in sys_vars.all_vars. This is because
not all newly introduced variables have corresponding test cases in
mysql-test/suite/sys_vars/t/*_basic.test.
Also, do you have some documentation for the feature? (There is a bit in the
Jira entry, but for example the new variable
rpl_semi_sync_master_slave_lag_heartbeat_frequency_us does not seem to be
described at all.)
If you can fix the above two points (and if Buildbot does not point out more
test suite issues), then I will push it into the MariaDB tree.
Thanks,
- Kristian.
> patch has been ported to 10.1
> you can use it under new (also called "3-clause") BSD license
> i have done internal benchmarks...but you are most welcome to do some too.
>
> DESCRIPTION:
> no slave left behind
>
> this patch implements master throttling based on slave lag,
> aka no slave left behind. the core feature works as follows
> 1) the semi-sync-reply is amended to also report back SQL-thread
> position (aka exec position)
> 2) transactions are not removed from the "active-transaction-list"
> in the semi-sync-master plugin until at least one slave has reported
> that it has executed this transaction. the slave lag can then
> be estimated by calculating how long the oldest transaction has been
> lingering in the active-transaction-list.
> 3) client-threads are forced to wait before commit until slave lag
> has decreased to an acceptable value.
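The three steps above can be sketched roughly as follows. This is a simplified model, not the plugin code: ActiveTranxSketch and must_wait are hypothetical names, binlog positions are reduced to a single integer (the real code compares file name and offset), all times are in microseconds, and treating a maximum of 0 as "throttling disabled" is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for the plugin's active-transaction list.
struct ActiveTranxSketch {
  struct Node {
    std::uint64_t log_pos;         // end position of the transaction
    std::uint64_t commit_time_us;  // when it was written to the binlog
  };
  std::deque<Node> nodes;  // oldest transaction first

  // Record a transaction when it is written to the binlog.
  void insert(std::uint64_t log_pos, std::uint64_t now_us) {
    nodes.push_back({log_pos, now_us});
  }

  // Drop every transaction at or before the exec position that a
  // slave reported back.
  void prune(std::uint64_t exec_pos) {
    while (!nodes.empty() && nodes.front().log_pos <= exec_pos)
      nodes.pop_front();
  }

  // Estimated lag: age of the oldest transaction no slave has executed
  // yet, or 0 when the list is empty.
  std::uint64_t estimated_lag_us(std::uint64_t now_us) const {
    if (nodes.empty())
      return 0;
    return now_us - nodes.front().commit_time_us;
  }
};

// A committing client has to wait while the lag exceeds the maximum.
// Assumption of this sketch: max_lag_us == 0 means throttling is off.
bool must_wait(std::uint64_t lag_us, std::uint64_t max_lag_us) {
  return max_lag_us != 0 && lag_us > max_lag_us;
}
```

Because only the oldest entry matters, the estimate costs one subtraction per check, at the price of being an upper bound rather than an exact measurement.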
>
> the following variables are introduced on master:
>
> rpl_semi_sync_master_max_slave_lag (global)
> rpl_semi_sync_master_slave_lag_wait_timeout (session)
>
> the following status variables are introduced on master:
>
> rpl_semi_sync_master_slave_lag_wait_sessions
> rpl_semi_sync_master_estimated_slave_lag
> rpl_semi_sync_master_trx_slave_lag_wait_time
> rpl_semi_sync_master_trx_slave_lag_wait_num
> rpl_semi_sync_master_avg_trx_slave_lag_wait_time
>
> the following variables are introduced on slave:
>
> rpl_semi_sync_slave_lag_enabled (global)
>
> in addition to this, 2 optimizations that decrease the overhead of semi-sync
> are introduced.
> 1) when a transaction is about to be sent to a slave, the code
> checks whether it should be semi-synced, but rather
> than semi-syncing each transaction (which is done currently) the code
> will skip semi-syncing a transaction if newer transactions have already
> been committed. But, since this can mean that semi-syncing is delayed
> indefinitely, a cap is set using 2 new master variables:
>
> rpl_semi_sync_master_max_unacked_event_bytes (global)
> rpl_semi_sync_master_max_unacked_event_count (global)
> 2) rpl_semi_sync_master_group_commit which makes the semi-sync
> plugin only semi-sync the last transaction in a group commit.
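The first optimization, skipping acks while capping how long they can be deferred, can be sketched as a per-slave decision function. The names below (PerSlaveStateSketch, AckLimitsSketch, should_request_ack) are hypothetical and the logic is a simplified reading of the description above, not a copy of the plugin's code.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical per-slave bookkeeping: where we last asked for an ack.
struct PerSlaveStateSketch {
  bool inited = false;
  std::string last_req_file;
  std::uint64_t last_req_pos = 0;
  unsigned long unacked_event_count = 0;
};

// Stand-ins for the two caps named in the description.
struct AckLimitsSketch {
  unsigned long max_unacked_event_count;
  unsigned long max_unacked_event_bytes;
};

// Returns true when this transaction should be semi-sync acked.
// is_newest: no newer transaction has been committed yet.
bool should_request_ack(PerSlaveStateSketch &st, const AckLimitsSketch &lim,
                        const std::string &file, std::uint64_t pos,
                        bool is_newest) {
  st.unacked_event_count++;
  const bool ask =
      is_newest ||                    // nothing newer: always ask
      !st.inited ||                   // first transaction for this slave
      file != st.last_req_file ||     // binlog file changed
      st.unacked_event_count >= lim.max_unacked_event_count ||
      pos >= st.last_req_pos + lim.max_unacked_event_bytes;
  if (!ask)
    return false;  // skip: a newer transaction will carry the ack
  // Remember where we last asked, and restart the counters.
  st.inited = true;
  st.last_req_file = file;
  st.last_req_pos = pos;
  st.unacked_event_count = 0;
  return true;
}
```

Note that in this sketch a count limit of 0 makes every transaction request an ack, since the counter is always at least the limit.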
> diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result b/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result
> new file mode 100644
> index 0000000..f3545a4
> --- /dev/null
> +++ b/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result
> @@ -0,0 +1,150 @@
> +include/master-slave.inc
> +[connection master]
> +set global rpl_semi_sync_master_enabled = 1;
> +set global rpl_semi_sync_master_max_slave_lag = 10;
> +show variables like 'rpl_semi_sync_master_%slave_lag%';
> +Variable_name Value
> +rpl_semi_sync_master_max_slave_lag 10
> +rpl_semi_sync_master_slave_lag_heartbeat_frequency_us 500000
> +rpl_semi_sync_master_slave_lag_wait_timeout 50
> +include/stop_slave.inc
> +set global rpl_semi_sync_slave_enabled = 1;
> +set global rpl_semi_sync_slave_lag_enabled = 1;
> +include/start_slave.inc
> +# create non-root user for testing READ_ONLY
> +grant SELECT, INSERT on *.* to test@localhost;
> +CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8))
> +ENGINE=innodb;
> +#
> +# Check basic behaviour
> +#
> +INSERT INTO t1 (f) VALUES ('1'),('2'),('3');
> +# Now wait for slave lag to decrease to 0
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +INSERT INTO t1 (f) VALUES ('4'),('5'),('6');
> +# Now wait for slave lag to increase to > 0
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to 0
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +# First transaction should succeed. slave_lag is zero when it commits
> +INSERT INTO t1 (f) VALUES ('7'),('8'),('9');
> +# Now wait for slave lag to increase to > 10s
> +# Check that estimated_slave_lag is > 10s
> +SELECT VARIABLE_VALUE > 10000000 as should_be_1
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +should_be_1
> +1
> +# Second transaction should now fail. slave_lag is >10s when it commits
> +INSERT INTO t1 (f) VALUES ('a'),('b'),('c');
> +ERROR HY000: Slave-lag timeout
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to < 10s
> +# And now it should succeed again
> +INSERT INTO t1 (f) VALUES ('d'),('e'),('f');
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +i f
> +1 1
> +2 2
> +3 3
> +4 4
> +5 5
> +6 6
> +7 7
> +8 8
> +9 9
> +13 d
> +14 e
> +15 f
> +# [ on slave ]
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +i f
> +1 1
> +2 2
> +3 3
> +4 4
> +5 5
> +6 6
> +7 7
> +8 8
> +9 9
> +13 d
> +14 e
> +15 f
> +#
> +# Test interaction with READ_ONLY
> +#
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +# Now wait for slave lag to increase to > 10s
> +# [ on con1 ]
> +BEGIN;
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +# [ on master ]
> +set global read_only = 1;
> +# [ on con1 ]
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +# read-only is check *before* slave lag
> +COMMIT;
> +ERROR HY000: The MariaDB server is running with the --read-only option so it cannot execute this statement
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to 0
> +set global read_only = 0;
> +#
> +# check slave_lag > 0 but less than rpl_semi_sync_master_max_slave_lag
> +#
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +INSERT INTO t1 (f) VALUES ('j'),('k'),('l');
> +# Now wait for slave lag to increase to > 0
> +# Capture rpl_semi_sync_master_tx_slave_lag_waits before transaction
> +select @count_before := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +# Set maximum allowed slave lag to 24h
> +set global rpl_semi_sync_master_max_slave_lag = 86400;
> +INSERT INTO t1 (f) VALUES ('m'),('n'),('o');
> +# Capture rpl_semi_sync_master_tx_slave_lag_waits after transaction
> +select @count_after := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +# There should have been no wait, since maximum allowed is very high
> +select @count_before = @count_after as should_be_1;
> +should_be_1
> +1
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to 0
> +#
> +# Clean up
> +#
> +# [ on master ]
> +set global rpl_semi_sync_master_max_slave_lag = default;
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = default;
> +DROP USER test@localhost;
> +include/stop_slave.inc
> +set global rpl_semi_sync_slave_enabled = 0;
> +set global rpl_semi_sync_slave_lag_enabled = default;
> +set global rpl_semi_sync_master_enabled = 0;
> +include/start_slave.inc
> +DROP TABLE t1;
> +include/rpl_end.inc
> diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test b/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test
> new file mode 100644
> index 0000000..f9bdbd9
> --- /dev/null
> +++ b/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test
> @@ -0,0 +1,252 @@
> +source include/have_semisync.inc;
> +source include/not_embedded.inc;
> +source include/have_innodb.inc;
> +source include/master-slave.inc;
> +
> +connection master;
> +set global rpl_semi_sync_master_enabled = 1;
> +set global rpl_semi_sync_master_max_slave_lag = 10;
> +
> +show variables like 'rpl_semi_sync_master_%slave_lag%';
> +
> +connection slave;
> +source include/stop_slave.inc;
> +set global rpl_semi_sync_slave_enabled = 1;
> +set global rpl_semi_sync_slave_lag_enabled = 1;
> +
> +source include/start_slave.inc;
> +
> +connection master;
> +
> +--echo # create non-root user for testing READ_ONLY
> +grant SELECT, INSERT on *.* to test@localhost;
> +connect (con1,localhost,test,,test);
> +
> +CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8))
> +ENGINE=innodb;
> +
> +--echo #
> +--echo # Check basic behaviour
> +--echo #
> +INSERT INTO t1 (f) VALUES ('1'),('2'),('3');
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +INSERT INTO t1 (f) VALUES ('4'),('5'),('6');
> +
> +--echo # Now wait for slave lag to increase to > 0
> +let $wait_condition= SELECT VARIABLE_VALUE > 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +
> +--echo # First transaction should succeed. slave_lag is zero when it commits
> +INSERT INTO t1 (f) VALUES ('7'),('8'),('9');
> +
> +--echo # Now wait for slave lag to increase to > 10s
> +let $wait_condition= SELECT VARIABLE_VALUE > 10000000
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # Check that estimated_slave_lag is > 10s
> +SELECT VARIABLE_VALUE > 10000000 as should_be_1
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +
> +--echo # Second transaction should now fail. slave_lag is >10s when it commits
> +--error ER_ERROR_DURING_COMMIT
> +INSERT INTO t1 (f) VALUES ('a'),('b'),('c');
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +--echo # Now wait for slave lag to decrease to < 10s
> +let $wait_condition= SELECT VARIABLE_VALUE < 10000000
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # And now it should succeed again
> +INSERT INTO t1 (f) VALUES ('d'),('e'),('f');
> +
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +
> +sync_slave_with_master;
> +
> +--echo # [ on slave ]
> +connection slave;
> +
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +
> +--echo #
> +--echo # Test interaction with READ_ONLY
> +--echo #
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +
> +--echo # Now wait for slave lag to increase to > 10s
> +let $wait_condition= SELECT VARIABLE_VALUE > 10000000
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +connection con1;
> +--echo # [ on con1 ]
> +BEGIN;
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +
> +connection master;
> +--echo # [ on master ]
> +set global read_only = 1;
> +
> +connection con1;
> +--echo # [ on con1 ]
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +
> +--echo # read-only is check *before* slave lag
> +--error ER_OPTION_PREVENTS_STATEMENT
> +COMMIT;
> +
> +disconnect con1;
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +connection master;
> +--echo # [ on master ]
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +set global read_only = 0;
> +
> +--echo #
> +--echo # check slave_lag > 0 but less than rpl_semi_sync_master_max_slave_lag
> +--echo #
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +connection master;
> +--echo # [ on master ]
> +INSERT INTO t1 (f) VALUES ('j'),('k'),('l');
> +
> +--echo # Now wait for slave lag to increase to > 0
> +let $wait_condition= SELECT VARIABLE_VALUE > 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # Capture rpl_semi_sync_master_tx_slave_lag_waits before transaction
> +--disable_result_log
> +select @count_before := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +--enable_result_log
> +
> +--echo # Set maximum allowed slave lag to 24h
> +set global rpl_semi_sync_master_max_slave_lag = 86400; # 24h
> +
> +INSERT INTO t1 (f) VALUES ('m'),('n'),('o');
> +
> +--echo # Capture rpl_semi_sync_master_tx_slave_lag_waits after transaction
> +--disable_result_log
> +select @count_after := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +--enable_result_log
> +
> +--echo # There should have been no wait, since maximum allowed is very high
> +select @count_before = @count_after as should_be_1;
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +connection master;
> +--echo # [ on master ]
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo #
> +--echo # Clean up
> +--echo #
> +connection master;
> +--echo # [ on master ]
> +set global rpl_semi_sync_master_max_slave_lag = default;
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = default;
> +DROP USER test@localhost;
> +
> +connection slave;
> +source include/stop_slave.inc;
> +set global rpl_semi_sync_slave_enabled = 0;
> +set global rpl_semi_sync_slave_lag_enabled = default;
> +
> +connection master;
> +set global rpl_semi_sync_master_enabled = 0;
> +
> +connection slave;
> +source include/start_slave.inc;
> +
> +connection master;
> +
> +DROP TABLE t1;
> +sync_slave_with_master;
> +--source include/rpl_end.inc
> diff --git a/plugin/semisync/semisync.cc b/plugin/semisync/semisync.cc
> index 4a80360..fc02b9d 100644
> --- a/plugin/semisync/semisync.cc
> +++ b/plugin/semisync/semisync.cc
> @@ -20,6 +20,7 @@
>
> const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
> const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
> +const unsigned char ReplSemiSyncBase::kPacketFlagSyncAndReport = 0x02;
>
>
> const unsigned long Trace::kTraceGeneral = 0x0001;
> @@ -29,3 +30,6 @@ const unsigned long Trace::kTraceFunction = 0x0040;
>
> const unsigned char ReplSemiSyncBase::kSyncHeader[2] =
> {ReplSemiSyncBase::kPacketMagicNum, 0};
> +
> +const char* const ReplSemiSyncBase::kRplSemiSyncSlaveReportExec =
> + "rpl_semi_sync_slave_report_exec";
> diff --git a/plugin/semisync/semisync.h b/plugin/semisync/semisync.h
> index 2857729..78faba9 100644
> --- a/plugin/semisync/semisync.h
> +++ b/plugin/semisync/semisync.h
> @@ -75,13 +75,23 @@ class ReplSemiSyncBase
>
> /* Constants in network packet header. */
> static const unsigned char kPacketMagicNum;
> + /* this event should be semisync acked */
> static const unsigned char kPacketFlagSync;
> + /* this event should be semisync acked including the current SQL position */
> + static const unsigned char kPacketFlagSyncAndReport;
> +
> + /* user variable for enabling exec-pos reporting */
> + static const char* const kRplSemiSyncSlaveReportExec;
> };
>
> /* The layout of a semisync slave reply packet:
> 1 byte for the magic num
> 8 bytes for the binlog positon
> - n bytes for the binlog filename, terminated with a '\0'
> + n bytes for the binlog filename, NOT terminated with a '\0'
> + [ optionally ]
> + 1 byte == 0
> + 8 bytes for the sql-thread position
> + n bytes for the sql-thread filename, terminated with a '\0'
> */
> #define REPLY_MAGIC_NUM_LEN 1
> #define REPLY_BINLOG_POS_LEN 8
> diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc
> index c88c162..3ee6e26 100644
> --- a/plugin/semisync/semisync_master.cc
> +++ b/plugin/semisync/semisync_master.cc
> @@ -22,6 +22,9 @@
> #define TIME_MILLION 1000000
> #define TIME_BILLION 1000000000
>
> +/* thd_key for per slave thread state */
> +static MYSQL_THD_KEY_T thd_key;
> +
> /* This indicates whether semi-synchronous replication is enabled. */
> char rpl_semi_sync_master_enabled;
> unsigned long rpl_semi_sync_master_wait_point =
> @@ -45,6 +48,18 @@ unsigned long long rpl_semi_sync_master_net_wait_time = 0;
> unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
> char rpl_semi_sync_master_wait_no_slave = 1;
>
> +unsigned long rpl_semi_sync_master_max_unacked_event_count = 0;
> +unsigned long rpl_semi_sync_master_max_unacked_event_bytes = 4096;
> +
> +unsigned long rpl_semi_sync_master_slave_lag_clients = 0;
> +unsigned long long rpl_semi_sync_master_estimated_slave_lag = 0;
> +unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us = 500000;
> +unsigned long rpl_semi_sync_master_max_slave_lag = 0;
> +unsigned long rpl_semi_sync_master_slave_lag_wait_sessions = 0;
> +
> +unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0;
> +unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num = 0;
> +unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
>
> static int getWaitTime(const struct timespec& start_ts);
>
> @@ -150,6 +165,15 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
> ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
> ins_node->log_pos_ = log_file_pos;
>
> + {
> + /**
> + * set trans commit time
> + * this is called when writing into binlog, which is not
> + * exactly right, but close enough for our purposes
> + */
> + ins_node->tranx_commit_time_us = my_hrtime().val;
> + }
> +
> if (!trx_front_)
> {
> /* The list is empty. */
> @@ -193,12 +217,11 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
> return function_exit(kWho, result);
> }
>
> -bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
> - my_off_t log_file_pos)
> +TranxNode* ActiveTranx::lookup_tranx_end_pos(const char *log_file_name,
> + my_off_t log_file_pos)
> {
> - const char *kWho = "ActiveTranx::is_tranx_end_pos";
> + const char *kWho = "ActiveTranx::lookup_tranx_end_pos";
> function_enter(kWho);
> -
> unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
> TranxNode *entry = trx_htb_[hash_val];
>
> @@ -211,38 +234,24 @@ bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
> }
>
> if (trace_level_ & kTraceDetail)
> - sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
> - log_file_name, (unsigned long)log_file_pos, hash_val);
> + sql_print_information("%s: probe (%s, %lu)", kWho,
> + log_file_name, (unsigned long)log_file_pos);
>
> function_exit(kWho, (entry != NULL));
> - return (entry != NULL);
> + return entry;
> }
>
> -int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
> - my_off_t log_file_pos)
> +int ActiveTranx::clear_active_tranx_nodes()
> {
> - const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
> - TranxNode *new_front;
> + set_new_front(NULL);
> + return 0;
> +}
>
> +void ActiveTranx::set_new_front(TranxNode *new_front)
> +{
> + const char *kWho = "ActiveTranx::set_new_front";
> function_enter(kWho);
>
> - if (log_file_name != NULL)
> - {
> - new_front = trx_front_;
> -
> - while (new_front)
> - {
> - if (compare(new_front, log_file_name, log_file_pos) > 0)
> - break;
> - new_front = new_front->next_;
> - }
> - }
> - else
> - {
> - /* If log_file_name is NULL, clear everything. */
> - new_front = NULL;
> - }
> -
> if (new_front == NULL)
> {
> /* No active transaction nodes after the call. */
> @@ -257,7 +266,6 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
> trx_front_ = NULL;
> trx_rear_ = NULL;
> }
> -
> if (trace_level_ & kTraceDetail)
> sql_print_information("%s: cleared all nodes", kWho);
> }
> @@ -291,14 +299,40 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
>
> trx_front_ = new_front;
> allocator_.free_nodes_before(trx_front_);
> -
> if (trace_level_ & kTraceDetail)
> sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
> kWho, n_frees,
> trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
> }
> + function_exit(kWho, 0);
> +}
>
> - return function_exit(kWho, 0);
> +bool ActiveTranx::prune_active_tranx_nodes(
> + LogPosPtr pos,
> + ulonglong *oldest_tranx_commit_time_us)
> +{
> + TranxNode *old_front = trx_front_;
> + TranxNode *new_front;
> +
> + new_front = trx_front_;
> + while (new_front)
> + {
> + if (compare(new_front, pos.file_name, pos.file_pos) > 0)
> + break;
> + new_front = new_front->next_;
> + }
> +
> + set_new_front(new_front);
> +
> + if (oldest_tranx_commit_time_us)
> + {
> + if (trx_front_ == NULL)
> + *oldest_tranx_commit_time_us = 0;
> + else
> + *oldest_tranx_commit_time_us = trx_front_->tranx_commit_time_us;
> + }
> +
> + return ! (old_front == trx_front_);
> }
>
>
> @@ -334,7 +368,8 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
> wait_file_pos_(0),
> master_enabled_(false),
> wait_timeout_(0L),
> - state_(0)
> + state_(0),
> + oldest_unapplied_tranx_commit_time_us_(0)
> {
> strcpy(reply_file_name_, "");
> strcpy(wait_file_name_, "");
> @@ -362,11 +397,19 @@ int ReplSemiSyncMaster::initObject()
> mysql_cond_init(key_ss_cond_COND_binlog_send_,
> &COND_binlog_send_, NULL);
>
> + /* Mutex initialization can only be done after MY_INIT(). */
> + mysql_mutex_init(key_ss_mutex_LOCK_slave_lag_,
> + &LOCK_slave_lag_, MY_MUTEX_INIT_FAST);
> + mysql_cond_init(key_ss_cond_COND_slave_lag_,
> + &COND_slave_lag_, NULL);
> +
> if (rpl_semi_sync_master_enabled)
> result = enableMaster();
> else
> result = disableMaster();
>
> + thd_key_create(&thd_key);
> +
> return result;
> }
>
> @@ -437,6 +480,8 @@ void ReplSemiSyncMaster::cleanup()
> {
> mysql_mutex_destroy(&LOCK_binlog_);
> mysql_cond_destroy(&COND_binlog_send_);
> + mysql_mutex_destroy(&LOCK_slave_lag_);
> + mysql_cond_destroy(&COND_slave_lag_);
> init_done_= 0;
> }
>
> @@ -473,7 +518,34 @@ void ReplSemiSyncMaster::add_slave()
> {
> lock();
> rpl_semi_sync_master_clients++;
> + if (has_semi_sync_slave_lag())
> + rpl_semi_sync_master_slave_lag_clients++;
> unlock();
> +
> + if (has_semi_sync_slave_lag())
> + {
> + int null_val = 0;
> + longlong new_val =
> + rpl_semi_sync_master_slave_lag_heartbeat_frequency_us * 1000;
> + longlong old_val = new_val + 1;
> +
> + get_user_var_int("master_heartbeat_period", &old_val, &null_val);
> + if (old_val > new_val || null_val)
> + {
> + /* if there no old value or it's bigger than what we want */
> + int res = set_user_var_int("master_heartbeat_period",new_val, &old_val);
> + if (res == -1)
> + {
> + sql_print_error(
> + "Repl_semi_sync::failed to set master_heartbeat_period");
> + }
> + }
> + }
> +
> + /**
> + * create per slave-state and store it in thread-local-storage */
> + ReplSemiSyncMasterPerSlaveState *state = new ReplSemiSyncMasterPerSlaveState;
> + thd_setspecific(current_thd, thd_key, state);
> }
>
> void ReplSemiSyncMaster::remove_slave()
> @@ -492,7 +564,31 @@ void ReplSemiSyncMaster::remove_slave()
> rpl_semi_sync_master_clients == 0)
> switch_off();
> }
> +
> + bool no_slave_lag_clients = false;
> + if (has_semi_sync_slave_lag())
> + {
> + if (--rpl_semi_sync_master_slave_lag_clients == 0)
> + {
> + no_slave_lag_clients = true;
> + }
> + }
> +
> unlock();
> +
> + ReplSemiSyncMasterPerSlaveState *state =
> + (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd, thd_key);
> + thd_setspecific(current_thd, thd_key, NULL);
> +
> + if (state != NULL)
> + {
> + delete state;
> + }
> +
> + if (no_slave_lag_clients)
> + {
> + wake_slave_lag_waiters(0);
> + }
> }
>
> bool ReplSemiSyncMaster::is_semi_sync_slave()
> @@ -503,14 +599,115 @@ bool ReplSemiSyncMaster::is_semi_sync_slave()
> return val;
> }
>
> +bool ReplSemiSyncMaster::has_semi_sync_slave_lag()
> +{
> + int null_value;
> + long long val= 0;
> + get_user_var_int(kRplSemiSyncSlaveReportExec, &val, &null_value);
> + return val;
> +}
> +
> +int ReplSemiSyncMaster::checkSyncReq(const LogPosPtr *log_pos)
> +{
> + if (log_pos == NULL)
> + {
> + /* heartbeat events do not have a logpos (since they are not actually
> + * stored in the binlog).
> + */
> + if (!has_semi_sync_slave_lag())
> + {
> + /* don't semi-sync them if we haven't enabled slave-lag handling */
> + return 0;
> + }
> + else
> + {
> + /* else ask for both IO and exec position */
> + return 2;
> + }
> + }
> +
> + /**
> + * check if this log-pos is a candidate for semi-syncing event
> + */
> + TranxNode *entry = active_tranxs_->lookup_tranx_end_pos(log_pos->file_name,
> + log_pos->file_pos);
> +
> + if (entry == NULL)
> + return 0;
> +
> + ReplSemiSyncMasterPerSlaveState *state =
> + (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd,
> + thd_key);
> + do
> + {
> + state->unacked_event_count_++;
> +
> + if (active_tranxs_->is_rear(entry))
> + {
> + /* always ask for ack on last event in tranx list */
> + break;
> + }
> +
> + if (state->unacked_event_count_ >=
> + rpl_semi_sync_master_max_unacked_event_count)
> + {
> + /* enough events passed that it's time for another ack */
> + break;
> + }
> +
> + if (!state->sync_req_pos_.IsInited())
> + {
> + /* first event => time for ack */
> + break;
> + }
> +
> + if (strcmp(log_pos->file_name, state->sync_req_pos_.file_name) != 0)
> + {
> + /* new file => time for ack */
> + break;
> + }
> +
> + if (log_pos->file_pos >= (state->sync_req_pos_.file_pos +
> + rpl_semi_sync_master_max_unacked_event_bytes))
> + {
> + /* enough bytes => time for ack */
> + break;
> + }
> +
> + /* we skip asking for semi-sync ack on this event */
> + return 0;
> +
> + } while (0);
> +
> + /* keep track of when we last asked for semi-sync-ack */
> + state->unacked_event_count_ = 0;
> + state->sync_req_pos_.Assign(log_pos);
> +
> + /**
> + * check if this slave can report back exec position
> + */
> + if (!has_semi_sync_slave_lag())
> + {
> + /* slave can't report back SQL position */
> + return 1;
> + }
> +
> + /* ask for both IO and SQL position */
> + return 2;
> +}
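
For readers following the throttling rules in checkSyncReq() above, here is a minimal standalone sketch of the same decision, assuming simplified stand-in types. PerSlaveState, Limits and should_request_ack are hypothetical names for illustration only; they are not part of the patch, and the real code additionally distinguishes return values 1 and 2 depending on has_semi_sync_slave_lag().

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-in for ReplSemiSyncMasterPerSlaveState.
struct PerSlaveState {
  unsigned unacked_event_count = 0;  // events sent since the last ack request
  bool inited = false;               // mirrors sync_req_pos_.IsInited()
  std::string last_file;             // binlog file of the last ack request
  uint64_t last_pos = 0;             // binlog offset of the last ack request
};

// Hypothetical stand-in for the two max_unacked_* system variables.
struct Limits {
  unsigned max_unacked_events;
  uint64_t max_unacked_bytes;
};

// Returns true when an ack should be requested for the transaction that
// ends at (file, pos). Conditions are checked in the same order as the
// do { ... } while (0) block in the patch.
bool should_request_ack(PerSlaveState &st, const Limits &lim,
                        const std::string &file, uint64_t pos,
                        bool is_last_in_list)
{
  st.unacked_event_count++;

  bool ask = is_last_in_list                                    // rear of list
          || st.unacked_event_count >= lim.max_unacked_events   // enough events
          || !st.inited                                         // first event
          || file != st.last_file                               // new binlog file
          || pos >= st.last_pos + lim.max_unacked_bytes;        // enough bytes
  if (!ask)
    return false;

  // Remember where we last asked for an ack.
  st.unacked_event_count = 0;
  st.inited = true;
  st.last_file = file;
  st.last_pos = pos;
  return true;
}
```

With an event limit of 3 and a byte limit of 1000, the first event always requests an ack, the next two are skipped, and the fourth requests one again; a change of binlog file or reaching the rear of the list forces a request regardless of the counters.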
> +
> int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
> const char *log_file_name,
> - my_off_t log_file_pos)
> + my_off_t log_file_pos,
> + const LogPos *exec_pos)
> {
> const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
> int cmp;
> bool can_release_threads = false;
> bool need_copy_send_pos = true;
> + bool pruned_trx_list = false;
> + ulonglong oldest_tranx_commit_time_us = 0;
> +
>
> if (!(getMasterEnabled()))
> return 0;
> @@ -559,15 +756,29 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
> reply_file_pos_ = log_file_pos;
> reply_file_name_inited_ = true;
>
> - /* Remove all active transaction nodes before this point. */
> - assert(active_tranxs_ != NULL);
> - active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
> -
> if (trace_level_ & kTraceDetail)
> sql_print_information("%s: Got reply at (%s, %lu)", kWho,
> log_file_name, (unsigned long)log_file_pos);
> }
>
> + assert(active_tranxs_ != NULL);
> + if (exec_pos != NULL)
> + {
> + /* prune using exec_pos */
> + LogPosPtr ptr(*exec_pos);
> + pruned_trx_list = active_tranxs_->prune_active_tranx_nodes(
> + ptr, &oldest_tranx_commit_time_us);
> + }
> + else if (rpl_semi_sync_master_slave_lag_clients == 0 && need_copy_send_pos)
> + {
> + /**
> + * if we don't have any slaves that can do exec_pos reporting,
> + * prune by IO position as "plain old semi sync"
> + */
> + LogPosPtr ptr(log_file_name, log_file_pos);
> + active_tranxs_->prune_active_tranx_nodes(ptr, NULL);
> + }
> +
> if (rpl_semi_sync_master_wait_sessions > 0)
> {
> /* Let us check if some of the waiting threads doing a trx
> @@ -596,6 +807,15 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
> cond_broadcast();
> }
>
> + if (pruned_trx_list)
> + {
> + /**
> + * if we did prune trx list, it might be that we should wake up
> + * threads waiting for slave-lag to decrease
> + */
> + wake_slave_lag_waiters(oldest_tranx_commit_time_us);
> + }
> +
> return function_exit(kWho, 0);
> }
>
> @@ -743,14 +963,6 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
> }
> }
>
> - /*
> - At this point, the binlog file and position of this transaction
> - must have been removed from ActiveTranx.
> - */
> - assert(thd_killed(NULL) ||
> - !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
> - trx_wait_binlog_pos));
> -
> l_end:
> /* Update the status counter. */
> if (is_on())
> @@ -794,7 +1006,7 @@ int ReplSemiSyncMaster::switch_off()
>
> /* Clear the active transaction list. */
> assert(active_tranxs_ != NULL);
> - result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
> + result = active_tranxs_->clear_active_tranx_nodes();
>
> rpl_semi_sync_master_off_times++;
> wait_file_name_inited_ = false;
> @@ -884,7 +1096,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> {
> const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
> int cmp = 0;
> - bool sync = false;
> + int sync = 0;
>
> /* If the semi-sync master is not enabled, or the slave is not a semi-sync
> * target, do not request replies from the slave.
> @@ -905,6 +1117,13 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> /* semi-sync is ON */
> /* sync= false; No sync unless a transaction is involved. */
>
> + if (log_file_name == NULL)
> + {
> + /* this is heartbeat, request io_pos and exec_pos */
> + sync = checkSyncReq(0);
> + goto l_end;
> + }
> +
> if (reply_file_name_inited_)
> {
> cmp = ActiveTranx::compare(log_file_name, log_file_pos,
> @@ -933,12 +1152,12 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> */
> if (cmp >= 0)
> {
> - /*
> + /*
> * We only wait if the event is a transaction's ending event.
> */
> assert(active_tranxs_ != NULL);
> - sync = active_tranxs_->is_tranx_end_pos(log_file_name,
> - log_file_pos);
> + LogPosPtr pos(log_file_name, log_file_pos);
> + sync = checkSyncReq(&pos);
> }
> }
> else
> @@ -951,7 +1170,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> }
> else
> {
> - sync = true;
> + sync = 1;
> }
> }
>
> @@ -966,10 +1185,14 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> /* We do not need to clear sync flag because we set it to 0 when we
> * reserve the packet header.
> */
> - if (sync)
> + if (sync == 1)
> {
> (packet)[2] = kPacketFlagSync;
> }
> + else if (sync == 2)
> + {
> + (packet)[2] = kPacketFlagSyncAndReport;
> + }
>
> return function_exit(kWho, 0);
> }
> @@ -1018,7 +1241,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
> if (is_on())
> {
> assert(active_tranxs_ != NULL);
> - if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
> + bool empty = active_tranxs_->is_empty();
> + if (active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
> {
> /*
> if insert tranx_node failed, print a warning message
> @@ -1028,6 +1252,14 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
> log_file_name, (ulong)log_file_pos);
> switch_off();
> }
> + else if (empty && rpl_semi_sync_master_slave_lag_clients > 0)
> + {
> + /* if the list of transactions was empty,
> + * we need to init the oldest_tranx_commit_time_us
> + */
> + oldest_unapplied_tranx_commit_time_us_ =
> + active_tranxs_->get_oldest_tranx_commit_time_us();
> + }
> }
>
> l_end:
> @@ -1037,10 +1269,10 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
> }
>
> int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
> - const char *event_buf)
> + const char *event_buf_)
> {
> const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
> - const unsigned char *packet;
> + const unsigned char *packet, *packet_start;
> char log_file_name[FN_REFLEN];
> my_off_t log_file_pos;
> ulong log_file_len = 0;
> @@ -1048,12 +1280,15 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
> int result = -1;
> struct timespec start_ts;
> ulong trc_level = trace_level_;
> + const unsigned char *event_buf = (const unsigned char*)event_buf_;
> + bool exec_pos_present = false; // is SQL exec pos present in reply
> + LogPos exec_pos; // position of SQL thread
> LINT_INIT_STRUCT(start_ts);
>
> function_enter(kWho);
>
> - assert((unsigned char)event_buf[1] == kPacketMagicNum);
> - if ((unsigned char)event_buf[2] != kPacketFlagSync)
> + assert(event_buf[1] == kPacketMagicNum);
> + if ((event_buf[2] & (kPacketFlagSync | kPacketFlagSyncAndReport)) == 0)
> {
> /* current event does not require reply */
> result = 0;
> @@ -1111,28 +1346,60 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
> goto l_end;
> }
>
> - packet = net->read_pos;
> + packet_start = packet = net->read_pos;
> if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
> {
> sql_print_error("Read semi-sync reply magic number error");
> goto l_end;
> }
>
> + /* we determine if this semisync ack contains a sql-thread exec-pos
> + * by checking if last byte == 0, since the packet then contains
> + * \0-terminated filenames */
> + exec_pos_present = packet[packet_len - 1] == 0;
> +
> log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
> - log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
> + if (exec_pos_present == false)
> + {
> + log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
> + }
> + else
> + {
> + log_file_len = strnlen((char*)packet + REPLY_BINLOG_NAME_OFFSET,
> + MY_MIN((ulong)FN_REFLEN,
> + packet_len - REPLY_BINLOG_NAME_OFFSET));
> + }
> if (log_file_len >= FN_REFLEN)
> {
> sql_print_error("Read semi-sync reply binlog file length too large");
> goto l_end;
> }
> - strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
> + packet+= REPLY_BINLOG_NAME_OFFSET;
> +
> + strncpy(log_file_name, (const char*)packet, log_file_len);
> log_file_name[log_file_len] = 0;
>
> + if (exec_pos_present)
> + {
> + packet += log_file_len + 1;
> + if (packet + 8 + 1 >= (packet_start + packet_len))
> + {
> + sql_print_error("Read semi-sync reply binlog. "
> + "Packet too short to contain exec-position!");
> + goto l_end;
> + }
> + exec_pos.file_pos = uint8korr(packet);
> + packet += 8;
> + strncpy(exec_pos.file_name, (char*)packet,
> + (packet_start + packet_len) - packet);
> + }
> +
> if (trc_level & kTraceDetail)
> sql_print_information("%s: Got reply (%s, %lu)",
> kWho, log_file_name, (ulong)log_file_pos);
>
> - result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
> + result = reportReplyBinlog(server_id, log_file_name, log_file_pos,
> + exec_pos_present ? &exec_pos : NULL);
>
> l_end:
> return function_exit(kWho, result);
> @@ -1154,6 +1421,16 @@ int ReplSemiSyncMaster::resetMaster()
> wait_file_name_inited_ = false;
> reply_file_name_inited_ = false;
> commit_file_name_inited_ = false;
> + if (active_tranxs_ != NULL)
> + {
> + /**
> + * make sure to empty transaction hash/list
> + * with slave-lag reporting this container does
> + * not have to be empty even if no transaction is
> + * currently running
> + */
> + active_tranxs_->clear_active_tranx_nodes();
> + }
>
> rpl_semi_sync_master_yes_transactions = 0;
> rpl_semi_sync_master_no_transactions = 0;
> @@ -1168,6 +1445,13 @@ int ReplSemiSyncMaster::resetMaster()
>
> unlock();
>
> + mysql_mutex_lock(&LOCK_slave_lag_);
> + rpl_semi_sync_master_slave_lag_wait_sessions = 0;
> + oldest_unapplied_tranx_commit_time_us_ = 0;
> + rpl_semi_sync_master_trx_slave_lag_wait_num = 0;
> + rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> +
> return function_exit(kWho, result);
> }
>
> @@ -1186,6 +1470,29 @@ void ReplSemiSyncMaster::setExportStats()
> ((double)rpl_semi_sync_master_net_wait_num)) : 0);
>
> unlock();
> +
> + if (oldest_unapplied_tranx_commit_time_us_ != 0)
> + {
> + rpl_semi_sync_master_estimated_slave_lag = my_hrtime().val -
> + oldest_unapplied_tranx_commit_time_us_;
> + }
> + else
> + {
> + rpl_semi_sync_master_estimated_slave_lag = 0;
> + }
> +
> + mysql_mutex_lock(&LOCK_slave_lag_);
> + if (rpl_semi_sync_master_trx_slave_lag_wait_num)
> + {
> + rpl_semi_sync_master_avg_trx_slave_lag_wait_time =
> + (unsigned long)((double)rpl_semi_sync_master_trx_slave_lag_wait_time /
> + (double)rpl_semi_sync_master_trx_slave_lag_wait_num);
> + }
> + else
> + {
> + rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0;
> + }
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> }
>
> /* Get the waiting time given the wait's staring time.
> @@ -1213,3 +1520,117 @@ static int getWaitTime(const struct timespec& start_ts)
>
> return (int)(end_usecs - start_usecs);
> }
> +
> +void ReplSemiSyncMaster::wake_slave_lag_waiters(
> + ulonglong oldest_unapplied_tranx_commit_time_us)
> +{
> + mysql_mutex_lock(&LOCK_slave_lag_);
> + oldest_unapplied_tranx_commit_time_us_ =
> + oldest_unapplied_tranx_commit_time_us;
> +
> + if (rpl_semi_sync_master_slave_lag_wait_sessions > 0)
> + {
> + mysql_cond_broadcast(&COND_slave_lag_);
> + }
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> +}
> +
> +int ReplSemiSyncMaster::wait_slave_lag(ulong timeout_sec)
> +{
> + int error = 0;
> + PSI_stage_info old_stage;
> +
> + /* slave lag waiting not enabled, return directly */
> + if (rpl_semi_sync_master_max_slave_lag == 0)
> + return 0;
> +
> + /* there is no slave that can report slave lag, return directly */
> + if (rpl_semi_sync_master_slave_lag_clients == 0)
> + return 0;
> +
> + /* compute start_time and end_time */
> + struct timespec end_time;
> + set_timespec(end_time, 0);
> + ulonglong start_time_us = timespec_to_usec(&end_time);
> + end_time.tv_sec += timeout_sec;
> +
> + mysql_mutex_lock(&LOCK_slave_lag_);
> +
> + if (oldest_unapplied_tranx_commit_time_us_ == 0)
> + {
> + /* no slave lag, at least one slave is up to date */
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> + return 0;
> + }
> +
> + if (rpl_semi_sync_master_max_slave_lag == 0)
> + {
> + /* slave lag waiting not enabled */
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> + return 0;
> + }
> +
> + /* This must be called after acquiring the lock */
> + THD_ENTER_COND(NULL, &COND_slave_lag_, &LOCK_slave_lag_,
> + &stage_waiting_for_semi_sync_slave_lag,
> + &old_stage);
> +
> + bool waited = false;
> + ulonglong lag = 0;
> + ulonglong max_lag = 0;
> + while (oldest_unapplied_tranx_commit_time_us_ != 0)
> + {
> + /* check kill_level after THD_ENTER_COND but *before* cond_wait
> + * to avoid missing kills */
> + if (! (getMasterEnabled() && is_on() &&
> + thd_kill_level(NULL) == THD_IS_NOT_KILLED))
> + break;
> +
> + lag = start_time_us - oldest_unapplied_tranx_commit_time_us_;
> + max_lag = 1000000 * rpl_semi_sync_master_max_slave_lag;
> + if (lag <= max_lag)
> + break;
> +
> + waited = true;
> + rpl_semi_sync_master_slave_lag_wait_sessions++;
> + int wait_result = mysql_cond_timedwait(&COND_slave_lag_, &LOCK_slave_lag_,
> + &end_time);
> + rpl_semi_sync_master_slave_lag_wait_sessions--;
> +
> + bool thd_was_killed = thd_kill_level(NULL) != THD_IS_NOT_KILLED;
> + if (wait_result != 0 || thd_was_killed)
> + {
> + break;
> + }
> + }
> +
> + if (thd_kill_level(NULL) != THD_IS_NOT_KILLED)
> + {
> + /* Return error to client. */
> + error = 1;
> + my_printf_error(ER_ERROR_DURING_COMMIT,
> + "Killed while waiting for replication semi-sync slave-lag.",
> + MYF(0));
> + }
> + else if (lag > max_lag)
> + {
> + /* Return error to client. */
> + error = 1;
> + my_printf_error(ER_ERROR_DURING_COMMIT,
> + "Slave-lag timeout",
> + MYF(0));
> + }
> +
> + if (waited)
> + {
> + rpl_semi_sync_master_trx_slave_lag_wait_num++;
> + rpl_semi_sync_master_trx_slave_lag_wait_time +=
> + (my_hrtime().val - start_time_us);
> + }
> +
> + /* The lock held will be released by thd_exit_cond, so no need to
> + call unlock() here */
> + THD_EXIT_COND(NULL, & old_stage);
> +
> + return error;
> +}
> diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h
> index d9dc4ce..e5c0de6 100644
> --- a/plugin/semisync/semisync_master.h
> +++ b/plugin/semisync/semisync_master.h
> @@ -24,17 +24,101 @@
> #ifdef HAVE_PSI_INTERFACE
> extern PSI_mutex_key key_ss_mutex_LOCK_binlog_;
> extern PSI_cond_key key_ss_cond_COND_binlog_send_;
> +
> +extern PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
> +extern PSI_cond_key key_ss_cond_COND_slave_lag_;
> #endif
>
> extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
> +extern PSI_stage_info stage_waiting_for_semi_sync_slave_lag;
>
> struct TranxNode {
> char log_name_[FN_REFLEN];
> - my_off_t log_pos_;
> + my_off_t log_pos_;
> + ulonglong tranx_commit_time_us;
> struct TranxNode *next_; /* the next node in the sorted list */
> struct TranxNode *hash_next_; /* the next node during hash collision */
> };
>
> +struct LogPos;
> +
> +/* This represents a log position */
> +struct LogPosPtr {
> + LogPosPtr() { Uninit();}
> + LogPosPtr(const char *name, my_off_t pos) : file_name(name), file_pos(pos){}
> + explicit LogPosPtr(const LogPos& pos) { Assign(&pos); }
> +
> + const char *file_name;
> + my_off_t file_pos;
> +
> + LogPosPtr& Assign(const LogPosPtr *src) {
> + file_name = src->file_name;
> + file_pos = src->file_pos;
> + return *this;
> + }
> +
> + LogPosPtr& Assign(const LogPos *src);
> +
> + void Uninit() { file_name = NULL;}
> + bool IsInited() const { return file_name != NULL; }
> +};
> +
> +struct LogPos {
> + char file_name[FN_REFLEN];
> + my_off_t file_pos;
> +
> + LogPos() { Uninit(); }
> +
> + LogPosPtr ToLogPosPtr() const {
> + if (IsInited()){
> + LogPosPtr p(file_name, file_pos);
> + return p;
> + } else {
> + LogPosPtr p;
> + return p;
> + }
> + }
> +
> + LogPos& Assign(const LogPosPtr *src) {
> + if (src->IsInited()) {
> + strcpy(file_name, src->file_name);
> + file_pos = src->file_pos;
> + } else {
> + Uninit();
> + }
> + return *this;
> + }
> +
> + LogPos& Assign(const LogPos* src) {
> + LogPosPtr p = src->ToLogPosPtr();
> + Assign(&p);
> + return *this;
> + }
> +
> + void Uninit() { file_name[0] = 0; }
> + bool IsInited() const { return file_name[0] != 0; }
> +};
> +
> +inline LogPosPtr& LogPosPtr::Assign(const LogPos* src) {
> + LogPosPtr p = src->ToLogPosPtr();
> + Assign(&p);
> + return *this;
> +}
> +
> +inline int CompareLogPos(const LogPosPtr *pos1, const LogPosPtr *pos2) {
> + int cmp = strcmp(pos1->file_name, pos2->file_name);
> +
> + if (cmp != 0)
> + return cmp;
> +
> + if (pos1->file_pos > pos2->file_pos)
> + return 1;
> + else if (pos1->file_pos < pos2->file_pos)
> + return -1;
> + else
> + return 0;
> +}
> +
> /**
> @class TranxNodeAllocator
>
> @@ -329,10 +413,14 @@ class ActiveTranx
> node2->log_name_, node2->log_pos_);
> }
>
> + void set_new_front(TranxNode* new_front);
> +
> public:
> ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level);
> ~ActiveTranx();
>
> + bool is_empty() const { return trx_front_ == NULL; }
> +
> /* Insert an active transaction node with the specified position.
> *
> * Return:
> @@ -340,21 +428,42 @@ class ActiveTranx
> */
> int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
>
> - /* Clear the active transaction nodes until(inclusive) the specified
> - * position.
> - * If log_file_name is NULL, everything will be cleared: the sorted
> + /* Clear the active transactions.
> + * Everything will be cleared: the sorted
> * list and the hash table will be reset to empty.
> - *
> + *
> * Return:
> - * 0: success; non-zero: error
> + * 0 success; non-zero: error
> + */
> + int clear_active_tranx_nodes();
> +
> + /* Prune the active transaction nodes until the specified
> + * position (inclusive).
> + *
> + * Return:
> + * true if any transaction was removed
> + * false if list was left unchanged
> */
> - int clear_active_tranx_nodes(const char *log_file_name,
> - my_off_t log_file_pos);
> + bool prune_active_tranx_nodes(LogPosPtr logpos,
> + ulonglong *oldest_tranx_commit_time_us);
>
> - /* Given a position, check to see whether the position is an active
> - * transaction's ending position by probing the hash table.
> + /* Lookup a transaction's ending position by probing the hash table.
> + *
> + * return entry if found or NULL otherwise
> */
> - bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
> + TranxNode* lookup_tranx_end_pos(const char *log_file_name,
> + my_off_t log_file_pos);
> +
> +
> + /* Check if an entry is the rear (i.e. last) */
> + bool is_rear(TranxNode* entry) const { return entry == trx_rear_; }
> +
> + /**
> + * return timestamp of oldest transaction in list
> + */
> + ulonglong get_oldest_tranx_commit_time_us() const {
> + return trx_front_->tranx_commit_time_us;
> + }
>
> /* Given two binlog positions, compare which one is bigger based on
> * (file_name, file_position).
> @@ -365,6 +474,24 @@ class ActiveTranx
> };
>
> /**
> + * State that semisync master keeps per slave
> + */
> +struct ReplSemiSyncMasterPerSlaveState
> +{
> + ReplSemiSyncMasterPerSlaveState() : unacked_event_count_(0) {}
> +
> + /**
> + * Number of events that have not been semi-sync acked
> + */
> + unsigned unacked_event_count_;
> +
> + /**
> + * Position of last event that was semisync'ed
> + */
> + LogPos sync_req_pos_;
> +};
> +
> +/**
> The extension class for the master of semi-synchronous replication
> */
> class ReplSemiSyncMaster
> @@ -432,6 +559,16 @@ class ReplSemiSyncMaster
>
> bool state_; /* whether semi-sync is switched */
>
> + /* This cond variable is signaled when slave lag has decreased */
> + mysql_cond_t COND_slave_lag_;
> +
> + /* Mutex that protects oldest_unapplied_tranx_commit_time_us_ */
> + mysql_mutex_t LOCK_slave_lag_;
> +
> + /* this is commit time of oldest transaction that has not been applied
> + * on any slave */
> + ulonglong oldest_unapplied_tranx_commit_time_us_;
> +
> void lock();
> void unlock();
> void cond_broadcast();
> @@ -493,6 +630,9 @@ class ReplSemiSyncMaster
> /* Is the slave servered by the thread requested semi-sync */
> bool is_semi_sync_slave();
>
> + /* Does this slave have slave lag reporting capabilities */
> + bool has_semi_sync_slave_lag();
> +
> /* In semi-sync replication, reports up to which binlog position we have
> * received replies from the slave indicating that it already get the events.
> *
> @@ -501,13 +641,15 @@ class ReplSemiSyncMaster
> * log_file_name - (IN) binlog file name
> * end_offset - (IN) the offset in the binlog file up to which we have
> * the replies from the slave
> + * exec_position - (IN) position of SQL thread or NULL if not present
> *
> * Return:
> * 0: success; non-zero: error
> */
> int reportReplyBinlog(uint32 server_id,
> const char* log_file_name,
> - my_off_t end_offset);
> + my_off_t end_offset,
> + const LogPos *exec_position);
>
> /* Commit a transaction in the final step. This function is called from
> * InnoDB before returning from the low commit. If semi-sync is switch on,
> @@ -540,6 +682,16 @@ class ReplSemiSyncMaster
> */
> int reserveSyncHeader(unsigned char *header, unsigned long size);
>
> + /*
> + * check if an event should be semi synced and optionally
> + * if it should report back position of SQL thread on slave
> + *
> + * return 0 - no semi sync
> + * 1 - semi sync
> + * 2 - semi sync and report exec position
> + */
> + int checkSyncReq(const LogPosPtr *log_pos);
> +
> /* Update the sync bit in the packet header to indicate to the slave whether
> * the master will wait for the reply of the event. If semi-sync is switched
> * off and we detect that the slave is catching up, we switch semi-sync on.
> @@ -592,6 +744,21 @@ class ReplSemiSyncMaster
> * go off for that.
> */
> int resetMaster();
> +
> + /**
> + * wake potential slave-lag waiters
> + * called by binlog dump-thread(s)
> + */
> + void wake_slave_lag_waiters(ulonglong oldest_unapplied_tranx_commit_time_us);
> +
> + /**
> + * wait for slave lag to get below threshold
> + * called by user-thread(s)
> + *
> + * return 0 - success
> + * 1 - timeout or killed while waiting
> + */
> + int wait_slave_lag(ulong max_wait_time_sec);
> };
>
> enum rpl_semi_sync_master_wait_point_t {
> @@ -621,6 +788,17 @@ extern unsigned long long rpl_semi_sync_master_trx_wait_num;
> extern unsigned long long rpl_semi_sync_master_net_wait_time;
> extern unsigned long long rpl_semi_sync_master_trx_wait_time;
>
> +extern unsigned long rpl_semi_sync_master_max_unacked_event_count;
> +extern unsigned long rpl_semi_sync_master_max_unacked_event_bytes;
> +extern unsigned long rpl_semi_sync_master_max_slave_lag;
> +extern unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us;
> +extern unsigned long rpl_semi_sync_master_slave_lag_wait_sessions;
> +extern unsigned long long rpl_semi_sync_master_estimated_slave_lag;
> +
> +extern unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time;
> +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num;
> +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time;
> +
> /*
> This indicates whether we should keep waiting if no semi-sync slave
> is available.
> diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc
> index 7bb0eea..3282158 100644
> --- a/plugin/semisync/semisync_master_plugin.cc
> +++ b/plugin/semisync/semisync_master_plugin.cc
> @@ -21,6 +21,11 @@
>
> static ReplSemiSyncMaster repl_semisync;
>
> +// forward declaration
> +static inline ulong get_slave_lag_wait_timeout(THD* thd);
> +
> +static char rpl_semi_sync_master_group_commit = 0;
> +
> C_MODE_START
>
> int repl_semi_report_binlog_update(Binlog_storage_param *param,
> @@ -31,6 +36,13 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
>
> if (repl_semisync.getMasterEnabled())
> {
> + if (rpl_semi_sync_master_group_commit &&
> + ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
> + {
> + /** there are more transactions coming... */
> + return 0;
> + }
> +
> /*
> Let us store the binlog file name and the position, so that
> we know how long to wait for the binlog to the replicated to
> @@ -43,8 +55,11 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
> return error;
> }
>
> -int repl_semi_request_commit(Trans_param *param)
> +int repl_semi_before_commit(Trans_param *param, int *error)
> {
> + *error = repl_semisync.wait_slave_lag(
> + get_slave_lag_wait_timeout(current_thd));
> +
> return 0;
> }
>
> @@ -53,6 +68,14 @@ int repl_semi_report_binlog_sync(Binlog_storage_param *param,
> my_off_t log_pos, uint32 flags)
> {
> int error= 0;
> +
> + if (rpl_semi_sync_master_group_commit &&
> + ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
> + {
> + /** there are more transactions coming... */
> + return 0;
> + }
> +
> if (rpl_semi_sync_master_wait_point ==
> SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
> {
> @@ -100,7 +123,7 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
> Let's assume this semi-sync slave has already received all
> binlog events before the filename and position it requests.
> */
> - repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
> + repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos, NULL);
> }
> sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
> semi_sync_slave ? "semi-sync" : "asynchronous",
> @@ -242,15 +265,72 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
> &fix_rpl_semi_sync_master_trace_level, // update
> 32, 0, ~0UL, 1);
>
> +static MYSQL_SYSVAR_ULONG(max_unacked_event_count,
> + rpl_semi_sync_master_max_unacked_event_count,
> + PLUGIN_VAR_OPCMDARG,
> + "Maximum unacked replication events",
> + NULL, // check
> + NULL, // update
> + rpl_semi_sync_master_max_unacked_event_count, 0, ~0UL, 1);
> +
> +static MYSQL_SYSVAR_ULONG(max_unacked_event_bytes,
> + rpl_semi_sync_master_max_unacked_event_bytes,
> + PLUGIN_VAR_OPCMDARG,
> + "Maximum unacked replication bytes",
> + NULL, // check
> + NULL, // update
> + rpl_semi_sync_master_max_unacked_event_bytes, 0, ~0UL, 1);
> +
> +static MYSQL_SYSVAR_ULONG(max_slave_lag, rpl_semi_sync_master_max_slave_lag,
> + PLUGIN_VAR_OPCMDARG,
> + "Maximum allowed lag of fastest semi-sync slave (in seconds), "
> + "checked before commit.",
> + NULL, // check
> + NULL, // update
> + rpl_semi_sync_master_max_slave_lag, 0, ~0UL, 1);
> +
> +static MYSQL_THDVAR_ULONG(slave_lag_wait_timeout,
> + PLUGIN_VAR_RQCMDARG,
> + "Timeout in seconds a rw-transaction may wait for max slave lag before "
> + "being rolled back.",
> + NULL, NULL, 50, 1, 1024 * 1024 * 1024, 0);
> +
> +static MYSQL_SYSVAR_ULONG(
> + slave_lag_heartbeat_frequency_us,
> + rpl_semi_sync_master_slave_lag_heartbeat_frequency_us,
> + PLUGIN_VAR_RQCMDARG,
> + "Heartbeat frequency when slave-lag is enabled (in microseconds).",
> + NULL, // check
> + NULL, // update
> + 500000, /* 500 ms */
> + 1, ~0UL, 1);
> +
> +static MYSQL_SYSVAR_BOOL(group_commit, rpl_semi_sync_master_group_commit,
> + PLUGIN_VAR_OPCMDARG,
> + "Group commit for semi sync",
> + NULL, // check
> + NULL,
> + 0);
> +
> static SYS_VAR* semi_sync_master_system_vars[]= {
> MYSQL_SYSVAR(enabled),
> MYSQL_SYSVAR(wait_point),
> MYSQL_SYSVAR(timeout),
> MYSQL_SYSVAR(wait_no_slave),
> MYSQL_SYSVAR(trace_level),
> + MYSQL_SYSVAR(max_unacked_event_count),
> + MYSQL_SYSVAR(max_unacked_event_bytes),
> + MYSQL_SYSVAR(max_slave_lag),
> + MYSQL_SYSVAR(slave_lag_wait_timeout),
> + MYSQL_SYSVAR(slave_lag_heartbeat_frequency_us),
> + MYSQL_SYSVAR(group_commit),
> NULL,
> };
>
> +static inline ulong get_slave_lag_wait_timeout(THD* thd)
> +{
> + return THDVAR(thd, slave_lag_wait_timeout);
> +}
>
> static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
> SYS_VAR *var,
> @@ -297,6 +377,7 @@ Trans_observer trans_observer = {
>
> repl_semi_report_commit, // after_commit
> repl_semi_report_rollback, // after_rollback
> + repl_semi_before_commit, // before commit
> };
>
> Binlog_storage_observer storage_observer = {
> @@ -339,7 +420,11 @@ DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
> DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
> DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
> DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
> -
> +DEF_SHOW_FUNC(slave_lag_wait_sessions, SHOW_LONG)
> +DEF_SHOW_FUNC(estimated_slave_lag, SHOW_LONGLONG)
> +DEF_SHOW_FUNC(trx_slave_lag_wait_time, SHOW_LONGLONG)
> +DEF_SHOW_FUNC(trx_slave_lag_wait_num, SHOW_LONGLONG)
> +DEF_SHOW_FUNC(avg_trx_slave_lag_wait_time, SHOW_LONG)
>
> /* plugin status variables */
> static SHOW_VAR semi_sync_master_status_vars[]= {
> @@ -385,32 +470,55 @@ static SHOW_VAR semi_sync_master_status_vars[]= {
> {"Rpl_semi_sync_master_net_avg_wait_time",
> (char*) &SHOW_FNAME(avg_net_wait_time),
> SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_slave_lag_wait_sessions",
> + (char*) &SHOW_FNAME(slave_lag_wait_sessions),
> + SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_estimated_slave_lag",
> + (char*) &SHOW_FNAME(estimated_slave_lag),
> + SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_tx_slave_lag_wait_time",
> + (char*) &SHOW_FNAME(trx_slave_lag_wait_time),
> + SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_tx_slave_lag_waits",
> + (char*) &SHOW_FNAME(trx_slave_lag_wait_num),
> + SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_tx_avg_slave_lag_wait_time",
> + (char*) &SHOW_FNAME(avg_trx_slave_lag_wait_time),
> + SHOW_SIMPLE_FUNC},
> {NULL, NULL, SHOW_LONG},
> };
>
> #ifdef HAVE_PSI_INTERFACE
> PSI_mutex_key key_ss_mutex_LOCK_binlog_;
> +PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
>
> static PSI_mutex_info all_semisync_mutexes[]=
> {
> - { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0}
> + { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0 },
> + { &key_ss_mutex_LOCK_slave_lag_, "LOCK_slave_lag_", 0 }
> };
>
> PSI_cond_key key_ss_cond_COND_binlog_send_;
> +PSI_cond_key key_ss_cond_COND_slave_lag_;
>
> static PSI_cond_info all_semisync_conds[]=
> {
> - { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0}
> + { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0 },
> + { &key_ss_cond_COND_slave_lag_, "COND_slave_lag_", 0 }
> };
> #endif /* HAVE_PSI_INTERFACE */
>
> PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
> { 0, "Waiting for semi-sync ACK from slave", 0};
>
> +PSI_stage_info stage_waiting_for_semi_sync_slave_lag=
> +{ 0, "Waiting for semi-sync slave lag", 0};
> +
> #ifdef HAVE_PSI_INTERFACE
> PSI_stage_info *all_semisync_stages[]=
> {
> - & stage_waiting_for_semi_sync_ack_from_slave
> + & stage_waiting_for_semi_sync_ack_from_slave,
> + & stage_waiting_for_semi_sync_slave_lag
> };
>
> static void init_semisync_psi_keys(void)
> @@ -492,4 +600,3 @@ maria_declare_plugin(semisync_master)
> MariaDB_PLUGIN_MATURITY_GAMMA
> }
> maria_declare_plugin_end;
> -
> diff --git a/plugin/semisync/semisync_slave.cc b/plugin/semisync/semisync_slave.cc
> index 5f98472..839e0cc 100644
> --- a/plugin/semisync/semisync_slave.cc
> +++ b/plugin/semisync/semisync_slave.cc
> @@ -20,6 +20,7 @@
> char rpl_semi_sync_slave_enabled;
> char rpl_semi_sync_slave_status= 0;
> unsigned long rpl_semi_sync_slave_trace_level;
> +char rpl_semi_sync_slave_lag_enabled= 0;
>
> int ReplSemiSyncSlave::initObject()
> {
> @@ -42,7 +43,7 @@ int ReplSemiSyncSlave::initObject()
>
> int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
> unsigned long total_len,
> - bool *need_reply,
> + unsigned char *need_reply,
> const char **payload,
> unsigned long *payload_len)
> {
> @@ -52,7 +53,7 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
>
> if ((unsigned char)(header[0]) == kPacketMagicNum)
> {
> - *need_reply = (header[1] & kPacketFlagSync);
> + *need_reply = (header[1] & (kPacketFlagSync | kPacketFlagSyncAndReport));
> *payload_len = total_len - 2;
> *payload = header + 2;
>
> @@ -95,16 +96,20 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
> return 0;
> }
>
> -int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
> - const char *binlog_filename,
> - my_off_t binlog_filepos)
> +int ReplSemiSyncSlave::slaveReply(unsigned char header_byte,
> + MYSQL *mysql,
> + const char *binlog_filename,
> + my_off_t binlog_filepos,
> + Master_info * mi)
> {
> const char *kWho = "ReplSemiSyncSlave::slaveReply";
> NET *net= &mysql->net;
> - uchar reply_buffer[REPLY_MAGIC_NUM_LEN
> - + REPLY_BINLOG_POS_LEN
> - + REPLY_BINLOG_NAME_LEN];
> + uchar reply_buffer[REPLY_MAGIC_NUM_LEN +
> + 2 * ( REPLY_BINLOG_POS_LEN +
> + REPLY_BINLOG_NAME_LEN +
> + /* '\0' */ 1) ];
> int reply_res, name_len = strlen(binlog_filename);
> + int msg_len = name_len + REPLY_BINLOG_NAME_OFFSET;
>
> function_enter(kWho);
>
> @@ -119,10 +124,29 @@ int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
> sql_print_information("%s: reply (%s, %lu)", kWho,
> binlog_filename, (ulong)binlog_filepos);
>
> + if (header_byte & kPacketFlagSyncAndReport)
> + {
> + /**
> + * master requests that we also report back SQL-thread position
> + */
> +
> + // where to store sql filename/position
> + char *bufptr = (char*)reply_buffer + msg_len;
> + bufptr[0] = 0; // '\0' terminate previous filename
> + bufptr++;
> +
> + my_off_t sql_file_pos;
> + // get file/position and store the filename directly into bufptr+8
> + size_t name_len2 = get_master_log_pos(mi, bufptr + 8, &sql_file_pos);
> + int8store(bufptr, sql_file_pos); // store position
> +
> + msg_len += /* '\0' */ 1 + /* position */ 8 + name_len2 + /* '\0' */ 1;
> + }
> +
> net_clear(net, 0);
> /* Send the reply. */
> - reply_res = my_net_write(net, reply_buffer,
> - name_len + REPLY_BINLOG_NAME_OFFSET);
> + reply_res = my_net_write(net, reply_buffer, msg_len);
> +
> if (!reply_res)
> {
> reply_res = net_flush(net);
> diff --git a/plugin/semisync/semisync_slave.h b/plugin/semisync/semisync_slave.h
> index 1bf8cf3..c91847d 100644
> --- a/plugin/semisync/semisync_slave.h
> +++ b/plugin/semisync/semisync_slave.h
> @@ -60,23 +60,30 @@ class ReplSemiSyncSlave
> * Return:
> * 0: success; non-zero: error
> */
> - int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
> + int slaveReadSyncHeader(const char *header, unsigned long total_len,
> + unsigned char *need_reply_byte,
> const char **payload, unsigned long *payload_len);
>
> /* A slave replies to the master indicating its replication process. It
> * indicates that the slave has received all events before the specified
> * binlog position.
> - *
> + *
> * Input:
> + * need_reply_byte - (IN) the header byte
> * mysql - (IN) the mysql network connection
> * binlog_filename - (IN) the reply point's binlog file name
> * binlog_filepos - (IN) the reply point's binlog file offset
> + * master_info - (IN) the master info struct so that we can get more
> + * info if needed
> *
> * Return:
> * 0: success; non-zero: error
> */
> - int slaveReply(MYSQL *mysql, const char *binlog_filename,
> - my_off_t binlog_filepos);
> + int slaveReply(unsigned char need_reply_byte,
> + MYSQL *mysql,
> + const char *binlog_filename,
> + my_off_t binlog_filepos,
> + Master_info* master_info);
>
> int slaveStart(Binlog_relay_IO_param *param);
> int slaveStop(Binlog_relay_IO_param *param);
> @@ -93,5 +100,6 @@ class ReplSemiSyncSlave
> extern char rpl_semi_sync_slave_enabled;
> extern unsigned long rpl_semi_sync_slave_trace_level;
> extern char rpl_semi_sync_slave_status;
> +extern char rpl_semi_sync_slave_lag_enabled;
>
> #endif /* SEMISYNC_SLAVE_H */
> diff --git a/plugin/semisync/semisync_slave_plugin.cc b/plugin/semisync/semisync_slave_plugin.cc
> index 572ead2..0bf03be 100644
> --- a/plugin/semisync/semisync_slave_plugin.cc
> +++ b/plugin/semisync/semisync_slave_plugin.cc
> @@ -28,7 +28,7 @@ static ReplSemiSyncSlave repl_semisync;
> event read is the last event of a transaction. And the value is
> checked in repl_semi_slave_queue_event.
> */
> -bool semi_sync_need_reply= false;
> +unsigned char semi_sync_need_reply= 0;
>
> C_MODE_START
>
> @@ -81,6 +81,23 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
> return 1;
> }
> mysql_free_result(mysql_store_result(mysql));
> +
> + if (rpl_semi_sync_slave_lag_enabled)
> + {
> + char buf[100];
> + /*
> + Tell master that we can do exec-position reporting
> + */
> + snprintf(buf, sizeof(buf), "SET @%s= 1",
> + ReplSemiSyncBase::kRplSemiSyncSlaveReportExec);
> + if (mysql_real_query(mysql, buf, strlen(buf)))
> + {
> + sql_print_error("query: %s on master failed", buf);
> + return 1;
> + }
> + mysql_free_result(mysql_store_result(mysql));
> + }
> +
> rpl_semi_sync_slave_status= 1;
> return 0;
> }
> @@ -110,9 +127,11 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
> should not cause the slave IO thread to stop, and the error
> messages are already reported.
> */
> - (void) repl_semisync.slaveReply(param->mysql,
> + (void) repl_semisync.slaveReply(semi_sync_need_reply,
> + param->mysql,
> param->master_log_name,
> - param->master_log_pos);
> + param->master_log_pos,
> + param->mi);
> }
> return 0;
> }
> @@ -164,9 +183,17 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level,
> &fix_rpl_semi_sync_trace_level, // update
> 32, 0, ~0UL, 1);
>
> +static MYSQL_SYSVAR_BOOL(lag_enabled, rpl_semi_sync_slave_lag_enabled,
> + PLUGIN_VAR_OPCMDARG,
> + "Enable semi-synchronous replication slave lag reporting. ",
> + NULL, // check
> + NULL, // update
> + 0);
> +
> static SYS_VAR* semi_sync_slave_system_vars[]= {
> MYSQL_SYSVAR(enabled),
> MYSQL_SYSVAR(trace_level),
> + MYSQL_SYSVAR(lag_enabled),
> NULL,
> };
>
> @@ -230,4 +257,3 @@ maria_declare_plugin(semisync_slave)
> MariaDB_PLUGIN_MATURITY_GAMMA
> }
> maria_declare_plugin_end;
> -
> diff --git a/sql/handler.cc b/sql/handler.cc
> index 3ca9ec3..3e6cd65 100644
> --- a/sql/handler.cc
> +++ b/sql/handler.cc
> @@ -1364,12 +1364,30 @@ int ha_commit_trans(THD *thd, bool all)
> uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all);
> /* rw_trans is TRUE when we in a transaction changing data */
> bool rw_trans= is_real_trans && (rw_ha_count > 0);
> + bool mdl_request_initialized= false;
> MDL_request mdl_request;
> DBUG_PRINT("info", ("is_real_trans: %d rw_trans: %d rw_ha_count: %d",
> is_real_trans, rw_trans, rw_ha_count));
>
> if (rw_trans)
> {
> + /* check READ-ONLY just before before_commit hook to decrease likelihood
> + * of having threads hanging waiting for slave-lag only to be aborted
> + * due to read-only.
> + */
> + if (opt_readonly &&
> + !(thd->security_ctx->master_access & SUPER_ACL) &&
> + !thd->slave_thread)
> + {
> + my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
> + goto err;
> + }
> +
> + if (RUN_HOOK(transaction, before_commit, (thd)))
> + {
> + goto err;
> + }
> +
> /*
> Acquire a metadata lock which will ensure that COMMIT is blocked
> by an active FLUSH TABLES WITH READ LOCK (and vice versa:
> @@ -1378,6 +1396,7 @@ int ha_commit_trans(THD *thd, bool all)
> We allow the owner of FTWRL to COMMIT; we assume that it knows
> what it does.
> */
> + mdl_request_initialized= true;
> mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
> MDL_EXPLICIT);
>
> @@ -1486,7 +1505,7 @@ int ha_commit_trans(THD *thd, bool all)
> ha_rollback_trans(thd, all);
>
> end:
> - if (rw_trans && mdl_request.ticket)
> + if (rw_trans && mdl_request_initialized && mdl_request.ticket)
> {
> /*
> We do not always immediately release transactional locks
> diff --git a/sql/replication.h b/sql/replication.h
> index 4731c22..309bdb4 100644
> --- a/sql/replication.h
> +++ b/sql/replication.h
> @@ -110,6 +110,21 @@ typedef struct Trans_observer {
> @retval 1 Failure
> */
> int (*after_rollback)(Trans_param *param);
> +
> + /**
> + This callback is called before transaction commit
> + If function does not return *error == 0 transaction will
> + not be committed but error code will be returned to client
> +
> + @note *error!=0 and return code 0 shall be used by plugin to signal
> + that transaction should be aborted.
> + If returning non-zero transaction will also be aborted and an error
> + will be printed to error log.
> +
> + @retval 0 Success
> + @retval non-zero error
> + */
> + int (*before_commit)(Trans_param *param, int *error);
> } Trans_observer;
>
> /**
> @@ -294,6 +309,8 @@ enum Binlog_relay_IO_flags {
> };
>
>
> +class Master_info;
> +
> /**
> Replication binlog relay IO observer parameter
> */
> @@ -309,8 +326,20 @@ typedef struct Binlog_relay_IO_param {
> my_off_t master_log_pos;
>
> MYSQL *mysql; /* the connection to master */
> +
> + Master_info * mi; /* master info handle */
> } Binlog_relay_IO_param;
>
> +
> +/* get the master log given a Master_info
> + * and store it in filename_buf/filepos
> + * return length of filename (excluding \0)
> + *
> + * note: filename_buf should be a minimum FN_REFLEN
> + */
> +size_t get_master_log_pos(const Master_info *mi,
> + char *filename_buf, my_off_t *filepos);
> +
> /**
> Observes and extends the service of slave IO thread.
> */
> @@ -561,7 +590,21 @@ int get_user_var_str(const char *name,
> char *value, unsigned long len,
> unsigned int precision, int *null_value);
>
> -
> +
> +/**
> + Set or replace the value of user variable as to an ulonglong
> +
> + @param name user variable name
> + @param value the value
> + @param old_value pointer to where old value will be stored (or NULL)
> +
> + @retval 0 Success, no prior value found
> + @retval 1 Success, old_value populated
> + @retval -1 Fail
> +*/
> +int set_user_var_int(const char *name,
> + long long int value,
> + long long int *old_value);
>
> #ifdef __cplusplus
> }
> diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
> index 3962600..209a809 100644
> --- a/sql/rpl_handler.cc
> +++ b/sql/rpl_handler.cc
> @@ -23,6 +23,7 @@
> #include "rpl_filter.h"
> #include <my_dir.h>
> #include "rpl_handler.h"
> +#include "sql_prepare.h"
>
> Trans_delegate *transaction_delegate;
> Binlog_storage_delegate *binlog_storage_delegate;
> @@ -88,6 +89,42 @@ int get_user_var_str(const char *name, char *value,
> return 0;
> }
>
> +int set_user_var_int(const char *name,
> + long long int value,
> + long long int *old_value)
> +{
> + THD* thd= current_thd;
> + bool null_val;
> + user_var_entry *entry=
> + (user_var_entry*) my_hash_search(&thd->user_vars,
> + (uchar*) name, strlen(name));
> + if (entry != NULL)
> + {
> + if (old_value != NULL)
> + *old_value= entry->val_int(&null_val);
> + }
> +
> + Ed_connection con(thd);
> +
> + char buf[256];
> + int res= snprintf(buf, sizeof(buf), "SET @%s=%lld", name, value);
> + if (/* error */ res < 0 ||
> + /* truncated */ res >= sizeof(buf))
> + {
> + return -1;
> + }
> +
> + LEX_STRING str;
> + lex_string_set(&str, buf);
> +
> + if (con.execute_direct(str))
> + {
> + return -1;
> + }
> +
> + return entry == NULL ? 0 : 1;
> +}
> +
> int delegates_init()
> {
> static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
> @@ -249,6 +286,17 @@ int Trans_delegate::after_rollback(THD *thd, bool all)
> return ret;
> }
>
> +int Trans_delegate::before_commit(THD *thd)
> +{
> + int ret= 0, error= 0;
> + Trans_param param;
> + param.flags= 0;
> + param.log_file= 0;
> + param.log_pos= 0;
> + FOREACH_OBSERVER(ret, before_commit, thd, (&param, &error));
> + return error;
> +}
> +
> int Binlog_storage_delegate::after_flush(THD *thd,
> const char *log_file,
> my_off_t log_pos,
> @@ -374,17 +422,19 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
>
> int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
> String *packet,
> - const char *log_file,
> + const char *log_file_path,
> my_off_t log_pos)
> {
> Binlog_transmit_param param;
> param.flags= flags;
>
> int ret= 0;
> + const char* log_file_name= log_file_path != NULL ?
> + log_file_path + dirname_length(log_file_path) : NULL;
> FOREACH_OBSERVER(ret, before_send_event, false,
> (&param, (uchar *)packet->c_ptr(),
> packet->length(),
> - log_file+dirname_length(log_file), log_pos));
> + log_file_name, log_pos));
> return ret;
> }
>
> @@ -414,6 +464,7 @@ int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
> void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
> Master_info *mi)
> {
> + param->mi = mi;
> param->mysql= mi->mysql;
> param->user= mi->user;
> param->host= mi->host;
> @@ -540,6 +591,20 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
> {
> return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
> }
> +
> +/* get master log pos for a Master_info struct */
> +size_t get_master_log_pos(const Master_info* mi,
> + char *filename_buf, my_off_t *filepos)
> +{
> + mysql_mutex_t *mutex= &mi->rli.data_lock;
> +
> + mysql_mutex_lock(mutex);
> + *filepos= mi->rli.group_master_log_pos;
> + strncpy(filename_buf, mi->rli.group_master_log_name, FN_REFLEN);
> + mysql_mutex_unlock(mutex);
> + return strnlen(filename_buf, FN_REFLEN);
> +}
> +
> #else
> int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
> {
> @@ -560,4 +625,13 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
> {
> return 0;
> }
> +
> +size_t get_master_log_pos(const Master_info* mi,
> + char *filename_buf, my_off_t *filepos)
> +{
> + *filepos= 0;
> + filename_buf[0]= 0;
> + return 0;
> +}
> +
> #endif /* HAVE_REPLICATION */
> diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h
> index afcfd9d..5119ee4 100644
> --- a/sql/rpl_handler.h
> +++ b/sql/rpl_handler.h
> @@ -142,7 +142,7 @@ class Trans_delegate
> :public Delegate {
> public:
> typedef Trans_observer Observer;
> - int before_commit(THD *thd, bool all);
> + int before_commit(THD *thd);
> int before_rollback(THD *thd, bool all);
> int after_commit(THD *thd, bool all);
> int after_rollback(THD *thd, bool all);
> diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
> index 56300c6..d60a122 100644
> --- a/sql/rpl_rli.h
> +++ b/sql/rpl_rli.h
> @@ -154,7 +154,9 @@ class Relay_log_info : public Slave_reporting_capability
> standard lock acquisition order to avoid deadlocks:
> run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
> */
> - mysql_mutex_t data_lock, run_lock;
> + mutable mysql_mutex_t data_lock;
> + mysql_mutex_t run_lock;
> +
> /*
> start_cond is broadcast when SQL thread is started
> stop_cond - when stopped
> diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
> index d9ae6ca..9662058 100644
> --- a/sql/sql_repl.cc
> +++ b/sql/sql_repl.cc
> @@ -818,6 +818,13 @@ static int send_heartbeat_event(binlog_send_info *info,
> packet->append(b, sizeof(b));
> }
>
> + if (RUN_HOOK(binlog_transmit, before_send_event,
> + (info->thd, info->flags, packet, 0, 0)))
> + {
> + info->error= ER_UNKNOWN_ERROR;
> + DBUG_RETURN(-1);
> + }
> +
> if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
> net_flush(net))
> {
> @@ -825,6 +832,13 @@ static int send_heartbeat_event(binlog_send_info *info,
> DBUG_RETURN(-1);
> }
>
> + if (RUN_HOOK(binlog_transmit, after_send_event,
> + (info->thd, info->flags, packet)))
> + {
> + info->error= ER_UNKNOWN_ERROR;
> + DBUG_RETURN(-1);
> + }
> +
> DBUG_RETURN(0);
> }
>
16 May '15
Hi, Jan!
On May 16, Jan Lindström wrote:
> revision-id: 51926a1d2ed756902257fe802b1a01abb89a8fa3
> parent(s): 8d2117c0fd0a2211f1ec55f1d61fc5ae822c0ccf
> committer: Jan Lindström
> branch nick: 10.1-encrypt
> timestamp: 2015-05-16 08:58:03 +0300
> message:
>
> Fix assertion failure.
>
> ---
> mysql-test/suite/encryption/r/encrypt_and_grep.result | 6 +-----
> mysql-test/suite/encryption/t/encrypt_and_grep.test | 10 +---------
> storage/innobase/fil/fil0crypt.cc | 9 +++++++--
> storage/xtradb/fil/fil0crypt.cc | 9 +++++++--
> 4 files changed, 16 insertions(+), 18 deletions(-)
>
> diff --git a/mysql-test/suite/encryption/t/encrypt_and_grep.test b/mysql-test/suite/encryption/t/encrypt_and_grep.test
> index 1968b71..0301aa2 100644
> --- a/mysql-test/suite/encryption/t/encrypt_and_grep.test
> +++ b/mysql-test/suite/encryption/t/encrypt_and_grep.test
> @@ -52,7 +52,7 @@ insert t3 values (repeat('dummy', 42));
> SET GLOBAL innodb_encrypt_tables = off;
> SET GLOBAL innodb_encryption_threads = 4;
>
> ---echo # Wait max 10 min for key encryption threads to decrypt all space
> +--echo # Wait max 10 min for key encryption threads to decrypt all spacew
A typo? Did you mean "spaces"?
> --let $wait_timeout= 600
> --let $wait_condition=SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.INNODB_TABLESPACES_ENCRYPTION WHERE MIN_KEY_VERSION <> 0;
> --source include/wait_condition.inc
> @@ -69,14 +69,6 @@ SET GLOBAL innodb_encryption_threads = 4;
> --echo # t3 no on expecting FOUND
> -- let SEARCH_FILE=$t3_IBD
> -- source include/search_pattern_in_file.inc
> ---let SEARCH_PATTERN=foobar
> ---echo # ibdata1 expecting NOT FOUND
> --- let SEARCH_FILE=$ib1_IBD
> --- source include/search_pattern_in_file.inc
> ---let SEARCH_PATTERN=temp
> ---echo # ibdata1 expecting NOT FOUND
> --- let SEARCH_FILE=$ib1_IBD
> --- source include/search_pattern_in_file.inc
Why?
> --echo # Now turn on encryption and wait for threads to encrypt all spaces
> SET GLOBAL innodb_encryption_threads = 4;
> diff --git a/storage/innobase/fil/fil0crypt.cc b/storage/innobase/fil/fil0crypt.cc
> index 65be860..0c2280f 100644
> --- a/storage/innobase/fil/fil0crypt.cc
> +++ b/storage/innobase/fil/fil0crypt.cc
> @@ -2128,8 +2128,13 @@ fil_crypt_complete_rotate_space(
> crypt_data->rotate_state.end_lsn = state->end_lsn;
> }
>
> - ut_a(crypt_data->rotate_state.active_threads > 0);
> - crypt_data->rotate_state.active_threads--;
> + /* Remember that we can try to rotate tablespaces with
> + FIL_SPACE_ENCRYPTION_OFF and we might want to scrub them
> + also. */
Do you mean that when rotating FIL_SPACE_ENCRYPTION_OFF spaces
crypt_data->rotate_state.active_threads == 0?
> + if (crypt_data->rotate_state.active_threads > 0) {
> + crypt_data->rotate_state.active_threads--;
> + }
> +
> bool last = crypt_data->rotate_state.active_threads == 0;
Regards,
Sergei
[Maria-developers] Need help with review/suggestions for MDEV-8075 (was Re: [Commits] c325a34: MDEV-8075 ...)
by Kristian Nielsen 15 May '15
Hi Serg, Svoj,
I need some help with MDEV-8075, as it involves code around transaction
start/end which I am not familiar with. I hope one of you knows enough about
that part of the server to help me.
I have a partial patch (below), but it still has some problems remaining and
I am not sure how to solve them properly. I tried to base the patch on some
corresponding MySQL code, which was only partially merged to MariaDB 10.1.
The basic problem is to remember during a transaction whether DDL was used,
and then, when binlogging at the end of the transaction, to set a flag in the
GTID event if it was.
The concrete problem is with this kind of SQL:
BEGIN;
INSERT INTO innodb_table VALUES (1);
ALTER TABLE temporary_table ADD b INT;
In this case the ALTER TABLE does not get the "ddl" mark in my patch. The
reason is that during mysql_alter_table(), THD_TRANS::reset() gets called,
which clears the flag DID_DDL that was set in mysql_execute_command().
Call stack:
#0 THD_TRANS::reset
#1 trans_commit_implicit
#2 ha_enable_transaction
#3 mysql_trans_commit_alter_copy_data
#4 copy_data_between_tables
#5 mysql_alter_table
#6 Sql_cmd_alter_table::execute
#7 mysql_execute_command
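To make the failure mode concrete, here is a minimal stand-alone sketch of it.
It is only a toy model: ToyTrans and alter_temp_table_seen_as_ddl() are
hypothetical names, a single flag word stands in for THD_TRANS, and only the
DID_DDL name and the order of events are taken from the description above.

```cpp
#include <cassert>
#include <cstdint>

// Toy stand-in for THD_TRANS; only the flag handling is modeled.
struct ToyTrans {
  static constexpr std::uint32_t DID_DDL = 1u << 0;  // flag name from the mail
  std::uint32_t flags = 0;

  void mark_ddl() { flags |= DID_DDL; }
  bool did_ddl() const { return (flags & DID_DDL) != 0; }
  void reset() { flags = 0; }  // clears every flag, not just one of them
};

// Hypothetical walk through ALTER TABLE on a temporary table inside BEGIN.
// Returns what the binlog code would see when it writes the GTID event.
inline bool alter_temp_table_seen_as_ddl() {
  ToyTrans trans;
  trans.mark_ddl();         // set early, as in mysql_execute_command()
  assert(trans.did_ddl());  // the mark is present at this point
  trans.reset();            // implicit commit deep inside mysql_alter_table()
  return trans.did_ddl();   // the mark is gone when binlogging looks at it
}
```

In this model the function returns false, which corresponds to the missing
"ddl" mark: the flag is set before the reset that happens inside the same
statement, so it does not survive until the GTID event is written.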
(An earlier version of the patch looked directly at thd->lex->sql_command
during binlogging to see if it was a DDL. That caused another problem: the
INSERT INTO got wrongly marked as DDL, because it is implicitly committed by
an ALTER TABLE statement).
But the more general problem is that I am very unsure about the proper
places to put in code to clear the required flags at the start of a
transaction, and to copy relevant flags from "stmt" to "all" transaction.
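One possible shape for "clear at transaction start, copy from stmt to all" is
sketched below. This is not what the patch does and not the server's code; the
types ToyFlags and ToySession and the function gtid_ddl_mark_for() are
hypothetical, and the real question of where such calls belong in the server
is exactly what remains open.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Toy flag set; DID_DDL is the name used in the mail, the rest is made up.
struct ToyFlags {
  static constexpr std::uint32_t DID_DDL = 1u << 0;
  std::uint32_t bits = 0;
  void reset() { bits = 0; }
};

// Toy session with a per-statement and a whole-transaction flag set.
struct ToySession {
  ToyFlags stmt;
  ToyFlags all;

  void begin_transaction() { stmt.reset(); all.reset(); }

  void run_statement(bool is_ddl) {
    stmt.reset();
    if (is_ddl) stmt.bits |= ToyFlags::DID_DDL;
    all.bits |= stmt.bits;  // merge into "all" before stmt can be reset
    stmt.reset();           // a later reset of stmt no longer loses the mark
  }

  bool gtid_needs_ddl_mark() const {
    return (all.bits & ToyFlags::DID_DDL) != 0;
  }
};

// Runs one BEGIN...COMMIT made of the given statements (true = DDL) and
// reports whether the transaction's GTID event would get the ddl mark.
inline bool gtid_ddl_mark_for(const std::vector<bool>& statement_is_ddl) {
  ToySession s;
  s.begin_transaction();
  assert(!s.gtid_needs_ddl_mark());  // flags start out cleared
  for (bool is_ddl : statement_is_ddl) s.run_statement(is_ddl);
  return s.gtid_needs_ddl_mark();
}
```

For example, gtid_ddl_mark_for({false, true, false}) models a transaction
like UPDATE; CREATE TEMPORARY TABLE; INSERT and yields true, while a
transaction without any DDL statement yields false.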
I tried using what I found in the MySQL code, and looking at where the existing
MariaDB code copies the existing THD_TRANS::modified_non_trans_table flag.
But I am very unsure that I chose the right places; the existing code seems
rather ad hoc (I'm wondering if it is even correct).
I also replaced some ad hoc code that clears
THD_TRANS::modified_non_trans_table directly with calls to
THD_TRANS::reset(); that seems more appropriate when more flags are
affected, but it does more things (notably clearing no_2pc), so again I'm
very unsure if it is correct...
Any suggestions are welcome. The underlying bug makes optimistic parallel
replication quite broken with temporary tables and statement-based
binlogging in 10.1 (it can probably even crash the server), so it is of some
importance to get it fixed.
- Kristian.
-----------------------------------------------------------------------
revision-id: c325a34d93b931d1c927255c18df94e1560af6eb
parent(s): 53382ac128d9571b5f779bf625567fc3ea0b475e
committer: Kristian Nielsen
branch nick: mariadb
timestamp: 2015-05-15 14:56:54 +0200
message:
MDEV-8075: DROP TEMPORARY TABLE not marked as ddl, causing optimistic parallel replication to fail
CREATE/DROP TEMPORARY TABLE are not safe to optimistically replicate in
parallel with other transactions, so they need to be marked as "ddl" in the
binlog.
This was already done for stand-alone CREATE/DROP TEMPORARY. But temporary
tables can also be created and dropped inside a BEGIN...END transaction, and
such transactions were not marked as ddl. Nor was the DROP TEMPORARY TABLE
statement emitted implicitly when a client connection is closed.
So this patch adds such ddl mark for the missing cases.
---
.../include/binlog_parallel_replication_marks.test | 83 ++++++++++++++++++
.../r/binlog_parallel_replication_marks_row.result | 98 ++++++++++++++++++++++
...inlog_parallel_replication_marks_stm_mix.result | 95 +++++++++++++++++++++
.../t/binlog_parallel_replication_marks_row.test | 3 +
.../binlog_parallel_replication_marks_stm_mix.test | 3 +
.../suite/rpl/r/rpl_parallel_optimistic.result | 27 ++++++
.../suite/rpl/t/rpl_parallel_optimistic.test | 25 ++++++
sql/handler.h | 14 ++++
sql/log_event.cc | 6 +-
sql/sql_base.cc | 1 +
sql/sql_class.h | 10 +++
sql/sql_insert.cc | 8 ++
sql/sql_parse.cc | 4 +-
sql/sql_table.cc | 22 +++--
sql/transaction.cc | 25 +++---
15 files changed, 400 insertions(+), 24 deletions(-)
diff --git a/mysql-test/include/binlog_parallel_replication_marks.test b/mysql-test/include/binlog_parallel_replication_marks.test
new file mode 100644
index 0000000..4775e30
--- /dev/null
+++ b/mysql-test/include/binlog_parallel_replication_marks.test
@@ -0,0 +1,83 @@
+# Test the markings on GTID events (ddl, waited, trans,
+# @@skip_parallel_replication) that are used to control parallel
+# replication on the slave.
+
+--source include/have_innodb.inc
+
+RESET MASTER;
+--source include/wait_for_binlog_checkpoint.inc
+
+--let $stable_stamp= `SELECT UNIX_TIMESTAMP("2020-01-21 15:32:22")`
+eval set timestamp=$stable_stamp;
+
+CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+--let $binlog_pos1=query_get_value(SHOW MASTER STATUS, Position, 1)
+/* GTID */ INSERT INTO t1 VALUES (1,0);
+/* GTID */ BEGIN;
+/* GTID */ INSERT INTO t1 VALUES (2,0);
+/* GTID */ ALTER TABLE t1 ADD c INT;
+/* GTID */ INSERT INTO t1 VALUES (3,0,0);
+/* GTID */ COMMIT;
+/* GTID */ BEGIN;
+/* GTID */ UPDATE t1 SET b=1, c=1 WHERE a=2;
+/* GTID */ CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+/* GTID */ INSERT INTO t2 VALUES (4,10), (5,20);
+/* GTID */ INSERT INTO t1 SELECT a, 2, b FROM t2;
+/* GTID */ DROP TEMPORARY TABLE t2;
+/* GTID */ INSERT INTO t1 VALUES (6, 3, 0);
+/* GTID */ COMMIT;
+/* GTID */ CREATE TEMPORARY TABLE t3 (a INT PRIMARY KEY) ENGINE=InnoDB;
+/* GTID */ BEGIN;
+/* GTID */ DELETE FROM t1 WHERE a=5;
+/* GTID */ INSERT INTO t3 VALUES (7);
+/* GTID */ INSERT INTO t1 SELECT a, 4, 0 FROM t3;
+/* GTID */ UPDATE t1 SET c=1 WHERE a=7;
+/* GTID */ DROP TEMPORARY TABLE t3;
+/* GTID */ COMMIT;
+/* GTID */ CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
+/* GTID */ BEGIN;
+/* GTID */ INSERT INTO t1 VALUES (8, 5, 0);
+/* GTID */ ALTER TABLE t4 ADD b INT;
+/* GTID */ INSERT INTO t1 VALUES (9, 5, 1);
+/* GTID */ COMMIT;
+connect (tmp_con,localhost,root,,);
+eval set timestamp=$stable_stamp;
+/* GTID */ INSERT INTO t1 VALUES (10, 6, 0);
+/* GTID */ BEGIN;
+/* GTID */ CREATE TEMPORARY TABLE t5 (a INT PRIMARY KEY) ENGINE=InnoDB;
+/* GTID */ INSERT INTO t1 VALUES (11, 7, 0);
+/* GTID */ COMMIT;
+--let $before_drop_pos=query_get_value(SHOW MASTER STATUS, Position, 1)
+disconnect tmp_con;
+connection default;
+
+# We need to wait for the implicit DROP TEMPORARY TABLE to be logged after
+# tmp_con disconnect, otherwise we get sporadic test failures.
+--let $wait_condition= SELECT variable_value > $before_drop_pos FROM information_schema.global_status WHERE variable_name = 'binlog_snapshot_position'
+--source include/wait_condition.inc
+
+--let $binlog_pos2=query_get_value(SHOW MASTER STATUS, Position, 1)
+
+--let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1)
+FLUSH LOGS;
+
+--let $MYSQLD_DATADIR= `select @@datadir`
+--let $file= $MYSQLTEST_VARDIR/tmp/binlog_parallel_replication_marks.out
+--let OUTPUT_FILE=$file
+exec $MYSQL_BINLOG --start_position=$binlog_pos1 --stop_position=$binlog_pos2 $MYSQLD_DATADIR/$binlog_file > $file;
+
+perl;
+my $file= $ENV{'OUTPUT_FILE'};
+open F, "<", $file
+ or die "Unable to open file '$file': $!\n";
+while (<F>) {
+ s/GTID \d+-\d+-\d+/GTID #-#-#/;
+ s/end_log_pos \d+/end_log_pos #/;
+ s/table id \d+/table id #/;
+ s/mapped to number \d+/mapped to number #/;
+ print if /GTID|BEGIN|COMMIT|Table_map|Write_rows|Update_rows|Delete_rows|generated by server|40005 TEMPORARY/;
+}
+close F;
+EOF
+
+DROP TABLE t1;
diff --git a/mysql-test/suite/binlog/r/binlog_parallel_replication_marks_row.result b/mysql-test/suite/binlog/r/binlog_parallel_replication_marks_row.result
new file mode 100644
index 0000000..b9c8666
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_parallel_replication_marks_row.result
@@ -0,0 +1,98 @@
+RESET MASTER;
+set timestamp=1579617142;
+CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+/* GTID */ INSERT INTO t1 VALUES (1,0);
+/* GTID */ BEGIN;
+/* GTID */ INSERT INTO t1 VALUES (2,0);
+/* GTID */ ALTER TABLE t1 ADD c INT;
+/* GTID */ INSERT INTO t1 VALUES (3,0,0);
+/* GTID */ COMMIT;
+/* GTID */ BEGIN;
+/* GTID */ UPDATE t1 SET b=1, c=1 WHERE a=2;
+/* GTID */ CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+/* GTID */ INSERT INTO t2 VALUES (4,10), (5,20);
+/* GTID */ INSERT INTO t1 SELECT a, 2, b FROM t2;
+/* GTID */ DROP TEMPORARY TABLE t2;
+/* GTID */ INSERT INTO t1 VALUES (6, 3, 0);
+/* GTID */ COMMIT;
+/* GTID */ CREATE TEMPORARY TABLE t3 (a INT PRIMARY KEY) ENGINE=InnoDB;
+/* GTID */ BEGIN;
+/* GTID */ DELETE FROM t1 WHERE a=5;
+/* GTID */ INSERT INTO t3 VALUES (7);
+/* GTID */ INSERT INTO t1 SELECT a, 4, 0 FROM t3;
+/* GTID */ UPDATE t1 SET c=1 WHERE a=7;
+/* GTID */ DROP TEMPORARY TABLE t3;
+/* GTID */ COMMIT;
+/* GTID */ CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
+/* GTID */ BEGIN;
+/* GTID */ INSERT INTO t1 VALUES (8, 5, 0);
+/* GTID */ ALTER TABLE t4 ADD b INT;
+/* GTID */ INSERT INTO t1 VALUES (9, 5, 1);
+/* GTID */ COMMIT;
+set timestamp=1579617142;
+/* GTID */ INSERT INTO t1 VALUES (10, 6, 0);
+/* GTID */ BEGIN;
+/* GTID */ CREATE TEMPORARY TABLE t5 (a INT PRIMARY KEY) ENGINE=InnoDB;
+/* GTID */ INSERT INTO t1 VALUES (11, 7, 0);
+/* GTID */ COMMIT;
+FLUSH LOGS;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Write_rows: table id # flags: STMT_END_F
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Write_rows: table id # flags: STMT_END_F
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# ddl
+/* GTID */ ALTER TABLE t1 ADD c INT
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Write_rows: table id # flags: STMT_END_F
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# ddl
+BEGIN
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Update_rows: table id # flags: STMT_END_F
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Write_rows: table id # flags: STMT_END_F
+DROP TEMPORARY TABLE IF EXISTS `test`.`t2` /* generated by server */
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Write_rows: table id # flags: STMT_END_F
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# ddl
+BEGIN
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Delete_rows: table id # flags: STMT_END_F
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Write_rows: table id # flags: STMT_END_F
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Update_rows: table id # flags: STMT_END_F
+DROP TEMPORARY TABLE IF EXISTS `test`.`t3` /* generated by server */
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Write_rows: table id # flags: STMT_END_F
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Write_rows: table id # flags: STMT_END_F
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Write_rows: table id # flags: STMT_END_F
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+#200121 15:32:22 server id 1 end_log_pos # Table_map: `test`.`t1` mapped to number #
+#200121 15:32:22 server id 1 end_log_pos # Write_rows: table id # flags: STMT_END_F
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# ddl
+DROP /*!40005 TEMPORARY */ TABLE IF EXISTS `t5`
+DROP TABLE t1;
diff --git a/mysql-test/suite/binlog/r/binlog_parallel_replication_marks_stm_mix.result b/mysql-test/suite/binlog/r/binlog_parallel_replication_marks_stm_mix.result
new file mode 100644
index 0000000..c3dc00d
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_parallel_replication_marks_stm_mix.result
@@ -0,0 +1,95 @@
+RESET MASTER;
+set timestamp=1579617142;
+CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+/* GTID */ INSERT INTO t1 VALUES (1,0);
+/* GTID */ BEGIN;
+/* GTID */ INSERT INTO t1 VALUES (2,0);
+/* GTID */ ALTER TABLE t1 ADD c INT;
+/* GTID */ INSERT INTO t1 VALUES (3,0,0);
+/* GTID */ COMMIT;
+/* GTID */ BEGIN;
+/* GTID */ UPDATE t1 SET b=1, c=1 WHERE a=2;
+/* GTID */ CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+/* GTID */ INSERT INTO t2 VALUES (4,10), (5,20);
+/* GTID */ INSERT INTO t1 SELECT a, 2, b FROM t2;
+/* GTID */ DROP TEMPORARY TABLE t2;
+/* GTID */ INSERT INTO t1 VALUES (6, 3, 0);
+/* GTID */ COMMIT;
+/* GTID */ CREATE TEMPORARY TABLE t3 (a INT PRIMARY KEY) ENGINE=InnoDB;
+/* GTID */ BEGIN;
+/* GTID */ DELETE FROM t1 WHERE a=5;
+/* GTID */ INSERT INTO t3 VALUES (7);
+/* GTID */ INSERT INTO t1 SELECT a, 4, 0 FROM t3;
+/* GTID */ UPDATE t1 SET c=1 WHERE a=7;
+/* GTID */ DROP TEMPORARY TABLE t3;
+/* GTID */ COMMIT;
+/* GTID */ CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
+/* GTID */ BEGIN;
+/* GTID */ INSERT INTO t1 VALUES (8, 5, 0);
+/* GTID */ ALTER TABLE t4 ADD b INT;
+/* GTID */ INSERT INTO t1 VALUES (9, 5, 1);
+/* GTID */ COMMIT;
+set timestamp=1579617142;
+/* GTID */ INSERT INTO t1 VALUES (10, 6, 0);
+/* GTID */ BEGIN;
+/* GTID */ CREATE TEMPORARY TABLE t5 (a INT PRIMARY KEY) ENGINE=InnoDB;
+/* GTID */ INSERT INTO t1 VALUES (11, 7, 0);
+/* GTID */ COMMIT;
+FLUSH LOGS;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+/* GTID */ INSERT INTO t1 VALUES (1,0)
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+/* GTID */ INSERT INTO t1 VALUES (2,0)
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# ddl
+/* GTID */ ALTER TABLE t1 ADD c INT
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+/* GTID */ INSERT INTO t1 VALUES (3,0,0)
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# ddl
+BEGIN
+/* GTID */ UPDATE t1 SET b=1, c=1 WHERE a=2
+/* GTID */ CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB
+/* GTID */ INSERT INTO t2 VALUES (4,10), (5,20)
+/* GTID */ INSERT INTO t1 SELECT a, 2, b FROM t2
+DROP TEMPORARY TABLE `t2` /* generated by server */
+/* GTID */ INSERT INTO t1 VALUES (6, 3, 0)
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# ddl
+/* GTID */ CREATE TEMPORARY TABLE t3 (a INT PRIMARY KEY) ENGINE=InnoDB
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# ddl
+BEGIN
+/* GTID */ DELETE FROM t1 WHERE a=5
+/* GTID */ INSERT INTO t3 VALUES (7)
+/* GTID */ INSERT INTO t1 SELECT a, 4, 0 FROM t3
+/* GTID */ UPDATE t1 SET c=1 WHERE a=7
+DROP TEMPORARY TABLE `t3` /* generated by server */
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# ddl
+/* GTID */ CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+/* GTID */ INSERT INTO t1 VALUES (8, 5, 0)
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# ddl
+/* GTID */ ALTER TABLE t4 ADD b INT
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+/* GTID */ INSERT INTO t1 VALUES (9, 5, 1)
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# trans
+BEGIN
+/* GTID */ INSERT INTO t1 VALUES (10, 6, 0)
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# ddl
+BEGIN
+/* GTID */ CREATE TEMPORARY TABLE t5 (a INT PRIMARY KEY) ENGINE=InnoDB
+/* GTID */ INSERT INTO t1 VALUES (11, 7, 0)
+COMMIT/*!*/;
+#200121 15:32:22 server id 1 end_log_pos # GTID #-#-# ddl
+DROP /*!40005 TEMPORARY */ TABLE IF EXISTS `t5`
+DROP TABLE t1;
diff --git a/mysql-test/suite/binlog/t/binlog_parallel_replication_marks_row.test b/mysql-test/suite/binlog/t/binlog_parallel_replication_marks_row.test
new file mode 100644
index 0000000..8289848
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_parallel_replication_marks_row.test
@@ -0,0 +1,3 @@
+--source include/have_log_bin.inc
+--source include/have_binlog_format_row.inc
+--source include/binlog_parallel_replication_marks.test
diff --git a/mysql-test/suite/binlog/t/binlog_parallel_replication_marks_stm_mix.test b/mysql-test/suite/binlog/t/binlog_parallel_replication_marks_stm_mix.test
new file mode 100644
index 0000000..15042b3
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_parallel_replication_marks_stm_mix.test
@@ -0,0 +1,3 @@
+--source include/have_log_bin.inc
+--source include/have_binlog_format_mixed_or_statement.inc
+--source include/binlog_parallel_replication_marks.test
diff --git a/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result b/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result
index 7593b49..2da7830 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result
@@ -455,6 +455,33 @@ a b
include/stop_slave.inc
SET GLOBAL debug_dbug= @old_debug;
include/start_slave.inc
+*** MDEV-8075: DROP TEMPORARY TABLE not marked as ddl, causing optimistic parallel replication to fail ***
+include/stop_slave.inc
+INSERT INTO t1 VALUES (40, 10);
+CREATE TEMPORARY TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (41);
+BEGIN;
+INSERT INTO t2 SELECT a, 20 FROM t1;
+DROP TEMPORARY TABLE t1;
+COMMIT;
+INSERT INTO t1 VALUES (42, 10);
+include/save_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 40 ORDER BY a;
+a b
+40 10
+42 10
+SELECT * FROM t2 WHERE a >= 40 ORDER BY a;
+a b
+41 20
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 40 ORDER BY a;
+a b
+40 10
+42 10
+SELECT * FROM t2 WHERE a >= 40 ORDER BY a;
+a b
+41 20
include/stop_slave.inc
SET GLOBAL slave_parallel_mode=@old_parallel_mode;
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
diff --git a/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test b/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test
index e7d4f18..376c8d6 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test
@@ -429,6 +429,31 @@ SET GLOBAL debug_dbug= @old_debug;
--source include/start_slave.inc
+--echo *** MDEV-8075: DROP TEMPORARY TABLE not marked as ddl, causing optimistic parallel replication to fail ***
+
+--connection server_2
+--source include/stop_slave.inc
+
+--connection server_1
+INSERT INTO t1 VALUES (40, 10);
+CREATE TEMPORARY TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (41);
+BEGIN;
+INSERT INTO t2 SELECT a, 20 FROM t1;
+DROP TEMPORARY TABLE t1;
+COMMIT;
+INSERT INTO t1 VALUES (42, 10);
+--source include/save_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 40 ORDER BY a;
+SELECT * FROM t2 WHERE a >= 40 ORDER BY a;
+
+--connection server_2
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 40 ORDER BY a;
+SELECT * FROM t2 WHERE a >= 40 ORDER BY a;
+
+
# Clean up.
--connection server_2
diff --git a/sql/handler.h b/sql/handler.h
index 279bfaf..8faec7f 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -1444,16 +1444,30 @@ struct THD_TRANS
static unsigned int const CREATED_TEMP_TABLE= 0x02;
static unsigned int const DROPPED_TEMP_TABLE= 0x04;
static unsigned int const DID_WAIT= 0x08;
+ static unsigned int const DID_DDL= 0x10;
void mark_created_temp_table()
{
DBUG_PRINT("debug", ("mark_created_temp_table"));
m_unsafe_rollback_flags|= CREATED_TEMP_TABLE;
}
+ void mark_dropped_temp_table()
+ {
+ DBUG_PRINT("debug", ("mark_dropped_temp_table"));
+ m_unsafe_rollback_flags|= DROPPED_TEMP_TABLE;
+ }
+ bool has_created_dropped_temp_table() const {
+ return
+ (m_unsafe_rollback_flags & (CREATED_TEMP_TABLE|DROPPED_TEMP_TABLE)) != 0;
+ }
void mark_trans_did_wait() { m_unsafe_rollback_flags|= DID_WAIT; }
bool trans_did_wait() const {
return (m_unsafe_rollback_flags & DID_WAIT) != 0;
}
+ void mark_trans_did_ddl() { m_unsafe_rollback_flags|= DID_DDL; }
+ bool trans_did_ddl() const {
+ return (m_unsafe_rollback_flags & DID_DDL) != 0;
+ }
};
diff --git a/sql/log_event.cc b/sql/log_event.cc
index aa34996..98cc4b1 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -6405,8 +6405,10 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
if (thd_arg->transaction.stmt.trans_did_wait() ||
thd_arg->transaction.all.trans_did_wait())
flags2|= FL_WAITED;
- if (sql_command_flags[thd->lex->sql_command] &
- (CF_DISALLOW_IN_RO_TRANS | CF_AUTO_COMMIT_TRANS))
+ if (thd_arg->transaction.stmt.trans_did_ddl() ||
+ thd_arg->transaction.stmt.has_created_dropped_temp_table() ||
+ thd_arg->transaction.all.trans_did_ddl() ||
+ thd_arg->transaction.all.has_created_dropped_temp_table())
flags2|= FL_DDL;
else if (is_transactional)
flags2|= FL_TRANSACTIONAL;
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index c400598..6ef46ca 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -1253,6 +1253,7 @@ bool close_temporary_tables(THD *thd)
thd->variables.character_set_client= cs_save;
thd->get_stmt_da()->set_overwrite_status(true);
+ thd->transaction.stmt.mark_dropped_temp_table();
if ((error= (mysql_bin_log.write(&qinfo) || error)))
{
/*
diff --git a/sql/sql_class.h b/sql/sql_class.h
index a39c8cd..11eab4e 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -3953,6 +3953,16 @@ class THD :public Statement,
{
main_lex.restore_set_statement_var();
}
+ /* Copy relevant `stmt` transaction flags to `all` transaction. */
+ void merge_unsafe_rollback_flags()
+ {
+ if (transaction.stmt.modified_non_trans_table)
+ transaction.all.modified_non_trans_table= TRUE;
+ transaction.all.m_unsafe_rollback_flags|=
+ (transaction.stmt.m_unsafe_rollback_flags &
+ (THD_TRANS::DID_WAIT | THD_TRANS::CREATED_TEMP_TABLE |
+ THD_TRANS::DROPPED_TEMP_TABLE | THD_TRANS::DID_DDL));
+ }
};
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 24d75d9..90e68d5 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -4226,6 +4226,14 @@ void select_create::store_values(List<Item> &values)
bool select_create::send_eof()
{
+ /*
+ The routine that writes the statement in the binary log
+ is in select_insert::send_eof(). For that reason, we
+ mark the flag at this point.
+ */
+ if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
+ thd->transaction.stmt.mark_created_temp_table();
+
if (select_insert::send_eof())
{
abort_result_set();
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 5eca972..142596f 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -2798,6 +2798,7 @@ mysql_execute_command(THD *thd)
goto error;
}
}
+ thd->transaction.stmt.mark_trans_did_ddl();
}
#ifndef DBUG_OFF
@@ -6781,8 +6782,7 @@ void THD::reset_for_next_command()
if (!thd->in_multi_stmt_transaction_mode())
{
thd->variables.option_bits&= ~OPTION_KEEP_LOG;
- thd->transaction.all.modified_non_trans_table= FALSE;
- thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
+ thd->transaction.all.reset();
}
DBUG_ASSERT(thd->security_ctx== &thd->main_security_ctx);
thd->thread_specific_used= FALSE;
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index 1f96793..9abe012 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -2559,6 +2559,9 @@ int mysql_rm_table_no_locks(THD *thd, TABLE_LIST *tables, bool if_exists,
if (non_trans_tmp_table_deleted ||
trans_tmp_table_deleted || non_tmp_table_deleted)
{
+ if (non_trans_tmp_table_deleted || trans_tmp_table_deleted)
+ thd->transaction.stmt.mark_dropped_temp_table();
+
query_cache_invalidate3(thd, tables, 0);
if (!dont_log_query && mysql_bin_log.is_open())
{
@@ -5014,6 +5017,9 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
if (thd->is_current_stmt_binlog_format_row() && create_info->tmp_table())
DBUG_RETURN(result);
+ if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
+ thd->transaction.stmt.mark_created_temp_table();
+
/* Write log if no error or if we already deleted a table */
if (!result || thd->log_current_statement)
{
@@ -5491,13 +5497,17 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
DBUG_PRINT("info",
("res: %d tmp_table: %d create_info->table: %p",
res, create_info->tmp_table(), local_create_info.table));
- if (!res && create_info->tmp_table() && local_create_info.table)
+ if (create_info->tmp_table())
{
- /*
- Remember that tmp table creation was logged so that we know if
- we should log a delete of it.
- */
- local_create_info.table->s->table_creation_was_logged= 1;
+ thd->transaction.stmt.mark_created_temp_table();
+ if (!res && local_create_info.table)
+ {
+ /*
+ Remember that tmp table creation was logged so that we know if
+ we should log a delete of it.
+ */
+ local_create_info.table->s->table_creation_was_logged= 1;
+ }
}
do_logging= TRUE;
}
diff --git a/sql/transaction.cc b/sql/transaction.cc
index f84c4e5..fa6d338 100644
--- a/sql/transaction.cc
+++ b/sql/transaction.cc
@@ -154,8 +154,7 @@ bool trans_begin(THD *thd, uint flags)
The following set should not be needed as the flag should always be 0
when we come here. We should at some point change this to an assert.
*/
- thd->transaction.all.modified_non_trans_table= FALSE;
- thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
+ thd->transaction.all.reset();
thd->has_waiter= false;
thd->waiting_on_group_commit= false;
@@ -251,8 +250,7 @@ bool trans_commit(THD *thd)
else
(void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
- thd->transaction.all.modified_non_trans_table= FALSE;
- thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
+ thd->transaction.all.reset();
thd->lex->start_transaction_opt= 0;
DBUG_RETURN(MY_TEST(res));
@@ -299,8 +297,7 @@ bool trans_commit_implicit(THD *thd)
}
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
- thd->transaction.all.modified_non_trans_table= FALSE;
- thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
+ thd->transaction.all.reset();
/*
Upon implicit commit, reset the current transaction
@@ -345,8 +342,7 @@ bool trans_rollback(THD *thd)
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
/* Reset the binlog transaction marker */
thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
- thd->transaction.all.modified_non_trans_table= FALSE;
- thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
+ thd->transaction.all.reset();
thd->lex->start_transaction_opt= 0;
DBUG_RETURN(MY_TEST(res));
@@ -390,8 +386,7 @@ bool trans_rollback_implicit(THD *thd)
preserve backward compatibility.
*/
thd->variables.option_bits&= ~(OPTION_KEEP_LOG);
- thd->transaction.all.modified_non_trans_table= false;
- thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
+ thd->transaction.all.reset();
/* Rollback should clear transaction_rollback_request flag. */
DBUG_ASSERT(! thd->transaction_rollback_request);
@@ -427,6 +422,8 @@ bool trans_commit_stmt(THD *thd)
*/
DBUG_ASSERT(! thd->in_sub_stmt);
+ thd->merge_unsafe_rollback_flags();
+
if (thd->transaction.stmt.ha_list)
{
if (WSREP_ON)
@@ -481,6 +478,8 @@ bool trans_rollback_stmt(THD *thd)
*/
DBUG_ASSERT(! thd->in_sub_stmt);
+ thd->merge_unsafe_rollback_flags();
+
if (thd->transaction.stmt.ha_list)
{
if (WSREP_ON)
@@ -913,8 +912,7 @@ bool trans_xa_commit(THD *thd)
}
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
- thd->transaction.all.modified_non_trans_table= FALSE;
- thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
+ thd->transaction.all.reset();
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
@@ -969,8 +967,7 @@ bool trans_xa_rollback(THD *thd)
res= xa_trans_force_rollback(thd);
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
- thd->transaction.all.modified_non_trans_table= FALSE;
- thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
+ thd->transaction.all.reset();
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
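
The flag flow introduced by the patch above can be modelled in a few lines of standalone C++. This is only a sketch under stated assumptions: TransFlags and Session are hypothetical stand-ins for THD_TRANS and THD, end_statement() is an assumed helper representing the end of a statement (the patch calls the merge from trans_commit_stmt() and trans_rollback_stmt()), and clearing the statement flags at that point is an assumption that the diff does not show.

```cpp
#include <cassert>

// Simplified stand-in for THD_TRANS: only the bits this patch reads or sets.
struct TransFlags
{
  enum : unsigned
  {
    CREATED_TEMP_TABLE= 0x02,
    DROPPED_TEMP_TABLE= 0x04,
    DID_WAIT= 0x08,
    DID_DDL= 0x10
  };
  unsigned flags= 0;

  void mark_created_temp_table() { flags|= CREATED_TEMP_TABLE; }
  void mark_dropped_temp_table() { flags|= DROPPED_TEMP_TABLE; }
  void mark_trans_did_ddl() { flags|= DID_DDL; }
  bool has_created_dropped_temp_table() const
  {
    return (flags & (CREATED_TEMP_TABLE | DROPPED_TEMP_TABLE)) != 0;
  }
  bool trans_did_ddl() const { return (flags & DID_DDL) != 0; }
  void reset() { flags= 0; }
};

// Hypothetical session with the two scopes of THD::transaction.
struct Session
{
  TransFlags stmt; // current statement
  TransFlags all;  // enclosing transaction

  // Mirrors THD::merge_unsafe_rollback_flags() from the patch.
  void merge_unsafe_rollback_flags()
  {
    all.flags|= stmt.flags &
      (TransFlags::DID_WAIT | TransFlags::CREATED_TEMP_TABLE |
       TransFlags::DROPPED_TEMP_TABLE | TransFlags::DID_DDL);
  }

  // Assumed helper: merge at statement end, then clear the statement scope.
  void end_statement()
  {
    merge_unsafe_rollback_flags();
    assert((stmt.flags & ~all.flags) == 0); // every stmt bit is now in all
    stmt.reset();
  }

  // Mirrors the new FL_DDL condition in the Gtid_log_event constructor.
  bool gtid_needs_ddl_flag() const
  {
    return stmt.trans_did_ddl() || stmt.has_created_dropped_temp_table() ||
           all.trans_did_ddl() || all.has_created_dropped_temp_table();
  }
};
```

In this model, a DROP TEMPORARY TABLE inside a multi-statement transaction sets the bit on stmt, end_statement() carries it into all, and gtid_needs_ddl_flag() stays true until all.reset() runs at commit or rollback. That matches the "ddl" marks on the transactions containing temporary-table statements in the .result files above.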
[Maria-developers] Step#5: MDEV-7950 Item_func::type() takes 0.26% in OLTP RO
by Alexander Barkov 14 May '15
Hi Sergey,
Please review the next step for MDEV-7950.
This step turns the function remove_eq_conds() into a method of Item.
It removes 6 virtual calls to Item_func::type() and adds only 2
virtual calls to Item***::remove_eq_conds().
Thanks.
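
For readers without the patch at hand, the general pattern described in this message (replacing a free function that dispatches on type() with a virtual method) might look roughly like the sketch below. The class names, the CondResult enum, and the parameterless signature are hypothetical simplifications, not the actual MariaDB declarations.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical result of trying to simplify a condition.
enum class CondResult { Undecided, AlwaysTrue, AlwaysFalse };

struct Item
{
  virtual ~Item() {}
  // Default: nothing to simplify. One virtual call replaces the chain of
  // type() checks that a free function would need before downcasting.
  virtual CondResult remove_eq_conds() { return CondResult::Undecided; }
};

// A constant boolean condition knows its own outcome.
struct ItemBoolConst : Item
{
  bool value;
  explicit ItemBoolConst(bool v) : value(v) {}
  CondResult remove_eq_conds() override
  {
    return value ? CondResult::AlwaysTrue : CondResult::AlwaysFalse;
  }
};

// An AND condition drops always-true arguments and short-circuits on false.
struct ItemCondAnd : Item
{
  std::vector<std::unique_ptr<Item>> args;

  CondResult remove_eq_conds() override
  {
    for (auto it= args.begin(); it != args.end(); )
    {
      assert(*it != nullptr);
      CondResult r= (*it)->remove_eq_conds();
      if (r == CondResult::AlwaysFalse)
        return r;
      if (r == CondResult::AlwaysTrue)
        it= args.erase(it);
      else
        ++it;
    }
    return args.empty() ? CondResult::AlwaysTrue : CondResult::Undecided;
  }
};
```

Each class handles its own case, so the caller makes a single virtual call per item instead of asking for type() several times and branching on the answer.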
Re: [Maria-developers] 8a0f331: MDEV-7943 - pthread_getspecific() takes 0.76% in OLTP RO
by Sergei Golubchik 13 May '15
Hi, Sergey!
On Apr 22, svoj(a)mariadb.org wrote:
> revision-id: 8a0f3310275f7b4fa445f907140f677910e18999
> parent(s): 7feee74dd30c96bd50d1c90e4ce3b06a656b17a5
> committer: Sergey Vojtovich
> branch nick: mariadb
> timestamp: 2015-04-22 14:18:51 +0400
> message:
>
> MDEV-7943 - pthread_getspecific() takes 0.76% in OLTP RO
>
> Avoid calling current_thd from thd_kill_level(). This reduces number of
> pthread_getspecific() calls from 776 to 354.
>
> Also thd_kill_level(NULL) is not permitted anymore: this saves one condition.
>
> ---
> plugin/semisync/semisync_master.cc | 4 ++--
> sql/sql_class.cc | 20 +++++++++-----------
> 2 files changed, 11 insertions(+), 13 deletions(-)
>
> diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc
> index c88c162..b1f7fbd 100644
> --- a/plugin/semisync/semisync_master.cc
> +++ b/plugin/semisync/semisync_master.cc
> @@ -635,7 +635,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
> (int)is_on());
> }
>
> - while (is_on() && !thd_killed(NULL))
> + while (is_on() && !thd_killed(current_thd))
thd_killed() is a function of the kill_statement service
(see include/mysql/service_kill_statement.h). It was created for plugins
to use, but current_thd was not: plugins generally have no access to it.
> {
> if (reply_file_name_inited_)
> {
> diff --git a/sql/sql_class.cc b/sql/sql_class.cc
> index f273974..d32b14d 100644
> --- a/sql/sql_class.cc
> +++ b/sql/sql_class.cc
> @@ -4208,20 +4208,18 @@ void THD::restore_backup_open_tables_state(Open_tables_backup *backup)
> */
> extern "C" enum thd_kill_levels thd_kill_level(const MYSQL_THD thd)
> {
> - THD* current= current_thd;
> -
> - if (!thd)
> - thd= current;
> -
> - if (thd == current)
I wonder if there's a cheaper way to check "if THD is current_thd".
E.g. THD stores the "end of stack" value. If it also stored the "beginning
of stack", one could easily check whether a local variable is within these
limits, which would mean the THD is current.
> - {
> - Apc_target *apc_target= (Apc_target*)&thd->apc_target;
> - if (apc_target->have_apc_requests())
> - apc_target->process_apc_requests();
> - }
> + DBUG_ASSERT(thd);
>
> if (likely(thd->killed == NOT_KILLED))
> + {
> + Apc_target *apc_target= (Apc_target*) &thd->apc_target;
> + if (unlikely(apc_target->have_apc_requests()))
> + {
> + if (thd == current_thd)
> + apc_target->process_apc_requests();
> + }
> return THD_IS_NOT_KILLED;
> + }
Why did you put process_apc_requests() under thd->killed == NOT_KILLED?
> return thd->killed & KILL_HARD_BIT ? THD_ABORT_ASAP : THD_ABORT_SOFTLY;
> }
Regards,
Sergei
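
The stack-bounds check suggested in the review above could be sketched as follows. Everything here is an assumption for illustration: StackBounds, its field names, and is_current() are hypothetical, and how a real THD would record the beginning of its stack is not specified in the thread.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical per-thread record. The review says THD already stores the
// end of the stack; the beginning is the proposed addition.
struct StackBounds
{
  const void *stack_begin; // address recorded when the thread started
  const void *stack_end;   // farthest address the stack may grow to

  // True if p lies between the two recorded addresses. Converting to
  // integers avoids comparing unrelated pointers directly, and the
  // min/max step makes the check independent of growth direction.
  bool contains(const void *p) const
  {
    assert(stack_begin != nullptr && stack_end != nullptr);
    std::uintptr_t a= reinterpret_cast<std::uintptr_t>(p);
    std::uintptr_t b= reinterpret_cast<std::uintptr_t>(stack_begin);
    std::uintptr_t e= reinterpret_cast<std::uintptr_t>(stack_end);
    std::uintptr_t lo= b < e ? b : e;
    std::uintptr_t hi= b < e ? e : b;
    return lo <= a && a <= hi;
  }

  // Asks "does this record belong to the calling thread?" by testing the
  // address of a local variable, which lives on the caller's stack.
  bool is_current() const
  {
    char probe;
    return contains(&probe);
  }
};
```

The appeal of this approach is that it needs only two comparisons and no thread-local lookup. It relies on each thread's stack being a single contiguous range that does not overlap any other thread's stack.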
[Maria-developers] Step#4: MDEV-7950 Item_func::type() takes 0.26% in OLTP RO
by Alexander Barkov 13 May '15
Hi Sergey,
Please review a patch for the next step of MDEV-7950
(one small thing at a time, to avoid huge, unclear patches).
Thanks.
Re: [Maria-developers] 7feee74: MDEV-7943 - pthread_getspecific() takes 0.76% in OLTP RO
by Sergei Golubchik 12 May '15
Hi, Sergey!
On Apr 22, svoj(a)mariadb.org wrote:
> revision-id: 7feee74dd30c96bd50d1c90e4ce3b06a656b17a5
> parent(s): 696188fe2f0756a98914df5b95ba302f7e5c55df
> committer: Sergey Vojtovich
> branch nick: mariadb
> timestamp: 2015-04-22 13:29:56 +0400
> message:
>
> MDEV-7943 - pthread_getspecific() takes 0.76% in OLTP RO
>
> Added THD argument to select_result and all derivative classes. This
> reduces number of pthread_getspecific calls from 796 to 776 per OLTP
> RO transaction.
Lots of changes for a little gain :)
Ok to push.
An idea. Create a variant of Sql_alloc (say, Sql_alloc_explicit, a
parent of Sql_alloc) that does not use current_thd but requires MEM_ROOT
to be specified explicitly. And use it for classes where you want it.
Regards,
Sergei
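
The Sql_alloc_explicit idea above might take a shape like the following. This is a sketch, not existing server code: MemRoot is a toy bump allocator standing in for MEM_ROOT, current_root stands in for the current_thd lookup, and Node and LegacyNode are hypothetical example classes.

```cpp
#include <cassert>
#include <cstddef>

// Toy bump allocator standing in for MEM_ROOT.
struct MemRoot
{
  alignas(std::max_align_t) unsigned char buf[4096];
  std::size_t used= 0;

  void *alloc(std::size_t size)
  {
    const std::size_t align= alignof(std::max_align_t);
    std::size_t rounded= (size + align - 1) / align * align;
    if (rounded > sizeof(buf) - used)
      return nullptr; // out of space
    void *p= buf + used;
    used+= rounded;
    return p;
  }
};

// Parent class: allocation only with an explicitly supplied root.
struct Sql_alloc_explicit
{
  static void *operator new(std::size_t size, MemRoot *root) noexcept
  {
    assert(root != nullptr);
    return root->alloc(size);
  }
  // Memory is released together with the root, so delete is a no-op.
  static void operator delete(void *, MemRoot *) noexcept {}
  static void operator delete(void *) noexcept {}
};

// Stand-in for the thread-local lookup hidden behind current_thd.
thread_local MemRoot *current_root= nullptr;

// Child class: additionally allows plain `new`, at the cost of the lookup.
struct Sql_alloc : Sql_alloc_explicit
{
  using Sql_alloc_explicit::operator new;
  static void *operator new(std::size_t size) noexcept
  {
    return current_root ? current_root->alloc(size) : nullptr;
  }
};

// Hypothetical class on a hot path: the root must be passed explicitly.
struct Node : Sql_alloc_explicit
{
  int value;
  explicit Node(int v) : value(v) {}
};

// Hypothetical class that keeps the implicit form.
struct LegacyNode : Sql_alloc
{
  int value= 0;
};
```

With these declarations, `new (&root) Node(7)` compiles while a plain `new Node(7)` does not, so a class derived from Sql_alloc_explicit cannot reach the thread-local lookup by accident.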