revision-id: e836be43ec6c2bd9250764e09b51766b11b04390 (v5.8-1027-ge836be43e) parent(s): 791d4c1f48fe30d7519b93508cc3670246063b7f author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2019-01-25 19:52:31 +0300 message: Range Locking code cleanup. No functional changes. --- include/rocksdb/utilities/transaction.h | 9 +- utilities/transactions/pessimistic_transaction.cc | 7 +- .../transactions/pessimistic_transaction_db.cc | 9 +- .../transactions/pessimistic_transaction_db.h | 7 +- utilities/transactions/transaction_lock_mgr.cc | 569 ++++++++++----------- utilities/transactions/transaction_lock_mgr.h | 3 +- 6 files changed, 297 insertions(+), 307 deletions(-) diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 5a83c39a9..5a2932363 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -251,10 +251,13 @@ class Transaction { return s; } } - //psergey: + + // Get a range lock on [start_endpoint; end_endpoint]. + // Note: range endpoints generally use a different data format than + // ranges. virtual Status GetRangeLock(ColumnFamilyHandle* column_family, - const Slice& start_key, - const Slice& end_key) { + const Slice& start_endp, + const Slice& end_endp) { return Status::NotSupported(); } diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index 7a9a0d11e..e900bc067 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -601,14 +601,13 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, Status PessimisticTransaction::GetRangeLock(ColumnFamilyHandle* column_family, - const Slice& start_key, - const Slice& end_key) -{ + const Slice& start_endp, + const Slice& end_endp) { ColumnFamilyHandle* cfh = column_family ? 
column_family : db_impl_->DefaultColumnFamily(); uint32_t cfh_id= GetColumnFamilyID(cfh); - return txn_db_impl_->TryRangeLock(this, cfh_id, start_key, end_key); + return txn_db_impl_->TryRangeLock(this, cfh_id, start_endp, end_endp); } // Return OK() if this key has not been modified more recently than the diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index df5ec2d6c..d44f329e5 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -392,12 +392,11 @@ Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction *txn, uint32_t cfh_id, - const Slice& start_key, - const Slice& end_key) -{ + const Slice& start_endp, + const Slice& end_endp) { if (use_range_locking) { - return range_lock_mgr_.TryRangeLock(txn, cfh_id, start_key, - end_key, /*exclusive=*/false); + return range_lock_mgr_.TryRangeLock(txn, cfh_id, start_endp, + end_endp, /*exclusive=*/false); } else return Status::NotSupported(); diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index cc9b07a6c..1be4143c1 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -80,8 +80,8 @@ class PessimisticTransactionDB : public TransactionDB { const std::string& key, bool exclusive); Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id, - const Slice& start_key, - const Slice& end_key); + const Slice& start_endp, + const Slice& end_endp); void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys, bool all_keys_hint=false); @@ -127,8 +127,7 @@ class PessimisticTransactionDB : public TransactionDB { virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {} virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {} - 
void KillLockWait(void *cdata) override - { + void KillLockWait(void *cdata) override { if (use_range_locking) range_lock_mgr_.KillLockWait(cdata); } diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index eef61e724..cd9da2986 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -315,157 +315,6 @@ Status TransactionLockMgr::TryLock(PessimisticTransaction* txn, timeout, lock_info); } -void RangeLockMgr::KillLockWait(void *cdata) -{ - ltm.kill_waiter(cdata); -} - - -/* - Storage for locks that are held by this transaction. - - We store them in toku::range_buffer because toku::locktree::release_locks() - accepts that as an argument. - - Note: the list of locks may differ slighly from the contents of the lock - tree, due to concurrency between lock acquisition, lock release, and lock - escalation. See MDEV-18227 and RangeLockMgr::UnLockAll for details. - This property is currently harmless. -*/ -class RangeLockList: public PessimisticTransaction::LockStorage -{ -public: - virtual ~RangeLockList() { - buffer.destroy(); - } - - RangeLockList() : releasing_locks(false) { - buffer.create(); - } - - void append(const DBT *left_key, const DBT *right_key) { - MutexLock l(&mutex_); - // there's only one thread that calls this function. - // the same thread will do lock release. - assert(!releasing_locks); - buffer.append(left_key, right_key); - } - - /* Ranges that we are holding the locks on. */ - toku::range_buffer buffer; - - /* Synchronization. 
See RangeLockMgr::UnLockAll for details */ - port::Mutex mutex_; - bool releasing_locks; -}; - -// Get a range lock on [start_key; end_key] range -// (TODO: check if we do what is inteded at the endpoints) - -Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn, - uint32_t column_family_id, - const rocksdb::Slice &start_key, - const rocksdb::Slice &end_key, - bool exclusive) -{ - toku::lock_request request; - request.create(); - DBT start_key_dbt, end_key_dbt; - - toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size()); - toku_fill_dbt(&end_key_dbt, end_key.data(), end_key.size()); - request.set(lt, (TXNID)txn, &start_key_dbt, &end_key_dbt, toku::lock_request::WRITE, - false /* not a big txn */, (void*)txn->GetID()/*client_extra, for KILL*/); - - uint64_t killed_time_msec = 0; // TODO: what should this have? - uint64_t wait_time_msec = txn->GetLockTimeout(); - // convert microseconds to milliseconds - if (wait_time_msec != (uint64_t)-1) - wait_time_msec = (wait_time_msec + 500) / 1000; - - request.start(); - - /* - If we are going to wait on the lock, we should set appropriate status in - the 'txn' object. This is done by the SetWaitingTxn() call below. - The API we are using are MariaDB's wait notification API, so the way this - is done is a bit convoluted. - In MyRocks, the wait details are visible in I_S.rocksdb_trx. 
- */ - std::string key_str(start_key.data(), start_key.size()); - struct st_wait_info { - PessimisticTransaction* txn; - uint32_t column_family_id; - std::string *key_ptr; - autovector<TransactionID> wait_ids; - bool done= false; - - static void lock_wait_callback(void *cdata, TXNID waiter, TXNID waitee) - { - auto self= (struct st_wait_info*)cdata; - if (!self->done) - { - self->wait_ids.push_back(waitee); - self->txn->SetWaitingTxn(self->wait_ids, self->column_family_id, self->key_ptr); - self->done= true; - } - } - } wait_info; - - wait_info.txn= txn; - wait_info.column_family_id= column_family_id; - wait_info.key_ptr= &key_str; - wait_info.done= false; - - const int r = request.wait(wait_time_msec, killed_time_msec, - nullptr, // killed_callback - st_wait_info::lock_wait_callback, - (void*)&wait_info); - - // Inform the txn that we are no longer waiting: - txn->ClearWaitingTxn(); - - request.destroy(); - switch (r) { - case 0: - break; /* fall through */ - case DB_LOCK_NOTGRANTED: - return Status::TimedOut(Status::SubCode::kLockTimeout); - case TOKUDB_OUT_OF_LOCKS: - return Status::Busy(Status::SubCode::kLockLimit); - case DB_LOCK_DEADLOCK: - return Status::Busy(Status::SubCode::kDeadlock); - default: - assert(0); - return Status::Busy(Status::SubCode::kLockLimit); - } - - /* Save the acquired lock in txn->owned_locks */ - if (!txn->owned_locks) - { - //create the object - txn->owned_locks= std::unique_ptr<RangeLockList>(new RangeLockList); - } - RangeLockList* range_list= (RangeLockList*)txn->owned_locks.get(); - range_list->append(&start_key_dbt, &end_key_dbt); - - return Status::OK(); -} - - -// Get a singlepoint lock -// (currently it is the same as getting a range lock) -Status RangeLockMgr::TryLock(PessimisticTransaction* txn, - uint32_t column_family_id, - const std::string& key, Env* env, bool exclusive) -{ - std::string endpoint; - convert_key_to_endpoint(rocksdb::Slice(key.data(), key.size()), &endpoint); - rocksdb::Slice endp_slice(endpoint.data(), 
endpoint.length()); - return TryRangeLock(txn, column_family_id, endp_slice, endp_slice, exclusive); -} - - // Helper function for TryLock(). Status TransactionLockMgr::AcquireWithTimeout( PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe, @@ -804,13 +653,256 @@ void TransactionLockMgr::UnLock(PessimisticTransaction* txn, stripe->stripe_cv->NotifyAll(); } +void TransactionLockMgr::UnLock(const PessimisticTransaction* txn, + const TransactionKeyMap* key_map, Env* env) { + for (auto& key_map_iter : *key_map) { + uint32_t column_family_id = key_map_iter.first; + auto& keys = key_map_iter.second; + + std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); + LockMap* lock_map = lock_map_ptr.get(); + + if (lock_map == nullptr) { + // Column Family must have been dropped. + return; + } + + // Bucket keys by lock_map_ stripe + std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe( + std::max(keys.size(), lock_map->num_stripes_)); + + for (auto& key_iter : keys) { + const std::string& key = key_iter.first; + + size_t stripe_num = lock_map->GetStripe(key); + keys_by_stripe[stripe_num].push_back(&key); + } + + // For each stripe, grab the stripe mutex and unlock all keys in this stripe + for (auto& stripe_iter : keys_by_stripe) { + size_t stripe_num = stripe_iter.first; + auto& stripe_keys = stripe_iter.second; + + assert(lock_map->lock_map_stripes_.size() > stripe_num); + LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); + + stripe->stripe_mutex->Lock(); + + for (const std::string* key : stripe_keys) { + UnLockKey(txn, *key, stripe, lock_map, env); + } + + stripe->stripe_mutex->UnLock(); + + // Signal waiting threads to retry locking + stripe->stripe_cv->NotifyAll(); + } + } +} + +TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() { + LockStatusData data; + // Lock order here is important. 
The correct order is lock_map_mutex_, then + // for every column family ID in ascending order lock every stripe in + // ascending order. + InstrumentedMutexLock l(&lock_map_mutex_); + + std::vector<uint32_t> cf_ids; + for (const auto& map : lock_maps_) { + cf_ids.push_back(map.first); + } + std::sort(cf_ids.begin(), cf_ids.end()); + + for (auto i : cf_ids) { + const auto& stripes = lock_maps_[i]->lock_map_stripes_; + // Iterate and lock all stripes in ascending order. + for (const auto& j : stripes) { + j->stripe_mutex->Lock(); + for (const auto& it : j->keys) { + struct KeyLockInfo info; + info.exclusive = it.second.exclusive; + info.key = it.first; + for (const auto& id : it.second.txn_ids) { + info.ids.push_back(id); + } + data.insert({i, info}); + } + } + } + + // Unlock everything. Unlocking order is not important. + for (auto i : cf_ids) { + const auto& stripes = lock_maps_[i]->lock_map_stripes_; + for (const auto& j : stripes) { + j->stripe_mutex->UnLock(); + } + } + + return data; +} +std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() { + return dlock_buffer_.PrepareBuffer(); +} + +void TransactionLockMgr::Resize(uint32_t target_size) { + dlock_buffer_.Resize(target_size); +} + + +///////////////////////////////////////////////////////////////////////////// +// RangeLockMgr - a lock manager that supports range locking +///////////////////////////////////////////////////////////////////////////// + +/* + Storage for locks that are currently held by a transaction. + + Locks are kept in toku::range_buffer because toku::locktree::release_locks() + accepts that as an argument. + + Note: the list of locks may differ slightly from the contents of the lock + tree, due to concurrency between lock acquisition, lock release, and lock + escalation. See MDEV-18227 and RangeLockMgr::UnLockAll for details. + This property is currently harmless. 
+*/ +class RangeLockList: public PessimisticTransaction::LockStorage { +public: + virtual ~RangeLockList() { + buffer_.destroy(); + } + + RangeLockList() : releasing_locks_(false) { + buffer_.create(); + } + + void append(const DBT *left_key, const DBT *right_key) { + MutexLock l(&mutex_); + // there's only one thread that calls this function. + // the same thread will do lock release. + assert(!releasing_locks_); + buffer_.append(left_key, right_key); + } + + // Ranges that the transaction is holding locks on + toku::range_buffer buffer_; + + // Synchronization. See RangeLockMgr::UnLockAll for details + port::Mutex mutex_; + bool releasing_locks_; +}; + + +void RangeLockMgr::KillLockWait(void *cdata) { + ltm.kill_waiter(cdata); +} + +// Get a range lock on [start_key; end_key] range +Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn, + uint32_t column_family_id, + const rocksdb::Slice &start_key, + const rocksdb::Slice &end_key, + bool exclusive) { + toku::lock_request request; + request.create(); + DBT start_key_dbt, end_key_dbt; + + toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size()); + toku_fill_dbt(&end_key_dbt, end_key.data(), end_key.size()); + request.set(lt, (TXNID)txn, &start_key_dbt, &end_key_dbt, toku::lock_request::WRITE, + false /* not a big txn */, (void*)txn->GetID()/*client_extra, for KILL*/); + + uint64_t killed_time_msec = 0; // TODO: what should this have? + uint64_t wait_time_msec = txn->GetLockTimeout(); + // convert microseconds to milliseconds + if (wait_time_msec != (uint64_t)-1) + wait_time_msec = (wait_time_msec + 500) / 1000; + + request.start(); + + /* + If we are going to wait on the lock, we should set appropriate status in + the 'txn' object. This is done by the SetWaitingTxn() call below. + The API we are using is MariaDB's wait notification API, so the way this + is done is a bit convoluted. + In MyRocks, the wait details are visible in I_S.rocksdb_trx. 
+ */ + std::string key_str(start_key.data(), start_key.size()); + struct st_wait_info { + PessimisticTransaction* txn; + uint32_t column_family_id; + std::string *key_ptr; + autovector<TransactionID> wait_ids; + bool done= false; + + static void lock_wait_callback(void *cdata, TXNID waiter, TXNID waitee) { + auto self= (struct st_wait_info*)cdata; + if (!self->done) + { + self->wait_ids.push_back(waitee); + self->txn->SetWaitingTxn(self->wait_ids, self->column_family_id, + self->key_ptr); + self->done= true; + } + } + } wait_info; + + wait_info.txn= txn; + wait_info.column_family_id= column_family_id; + wait_info.key_ptr= &key_str; + wait_info.done= false; + + const int r = request.wait(wait_time_msec, killed_time_msec, + nullptr, // killed_callback + st_wait_info::lock_wait_callback, + (void*)&wait_info); + + // Inform the txn that we are no longer waiting: + txn->ClearWaitingTxn(); + + request.destroy(); + switch (r) { + case 0: + break; /* fall through */ + case DB_LOCK_NOTGRANTED: + return Status::TimedOut(Status::SubCode::kLockTimeout); + case TOKUDB_OUT_OF_LOCKS: + return Status::Busy(Status::SubCode::kLockLimit); + case DB_LOCK_DEADLOCK: + return Status::Busy(Status::SubCode::kDeadlock); + default: + assert(0); + return Status::Busy(Status::SubCode::kLockLimit); + } + + /* Save the acquired lock in txn->owned_locks */ + if (!txn->owned_locks) + { + //create the object + txn->owned_locks= std::unique_ptr<RangeLockList>(new RangeLockList); + } + RangeLockList* range_list= (RangeLockList*)txn->owned_locks.get(); + range_list->append(&start_key_dbt, &end_key_dbt); + + return Status::OK(); +} + + +// Get a singlepoint lock +// (currently it is the same as getting a range lock) +Status RangeLockMgr::TryLock(PessimisticTransaction* txn, + uint32_t column_family_id, + const std::string& key, Env* env, + bool exclusive) { + std::string endpoint; + convert_key_to_endpoint(rocksdb::Slice(key.data(), key.size()), &endpoint); + rocksdb::Slice endp_slice(endpoint.data(), 
endpoint.length()); + return TryRangeLock(txn, column_family_id, endp_slice, endp_slice, exclusive); +} + static void range_lock_mgr_release_lock_int(toku::locktree *lt, - const PessimisticTransaction* txn, - uint32_t column_family_id, - const std::string& key, - bool releasing_all_locks_hint= false) -{ + const PessimisticTransaction* txn, + uint32_t column_family_id, + const std::string& key) { DBT key_dbt; toku_fill_dbt(&key_dbt, key.data(), key.size()); toku::range_buffer range_buf; @@ -828,7 +920,7 @@ void RangeLockMgr::UnLock(PessimisticTransaction* txn, } void RangeLockMgr::UnLock(const PessimisticTransaction* txn, - const TransactionKeyMap* key_map, Env* env) { + const TransactionKeyMap* key_map, Env* env) { //TODO: if we collect all locks in a range buffer and then // make one call to lock_tree::release_locks(), will that be faster? for (auto& key_map_iter : *key_map) { @@ -855,15 +947,15 @@ void RangeLockMgr::UnLockAll(const PessimisticTransaction* txn, Env* env) { { MutexLock l(&range_list->mutex_); /* - The lt->release_locks() call below will walk range_list->buffer. We + The lt->release_locks() call below will walk range_list->buffer_. We need to prevent lock escalation callback from replacing - range_list->buffer while we are doing that. + range_list->buffer_ while we are doing that. Additional complication here is internal mutex(es) in the locktree (let's call them latches): - Lock escalation first obtains latches on the lock tree - Then, it calls RangeLockMgr::on_escalate to replace transaction's - range_list->buffer. + range_list->buffer_. = Access to that buffer must be synchronized, so it will want to acquire the range_list->mutex_. @@ -873,25 +965,25 @@ void RangeLockMgr::UnLockAll(const PessimisticTransaction* txn, Env* env) { - and acquire latches on parts of the lock tree to remove locks from it. - How do we avoid the deadlock? Thei ideas is that here we set - releasing_locks=true, and release the mutex. + How do we avoid the deadlock? 
The idea is that here we set + releasing_locks_=true, and release the mutex. All other users of the range_list must: - - Acquire the mutex, then check that releasing_locks=false. + - Acquire the mutex, then check that releasing_locks_=false. (the code in this function doesnt do that as there's only one thread - that does lock release) + that releases transaction's locks) */ - range_list->releasing_locks= true; + range_list->releasing_locks_= true; } // Don't try to call release_locks() if the buffer is empty! if we are - // not holding any locks, the lock tree might be on STO-mode with another - // transaction, and our attempt to release an empty set of locks will - // cause an assertion failure. - if (range_list->buffer.get_num_ranges()) - lt->release_locks((TXNID)txn, &range_list->buffer, true); - range_list->buffer.destroy(); - range_list->buffer.create(); - range_list->releasing_locks= false; + // not holding any locks, the lock tree might be in the STO-mode with + // another transaction, and our attempt to release an empty set of locks + // will cause an assertion failure. 
+ if (range_list->buffer_.get_num_ranges()) + lt->release_locks((TXNID)txn, &range_list->buffer_, true); + range_list->buffer_.destroy(); + range_list->buffer_.create(); + range_list->releasing_locks_= false; toku::lock_request::retry_all_lock_requests(lt, nullptr /* lock_wait_needed_callback */); } @@ -899,16 +991,14 @@ void RangeLockMgr::UnLockAll(const PessimisticTransaction* txn, Env* env) { int RangeLockMgr::compare_dbt_endpoints(__toku_db*, void *arg, const DBT *a_key, - const DBT *b_key) -{ + const DBT *b_key) { RangeLockMgr* mgr= (RangeLockMgr*) arg; return mgr->compare_endpoints((const char*)a_key->data, a_key->size, (const char*)b_key->data, b_key->size); } -RangeLockMgr::RangeLockMgr(TransactionDB* txn_db) : my_txn_db(txn_db) -{ +RangeLockMgr::RangeLockMgr(TransactionDB* txn_db) : my_txn_db(txn_db) { ltm.create(on_create, on_destroy, on_escalate, NULL); lt= nullptr; } @@ -925,14 +1015,13 @@ RangeLockMgr::RangeLockMgr(TransactionDB* txn_db) : my_txn_db(txn_db) */ void RangeLockMgr::on_escalate(TXNID txnid, const locktree *lt, - const range_buffer &buffer, void *extra) -{ + const range_buffer &buffer, void *extra) { auto txn= (PessimisticTransaction*)txnid; RangeLockList* trx_locks= (RangeLockList*)txn->owned_locks.get(); MutexLock l(&trx_locks->mutex_); - if (trx_locks->releasing_locks) { + if (trx_locks->releasing_locks_) { /* Do nothing. The transaction is releasing its locks, so it will not care about having a correct list of ranges. 
(In TokuDB, @@ -942,20 +1031,20 @@ void RangeLockMgr::on_escalate(TXNID txnid, const locktree *lt, } // TODO: are we tracking this mem: lt->get_manager()->note_mem_released(trx_locks->ranges.buffer->total_memory_size()); - trx_locks->buffer.destroy(); - trx_locks->buffer.create(); + trx_locks->buffer_.destroy(); + trx_locks->buffer_.create(); toku::range_buffer::iterator iter(&buffer); toku::range_buffer::iterator::record rec; while (iter.current(&rec)) { - trx_locks->buffer.append(rec.get_left_key(), rec.get_right_key()); - iter.next(); + trx_locks->buffer_.append(rec.get_left_key(), rec.get_right_key()); + iter.next(); } // TODO: same as above: lt->get_manager()->note_mem_used(ranges.buffer->total_memory_size()); } -void RangeLockMgr::set_endpoint_cmp_functions(convert_key_to_endpoint_func cvt_func, - compare_endpoints_func cmp_func) -{ +void +RangeLockMgr::set_endpoint_cmp_functions(convert_key_to_endpoint_func cvt_func, + compare_endpoints_func cmp_func) { convert_key_to_endpoint= cvt_func; compare_endpoints= cmp_func; @@ -963,20 +1052,18 @@ void RangeLockMgr::set_endpoint_cmp_functions(convert_key_to_endpoint_func cvt_f assert(!lt); toku::comparator cmp; - //cmp.create(toku_builtin_compare_fun, NULL); cmp.create(compare_dbt_endpoints, (void*)this, NULL); DICTIONARY_ID dict_id = { .dictid = 1 }; lt= ltm.get_lt(dict_id, cmp , /* on_create_extra*/nullptr); } -uint64_t RangeLockMgr::get_escalation_count() -{ +uint64_t RangeLockMgr::get_escalation_count() { LTM_STATUS_S ltm_status_test; ltm.get_status(&ltm_status_test); - // psergey-todo: The below is how Toku's unit tests do it. - // why didn't Toku just make LTM_ESCALATION_COUNT constant visible? + // Searching status variable by its string name is how Toku's unit tests + // do it (why didn't they make LTM_ESCALATION_COUNT constant visible?) 
TOKU_ENGINE_STATUS_ROW key_status = NULL; // lookup keyname in status for (int i = 0; ; i++) { @@ -992,52 +1079,6 @@ uint64_t RangeLockMgr::get_escalation_count() return key_status->value.num; } -void TransactionLockMgr::UnLock(const PessimisticTransaction* txn, - const TransactionKeyMap* key_map, Env* env) { - for (auto& key_map_iter : *key_map) { - uint32_t column_family_id = key_map_iter.first; - auto& keys = key_map_iter.second; - - std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); - LockMap* lock_map = lock_map_ptr.get(); - - if (lock_map == nullptr) { - // Column Family must have been dropped. - return; - } - - // Bucket keys by lock_map_ stripe - std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe( - std::max(keys.size(), lock_map->num_stripes_)); - - for (auto& key_iter : keys) { - const std::string& key = key_iter.first; - - size_t stripe_num = lock_map->GetStripe(key); - keys_by_stripe[stripe_num].push_back(&key); - } - - // For each stripe, grab the stripe mutex and unlock all keys in this stripe - for (auto& stripe_iter : keys_by_stripe) { - size_t stripe_num = stripe_iter.first; - auto& stripe_keys = stripe_iter.second; - - assert(lock_map->lock_map_stripes_.size() > stripe_num); - LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); - - stripe->stripe_mutex->Lock(); - - for (const std::string* key : stripe_keys) { - UnLockKey(txn, *key, stripe, lock_map, env); - } - - stripe->stripe_mutex->UnLock(); - - // Signal waiting threads to retry locking - stripe->stripe_cv->NotifyAll(); - } - } -} struct LOCK_PRINT_CONTEXT { TransactionLockMgr::LockStatusData *data; @@ -1047,8 +1088,7 @@ struct LOCK_PRINT_CONTEXT { static void push_into_lock_status_data(void* param, const DBT *left, - const DBT *right, TXNID txnid_arg) -{ + const DBT *right, TXNID txnid_arg) { struct LOCK_PRINT_CONTEXT *ctx= (LOCK_PRINT_CONTEXT*)param; struct KeyLockInfo info; @@ -1076,54 +1116,5 @@ TransactionLockMgr::LockStatusData 
RangeLockMgr::GetLockStatusData() { return data; } - -TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() { - LockStatusData data; - // Lock order here is important. The correct order is lock_map_mutex_, then - // for every column family ID in ascending order lock every stripe in - // ascending order. - InstrumentedMutexLock l(&lock_map_mutex_); - - std::vector<uint32_t> cf_ids; - for (const auto& map : lock_maps_) { - cf_ids.push_back(map.first); - } - std::sort(cf_ids.begin(), cf_ids.end()); - - for (auto i : cf_ids) { - const auto& stripes = lock_maps_[i]->lock_map_stripes_; - // Iterate and lock all stripes in ascending order. - for (const auto& j : stripes) { - j->stripe_mutex->Lock(); - for (const auto& it : j->keys) { - struct KeyLockInfo info; - info.exclusive = it.second.exclusive; - info.key = it.first; - for (const auto& id : it.second.txn_ids) { - info.ids.push_back(id); - } - data.insert({i, info}); - } - } - } - - // Unlock everything. Unlocking order is not important. 
- for (auto i : cf_ids) { - const auto& stripes = lock_maps_[i]->lock_map_stripes_; - for (const auto& j : stripes) { - j->stripe_mutex->UnLock(); - } - } - - return data; -} -std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() { - return dlock_buffer_.PrepareBuffer(); -} - -void TransactionLockMgr::Resize(uint32_t target_size) { - dlock_buffer_.Resize(target_size); -} - } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index f0b406ab5..f2380aad7 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -188,8 +188,7 @@ using namespace toku; */ class RangeLockMgr : public BaseLockMgr, - public RangeLockMgrControl -{ + public RangeLockMgrControl { public: void AddColumnFamily(uint32_t column_family_id) override { /* do nothing */ } void RemoveColumnFamily(uint32_t column_family_id) override { /* do nothing */ }