[Commits] 190a29d06: Make Lock Tree's lock waits use externally provided mutexes/conditions
revision-id: 190a29d06f4e79d2df4cb513944ac34bd133caa0 (v5.8-1024-g190a29d06) parent(s): 7a8c777d4880cde8fd96135f6cc216223371a9f4 author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2019-03-12 11:52:17 +0300 message: Make Lock Tree's lock waits use externally provided mutexes/conditions In MyRocks, this allows: - SHOW PROCESSLIST to show "waiting for row lock" for waiting threads - KILL statement to abort the lock wait in the same way as it does it with point locking (without use of handlerton::kill_connection() method) - Remove previously added TransactionDB::KillLockWait() method as it is no longer needed. --- include/rocksdb/utilities/transaction_db.h | 1 - .../transactions/pessimistic_transaction_db.cc | 12 +++- .../transactions/pessimistic_transaction_db.h | 4 -- .../range_locking/locktree/lock_request.cc | 35 ++++++---- .../range_locking/locktree/lock_request.h | 9 ++- .../range_locking/locktree/locktree.cc | 12 ++-- .../transactions/range_locking/locktree/locktree.h | 13 ++-- .../transactions/range_locking/locktree/manager.cc | 17 +++-- .../portability/toku_external_pthread.h | 81 ++++++++++++++++++++++ utilities/transactions/transaction_lock_mgr.cc | 15 ++-- utilities/transactions/transaction_lock_mgr.h | 6 +- 11 files changed, 155 insertions(+), 50 deletions(-) diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 2a98a8d3d..fb3c6a88c 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -291,7 +291,6 @@ class TransactionDB : public StackableDB { bool use_range_locking; virtual RangeLockMgrControl* get_range_lock_manager() { return nullptr; } - virtual void KillLockWait(TransactionID txnid){}; protected: // To Create an TransactionDB, call Open() // The ownership of db is transferred to the base StackableDB diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 
d44f329e5..308187e13 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -41,7 +41,11 @@ PessimisticTransactionDB::PessimisticTransactionDB( ? txn_db_options_.custom_mutex_factory : std::shared_ptr<TransactionDBMutexFactory>( new TransactionDBMutexFactoryImpl())), - range_lock_mgr_(this) { + range_lock_mgr_(this, + txn_db_options_.custom_mutex_factory? + txn_db_options_.custom_mutex_factory : + std::shared_ptr<TransactionDBMutexFactory>( + new TransactionDBMutexFactoryImpl())) { assert(db_impl_ != nullptr); info_log_ = db_impl_->GetDBOptions().info_log; } @@ -73,7 +77,11 @@ PessimisticTransactionDB::PessimisticTransactionDB( ? txn_db_options_.custom_mutex_factory : std::shared_ptr<TransactionDBMutexFactory>( new TransactionDBMutexFactoryImpl())), - range_lock_mgr_(this) { + range_lock_mgr_(this, + txn_db_options_.custom_mutex_factory + ? txn_db_options_.custom_mutex_factory + : std::shared_ptr<TransactionDBMutexFactory>( + new TransactionDBMutexFactoryImpl())) { assert(db_impl_ != nullptr); } diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 07ed3fed2..388ed4099 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -127,10 +127,6 @@ class PessimisticTransactionDB : public TransactionDB { virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {} virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {} - void KillLockWait(TransactionID txnid) override { - if (use_range_locking) - range_lock_mgr_.KillLockWait(txnid); - } protected: DBImpl* db_impl_; std::shared_ptr<Logger> info_log_; diff --git a/utilities/transactions/range_locking/locktree/lock_request.cc b/utilities/transactions/range_locking/locktree/lock_request.cc index 0b96b2051..84cfdbf62 100644 --- a/utilities/transactions/range_locking/locktree/lock_request.cc +++ 
b/utilities/transactions/range_locking/locktree/lock_request.cc @@ -59,7 +59,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. namespace toku { // initialize a lock request's internals -void lock_request::create(void) { +void lock_request::create(toku_external_mutex_factory_t mutex_factory) { m_txnid = TXNID_NONE; m_conflicting_txnid = TXNID_NONE; m_start_time = 0; @@ -75,7 +75,9 @@ void lock_request::create(void) { m_state = state::UNINITIALIZED; m_info = nullptr; - toku_cond_init(*lock_request_m_wait_cond_key, &m_wait_cond, nullptr); + //psergey-todo: this condition is for interruptible wait + // note: moved to here from lock_request::create: + toku_external_cond_init(mutex_factory, &m_wait_cond); m_start_test_callback = nullptr; m_start_before_pending_test_callback = nullptr; @@ -89,13 +91,14 @@ void lock_request::destroy(void) { m_state = state::DESTROYED; toku_destroy_dbt(&m_left_key_copy); toku_destroy_dbt(&m_right_key_copy); - toku_cond_destroy(&m_wait_cond); + toku_external_cond_destroy(&m_wait_cond); } // set the lock request parameters. this API allows a lock request to be reused. 
void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn, void *extra) { invariant(m_state != state::PENDING); m_lt = lt; + m_txnid = txnid; m_left_key = left_key; m_right_key = right_key; @@ -190,13 +193,13 @@ int lock_request::start(void) { m_conflicting_txnid = conflicts.get(0); if (m_start_before_pending_test_callback) m_start_before_pending_test_callback(); - toku_mutex_lock(&m_info->mutex); + toku_external_mutex_lock(&m_info->mutex); insert_into_lock_requests(); if (deadlock_exists(conflicts)) { remove_from_lock_requests(); r = DB_LOCK_DEADLOCK; } - toku_mutex_unlock(&m_info->mutex); + toku_external_mutex_unlock(&m_info->mutex); if (m_start_test_callback) m_start_test_callback(); // test callback } @@ -221,7 +224,7 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil uint64_t t_start = t_now; uint64_t t_end = t_start + wait_time_ms * 1000; - toku_mutex_lock(&m_info->mutex); + toku_external_mutex_lock(&m_info->mutex); // check again, this time locking out other retry calls if (m_state == state::PENDING) { @@ -252,10 +255,14 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil if (t_wait > t_end) t_wait = t_end; } + /* + PORT: don't need to compute "time when the wait should end" anymore. 
struct timespec ts = {0, 0}; ts.tv_sec = t_wait / 1000000; ts.tv_nsec = (t_wait % 1000000) * 1000; - int r = toku_cond_timedwait(&m_wait_cond, &m_info->mutex, &ts); + */ + int r = toku_external_cond_timedwait(&m_wait_cond, &m_info->mutex, + int32_t(wait_time_ms)*1000); invariant(r == 0 || r == ETIMEDOUT); t_now = toku_current_time_microsec(); @@ -279,7 +286,7 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil m_info->counters.long_wait_count += 1; m_info->counters.long_wait_time += duration; } - toku_mutex_unlock(&m_info->mutex); + toku_external_mutex_unlock(&m_info->mutex); invariant(m_state == state::COMPLETE); return m_complete_r; @@ -332,7 +339,7 @@ int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) { complete(r); if (m_retry_test_callback) m_retry_test_callback(); // test callback - toku_cond_broadcast(&m_wait_cond); + toku_external_cond_broadcast(&m_wait_cond); } else { m_conflicting_txnid = conflicts.get(0); add_conflicts_to_waits(&conflicts, conflicts_collector); @@ -393,7 +400,7 @@ void lock_request::retry_all_lock_requests( } void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info, GrowableArray<TXNID> *collector) { - toku_mutex_lock(&info->mutex); + toku_external_mutex_lock(&info->mutex); // retry all of the pending lock requests. 
for (size_t i = 0; i < info->pending_lock_requests.size();) { lock_request *request; @@ -412,7 +419,7 @@ void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info, Grow // future threads should only retry lock requests if some still exist info->should_retry_lock_requests = info->pending_lock_requests.size() > 0; - toku_mutex_unlock(&info->mutex); + toku_external_mutex_unlock(&info->mutex); } void lock_request::add_conflicts_to_waits(txnid_set *conflicts, @@ -444,12 +451,12 @@ void *lock_request::get_extra(void) const { void lock_request::kill_waiter(void) { remove_from_lock_requests(); complete(DB_LOCK_NOTGRANTED); - toku_cond_broadcast(&m_wait_cond); + toku_external_cond_broadcast(&m_wait_cond); } void lock_request::kill_waiter(locktree *lt, void *extra) { lt_lock_request_info *info = lt->get_lock_request_info(); - toku_mutex_lock(&info->mutex); + toku_external_mutex_lock(&info->mutex); for (size_t i = 0; i < info->pending_lock_requests.size(); i++) { lock_request *request; int r = info->pending_lock_requests.fetch(i, &request); @@ -458,7 +465,7 @@ void lock_request::kill_waiter(locktree *lt, void *extra) { break; } } - toku_mutex_unlock(&info->mutex); + toku_external_mutex_unlock(&info->mutex); } // find another lock request by txnid. must hold the mutex. diff --git a/utilities/transactions/range_locking/locktree/lock_request.h b/utilities/transactions/range_locking/locktree/lock_request.h index 8b9529344..b17b639c8 100644 --- a/utilities/transactions/range_locking/locktree/lock_request.h +++ b/utilities/transactions/range_locking/locktree/lock_request.h @@ -84,7 +84,7 @@ public: }; // effect: Initializes a lock request. - void create(void); + void create(toku_external_mutex_factory_t mutex_factory); // effect: Destroys a lock request. 
void destroy(void); @@ -171,7 +171,7 @@ public: int m_complete_r; state m_state; - toku_cond_t m_wait_cond; + toku_external_cond_t m_wait_cond; bool m_big_txn; @@ -227,6 +227,9 @@ public: friend class lock_request_unit_test; }; -ENSURE_POD(lock_request); +// PORT: lock_request is not a POD anymore due to use of toku_external_cond_t +// This is ok as the PODness is not really required: lock_request objects are +// not moved in memory or anything. +//ENSURE_POD(lock_request); } /* namespace toku */ diff --git a/utilities/transactions/range_locking/locktree/locktree.cc b/utilities/transactions/range_locking/locktree/locktree.cc index 4ecfb2d26..9b530c7b0 100644 --- a/utilities/transactions/range_locking/locktree/locktree.cc +++ b/utilities/transactions/range_locking/locktree/locktree.cc @@ -74,7 +74,9 @@ namespace toku { // but does nothing based on the value of the reference count - it is // up to the user of the locktree to destroy it when it sees fit. -void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp) { +void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, + const comparator &cmp, + toku_external_mutex_factory_t mutex_factory) { m_mgr = mgr; m_dict_id = dict_id; @@ -91,14 +93,14 @@ void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const compar m_sto_end_early_count = 0; m_sto_end_early_time = 0; - m_lock_request_info.init(); + m_lock_request_info.init(mutex_factory); } -void lt_lock_request_info::init(void) { +void lt_lock_request_info::init(toku_external_mutex_factory_t mutex_factory) { pending_lock_requests.create(); pending_is_empty = true; ZERO_STRUCT(mutex); - toku_mutex_init(*locktree_request_info_mutex_key, &mutex, nullptr); + toku_external_mutex_init(mutex_factory, &mutex); retry_want = retry_done = 0; ZERO_STRUCT(counters); ZERO_STRUCT(retry_mutex); @@ -124,7 +126,7 @@ void locktree::destroy(void) { void lt_lock_request_info::destroy(void) { pending_lock_requests.destroy(); - 
toku_mutex_destroy(&mutex); + toku_external_mutex_destroy(&mutex); toku_mutex_destroy(&retry_mutex); toku_cond_destroy(&retry_cv); } diff --git a/utilities/transactions/range_locking/locktree/locktree.h b/utilities/transactions/range_locking/locktree/locktree.h index 4f0530855..5ff4f7449 100644 --- a/utilities/transactions/range_locking/locktree/locktree.h +++ b/utilities/transactions/range_locking/locktree/locktree.h @@ -55,6 +55,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. #include <db.h> #include <toku_pthread.h> +#include <toku_external_pthread.h> #include <toku_time.h> #include <ft/comparator.h> @@ -102,7 +103,7 @@ namespace toku { struct lt_lock_request_info { omt<lock_request *> pending_lock_requests; std::atomic_bool pending_is_empty; - toku_mutex_t mutex; + toku_external_mutex_t mutex; bool should_retry_lock_requests; lt_counters counters; std::atomic_ullong retry_want; @@ -111,7 +112,7 @@ namespace toku { toku_cond_t retry_cv; bool running_retry; - void init(void); + void init(toku_external_mutex_factory_t mutex_factory); void destroy(void); }; @@ -127,7 +128,8 @@ namespace toku { void create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, - void *extra); + void *extra, + toku_external_mutex_factory_t mutex_factory_arg); void destroy(void); @@ -212,6 +214,8 @@ namespace toku { omt<locktree *> m_locktree_map; + toku_external_mutex_factory_t mutex_factory; + // the manager's mutex protects the locktree map toku_mutex_t m_mutex; @@ -275,7 +279,8 @@ namespace toku { class locktree { public: // effect: Creates a locktree - void create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp); + void create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp, + toku_external_mutex_factory_t mutex_factory); void destroy(void); diff --git a/utilities/transactions/range_locking/locktree/manager.cc b/utilities/transactions/range_locking/locktree/manager.cc index 
c31158bb8..47a782565 100644 --- a/utilities/transactions/range_locking/locktree/manager.cc +++ b/utilities/transactions/range_locking/locktree/manager.cc @@ -60,7 +60,12 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. namespace toku { -void locktree_manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *escalate_extra) { +void locktree_manager::create(lt_create_cb create_cb, + lt_destroy_cb destroy_cb, + lt_escalate_cb escalate_cb, + void *escalate_extra, + toku_external_mutex_factory_t mutex_factory_arg) { + mutex_factory= mutex_factory_arg; m_max_lock_memory = DEFAULT_MAX_LOCK_MEMORY; m_current_lock_memory = 0; @@ -151,7 +156,7 @@ locktree *locktree_manager::get_lt(DICTIONARY_ID dict_id, locktree *lt = locktree_map_find(dict_id); if (lt == nullptr) { XCALLOC(lt); - lt->create(this, dict_id, cmp); + lt->create(this, dict_id, cmp, mutex_factory); // new locktree created - call the on_create callback // and put it in the locktree map @@ -320,7 +325,7 @@ int locktree_manager::iterate_pending_lock_requests(lock_request_iterate_callbac invariant_zero(r); struct lt_lock_request_info *info = lt->get_lock_request_info(); - toku_mutex_lock(&info->mutex); + toku_external_mutex_lock(&info->mutex); size_t num_requests = info->pending_lock_requests.size(); for (size_t k = 0; k < num_requests && r == 0; k++) { @@ -332,7 +337,7 @@ int locktree_manager::iterate_pending_lock_requests(lock_request_iterate_callbac req->get_conflicting_txnid(), req->get_start_time(), extra); } - toku_mutex_unlock(&info->mutex); + toku_external_mutex_unlock(&info->mutex); } mutex_unlock(); return r; @@ -472,10 +477,10 @@ void locktree_manager::get_status(LTM_STATUS statp) { locktree *lt; int r = m_locktree_map.fetch(i, &lt); invariant_zero(r); - if (toku_mutex_trylock(&lt->m_lock_request_info.mutex) == 0) { + if (toku_external_mutex_trylock(&lt->m_lock_request_info.mutex) == 0) { lock_requests_pending += 
lt->m_lock_request_info.pending_lock_requests.size(); lt_counters.add(lt->get_lock_request_info()->counters); - toku_mutex_unlock(&lt->m_lock_request_info.mutex); + toku_external_mutex_unlock(&lt->m_lock_request_info.mutex); } sto_num_eligible += lt->sto_txnid_is_valid_unsafe() ? 1 : 0; sto_end_early_count += lt->m_sto_end_early_count; diff --git a/utilities/transactions/range_locking/portability/toku_external_pthread.h b/utilities/transactions/range_locking/portability/toku_external_pthread.h new file mode 100644 index 000000000..2482ec507 --- /dev/null +++ b/utilities/transactions/range_locking/portability/toku_external_pthread.h @@ -0,0 +1,81 @@ +/* + A wrapper around rocksdb::TransactionDBMutexFactory-provided condition and mutex + that provides toku_pthread_*-like interface. The functions are named + + toku_external_{mutex|cond}_XXX + + Lock Tree uses this mutex and condition for interruptible (long) lock waits. + + (It also still uses toku_pthread_XXX calls for mutexes/conditions for + shorter waits on internal objects) +*/ + +#pragma once + +#include <pthread.h> +#include <time.h> +#include <stdint.h> + +#include "toku_portability.h" + +#include "rocksdb/utilities/transaction_db.h" +#include "rocksdb/utilities/transaction_db_mutex.h" + +using rocksdb::TransactionDBMutex; +using rocksdb::TransactionDBCondVar; + +typedef std::shared_ptr<rocksdb::TransactionDBMutexFactory> toku_external_mutex_factory_t; + +typedef std::shared_ptr<TransactionDBMutex> toku_external_mutex_t; +typedef std::shared_ptr<TransactionDBCondVar> toku_external_cond_t; + +static inline +void toku_external_cond_init(toku_external_mutex_factory_t mutex_factory, + toku_external_cond_t *cond) { + *cond= mutex_factory->AllocateCondVar(); +} + +inline void toku_external_cond_destroy(toku_external_cond_t *cond) { + cond->reset(); // this will destroy the managed object +} + +inline void toku_external_cond_signal(toku_external_cond_t *cond) { + (*cond)->Notify(); +} + +inline void 
toku_external_cond_broadcast(toku_external_cond_t *cond) { + (*cond)->NotifyAll(); +} + +inline int toku_external_cond_timedwait(toku_external_cond_t *cond, + toku_external_mutex_t *mutex, + int32_t timeout_microsec) { + auto res= (*cond)->WaitFor(*mutex, timeout_microsec); + if (res.ok()) + return 0; + else + return ETIMEDOUT; +} + +inline void toku_external_mutex_init(toku_external_mutex_factory_t factory, + toku_external_mutex_t *mutex) { + *mutex = factory->AllocateMutex(); +} + +inline void toku_external_mutex_lock(toku_external_mutex_t *mutex) { + (*mutex)->Lock(); +} + +inline int toku_external_mutex_trylock(toku_external_mutex_t *mutex) { + (*mutex)->Lock(); + return 0; +} + +inline void toku_external_mutex_unlock(toku_external_mutex_t *mutex) { + (*mutex)->UnLock(); +} + +inline void toku_external_mutex_destroy(toku_external_mutex_t *mutex) { + mutex->reset(); // this will destroy the managed object +} + diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 56f3295fd..79164a962 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -790,11 +790,6 @@ public: bool releasing_locks_; }; - -void RangeLockMgr::KillLockWait(TransactionID txnid) { - ltm.kill_waiter((void*)txnid); -} - // Get a range lock on [start_key; end_key] range Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn, uint32_t column_family_id, @@ -802,7 +797,7 @@ Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn, const rocksdb::Slice &end_key, bool exclusive) { toku::lock_request request; - request.create(); + request.create(mutex_factory_); DBT start_key_dbt, end_key_dbt; toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size()); @@ -810,6 +805,8 @@ Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn, // Use the txnid as "extra" in the lock_request. 
Then, KillLockWait() // will be able to use kill_waiter(txn_id) to kill the wait if needed + // TODO: KillLockWait is gone and we are no longer using + // locktree::kill_waiter call. Do we need this anymore? TransactionID wait_txn_id = txn->GetID(); request.set(lt, (TXNID)txn, &start_key_dbt, &end_key_dbt, @@ -1004,8 +1001,10 @@ int RangeLockMgr::compare_dbt_endpoints(__toku_db*, void *arg, } -RangeLockMgr::RangeLockMgr(TransactionDB* txn_db) : my_txn_db(txn_db) { - ltm.create(on_create, on_destroy, on_escalate, NULL); +RangeLockMgr::RangeLockMgr(TransactionDB* txn_db, + std::shared_ptr<TransactionDBMutexFactory> mutex_factory) : + my_txn_db(txn_db), mutex_factory_(mutex_factory) { + ltm.create(on_create, on_destroy, on_escalate, NULL, mutex_factory_); lt= nullptr; cmp_initialized_= false; } diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index 0c3284519..9f73e5a07 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -215,11 +215,10 @@ class RangeLockMgr : void UnLock(PessimisticTransaction* txn, uint32_t column_family_id, const std::string& key, Env* env) override ; - RangeLockMgr(TransactionDB* txn_db); + RangeLockMgr(TransactionDB* txn_db, + std::shared_ptr<TransactionDBMutexFactory> mutex_factory); ~RangeLockMgr(); - void KillLockWait(TransactionID txnid); - int set_max_lock_memory(size_t max_lock_memory) override { return ltm.set_max_lock_memory(max_lock_memory); @@ -251,6 +250,7 @@ class RangeLockMgr : compare_endpoints_func compare_endpoints; TransactionDB* my_txn_db; + std::shared_ptr<TransactionDBMutexFactory> mutex_factory_; static int compare_dbt_endpoints(__toku_db*, void *arg, const DBT *a_key, const DBT *b_key);
Hi Sergei! Just wondering: why keep the toku_ prefix when renaming toku_* to toku_external_*, since it is also used by other engines like RocksDB? BR, Jocelyn Fournier
Le 12 mars 2019 à 09:52, Sergei Petrunia <psergey@askmonty.org> a écrit :
revision-id: 190a29d06f4e79d2df4cb513944ac34bd133caa0 (v5.8-1024-g190a29d06) parent(s): 7a8c777d4880cde8fd96135f6cc216223371a9f4 author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2019-03-12 11:52:17 +0300 message:
Make Lock Tree's lock waits use externally provided mutexes/conditions
In MyRocks, this allows: - SHOW PROCESSLIST to show "waiting for row lock" for waiting threads - KILL statement to abort the lock wait in the same way as it does it with point locking (without use of handlerton::kill_connection() method)
- Remove previously added TransactionDB::KillLockWait() method as it is no longer needed.
--- include/rocksdb/utilities/transaction_db.h | 1 - .../transactions/pessimistic_transaction_db.cc | 12 +++- .../transactions/pessimistic_transaction_db.h | 4 -- .../range_locking/locktree/lock_request.cc | 35 ++++++---- .../range_locking/locktree/lock_request.h | 9 ++- .../range_locking/locktree/locktree.cc | 12 ++-- .../transactions/range_locking/locktree/locktree.h | 13 ++-- .../transactions/range_locking/locktree/manager.cc | 17 +++-- .../portability/toku_external_pthread.h | 81 ++++++++++++++++++++++ utilities/transactions/transaction_lock_mgr.cc | 15 ++-- utilities/transactions/transaction_lock_mgr.h | 6 +- 11 files changed, 155 insertions(+), 50 deletions(-)
diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 2a98a8d3d..fb3c6a88c 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -291,7 +291,6 @@ class TransactionDB : public StackableDB { bool use_range_locking; virtual RangeLockMgrControl* get_range_lock_manager() { return nullptr; }
- virtual void KillLockWait(TransactionID txnid){}; protected: // To Create an TransactionDB, call Open() // The ownership of db is transferred to the base StackableDB diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index d44f329e5..308187e13 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -41,7 +41,11 @@ PessimisticTransactionDB::PessimisticTransactionDB( ? txn_db_options_.custom_mutex_factory : std::shared_ptr<TransactionDBMutexFactory>( new TransactionDBMutexFactoryImpl())), - range_lock_mgr_(this) { + range_lock_mgr_(this, + txn_db_options_.custom_mutex_factory? + txn_db_options_.custom_mutex_factory : + std::shared_ptr<TransactionDBMutexFactory>( + new TransactionDBMutexFactoryImpl())) { assert(db_impl_ != nullptr); info_log_ = db_impl_->GetDBOptions().info_log; } @@ -73,7 +77,11 @@ PessimisticTransactionDB::PessimisticTransactionDB( ? txn_db_options_.custom_mutex_factory : std::shared_ptr<TransactionDBMutexFactory>( new TransactionDBMutexFactoryImpl())), - range_lock_mgr_(this) { + range_lock_mgr_(this, + txn_db_options_.custom_mutex_factory + ? txn_db_options_.custom_mutex_factory + : std::shared_ptr<TransactionDBMutexFactory>( + new TransactionDBMutexFactoryImpl())) { assert(db_impl_ != nullptr); }
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 07ed3fed2..388ed4099 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -127,10 +127,6 @@ class PessimisticTransactionDB : public TransactionDB { virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {} virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}
- void KillLockWait(TransactionID txnid) override { - if (use_range_locking) - range_lock_mgr_.KillLockWait(txnid); - } protected: DBImpl* db_impl_; std::shared_ptr<Logger> info_log_; diff --git a/utilities/transactions/range_locking/locktree/lock_request.cc b/utilities/transactions/range_locking/locktree/lock_request.cc index 0b96b2051..84cfdbf62 100644 --- a/utilities/transactions/range_locking/locktree/lock_request.cc +++ b/utilities/transactions/range_locking/locktree/lock_request.cc @@ -59,7 +59,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. namespace toku {
// initialize a lock request's internals -void lock_request::create(void) { +void lock_request::create(toku_external_mutex_factory_t mutex_factory) { m_txnid = TXNID_NONE; m_conflicting_txnid = TXNID_NONE; m_start_time = 0; @@ -75,7 +75,9 @@ void lock_request::create(void) { m_state = state::UNINITIALIZED; m_info = nullptr;
- toku_cond_init(*lock_request_m_wait_cond_key, &m_wait_cond, nullptr); + //psergey-todo: this condition is for interruptible wait + // note: moved to here from lock_request::create: + toku_external_cond_init(mutex_factory, &m_wait_cond);
m_start_test_callback = nullptr; m_start_before_pending_test_callback = nullptr; @@ -89,13 +91,14 @@ void lock_request::destroy(void) { m_state = state::DESTROYED; toku_destroy_dbt(&m_left_key_copy); toku_destroy_dbt(&m_right_key_copy); - toku_cond_destroy(&m_wait_cond); + toku_external_cond_destroy(&m_wait_cond); }
// set the lock request parameters. this API allows a lock request to be reused. void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn, void *extra) { invariant(m_state != state::PENDING); m_lt = lt; + m_txnid = txnid; m_left_key = left_key; m_right_key = right_key; @@ -190,13 +193,13 @@ int lock_request::start(void) { m_conflicting_txnid = conflicts.get(0); if (m_start_before_pending_test_callback) m_start_before_pending_test_callback(); - toku_mutex_lock(&m_info->mutex); + toku_external_mutex_lock(&m_info->mutex); insert_into_lock_requests(); if (deadlock_exists(conflicts)) { remove_from_lock_requests(); r = DB_LOCK_DEADLOCK; } - toku_mutex_unlock(&m_info->mutex); + toku_external_mutex_unlock(&m_info->mutex); if (m_start_test_callback) m_start_test_callback(); // test callback } @@ -221,7 +224,7 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil uint64_t t_start = t_now; uint64_t t_end = t_start + wait_time_ms * 1000;
- toku_mutex_lock(&m_info->mutex); + toku_external_mutex_lock(&m_info->mutex);
// check again, this time locking out other retry calls if (m_state == state::PENDING) { @@ -252,10 +255,14 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil if (t_wait > t_end) t_wait = t_end; } + /* + PORT: don't need to compute "time when the wait should end" anymore. struct timespec ts = {0, 0}; ts.tv_sec = t_wait / 1000000; ts.tv_nsec = (t_wait % 1000000) * 1000; - int r = toku_cond_timedwait(&m_wait_cond, &m_info->mutex, &ts); + */ + int r = toku_external_cond_timedwait(&m_wait_cond, &m_info->mutex, + int32_t(wait_time_ms)*1000); invariant(r == 0 || r == ETIMEDOUT);
t_now = toku_current_time_microsec(); @@ -279,7 +286,7 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil m_info->counters.long_wait_count += 1; m_info->counters.long_wait_time += duration; } - toku_mutex_unlock(&m_info->mutex); + toku_external_mutex_unlock(&m_info->mutex);
invariant(m_state == state::COMPLETE); return m_complete_r; @@ -332,7 +339,7 @@ int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) { complete(r); if (m_retry_test_callback) m_retry_test_callback(); // test callback - toku_cond_broadcast(&m_wait_cond); + toku_external_cond_broadcast(&m_wait_cond); } else { m_conflicting_txnid = conflicts.get(0); add_conflicts_to_waits(&conflicts, conflicts_collector); @@ -393,7 +400,7 @@ void lock_request::retry_all_lock_requests( }
void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info, GrowableArray<TXNID> *collector) { - toku_mutex_lock(&info->mutex); + toku_external_mutex_lock(&info->mutex); // retry all of the pending lock requests. for (size_t i = 0; i < info->pending_lock_requests.size();) { lock_request *request; @@ -412,7 +419,7 @@ void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info, Grow
// future threads should only retry lock requests if some still exist info->should_retry_lock_requests = info->pending_lock_requests.size() > 0; - toku_mutex_unlock(&info->mutex); + toku_external_mutex_unlock(&info->mutex); }
void lock_request::add_conflicts_to_waits(txnid_set *conflicts, @@ -444,12 +451,12 @@ void *lock_request::get_extra(void) const { void lock_request::kill_waiter(void) { remove_from_lock_requests(); complete(DB_LOCK_NOTGRANTED); - toku_cond_broadcast(&m_wait_cond); + toku_external_cond_broadcast(&m_wait_cond); }
void lock_request::kill_waiter(locktree *lt, void *extra) { lt_lock_request_info *info = lt->get_lock_request_info(); - toku_mutex_lock(&info->mutex); + toku_external_mutex_lock(&info->mutex); for (size_t i = 0; i < info->pending_lock_requests.size(); i++) { lock_request *request; int r = info->pending_lock_requests.fetch(i, &request); @@ -458,7 +465,7 @@ void lock_request::kill_waiter(locktree *lt, void *extra) { break; } } - toku_mutex_unlock(&info->mutex); + toku_external_mutex_unlock(&info->mutex); }
// find another lock request by txnid. must hold the mutex. diff --git a/utilities/transactions/range_locking/locktree/lock_request.h b/utilities/transactions/range_locking/locktree/lock_request.h index 8b9529344..b17b639c8 100644 --- a/utilities/transactions/range_locking/locktree/lock_request.h +++ b/utilities/transactions/range_locking/locktree/lock_request.h @@ -84,7 +84,7 @@ public: };
// effect: Initializes a lock request. - void create(void); + void create(toku_external_mutex_factory_t mutex_factory);
// effect: Destroys a lock request. void destroy(void); @@ -171,7 +171,7 @@ public: int m_complete_r; state m_state;
- toku_cond_t m_wait_cond; + toku_external_cond_t m_wait_cond;
bool m_big_txn;
@@ -227,6 +227,9 @@ public:
friend class lock_request_unit_test; }; -ENSURE_POD(lock_request); +// PORT: lock_request is not a POD anymore due to use of toku_external_cond_t +// This is ok as the PODness is not really required: lock_request objects are +// not moved in memory or anything. +//ENSURE_POD(lock_request);
} /* namespace toku */ diff --git a/utilities/transactions/range_locking/locktree/locktree.cc b/utilities/transactions/range_locking/locktree/locktree.cc index 4ecfb2d26..9b530c7b0 100644 --- a/utilities/transactions/range_locking/locktree/locktree.cc +++ b/utilities/transactions/range_locking/locktree/locktree.cc @@ -74,7 +74,9 @@ namespace toku { // but does nothing based on the value of the reference count - it is // up to the user of the locktree to destroy it when it sees fit.
-void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp) { +void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, + const comparator &cmp, + toku_external_mutex_factory_t mutex_factory) { m_mgr = mgr; m_dict_id = dict_id;
@@ -91,14 +93,14 @@ void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const compar m_sto_end_early_count = 0; m_sto_end_early_time = 0;
- m_lock_request_info.init(); + m_lock_request_info.init(mutex_factory); }
-void lt_lock_request_info::init(void) { +void lt_lock_request_info::init(toku_external_mutex_factory_t mutex_factory) { pending_lock_requests.create(); pending_is_empty = true; ZERO_STRUCT(mutex); - toku_mutex_init(*locktree_request_info_mutex_key, &mutex, nullptr); + toku_external_mutex_init(mutex_factory, &mutex); retry_want = retry_done = 0; ZERO_STRUCT(counters); ZERO_STRUCT(retry_mutex); @@ -124,7 +126,7 @@ void locktree::destroy(void) {
void lt_lock_request_info::destroy(void) { pending_lock_requests.destroy(); - toku_mutex_destroy(&mutex); + toku_external_mutex_destroy(&mutex); toku_mutex_destroy(&retry_mutex); toku_cond_destroy(&retry_cv); } diff --git a/utilities/transactions/range_locking/locktree/locktree.h b/utilities/transactions/range_locking/locktree/locktree.h index 4f0530855..5ff4f7449 100644 --- a/utilities/transactions/range_locking/locktree/locktree.h +++ b/utilities/transactions/range_locking/locktree/locktree.h @@ -55,6 +55,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
#include <db.h> #include <toku_pthread.h> +#include <toku_external_pthread.h> #include <toku_time.h>
#include <ft/comparator.h> @@ -102,7 +103,7 @@ namespace toku { struct lt_lock_request_info { omt<lock_request *> pending_lock_requests; std::atomic_bool pending_is_empty; - toku_mutex_t mutex; + toku_external_mutex_t mutex; bool should_retry_lock_requests; lt_counters counters; std::atomic_ullong retry_want; @@ -111,7 +112,7 @@ namespace toku { toku_cond_t retry_cv; bool running_retry;
- void init(void); + void init(toku_external_mutex_factory_t mutex_factory); void destroy(void); };
@@ -127,7 +128,8 @@ namespace toku { void create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, - void *extra); + void *extra, + toku_external_mutex_factory_t mutex_factory_arg);
void destroy(void);
@@ -212,6 +214,8 @@ namespace toku {
omt<locktree *> m_locktree_map;
+ toku_external_mutex_factory_t mutex_factory; + // the manager's mutex protects the locktree map toku_mutex_t m_mutex;
@@ -275,7 +279,8 @@ namespace toku { class locktree { public: // effect: Creates a locktree - void create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp); + void create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp, + toku_external_mutex_factory_t mutex_factory);
void destroy(void);
diff --git a/utilities/transactions/range_locking/locktree/manager.cc b/utilities/transactions/range_locking/locktree/manager.cc index c31158bb8..47a782565 100644 --- a/utilities/transactions/range_locking/locktree/manager.cc +++ b/utilities/transactions/range_locking/locktree/manager.cc @@ -60,7 +60,12 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
namespace toku {
-void locktree_manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *escalate_extra) { +void locktree_manager::create(lt_create_cb create_cb, + lt_destroy_cb destroy_cb, + lt_escalate_cb escalate_cb, + void *escalate_extra, + toku_external_mutex_factory_t mutex_factory_arg) { + mutex_factory= mutex_factory_arg; m_max_lock_memory = DEFAULT_MAX_LOCK_MEMORY; m_current_lock_memory = 0;
@@ -151,7 +156,7 @@ locktree *locktree_manager::get_lt(DICTIONARY_ID dict_id, locktree *lt = locktree_map_find(dict_id); if (lt == nullptr) { XCALLOC(lt); - lt->create(this, dict_id, cmp); + lt->create(this, dict_id, cmp, mutex_factory);
// new locktree created - call the on_create callback // and put it in the locktree map @@ -320,7 +325,7 @@ int locktree_manager::iterate_pending_lock_requests(lock_request_iterate_callbac invariant_zero(r);
struct lt_lock_request_info *info = lt->get_lock_request_info(); - toku_mutex_lock(&info->mutex); + toku_external_mutex_lock(&info->mutex);
size_t num_requests = info->pending_lock_requests.size(); for (size_t k = 0; k < num_requests && r == 0; k++) { @@ -332,7 +337,7 @@ int locktree_manager::iterate_pending_lock_requests(lock_request_iterate_callbac req->get_conflicting_txnid(), req->get_start_time(), extra); }
- toku_mutex_unlock(&info->mutex); + toku_external_mutex_unlock(&info->mutex); } mutex_unlock(); return r; @@ -472,10 +477,10 @@ void locktree_manager::get_status(LTM_STATUS statp) { locktree *lt; int r = m_locktree_map.fetch(i, &lt); invariant_zero(r); - if (toku_mutex_trylock(&lt->m_lock_request_info.mutex) == 0) { + if (toku_external_mutex_trylock(&lt->m_lock_request_info.mutex) == 0) { lock_requests_pending += lt->m_lock_request_info.pending_lock_requests.size(); lt_counters.add(lt->get_lock_request_info()->counters); - toku_mutex_unlock(&lt->m_lock_request_info.mutex); + toku_external_mutex_unlock(&lt->m_lock_request_info.mutex); } sto_num_eligible += lt->sto_txnid_is_valid_unsafe() ? 1 : 0; sto_end_early_count += lt->m_sto_end_early_count; diff --git a/utilities/transactions/range_locking/portability/toku_external_pthread.h b/utilities/transactions/range_locking/portability/toku_external_pthread.h new file mode 100644 index 000000000..2482ec507 --- /dev/null +++ b/utilities/transactions/range_locking/portability/toku_external_pthread.h @@ -0,0 +1,81 @@ +/* + A wrapper around rocksdb::TransactionDBMutexFactory-provided condition and mutex + that provides toku_pthread_*-like interface. The functions are named + + toku_external_{mutex|cond}_XXX + + Lock Tree uses this mutex and condition for interruptible (long) lock waits. 
+ + (It also still uses toku_pthread_XXX calls for mutexes/conditions for + shorter waits on internal objects) +*/ + +#pragma once + +#include <pthread.h> +#include <time.h> +#include <stdint.h> + +#include "toku_portability.h" + +#include "rocksdb/utilities/transaction_db.h" +#include "rocksdb/utilities/transaction_db_mutex.h" + +using rocksdb::TransactionDBMutex; +using rocksdb::TransactionDBCondVar; + +typedef std::shared_ptr<rocksdb::TransactionDBMutexFactory> toku_external_mutex_factory_t; + +typedef std::shared_ptr<TransactionDBMutex> toku_external_mutex_t; +typedef std::shared_ptr<TransactionDBCondVar> toku_external_cond_t; + +static inline +void toku_external_cond_init(toku_external_mutex_factory_t mutex_factory, + toku_external_cond_t *cond) { + *cond= mutex_factory->AllocateCondVar(); +} + +inline void toku_external_cond_destroy(toku_external_cond_t *cond) { + cond->reset(); // this will destroy the managed object +} + +inline void toku_external_cond_signal(toku_external_cond_t *cond) { + (*cond)->Notify(); +} + +inline void toku_external_cond_broadcast(toku_external_cond_t *cond) { + (*cond)->NotifyAll(); +} + +inline int toku_external_cond_timedwait(toku_external_cond_t *cond, + toku_external_mutex_t *mutex, + int32_t timeout_microsec) { + auto res= (*cond)->WaitFor(*mutex, timeout_microsec); + if (res.ok()) + return 0; + else + return ETIMEDOUT; +} + +inline void toku_external_mutex_init(toku_external_mutex_factory_t factory, + toku_external_mutex_t *mutex) { + *mutex = factory->AllocateMutex(); +} + +inline void toku_external_mutex_lock(toku_external_mutex_t *mutex) { + (*mutex)->Lock(); +} + +inline int toku_external_mutex_trylock(toku_external_mutex_t *mutex) { + (*mutex)->Lock(); + return 0; +} + +inline void toku_external_mutex_unlock(toku_external_mutex_t *mutex) { + (*mutex)->UnLock(); +} + +inline void toku_external_mutex_destroy(toku_external_mutex_t *mutex) { + mutex->reset(); // this will destroy the managed object +} + diff --git 
a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 56f3295fd..79164a962 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -790,11 +790,6 @@ public: bool releasing_locks_; };
- -void RangeLockMgr::KillLockWait(TransactionID txnid) { - ltm.kill_waiter((void*)txnid); -} - // Get a range lock on [start_key; end_key] range Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn, uint32_t column_family_id, @@ -802,7 +797,7 @@ Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn, const rocksdb::Slice &end_key, bool exclusive) { toku::lock_request request; - request.create(); + request.create(mutex_factory_); DBT start_key_dbt, end_key_dbt;
toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size()); @@ -810,6 +805,8 @@ Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn,
// Use the txnid as "extra" in the lock_request. Then, KillLockWait() // will be able to use kill_waiter(txn_id) to kill the wait if needed + // TODO: KillLockWait is gone and we are no longer using + // locktree::kill_waiter call. Do we need this anymore? TransactionID wait_txn_id = txn->GetID();
request.set(lt, (TXNID)txn, &start_key_dbt, &end_key_dbt, @@ -1004,8 +1001,10 @@ int RangeLockMgr::compare_dbt_endpoints(__toku_db*, void *arg, }
-RangeLockMgr::RangeLockMgr(TransactionDB* txn_db) : my_txn_db(txn_db) { - ltm.create(on_create, on_destroy, on_escalate, NULL); +RangeLockMgr::RangeLockMgr(TransactionDB* txn_db, + std::shared_ptr<TransactionDBMutexFactory> mutex_factory) : + my_txn_db(txn_db), mutex_factory_(mutex_factory) { + ltm.create(on_create, on_destroy, on_escalate, NULL, mutex_factory_); lt= nullptr; cmp_initialized_= false; } diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index 0c3284519..9f73e5a07 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -215,11 +215,10 @@ class RangeLockMgr : void UnLock(PessimisticTransaction* txn, uint32_t column_family_id, const std::string& key, Env* env) override ;
- RangeLockMgr(TransactionDB* txn_db); + RangeLockMgr(TransactionDB* txn_db, + std::shared_ptr<TransactionDBMutexFactory> mutex_factory); ~RangeLockMgr();
- void KillLockWait(TransactionID txnid); - int set_max_lock_memory(size_t max_lock_memory) override { return ltm.set_max_lock_memory(max_lock_memory); @@ -251,6 +250,7 @@ class RangeLockMgr : compare_endpoints_func compare_endpoints;
TransactionDB* my_txn_db; + std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
static int compare_dbt_endpoints(__toku_db*, void *arg, const DBT *a_key, const DBT *b_key);
_______________________________________________ commits mailing list commits@mariadb.org https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits
Hi Sergei!
Just wondering: why keep the toku_ prefix when renaming toku_* to toku_external_*, since it’s also used by other engines like RocksDB?
Hi Jocelyn,
On Tue, Mar 12, 2019 at 5:50 PM jocelyn fournier <jocelyn.fournier@gmail.com> wrote:
My intent with using similar names was to show that the API is mimicking the toku_pthread_XXX API. If I make further progress with this code, names will be one of the things to think of. Thanks for raising this point.
participants (3)
-
jocelyn fournier
-
Sergei Petrunia
-
Sergei Petrunia