revision-id: cf0be6420cfe107230aaf6025818638c969960cc (mariadb-10.4.3-37-gcf0be6420cf)
parent(s): 3897734cb0b080585798dfbab031f8ef1eaa6ce9
author: Oleksandr Byelkin
committer: Oleksandr Byelkin
timestamp: 2019-03-06 17:49:57 +0100
message:
there are such tests
---
mysql-test/main/subselect_innodb.result | 35 ------------------------------
mysql-test/main/subselect_innodb.test | 38 ---------------------------------
2 files changed, 73 deletions(-)
diff --git a/mysql-test/main/subselect_innodb.result b/mysql-test/main/subselect_innodb.result
index 64e67c1dfc1..0eb40c9be00 100644
--- a/mysql-test/main/subselect_innodb.result
+++ b/mysql-test/main/subselect_innodb.result
@@ -616,38 +616,3 @@ id select_type table type possible_keys key key_len ref rows filtered Extra
Warnings:
Note 1003 select `test`.`t1`.`f1` AS `f1`,`test`.`t2`.`f2` AS `f2`,`test`.`t3`.`f3` AS `f3` from `test`.`t1` join `test`.`t2` semi join (`test`.`t4`) join `test`.`t3` where `test`.`t4`.`f4` = 1 and `test`.`t1`.`f1` >= `test`.`t2`.`f2`
DROP TABLE t1,t2,t3,t4;
-#
-# MDEV-18339: ASAN heap-buffer-overflow in
-# Item_exists_subselect::is_top_level_item
-#
-CREATE TABLE t1 ( pk int PRIMARY KEY , iiiiiiiiiiiii int , col_int1111 int, col_date_nokey date , col_time_key time, col_time_nokey time , col_datetime_key time, col_datetime_nokey time , ccccccccccccccc varchar(1), vvvvvvvvvvvvvvvvv varchar(1)) engine=innodb;
-CREATE TABLE t2 ( iiiiiiiiiiiii int , vvvvvvvvvvvvvvvvv varchar(1)) engine=innodb;
-CREATE TABLE t3 ( pk int PRIMARY KEY) engine=innodb;
-CREATE TABLE t4 ( iiiiiiiiiiiii int , vvvvvvvvvvvvvvvvv varchar(1)) engine=innodb;
-select * from
-(select distinct
-(select count(t111111111.`ccccccccccccccc`) from t1 as t111111111
-where (exists(select distinct t22222222222.`iiiiiiiiiiiii` from t2 as t22222222222 where t22222222222.`vvvvvvvvvvvvvvvvv` < t111111111.`vvvvvvvvvvvvvvvvv`)
-or t111111111.`ccccccccccccccc` != t111111111.`vvvvvvvvvvvvvvvvv`)
-) as field1
-from
-(select t1_______2.*
-from (t1 as t1_______1 join t1 as t1_______2
-on (t1_______2.`vvvvvvvvvvvvvvvvv` = t1_______1.`ccccccccccccccc`
- and t1_______1.`iiiiiiiiiiiii` !=
-(select sum(t44444444444.`iiiiiiiiiiiii`)
-from (t4 as t44444444444 join t3 as t33333333333
-on (t33333333333.`pk` = t44444444444.`iiiiiiiiiiiii`))
-where t44444444444.`vvvvvvvvvvvvvvvvv` > 'x')
-)
-)
-) as alias1
-straight_join
-t2 as alias2
-on (alias2.`iiiiiiiiiiiii` = alias1.`iiiiiiiiiiiii`)
-where ((select 9 from dual) is null)
-and alias1.`pk` in (32, 129, 87, 51, 58, 152, 241, 37, 55, 237, 166)
-group by field1 /* 111
-111111111 */ ) as derived_aaaaa /* comment11111111111111111111111111 */;
-field1
-# End of 10.4 tests
diff --git a/mysql-test/main/subselect_innodb.test b/mysql-test/main/subselect_innodb.test
index 90d3b07c1ad..544bcd994ed 100644
--- a/mysql-test/main/subselect_innodb.test
+++ b/mysql-test/main/subselect_innodb.test
@@ -611,41 +611,3 @@ FROM t1
DROP TABLE t1,t2,t3,t4;
---echo #
---echo # MDEV-18339: ASAN heap-buffer-overflow in
---echo # Item_exists_subselect::is_top_level_item
---echo #
-
-CREATE TABLE t1 ( pk int PRIMARY KEY , iiiiiiiiiiiii int , col_int1111 int, col_date_nokey date , col_time_key time, col_time_nokey time , col_datetime_key time, col_datetime_nokey time , ccccccccccccccc varchar(1), vvvvvvvvvvvvvvvvv varchar(1)) engine=innodb;
-
-CREATE TABLE t2 ( iiiiiiiiiiiii int , vvvvvvvvvvvvvvvvv varchar(1)) engine=innodb;
-CREATE TABLE t3 ( pk int PRIMARY KEY) engine=innodb;
-CREATE TABLE t4 ( iiiiiiiiiiiii int , vvvvvvvvvvvvvvvvv varchar(1)) engine=innodb;
-
-select * from
-(select distinct
- (select count(t111111111.`ccccccccccccccc`) from t1 as t111111111
- where (exists(select distinct t22222222222.`iiiiiiiiiiiii` from t2 as t22222222222 where t22222222222.`vvvvvvvvvvvvvvvvv` < t111111111.`vvvvvvvvvvvvvvvvv`)
- or t111111111.`ccccccccccccccc` != t111111111.`vvvvvvvvvvvvvvvvv`)
- ) as field1
-from
- (select t1_______2.*
- from (t1 as t1_______1 join t1 as t1_______2
- on (t1_______2.`vvvvvvvvvvvvvvvvv` = t1_______1.`ccccccccccccccc`
- and t1_______1.`iiiiiiiiiiiii` !=
- (select sum(t44444444444.`iiiiiiiiiiiii`)
- from (t4 as t44444444444 join t3 as t33333333333
- on (t33333333333.`pk` = t44444444444.`iiiiiiiiiiiii`))
- where t44444444444.`vvvvvvvvvvvvvvvvv` > 'x')
- )
- )
- ) as alias1
-straight_join
- t2 as alias2
-on (alias2.`iiiiiiiiiiiii` = alias1.`iiiiiiiiiiiii`)
-where ((select 9 from dual) is null)
-and alias1.`pk` in (32, 129, 87, 51, 58, 152, 241, 37, 55, 237, 166)
-group by field1 /* 111
-111111111 */ ) as derived_aaaaa /* comment11111111111111111111111111 */;
-
---echo # End of 10.4 tests
revision-id: 79d0e2ac64fecc77a13ff2715ba870ac7453daf1 (mariadb-10.1.38-23-g79d0e2ac64f)
parent(s): 70584750cb4f36a956f00a0131e82922795c3516
author: Jan Lindström
committer: Jan Lindström
timestamp: 2019-03-06 16:41:40 +0200
message:
Galera 3 versions of the result files recorded.
---
.../r/galera_ipv6_mariabackup_section.result | 18 ++++++++++++++++++
.../r/galera_ipv6_rsync_section.result | 5 -----
.../t/galera_ipv6_mariabackup_section.test | 21 +++++++++++----------
3 files changed, 29 insertions(+), 15 deletions(-)
diff --git a/mysql-test/suite/galera_3nodes/r/galera_ipv6_mariabackup_section.result b/mysql-test/suite/galera_3nodes/r/galera_ipv6_mariabackup_section.result
new file mode 100644
index 00000000000..53e35939a79
--- /dev/null
+++ b/mysql-test/suite/galera_3nodes/r/galera_ipv6_mariabackup_section.result
@@ -0,0 +1,18 @@
+SELECT VARIABLE_VALUE LIKE '%[::1]%' FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_incoming_addresses';
+VARIABLE_VALUE LIKE '%[::1]%'
+1
+SELECT VARIABLE_VALUE = 3 FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_cluster_size';
+VARIABLE_VALUE = 3
+1
+SET GLOBAL wsrep_provider_options='gmcast.isolate=1';
+CREATE TABLE t1 (f1 INTEGER) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1);
+SET GLOBAL wsrep_provider_options='gmcast.isolate=0';
+SELECT COUNT(*) = 1 FROM t1;
+COUNT(*) = 1
+1
+DROP TABLE t1;
+include/assert_grep.inc [Streaming the backup to joiner at \[::1\]]
+include/assert_grep.inc [async IST sender starting to serve tcp://\[::1\]:]
+include/assert_grep.inc [IST receiver addr using tcp://\[::1\]]
+include/assert_grep.inc [Prepared IST receiver, listening at: tcp://\[::1\]]
diff --git a/mysql-test/suite/galera_3nodes/r/galera_ipv6_rsync_section.result b/mysql-test/suite/galera_3nodes/r/galera_ipv6_rsync_section.result
index 3f810d3eb97..a2bf5f4d98c 100644
--- a/mysql-test/suite/galera_3nodes/r/galera_ipv6_rsync_section.result
+++ b/mysql-test/suite/galera_3nodes/r/galera_ipv6_rsync_section.result
@@ -1,17 +1,12 @@
-connection node_2;
-connection node_1;
SELECT VARIABLE_VALUE LIKE '%[::1]%' FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_incoming_addresses';
VARIABLE_VALUE LIKE '%[::1]%'
1
SELECT VARIABLE_VALUE = 3 FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_cluster_size';
VARIABLE_VALUE = 3
1
-connection node_2;
SET GLOBAL wsrep_provider_options='gmcast.isolate=1';
-connection node_1;
CREATE TABLE t1 (f1 INTEGER) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
-connection node_2;
SET GLOBAL wsrep_provider_options='gmcast.isolate=0';
SELECT COUNT(*) = 1 FROM t1;
COUNT(*) = 1
diff --git a/mysql-test/suite/galera_3nodes/t/galera_ipv6_mariabackup_section.test b/mysql-test/suite/galera_3nodes/t/galera_ipv6_mariabackup_section.test
index 71d17c133a5..95cd1a5bea5 100644
--- a/mysql-test/suite/galera_3nodes/t/galera_ipv6_mariabackup_section.test
+++ b/mysql-test/suite/galera_3nodes/t/galera_ipv6_mariabackup_section.test
@@ -39,30 +39,31 @@ DROP TABLE t1;
--let $assert_only_after = CURRENT_TEST
# The SSTs happen when nodes are started first time
---let $assert_count = 2
+--let $assert_count= 2
--let $assert_text = Streaming the backup to joiner at \[::1\]
--let $assert_select = Streaming the backup to joiner at \[::1\]
--source include/assert_grep.inc
-# There will be 3 ISTs donated from node_1 in Galera 4.
+# There will be 1 IST donated from node_1 in Galera 3.
# Two first happen at the initial startup to populate the certification
# index. The third one is from the IST which happens during the actual test.
---let $assert_count = 3
+--let $assert_count= 1
--let $assert_text = async IST sender starting to serve tcp://\[::1\]:
--let $assert_select = async IST sender starting to serve tcp://\[::1\]:
--source include/assert_grep.inc
+--connection node_2
--let $assert_file = $MYSQLTEST_VARDIR/log/mysqld.2.err
+--let $assert_only_after = CURRENT_TEST
-# There are two ISTs on joiner, the first at the initial startup, the second
-# during the actual test.
---let $assert_count = 2
+# There is one IST on the joiner in Galera 3.
+--let $assert_count= 1
--let $assert_text = IST receiver addr using tcp://\[::1\]
--let $assert_select = IST receiver addr using tcp://\[::1\]
--source include/assert_grep.inc
-# The receiver expects seqnos 3-6 only once.
---let $assert_count = 1
---let $assert_text = Prepared IST receiver for 3-6, listening at: tcp://\[::1\]
---let $assert_select = Prepared IST receiver for 3-6, listening at: tcp://\[::1\]
+# There will be only one Prepared IST, and in Galera 3 seqnos are not printed
+--let $assert_count= 1
+--let $assert_text = Prepared IST receiver, listening at: tcp://\[::1\]
+--let $assert_select = Prepared IST receiver, listening at: tcp://\[::1\]
--source include/assert_grep.inc

[Commits] 3897734cb0b: MDEV-18339: ASAN heap-buffer-overflow in Item_exists_subselect::is_top_level_item
by Oleksandr Byelkin 06 Mar '19
revision-id: 3897734cb0b080585798dfbab031f8ef1eaa6ce9 (mariadb-10.4.3-36-g3897734cb0b)
parent(s): 2a791c53ad93c8bc1441dd227000234bd49c4990
author: Oleksandr Byelkin
committer: Oleksandr Byelkin
timestamp: 2019-03-06 15:31:50 +0100
message:
MDEV-18339: ASAN heap-buffer-overflow in Item_exists_subselect::is_top_level_item
The right argument of Item_in_optimizer cannot be cast to Item_in_subselect in invisible mode.
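
The mechanism behind the overflow can be modelled outside the server: in invisible mode args[1] of Item_in_optimizer does not point to an Item_in_subselect, so the unconditional downcast in is_top_level_item() reads fields past the end of the object that was actually allocated. Below is a minimal standalone C++ sketch of that pattern; ItemBase, InSubselectItem, PlainItem and InOptimizer are hypothetical stand-ins, not the server's real Item classes.

// Simplified model of the MDEV-18339 pattern; the class names are
// illustrative stand-ins, not the real Item hierarchy from sql/item*.h.
#include <iostream>

struct ItemBase {
  virtual ~ItemBase() = default;
};

// Stand-in for Item_in_subselect: carries the extra state the optimizer wants.
struct InSubselectItem : ItemBase {
  bool top_level = true;
  bool is_top_level_item() const { return top_level; }
};

// Stand-in for whatever args[1] really points to in "invisible" mode.
struct PlainItem : ItemBase {};

struct InOptimizer {
  bool invisible = false;              // models Item_in_optimizer::invisible_mode()
  ItemBase* args[2] = {nullptr, nullptr};

  bool is_top_level_item_unsafe() const {
    // Pre-fix behaviour: unconditional downcast. When args[1] is a PlainItem
    // this reads memory that was never allocated for it, which is the
    // heap-buffer-overflow that ASAN reports.
    return static_cast<InSubselectItem*>(args[1])->is_top_level_item();
  }

  bool is_top_level_item_fixed() const {
    // Post-fix behaviour: bail out before casting when the cast is invalid.
    if (invisible)
      return false;
    return static_cast<InSubselectItem*>(args[1])->is_top_level_item();
  }
};

int main() {
  PlainItem plain;
  InOptimizer opt;
  opt.invisible = true;
  opt.args[1] = &plain;
  // Calling opt.is_top_level_item_unsafe() here would be undefined behaviour;
  // the guarded version is safe and simply answers "not top level".
  std::cout << std::boolalpha << opt.is_top_level_item_fixed() << "\n";
  return 0;
}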
---
mysql-test/main/subselect_innodb.result | 35 ++++++++++++++++++++++++++++++
mysql-test/main/subselect_innodb.test | 38 +++++++++++++++++++++++++++++++++
sql/item_cmpfunc.cc | 8 +++----
3 files changed, 77 insertions(+), 4 deletions(-)
diff --git a/mysql-test/main/subselect_innodb.result b/mysql-test/main/subselect_innodb.result
index 0eb40c9be00..64e67c1dfc1 100644
--- a/mysql-test/main/subselect_innodb.result
+++ b/mysql-test/main/subselect_innodb.result
@@ -616,3 +616,38 @@ id select_type table type possible_keys key key_len ref rows filtered Extra
Warnings:
Note 1003 select `test`.`t1`.`f1` AS `f1`,`test`.`t2`.`f2` AS `f2`,`test`.`t3`.`f3` AS `f3` from `test`.`t1` join `test`.`t2` semi join (`test`.`t4`) join `test`.`t3` where `test`.`t4`.`f4` = 1 and `test`.`t1`.`f1` >= `test`.`t2`.`f2`
DROP TABLE t1,t2,t3,t4;
+#
+# MDEV-18339: ASAN heap-buffer-overflow in
+# Item_exists_subselect::is_top_level_item
+#
+CREATE TABLE t1 ( pk int PRIMARY KEY , iiiiiiiiiiiii int , col_int1111 int, col_date_nokey date , col_time_key time, col_time_nokey time , col_datetime_key time, col_datetime_nokey time , ccccccccccccccc varchar(1), vvvvvvvvvvvvvvvvv varchar(1)) engine=innodb;
+CREATE TABLE t2 ( iiiiiiiiiiiii int , vvvvvvvvvvvvvvvvv varchar(1)) engine=innodb;
+CREATE TABLE t3 ( pk int PRIMARY KEY) engine=innodb;
+CREATE TABLE t4 ( iiiiiiiiiiiii int , vvvvvvvvvvvvvvvvv varchar(1)) engine=innodb;
+select * from
+(select distinct
+(select count(t111111111.`ccccccccccccccc`) from t1 as t111111111
+where (exists(select distinct t22222222222.`iiiiiiiiiiiii` from t2 as t22222222222 where t22222222222.`vvvvvvvvvvvvvvvvv` < t111111111.`vvvvvvvvvvvvvvvvv`)
+or t111111111.`ccccccccccccccc` != t111111111.`vvvvvvvvvvvvvvvvv`)
+) as field1
+from
+(select t1_______2.*
+from (t1 as t1_______1 join t1 as t1_______2
+on (t1_______2.`vvvvvvvvvvvvvvvvv` = t1_______1.`ccccccccccccccc`
+ and t1_______1.`iiiiiiiiiiiii` !=
+(select sum(t44444444444.`iiiiiiiiiiiii`)
+from (t4 as t44444444444 join t3 as t33333333333
+on (t33333333333.`pk` = t44444444444.`iiiiiiiiiiiii`))
+where t44444444444.`vvvvvvvvvvvvvvvvv` > 'x')
+)
+)
+) as alias1
+straight_join
+t2 as alias2
+on (alias2.`iiiiiiiiiiiii` = alias1.`iiiiiiiiiiiii`)
+where ((select 9 from dual) is null)
+and alias1.`pk` in (32, 129, 87, 51, 58, 152, 241, 37, 55, 237, 166)
+group by field1 /* 111
+111111111 */ ) as derived_aaaaa /* comment11111111111111111111111111 */;
+field1
+# End of 10.4 tests
diff --git a/mysql-test/main/subselect_innodb.test b/mysql-test/main/subselect_innodb.test
index 544bcd994ed..90d3b07c1ad 100644
--- a/mysql-test/main/subselect_innodb.test
+++ b/mysql-test/main/subselect_innodb.test
@@ -611,3 +611,41 @@ FROM t1
DROP TABLE t1,t2,t3,t4;
+--echo #
+--echo # MDEV-18339: ASAN heap-buffer-overflow in
+--echo # Item_exists_subselect::is_top_level_item
+--echo #
+
+CREATE TABLE t1 ( pk int PRIMARY KEY , iiiiiiiiiiiii int , col_int1111 int, col_date_nokey date , col_time_key time, col_time_nokey time , col_datetime_key time, col_datetime_nokey time , ccccccccccccccc varchar(1), vvvvvvvvvvvvvvvvv varchar(1)) engine=innodb;
+
+CREATE TABLE t2 ( iiiiiiiiiiiii int , vvvvvvvvvvvvvvvvv varchar(1)) engine=innodb;
+CREATE TABLE t3 ( pk int PRIMARY KEY) engine=innodb;
+CREATE TABLE t4 ( iiiiiiiiiiiii int , vvvvvvvvvvvvvvvvv varchar(1)) engine=innodb;
+
+select * from
+(select distinct
+ (select count(t111111111.`ccccccccccccccc`) from t1 as t111111111
+ where (exists(select distinct t22222222222.`iiiiiiiiiiiii` from t2 as t22222222222 where t22222222222.`vvvvvvvvvvvvvvvvv` < t111111111.`vvvvvvvvvvvvvvvvv`)
+ or t111111111.`ccccccccccccccc` != t111111111.`vvvvvvvvvvvvvvvvv`)
+ ) as field1
+from
+ (select t1_______2.*
+ from (t1 as t1_______1 join t1 as t1_______2
+ on (t1_______2.`vvvvvvvvvvvvvvvvv` = t1_______1.`ccccccccccccccc`
+ and t1_______1.`iiiiiiiiiiiii` !=
+ (select sum(t44444444444.`iiiiiiiiiiiii`)
+ from (t4 as t44444444444 join t3 as t33333333333
+ on (t33333333333.`pk` = t44444444444.`iiiiiiiiiiiii`))
+ where t44444444444.`vvvvvvvvvvvvvvvvv` > 'x')
+ )
+ )
+ ) as alias1
+straight_join
+ t2 as alias2
+on (alias2.`iiiiiiiiiiiii` = alias1.`iiiiiiiiiiiii`)
+where ((select 9 from dual) is null)
+and alias1.`pk` in (32, 129, 87, 51, 58, 152, 241, 37, 55, 237, 166)
+group by field1 /* 111
+111111111 */ ) as derived_aaaaa /* comment11111111111111111111111111 */;
+
+--echo # End of 10.4 tests
diff --git a/sql/item_cmpfunc.cc b/sql/item_cmpfunc.cc
index 55a06254917..ffb7b60e4de 100644
--- a/sql/item_cmpfunc.cc
+++ b/sql/item_cmpfunc.cc
@@ -1182,6 +1182,8 @@ longlong Item_func_truth::val_int()
bool Item_in_optimizer::is_top_level_item()
{
+ if (invisible_mode())
+ return FALSE;
return ((Item_in_subselect *)args[1])->is_top_level_item();
}
@@ -1237,8 +1239,7 @@ void Item_in_optimizer::print(String *str, enum_query_type query_type)
void Item_in_optimizer::restore_first_argument()
{
- if (args[1]->type() == Item::SUBSELECT_ITEM &&
- ((Item_subselect *)args[1])->is_in_predicate())
+ if (!invisible_mode())
{
args[0]= ((Item_in_subselect *)args[1])->left_expr;
}
@@ -1255,8 +1256,7 @@ bool Item_in_optimizer::fix_left(THD *thd)
it is args[0].
*/
Item **ref0= args;
- if (args[1]->type() == Item::SUBSELECT_ITEM &&
- ((Item_subselect *)args[1])->is_in_predicate())
+ if (!invisible_mode())
{
/*
left_expr->fix_fields() may cause left_expr to be substituted for
revision-id: 7af7b669eb14d86c19409a058607d1ba9e2ceff4 (mariadb-10.2.22-36-g7af7b669eb1)
parent(s): b68d5c38a41908feb78857d4437626c1adf268e5
author: Oleksandr Byelkin
committer: Oleksandr Byelkin
timestamp: 2019-03-06 13:36:44 +0100
message:
c
---
sql/handler.cc | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/sql/handler.cc b/sql/handler.cc
index dc454fa1c06..d307433cf5d 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -3308,8 +3308,7 @@ void handler::get_auto_increment(ulonglong offset, ulonglong increment,
MY_BITMAP *old_read_set;
bool rnd_inited= (inited == RND);
- if (rnd_inited &&
- ha_rnd_end())
+ if (rnd_inited && ha_rnd_end())
return;
old_read_set= table->prepare_for_keyread(table->s->next_number_index);
@@ -3320,7 +3319,7 @@ void handler::get_auto_increment(ulonglong offset, ulonglong increment,
DBUG_ASSERT(0);
(void) extra(HA_EXTRA_NO_KEYREAD);
*first_value= ULONGLONG_MAX;
- if (rnd_inited &&
+ if (rnd_inited)
ha_rnd_init(0);
return;
}
revision-id: 3c6a45ddd941e88ec88b0c03654e21dd1e785228 (mariadb-10.1.38-20-g3c6a45ddd94)
parent(s): 91e4f00389483d22fa81517bbcdcd21499fd283a
author: Jan Lindström
committer: Jan Lindström
timestamp: 2019-03-06 13:33:37 +0200
message:
MDEV-18830: Port SST fixes from 10.4 to 10.1
modified: mysql-test/suite/galera/disabled.def
modified: mysql-test/suite/galera/r/galera_many_rows.result
modified: mysql-test/suite/galera/t/galera_kill_nochanges.test
new file: mysql-test/suite/galera/t/galera_many_rows.cnf
modified: mysql-test/suite/galera/t/galera_many_rows.test
modified: mysql-test/suite/galera/t/galera_var_dirty_reads.test
modified: scripts/wsrep_sst_mariabackup.sh
---
mysql-test/suite/galera/disabled.def | 18 ++++-
mysql-test/suite/galera/r/galera_many_rows.result | 26 +++---
.../suite/galera/t/galera_kill_nochanges.test | 9 ++-
mysql-test/suite/galera/t/galera_many_rows.cnf | 5 ++
mysql-test/suite/galera/t/galera_many_rows.test | 19 +++--
.../suite/galera/t/galera_var_dirty_reads.test | 5 --
scripts/wsrep_sst_mariabackup.sh | 93 ++++++++++++++--------
7 files changed, 112 insertions(+), 63 deletions(-)
diff --git a/mysql-test/suite/galera/disabled.def b/mysql-test/suite/galera/disabled.def
index b81c9a737ec..1fc67475b93 100644
--- a/mysql-test/suite/galera/disabled.def
+++ b/mysql-test/suite/galera/disabled.def
@@ -19,12 +19,24 @@ galera_as_master_gtid_change_master : Requires MySQL GTID
galera_as_slave_preordered : wsrep-preordered feature not merged to MariaDB
GAL-419 : MDEV-13549 Galera test failures
galera_var_notify_cmd : MDEV-13549 Galera test failures
-galera_as_slave_replication_bundle : MDEV-13549 Galera test failures
galera_ssl_upgrade : MDEV-13549 Galera test failures
-galera.MW-329 : wsrep_local_replays not stable
+MW-329 : wsrep_local_replays not stable
MW-416 : MDEV-13549 Galera test failures
+MW-44 : MDEV-15809 Test failure on galera.MW-44
+galera_pc_ignore_sb : MDEV-15811 Test failure on galera_pc_ignore_sb
+galera_kill_applier : race condition at the start of the test
+galera_ist_progress: MDEV-15236 galera_ist_progress fails when trying to read transfer status
+pxc-421: Lock timeout exceeded
galera_sst_mysqldump_with_key : MDEV-16890 Galera test failure
+galera_kill_ddl : MDEV-17108 Test failure on galera.galera_kill_ddl
+galera_var_node_address : MDEV-17151 Galera test failure on galera.galera_var_node_address
galera_gc_fc_limit : MDEV-17061 Test failure on galera.galera_gc_fc_limit
-galera_as_slave_replication_budle : MDEV-15785 Test case galera_as_slave_replication_bundle caused debug assertion
+galera_as_slave_replication_bundle : MDEV-15785 Test case galera_as_slave_replication_bundle caused debug assertion
galera_wan : MDEV-17259: Test failure on galera.galera_wan
galera_pc_ignore_sb : MDEV-17357 Test failure on galera.galera_pc_ignore_sb
+MW-328A : MDEV-17847 Galera test failure on MW-328[A|B|C]
+MW-328B : MDEV-17847 Galera test failure on MW-328[A|B|C]
+MW-328C : MDEV-17847 Galera test failure on MW-328[A|B|C]
+query_cache : MDEV-18137: Galera test failure on query_cache
+galera_ist_mariabackup : MDEV-18829 Galera test galera_ist_mariabackup leaves port open causing following SST tests to fail
+galera.galera_gcache_recover_manytrx : MDEV-18834 Galera test failure on galera_gcache_recover_manytrx
diff --git a/mysql-test/suite/galera/r/galera_many_rows.result b/mysql-test/suite/galera/r/galera_many_rows.result
index e650dd6f5de..6f441d9b401 100644
--- a/mysql-test/suite/galera/r/galera_many_rows.result
+++ b/mysql-test/suite/galera/r/galera_many_rows.result
@@ -1,27 +1,27 @@
SET SESSION innodb_lock_wait_timeout=600;
SET SESSION lock_wait_timeout=600;
-CREATE TABLE ten (f1 INTEGER);
+CREATE TABLE ten (f1 INTEGER) engine=InnoDB;
INSERT INTO ten VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
CREATE TABLE t1 (f1 INTEGER AUTO_INCREMENT PRIMARY KEY, f2 INTEGER) Engine=InnoDB;
INSERT INTO t1 (f2) SELECT a1.f1 FROM ten AS a1, ten AS a2, ten AS a3, ten AS a4, ten AS a5;
SET SESSION wsrep_sync_wait = 0;
SET SESSION wsrep_sync_wait = 15;
SET GLOBAL wsrep_provider_options = 'repl.causal_read_timeout=PT1H';
-SELECT COUNT(*) = 100000 FROM t1;
-COUNT(*) = 100000
-1
+SELECT COUNT(*) FROM t1;
+COUNT(*)
+100000
INSERT INTO t1 (f2) SELECT a1.f1 FROM ten AS a1, ten AS a2, ten AS a3, ten AS a4, ten AS a5;
-SELECT COUNT(*) = 200000 FROM t1;
-COUNT(*) = 200000
-1
+SELECT COUNT(*) FROM t1;
+COUNT(*)
+200000
UPDATE t1 SET f2 = 1;
-SELECT COUNT(*) = 200000 FROM t1 WHERE f2 = 1;
-COUNT(*) = 200000
-1
+SELECT COUNT(*) FROM t1 WHERE f2 = 1;
+COUNT(*)
+200000
START TRANSACTION;
-SELECT COUNT(*) = 200000 FROM t1;
-COUNT(*) = 200000
-1
+SELECT COUNT(*) FROM t1;
+COUNT(*)
+200000
UPDATE t1 SET f2 = 3;
START TRANSACTION;
UPDATE t1 SET f2 = 4;
diff --git a/mysql-test/suite/galera/t/galera_kill_nochanges.test b/mysql-test/suite/galera/t/galera_kill_nochanges.test
index 4106378885f..9360ad542f6 100644
--- a/mysql-test/suite/galera/t/galera_kill_nochanges.test
+++ b/mysql-test/suite/galera/t/galera_kill_nochanges.test
@@ -3,7 +3,11 @@
#
--source include/galera_cluster.inc
---source include/have_innodb.inc
+
+# Save original auto_increment_offset values.
+--let $node_1=node_1
+--let $node_2=node_2
+--source include/auto_increment_offset_save.inc
--connection node_1
CREATE TABLE t1 (f1 INTEGER) ENGINE=InnoDB;
@@ -29,4 +33,7 @@ SET SESSION wsrep_sync_wait = DEFAULT;
SELECT COUNT(*) = 1 FROM t1;
SELECT VARIABLE_VALUE = 2 FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_cluster_size';
+--let $node_2=node_2a
+--source include/auto_increment_offset_restore.inc
+
DROP TABLE t1;
diff --git a/mysql-test/suite/galera/t/galera_many_rows.cnf b/mysql-test/suite/galera/t/galera_many_rows.cnf
new file mode 100644
index 00000000000..4e1022cf67f
--- /dev/null
+++ b/mysql-test/suite/galera/t/galera_many_rows.cnf
@@ -0,0 +1,5 @@
+!include ../galera_2nodes.cnf
+
+[mysqld]
+innodb-status-output=ON
+innodb-status-output-locks=ON
diff --git a/mysql-test/suite/galera/t/galera_many_rows.test b/mysql-test/suite/galera/t/galera_many_rows.test
index 58ba85e1b9e..bc9e99db8da 100644
--- a/mysql-test/suite/galera/t/galera_many_rows.test
+++ b/mysql-test/suite/galera/t/galera_many_rows.test
@@ -1,13 +1,16 @@
-
--source include/big_test.inc
--source include/galera_cluster.inc
---source include/have_innodb.inc
+
+# Save original auto_increment_offset values.
+--let $node_1=node_1
+--let $node_2=node_2
+--source include/auto_increment_offset_save.inc
--connection node_1
SET SESSION innodb_lock_wait_timeout=600;
SET SESSION lock_wait_timeout=600;
-CREATE TABLE ten (f1 INTEGER);
+CREATE TABLE ten (f1 INTEGER) engine=InnoDB;
INSERT INTO ten VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
CREATE TABLE t1 (f1 INTEGER AUTO_INCREMENT PRIMARY KEY, f2 INTEGER) Engine=InnoDB;
@@ -20,19 +23,19 @@ SET SESSION wsrep_sync_wait = 15;
SET GLOBAL wsrep_provider_options = 'repl.causal_read_timeout=PT1H';
-SELECT COUNT(*) = 100000 FROM t1;
+SELECT COUNT(*) FROM t1;
INSERT INTO t1 (f2) SELECT a1.f1 FROM ten AS a1, ten AS a2, ten AS a3, ten AS a4, ten AS a5;
--connection node_1
-SELECT COUNT(*) = 200000 FROM t1;
+SELECT COUNT(*) FROM t1;
UPDATE t1 SET f2 = 1;
--connection node_2
-SELECT COUNT(*) = 200000 FROM t1 WHERE f2 = 1;
+SELECT COUNT(*) FROM t1 WHERE f2 = 1;
--connection node_1
START TRANSACTION;
-SELECT COUNT(*) = 200000 FROM t1;
+SELECT COUNT(*) FROM t1;
UPDATE t1 SET f2 = 3;
--connection node_2
@@ -50,5 +53,7 @@ COMMIT;
--eval SET GLOBAL wsrep_provider_options = '$wsrep_provider_options_node2';
--enable_query_log
+--source include/auto_increment_offset_restore.inc
+
DROP TABLE t1;
DROP TABLE ten;
diff --git a/mysql-test/suite/galera/t/galera_var_dirty_reads.test b/mysql-test/suite/galera/t/galera_var_dirty_reads.test
index 1f01c4aac07..3e2108868af 100644
--- a/mysql-test/suite/galera/t/galera_var_dirty_reads.test
+++ b/mysql-test/suite/galera/t/galera_var_dirty_reads.test
@@ -11,11 +11,6 @@
--let $node_2=node_2
--source include/auto_increment_offset_save.inc
-# Save original auto_increment_offset values.
---let $node_1=node_1
---let $node_2=node_2
---source include/auto_increment_offset_save.inc
-
--connection node_2
--let $wsrep_cluster_address_saved = `SELECT @@global.wsrep_cluster_address`
diff --git a/scripts/wsrep_sst_mariabackup.sh b/scripts/wsrep_sst_mariabackup.sh
index 3fd93a90c1e..b05e99ada36 100644
--- a/scripts/wsrep_sst_mariabackup.sh
+++ b/scripts/wsrep_sst_mariabackup.sh
@@ -1,6 +1,6 @@
#!/bin/bash -ue
# Copyright (C) 2013 Percona Inc
-# Copyright (C) 2017 MariaDB
+# Copyright (C) 2017-2019 MariaDB
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -22,6 +22,7 @@
. $(dirname $0)/wsrep_sst_common
+OS=$(uname)
ealgo=""
ekey=""
ekeyfile=""
@@ -76,13 +77,13 @@ sdecomp=""
# 5.6.21 PXC and later can't donate to an older joiner
sst_ver=1
-if which pv &>/dev/null && pv --help | grep -q FORMAT;then
+if pv --help 2>/dev/null | grep -q FORMAT;then
pvopts+=$pvformat
fi
pcmd="pv $pvopts"
declare -a RC
-INNOBACKUPEX_BIN=mariabackup
+INNOBACKUPEX_BIN=$(which mariabackup)
XBSTREAM_BIN=mbstream
XBCRYPT_BIN=xbcrypt # Not available in MariaBackup
@@ -174,10 +175,8 @@ get_transfer()
fi
if [[ $tfmt == 'nc' ]];then
- if [[ ! -x `which nc` ]];then
- wsrep_log_error "nc(netcat) not found in path: $PATH"
- exit 2
- fi
+ wsrep_check_programs nc
+
wsrep_log_info "Using netcat as streamer"
if [[ "$WSREP_SST_OPT_ROLE" == "joiner" ]];then
if nc -h 2>&1 | grep -q ncat;then
@@ -204,11 +203,8 @@ get_transfer()
fi
else
tfmt='socat'
+ wsrep_check_programs socat
wsrep_log_info "Using socat as streamer"
- if [[ ! -x `which socat` ]];then
- wsrep_log_error "socat not found in path: $PATH"
- exit 2
- fi
if [[ $encrypt -eq 2 || $encrypt -eq 3 ]] && ! socat -V | grep -q "WITH_OPENSSL 1";then
wsrep_log_error "Encryption requested, but socat is not OpenSSL enabled (encrypt=$encrypt)"
@@ -297,7 +293,7 @@ get_footprint()
adjust_progress()
{
- if [[ ! -x `which pv` ]];then
+ if ! command -v pv >/dev/null;then
wsrep_log_error "pv not found in path: $PATH"
wsrep_log_error "Disabling all progress/rate-limiting"
pcmd=""
@@ -336,6 +332,7 @@ read_cnf()
rebuild=$(parse_cnf sst rebuild 0)
ttime=$(parse_cnf sst time 0)
cpat=$(parse_cnf sst cpat '.*galera\.cache$\|.*sst_in_progress$\|.*\.sst$\|.*gvwstate\.dat$\|.*grastate\.dat$\|.*\.err$\|.*\.log$\|.*RPM_UPGRADE_MARKER$\|.*RPM_UPGRADE_HISTORY$')
+ [[ $OS == "FreeBSD" ]] && cpat=$(parse_cnf sst cpat '.*galera\.cache$|.*sst_in_progress$|.*\.sst$|.*gvwstate\.dat$|.*grastate\.dat$|.*\.err$|.*\.log$|.*RPM_UPGRADE_MARKER$|.*RPM_UPGRADE_HISTORY$')
ealgo=$(parse_cnf xtrabackup encrypt "")
ekey=$(parse_cnf xtrabackup encrypt-key "")
ekeyfile=$(parse_cnf xtrabackup encrypt-key-file "")
@@ -550,8 +547,12 @@ wait_for_listen()
local MODULE=$3
for i in {1..50}
do
- ss -p state listening "( sport = :$PORT )" | grep -qE 'socat|nc' && break
- sleep 0.2
+ if [ "$OS" = "FreeBSD" ];then
+ sockstat -46lp $PORT | grep -qE "^[^ ]* *(socat|nc) *[^ ]* *[^ ]* *[^ ]* *[^ ]*:$PORT" && break
+ else
+ ss -p state listening "( sport = :$PORT )" | grep -qE 'socat|nc' && break
+ fi
+ sleep 0.2
done
echo "ready ${ADDR}/${MODULE}//$sst_ver"
}
@@ -597,7 +598,7 @@ recv_joiner()
pushd ${dir} 1>/dev/null
set +e
- if [[ $tmt -gt 0 && -x `which timeout` ]];then
+ if [[ $tmt -gt 0 ]] && command -v timeout >/dev/null;then
if timeout --help | grep -q -- '-k';then
ltcmd="timeout -k $(( tmt+10 )) $tmt $tcmd"
else
@@ -662,13 +663,13 @@ monitor_process()
while true ; do
- if ! ps --pid "${WSREP_SST_OPT_PARENT}" &>/dev/null; then
+ if ! ps -p "${WSREP_SST_OPT_PARENT}" &>/dev/null; then
wsrep_log_error "Parent mysqld process (PID:${WSREP_SST_OPT_PARENT}) terminated unexpectedly."
kill -- -"${WSREP_SST_OPT_PARENT}"
exit 32
fi
- if ! ps --pid "${sst_stream_pid}" &>/dev/null; then
+ if ! ps -p "${sst_stream_pid}" &>/dev/null; then
break
fi
@@ -677,10 +678,7 @@ monitor_process()
done
}
-if [[ ! -x `which $INNOBACKUPEX_BIN` ]];then
- wsrep_log_error "${INNOBACKUPEX_BIN} not in path: $PATH"
- exit 2
-fi
+wsrep_check_programs "$INNOBACKUPEX_BIN"
rm -f "${MAGIC_FILE}"
@@ -704,9 +702,33 @@ fi
INNOEXTRA=""
+INNODB_DATA_HOME_DIR=${INNODB_DATA_HOME_DIR:-""}
+# Try to set INNODB_DATA_HOME_DIR from the command line:
+if [ ! -z "$INNODB_DATA_HOME_DIR_ARG" ]; then
+ INNODB_DATA_HOME_DIR=$INNODB_DATA_HOME_DIR_ARG
+fi
+# if INNODB_DATA_HOME_DIR env. variable is not set, try to get it from my.cnf
+if [ -z "$INNODB_DATA_HOME_DIR" ]; then
+ INNODB_DATA_HOME_DIR=$(parse_cnf mysqld$WSREP_SST_OPT_SUFFIX_VALUE innodb-data-home-dir '')
+fi
+if [ -z "$INNODB_DATA_HOME_DIR" ]; then
+ INNODB_DATA_HOME_DIR=$(parse_cnf --mysqld innodb-data-home-dir "")
+fi
+if [ ! -z "$INNODB_DATA_HOME_DIR" ]; then
+ INNOEXTRA+=" --innodb-data-home-dir=$INNODB_DATA_HOME_DIR"
+fi
+
+if [ -n "$INNODB_DATA_HOME_DIR" ]; then
+ # handle both relative and absolute paths
+ INNODB_DATA_HOME_DIR=$(cd $DATA; mkdir -p "$INNODB_DATA_HOME_DIR"; cd $INNODB_DATA_HOME_DIR; pwd -P)
+else
+ # default to datadir
+ INNODB_DATA_HOME_DIR=$(cd $DATA; pwd -P)
+fi
+
if [[ $ssyslog -eq 1 ]];then
- if [[ ! -x `which logger` ]];then
+ if ! command -v logger >/dev/null;then
wsrep_log_error "logger not in path: $PATH. Ignoring"
else
@@ -724,7 +746,7 @@ if [[ $ssyslog -eq 1 ]];then
logger -p daemon.info -t ${ssystag}wsrep-sst-$WSREP_SST_OPT_ROLE "$@"
}
- INNOAPPLY="${INNOBACKUPEX_BIN} --innobackupex $disver $iapts --apply-log \$rebuildcmd \${DATA} 2>&1 | logger -p daemon.err -t ${ssystag}innobackupex-apply "
+ INNOAPPLY="${INNOBACKUPEX_BIN} --innobackupex $disver $iapts \$INNOEXTRA --apply-log \$rebuildcmd \${DATA} 2>&1 | logger -p daemon.err -t ${ssystag}innobackupex-apply "
INNOMOVE="${INNOBACKUPEX_BIN} --innobackupex ${WSREP_SST_OPT_CONF} $disver $impts --move-back --force-non-empty-directories \${DATA} 2>&1 | logger -p daemon.err -t ${ssystag}innobackupex-move "
INNOBACKUP="${INNOBACKUPEX_BIN} --innobackupex ${WSREP_SST_OPT_CONF} $disver $iopts \$tmpopts \$INNOEXTRA --galera-info --stream=\$sfmt \$itmpdir 2> >(logger -p daemon.err -t ${ssystag}innobackupex-backup)"
fi
@@ -733,8 +755,8 @@ else
if [[ "$sstlogarchive" -eq 1 ]]
then
- ARCHIVETIMESTAMP=$(date "+%Y.%m.%d-%H.%M.%S")
- newfile=""
+ ARCHIVETIMESTAMP=$(date "+%Y.%m.%d-%H.%M.%S.%N")
+ newfile=""
if [[ ! -z "$sstlogarchivedir" ]]
then
@@ -788,7 +810,7 @@ then
fi
- INNOAPPLY="${INNOBACKUPEX_BIN} --innobackupex $disver $iapts --apply-log \$rebuildcmd \${DATA} &> ${INNOAPPLYLOG}"
+ INNOAPPLY="${INNOBACKUPEX_BIN} --innobackupex $disver $iapts \$INNOEXTRA --apply-log \$rebuildcmd \${DATA} &> ${INNOAPPLYLOG}"
INNOMOVE="${INNOBACKUPEX_BIN} --innobackupex ${WSREP_SST_OPT_CONF} $disver $impts --move-back --force-non-empty-directories \${DATA} &> ${INNOMOVELOG}"
INNOBACKUP="${INNOBACKUPEX_BIN} --innobackupex ${WSREP_SST_OPT_CONF} $disver $iopts \$tmpopts \$INNOEXTRA --galera-info --stream=\$sfmt \$itmpdir 2> ${INNOBACKUPLOG}"
fi
@@ -809,7 +831,7 @@ then
exit 93
fi
- if [[ -z $(parse_cnf mysqld tmpdir "") && -z $(parse_cnf xtrabackup tmpdir "") ]];then
+ if [[ -z $(parse_cnf --mysqld tmpdir "") && -z $(parse_cnf xtrabackup tmpdir "") ]];then
xtmpdir=$(mktemp -d)
tmpopts=" --tmpdir=$xtmpdir "
wsrep_log_info "Using $xtmpdir as xtrabackup temporary directory"
@@ -931,9 +953,9 @@ then
[[ -e $SST_PROGRESS_FILE ]] && wsrep_log_info "Stale sst_in_progress file: $SST_PROGRESS_FILE"
[[ -n $SST_PROGRESS_FILE ]] && touch $SST_PROGRESS_FILE
- ib_home_dir=$(parse_cnf mysqld innodb-data-home-dir "")
- ib_log_dir=$(parse_cnf mysqld innodb-log-group-home-dir "")
- ib_undo_dir=$(parse_cnf mysqld innodb-undo-directory "")
+ ib_home_dir=$INNODB_DATA_HOME_DIR
+ ib_log_dir=$(parse_cnf --mysqld innodb-log-group-home-dir "")
+ ib_undo_dir=$(parse_cnf --mysqld innodb-undo-directory "")
stagemsg="Joiner-Recv"
@@ -1003,11 +1025,14 @@ then
jpid=$!
wsrep_log_info "Proceeding with SST"
-
wsrep_log_info "Cleaning the existing datadir and innodb-data/log directories"
- find $ib_home_dir $ib_log_dir $ib_undo_dir $DATA -mindepth 1 -regex $cpat -prune -o -exec rm -rfv {} 1>&2 \+
+ if [ "${OS}" = "FreeBSD" ]; then
+ find -E $ib_home_dir $ib_log_dir $ib_undo_dir $DATA -mindepth 1 -prune -regex $cpat -o -exec rm -rfv {} 1>&2 \+
+ else
+ find $ib_home_dir $ib_log_dir $ib_undo_dir $DATA -mindepth 1 -prune -regex $cpat -o -exec rm -rfv {} 1>&2 \+
+ fi
- tempdir=$(parse_cnf mysqld log-bin "")
+ tempdir=$(parse_cnf --mysqld log-bin "")
if [[ -n ${tempdir:-} ]];then
binlog_dir=$(dirname $tempdir)
binlog_file=$(basename $tempdir)
@@ -1052,7 +1077,7 @@ then
wsrep_log_info "Compressed qpress files found"
- if [[ ! -x `which qpress` ]];then
+ if ! command -v qpress >/dev/null;then
wsrep_log_error "qpress not found in path: $PATH"
exit 22
fi

[Commits] 2e3c5d8: MDEV-18816 Assertion `sel->quick' failed in JOIN::make_range_rowid_filters
by IgorBabaev 06 Mar '19
revision-id: 2e3c5d8ab252cd0d64cab8000ff7f384766ae08f (mariadb-10.4.3-34-g2e3c5d8)
parent(s): 1bcb66c597b79ed61a945661916ccac71ded0cfa
author: Igor Babaev
committer: Igor Babaev
timestamp: 2019-03-05 23:42:04 -0800
message:
MDEV-18816 Assertion `sel->quick' failed in JOIN::make_range_rowid_filters
Do not build range filters when an impossible WHERE has been detected.
In that case the conditions can no longer be used to extract ranges for filters.
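
The fix itself is a single early exit at the top of JOIN::make_range_rowid_filters(), visible in the diff below. As a rough standalone illustration (a hypothetical Join struct, not the real class from sql/sql_select.h), the guard compares the two const-table bitmaps and skips filter construction when they differ, i.e. when an impossible WHERE was noticed while reading const tables.

// Minimal sketch of the early-exit guard; Join is a hypothetical stand-in
// for the server's JOIN class.
#include <cstdint>
#include <iostream>

struct Join {
  uint64_t const_table_map = 0;        // tables proven to be constant
  uint64_t found_const_table_map = 0;  // const tables for which a row was found
  int filters_built = 0;

  void make_range_rowid_filters() {
    // When an impossible WHERE was detected while reading const tables the
    // two maps differ; the conditions can no longer yield ranges for
    // filters, so skip filter construction entirely.
    if (const_table_map != found_const_table_map)
      return;
    // ... the normal range-filter construction would follow here ...
    ++filters_built;
  }
};

int main() {
  Join join;
  join.const_table_map = 0x3;        // two const tables
  join.found_const_table_map = 0x1;  // one of them had no matching row
  join.make_range_rowid_filters();
  std::cout << "filters built: " << join.filters_built << "\n";  // prints 0
  return 0;
}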
---
mysql-test/main/rowid_filter.result | 33 ++++++++++++++++++++++++++++++
mysql-test/main/rowid_filter.test | 31 ++++++++++++++++++++++++++++
mysql-test/main/rowid_filter_innodb.result | 33 ++++++++++++++++++++++++++++++
sql/sql_select.cc | 8 ++++++++
4 files changed, 105 insertions(+)
diff --git a/mysql-test/main/rowid_filter.result b/mysql-test/main/rowid_filter.result
index ea7de8c..f025305 100644
--- a/mysql-test/main/rowid_filter.result
+++ b/mysql-test/main/rowid_filter.result
@@ -1941,4 +1941,37 @@ ALTER TABLE orders DROP COLUMN o_totaldiscount;
DROP VIEW v1;
DROP DATABASE dbt3_s001;
use test;
+#
+# MDEV-18816: potential range filter for one join table with
+# impossible WHERE for another
+#
+create table t1 (
+pk int not null primary key, c2 varchar(10) , i1 int,key (c2)
+) engine=myisam;
+insert into t1 values (1,'a',-5),(2,'a',null);
+create table t2 (
+pk int, i1 int, c1 varchar(30) , key c1 (c1(30)), key i1 (i1)
+) engine=myisam;
+insert into t2 values
+(1,-5,'a'),(2,null,'a'),(3,null,'a'),(4,null,'a'),(5,5,'a'),(6,null,'a'),
+(7,4,'a'),(8,55,'a'),(9,null,'a'),(10,null,'a'),(11,null,'a'),(12,-5,'a'),
+(13,-5,'a'),(14,null,'a'),(15,null,'a'),(16,-5,'a'),(17,-5,'a');
+select 1
+from t1
+left join
+t2 join t1 as t1_a on t2.i1 = t1_a.pk
+on t1.c2 = t2.c1
+where t1_a.pk is null and t1_a.i1 != 3;
+1
+explain extended select 1
+from t1
+left join
+t2 join t1 as t1_a on t2.i1 = t1_a.pk
+on t1.c2 = t2.c1
+where t1_a.pk is null and t1_a.i1 != 3;
+id select_type table type possible_keys key key_len ref rows filtered Extra
+1 SIMPLE NULL NULL NULL NULL NULL NULL NULL NULL Impossible WHERE noticed after reading const tables
+Warnings:
+Note 1003 select 1 AS `1` from `test`.`t1` join `test`.`t2` join `test`.`t1` `t1_a` where 0
+drop table t1,t2;
set @@use_stat_tables=@save_use_stat_tables;
diff --git a/mysql-test/main/rowid_filter.test b/mysql-test/main/rowid_filter.test
index a674021..87e8b2c 100644
--- a/mysql-test/main/rowid_filter.test
+++ b/mysql-test/main/rowid_filter.test
@@ -230,4 +230,35 @@ DROP DATABASE dbt3_s001;
use test;
+--echo #
+--echo # MDEV-18816: potential range filter for one join table with
+--echo # impossible WHERE for another
+--echo #
+
+create table t1 (
+ pk int not null primary key, c2 varchar(10) , i1 int,key (c2)
+) engine=myisam;
+insert into t1 values (1,'a',-5),(2,'a',null);
+
+create table t2 (
+ pk int, i1 int, c1 varchar(30) , key c1 (c1(30)), key i1 (i1)
+) engine=myisam;
+insert into t2 values
+ (1,-5,'a'),(2,null,'a'),(3,null,'a'),(4,null,'a'),(5,5,'a'),(6,null,'a'),
+ (7,4,'a'),(8,55,'a'),(9,null,'a'),(10,null,'a'),(11,null,'a'),(12,-5,'a'),
+ (13,-5,'a'),(14,null,'a'),(15,null,'a'),(16,-5,'a'),(17,-5,'a');
+
+let $q=
+select 1
+ from t1
+ left join
+ t2 join t1 as t1_a on t2.i1 = t1_a.pk
+ on t1.c2 = t2.c1
+where t1_a.pk is null and t1_a.i1 != 3;
+
+eval $q;
+eval explain extended $q;
+
+drop table t1,t2;
+
set @@use_stat_tables=@save_use_stat_tables;
diff --git a/mysql-test/main/rowid_filter_innodb.result b/mysql-test/main/rowid_filter_innodb.result
index cd09f1d..ac7ca11 100644
--- a/mysql-test/main/rowid_filter_innodb.result
+++ b/mysql-test/main/rowid_filter_innodb.result
@@ -1870,6 +1870,39 @@ ALTER TABLE orders DROP COLUMN o_totaldiscount;
DROP VIEW v1;
DROP DATABASE dbt3_s001;
use test;
+#
+# MDEV-18816: potential range filter for one join table with
+# impossible WHERE for another
+#
+create table t1 (
+pk int not null primary key, c2 varchar(10) , i1 int,key (c2)
+) engine=myisam;
+insert into t1 values (1,'a',-5),(2,'a',null);
+create table t2 (
+pk int, i1 int, c1 varchar(30) , key c1 (c1(30)), key i1 (i1)
+) engine=myisam;
+insert into t2 values
+(1,-5,'a'),(2,null,'a'),(3,null,'a'),(4,null,'a'),(5,5,'a'),(6,null,'a'),
+(7,4,'a'),(8,55,'a'),(9,null,'a'),(10,null,'a'),(11,null,'a'),(12,-5,'a'),
+(13,-5,'a'),(14,null,'a'),(15,null,'a'),(16,-5,'a'),(17,-5,'a');
+select 1
+from t1
+left join
+t2 join t1 as t1_a on t2.i1 = t1_a.pk
+on t1.c2 = t2.c1
+where t1_a.pk is null and t1_a.i1 != 3;
+1
+explain extended select 1
+from t1
+left join
+t2 join t1 as t1_a on t2.i1 = t1_a.pk
+on t1.c2 = t2.c1
+where t1_a.pk is null and t1_a.i1 != 3;
+id select_type table type possible_keys key key_len ref rows filtered Extra
+1 SIMPLE NULL NULL NULL NULL NULL NULL NULL NULL Impossible WHERE noticed after reading const tables
+Warnings:
+Note 1003 select 1 AS `1` from `test`.`t1` join `test`.`t2` join `test`.`t1` `t1_a` where 0
+drop table t1,t2;
set @@use_stat_tables=@save_use_stat_tables;
#
# MDEV-18755: possible RORI-plan and possible plan with range filter
diff --git a/sql/sql_select.cc b/sql/sql_select.cc
index 211898f..8c75f4d 100644
--- a/sql/sql_select.cc
+++ b/sql/sql_select.cc
@@ -1581,6 +1581,13 @@ bool JOIN::make_range_rowid_filters()
{
DBUG_ENTER("make_range_rowid_filters");
+ /*
+ Do not build range filters with detected impossible WHERE.
+ Anyway conditions cannot be used anymore to extract ranges for filters.
+ */
+ if (const_table_map != found_const_table_map)
+ DBUG_RETURN(0);
+
JOIN_TAB *tab;
for (tab= first_linear_tab(this, WITH_BUSH_ROOTS, WITHOUT_CONST_TABLES);
@@ -7051,6 +7058,7 @@ void set_position(JOIN *join,uint idx,JOIN_TAB *table,KEYUSE *key)
// join->positions[idx].loosescan_key= MAX_KEY; /* Not a LooseScan */
join->positions[idx].sj_strategy= SJ_OPT_NONE;
join->positions[idx].use_join_buffer= FALSE;
+ join->positions[idx].range_rowid_filter_info= 0;
/* Move the const table as down as possible in best_ref */
JOIN_TAB **pos=join->best_ref+idx+1;

[Commits] 0d6bf4d98fe: Use rocksdb+range_locking that was submitted as PR
by Sergei Petrunia 05 Mar '19
revision-id: 0d6bf4d98febd26b290430ad796e63ce3bce9a3c (fb-prod201801-217-g0d6bf4d98fe)
parent(s): c8a8a09f0a88e110ec0382af13825ffd3e73ce48
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2019-03-05 19:05:51 +0300
message:
Use rocksdb+range_locking that was submitted as PR
---
rocksdb | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/rocksdb b/rocksdb
index cf25bb5ccb4..c0a1babb157 160000
--- a/rocksdb
+++ b/rocksdb
@@ -1 +1 @@
-Subproject commit cf25bb5ccb48e30756b2d1236636f2154ce5ec3b
+Subproject commit c0a1babb157b83b1eb41820fd7652e23b81f8c48

[Commits] 33afc32: MDEV-18467 Server crashes in fix_semijoin_strategies_for_picked_join_order
by IgorBabaev 05 Mar '19
revision-id: 33afc326e05bf90f8f0bb64bbc3e7c00a21711a4 (mariadb-10.3.12-66-g33afc32)
parent(s): 09bd2138522787a4e0b015695c462903f4a9e728
author: Igor Babaev
committer: Igor Babaev
timestamp: 2019-03-04 23:10:30 -0800
message:
MDEV-18467 Server crashes in fix_semijoin_strategies_for_picked_join_order
If a splittable materialized derived table / view T is used in an inner nest
of an outer join with an impossible ON condition, then T is marked as a
constant table. Yet the execution plan to build T is still searched for
in spite of the fact that it is not needed. So it should be set.
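
The patch (diff below) passes an is_const_table flag into JOIN_TAB::fix_splitting() so that a table made constant by the impossible ON condition neither installs a splitting plan nor tries to restore a saved query plan that was never created, and it also initializes the spl_plan slot in set_position(). A simplified standalone model of that control flow follows; SplitPlan, SavedPlan and DerivedJoin are hypothetical stand-ins for the real opt_split.cc structures.

// Simplified model of the MDEV-18467 control flow; the types are
// illustrative stand-ins, not the real SplM_plan_info / JOIN classes.
#include <iostream>

struct SplitPlan {};   // plays the role of SplM_plan_info
struct SavedPlan {};   // plays the role of the saved non-split query plan

struct DerivedJoin {
  SavedPlan* save_qep = nullptr;  // may be absent for a constant table
};

bool fix_splitting(DerivedJoin* md_join, SplitPlan* spl_plan,
                   bool is_const_table) {
  if (spl_plan && !is_const_table) {
    // Normal case: install the chosen splitting plan (in the server this
    // copies best_positions and injects the splitting condition).
  } else if (md_join->save_qep) {
    // No usable splitting plan: fall back to the saved non-split plan.
  }
  // A constant table with neither a splitting plan nor a saved plan has
  // nothing to restore, and that must not be treated as an error.
  return false;
}

int main() {
  DerivedJoin md_join;  // constant table: no saved plan was ever created
  bool err = fix_splitting(&md_join, nullptr, /*is_const_table=*/true);
  std::cout << "error: " << err << "\n";  // prints 0
  return 0;
}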
---
mysql-test/main/derived_split_innodb.result | 41 +++++++++++++++++++++++++++++
mysql-test/main/derived_split_innodb.test | 29 ++++++++++++++++++++
sql/opt_split.cc | 13 +++++----
sql/sql_select.cc | 1 +
sql/sql_select.h | 3 ++-
5 files changed, 81 insertions(+), 6 deletions(-)
diff --git a/mysql-test/main/derived_split_innodb.result b/mysql-test/main/derived_split_innodb.result
index 21dbd49..b9ed016 100644
--- a/mysql-test/main/derived_split_innodb.result
+++ b/mysql-test/main/derived_split_innodb.result
@@ -99,3 +99,44 @@ id select_type table type possible_keys key key_len ref rows Extra
2 LATERAL DERIVED t1 eq_ref PRIMARY PRIMARY 4 test.t2.id 1
set join_cache_level=default;
DROP TABLE t1,t2;
+#
+# Bug mdev-18467: join of grouping view and a base table as inner operand
+# of left join with on condition containing impossible range
+#
+create table t1 (f1 int, f2 int, key(f2)) engine=InnoDB;
+insert into t1 values (3,33), (7,77), (1,11);
+create table t2 (f1 int, f2 int, primary key (f1)) engine=InnoDB;
+insert into t2 values (3,33), (9,99), (1,11);
+create view v1 as
+select f1, max(f2) as f2 from t2 group by f1;
+select t.f2
+from t1
+left join
+(v1 join t1 as t on v1.f1=t.f1 and t.f2 = null)
+on t1.f1=t.f1;
+f2
+NULL
+NULL
+NULL
+explain select t.f2
+from t1
+left join
+(v1 join t1 as t on v1.f1=t.f1 and t.f2 = null)
+on t1.f1=t.f1;
+id select_type table type possible_keys key key_len ref rows Extra
+1 PRIMARY t const f2 NULL NULL NULL 1 Impossible ON condition
+1 PRIMARY <derived2> const key1 NULL NULL NULL 1 Impossible ON condition
+1 PRIMARY t1 ALL NULL NULL NULL NULL 3
+2 DERIVED t2 ALL PRIMARY NULL NULL NULL 3 Using temporary; Using filesort
+set statement optimizer_switch='split_materialized=off' for explain select t.f2
+from t1
+left join
+(v1 join t1 as t on v1.f1=t.f1 and t.f2 = null)
+on t1.f1=t.f1;
+id select_type table type possible_keys key key_len ref rows Extra
+1 PRIMARY t const f2 NULL NULL NULL 1 Impossible ON condition
+1 PRIMARY <derived2> const key1 NULL NULL NULL 1 Impossible ON condition
+1 PRIMARY t1 ALL NULL NULL NULL NULL 3
+2 DERIVED t2 index NULL PRIMARY 4 NULL 3
+drop view v1;
+drop table t1,t2;
diff --git a/mysql-test/main/derived_split_innodb.test b/mysql-test/main/derived_split_innodb.test
index c3b3bca..1bf70cd 100644
--- a/mysql-test/main/derived_split_innodb.test
+++ b/mysql-test/main/derived_split_innodb.test
@@ -94,3 +94,32 @@ eval EXPLAIN $q;
set join_cache_level=default;
DROP TABLE t1,t2;
+
+--echo #
+--echo # Bug mdev-18467: join of grouping view and a base table as inner operand
+--echo # of left join with on condition containing impossible range
+--echo #
+
+create table t1 (f1 int, f2 int, key(f2)) engine=InnoDB;
+insert into t1 values (3,33), (7,77), (1,11);
+
+create table t2 (f1 int, f2 int, primary key (f1)) engine=InnoDB;
+insert into t2 values (3,33), (9,99), (1,11);
+
+create view v1 as
+ select f1, max(f2) as f2 from t2 group by f1;
+
+let $q=
+select t.f2
+ from t1
+ left join
+ (v1 join t1 as t on v1.f1=t.f1 and t.f2 = null)
+ on t1.f1=t.f1;
+
+eval $q;
+eval explain $q;
+eval set statement optimizer_switch='split_materialized=off' for explain $q;
+
+drop view v1;
+
+drop table t1,t2;
diff --git a/sql/opt_split.cc b/sql/opt_split.cc
index fc3f084..cfac0c9 100644
--- a/sql/opt_split.cc
+++ b/sql/opt_split.cc
@@ -1078,6 +1078,7 @@ bool JOIN::inject_best_splitting_cond(table_map remaining_tables)
@param
spl_plan info on the splitting plan chosen for the splittable table T
remaining_tables the table T is joined just before these tables
+ is_const_table the table T is a constant table
@details
If in the final query plan the optimizer has chosen a splitting plan
@@ -1091,12 +1092,13 @@ bool JOIN::inject_best_splitting_cond(table_map remaining_tables)
*/
bool JOIN_TAB::fix_splitting(SplM_plan_info *spl_plan,
- table_map remaining_tables)
+ table_map remaining_tables,
+ bool is_const_table)
{
SplM_opt_info *spl_opt_info= table->spl_opt_info;
DBUG_ASSERT(table->spl_opt_info != 0);
JOIN *md_join= spl_opt_info->join;
- if (spl_plan)
+ if (spl_plan && !is_const_table)
{
memcpy((char *) md_join->best_positions,
(char *) spl_plan->best_positions,
@@ -1113,7 +1115,7 @@ bool JOIN_TAB::fix_splitting(SplM_plan_info *spl_plan,
remaining_tables,
true);
}
- else
+ else if (md_join->save_qep)
{
md_join->restore_query_plan(md_join->save_qep);
}
@@ -1143,10 +1145,11 @@ bool JOIN::fix_all_splittings_in_plan()
{
POSITION *cur_pos= &best_positions[tablenr];
JOIN_TAB *tab= cur_pos->table;
- if (tablenr >= const_tables && tab->table->is_splittable())
+ if (tab->table->is_splittable())
{
SplM_plan_info *spl_plan= cur_pos->spl_plan;
- if (tab->fix_splitting(spl_plan, all_tables & ~prev_tables))
+ if (tab->fix_splitting(spl_plan, all_tables & ~prev_tables,
+ tablenr < const_tables ))
return true;
}
prev_tables|= tab->table->map;
diff --git a/sql/sql_select.cc b/sql/sql_select.cc
index 439853c..a83d4d2 100644
--- a/sql/sql_select.cc
+++ b/sql/sql_select.cc
@@ -6689,6 +6689,7 @@ void set_position(JOIN *join,uint idx,JOIN_TAB *table,KEYUSE *key)
next=tmp;
}
join->best_ref[idx]=table;
+ join->positions[idx].spl_plan= 0;
}
diff --git a/sql/sql_select.h b/sql/sql_select.h
index 0e486c1..57d8dab 100644
--- a/sql/sql_select.h
+++ b/sql/sql_select.h
@@ -659,7 +659,8 @@ typedef struct st_join_table {
void add_keyuses_for_splitting();
SplM_plan_info *choose_best_splitting(double record_count,
table_map remaining_tables);
- bool fix_splitting(SplM_plan_info *spl_plan, table_map remaining_tables);
+ bool fix_splitting(SplM_plan_info *spl_plan, table_map remaining_tables,
+ bool is_const_table);
} JOIN_TAB;
revision-id: b7ef984c04343bd9a5db167ab3c54cba6170eeef (v5.8-1035-gb7ef984c0)
parent(s): 7348e2db12ef63966daa917402abad5102e77cd2
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2019-03-05 01:54:26 +0300
message:
Range Locking: add a basic unit test
---
CMakeLists.txt | 1 +
Makefile | 1 +
TARGETS | 5 +
src.mk | 1 +
utilities/transactions/range_locking_test.cc | 157 +++++++++++++++++++++++++++
5 files changed, 165 insertions(+)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9795083ef..57d5d6180 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1009,6 +1009,7 @@ if(WITH_TESTS)
utilities/transactions/transaction_test.cc
utilities/transactions/write_prepared_transaction_test.cc
utilities/transactions/write_unprepared_transaction_test.cc
+ utilities/transactions/range_locking_test.cc
utilities/ttl/ttl_test.cc
utilities/write_batch_with_index/write_batch_with_index_test.cc
)
diff --git a/Makefile b/Makefile
index 074194b32..9ece4310c 100644
--- a/Makefile
+++ b/Makefile
@@ -537,6 +537,7 @@ TESTS = \
db_universal_compaction_test \
trace_analyzer_test \
repeatable_thread_test \
+ range_locking_test \
PARALLEL_TEST = \
backupable_db_test \
diff --git a/TARGETS b/TARGETS
index 2fff0edbd..e06f38771 100644
--- a/TARGETS
+++ b/TARGETS
@@ -930,6 +930,11 @@ ROCKS_TESTS = [
"db/range_del_aggregator_test.cc",
"serial",
],
+ [
+ "range_locking_test",
+ "utilities/transactions/range_locking_test.cc",
+ "serial",
+ ],
[
"rate_limiter_test",
"util/rate_limiter_test.cc",
diff --git a/src.mk b/src.mk
index f3372258f..de0644cac 100644
--- a/src.mk
+++ b/src.mk
@@ -425,6 +425,7 @@ MAIN_SOURCES = \
utilities/spatialdb/spatial_db_test.cc \
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \
utilities/transactions/optimistic_transaction_test.cc \
+ utilities/transactions/range_locking_test.cc \
utilities/transactions/transaction_test.cc \
utilities/transactions/write_prepared_transaction_test.cc \
utilities/transactions/write_unprepared_transaction_test.cc \
diff --git a/utilities/transactions/range_locking_test.cc b/utilities/transactions/range_locking_test.cc
new file mode 100644
index 000000000..fff4a0e26
--- /dev/null
+++ b/utilities/transactions/range_locking_test.cc
@@ -0,0 +1,157 @@
+#ifndef ROCKSDB_LITE
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "utilities/transactions/transaction_test.h"
+
+#include <algorithm>
+#include <functional>
+#include <string>
+#include <thread>
+
+#include "db/db_impl.h"
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/perf_context.h"
+#include "rocksdb/utilities/transaction.h"
+#include "rocksdb/utilities/transaction_db.h"
+#include "table/mock_table.h"
+#include "util/fault_injection_test_env.h"
+#include "util/random.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+#include "util/testharness.h"
+#include "util/testutil.h"
+#include "util/transaction_test_util.h"
+#include "utilities/merge_operators.h"
+#include "utilities/merge_operators/string_append/stringappend.h"
+#include "utilities/transactions/pessimistic_transaction_db.h"
+
+#include "port/port.h"
+
+using std::string;
+
+namespace rocksdb {
+
+
+void range_endpoint_convert_same(const rocksdb::Slice &key,
+ std::string *res)
+{
+ res->clear();
+ res->append(key.data(), key.size());
+}
+
+int range_endpoints_compare_default(const char *a, size_t a_len,
+ const char *b, size_t b_len)
+{
+ return Slice(a, a_len).compare(Slice(b, b_len));
+}
+
+class RangeLockingTest : public ::testing::Test {
+ public:
+ TransactionDB* db;
+ std::string dbname;
+ Options options;
+
+ TransactionDBOptions txn_db_options;
+ bool use_stackable_db_;
+
+ RangeLockingTest()
+ : db(nullptr) {
+ options.create_if_missing = true;
+ dbname = test::PerThreadDBPath("transaction_testdb");
+
+ DestroyDB(dbname, options);
+ Status s;
+ s = TransactionDB::Open(options, txn_db_options, dbname, &db);
+ assert(s.ok());
+
+ db->use_range_locking= true;
+ rocksdb::RangeLockMgrControl *mgr= db->get_range_lock_manager();
+ assert(mgr);
+
+ mgr->set_endpoint_cmp_functions(range_endpoint_convert_same,
+ range_endpoints_compare_default);
+ // can also: mgr->set_max_lock_memory(rocksdb_max_lock_memory);
+ }
+
+ ~RangeLockingTest() {
+ delete db;
+ db = nullptr;
+ // This is to skip the assert statement in FaultInjectionTestEnv. There
+ // seems to be a bug in btrfs that makes readdir return recently
+ // unlink-ed files. By using the default fs we simply ignore errors resulted
+ // from attempting to delete such files in DestroyDB.
+ DestroyDB(dbname, options);
+ }
+};
+
+// TODO: set a smaller lock wait timeout so that the test runs faster.
+TEST_F(RangeLockingTest, BasicRangeLocking) {
+ WriteOptions write_options;
+ TransactionOptions txn_options;
+ std::string value;
+ ReadOptions read_options;
+
+ Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+
+ // Get a range lock
+ {
+ auto s= txn0->GetRangeLock(db->DefaultColumnFamily(),
+ Slice("a"), Slice("c"));
+ ASSERT_EQ(s, Status::OK());
+ }
+
+
+ // Check that range Lock inhibits an overlapping range lock
+ {
+ auto s= txn1->GetRangeLock(db->DefaultColumnFamily(),
+ Slice("b"), Slice("z"));
+ ASSERT_TRUE(s.IsTimedOut());
+ }
+
+ // Check that range Lock inhibits an overlapping point lock
+ {
+ auto s= txn1->GetForUpdate(read_options, db->DefaultColumnFamily(),
+ Slice("b"), &value);
+ ASSERT_TRUE(s.IsTimedOut());
+ }
+
+ // Get a point lock, check that it inhibits range locks
+ {
+ auto s= txn0->Put(db->DefaultColumnFamily(),
+ Slice("d"), Slice("value"));
+ ASSERT_EQ(s, Status::OK());
+
+ auto s2= txn1->GetRangeLock(db->DefaultColumnFamily(),
+ Slice("c"), Slice("e"));
+ ASSERT_TRUE(s2.IsTimedOut());
+ }
+
+ ASSERT_OK(txn0->Commit());
+ txn1->Rollback();
+
+ delete txn0;
+ delete txn1;
+}
+
+} // namespace rocksdb
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+#else
+#include <stdio.h>
+
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr,
+ "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
+ return 0;
+}
+
+#endif // ROCKSDB_LITE
revision-id: 8f278697529b349a951d3b49aa1ae6668044dec9 (v5.8-1035-g8f2786975)
parent(s): 7348e2db12ef63966daa917402abad5102e77cd2
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2019-03-05 01:21:18 +0300
message:
Add a unit test for range locking
---
CMakeLists.txt | 1 +
Makefile | 1 +
TARGETS | 5 +
src.mk | 1 +
utilities/transactions/range_locking_test.cc | 4858 ++++++++++++++++++++++++++
5 files changed, 4866 insertions(+)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9795083ef..57d5d6180 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1009,6 +1009,7 @@ if(WITH_TESTS)
utilities/transactions/transaction_test.cc
utilities/transactions/write_prepared_transaction_test.cc
utilities/transactions/write_unprepared_transaction_test.cc
+ utilities/transactions/range_locking_test.cc
utilities/ttl/ttl_test.cc
utilities/write_batch_with_index/write_batch_with_index_test.cc
)
diff --git a/Makefile b/Makefile
index 074194b32..9ece4310c 100644
--- a/Makefile
+++ b/Makefile
@@ -537,6 +537,7 @@ TESTS = \
db_universal_compaction_test \
trace_analyzer_test \
repeatable_thread_test \
+ range_locking_test \
PARALLEL_TEST = \
backupable_db_test \
diff --git a/TARGETS b/TARGETS
index 2fff0edbd..e06f38771 100644
--- a/TARGETS
+++ b/TARGETS
@@ -930,6 +930,11 @@ ROCKS_TESTS = [
"db/range_del_aggregator_test.cc",
"serial",
],
+ [
+ "range_locking_test",
+ "utilities/transactions/range_locking_test.cc",
+ "serial",
+ ],
[
"rate_limiter_test",
"util/rate_limiter_test.cc",
diff --git a/src.mk b/src.mk
index f3372258f..de0644cac 100644
--- a/src.mk
+++ b/src.mk
@@ -425,6 +425,7 @@ MAIN_SOURCES = \
utilities/spatialdb/spatial_db_test.cc \
utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \
utilities/transactions/optimistic_transaction_test.cc \
+ utilities/transactions/range_locking_test.cc \
utilities/transactions/transaction_test.cc \
utilities/transactions/write_prepared_transaction_test.cc \
utilities/transactions/write_unprepared_transaction_test.cc \
diff --git a/utilities/transactions/range_locking_test.cc b/utilities/transactions/range_locking_test.cc
new file mode 100644
index 000000000..c759e5e10
--- /dev/null
+++ b/utilities/transactions/range_locking_test.cc
@@ -0,0 +1,4858 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "utilities/transactions/transaction_test.h"
+
+#include <algorithm>
+#include <functional>
+#include <string>
+#include <thread>
+
+#include "db/db_impl.h"
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/perf_context.h"
+#include "rocksdb/utilities/transaction.h"
+#include "rocksdb/utilities/transaction_db.h"
+#include "table/mock_table.h"
+#include "util/fault_injection_test_env.h"
+#include "util/random.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+#include "util/testharness.h"
+#include "util/testutil.h"
+#include "util/transaction_test_util.h"
+#include "utilities/merge_operators.h"
+#include "utilities/merge_operators/string_append/stringappend.h"
+#include "utilities/transactions/pessimistic_transaction_db.h"
+
+#include "port/port.h"
+
+using std::string;
+
+namespace rocksdb {
+
+
+void range_endpoint_convert_same(const rocksdb::Slice &key,
+ std::string *res)
+{
+ res->clear();
+ res->append(key.data(), key.size());
+}
+
+int range_endpoints_compare_default(const char *a, size_t a_len,
+ const char *b, size_t b_len)
+{
+ return Slice(a, a_len).compare(Slice(b, b_len));
+}
+
+class RangeLockingTest : public ::testing::Test {
+ public:
+ TransactionDB* db;
+ std::string dbname;
+ Options options;
+
+ TransactionDBOptions txn_db_options;
+ bool use_stackable_db_;
+
+ RangeLockingTest()
+ : db(nullptr) {
+ options.create_if_missing = true;
+ dbname = test::PerThreadDBPath("transaction_testdb");
+
+ DestroyDB(dbname, options);
+ Status s;
+ s = TransactionDB::Open(options, txn_db_options, dbname, &db);
+ assert(s.ok());
+
+ db->use_range_locking= true;
+ rocksdb::RangeLockMgrControl *mgr= db->get_range_lock_manager();
+ assert(mgr);
+
+ mgr->set_endpoint_cmp_functions(range_endpoint_convert_same,
+ range_endpoints_compare_default);
+ // can also: mgr->set_max_lock_memory(rocksdb_max_lock_memory);
+ }
+
+ ~RangeLockingTest() {
+ delete db;
+ db = nullptr;
+    // This is to skip the assert statement in FaultInjectionTestEnv. There
+    // seems to be a bug in btrfs that makes readdir return recently
+    // unlinked files. By using the default fs we simply ignore errors resulting
+    // from attempting to delete such files in DestroyDB.
+ DestroyDB(dbname, options);
+ }
+};
+
+TEST_F(RangeLockingTest, BasicRangeLocking) {
+ WriteOptions write_options;
+ TransactionOptions txn_options;
+
+ Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+
+ // Get a range lock, check that it inhibits point locks
+ auto s= txn0->GetRangeLock(db->DefaultColumnFamily(),
+ Slice("a"), Slice("c"));
+ ASSERT_EQ(s, Status::OK());
+
+ auto s2= txn1->GetRangeLock(db->DefaultColumnFamily(),
+ Slice("b"), Slice("z"));
+ ASSERT_EQ(s2, Status::Busy());
+
+  // TODO: GetForUpdate - get a point lock, check that it inhibits range locks
+  //       (a rough sketch follows after this test)
+
+  // TODO: check that two overlapping range locks inhibit each other
+
+ //ASSERT_OK(txn0->SetName("xid"));
+ //ASSERT_OK(txn0->Prepare());
+ ASSERT_OK(txn0->Commit());
+ txn1->Rollback();
+
+  delete txn0;
+  delete txn1;
+}
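+
+// A rough sketch of the point-lock vs. range-lock check left as a TODO in the
+// test above. It is kept under #if 0 because it relies on an assumption this
+// file does not verify yet: that a point lock taken through GetForUpdate()
+// conflicts with an overlapping range lock, and that the blocked request
+// fails with Status::Busy() or a lock timeout. The test name is illustrative.
+#if 0
+TEST_F(RangeLockingTest, PointLockVsRangeLockSketch) {
+  WriteOptions write_options;
+  ReadOptions read_options;
+  TransactionOptions txn_options;
+
+  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+
+  // txn0 takes a point lock on "b" (nullptr value: lock only, no read).
+  ASSERT_OK(txn0->GetForUpdate(read_options, "b", nullptr));
+
+  // Assumed behavior: a range lock overlapping "b" cannot be granted while
+  // txn0 holds the point lock.
+  auto s = txn1->GetRangeLock(db->DefaultColumnFamily(), Slice("a"), Slice("c"));
+  ASSERT_TRUE(s.IsBusy() || s.IsTimedOut());
+
+  ASSERT_OK(txn0->Commit());
+  txn1->Rollback();
+  delete txn0;
+  delete txn1;
+}
+#endif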
+
+#if 0
+TEST_P(TransactionTest, SuccessTest) {
+ ASSERT_OK(db->ResetStats());
+
+ WriteOptions write_options;
+ ReadOptions read_options;
+ std::string value;
+
+ ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
+ ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
+
+ Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
+ ASSERT_TRUE(txn);
+
+ ASSERT_EQ(0, txn->GetNumPuts());
+ ASSERT_LE(0, txn->GetID());
+
+ ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value));
+ ASSERT_EQ(value, "bar");
+
+ ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
+
+ ASSERT_EQ(1, txn->GetNumPuts());
+
+ ASSERT_OK(txn->GetForUpdate(read_options, "foo", &value));
+ ASSERT_EQ(value, "bar2");
+
+ ASSERT_OK(txn->Commit());
+
+ ASSERT_OK(db->Get(read_options, "foo", &value));
+ ASSERT_EQ(value, "bar2");
+
+ delete txn;
+}
+
+// This test clarifies the contract of ValidateSnapshot
+TEST_P(TransactionTest, ValidateSnapshotTest) {
+ for (bool with_2pc : {true, false}) {
+ ASSERT_OK(ReOpen());
+ WriteOptions write_options;
+ ReadOptions read_options;
+ std::string value;
+
+ assert(db != nullptr);
+ Transaction* txn1 =
+ db->BeginTransaction(write_options, TransactionOptions());
+ ASSERT_TRUE(txn1);
+ ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1")));
+ if (with_2pc) {
+ ASSERT_OK(txn1->SetName("xid1"));
+ ASSERT_OK(txn1->Prepare());
+ }
+
+ Transaction* txn2 =
+ db->BeginTransaction(write_options, TransactionOptions());
+ ASSERT_TRUE(txn2);
+ txn2->SetSnapshot();
+
+ ASSERT_OK(txn1->Commit());
+ delete txn1;
+
+ auto pes_txn2 = dynamic_cast<PessimisticTransaction*>(txn2);
+ // Test the simple case where the key is not tracked yet
+    auto tracked_seq = kMaxSequenceNumber;
+    auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo",
+                                        &tracked_seq);
+ ASSERT_TRUE(s.IsBusy());
+ delete txn2;
+ }
+}
+
+TEST_P(TransactionTest, WaitingTxn) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+ string value;
+ Status s;
+
+ txn_options.lock_timeout = 1;
+ s = db->Put(write_options, Slice("foo"), Slice("bar"));
+ ASSERT_OK(s);
+
+ /* create second cf */
+ ColumnFamilyHandle* cfa;
+ ColumnFamilyOptions cf_options;
+ s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
+ ASSERT_OK(s);
+ s = db->Put(write_options, cfa, Slice("foo"), Slice("bar"));
+ ASSERT_OK(s);
+
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ TransactionID id1 = txn1->GetID();
+ ASSERT_TRUE(txn1);
+ ASSERT_TRUE(txn2);
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "TransactionLockMgr::AcquireWithTimeout:WaitingTxn", [&](void* /*arg*/) {
+ std::string key;
+ uint32_t cf_id;
+ std::vector<TransactionID> wait = txn2->GetWaitingTxns(&cf_id, &key);
+ ASSERT_EQ(key, "foo");
+ ASSERT_EQ(wait.size(), 1);
+ ASSERT_EQ(wait[0], id1);
+ ASSERT_EQ(cf_id, 0);
+ });
+
+ get_perf_context()->Reset();
+ // lock key in default cf
+ s = txn1->GetForUpdate(read_options, "foo", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar");
+ ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0);
+
+ // lock key in cfa
+ s = txn1->GetForUpdate(read_options, cfa, "foo", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar");
+ ASSERT_EQ(get_perf_context()->key_lock_wait_count, 0);
+
+ auto lock_data = db->GetLockStatusData();
+  // Locked keys exist in both column families.
+ ASSERT_EQ(lock_data.size(), 2);
+
+ auto cf_iterator = lock_data.begin();
+
+  // The iterator points to an unordered_multimap,
+  // so the test cannot assume any particular order.
+
+ // Column family is 1 or 0 (cfa).
+ if (cf_iterator->first != 1 && cf_iterator->first != 0) {
+ FAIL();
+ }
+ // The locked key is "foo" and is locked by txn1
+ ASSERT_EQ(cf_iterator->second.key, "foo");
+ ASSERT_EQ(cf_iterator->second.ids.size(), 1);
+ ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID());
+
+ cf_iterator++;
+
+ // Column family is 0 (default) or 1.
+ if (cf_iterator->first != 1 && cf_iterator->first != 0) {
+ FAIL();
+ }
+ // The locked key is "foo" and is locked by txn1
+ ASSERT_EQ(cf_iterator->second.key, "foo");
+ ASSERT_EQ(cf_iterator->second.ids.size(), 1);
+ ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID());
+
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ s = txn2->GetForUpdate(read_options, "foo", &value);
+ ASSERT_TRUE(s.IsTimedOut());
+ ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
+ ASSERT_EQ(get_perf_context()->key_lock_wait_count, 1);
+ ASSERT_GE(get_perf_context()->key_lock_wait_time, 0);
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ delete cfa;
+ delete txn1;
+ delete txn2;
+}
+
+TEST_P(TransactionTest, SharedLocks) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+ Status s;
+
+ txn_options.lock_timeout = 1;
+ s = db->Put(write_options, Slice("foo"), Slice("bar"));
+ ASSERT_OK(s);
+
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn1);
+ ASSERT_TRUE(txn2);
+ ASSERT_TRUE(txn3);
+
+ // Test shared access between txns
+ s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
+ ASSERT_OK(s);
+
+ s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
+ ASSERT_OK(s);
+
+ s = txn3->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
+ ASSERT_OK(s);
+
+ auto lock_data = db->GetLockStatusData();
+ ASSERT_EQ(lock_data.size(), 1);
+
+ auto cf_iterator = lock_data.begin();
+ ASSERT_EQ(cf_iterator->second.key, "foo");
+
+ // We compare whether the set of txns locking this key is the same. To do
+ // this, we need to sort both vectors so that the comparison is done
+ // correctly.
+ std::vector<TransactionID> expected_txns = {txn1->GetID(), txn2->GetID(),
+ txn3->GetID()};
+ std::vector<TransactionID> lock_txns = cf_iterator->second.ids;
+ ASSERT_EQ(expected_txns, lock_txns);
+ ASSERT_FALSE(cf_iterator->second.exclusive);
+
+ txn1->Rollback();
+ txn2->Rollback();
+ txn3->Rollback();
+
+ // Test txn1 and txn2 sharing a lock and txn3 trying to obtain it.
+ s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
+ ASSERT_OK(s);
+
+ s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
+ ASSERT_OK(s);
+
+ s = txn3->GetForUpdate(read_options, "foo", nullptr);
+ ASSERT_TRUE(s.IsTimedOut());
+ ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
+
+ txn1->UndoGetForUpdate("foo");
+ s = txn3->GetForUpdate(read_options, "foo", nullptr);
+ ASSERT_TRUE(s.IsTimedOut());
+ ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
+
+ txn2->UndoGetForUpdate("foo");
+ s = txn3->GetForUpdate(read_options, "foo", nullptr);
+ ASSERT_OK(s);
+
+ txn1->Rollback();
+ txn2->Rollback();
+ txn3->Rollback();
+
+ // Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock.
+ s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
+ ASSERT_OK(s);
+
+ s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
+ ASSERT_OK(s);
+
+ s = txn2->GetForUpdate(read_options, "foo", nullptr);
+ ASSERT_TRUE(s.IsTimedOut());
+ ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
+
+ txn1->UndoGetForUpdate("foo");
+ s = txn2->GetForUpdate(read_options, "foo", nullptr);
+ ASSERT_OK(s);
+
+ ASSERT_OK(txn1->Rollback());
+ ASSERT_OK(txn2->Rollback());
+
+ // Test txn1 trying to downgrade its lock.
+ s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */);
+ ASSERT_OK(s);
+
+ s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
+ ASSERT_TRUE(s.IsTimedOut());
+ ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
+
+ // Should still fail after "downgrading".
+ s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
+ ASSERT_OK(s);
+
+ s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
+ ASSERT_TRUE(s.IsTimedOut());
+ ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
+
+ txn1->Rollback();
+ txn2->Rollback();
+
+ // Test txn1 holding an exclusive lock and txn2 trying to obtain shared
+ // access.
+ s = txn1->GetForUpdate(read_options, "foo", nullptr);
+ ASSERT_OK(s);
+
+ s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
+ ASSERT_TRUE(s.IsTimedOut());
+ ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
+
+ txn1->UndoGetForUpdate("foo");
+ s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
+ ASSERT_OK(s);
+
+ delete txn1;
+ delete txn2;
+ delete txn3;
+}
+
+TEST_P(TransactionTest, DeadlockCycleShared) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+
+ txn_options.lock_timeout = 1000000;
+ txn_options.deadlock_detect = true;
+
+ // Set up a wait for chain like this:
+ //
+ // Tn -> T(n*2)
+ // Tn -> T(n*2 + 1)
+ //
+ // So we have:
+ // T1 -> T2 -> T4 ...
+ // | |> T5 ...
+ // |> T3 -> T6 ...
+ // |> T7 ...
+ // up to T31, then T[16 - 31] -> T1.
+ // Note that Tn holds lock on floor(n / 2).
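+  // For example, T1 holds the lock on key 0 (floor(1 / 2)) and waits for
+  // key 1, which is shared-locked by T2 and T3.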
+
+ std::vector<Transaction*> txns(31);
+
+ for (uint32_t i = 0; i < 31; i++) {
+ txns[i] = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txns[i]);
+ auto s = txns[i]->GetForUpdate(read_options, ToString((i + 1) / 2), nullptr,
+ false /* exclusive */);
+ ASSERT_OK(s);
+ }
+
+ std::atomic<uint32_t> checkpoints(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
+ [&](void* /*arg*/) { checkpoints.fetch_add(1); });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ // We want the leaf transactions to block and hold everyone back.
+ std::vector<port::Thread> threads;
+ for (uint32_t i = 0; i < 15; i++) {
+ std::function<void()> blocking_thread = [&, i] {
+ auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr,
+ true /* exclusive */);
+ ASSERT_OK(s);
+ txns[i]->Rollback();
+ delete txns[i];
+ };
+ threads.emplace_back(blocking_thread);
+ }
+
+ // Wait until all threads are waiting on each other.
+ while (checkpoints.load() != 15) {
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ // Complete the cycle T[16 - 31] -> T1
+ for (uint32_t i = 15; i < 31; i++) {
+ auto s =
+ txns[i]->GetForUpdate(read_options, "0", nullptr, true /* exclusive */);
+ ASSERT_TRUE(s.IsDeadlock());
+
+ // Calculate next buffer len, plateau at 5 when 5 records are inserted.
+ const uint32_t curr_dlock_buffer_len_ =
+ (i - 14 > kInitialMaxDeadlocks) ? kInitialMaxDeadlocks : (i - 14);
+
+ auto dlock_buffer = db->GetDeadlockInfoBuffer();
+ ASSERT_EQ(dlock_buffer.size(), curr_dlock_buffer_len_);
+ auto dlock_entry = dlock_buffer[0].path;
+ ASSERT_EQ(dlock_entry.size(), kInitialMaxDeadlocks);
+ int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time;
+ int64_t cur_deadlock_time = 0;
+ for (auto const& dl_path_rec : dlock_buffer) {
+ cur_deadlock_time = dl_path_rec.deadlock_time;
+ ASSERT_NE(cur_deadlock_time, 0);
+ ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time);
+ pre_deadlock_time = cur_deadlock_time;
+ }
+
+ int64_t curr_waiting_key = 0;
+
+ // Offset of each txn id from the root of the shared dlock tree's txn id.
+ int64_t offset_root = dlock_entry[0].m_txn_id - 1;
+ // Offset of the final entry in the dlock path from the root's txn id.
+ TransactionID leaf_id =
+ dlock_entry[dlock_entry.size() - 1].m_txn_id - offset_root;
+
+ for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); it++) {
+ auto dl_node = *it;
+ ASSERT_EQ(dl_node.m_txn_id, offset_root + leaf_id);
+ ASSERT_EQ(dl_node.m_cf_id, 0);
+ ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
+ ASSERT_EQ(dl_node.m_exclusive, true);
+
+ if (curr_waiting_key == 0) {
+ curr_waiting_key = leaf_id;
+ }
+ curr_waiting_key /= 2;
+ leaf_id /= 2;
+ }
+ }
+
+ // Rollback the leaf transaction.
+ for (uint32_t i = 15; i < 31; i++) {
+ txns[i]->Rollback();
+ delete txns[i];
+ }
+
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ // Downsize the buffer and verify the 3 latest deadlocks are preserved.
+ auto dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
+ db->SetDeadlockInfoBufferSize(3);
+ auto dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
+ ASSERT_EQ(dlock_buffer_after_resize.size(), 3);
+
+ for (uint32_t i = 0; i < dlock_buffer_after_resize.size(); i++) {
+ for (uint32_t j = 0; j < dlock_buffer_after_resize[i].path.size(); j++) {
+ ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
+ dlock_buffer_before_resize[i].path[j].m_txn_id);
+ }
+ }
+
+  // Upsize the buffer and verify the 3 latest deadlocks are preserved.
+ dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
+ db->SetDeadlockInfoBufferSize(5);
+ dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
+ ASSERT_EQ(dlock_buffer_after_resize.size(), 3);
+
+ for (uint32_t i = 0; i < dlock_buffer_before_resize.size(); i++) {
+ for (uint32_t j = 0; j < dlock_buffer_before_resize[i].path.size(); j++) {
+ ASSERT_EQ(dlock_buffer_after_resize[i].path[j].m_txn_id,
+ dlock_buffer_before_resize[i].path[j].m_txn_id);
+ }
+ }
+
+ // Downsize to 0 and verify the size is consistent.
+ dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
+ db->SetDeadlockInfoBufferSize(0);
+ dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
+ ASSERT_EQ(dlock_buffer_after_resize.size(), 0);
+
+ // Upsize from 0 to verify the size is persistent.
+ dlock_buffer_before_resize = db->GetDeadlockInfoBuffer();
+ db->SetDeadlockInfoBufferSize(3);
+ dlock_buffer_after_resize = db->GetDeadlockInfoBuffer();
+ ASSERT_EQ(dlock_buffer_after_resize.size(), 0);
+
+ // Contrived case of shared lock of cycle size 2 to verify that a shared
+ // lock causing a deadlock is correctly reported as "shared" in the buffer.
+ std::vector<Transaction*> txns_shared(2);
+
+ // Create a cycle of size 2.
+ for (uint32_t i = 0; i < 2; i++) {
+ txns_shared[i] = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txns_shared[i]);
+ auto s = txns_shared[i]->GetForUpdate(read_options, ToString(i), nullptr);
+ ASSERT_OK(s);
+ }
+
+ std::atomic<uint32_t> checkpoints_shared(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
+ [&](void* /*arg*/) { checkpoints_shared.fetch_add(1); });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ std::vector<port::Thread> threads_shared;
+ for (uint32_t i = 0; i < 1; i++) {
+ std::function<void()> blocking_thread = [&, i] {
+ auto s =
+ txns_shared[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
+ ASSERT_OK(s);
+ txns_shared[i]->Rollback();
+ delete txns_shared[i];
+ };
+ threads_shared.emplace_back(blocking_thread);
+ }
+
+ // Wait until all threads are waiting on each other.
+ while (checkpoints_shared.load() != 1) {
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ // Complete the cycle T2 -> T1 with a shared lock.
+ auto s = txns_shared[1]->GetForUpdate(read_options, "0", nullptr, false);
+ ASSERT_TRUE(s.IsDeadlock());
+
+ auto dlock_buffer = db->GetDeadlockInfoBuffer();
+
+ // Verify the size of the buffer and the single path.
+ ASSERT_EQ(dlock_buffer.size(), 1);
+ ASSERT_EQ(dlock_buffer[0].path.size(), 2);
+
+ // Verify the exclusivity field of the transactions in the deadlock path.
+ ASSERT_TRUE(dlock_buffer[0].path[0].m_exclusive);
+ ASSERT_FALSE(dlock_buffer[0].path[1].m_exclusive);
+ txns_shared[1]->Rollback();
+ delete txns_shared[1];
+
+ for (auto& t : threads_shared) {
+ t.join();
+ }
+}
+
+TEST_P(TransactionStressTest, DeadlockCycle) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+
+ // offset by 2 from the max depth to test edge case
+ const uint32_t kMaxCycleLength = 52;
+
+ txn_options.lock_timeout = 1000000;
+ txn_options.deadlock_detect = true;
+
+ for (uint32_t len = 2; len < kMaxCycleLength; len++) {
+ // Set up a long wait for chain like this:
+ //
+ // T1 -> T2 -> T3 -> ... -> Tlen
+
+ std::vector<Transaction*> txns(len);
+
+ for (uint32_t i = 0; i < len; i++) {
+ txns[i] = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txns[i]);
+ auto s = txns[i]->GetForUpdate(read_options, ToString(i), nullptr);
+ ASSERT_OK(s);
+ }
+
+ std::atomic<uint32_t> checkpoints(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
+ [&](void* /*arg*/) { checkpoints.fetch_add(1); });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ // We want the last transaction in the chain to block and hold everyone
+ // back.
+ std::vector<port::Thread> threads;
+ for (uint32_t i = 0; i < len - 1; i++) {
+ std::function<void()> blocking_thread = [&, i] {
+ auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr);
+ ASSERT_OK(s);
+ txns[i]->Rollback();
+ delete txns[i];
+ };
+ threads.emplace_back(blocking_thread);
+ }
+
+ // Wait until all threads are waiting on each other.
+ while (checkpoints.load() != len - 1) {
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ // Complete the cycle Tlen -> T1
+ auto s = txns[len - 1]->GetForUpdate(read_options, "0", nullptr);
+ ASSERT_TRUE(s.IsDeadlock());
+
+ const uint32_t dlock_buffer_size_ = (len - 1 > 5) ? 5 : (len - 1);
+ uint32_t curr_waiting_key = 0;
+ TransactionID curr_txn_id = txns[0]->GetID();
+
+ auto dlock_buffer = db->GetDeadlockInfoBuffer();
+ ASSERT_EQ(dlock_buffer.size(), dlock_buffer_size_);
+ uint32_t check_len = len;
+ bool check_limit_flag = false;
+
+ // Special case for a deadlock path that exceeds the maximum depth.
+ if (len > 50) {
+ check_len = 0;
+ check_limit_flag = true;
+ }
+ auto dlock_entry = dlock_buffer[0].path;
+ ASSERT_EQ(dlock_entry.size(), check_len);
+ ASSERT_EQ(dlock_buffer[0].limit_exceeded, check_limit_flag);
+
+ int64_t pre_deadlock_time = dlock_buffer[0].deadlock_time;
+ int64_t cur_deadlock_time = 0;
+ for (auto const& dl_path_rec : dlock_buffer) {
+ cur_deadlock_time = dl_path_rec.deadlock_time;
+ ASSERT_NE(cur_deadlock_time, 0);
+ ASSERT_TRUE(cur_deadlock_time <= pre_deadlock_time);
+ pre_deadlock_time = cur_deadlock_time;
+ }
+
+ // Iterates backwards over path verifying decreasing txn_ids.
+ for (auto it = dlock_entry.rbegin(); it != dlock_entry.rend(); it++) {
+ auto dl_node = *it;
+ ASSERT_EQ(dl_node.m_txn_id, len + curr_txn_id - 1);
+ ASSERT_EQ(dl_node.m_cf_id, 0);
+ ASSERT_EQ(dl_node.m_waiting_key, ToString(curr_waiting_key));
+ ASSERT_EQ(dl_node.m_exclusive, true);
+
+ curr_txn_id--;
+ if (curr_waiting_key == 0) {
+ curr_waiting_key = len;
+ }
+ curr_waiting_key--;
+ }
+
+ // Rollback the last transaction.
+ txns[len - 1]->Rollback();
+ delete txns[len - 1];
+
+ for (auto& t : threads) {
+ t.join();
+ }
+ }
+}
+
+TEST_P(TransactionStressTest, DeadlockStress) {
+ const uint32_t NUM_TXN_THREADS = 10;
+ const uint32_t NUM_KEYS = 100;
+ const uint32_t NUM_ITERS = 10000;
+
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+
+ txn_options.lock_timeout = 1000000;
+ txn_options.deadlock_detect = true;
+ std::vector<std::string> keys;
+
+ for (uint32_t i = 0; i < NUM_KEYS; i++) {
+ db->Put(write_options, Slice(ToString(i)), Slice(""));
+ keys.push_back(ToString(i));
+ }
+
+ size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
+ Random rnd(static_cast<uint32_t>(tid));
+ std::function<void(uint32_t)> stress_thread = [&](uint32_t seed) {
+ std::default_random_engine g(seed);
+
+ Transaction* txn;
+ for (uint32_t i = 0; i < NUM_ITERS; i++) {
+ txn = db->BeginTransaction(write_options, txn_options);
+ auto random_keys = keys;
+ std::shuffle(random_keys.begin(), random_keys.end(), g);
+
+ // Lock keys in random order.
+ for (const auto& k : random_keys) {
+ // Lock mostly for shared access, but exclusive 1/4 of the time.
+ auto s =
+ txn->GetForUpdate(read_options, k, nullptr, txn->GetID() % 4 == 0);
+ if (!s.ok()) {
+ ASSERT_TRUE(s.IsDeadlock());
+ txn->Rollback();
+ break;
+ }
+ }
+
+ delete txn;
+ }
+ };
+
+ std::vector<port::Thread> threads;
+ for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
+ threads.emplace_back(stress_thread, rnd.Next());
+ }
+
+ for (auto& t : threads) {
+ t.join();
+ }
+}
+
+TEST_P(TransactionTest, CommitTimeBatchFailTest) {
+ WriteOptions write_options;
+ TransactionOptions txn_options;
+
+ std::string value;
+ Status s;
+
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn1);
+
+ ASSERT_OK(txn1->GetCommitTimeWriteBatch()->Put("cat", "dog"));
+
+ s = txn1->Put("foo", "bar");
+ ASSERT_OK(s);
+
+ // fails due to non-empty commit-time batch
+ s = txn1->Commit();
+ ASSERT_EQ(s, Status::InvalidArgument());
+
+ delete txn1;
+}
+
+TEST_P(TransactionTest, LogMarkLeakTest) {
+ TransactionOptions txn_options;
+ WriteOptions write_options;
+ options.write_buffer_size = 1024;
+ ASSERT_OK(ReOpenNoDelete());
+ assert(db != nullptr);
+ Random rnd(47);
+ std::vector<Transaction*> txns;
+ DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+ // At the beginning there should be no log containing prepare data
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
+ for (size_t i = 0; i < 100; i++) {
+ Transaction* txn = db->BeginTransaction(write_options, txn_options);
+ ASSERT_OK(txn->SetName("xid" + ToString(i)));
+ ASSERT_OK(txn->Put(Slice("foo" + ToString(i)), Slice("bar")));
+ ASSERT_OK(txn->Prepare());
+ ASSERT_GT(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
+ if (rnd.OneIn(5)) {
+ txns.push_back(txn);
+ } else {
+ ASSERT_OK(txn->Commit());
+ delete txn;
+ }
+ db_impl->TEST_FlushMemTable(true);
+ }
+ for (auto txn : txns) {
+ ASSERT_OK(txn->Commit());
+ delete txn;
+ }
+ // At the end there should be no log left containing prepare data
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
+ // Make sure that the underlying data structures are properly truncated and
+  // do not leak
+ ASSERT_EQ(db_impl->TEST_PreparedSectionCompletedSize(), 0);
+ ASSERT_EQ(db_impl->TEST_LogsWithPrepSize(), 0);
+}
+
+TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
+ for (bool cwb4recovery : {true, false}) {
+ ASSERT_OK(ReOpen());
+ WriteOptions write_options;
+ ReadOptions read_options;
+
+ TransactionOptions txn_options;
+ txn_options.use_only_the_last_commit_time_batch_for_recovery = cwb4recovery;
+
+ string value;
+ Status s;
+
+ DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+
+ Transaction* txn = db->BeginTransaction(write_options, txn_options);
+ s = txn->SetName("xid");
+ ASSERT_OK(s);
+
+ ASSERT_EQ(db->GetTransactionByName("xid"), txn);
+
+ // transaction put
+ s = txn->Put(Slice("foo"), Slice("bar"));
+ ASSERT_OK(s);
+ ASSERT_EQ(1, txn->GetNumPuts());
+
+ // regular db put
+ s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
+ ASSERT_OK(s);
+ ASSERT_EQ(1, txn->GetNumPuts());
+
+ // regular db read
+ db->Get(read_options, "foo2", &value);
+ ASSERT_EQ(value, "bar2");
+
+ // commit time put
+ txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs"));
+ txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats"));
+
+ // nothing has been prepped yet
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
+
+ s = txn->Prepare();
+ ASSERT_OK(s);
+
+    // data not in mem yet
+ s = db->Get(read_options, Slice("foo"), &value);
+ ASSERT_TRUE(s.IsNotFound());
+ s = db->Get(read_options, Slice("gtid"), &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ // find trans in list of prepared transactions
+ std::vector<Transaction*> prepared_trans;
+ db->GetAllPreparedTransactions(&prepared_trans);
+ ASSERT_EQ(prepared_trans.size(), 1);
+ ASSERT_EQ(prepared_trans.front()->GetName(), "xid");
+
+ auto log_containing_prep =
+ db_impl->TEST_FindMinLogContainingOutstandingPrep();
+ ASSERT_GT(log_containing_prep, 0);
+
+ // make commit
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ // value is now available
+ s = db->Get(read_options, "foo", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar");
+
+ if (!cwb4recovery) {
+ s = db->Get(read_options, "gtid", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "dogs");
+
+ s = db->Get(read_options, "gtid2", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "cats");
+ }
+
+ // we already committed
+ s = txn->Commit();
+ ASSERT_EQ(s, Status::InvalidArgument());
+
+    // no longer in prepared results
+ db->GetAllPreparedTransactions(&prepared_trans);
+ ASSERT_EQ(prepared_trans.size(), 0);
+ ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
+
+ // heap should not care about prepared section anymore
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
+
+ switch (txn_db_options.write_policy) {
+ case WRITE_COMMITTED:
+ // but now our memtable should be referencing the prep section
+ ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
+ ASSERT_EQ(log_containing_prep,
+ db_impl->TEST_FindMinPrepLogReferencedByMemTable());
+ break;
+ case WRITE_PREPARED:
+ case WRITE_UNPREPARED:
+        // In these modes memtables do not ref the prep sections
+ ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
+ break;
+ default:
+ assert(false);
+ }
+
+ db_impl->TEST_FlushMemTable(true);
+ // After flush the recoverable state must be visible
+ if (cwb4recovery) {
+ s = db->Get(read_options, "gtid", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "dogs");
+
+ s = db->Get(read_options, "gtid2", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "cats");
+ }
+
+    // after memtable flush we can now release the log
+ ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
+ ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
+
+ delete txn;
+
+ if (cwb4recovery) {
+ // kill and reopen to trigger recovery
+ s = ReOpenNoDelete();
+ ASSERT_OK(s);
+ assert(db != nullptr);
+ s = db->Get(read_options, "gtid", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "dogs");
+
+ s = db->Get(read_options, "gtid2", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "cats");
+ }
+ }
+}
+
+TEST_P(TransactionTest, TwoPhaseNameTest) {
+ Status s;
+
+ WriteOptions write_options;
+ TransactionOptions txn_options;
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn3);
+ delete txn3;
+
+  // can't prepare a txn without a name
+ s = txn1->Prepare();
+ ASSERT_EQ(s, Status::InvalidArgument());
+
+ // name too short
+ s = txn1->SetName("");
+ ASSERT_EQ(s, Status::InvalidArgument());
+
+ // name too long
+ s = txn1->SetName(std::string(513, 'x'));
+ ASSERT_EQ(s, Status::InvalidArgument());
+
+ // valid set name
+ s = txn1->SetName("name1");
+ ASSERT_OK(s);
+
+  // can't have a duplicate name
+ s = txn2->SetName("name1");
+ ASSERT_EQ(s, Status::InvalidArgument());
+
+ // shouldn't be able to prepare
+ s = txn2->Prepare();
+ ASSERT_EQ(s, Status::InvalidArgument());
+
+ // valid name set
+ s = txn2->SetName("name2");
+ ASSERT_OK(s);
+
+  // can't reset the name
+ s = txn2->SetName("name3");
+ ASSERT_EQ(s, Status::InvalidArgument());
+
+ ASSERT_EQ(txn1->GetName(), "name1");
+ ASSERT_EQ(txn2->GetName(), "name2");
+
+ s = txn1->Prepare();
+ ASSERT_OK(s);
+
+ // can't rename after prepare
+ s = txn1->SetName("name4");
+ ASSERT_EQ(s, Status::InvalidArgument());
+
+ txn1->Rollback();
+ txn2->Rollback();
+ delete txn1;
+ delete txn2;
+}
+
+TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
+ for (bool cwb4recovery : {true, false}) {
+ for (bool test_with_empty_wal : {true, false}) {
+ if (!cwb4recovery && test_with_empty_wal) {
+ continue;
+ }
+ ASSERT_OK(ReOpen());
+ Status s;
+ std::string value;
+
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+ txn_options.use_only_the_last_commit_time_batch_for_recovery =
+ cwb4recovery;
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn1);
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn2);
+
+ s = txn1->SetName("joe");
+ ASSERT_OK(s);
+
+ s = txn2->SetName("bob");
+ ASSERT_OK(s);
+
+ s = txn1->Prepare();
+ ASSERT_OK(s);
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ delete txn1;
+
+ txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar"));
+
+ s = txn2->Prepare();
+ ASSERT_OK(s);
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ delete txn2;
+ if (!cwb4recovery) {
+ s = db->Get(read_options, "foo", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar");
+ } else {
+ if (test_with_empty_wal) {
+ DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+ db_impl->TEST_FlushMemTable(true);
+ // After flush the state must be visible
+ s = db->Get(read_options, "foo", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar");
+ }
+ db->FlushWAL(true);
+ // kill and reopen to trigger recovery
+ s = ReOpenNoDelete();
+ ASSERT_OK(s);
+ assert(db != nullptr);
+ s = db->Get(read_options, "foo", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar");
+ }
+ }
+ }
+}
+
+TEST_P(TransactionStressTest, TwoPhaseExpirationTest) {
+ Status s;
+
+ WriteOptions write_options;
+ TransactionOptions txn_options;
+ txn_options.expiration = 500; // 500ms
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn1);
+  ASSERT_TRUE(txn2);
+
+ s = txn1->SetName("joe");
+ ASSERT_OK(s);
+ s = txn2->SetName("bob");
+ ASSERT_OK(s);
+
+ s = txn1->Prepare();
+ ASSERT_OK(s);
+
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ s = txn2->Prepare();
+ ASSERT_EQ(s, Status::Expired());
+
+ delete txn1;
+ delete txn2;
+}
+
+TEST_P(TransactionTest, TwoPhaseRollbackTest) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+
+ TransactionOptions txn_options;
+
+ std::string value;
+ Status s;
+
+ DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+ Transaction* txn = db->BeginTransaction(write_options, txn_options);
+ s = txn->SetName("xid");
+ ASSERT_OK(s);
+
+ // transaction put
+ s = txn->Put(Slice("tfoo"), Slice("tbar"));
+ ASSERT_OK(s);
+
+  // value is readable from txn
+ s = txn->Get(read_options, Slice("tfoo"), &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "tbar");
+
+ // issue rollback
+ s = txn->Rollback();
+ ASSERT_OK(s);
+
+  // value is no longer readable
+ s = txn->Get(read_options, Slice("tfoo"), &value);
+ ASSERT_TRUE(s.IsNotFound());
+ ASSERT_EQ(txn->GetNumPuts(), 0);
+
+ // put new txn values
+ s = txn->Put(Slice("tfoo2"), Slice("tbar2"));
+ ASSERT_OK(s);
+
+ // new value is readable from txn
+ s = txn->Get(read_options, Slice("tfoo2"), &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "tbar2");
+
+ s = txn->Prepare();
+ ASSERT_OK(s);
+
+ // flush to next wal
+ s = db->Put(write_options, Slice("foo"), Slice("bar"));
+ ASSERT_OK(s);
+ db_impl->TEST_FlushMemTable(true);
+
+ // issue rollback (marker written to WAL)
+ s = txn->Rollback();
+ ASSERT_OK(s);
+
+  // value is no longer readable
+ s = txn->Get(read_options, Slice("tfoo2"), &value);
+ ASSERT_TRUE(s.IsNotFound());
+ ASSERT_EQ(txn->GetNumPuts(), 0);
+
+ // make commit
+ s = txn->Commit();
+ ASSERT_EQ(s, Status::InvalidArgument());
+
+ // try rollback again
+ s = txn->Rollback();
+ ASSERT_EQ(s, Status::InvalidArgument());
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
+ WriteOptions write_options;
+ write_options.sync = true;
+ write_options.disableWAL = false;
+ ReadOptions read_options;
+
+ TransactionOptions txn_options;
+
+ std::string value;
+ Status s;
+
+ DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+
+ Transaction* txn = db->BeginTransaction(write_options, txn_options);
+ s = txn->SetName("xid");
+ ASSERT_OK(s);
+
+ ASSERT_EQ(db->GetTransactionByName("xid"), txn);
+
+ // transaction put
+ s = txn->Put(Slice("foo"), Slice("bar"));
+ ASSERT_OK(s);
+ ASSERT_EQ(1, txn->GetNumPuts());
+
+ // txn read
+ s = txn->Get(read_options, "foo", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar");
+
+ // regular db put
+ s = db->Put(write_options, Slice("foo2"), Slice("bar2"));
+ ASSERT_OK(s);
+ ASSERT_EQ(1, txn->GetNumPuts());
+
+ db_impl->TEST_FlushMemTable(true);
+
+ // regular db read
+ db->Get(read_options, "foo2", &value);
+ ASSERT_EQ(value, "bar2");
+
+ // nothing has been prepped yet
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
+
+ // prepare
+ s = txn->Prepare();
+ ASSERT_OK(s);
+
+ // still not available to db
+ s = db->Get(read_options, Slice("foo"), &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ db->FlushWAL(false);
+ delete txn;
+ // kill and reopen
+ reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
+ s = ReOpenNoDelete();
+ ASSERT_OK(s);
+ assert(db != nullptr);
+ db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+
+ // find trans in list of prepared transactions
+ std::vector<Transaction*> prepared_trans;
+ db->GetAllPreparedTransactions(&prepared_trans);
+ ASSERT_EQ(prepared_trans.size(), 1);
+
+ txn = prepared_trans.front();
+ ASSERT_TRUE(txn);
+ ASSERT_EQ(txn->GetName(), "xid");
+ ASSERT_EQ(db->GetTransactionByName("xid"), txn);
+
+ // log has been marked
+ auto log_containing_prep =
+ db_impl->TEST_FindMinLogContainingOutstandingPrep();
+ ASSERT_GT(log_containing_prep, 0);
+
+ // value is readable from txn
+ s = txn->Get(read_options, "foo", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar");
+
+ // make commit
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ // value is now available
+ db->Get(read_options, "foo", &value);
+ ASSERT_EQ(value, "bar");
+
+ // we already committed
+ s = txn->Commit();
+ ASSERT_EQ(s, Status::InvalidArgument());
+
+  // no longer in prepared results
+ prepared_trans.clear();
+ db->GetAllPreparedTransactions(&prepared_trans);
+ ASSERT_EQ(prepared_trans.size(), 0);
+
+ // transaction should no longer be visible
+ ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
+
+ // heap should not care about prepared section anymore
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
+
+ switch (txn_db_options.write_policy) {
+ case WRITE_COMMITTED:
+ // but now our memtable should be referencing the prep section
+ ASSERT_EQ(log_containing_prep,
+ db_impl->TEST_FindMinPrepLogReferencedByMemTable());
+ ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
+
+ break;
+ case WRITE_PREPARED:
+ case WRITE_UNPREPARED:
+      // In these modes memtables do not ref the prep sections
+ ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
+ break;
+ default:
+ assert(false);
+ }
+
+ // Add a dummy record to memtable before a flush. Otherwise, the
+ // memtable will be empty and flush will be skipped.
+ s = db->Put(write_options, Slice("foo3"), Slice("bar3"));
+ ASSERT_OK(s);
+
+ db_impl->TEST_FlushMemTable(true);
+
+ // after memtable flush we can now release the log
+ ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
+ ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
+
+ delete txn;
+
+ // deleting transaction should unregister transaction
+ ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
+}
+
+// TODO this test needs to be updated with serial commits
+TEST_P(TransactionTest, DISABLED_TwoPhaseMultiThreadTest) {
+ // mix transaction writes and regular writes
+ const uint32_t NUM_TXN_THREADS = 50;
+ std::atomic<uint32_t> txn_thread_num(0);
+
+ std::function<void()> txn_write_thread = [&]() {
+ uint32_t id = txn_thread_num.fetch_add(1);
+
+ WriteOptions write_options;
+ write_options.sync = true;
+ write_options.disableWAL = false;
+ TransactionOptions txn_options;
+ txn_options.lock_timeout = 1000000;
+ if (id % 2 == 0) {
+ txn_options.expiration = 1000000;
+ }
+ TransactionName name("xid_" + std::string(1, 'A' + static_cast<char>(id)));
+ Transaction* txn = db->BeginTransaction(write_options, txn_options);
+ ASSERT_OK(txn->SetName(name));
+ for (int i = 0; i < 10; i++) {
+ std::string key(name + "_" + std::string(1, static_cast<char>('A' + i)));
+ ASSERT_OK(txn->Put(key, "val"));
+ }
+ ASSERT_OK(txn->Prepare());
+ ASSERT_OK(txn->Commit());
+ delete txn;
+ };
+
+  // ensure that all threads are in the same write group
+ std::atomic<uint32_t> t_wait_on_prepare(0);
+ std::atomic<uint32_t> t_wait_on_commit(0);
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
+ auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
+
+ if (writer->ShouldWriteToWAL()) {
+ t_wait_on_prepare.fetch_add(1);
+ // wait for friends
+ while (t_wait_on_prepare.load() < NUM_TXN_THREADS) {
+ env->SleepForMicroseconds(10);
+ }
+ } else if (writer->ShouldWriteToMemtable()) {
+ t_wait_on_commit.fetch_add(1);
+ // wait for friends
+ while (t_wait_on_commit.load() < NUM_TXN_THREADS) {
+ env->SleepForMicroseconds(10);
+ }
+ } else {
+ FAIL();
+ }
+ });
+
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ // do all the writes
+ std::vector<port::Thread> threads;
+ for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
+ threads.emplace_back(txn_write_thread);
+ }
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ ReadOptions read_options;
+ std::string value;
+ Status s;
+ for (uint32_t t = 0; t < NUM_TXN_THREADS; t++) {
+ TransactionName name("xid_" + std::string(1, 'A' + static_cast<char>(t)));
+ for (int i = 0; i < 10; i++) {
+ std::string key(name + "_" + std::string(1, static_cast<char>('A' + i)));
+ s = db->Get(read_options, key, &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "val");
+ }
+ }
+}
+
+TEST_P(TransactionStressTest, TwoPhaseLongPrepareTest) {
+ WriteOptions write_options;
+ write_options.sync = true;
+ write_options.disableWAL = false;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+
+ std::string value;
+ Status s;
+
+ Transaction* txn = db->BeginTransaction(write_options, txn_options);
+ s = txn->SetName("bob");
+ ASSERT_OK(s);
+
+ // transaction put
+ s = txn->Put(Slice("foo"), Slice("bar"));
+ ASSERT_OK(s);
+
+ // prepare
+ s = txn->Prepare();
+ ASSERT_OK(s);
+
+ delete txn;
+
+ for (int i = 0; i < 1000; i++) {
+ std::string key(i, 'k');
+ std::string val(1000, 'v');
+ assert(db != nullptr);
+ s = db->Put(write_options, key, val);
+ ASSERT_OK(s);
+
+ if (i % 29 == 0) {
+ // crash
+ env->SetFilesystemActive(false);
+ reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
+ ReOpenNoDelete();
+ } else if (i % 37 == 0) {
+ // close
+ ReOpenNoDelete();
+ }
+ }
+
+ // commit old txn
+ txn = db->GetTransactionByName("bob");
+ ASSERT_TRUE(txn);
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+  // verify txn data
+ s = db->Get(read_options, "foo", &value);
+ ASSERT_EQ(s, Status::OK());
+ ASSERT_EQ(value, "bar");
+
+ // verify non txn data
+ for (int i = 0; i < 1000; i++) {
+ std::string key(i, 'k');
+ std::string val(1000, 'v');
+ s = db->Get(read_options, key, &value);
+ ASSERT_EQ(s, Status::OK());
+ ASSERT_EQ(value, val);
+ }
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, TwoPhaseSequenceTest) {
+ WriteOptions write_options;
+ write_options.sync = true;
+ write_options.disableWAL = false;
+ ReadOptions read_options;
+
+ TransactionOptions txn_options;
+
+ std::string value;
+ Status s;
+
+ Transaction* txn = db->BeginTransaction(write_options, txn_options);
+ s = txn->SetName("xid");
+ ASSERT_OK(s);
+
+ // transaction put
+ s = txn->Put(Slice("foo"), Slice("bar"));
+ ASSERT_OK(s);
+ s = txn->Put(Slice("foo2"), Slice("bar2"));
+ ASSERT_OK(s);
+ s = txn->Put(Slice("foo3"), Slice("bar3"));
+ ASSERT_OK(s);
+ s = txn->Put(Slice("foo4"), Slice("bar4"));
+ ASSERT_OK(s);
+
+ // prepare
+ s = txn->Prepare();
+ ASSERT_OK(s);
+
+ // make commit
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ delete txn;
+
+ // kill and reopen
+ env->SetFilesystemActive(false);
+ ReOpenNoDelete();
+ assert(db != nullptr);
+
+ // value is now available
+ s = db->Get(read_options, "foo4", &value);
+ ASSERT_EQ(s, Status::OK());
+ ASSERT_EQ(value, "bar4");
+}
+
+TEST_P(TransactionTest, TwoPhaseDoubleRecoveryTest) {
+ WriteOptions write_options;
+ write_options.sync = true;
+ write_options.disableWAL = false;
+ ReadOptions read_options;
+
+ TransactionOptions txn_options;
+
+ std::string value;
+ Status s;
+
+ Transaction* txn = db->BeginTransaction(write_options, txn_options);
+ s = txn->SetName("a");
+ ASSERT_OK(s);
+
+ // transaction put
+ s = txn->Put(Slice("foo"), Slice("bar"));
+ ASSERT_OK(s);
+
+ // prepare
+ s = txn->Prepare();
+ ASSERT_OK(s);
+
+ delete txn;
+
+ // kill and reopen
+ env->SetFilesystemActive(false);
+ reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
+ ReOpenNoDelete();
+
+ // commit old txn
+ txn = db->GetTransactionByName("a");
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "foo", &value);
+ ASSERT_EQ(s, Status::OK());
+ ASSERT_EQ(value, "bar");
+
+ delete txn;
+
+ txn = db->BeginTransaction(write_options, txn_options);
+ s = txn->SetName("b");
+ ASSERT_OK(s);
+
+ s = txn->Put(Slice("foo2"), Slice("bar2"));
+ ASSERT_OK(s);
+
+ s = txn->Prepare();
+ ASSERT_OK(s);
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ delete txn;
+
+ // kill and reopen
+ env->SetFilesystemActive(false);
+ ReOpenNoDelete();
+ assert(db != nullptr);
+
+ // value is now available
+ s = db->Get(read_options, "foo", &value);
+ ASSERT_EQ(s, Status::OK());
+ ASSERT_EQ(value, "bar");
+
+ s = db->Get(read_options, "foo2", &value);
+ ASSERT_EQ(s, Status::OK());
+ ASSERT_EQ(value, "bar2");
+}
+
+TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
+ DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+
+ Status s;
+ std::string v;
+ ColumnFamilyHandle *cfa, *cfb;
+
+ // Create 2 new column families
+ ColumnFamilyOptions cf_options;
+ s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
+ ASSERT_OK(s);
+ s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
+ ASSERT_OK(s);
+
+ WriteOptions wopts;
+ wopts.disableWAL = false;
+ wopts.sync = true;
+
+ TransactionOptions topts1;
+ Transaction* txn1 = db->BeginTransaction(wopts, topts1);
+ s = txn1->SetName("xid1");
+ ASSERT_OK(s);
+
+ TransactionOptions topts2;
+ Transaction* txn2 = db->BeginTransaction(wopts, topts2);
+ s = txn2->SetName("xid2");
+ ASSERT_OK(s);
+
+ // transaction put in two column families
+ s = txn1->Put(cfa, "ka1", "va1");
+ ASSERT_OK(s);
+
+ // transaction put in two column families
+ s = txn2->Put(cfa, "ka2", "va2");
+ ASSERT_OK(s);
+ s = txn2->Put(cfb, "kb2", "vb2");
+ ASSERT_OK(s);
+
+ // write prep section to wal
+ s = txn1->Prepare();
+ ASSERT_OK(s);
+
+ // our log should be in the heap
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
+ txn1->GetLogNumber());
+ ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
+
+  // flush default cf to create new log
+ s = db->Put(wopts, "foo", "bar");
+ ASSERT_OK(s);
+ s = db_impl->TEST_FlushMemTable(true);
+ ASSERT_OK(s);
+
+ // make sure we are on a new log
+ ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber());
+
+ // put txn2 prep section in this log
+ s = txn2->Prepare();
+ ASSERT_OK(s);
+ ASSERT_EQ(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber());
+
+ // heap should still see first log
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
+ txn1->GetLogNumber());
+
+ // commit txn1
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+  // heap should now show txn2's log
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
+ txn2->GetLogNumber());
+
+ switch (txn_db_options.write_policy) {
+ case WRITE_COMMITTED:
+      // we should see txn1's log referenced by the memtables
+ ASSERT_EQ(txn1->GetLogNumber(),
+ db_impl->TEST_FindMinPrepLogReferencedByMemTable());
+ break;
+ case WRITE_PREPARED:
+ case WRITE_UNPREPARED:
+      // In these modes memtables do not ref the prep sections
+ ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
+ break;
+ default:
+ assert(false);
+ }
+
+  // flush default cf to create new log
+ s = db->Put(wopts, "foo", "bar2");
+ ASSERT_OK(s);
+ s = db_impl->TEST_FlushMemTable(true);
+ ASSERT_OK(s);
+
+ // make sure we are on a new log
+ ASSERT_GT(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber());
+
+ // commit txn2
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ // heap should not show any logs
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
+
+ switch (txn_db_options.write_policy) {
+ case WRITE_COMMITTED:
+ // should show the first txn log
+ ASSERT_EQ(txn1->GetLogNumber(),
+ db_impl->TEST_FindMinPrepLogReferencedByMemTable());
+ break;
+ case WRITE_PREPARED:
+ case WRITE_UNPREPARED:
+      // In these modes memtables do not ref the prep sections
+ ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
+ break;
+ default:
+ assert(false);
+ }
+
+ // flush only cfa memtable
+ s = db_impl->TEST_FlushMemTable(true, false, cfa);
+ ASSERT_OK(s);
+
+ switch (txn_db_options.write_policy) {
+ case WRITE_COMMITTED:
+      // should now show txn2's log
+ ASSERT_EQ(txn2->GetLogNumber(),
+ db_impl->TEST_FindMinPrepLogReferencedByMemTable());
+ break;
+ case WRITE_PREPARED:
+ case WRITE_UNPREPARED:
+      // In these modes memtables do not ref the prep sections
+ ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
+ break;
+ default:
+ assert(false);
+ }
+
+ // flush only cfb memtable
+ s = db_impl->TEST_FlushMemTable(true, false, cfb);
+ ASSERT_OK(s);
+
+  // should show no dependency on logs
+ ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
+
+ delete txn1;
+ delete txn2;
+ delete cfa;
+ delete cfb;
+}
+
+TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
+ DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+
+ Status s;
+ ColumnFamilyHandle *cfa, *cfb;
+
+ ColumnFamilyOptions cf_options;
+ s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
+ ASSERT_OK(s);
+ s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
+ ASSERT_OK(s);
+
+ WriteOptions wopts;
+ wopts.disableWAL = false;
+ wopts.sync = true;
+
+ auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(cfa);
+ auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(cfb);
+
+ TransactionOptions topts1;
+ Transaction* txn1 = db->BeginTransaction(wopts, topts1);
+ s = txn1->SetName("xid1");
+ ASSERT_OK(s);
+ s = txn1->Put(cfa, "boys", "girls1");
+ ASSERT_OK(s);
+
+ Transaction* txn2 = db->BeginTransaction(wopts, topts1);
+ s = txn2->SetName("xid2");
+ ASSERT_OK(s);
+ s = txn2->Put(cfb, "up", "down1");
+ ASSERT_OK(s);
+
+  // prepare transaction in LOG A
+ s = txn1->Prepare();
+ ASSERT_OK(s);
+
+  // prepare transaction in LOG A
+ s = txn2->Prepare();
+ ASSERT_OK(s);
+
+ // regular put so that mem table can actually be flushed for log rolling
+ s = db->Put(wopts, "cats", "dogs1");
+ ASSERT_OK(s);
+
+ auto prepare_log_no = txn1->GetLogNumber();
+
+ // roll to LOG B
+ s = db_impl->TEST_FlushMemTable(true);
+ ASSERT_OK(s);
+
+ // now we pause background work so that
+ // imm()s are not flushed before we can check their status
+ s = db_impl->PauseBackgroundWork();
+ ASSERT_OK(s);
+
+ ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
+ switch (txn_db_options.write_policy) {
+ case WRITE_COMMITTED:
+ // This cf is empty and should ref the latest log
+ ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
+ ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
+ break;
+ case WRITE_PREPARED:
+ case WRITE_UNPREPARED:
+ // This cf is not flushed yet and should ref the log that has its data
+ ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
+ break;
+ default:
+ assert(false);
+ }
+ ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
+ prepare_log_no);
+ ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
+
+ // commit in LOG B
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ switch (txn_db_options.write_policy) {
+ case WRITE_COMMITTED:
+ ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
+ prepare_log_no);
+ break;
+ case WRITE_PREPARED:
+ case WRITE_UNPREPARED:
+      // In these modes memtables do not ref the prep sections
+ ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
+ break;
+ default:
+ assert(false);
+ }
+
+ ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
+
+ // request a flush for all column families such that the earliest
+ // alive log file can be killed
+ db_impl->TEST_SwitchWAL();
+  // log cannot be flushed because txn2 has not been committed
+ ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
+ ASSERT_TRUE(db_impl->TEST_UnableToReleaseOldestLog());
+
+ // assert that cfa has a flush requested
+ ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
+
+ switch (txn_db_options.write_policy) {
+ case WRITE_COMMITTED:
+      // cfb should not be flushed because it has no data from LOG A
+ ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
+ break;
+ case WRITE_PREPARED:
+ case WRITE_UNPREPARED:
+      // cfb should be flushed because it has prepared data from LOG A
+ ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
+ break;
+ default:
+ assert(false);
+ }
+
+ // cfb now has data from LOG A
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ db_impl->TEST_SwitchWAL();
+ ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
+
+ // we should see that cfb now has a flush requested
+ ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
+
+ // all data in LOG A resides in a memtable that has been
+ // requested for a flush
+ ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());
+
+ delete txn1;
+ delete txn2;
+ delete cfa;
+ delete cfb;
+}
+/*
+ * 1) use prepare to keep first log around to determine starting sequence
+ * during recovery.
+ * 2) insert many values, skipping wal, to increase seqid.
+ * 3) insert final value into wal
+ * 4) recover and see that final value was properly recovered - not
+ * hidden behind improperly summed sequence ids
+ */
+TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) {
+ DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+ WriteOptions wal_on, wal_off;
+ wal_on.sync = true;
+ wal_on.disableWAL = false;
+ wal_off.disableWAL = true;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+
+ std::string value;
+ Status s;
+
+ Transaction* txn1 = db->BeginTransaction(wal_on, txn_options);
+
+ s = txn1->SetName("1");
+ ASSERT_OK(s);
+
+ s = db->Put(wal_on, "first", "first");
+ ASSERT_OK(s);
+
+ s = txn1->Put(Slice("dummy"), Slice("dummy"));
+ ASSERT_OK(s);
+ s = txn1->Prepare();
+ ASSERT_OK(s);
+
+ s = db->Put(wal_off, "cats", "dogs1");
+ ASSERT_OK(s);
+ s = db->Put(wal_off, "cats", "dogs2");
+ ASSERT_OK(s);
+ s = db->Put(wal_off, "cats", "dogs3");
+ ASSERT_OK(s);
+
+ s = db_impl->TEST_FlushMemTable(true);
+ ASSERT_OK(s);
+
+ s = db->Put(wal_on, "cats", "dogs4");
+ ASSERT_OK(s);
+
+ db->FlushWAL(false);
+
+ // kill and reopen
+ env->SetFilesystemActive(false);
+ reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
+ ReOpenNoDelete();
+ assert(db != nullptr);
+
+ s = db->Get(read_options, "first", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "first");
+
+ s = db->Get(read_options, "cats", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "dogs4");
+}
+
+TEST_P(TransactionTest, FirstWriteTest) {
+ WriteOptions write_options;
+
+ // Test conflict checking against the very first write to a db.
+ // The transaction's snapshot will have seq 1 and the following write
+ // will have sequence 1.
+ Status s = db->Put(write_options, "A", "a");
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ txn->SetSnapshot();
+
+ ASSERT_OK(s);
+
+ s = txn->Put("A", "b");
+ ASSERT_OK(s);
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, FirstWriteTest2) {
+ WriteOptions write_options;
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ txn->SetSnapshot();
+
+ // Test conflict checking against the very first write to a db.
+  // The transaction's snapshot has seq 0 while the following write
+ // will have sequence 1.
+ Status s = db->Put(write_options, "A", "a");
+ ASSERT_OK(s);
+
+ s = txn->Put("A", "b");
+ ASSERT_TRUE(s.IsBusy());
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, WriteOptionsTest) {
+ WriteOptions write_options;
+ write_options.sync = true;
+ write_options.disableWAL = true;
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn);
+
+ ASSERT_TRUE(txn->GetWriteOptions()->sync);
+
+ write_options.sync = false;
+ txn->SetWriteOptions(write_options);
+ ASSERT_FALSE(txn->GetWriteOptions()->sync);
+ ASSERT_TRUE(txn->GetWriteOptions()->disableWAL);
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, WriteConflictTest) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ string value;
+ Status s;
+
+ db->Put(write_options, "foo", "A");
+ db->Put(write_options, "foo2", "B");
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn);
+
+ s = txn->Put("foo", "A2");
+ ASSERT_OK(s);
+
+ s = txn->Put("foo2", "B2");
+ ASSERT_OK(s);
+
+ // This Put outside of a transaction will conflict with the previous write
+ s = db->Put(write_options, "foo", "xxx");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ s = db->Get(read_options, "foo", &value);
+ ASSERT_EQ(value, "A");
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ db->Get(read_options, "foo", &value);
+ ASSERT_EQ(value, "A2");
+ db->Get(read_options, "foo2", &value);
+ ASSERT_EQ(value, "B2");
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, WriteConflictTest2) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+ std::string value;
+ Status s;
+
+ db->Put(write_options, "foo", "bar");
+
+ txn_options.set_snapshot = true;
+ Transaction* txn = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn);
+
+ // This Put outside of a transaction will conflict with a later write
+ s = db->Put(write_options, "foo", "barz");
+ ASSERT_OK(s);
+
+ s = txn->Put("foo2", "X");
+ ASSERT_OK(s);
+
+ s = txn->Put("foo",
+ "bar2"); // Conflicts with write done after snapshot taken
+ ASSERT_TRUE(s.IsBusy());
+
+ s = txn->Put("foo3", "Y");
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "foo", &value);
+ ASSERT_EQ(value, "barz");
+
+ ASSERT_EQ(2, txn->GetNumKeys());
+
+ s = txn->Commit();
+ ASSERT_OK(s); // Txn should commit, but only write foo2 and foo3
+
+ // Verify that transaction wrote foo2 and foo3 but not foo
+ db->Get(read_options, "foo", &value);
+ ASSERT_EQ(value, "barz");
+
+ db->Get(read_options, "foo2", &value);
+ ASSERT_EQ(value, "X");
+
+ db->Get(read_options, "foo3", &value);
+ ASSERT_EQ(value, "Y");
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, ReadConflictTest) {
+ WriteOptions write_options;
+ ReadOptions read_options, snapshot_read_options;
+ TransactionOptions txn_options;
+ std::string value;
+ Status s;
+
+ db->Put(write_options, "foo", "bar");
+ db->Put(write_options, "foo2", "bar");
+
+ txn_options.set_snapshot = true;
+ Transaction* txn = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn);
+
+ txn->SetSnapshot();
+ snapshot_read_options.snapshot = txn->GetSnapshot();
+
+ txn->GetForUpdate(snapshot_read_options, "foo", &value);
+ ASSERT_EQ(value, "bar");
+
+ // This Put outside of a transaction will conflict with the previous read
+ s = db->Put(write_options, "foo", "barz");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ s = db->Get(read_options, "foo", &value);
+ ASSERT_EQ(value, "bar");
+
+ s = txn->Get(read_options, "foo", &value);
+ ASSERT_EQ(value, "bar");
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, TxnOnlyTest) {
+ // Test to make sure transactions work when there are no other writes in an
+ // empty db.
+
+ WriteOptions write_options;
+ ReadOptions read_options;
+ std::string value;
+ Status s;
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn);
+
+ s = txn->Put("x", "y");
+ ASSERT_OK(s);
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, FlushTest) {
+ WriteOptions write_options;
+ ReadOptions read_options, snapshot_read_options;
+ std::string value;
+ Status s;
+
+ db->Put(write_options, Slice("foo"), Slice("bar"));
+ db->Put(write_options, Slice("foo2"), Slice("bar"));
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn);
+
+ snapshot_read_options.snapshot = txn->GetSnapshot();
+
+ txn->GetForUpdate(snapshot_read_options, "foo", &value);
+ ASSERT_EQ(value, "bar");
+
+ s = txn->Put(Slice("foo"), Slice("bar2"));
+ ASSERT_OK(s);
+
+ txn->GetForUpdate(snapshot_read_options, "foo", &value);
+ ASSERT_EQ(value, "bar2");
+
+ // Put a random key so we have a memtable to flush
+ s = db->Put(write_options, "dummy", "dummy");
+ ASSERT_OK(s);
+
+ // force a memtable flush
+ FlushOptions flush_ops;
+ db->Flush(flush_ops);
+
+ s = txn->Commit();
+ // txn should commit since the flushed table is still in MemtableList History
+ ASSERT_OK(s);
+
+ db->Get(read_options, "foo", &value);
+ ASSERT_EQ(value, "bar2");
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, FlushTest2) {
+ const size_t num_tests = 3;
+
+ for (size_t n = 0; n < num_tests; n++) {
+ // Test different table factories
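+ // Later in this test conflict checking has to consult SST files (after the
+ // memtables are flushed and purged), so exercise more than one table format.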
+ switch (n) {
+ case 0:
+ break;
+ case 1:
+ options.table_factory.reset(new mock::MockTableFactory());
+ break;
+ case 2: {
+ PlainTableOptions pt_opts;
+ pt_opts.hash_table_ratio = 0;
+ options.table_factory.reset(NewPlainTableFactory(pt_opts));
+ break;
+ }
+ }
+
+ Status s = ReOpen();
+ ASSERT_OK(s);
+ assert(db != nullptr);
+
+ WriteOptions write_options;
+ ReadOptions read_options, snapshot_read_options;
+ TransactionOptions txn_options;
+ string value;
+
+ DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+
+ db->Put(write_options, Slice("foo"), Slice("bar"));
+ db->Put(write_options, Slice("foo2"), Slice("bar2"));
+ db->Put(write_options, Slice("foo3"), Slice("bar3"));
+
+ txn_options.set_snapshot = true;
+ Transaction* txn = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn);
+
+ snapshot_read_options.snapshot = txn->GetSnapshot();
+
+ txn->GetForUpdate(snapshot_read_options, "foo", &value);
+ ASSERT_EQ(value, "bar");
+
+ s = txn->Put(Slice("foo"), Slice("bar2"));
+ ASSERT_OK(s);
+
+ txn->GetForUpdate(snapshot_read_options, "foo", &value);
+ ASSERT_EQ(value, "bar2");
+ // verify foo is locked by txn
+ s = db->Delete(write_options, "foo");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ s = db->Put(write_options, "Z", "z");
+ ASSERT_OK(s);
+ s = db->Put(write_options, "dummy", "dummy");
+ ASSERT_OK(s);
+
+ s = db->Put(write_options, "S", "s");
+ ASSERT_OK(s);
+ s = db->SingleDelete(write_options, "S");
+ ASSERT_OK(s);
+
+ s = txn->Delete("S");
+ // Should fail after encountering a write to S in memtable
+ ASSERT_TRUE(s.IsBusy());
+
+ // force a memtable flush
+ s = db_impl->TEST_FlushMemTable(true);
+ ASSERT_OK(s);
+
+ // Put a random key so we have a MemTable to flush
+ s = db->Put(write_options, "dummy", "dummy2");
+ ASSERT_OK(s);
+
+ // force a memtable flush
+ ASSERT_OK(db_impl->TEST_FlushMemTable(true));
+
+ s = db->Put(write_options, "dummy", "dummy3");
+ ASSERT_OK(s);
+
+ // force a memtable flush
+ // Since our test db has max_write_buffer_number=2, this flush will cause
+ // the first memtable to get purged from the MemtableList history.
+ ASSERT_OK(db_impl->TEST_FlushMemTable(true));
+
+ s = txn->Put("X", "Y");
+ // Should succeed after verifying there is no write to X in SST file
+ ASSERT_OK(s);
+
+ s = txn->Put("Z", "zz");
+ // Should fail after encountering a write to Z in SST file
+ ASSERT_TRUE(s.IsBusy());
+
+ s = txn->GetForUpdate(read_options, "foo2", &value);
+ // should succeed since key was written before txn started
+ ASSERT_OK(s);
+ // verify foo2 is locked by txn
+ s = db->Delete(write_options, "foo2");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ s = txn->Delete("S");
+ // Should fail after encountering a write to S in SST file
+ ASSERT_TRUE(s.IsBusy());
+
+ // Write a bunch of keys to db to force a compaction
+ Random rnd(47);
+ for (int i = 0; i < 1000; i++) {
+ s = db->Put(write_options, std::to_string(i),
+ test::CompressibleString(&rnd, 0.8, 100, &value));
+ ASSERT_OK(s);
+ }
+
+ s = txn->Put("X", "yy");
+ // Should succeed after verifying there is no write to X in SST file
+ ASSERT_OK(s);
+
+ s = txn->Put("Z", "zzz");
+ // Should fail after encountering a write to Z in SST file
+ ASSERT_TRUE(s.IsBusy());
+
+ s = txn->Delete("S");
+ // Should fail after encountering a write to S in SST file
+ ASSERT_TRUE(s.IsBusy());
+
+ s = txn->GetForUpdate(read_options, "foo3", &value);
+ // should succeed since key was written before txn started
+ ASSERT_OK(s);
+ // verify foo3 is locked by txn
+ s = db->Delete(write_options, "foo3");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ db_impl->TEST_WaitForCompact();
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ // Transaction should only write the keys that succeeded.
+ s = db->Get(read_options, "foo", &value);
+ ASSERT_EQ(value, "bar2");
+
+ s = db->Get(read_options, "X", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("yy", value);
+
+ s = db->Get(read_options, "Z", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("z", value);
+
+ delete txn;
+ }
+}
+
+TEST_P(TransactionTest, NoSnapshotTest) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ std::string value;
+ Status s;
+
+ db->Put(write_options, "AAA", "bar");
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn);
+
+ // Modify key after transaction start
+ db->Put(write_options, "AAA", "bar1");
+
+ // Read and write without a snapshot

+ txn->GetForUpdate(read_options, "AAA", &value);
+ ASSERT_EQ(value, "bar1");
+ s = txn->Put("AAA", "bar2");
+ ASSERT_OK(s);
+
+ // Should commit since read/write was done after data changed
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ txn->GetForUpdate(read_options, "AAA", &value);
+ ASSERT_EQ(value, "bar2");
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, MultipleSnapshotTest) {
+ WriteOptions write_options;
+ ReadOptions read_options, snapshot_read_options;
+ std::string value;
+ Status s;
+
+ ASSERT_OK(db->Put(write_options, "AAA", "bar"));
+ ASSERT_OK(db->Put(write_options, "BBB", "bar"));
+ ASSERT_OK(db->Put(write_options, "CCC", "bar"));
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn);
+
+ db->Put(write_options, "AAA", "bar1");
+
+ // Read and write without a snapshot
+ ASSERT_OK(txn->GetForUpdate(read_options, "AAA", &value));
+ ASSERT_EQ(value, "bar1");
+ s = txn->Put("AAA", "bar2");
+ ASSERT_OK(s);
+
+ // Modify BBB before snapshot is taken
+ ASSERT_OK(db->Put(write_options, "BBB", "bar1"));
+
+ txn->SetSnapshot();
+ snapshot_read_options.snapshot = txn->GetSnapshot();
+
+ // Read and write with snapshot
+ ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "BBB", &value));
+ ASSERT_EQ(value, "bar1");
+ s = txn->Put("BBB", "bar2");
+ ASSERT_OK(s);
+
+ ASSERT_OK(db->Put(write_options, "CCC", "bar1"));
+
+ // Set a new snapshot
+ txn->SetSnapshot();
+ snapshot_read_options.snapshot = txn->GetSnapshot();
+
+ // Read and write with snapshot
+ txn->GetForUpdate(snapshot_read_options, "CCC", &value);
+ ASSERT_EQ(value, "bar1");
+ s = txn->Put("CCC", "bar2");
+ ASSERT_OK(s);
+
+ s = txn->GetForUpdate(read_options, "AAA", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar2");
+ s = txn->GetForUpdate(read_options, "BBB", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar2");
+ s = txn->GetForUpdate(read_options, "CCC", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar2");
+
+ s = db->Get(read_options, "AAA", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar1");
+ s = db->Get(read_options, "BBB", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar1");
+ s = db->Get(read_options, "CCC", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar1");
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "AAA", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar2");
+ s = db->Get(read_options, "BBB", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar2");
+ s = db->Get(read_options, "CCC", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "bar2");
+
+ // verify that we track multiple writes to the same key at different snapshots
+ delete txn;
+ txn = db->BeginTransaction(write_options);
+
+ // Potentially conflicting writes
+ db->Put(write_options, "ZZZ", "zzz");
+ db->Put(write_options, "XXX", "xxx");
+
+ txn->SetSnapshot();
+
+ TransactionOptions txn_options;
+ txn_options.set_snapshot = true;
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ txn2->SetSnapshot();
+
+ // This should not conflict in txn since the snapshot is later than the
+ // previous write (spoiler alert: it will later conflict with txn2).
+ s = txn->Put("ZZZ", "zzzz");
+ ASSERT_OK(s);
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ delete txn;
+
+ // This will conflict since the snapshot is earlier than another write to ZZZ
+ s = txn2->Put("ZZZ", "xxxxx");
+ ASSERT_TRUE(s.IsBusy());
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "ZZZ", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "zzzz");
+
+ delete txn2;
+}
+
+TEST_P(TransactionTest, ColumnFamiliesTest) {
+ WriteOptions write_options;
+ ReadOptions read_options, snapshot_read_options;
+ TransactionOptions txn_options;
+ string value;
+ Status s;
+
+ ColumnFamilyHandle *cfa, *cfb;
+ ColumnFamilyOptions cf_options;
+
+ // Create 2 new column families
+ s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
+ ASSERT_OK(s);
+ s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
+ ASSERT_OK(s);
+
+ delete cfa;
+ delete cfb;
+ delete db;
+ db = nullptr;
+
+ // open DB with three column families
+ std::vector<ColumnFamilyDescriptor> column_families;
+ // have to open default column family
+ column_families.push_back(
+ ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
+ // open the new column families
+ column_families.push_back(
+ ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
+ column_families.push_back(
+ ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
+
+ std::vector<ColumnFamilyHandle*> handles;
+
+ s = TransactionDB::Open(options, txn_db_options, dbname, column_families,
+ &handles, &db);
+ assert(db != nullptr);
+ ASSERT_OK(s);
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn);
+
+ txn->SetSnapshot();
+ snapshot_read_options.snapshot = txn->GetSnapshot();
+
+ txn_options.set_snapshot = true;
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn2);
+
+ // Write some data to the db
+ WriteBatch batch;
+ batch.Put("foo", "foo");
+ batch.Put(handles[1], "AAA", "bar");
+ batch.Put(handles[1], "AAAZZZ", "bar");
+ s = db->Write(write_options, &batch);
+ ASSERT_OK(s);
+ db->Delete(write_options, handles[1], "AAAZZZ");
+
+ // These keys do not conflict with existing writes since they're in
+ // different column families
+ s = txn->Delete("AAA");
+ ASSERT_OK(s);
+ s = txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ Slice key_slice("AAAZZZ");
+ Slice value_slices[2] = {Slice("bar"), Slice("bar")};
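+ // SliceParts are concatenated, so the stored value becomes "barbar".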
+ s = txn->Put(handles[2], SliceParts(&key_slice, 1),
+ SliceParts(value_slices, 2));
+ ASSERT_OK(s);
+ ASSERT_EQ(3, txn->GetNumKeys());
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+ s = db->Get(read_options, "AAA", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ s = db->Get(read_options, handles[2], "AAAZZZ", &value);
+ ASSERT_EQ(value, "barbar");
+
+ Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
+ Slice value_slice("barbarbar");
+
+ s = txn2->Delete(handles[2], "XXX");
+ ASSERT_OK(s);
+ s = txn2->Delete(handles[1], "XXX");
+ ASSERT_OK(s);
+
+ // This write will cause a conflict with the earlier batch write
+ s = txn2->Put(handles[1], SliceParts(key_slices, 3),
+ SliceParts(&value_slice, 1));
+ ASSERT_TRUE(s.IsBusy());
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+ // Above, the latest change to AAAZZZ in handles[1] was the Delete, so the
+ // key should not be found.
+ s = db->Get(read_options, handles[1], "AAAZZZ", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ delete txn;
+ delete txn2;
+
+ txn = db->BeginTransaction(write_options, txn_options);
+ snapshot_read_options.snapshot = txn->GetSnapshot();
+
+ txn2 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn);
+ ASSERT_TRUE(txn2);
+
+ std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
+ handles[0], handles[2]};
+ std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
+ std::vector<std::string> values(4);
+
+ std::vector<Status> results = txn->MultiGetForUpdate(
+ snapshot_read_options, multiget_cfh, multiget_keys, &values);
+ ASSERT_OK(results[0]);
+ ASSERT_OK(results[1]);
+ ASSERT_OK(results[2]);
+ ASSERT_TRUE(results[3].IsNotFound());
+ ASSERT_EQ(values[0], "bar");
+ ASSERT_EQ(values[1], "barbar");
+ ASSERT_EQ(values[2], "foo");
+
+ s = txn->SingleDelete(handles[2], "ZZZ");
+ ASSERT_OK(s);
+ s = txn->Put(handles[2], "ZZZ", "YYY");
+ ASSERT_OK(s);
+ s = txn->Put(handles[2], "ZZZ", "YYYY");
+ ASSERT_OK(s);
+ s = txn->Delete(handles[2], "ZZZ");
+ ASSERT_OK(s);
+ s = txn->Put(handles[2], "AAAZZZ", "barbarbar");
+ ASSERT_OK(s);
+
+ ASSERT_EQ(5, txn->GetNumKeys());
+
+ // Txn should commit
+ s = txn->Commit();
+ ASSERT_OK(s);
+ s = db->Get(read_options, handles[2], "ZZZ", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ // Put a key which will conflict with the next txn using the previous snapshot
+ db->Put(write_options, handles[2], "foo", "000");
+
+ results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
+ multiget_keys, &values);
+ // All results should fail since there was a conflict
+ ASSERT_TRUE(results[0].IsBusy());
+ ASSERT_TRUE(results[1].IsBusy());
+ ASSERT_TRUE(results[2].IsBusy());
+ ASSERT_TRUE(results[3].IsBusy());
+
+ s = db->Get(read_options, handles[2], "foo", &value);
+ ASSERT_EQ(value, "000");
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ s = db->DropColumnFamily(handles[1]);
+ ASSERT_OK(s);
+ s = db->DropColumnFamily(handles[2]);
+ ASSERT_OK(s);
+
+ delete txn;
+ delete txn2;
+
+ for (auto handle : handles) {
+ delete handle;
+ }
+}
+
+TEST_P(TransactionTest, ColumnFamiliesTest2) {
+ WriteOptions write_options;
+ ReadOptions read_options, snapshot_read_options;
+ string value;
+ Status s;
+
+ ColumnFamilyHandle *one, *two;
+ ColumnFamilyOptions cf_options;
+
+ // Create 2 new column families
+ s = db->CreateColumnFamily(cf_options, "ONE", &one);
+ ASSERT_OK(s);
+ s = db->CreateColumnFamily(cf_options, "TWO", &two);
+ ASSERT_OK(s);
+
+ Transaction* txn1 = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn1);
+ Transaction* txn2 = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn2);
+
+ s = txn1->Put(one, "X", "1");
+ ASSERT_OK(s);
+ s = txn1->Put(two, "X", "2");
+ ASSERT_OK(s);
+ s = txn1->Put("X", "0");
+ ASSERT_OK(s);
+
+ s = txn2->Put(one, "X", "11");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ // Drop first column family
+ s = db->DropColumnFamily(one);
+ ASSERT_OK(s);
+
+ // txn2 has no successful writes (its only Put timed out), so its commit
+ // succeeds even though the column family was dropped.
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ delete txn1;
+ txn1 = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn1);
+
+ // Should fail since column family was dropped
+ s = txn1->Put(one, "X", "111");
+ ASSERT_TRUE(s.IsInvalidArgument());
+
+ s = txn1->Put(two, "X", "222");
+ ASSERT_OK(s);
+
+ s = txn1->Put("X", "000");
+ ASSERT_OK(s);
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, two, "X", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("222", value);
+
+ s = db->Get(read_options, "X", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("000", value);
+
+ s = db->DropColumnFamily(two);
+ ASSERT_OK(s);
+
+ delete txn1;
+ delete txn2;
+
+ delete one;
+ delete two;
+}
+
+TEST_P(TransactionTest, EmptyTest) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ string value;
+ Status s;
+
+ s = db->Put(write_options, "aaa", "aaa");
+ ASSERT_OK(s);
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ s = txn->Commit();
+ ASSERT_OK(s);
+ delete txn;
+
+ txn = db->BeginTransaction(write_options);
+ txn->Rollback();
+ delete txn;
+
+ txn = db->BeginTransaction(write_options);
+ s = txn->GetForUpdate(read_options, "aaa", &value);
+ ASSERT_EQ(value, "aaa");
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+ delete txn;
+
+ txn = db->BeginTransaction(write_options);
+ txn->SetSnapshot();
+
+ s = txn->GetForUpdate(read_options, "aaa", &value);
+ ASSERT_EQ(value, "aaa");
+
+ // Conflicts with previous GetForUpdate
+ s = db->Put(write_options, "aaa", "xxx");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ // Commit should succeed; this transaction has no expiration set.
+ s = txn->Commit();
+ ASSERT_OK(s);
+ delete txn;
+}
+
+TEST_P(TransactionTest, PredicateManyPreceders) {
+ WriteOptions write_options;
+ ReadOptions read_options1, read_options2;
+ TransactionOptions txn_options;
+ string value;
+ Status s;
+
+ txn_options.set_snapshot = true;
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+ read_options1.snapshot = txn1->GetSnapshot();
+
+ Transaction* txn2 = db->BeginTransaction(write_options);
+ txn2->SetSnapshot();
+ read_options2.snapshot = txn2->GetSnapshot();
+
+ std::vector<Slice> multiget_keys = {"1", "2", "3"};
+ std::vector<std::string> multiget_values;
+
+ std::vector<Status> results =
+ txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
+ ASSERT_TRUE(results[1].IsNotFound());
+
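+ // MultiGetForUpdate locked all three keys even though none of them exist,
+ // so another writer touching any of them must wait.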
+ s = txn2->Put("2", "x"); // Conflicts with txn1's MultiGetForUpdate
+ ASSERT_TRUE(s.IsTimedOut());
+
+ txn2->Rollback();
+
+ multiget_values.clear();
+ results =
+ txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
+ ASSERT_TRUE(results[1].IsNotFound());
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ delete txn1;
+ delete txn2;
+
+ txn1 = db->BeginTransaction(write_options, txn_options);
+ read_options1.snapshot = txn1->GetSnapshot();
+
+ txn2 = db->BeginTransaction(write_options, txn_options);
+ read_options2.snapshot = txn2->GetSnapshot();
+
+ s = txn1->Put("4", "x");
+ ASSERT_OK(s);
+
+ s = txn2->Delete("4"); // conflict
+ ASSERT_TRUE(s.IsTimedOut());
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ s = txn2->GetForUpdate(read_options2, "4", &value);
+ ASSERT_TRUE(s.IsBusy());
+
+ txn2->Rollback();
+
+ delete txn1;
+ delete txn2;
+}
+
+TEST_P(TransactionTest, LostUpdate) {
+ WriteOptions write_options;
+ ReadOptions read_options, read_options1, read_options2;
+ TransactionOptions txn_options;
+ std::string value;
+ Status s;
+
+ // Test 2 transactions writing to the same key in multiple orders and
+ // with/without snapshots
+
+ Transaction* txn1 = db->BeginTransaction(write_options);
+ Transaction* txn2 = db->BeginTransaction(write_options);
+
+ s = txn1->Put("1", "1");
+ ASSERT_OK(s);
+
+ s = txn2->Put("1", "2"); // conflict
+ ASSERT_TRUE(s.IsTimedOut());
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "1", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("1", value);
+
+ delete txn1;
+ delete txn2;
+
+ txn_options.set_snapshot = true;
+ txn1 = db->BeginTransaction(write_options, txn_options);
+ read_options1.snapshot = txn1->GetSnapshot();
+
+ txn2 = db->BeginTransaction(write_options, txn_options);
+ read_options2.snapshot = txn2->GetSnapshot();
+
+ s = txn1->Put("1", "3");
+ ASSERT_OK(s);
+ s = txn2->Put("1", "4"); // conflict
+ ASSERT_TRUE(s.IsTimedOut());
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "1", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("3", value);
+
+ delete txn1;
+ delete txn2;
+
+ txn1 = db->BeginTransaction(write_options, txn_options);
+ read_options1.snapshot = txn1->GetSnapshot();
+
+ txn2 = db->BeginTransaction(write_options, txn_options);
+ read_options2.snapshot = txn2->GetSnapshot();
+
+ s = txn1->Put("1", "5");
+ ASSERT_OK(s);
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ s = txn2->Put("1", "6");
+ ASSERT_TRUE(s.IsBusy());
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "1", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("5", value);
+
+ delete txn1;
+ delete txn2;
+
+ txn1 = db->BeginTransaction(write_options, txn_options);
+ read_options1.snapshot = txn1->GetSnapshot();
+
+ txn2 = db->BeginTransaction(write_options, txn_options);
+ read_options2.snapshot = txn2->GetSnapshot();
+
+ s = txn1->Put("1", "7");
+ ASSERT_OK(s);
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ txn2->SetSnapshot();
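+ // Taking a fresh snapshot after txn1's commit means txn2 no longer sees
+ // txn1's write to "1" as a conflict.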
+ s = txn2->Put("1", "8");
+ ASSERT_OK(s);
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "1", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("8", value);
+
+ delete txn1;
+ delete txn2;
+
+ txn1 = db->BeginTransaction(write_options);
+ txn2 = db->BeginTransaction(write_options);
+
+ s = txn1->Put("1", "9");
+ ASSERT_OK(s);
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ s = txn2->Put("1", "10");
+ ASSERT_OK(s);
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ delete txn1;
+ delete txn2;
+
+ s = db->Get(read_options, "1", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "10");
+}
+
+TEST_P(TransactionTest, UntrackedWrites) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ std::string value;
+ Status s;
+
+ // Verify transaction rollback works for untracked keys.
+ Transaction* txn = db->BeginTransaction(write_options);
+ txn->SetSnapshot();
+
+ s = txn->PutUntracked("untracked", "0");
+ ASSERT_OK(s);
+ txn->Rollback();
+ s = db->Get(read_options, "untracked", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ delete txn;
+ txn = db->BeginTransaction(write_options);
+ txn->SetSnapshot();
+
+ s = db->Put(write_options, "untracked", "x");
+ ASSERT_OK(s);
+
+ // Untracked writes should succeed even though key was written after snapshot
+ s = txn->PutUntracked("untracked", "1");
+ ASSERT_OK(s);
+ s = txn->MergeUntracked("untracked", "2");
+ ASSERT_OK(s);
+ s = txn->DeleteUntracked("untracked");
+ ASSERT_OK(s);
+
+ // Conflict: a tracked Put still validates against the snapshot, so it
+ // conflicts with the earlier db write to "untracked".
+ s = txn->Put("untracked", "3");
+ ASSERT_TRUE(s.IsBusy());
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "untracked", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, ExpiredTransaction) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+ string value;
+ Status s;
+
+ // Set txn expiration timeout to 0 milliseconds (expires instantly)
+ txn_options.expiration = 0;
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+
+ s = txn1->Put("X", "1");
+ ASSERT_OK(s);
+
+ s = txn1->Put("Y", "1");
+ ASSERT_OK(s);
+
+ Transaction* txn2 = db->BeginTransaction(write_options);
+
+ // txn2 should be able to write to X since txn1 has expired
+ s = txn2->Put("X", "2");
+ ASSERT_OK(s);
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+ s = db->Get(read_options, "X", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("2", value);
+
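+ // An expired transaction can still buffer writes (the Put below succeeds);
+ // expiration is enforced at Commit and when other writers steal its locks.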
+ s = txn1->Put("Z", "1");
+ ASSERT_OK(s);
+
+ // txn1 should fail to commit since it is expired
+ s = txn1->Commit();
+ ASSERT_TRUE(s.IsExpired());
+
+ s = db->Get(read_options, "Y", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = db->Get(read_options, "Z", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ delete txn1;
+ delete txn2;
+}
+
+TEST_P(TransactionTest, ReinitializeTest) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+ std::string value;
+ Status s;
+
+ // Set txn expiration timeout to 0 milliseconds (expires instantly)
+ txn_options.expiration = 0;
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+
+ // Reinitialize the transaction so that it no longer expires
+ txn_options.expiration = -1;
+ txn1 = db->BeginTransaction(write_options, txn_options, txn1);
+
+ s = txn1->Put("Z", "z");
+ ASSERT_OK(s);
+
+ // Should commit since not expired
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ txn1 = db->BeginTransaction(write_options, txn_options, txn1);
+
+ s = txn1->Put("Z", "zz");
+ ASSERT_OK(s);
+
+ // Reinitialize txn1 and verify that Z gets unlocked
+ txn1 = db->BeginTransaction(write_options, txn_options, txn1);
+
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options, nullptr);
+ s = txn2->Put("Z", "zzz");
+ ASSERT_OK(s);
+ s = txn2->Commit();
+ ASSERT_OK(s);
+ delete txn2;
+
+ s = db->Get(read_options, "Z", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "zzz");
+
+ // Verify snapshots get reinitialized correctly
+ txn1->SetSnapshot();
+ s = txn1->Put("Z", "zzzz");
+ ASSERT_OK(s);
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "Z", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "zzzz");
+
+ txn1 = db->BeginTransaction(write_options, txn_options, txn1);
+ const Snapshot* snapshot = txn1->GetSnapshot();
+ ASSERT_FALSE(snapshot);
+
+ txn_options.set_snapshot = true;
+ txn1 = db->BeginTransaction(write_options, txn_options, txn1);
+ snapshot = txn1->GetSnapshot();
+ ASSERT_TRUE(snapshot);
+
+ s = txn1->Put("Z", "a");
+ ASSERT_OK(s);
+
+ txn1->Rollback();
+
+ s = txn1->Put("Y", "y");
+ ASSERT_OK(s);
+
+ txn_options.set_snapshot = false;
+ txn1 = db->BeginTransaction(write_options, txn_options, txn1);
+ snapshot = txn1->GetSnapshot();
+ ASSERT_FALSE(snapshot);
+
+ s = txn1->Put("X", "x");
+ ASSERT_OK(s);
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "Z", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(value, "zzzz");
+
+ s = db->Get(read_options, "Y", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ txn1 = db->BeginTransaction(write_options, txn_options, txn1);
+
+ s = txn1->SetName("name");
+ ASSERT_OK(s);
+
+ s = txn1->Prepare();
+ ASSERT_OK(s);
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ txn1 = db->BeginTransaction(write_options, txn_options, txn1);
+
+ s = txn1->SetName("name");
+ ASSERT_OK(s);
+
+ delete txn1;
+}
+
+TEST_P(TransactionTest, Rollback) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+ std::string value;
+ Status s;
+
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+
+ ASSERT_OK(s);
+
+ s = txn1->Put("X", "1");
+ ASSERT_OK(s);
+
+ Transaction* txn2 = db->BeginTransaction(write_options);
+
+ // txn2 should not be able to write to X since txn1 has it locked
+ s = txn2->Put("X", "2");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ txn1->Rollback();
+ delete txn1;
+
+ // txn2 should now be able to write to X
+ s = txn2->Put("X", "3");
+ ASSERT_OK(s);
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "X", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("3", value);
+
+ delete txn2;
+}
+
+TEST_P(TransactionTest, LockLimitTest) {
+ WriteOptions write_options;
+ ReadOptions read_options, snapshot_read_options;
+ TransactionOptions txn_options;
+ string value;
+ Status s;
+
+ delete db;
+ db = nullptr;
+
+ // Open DB with a lock limit of 3
+ txn_db_options.max_num_locks = 3;
+ s = TransactionDB::Open(options, txn_db_options, dbname, &db);
+ assert(db != nullptr);
+ ASSERT_OK(s);
+
+ // Create a txn and verify we can only lock up to 3 keys
+ Transaction* txn = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn);
+
+ s = txn->Put("X", "x");
+ ASSERT_OK(s);
+
+ s = txn->Put("Y", "y");
+ ASSERT_OK(s);
+
+ s = txn->Put("Z", "z");
+ ASSERT_OK(s);
+
+ // lock limit reached
+ s = txn->Put("W", "w");
+ ASSERT_TRUE(s.IsBusy());
+
+ // re-locking same key shouldn't put us over the limit
+ s = txn->Put("X", "xx");
+ ASSERT_OK(s);
+
+ s = txn->GetForUpdate(read_options, "W", &value);
+ ASSERT_TRUE(s.IsBusy());
+ s = txn->GetForUpdate(read_options, "V", &value);
+ ASSERT_TRUE(s.IsBusy());
+
+ // re-locking same key shouldn't put us over the limit
+ s = txn->GetForUpdate(read_options, "Y", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("y", value);
+
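+ // A plain Get does not acquire locks, so it is not affected by the lock
+ // limit.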
+ s = txn->Get(read_options, "W", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn2);
+
+ // "X" currently locked
+ s = txn2->Put("X", "x");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ // lock limit reached
+ s = txn2->Put("M", "m");
+ ASSERT_TRUE(s.IsBusy());
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "X", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("xx", value);
+
+ s = db->Get(read_options, "W", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ // Committing txn should release its locks and allow txn2 to proceed
+ s = txn2->Put("X", "x2");
+ ASSERT_OK(s);
+
+ s = txn2->Delete("X");
+ ASSERT_OK(s);
+
+ s = txn2->Put("M", "m");
+ ASSERT_OK(s);
+
+ s = txn2->Put("Z", "z2");
+ ASSERT_OK(s);
+
+ // lock limit reached
+ s = txn2->Delete("Y");
+ ASSERT_TRUE(s.IsBusy());
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "Z", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("z2", value);
+
+ s = db->Get(read_options, "Y", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("y", value);
+
+ s = db->Get(read_options, "X", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ delete txn;
+ delete txn2;
+}
+
+TEST_P(TransactionTest, IteratorTest) {
+ WriteOptions write_options;
+ ReadOptions read_options, snapshot_read_options;
+ std::string value;
+ Status s;
+
+ // Write some keys to the db
+ s = db->Put(write_options, "A", "a");
+ ASSERT_OK(s);
+
+ s = db->Put(write_options, "G", "g");
+ ASSERT_OK(s);
+
+ s = db->Put(write_options, "F", "f");
+ ASSERT_OK(s);
+
+ s = db->Put(write_options, "C", "c");
+ ASSERT_OK(s);
+
+ s = db->Put(write_options, "D", "d");
+ ASSERT_OK(s);
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn);
+
+ // Write some keys in a txn
+ s = txn->Put("B", "b");
+ ASSERT_OK(s);
+
+ s = txn->Put("H", "h");
+ ASSERT_OK(s);
+
+ s = txn->Delete("D");
+ ASSERT_OK(s);
+
+ s = txn->Put("E", "e");
+ ASSERT_OK(s);
+
+ txn->SetSnapshot();
+ const Snapshot* snapshot = txn->GetSnapshot();
+
+ // Write some keys to the db after the snapshot
+ s = db->Put(write_options, "BB", "xx");
+ ASSERT_OK(s);
+
+ s = db->Put(write_options, "C", "xx");
+ ASSERT_OK(s);
+
+ read_options.snapshot = snapshot;
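+ // The transaction's iterator merges its own pending writes (B, E, H and the
+ // Delete of D) with the snapshot view of the db, so BB and the post-snapshot
+ // value of C are not visible.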
+ Iterator* iter = txn->GetIterator(read_options);
+ ASSERT_OK(iter->status());
+ iter->SeekToFirst();
+
+ // Read all keys via iter and lock them all
+ std::string results[] = {"a", "b", "c", "e", "f", "g", "h"};
+ for (int i = 0; i < 7; i++) {
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ(results[i], iter->value().ToString());
+
+ s = txn->GetForUpdate(read_options, iter->key(), nullptr);
+ if (i == 2) {
+ // "C" was modified after txn's snapshot
+ ASSERT_TRUE(s.IsBusy());
+ } else {
+ ASSERT_OK(s);
+ }
+
+ iter->Next();
+ }
+ ASSERT_FALSE(iter->Valid());
+
+ iter->Seek("G");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("g", iter->value().ToString());
+
+ iter->Prev();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("f", iter->value().ToString());
+
+ iter->Seek("D");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("e", iter->value().ToString());
+
+ iter->Seek("C");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("c", iter->value().ToString());
+
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("e", iter->value().ToString());
+
+ iter->Seek("");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a", iter->value().ToString());
+
+ iter->Seek("X");
+ ASSERT_OK(iter->status());
+ ASSERT_FALSE(iter->Valid());
+
+ iter->SeekToLast();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("h", iter->value().ToString());
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ delete iter;
+ delete txn;
+}
+
+TEST_P(TransactionTest, DisableIndexingTest) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ std::string value;
+ Status s;
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn);
+
+ s = txn->Put("A", "a");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a", value);
+
+ txn->DisableIndexing();
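+ // With indexing disabled, writes still go into the transaction's write batch
+ // but are not indexed, so the transaction's own Get and Iterator cannot see
+ // them.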
+
+ s = txn->Put("B", "b");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "B", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ Iterator* iter = txn->GetIterator(read_options);
+ ASSERT_OK(iter->status());
+
+ iter->Seek("B");
+ ASSERT_OK(iter->status());
+ ASSERT_FALSE(iter->Valid());
+
+ s = txn->Delete("A");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a", value);
+
+ txn->EnableIndexing();
+
+ s = txn->Put("B", "bb");
+ ASSERT_OK(s);
+
+ iter->Seek("B");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("bb", iter->value().ToString());
+
+ s = txn->Get(read_options, "B", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("bb", value);
+
+ s = txn->Put("A", "aa");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("aa", value);
+
+ delete iter;
+ delete txn;
+}
+
+TEST_P(TransactionTest, SavepointTest) {
+ WriteOptions write_options;
+ ReadOptions read_options, snapshot_read_options;
+ std::string value;
+ Status s;
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn);
+
+ ASSERT_EQ(0, txn->GetNumPuts());
+
+ s = txn->RollbackToSavePoint();
+ ASSERT_TRUE(s.IsNotFound());
+
+ txn->SetSavePoint(); // 1
+
+ ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn
+ s = txn->RollbackToSavePoint();
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn->Put("B", "b");
+ ASSERT_OK(s);
+
+ ASSERT_EQ(1, txn->GetNumPuts());
+ ASSERT_EQ(0, txn->GetNumDeletes());
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "B", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("b", value);
+
+ delete txn;
+ txn = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn);
+
+ s = txn->Put("A", "a");
+ ASSERT_OK(s);
+
+ s = txn->Put("B", "bb");
+ ASSERT_OK(s);
+
+ s = txn->Put("C", "c");
+ ASSERT_OK(s);
+
+ txn->SetSavePoint(); // 2
+
+ s = txn->Delete("B");
+ ASSERT_OK(s);
+
+ s = txn->Put("C", "cc");
+ ASSERT_OK(s);
+
+ s = txn->Put("D", "d");
+ ASSERT_OK(s);
+
+ ASSERT_EQ(5, txn->GetNumPuts());
+ ASSERT_EQ(1, txn->GetNumDeletes());
+
+ ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2
+
+ ASSERT_EQ(3, txn->GetNumPuts());
+ ASSERT_EQ(0, txn->GetNumDeletes());
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a", value);
+
+ s = txn->Get(read_options, "B", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("bb", value);
+
+ s = txn->Get(read_options, "C", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("c", value);
+
+ s = txn->Get(read_options, "D", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn->Put("A", "a");
+ ASSERT_OK(s);
+
+ s = txn->Put("E", "e");
+ ASSERT_OK(s);
+
+ ASSERT_EQ(5, txn->GetNumPuts());
+ ASSERT_EQ(0, txn->GetNumDeletes());
+
+ // No savepoints remain, so RollbackToSavePoint() fails; use Rollback() to
+ // roll back to the beginning of the txn.
+ s = txn->RollbackToSavePoint();
+ ASSERT_TRUE(s.IsNotFound());
+ txn->Rollback();
+
+ ASSERT_EQ(0, txn->GetNumPuts());
+ ASSERT_EQ(0, txn->GetNumDeletes());
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn->Get(read_options, "B", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("b", value);
+
+ s = txn->Get(read_options, "D", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn->Get(read_options, "D", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn->Get(read_options, "E", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn->Put("A", "aa");
+ ASSERT_OK(s);
+
+ s = txn->Put("F", "f");
+ ASSERT_OK(s);
+
+ ASSERT_EQ(2, txn->GetNumPuts());
+ ASSERT_EQ(0, txn->GetNumDeletes());
+
+ txn->SetSavePoint(); // 3
+ txn->SetSavePoint(); // 4
+
+ s = txn->Put("G", "g");
+ ASSERT_OK(s);
+
+ s = txn->SingleDelete("F");
+ ASSERT_OK(s);
+
+ s = txn->Delete("B");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("aa", value);
+
+ s = txn->Get(read_options, "F", &value);
+ // According to db.h, doing a SingleDelete on a key that has been
+ // overwritten will have undefined behavior. So it is unclear what the
+ // result of fetching "F" should be. The current implementation will
+ // return NotFound in this case.
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn->Get(read_options, "B", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ ASSERT_EQ(3, txn->GetNumPuts());
+ ASSERT_EQ(2, txn->GetNumDeletes());
+
+ ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3
+
+ ASSERT_EQ(2, txn->GetNumPuts());
+ ASSERT_EQ(0, txn->GetNumDeletes());
+
+ s = txn->Get(read_options, "F", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("f", value);
+
+ s = txn->Get(read_options, "G", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "F", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("f", value);
+
+ s = db->Get(read_options, "G", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = db->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("aa", value);
+
+ s = db->Get(read_options, "B", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("b", value);
+
+ s = db->Get(read_options, "C", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = db->Get(read_options, "D", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = db->Get(read_options, "E", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ delete txn;
+}
+
+TEST_P(TransactionTest, SavepointTest2) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+ Status s;
+
+ txn_options.lock_timeout = 1; // 1 ms
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn1);
+
+ s = txn1->Put("A", "");
+ ASSERT_OK(s);
+
+ txn1->SetSavePoint(); // 1
+
+ s = txn1->Put("A", "a");
+ ASSERT_OK(s);
+
+ s = txn1->Put("C", "c");
+ ASSERT_OK(s);
+
+ txn1->SetSavePoint(); // 2
+
+ s = txn1->Put("A", "a");
+ ASSERT_OK(s);
+ s = txn1->Put("B", "b");
+ ASSERT_OK(s);
+
+ ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 2
+
+ // Verify that "A" and "C" are still locked while "B" is not
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn2);
+
+ s = txn2->Put("A", "a2");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("C", "c2");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("B", "b2");
+ ASSERT_OK(s);
+
+ s = txn1->Put("A", "aa");
+ ASSERT_OK(s);
+ s = txn1->Put("B", "bb");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+ delete txn2;
+
+ s = txn1->Put("A", "aaa");
+ ASSERT_OK(s);
+ s = txn1->Put("B", "bbb");
+ ASSERT_OK(s);
+ s = txn1->Put("C", "ccc");
+ ASSERT_OK(s);
+
+ txn1->SetSavePoint(); // 3
+ ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 3
+
+ // Verify that "A", "B", "C" are still locked
+ txn2 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn2);
+
+ s = txn2->Put("A", "a2");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("B", "b2");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("C", "c2");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ ASSERT_OK(txn1->RollbackToSavePoint()); // Rollback to 1
+
+ // Verify that only "A" is locked
+ s = txn2->Put("A", "a3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("B", "b3");
+ ASSERT_OK(s);
+ s = txn2->Put("C", "c3po");
+ ASSERT_OK(s);
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+ delete txn1;
+
+ // Verify "A", "B", and "C" are no longer locked
+ s = txn2->Put("A", "a4");
+ ASSERT_OK(s);
+ s = txn2->Put("B", "b4");
+ ASSERT_OK(s);
+ s = txn2->Put("C", "c4");
+ ASSERT_OK(s);
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+ delete txn2;
+}
+
+TEST_P(TransactionTest, SavepointTest3) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+ Status s;
+
+ txn_options.lock_timeout = 1; // 1 ms
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn1);
+
+ s = txn1->PopSavePoint(); // No SavePoint present
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn1->Put("A", "");
+ ASSERT_OK(s);
+
+ s = txn1->PopSavePoint(); // Still no SavePoint present
+ ASSERT_TRUE(s.IsNotFound());
+
+ txn1->SetSavePoint(); // 1
+
+ s = txn1->Put("A", "a");
+ ASSERT_OK(s);
+
+ s = txn1->PopSavePoint(); // Remove 1
+ ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound());
+
+ // Verify that "A" is still locked
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn2);
+
+ s = txn2->Put("A", "a2");
+ ASSERT_TRUE(s.IsTimedOut());
+ delete txn2;
+
+ txn1->SetSavePoint(); // 2
+
+ s = txn1->Put("B", "b");
+ ASSERT_OK(s);
+
+ txn1->SetSavePoint(); // 3
+
+ s = txn1->Put("B", "b2");
+ ASSERT_OK(s);
+
+ ASSERT_OK(txn1->RollbackToSavePoint()); // Roll back to 2
+
+ s = txn1->PopSavePoint();
+ ASSERT_OK(s);
+
+ s = txn1->PopSavePoint();
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+ delete txn1;
+
+ std::string value;
+
+ // txn1 should have modified "A" to "a"
+ s = db->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a", value);
+
+ // txn1 should have set "B" to just "b"
+ s = db->Get(read_options, "B", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("b", value);
+
+ s = db->Get(read_options, "C", &value);
+ ASSERT_TRUE(s.IsNotFound());
+}
+
+TEST_P(TransactionTest, UndoGetForUpdateTest) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+ std::string value;
+ Status s;
+
+ txn_options.lock_timeout = 1; // 1 ms
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn1);
+
+ txn1->UndoGetForUpdate("A");
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+ delete txn1;
+
+ txn1 = db->BeginTransaction(write_options, txn_options);
+
+ txn1->UndoGetForUpdate("A");
+ s = txn1->GetForUpdate(read_options, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ // Verify that A is locked
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ s = txn2->Put("A", "a");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ txn1->UndoGetForUpdate("A");
+
+ // Verify that A is now unlocked
+ s = txn2->Put("A", "a2");
+ ASSERT_OK(s);
+ txn2->Commit();
+ delete txn2;
+ s = db->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a2", value);
+
+ s = txn1->Delete("A");
+ ASSERT_OK(s);
+ s = txn1->GetForUpdate(read_options, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn1->Put("B", "b3");
+ ASSERT_OK(s);
+ s = txn1->GetForUpdate(read_options, "B", &value);
+ ASSERT_OK(s);
+
+ txn1->UndoGetForUpdate("A");
+ txn1->UndoGetForUpdate("B");
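+ // A was also Deleted and B was also Put by txn1, so UndoGetForUpdate does
+ // not release their locks.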
+
+ // Verify that A and B are still locked
+ txn2 = db->BeginTransaction(write_options, txn_options);
+ s = txn2->Put("A", "a4");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("B", "b4");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ txn1->Rollback();
+ delete txn1;
+
+ // Verify that A and B are no longer locked
+ s = txn2->Put("A", "a5");
+ ASSERT_OK(s);
+ s = txn2->Put("B", "b5");
+ ASSERT_OK(s);
+ s = txn2->Commit();
+ delete txn2;
+ ASSERT_OK(s);
+
+ txn1 = db->BeginTransaction(write_options, txn_options);
+
+ s = txn1->GetForUpdate(read_options, "A", &value);
+ ASSERT_OK(s);
+ s = txn1->GetForUpdate(read_options, "A", &value);
+ ASSERT_OK(s);
+ s = txn1->GetForUpdate(read_options, "C", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ s = txn1->GetForUpdate(read_options, "A", &value);
+ ASSERT_OK(s);
+ s = txn1->GetForUpdate(read_options, "C", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ s = txn1->GetForUpdate(read_options, "B", &value);
+ ASSERT_OK(s);
+ s = txn1->Put("B", "b5");
+ ASSERT_OK(s);
+ s = txn1->GetForUpdate(read_options, "B", &value);
+ ASSERT_OK(s);
+
+ txn1->UndoGetForUpdate("A");
+ txn1->UndoGetForUpdate("B");
+ txn1->UndoGetForUpdate("C");
+ txn1->UndoGetForUpdate("X");
+
+ // Verify A,B,C are locked
+ txn2 = db->BeginTransaction(write_options, txn_options);
+ s = txn2->Put("A", "a6");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Delete("B");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("C", "c6");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("X", "x6");
+ ASSERT_OK(s);
+
+ txn1->UndoGetForUpdate("A");
+ txn1->UndoGetForUpdate("B");
+ txn1->UndoGetForUpdate("C");
+ txn1->UndoGetForUpdate("X");
+
+ // Verify A,B are locked and C is not
+ s = txn2->Put("A", "a6");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Delete("B");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("C", "c6");
+ ASSERT_OK(s);
+ s = txn2->Put("X", "x6");
+ ASSERT_OK(s);
+
+ txn1->UndoGetForUpdate("A");
+ txn1->UndoGetForUpdate("B");
+ txn1->UndoGetForUpdate("C");
+ txn1->UndoGetForUpdate("X");
+
+ // Verify B is locked and A and C are not
+ s = txn2->Put("A", "a7");
+ ASSERT_OK(s);
+ s = txn2->Delete("B");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("C", "c7");
+ ASSERT_OK(s);
+ s = txn2->Put("X", "x7");
+ ASSERT_OK(s);
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+ delete txn2;
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+ delete txn1;
+}
+
+TEST_P(TransactionTest, UndoGetForUpdateTest2) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ TransactionOptions txn_options;
+ std::string value;
+ Status s;
+
+ s = db->Put(write_options, "A", "");
+ ASSERT_OK(s);
+
+ txn_options.lock_timeout = 1; // 1 ms
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn1);
+
+ s = txn1->GetForUpdate(read_options, "A", &value);
+ ASSERT_OK(s);
+ s = txn1->GetForUpdate(read_options, "B", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn1->Put("F", "f");
+ ASSERT_OK(s);
+
+ txn1->SetSavePoint(); // 1
+
+ txn1->UndoGetForUpdate("A");
+
+ s = txn1->GetForUpdate(read_options, "C", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ s = txn1->GetForUpdate(read_options, "D", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn1->Put("E", "e");
+ ASSERT_OK(s);
+ s = txn1->GetForUpdate(read_options, "E", &value);
+ ASSERT_OK(s);
+
+ s = txn1->GetForUpdate(read_options, "F", &value);
+ ASSERT_OK(s);
+
+ // Verify A,B,C,D,E,F are still locked
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ s = txn2->Put("A", "a1");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("B", "b1");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("C", "c1");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("D", "d1");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("E", "e1");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("F", "f1");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ txn1->UndoGetForUpdate("C");
+ txn1->UndoGetForUpdate("E");
+
+ // Verify A,B,D,E,F are still locked and C is not.
+ s = txn2->Put("A", "a2");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("B", "b2");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("D", "d2");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("E", "e2");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("F", "f2");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("C", "c2");
+ ASSERT_OK(s);
+
+ txn1->SetSavePoint(); // 2
+
+ s = txn1->Put("H", "h");
+ ASSERT_OK(s);
+
+ txn1->UndoGetForUpdate("A");
+ txn1->UndoGetForUpdate("B");
+ txn1->UndoGetForUpdate("C");
+ txn1->UndoGetForUpdate("D");
+ txn1->UndoGetForUpdate("E");
+ txn1->UndoGetForUpdate("F");
+ txn1->UndoGetForUpdate("G");
+ txn1->UndoGetForUpdate("H");
+
+ // Verify A,B,D,E,F,H are still locked and C,G are not.
+ s = txn2->Put("A", "a3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("B", "b3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("D", "d3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("E", "e3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("F", "f3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("H", "h3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("C", "c3");
+ ASSERT_OK(s);
+ s = txn2->Put("G", "g3");
+ ASSERT_OK(s);
+
+ txn1->RollbackToSavePoint(); // rollback to 2
+
+ // Verify A,B,D,E,F are still locked and C,G,H are not.
+ s = txn2->Put("A", "a3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("B", "b3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("D", "d3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("E", "e3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("F", "f3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("C", "c3");
+ ASSERT_OK(s);
+ s = txn2->Put("G", "g3");
+ ASSERT_OK(s);
+ s = txn2->Put("H", "h3");
+ ASSERT_OK(s);
+
+ txn1->UndoGetForUpdate("A");
+ txn1->UndoGetForUpdate("B");
+ txn1->UndoGetForUpdate("C");
+ txn1->UndoGetForUpdate("D");
+ txn1->UndoGetForUpdate("E");
+ txn1->UndoGetForUpdate("F");
+ txn1->UndoGetForUpdate("G");
+ txn1->UndoGetForUpdate("H");
+
+ // Verify A,B,E,F are still locked and C,D,G,H are not.
+ s = txn2->Put("A", "a3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("B", "b3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("E", "e3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("F", "f3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("C", "c3");
+ ASSERT_OK(s);
+ s = txn2->Put("D", "d3");
+ ASSERT_OK(s);
+ s = txn2->Put("G", "g3");
+ ASSERT_OK(s);
+ s = txn2->Put("H", "h3");
+ ASSERT_OK(s);
+
+ txn1->RollbackToSavePoint(); // rollback to 1
+
+ // Verify A,B,F are still locked and C,D,E,G,H are not.
+ s = txn2->Put("A", "a3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("B", "b3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("F", "f3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("C", "c3");
+ ASSERT_OK(s);
+ s = txn2->Put("D", "d3");
+ ASSERT_OK(s);
+ s = txn2->Put("E", "e3");
+ ASSERT_OK(s);
+ s = txn2->Put("G", "g3");
+ ASSERT_OK(s);
+ s = txn2->Put("H", "h3");
+ ASSERT_OK(s);
+
+ txn1->UndoGetForUpdate("A");
+ txn1->UndoGetForUpdate("B");
+ txn1->UndoGetForUpdate("C");
+ txn1->UndoGetForUpdate("D");
+ txn1->UndoGetForUpdate("E");
+ txn1->UndoGetForUpdate("F");
+ txn1->UndoGetForUpdate("G");
+ txn1->UndoGetForUpdate("H");
+
+ // Verify F is still locked and A,B,C,D,E,G,H are not.
+ s = txn2->Put("F", "f3");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Put("A", "a3");
+ ASSERT_OK(s);
+ s = txn2->Put("B", "b3");
+ ASSERT_OK(s);
+ s = txn2->Put("C", "c3");
+ ASSERT_OK(s);
+ s = txn2->Put("D", "d3");
+ ASSERT_OK(s);
+ s = txn2->Put("E", "e3");
+ ASSERT_OK(s);
+ s = txn2->Put("G", "g3");
+ ASSERT_OK(s);
+ s = txn2->Put("H", "h3");
+ ASSERT_OK(s);
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ delete txn1;
+ delete txn2;
+}
+
+TEST_P(TransactionTest, TimeoutTest) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ std::string value;
+ Status s;
+
+ delete db;
+ db = nullptr;
+
+ // Both transaction writes and db writes start with infinite lock timeouts;
+ // the transaction's lock timeout is overridden below when the txn is started.
+ txn_db_options.transaction_lock_timeout = -1;
+ txn_db_options.default_lock_timeout = -1;
+
+ s = TransactionDB::Open(options, txn_db_options, dbname, &db);
+ assert(db != nullptr);
+ ASSERT_OK(s);
+
+ s = db->Put(write_options, "aaa", "aaa");
+ ASSERT_OK(s);
+
+ TransactionOptions txn_options0;
+ txn_options0.expiration = 100; // 100ms
+ txn_options0.lock_timeout = 50; // txn timeout no longer infinite
+ Transaction* txn1 = db->BeginTransaction(write_options, txn_options0);
+
+ s = txn1->GetForUpdate(read_options, "aaa", nullptr);
+ ASSERT_OK(s);
+
+ // Conflicts with previous GetForUpdate.
+ // Since db writes do not have a timeout, this should eventually succeed when
+ // the transaction expires.
+ s = db->Put(write_options, "aaa", "xxx");
+ ASSERT_OK(s);
+
+ ASSERT_GE(txn1->GetElapsedTime(),
+ static_cast<uint64_t>(txn_options0.expiration));
+
+ s = txn1->Commit();
+ ASSERT_TRUE(s.IsExpired()); // expired!
+
+ s = db->Get(read_options, "aaa", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("xxx", value);
+
+ delete txn1;
+ delete db;
+
+ // transaction writes have a 50ms lock timeout,
+ // db writes have infinite timeout
+ txn_db_options.transaction_lock_timeout = 50;
+ txn_db_options.default_lock_timeout = -1;
+
+ s = TransactionDB::Open(options, txn_db_options, dbname, &db);
+ ASSERT_OK(s);
+
+ s = db->Put(write_options, "aaa", "aaa");
+ ASSERT_OK(s);
+
+ TransactionOptions txn_options;
+ txn_options.expiration = 100; // 100ms
+ txn1 = db->BeginTransaction(write_options, txn_options);
+
+ s = txn1->GetForUpdate(read_options, "aaa", nullptr);
+ ASSERT_OK(s);
+
+ // Conflicts with previous GetForUpdate.
+ // Since db writes do not have a timeout, this should eventually succeed when
+ // the transaction expires.
+ s = db->Put(write_options, "aaa", "xxx");
+ ASSERT_OK(s);
+
+ s = txn1->Commit();
+ ASSERT_NOK(s); // expired!
+
+ s = db->Get(read_options, "aaa", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("xxx", value);
+
+ delete txn1;
+ txn_options.expiration = 6000000; // 100 minutes
+ txn_options.lock_timeout = 1; // 1ms
+ txn1 = db->BeginTransaction(write_options, txn_options);
+ txn1->SetLockTimeout(100);
+
+ TransactionOptions txn_options2;
+ txn_options2.expiration = 10; // 10ms
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options2);
+ ASSERT_OK(s);
+
+ s = txn2->Put("a", "2");
+ ASSERT_OK(s);
+
+ // txn1 has a lock timeout longer than txn2's expiration, so it will win
+ s = txn1->Delete("a");
+ ASSERT_OK(s);
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ // txn2 should now be expired, since txn1 waited past txn2's expiration to
+ // steal its lock.
+ s = txn2->Commit();
+ ASSERT_TRUE(s.IsExpired());
+
+ delete txn1;
+ delete txn2;
+ txn_options.expiration = 6000000; // 100 minutes
+ txn1 = db->BeginTransaction(write_options, txn_options);
+ txn_options2.expiration = 100000000;
+ txn2 = db->BeginTransaction(write_options, txn_options2);
+
+ s = txn1->Delete("asdf");
+ ASSERT_OK(s);
+
+ // txn2 has a smaller lock timeout than txn1's expiration, so it will time out
+ s = txn2->Delete("asdf");
+ ASSERT_TRUE(s.IsTimedOut());
+ ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ s = txn2->Put("asdf", "asdf");
+ ASSERT_OK(s);
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+
+ s = db->Get(read_options, "asdf", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("asdf", value);
+
+ delete txn1;
+ delete txn2;
+}
+
+TEST_P(TransactionTest, SingleDeleteTest) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ std::string value;
+ Status s;
+
+ Transaction* txn = db->BeginTransaction(write_options);
+ ASSERT_TRUE(txn);
+
+ s = txn->SingleDelete("A");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+ delete txn;
+
+ txn = db->BeginTransaction(write_options);
+
+ s = txn->SingleDelete("A");
+ ASSERT_OK(s);
+
+ s = txn->Put("A", "a");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a", value);
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+ delete txn;
+
+ s = db->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a", value);
+
+ txn = db->BeginTransaction(write_options);
+
+ s = txn->SingleDelete("A");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+ delete txn;
+
+ s = db->Get(read_options, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ txn = db->BeginTransaction(write_options);
+ Transaction* txn2 = db->BeginTransaction(write_options);
+ txn2->SetSnapshot();
+
+ s = txn->Put("A", "a");
+ ASSERT_OK(s);
+
+ s = txn->Put("A", "a2");
+ ASSERT_OK(s);
+
+ s = txn->SingleDelete("A");
+ ASSERT_OK(s);
+
+ s = txn->SingleDelete("B");
+ ASSERT_OK(s);
+
+ // According to db.h, doing a SingleDelete on a key that has been
+ // overwritten will have undefined behavior. So it is unclear what the
+ // result of fetching "A" should be. The current implementation will
+ // return NotFound in this case.
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = txn2->Put("B", "b");
+ ASSERT_TRUE(s.IsTimedOut());
+ s = txn2->Commit();
+ ASSERT_OK(s);
+ delete txn2;
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+ delete txn;
+
+ // According to db.h, doing a SingleDelete on a key that has been
+ // overwritten will have undefined behavior. So it is unclear what the
+ // result of fetching "A" should be. The current implementation will
+ // return NotFound in this case.
+ s = db->Get(read_options, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = db->Get(read_options, "B", &value);
+ ASSERT_TRUE(s.IsNotFound());
+}
+
+TEST_P(TransactionTest, MergeTest) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ std::string value;
+ Status s;
+
+ Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
+ ASSERT_TRUE(txn);
+
+ s = db->Put(write_options, "A", "a0");
+ ASSERT_OK(s);
+
+ s = txn->Merge("A", "1");
+ ASSERT_OK(s);
+
+ s = txn->Merge("A", "2");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_TRUE(s.IsMergeInProgress());
+
+ s = txn->Put("A", "a");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a", value);
+
+ s = txn->Merge("A", "3");
+ ASSERT_OK(s);
+
+ s = txn->Get(read_options, "A", &value);
+ ASSERT_TRUE(s.IsMergeInProgress());
+
+ TransactionOptions txn_options;
+ txn_options.lock_timeout = 1; // 1 ms
+ Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+ ASSERT_TRUE(txn2);
+
+ // verify that txn has "A" locked
+ s = txn2->Merge("A", "4");
+ ASSERT_TRUE(s.IsTimedOut());
+
+ s = txn2->Commit();
+ ASSERT_OK(s);
+ delete txn2;
+
+ s = txn->Commit();
+ ASSERT_OK(s);
+ delete txn;
+
+ s = db->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a,3", value);
+}
+
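+// SetSnapshotOnNextOperation defers snapshot acquisition until the next
+// operation that needs one, so writes committed in between are still visible
+// and do not conflict.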
+TEST_P(TransactionTest, DeferSnapshotTest) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ std::string value;
+ Status s;
+
+ s = db->Put(write_options, "A", "a0");
+ ASSERT_OK(s);
+
+ Transaction* txn1 = db->BeginTransaction(write_options);
+ Transaction* txn2 = db->BeginTransaction(write_options);
+
+ txn1->SetSnapshotOnNextOperation();
+ auto snapshot = txn1->GetSnapshot();
+ ASSERT_FALSE(snapshot);
+
+ s = txn2->Put("A", "a2");
+ ASSERT_OK(s);
+ s = txn2->Commit();
+ ASSERT_OK(s);
+ delete txn2;
+
+ s = txn1->GetForUpdate(read_options, "A", &value);
+ // Should not conflict with txn2 since snapshot wasn't set until
+ // GetForUpdate was called.
+ ASSERT_OK(s);
+ ASSERT_EQ("a2", value);
+
+ s = txn1->Put("A", "a1");
+ ASSERT_OK(s);
+
+ s = db->Put(write_options, "B", "b0");
+ ASSERT_OK(s);
+
+ // Cannot lock B since it was written after the snapshot was set
+ s = txn1->Put("B", "b1");
+ ASSERT_TRUE(s.IsBusy());
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+ delete txn1;
+
+ s = db->Get(read_options, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a1", value);
+
+ s = db->Get(read_options, "B", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("b0", value);
+}
+
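+// Calling SetSnapshotOnNextOperation while a snapshot is already set must not
+// advance the snapshot until another operation actually runs.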
+TEST_P(TransactionTest, DeferSnapshotTest2) {
+ WriteOptions write_options;
+ ReadOptions read_options, snapshot_read_options;
+ std::string value;
+ Status s;
+
+ Transaction* txn1 = db->BeginTransaction(write_options);
+
+ txn1->SetSnapshot();
+
+ s = txn1->Put("A", "a1");
+ ASSERT_OK(s);
+
+ s = db->Put(write_options, "C", "c0");
+ ASSERT_OK(s);
+ s = db->Put(write_options, "D", "d0");
+ ASSERT_OK(s);
+
+ snapshot_read_options.snapshot = txn1->GetSnapshot();
+
+ txn1->SetSnapshotOnNextOperation();
+
+ s = txn1->Get(snapshot_read_options, "C", &value);
+ // Snapshot was set before C was written
+ ASSERT_TRUE(s.IsNotFound());
+ s = txn1->Get(snapshot_read_options, "D", &value);
+ // Snapshot was set before D was written
+ ASSERT_TRUE(s.IsNotFound());
+
+ // Snapshot should not have changed yet.
+ snapshot_read_options.snapshot = txn1->GetSnapshot();
+
+ s = txn1->Get(snapshot_read_options, "C", &value);
+ // Snapshot was set before C was written
+ ASSERT_TRUE(s.IsNotFound());
+ s = txn1->Get(snapshot_read_options, "D", &value);
+ // Snapshot was set before D was written
+ ASSERT_TRUE(s.IsNotFound());
+
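+ // GetForUpdate counts as an operation, so the deferred snapshot is acquired
+ // here, before "D" is overwritten below.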
+ s = txn1->GetForUpdate(read_options, "C", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("c0", value);
+
+ s = db->Put(write_options, "D", "d00");
+ ASSERT_OK(s);
+
+ // Snapshot is now set
+ snapshot_read_options.snapshot = txn1->GetSnapshot();
+ s = txn1->Get(snapshot_read_options, "D", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("d0", value);
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+ delete txn1;
+}
+
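+// RollbackToSavePoint should also restore the snapshot state (including a
+// pending SetSnapshotOnNextOperation) that was in effect when the savepoint
+// was taken.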
+TEST_P(TransactionTest, DeferSnapshotSavePointTest) {
+ WriteOptions write_options;
+ ReadOptions read_options, snapshot_read_options;
+ std::string value;
+ Status s;
+
+ Transaction* txn1 = db->BeginTransaction(write_options);
+
+ txn1->SetSavePoint(); // 1
+
+ s = db->Put(write_options, "T", "1");
+ ASSERT_OK(s);
+
+ txn1->SetSnapshotOnNextOperation();
+
+ s = db->Put(write_options, "T", "2");
+ ASSERT_OK(s);
+
+ txn1->SetSavePoint(); // 2
+
+ s = db->Put(write_options, "T", "3");
+ ASSERT_OK(s);
+
+ s = txn1->Put("A", "a");
+ ASSERT_OK(s);
+
+ txn1->SetSavePoint(); // 3
+
+ s = db->Put(write_options, "T", "4");
+ ASSERT_OK(s);
+
+ txn1->SetSnapshot();
+ txn1->SetSnapshotOnNextOperation();
+
+ txn1->SetSavePoint(); // 4
+
+ s = db->Put(write_options, "T", "5");
+ ASSERT_OK(s);
+
+ snapshot_read_options.snapshot = txn1->GetSnapshot();
+ s = txn1->Get(snapshot_read_options, "T", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("4", value);
+
+ s = txn1->Put("A", "a1");
+ ASSERT_OK(s);
+
+ snapshot_read_options.snapshot = txn1->GetSnapshot();
+ s = txn1->Get(snapshot_read_options, "T", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("5", value);
+
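+ // Rolling back to savepoint 4 should restore the snapshot that was current
+ // when the savepoint was taken (i.e. the one seeing "T" == "4").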
+ s = txn1->RollbackToSavePoint(); // Rollback to 4
+ ASSERT_OK(s);
+
+ snapshot_read_options.snapshot = txn1->GetSnapshot();
+ s = txn1->Get(snapshot_read_options, "T", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("4", value);
+
+ s = txn1->RollbackToSavePoint(); // Rollback to 3
+ ASSERT_OK(s);
+
+ snapshot_read_options.snapshot = txn1->GetSnapshot();
+ s = txn1->Get(snapshot_read_options, "T", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("3", value);
+
+ s = txn1->Get(read_options, "T", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("5", value);
+
+ s = txn1->RollbackToSavePoint(); // Rollback to 2
+ ASSERT_OK(s);
+
+ snapshot_read_options.snapshot = txn1->GetSnapshot();
+ ASSERT_FALSE(snapshot_read_options.snapshot);
+ s = txn1->Get(snapshot_read_options, "T", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("5", value);
+
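+ // Savepoint 2 was taken with SetSnapshotOnNextOperation still pending, so
+ // this Delete should now trigger a fresh snapshot.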
+ s = txn1->Delete("A");
+ ASSERT_OK(s);
+
+ snapshot_read_options.snapshot = txn1->GetSnapshot();
+ ASSERT_TRUE(snapshot_read_options.snapshot);
+ s = txn1->Get(snapshot_read_options, "T", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("5", value);
+
+ s = txn1->RollbackToSavePoint(); // Rollback to 1
+ ASSERT_OK(s);
+
+ s = txn1->Delete("A");
+ ASSERT_OK(s);
+
+ snapshot_read_options.snapshot = txn1->GetSnapshot();
+ ASSERT_FALSE(snapshot_read_options.snapshot);
+ s = txn1->Get(snapshot_read_options, "T", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("5", value);
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ delete txn1;
+}
+
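+// The TransactionNotifier passed to SetSnapshotOnNextOperation should be
+// called back with the snapshot that gets created, and only once a
+// snapshot-taking operation (here a Put, not a plain Get) runs.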
+TEST_P(TransactionTest, SetSnapshotOnNextOperationWithNotification) {
+ WriteOptions write_options;
+ ReadOptions read_options;
+ std::string value;
+
+ class Notifier : public TransactionNotifier {
+ private:
+ const Snapshot** snapshot_ptr_;
+
+ public:
+ explicit Notifier(const Snapshot** snapshot_ptr)
+ : snapshot_ptr_(snapshot_ptr) {}
+
+ void SnapshotCreated(const Snapshot* newSnapshot) override {
+ *snapshot_ptr_ = newSnapshot;
+ }
+ };
+
+ std::shared_ptr<Notifier> notifier =
+ std::make_shared<Notifier>(&read_options.snapshot);
+ Status s;
+
+ s = db->Put(write_options, "B", "0");
+ ASSERT_OK(s);
+
+ Transaction* txn1 = db->BeginTransaction(write_options);
+
+ txn1->SetSnapshotOnNextOperation(notifier);
+ ASSERT_FALSE(read_options.snapshot);
+
+ s = db->Put(write_options, "B", "1");
+ ASSERT_OK(s);
+
+ // A Get alone does not trigger the deferred snapshot
+ s = txn1->Get(read_options, "B", &value);
+ ASSERT_OK(s);
+ ASSERT_FALSE(read_options.snapshot);
+ ASSERT_EQ(value, "1");
+
+ // Any other operation does
+ s = txn1->Put("A", "0");
+ ASSERT_OK(s);
+
+ // Now change "B".
+ s = db->Put(write_options, "B", "2");
+ ASSERT_OK(s);
+
+ // The original value should still be read
+ s = txn1->Get(read_options, "B", &value);
+ ASSERT_OK(s);
+ ASSERT_TRUE(read_options.snapshot);
+ ASSERT_EQ(value, "1");
+
+ s = txn1->Commit();
+ ASSERT_OK(s);
+
+ delete txn1;
+}
+
+
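+// max_write_batch_size caps the size of the transaction's write batch; once
+// the cap is hit, further writes fail with MemoryLimit (except under
+// WRITE_UNPREPARED, which flushes the batch to the DB instead).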
+TEST_P(TransactionTest, MemoryLimitTest) {
+ TransactionOptions txn_options;
+ // Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data.
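+ // Each Put of a 1-byte key and 4-byte value costs 8 bytes in the batch:
+ // 1 (type tag) + 1 (key length) + 1 (key) + 1 (value length) + 4 (value).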
+ txn_options.max_write_batch_size = 29;
+ std::string value;
+ Status s;
+
+ Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options);
+ ASSERT_TRUE(txn);
+
+ ASSERT_EQ(0, txn->GetNumPuts());
+ ASSERT_LE(0, txn->GetID());
+
+ s = txn->Put(Slice("a"), Slice("...."));
+ ASSERT_OK(s);
+ ASSERT_EQ(1, txn->GetNumPuts());
+
+ s = txn->Put(Slice("b"), Slice("...."));
+ ASSERT_OK(s);
+ ASSERT_EQ(2, txn->GetNumPuts());
+
+ s = txn->Put(Slice("b"), Slice("...."));
+ auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
+ // For write unprepared, write batches exceeding max_write_batch_size will
+ // just flush to DB instead of returning a memory limit error.
+ if (pdb->GetTxnDBOptions().write_policy != WRITE_UNPREPARED) {
+ ASSERT_TRUE(s.IsMemoryLimit());
+ ASSERT_EQ(2, txn->GetNumPuts());
+ } else {
+ ASSERT_OK(s);
+ ASSERT_EQ(3, txn->GetNumPuts());
+ }
+
+ ASSERT_OK(txn->Rollback());
+ delete txn;
+}
+
+// Verify that the TransactionDBWriteOptimizations do not compromise correctness
+TEST_P(TransactionTest, Optimizations) {
+ size_t comb_cnt = size_t(1) << 2; // 2 is number of optimization vars
+ for (size_t new_comb = 0; new_comb < comb_cnt; new_comb++) {
+ TransactionDBWriteOptimizations optimizations;
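+ // IsInCombination(i, comb) presumably tests bit i of the combination index,
+ // so the loop covers every on/off combination of the two flags.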
+ optimizations.skip_concurrency_control = IsInCombination(0, new_comb);
+ optimizations.skip_duplicate_key_check = IsInCombination(1, new_comb);
+
+ ASSERT_OK(ReOpen());
+ WriteOptions write_options;
+ WriteBatch batch;
+ batch.Put(Slice("k"), Slice("v1"));
+ ASSERT_OK(db->Write(write_options, &batch));
+
+ ReadOptions ropt;
+ PinnableSlice pinnable_val;
+ ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "k", &pinnable_val));
+ ASSERT_TRUE(pinnable_val == ("v1"));
+ }
+}
+#endif
+
+} // namespace rocksdb
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+#else
+#include <stdio.h>
+
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr,
+ "SKIPPED as Transactions are not supported in ROCKSDB_LITE\n");
+ return 0;
+}
+
+#endif // ROCKSDB_LITE