revision-id: e836be43ec6c2bd9250764e09b51766b11b04390 (v5.8-1027-ge836be43e)
parent(s): 791d4c1f48fe30d7519b93508cc3670246063b7f
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2019-01-25 19:52:31 +0300
message:
Range Locking code cleanup. No functional changes.
---
include/rocksdb/utilities/transaction.h | 9 +-
utilities/transactions/pessimistic_transaction.cc | 7 +-
.../transactions/pessimistic_transaction_db.cc | 9 +-
.../transactions/pessimistic_transaction_db.h | 7 +-
utilities/transactions/transaction_lock_mgr.cc | 569 ++++++++++-----------
utilities/transactions/transaction_lock_mgr.h | 3 +-
6 files changed, 297 insertions(+), 307 deletions(-)
diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h
index 5a83c39a9..5a2932363 100644
--- a/include/rocksdb/utilities/transaction.h
+++ b/include/rocksdb/utilities/transaction.h
@@ -251,10 +251,13 @@ class Transaction {
return s;
}
}
- //psergey:
+
+ // Get a range lock on [start_endpoint; end_endpoint].
+ // Note: range endpoints generally use a different data format than
+ // regular keys.
virtual Status GetRangeLock(ColumnFamilyHandle* column_family,
- const Slice& start_key,
- const Slice& end_key) {
+ const Slice& start_endp,
+ const Slice& end_endp) {
return Status::NotSupported();
}
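
For illustration, a minimal caller-side sketch of the new API (not part of the
patch). The helper MakeEndpoint() and the wrapper LockRangeForScan() are
hypothetical names; the actual endpoint encoding is whatever the caller
installs via RangeLockMgr::set_endpoint_cmp_functions().

    #include <string>
    #include "rocksdb/utilities/transaction.h"

    // Hypothetical helper: encode a user key in the endpoint format expected
    // by the installed endpoint comparator.
    extern std::string MakeEndpoint(const rocksdb::Slice& key);

    rocksdb::Status LockRangeForScan(rocksdb::Transaction* txn,
                                     rocksdb::ColumnFamilyHandle* cf,
                                     const rocksdb::Slice& first_key,
                                     const rocksdb::Slice& last_key) {
      std::string start_endp = MakeEndpoint(first_key);
      std::string end_endp = MakeEndpoint(last_key);
      rocksdb::Status s = txn->GetRangeLock(cf, rocksdb::Slice(start_endp),
                                            rocksdb::Slice(end_endp));
      if (s.IsNotSupported()) {
        // This Transaction implementation has no range locking; fall back
        // to per-key GetForUpdate() calls.
      }
      return s;
    }
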
diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc
index 7a9a0d11e..e900bc067 100644
--- a/utilities/transactions/pessimistic_transaction.cc
+++ b/utilities/transactions/pessimistic_transaction.cc
@@ -601,14 +601,13 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
Status
PessimisticTransaction::GetRangeLock(ColumnFamilyHandle* column_family,
- const Slice& start_key,
- const Slice& end_key)
-{
+ const Slice& start_endp,
+ const Slice& end_endp) {
ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily();
uint32_t cfh_id= GetColumnFamilyID(cfh);
- return txn_db_impl_->TryRangeLock(this, cfh_id, start_key, end_key);
+ return txn_db_impl_->TryRangeLock(this, cfh_id, start_endp, end_endp);
}
// Return OK() if this key has not been modified more recently than the
diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc
index df5ec2d6c..d44f329e5 100644
--- a/utilities/transactions/pessimistic_transaction_db.cc
+++ b/utilities/transactions/pessimistic_transaction_db.cc
@@ -392,12 +392,11 @@ Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
Status
PessimisticTransactionDB::TryRangeLock(PessimisticTransaction *txn,
uint32_t cfh_id,
- const Slice& start_key,
- const Slice& end_key)
-{
+ const Slice& start_endp,
+ const Slice& end_endp) {
if (use_range_locking) {
- return range_lock_mgr_.TryRangeLock(txn, cfh_id, start_key,
- end_key, /*exclusive=*/false);
+ return range_lock_mgr_.TryRangeLock(txn, cfh_id, start_endp,
+ end_endp, /*exclusive=*/false);
}
else
return Status::NotSupported();
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h
index cc9b07a6c..1be4143c1 100644
--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -80,8 +80,8 @@ class PessimisticTransactionDB : public TransactionDB {
const std::string& key, bool exclusive);
Status TryRangeLock(PessimisticTransaction* txn,
uint32_t cfh_id,
- const Slice& start_key,
- const Slice& end_key);
+ const Slice& start_endp,
+ const Slice& end_endp);
void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys,
bool all_keys_hint=false);
@@ -127,8 +127,7 @@ class PessimisticTransactionDB : public TransactionDB {
virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {}
virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}
- void KillLockWait(void *cdata) override
- {
+ void KillLockWait(void *cdata) override {
if (use_range_locking)
range_lock_mgr_.KillLockWait(cdata);
}
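
As context for the KILL path (illustration, not part of the patch):
TryRangeLock() below registers (void*)txn->GetID() as the lock request's
client_extra, and KillLockWait() forwards its argument to ltm.kill_waiter().
A server-side KILL handler could therefore cancel a waiter roughly as sketched
here; CancelLockWait() is a hypothetical name, and reaching KillLockWait()
through TransactionDB assumes the virtual is exposed there, as the override
above suggests.

    #include "rocksdb/utilities/transaction_db.h"

    // Hypothetical KILL handler: cancel the lock wait of the transaction
    // whose TransactionID was registered as client_extra.
    void CancelLockWait(rocksdb::TransactionDB* txn_db,
                        rocksdb::TransactionID waiter_id) {
      // No-op unless the DB was opened with range locking enabled.
      txn_db->KillLockWait(reinterpret_cast<void*>(waiter_id));
    }
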
diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc
index eef61e724..cd9da2986 100644
--- a/utilities/transactions/transaction_lock_mgr.cc
+++ b/utilities/transactions/transaction_lock_mgr.cc
@@ -315,157 +315,6 @@ Status TransactionLockMgr::TryLock(PessimisticTransaction* txn,
timeout, lock_info);
}
-void RangeLockMgr::KillLockWait(void *cdata)
-{
- ltm.kill_waiter(cdata);
-}
-
-
-/*
- Storage for locks that are held by this transaction.
-
- We store them in toku::range_buffer because toku::locktree::release_locks()
- accepts that as an argument.
-
- Note: the list of locks may differ slighly from the contents of the lock
- tree, due to concurrency between lock acquisition, lock release, and lock
- escalation. See MDEV-18227 and RangeLockMgr::UnLockAll for details.
- This property is currently harmless.
-*/
-class RangeLockList: public PessimisticTransaction::LockStorage
-{
-public:
- virtual ~RangeLockList() {
- buffer.destroy();
- }
-
- RangeLockList() : releasing_locks(false) {
- buffer.create();
- }
-
- void append(const DBT *left_key, const DBT *right_key) {
- MutexLock l(&mutex_);
- // there's only one thread that calls this function.
- // the same thread will do lock release.
- assert(!releasing_locks);
- buffer.append(left_key, right_key);
- }
-
- /* Ranges that we are holding the locks on. */
- toku::range_buffer buffer;
-
- /* Synchronization. See RangeLockMgr::UnLockAll for details */
- port::Mutex mutex_;
- bool releasing_locks;
-};
-
-// Get a range lock on [start_key; end_key] range
-// (TODO: check if we do what is inteded at the endpoints)
-
-Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn,
- uint32_t column_family_id,
- const rocksdb::Slice &start_key,
- const rocksdb::Slice &end_key,
- bool exclusive)
-{
- toku::lock_request request;
- request.create();
- DBT start_key_dbt, end_key_dbt;
-
- toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size());
- toku_fill_dbt(&end_key_dbt, end_key.data(), end_key.size());
- request.set(lt, (TXNID)txn, &start_key_dbt, &end_key_dbt, toku::lock_request::WRITE,
- false /* not a big txn */, (void*)txn->GetID()/*client_extra, for KILL*/);
-
- uint64_t killed_time_msec = 0; // TODO: what should this have?
- uint64_t wait_time_msec = txn->GetLockTimeout();
- // convert microseconds to milliseconds
- if (wait_time_msec != (uint64_t)-1)
- wait_time_msec = (wait_time_msec + 500) / 1000;
-
- request.start();
-
- /*
- If we are going to wait on the lock, we should set appropriate status in
- the 'txn' object. This is done by the SetWaitingTxn() call below.
- The API we are using are MariaDB's wait notification API, so the way this
- is done is a bit convoluted.
- In MyRocks, the wait details are visible in I_S.rocksdb_trx.
- */
- std::string key_str(start_key.data(), start_key.size());
- struct st_wait_info {
- PessimisticTransaction* txn;
- uint32_t column_family_id;
- std::string *key_ptr;
- autovector<TransactionID> wait_ids;
- bool done= false;
-
- static void lock_wait_callback(void *cdata, TXNID waiter, TXNID waitee)
- {
- auto self= (struct st_wait_info*)cdata;
- if (!self->done)
- {
- self->wait_ids.push_back(waitee);
- self->txn->SetWaitingTxn(self->wait_ids, self->column_family_id, self->key_ptr);
- self->done= true;
- }
- }
- } wait_info;
-
- wait_info.txn= txn;
- wait_info.column_family_id= column_family_id;
- wait_info.key_ptr= &key_str;
- wait_info.done= false;
-
- const int r = request.wait(wait_time_msec, killed_time_msec,
- nullptr, // killed_callback
- st_wait_info::lock_wait_callback,
- (void*)&wait_info);
-
- // Inform the txn that we are no longer waiting:
- txn->ClearWaitingTxn();
-
- request.destroy();
- switch (r) {
- case 0:
- break; /* fall through */
- case DB_LOCK_NOTGRANTED:
- return Status::TimedOut(Status::SubCode::kLockTimeout);
- case TOKUDB_OUT_OF_LOCKS:
- return Status::Busy(Status::SubCode::kLockLimit);
- case DB_LOCK_DEADLOCK:
- return Status::Busy(Status::SubCode::kDeadlock);
- default:
- assert(0);
- return Status::Busy(Status::SubCode::kLockLimit);
- }
-
- /* Save the acquired lock in txn->owned_locks */
- if (!txn->owned_locks)
- {
- //create the object
- txn->owned_locks= std::unique_ptr<RangeLockList>(new RangeLockList);
- }
- RangeLockList* range_list= (RangeLockList*)txn->owned_locks.get();
- range_list->append(&start_key_dbt, &end_key_dbt);
-
- return Status::OK();
-}
-
-
-// Get a singlepoint lock
-// (currently it is the same as getting a range lock)
-Status RangeLockMgr::TryLock(PessimisticTransaction* txn,
- uint32_t column_family_id,
- const std::string& key, Env* env, bool exclusive)
-{
- std::string endpoint;
- convert_key_to_endpoint(rocksdb::Slice(key.data(), key.size()), &endpoint);
- rocksdb::Slice endp_slice(endpoint.data(), endpoint.length());
- return TryRangeLock(txn, column_family_id, endp_slice, endp_slice, exclusive);
-}
-
-
// Helper function for TryLock().
Status TransactionLockMgr::AcquireWithTimeout(
PessimisticTransaction* txn, LockMap* lock_map, LockMapStripe* stripe,
@@ -804,13 +653,256 @@ void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
stripe->stripe_cv->NotifyAll();
}
+void TransactionLockMgr::UnLock(const PessimisticTransaction* txn,
+ const TransactionKeyMap* key_map, Env* env) {
+ for (auto& key_map_iter : *key_map) {
+ uint32_t column_family_id = key_map_iter.first;
+ auto& keys = key_map_iter.second;
+
+ std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
+ LockMap* lock_map = lock_map_ptr.get();
+
+ if (lock_map == nullptr) {
+ // Column Family must have been dropped.
+ return;
+ }
+
+ // Bucket keys by lock_map_ stripe
+ std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe(
+ std::max(keys.size(), lock_map->num_stripes_));
+
+ for (auto& key_iter : keys) {
+ const std::string& key = key_iter.first;
+
+ size_t stripe_num = lock_map->GetStripe(key);
+ keys_by_stripe[stripe_num].push_back(&key);
+ }
+
+ // For each stripe, grab the stripe mutex and unlock all keys in this stripe
+ for (auto& stripe_iter : keys_by_stripe) {
+ size_t stripe_num = stripe_iter.first;
+ auto& stripe_keys = stripe_iter.second;
+
+ assert(lock_map->lock_map_stripes_.size() > stripe_num);
+ LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
+
+ stripe->stripe_mutex->Lock();
+
+ for (const std::string* key : stripe_keys) {
+ UnLockKey(txn, *key, stripe, lock_map, env);
+ }
+
+ stripe->stripe_mutex->UnLock();
+
+ // Signal waiting threads to retry locking
+ stripe->stripe_cv->NotifyAll();
+ }
+ }
+}
+
+TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
+ LockStatusData data;
+ // Lock order here is important. The correct order is lock_map_mutex_, then
+ // for every column family ID in ascending order lock every stripe in
+ // ascending order.
+ InstrumentedMutexLock l(&lock_map_mutex_);
+
+ std::vector<uint32_t> cf_ids;
+ for (const auto& map : lock_maps_) {
+ cf_ids.push_back(map.first);
+ }
+ std::sort(cf_ids.begin(), cf_ids.end());
+
+ for (auto i : cf_ids) {
+ const auto& stripes = lock_maps_[i]->lock_map_stripes_;
+ // Iterate and lock all stripes in ascending order.
+ for (const auto& j : stripes) {
+ j->stripe_mutex->Lock();
+ for (const auto& it : j->keys) {
+ struct KeyLockInfo info;
+ info.exclusive = it.second.exclusive;
+ info.key = it.first;
+ for (const auto& id : it.second.txn_ids) {
+ info.ids.push_back(id);
+ }
+ data.insert({i, info});
+ }
+ }
+ }
+
+ // Unlock everything. Unlocking order is not important.
+ for (auto i : cf_ids) {
+ const auto& stripes = lock_maps_[i]->lock_map_stripes_;
+ for (const auto& j : stripes) {
+ j->stripe_mutex->UnLock();
+ }
+ }
+
+ return data;
+}
+std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() {
+ return dlock_buffer_.PrepareBuffer();
+}
+
+void TransactionLockMgr::Resize(uint32_t target_size) {
+ dlock_buffer_.Resize(target_size);
+}
+
+
+/////////////////////////////////////////////////////////////////////////////
+// RangeLockMgr - a lock manager that supports range locking
+/////////////////////////////////////////////////////////////////////////////
+
+/*
+ Storage for locks that are currently held by a transaction.
+
+ Locks are kept in toku::range_buffer because toku::locktree::release_locks()
+ accepts that as an argument.
+
+  Note: the list of locks may differ slightly from the contents of the lock
+ tree, due to concurrency between lock acquisition, lock release, and lock
+ escalation. See MDEV-18227 and RangeLockMgr::UnLockAll for details.
+  This discrepancy is currently harmless.
+*/
+class RangeLockList: public PessimisticTransaction::LockStorage {
+public:
+ virtual ~RangeLockList() {
+ buffer_.destroy();
+ }
+
+ RangeLockList() : releasing_locks_(false) {
+ buffer_.create();
+ }
+
+ void append(const DBT *left_key, const DBT *right_key) {
+ MutexLock l(&mutex_);
+ // there's only one thread that calls this function.
+ // the same thread will do lock release.
+ assert(!releasing_locks_);
+ buffer_.append(left_key, right_key);
+ }
+
+ // Ranges that the transaction is holding locks on
+ toku::range_buffer buffer_;
+
+ // Synchronization. See RangeLockMgr::UnLockAll for details
+ port::Mutex mutex_;
+ bool releasing_locks_;
+};
+
+
+void RangeLockMgr::KillLockWait(void *cdata) {
+ ltm.kill_waiter(cdata);
+}
+
+// Get a range lock on [start_key; end_key] range
+Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn,
+ uint32_t column_family_id,
+ const rocksdb::Slice &start_key,
+ const rocksdb::Slice &end_key,
+ bool exclusive) {
+ toku::lock_request request;
+ request.create();
+ DBT start_key_dbt, end_key_dbt;
+
+ toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size());
+ toku_fill_dbt(&end_key_dbt, end_key.data(), end_key.size());
+ request.set(lt, (TXNID)txn, &start_key_dbt, &end_key_dbt, toku::lock_request::WRITE,
+ false /* not a big txn */, (void*)txn->GetID()/*client_extra, for KILL*/);
+
+ uint64_t killed_time_msec = 0; // TODO: what should this have?
+ uint64_t wait_time_msec = txn->GetLockTimeout();
+ // convert microseconds to milliseconds
+ if (wait_time_msec != (uint64_t)-1)
+ wait_time_msec = (wait_time_msec + 500) / 1000;
+
+ request.start();
+
+ /*
+ If we are going to wait on the lock, we should set appropriate status in
+ the 'txn' object. This is done by the SetWaitingTxn() call below.
+    The API we are using is MariaDB's wait notification API, so the way this
+ is done is a bit convoluted.
+ In MyRocks, the wait details are visible in I_S.rocksdb_trx.
+ */
+ std::string key_str(start_key.data(), start_key.size());
+ struct st_wait_info {
+ PessimisticTransaction* txn;
+ uint32_t column_family_id;
+ std::string *key_ptr;
+ autovector<TransactionID> wait_ids;
+ bool done= false;
+
+ static void lock_wait_callback(void *cdata, TXNID waiter, TXNID waitee) {
+ auto self= (struct st_wait_info*)cdata;
+ if (!self->done)
+ {
+ self->wait_ids.push_back(waitee);
+ self->txn->SetWaitingTxn(self->wait_ids, self->column_family_id,
+ self->key_ptr);
+ self->done= true;
+ }
+ }
+ } wait_info;
+
+ wait_info.txn= txn;
+ wait_info.column_family_id= column_family_id;
+ wait_info.key_ptr= &key_str;
+ wait_info.done= false;
+
+ const int r = request.wait(wait_time_msec, killed_time_msec,
+ nullptr, // killed_callback
+ st_wait_info::lock_wait_callback,
+ (void*)&wait_info);
+
+ // Inform the txn that we are no longer waiting:
+ txn->ClearWaitingTxn();
+
+ request.destroy();
+ switch (r) {
+ case 0:
+      break; /* lock granted; continue below the switch */
+ case DB_LOCK_NOTGRANTED:
+ return Status::TimedOut(Status::SubCode::kLockTimeout);
+ case TOKUDB_OUT_OF_LOCKS:
+ return Status::Busy(Status::SubCode::kLockLimit);
+ case DB_LOCK_DEADLOCK:
+ return Status::Busy(Status::SubCode::kDeadlock);
+ default:
+ assert(0);
+ return Status::Busy(Status::SubCode::kLockLimit);
+ }
+
+ /* Save the acquired lock in txn->owned_locks */
+ if (!txn->owned_locks)
+ {
+ //create the object
+ txn->owned_locks= std::unique_ptr<RangeLockList>(new RangeLockList);
+ }
+ RangeLockList* range_list= (RangeLockList*)txn->owned_locks.get();
+ range_list->append(&start_key_dbt, &end_key_dbt);
+
+ return Status::OK();
+}
+
+
+// Get a single-point lock
+// (currently it is the same as getting a range lock)
+Status RangeLockMgr::TryLock(PessimisticTransaction* txn,
+ uint32_t column_family_id,
+ const std::string& key, Env* env,
+ bool exclusive) {
+ std::string endpoint;
+ convert_key_to_endpoint(rocksdb::Slice(key.data(), key.size()), &endpoint);
+ rocksdb::Slice endp_slice(endpoint.data(), endpoint.length());
+ return TryRangeLock(txn, column_family_id, endp_slice, endp_slice, exclusive);
+}
+
static void
range_lock_mgr_release_lock_int(toku::locktree *lt,
- const PessimisticTransaction* txn,
- uint32_t column_family_id,
- const std::string& key,
- bool releasing_all_locks_hint= false)
-{
+ const PessimisticTransaction* txn,
+ uint32_t column_family_id,
+ const std::string& key) {
DBT key_dbt;
toku_fill_dbt(&key_dbt, key.data(), key.size());
toku::range_buffer range_buf;
@@ -828,7 +920,7 @@ void RangeLockMgr::UnLock(PessimisticTransaction* txn,
}
void RangeLockMgr::UnLock(const PessimisticTransaction* txn,
- const TransactionKeyMap* key_map, Env* env) {
+ const TransactionKeyMap* key_map, Env* env) {
//TODO: if we collect all locks in a range buffer and then
// make one call to lock_tree::release_locks(), will that be faster?
for (auto& key_map_iter : *key_map) {
@@ -855,15 +947,15 @@ void RangeLockMgr::UnLockAll(const PessimisticTransaction* txn, Env* env) {
{
MutexLock l(&range_list->mutex_);
/*
- The lt->release_locks() call below will walk range_list->buffer. We
+ The lt->release_locks() call below will walk range_list->buffer_. We
need to prevent lock escalation callback from replacing
- range_list->buffer while we are doing that.
+ range_list->buffer_ while we are doing that.
Additional complication here is internal mutex(es) in the locktree
(let's call them latches):
- Lock escalation first obtains latches on the lock tree
- Then, it calls RangeLockMgr::on_escalate to replace transaction's
- range_list->buffer.
+ range_list->buffer_.
= Access to that buffer must be synchronized, so it will want to
acquire the range_list->mutex_.
@@ -873,25 +965,25 @@ void RangeLockMgr::UnLockAll(const PessimisticTransaction* txn, Env* env) {
- and acquire latches on parts of the lock tree to remove locks from
it.
- How do we avoid the deadlock? Thei ideas is that here we set
- releasing_locks=true, and release the mutex.
+ How do we avoid the deadlock? The idea is that here we set
+ releasing_locks_=true, and release the mutex.
All other users of the range_list must:
- - Acquire the mutex, then check that releasing_locks=false.
+ - Acquire the mutex, then check that releasing_locks_=false.
      (the code in this function doesn't do that as there's only one thread
- that does lock release)
+      that releases the transaction's locks)
*/
- range_list->releasing_locks= true;
+ range_list->releasing_locks_= true;
}
// Don't try to call release_locks() if the buffer is empty! if we are
- // not holding any locks, the lock tree might be on STO-mode with another
- // transaction, and our attempt to release an empty set of locks will
- // cause an assertion failure.
- if (range_list->buffer.get_num_ranges())
- lt->release_locks((TXNID)txn, &range_list->buffer, true);
- range_list->buffer.destroy();
- range_list->buffer.create();
- range_list->releasing_locks= false;
+  // not holding any locks, the lock tree might be in STO-mode with
+ // another transaction, and our attempt to release an empty set of locks
+ // will cause an assertion failure.
+ if (range_list->buffer_.get_num_ranges())
+ lt->release_locks((TXNID)txn, &range_list->buffer_, true);
+ range_list->buffer_.destroy();
+ range_list->buffer_.create();
+ range_list->releasing_locks_= false;
toku::lock_request::retry_all_lock_requests(lt, nullptr /* lock_wait_needed_callback */);
}
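
Restating the contract from the comment above as a sketch (illustration only,
not part of the patch): any other code that touches a transaction's
RangeLockList must take mutex_ and back off while releasing_locks_ is set.
The escalation callback further below follows exactly this pattern;
TouchRangeList() is a hypothetical name.

    // Required access pattern for a transaction's RangeLockList.
    void TouchRangeList(RangeLockList* range_list) {
      MutexLock l(&range_list->mutex_);
      if (range_list->releasing_locks_) {
        // UnLockAll() is walking buffer_ without holding mutex_;
        // leave the buffer alone.
        return;
      }
      // Safe to read or replace range_list->buffer_ here.
    }
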
@@ -899,16 +991,14 @@ void RangeLockMgr::UnLockAll(const PessimisticTransaction* txn, Env* env) {
int RangeLockMgr::compare_dbt_endpoints(__toku_db*, void *arg,
const DBT *a_key,
- const DBT *b_key)
-{
+ const DBT *b_key) {
RangeLockMgr* mgr= (RangeLockMgr*) arg;
return mgr->compare_endpoints((const char*)a_key->data, a_key->size,
(const char*)b_key->data, b_key->size);
}
-RangeLockMgr::RangeLockMgr(TransactionDB* txn_db) : my_txn_db(txn_db)
-{
+RangeLockMgr::RangeLockMgr(TransactionDB* txn_db) : my_txn_db(txn_db) {
ltm.create(on_create, on_destroy, on_escalate, NULL);
lt= nullptr;
}
@@ -925,14 +1015,13 @@ RangeLockMgr::RangeLockMgr(TransactionDB* txn_db) : my_txn_db(txn_db)
*/
void RangeLockMgr::on_escalate(TXNID txnid, const locktree *lt,
- const range_buffer &buffer, void *extra)
-{
+ const range_buffer &buffer, void *extra) {
auto txn= (PessimisticTransaction*)txnid;
RangeLockList* trx_locks= (RangeLockList*)txn->owned_locks.get();
MutexLock l(&trx_locks->mutex_);
- if (trx_locks->releasing_locks) {
+ if (trx_locks->releasing_locks_) {
/*
Do nothing. The transaction is releasing its locks, so it will not care
about having a correct list of ranges. (In TokuDB,
@@ -942,20 +1031,20 @@ void RangeLockMgr::on_escalate(TXNID txnid, const locktree *lt,
}
// TODO: are we tracking this mem: lt->get_manager()->note_mem_released(trx_locks->ranges.buffer->total_memory_size());
- trx_locks->buffer.destroy();
- trx_locks->buffer.create();
+ trx_locks->buffer_.destroy();
+ trx_locks->buffer_.create();
toku::range_buffer::iterator iter(&buffer);
toku::range_buffer::iterator::record rec;
while (iter.current(&rec)) {
- trx_locks->buffer.append(rec.get_left_key(), rec.get_right_key());
- iter.next();
+ trx_locks->buffer_.append(rec.get_left_key(), rec.get_right_key());
+ iter.next();
}
// TODO: same as above: lt->get_manager()->note_mem_used(ranges.buffer->total_memory_size());
}
-void RangeLockMgr::set_endpoint_cmp_functions(convert_key_to_endpoint_func cvt_func,
- compare_endpoints_func cmp_func)
-{
+void
+RangeLockMgr::set_endpoint_cmp_functions(convert_key_to_endpoint_func cvt_func,
+ compare_endpoints_func cmp_func) {
convert_key_to_endpoint= cvt_func;
compare_endpoints= cmp_func;
@@ -963,20 +1052,18 @@ void RangeLockMgr::set_endpoint_cmp_functions(convert_key_to_endpoint_func cvt_f
assert(!lt);
toku::comparator cmp;
- //cmp.create(toku_builtin_compare_fun, NULL);
cmp.create(compare_dbt_endpoints, (void*)this, NULL);
DICTIONARY_ID dict_id = { .dictid = 1 };
lt= ltm.get_lt(dict_id, cmp , /* on_create_extra*/nullptr);
}
-uint64_t RangeLockMgr::get_escalation_count()
-{
+uint64_t RangeLockMgr::get_escalation_count() {
LTM_STATUS_S ltm_status_test;
  ltm.get_status(&ltm_status_test);
- // psergey-todo: The below is how Toku's unit tests do it.
- // why didn't Toku just make LTM_ESCALATION_COUNT constant visible?
+ // Searching status variable by its string name is how Toku's unit tests
+ // do it (why didn't they make LTM_ESCALATION_COUNT constant visible?)
TOKU_ENGINE_STATUS_ROW key_status = NULL;
// lookup keyname in status
for (int i = 0; ; i++) {
@@ -992,52 +1079,6 @@ uint64_t RangeLockMgr::get_escalation_count()
return key_status->value.num;
}
-void TransactionLockMgr::UnLock(const PessimisticTransaction* txn,
- const TransactionKeyMap* key_map, Env* env) {
- for (auto& key_map_iter : *key_map) {
- uint32_t column_family_id = key_map_iter.first;
- auto& keys = key_map_iter.second;
-
- std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
- LockMap* lock_map = lock_map_ptr.get();
-
- if (lock_map == nullptr) {
- // Column Family must have been dropped.
- return;
- }
-
- // Bucket keys by lock_map_ stripe
- std::unordered_map<size_t, std::vector<const std::string*>> keys_by_stripe(
- std::max(keys.size(), lock_map->num_stripes_));
-
- for (auto& key_iter : keys) {
- const std::string& key = key_iter.first;
-
- size_t stripe_num = lock_map->GetStripe(key);
- keys_by_stripe[stripe_num].push_back(&key);
- }
-
- // For each stripe, grab the stripe mutex and unlock all keys in this stripe
- for (auto& stripe_iter : keys_by_stripe) {
- size_t stripe_num = stripe_iter.first;
- auto& stripe_keys = stripe_iter.second;
-
- assert(lock_map->lock_map_stripes_.size() > stripe_num);
- LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
-
- stripe->stripe_mutex->Lock();
-
- for (const std::string* key : stripe_keys) {
- UnLockKey(txn, *key, stripe, lock_map, env);
- }
-
- stripe->stripe_mutex->UnLock();
-
- // Signal waiting threads to retry locking
- stripe->stripe_cv->NotifyAll();
- }
- }
-}
struct LOCK_PRINT_CONTEXT {
TransactionLockMgr::LockStatusData *data;
@@ -1047,8 +1088,7 @@ struct LOCK_PRINT_CONTEXT {
static
void push_into_lock_status_data(void* param, const DBT *left,
- const DBT *right, TXNID txnid_arg)
-{
+ const DBT *right, TXNID txnid_arg) {
struct LOCK_PRINT_CONTEXT *ctx= (LOCK_PRINT_CONTEXT*)param;
struct KeyLockInfo info;
@@ -1076,54 +1116,5 @@ TransactionLockMgr::LockStatusData RangeLockMgr::GetLockStatusData() {
return data;
}
-
-TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
- LockStatusData data;
- // Lock order here is important. The correct order is lock_map_mutex_, then
- // for every column family ID in ascending order lock every stripe in
- // ascending order.
- InstrumentedMutexLock l(&lock_map_mutex_);
-
- std::vector<uint32_t> cf_ids;
- for (const auto& map : lock_maps_) {
- cf_ids.push_back(map.first);
- }
- std::sort(cf_ids.begin(), cf_ids.end());
-
- for (auto i : cf_ids) {
- const auto& stripes = lock_maps_[i]->lock_map_stripes_;
- // Iterate and lock all stripes in ascending order.
- for (const auto& j : stripes) {
- j->stripe_mutex->Lock();
- for (const auto& it : j->keys) {
- struct KeyLockInfo info;
- info.exclusive = it.second.exclusive;
- info.key = it.first;
- for (const auto& id : it.second.txn_ids) {
- info.ids.push_back(id);
- }
- data.insert({i, info});
- }
- }
- }
-
- // Unlock everything. Unlocking order is not important.
- for (auto i : cf_ids) {
- const auto& stripes = lock_maps_[i]->lock_map_stripes_;
- for (const auto& j : stripes) {
- j->stripe_mutex->UnLock();
- }
- }
-
- return data;
-}
-std::vector<DeadlockPath> TransactionLockMgr::GetDeadlockInfoBuffer() {
- return dlock_buffer_.PrepareBuffer();
-}
-
-void TransactionLockMgr::Resize(uint32_t target_size) {
- dlock_buffer_.Resize(target_size);
-}
-
} // namespace rocksdb
#endif // ROCKSDB_LITE
diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h
index f0b406ab5..f2380aad7 100644
--- a/utilities/transactions/transaction_lock_mgr.h
+++ b/utilities/transactions/transaction_lock_mgr.h
@@ -188,8 +188,7 @@ using namespace toku;
*/
class RangeLockMgr :
public BaseLockMgr,
- public RangeLockMgrControl
-{
+ public RangeLockMgrControl {
public:
void AddColumnFamily(uint32_t column_family_id) override { /* do nothing */ }
void RemoveColumnFamily(uint32_t column_family_id) override { /* do nothing */ }