[Commits] ce0df6a41: Address input
revision-id: ce0df6a41388f73fac44be21a0629854de2cf836 (v5.8-3068-gce0df6a41)
parent(s): d47cf5d0bff01681f18f2e0bb49558574140cf7c
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2020-11-16 23:33:07 +0300
message:
Address input

- Report range-based deadlocks
- point_lock_manager_test should only test point lock manager
  - however certain tests can be reused to make sure that range lock manager
    can serve as its substitute.

---
 include/rocksdb/utilities/transaction_db.h          |  42 ++-
 .../transactions/lock/point/point_lock_manager.cc   |  58 ----
 .../transactions/lock/point/point_lock_manager.h    |  73 ++++-
 .../lock/point/point_lock_manager_test.cc           | 322 +--------------------
 .../lock/point/point_lock_manager_test.h            | 281 ++++++++++++++++++
 .../transactions/lock/range/range_locking_test.cc   |  28 ++
 .../range/range_tree/lib/locktree/lock_request.cc   |   3 +-
 .../range/range_tree/lib/locktree/lock_request.h    |   2 +-
 .../range/range_tree/range_tree_lock_manager.cc     |  68 +++--
 .../range/range_tree/range_tree_lock_manager.h      |  17 +-
 .../range/range_tree/range_tree_lock_tracker.h      |   5 +-
 11 files changed, 483 insertions(+), 416 deletions(-)

diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h
index 1175bc8d9..7b89ffb77 100644
--- a/include/rocksdb/utilities/transaction_db.h
+++ b/include/rocksdb/utilities/transaction_db.h
@@ -51,6 +51,37 @@ class LockManagerHandle {
   virtual ~LockManagerHandle() {}
 };
 
+// Same as class Endpoint, but use std::string to manage the buffer allocation
+struct EndpointWithString {
+  std::string slice;
+  bool inf_suffix;
+};
+
+struct RangeDeadlockInfo {
+  TransactionID m_txn_id;
+  uint32_t m_cf_id;
+  bool m_exclusive;
+
+  EndpointWithString m_start;
+  EndpointWithString m_end;
+};
+
+struct RangeDeadlockPath {
+  std::vector<RangeDeadlockInfo> path;
+  bool limit_exceeded;
+  int64_t deadlock_time;
+
+  explicit RangeDeadlockPath(std::vector<RangeDeadlockInfo> path_entry,
+                             const int64_t& dl_time)
+      : path(path_entry), limit_exceeded(false), deadlock_time(dl_time) {}
+
+  // empty path, limit exceeded constructor and default constructor
+  explicit RangeDeadlockPath(const int64_t& dl_time = 0, bool limit = false)
+      : path(0), limit_exceeded(limit), deadlock_time(dl_time) {}
+
+  bool empty() { return path.empty() && !limit_exceeded; }
+};
+
 // A handle to control RangeLockManager (Range-based lock manager) from outside
 // RocksDB
 class RangeLockManagerHandle : public LockManagerHandle {
@@ -76,6 +107,11 @@ class RangeLockManagerHandle : public LockManagerHandle {
 
   // Get the current counter values
   virtual Counters GetStatus() = 0;
+
+  // Functions for range-based Deadlock reporting.
+  virtual std::vector<RangeDeadlockPath> GetRangeDeadlockInfoBuffer() = 0;
+  virtual void SetRangeDeadlockInfoBufferSize(uint32_t target_size) = 0;
+
   virtual ~RangeLockManagerHandle(){};
 };
 
@@ -262,12 +298,6 @@ struct KeyLockInfo {
   bool exclusive;
 };
 
-// Same as class Endpoint, but use std::string to manage the buffer allocation
-struct EndpointWithString {
-  std::string slice;
-  bool inf_suffix;
-};
-
 struct RangeLockInfo {
   EndpointWithString start;
   EndpointWithString end;
diff --git a/utilities/transactions/lock/point/point_lock_manager.cc b/utilities/transactions/lock/point/point_lock_manager.cc
index 0ca6e38f0..0bb7e8a40 100644
--- a/utilities/transactions/lock/point/point_lock_manager.cc
+++ b/utilities/transactions/lock/point/point_lock_manager.cc
@@ -94,64 +94,6 @@ struct LockMap {
   size_t GetStripe(const std::string& key) const;
 };
 
-void DeadlockInfoBuffer::AddNewPath(DeadlockPath path) {
-  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
-
-  if (paths_buffer_.empty()) {
-    return;
-  }
-
-  paths_buffer_[buffer_idx_] = std::move(path);
-  buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size();
-}
-
-void DeadlockInfoBuffer::Resize(uint32_t target_size) {
-  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
-
-  paths_buffer_ = Normalize();
-
-  // Drop the deadlocks that will no longer be needed ater the normalize
-  if (target_size < paths_buffer_.size()) {
-    paths_buffer_.erase(
-        paths_buffer_.begin(),
-        paths_buffer_.begin() + (paths_buffer_.size() - target_size));
-    buffer_idx_ = 0;
-  }
-  // Resize the buffer to the target size and restore the buffer's idx
-  else {
-    auto prev_size = paths_buffer_.size();
-    paths_buffer_.resize(target_size);
-    buffer_idx_ = (uint32_t)prev_size;
-  }
-}
-
-std::vector<DeadlockPath> DeadlockInfoBuffer::Normalize() {
-  auto working = paths_buffer_;
-
-  if (working.empty()) {
-    return working;
-  }
-
-  // Next write occurs at a nonexistent path's slot
-  if (paths_buffer_[buffer_idx_].empty()) {
-    working.resize(buffer_idx_);
-  } else {
-    std::rotate(working.begin(), working.begin() + buffer_idx_, working.end());
-  }
-
-  return working;
-}
-
-std::vector<DeadlockPath> DeadlockInfoBuffer::PrepareBuffer() {
-  std::lock_guard<std::mutex> lock(paths_buffer_mutex_);
-
-  // Reversing the normalized vector returns the latest deadlocks first
-  auto working = Normalize();
-  std::reverse(working.begin(), working.end());
-
-  return working;
-}
-
 namespace {
 void UnrefLockMapsCache(void* ptr) {
   // Called when a thread exits or a ThreadLocalPtr gets destroyed.
diff --git a/utilities/transactions/lock/point/point_lock_manager.h b/utilities/transactions/lock/point/point_lock_manager.h index 3f4328d89..966700c2a 100644 --- a/utilities/transactions/lock/point/point_lock_manager.h +++ b/utilities/transactions/lock/point/point_lock_manager.h @@ -27,21 +27,80 @@ struct LockInfo; struct LockMap; struct LockMapStripe; -struct DeadlockInfoBuffer { + +template<class Path> +class DeadlockInfoBufferTempl { private: - std::vector<DeadlockPath> paths_buffer_; + std::vector<Path> paths_buffer_; uint32_t buffer_idx_; std::mutex paths_buffer_mutex_; - std::vector<DeadlockPath> Normalize(); + + + std::vector<Path> Normalize() { + auto working = paths_buffer_; + + if (working.empty()) { + return working; + } + + // Next write occurs at a nonexistent path's slot + if (paths_buffer_[buffer_idx_].empty()) { + working.resize(buffer_idx_); + } else { + std::rotate(working.begin(), working.begin() + buffer_idx_, working.end()); + } + + return working; + } public: - explicit DeadlockInfoBuffer(uint32_t n_latest_dlocks) + explicit DeadlockInfoBufferTempl(uint32_t n_latest_dlocks) : paths_buffer_(n_latest_dlocks), buffer_idx_(0) {} - void AddNewPath(DeadlockPath path); - void Resize(uint32_t target_size); - std::vector<DeadlockPath> PrepareBuffer(); + + void AddNewPath(Path path) { + std::lock_guard<std::mutex> lock(paths_buffer_mutex_); + + if (paths_buffer_.empty()) { + return; + } + + paths_buffer_[buffer_idx_] = std::move(path); + buffer_idx_ = (buffer_idx_ + 1) % paths_buffer_.size(); + } + + void Resize(uint32_t target_size) { + std::lock_guard<std::mutex> lock(paths_buffer_mutex_); + + paths_buffer_ = Normalize(); + + // Drop the deadlocks that will no longer be needed ater the normalize + if (target_size < paths_buffer_.size()) { + paths_buffer_.erase( + paths_buffer_.begin(), + paths_buffer_.begin() + (paths_buffer_.size() - target_size)); + buffer_idx_ = 0; + } + // Resize the buffer to the target size and restore the buffer's idx + else { + auto prev_size = paths_buffer_.size(); + paths_buffer_.resize(target_size); + buffer_idx_ = (uint32_t)prev_size; + } + } + + std::vector<Path> PrepareBuffer() { + std::lock_guard<std::mutex> lock(paths_buffer_mutex_); + + // Reversing the normalized vector returns the latest deadlocks first + auto working = Normalize(); + std::reverse(working.begin(), working.end()); + + return working; + } }; +typedef DeadlockInfoBufferTempl<DeadlockPath> DeadlockInfoBuffer; + struct TrackedTrxInfo { autovector<TransactionID> m_neighbors; uint32_t m_cf_id; diff --git a/utilities/transactions/lock/point/point_lock_manager_test.cc b/utilities/transactions/lock/point/point_lock_manager_test.cc index 21d6b680f..367f3310c 100644 --- a/utilities/transactions/lock/point/point_lock_manager_test.cc +++ b/utilities/transactions/lock/point/point_lock_manager_test.cc @@ -5,128 +5,10 @@ #ifndef ROCKSDB_LITE -#ifndef OS_WIN -#define ENABLE_RANGE_LOCKING_TESTS -#endif - -#include "utilities/transactions/lock/point/point_lock_manager.h" - -#include "file/file_util.h" -#include "port/port.h" -#include "port/stack_trace.h" -#include "rocksdb/utilities/transaction_db.h" -#include "test_util/testharness.h" -#include "test_util/testutil.h" -#include "utilities/transactions/pessimistic_transaction_db.h" - -#ifdef ENABLE_RANGE_LOCKING_TESTS -#include "utilities/transactions/lock/range/range_lock_manager.h" -#endif - -#include "utilities/transactions/transaction_db_mutex_impl.h" +#include "point_lock_manager_test.h" namespace ROCKSDB_NAMESPACE { -class 
MockColumnFamilyHandle : public ColumnFamilyHandle { - public: - explicit MockColumnFamilyHandle(ColumnFamilyId cf_id) : cf_id_(cf_id) {} - - ~MockColumnFamilyHandle() override {} - - const std::string& GetName() const override { return name_; } - - ColumnFamilyId GetID() const override { return cf_id_; } - - Status GetDescriptor(ColumnFamilyDescriptor*) override { - return Status::OK(); - } - - const Comparator* GetComparator() const override { - return BytewiseComparator(); - } - - private: - ColumnFamilyId cf_id_; - std::string name_ = "MockCF"; -}; - -class PointLockManagerTest : public testing::Test { - public: - void SetUp() override { - env_ = Env::Default(); - db_dir_ = test::PerThreadDBPath("point_lock_manager_test"); - ASSERT_OK(env_->CreateDir(db_dir_)); - mutex_factory_ = std::make_shared<TransactionDBMutexFactoryImpl>(); - - Options opt; - opt.create_if_missing = true; - TransactionDBOptions txn_opt; - txn_opt.transaction_lock_timeout = 0; - txn_opt.custom_mutex_factory = mutex_factory_; - -#ifdef ENABLE_RANGE_LOCKING_TESTS - if (use_range_locking) { - /* - With Range Locking, we must use the same lock manager object that the - TransactionDB is using. - Create it here and pass it to the database through lock_mgr_handle. - */ - locker_.reset(NewRangeLockManager(mutex_factory_)->getLockManager()); - range_lock_mgr = - std::dynamic_pointer_cast<RangeLockManagerHandle>(locker_); - txn_opt.lock_mgr_handle = range_lock_mgr; - } -#endif - - ASSERT_OK(TransactionDB::Open(opt, txn_opt, db_dir_, &db_)); - - if (!use_range_locking) { - // If not using range locking, this test creates a separate lock manager - // object (right, NOT the one TransactionDB is using!), and runs tests on - // that. - locker_.reset(new PointLockManager( - static_cast<PessimisticTransactionDB*>(db_), txn_opt)); - } - } - - void TearDown() override { - delete db_; - EXPECT_OK(DestroyDir(env_, db_dir_)); - } - - PessimisticTransaction* NewTxn( - TransactionOptions txn_opt = TransactionOptions()) { - Transaction* txn = db_->BeginTransaction(WriteOptions(), txn_opt); - return reinterpret_cast<PessimisticTransaction*>(txn); - } - - protected: - Env* env_; - std::shared_ptr<LockManager> locker_; - bool use_range_locking = false; - std::shared_ptr<RangeLockManagerHandle> range_lock_mgr; - - // With Range Locking, functions like GetLockStatusData() return range - // endpoints, not the keys themselves. This function extracts the key. - std::string key_value(const std::string& s) { - if (use_range_locking) - return s.substr(1); - else - return s; - } - - private: - std::string db_dir_; - std::shared_ptr<TransactionDBMutexFactory> mutex_factory_; - TransactionDB* db_; -}; - -class AnyLockManagerTest : public PointLockManagerTest, - public testing::WithParamInterface<bool> { - public: - AnyLockManagerTest() { use_range_locking = GetParam(); } -}; - // This test is not applicable for Range Lock manager as Range Lock Manager // operates on Column Families, not their ids. 
TEST_F(PointLockManagerTest, LockNonExistingColumnFamily) { @@ -158,13 +40,13 @@ TEST_F(PointLockManagerTest, LockStatus) { ASSERT_EQ(s.count(cf_id), 2u); auto range = s.equal_range(cf_id); for (auto it = range.first; it != range.second; it++) { - ASSERT_TRUE(key_value(it->second.key) == "k1" || - key_value(it->second.key) == "k2"); - if (key_value(it->second.key) == "k1") { + ASSERT_TRUE(it->second.key == "k1" || + it->second.key == "k2"); + if (it->second.key == "k1") { ASSERT_EQ(it->second.exclusive, true); ASSERT_EQ(it->second.ids.size(), 1u); ASSERT_EQ(it->second.ids[0], txn1->GetID()); - } else if (key_value(it->second.key) == "k2") { + } else if (it->second.key == "k2") { ASSERT_EQ(it->second.exclusive, false); ASSERT_EQ(it->second.ids.size(), 1u); ASSERT_EQ(it->second.ids[0], txn2->GetID()); @@ -218,188 +100,6 @@ TEST_F(PointLockManagerTest, UnlockShared) { delete txn2; } -TEST_P(AnyLockManagerTest, ReentrantExclusiveLock) { - // Tests that a txn can acquire exclusive lock on the same key repeatedly. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - auto txn = NewTxn(); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); - - // Cleanup - locker_->UnLock(txn, 1, "k", env_); - - delete txn; -} - -TEST_P(AnyLockManagerTest, ReentrantSharedLock) { - // Tests that a txn can acquire shared lock on the same key repeatedly. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - auto txn = NewTxn(); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); - - // Cleanup - locker_->UnLock(txn, 1, "k", env_); - - delete txn; -} - -TEST_P(AnyLockManagerTest, LockUpgrade) { - // Tests that a txn can upgrade from a shared lock to an exclusive lock. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - auto txn = NewTxn(); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); - - // Cleanup - locker_->UnLock(txn, 1, "k", env_); - delete txn; -} - -TEST_P(AnyLockManagerTest, LockDowngrade) { - // Tests that a txn can acquire a shared lock after acquiring an exclusive - // lock on the same key. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - auto txn = NewTxn(); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); - ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); - - // Cleanup - locker_->UnLock(txn, 1, "k", env_); - delete txn; -} - -TEST_P(AnyLockManagerTest, LockConflict) { - // Tests that lock conflicts lead to lock timeout. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - auto txn1 = NewTxn(); - auto txn2 = NewTxn(); - - { - // exclusive-exclusive conflict. - ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); - auto s = locker_->TryLock(txn2, 1, "k1", env_, true); - ASSERT_TRUE(s.IsTimedOut()); - } - - { - // exclusive-shared conflict. - ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, true)); - auto s = locker_->TryLock(txn2, 1, "k2", env_, false); - ASSERT_TRUE(s.IsTimedOut()); - } - - { - // shared-exclusive conflict. 
- ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, false)); - auto s = locker_->TryLock(txn2, 1, "k2", env_, true); - ASSERT_TRUE(s.IsTimedOut()); - } - - // Cleanup - locker_->UnLock(txn1, 1, "k1", env_); - locker_->UnLock(txn1, 1, "k2", env_); - - delete txn1; - delete txn2; -} - -port::Thread BlockUntilWaitingTxn(bool use_range_locking, - std::function<void()> f) { - std::atomic<bool> reached(false); - const char* sync_point_name = - use_range_locking ? "RangeTreeLockManager::TryRangeLock:WaitingTxn" - : "PointLockManager::AcquireWithTimeout:WaitingTxn"; - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - sync_point_name, [&](void* /*arg*/) { reached.store(true); }); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); - - port::Thread t(f); - - while (!reached.load()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); - - return t; -} - -TEST_P(AnyLockManagerTest, SharedLocks) { - // Tests that shared locks can be concurrently held by multiple transactions. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - auto txn1 = NewTxn(); - auto txn2 = NewTxn(); - ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false)); - ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false)); - - // Cleanup - locker_->UnLock(txn1, 1, "k", env_); - locker_->UnLock(txn2, 1, "k", env_); - - delete txn1; - delete txn2; -} - -TEST_P(AnyLockManagerTest, Deadlock) { - // Tests that deadlock can be detected. - // Deadlock scenario: - // txn1 exclusively locks k1, and wants to lock k2; - // txn2 exclusively locks k2, and wants to lock k1. - MockColumnFamilyHandle cf(1); - locker_->AddColumnFamily(&cf); - TransactionOptions txn_opt; - txn_opt.deadlock_detect = true; - txn_opt.lock_timeout = 1000000; - auto txn1 = NewTxn(txn_opt); - auto txn2 = NewTxn(txn_opt); - - ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); - ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true)); - - // txn1 tries to lock k2, will block forever. - port::Thread t = BlockUntilWaitingTxn(use_range_locking, [&]() { - // block because txn2 is holding a lock on k2. - locker_->TryLock(txn1, 1, "k2", env_, true); - }); - - auto s = locker_->TryLock(txn2, 1, "k1", env_, true); - ASSERT_TRUE(s.IsBusy()); - ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock); - - std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer(); - ASSERT_EQ(deadlock_paths.size(), 1u); - ASSERT_FALSE(deadlock_paths[0].limit_exceeded); - - std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path; - ASSERT_EQ(deadlocks.size(), 2u); - - ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID()); - ASSERT_EQ(deadlocks[0].m_cf_id, 1u); - ASSERT_TRUE(deadlocks[0].m_exclusive); - ASSERT_EQ(key_value(deadlocks[0].m_waiting_key), "k2"); - - ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID()); - ASSERT_EQ(deadlocks[1].m_cf_id, 1u); - ASSERT_TRUE(deadlocks[1].m_exclusive); - ASSERT_EQ(key_value(deadlocks[1].m_waiting_key), "k1"); - - locker_->UnLock(txn2, 1, "k2", env_); - t.join(); - - // Cleanup - locker_->UnLock(txn1, 1, "k1", env_); - locker_->UnLock(txn1, 1, "k2", env_); - delete txn2; - delete txn1; -} - // This test doesn't work with Range Lock Manager, because Range Lock Manager // doesn't support deadlock_detect_depth. @@ -427,7 +127,7 @@ TEST_F(PointLockManagerTest, DeadlockDepthExceeded) { // it must have another txn waiting on it, which is txn4 in this case. 
ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); - port::Thread t1 = BlockUntilWaitingTxn(false, [&]() { + port::Thread t1 = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true)); // block because txn1 is holding a lock on k1. locker_->TryLock(txn2, 1, "k1", env_, true); @@ -435,7 +135,7 @@ TEST_F(PointLockManagerTest, DeadlockDepthExceeded) { ASSERT_OK(locker_->TryLock(txn3, 1, "k3", env_, true)); - port::Thread t2 = BlockUntilWaitingTxn(false, [&]() { + port::Thread t2 = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { // block because txn3 is holding a lock on k1. locker_->TryLock(txn4, 1, "k3", env_, true); }); @@ -459,12 +159,8 @@ TEST_F(PointLockManagerTest, DeadlockDepthExceeded) { delete txn1; } -#ifdef ENABLE_RANGE_LOCKING_TESTS -INSTANTIATE_TEST_CASE_P(AnyLockManager, AnyLockManagerTest, ::testing::Bool()); -#else -INSTANTIATE_TEST_CASE_P(AnyLockManager, AnyLockManagerTest, - ::testing::Values(false)); -#endif +INSTANTIATE_TEST_CASE_P(PointLockManager, AnyLockManagerTest, + ::testing::Values(nullptr)); } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/lock/point/point_lock_manager_test.h b/utilities/transactions/lock/point/point_lock_manager_test.h new file mode 100644 index 000000000..b6534b5ca --- /dev/null +++ b/utilities/transactions/lock/point/point_lock_manager_test.h @@ -0,0 +1,281 @@ + +#include "utilities/transactions/lock/point/point_lock_manager.h" + +#include "file/file_util.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/utilities/transaction_db.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "utilities/transactions/pessimistic_transaction_db.h" + +#include "utilities/transactions/transaction_db_mutex_impl.h" + +namespace ROCKSDB_NAMESPACE { + +class MockColumnFamilyHandle : public ColumnFamilyHandle { + public: + explicit MockColumnFamilyHandle(ColumnFamilyId cf_id) : cf_id_(cf_id) {} + + ~MockColumnFamilyHandle() override {} + + const std::string& GetName() const override { return name_; } + + ColumnFamilyId GetID() const override { return cf_id_; } + + Status GetDescriptor(ColumnFamilyDescriptor*) override { + return Status::OK(); + } + + const Comparator* GetComparator() const override { + return BytewiseComparator(); + } + + private: + ColumnFamilyId cf_id_; + std::string name_ = "MockCF"; +}; + +class PointLockManagerTest : public testing::Test { + public: + void SetUp() override { + env_ = Env::Default(); + db_dir_ = test::PerThreadDBPath("point_lock_manager_test"); + ASSERT_OK(env_->CreateDir(db_dir_)); + + Options opt; + opt.create_if_missing = true; + TransactionDBOptions txn_opt; + txn_opt.transaction_lock_timeout = 0; + + ASSERT_OK(TransactionDB::Open(opt, txn_opt, db_dir_, &db_)); + + // CAUTION: This test creates a separate lock manager object (right, NOT + // the one that the TransactionDB is using!), and runs tests on it. 
+ locker_.reset(new PointLockManager( + static_cast<PessimisticTransactionDB*>(db_), txn_opt)); + + wait_sync_point_name_ = "PointLockManager::AcquireWithTimeout:WaitingTxn"; + } + + void TearDown() override { + delete db_; + EXPECT_OK(DestroyDir(env_, db_dir_)); + } + + PessimisticTransaction* NewTxn( + TransactionOptions txn_opt = TransactionOptions()) { + Transaction* txn = db_->BeginTransaction(WriteOptions(), txn_opt); + return reinterpret_cast<PessimisticTransaction*>(txn); + } + + protected: + Env* env_; + std::shared_ptr<LockManager> locker_; + const char *wait_sync_point_name_; + friend void PointLockManagerTestExternalSetup(PointLockManagerTest*); + + private: + std::string db_dir_; + TransactionDB* db_; +}; + + +typedef void (*init_func_t)(PointLockManagerTest*); + +class AnyLockManagerTest : public PointLockManagerTest, + public testing::WithParamInterface<init_func_t> +{ +public: + void SetUp() override { + // If a custom setup function was provided, use it. Otherwise, use what we + // have inherited. + auto init_func = GetParam(); + if (init_func) + (*init_func)(this); + else + PointLockManagerTest::SetUp(); + } +}; + +TEST_P(AnyLockManagerTest, ReentrantExclusiveLock) { + // Tests that a txn can acquire exclusive lock on the same key repeatedly. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + + delete txn; +} + +TEST_P(AnyLockManagerTest, ReentrantSharedLock) { + // Tests that a txn can acquire shared lock on the same key repeatedly. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + + delete txn; +} + +TEST_P(AnyLockManagerTest, LockUpgrade) { + // Tests that a txn can upgrade from a shared lock to an exclusive lock. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + delete txn; +} + +TEST_P(AnyLockManagerTest, LockDowngrade) { + // Tests that a txn can acquire a shared lock after acquiring an exclusive + // lock on the same key. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn = NewTxn(); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, true)); + ASSERT_OK(locker_->TryLock(txn, 1, "k", env_, false)); + + // Cleanup + locker_->UnLock(txn, 1, "k", env_); + delete txn; +} + +TEST_P(AnyLockManagerTest, LockConflict) { + // Tests that lock conflicts lead to lock timeout. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn1 = NewTxn(); + auto txn2 = NewTxn(); + + { + // exclusive-exclusive conflict. + ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); + auto s = locker_->TryLock(txn2, 1, "k1", env_, true); + ASSERT_TRUE(s.IsTimedOut()); + } + + { + // exclusive-shared conflict. + ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, true)); + auto s = locker_->TryLock(txn2, 1, "k2", env_, false); + ASSERT_TRUE(s.IsTimedOut()); + } + + { + // shared-exclusive conflict. 
+ ASSERT_OK(locker_->TryLock(txn1, 1, "k2", env_, false)); + auto s = locker_->TryLock(txn2, 1, "k2", env_, true); + ASSERT_TRUE(s.IsTimedOut()); + } + + // Cleanup + locker_->UnLock(txn1, 1, "k1", env_); + locker_->UnLock(txn1, 1, "k2", env_); + + delete txn1; + delete txn2; +} + +port::Thread BlockUntilWaitingTxn(const char *sync_point_name, + std::function<void()> f) { + std::atomic<bool> reached(false); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + sync_point_name, [&](void* /*arg*/) { reached.store(true); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + port::Thread t(f); + + while (!reached.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + + return t; +} + +TEST_P(AnyLockManagerTest, SharedLocks) { + // Tests that shared locks can be concurrently held by multiple transactions. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + auto txn1 = NewTxn(); + auto txn2 = NewTxn(); + ASSERT_OK(locker_->TryLock(txn1, 1, "k", env_, false)); + ASSERT_OK(locker_->TryLock(txn2, 1, "k", env_, false)); + + // Cleanup + locker_->UnLock(txn1, 1, "k", env_); + locker_->UnLock(txn2, 1, "k", env_); + + delete txn1; + delete txn2; +} + +TEST_P(AnyLockManagerTest, Deadlock) { + // Tests that deadlock can be detected. + // Deadlock scenario: + // txn1 exclusively locks k1, and wants to lock k2; + // txn2 exclusively locks k2, and wants to lock k1. + MockColumnFamilyHandle cf(1); + locker_->AddColumnFamily(&cf); + TransactionOptions txn_opt; + txn_opt.deadlock_detect = true; + txn_opt.lock_timeout = 1000000; + auto txn1 = NewTxn(txn_opt); + auto txn2 = NewTxn(txn_opt); + + ASSERT_OK(locker_->TryLock(txn1, 1, "k1", env_, true)); + ASSERT_OK(locker_->TryLock(txn2, 1, "k2", env_, true)); + + // txn1 tries to lock k2, will block forever. + port::Thread t = BlockUntilWaitingTxn(wait_sync_point_name_, [&]() { + // block because txn2 is holding a lock on k2. 
+ locker_->TryLock(txn1, 1, "k2", env_, true); + }); + + auto s = locker_->TryLock(txn2, 1, "k1", env_, true); + ASSERT_TRUE(s.IsBusy()); + ASSERT_EQ(s.subcode(), Status::SubCode::kDeadlock); + + std::vector<DeadlockPath> deadlock_paths = locker_->GetDeadlockInfoBuffer(); + ASSERT_EQ(deadlock_paths.size(), 1u); + ASSERT_FALSE(deadlock_paths[0].limit_exceeded); + + std::vector<DeadlockInfo> deadlocks = deadlock_paths[0].path; + ASSERT_EQ(deadlocks.size(), 2u); + + ASSERT_EQ(deadlocks[0].m_txn_id, txn1->GetID()); + ASSERT_EQ(deadlocks[0].m_cf_id, 1u); + ASSERT_TRUE(deadlocks[0].m_exclusive); + ASSERT_EQ(deadlocks[0].m_waiting_key, "k2"); + + ASSERT_EQ(deadlocks[1].m_txn_id, txn2->GetID()); + ASSERT_EQ(deadlocks[1].m_cf_id, 1u); + ASSERT_TRUE(deadlocks[1].m_exclusive); + ASSERT_EQ(deadlocks[1].m_waiting_key, "k1"); + + locker_->UnLock(txn2, 1, "k2", env_); + t.join(); + + // Cleanup + locker_->UnLock(txn1, 1, "k1", env_); + locker_->UnLock(txn1, 1, "k2", env_); + delete txn2; + delete txn1; +} + +} // namespace ROCKSDB_NAMESPACE + diff --git a/utilities/transactions/lock/range/range_locking_test.cc b/utilities/transactions/lock/range/range_locking_test.cc index d0f3312d8..48d96d7d0 100644 --- a/utilities/transactions/lock/range/range_locking_test.cc +++ b/utilities/transactions/lock/range/range_locking_test.cc @@ -20,6 +20,8 @@ #include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/transaction_test.h" +#include "utilities/transactions/lock/point/point_lock_manager_test.h" + using std::string; namespace ROCKSDB_NAMESPACE { @@ -219,9 +221,35 @@ TEST_F(RangeLockingTest, MultipleTrxLockStatusData) { delete txn1; } + +void PointLockManagerTestExternalSetup(PointLockManagerTest* self) +{ + self->env_ = Env::Default(); + self->db_dir_ = test::PerThreadDBPath("point_lock_manager_test"); + ASSERT_OK(self->env_->CreateDir(self->db_dir_)); + + Options opt; + opt.create_if_missing = true; + TransactionDBOptions txn_opt; + txn_opt.transaction_lock_timeout = 0; + + auto mutex_factory = std::make_shared<TransactionDBMutexFactoryImpl>(); + self->locker_.reset(NewRangeLockManager(mutex_factory)->getLockManager()); + std::shared_ptr<RangeLockManagerHandle> range_lock_mgr = + std::dynamic_pointer_cast<RangeLockManagerHandle>(self->locker_); + txn_opt.lock_mgr_handle = range_lock_mgr; + + ASSERT_OK(TransactionDB::Open(opt, txn_opt, self->db_dir_, &self->db_)); + self->wait_sync_point_name_ = "RangeTreeLockManager::TryRangeLock:WaitingTxn"; +} + +INSTANTIATE_TEST_CASE_P(RangeLockManager, AnyLockManagerTest, + ::testing::Values(PointLockManagerTestExternalSetup)); + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc index b8650fa0e..471eb42b7 100644 --- a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc +++ b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc @@ -179,8 +179,7 @@ bool lock_request::deadlock_exists(const txnid_set &conflicts) { lock_request *req = find_lock_request(a); if (req) { m_deadlock_cb(req->m_txnid, (req->m_type == lock_request::WRITE), - std::string((const char *)req->m_left_key->data, - req->m_left_key->size)); + req->m_left_key,req->m_right_key); } }; } diff --git 
a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h index 8fdd92fef..f2e8bcb54 100644 --- a/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h +++ b/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.h @@ -226,7 +226,7 @@ class lock_request { void (*m_retry_test_callback)(void); public: - std::function<void(TXNID, bool, std::string)> m_deadlock_cb; + std::function<void(TXNID, bool, const DBT*, const DBT*)> m_deadlock_cb; friend class lock_request_unit_test; }; diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc index f28c85c1e..7e039083d 100644 --- a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc @@ -85,16 +85,26 @@ Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn, exclusive ? toku::lock_request::WRITE : toku::lock_request::READ, false /* not a big txn */, (void*)wait_txn_id); - uint64_t killed_time_msec = 0; // TODO: what should this have? + // This is for "periodically wake up and check if the wait is killed" feature + // which we are not using. + uint64_t killed_time_msec = 0; uint64_t wait_time_msec = txn->GetLockTimeout(); + // convert microseconds to milliseconds if (wait_time_msec != (uint64_t)-1) wait_time_msec = (wait_time_msec + 500) / 1000; - std::vector<DeadlockInfo> di_path; - request.m_deadlock_cb = [&](TXNID txnid, bool is_exclusive, std::string key) { + std::vector<RangeDeadlockInfo> di_path; + request.m_deadlock_cb = [&](TXNID txnid, bool is_exclusive, + const DBT *start_dbt, const DBT *end_dbt) { + EndpointWithString start; + EndpointWithString end; + deserialize_endpoint(start_dbt, &start); + deserialize_endpoint(end_dbt, &end); + di_path.push_back({((PessimisticTransaction*)txnid)->GetID(), - column_family_id, is_exclusive, key}); + column_family_id, is_exclusive, std::move(start), + std::move(end)}); }; request.start(); @@ -151,7 +161,8 @@ Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn, return Status::Busy(Status::SubCode::kLockLimit); case DB_LOCK_DEADLOCK: { std::reverse(di_path.begin(), di_path.end()); - dlock_buffer_.AddNewPath(DeadlockPath(di_path, request.get_start_time())); + dlock_buffer_.AddNewPath(RangeDeadlockPath(di_path, + request.get_start_time())); return Status::Busy(Status::SubCode::kDeadlock); } default: @@ -320,14 +331,35 @@ RangeTreeLockManager::RangeTreeLockManager( ltm_.create(on_create, on_destroy, on_escalate, NULL, mutex_factory_); } -void RangeTreeLockManager::Resize(uint32_t target_size) { +void RangeTreeLockManager::SetRangeDeadlockInfoBufferSize(uint32_t target_size) { dlock_buffer_.Resize(target_size); } -std::vector<DeadlockPath> RangeTreeLockManager::GetDeadlockInfoBuffer() { +void RangeTreeLockManager::Resize(uint32_t target_size) { + SetRangeDeadlockInfoBufferSize(target_size); +} + +std::vector<RangeDeadlockPath> RangeTreeLockManager::GetRangeDeadlockInfoBuffer() { return dlock_buffer_.PrepareBuffer(); } +std::vector<DeadlockPath> RangeTreeLockManager::GetDeadlockInfoBuffer() { + std::vector<DeadlockPath> res; + std::vector<RangeDeadlockPath> data = GetRangeDeadlockInfoBuffer(); + // report left endpoints + for (auto it = data.begin(); it != data.end(); ++it) { + std::vector<DeadlockInfo> path; + + for (auto it2 = it->path.begin(); it2 != it->path.end(); 
++it2) { + path.push_back({it2->m_txn_id, it2->m_cf_id, it2->m_exclusive, + it2->m_start.slice}); + } + res.push_back(DeadlockPath(path, it->deadlock_time)); + } + return res; +} + + /* @brief Lock Escalation Callback function @@ -496,20 +528,18 @@ struct LOCK_PRINT_CONTEXT { uint32_t cfh_id; // Column Family whose tree we are traversing }; -/* -For reporting point locks: - struct KeyLockInfo info; - - info.key.append((const char*)left->data, (size_t)left->size); - info.exclusive = !is_shared; - if (!(left->size == right->size && - !memcmp(left->data, right->data, left->size))) { - // not a single-point lock - info.has_key2 = true; - info.key2.append((const char*)right->data, right->size); +// Report left endpoints of the acquired locks +LockManager::PointLockStatus RangeTreeLockManager::GetPointLockStatus() { + PointLockStatus res; + LockManager::RangeLockStatus data = GetRangeLockStatus(); + // report left endpoints + for (auto it = data.begin(); it != data.end(); ++it) { + auto& val = it->second; + res.insert({ it->first, { val.start.slice, val.ids, val.exclusive}}); } -*/ + return res; +} static void push_into_lock_status_data(void* param, const DBT* left, const DBT* right, TXNID txnid_arg, diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h index 8511865ec..b53fe90a1 100644 --- a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h @@ -19,6 +19,8 @@ namespace ROCKSDB_NAMESPACE { using namespace toku; +typedef DeadlockInfoBufferTempl<RangeDeadlockPath> RangeDeadlockInfoBuffer; + /* A Range Lock Manager that uses PerconaFT's locktree library */ @@ -30,10 +32,12 @@ class RangeTreeLockManager : public RangeLockManagerBase, void AddColumnFamily(const ColumnFamilyHandle* cfh) override; void RemoveColumnFamily(const ColumnFamilyHandle* cfh) override; - // Resize the deadlock-info buffer, does nothing currently void Resize(uint32_t) override; std::vector<DeadlockPath> GetDeadlockInfoBuffer() override; + std::vector<RangeDeadlockPath> GetRangeDeadlockInfoBuffer() override; + void SetRangeDeadlockInfoBufferSize(uint32_t target_size) override; + // Get a lock on a range // @note only exclusive locks are currently supported (requesting a // non-exclusive lock will get an exclusive one) @@ -66,15 +70,10 @@ class RangeTreeLockManager : public RangeLockManagerBase, bool IsPointLockSupported() const override { // One could have acquired a point lock (it is reduced to range lock) - // but This doesn't mean that one could not have acquired point locks. 
- // this means we can't implement GetPointLockStatus() - return false; + return true; } - PointLockStatus GetPointLockStatus() override { - // No point locks - return {}; - } + PointLockStatus GetPointLockStatus() override; // This is from LockManager LockManager::RangeLockStatus GetRangeLockStatus() override; @@ -107,7 +106,7 @@ class RangeTreeLockManager : public RangeLockManagerBase, // (uses the same approach as TransactionLockMgr::lock_maps_cache_) std::unique_ptr<ThreadLocalPtr> ltree_lookup_cache_; - DeadlockInfoBuffer dlock_buffer_; + RangeDeadlockInfoBuffer dlock_buffer_; // Get the lock tree which stores locks for Column Family with given cf_id toku::locktree* get_locktree_by_cfid(uint32_t cf_id); diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h index 670417031..8e9e0852d 100644 --- a/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h @@ -80,7 +80,10 @@ class RangeTreeLockTracker : public LockTracker { void Track(const PointLockRequest&) override; void Track(const RangeLockRequest&) override; - bool IsPointLockSupported() const override { return false; } + bool IsPointLockSupported() const override { + // This indicates that we don't implement GetPointLockStatus() + return false; + } bool IsRangeLockSupported() const override { return true; } // a Not-supported dummy implementation.
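For illustration, here is a minimal sketch (not part of the patch) of how a caller could consume the range-based deadlock reporting added above. It assumes the TransactionDB was opened with TransactionDBOptions::lock_mgr_handle pointing at a RangeLockManagerHandle, the way range_locking_test.cc does; the helper name DumpRangeDeadlocks and the buffer size of 10 are arbitrary.

  // Sketch only, not part of the patch: print the most recent range deadlocks.
  // `mgr` is the RangeLockManagerHandle that was passed to
  // TransactionDBOptions::lock_mgr_handle when the TransactionDB was opened.
  #include <cinttypes>
  #include <cstdio>
  #include "rocksdb/utilities/transaction_db.h"

  void DumpRangeDeadlocks(ROCKSDB_NAMESPACE::RangeLockManagerHandle* mgr) {
    using namespace ROCKSDB_NAMESPACE;
    // Keep the 10 latest deadlock paths (typically set once after opening).
    mgr->SetRangeDeadlockInfoBufferSize(10);
    for (const RangeDeadlockPath& dl_path : mgr->GetRangeDeadlockInfoBuffer()) {
      if (dl_path.limit_exceeded) {
        // Deadlock detection stopped before the full cycle was recorded.
        continue;
      }
      for (const RangeDeadlockInfo& entry : dl_path.path) {
        std::printf("txn %" PRIu64 " (cf %" PRIu32 ", %s) on [%s, %s]\n",
                    entry.m_txn_id, entry.m_cf_id,
                    entry.m_exclusive ? "exclusive" : "shared",
                    entry.m_start.slice.c_str(), entry.m_end.slice.c_str());
      }
    }
  }

Since both the point and range buffers are now instances of DeadlockInfoBufferTempl, the vector returned above comes back newest-first through the shared PrepareBuffer()/Normalize() logic, the same ordering GetDeadlockInfoBuffer() already gives for point locks.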
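The AnyLockManagerTest parameterization means the shared point-lock test cases can be reused against any other lock manager by supplying a setup function, the way range_locking_test.cc does with PointLockManagerTestExternalSetup(). A rough sketch of what a third implementation would add is below; the MyLockManager names are hypothetical, and such a setup function would need to be befriended by PointLockManagerTest (as PointLockManagerTestExternalSetup is) to reach its private db_ and db_dir_ members.

  // Hypothetical sketch, not part of the patch: run the shared lock-manager
  // tests against some other LockManager implementation.
  #include "utilities/transactions/lock/point/point_lock_manager_test.h"

  namespace ROCKSDB_NAMESPACE {

  void MyLockManagerSetup(PointLockManagerTest* self) {
    // Open a TransactionDB whose lock manager is the implementation under
    // test, then point self->locker_, self->db_ and self->wait_sync_point_name_
    // at it, following PointLockManagerTestExternalSetup() above.
    (void)self;
  }

  // Reuses ReentrantExclusiveLock, LockConflict, Deadlock, etc.
  INSTANTIATE_TEST_CASE_P(MyLockManager, AnyLockManagerTest,
                          ::testing::Values(MyLockManagerSetup));

  }  // namespace ROCKSDB_NAMESPACE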