[Commits] 1c288c2: MDEV-16428: Concurrent DML on RocksDB tables makes optimistic parallel replication abort
revision-id: 1c288c266e0208e98d55e5348fa78c27517022c9 parent(s): a107c79fcdde80d1dea0a1caf5859647f77b48c9 committer: Sergei Petrunia branch nick: 10.2-r12-new-submodule timestamp: 2018-06-25 23:15:18 +0300 message: MDEV-16428: Concurrent DML on RocksDB tables makes optimistic parallel replication abort DRAFT: Make MyRocks call thd_rpl_deadlock_check(X, Y) whenever a transaction X is about to wait on a row that's locked by transaction Y. This allows the SQL layer to check if transaction Y has "run ahead" of transaction X and abort it if it is necessary. The patch includes adding a hook into RocksDB to detect the waits. --- .gitmodules | 2 +- storage/rocksdb/ha_rocksdb.cc | 132 ++++++++++++++++++++++++++++++++++++++++++ storage/rocksdb/rocksdb | 2 +- 3 files changed, 134 insertions(+), 2 deletions(-) diff --git a/.gitmodules b/.gitmodules index 6419657..2ef79a2 100644 --- a/.gitmodules +++ b/.gitmodules @@ -3,4 +3,4 @@ url = https://github.com/MariaDB/mariadb-connector-c [submodule "storage/rocksdb/rocksdb"] path = storage/rocksdb/rocksdb - url = https://github.com/facebook/rocksdb.git + url = https://github.com/spetrunia/rocksdb.git diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 96bc7da..bd2e0bf 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -53,6 +53,9 @@ #endif #include <mysys_err.h> +#include "my_sys.h" +#include "lf.h" + // Both MySQL and RocksDB define the same constant. To avoid compilation errors // till we make the fix in RocksDB, we'll temporary undefine it here. #undef CACHE_LINE_SIZE @@ -116,6 +119,101 @@ MYSQL_PLUGIN_IMPORT bool my_disable_leak_check; void ignore_db_dirs_append(const char *dirname_arg); +//////////////////////////////////////////////////////////////////////// + + +/* + A thread-safe, concurrent way to maintain a map from RocksDB's + TransactionIDs to MariaDB's THD*.
*/ + +class Transaction_id_map +{ + typedef rocksdb::TransactionID key_type; + typedef THD* value_type; + + typedef struct + { + key_type key; /* must stay the first member: init() registers the key at offset 0 */ + value_type val; + } ELEMENT; + + LF_HASH lf_hash; + +public: + void init(); + void cleanup(); + + /* + Before using this map, each thread should get its own "pins" and hand them back with put_pins() when it is done. + */ + LF_PINS* get_pins() { return lf_hash_get_pins(&lf_hash); } + void put_pins(LF_PINS *pins) { return lf_hash_put_pins(pins); } + + void put(LF_PINS *pins, key_type key, value_type value); + value_type get(LF_PINS *pins, key_type key); + void remove(LF_PINS *pins, key_type key); +}; + +void Transaction_id_map::init() +{ + lf_hash_init(&lf_hash, + sizeof(ELEMENT), + LF_HASH_UNIQUE, + 0 /* key offset */, + sizeof(key_type)/*key_len*/, NULL /*get_hash_key*/, + NULL /*charset*/); + + lf_hash.alloc.constructor= NULL; + lf_hash.alloc.destructor= NULL; + lf_hash.element_size= sizeof(ELEMENT); /* NOTE(review): probably redundant with the element size already passed to lf_hash_init() -- verify */ +} + + +void Transaction_id_map::cleanup() +{ + DBUG_ASSERT(lf_hash.count == 0); /* every registered transaction must have been removed (commit/rollback) before shutdown */ + lf_hash_destroy(&lf_hash); +} + + +void Transaction_id_map::put(LF_PINS *pins, key_type key, value_type value) +{ + ELEMENT new_elem; + new_elem.key= key; + new_elem.val= value; + + /* + The following call may return: + 0: OK + 1: Element already exists (should not happen) + -1: out-of-memory condition. + Don't do error handling as we don't have any way to do it; if the insert fails, get() returns NULL for this key.
*/ + lf_hash_insert(&lf_hash, pins, &new_elem); +} + + +void Transaction_id_map::remove(LF_PINS *pins, key_type key) +{ + lf_hash_delete(&lf_hash, pins, &key, sizeof(key_type)); +} + + +Transaction_id_map::value_type Transaction_id_map::get(LF_PINS *pins, key_type key) +{ + ELEMENT *ptr= (ELEMENT*)lf_hash_search(&lf_hash, pins, &key, sizeof(key_type)); /* NOTE(review): MariaDB's lf_hash_search() is documented to leave the found element pinned (to be released with lf_hash_search_unpin(pins)) and to return MY_ERRPTR on out-of-memory, which the check below would treat as a hit; neither case is handled here -- verify against include/lf.h. */ + + if (ptr) + return ptr->val; + + return NULL; /* key not registered */ +} + +Transaction_id_map trx_id_map; /* single global map; init()/cleanup() are called from rocksdb_init_func()/rocksdb_done_func() */ +/////////////////////////////////////////////////////////////////////////// + + namespace myrocks { static st_global_stats global_stats; @@ -2492,6 +2590,7 @@ class Rdb_transaction_impl : public Rdb_transaction { rocksdb::Transaction *m_rocksdb_reuse_tx = nullptr; public: + LF_PINS *lf_pins; // LF-hash pins to use with trx_id_map; taken in the constructor, returned in the destructor void set_lock_timeout(int timeout_sec_arg) override { if (m_rocksdb_tx) m_rocksdb_tx->SetLockTimeout(rdb_convert_sec_to_ms(m_timeout_sec)); @@ -2555,6 +2654,7 @@ class Rdb_transaction_impl : public Rdb_transaction { release_snapshot(); s = m_rocksdb_tx->Commit(); + trx_id_map.remove(lf_pins, m_rocksdb_tx->GetID()); /* unregistered before the status check, i.e. even if Commit() failed */ if (!s.ok()) { rdb_handle_io_error(s, RDB_IO_ERROR_TX_COMMIT); res = true; @@ -2589,6 +2689,7 @@ class Rdb_transaction_impl : public Rdb_transaction { /* This will also release all of the locks: */ m_rocksdb_tx->Rollback(); + trx_id_map.remove(lf_pins, m_rocksdb_tx->GetID()); /* Save the transaction object to be reused */ release_tx(); @@ -2743,6 +2844,8 @@ class Rdb_transaction_impl : public Rdb_transaction { rdb->BeginTransaction(write_opts, tx_opts, m_rocksdb_reuse_tx); m_rocksdb_reuse_tx = nullptr; + trx_id_map.put(lf_pins, m_rocksdb_tx->GetID(), m_thd); /* lets rdb_row_wait_callback() map this transaction's ID back to m_thd */ + m_read_opts = rocksdb::ReadOptions(); set_initial_savepoint(); @@ -2801,6 +2904,7 @@ class Rdb_transaction_impl : public Rdb_transaction { : Rdb_transaction(thd), m_rocksdb_tx(nullptr) { // Create a notifier that can be called when a snapshot gets generated.
m_notifier = std::make_shared<Rdb_snapshot_notifier>(this); + lf_pins= trx_id_map.get_pins(); } virtual ~Rdb_transaction_impl() { @@ -2813,6 +2917,7 @@ class Rdb_transaction_impl : public Rdb_transaction { // Free any transaction memory that is still hanging around. delete m_rocksdb_reuse_tx; + trx_id_map.put_pins(lf_pins); DBUG_ASSERT(m_rocksdb_tx == nullptr); } }; @@ -4131,6 +4236,28 @@ static int rocksdb_start_tx_and_assign_read_view( return HA_EXIT_SUCCESS; } + +/* There is no prototype of this function in any header */ +extern "C" int thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd); + + +/* Row-wait hook (row_wait_callback_t-compatible), registered with rdb->SetRowWaitCallback() in rocksdb_init_func(). Called when transaction 'waiter' is about to wait on a row locked by transaction 'waitee'; maps both IDs to THDs via trx_id_map and reports the wait to the SQL layer with thd_rpl_deadlock_check(). */ +void rdb_row_wait_callback(rocksdb::TransactionID waiter, + rocksdb::TransactionID waitee) +{ + Rdb_transaction *&tx = get_tx_from_thd(current_thd); /* NOTE(review): tx is dereferenced below without a NULL check, which assumes the callback always runs in the waiting connection's own thread -- confirm. */ + + if (!tx->is_writebatch_trx()) { /* the static_cast below assumes every non-write-batch tx is an Rdb_transaction_impl */ + const auto tx_impl = static_cast<const Rdb_transaction_impl *>(tx); + + THD *waiter_thd= trx_id_map.get(tx_impl->lf_pins, waiter); + THD *waitee_thd= trx_id_map.get(tx_impl->lf_pins, waitee); /* NOTE(review): NULL when the waitee is not registered (e.g. it already committed or rolled back); nothing here keeps the returned THD alive after the lookup -- confirm that thd_rpl_deadlock_check() accepts NULL and that the THD cannot be freed concurrently. */ + //DBUG_ASSERT(waiter_thd == tx->m_thd); + thd_rpl_deadlock_check(waiter_thd, waitee_thd); + } +} + + /* Dummy SAVEPOINT support. This is needed for long running transactions * like mysqldump (https://bugs.mysql.com/bug.php?id=71017).
* Current SAVEPOINT does not correctly handle ROLLBACK and does not return @@ -4663,6 +4790,10 @@ static int rocksdb_init_func(void *const p) { Rdb_sst_info::init(rdb); + trx_id_map.init(); + + rdb->SetRowWaitCallback(&rdb_row_wait_callback); /* hook presumably provided by the RocksDB submodule revision this patch switches to */ + /* Enable auto compaction, things needed for compaction filter are finished initializing @@ -4887,6 +5018,7 @@ static int rocksdb_done_func(void *const p) { delete rdb; rdb = nullptr; + trx_id_map.cleanup(); delete commit_latency_stats; commit_latency_stats = nullptr; diff --git a/storage/rocksdb/rocksdb b/storage/rocksdb/rocksdb index ba295cd..38080be 160000 --- a/storage/rocksdb/rocksdb +++ b/storage/rocksdb/rocksdb @@ -1 +1 @@ -Subproject commit ba295cda29daee3ffe58549542804efdfd969784 +Subproject commit 38080be88bf20e8956754d62322de1c98ab5d736
participants (1)
-
psergey@askmonty.org