developers
May 2015
- 22 participants
- 54 discussions
I tried using a search engine and googled for "jira mariadb innodb
encryption jonas" :)
/Jonas
On Thu, May 21, 2015 at 4:36 PM, Jan Lindström <jan.lindstrom(a)mariadb.com>
wrote:
> Hi,
>
> Can you give the MDEV, I do not seem to be able to find it.
>
> R: Jan
>
> On Thu, May 21, 2015 at 2:51 PM, Jonas Oreland <jonaso(a)google.com> wrote:
>
>> i made a JIRA entry with latest innodb encryption...
>>
>> /Jonas
>>
>> On Thu, May 21, 2015 at 12:30 PM, Jan Lindström <
>> jan.lindstrom(a)mariadb.com> wrote:
>>
>>> Hi,
>>>
>>> On Thu, May 21, 2015 at 11:33 AM, Jonas Oreland <jonaso(a)google.com>
>>> wrote:
>>>
>>>> Hi Jan,
>>>>
>>>> 1) rotating encrypted => unencrypted is definitely supported,
>>>> in my latest version the TODO is removed...(hope you have a recent
>>>> enough version)
>>>>
>>>
>>> Based on this, I do not have the latest version. Where, I could get
>>> that, it does not seem to be on branch?
>>>
>>>>
>>>>
>>>> 2) Thanks! for testcase, it did indeed reveal a bug with the "encrypted
>>>> => unencrypted => encrypted" sequence
>>>>
>>>> attaching fix.
>>>>
>>>> Let me know it fixes your version of testcase (i've used a modified
>>>> version since e.g we don't have per table settings...)
>>>>
>>>
>>> Yes, this fix is good. Thanks.
>>>
>>> R: Jan
>>>
>>
>>
>
[Maria-developers] How do I read information_schema.global_status not using query.
by kyung nam Kim 22 May '15
Hi,
I am having trouble getting data from the
information_schema.global_status table without using a query.
I wrote the code below, and it can read data from some tables in
performance_schema.
** reader.cc
===============================================================================================
my_thread_init();
...
MEM_ROOT mem_field;
READ_RECORD read_record_info;
tables.init_one_table(STRING_WITH_LEN("performance_schema"),
                      STRING_WITH_LEN("events_statements_current"),
bool result = open_and_lock_tables(ctx->thd, &tables, FALSE,
                                   MYSQL_LOCK_IGNORE_TIMEOUT);
if (result)
{
  return false;
}
if(table = tables.table) /* if only table exists */
{
  init_sql_alloc(&mem_field, 1024, 0, MYF(0));
  ...
  while (!(error= read_record_info.read_record(&read_record_info)))
  {
    char* value = get_field(&mem_field, table->field[0]); // It's OK.
    ....
  }
}
...
====================================================================================
Reading data from performance_schema tables with these low-level functions
works.
But when I tried to read from the information_schema.global_status table the
same way (changing only the database and table names), an error occurred
in open_and_lock_tables().
Is there a way to read data from information_schema.global_status without
using a query?
Regards,
Kim kyungnam.
i made a JIRA entry with latest innodb encryption...
/Jonas
On Thu, May 21, 2015 at 12:30 PM, Jan Lindström <jan.lindstrom(a)mariadb.com>
wrote:
> Hi,
>
> On Thu, May 21, 2015 at 11:33 AM, Jonas Oreland <jonaso(a)google.com> wrote:
>
>> Hi Jan,
>>
>> 1) rotating encrypted => unencrypted is definitely supported,
>> in my latest version the TODO is removed...(hope you have a recent enough
>> version)
>>
>
> Based on this, I do not have the latest version. Where, I could get that,
> it does not seem to be on branch?
>
>>
>>
>> 2) Thanks! for testcase, it did indeed reveal a bug with the "encrypted
>> => unencrypted => encrypted" sequence
>>
>> attaching fix.
>>
>> Let me know it fixes your version of testcase (i've used a modified
>> version since e.g we don't have per table settings...)
>>
>
> Yes, this fix is good. Thanks.
>
> R: Jan
>
Hi Jan,
1) rotating encrypted => unencrypted is definitely supported,
in my latest version the TODO is removed...(hope you have a recent enough
version)
2) Thanks! for testcase, it did indeed reveal a bug with the "encrypted =>
unencrypted => encrypted" sequence
attaching fix.
Let me know if it fixes your version of the testcase (I've used a modified
version, since e.g. we don't have per-table settings...)
/Jonas
On Fri, May 15, 2015 at 1:23 PM, Jan Lindström <jan.lindstrom(a)mariadb.com>
wrote:
> Hi,
>
> At fil0crypt.cc there is
>
> fil_crypt_needs_rotation(uint key_version, const key_state_t *key_state)
> {
> // TODO(jonaso): Add support for rotating encrypted => unencrypted
>
> if (key_version == 0 && key_state->key_version != 0) {
> /* this is rotation unencrypted => encrypted
> * ignore rotate_key_age */
> return true;
> }
>
> Thus to me it is not clear is the support for rotating encrypted =>
> unencrypted really missing or not and furthermore, see attached test case
> for this,
>
> encrypted + insert + grep : ok
> encrypted => unencrypted + grep: ok
> unencrypted => encrypted + grep: not ok
>
> R: Jan
>
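The rotation cases discussed in this thread can be written out as a small decision function. This is only a sketch under stated assumptions, not the code in fil0crypt.cc: `KeyState` and `needs_rotation` are hypothetical stand-ins, key version 0 is taken to mean "unencrypted" as in the quoted snippet, and the age rule for the encrypted => encrypted case is an assumed reading of rotate_key_age.

```cpp
#include <cassert>

// Hypothetical stand-in for the server's key_state_t; only the fields used here.
struct KeyState {
  unsigned key_version;     // latest key version; 0 means encryption is off
  unsigned rotate_key_age;  // assumed rule: rotate keys at least this many versions old
};

// Decide whether a page written with page_key_version should be rewritten.
bool needs_rotation(unsigned page_key_version, const KeyState& latest) {
  const bool page_encrypted = page_key_version != 0;
  const bool want_encrypted = latest.key_version != 0;

  // unencrypted => encrypted and encrypted => unencrypted both rotate,
  // ignoring rotate_key_age (the second case is the one behind the TODO).
  if (page_encrypted != want_encrypted) return true;

  // Both unencrypted: nothing to do.
  if (!page_encrypted) return false;

  // Both encrypted: rotate only when the page key is old enough (assumption).
  return latest.key_version > page_key_version &&
         latest.key_version - page_key_version >= latest.rotate_key_age;
}
```

With this shape, the failing "encrypted => unencrypted => encrypted" sequence from the test case is two symmetric transitions, each of which returns true regardless of key age.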
Re: [Maria-developers] [Commits] b2db8e8: MDEV-7949: Item_field::used_tables() takes 0.29% in OLTP RO
by Sergey Vojtovich 21 May '15
Hi Sanja,
This patch looks OK; just one minor suggestion inline. The effect for the given
workload is quite moderate, though:
Item_field::used_tables          0.21% -> 0.20%
Item_basic_constant::used_tables 0.11% -> 0.11%
Item_func::used_tables           0.05% -> 0.05%
Item_sum::used_tables            0.00% -> 0.00%
On Wed, May 13, 2015 at 04:17:26PM +0200, sanja(a)mariadb.com wrote:
> revision-id: b2db8e85422a455a10cd28ef7e44977182528f70
> parent(s): b22959903b89e798f8804ec9a815c88f75915cd9
> committer: Oleksandr Byelkin
> branch nick: server
> timestamp: 2015-05-13 16:17:22 +0200
> message:
>
> MDEV-7949: Item_field::used_tables() takes 0.29% in OLTP RO
>
> small sixes of used_tables() usage
>
...skip...
> diff --git a/sql/item_func.h b/sql/item_func.h
> index 0d57c2b..c48cd84 100644
> --- a/sql/item_func.h
> +++ b/sql/item_func.h
> @@ -1337,7 +1337,7 @@ class Item_func_sleep :public Item_int_func
> const char *func_name() const { return "sleep"; }
> table_map used_tables() const
> {
> - return Item_int_func::used_tables() | RAND_TABLE_BIT;
> + return used_tables_cache | RAND_TABLE_BIT;
> }
> bool is_expensive() { return 1; }
> longlong val_int();
> @@ -1591,7 +1591,7 @@ class Item_func_get_lock :public Item_int_func
> void fix_length_and_dec() { max_length=1; maybe_null=1;}
> table_map used_tables() const
> {
> - return Item_int_func::used_tables() | RAND_TABLE_BIT;
> + return used_tables_cache | RAND_TABLE_BIT;
> }
> bool const_item() const { return 0; }
> bool is_expensive() { return 1; }
> @@ -1611,7 +1611,7 @@ class Item_func_release_lock :public Item_int_func
> void fix_length_and_dec() { max_length= 1; maybe_null= 1;}
> table_map used_tables() const
> {
> - return Item_int_func::used_tables() | RAND_TABLE_BIT;
> + return used_tables_cache | RAND_TABLE_BIT;
> }
> bool const_item() const { return 0; }
> bool is_expensive() { return 1; }
> @@ -1724,7 +1724,7 @@ class Item_func_set_user_var :public Item_func
> void fix_length_and_dec();
> table_map used_tables() const
> {
> - return Item_func::used_tables() | RAND_TABLE_BIT;
> + return used_tables_cache | RAND_TABLE_BIT;
> }
> bool const_item() const { return 0; }
> bool is_expensive() { return 1; }
Was there any reason for this change? The compiler should be clever enough to
inline used_tables() of the base class here.
Regards,
Sergey
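To illustrate the point about inlining: a call qualified with the base class name is bound at compile time, so it needs no vtable lookup and can be inlined when the base-class body is visible to the compiler. The sketch below uses hypothetical classes (`FuncBase`, `SleepLikeQualified`, `SleepLikeDirect`) and an illustrative `RAND_TABLE_BIT` value; it assumes the base implementation simply returns the cached map, which is what makes the two spellings equivalent here.

```cpp
#include <cassert>
#include <cstdint>

using table_map = std::uint64_t;                  // stand-in for the server typedef
constexpr table_map RAND_TABLE_BIT = 1ULL << 62;  // illustrative value only

class FuncBase {
 public:
  explicit FuncBase(table_map cache) : used_tables_cache(cache) {}
  virtual ~FuncBase() = default;
  // Assumed base behaviour: return the cached map.
  virtual table_map used_tables() const { return used_tables_cache; }

 protected:
  table_map used_tables_cache;
};

// Variant before the patch: qualified call, resolved statically.
class SleepLikeQualified : public FuncBase {
 public:
  using FuncBase::FuncBase;
  table_map used_tables() const override {
    return FuncBase::used_tables() | RAND_TABLE_BIT;
  }
};

// Variant after the patch: reads the cached member directly.
class SleepLikeDirect : public FuncBase {
 public:
  using FuncBase::FuncBase;
  table_map used_tables() const override {
    return used_tables_cache | RAND_TABLE_BIT;
  }
};
```

Both variants return the same value. Whether they also compile to the same code depends on the base-class body being available for inlining, for example defined in the header.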
Hello,
Today, I was experimenting with building mariadb images and encountered
this issue:
E: Unable to locate package libmariadb-dev
pcrews@erlking-dev:~/git/dibtest$ aptitude search libmariadb
p   libmariadbd-dev       - MariaDB embedded database development files
p   libmariadbd-dev:i386  - MariaDB embedded database development files
Note how these are listed as libmariadbd vs. libmariadb.
I am writing to see if this can be fixed or if this might be some change
in naming (which would require some updating of other tools' references,
etc).
Thank you,
Patrick Crews
[Maria-developers] Step#6: MDEV-7950 Item_func::type() takes 0.26% in OLTP RO
by Alexander Barkov 19 May '15
Hi Sergey,
Please review the next iteration for MDEV-7950.
This one splits the function add_key_fields() into a method in Item.
This change removes about 3 virtual calls item->type(), as well as some
virtual calls item_func->functype(), and adds one virtual call
item->add_key_fields() instead.
Thanks.
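The general shape of this refactoring, as a rough sketch: instead of a free function that asks each item for its type and branches on the answer, each item class overrides one virtual method. The class names below (`Item`, `ItemEq`, `ItemAnd`, `KeyFieldList`) are simplified, hypothetical stand-ins and do not reproduce the server's class hierarchy or the real add_key_fields() signature.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical collector for fields usable in key lookups.
struct KeyFieldList {
  std::vector<std::string> fields;
};

class Item {
 public:
  virtual ~Item() = default;
  // Default: an item contributes no key fields.
  virtual void add_key_fields(KeyFieldList&) const {}
};

// An equality on a single field contributes that field.
class ItemEq : public Item {
 public:
  explicit ItemEq(std::string field) : field_(std::move(field)) {}
  void add_key_fields(KeyFieldList& out) const override {
    out.fields.push_back(field_);
  }

 private:
  std::string field_;
};

// A conjunction forwards to its arguments: one virtual call per argument,
// with no type() or functype() check at the call site.
class ItemAnd : public Item {
 public:
  explicit ItemAnd(std::vector<const Item*> args) : args_(std::move(args)) {}
  void add_key_fields(KeyFieldList& out) const override {
    for (const Item* arg : args_) arg->add_key_fields(out);
  }

 private:
  std::vector<const Item*> args_;
};
```

The caller then invokes `cond->add_key_fields(list)` once, and the dispatch that previously needed several virtual type queries happens in a single virtual call.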
Re: [Maria-developers] [Commits] 8b30c0f: MDEV-7949: Item_field::used_tables() takes 0.29% in OLTP RO
by Sergey Vojtovich 19 May '15
Hi Sanja,
There're 3 bug fixes and 4 revisions concerning these lines:
https://bugs.mysql.com/bug.php?id=16716
git: 1991a87d831bb48b31989c622e35acda11df7fef
git: ca22a81b1c84ce81e1e9ea2c3ace7be1848027d8 (source for all used_tables())
https://bugs.mysql.com/bug.php?id=36488
git: 66367aeea83a0cd7f583a0194651a235b63975a9
https://bugs.mysql.com/bug.php?id=46815
git: 10406ae65871de074e807e626f9ede686e9321d4
If we don't have these bugs, why not revert all the code introduced by these
patches?
Or are we not affected by the need to use used_tables() as an indication of
a prepared statement? If so, why?
Thanks,
Sergey
On Sun, May 17, 2015 at 03:10:48PM +0200, sanja(a)mariadb.com wrote:
> revision-id: 8b30c0f26c7b35fc73429f85aa35eb3614d60bd8
> parent(s): b2db8e85422a455a10cd28ef7e44977182528f70
> committer: Oleksandr Byelkin
> branch nick: server
> timestamp: 2015-05-17 15:10:45 +0200
> message:
>
> MDEV-7949: Item_field::used_tables() takes 0.29% in OLTP RO
>
> Part 2: removed hack workaround for bug we do not have.
>
> ---
> sql/item_strfunc.cc | 7 +++----
> 1 file changed, 3 insertions(+), 4 deletions(-)
>
> diff --git a/sql/item_strfunc.cc b/sql/item_strfunc.cc
> index 4bf8dd5..7b9169e 100644
> --- a/sql/item_strfunc.cc
> +++ b/sql/item_strfunc.cc
> @@ -626,8 +626,7 @@ String *Item_func_concat::val_str(String *str)
> if (!(res=args[0]->val_str(str)))
> goto null;
> use_as_buff= &tmp_value;
> - /* Item_subselect in --ps-protocol mode will state it as a non-const */
> - is_const= args[0]->const_item() || !args[0]->used_tables();
> + is_const= args[0]->const_item();
> for (i=1 ; i < arg_count ; i++)
> {
> if (res->length() == 0)
> @@ -639,7 +638,7 @@ String *Item_func_concat::val_str(String *str)
> non-empty argument. Because of this we need is_const to be
> evaluated only for it.
> */
> - is_const= args[i]->const_item() || !args[i]->used_tables();
> + is_const= args[i]->const_item();
> }
> else
> {
> @@ -975,7 +974,7 @@ String *Item_func_concat_ws::val_str(String *str)
> for (i=1; i < arg_count; i++)
> if ((res= args[i]->val_str(str)))
> {
> - is_const= args[i]->const_item() || !args[i]->used_tables();
> + is_const= args[i]->const_item();
> break;
> }
>
> _______________________________________________
> commits mailing list
> commits(a)mariadb.org
> https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits
[Maria-developers] MDEV-7937: Enforce SSL when --ssl client option is used
by Vicențiu Ciorbaru 18 May '15
Hi Sergei!
I've done some work on this issue. I've read MySQL's implementation of this
and have looked at our implementation. They have done a bit of refactoring,
introducing an enforce_ssl flag, as well as changing the C interface a bit,
to allow setting this flag programmatically.
I've created a patch here that changes the minimum amount possible, in
order to implement what MDEV-7937 requires. That being said, I agree with
(most of) MySQL's refactoring in this case. They've moved all the SSL
related connection code into its own separate function, before actually
calling send_client_reply_packet.
I can work towards implementing things the way MySQL does, but since I saw
that you've actually done most of the work in that area of the code, I
figured I'd ask for your input on it.
There are two more things that I'm not sure of:
1. Specifying --ssl as a command line parameter to the mysql client is not
enough to enforce ssl and the client's code in this case just ignores the
option. We need to provide at least one of the additional ones like
--ssl-key or --ssl-ca. My patch will not cause the client to report an
error in this case. Is this acceptable behaviour or not?
2. Do we want mysql's enforce_ssl feature?
Regards,
Vicențiu
18 May '15
Hi Jonas,
Thanks for the patch!
I read through the patch. I am not that familiar with the semi-sync plugin
code, but it looks reasonable.
But the patch causes a test failure in sys_vars.all_vars. This is because
not all newly introduced variables have corresponding test cases in
mysql-test/suite/sys_vars/t/*_basic.test.
Also, do you have some documentation for the feature? (There is a bit in the
Jira entry, but for example the new variable
rpl_semi_sync_master_slave_lag_heartbeat_frequency_us does not seem to be
described at all...)
If you can fix the above two points (and if Buildbot does not point out more
test suite issues), then I will push it into the MariaDB tree.
Thanks,
- Kristian.
> patch has been ported to 10.1
> you can use it under new (also called "3-clause") BSD license
> i have done internal benchmarks...but you are most welcome to do some too.
>
> DESCRIPTION:
> no slave left behind
>
> this patch implements master throttling based on slave lag,
> aka no slave left behind. the core feature works as follows
> 1) the semi-sync-reply is amended to also report back SQL-thread
> position (aka exec position)
> 2) transactions are not removed from the "active-transaction-list"
> in the semi-sync-master plugin until at least one slave has reported
> that it has executed this transaction. the slave lag can then
> be estimated by calculating how long the oldest transaction has been
> lingering in the active-transaction-list.
> 3) client-threads are forced to wait before commit until slave lag
> has decreased to acceptable value.
>
> the following variables are introduced on master:
>
> rpl_semi_sync_master_max_slave_lag (global)
> rpl_semi_sync_master_slave_lag_wait_timeout (session)
>
> the following status variables are introduced on master:
>
> rpl_semi_sync_master_slave_lag_wait_sessions
> rpl_semi_sync_master_estimated_slave_lag
> rpl_semi_sync_master_trx_slave_lag_wait_time
> rpl_semi_sync_master_trx_slave_lag_wait_num
> rpl_semi_sync_master_avg_trx_slave_lag_wait_time
>
> the following variables are introduced on slave:
>
> rpl_semi_sync_slave_lag_enabled (global)
>
> in addition to this, 2 optimizations that decrease the overhead of semi-sync
> are introduced.
> 1) when a transaction is about to be sent, the code checks whether it
> should be semi-synced, but rather than semi-syncing each transaction
> (which is done currently) it skips semi-syncing a transaction if newer
> transactions have already been committed. But since this can mean that
> semi-syncing is delayed indefinitely, a cap is set using 2 new master
> variables:
>
> rpl_semi_sync_master_max_unacked_event_bytes (global)
> rpl_semi_sync_master_max_unacked_event_count (global)
> 2) rpl_semi_sync_master_group_commit which makes the semi-sync
> plugin only semi-sync the last transaction in a group commit.
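The lag estimate in the description above can be sketched in a few lines. This is a simplified model, not the plugin code: `ActiveList` and `PendingTranx` are hypothetical names, a single binlog file and a single reporting slave are assumed (so a position alone identifies a transaction), and treating a maximum lag of 0 as "throttling disabled" is also an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// One committed transaction that no slave has reported as executed yet.
struct PendingTranx {
  std::uint64_t commit_time_us;  // when it was written to the binlog
  std::uint64_t log_pos;         // end position in the (single) binlog file
};

class ActiveList {
 public:
  // Master side: remember each transaction when it is committed.
  void on_commit(std::uint64_t log_pos, std::uint64_t now_us) {
    pending_.push_back(PendingTranx{now_us, log_pos});
  }

  // A slave reported that its SQL thread executed everything up to exec_pos.
  void on_exec_report(std::uint64_t exec_pos) {
    while (!pending_.empty() && pending_.front().log_pos <= exec_pos)
      pending_.pop_front();
  }

  // Lag estimate: how long the oldest unexecuted transaction has lingered.
  std::uint64_t estimated_lag_us(std::uint64_t now_us) const {
    return pending_.empty() ? 0 : now_us - pending_.front().commit_time_us;
  }

  // Throttle: a client must wait before commit while lag exceeds the cap.
  bool must_wait(std::uint64_t now_us, std::uint64_t max_lag_us) const {
    return max_lag_us != 0 && estimated_lag_us(now_us) > max_lag_us;
  }

 private:
  std::deque<PendingTranx> pending_;  // oldest transaction at the front
};
```

A committing client would poll `must_wait()` until it returns false or its own wait timeout expires, which corresponds to the "Slave-lag timeout" error seen in the test result.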
> diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result b/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result
> new file mode 100644
> index 0000000..f3545a4
> --- /dev/null
> +++ b/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result
> @@ -0,0 +1,150 @@
> +include/master-slave.inc
> +[connection master]
> +set global rpl_semi_sync_master_enabled = 1;
> +set global rpl_semi_sync_master_max_slave_lag = 10;
> +show variables like 'rpl_semi_sync_master_%slave_lag%';
> +Variable_name Value
> +rpl_semi_sync_master_max_slave_lag 10
> +rpl_semi_sync_master_slave_lag_heartbeat_frequency_us 500000
> +rpl_semi_sync_master_slave_lag_wait_timeout 50
> +include/stop_slave.inc
> +set global rpl_semi_sync_slave_enabled = 1;
> +set global rpl_semi_sync_slave_lag_enabled = 1;
> +include/start_slave.inc
> +# create non-root user for testing READ_ONLY
> +grant SELECT, INSERT on *.* to test@localhost;
> +CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8))
> +ENGINE=innodb;
> +#
> +# Check basic behaviour
> +#
> +INSERT INTO t1 (f) VALUES ('1'),('2'),('3');
> +# Now wait for slave lag to decrease to 0
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +INSERT INTO t1 (f) VALUES ('4'),('5'),('6');
> +# Now wait for slave lag to increase to > 0
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to 0
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +# First transaction should succeed. slave_lag is zero when it commits
> +INSERT INTO t1 (f) VALUES ('7'),('8'),('9');
> +# Now wait for slave lag to increase to > 10s
> +# Check that estimated_slave_lag is > 10s
> +SELECT VARIABLE_VALUE > 10000000 as should_be_1
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +should_be_1
> +1
> +# Second transaction should now fail. slave_lag is >10s when it commits
> +INSERT INTO t1 (f) VALUES ('a'),('b'),('c');
> +ERROR HY000: Slave-lag timeout
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to < 10s
> +# And now it should succeed again
> +INSERT INTO t1 (f) VALUES ('d'),('e'),('f');
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +i f
> +1 1
> +2 2
> +3 3
> +4 4
> +5 5
> +6 6
> +7 7
> +8 8
> +9 9
> +13 d
> +14 e
> +15 f
> +# [ on slave ]
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +i f
> +1 1
> +2 2
> +3 3
> +4 4
> +5 5
> +6 6
> +7 7
> +8 8
> +9 9
> +13 d
> +14 e
> +15 f
> +#
> +# Test interaction with READ_ONLY
> +#
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +# Now wait for slave lag to increase to > 10s
> +# [ on con1 ]
> +BEGIN;
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +# [ on master ]
> +set global read_only = 1;
> +# [ on con1 ]
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +# read-only is check *before* slave lag
> +COMMIT;
> +ERROR HY000: The MariaDB server is running with the --read-only option so it cannot execute this statement
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to 0
> +set global read_only = 0;
> +#
> +# check slave_lag > 0 but less than rpl_semi_sync_master_max_slave_lag
> +#
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +INSERT INTO t1 (f) VALUES ('j'),('k'),('l');
> +# Now wait for slave lag to increase to > 0
> +# Capture rpl_semi_sync_master_tx_slave_lag_waits before transaction
> +select @count_before := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +# Set maximum allowed slave lag to 24h
> +set global rpl_semi_sync_master_max_slave_lag = 86400;
> +INSERT INTO t1 (f) VALUES ('m'),('n'),('o');
> +# Capture rpl_semi_sync_master_tx_slave_lag_waits after transaction
> +select @count_after := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +# There should have been no wait, since maximum allowed is very high
> +select @count_before = @count_after as should_be_1;
> +should_be_1
> +1
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to 0
> +#
> +# Clean up
> +#
> +# [ on master ]
> +set global rpl_semi_sync_master_max_slave_lag = default;
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = default;
> +DROP USER test@localhost;
> +include/stop_slave.inc
> +set global rpl_semi_sync_slave_enabled = 0;
> +set global rpl_semi_sync_slave_lag_enabled = default;
> +set global rpl_semi_sync_master_enabled = 0;
> +include/start_slave.inc
> +DROP TABLE t1;
> +include/rpl_end.inc
> diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test b/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test
> new file mode 100644
> index 0000000..f9bdbd9
> --- /dev/null
> +++ b/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test
> @@ -0,0 +1,252 @@
> +source include/have_semisync.inc;
> +source include/not_embedded.inc;
> +source include/have_innodb.inc;
> +source include/master-slave.inc;
> +
> +connection master;
> +set global rpl_semi_sync_master_enabled = 1;
> +set global rpl_semi_sync_master_max_slave_lag = 10;
> +
> +show variables like 'rpl_semi_sync_master_%slave_lag%';
> +
> +connection slave;
> +source include/stop_slave.inc;
> +set global rpl_semi_sync_slave_enabled = 1;
> +set global rpl_semi_sync_slave_lag_enabled = 1;
> +
> +source include/start_slave.inc;
> +
> +connection master;
> +
> +--echo # create non-root user for testing READ_ONLY
> +grant SELECT, INSERT on *.* to test@localhost;
> +connect (con1,localhost,test,,test);
> +
> +CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8))
> +ENGINE=innodb;
> +
> +--echo #
> +--echo # Check basic behaviour
> +--echo #
> +INSERT INTO t1 (f) VALUES ('1'),('2'),('3');
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +INSERT INTO t1 (f) VALUES ('4'),('5'),('6');
> +
> +--echo # Now wait for slave lag to increase to > 0
> +let $wait_condition= SELECT VARIABLE_VALUE > 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +
> +--echo # First transaction should succeed. slave_lag is zero when it commits
> +INSERT INTO t1 (f) VALUES ('7'),('8'),('9');
> +
> +--echo # Now wait for slave lag to increase to > 10s
> +let $wait_condition= SELECT VARIABLE_VALUE > 10000000
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # Check that estimated_slave_lag is > 10s
> +SELECT VARIABLE_VALUE > 10000000 as should_be_1
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +
> +--echo # Second transaction should now fail. slave_lag is >10s when it commits
> +--error ER_ERROR_DURING_COMMIT
> +INSERT INTO t1 (f) VALUES ('a'),('b'),('c');
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +--echo # Now wait for slave lag to decrease to < 10s
> +let $wait_condition= SELECT VARIABLE_VALUE < 10000000
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # And now it should succeed again
> +INSERT INTO t1 (f) VALUES ('d'),('e'),('f');
> +
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +
> +sync_slave_with_master;
> +
> +--echo # [ on slave ]
> +connection slave;
> +
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +
> +--echo #
> +--echo # Test interaction with READ_ONLY
> +--echo #
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +
> +--echo # Now wait for slave lag to increase to > 10s
> +let $wait_condition= SELECT VARIABLE_VALUE > 10000000
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +connection con1;
> +--echo # [ on con1 ]
> +BEGIN;
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +
> +connection master;
> +--echo # [ on master ]
> +set global read_only = 1;
> +
> +connection con1;
> +--echo # [ on con1 ]
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +
> +--echo # read-only is check *before* slave lag
> +--error ER_OPTION_PREVENTS_STATEMENT
> +COMMIT;
> +
> +disconnect con1;
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +connection master;
> +--echo # [ on master ]
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +set global read_only = 0;
> +
> +--echo #
> +--echo # check slave_lag > 0 but less than rpl_semi_sync_master_max_slave_lag
> +--echo #
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +connection master;
> +--echo # [ on master ]
> +INSERT INTO t1 (f) VALUES ('j'),('k'),('l');
> +
> +--echo # Now wait for slave lag to increase to > 0
> +let $wait_condition= SELECT VARIABLE_VALUE > 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # Capture rpl_semi_sync_master_tx_slave_lag_waits before transaction
> +--disable_result_log
> +select @count_before := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +--enable_result_log
> +
> +--echo # Set maximum allowed slave lag to 24h
> +set global rpl_semi_sync_master_max_slave_lag = 86400; # 24h
> +
> +INSERT INTO t1 (f) VALUES ('m'),('n'),('o');
> +
> +--echo # Capture rpl_semi_sync_master_tx_slave_lag_waits after transaction
> +--disable_result_log
> +select @count_after := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +--enable_result_log
> +
> +--echo # There should have been no wait, since maximum allowed is very high
> +select @count_before = @count_after as should_be_1;
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +connection master;
> +--echo # [ on master ]
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo #
> +--echo # Clean up
> +--echo #
> +connection master;
> +--echo # [ on master ]
> +set global rpl_semi_sync_master_max_slave_lag = default;
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = default;
> +DROP USER test@localhost;
> +
> +connection slave;
> +source include/stop_slave.inc;
> +set global rpl_semi_sync_slave_enabled = 0;
> +set global rpl_semi_sync_slave_lag_enabled = default;
> +
> +connection master;
> +set global rpl_semi_sync_master_enabled = 0;
> +
> +connection slave;
> +source include/start_slave.inc;
> +
> +connection master;
> +
> +DROP TABLE t1;
> +sync_slave_with_master;
> +--source include/rpl_end.inc
> diff --git a/plugin/semisync/semisync.cc b/plugin/semisync/semisync.cc
> index 4a80360..fc02b9d 100644
> --- a/plugin/semisync/semisync.cc
> +++ b/plugin/semisync/semisync.cc
> @@ -20,6 +20,7 @@
>
> const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
> const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
> +const unsigned char ReplSemiSyncBase::kPacketFlagSyncAndReport = 0x02;
>
>
> const unsigned long Trace::kTraceGeneral = 0x0001;
> @@ -29,3 +30,6 @@ const unsigned long Trace::kTraceFunction = 0x0040;
>
> const unsigned char ReplSemiSyncBase::kSyncHeader[2] =
> {ReplSemiSyncBase::kPacketMagicNum, 0};
> +
> +const char* const ReplSemiSyncBase::kRplSemiSyncSlaveReportExec =
> + "rpl_semi_sync_slave_report_exec";
> diff --git a/plugin/semisync/semisync.h b/plugin/semisync/semisync.h
> index 2857729..78faba9 100644
> --- a/plugin/semisync/semisync.h
> +++ b/plugin/semisync/semisync.h
> @@ -75,13 +75,23 @@ class ReplSemiSyncBase
>
> /* Constants in network packet header. */
> static const unsigned char kPacketMagicNum;
> + /* this event should be semisync acked */
> static const unsigned char kPacketFlagSync;
> + /* this event should be semisync acked including the current SQL position */
> + static const unsigned char kPacketFlagSyncAndReport;
> +
> + /* user variable for enabling exec-pos reporting */
> + static const char* const kRplSemiSyncSlaveReportExec;
> };
>
> /* The layout of a semisync slave reply packet:
> 1 byte for the magic num
> 8 bytes for the binlog positon
> - n bytes for the binlog filename, terminated with a '\0'
> + n bytes for the binlog filename, NOT terminated with a '\0'
> + [ optionally ]
> + 1 byte == 0
> + 8 bytes for the sql-thread position
> + n bytes for the sql-thread filename, terminated with a '\0'
> */
> #define REPLY_MAGIC_NUM_LEN 1
> #define REPLY_BINLOG_POS_LEN 8
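The reply layout described in the comment above can be illustrated with a small encoder. This is a sketch only: `LogPos`, `append_u64` and `build_reply` are hypothetical helpers, the magic number is copied from the patch context, and little-endian encoding of the 8-byte positions is an assumption about the wire format.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

constexpr unsigned char kPacketMagicNum = 0xef;  // value from the patch context

// A position inside a named log file.
struct LogPos {
  std::uint64_t pos;
  std::string file;
};

// Append an 8-byte integer, least significant byte first (assumed byte order).
void append_u64(std::vector<unsigned char>& buf, std::uint64_t v) {
  for (int i = 0; i < 8; ++i)
    buf.push_back(static_cast<unsigned char>(v >> (8 * i)));
}

// Build a reply: magic, binlog position, binlog filename (no terminator),
// then optionally a 0 byte, the SQL-thread position and its filename + '\0'.
std::vector<unsigned char> build_reply(const LogPos& binlog,
                                       const LogPos* exec = nullptr) {
  std::vector<unsigned char> buf;
  buf.push_back(kPacketMagicNum);
  append_u64(buf, binlog.pos);
  buf.insert(buf.end(), binlog.file.begin(), binlog.file.end());
  if (exec != nullptr) {
    buf.push_back(0);  // separator that starts the optional part
    append_u64(buf, exec->pos);
    buf.insert(buf.end(), exec->file.begin(), exec->file.end());
    buf.push_back(0);  // terminator of the SQL-thread filename
  }
  return buf;
}
```

In this layout the 0 byte after the binlog filename serves as the separator, so a reader can tell from the remaining length whether the SQL-thread position follows.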
> diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc
> index c88c162..3ee6e26 100644
> --- a/plugin/semisync/semisync_master.cc
> +++ b/plugin/semisync/semisync_master.cc
> @@ -22,6 +22,9 @@
> #define TIME_MILLION 1000000
> #define TIME_BILLION 1000000000
>
> +/* thd_key for per slave thread state */
> +static MYSQL_THD_KEY_T thd_key;
> +
> /* This indicates whether semi-synchronous replication is enabled. */
> char rpl_semi_sync_master_enabled;
> unsigned long rpl_semi_sync_master_wait_point =
> @@ -45,6 +48,18 @@ unsigned long long rpl_semi_sync_master_net_wait_time = 0;
> unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
> char rpl_semi_sync_master_wait_no_slave = 1;
>
> +unsigned long rpl_semi_sync_master_max_unacked_event_count = 0;
> +unsigned long rpl_semi_sync_master_max_unacked_event_bytes = 4096;
> +
> +unsigned long rpl_semi_sync_master_slave_lag_clients = 0;
> +unsigned long long rpl_semi_sync_master_estimated_slave_lag = 0;
> +unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us = 500000;
> +unsigned long rpl_semi_sync_master_max_slave_lag = 0;
> +unsigned long rpl_semi_sync_master_slave_lag_wait_sessions = 0;
> +
> +unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0;
> +unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num = 0;
> +unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
>
> static int getWaitTime(const struct timespec& start_ts);
>
> @@ -150,6 +165,15 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
> ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
> ins_node->log_pos_ = log_file_pos;
>
> + {
> + /**
> + * set trans commit time
> + * this is called when writing into binlog, which is not
> + * exactly right, but close enough for our purposes
> + */
> + ins_node->tranx_commit_time_us = my_hrtime().val;
> + }
> +
> if (!trx_front_)
> {
> /* The list is empty. */
> @@ -193,12 +217,11 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
> return function_exit(kWho, result);
> }
>
> -bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
> - my_off_t log_file_pos)
> +TranxNode* ActiveTranx::lookup_tranx_end_pos(const char *log_file_name,
> + my_off_t log_file_pos)
> {
> - const char *kWho = "ActiveTranx::is_tranx_end_pos";
> + const char *kWho = "ActiveTranx::lookup_tranx_end_pos";
> function_enter(kWho);
> -
> unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
> TranxNode *entry = trx_htb_[hash_val];
>
> @@ -211,38 +234,24 @@ bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
> }
>
> if (trace_level_ & kTraceDetail)
> - sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
> - log_file_name, (unsigned long)log_file_pos, hash_val);
> + sql_print_information("%s: probe (%s, %lu)", kWho,
> + log_file_name, (unsigned long)log_file_pos);
>
> function_exit(kWho, (entry != NULL));
> - return (entry != NULL);
> + return entry;
> }
>
> -int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
> - my_off_t log_file_pos)
> +int ActiveTranx::clear_active_tranx_nodes()
> {
> - const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
> - TranxNode *new_front;
> + set_new_front(NULL);
> + return 0;
> +}
>
> +void ActiveTranx::set_new_front(TranxNode *new_front)
> +{
> + const char *kWho = "ActiveTranx::set_new_front";
> function_enter(kWho);
>
> - if (log_file_name != NULL)
> - {
> - new_front = trx_front_;
> -
> - while (new_front)
> - {
> - if (compare(new_front, log_file_name, log_file_pos) > 0)
> - break;
> - new_front = new_front->next_;
> - }
> - }
> - else
> - {
> - /* If log_file_name is NULL, clear everything. */
> - new_front = NULL;
> - }
> -
> if (new_front == NULL)
> {
> /* No active transaction nodes after the call. */
> @@ -257,7 +266,6 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
> trx_front_ = NULL;
> trx_rear_ = NULL;
> }
> -
> if (trace_level_ & kTraceDetail)
> sql_print_information("%s: cleared all nodes", kWho);
> }
> @@ -291,14 +299,40 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
>
> trx_front_ = new_front;
> allocator_.free_nodes_before(trx_front_);
> -
> if (trace_level_ & kTraceDetail)
> sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
> kWho, n_frees,
> trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
> }
> + function_exit(kWho, 0);
> +}
>
> - return function_exit(kWho, 0);
> +bool ActiveTranx::prune_active_tranx_nodes(
> + LogPosPtr pos,
> + ulonglong *oldest_tranx_commit_time_us)
> +{
> + TranxNode *old_front = trx_front_;
> + TranxNode *new_front;
> +
> + new_front = trx_front_;
> + while (new_front)
> + {
> + if (compare(new_front, pos.file_name, pos.file_pos) > 0)
> + break;
> + new_front = new_front->next_;
> + }
> +
> + set_new_front(new_front);
> +
> + if (oldest_tranx_commit_time_us)
> + {
> + if (trx_front_ == NULL)
> + *oldest_tranx_commit_time_us = 0;
> + else
> + *oldest_tranx_commit_time_us = trx_front_->tranx_commit_time_us;
> + }
> +
> + return ! (old_front == trx_front_);
> }
>
>
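
As a reading aid for prune_active_tranx_nodes() above, here is a small stand-alone sketch of the same idea: drop every node at or before the acknowledged position from the front of a sorted list, then report the commit time of the oldest remaining node (0 when the list is empty). The container and the names Tranx, compare_pos and prune are my own assumptions; the patch works on its own linked list and hash table rather than a std::deque.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>

// Hypothetical stand-in for TranxNode; not the patch's struct.
struct Tranx {
  std::string file;
  std::uint64_t pos;
  std::uint64_t commit_time_us;
};

// Order positions by file name first, then by offset within the file.
static int compare_pos(const std::string &f1, std::uint64_t p1,
                       const std::string &f2, std::uint64_t p2) {
  const int cmp = f1.compare(f2);
  if (cmp != 0) return cmp < 0 ? -1 : 1;
  if (p1 == p2) return 0;
  return p1 < p2 ? -1 : 1;
}

// Remove all nodes up to and including (file, pos).
// Returns true if at least one node was removed.
bool prune(std::deque<Tranx> &active, const std::string &file,
           std::uint64_t pos, std::uint64_t *oldest_commit_time_us) {
  bool removed = false;
  while (!active.empty() &&
         compare_pos(active.front().file, active.front().pos, file, pos) <= 0) {
    active.pop_front();
    removed = true;
  }
  if (oldest_commit_time_us != nullptr) {
    *oldest_commit_time_us =
        active.empty() ? 0 : active.front().commit_time_us;
  }
  return removed;
}
```

The return value plays the role of pruned_trx_list in the patch: the caller only needs to wake lag waiters when the front of the list actually moved.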
> @@ -334,7 +368,8 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
> wait_file_pos_(0),
> master_enabled_(false),
> wait_timeout_(0L),
> - state_(0)
> + state_(0),
> + oldest_unapplied_tranx_commit_time_us_(0)
> {
> strcpy(reply_file_name_, "");
> strcpy(wait_file_name_, "");
> @@ -362,11 +397,19 @@ int ReplSemiSyncMaster::initObject()
> mysql_cond_init(key_ss_cond_COND_binlog_send_,
> &COND_binlog_send_, NULL);
>
> + /* Mutex initialization can only be done after MY_INIT(). */
> + mysql_mutex_init(key_ss_mutex_LOCK_slave_lag_,
> + &LOCK_slave_lag_, MY_MUTEX_INIT_FAST);
> + mysql_cond_init(key_ss_cond_COND_slave_lag_,
> + &COND_slave_lag_, NULL);
> +
> if (rpl_semi_sync_master_enabled)
> result = enableMaster();
> else
> result = disableMaster();
>
> + thd_key_create(&thd_key);
> +
> return result;
> }
>
> @@ -437,6 +480,8 @@ void ReplSemiSyncMaster::cleanup()
> {
> mysql_mutex_destroy(&LOCK_binlog_);
> mysql_cond_destroy(&COND_binlog_send_);
> + mysql_mutex_destroy(&LOCK_slave_lag_);
> + mysql_cond_destroy(&COND_slave_lag_);
> init_done_= 0;
> }
>
> @@ -473,7 +518,34 @@ void ReplSemiSyncMaster::add_slave()
> {
> lock();
> rpl_semi_sync_master_clients++;
> + if (has_semi_sync_slave_lag())
> + rpl_semi_sync_master_slave_lag_clients++;
> unlock();
> +
> + if (has_semi_sync_slave_lag())
> + {
> + int null_val = 0;
> + longlong new_val =
> + rpl_semi_sync_master_slave_lag_heartbeat_frequency_us * 1000;
> + longlong old_val = new_val + 1;
> +
> + get_user_var_int("master_heartbeat_period", &old_val, &null_val);
> + if (old_val > new_val || null_val)
> + {
> + /* if there is no old value or it's bigger than what we want */
> + int res = set_user_var_int("master_heartbeat_period", new_val, &old_val);
> + if (res == -1)
> + {
> + sql_print_error(
> + "Repl_semi_sync::failed to set master_heartbeat_period");
> + }
> + }
> + }
> +
> + /**
> + * create per-slave state and store it in thread-local storage */
> + ReplSemiSyncMasterPerSlaveState *state = new ReplSemiSyncMasterPerSlaveState;
> + thd_setspecific(current_thd, thd_key, state);
> }
>
> void ReplSemiSyncMaster::remove_slave()
> @@ -492,7 +564,31 @@ void ReplSemiSyncMaster::remove_slave()
> rpl_semi_sync_master_clients == 0)
> switch_off();
> }
> +
> + bool no_slave_lag_clients = false;
> + if (has_semi_sync_slave_lag())
> + {
> + if (--rpl_semi_sync_master_slave_lag_clients == 0)
> + {
> + no_slave_lag_clients = true;
> + }
> + }
> +
> unlock();
> +
> + ReplSemiSyncMasterPerSlaveState *state =
> + (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd, thd_key);
> + thd_setspecific(current_thd, thd_key, NULL);
> +
> + if (state != NULL)
> + {
> + delete state;
> + }
> +
> + if (no_slave_lag_clients)
> + {
> + wake_slave_lag_waiters(0);
> + }
> }
>
> bool ReplSemiSyncMaster::is_semi_sync_slave()
> @@ -503,14 +599,115 @@ bool ReplSemiSyncMaster::is_semi_sync_slave()
> return val;
> }
>
> +bool ReplSemiSyncMaster::has_semi_sync_slave_lag()
> +{
> + int null_value;
> + long long val= 0;
> + get_user_var_int(kRplSemiSyncSlaveReportExec, &val, &null_value);
> + return val;
> +}
> +
> +int ReplSemiSyncMaster::checkSyncReq(const LogPosPtr *log_pos)
> +{
> + if (log_pos == NULL)
> + {
> + /* heartbeat events do not have a logpos (since they are not actually
> + * stored in the binlog).
> + */
> + if (!has_semi_sync_slave_lag())
> + {
> + /* don't semi-sync them if we haven't enabled slave-lag handling */
> + return 0;
> + }
> + else
> + {
> + /* else ask for both IO and exec position */
> + return 2;
> + }
> + }
> +
> + /**
> + * check if this log-pos is a candidate for semi-syncing event
> + */
> + TranxNode *entry = active_tranxs_->lookup_tranx_end_pos(log_pos->file_name,
> + log_pos->file_pos);
> +
> + if (entry == NULL)
> + return 0;
> +
> + ReplSemiSyncMasterPerSlaveState *state =
> + (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd,
> + thd_key);
> + do
> + {
> + state->unacked_event_count_++;
> +
> + if (active_tranxs_->is_rear(entry))
> + {
> + /* always ask for ack on last event in tranx list */
> + break;
> + }
> +
> + if (state->unacked_event_count_ >=
> + rpl_semi_sync_master_max_unacked_event_count)
> + {
> + /* enough events passed that it's time for another ack */
> + break;
> + }
> +
> + if (!state->sync_req_pos_.IsInited())
> + {
> + /* first event => time for ack */
> + break;
> + }
> +
> + if (strcmp(log_pos->file_name, state->sync_req_pos_.file_name) != 0)
> + {
> + /* new file => time for ack */
> + break;
> + }
> +
> + if (log_pos->file_pos >= (state->sync_req_pos_.file_pos +
> + rpl_semi_sync_master_max_unacked_event_bytes))
> + {
> + /* enough bytes => time for ack */
> + break;
> + }
> +
> + /* we skip asking for semi-sync ack on this event */
> + return 0;
> +
> + } while (0);
> +
> + /* keep track on when we last asked for semi-sync-ack */
> + state->unacked_event_count_ = 0;
> + state->sync_req_pos_.Assign(log_pos);
> +
> + /**
> + * check if this slave can report back exec position
> + */
> + if (!has_semi_sync_slave_lag())
> + {
> + /* slave can't report back SQL position */
> + return 1;
> + }
> +
> + /* ask for both IO and SQL position */
> + return 2;
> +}
> +
> int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
> const char *log_file_name,
> - my_off_t log_file_pos)
> + my_off_t log_file_pos,
> + const LogPos *exec_pos)
> {
> const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
> int cmp;
> bool can_release_threads = false;
> bool need_copy_send_pos = true;
> + bool pruned_trx_list = false;
> + ulonglong oldest_tranx_commit_time_us = 0;
> +
>
> if (!(getMasterEnabled()))
> return 0;
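
For readers following checkSyncReq() in the hunk above, the ack-batching decision can be restated as a small stand-alone function. This is only a sketch under my own naming (SlaveState, should_request_ack and its parameters are hypothetical); it mirrors the order of the checks in the patch but leaves out the hash lookup and the exec-position reporting.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical per-slave bookkeeping, loosely modelled on
// ReplSemiSyncMasterPerSlaveState; not the patch's struct.
struct SlaveState {
  unsigned unacked_events;
  bool has_last;           // has an ack ever been requested?
  std::string last_file;   // file of the last event we asked an ack for
  std::uint64_t last_pos;  // offset of that event
  SlaveState() : unacked_events(0), has_last(false), last_pos(0) {}
};

// Decide whether the transaction ending at (file, pos) should carry an
// ack request. Updates the per-slave state when it does.
bool should_request_ack(SlaveState &s, const std::string &file,
                        std::uint64_t pos, bool is_last_in_list,
                        unsigned max_unacked_events,
                        std::uint64_t max_unacked_bytes) {
  ++s.unacked_events;

  const bool ask =
      is_last_in_list ||                          // newest transaction
      s.unacked_events >= max_unacked_events ||   // enough events skipped
      !s.has_last ||                              // first event for this slave
      file != s.last_file ||                      // binlog rotated
      pos >= s.last_pos + max_unacked_bytes;      // enough bytes skipped
  if (!ask) return false;

  // Remember where we last asked for an ack.
  s.unacked_events = 0;
  s.has_last = true;
  s.last_file = file;
  s.last_pos = pos;
  return true;
}
```

With a count threshold of 0, which is the default the patch gives rpl_semi_sync_master_max_unacked_event_count, the unsigned comparison is always true, so every transaction end requests an ack.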
> @@ -559,15 +756,29 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
> reply_file_pos_ = log_file_pos;
> reply_file_name_inited_ = true;
>
> - /* Remove all active transaction nodes before this point. */
> - assert(active_tranxs_ != NULL);
> - active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
> -
> if (trace_level_ & kTraceDetail)
> sql_print_information("%s: Got reply at (%s, %lu)", kWho,
> log_file_name, (unsigned long)log_file_pos);
> }
>
> + assert(active_tranxs_ != NULL);
> + if (exec_pos != NULL)
> + {
> + /* prune using exec_pos */
> + LogPosPtr ptr(*exec_pos);
> + pruned_trx_list = active_tranxs_->prune_active_tranx_nodes(
> + ptr, &oldest_tranx_commit_time_us);
> + }
> + else if (rpl_semi_sync_master_slave_lag_clients == 0 && need_copy_send_pos)
> + {
> + /**
> + * if we don't have any slaves that can do exec_pos reporting,
> + * prune by IO position as "plain old semi sync"
> + */
> + LogPosPtr ptr(log_file_name, log_file_pos);
> + active_tranxs_->prune_active_tranx_nodes(ptr, NULL);
> + }
> +
> if (rpl_semi_sync_master_wait_sessions > 0)
> {
> /* Let us check if some of the waiting threads doing a trx
> @@ -596,6 +807,15 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
> cond_broadcast();
> }
>
> + if (pruned_trx_list)
> + {
> + /**
> + * if we did prune trx list, it might be that we should wake up
> + * threads waiting for slave-lag to decrease
> + */
> + wake_slave_lag_waiters(oldest_tranx_commit_time_us);
> + }
> +
> return function_exit(kWho, 0);
> }
>
> @@ -743,14 +963,6 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
> }
> }
>
> - /*
> - At this point, the binlog file and position of this transaction
> - must have been removed from ActiveTranx.
> - */
> - assert(thd_killed(NULL) ||
> - !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
> - trx_wait_binlog_pos));
> -
> l_end:
> /* Update the status counter. */
> if (is_on())
> @@ -794,7 +1006,7 @@ int ReplSemiSyncMaster::switch_off()
>
> /* Clear the active transaction list. */
> assert(active_tranxs_ != NULL);
> - result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
> + result = active_tranxs_->clear_active_tranx_nodes();
>
> rpl_semi_sync_master_off_times++;
> wait_file_name_inited_ = false;
> @@ -884,7 +1096,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> {
> const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
> int cmp = 0;
> - bool sync = false;
> + int sync = 0;
>
> /* If the semi-sync master is not enabled, or the slave is not a semi-sync
> * target, do not request replies from the slave.
> @@ -905,6 +1117,13 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> /* semi-sync is ON */
> /* sync= false; No sync unless a transaction is involved. */
>
> + if (log_file_name == NULL)
> + {
> + /* this is heartbeat, request io_pos and exec_pos */
> + sync = checkSyncReq(0);
> + goto l_end;
> + }
> +
> if (reply_file_name_inited_)
> {
> cmp = ActiveTranx::compare(log_file_name, log_file_pos,
> @@ -933,12 +1152,12 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> */
> if (cmp >= 0)
> {
> - /*
> + /*
> * We only wait if the event is a transaction's ending event.
> */
> assert(active_tranxs_ != NULL);
> - sync = active_tranxs_->is_tranx_end_pos(log_file_name,
> - log_file_pos);
> + LogPosPtr pos(log_file_name, log_file_pos);
> + sync = checkSyncReq(&pos);
> }
> }
> else
> @@ -951,7 +1170,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> }
> else
> {
> - sync = true;
> + sync = 1;
> }
> }
>
> @@ -966,10 +1185,14 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> /* We do not need to clear sync flag because we set it to 0 when we
> * reserve the packet header.
> */
> - if (sync)
> + if (sync == 1)
> {
> (packet)[2] = kPacketFlagSync;
> }
> + else if (sync == 2)
> + {
> + (packet)[2] = kPacketFlagSyncAndReport;
> + }
>
> return function_exit(kWho, 0);
> }
> @@ -1018,7 +1241,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
> if (is_on())
> {
> assert(active_tranxs_ != NULL);
> - if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
> + bool empty = active_tranxs_->is_empty();
> + if (active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
> {
> /*
> if insert tranx_node failed, print a warning message
> @@ -1028,6 +1252,14 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
> log_file_name, (ulong)log_file_pos);
> switch_off();
> }
> + else if (empty && rpl_semi_sync_master_slave_lag_clients > 0)
> + {
> + /* if the list of transactions was empty,
> + * we need to init the oldest_tranx_commit_time_us
> + */
> + oldest_unapplied_tranx_commit_time_us_ =
> + active_tranxs_->get_oldest_tranx_commit_time_us();
> + }
> }
>
> l_end:
> @@ -1037,10 +1269,10 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
> }
>
> int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
> - const char *event_buf)
> + const char *event_buf_)
> {
> const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
> - const unsigned char *packet;
> + const unsigned char *packet, *packet_start;
> char log_file_name[FN_REFLEN];
> my_off_t log_file_pos;
> ulong log_file_len = 0;
> @@ -1048,12 +1280,15 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
> int result = -1;
> struct timespec start_ts;
> ulong trc_level = trace_level_;
> + const unsigned char *event_buf = (const unsigned char*)event_buf_;
> + bool exec_pos_present = false; // is SQL exec pos present in reply
> + LogPos exec_pos; // position of SQL thread
> LINT_INIT_STRUCT(start_ts);
>
> function_enter(kWho);
>
> - assert((unsigned char)event_buf[1] == kPacketMagicNum);
> - if ((unsigned char)event_buf[2] != kPacketFlagSync)
> + assert(event_buf[1] == kPacketMagicNum);
> + if ((event_buf[2] & (kPacketFlagSync | kPacketFlagSyncAndReport)) == 0)
> {
> /* current event does not require reply */
> result = 0;
> @@ -1111,28 +1346,60 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
> goto l_end;
> }
>
> - packet = net->read_pos;
> + packet_start = packet = net->read_pos;
> if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
> {
> sql_print_error("Read semi-sync reply magic number error");
> goto l_end;
> }
>
> + /* we determine if this semisync ack contains a sql-thread exec-pos
> + * by checking if last byte == 0, since the packet then contains
> + * \0-terminated filenames */
> + exec_pos_present = packet[packet_len - 1] == 0;
> +
> log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
> - log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
> + if (exec_pos_present == false)
> + {
> + log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
> + }
> + else
> + {
> + log_file_len = strnlen((char*)packet + REPLY_BINLOG_NAME_OFFSET,
> + MY_MIN((ulong)FN_REFLEN,
> + packet_len - REPLY_BINLOG_NAME_OFFSET));
> + }
> if (log_file_len >= FN_REFLEN)
> {
> sql_print_error("Read semi-sync reply binlog file length too large");
> goto l_end;
> }
> - strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
> + packet+= REPLY_BINLOG_NAME_OFFSET;
> +
> + strncpy(log_file_name, (const char*)packet, log_file_len);
> log_file_name[log_file_len] = 0;
>
> + if (exec_pos_present)
> + {
> + packet += log_file_len + 1;
> + if (packet + 8 + 1 >= (packet_start + packet_len))
> + {
> + sql_print_error("Read semi-sync reply binlog. "
> + "Packet too short to contain exec-position!");
> + goto l_end;
> + }
> + exec_pos.file_pos = uint8korr(packet);
> + packet += 8;
> + strncpy(exec_pos.file_name, (char*)packet,
> + (packet_start + packet_len) - packet);
> + }
> +
> if (trc_level & kTraceDetail)
> sql_print_information("%s: Got reply (%s, %lu)",
> kWho, log_file_name, (ulong)log_file_pos);
>
> - result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
> + result = reportReplyBinlog(server_id, log_file_name, log_file_pos,
> + exec_pos_present ? &exec_pos : NULL);
>
> l_end:
> return function_exit(kWho, result);
> @@ -1154,6 +1421,16 @@ int ReplSemiSyncMaster::resetMaster()
> wait_file_name_inited_ = false;
> reply_file_name_inited_ = false;
> commit_file_name_inited_ = false;
> + if (active_tranxs_ != NULL)
> + {
> + /**
> + * make sure to empty transaction hash/list
> + * with slave-lag reporting this container does
> + * not have to be empty even if no transaction is
> + * currently running
> + */
> + active_tranxs_->clear_active_tranx_nodes();
> + }
>
> rpl_semi_sync_master_yes_transactions = 0;
> rpl_semi_sync_master_no_transactions = 0;
> @@ -1168,6 +1445,13 @@ int ReplSemiSyncMaster::resetMaster()
>
> unlock();
>
> + mysql_mutex_lock(&LOCK_slave_lag_);
> + rpl_semi_sync_master_slave_lag_wait_sessions = 0;
> + oldest_unapplied_tranx_commit_time_us_ = 0;
> + rpl_semi_sync_master_trx_slave_lag_wait_num = 0;
> + rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> +
> return function_exit(kWho, result);
> }
>
> @@ -1186,6 +1470,29 @@ void ReplSemiSyncMaster::setExportStats()
> ((double)rpl_semi_sync_master_net_wait_num)) : 0);
>
> unlock();
> +
> + if (oldest_unapplied_tranx_commit_time_us_ != 0)
> + {
> + rpl_semi_sync_master_estimated_slave_lag = my_hrtime().val -
> + oldest_unapplied_tranx_commit_time_us_;
> + }
> + else
> + {
> + rpl_semi_sync_master_estimated_slave_lag = 0;
> + }
> +
> + mysql_mutex_lock(&LOCK_slave_lag_);
> + if (rpl_semi_sync_master_trx_slave_lag_wait_num)
> + {
> + rpl_semi_sync_master_avg_trx_slave_lag_wait_time =
> + (unsigned long)((double)rpl_semi_sync_master_trx_slave_lag_wait_time /
> + (double)rpl_semi_sync_master_trx_slave_lag_wait_num);
> + }
> + else
> + {
> + rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0;
> + }
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> }
>
> /* Get the waiting time given the wait's staring time.
> @@ -1213,3 +1520,117 @@ static int getWaitTime(const struct timespec& start_ts)
>
> return (int)(end_usecs - start_usecs);
> }
> +
> +void ReplSemiSyncMaster::wake_slave_lag_waiters(
> + ulonglong oldest_unapplied_tranx_commit_time_us)
> +{
> + mysql_mutex_lock(&LOCK_slave_lag_);
> + oldest_unapplied_tranx_commit_time_us_ =
> + oldest_unapplied_tranx_commit_time_us;
> +
> + if (rpl_semi_sync_master_slave_lag_wait_sessions > 0)
> + {
> + mysql_cond_broadcast(&COND_slave_lag_);
> + }
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> +}
> +
> +int ReplSemiSyncMaster::wait_slave_lag(ulong timeout_sec)
> +{
> + int error = 0;
> + PSI_stage_info old_stage;
> +
> + /* slave lag waiting not enabled, return directly */
> + if (rpl_semi_sync_master_max_slave_lag == 0)
> + return 0;
> +
> + /* there is no slave that can report slave lag, return directly */
> + if (rpl_semi_sync_master_slave_lag_clients == 0)
> + return 0;
> +
> + /* compute start_time and end_time */
> + struct timespec end_time;
> + set_timespec(end_time, 0);
> + ulonglong start_time_us = timespec_to_usec(&end_time);
> + end_time.tv_sec += timeout_sec;
> +
> + mysql_mutex_lock(&LOCK_slave_lag_);
> +
> + if (oldest_unapplied_tranx_commit_time_us_ == 0)
> + {
> + /* no slave lag, at least one slave is up to date */
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> + return 0;
> + }
> +
> + if (rpl_semi_sync_master_max_slave_lag == 0)
> + {
> + /* slave lag waiting not enabled */
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> + return 0;
> + }
> +
> + /* This must be called after acquiring the lock */
> + THD_ENTER_COND(NULL, &COND_slave_lag_, &LOCK_slave_lag_,
> + &stage_waiting_for_semi_sync_slave_lag,
> + &old_stage);
> +
> + bool waited = false;
> + ulonglong lag = 0;
> + ulonglong max_lag = 0;
> + while (oldest_unapplied_tranx_commit_time_us_ != 0)
> + {
> + /* check kill_level after THD_ENTER_COND but *before* cond_wait
> + * to avoid missing kills */
> + if (! (getMasterEnabled() && is_on() &&
> + thd_kill_level(NULL) == THD_IS_NOT_KILLED))
> + break;
> +
> + lag = start_time_us - oldest_unapplied_tranx_commit_time_us_;
> + max_lag = 1000000 * rpl_semi_sync_master_max_slave_lag;
> + if (lag <= max_lag)
> + break;
> +
> + waited = true;
> + rpl_semi_sync_master_slave_lag_wait_sessions++;
> + int wait_result = mysql_cond_timedwait(&COND_slave_lag_, &LOCK_slave_lag_,
> + &end_time);
> + rpl_semi_sync_master_slave_lag_wait_sessions--;
> +
> + bool thd_was_killed = thd_kill_level(NULL) != THD_IS_NOT_KILLED;
> + if (wait_result != 0 || thd_was_killed)
> + {
> + break;
> + }
> + }
> +
> + if (thd_kill_level(NULL) != THD_IS_NOT_KILLED)
> + {
> + /* Return error to client. */
> + error = 1;
> + my_printf_error(ER_ERROR_DURING_COMMIT,
> + "Killed while waiting for replication semi-sync slave-lag.",
> + MYF(0));
> + }
> + else if (lag > max_lag)
> + {
> + /* Return error to client. */
> + error = 1;
> + my_printf_error(ER_ERROR_DURING_COMMIT,
> + "Slave-lag timeout",
> + MYF(0));
> + }
> +
> + if (waited)
> + {
> + rpl_semi_sync_master_trx_slave_lag_wait_num++;
> + rpl_semi_sync_master_trx_slave_lag_wait_time +=
> + (my_hrtime().val - start_time_us);
> + }
> +
> + /* The lock held will be released by thd_exit_cond, so no need to
> + call unlock() here */
> + THD_EXIT_COND(NULL, & old_stage);
> +
> + return error;
> +}
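
The threshold test at the heart of wait_slave_lag() above can be isolated as a pure function, which makes the unit handling easier to check: commit times are in microseconds, while the limit is in seconds. The function name and parameters below are hypothetical. The guard against a commit time later than "now" is my own addition for the sketch; the patch subtracts the two unsigned values directly.

```cpp
#include <cassert>
#include <cstdint>

// Returns true when a committing session would have to wait because the
// oldest unapplied transaction is older than the configured limit.
//   now_us                      current time in microseconds
//   oldest_unapplied_commit_us  0 means "nothing unapplied"
//   max_lag_sec                 0 means "lag waiting disabled"
bool must_wait_for_slave(std::uint64_t now_us,
                         std::uint64_t oldest_unapplied_commit_us,
                         unsigned long max_lag_sec) {
  if (max_lag_sec == 0) return false;                 // feature disabled
  if (oldest_unapplied_commit_us == 0) return false;  // a slave is caught up
  // Assumption of this sketch: treat clock skew as "no lag" instead of
  // letting the unsigned subtraction wrap around.
  if (now_us <= oldest_unapplied_commit_us) return false;

  const std::uint64_t lag_us = now_us - oldest_unapplied_commit_us;
  const std::uint64_t max_lag_us = 1000000ULL * max_lag_sec;
  return lag_us > max_lag_us;
}
```

A lag exactly equal to the limit does not wait, matching the `lag <= max_lag` break in the loop.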
> diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h
> index d9dc4ce..e5c0de6 100644
> --- a/plugin/semisync/semisync_master.h
> +++ b/plugin/semisync/semisync_master.h
> @@ -24,17 +24,101 @@
> #ifdef HAVE_PSI_INTERFACE
> extern PSI_mutex_key key_ss_mutex_LOCK_binlog_;
> extern PSI_cond_key key_ss_cond_COND_binlog_send_;
> +
> +extern PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
> +extern PSI_cond_key key_ss_cond_COND_slave_lag_;
> #endif
>
> extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
> +extern PSI_stage_info stage_waiting_for_semi_sync_slave_lag;
>
> struct TranxNode {
> char log_name_[FN_REFLEN];
> - my_off_t log_pos_;
> + my_off_t log_pos_;
> + ulonglong tranx_commit_time_us;
> struct TranxNode *next_; /* the next node in the sorted list */
> struct TranxNode *hash_next_; /* the next node during hash collision */
> };
>
> +struct LogPos;
> +
> +/* This represents a log position */
> +struct LogPosPtr {
> + LogPosPtr() { Uninit();}
> + LogPosPtr(const char *name, my_off_t pos) : file_name(name), file_pos(pos){}
> + explicit LogPosPtr(const LogPos& pos) { Assign(&pos); }
> +
> + const char *file_name;
> + my_off_t file_pos;
> +
> + LogPosPtr& Assign(const LogPosPtr *src) {
> + file_name = src->file_name;
> + file_pos = src->file_pos;
> + return *this;
> + }
> +
> + LogPosPtr& Assign(const LogPos *src);
> +
> + void Uninit() { file_name = NULL;}
> + bool IsInited() const { return file_name != NULL; }
> +};
> +
> +struct LogPos {
> + char file_name[FN_REFLEN];
> + my_off_t file_pos;
> +
> + LogPos() { Uninit(); }
> +
> + LogPosPtr ToLogPosPtr() const {
> + if (IsInited()){
> + LogPosPtr p(file_name, file_pos);
> + return p;
> + } else {
> + LogPosPtr p;
> + return p;
> + }
> + }
> +
> + LogPos& Assign(const LogPosPtr *src) {
> + if (src->IsInited()) {
> + strcpy(file_name, src->file_name);
> + file_pos = src->file_pos;
> + } else {
> + Uninit();
> + }
> + return *this;
> + }
> +
> + LogPos& Assign(const LogPos* src) {
> + LogPosPtr p = src->ToLogPosPtr();
> + Assign(&p);
> + return *this;
> + }
> +
> + void Uninit() { file_name[0] = 0; }
> + bool IsInited() const { return file_name[0] != 0; }
> +};
> +
> +inline LogPosPtr& LogPosPtr::Assign(const LogPos* src) {
> + LogPosPtr p = src->ToLogPosPtr();
> + Assign(&p);
> + return *this;
> +}
> +
> +inline int CompareLogPos(const LogPosPtr *pos1, const LogPosPtr *pos2) {
> + int cmp = strcmp(pos1->file_name, pos2->file_name);
> +
> + if (cmp != 0)
> + return cmp;
> +
> + if (pos1->file_pos > pos2->file_pos)
> + return 1;
> + else if (pos1->file_pos < pos2->file_pos)
> + return -1;
> + else
> + return 0;
> +}
> +
> /**
> @class TranxNodeAllocator
>
> @@ -329,10 +413,14 @@ class ActiveTranx
> node2->log_name_, node2->log_pos_);
> }
>
> + void set_new_front(TranxNode* new_front);
> +
> public:
> ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level);
> ~ActiveTranx();
>
> + bool is_empty() const { return trx_front_ == NULL; }
> +
> /* Insert an active transaction node with the specified position.
> *
> * Return:
> @@ -340,21 +428,42 @@ class ActiveTranx
> */
> int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
>
> - /* Clear the active transaction nodes until(inclusive) the specified
> - * position.
> - * If log_file_name is NULL, everything will be cleared: the sorted
> + /* Clear all active transaction nodes.
> + * Everything will be cleared: the sorted
> * list and the hash table will be reset to empty.
> - *
> + *
> * Return:
> - * 0: success; non-zero: error
> + * 0 success; non-zero: error
> + */
> + int clear_active_tranx_nodes();
> +
> + /* Prune the active transaction nodes until the specified
> + * position (inclusive).
> + *
> + * Return:
> + * true if any transaction was removed
> + * false if list was left unchanged
> */
> - int clear_active_tranx_nodes(const char *log_file_name,
> - my_off_t log_file_pos);
> + bool prune_active_tranx_nodes(LogPosPtr logpos,
> + ulonglong *oldest_tranx_commit_time_us);
>
> - /* Given a position, check to see whether the position is an active
> - * transaction's ending position by probing the hash table.
> + /* Lookup a transaction's ending position by probing the hash table.
> + *
> + * return entry if found or NULL otherwise
> */
> - bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
> + TranxNode* lookup_tranx_end_pos(const char *log_file_name,
> + my_off_t log_file_pos);
> +
> +
> + /* Check if an entry is rear (i.e last) */
> + bool is_rear(TranxNode* entry) const { return entry == trx_rear_; }
> +
> + /**
> + * return timestamp of oldest transaction in list
> + */
> + ulonglong get_oldest_tranx_commit_time_us() const {
> + return trx_front_->tranx_commit_time_us;
> + }
>
> /* Given two binlog positions, compare which one is bigger based on
> * (file_name, file_position).
> @@ -365,6 +474,24 @@ class ActiveTranx
> };
>
> /**
> + * State that semisync master keeps per slave
> + */
> +struct ReplSemiSyncMasterPerSlaveState
> +{
> + ReplSemiSyncMasterPerSlaveState() : unacked_event_count_(0) {}
> +
> + /**
> + * Number of events that have not been semi-sync acked
> + */
> + unsigned unacked_event_count_;
> +
> + /**
> + * Position of last event that was semisync'ed
> + */
> + LogPos sync_req_pos_;
> +};
> +
> +/**
> The extension class for the master of semi-synchronous replication
> */
> class ReplSemiSyncMaster
> @@ -432,6 +559,16 @@ class ReplSemiSyncMaster
>
> bool state_; /* whether semi-sync is switched */
>
> + /* This cond variable is signaled when slave lag has decreased */
> + mysql_cond_t COND_slave_lag_;
> +
> + /* Mutex that protects oldest_unapplied_tranx_commit_time_us */
> + mysql_mutex_t LOCK_slave_lag_;
> +
> + /* this is commit time of oldest transaction that has not been applied
> + * on any slave */
> + ulonglong oldest_unapplied_tranx_commit_time_us_;
> +
> void lock();
> void unlock();
> void cond_broadcast();
> @@ -493,6 +630,9 @@ class ReplSemiSyncMaster
> /* Is the slave servered by the thread requested semi-sync */
> bool is_semi_sync_slave();
>
> + /* Does this slave have slave lag reporting capabilities */
> + bool has_semi_sync_slave_lag();
> +
> /* In semi-sync replication, reports up to which binlog position we have
> * received replies from the slave indicating that it already get the events.
> *
> @@ -501,13 +641,15 @@ class ReplSemiSyncMaster
> * log_file_name - (IN) binlog file name
> * end_offset - (IN) the offset in the binlog file up to which we have
> * the replies from the slave
> + * exec_position - (IN) position of SQL thread or NULL if not present
> *
> * Return:
> * 0: success; non-zero: error
> */
> int reportReplyBinlog(uint32 server_id,
> const char* log_file_name,
> - my_off_t end_offset);
> + my_off_t end_offset,
> + const LogPos *exec_position);
>
> /* Commit a transaction in the final step. This function is called from
> * InnoDB before returning from the low commit. If semi-sync is switch on,
> @@ -540,6 +682,16 @@ class ReplSemiSyncMaster
> */
> int reserveSyncHeader(unsigned char *header, unsigned long size);
>
> + /*
> + * check if an event should be semi synced and optionally
> + * if it should report back position of SQL thread on slave
> + *
> + * return 0 - no semi sync
> + * 1 - semi sync
> + * 2 - semi sync and report exec position
> + */
> + int checkSyncReq(const LogPosPtr *log_pos);
> +
> /* Update the sync bit in the packet header to indicate to the slave whether
> * the master will wait for the reply of the event. If semi-sync is switched
> * off and we detect that the slave is catching up, we switch semi-sync on.
> @@ -592,6 +744,21 @@ class ReplSemiSyncMaster
> * go off for that.
> */
> int resetMaster();
> +
> + /**
> + * wake potential slave-lag waiters
> + * called by binlog dump-thread(s)
> + */
> + void wake_slave_lag_waiters(ulonglong oldest_unapplied_tranx_commit_time_us);
> +
> + /**
> + * wait for slave lag to get below threshold
> + * called by user-thread(s)
> + *
> + * return 0 - success
> + * 1 - error (slave-lag timeout, or thread killed while waiting)
> + */
> + int wait_slave_lag(ulong max_wait_time_sec);
> };
>
> enum rpl_semi_sync_master_wait_point_t {
> @@ -621,6 +788,17 @@ extern unsigned long long rpl_semi_sync_master_trx_wait_num;
> extern unsigned long long rpl_semi_sync_master_net_wait_time;
> extern unsigned long long rpl_semi_sync_master_trx_wait_time;
>
> +extern unsigned long rpl_semi_sync_master_max_unacked_event_count;
> +extern unsigned long rpl_semi_sync_master_max_unacked_event_bytes;
> +extern unsigned long rpl_semi_sync_master_max_slave_lag;
> +extern unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us;
> +extern unsigned long rpl_semi_sync_master_slave_lag_wait_sessions;
> +extern unsigned long long rpl_semi_sync_master_estimated_slave_lag;
> +
> +extern unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time;
> +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num;
> +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time;
> +
> /*
> This indicates whether we should keep waiting if no semi-sync slave
> is available.
> diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc
> index 7bb0eea..3282158 100644
> --- a/plugin/semisync/semisync_master_plugin.cc
> +++ b/plugin/semisync/semisync_master_plugin.cc
> @@ -21,6 +21,11 @@
>
> static ReplSemiSyncMaster repl_semisync;
>
> +// forward declaration
> +static inline ulong get_slave_lag_wait_timeout(THD* thd);
> +
> +static char rpl_semi_sync_master_group_commit = 0;
> +
> C_MODE_START
>
> int repl_semi_report_binlog_update(Binlog_storage_param *param,
> @@ -31,6 +36,13 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
>
> if (repl_semisync.getMasterEnabled())
> {
> + if (rpl_semi_sync_master_group_commit &&
> + ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
> + {
> + /** there are more transactions coming... */
> + return 0;
> + }
> +
> /*
> Let us store the binlog file name and the position, so that
> we know how long to wait for the binlog to the replicated to
> @@ -43,8 +55,11 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
> return error;
> }
>
> -int repl_semi_request_commit(Trans_param *param)
> +int repl_semi_before_commit(Trans_param *param, int *error)
> {
> + *error = repl_semisync.wait_slave_lag(
> + get_slave_lag_wait_timeout(current_thd));
> +
> return 0;
> }
>
> @@ -53,6 +68,14 @@ int repl_semi_report_binlog_sync(Binlog_storage_param *param,
> my_off_t log_pos, uint32 flags)
> {
> int error= 0;
> +
> + if (rpl_semi_sync_master_group_commit &&
> + ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
> + {
> + /** there are more transactions coming... */
> + return 0;
> + }
> +
> if (rpl_semi_sync_master_wait_point ==
> SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
> {
> @@ -100,7 +123,7 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
> Let's assume this semi-sync slave has already received all
> binlog events before the filename and position it requests.
> */
> - repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
> + repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos, NULL);
> }
> sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
> semi_sync_slave ? "semi-sync" : "asynchronous",
> @@ -242,15 +265,72 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
> &fix_rpl_semi_sync_master_trace_level, // update
> 32, 0, ~0UL, 1);
>
> +static MYSQL_SYSVAR_ULONG(max_unacked_event_count,
> + rpl_semi_sync_master_max_unacked_event_count,
> + PLUGIN_VAR_OPCMDARG,
> + "Maximum unacked replication events",
> + NULL, // check
> + NULL, // update
> + rpl_semi_sync_master_max_unacked_event_count, 0, ~0UL, 1);
> +
> +static MYSQL_SYSVAR_ULONG(max_unacked_event_bytes,
> + rpl_semi_sync_master_max_unacked_event_bytes,
> + PLUGIN_VAR_OPCMDARG,
> + "Maximum unacked replication bytes",
> + NULL, // check
> + NULL, // update
> + rpl_semi_sync_master_max_unacked_event_bytes, 0, ~0UL, 1);
> +
> +static MYSQL_SYSVAR_ULONG(max_slave_lag, rpl_semi_sync_master_max_slave_lag,
> + PLUGIN_VAR_OPCMDARG,
> + "Maximum allowed lag of fastest semi-sync slave (in seconds), "
> + "checked before commit.",
> + NULL, // check
> + NULL, // update
> + rpl_semi_sync_master_max_slave_lag, 0, ~0UL, 1);
> +
> +static MYSQL_THDVAR_ULONG(slave_lag_wait_timeout,
> + PLUGIN_VAR_RQCMDARG,
> + "Timeout in seconds a rw-transaction may wait for max slave lag before "
> + "being rolled back.",
> + NULL, NULL, 50, 1, 1024 * 1024 * 1024, 0);
> +
> +static MYSQL_SYSVAR_ULONG(
> + slave_lag_heartbeat_frequency_us,
> + rpl_semi_sync_master_slave_lag_heartbeat_frequency_us,
> + PLUGIN_VAR_RQCMDARG,
> + "Heartbeat frequency when slave-lag is enabled (in microseconds).",
> + NULL, // check
> + NULL, // update
> + 500000, /* 500 ms */
> + 1, ~0UL, 1);
> +
> +static MYSQL_SYSVAR_BOOL(group_commit, rpl_semi_sync_master_group_commit,
> + PLUGIN_VAR_OPCMDARG,
> + "Group commit for semi sync",
> + NULL, // check
> + NULL,
> + 0);
> +
> static SYS_VAR* semi_sync_master_system_vars[]= {
> MYSQL_SYSVAR(enabled),
> MYSQL_SYSVAR(wait_point),
> MYSQL_SYSVAR(timeout),
> MYSQL_SYSVAR(wait_no_slave),
> MYSQL_SYSVAR(trace_level),
> + MYSQL_SYSVAR(max_unacked_event_count),
> + MYSQL_SYSVAR(max_unacked_event_bytes),
> + MYSQL_SYSVAR(max_slave_lag),
> + MYSQL_SYSVAR(slave_lag_wait_timeout),
> + MYSQL_SYSVAR(slave_lag_heartbeat_frequency_us),
> + MYSQL_SYSVAR(group_commit),
> NULL,
> };
>
> +static inline ulong get_slave_lag_wait_timeout(THD* thd)
> +{
> + return THDVAR(thd, slave_lag_wait_timeout);
> +}
>
> static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
> SYS_VAR *var,
> @@ -297,6 +377,7 @@ Trans_observer trans_observer = {
>
> repl_semi_report_commit, // after_commit
> repl_semi_report_rollback, // after_rollback
> + repl_semi_before_commit, // before commit
> };
>
> Binlog_storage_observer storage_observer = {
> @@ -339,7 +420,11 @@ DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
> DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
> DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
> DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
> -
> +DEF_SHOW_FUNC(slave_lag_wait_sessions, SHOW_LONG)
> +DEF_SHOW_FUNC(estimated_slave_lag, SHOW_LONGLONG)
> +DEF_SHOW_FUNC(trx_slave_lag_wait_time, SHOW_LONGLONG)
> +DEF_SHOW_FUNC(trx_slave_lag_wait_num, SHOW_LONGLONG)
> +DEF_SHOW_FUNC(avg_trx_slave_lag_wait_time, SHOW_LONG)
>
> /* plugin status variables */
> static SHOW_VAR semi_sync_master_status_vars[]= {
> @@ -385,32 +470,55 @@ static SHOW_VAR semi_sync_master_status_vars[]= {
> {"Rpl_semi_sync_master_net_avg_wait_time",
> (char*) &SHOW_FNAME(avg_net_wait_time),
> SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_slave_lag_wait_sessions",
> + (char*) &SHOW_FNAME(slave_lag_wait_sessions),
> + SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_estimated_slave_lag",
> + (char*) &SHOW_FNAME(estimated_slave_lag),
> + SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_tx_slave_lag_wait_time",
> + (char*) &SHOW_FNAME(trx_slave_lag_wait_time),
> + SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_tx_slave_lag_waits",
> + (char*) &SHOW_FNAME(trx_slave_lag_wait_num),
> + SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_tx_avg_slave_lag_wait_time",
> + (char*) &SHOW_FNAME(avg_trx_slave_lag_wait_time),
> + SHOW_SIMPLE_FUNC},
> {NULL, NULL, SHOW_LONG},
> };
>
> #ifdef HAVE_PSI_INTERFACE
> PSI_mutex_key key_ss_mutex_LOCK_binlog_;
> +PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
>
> static PSI_mutex_info all_semisync_mutexes[]=
> {
> - { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0}
> + { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0 },
> + { &key_ss_mutex_LOCK_slave_lag_, "LOCK_slave_lag_", 0 }
> };
>
> PSI_cond_key key_ss_cond_COND_binlog_send_;
> +PSI_cond_key key_ss_cond_COND_slave_lag_;
>
> static PSI_cond_info all_semisync_conds[]=
> {
> - { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0}
> + { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0 },
> + { &key_ss_cond_COND_slave_lag_, "COND_slave_lag_", 0 }
> };
> #endif /* HAVE_PSI_INTERFACE */
>
> PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
> { 0, "Waiting for semi-sync ACK from slave", 0};
>
> +PSI_stage_info stage_waiting_for_semi_sync_slave_lag=
> +{ 0, "Waiting for semi-sync slave lag", 0};
> +
> #ifdef HAVE_PSI_INTERFACE
> PSI_stage_info *all_semisync_stages[]=
> {
> - & stage_waiting_for_semi_sync_ack_from_slave
> + & stage_waiting_for_semi_sync_ack_from_slave,
> + & stage_waiting_for_semi_sync_slave_lag
> };
>
> static void init_semisync_psi_keys(void)
> @@ -492,4 +600,3 @@ maria_declare_plugin(semisync_master)
> MariaDB_PLUGIN_MATURITY_GAMMA
> }
> maria_declare_plugin_end;
> -
> diff --git a/plugin/semisync/semisync_slave.cc b/plugin/semisync/semisync_slave.cc
> index 5f98472..839e0cc 100644
> --- a/plugin/semisync/semisync_slave.cc
> +++ b/plugin/semisync/semisync_slave.cc
> @@ -20,6 +20,7 @@
> char rpl_semi_sync_slave_enabled;
> char rpl_semi_sync_slave_status= 0;
> unsigned long rpl_semi_sync_slave_trace_level;
> +char rpl_semi_sync_slave_lag_enabled= 0;
>
> int ReplSemiSyncSlave::initObject()
> {
> @@ -42,7 +43,7 @@ int ReplSemiSyncSlave::initObject()
>
> int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
> unsigned long total_len,
> - bool *need_reply,
> + unsigned char *need_reply,
> const char **payload,
> unsigned long *payload_len)
> {
> @@ -52,7 +53,7 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
>
> if ((unsigned char)(header[0]) == kPacketMagicNum)
> {
> - *need_reply = (header[1] & kPacketFlagSync);
> + *need_reply = (header[1] & (kPacketFlagSync | kPacketFlagSyncAndReport));
> *payload_len = total_len - 2;
> *payload = header + 2;
>
> @@ -95,16 +96,20 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
> return 0;
> }
>
> -int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
> - const char *binlog_filename,
> - my_off_t binlog_filepos)
> +int ReplSemiSyncSlave::slaveReply(unsigned char header_byte,
> + MYSQL *mysql,
> + const char *binlog_filename,
> + my_off_t binlog_filepos,
> + Master_info * mi)
> {
> const char *kWho = "ReplSemiSyncSlave::slaveReply";
> NET *net= &mysql->net;
> - uchar reply_buffer[REPLY_MAGIC_NUM_LEN
> - + REPLY_BINLOG_POS_LEN
> - + REPLY_BINLOG_NAME_LEN];
> + uchar reply_buffer[REPLY_MAGIC_NUM_LEN +
> + 2 * ( REPLY_BINLOG_POS_LEN +
> + REPLY_BINLOG_NAME_LEN +
> + /* '\0' */ 1) ];
> int reply_res, name_len = strlen(binlog_filename);
> + int msg_len = name_len + REPLY_BINLOG_NAME_OFFSET;
>
> function_enter(kWho);
>
> @@ -119,10 +124,29 @@ int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
> sql_print_information("%s: reply (%s, %lu)", kWho,
> binlog_filename, (ulong)binlog_filepos);
>
> + if (header_byte & kPacketFlagSyncAndReport)
> + {
> + /**
> + * master requests that we also report back SQL-thread position
> + */
> +
> + // where to store sql filename/position
> + char *bufptr = (char*)reply_buffer + msg_len;
> + bufptr[0] = 0; // '\0' terminate previous filename
> + bufptr++;
> +
> + my_off_t sql_file_pos;
> + // get file/position and store the filename directly into bufptr+8
> + size_t name_len2 = get_master_log_pos(mi, bufptr + 8, &sql_file_pos);
> + int8store(bufptr, sql_file_pos); // store position
> +
> + msg_len += /* '\0' */ 1 + /* position */ 8 + name_len2 + /* '\0' */ 1;
> + }
> +
> net_clear(net, 0);
> /* Send the reply. */
> - reply_res = my_net_write(net, reply_buffer,
> - name_len + REPLY_BINLOG_NAME_OFFSET);
> + reply_res = my_net_write(net, reply_buffer, msg_len);
> +
> if (!reply_res)
> {
> reply_res = net_flush(net);
> diff --git a/plugin/semisync/semisync_slave.h b/plugin/semisync/semisync_slave.h
> index 1bf8cf3..c91847d 100644
> --- a/plugin/semisync/semisync_slave.h
> +++ b/plugin/semisync/semisync_slave.h
> @@ -60,23 +60,30 @@ class ReplSemiSyncSlave
> * Return:
> * 0: success; non-zero: error
> */
> - int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
> + int slaveReadSyncHeader(const char *header, unsigned long total_len,
> + unsigned char *need_reply_byte,
> const char **payload, unsigned long *payload_len);
>
> /* A slave replies to the master indicating its replication process. It
> * indicates that the slave has received all events before the specified
> * binlog position.
> - *
> + *
> * Input:
> + * need_reply_byte - (IN) the header byte
> * mysql - (IN) the mysql network connection
> * binlog_filename - (IN) the reply point's binlog file name
> * binlog_filepos - (IN) the reply point's binlog file offset
> + * master_info - (IN) the master info struct so that we can get more
> + * info if needed
> *
> * Return:
> * 0: success; non-zero: error
> */
> - int slaveReply(MYSQL *mysql, const char *binlog_filename,
> - my_off_t binlog_filepos);
> + int slaveReply(unsigned char need_reply_byte,
> + MYSQL *mysql,
> + const char *binlog_filename,
> + my_off_t binlog_filepos,
> + Master_info* master_info);
>
> int slaveStart(Binlog_relay_IO_param *param);
> int slaveStop(Binlog_relay_IO_param *param);
> @@ -93,5 +100,6 @@ class ReplSemiSyncSlave
> extern char rpl_semi_sync_slave_enabled;
> extern unsigned long rpl_semi_sync_slave_trace_level;
> extern char rpl_semi_sync_slave_status;
> +extern char rpl_semi_sync_slave_lag_enabled;
>
> #endif /* SEMISYNC_SLAVE_H */
> diff --git a/plugin/semisync/semisync_slave_plugin.cc b/plugin/semisync/semisync_slave_plugin.cc
> index 572ead2..0bf03be 100644
> --- a/plugin/semisync/semisync_slave_plugin.cc
> +++ b/plugin/semisync/semisync_slave_plugin.cc
> @@ -28,7 +28,7 @@ static ReplSemiSyncSlave repl_semisync;
> event read is the last event of a transaction. And the value is
> checked in repl_semi_slave_queue_event.
> */
> -bool semi_sync_need_reply= false;
> +unsigned char semi_sync_need_reply= 0;
>
> C_MODE_START
>
> @@ -81,6 +81,23 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
> return 1;
> }
> mysql_free_result(mysql_store_result(mysql));
> +
> + if (rpl_semi_sync_slave_lag_enabled)
> + {
> + char buf[100];
> + /*
> + Tell master that we can do exec-position reporting
> + */
> + snprintf(buf, sizeof(buf), "SET @%s= 1",
> + ReplSemiSyncBase::kRplSemiSyncSlaveReportExec);
> + if (mysql_real_query(mysql, buf, strlen(buf)))
> + {
> + sql_print_error("query: %s on master failed", buf);
> + return 1;
> + }
> + mysql_free_result(mysql_store_result(mysql));
> + }
> +
> rpl_semi_sync_slave_status= 1;
> return 0;
> }
> @@ -110,9 +127,11 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
> should not cause the slave IO thread to stop, and the error
> messages are already reported.
> */
> - (void) repl_semisync.slaveReply(param->mysql,
> + (void) repl_semisync.slaveReply(semi_sync_need_reply,
> + param->mysql,
> param->master_log_name,
> - param->master_log_pos);
> + param->master_log_pos,
> + param->mi);
> }
> return 0;
> }
> @@ -164,9 +183,17 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level,
> &fix_rpl_semi_sync_trace_level, // update
> 32, 0, ~0UL, 1);
>
> +static MYSQL_SYSVAR_BOOL(lag_enabled, rpl_semi_sync_slave_lag_enabled,
> + PLUGIN_VAR_OPCMDARG,
> + "Enable semi-synchronous replication slave lag reporting. ",
> + NULL, // check
> + NULL, // update
> + 0);
> +
> static SYS_VAR* semi_sync_slave_system_vars[]= {
> MYSQL_SYSVAR(enabled),
> MYSQL_SYSVAR(trace_level),
> + MYSQL_SYSVAR(lag_enabled),
> NULL,
> };
>
> @@ -230,4 +257,3 @@ maria_declare_plugin(semisync_slave)
> MariaDB_PLUGIN_MATURITY_GAMMA
> }
> maria_declare_plugin_end;
> -
> diff --git a/sql/handler.cc b/sql/handler.cc
> index 3ca9ec3..3e6cd65 100644
> --- a/sql/handler.cc
> +++ b/sql/handler.cc
> @@ -1364,12 +1364,30 @@ int ha_commit_trans(THD *thd, bool all)
> uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all);
> /* rw_trans is TRUE when we in a transaction changing data */
> bool rw_trans= is_real_trans && (rw_ha_count > 0);
> + bool mdl_request_initialized= false;
> MDL_request mdl_request;
> DBUG_PRINT("info", ("is_real_trans: %d rw_trans: %d rw_ha_count: %d",
> is_real_trans, rw_trans, rw_ha_count));
>
> if (rw_trans)
> {
> + /* check READ-ONLY just before before_commit hook to decrease likelihood
> + * of having threads hanging waiting for slave-lag only to be aborted
> + * due to read-only.
> + */
> + if (opt_readonly &&
> + !(thd->security_ctx->master_access & SUPER_ACL) &&
> + !thd->slave_thread)
> + {
> + my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
> + goto err;
> + }
> +
> + if (RUN_HOOK(transaction, before_commit, (thd)))
> + {
> + goto err;
> + }
> +
> /*
> Acquire a metadata lock which will ensure that COMMIT is blocked
> by an active FLUSH TABLES WITH READ LOCK (and vice versa:
> @@ -1378,6 +1396,7 @@ int ha_commit_trans(THD *thd, bool all)
> We allow the owner of FTWRL to COMMIT; we assume that it knows
> what it does.
> */
> + mdl_request_initialized= true;
> mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
> MDL_EXPLICIT);
>
> @@ -1486,7 +1505,7 @@ int ha_commit_trans(THD *thd, bool all)
> ha_rollback_trans(thd, all);
>
> end:
> - if (rw_trans && mdl_request.ticket)
> + if (rw_trans && mdl_request_initialized && mdl_request.ticket)
> {
> /*
> We do not always immediately release transactional locks
> diff --git a/sql/replication.h b/sql/replication.h
> index 4731c22..309bdb4 100644
> --- a/sql/replication.h
> +++ b/sql/replication.h
> @@ -110,6 +110,21 @@ typedef struct Trans_observer {
> @retval 1 Failure
> */
> int (*after_rollback)(Trans_param *param);
> +
> + /**
> + This callback is called before a transaction commits.
> + Unless the function sets *error to 0, the transaction will not be
> + committed and the error code will be returned to the client.
> +
> + @note A plugin shall set *error != 0 and return 0 to signal
> + that the transaction should be aborted.
> + If the function returns non-zero, the transaction is also aborted
> + and an error is printed to the error log.
> +
> + @retval 0 Success
> + @retval non-zero error
> + */
> + int (*before_commit)(Trans_param *param, int *error);
> } Trans_observer;
>
> /**
> @@ -294,6 +309,8 @@ enum Binlog_relay_IO_flags {
> };
>
>
> +class Master_info;
> +
> /**
> Replication binlog relay IO observer parameter
> */
> @@ -309,8 +326,20 @@ typedef struct Binlog_relay_IO_param {
> my_off_t master_log_pos;
>
> MYSQL *mysql; /* the connection to master */
> +
> + Master_info * mi; /* master info handle */
> } Binlog_relay_IO_param;
>
> +
> +/* get the master log given a Master_info
> + * and store it in filename_buf/filepos
> + * return length of filename (excluding \0)
> + *
> + * note: filename_buf should be a minimum FN_REFLEN
> + */
> +size_t get_master_log_pos(const Master_info *mi,
> + char *filename_buf, my_off_t *filepos);
> +
> /**
> Observes and extends the service of slave IO thread.
> */
> @@ -561,7 +590,21 @@ int get_user_var_str(const char *name,
> char *value, unsigned long len,
> unsigned int precision, int *null_value);
>
> -
> +
> +/**
> + Set or replace the value of user variable as to an ulonglong
> +
> + @param name user variable name
> + @param value the value
> + @param old_value pointer to where old value will be stored (or NULL)
> +
> + @retval 0 Success, no prior value found
> + @retval 1 Success, old_value populated
> + @retval -1 Fail
> +*/
> +int set_user_var_int(const char *name,
> + long long int value,
> + long long int *old_value);
>
> #ifdef __cplusplus
> }
> diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
> index 3962600..209a809 100644
> --- a/sql/rpl_handler.cc
> +++ b/sql/rpl_handler.cc
> @@ -23,6 +23,7 @@
> #include "rpl_filter.h"
> #include <my_dir.h>
> #include "rpl_handler.h"
> +#include "sql_prepare.h"
>
> Trans_delegate *transaction_delegate;
> Binlog_storage_delegate *binlog_storage_delegate;
> @@ -88,6 +89,42 @@ int get_user_var_str(const char *name, char *value,
> return 0;
> }
>
> +int set_user_var_int(const char *name,
> + long long int value,
> + long long int *old_value)
> +{
> + THD* thd= current_thd;
> + bool null_val;
> + user_var_entry *entry=
> + (user_var_entry*) my_hash_search(&thd->user_vars,
> + (uchar*) name, strlen(name));
> + if (entry != NULL)
> + {
> + if (old_value != NULL)
> + *old_value= entry->val_int(&null_val);
> + }
> +
> + Ed_connection con(thd);
> +
> + char buf[256];
> + int res= snprintf(buf, sizeof(buf), "SET @%s=%lld", name, value);
> + if (/* error */ res < 0 ||
> + /* truncated */ res >= sizeof(buf))
> + {
> + return -1;
> + }
> +
> + LEX_STRING str;
> + lex_string_set(&str, buf);
> +
> + if (con.execute_direct(str))
> + {
> + return -1;
> + }
> +
> + return entry == NULL ? 0 : 1;
> +}
> +
> int delegates_init()
> {
> static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
> @@ -249,6 +286,17 @@ int Trans_delegate::after_rollback(THD *thd, bool all)
> return ret;
> }
>
> +int Trans_delegate::before_commit(THD *thd)
> +{
> + int ret= 0, error= 0;
> + Trans_param param;
> + param.flags= 0;
> + param.log_file= 0;
> + param.log_pos= 0;
> + FOREACH_OBSERVER(ret, before_commit, thd, (&param, &error));
> + return error;
> +}
> +
> int Binlog_storage_delegate::after_flush(THD *thd,
> const char *log_file,
> my_off_t log_pos,
> @@ -374,17 +422,19 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
>
> int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
> String *packet,
> - const char *log_file,
> + const char *log_file_path,
> my_off_t log_pos)
> {
> Binlog_transmit_param param;
> param.flags= flags;
>
> int ret= 0;
> + const char* log_file_name= log_file_path != NULL ?
> + log_file_path + dirname_length(log_file_path) : NULL;
> FOREACH_OBSERVER(ret, before_send_event, false,
> (&param, (uchar *)packet->c_ptr(),
> packet->length(),
> - log_file+dirname_length(log_file), log_pos));
> + log_file_name, log_pos));
> return ret;
> }
>
> @@ -414,6 +464,7 @@ int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
> void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
> Master_info *mi)
> {
> + param->mi = mi;
> param->mysql= mi->mysql;
> param->user= mi->user;
> param->host= mi->host;
> @@ -540,6 +591,20 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
> {
> return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
> }
> +
> +/* get master log pos for a Master_info struct */
> +size_t get_master_log_pos(const Master_info* mi,
> + char *filename_buf, my_off_t *filepos)
> +{
> + mysql_mutex_t *mutex= &mi->rli.data_lock;
> +
> + mysql_mutex_lock(mutex);
> + *filepos= mi->rli.group_master_log_pos;
> + strncpy(filename_buf, mi->rli.group_master_log_name, FN_REFLEN);
> + mysql_mutex_unlock(mutex);
> + return strnlen(filename_buf, FN_REFLEN);
> +}
> +
> #else
> int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
> {
> @@ -560,4 +625,13 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
> {
> return 0;
> }
> +
> +size_t get_master_log_pos(const Master_info* mi,
> + char *filename_buf, my_off_t *filepos)
> +{
> + *filepos= 0;
> + filename_buf[0]= 0;
> + return 0;
> +}
> +
> #endif /* HAVE_REPLICATION */
> diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h
> index afcfd9d..5119ee4 100644
> --- a/sql/rpl_handler.h
> +++ b/sql/rpl_handler.h
> @@ -142,7 +142,7 @@ class Trans_delegate
> :public Delegate {
> public:
> typedef Trans_observer Observer;
> - int before_commit(THD *thd, bool all);
> + int before_commit(THD *thd);
> int before_rollback(THD *thd, bool all);
> int after_commit(THD *thd, bool all);
> int after_rollback(THD *thd, bool all);
> diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
> index 56300c6..d60a122 100644
> --- a/sql/rpl_rli.h
> +++ b/sql/rpl_rli.h
> @@ -154,7 +154,9 @@ class Relay_log_info : public Slave_reporting_capability
> standard lock acquisition order to avoid deadlocks:
> run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
> */
> - mysql_mutex_t data_lock, run_lock;
> + mutable mysql_mutex_t data_lock;
> + mysql_mutex_t run_lock;
> +
> /*
> start_cond is broadcast when SQL thread is started
> stop_cond - when stopped
> diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
> index d9ae6ca..9662058 100644
> --- a/sql/sql_repl.cc
> +++ b/sql/sql_repl.cc
> @@ -818,6 +818,13 @@ static int send_heartbeat_event(binlog_send_info *info,
> packet->append(b, sizeof(b));
> }
>
> + if (RUN_HOOK(binlog_transmit, before_send_event,
> + (info->thd, info->flags, packet, 0, 0)))
> + {
> + info->error= ER_UNKNOWN_ERROR;
> + DBUG_RETURN(-1);
> + }
> +
> if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
> net_flush(net))
> {
> @@ -825,6 +832,13 @@ static int send_heartbeat_event(binlog_send_info *info,
> DBUG_RETURN(-1);
> }
>
> + if (RUN_HOOK(binlog_transmit, after_send_event,
> + (info->thd, info->flags, packet)))
> + {
> + info->error= ER_UNKNOWN_ERROR;
> + DBUG_RETURN(-1);
> + }
> +
> DBUG_RETURN(0);
> }
>