[Commits] fa7a3d580: Range Locking: add support for escalation barriers
revision-id: fa7a3d580fc0f3b8217f3d61b22554469527fbea (v6.26.0-142-gfa7a3d580) parent(s): eca85cdb6642c80ee1ac60eb758c7bd2627759f5 author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2021-12-13 19:38:17 +0300 message: Range Locking: add support for escalation barriers Range Locking supports Lock Escalation. Lock Escalation is invoked when lock memory is nearly exhausted and it reduced the amount of memory used by joining adjacent locks. Bridging the gap between certain locks has adverse effects. For example, in MyRocks it is not a good idea to bridge the gap between locks in different indexes, as that get the lock to cover large portions of indexes, or even entire indexes. Resolve this by introducing Escalation Barrier. The escalation process will call the user-provided barrier callback function: bool(const Endpoint& a, const Endpoint& b) If the function returns true, there's a barrier between a and b and Lock Escalation will not try to bridge the gap between a and b. --- include/rocksdb/utilities/transaction_db.h | 6 +++ .../transactions/lock/range/range_locking_test.cc | 53 ++++++++++++++++++++++ .../lock/range/range_tree/lib/locktree/locktree.cc | 19 +++++++- .../lock/range/range_tree/lib/locktree/locktree.h | 14 ++++++ .../range/range_tree/range_tree_lock_manager.cc | 30 +++++++++++- .../range/range_tree/range_tree_lock_manager.h | 12 ++++- 6 files changed, 130 insertions(+), 4 deletions(-) diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 265d4b79a..ca850cbdf 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -97,6 +97,12 @@ class RangeLockManagerHandle : public LockManagerHandle { using RangeLockStatus = std::unordered_multimap<ColumnFamilyId, RangeLockInfo>; + // Lock escalation barrier function. It returns true if lock escalation + // is not allowed to bridge the gap between the endpoints a and b. + using EscalationBarrierFunc = std::function<bool(const Endpoint& a, + const Endpoint& b)>; + virtual void SetEscalationBarrierFunc(EscalationBarrierFunc func) = 0; + virtual RangeLockStatus GetRangeLockStatusData() = 0; class Counters { diff --git a/utilities/transactions/lock/range/range_locking_test.cc b/utilities/transactions/lock/range/range_locking_test.cc index c881b68cb..be94623f9 100644 --- a/utilities/transactions/lock/range/range_locking_test.cc +++ b/utilities/transactions/lock/range/range_locking_test.cc @@ -286,6 +286,59 @@ TEST_F(RangeLockingTest, BasicLockEscalation) { delete txn; } + +/* + An escalation barrier function. Allow escalation iff the first two bytes are + identical. +*/ +static bool escalation_barrier(const Endpoint& a, const Endpoint& b) { + assert(a.slice.size()>2); + assert(b.slice.size()>2); + if (memcmp(a.slice.data(), b.slice.data(), 2)) + return true; // This is a barrier + else + return false; // No barrier +} + +TEST_F(RangeLockingTest, LockEscalationBarrier) { + auto cf = db->DefaultColumnFamily(); + + auto counters = range_lock_mgr->GetStatus(); + + // Initially not using any lock memory + ASSERT_EQ(counters.escalation_count, 0); + + range_lock_mgr->SetMaxLockMemory(8000); + range_lock_mgr->SetEscalationBarrierFunc(escalation_barrier); + + // Insert enough locks to cause lock escalations to happen + auto txn = NewTxn(); + const int N=2000; + for (int i = 0; i < N; i++) { + char buf[32]; + snprintf(buf, sizeof(buf) - 1, "%04d", i); + ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf), Endpoint(buf))); + } + counters = range_lock_mgr->GetStatus(); + ASSERT_GT(counters.escalation_count, 0); + + // Check that lock escalation was not performed across escalation barriers: + // Use another txn to acquire locks near the barriers. + auto txn2 = NewTxn(); + range_lock_mgr->SetMaxLockMemory(500000); + for (int i = 100; i < N; i+=100) { + char buf[32]; + snprintf(buf, sizeof(buf) - 1, "%04d-a", i-1); + // Check that we CAN get a lock near the escalation barrier + ASSERT_OK(txn2->GetRangeLock(cf, Endpoint(buf), Endpoint(buf))); + } + + txn->Rollback(); + txn2->Rollback(); + delete txn; + delete txn2; +} + #endif void PointLockManagerTestExternalSetup(PointLockManagerTest* self) { diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc index c238b0204..0959beced 100644 --- a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc +++ b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc @@ -96,9 +96,19 @@ void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, m_sto_end_early_count = 0; m_sto_end_early_time = 0; + m_escalation_barrier = [](const DBT*, const DBT*, void *) -> bool { return false; }; + m_lock_request_info.init(mutex_factory); } +void +locktree::set_escalation_barrier_func(lt_escalation_barrier_check_func func, + void *extra) { + m_escalation_barrier= func; + m_escalation_barrier_arg= extra; +} + + void lt_lock_request_info::init(toku_external_mutex_factory_t mutex_factory) { pending_lock_requests.create(); pending_is_empty = true; @@ -863,14 +873,19 @@ void locktree::escalate(lt_escalate_cb after_escalate_callback, // - belongs to a different txnid, or // - belongs to several txnids, or // - is a shared lock (we could potentially merge those but - // currently we don't) + // currently we don't), or + // - is across a lock escalation barrier. int next_txnid_index = current_index + 1; while (next_txnid_index < num_extracted && (extracted_buf[current_index].txnid == extracted_buf[next_txnid_index].txnid) && !extracted_buf[next_txnid_index].is_shared && - !extracted_buf[next_txnid_index].owners) { + !extracted_buf[next_txnid_index].owners && + !m_escalation_barrier( + extracted_buf[current_index].range.get_right_key(), + extracted_buf[next_txnid_index].range.get_left_key(), + m_escalation_barrier_arg)) { next_txnid_index++; } diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h index 3e438f502..e416ac5a3 100644 --- a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h +++ b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h @@ -85,6 +85,9 @@ typedef void (*lt_destroy_cb)(locktree *lt); typedef void (*lt_escalate_cb)(TXNID txnid, const locktree *lt, const range_buffer &buffer, void *extra); +typedef bool (*lt_escalation_barrier_check_func)(const DBT* a, const DBT *b, + void* extra); + struct lt_counters { uint64_t wait_count, wait_time; uint64_t long_wait_count, long_wait_time; @@ -343,6 +346,14 @@ class locktree { void set_comparator(const comparator &cmp); + // Escalation barrier prevents lock escalation. + // For two keys A and B, if escalation_barrier_check_func(A, B)==true, then + // there's a barrier between them, and lock escalation is not allowed to + // bridge the gap between A and B. + // This method sets the user-provided barrier check function. + void set_escalation_barrier_func(lt_escalation_barrier_check_func func, + void *extra); + int compare(const locktree *lt) const; DICTIONARY_ID get_dict_id() const; @@ -373,6 +384,9 @@ class locktree { // userdata pointer below. see locktree_manager::get_lt w/ on_create_extra comparator m_cmp; + lt_escalation_barrier_check_func m_escalation_barrier; + void *m_escalation_barrier_arg; + concurrent_tree *m_rangetree; void *m_userdata; diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc index 6dfb78d3f..b1788b9b7 100644 --- a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc @@ -58,6 +58,16 @@ void deserialize_endpoint(const DBT* dbt, EndpointWithString* endp) { endp->slice.assign(dbt_data + 1, dbt->size - 1); } +// Same as above, but decode into Endpoint structure +void deserialize_endpoint(const DBT* dbt, Endpoint* endp) { + assert(dbt->size >= 1); + const char* dbt_data = (const char*)dbt->data; + char suffix = dbt_data[0]; + assert(suffix == SUFFIX_INFIMUM || suffix == SUFFIX_SUPREMUM); + endp->inf_suffix = (suffix == SUFFIX_SUPREMUM); + endp->slice= Slice(dbt_data + 1, dbt->size - 1); +} + // Get a range lock on [start_key; end_key] range Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn, uint32_t column_family_id, @@ -262,6 +272,24 @@ RangeTreeLockManager::RangeTreeLockManager( ltm_.create(on_create, on_destroy, on_escalate, nullptr, mutex_factory_); } +int RangeTreeLockManager::on_create(locktree* lt, void *arg) +{ + // arg is a pointer to RangeTreeLockManager + lt->set_escalation_barrier_func(&OnEscalationBarrierCheck, arg); + return 0; +} + +bool RangeTreeLockManager::OnEscalationBarrierCheck(const DBT *a, + const DBT *b, + void *extra) +{ + Endpoint a_endp, b_endp; + deserialize_endpoint(a, &a_endp); + deserialize_endpoint(b, &b_endp); + auto self = (RangeTreeLockManager*) extra; + return self->barrier_func_(a_endp, b_endp); +} + void RangeTreeLockManager::SetRangeDeadlockInfoBufferSize( uint32_t target_size) { dlock_buffer_.Resize(target_size); @@ -351,7 +379,7 @@ void RangeTreeLockManager::AddColumnFamily(const ColumnFamilyHandle* cfh) { toku::comparator cmp; cmp.create(CompareDbtEndpoints, (void*)cfh->GetComparator()); toku::locktree* ltree = ltm_.get_lt(dict_id, cmp, - /* on_create_extra*/ nullptr); + /* on_create_extra*/ (void*)this); // This is ok to because get_lt has copied the comparator: cmp.destroy(); diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h index 5d55ded02..3cbae850c 100644 --- a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h @@ -93,9 +93,16 @@ class RangeTreeLockManager : public RangeLockManagerBase, // Get the locktree which stores locks for the Column Family with given cf_id std::shared_ptr<locktree> GetLockTreeForCF(ColumnFamilyId cf_id); + void SetEscalationBarrierFunc(EscalationBarrierFunc func) override { + barrier_func_ = func; + } + private: toku::locktree_manager ltm_; + EscalationBarrierFunc barrier_func_ = + [](const Endpoint&, const Endpoint&)->bool { return false; }; + std::shared_ptr<TransactionDBMutexFactory> mutex_factory_; // Map from cf_id to locktree*. Can only be accessed while holding the @@ -116,10 +123,13 @@ class RangeTreeLockManager : public RangeLockManagerBase, static int CompareDbtEndpoints(void* arg, const DBT* a_key, const DBT* b_key); // Callbacks - static int on_create(locktree*, void*) { return 0; /* no error */ } + static int on_create(locktree*, void*); static void on_destroy(locktree*) {} static void on_escalate(TXNID txnid, const locktree* lt, const range_buffer& buffer, void* extra); + + static bool OnEscalationBarrierCheck(const DBT *a, const DBT *b, + void *extra); }; void serialize_endpoint(const Endpoint& endp, std::string* buf);
participants (1)
-
psergey