Hi Sergei! Just wondering: why keep the toku_ prefix after renaming toku_* to toku_external_*, since it’s also used by other engines like RocksDB? BR, Jocelyn Fournier
Le 12 mars 2019 à 09:52, Sergei Petrunia <psergey@askmonty.org> a écrit :
revision-id: 190a29d06f4e79d2df4cb513944ac34bd133caa0 (v5.8-1024-g190a29d06) parent(s): 7a8c777d4880cde8fd96135f6cc216223371a9f4 author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2019-03-12 11:52:17 +0300 message:
Make Lock Tree's lock waits use externally provided mutexes/conditions
In MyRocks, this allows: - SHOW PROCESSLIST to show "waiting for row lock" for waiting threads - KILL statement to abort the lock wait in the same way as it does with point locking (without use of handlerton::kill_connection() method)
- Remove previously added TransactionDB::KillLockWait() method as it is no longer needed.
--- include/rocksdb/utilities/transaction_db.h | 1 - .../transactions/pessimistic_transaction_db.cc | 12 +++- .../transactions/pessimistic_transaction_db.h | 4 -- .../range_locking/locktree/lock_request.cc | 35 ++++++---- .../range_locking/locktree/lock_request.h | 9 ++- .../range_locking/locktree/locktree.cc | 12 ++-- .../transactions/range_locking/locktree/locktree.h | 13 ++-- .../transactions/range_locking/locktree/manager.cc | 17 +++-- .../portability/toku_external_pthread.h | 81 ++++++++++++++++++++++ utilities/transactions/transaction_lock_mgr.cc | 15 ++-- utilities/transactions/transaction_lock_mgr.h | 6 +- 11 files changed, 155 insertions(+), 50 deletions(-)
diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 2a98a8d3d..fb3c6a88c 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -291,7 +291,6 @@ class TransactionDB : public StackableDB { bool use_range_locking; virtual RangeLockMgrControl* get_range_lock_manager() { return nullptr; }
- virtual void KillLockWait(TransactionID txnid){}; protected: // To Create an TransactionDB, call Open() // The ownership of db is transferred to the base StackableDB diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index d44f329e5..308187e13 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -41,7 +41,11 @@ PessimisticTransactionDB::PessimisticTransactionDB( ? txn_db_options_.custom_mutex_factory : std::shared_ptr<TransactionDBMutexFactory>( new TransactionDBMutexFactoryImpl())), - range_lock_mgr_(this) { + range_lock_mgr_(this, + txn_db_options_.custom_mutex_factory? + txn_db_options_.custom_mutex_factory : + std::shared_ptr<TransactionDBMutexFactory>( + new TransactionDBMutexFactoryImpl())) { assert(db_impl_ != nullptr); info_log_ = db_impl_->GetDBOptions().info_log; } @@ -73,7 +77,11 @@ PessimisticTransactionDB::PessimisticTransactionDB( ? txn_db_options_.custom_mutex_factory : std::shared_ptr<TransactionDBMutexFactory>( new TransactionDBMutexFactoryImpl())), - range_lock_mgr_(this) { + range_lock_mgr_(this, + txn_db_options_.custom_mutex_factory + ? txn_db_options_.custom_mutex_factory + : std::shared_ptr<TransactionDBMutexFactory>( + new TransactionDBMutexFactoryImpl())) { assert(db_impl_ != nullptr); }
diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 07ed3fed2..388ed4099 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -127,10 +127,6 @@ class PessimisticTransactionDB : public TransactionDB { virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {} virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}
- void KillLockWait(TransactionID txnid) override { - if (use_range_locking) - range_lock_mgr_.KillLockWait(txnid); - } protected: DBImpl* db_impl_; std::shared_ptr<Logger> info_log_; diff --git a/utilities/transactions/range_locking/locktree/lock_request.cc b/utilities/transactions/range_locking/locktree/lock_request.cc index 0b96b2051..84cfdbf62 100644 --- a/utilities/transactions/range_locking/locktree/lock_request.cc +++ b/utilities/transactions/range_locking/locktree/lock_request.cc @@ -59,7 +59,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. namespace toku {
// initialize a lock request's internals -void lock_request::create(void) { +void lock_request::create(toku_external_mutex_factory_t mutex_factory) { m_txnid = TXNID_NONE; m_conflicting_txnid = TXNID_NONE; m_start_time = 0; @@ -75,7 +75,9 @@ void lock_request::create(void) { m_state = state::UNINITIALIZED; m_info = nullptr;
- toku_cond_init(*lock_request_m_wait_cond_key, &m_wait_cond, nullptr); + //psergey-todo: this condition is for interruptible wait + // note: moved to here from lock_request::create: + toku_external_cond_init(mutex_factory, &m_wait_cond);
m_start_test_callback = nullptr; m_start_before_pending_test_callback = nullptr; @@ -89,13 +91,14 @@ void lock_request::destroy(void) { m_state = state::DESTROYED; toku_destroy_dbt(&m_left_key_copy); toku_destroy_dbt(&m_right_key_copy); - toku_cond_destroy(&m_wait_cond); + toku_external_cond_destroy(&m_wait_cond); }
// set the lock request parameters. this API allows a lock request to be reused. void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn, void *extra) { invariant(m_state != state::PENDING); m_lt = lt; + m_txnid = txnid; m_left_key = left_key; m_right_key = right_key; @@ -190,13 +193,13 @@ int lock_request::start(void) { m_conflicting_txnid = conflicts.get(0); if (m_start_before_pending_test_callback) m_start_before_pending_test_callback(); - toku_mutex_lock(&m_info->mutex); + toku_external_mutex_lock(&m_info->mutex); insert_into_lock_requests(); if (deadlock_exists(conflicts)) { remove_from_lock_requests(); r = DB_LOCK_DEADLOCK; } - toku_mutex_unlock(&m_info->mutex); + toku_external_mutex_unlock(&m_info->mutex); if (m_start_test_callback) m_start_test_callback(); // test callback } @@ -221,7 +224,7 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil uint64_t t_start = t_now; uint64_t t_end = t_start + wait_time_ms * 1000;
- toku_mutex_lock(&m_info->mutex); + toku_external_mutex_lock(&m_info->mutex);
// check again, this time locking out other retry calls if (m_state == state::PENDING) { @@ -252,10 +255,14 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil if (t_wait > t_end) t_wait = t_end; } + /* + PORT: don't need to compute "time when the wait should end" anymore. struct timespec ts = {0, 0}; ts.tv_sec = t_wait / 1000000; ts.tv_nsec = (t_wait % 1000000) * 1000; - int r = toku_cond_timedwait(&m_wait_cond, &m_info->mutex, &ts); + */ + int r = toku_external_cond_timedwait(&m_wait_cond, &m_info->mutex, + int32_t(wait_time_ms)*1000); invariant(r == 0 || r == ETIMEDOUT);
t_now = toku_current_time_microsec(); @@ -279,7 +286,7 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil m_info->counters.long_wait_count += 1; m_info->counters.long_wait_time += duration; } - toku_mutex_unlock(&m_info->mutex); + toku_external_mutex_unlock(&m_info->mutex);
invariant(m_state == state::COMPLETE); return m_complete_r; @@ -332,7 +339,7 @@ int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) { complete(r); if (m_retry_test_callback) m_retry_test_callback(); // test callback - toku_cond_broadcast(&m_wait_cond); + toku_external_cond_broadcast(&m_wait_cond); } else { m_conflicting_txnid = conflicts.get(0); add_conflicts_to_waits(&conflicts, conflicts_collector); @@ -393,7 +400,7 @@ void lock_request::retry_all_lock_requests( }
void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info, GrowableArray<TXNID> *collector) { - toku_mutex_lock(&info->mutex); + toku_external_mutex_lock(&info->mutex); // retry all of the pending lock requests. for (size_t i = 0; i < info->pending_lock_requests.size();) { lock_request *request; @@ -412,7 +419,7 @@ void lock_request::retry_all_lock_requests_info(lt_lock_request_info *info, Grow
// future threads should only retry lock requests if some still exist info->should_retry_lock_requests = info->pending_lock_requests.size() > 0; - toku_mutex_unlock(&info->mutex); + toku_external_mutex_unlock(&info->mutex); }
void lock_request::add_conflicts_to_waits(txnid_set *conflicts, @@ -444,12 +451,12 @@ void *lock_request::get_extra(void) const { void lock_request::kill_waiter(void) { remove_from_lock_requests(); complete(DB_LOCK_NOTGRANTED); - toku_cond_broadcast(&m_wait_cond); + toku_external_cond_broadcast(&m_wait_cond); }
void lock_request::kill_waiter(locktree *lt, void *extra) { lt_lock_request_info *info = lt->get_lock_request_info(); - toku_mutex_lock(&info->mutex); + toku_external_mutex_lock(&info->mutex); for (size_t i = 0; i < info->pending_lock_requests.size(); i++) { lock_request *request; int r = info->pending_lock_requests.fetch(i, &request); @@ -458,7 +465,7 @@ void lock_request::kill_waiter(locktree *lt, void *extra) { break; } } - toku_mutex_unlock(&info->mutex); + toku_external_mutex_unlock(&info->mutex); }
// find another lock request by txnid. must hold the mutex. diff --git a/utilities/transactions/range_locking/locktree/lock_request.h b/utilities/transactions/range_locking/locktree/lock_request.h index 8b9529344..b17b639c8 100644 --- a/utilities/transactions/range_locking/locktree/lock_request.h +++ b/utilities/transactions/range_locking/locktree/lock_request.h @@ -84,7 +84,7 @@ public: };
// effect: Initializes a lock request. - void create(void); + void create(toku_external_mutex_factory_t mutex_factory);
// effect: Destroys a lock request. void destroy(void); @@ -171,7 +171,7 @@ public: int m_complete_r; state m_state;
- toku_cond_t m_wait_cond; + toku_external_cond_t m_wait_cond;
bool m_big_txn;
@@ -227,6 +227,9 @@ public:
friend class lock_request_unit_test; }; -ENSURE_POD(lock_request); +// PORT: lock_request is not a POD anymore due to use of toku_external_cond_t +// This is ok as the PODness is not really required: lock_request objects are +// not moved in memory or anything. +//ENSURE_POD(lock_request);
} /* namespace toku */ diff --git a/utilities/transactions/range_locking/locktree/locktree.cc b/utilities/transactions/range_locking/locktree/locktree.cc index 4ecfb2d26..9b530c7b0 100644 --- a/utilities/transactions/range_locking/locktree/locktree.cc +++ b/utilities/transactions/range_locking/locktree/locktree.cc @@ -74,7 +74,9 @@ namespace toku { // but does nothing based on the value of the reference count - it is // up to the user of the locktree to destroy it when it sees fit.
-void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp) { +void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, + const comparator &cmp, + toku_external_mutex_factory_t mutex_factory) { m_mgr = mgr; m_dict_id = dict_id;
@@ -91,14 +93,14 @@ void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const compar m_sto_end_early_count = 0; m_sto_end_early_time = 0;
- m_lock_request_info.init(); + m_lock_request_info.init(mutex_factory); }
-void lt_lock_request_info::init(void) { +void lt_lock_request_info::init(toku_external_mutex_factory_t mutex_factory) { pending_lock_requests.create(); pending_is_empty = true; ZERO_STRUCT(mutex); - toku_mutex_init(*locktree_request_info_mutex_key, &mutex, nullptr); + toku_external_mutex_init(mutex_factory, &mutex); retry_want = retry_done = 0; ZERO_STRUCT(counters); ZERO_STRUCT(retry_mutex); @@ -124,7 +126,7 @@ void locktree::destroy(void) {
void lt_lock_request_info::destroy(void) { pending_lock_requests.destroy(); - toku_mutex_destroy(&mutex); + toku_external_mutex_destroy(&mutex); toku_mutex_destroy(&retry_mutex); toku_cond_destroy(&retry_cv); } diff --git a/utilities/transactions/range_locking/locktree/locktree.h b/utilities/transactions/range_locking/locktree/locktree.h index 4f0530855..5ff4f7449 100644 --- a/utilities/transactions/range_locking/locktree/locktree.h +++ b/utilities/transactions/range_locking/locktree/locktree.h @@ -55,6 +55,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
#include <db.h> #include <toku_pthread.h> +#include <toku_external_pthread.h> #include <toku_time.h>
#include <ft/comparator.h> @@ -102,7 +103,7 @@ namespace toku { struct lt_lock_request_info { omt<lock_request *> pending_lock_requests; std::atomic_bool pending_is_empty; - toku_mutex_t mutex; + toku_external_mutex_t mutex; bool should_retry_lock_requests; lt_counters counters; std::atomic_ullong retry_want; @@ -111,7 +112,7 @@ namespace toku { toku_cond_t retry_cv; bool running_retry;
- void init(void); + void init(toku_external_mutex_factory_t mutex_factory); void destroy(void); };
@@ -127,7 +128,8 @@ namespace toku { void create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, - void *extra); + void *extra, + toku_external_mutex_factory_t mutex_factory_arg);
void destroy(void);
@@ -212,6 +214,8 @@ namespace toku {
omt<locktree *> m_locktree_map;
+ toku_external_mutex_factory_t mutex_factory; + // the manager's mutex protects the locktree map toku_mutex_t m_mutex;
@@ -275,7 +279,8 @@ namespace toku { class locktree { public: // effect: Creates a locktree - void create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp); + void create(locktree_manager *mgr, DICTIONARY_ID dict_id, const comparator &cmp, + toku_external_mutex_factory_t mutex_factory);
void destroy(void);
diff --git a/utilities/transactions/range_locking/locktree/manager.cc b/utilities/transactions/range_locking/locktree/manager.cc index c31158bb8..47a782565 100644 --- a/utilities/transactions/range_locking/locktree/manager.cc +++ b/utilities/transactions/range_locking/locktree/manager.cc @@ -60,7 +60,12 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
namespace toku {
-void locktree_manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *escalate_extra) { +void locktree_manager::create(lt_create_cb create_cb, + lt_destroy_cb destroy_cb, + lt_escalate_cb escalate_cb, + void *escalate_extra, + toku_external_mutex_factory_t mutex_factory_arg) { + mutex_factory= mutex_factory_arg; m_max_lock_memory = DEFAULT_MAX_LOCK_MEMORY; m_current_lock_memory = 0;
@@ -151,7 +156,7 @@ locktree *locktree_manager::get_lt(DICTIONARY_ID dict_id, locktree *lt = locktree_map_find(dict_id); if (lt == nullptr) { XCALLOC(lt); - lt->create(this, dict_id, cmp); + lt->create(this, dict_id, cmp, mutex_factory);
// new locktree created - call the on_create callback // and put it in the locktree map @@ -320,7 +325,7 @@ int locktree_manager::iterate_pending_lock_requests(lock_request_iterate_callbac invariant_zero(r);
struct lt_lock_request_info *info = lt->get_lock_request_info(); - toku_mutex_lock(&info->mutex); + toku_external_mutex_lock(&info->mutex);
size_t num_requests = info->pending_lock_requests.size(); for (size_t k = 0; k < num_requests && r == 0; k++) { @@ -332,7 +337,7 @@ int locktree_manager::iterate_pending_lock_requests(lock_request_iterate_callbac req->get_conflicting_txnid(), req->get_start_time(), extra); }
- toku_mutex_unlock(&info->mutex); + toku_external_mutex_unlock(&info->mutex); } mutex_unlock(); return r; @@ -472,10 +477,10 @@ void locktree_manager::get_status(LTM_STATUS statp) { locktree *lt; int r = m_locktree_map.fetch(i, &lt); invariant_zero(r); - if (toku_mutex_trylock(&lt->m_lock_request_info.mutex) == 0) { + if (toku_external_mutex_trylock(&lt->m_lock_request_info.mutex) == 0) { lock_requests_pending += lt->m_lock_request_info.pending_lock_requests.size(); lt_counters.add(lt->get_lock_request_info()->counters); - toku_mutex_unlock(&lt->m_lock_request_info.mutex); + toku_external_mutex_unlock(&lt->m_lock_request_info.mutex); } sto_num_eligible += lt->sto_txnid_is_valid_unsafe() ? 1 : 0; sto_end_early_count += lt->m_sto_end_early_count; diff --git a/utilities/transactions/range_locking/portability/toku_external_pthread.h b/utilities/transactions/range_locking/portability/toku_external_pthread.h new file mode 100644 index 000000000..2482ec507 --- /dev/null +++ b/utilities/transactions/range_locking/portability/toku_external_pthread.h @@ -0,0 +1,81 @@ +/* + A wrapper around rocksdb::TransactionDBMutexFactory-provided condition and mutex + that provides toku_pthread_*-like interface. The functions are named + + toku_external_{mutex|cond}_XXX + + Lock Tree uses this mutex and condition for interruptible (long) lock waits. 
+ + (It also still uses toku_pthread_XXX calls for mutexes/conditions for + shorter waits on internal objects) +*/ + +#pragma once + +#include <pthread.h> +#include <time.h> +#include <stdint.h> + +#include "toku_portability.h" + +#include "rocksdb/utilities/transaction_db.h" +#include "rocksdb/utilities/transaction_db_mutex.h" + +using rocksdb::TransactionDBMutex; +using rocksdb::TransactionDBCondVar; + +typedef std::shared_ptr<rocksdb::TransactionDBMutexFactory> toku_external_mutex_factory_t; + +typedef std::shared_ptr<TransactionDBMutex> toku_external_mutex_t; +typedef std::shared_ptr<TransactionDBCondVar> toku_external_cond_t; + +static inline +void toku_external_cond_init(toku_external_mutex_factory_t mutex_factory, + toku_external_cond_t *cond) { + *cond= mutex_factory->AllocateCondVar(); +} + +inline void toku_external_cond_destroy(toku_external_cond_t *cond) { + cond->reset(); // this will destroy the managed object +} + +inline void toku_external_cond_signal(toku_external_cond_t *cond) { + (*cond)->Notify(); +} + +inline void toku_external_cond_broadcast(toku_external_cond_t *cond) { + (*cond)->NotifyAll(); +} + +inline int toku_external_cond_timedwait(toku_external_cond_t *cond, + toku_external_mutex_t *mutex, + int32_t timeout_microsec) { + auto res= (*cond)->WaitFor(*mutex, timeout_microsec); + if (res.ok()) + return 0; + else + return ETIMEDOUT; +} + +inline void toku_external_mutex_init(toku_external_mutex_factory_t factory, + toku_external_mutex_t *mutex) { + *mutex = factory->AllocateMutex(); +} + +inline void toku_external_mutex_lock(toku_external_mutex_t *mutex) { + (*mutex)->Lock(); +} + +inline int toku_external_mutex_trylock(toku_external_mutex_t *mutex) { + (*mutex)->Lock(); + return 0; +} + +inline void toku_external_mutex_unlock(toku_external_mutex_t *mutex) { + (*mutex)->UnLock(); +} + +inline void toku_external_mutex_destroy(toku_external_mutex_t *mutex) { + mutex->reset(); // this will destroy the managed object +} + diff --git 
a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 56f3295fd..79164a962 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -790,11 +790,6 @@ public: bool releasing_locks_; };
- -void RangeLockMgr::KillLockWait(TransactionID txnid) { - ltm.kill_waiter((void*)txnid); -} - // Get a range lock on [start_key; end_key] range Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn, uint32_t column_family_id, @@ -802,7 +797,7 @@ Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn, const rocksdb::Slice &end_key, bool exclusive) { toku::lock_request request; - request.create(); + request.create(mutex_factory_); DBT start_key_dbt, end_key_dbt;
toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size()); @@ -810,6 +805,8 @@ Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn,
// Use the txnid as "extra" in the lock_request. Then, KillLockWait() // will be able to use kill_waiter(txn_id) to kill the wait if needed + // TODO: KillLockWait is gone and we are no longer using + // locktree::kill_waiter call. Do we need this anymore? TransactionID wait_txn_id = txn->GetID();
request.set(lt, (TXNID)txn, &start_key_dbt, &end_key_dbt, @@ -1004,8 +1001,10 @@ int RangeLockMgr::compare_dbt_endpoints(__toku_db*, void *arg, }
-RangeLockMgr::RangeLockMgr(TransactionDB* txn_db) : my_txn_db(txn_db) { - ltm.create(on_create, on_destroy, on_escalate, NULL); +RangeLockMgr::RangeLockMgr(TransactionDB* txn_db, + std::shared_ptr<TransactionDBMutexFactory> mutex_factory) : + my_txn_db(txn_db), mutex_factory_(mutex_factory) { + ltm.create(on_create, on_destroy, on_escalate, NULL, mutex_factory_); lt= nullptr; cmp_initialized_= false; } diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index 0c3284519..9f73e5a07 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -215,11 +215,10 @@ class RangeLockMgr : void UnLock(PessimisticTransaction* txn, uint32_t column_family_id, const std::string& key, Env* env) override ;
- RangeLockMgr(TransactionDB* txn_db); + RangeLockMgr(TransactionDB* txn_db, + std::shared_ptr<TransactionDBMutexFactory> mutex_factory); ~RangeLockMgr();
- void KillLockWait(TransactionID txnid); - int set_max_lock_memory(size_t max_lock_memory) override { return ltm.set_max_lock_memory(max_lock_memory); @@ -251,6 +250,7 @@ class RangeLockMgr : compare_endpoints_func compare_endpoints;
TransactionDB* my_txn_db; + std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
static int compare_dbt_endpoints(__toku_db*, void *arg, const DBT *a_key, const DBT *b_key);
_______________________________________________ commits mailing list commits@mariadb.org https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits