[Commits] 38080be: MDEV-16428: Make optimistic parallel replication work
revision-id: 38080be88bf20e8956754d62322de1c98ab5d736 parent(s): ba295cda29daee3ffe58549542804efdfd969784 committer: Sergei Petrunia branch nick: rocksdb-10.2-r12-new-submodule timestamp: 2018-06-25 22:43:23 +0300 message: MDEV-16428: Make optimistic parallel replication work DRAFT PATCH, RocksDB part of it: Add a callback function that is used to inform the SQL layer that transaction X is waiting on a row lock that is held by transaction Y. Optimistic parallel slave uses it to determine that transaction Y has ran ahead of transaction X. If they need to be committed in the other order (first X, then Y), then Y will be rolled back to allow X to proceed (and Y will be retried later on). --- include/rocksdb/utilities/transaction_db.h | 7 +++++++ utilities/transactions/pessimistic_transaction_db.h | 4 ++++ utilities/transactions/transaction_lock_mgr.cc | 15 ++++++++++++++- utilities/transactions/transaction_lock_mgr.h | 3 +++ 4 files changed, 28 insertions(+), 1 deletion(-) diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index b5a33d1..1d5d3dd 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -156,6 +156,11 @@ struct DeadlockPath { bool empty() { return path.empty() && !limit_exceeded; } }; + +typedef void (*row_wait_callback_t)(rocksdb::TransactionID waiter, + rocksdb::TransactionID waitee); + + class TransactionDB : public StackableDB { public: // Open a TransactionDB similar to DB::Open(). @@ -217,6 +222,8 @@ class TransactionDB : public StackableDB { virtual std::vector<DeadlockPath> GetDeadlockInfoBuffer() = 0; virtual void SetDeadlockInfoBufferSize(uint32_t target_size) = 0; + virtual void SetRowWaitCallback(row_wait_callback_t callback) {} + protected: // To Create an TransactionDB, call Open() explicit TransactionDB(DB* db) : StackableDB(db) {} diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 4311e88..8a47cbd 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -113,6 +113,10 @@ class PessimisticTransactionDB : public TransactionDB { std::vector<DeadlockPath> GetDeadlockInfoBuffer() override; void SetDeadlockInfoBufferSize(uint32_t target_size) override; + virtual void SetRowWaitCallback(row_wait_callback_t callback) override { + lock_mgr_.SetRowWaitCallback(callback); + } + protected: DBImpl* db_impl_; std::shared_ptr<Logger> info_log_; diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index 9485067..e6593b0 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -173,7 +173,8 @@ TransactionLockMgr::TransactionLockMgr( max_num_locks_(max_num_locks), lock_maps_cache_(new ThreadLocalPtr(&UnrefLockMapsCache)), dlock_buffer_(max_num_deadlocks), - mutex_factory_(mutex_factory) { + mutex_factory_(mutex_factory), + row_wait_callback(NULL) { assert(txn_db); txn_db_impl_ = static_cast_with_check<PessimisticTransactionDB, TransactionDB>(txn_db); @@ -383,6 +384,18 @@ Status TransactionLockMgr::AcquireWithTimeout( } TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn"); + + //psergey: Call the MariaDB callback to inform it that + // transaction X is doing a row lock wait for a lock that + // is held by transaction Y. + // + // Transaction Y + if (row_wait_callback) + { + for (auto it= wait_ids.begin(); it != wait_ids.end(); it++) + row_wait_callback(txn->GetID(), *it); + } + if (cv_end_time < 0) { // Wait indefinitely result = stripe->stripe_cv->Wait(stripe->stripe_mutex); diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index abf7c5d..86a9904 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -84,6 +84,8 @@ class TransactionLockMgr { std::vector<DeadlockPath> GetDeadlockInfoBuffer(); void Resize(uint32_t); + void SetRowWaitCallback(row_wait_callback_t callback) { row_wait_callback= callback; } + private: PessimisticTransactionDB* txn_db_impl_; @@ -149,6 +151,7 @@ class TransactionLockMgr { void DecrementWaitersImpl(const PessimisticTransaction* txn, const autovector<TransactionID>& wait_ids); + row_wait_callback_t row_wait_callback; // No copying allowed TransactionLockMgr(const TransactionLockMgr&); void operator=(const TransactionLockMgr&);
participants (1)
-
psergey@askmonty.org