[Commits] 46d49ea0b: Gap Locking: Unified Lock Manager interface
revision-id: 46d49ea0b7e0267b5c360d45d90a64776477b8ed (v5.8-1025-g46d49ea0b) parent(s): 190a29d06f4e79d2df4cb513944ac34bd133caa0 author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2019-04-01 17:11:33 +0300 message: Gap Locking: Unified Lock Manager interface - Make PessimisticTransactionDB use one lock manager (lock_mgr_) which is either a point lock manager or a range lock manager. - A few differences are still "visible" due to lock managers having different properties. --- include/rocksdb/utilities/transaction_db.h | 40 +++++---- utilities/transactions/pessimistic_transaction.cc | 6 +- .../transactions/pessimistic_transaction_db.cc | 95 ++++++++++------------ .../transactions/pessimistic_transaction_db.h | 16 ++-- utilities/transactions/range_locking_test.cc | 9 +- utilities/transactions/transaction_lock_mgr.cc | 39 ++++----- utilities/transactions/transaction_lock_mgr.h | 34 +++++--- 7 files changed, 123 insertions(+), 116 deletions(-) diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index fb3c6a88c..06d73bedd 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -33,6 +33,24 @@ enum TxnDBWritePolicy { const uint32_t kInitialMaxDeadlocks = 5; +struct RangeLockingOptions { + typedef void (*convert_key_to_endpoint_func)(const rocksdb::Slice &key, + std::string *endpoint); + + typedef int (*compare_endpoints_func)(const char *a, size_t a_len, + const char *b, size_t b_len); + + // TODO: So, functions to compare ranges are here, while + // functions to compare rowkeys are in per-column family and are in + // rocksdb::ColumnFamilyOptions + // + // TODO: Can we change this to work in a way that does not expose the endpoints + // to the user (like discussed on the meeting?) 
+ // + convert_key_to_endpoint_func cvt_func; + compare_endpoints_func cmp_func; +}; + struct TransactionDBOptions { // Specifies the maximum number of keys that can be locked at the same time // per column family. @@ -93,6 +111,13 @@ struct TransactionDBOptions { // logic in myrocks. This hack of simply not rolling back merge operands works // for the special way that myrocks uses this operands. bool rollback_merge_operands = false; + + // If true, use the range lock manager; range_locking_opts must then be + // filled in (it is mandatory in that case). + bool use_range_locking = false; + + // Used only when use_range_locking == true. + RangeLockingOptions range_locking_opts; }; struct TransactionOptions { @@ -197,15 +222,6 @@ struct DeadlockPath { class RangeLockMgrControl { public: - typedef void (*convert_key_to_endpoint_func)(const rocksdb::Slice &key, - std::string *endpoint); - - typedef int (*compare_endpoints_func)(const char *a, size_t a_len, - const char *b, size_t b_len); - - virtual void set_endpoint_cmp_functions(convert_key_to_endpoint_func cvt_func, - compare_endpoints_func cmp_func)=0; - virtual int set_max_lock_memory(size_t max_lock_memory) = 0; virtual uint64_t get_escalation_count() = 0; @@ -286,15 +302,11 @@ class TransactionDB : public StackableDB { virtual std::vector<DeadlockPath> GetDeadlockInfoBuffer() = 0; virtual void SetDeadlockInfoBufferSize(uint32_t target_size) = 0; - - // psergey-TODO: any better interface for this?
- bool use_range_locking; virtual RangeLockMgrControl* get_range_lock_manager() { return nullptr; } - protected: // To Create an TransactionDB, call Open() // The ownership of db is transferred to the base StackableDB - explicit TransactionDB(DB* db) : StackableDB(db), use_range_locking(false) {} + explicit TransactionDB(DB* db) : StackableDB(db) {} private: // No copying allowed diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index e900bc067..339dcf422 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -50,7 +50,7 @@ PessimisticTransaction::PessimisticTransaction( skip_concurrency_control_(false) { txn_db_impl_ = static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db); - do_key_tracking_ = ! txn_db_impl_->use_range_locking; + do_key_tracking_ = !txn_db_impl_->get_range_lock_manager(); db_impl_ = static_cast_with_check<DBImpl, DB>(db_); Initialize(txn_options); } @@ -90,7 +90,7 @@ void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) { } PessimisticTransaction::~PessimisticTransaction() { - txn_db_impl_->UnLock(this, &GetTrackedKeys()); + txn_db_impl_->UnLock(this, &GetTrackedKeys()/*, all_keys_hint=true*/); if (expiration_time_ > 0) { txn_db_impl_->RemoveExpirableTransaction(txn_id_); } @@ -569,7 +569,7 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, // since the snapshot. This must be done after we locked the key. // If we already have validated an earilier snapshot it must has been // reflected in tracked_at_seq and ValidateSnapshot will return OK. 
- if (s.ok()) { + if (s.ok()) { //psergey-todo: this check looks redundant; s.ok() is always true here s = ValidateSnapshot(column_family, key, &tracked_at_seq); if (!s.ok()) { diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 308187e13..270859d1a 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -34,22 +34,35 @@ PessimisticTransactionDB::PessimisticTransactionDB( DB* db, const TransactionDBOptions& txn_db_options) : TransactionDB(db), db_impl_(static_cast_with_check<DBImpl, DB>(db)), - txn_db_options_(txn_db_options), - lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks, - txn_db_options_.max_num_deadlocks, - txn_db_options_.custom_mutex_factory - ? txn_db_options_.custom_mutex_factory - : std::shared_ptr<TransactionDBMutexFactory>( - new TransactionDBMutexFactoryImpl())), - range_lock_mgr_(this, - txn_db_options_.custom_mutex_factory? - txn_db_options_.custom_mutex_factory : - std::shared_ptr<TransactionDBMutexFactory>( - new TransactionDBMutexFactoryImpl())) { + txn_db_options_(txn_db_options) { + init_lock_manager(); assert(db_impl_ != nullptr); info_log_ = db_impl_->GetDBOptions().info_log; } +void PessimisticTransactionDB::init_lock_manager() { + BaseLockMgr *lock_mgr; + + std::shared_ptr<TransactionDBMutexFactory> mutex_factory = + txn_db_options_.custom_mutex_factory?
+ txn_db_options_.custom_mutex_factory : + std::shared_ptr<TransactionDBMutexFactory>( + new TransactionDBMutexFactoryImpl()); + + if (txn_db_options_.use_range_locking) { + range_lock_mgr_= new RangeLockMgr(this, txn_db_options_.range_locking_opts, + mutex_factory); + lock_mgr = range_lock_mgr_; + } else { + lock_mgr = new TransactionLockMgr(this, txn_db_options_.num_stripes, + txn_db_options_.max_num_locks, + txn_db_options_.max_num_deadlocks, + mutex_factory); + range_lock_mgr_ = nullptr; + } + lock_mgr_ = std::shared_ptr<BaseLockMgr>(lock_mgr); +} + // Support initiliazing PessimisticTransactionDB from a stackable db // // PessimisticTransactionDB @@ -70,18 +83,8 @@ PessimisticTransactionDB::PessimisticTransactionDB( StackableDB* db, const TransactionDBOptions& txn_db_options) : TransactionDB(db), db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())), - txn_db_options_(txn_db_options), - lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks, - txn_db_options_.max_num_deadlocks, - txn_db_options_.custom_mutex_factory - ? txn_db_options_.custom_mutex_factory - : std::shared_ptr<TransactionDBMutexFactory>( - new TransactionDBMutexFactoryImpl())), - range_lock_mgr_(this, - txn_db_options_.custom_mutex_factory - ? txn_db_options_.custom_mutex_factory - : std::shared_ptr<TransactionDBMutexFactory>( - new TransactionDBMutexFactoryImpl())) { + txn_db_options_(txn_db_options) { + init_lock_manager(); assert(db_impl_ != nullptr); } @@ -351,7 +354,7 @@ Status TransactionDB::WrapStackableDB( // allocate a LockMap for it. 
void PessimisticTransactionDB::AddColumnFamily( const ColumnFamilyHandle* handle) { - lock_mgr_.AddColumnFamily(handle->GetID()); + lock_mgr_->AddColumnFamily(handle->GetID()); } Status PessimisticTransactionDB::CreateColumnFamily( @@ -365,7 +368,7 @@ Status PessimisticTransactionDB::CreateColumnFamily( s = db_->CreateColumnFamily(options, column_family_name, handle); if (s.ok()) { - lock_mgr_.AddColumnFamily((*handle)->GetID()); + lock_mgr_->AddColumnFamily((*handle)->GetID()); UpdateCFComparatorMap(*handle); } @@ -380,8 +383,7 @@ Status PessimisticTransactionDB::DropColumnFamily( Status s = db_->DropColumnFamily(column_family); if (s.ok()) { - //psergey-todo: range_lock_mgr_ ?? - lock_mgr_.RemoveColumnFamily(column_family->GetID()); + lock_mgr_->RemoveColumnFamily(column_family->GetID()); } return s; @@ -391,10 +393,7 @@ Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key, bool exclusive) { - if (use_range_locking) - return range_lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); - else - return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); + return lock_mgr_->TryLock(txn, cfh_id, key, GetEnv(), exclusive); } Status @@ -402,9 +401,9 @@ PessimisticTransactionDB::TryRangeLock(PessimisticTransaction *txn, uint32_t cfh_id, const Slice& start_endp, const Slice& end_endp) { - if (use_range_locking) { - return range_lock_mgr_.TryRangeLock(txn, cfh_id, start_endp, - end_endp, /*exclusive=*/false); + if (range_lock_mgr_) { + return range_lock_mgr_->TryRangeLock(txn, cfh_id, start_endp, + end_endp, /*exclusive=*/false); } else return Status::NotSupported(); @@ -413,23 +412,16 @@ PessimisticTransactionDB::TryRangeLock(PessimisticTransaction *txn, void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys, bool all_keys_hint) { - if (use_range_locking) - { - if (all_keys_hint) - range_lock_mgr_.UnLockAll(txn, GetEnv()); - else - range_lock_mgr_.UnLock(txn, keys, 
GetEnv()); + if (all_keys_hint && range_lock_mgr_) { + range_lock_mgr_->UnLockAll(txn, GetEnv()); + return; } - else - lock_mgr_.UnLock(txn, keys, GetEnv()); + lock_mgr_->UnLock(txn, keys, GetEnv()); } void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key) { - if (use_range_locking) - range_lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); - else - lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); + lock_mgr_->UnLock(txn, cfh_id, key, GetEnv()); } // Used when wrapping DB write operations in a transaction @@ -628,20 +620,17 @@ void PessimisticTransactionDB::GetAllPreparedTransactions( TransactionLockMgr::LockStatusData PessimisticTransactionDB::GetLockStatusData() { - if (use_range_locking) - return range_lock_mgr_.GetLockStatusData(); - else - return lock_mgr_.GetLockStatusData(); + return lock_mgr_->GetLockStatusData(); } std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() { // TODO: Here, we should get deadlock info from RangeLockMgr if we are using // it. At the moment, it doesn't provide any deadlock information. 
- return lock_mgr_.GetDeadlockInfoBuffer(); + return lock_mgr_->GetDeadlockInfoBuffer(); } void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) { - lock_mgr_.Resize(target_size); + lock_mgr_->Resize(target_size); } void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) { diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 388ed4099..ad6eaf4a9 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -150,15 +150,21 @@ class PessimisticTransactionDB : public TransactionDB { friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test; friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test; - TransactionLockMgr lock_mgr_; - RangeLockMgr range_lock_mgr_; - + // Lock manager being used. This is either a TransactionLockMgr or a RangeLockMgr + std::shared_ptr<BaseLockMgr> lock_mgr_; + + // Non-null if we are using a lock manager that supports range locking. + RangeLockMgr *range_lock_mgr_ = nullptr; + + public: // Return Range Lock Manager if we are actually using it virtual RangeLockMgrControl* get_range_lock_manager() override { - return use_range_locking? &range_lock_mgr_ : nullptr; + return range_lock_mgr_; } - + private: + void init_lock_manager(); + // Must be held when adding/dropping column families. 
InstrumentedMutex column_family_mutex_; Transaction* BeginInternalTransaction(const WriteOptions& options); diff --git a/utilities/transactions/range_locking_test.cc b/utilities/transactions/range_locking_test.cc index fff4a0e26..447e4c44e 100644 --- a/utilities/transactions/range_locking_test.cc +++ b/utilities/transactions/range_locking_test.cc @@ -56,7 +56,6 @@ class RangeLockingTest : public ::testing::Test { Options options; TransactionDBOptions txn_db_options; - bool use_stackable_db_; RangeLockingTest() : db(nullptr) { @@ -65,15 +64,17 @@ class RangeLockingTest : public ::testing::Test { DestroyDB(dbname, options); Status s; + txn_db_options.use_range_locking = true; + txn_db_options.range_locking_opts.cvt_func = + range_endpoint_convert_same; + txn_db_options.range_locking_opts.cmp_func = + range_endpoints_compare_default; s = TransactionDB::Open(options, txn_db_options, dbname, &db); assert(s.ok()); db->use_range_locking= true; rocksdb::RangeLockMgrControl *mgr= db->get_range_lock_manager(); assert(mgr); - - mgr->set_endpoint_cmp_functions(range_endpoint_convert_same, - range_endpoints_compare_default); // can also: mgr->set_max_lock_memory(rocksdb_max_lock_memory); } diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 79164a962..f15d9df21 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -700,7 +700,7 @@ void TransactionLockMgr::UnLock(const PessimisticTransaction* txn, } } -TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() { +BaseLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() { LockStatusData data; // Lock order here is important. 
The correct order is lock_map_mutex_, then // for every column family ID in ascending order lock every stripe in @@ -1002,11 +1002,17 @@ int RangeLockMgr::compare_dbt_endpoints(__toku_db*, void *arg, RangeLockMgr::RangeLockMgr(TransactionDB* txn_db, + const RangeLockingOptions& opts, std::shared_ptr<TransactionDBMutexFactory> mutex_factory) : - my_txn_db(txn_db), mutex_factory_(mutex_factory) { + my_txn_db_(txn_db), mutex_factory_(mutex_factory) { + convert_key_to_endpoint= opts.cvt_func; + compare_endpoints= opts.cmp_func; + ltm.create(on_create, on_destroy, on_escalate, NULL, mutex_factory_); - lt= nullptr; - cmp_initialized_= false; + + cmp_.create(compare_dbt_endpoints, (void*)this, NULL); + DICTIONARY_ID dict_id = { .dictid = 1 }; + lt= ltm.get_lt(dict_id, cmp_, /* on_create_extra*/nullptr); } @@ -1048,28 +1054,13 @@ void RangeLockMgr::on_escalate(TXNID txnid, const locktree *lt, // TODO: same as above: lt->get_manager()->note_mem_used(ranges.buffer->total_memory_size()); } -void -RangeLockMgr::set_endpoint_cmp_functions(convert_key_to_endpoint_func cvt_func, - compare_endpoints_func cmp_func) { - convert_key_to_endpoint= cvt_func; - compare_endpoints= cmp_func; - - // The rest is like a constructor: - assert(!lt); - - cmp_.create(compare_dbt_endpoints, (void*)this, NULL); - cmp_initialized_ = true; - DICTIONARY_ID dict_id = { .dictid = 1 }; - lt= ltm.get_lt(dict_id, cmp_, /* on_create_extra*/nullptr); -} RangeLockMgr::~RangeLockMgr() { if (lt) { ltm.release_lt(lt); } ltm.destroy(); - if (cmp_initialized_) - cmp_.destroy(); + cmp_.destroy(); } uint64_t RangeLockMgr::get_escalation_count() { @@ -1095,7 +1086,7 @@ uint64_t RangeLockMgr::get_escalation_count() { struct LOCK_PRINT_CONTEXT { - TransactionLockMgr::LockStatusData *data; + BaseLockMgr::LockStatusData *data; // this will not be needed when locks are per-column-family: uint32_t cfh_id; }; @@ -1123,9 +1114,9 @@ void push_into_lock_status_data(void* param, const DBT *left, } 
-TransactionLockMgr::LockStatusData RangeLockMgr::GetLockStatusData() { - TransactionLockMgr::LockStatusData data; - LOCK_PRINT_CONTEXT ctx = {&data, GetColumnFamilyID(my_txn_db->DefaultColumnFamily()) }; +BaseLockMgr::LockStatusData RangeLockMgr::GetLockStatusData() { + LockStatusData data; + LOCK_PRINT_CONTEXT ctx = {&data, GetColumnFamilyID(my_txn_db_->DefaultColumnFamily()) }; lt->dump_locks((void*)&ctx, push_into_lock_status_data); return data; } diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index 9f73e5a07..e3de698f2 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -73,11 +73,17 @@ class BaseLockMgr { void UnLock(PessimisticTransaction* txn, uint32_t column_family_id, const std::string& key, Env* env)=0; + // Resize the deadlock info buffer + virtual void Resize(uint32_t)=0; + virtual std::vector<DeadlockPath> GetDeadlockInfoBuffer()= 0; virtual ~BaseLockMgr(){} + + using LockStatusData = std::unordered_multimap<uint32_t, KeyLockInfo>; + virtual LockStatusData GetLockStatusData()=0; }; -class TransactionLockMgr { +class TransactionLockMgr : public BaseLockMgr { public: TransactionLockMgr(TransactionDB* txn_db, size_t default_num_stripes, int64_t max_num_locks, uint32_t max_num_deadlocks, @@ -105,10 +111,9 @@ class TransactionLockMgr { void UnLock(PessimisticTransaction* txn, uint32_t column_family_id, const std::string& key, Env* env); - using LockStatusData = std::unordered_multimap<uint32_t, KeyLockInfo>; - LockStatusData GetLockStatusData(); - std::vector<DeadlockPath> GetDeadlockInfoBuffer(); - void Resize(uint32_t); + LockStatusData GetLockStatusData() override; + std::vector<DeadlockPath> GetDeadlockInfoBuffer() override; + void Resize(uint32_t) override; private: PessimisticTransactionDB* txn_db_impl_; @@ -196,6 +201,12 @@ class RangeLockMgr : Status TryLock(PessimisticTransaction* txn, uint32_t column_family_id, const 
std::string& key, Env* env, bool exclusive) override ; + // Resize the deadlock-info buffer, does nothing currently + void Resize(uint32_t) override {} + std::vector<DeadlockPath> GetDeadlockInfoBuffer() override { + return std::vector<DeadlockPath>(); + }; + // Get a lock on a range // (TODO: this allows to acquire exclusive range locks although they are not // used ATM) @@ -216,6 +227,7 @@ class RangeLockMgr : const std::string& key, Env* env) override ; RangeLockMgr(TransactionDB* txn_db, + const RangeLockingOptions& opts, std::shared_ptr<TransactionDBMutexFactory> mutex_factory); ~RangeLockMgr(); @@ -226,20 +238,16 @@ class RangeLockMgr : uint64_t get_escalation_count() override; - TransactionLockMgr::LockStatusData GetLockStatusData(); - - using RangeLockMgrControl::convert_key_to_endpoint_func; - using RangeLockMgrControl::compare_endpoints_func; + LockStatusData GetLockStatusData() override; - void set_endpoint_cmp_functions(convert_key_to_endpoint_func cvt_func, - compare_endpoints_func cmp_func) override; + typedef RangeLockingOptions::convert_key_to_endpoint_func convert_key_to_endpoint_func; + typedef RangeLockingOptions::compare_endpoints_func compare_endpoints_func; private: toku::locktree_manager ltm; toku::locktree *lt; // only one tree for now toku::comparator cmp_; - bool cmp_initialized_; // Convert rowkey to endpoint (TODO: shouldn't "rowkey=const" translate into // a pair of [start; end] endpoints in general? They translate into the same @@ -249,7 +257,7 @@ class RangeLockMgr : // User-provided endpoint comparison function compare_endpoints_func compare_endpoints; - TransactionDB* my_txn_db; + TransactionDB* my_txn_db_; std::shared_ptr<TransactionDBMutexFactory> mutex_factory_; static int compare_dbt_endpoints(__toku_db*, void *arg, const DBT *a_key, const DBT *b_key);
participants (1)
-
Sergei Petrunia