[Commits] cba804d57: MDEV-19986: MyRocks: Range Locking: SeekForUpdate support

revision-id: cba804d57d693e5cf7763d404f29d2483992f96a (v5.8-1043-gcba804d57) parent(s): c91095e08fbdacb69831a93ab403a80487dee435 author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2019-07-08 19:01:56 +0300 message: MDEV-19986: MyRocks: Range Locking: SeekForUpdate support Initial implementation, RocksDB part --- include/rocksdb/utilities/transaction.h | 4 + utilities/transactions/pessimistic_transaction.cc | 172 +++++++++++++++++++++ utilities/transactions/pessimistic_transaction.h | 3 + .../transactions/pessimistic_transaction_db.h | 1 + 4 files changed, 180 insertions(+) diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 6b84d4a44..731f76708 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -366,6 +366,10 @@ class Transaction { virtual Iterator* GetIterator(const ReadOptions& read_options, ColumnFamilyHandle* column_family) = 0; + virtual Iterator* GetLockingIterator(const ReadOptions& read_options, + ColumnFamilyHandle* column_family) { + return nullptr; + }; // Put, Merge, Delete, and SingleDelete behave similarly to the corresponding // functions in WriteBatch, but will also do conflict checking on the // keys being written. diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index 4c0578444..cbac16437 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -56,6 +56,178 @@ PessimisticTransaction::PessimisticTransaction( Initialize(txn_options); } +////////////////////////////////////////////////////////////////////////////// +// Locking iterator +////////////////////////////////////////////////////////////////////////////// + +// +// LockingIterator is an iterator that locks the rows before returning, as well +// as scanned gaps between the rows. 
+//
+// Example:
+//   lock_iter= trx->GetLockingIterator();
+//   lock_iter->Seek('abc');
+//   lock_iter->Valid()==true && lock_iter->key() == 'bcd';
+//
+// After the above, the returned record 'bcd' is locked by transaction trx.
+// Also, the range between ['abc'..'bcd'] is empty and locked by trx.
+//
+//   lock_iter->Next();
+//   lock_iter->Valid()==true && lock_iter->key() == 'efg'
+//
+// Now, the range ['bcd'.. 'efg'] (bounds inclusive) is also locked, and there
+// are no records between 'bcd' and 'efg'.
+//
+// The LockingIterator takes ownership of the iterator it wraps and deletes it
+// when it is destroyed.
+//
+class LockingIterator : public Iterator {
+  ColumnFamilyHandle* cfh_;      // column family used for locks and comparator
+  PessimisticTransaction* txn_;  // transaction that acquires the range locks
+  Iterator* iter_;               // wrapped iterator; owned by this object
+  Status status_;                // outcome of the last positioning call
+
+ public:
+  LockingIterator(Iterator* iter, ColumnFamilyHandle* cfh,
+                  PessimisticTransaction* txn)
+      : cfh_(cfh), txn_(txn), iter_(iter), status_(Status::InvalidArgument()) {}
+
+  // GetLockingIterator() hands iter_ over and keeps no other reference to it,
+  // so it has to be released here or it leaks.
+  virtual ~LockingIterator() override { delete iter_; }
+
+  // An iterator is either positioned at a key/value pair, or not valid.
+  // Returns true iff the last positioning call succeeded AND the wrapped
+  // iterator is on a key. Checking status_ alone is not enough: running off
+  // the end of the data leaves status_ OK while iter_ is invalid, and
+  // key()/value() would then read an unpositioned iterator.
+  // Always returns false if !status().ok().
+  virtual bool Valid() const override {
+    return status_.ok() && iter_->Valid();
+  }
+
+  // Note: MyRocks doesn't ever call these:
+  virtual void SeekToFirst() override;
+  virtual void SeekToLast() override { Unsupported(); }
+
+  // Position at the first key at or after target, locking the scanned range.
+  virtual void Seek(const Slice& target) override;
+
+  // Backward seek (last key at or before target) is not implemented.
+  virtual void SeekForPrev(const Slice& /*target*/) override { Unsupported(); }
+
+  virtual void Next() override;
+  virtual void Prev() override { Unsupported(); }  // TODO: implement this
+
+  virtual Slice key() const override {
+    assert(Valid());
+    return iter_->key();
+  }
+
+  virtual Slice value() const override {
+    assert(Valid());
+    return iter_->value();
+  }
+
+  virtual Status status() const override { return status_; }
+
+ private:
+  void ScanForward(const Slice& target, bool call_next);
+
+  // Common body of the unimplemented positioning calls. Debug builds trap;
+  // release builds, where assert() compiles out, leave the iterator invalid
+  // with a NotSupported status instead of silently doing nothing.
+  void Unsupported() {
+    assert(0);
+    status_ = Status::NotSupported("LockingIterator: operation not implemented");
+  }
+};
+
+
+// Returns a new LockingIterator over column_family, or nullptr when the
+// database does not use range locking or no underlying iterator was obtained.
+Iterator* PessimisticTransaction::GetLockingIterator(
+    const ReadOptions& read_options, ColumnFamilyHandle* column_family) {
+  if (!txn_db_impl_->UsesRangeLocking()) {
+    return nullptr;
+  }
+
+  Iterator* base_iter = GetIterator(read_options, column_family);
+  if (base_iter == nullptr) {
+    return nullptr;
+  }
+  return new LockingIterator(base_iter, column_family, this);
+}
+
+
+void LockingIterator::Seek(const Slice& target) {
+  iter_->Seek(target);
+  ScanForward(target, false);
+}
+
+/*
+  Lock from the current position up to the next key
+  (This is basically like Seek(right_after(current_position)))
+*/
+void LockingIterator::Next() {
+  assert(Valid());
+  // Save the current key value. We need it as the left endpoint
+  // of the range lock we're going to acquire
+  std::string current_key = iter_->key().ToString();
+
+  iter_->Next();
+  ScanForward(Slice(current_key), true);
+}
+
+// Note: MyRocks never makes this call, as it has index_nr as a prefix for all
+// keys.
+void LockingIterator::SeekToFirst() {
+  iter_->SeekToFirst();
+  if (!iter_->Valid()) {
+    status_ = iter_->status();
+    return;
+  }
+
+  // Lock and return the first key itself. call_next must be false here:
+  // with true, ScanForward() stepped past the key it was given, so the scan
+  // silently started at the second key.
+  // TODO: the gap in front of the first key is still not locked; that needs
+  // a left endpoint that sorts before every key.
+  std::string first_key = iter_->key().ToString();
+  ScanForward(Slice(first_key), false);
+}
+
+// Locks the range from target up to the key iter_ currently stands on, then
+// re-reads from target to confirm that key is still the first one to return.
+// If the re-read lands beyond the locked range, the lock-and-read step is
+// repeated with the new key as the right endpoint.
+//
+//   target     left endpoint of the range to lock; the caller must keep its
+//              storage alive for the duration of the call
+//   call_next  true when target is a key that has already been returned, so
+//              the scan has to resume strictly after it
+//
+// The outcome is left in status_: the error from GetRangeLock() if locking
+// failed, OK when a key inside the locked range was found, otherwise the
+// wrapped iterator's status.
+void LockingIterator::ScanForward(const Slice& target, bool call_next) {
+  if (!iter_->Valid()) {
+    status_ = iter_->status();
+    return;
+  }
+
+  const auto* cmp = cfh_->GetComparator();
+
+  while (1) {
+    /*
+      TODO: the underlying iterator respects iterator bounds, so we don't need
+      to check them here
+    */
+    // Take a copy of the key. The Slice returned by key() is only good until
+    // the iterator moves, and it is compared against after the Seek() below.
+    const std::string end_key = iter_->key().ToString();
+    status_ =
+        txn_->GetRangeLock(cfh_, Endpoint(target), Endpoint(Slice(end_key)));
+    if (!status_.ok()) {
+      // Failed to get a lock (most likely lock wait timeout)
+      return;
+    }
+
+    // Ok, now we have a lock which is inhibiting modifications in the range
+    // Somebody might have done external modifications, though:
+    //  - removed the key we've found
+    //  - added a key before that key.
+    iter_->Seek(target);
+
+    // Step over target only when we are actually standing on it. If target
+    // is no longer there, Seek() has already landed on its successor and an
+    // unconditional Next() would skip that row.
+    if (call_next && iter_->Valid() &&
+        cmp->Compare(iter_->key(), target) == 0) {
+      iter_->Next();
+    }
+
+    if (!iter_->Valid()) {
+      // There's no row (within the iterator bounds perhaps). Exit now.
+      // (we might already have locked a range in this function but there's
+      // nothing we can do about it)
+      status_ = iter_->status();
+      return;
+    }
+
+    if (cmp->Compare(iter_->key(), Slice(end_key)) <= 0) {
+      // Ok, the key is within the range.
+      status_ = Status::OK();
+      return;
+    }
+
+    // We've got a row but it is outside the range we've locked.
+    // Re-try the lock-and-read step.
+  }
+}
+
+/////////////////////////////////////////////////////////////////////////////
+
 void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) { txn_id_ = GenTxnID(); diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 9fb44c74e..4fb5f63e0 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -60,6 +60,9 @@ class PessimisticTransaction : public TransactionBaseImpl { Status SetName(const TransactionName& name) override; + virtual Iterator* GetLockingIterator(const ReadOptions& read_options, + ColumnFamilyHandle* column_family) override; + // Generate a new unique transaction identifier static TransactionID GenTxnID(); diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index e88856342..2646228f5 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -129,6 +129,7 @@ class PessimisticTransactionDB : public TransactionDB { // Key Tracking should be done only with point lock manager. bool ShouldDoKeyTracking() const { return range_lock_mgr_ == nullptr; } + bool UsesRangeLocking() const { return range_lock_mgr_ != nullptr; } protected: DBImpl* db_impl_; std::shared_ptr<Logger> info_log_;
participants (1)
-
Sergei Petrunia