[Commits] 01e20a6d0f1: Range Locking: Release the iterator when taking a lock on a range
revision-id: 01e20a6d0f137ea4e59029699fb426e2b2da2a73 (fb-prod201903-269-g01e20a6d0f1) parent(s): 7da2037ab2d6940ac11d4befcf248fd83ad23213 author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2019-12-09 18:38:27 +0300 message: Range Locking: Release the iterator when taking a lock on a range This will cause the iterator to be refreshed so that it returns the latest committed data, and not the data from a snapshot taken at the point the iterator was created. Iterator::Refresh() is not usable for us because this method is not implemented in rocksdb::BaseDeltaIterator that Transaction::GetIterator() returns. --- .../rocksdb/r/range_locking_refresh_iter.result | 50 ++++++++++++++++ .../rocksdb/t/range_locking_refresh_iter.test | 69 ++++++++++++++++++++++ storage/rocksdb/ha_rocksdb.cc | 21 ++++++- storage/rocksdb/rdb_locking_iter.cc | 1 + storage/rocksdb/rdb_locking_iter.h | 20 ++++--- 5 files changed, 153 insertions(+), 8 deletions(-) diff --git a/mysql-test/suite/rocksdb/r/range_locking_refresh_iter.result b/mysql-test/suite/rocksdb/r/range_locking_refresh_iter.result new file mode 100644 index 00000000000..f96d5f5f45c --- /dev/null +++ b/mysql-test/suite/rocksdb/r/range_locking_refresh_iter.result @@ -0,0 +1,50 @@ +select @@rocksdb_use_range_locking; +@@rocksdb_use_range_locking +1 +set debug_sync='RESET'; +create table ten(a int primary key); +insert into ten values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9); +create table one_k(a int primary key); +insert into one_k select A.a + B.a* 10 + C.a * 100 from ten A, ten B, ten C; +create table t1 ( +pk int primary key, +a int +) engine=rocksdb; +insert into t1 select a,a from ten; +insert into t1 select a+40, a+40 from ten; +insert into t1 select a+100, a+100 from one_k; +delete from t1 where pk=44; +set global rocksdb_force_flush_memtable_and_lzero_now=1; +begin; +set debug_sync='rocksdb.check_flags_rmi SIGNAL con1_stopped WAIT_FOR con1_cont'; +update t1 set a=a+100 where pk < 3 or pk between 10 and 50; +set debug_sync='now WAIT_FOR con1_stopped'; +insert into t1 values (44,5000); +delete from t1 where pk= 42; +update t1 set a=5000 where pk between 40 and 45; +set global rocksdb_force_flush_memtable_and_lzero_now=1; +set debug_sync='now SIGNAL con1_cont'; +select * from t1 where pk<100; +pk a +0 100 +1 101 +2 102 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 +40 5100 +41 5100 +43 5100 +44 5100 +45 5100 +46 146 +47 147 +48 148 +49 149 +commit; +set debug_sync='RESET'; +drop table t1, ten, one_k; diff --git a/mysql-test/suite/rocksdb/t/range_locking_refresh_iter.test b/mysql-test/suite/rocksdb/t/range_locking_refresh_iter.test new file mode 100644 index 00000000000..a9e3de29aac --- /dev/null +++ b/mysql-test/suite/rocksdb/t/range_locking_refresh_iter.test @@ -0,0 +1,69 @@ +--source include/have_rocksdb.inc +--source suite/rocksdb/include/have_range_locking.inc + +select @@rocksdb_use_range_locking; + +--disable_warnings +set debug_sync='RESET'; +--enable_warnings +# +# Testcase for iterator snapshot refresh +# +create table ten(a int primary key); +insert into ten values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9); + +create table one_k(a int primary key); +insert into one_k select A.a + B.a* 10 + C.a * 100 from ten A, ten B, ten C; + +create table t1 ( + pk int primary key, + a int +) engine=rocksdb; + +insert into t1 select a,a from ten; +insert into t1 select a+40, a+40 from ten; +insert into t1 select a+100, a+100 from one_k; +delete from t1 where pk=44; +set global rocksdb_force_flush_memtable_and_lzero_now=1; + +# Ok, now the table has these PK ranges: +# 0..9 40..49 100...1000 +# and all rows have pk=a +connect (con1,localhost,root,,); +connect (con2,localhost,root,,); + +connection con1; +begin; +set debug_sync='rocksdb.check_flags_rmi SIGNAL con1_stopped WAIT_FOR con1_cont'; +send +update t1 set a=a+100 where pk < 3 or pk between 10 and 50; + +# The query is how stuck at the start of the second range. + + +## con2> +connection con2; +set debug_sync='now WAIT_FOR con1_stopped'; + +# Make some changes to check if the iterator is reading current data or +# snapshot +insert into t1 values (44,5000); +delete from t1 where pk= 42; +update t1 set a=5000 where pk between 40 and 45; +set global rocksdb_force_flush_memtable_and_lzero_now=1; + +set debug_sync='now SIGNAL con1_cont'; + +connection con1; +#--error ER_GET_ERRMSG +reap; +select * from t1 where pk<100; + +commit; +disconnect con1; +disconnect con2; +connection default; +set debug_sync='RESET'; + +drop table t1, ten, one_k; + diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 262af4816f1..2e161dbfb4b 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -8604,6 +8604,25 @@ int ha_rocksdb::set_range_lock(Rdb_transaction *tx, return 0; } + /* + RocksDB's iterator is reading the snapshot of the data that was taken at + the time the iterator was created. + + After we've got a lock on the range, we'll need to refresh the iterator + to read the latest contents. (If we use the iterator created before the + lock_range() call, we may miss the changes that were made/committed after + the iterator was created but before the lock_range() call was made). + + RocksDB has Iterator::Refresh() method, but alas, it is not implemented for + the iterator returned by Transaction object (Transaction object returns + BaseDeltaIterator which allows one to see the transactions's own changes). + + Our solution to this is to release the iterator and create the new one. + We release it here, it will be created as soon as there's a need to read + records. + */ + release_scan_iterator(); + auto s= tx->lock_range(kd.get_cf(), start_endp, end_endp); if (!s.ok()) { return (tx->set_status_error(table->in_use, s, kd, m_tbl_def, @@ -10704,7 +10723,7 @@ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd, and re-create Iterator. */ - // psergey-todo: create Locking Iterator here.. + if (m_scan_it_skips_bloom != skip_bloom || use_locking_iterator) { release_scan_iterator(); } diff --git a/storage/rocksdb/rdb_locking_iter.cc b/storage/rocksdb/rdb_locking_iter.cc index bd2ff90fc83..7f5e241d906 100644 --- a/storage/rocksdb/rdb_locking_iter.cc +++ b/storage/rocksdb/rdb_locking_iter.cc @@ -44,6 +44,7 @@ void LockingIterator::SeekForPrev(const rocksdb::Slice& target) { next_key is, we reach it by calling { Seek(current_key); Next(); } */ void LockingIterator::Next() { + DEBUG_SYNC(current_thd, "rocksdb.LockingIterator.Next"); assert(Valid()); // Save the current key value. We need it as the left endpoint // of the range lock we're going to acquire diff --git a/storage/rocksdb/rdb_locking_iter.h b/storage/rocksdb/rdb_locking_iter.h index e2373648d7e..855f3f2cc0a 100644 --- a/storage/rocksdb/rdb_locking_iter.h +++ b/storage/rocksdb/rdb_locking_iter.h @@ -86,7 +86,8 @@ class LockingIterator : public rocksdb::Iterator { } private: - template <bool forward> void Scan(const rocksdb::Slice& target, bool call_next) { + template <bool forward> void Scan(const rocksdb::Slice& target, + bool call_next) { if (!iter_->Valid()) { status_ = iter_->status(); valid_ = false; @@ -101,10 +102,15 @@ class LockingIterator : public rocksdb::Iterator { DEBUG_SYNC(current_thd, "rocksdb.locking_iter_scan"); auto end_key = iter_->key(); bool endp_arg= m_is_rev_cf; - if (forward) - status_ = txn_->GetRangeLock(cfh_, rocksdb::Endpoint(target, endp_arg), rocksdb::Endpoint(end_key, endp_arg)); - else - status_ = txn_->GetRangeLock(cfh_, rocksdb::Endpoint(end_key, endp_arg), rocksdb::Endpoint(target, endp_arg)); + if (forward) { + status_ = txn_->GetRangeLock(cfh_, + rocksdb::Endpoint(target, endp_arg), + rocksdb::Endpoint(end_key, endp_arg)); + } else { + status_ = txn_->GetRangeLock(cfh_, + rocksdb::Endpoint(end_key, endp_arg), + rocksdb::Endpoint(target, endp_arg)); + } if (!status_.ok()) { // Failed to get a lock (most likely lock wait timeout) @@ -138,8 +144,8 @@ class LockingIterator : public rocksdb::Iterator { } if (iter_->Valid()) { - int invert= forward? 1 : -1; - if (cmp->Compare(iter_->key(), rocksdb::Slice(end_key_copy)) * invert <= 0) { + int inv = forward ? 1 : -1; + if (cmp->Compare(iter_->key(), rocksdb::Slice(end_key_copy))*inv <= 0) { // Ok, the found key is within the range. status_ = rocksdb::Status::OK(); valid_= true;
participants (1)
-
Sergei Petrunia