revision-id: 840f2d390209dd8ae553171c02909230ac15f03c (v5.8-3048-g840f2d390) parent(s): 00751e4292e55c1604b28b7b93fe7a538fa05f29 author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2020-11-17 00:06:38 +0300 message: Range Locking - the interface part Definitions and changes in the generic code (RangeTree-based implementation is itself not included). - DeadlockInfoBuffer has been changed into a template (as it is reused by RangeDeadlockInfo) - PointLockManagerTest is split into AnyLockManagerTest (has tests which can be used for any lock manager) and PointLockManagerTest (tests for this specific lock manager) - Added #ifndef ROCKSDB_LITE: as LockTracker now depends on Endpoint which is defined in transaction.h under #ifndef ROCKSDB_LITE --- include/rocksdb/utilities/transaction.h | 82 +++++- include/rocksdb/utilities/transaction_db.h | 100 +++++++- utilities/transactions/lock/lock_manager.cc | 14 +- utilities/transactions/lock/lock_manager.h | 4 +- utilities/transactions/lock/lock_tracker.h | 10 +- .../transactions/lock/point/point_lock_manager.cc | 58 ----- .../transactions/lock/point/point_lock_manager.h | 76 +++++- .../lock/point/point_lock_manager_test.cc | 245 ++---------------- .../lock/point/point_lock_manager_test.h | 276 +++++++++++++++++++++ .../transactions/lock/point/point_lock_tracker.cc | 4 + .../transactions/lock/point/point_lock_tracker.h | 2 + .../transactions/lock/range/range_lock_manager.h | 30 +++ utilities/transactions/pessimistic_transaction.cc | 48 +++- utilities/transactions/pessimistic_transaction.h | 4 + .../transactions/pessimistic_transaction_db.cc | 8 + .../transactions/pessimistic_transaction_db.h | 5 +- 16 files changed, 656 insertions(+), 310 deletions(-) diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index b553100f3..2f4804255 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -24,9 +24,81 @@ using TransactionName = 
std::string; using TransactionID = uint64_t; -// An endpoint for a range of keys. +/* + class Endpoint allows defining prefix ranges. + + Prefix ranges are introduced below. + + == Basic Ranges == + Let's start from basic ranges. Key Comparator defines ordering of rowkeys. + Then, one can specify finite closed ranges by just providing rowkeys of their + endpoints: + + lower_endpoint <= X <= upper_endpoint + + However our goal is to provide a richer set of endpoints. Read on. + + == Lexicographic ordering == + A lexicographic (or dictionary) ordering satisfies these criteria: If there + are two keys in the form + key_a = {prefix_a, suffix_a} + key_b = {prefix_b, suffix_b} + and + prefix_a < prefix_b + then + key_a < key_b. + + == Prefix ranges == + With lexicographic ordering, one may want to define ranges in the form + + "prefix is $PREFIX" + + which translates to a range in the form + + {$PREFIX, -infinity} < X < {$PREFIX, +infinity} + + where -infinity will compare less than any possible suffix, and +infinity + will compare as greater than any possible suffix. + + class Endpoint allows defining these kinds of ranges. + + == Notes == + BytewiseComparator and ReverseBytewiseComparator produce lexicographic + ordering. + + The row comparison function is able to compare key prefixes. If the data + domain includes keys A and B, then the comparison function is able to compare + equal-length prefixes: + + min_len= min(byte_length(A), byte_length(B)); + cmp(Slice(A, min_len), Slice(B, min_len)); // this call is valid + + == Other options == + As far as MyRocks is concerned, the alternative to prefix ranges would be to + support both open (non-inclusive) and closed (inclusive) range endpoints. +*/ + class Endpoint { - // TODO + public: + Slice slice; + + /* + true : the key has a "+infinity" suffix.
A suffix that would compare as + greater than any other suffix + false : otherwise + */ + bool inf_suffix; + + Endpoint(const Slice& slice_arg, bool inf_suffix_arg = false) + : slice(slice_arg), inf_suffix(inf_suffix_arg) {} + + Endpoint(const char* s, bool inf_suffix_arg = false) + : slice(s), inf_suffix(inf_suffix_arg) {} + + Endpoint(const char* s, size_t size, bool inf_suffix_arg = false) + : slice(s, size), inf_suffix(inf_suffix_arg) {} + + Endpoint() : inf_suffix(false) {} }; // Provides notification to the caller of SetSnapshotOnNextOperation when @@ -282,6 +354,12 @@ class Transaction { } } + // Get a range lock on [start_endpoint; end_endpoint]. + virtual Status GetRangeLock(ColumnFamilyHandle*, const Endpoint&, + const Endpoint&) { + return Status::NotSupported(); + } + virtual Status GetForUpdate(const ReadOptions& options, const Slice& key, std::string* value, bool exclusive = true, const bool do_validate = true) = 0; diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 2e1a0a171..b9daf84a3 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -31,6 +31,98 @@ enum TxnDBWritePolicy { const uint32_t kInitialMaxDeadlocks = 5; +class LockManager; +struct RangeLockInfo; + +// A lock manager handle +// The workflow is as follows: +// * Use a factory method (like NewRangeLockManager()) to create a lock +// manager and get its handle. +// * A Handle for a particular kind of lock manager will have extra +// methods and parameters to control the lock manager +// * Pass the handle to RocksDB in TransactionDBOptions::lock_mgr_handle. It +// will be used to perform locking. +class LockManagerHandle { + public: + // PessimisticTransactionDB will call this to get the Lock Manager it's going + // to use. 
+ virtual LockManager* getLockManager() = 0; + + virtual ~LockManagerHandle() {} +}; + +// Same as class Endpoint, but use std::string to manage the buffer allocation +struct EndpointWithString { + std::string slice; + bool inf_suffix; +}; + +struct RangeDeadlockInfo { + TransactionID m_txn_id; + uint32_t m_cf_id; + bool m_exclusive; + + EndpointWithString m_start; + EndpointWithString m_end; +}; + +struct RangeDeadlockPath { + std::vector<RangeDeadlockInfo> path; + bool limit_exceeded; + int64_t deadlock_time; + + explicit RangeDeadlockPath(std::vector<RangeDeadlockInfo> path_entry, + const int64_t& dl_time) + : path(path_entry), limit_exceeded(false), deadlock_time(dl_time) {} + + // empty path, limit exceeded constructor and default constructor + explicit RangeDeadlockPath(const int64_t& dl_time = 0, bool limit = false) + : path(0), limit_exceeded(limit), deadlock_time(dl_time) {} + + bool empty() { return path.empty() && !limit_exceeded; } +}; + +// A handle to control RangeLockManager (Range-based lock manager) from outside +// RocksDB +class RangeLockManagerHandle : public LockManagerHandle { + public: + // Total amount of lock memory to use (per column family) + virtual int SetMaxLockMemory(size_t max_lock_memory) = 0; + virtual size_t GetMaxLockMemory() = 0; + + using RangeLockStatus = + std::unordered_multimap<ColumnFamilyId, RangeLockInfo>; + + virtual RangeLockStatus GetRangeLockStatusData() = 0; + + class Counters { + public: + // Number of times lock escalation was triggered (for all column families) + uint64_t escalation_count; + + // How much memory is currently used for locks (total for all column + // families) + uint64_t current_lock_memory; + }; + + // Get the current counter values + virtual Counters GetStatus() = 0; + + // Functions for range-based Deadlock reporting. 
+ virtual std::vector<RangeDeadlockPath> GetRangeDeadlockInfoBuffer() = 0; + virtual void SetRangeDeadlockInfoBufferSize(uint32_t target_size) = 0; + + virtual ~RangeLockManagerHandle(){}; +}; + +// A factory function to create a Range Lock Manager. The created object should +// be: +// 1. Passed in TransactionDBOptions::lock_mgr_handle to open the database in +// range-locking mode +// 2. Used to control the lock manager when the DB is already open. +RangeLockManagerHandle* NewRangeLockManager( + std::shared_ptr<TransactionDBMutexFactory> mutex_factory); + struct TransactionDBOptions { // Specifies the maximum number of keys that can be locked at the same time // per column family. @@ -92,6 +184,10 @@ struct TransactionDBOptions { // for the special way that myrocks uses this operands. bool rollback_merge_operands = false; + // nullptr means use default lock manager. + // Other value means the user provides a custom lock manager. + std::shared_ptr<LockManagerHandle> lock_mgr_handle; + // If true, the TransactionDB implementation might skip concurrency control // unless it is overridden by TransactionOptions or // TransactionDBWriteOptimizations. This can be used in conjuction with @@ -203,8 +299,8 @@ struct KeyLockInfo { }; struct RangeLockInfo { - Endpoint start; - Endpoint end; + EndpointWithString start; + EndpointWithString end; std::vector<TransactionID> ids; bool exclusive; }; diff --git a/utilities/transactions/lock/lock_manager.cc b/utilities/transactions/lock/lock_manager.cc index 200b15390..df16b32ad 100644 --- a/utilities/transactions/lock/lock_manager.cc +++ b/utilities/transactions/lock/lock_manager.cc @@ -11,11 +11,17 @@ namespace ROCKSDB_NAMESPACE { -LockManager* NewLockManager(PessimisticTransactionDB* db, - const TransactionDBOptions& opt) { +std::shared_ptr<LockManager> NewLockManager(PessimisticTransactionDB* db, + const TransactionDBOptions& opt) { assert(db); - // TODO: determine the lock manager implementation based on configuration. 
- return new PointLockManager(db, opt); + if (opt.lock_mgr_handle) { + // Custom lock manager from options; aliasing ctor keeps the handle alive + auto mgr = opt.lock_mgr_handle->getLockManager(); + return std::shared_ptr<LockManager>(opt.lock_mgr_handle, mgr); + } else { + // Use a point lock manager by default + return std::make_shared<PointLockManager>(db, opt); + } } } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/lock/lock_manager.h b/utilities/transactions/lock/lock_manager.h index 32b3f9473..a5ce1948c 100644 --- a/utilities/transactions/lock/lock_manager.h +++ b/utilities/transactions/lock/lock_manager.h @@ -74,8 +74,8 @@ class LockManager { // LockManager should always be constructed through this factory method, // instead of constructing through concrete implementations' constructor. -// Caller owns the returned pointer. -LockManager* NewLockManager(PessimisticTransactionDB* db, - const TransactionDBOptions& opt); +// The returned shared_ptr manages the lock manager's lifetime. +std::shared_ptr<LockManager> NewLockManager(PessimisticTransactionDB* db, + const TransactionDBOptions& opt); } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/lock/lock_tracker.h b/utilities/transactions/lock/lock_tracker.h index 0d3abded7..5fa228a82 100644 --- a/utilities/transactions/lock/lock_tracker.h +++ b/utilities/transactions/lock/lock_tracker.h @@ -4,12 +4,14 @@ // (found in the LICENSE.Apache file in the root directory). #pragma once +#ifndef ROCKSDB_LITE #include <memory> #include "rocksdb/rocksdb_namespace.h" #include "rocksdb/status.h" #include "rocksdb/types.h" +#include "rocksdb/utilities/transaction_db.h" namespace ROCKSDB_NAMESPACE { @@ -29,7 +31,12 @@ struct PointLockRequest { // Request for locking a range of keys. struct RangeLockRequest { - // TODO + // The id of the key's column family.
+ ColumnFamilyId column_family_id; + + // The range to be locked + Endpoint start_endp; + Endpoint end_endp; }; struct PointLockStatus { @@ -199,3 +206,4 @@ class LockTrackerFactory { }; } // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/point/point_lock_manager.cc b/utilities/transactions/lock/point/point_lock_manager.cc index 0ca6e38f0..0bb7e8a40 100644 --- a/utilities/transactions/lock/point/point_lock_manager.cc +++ b/utilities/transactions/lock/point/point_lock_manager.cc @@ -94,64 +94,6 @@ struct LockMap { size_t GetStripe(const std::string& key) const; }; -void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) { - std::lock_guard<std::mutex> lock(paths_buffer_mutex_); - - if (paths_buffer_.empty()) { - return; - } - - paths_buffer_[buffer_idx_] = std::move(path); - buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size(); -} - -void DeadlockInfoBuffer::Resize(uint32_t target_size) { - std::lock_guard<std::mutex> lock(paths_buffer_mutex_); - - paths_buffer_ = Normalize(); - - // Drop the deadlocks that will no longer be needed ater the normalize - if (target_size < paths_buffer_.size()) { - paths_buffer_.erase( - paths_buffer_.begin(), - paths_buffer_.begin() + (paths_buffer_.size() - target_size)); - buffer_idx_ = 0; - } - // Resize the buffer to the target size and restore the buffer's idx - else { - auto prev_size = paths_buffer_.size(); - paths_buffer_.resize(target_size); - buffer_idx_ = (uint32_t)prev_size; - } -} - -std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() { - auto working = paths_buffer_; - - if (working.empty()) { - return working; - } - - // Next write occurs at a nonexistent path's slot - if (paths_buffer_[buffer_idx_].empty()) { - working.resize(buffer_idx_); - } else { - std::rotate(working.begin(), working.begin() + buffer_idx_, working.end()); - } - - return working; -} - -std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() { - std::lock_guard<std::mutex> 
lock(paths_buffer_mutex_); - - // Reversing the normalized vector returns the latest deadlocks first - auto working = Normalize(); - std::reverse(working.begin(), working.end()); - - return working; -} - namespace { void UnrefLockMapsCache(void* ptr) { // Called when a thread exits or a ThreadLocalPtr gets destroyed. diff --git a/utilities/transactions/lock/point/point_lock_manager.h b/utilities/transactions/lock/point/point_lock_manager.h index b22884424..3c541eb3a 100644 --- a/utilities/transactions/lock/point/point_lock_manager.h +++ b/utilities/transactions/lock/point/point_lock_manager.h @@ -27,21 +27,79 @@ struct LockInfo; struct LockMap; struct LockMapStripe; -struct DeadlockInfoBuffer { +template <class Path> +class DeadlockInfoBufferTempl { private: - std::vector<DeadlockPath> paths_buffer_; + std::vector<Path> paths_buffer_; uint32_t buffer_idx_; std::mutex paths_buffer_mutex_; - std::vector<DeadlockPath> Normalize(); + + std::vector<Path> Normalize() { + auto working = paths_buffer_; + + if (working.empty()) { + return working; + } + + // Next write occurs at a nonexistent path's slot + if (paths_buffer_[buffer_idx_].empty()) { + working.resize(buffer_idx_); + } else { + std::rotate(working.begin(), working.begin() + buffer_idx_, + working.end()); + } + + return working; + } public: - explicit DeadlockInfoBuffer(uint32_t n_latest_dlocks) + explicit DeadlockInfoBufferTempl(uint32_t n_latest_dlocks) : paths_buffer_(n_latest_dlocks), buffer_idx_(0) {} - void AddNewPath(DeadlockPath path); - void Resize(uint32_t target_size); - std::vector<DeadlockPath> PrepareBuffer(); + + void AddNewPath(Path path) { + std::lock_guard<std::mutex> lock(paths_buffer_mutex_); + + if (paths_buffer_.empty()) { + return; + } + + paths_buffer_[buffer_idx_] = std::move(path); + buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size(); + } + + void Resize(uint32_t target_size) { + std::lock_guard<std::mutex> lock(paths_buffer_mutex_); + + paths_buffer_ = Normalize(); + + // Drop the 
deadlocks that will no longer be needed after normalization + if (target_size < paths_buffer_.size()) { + paths_buffer_.erase( + paths_buffer_.begin(), + paths_buffer_.begin() + (paths_buffer_.size() - target_size)); + buffer_idx_ = 0; + } + // Keep or grow the size; next write at first free slot, or 0 if full + else { + auto prev_size = static_cast<uint32_t>(paths_buffer_.size()); + paths_buffer_.resize(target_size); + buffer_idx_ = prev_size < target_size ? prev_size : 0; + } + } + + std::vector<Path> PrepareBuffer() { + std::lock_guard<std::mutex> lock(paths_buffer_mutex_); + + // Reversing the normalized vector returns the latest deadlocks first + auto working = Normalize(); + std::reverse(working.begin(), working.end()); + + return working; + } }; +typedef DeadlockInfoBufferTempl<DeadlockPath> DeadlockInfoBuffer; + struct TrackedTrxInfo { autovector<TransactionID> m_neighbors; uint32_t m_cf_id; @@ -67,7 +125,11 @@ class PointLockManager : public LockManager { return PointLockTrackerFactory::Get(); } + // Creates a new LockMap for this column family. Caller should guarantee + // that this column family does not already exist. void AddColumnFamily(const ColumnFamilyHandle* cf) override; + // Deletes the LockMap for this column family. Caller should guarantee that + // this column family is no longer in use.
void RemoveColumnFamily(const ColumnFamilyHandle* cf) override; Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, diff --git a/utilities/transactions/lock/point/point_lock_manager_test.cc b/utilities/transactions/lock/point/point_lock_manager_test.cc index 211f17b99..206a0395a 100644 --- a/utilities/transactions/lock/point/point_lock_manager_test.cc +++ b/utilities/transactions/lock/point/point_lock_manager_test.cc @@ -5,80 +5,12 @@ #ifndef ROCKSDB_LITE -#include "utilities/transactions/lock/point/point_lock_manager.h" - -#include "file/file_util.h" -#include "port/port.h" -#include "port/stack_trace.h" -#include "rocksdb/utilities/transaction_db.h" -#include "test_util/testharness.h" -#include "test_util/testutil.h" -#include "utilities/transactions/pessimistic_transaction_db.h" -#include "utilities/transactions/transaction_db_mutex_impl.h" +#include "point_lock_manager_test.h" namespace ROCKSDB_NAMESPACE { -class MockColumnFamilyHandle : public ColumnFamilyHandle { - public: - explicit MockColumnFamilyHandle(ColumnFamilyId cf_id) : cf_id_(cf_id) {} - - ~MockColumnFamilyHandle() override {} - - const std::string& GetName() const override { return name_; } - - ColumnFamilyId GetID() const override { return cf_id_; } - - Status GetDescriptor(ColumnFamilyDescriptor*) override { - return Status::OK(); - } - - const Comparator* GetComparator() const override { return nullptr; } - - private: - ColumnFamilyId cf_id_; - std::string name_ = "MockCF"; -}; - -class PointLockManagerTest : public testing::Test { - public: - void SetUp() override { - env_ = Env::Default(); - db_dir_ = test::PerThreadDBPath("point_lock_manager_test"); - ASSERT_OK(env_->CreateDir(db_dir_)); - mutex_factory_ = std::make_shared<TransactionDBMutexFactoryImpl>(); - - Options opt; - opt.create_if_missing = true; - TransactionDBOptions txn_opt; - txn_opt.transaction_lock_timeout = 0; - txn_opt.custom_mutex_factory = mutex_factory_; - ASSERT_OK(TransactionDB::Open(opt, 
txn_opt, db_dir_, &db_)); - - locker_.reset(new PointLockManager( - static_cast<PessimisticTransactionDB*>(db_), txn_opt)); - } - - void TearDown() override { - delete db_; - EXPECT_OK(DestroyDir(env_, db_dir_)); - } - - PessimisticTransaction* NewTxn( - TransactionOptions txn_opt = TransactionOptions()) { - Transaction* txn = db_->BeginTransaction(WriteOptions(), txn_opt); - return reinterpret_cast<PessimisticTransaction*>(txn); - } - - protected: - Env* env_; - std::unique_ptr<PointLockManager> locker_; - - private: - std::string db_dir_; - std::shared_ptr<TransactionDBMutexFactory> mutex_factory_; - TransactionDB* db_; -}; - +// This test is not applicable for Range Lock manager as Range Lock Manager +// operates on Column Families, not their ids. TEST_F(PointLockManagerTest, LockNonExistingColumnFamily) { MockColumnFamilyHandle cf(1024); locker_->RemoveColumnFamily(&cf); @@ -121,6 +53,12 @@ TEST_F(PointLockManagerTest, LockStatus) { } } + // Cleanup + locker_->UnLock(txn1, 1024, "k1", env_); + locker_->UnLock(txn1, 2048, "k1", env_); + locker_->UnLock(txn2, 1024, "k2", env_); + locker_->UnLock(txn2, 2048, "k2", env_); + delete txn1; delete txn2; } @@ -136,6 +74,9 @@ TEST_F(PointLockManagerTest, UnlockExclusive) { auto txn2 = NewTxn(); ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, true)); + // Cleanup + locker_->UnLock(txn2, 1, "k", env_); + delete txn1; delete txn2; } @@ -151,162 +92,15 @@ TEST_F(PointLockManagerTest, UnlockShared) { auto txn2 = NewTxn(); ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, true)); - delete txn1; - delete txn2; -} - -TEST_F(PointLockManagerTest, ReentrantExclusiveLock) { - // Tests that a txn can acquire exclusive lock on the same key repeatedly. 
- MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - auto txn = NewTxn(); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); - delete txn; -} - -TEST_F(PointLockManagerTest, ReentrantSharedLock) { - // Tests that a txn can acquire shared lock on the same key repeatedly. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - auto txn = NewTxn(); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); - delete txn; -} - -TEST_F(PointLockManagerTest, LockUpgrade) { - // Tests that a txn can upgrade from a shared lock to an exclusive lock. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - auto txn = NewTxn(); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); - delete txn; -} - -TEST_F(PointLockManagerTest, LockDowngrade) { - // Tests that a txn can acquire a shared lock after acquiring an exclusive - // lock on the same key. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - auto txn = NewTxn(); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); - delete txn; -} - -TEST_F(PointLockManagerTest, LockConflict) { - // Tests that lock conflicts lead to lock timeout. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - auto txn1 = NewTxn(); - auto txn2 = NewTxn(); - - { - // exclusive-exclusive conflict. - ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); - auto s = locker_->TryLock(txn2, 1, "k1", env_, true); - ASSERT_TRUE(s.IsTimedOut()); - } - - { - // exclusive-shared conflict. - ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, true)); - auto s = locker_->TryLock(txn2, 1, "k2", env_, false); - ASSERT_TRUE(s.IsTimedOut()); - } - - { - // shared-exclusive conflict. 
- ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, false)); - auto s = locker_->TryLock(txn2, 1, "k2", env_, true); - ASSERT_TRUE(s.IsTimedOut()); - } - - delete txn1; - delete txn2; -} - -port::Thread BlockUntilWaitingTxn(std::function<void()> f) { - std::atomic<bool> reached(false); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "PointLockManager::AcquireWithTimeout:WaitingTxn", - [&](void* /*arg*/) { reached.store(true); }); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + // Cleanup + locker_->UnLock(txn2, 1, "k", env_); - port::Thread t(f); - - while (!reached.load()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); - - return t; -} - -TEST_F(PointLockManagerTest, SharedLocks) { - // Tests that shared locks can be concurrently held by multiple transactions. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - auto txn1 = NewTxn(); - auto txn2 = NewTxn(); - ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false)); - ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false)); delete txn1; delete txn2; } -TEST_F(PointLockManagerTest, Deadlock) { - // Tests that deadlock can be detected. - // Deadlock scenario: - // txn1 exclusively locks k1, and wants to lock k2; - // txn2 exclusively locks k2, and wants to lock k1. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - TransactionOptions txn_opt; - txn_opt.deadlock_detect = true; - txn_opt.lock_timeout = 1000000; - auto txn1 = NewTxn(txn_opt); - auto txn2 = NewTxn(txn_opt); - - ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); - ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true)); - - // txn1 tries to lock k2, will block forever. - port::Thread t = BlockUntilWaitingTxn([&]() { - // block because txn2 is holding a lock on k2. 
- locker_->TryLock(txn1, 1, "k2", env_, true); - }); - - auto s = locker_->TryLock(txn2, 1, "k1", env_, true); - ASSERT_TRUE(s.IsBusy()); - ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock); - - std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer(); - ASSERT_EQ(deadlock_paths.size(), 1u); - ASSERT_FALSE(deadlock_paths[0].limit_exceeded); - - std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path; - ASSERT_EQ(deadlocks.size(), 2u); - - ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID()); - ASSERT_EQ(deadlocks[0].m_cf_id, 1u); - ASSERT_TRUE(deadlocks[0].m_exclusive); - ASSERT_EQ(deadlocks[0].m_waiting_key, "k2"); - - ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID()); - ASSERT_EQ(deadlocks[1].m_cf_id, 1u); - ASSERT_TRUE(deadlocks[1].m_exclusive); - ASSERT_EQ(deadlocks[1].m_waiting_key, "k1"); - - locker_->UnLock(txn2, 1, "k2", env_); - t.join(); - - delete txn2; - delete txn1; -} +// This test doesn't work with Range Lock Manager, because Range Lock Manager +// doesn't support deadlock_detect_depth. TEST_F(PointLockManagerTest, DeadlockDepthExceeded) { // Tests that when detecting deadlock, if the detection depth is exceeded, @@ -332,7 +126,7 @@ TEST_F(PointLockManagerTest, DeadlockDepthExceeded) { // it must have another txn waiting on it, which is txn4 in this case. ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); - port::Thread t1 = BlockUntilWaitingTxn([&]() { + port::Thread t1 = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true)); // block because txn1 is holding a lock on k1. locker_->TryLock(txn2, 1, "k1", env_, true); @@ -340,7 +134,7 @@ TEST_F(PointLockManagerTest, DeadlockDepthExceeded) { ASSERT_OK(locker_->TryLock(txn3, 1, "k3", env_, true)); - port::Thread t2 = BlockUntilWaitingTxn([&]() { + port::Thread t2 = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { // block because txn3 is holding a lock on k1. 
+ locker_->TryLock(txn4, 1, "k3", env_, true); }); @@ -364,6 +158,9 @@ TEST_F(PointLockManagerTest, DeadlockDepthExceeded) { delete txn1; } +INSTANTIATE_TEST_CASE_P(PointLockManager, AnyLockManagerTest, + ::testing::Values(nullptr)); + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/utilities/transactions/lock/point/point_lock_manager_test.h b/utilities/transactions/lock/point/point_lock_manager_test.h new file mode 100644 index 000000000..63c580501 --- /dev/null +++ b/utilities/transactions/lock/point/point_lock_manager_test.h @@ -0,0 +1,282 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "file/file_util.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/utilities/transaction_db.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "utilities/transactions/lock/point/point_lock_manager.h" +#include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/transaction_db_mutex_impl.h" + +namespace ROCKSDB_NAMESPACE { + +class MockColumnFamilyHandle : public ColumnFamilyHandle { + public: + explicit MockColumnFamilyHandle(ColumnFamilyId cf_id) : cf_id_(cf_id) {} + + ~MockColumnFamilyHandle() override {} + + const std::string& GetName() const override { return name_; } + + ColumnFamilyId GetID() const override { return cf_id_; } + + Status GetDescriptor(ColumnFamilyDescriptor*) override { + return Status::OK(); + } + + const Comparator* GetComparator() const override { + return BytewiseComparator(); + } + + private: + ColumnFamilyId cf_id_; + std::string name_ = "MockCF"; +}; + +class PointLockManagerTest : public testing::Test { + public: + void SetUp() override { + env_ = Env::Default(); + db_dir_ = test::PerThreadDBPath("point_lock_manager_test"); + ASSERT_OK(env_->CreateDir(db_dir_)); + + Options opt; + opt.create_if_missing = true; + TransactionDBOptions txn_opt; + txn_opt.transaction_lock_timeout = 0; + + ASSERT_OK(TransactionDB::Open(opt, txn_opt, db_dir_, &db_)); + + // CAUTION:
This test creates a separate lock manager object (right, NOT + // the one that the TransactionDB is using!), and runs tests on it. + locker_.reset(new PointLockManager( + static_cast<PessimisticTransactionDB*>(db_), txn_opt)); + + wait_sync_point_name_ = "PointLockManager::AcquireWithTimeout:WaitingTxn"; + } + + void TearDown() override { + delete db_; + EXPECT_OK(DestroyDir(env_, db_dir_)); + } + + PessimisticTransaction* NewTxn( + TransactionOptions txn_opt = TransactionOptions()) { + Transaction* txn = db_->BeginTransaction(WriteOptions(), txn_opt); + return reinterpret_cast<PessimisticTransaction*>(txn); + } + + protected: + Env* env_; + std::shared_ptr<LockManager> locker_; + const char* wait_sync_point_name_; + friend void PointLockManagerTestExternalSetup(PointLockManagerTest*); + + private: + std::string db_dir_; + TransactionDB* db_; +}; + +typedef void (*init_func_t)(PointLockManagerTest*); + +class AnyLockManagerTest : public PointLockManagerTest, + public testing::WithParamInterface<init_func_t> { + public: + void SetUp() override { + // If a custom setup function was provided, use it. Otherwise, use what we + // have inherited. + auto init_func = GetParam(); + if (init_func) + (*init_func)(this); + else + PointLockManagerTest::SetUp(); + } +}; + +TEST_P(AnyLockManagerTest, ReentrantExclusiveLock) { + // Tests that a txn can acquire exclusive lock on the same key repeatedly. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + + delete txn; +} + +TEST_P(AnyLockManagerTest, ReentrantSharedLock) { + // Tests that a txn can acquire shared lock on the same key repeatedly. 
+ MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + + delete txn; +} + +TEST_P(AnyLockManagerTest, LockUpgrade) { + // Tests that a txn can upgrade from a shared lock to an exclusive lock. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + delete txn; +} + +TEST_P(AnyLockManagerTest, LockDowngrade) { + // Tests that a txn can acquire a shared lock after acquiring an exclusive + // lock on the same key. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + delete txn; +} + +TEST_P(AnyLockManagerTest, LockConflict) { + // Tests that lock conflicts lead to lock timeout. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn1 = NewTxn(); + auto txn2 = NewTxn(); + + { + // exclusive-exclusive conflict. + ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); + auto s = locker_->TryLock(txn2, 1, "k1", env_, true); + ASSERT_TRUE(s.IsTimedOut()); + } + + { + // exclusive-shared conflict. + ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, true)); + auto s = locker_->TryLock(txn2, 1, "k2", env_, false); + ASSERT_TRUE(s.IsTimedOut()); + } + + { + // shared-exclusive conflict. 
+ ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, false)); + auto s = locker_->TryLock(txn2, 1, "k2", env_, true); + ASSERT_TRUE(s.IsTimedOut()); + } + + // Cleanup + locker_->UnLock(txn1, 1, "k1", env_); + locker_->UnLock(txn1, 1, "k2", env_); + + delete txn1; + delete txn2; +} + +port::Thread BlockUntilWaitingTxn(const char* sync_point_name, + std::function<void()> f) { + std::atomic<bool> reached(false); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + sync_point_name, [&](void* /*arg*/) { reached.store(true); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + port::Thread t(f); + + while (!reached.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + + return t; +} + +TEST_P(AnyLockManagerTest, SharedLocks) { + // Tests that shared locks can be concurrently held by multiple transactions. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn1 = NewTxn(); + auto txn2 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false)); + ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false)); + + // Cleanup + locker_->UnLock(txn1, 1, "k", env_); + locker_->UnLock(txn2, 1, "k", env_); + + delete txn1; + delete txn2; +} + +TEST_P(AnyLockManagerTest, Deadlock) { + // Tests that deadlock can be detected. + // Deadlock scenario: + // txn1 exclusively locks k1, and wants to lock k2; + // txn2 exclusively locks k2, and wants to lock k1. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + TransactionOptions txn_opt; + txn_opt.deadlock_detect = true; + txn_opt.lock_timeout = 1000000; + auto txn1 = NewTxn(txn_opt); + auto txn2 = NewTxn(txn_opt); + + ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); + ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true)); + + // txn1 tries to lock k2, will block forever. 
+ port::Thread t = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { + // block because txn2 is holding a lock on k2. + locker_->TryLock(txn1, 1, "k2", env_, true); + }); + + auto s = locker_->TryLock(txn2, 1, "k1", env_, true); + ASSERT_TRUE(s.IsBusy()); + ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock); + + std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer(); + ASSERT_EQ(deadlock_paths.size(), 1u); + ASSERT_FALSE(deadlock_paths[0].limit_exceeded); + + std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path; + ASSERT_EQ(deadlocks.size(), 2u); + + ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID()); + ASSERT_EQ(deadlocks[0].m_cf_id, 1u); + ASSERT_TRUE(deadlocks[0].m_exclusive); + ASSERT_EQ(deadlocks[0].m_waiting_key, "k2"); + + ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID()); + ASSERT_EQ(deadlocks[1].m_cf_id, 1u); + ASSERT_TRUE(deadlocks[1].m_exclusive); + ASSERT_EQ(deadlocks[1].m_waiting_key, "k1"); + + locker_->UnLock(txn2, 1, "k2", env_); + t.join(); + + // Cleanup + locker_->UnLock(txn1, 1, "k1", env_); + locker_->UnLock(txn1, 1, "k2", env_); + delete txn2; + delete txn1; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/lock/point/point_lock_tracker.cc b/utilities/transactions/lock/point/point_lock_tracker.cc index 22eb6a0b8..837f377de 100644 --- a/utilities/transactions/lock/point/point_lock_tracker.cc +++ b/utilities/transactions/lock/point/point_lock_tracker.cc @@ -3,6 +3,8 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). 
+#ifndef ROCKSDB_LITE + #include "utilities/transactions/lock/point/point_lock_tracker.h" namespace ROCKSDB_NAMESPACE { @@ -264,3 +266,5 @@ LockTracker::KeyIterator* PointLockTracker::GetKeyIterator( void PointLockTracker::Clear() { tracked_keys_.clear(); } } // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/point/point_lock_tracker.h b/utilities/transactions/lock/point/point_lock_tracker.h index 57e1b8437..daf6f9aa2 100644 --- a/utilities/transactions/lock/point/point_lock_tracker.h +++ b/utilities/transactions/lock/point/point_lock_tracker.h @@ -4,6 +4,7 @@ // (found in the LICENSE.Apache file in the root directory). #pragma once +#ifndef ROCKSDB_LITE #include <memory> #include <string> @@ -95,3 +96,4 @@ class PointLockTrackerFactory : public LockTrackerFactory { }; } // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/range/range_lock_manager.h b/utilities/transactions/lock/range/range_lock_manager.h new file mode 100644 index 000000000..91619934b --- /dev/null +++ b/utilities/transactions/lock/range/range_lock_manager.h @@ -0,0 +1,30 @@ +// +// Generic definitions for a Range-based Lock Manager +// +#pragma once +#ifndef ROCKSDB_LITE + +#include "utilities/transactions/lock/lock_manager.h" + +namespace ROCKSDB_NAMESPACE { + +/* + A base class for all Range-based lock managers + + See also class RangeLockManagerHandle in + include/rocksdb/utilities/transaction_db.h +*/ +class RangeLockManagerBase : public LockManager { + public: + // Getting a point lock is reduced to getting a range lock on a single-point + // range [key, key] (both endpoints are built from the same key). + using LockManager::TryLock; + Status TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const std::string& key, Env* env, bool exclusive) override { + Endpoint endp(key.data(), key.size(), false); + return TryLock(txn, column_family_id, endp, endp, env, exclusive); + } +}; + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff
--git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index 6531773ec..e8a8a6f0d 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -558,9 +558,20 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, } uint32_t cfh_id = GetColumnFamilyID(column_family); std::string key_str = key.ToString(); - PointLockStatus status = tracked_locks_->GetPointLockStatus(cfh_id, key_str); - bool previously_locked = status.locked; - bool lock_upgrade = previously_locked && exclusive && !status.exclusive; + + PointLockStatus status; + bool lock_upgrade; + bool previously_locked; + if (tracked_locks_->IsPointLockSupported()) { + status = tracked_locks_->GetPointLockStatus(cfh_id, key_str); + previously_locked = status.locked; + lock_upgrade = previously_locked && exclusive && !status.exclusive; + } else { + // If the record is tracked, we can assume it was locked, too. + previously_locked = assume_tracked; + status.locked = false; + lock_upgrade = false; + } // Lock this key if this transactions hasn't already locked it or we require // an upgrade. @@ -579,7 +590,8 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, SequenceNumber tracked_at_seq = status.locked ? 
status.seq : kMaxSequenceNumber; if (!do_validate || snapshot_ == nullptr) { - if (assume_tracked && !previously_locked) { + if (assume_tracked && !previously_locked && + tracked_locks_->IsPointLockSupported()) { s = Status::InvalidArgument( "assume_tracked is set but it is not tracked yet"); } @@ -636,11 +648,13 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive); } else { #ifndef NDEBUG - PointLockStatus lock_status = - tracked_locks_->GetPointLockStatus(cfh_id, key_str); - assert(lock_status.locked); - assert(lock_status.seq <= tracked_at_seq); - assert(lock_status.exclusive == exclusive); + if (tracked_locks_->IsPointLockSupported()) { + PointLockStatus lock_status = + tracked_locks_->GetPointLockStatus(cfh_id, key_str); + assert(lock_status.locked); + assert(lock_status.seq <= tracked_at_seq); + assert(lock_status.exclusive == exclusive); + } #endif } } @@ -648,6 +662,22 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, return s; } +Status PessimisticTransaction::GetRangeLock(ColumnFamilyHandle* column_family, + const Endpoint& start_endp, + const Endpoint& end_endp) { + ColumnFamilyHandle* cfh = + column_family ? column_family : db_impl_->DefaultColumnFamily(); + uint32_t cfh_id = GetColumnFamilyID(cfh); + + Status s = txn_db_impl_->TryRangeLock(this, cfh_id, start_endp, end_endp); + + if (s.ok()) { + RangeLockRequest req{cfh_id, start_endp, end_endp}; + tracked_locks_->Track(req); + } + return s; +} + // Return OK() if this key has not been modified more recently than the // transaction snapshot_. 
// tracked_at_seq is the global seq at which we either locked the key or already diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 98eea7e2d..6c5754ac6 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -116,6 +116,10 @@ class PessimisticTransaction : public TransactionBaseImpl { int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; } + virtual Status GetRangeLock(ColumnFamilyHandle* column_family, + const Endpoint& start_key, + const Endpoint& end_key) override; + protected: // Refer to // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 73520f9ab..2c69f7359 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -391,6 +391,14 @@ Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive); } +Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction* txn, + uint32_t cfh_id, + const Endpoint& start_endp, + const Endpoint& end_endp) { + return lock_manager_->TryLock(txn, cfh_id, start_endp, end_endp, GetEnv(), + /*exclusive=*/true); +} + void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, const LockTracker& keys) { lock_manager_->UnLock(txn, keys, GetEnv()); diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 619a83e97..eb0dd2f05 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -21,6 +21,7 @@ #include "rocksdb/utilities/transaction_db.h" #include "util/cast_util.h" #include "utilities/transactions/lock/lock_manager.h" +#include 
"utilities/transactions/lock/range/range_lock_manager.h" #include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/write_prepared_txn.h" @@ -98,6 +99,8 @@ class PessimisticTransactionDB : public TransactionDB { Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key, bool exclusive); + Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id, + const Endpoint& start_endp, const Endpoint& end_endp); void UnLock(PessimisticTransaction* txn, const LockTracker& keys); void UnLock(PessimisticTransaction* txn, uint32_t cfh_id, @@ -172,7 +175,7 @@ class PessimisticTransactionDB : public TransactionDB { friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test; - std::unique_ptr<LockManager> lock_manager_; + std::shared_ptr<LockManager> lock_manager_; // Must be held when adding/dropping column families. InstrumentedMutex column_family_mutex_;