[Commits] 248acd103: Use a different locktree for each column family
revision-id: 248acd10346428b078aed780183ffe7f9f3c6896 (v5.8-1036-g248acd103) parent(s): 97b782b47ae55675e2b0132d6332824343fe141e author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2019-04-28 23:39:10 +0300 message: Use a different locktree for each column family This is a preparation for range locking to properly support reverse-ordered column families (and not just assume an identical ordering across the whole DB). --- .../transactions/pessimistic_transaction_db.cc | 6 +- utilities/transactions/transaction_lock_mgr.cc | 184 +++++++++++++++++---- utilities/transactions/transaction_lock_mgr.h | 33 ++-- 3 files changed, 178 insertions(+), 45 deletions(-) diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index e50fb1dad..e3ce435af 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -356,7 +356,7 @@ Status TransactionDB::WrapStackableDB( // allocate a LockMap for it. void PessimisticTransactionDB::AddColumnFamily( const ColumnFamilyHandle* handle) { - lock_mgr_->AddColumnFamily(handle->GetID()); + lock_mgr_->AddColumnFamily(handle); } Status PessimisticTransactionDB::CreateColumnFamily( @@ -370,7 +370,7 @@ Status PessimisticTransactionDB::CreateColumnFamily( s = db_->CreateColumnFamily(options, column_family_name, handle); if (s.ok()) { - lock_mgr_->AddColumnFamily((*handle)->GetID()); + lock_mgr_->AddColumnFamily(*handle); UpdateCFComparatorMap(*handle); } @@ -385,7 +385,7 @@ Status PessimisticTransactionDB::DropColumnFamily( Status s = db_->DropColumnFamily(column_family); if (s.ok()) { - lock_mgr_->RemoveColumnFamily(column_family->GetID()); + lock_mgr_->RemoveColumnFamily(column_family); } return s; diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index edec58261..9b6fd9381 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -189,7 +189,8 @@ size_t LockMap::GetStripe(const std::string& key) const { return stripe; } -void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) { +void TransactionLockMgr::AddColumnFamily(const ColumnFamilyHandle *cfh) { + uint32_t column_family_id= cfh->GetID(); InstrumentedMutexLock l(&lock_map_mutex_); if (lock_maps_.find(column_family_id) == lock_maps_.end()) { @@ -202,7 +203,8 @@ void TransactionLockMgr::AddColumnFamily(uint32_t column_family_id) { } } -void TransactionLockMgr::RemoveColumnFamily(uint32_t column_family_id) { +void TransactionLockMgr::RemoveColumnFamily(const ColumnFamilyHandle *cfh) { + uint32_t column_family_id= cfh->GetID(); // Remove lock_map for this column family. Since the lock map is stored // as a shared ptr, concurrent transactions can still keep using it // until they release their references to it. @@ -780,23 +782,31 @@ RangeLockMgrHandle* NewRangeLockManager( class RangeLockList: public PessimisticTransaction::LockStorage { public: virtual ~RangeLockList() { - buffer_.destroy(); + for(auto it : buffers_) { + it.second->destroy(); + } } RangeLockList() : releasing_locks_(false) { - buffer_.create(); } - void append(const DBT *left_key, const DBT *right_key) { + void append(uint32_t cf_id, const DBT *left_key, const DBT *right_key) { MutexLock l(&mutex_); // there's only one thread that calls this function. // the same thread will do lock release. assert(!releasing_locks_); - buffer_.append(left_key, right_key); + auto it= buffers_.find(cf_id); + if (it == buffers_.end()) { + // create a new one + //it->second.create(); + it= buffers_.emplace(cf_id, std::shared_ptr<toku::range_buffer>(new toku::range_buffer())).first; + it->second->create(); + } + else + it->second->append(left_key, right_key); } - // Ranges that the transaction is holding locks on - toku::range_buffer buffer_; + std::unordered_map<uint32_t, std::shared_ptr<toku::range_buffer>> buffers_; // Synchronization. See RangeLockMgr::UnLockAll for details port::Mutex mutex_; @@ -836,6 +846,8 @@ Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn, // locktree::kill_waiter call. Do we need this anymore? TransactionID wait_txn_id = txn->GetID(); + auto lt= get_locktree_by_cfid(column_family_id); + request.set(lt, (TXNID)txn, &start_key_dbt, &end_key_dbt, toku::lock_request::WRITE, false /* not a big txn */, (void*)wait_txn_id); @@ -911,7 +923,7 @@ Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn, txn->owned_locks= std::unique_ptr<RangeLockList>(new RangeLockList); } RangeLockList* range_list= (RangeLockList*)txn->owned_locks.get(); - range_list->append(&start_key_dbt, &end_key_dbt); + range_list->append(column_family_id, &start_key_dbt, &end_key_dbt); return Status::OK(); } @@ -948,6 +960,7 @@ range_lock_mgr_release_lock_int(toku::locktree *lt, void RangeLockMgr::UnLock(PessimisticTransaction* txn, uint32_t column_family_id, const std::string& key, Env*) { + auto lt= get_locktree_by_cfid(column_family_id); range_lock_mgr_release_lock_int(lt, txn, column_family_id, key); toku::lock_request::retry_all_lock_requests(lt, nullptr /* lock_wait_needed_callback */); } @@ -959,14 +972,14 @@ void RangeLockMgr::UnLock(const PessimisticTransaction* txn, for (auto& key_map_iter : *key_map) { uint32_t column_family_id = key_map_iter.first; auto& keys = key_map_iter.second; + auto lt= get_locktree_by_cfid(column_family_id); for (auto& key_iter : keys) { const std::string& key = key_iter.first; range_lock_mgr_release_lock_int(lt, txn, column_family_id, key); } + toku::lock_request::retry_all_lock_requests(lt, nullptr /* lock_wait_needed_callback */); } - - toku::lock_request::retry_all_lock_requests(lt, nullptr /* lock_wait_needed_callback */); } void RangeLockMgr::UnLockAll(const PessimisticTransaction* txn, Env*) { @@ -1012,13 +1025,18 @@ void RangeLockMgr::UnLockAll(const PessimisticTransaction* txn, Env*) { // not holding any locks, the lock tree might be in the STO-mode with // another transaction, and our attempt to release an empty set of locks // will cause an assertion failure. - if (range_list->buffer_.get_num_ranges()) - lt->release_locks((TXNID)txn, &range_list->buffer_, true); - range_list->buffer_.destroy(); - range_list->buffer_.create(); - range_list->releasing_locks_= false; + for (auto it: range_list->buffers_) { + if (it.second->get_num_ranges()) { + toku::locktree *lt = get_locktree_by_cfid(it.first); + lt->release_locks((TXNID)txn, it.second.get(), true); - toku::lock_request::retry_all_lock_requests(lt, nullptr /* lock_wait_needed_callback */); + it.second->destroy(); + it.second->create(); + + toku::lock_request::retry_all_lock_requests(lt, nullptr); + } + } + range_list->releasing_locks_= false; } } @@ -1082,13 +1100,21 @@ int RangeLockMgr::compare_dbt_endpoints(__toku_db*, void *arg, return res; } +int RangeLockMgr::compare_dbt_endpoints_rev(__toku_db* db, void *arg, + const DBT *a_key, + const DBT *b_key) { + return -compare_dbt_endpoints(db, arg, a_key, b_key); +} + RangeLockMgr::RangeLockMgr(std::shared_ptr<TransactionDBMutexFactory> mutex_factory) : - mutex_factory_(mutex_factory) { - ltm.create(on_create, on_destroy, on_escalate, NULL, mutex_factory_); - cmp_.create(compare_dbt_endpoints, (void*)this, NULL); - DICTIONARY_ID dict_id = { .dictid = 1 }; - lt= ltm.get_lt(dict_id, cmp_, /* on_create_extra*/nullptr); + mutex_factory_(mutex_factory), + ltree_lookup_cache_(new ThreadLocalPtr(nullptr)) { + + ltm_.create(on_create, on_destroy, on_escalate, NULL, mutex_factory_); + + fw_cmp_.create(compare_dbt_endpoints, (void*)this, NULL); + bw_cmp_.create(compare_dbt_endpoints, (void*)this, NULL); } @@ -1102,7 +1128,7 @@ RangeLockMgr::RangeLockMgr(std::shared_ptr<TransactionDBMutexFactory> mutex_fact @param void* Callback context */ -void RangeLockMgr::on_escalate(TXNID txnid, const locktree*, +void RangeLockMgr::on_escalate(TXNID txnid, const locktree* lt, const range_buffer &buffer, void *) { auto txn= (PessimisticTransaction*)txnid; @@ -1119,12 +1145,17 @@ void RangeLockMgr::on_escalate(TXNID txnid, const locktree*, } // TODO: are we tracking this mem: lt->get_manager()->note_mem_released(trx_locks->ranges.buffer->total_memory_size()); - trx_locks->buffer_.destroy(); - trx_locks->buffer_.create(); + + uint32_t cf_id = (uint32_t)lt->get_dict_id().dictid; + + auto it= trx_locks->buffers_.find(cf_id); + it->second->destroy(); + it->second->create(); + toku::range_buffer::iterator iter(&buffer); toku::range_buffer::iterator::record rec; while (iter.current(&rec)) { - trx_locks->buffer_.append(rec.get_left_key(), rec.get_right_key()); + it->second->append(rec.get_left_key(), rec.get_right_key()); iter.next(); } // TODO: same as above: lt->get_manager()->note_mem_used(ranges.buffer->total_memory_size()); @@ -1132,16 +1163,18 @@ void RangeLockMgr::on_escalate(TXNID txnid, const locktree*, RangeLockMgr::~RangeLockMgr() { - if (lt) { - ltm.release_lt(lt); + //TODO: at this point, synchronization is not needed, right? + for (auto it: ltree_map_) { + ltm_.release_lt(it.second); } - ltm.destroy(); - cmp_.destroy(); + ltm_.destroy(); + fw_cmp_.destroy(); + bw_cmp_.destroy(); } uint64_t RangeLockMgr::get_escalation_count() { LTM_STATUS_S ltm_status_test; - ltm.get_status(<m_status_test); + ltm_.get_status(<m_status_test); // Searching status variable by its string name is how Toku's unit tests // do it (why didn't they make LTM_ESCALATION_COUNT constant visible?) @@ -1160,6 +1193,87 @@ uint64_t RangeLockMgr::get_escalation_count() { return key_status->value.num; } +void RangeLockMgr::AddColumnFamily(const ColumnFamilyHandle *cfh) { + uint32_t column_family_id= cfh->GetID(); + + InstrumentedMutexLock l(<ree_map_mutex_); + if (ltree_map_.find(column_family_id) == ltree_map_.end()) { + DICTIONARY_ID dict_id = { .dictid = column_family_id }; + toku::comparator *ltree_cmp; + // "RocksDB_SE_v3.10" // BytewiseComparator() ,ReverseBytewiseComparator() + if (!strcmp(cfh->GetComparator()->Name(), "RocksDB_SE_v3.10")) + ltree_cmp = &fw_cmp_; + else if (!strcmp(cfh->GetComparator()->Name(),"rev:RocksDB_SE_v3.10")) + ltree_cmp = &bw_cmp_; + else { + assert(false); + ltree_cmp= nullptr; + } + toku::locktree *ltree= ltm_.get_lt(dict_id, *ltree_cmp, + /* on_create_extra*/nullptr); + ltree_map_.emplace(column_family_id, ltree); + } else { + // column_family already exists in lock map + assert(false); + } +} + +void RangeLockMgr::RemoveColumnFamily(const ColumnFamilyHandle *cfh) { + uint32_t column_family_id= cfh->GetID(); + // Remove lock_map for this column family. Since the lock map is stored + // as a shared ptr, concurrent transactions can still keep using it + // until they release their references to it. + { + InstrumentedMutexLock l(<ree_map_mutex_); + + auto lock_maps_iter = ltree_map_.find(column_family_id); + assert(lock_maps_iter != ltree_map_.end()); + + ltm_.release_lt(lock_maps_iter->second); + + ltree_map_.erase(lock_maps_iter); + } // lock_map_mutex_ + + //TODO: why do we delete first and clear the caches second? Shouldn't this be + // done in the reverse order? (if we do it in the reverse order, how will we + // prevent somebody from re-populating the cache?) + + // Clear all thread-local caches. We collect a vector of caches but we dont + // really need them. + autovector<void*> local_caches; + ltree_lookup_cache_->Scrape(&local_caches, nullptr); +} + +toku::locktree *RangeLockMgr::get_locktree_by_cfid(uint32_t column_family_id) { + + // First check thread-local cache + if (ltree_lookup_cache_->Get() == nullptr) { + ltree_lookup_cache_->Reset(new LockTreeMap()); + } + + auto ltree_map_cache = static_cast<LockTreeMap*>(ltree_lookup_cache_->Get()); + + auto it = ltree_map_cache->find(column_family_id); + if (it != ltree_map_cache->end()) { + // Found lock map for this column family. + return it->second; + } + + // Not found in local cache, grab mutex and check shared LockMaps + InstrumentedMutexLock l(<ree_map_mutex_); + + it = ltree_map_.find(column_family_id); + if (it == ltree_map_.end()) { + return nullptr; + } else { + // Found lock map. Store in thread-local cache and return. + //std::shared_ptr<LockMap>& lock_map = lock_map_iter->second; + ltree_map_cache->insert({column_family_id, it->second}); + return it->second; + } + + return nullptr; +} struct LOCK_PRINT_CONTEXT { BaseLockMgr::LockStatusData *data; @@ -1193,7 +1307,13 @@ void push_into_lock_status_data(void* param, const DBT *left, BaseLockMgr::LockStatusData RangeLockMgr::GetLockStatusData() { LockStatusData data; LOCK_PRINT_CONTEXT ctx = {&data, GetColumnFamilyID(my_txn_db_->DefaultColumnFamily()) }; - lt->dump_locks((void*)&ctx, push_into_lock_status_data); + + { + InstrumentedMutexLock l(<ree_map_mutex_); + for (auto it : ltree_map_) { + it.second->dump_locks((void*)&ctx, push_into_lock_status_data); + } + } return data; } diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index 02c68964e..e41a4961a 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -60,8 +60,8 @@ class PessimisticTransactionDB; // class BaseLockMgr { public: - virtual void AddColumnFamily(uint32_t column_family_id) = 0; - virtual void RemoveColumnFamily(uint32_t column_family_id) = 0; + virtual void AddColumnFamily(const ColumnFamilyHandle *cfh) = 0; + virtual void RemoveColumnFamily(const ColumnFamilyHandle *cfh) = 0; virtual Status TryLock(PessimisticTransaction* txn, uint32_t column_family_id, @@ -97,11 +97,11 @@ class TransactionLockMgr : public BaseLockMgr { // Creates a new LockMap for this column family. Caller should guarantee // that this column family does not already exist. - void AddColumnFamily(uint32_t column_family_id); + void AddColumnFamily(const ColumnFamilyHandle *cfh); // Deletes the LockMap for this column family. Caller should guarantee that // this column family is no longer in use. - void RemoveColumnFamily(uint32_t column_family_id); + void RemoveColumnFamily(const ColumnFamilyHandle *cfh); // Attempt to lock key. If OK status is returned, the caller is responsible // for calling UnLock() on this key. @@ -199,8 +199,8 @@ class RangeLockMgr : public BaseLockMgr, public RangeLockMgrHandle { public: - void AddColumnFamily(uint32_t) override { /* do nothing */ } - void RemoveColumnFamily(uint32_t) override { /* do nothing */ } + void AddColumnFamily(const ColumnFamilyHandle *cfh) override; + void RemoveColumnFamily(const ColumnFamilyHandle *cfh) override; Status TryLock(PessimisticTransaction* txn, uint32_t column_family_id, const std::string& key, Env* env, bool exclusive) override ; @@ -240,7 +240,7 @@ class RangeLockMgr : int set_max_lock_memory(size_t max_lock_memory) override { - return ltm.set_max_lock_memory(max_lock_memory); + return ltm_.set_max_lock_memory(max_lock_memory); } uint64_t get_escalation_count() override; @@ -248,15 +248,28 @@ class RangeLockMgr : LockStatusData GetLockStatusData() override; private: - toku::locktree_manager ltm; - toku::locktree *lt; // only one tree for now + toku::locktree_manager ltm_; - toku::comparator cmp_; + toku::comparator fw_cmp_; + toku::comparator bw_cmp_; TransactionDB* my_txn_db_; std::shared_ptr<TransactionDBMutexFactory> mutex_factory_; + // Map from cf_id to locktree*. Can only be accessed while holding the + // ltree_map_mutex_. + using LockTreeMap = std::unordered_map<uint32_t, locktree*>; + LockTreeMap ltree_map_; + + InstrumentedMutex ltree_map_mutex_; + + // Per-thread cache of ltree_map_. + std::unique_ptr<ThreadLocalPtr> ltree_lookup_cache_; + + toku::locktree *get_locktree_by_cfid(uint32_t cf_id); + static int compare_dbt_endpoints(__toku_db*, void *arg, const DBT *a_key, const DBT *b_key); + static int compare_dbt_endpoints_rev(__toku_db*, void *arg, const DBT *a_key, const DBT *b_key); // Callbacks static int on_create(locktree*, void*) { return 0; /* no error */ }
participants (1)
-
Sergei Petrunia