[Commits] b8dd21ef7: Range Locking - implementation based on the locktree library
revision-id: b8dd21ef71dd9a9d7215b32f44f57984785c0c46 (v5.8-3174-gb8dd21ef7) parent(s): 8ff6557e7f096ed45de296a5073374958fbfaa55 author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2020-12-10 14:18:31 +0300 message: Range Locking - implementation based on the locktree library - Add a RangeTreeLockManager and RangeTreeLockTracker which implement range locking using the locktree library. - Point locks are handled as locks on single-point ranges. - Add a unit test: range_locking_test --- CMakeLists.txt | 3 + Makefile | 4 + TARGETS | 7 + src.mk | 3 + .../transactions/lock/range/range_locking_test.cc | 273 ++++++++++ .../range/range_tree/range_tree_lock_manager.cc | 581 +++++++++++++++++++++ .../range/range_tree/range_tree_lock_manager.h | 128 +++++ .../range/range_tree/range_tree_lock_tracker.cc | 65 +++ .../range/range_tree/range_tree_lock_tracker.h | 148 ++++++ utilities/transactions/transaction_base.h | 6 + 10 files changed, 1218 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index ed717c2b4..03be79ddf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -814,6 +814,8 @@ set(SOURCES utilities/transactions/lock/lock_manager.cc utilities/transactions/lock/point/point_lock_tracker.cc utilities/transactions/lock/point/point_lock_manager.cc + utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc + utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc utilities/transactions/optimistic_transaction_db_impl.cc utilities/transactions/optimistic_transaction.cc utilities/transactions/pessimistic_transaction.cc @@ -1224,6 +1226,7 @@ if(WITH_TESTS) utilities/transactions/lock/point/point_lock_manager_test.cc utilities/transactions/write_prepared_transaction_test.cc utilities/transactions/write_unprepared_transaction_test.cc + utilities/transactions/lock/range/range_locking_test.cc utilities/ttl/ttl_test.cc utilities/write_batch_with_index/write_batch_with_index_test.cc ) diff --git a/Makefile b/Makefile index 6e230a08e..129ecd358 100644 --- a/Makefile +++ b/Makefile @@ -558,6 +558,7 @@ PARALLEL_TEST = \ table_test \ transaction_test \ point_lock_manager_test \ + range_locking_test \ write_prepared_transaction_test \ write_unprepared_transaction_test \ @@ -1926,6 +1927,9 @@ blob_db_test: $(OBJ_DIR)/utilities/blob_db/blob_db_test.o $(TEST_LIBRARY) $(LIBR repeatable_thread_test: $(OBJ_DIR)/util/repeatable_thread_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +range_locking_test: utilities/transactions/lock/range/range_locking_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + range_tombstone_fragmenter_test: $(OBJ_DIR)/db/range_tombstone_fragmenter_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 66e45cd32..4e15589a5 100644 --- a/TARGETS +++ b/TARGETS @@ -1800,6 +1800,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "range_locking_test", + "utilities/transactions/lock/range/range_locking_test.cc", + "parallel", + [], + [], + ], [ "range_tombstone_fragmenter_test", "db/range_tombstone_fragmenter_test.cc", diff --git a/src.mk b/src.mk index 2f8077d5b..2bb45f3eb 100644 --- a/src.mk +++ b/src.mk @@ -267,6 +267,8 @@ LIB_SOURCES = \ utilities/transactions/lock/range/range_tree/lib/standalone_port.cc \ utilities/transactions/lock/range/range_tree/lib/util/dbt.cc \ utilities/transactions/lock/range/range_tree/lib/util/memarena.cc \ + utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc \ + utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc \ utilities/transactions/optimistic_transaction.cc \ 
utilities/transactions/optimistic_transaction_db_impl.cc \ utilities/transactions/pessimistic_transaction.cc \ @@ -536,6 +538,7 @@ TEST_MAIN_SOURCES = \ utilities/simulator_cache/sim_cache_test.cc \ utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \ utilities/transactions/optimistic_transaction_test.cc \ + utilities/transactions/lock/range/range_locking_test.cc \ utilities/transactions/transaction_test.cc \ utilities/transactions/lock/point/point_lock_manager_test.cc \ utilities/transactions/write_prepared_transaction_test.cc \ diff --git a/utilities/transactions/lock/range/range_locking_test.cc b/utilities/transactions/lock/range/range_locking_test.cc new file mode 100644 index 000000000..b5051dbee --- /dev/null +++ b/utilities/transactions/lock/range/range_locking_test.cc @@ -0,0 +1,273 @@ +#ifndef ROCKSDB_LITE +#ifndef OS_WIN + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include <algorithm> +#include <functional> +#include <string> +#include <thread> + +#include "db/db_impl/db_impl.h" +#include "port/port.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/perf_context.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" +#include "utilities/transactions/lock/point/point_lock_manager_test.h" +#include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/transaction_test.h" + +using std::string; + +namespace ROCKSDB_NAMESPACE { + +class RangeLockingTest : public ::testing::Test { + public: + TransactionDB* db; + std::string dbname; + Options options; + + std::shared_ptr<RangeLockManagerHandle> range_lock_mgr; + TransactionDBOptions txn_db_options; + + RangeLockingTest() : db(nullptr) { + options.create_if_missing = true; + dbname = test::PerThreadDBPath("range_locking_testdb"); + + DestroyDB(dbname, options); + Status s; + + range_lock_mgr.reset(NewRangeLockManager(nullptr)); + txn_db_options.lock_mgr_handle = range_lock_mgr; + + s = TransactionDB::Open(options, txn_db_options, dbname, &db); + assert(s.ok()); + } + + ~RangeLockingTest() { + delete db; + db = nullptr; + // This is to skip the assert statement in FaultInjectionTestEnv. There + // seems to be a bug in btrfs that the makes readdir return recently + // unlink-ed files. By using the default fs we simply ignore errors resulted + // from attempting to delete such files in DestroyDB. + DestroyDB(dbname, options); + } + + PessimisticTransaction* NewTxn( + TransactionOptions txn_opt = TransactionOptions()) { + Transaction* txn = db->BeginTransaction(WriteOptions(), txn_opt); + return reinterpret_cast<PessimisticTransaction*>(txn); + } +}; + +// TODO: set a smaller lock wait timeout so that the test runs faster. 
+TEST_F(RangeLockingTest, BasicRangeLocking) { + WriteOptions write_options; + TransactionOptions txn_options; + std::string value; + ReadOptions read_options; + auto cf = db->DefaultColumnFamily(); + + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + + // Get a range lock + { + auto s = txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")); + ASSERT_EQ(s, Status::OK()); + } + + // Check that range Lock inhibits an overlapping range lock + { + auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z")); + ASSERT_TRUE(s.IsTimedOut()); + } + + // Check that range Lock inhibits an overlapping point lock + { + auto s = txn1->GetForUpdate(read_options, cf, Slice("b"), &value); + ASSERT_TRUE(s.IsTimedOut()); + } + + // Get a point lock, check that it inhibits range locks + { + auto s = txn0->Put(cf, Slice("n"), Slice("value")); + ASSERT_EQ(s, Status::OK()); + + auto s2 = txn1->GetRangeLock(cf, Endpoint("m"), Endpoint("p")); + ASSERT_TRUE(s2.IsTimedOut()); + } + + ASSERT_OK(txn0->Commit()); + txn1->Rollback(); + + delete txn0; + delete txn1; +} + +TEST_F(RangeLockingTest, MyRocksLikeUpdate) { + WriteOptions write_options; + TransactionOptions txn_options; + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + auto cf = db->DefaultColumnFamily(); + Status s; + + // Get a range lock for the range we are about to update + s = txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")); + ASSERT_EQ(s, Status::OK()); + + bool try_range_lock_called = false; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "RangeTreeLockManager::TryRangeLock:enter", + [&](void* /*arg*/) { try_range_lock_called = true; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // For performance reasons, the following must NOT call lock_mgr->TryLock(): + // We verify that by checking the value of try_range_lock_called. + s = txn0->Put(cf, Slice("b"), Slice("value"), /*assume_tracked=*/true); + ASSERT_EQ(s, Status::OK()); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ASSERT_EQ(try_range_lock_called, false); + + txn0->Rollback(); + + delete txn0; +} + +TEST_F(RangeLockingTest, SnapshotValidation) { + Status s; + Slice key_slice = Slice("k"); + ColumnFamilyHandle* cfh = db->DefaultColumnFamily(); + + auto txn0 = NewTxn(); + txn0->Put(key_slice, Slice("initial")); + txn0->Commit(); + + // txn1 + auto txn1 = NewTxn(); + txn1->SetSnapshot(); + std::string val1; + s = txn1->Get(ReadOptions(), cfh, key_slice, &val1); + ASSERT_TRUE(s.ok()); + val1 = val1 + std::string("-txn1"); + + s = txn1->Put(cfh, key_slice, Slice(val1)); + ASSERT_TRUE(s.ok()); + + // txn2 + auto txn2 = NewTxn(); + txn2->SetSnapshot(); + std::string val2; + // This will see the original value as nothing is committed + // This is also Get, so it is doesn't acquire any locks. 
+ s = txn2->Get(ReadOptions(), cfh, key_slice, &val2); + ASSERT_TRUE(s.ok()); + + // txn1 + s = txn1->Commit(); + ASSERT_TRUE(s.ok()); + + // txn2 + val2 = val2 + std::string("-txn2"); + // Now, this call should do Snapshot Validation and fail: + s = txn2->Put(cfh, key_slice, Slice(val2)); + ASSERT_TRUE(s.IsBusy()); + + s = txn2->Commit(); + ASSERT_TRUE(s.ok()); + + /* + // Not meaningful if s.IsBusy() above is true: + // Examine the result + auto txn3= NewTxn(); + std::string val3; + Status s3 = txn3->Get(ReadOptions(), cfh, key_slice, &val3); + fprintf(stderr, "Final: %s\n", val3.c_str()); + */ + delete txn0; + delete txn1; + delete txn2; +} + +TEST_F(RangeLockingTest, MultipleTrxLockStatusData) { + WriteOptions write_options; + TransactionOptions txn_options; + auto cf = db->DefaultColumnFamily(); + Status s; + + Transaction* txn0 = db->BeginTransaction(write_options, txn_options); + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + + // Get a range lock + s = txn0->GetRangeLock(cf, Endpoint("z"), Endpoint("z")); + ASSERT_EQ(s, Status::OK()); + + s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("e")); + ASSERT_EQ(s, Status::OK()); + + auto s2 = range_lock_mgr->GetRangeLockStatusData(); + + ASSERT_EQ(s2.size(), 2); + delete txn0; + delete txn1; +} + +void PointLockManagerTestExternalSetup(PointLockManagerTest* self) { + self->env_ = Env::Default(); + self->db_dir_ = test::PerThreadDBPath("point_lock_manager_test"); + ASSERT_OK(self->env_->CreateDir(self->db_dir_)); + + Options opt; + opt.create_if_missing = true; + TransactionDBOptions txn_opt; + txn_opt.transaction_lock_timeout = 0; + + auto mutex_factory = std::make_shared<TransactionDBMutexFactoryImpl>(); + self->locker_.reset(NewRangeLockManager(mutex_factory)->getLockManager()); + std::shared_ptr<RangeLockManagerHandle> range_lock_mgr = + std::dynamic_pointer_cast<RangeLockManagerHandle>(self->locker_); + txn_opt.lock_mgr_handle = range_lock_mgr; + + ASSERT_OK(TransactionDB::Open(opt, txn_opt, self->db_dir_, &self->db_)); + self->wait_sync_point_name_ = "RangeTreeLockManager::TryRangeLock:WaitingTxn"; +} + +INSTANTIATE_TEST_CASE_P(RangeLockManager, AnyLockManagerTest, + ::testing::Values(PointLockManagerTestExternalSetup)); + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else // OS_WIN + +#include <stdio.h> +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "skipped as Range Locking is not supported on Windows\n"); + return 0; +} + +#endif // OS_WIN + +#else +#include <stdio.h> + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, + "skipped as transactions are not supported in rocksdb_lite\n"); + return 0; +} + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc new file mode 100644 index 000000000..32c678688 --- /dev/null +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc @@ -0,0 +1,581 @@ +#ifndef ROCKSDB_LITE +#ifndef OS_WIN + +#include "utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h" + +#include <algorithm> +#include <cinttypes> +#include <mutex> + +#include "monitoring/perf_context_imp.h" +#include "range_tree_lock_tracker.h" +#include "rocksdb/slice.h" +#include "rocksdb/utilities/transaction_db_mutex.h" +#include "test_util/sync_point.h" +#include 
"util/cast_util.h" +#include "util/hash.h" +#include "util/thread_local.h" +#include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/transaction_db_mutex_impl.h" + +namespace ROCKSDB_NAMESPACE { + +///////////////////////////////////////////////////////////////////////////// +// RangeTreeLockManager - A Range Lock Manager that uses PerconaFT's +// locktree library +///////////////////////////////////////////////////////////////////////////// + +RangeLockManagerHandle* NewRangeLockManager( + std::shared_ptr<TransactionDBMutexFactory> mutex_factory) { + std::shared_ptr<TransactionDBMutexFactory> use_factory; + + if (mutex_factory) + use_factory = mutex_factory; + else + use_factory.reset(new TransactionDBMutexFactoryImpl()); + + return new RangeTreeLockManager(use_factory); +} + +static const char SUFFIX_INFIMUM = 0x0; +static const char SUFFIX_SUPREMUM = 0x1; + +void serialize_endpoint(const Endpoint& endp, std::string* buf) { + buf->push_back(endp.inf_suffix ? SUFFIX_SUPREMUM : SUFFIX_INFIMUM); + buf->append(endp.slice.data(), endp.slice.size()); +} + +void deserialize_endpoint(const DBT* dbt, EndpointWithString* endp) { + assert(dbt->size >= 1); + const char* dbt_data = (const char*)dbt->data; + char suffix = dbt_data[0]; + assert(suffix == SUFFIX_INFIMUM || suffix == SUFFIX_SUPREMUM); + endp->inf_suffix = (suffix == SUFFIX_SUPREMUM); + endp->slice.assign(dbt_data + 1, dbt->size - 1); +} + +// Get a range lock on [start_key; end_key] range +Status RangeTreeLockManager::TryLock(PessimisticTransaction* txn, + uint32_t column_family_id, + const Endpoint& start_endp, + const Endpoint& end_endp, Env*, + bool exclusive) { + toku::lock_request request; + request.create(mutex_factory_); + DBT start_key_dbt, end_key_dbt; + + TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:enter"); + std::string start_key; + std::string end_key; + serialize_endpoint(start_endp, &start_key); + serialize_endpoint(end_endp, &end_key); + + toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size()); + toku_fill_dbt(&end_key_dbt, end_key.data(), end_key.size()); + + // Use the txnid as "extra" in the lock_request. Then, KillLockWait() + // will be able to use kill_waiter(txn_id) to kill the wait if needed + // TODO: KillLockWait is gone and we are no longer using + // locktree::kill_waiter call. Do we need this anymore? + TransactionID wait_txn_id = txn->GetID(); + + auto lt = get_locktree_by_cfid(column_family_id); + + request.set(lt, (TXNID)txn, &start_key_dbt, &end_key_dbt, + exclusive ? toku::lock_request::WRITE : toku::lock_request::READ, + false /* not a big txn */, (void*)wait_txn_id); + + // This is for "periodically wake up and check if the wait is killed" feature + // which we are not using. + uint64_t killed_time_msec = 0; + uint64_t wait_time_msec = txn->GetLockTimeout(); + + // convert microseconds to milliseconds + if (wait_time_msec != (uint64_t)-1) + wait_time_msec = (wait_time_msec + 500) / 1000; + + std::vector<RangeDeadlockInfo> di_path; + request.m_deadlock_cb = [&](TXNID txnid, bool is_exclusive, + const DBT* start_dbt, const DBT* end_dbt) { + EndpointWithString start; + EndpointWithString end; + deserialize_endpoint(start_dbt, &start); + deserialize_endpoint(end_dbt, &end); + + di_path.push_back({((PessimisticTransaction*)txnid)->GetID(), + column_family_id, is_exclusive, std::move(start), + std::move(end)}); + }; + + request.start(); + + /* + If we are going to wait on the lock, we should set appropriate status in + the 'txn' object. 
This is done by the SetWaitingTxn() call below. + The API we are using are MariaDB's wait notification API, so the way this + is done is a bit convoluted. + In MyRocks, the wait details are visible in I_S.rocksdb_trx. + */ + std::string key_str(start_key.data(), start_key.size()); + struct st_wait_info { + PessimisticTransaction* txn; + uint32_t column_family_id; + std::string* key_ptr; + autovector<TransactionID> wait_ids; + bool done = false; + + static void lock_wait_callback(void* cdata, TXNID /*waiter*/, + TXNID waitee) { + TEST_SYNC_POINT("RangeTreeLockManager::TryRangeLock:WaitingTxn"); + auto self = (struct st_wait_info*)cdata; + // we know that the waiter is self->txn->GetID() (TODO: assert?) + if (!self->done) { + self->wait_ids.push_back(waitee); + self->txn->SetWaitingTxn(self->wait_ids, self->column_family_id, + self->key_ptr); + self->done = true; + } + } + } wait_info; + + wait_info.txn = txn; + wait_info.column_family_id = column_family_id; + wait_info.key_ptr = &key_str; + wait_info.done = false; + + const int r = + request.wait(wait_time_msec, killed_time_msec, + nullptr, // killed_callback + st_wait_info::lock_wait_callback, (void*)&wait_info); + + // Inform the txn that we are no longer waiting: + txn->ClearWaitingTxn(); + + request.destroy(); + switch (r) { + case 0: + break; /* fall through */ + case DB_LOCK_NOTGRANTED: + return Status::TimedOut(Status::SubCode::kLockTimeout); + case TOKUDB_OUT_OF_LOCKS: + return Status::Busy(Status::SubCode::kLockLimit); + case DB_LOCK_DEADLOCK: { + std::reverse(di_path.begin(), di_path.end()); + dlock_buffer_.AddNewPath( + RangeDeadlockPath(di_path, request.get_start_time())); + return Status::Busy(Status::SubCode::kDeadlock); + } + default: + assert(0); + return Status::Busy(Status::SubCode::kLockLimit); + } + + // Don't: save the acquired lock in txn->owned_locks. + // It is now responsibility of RangeTreeLockManager + // RangeLockList* range_list= + // ((RangeTreeLockTracker*)txn->tracked_locks_.get())->getOrCreateList(); + // range_list->append(column_family_id, &start_key_dbt, &end_key_dbt); + + return Status::OK(); +} + +static void range_lock_mgr_release_lock_int(toku::locktree* lt, + const PessimisticTransaction* txn, + uint32_t /*column_family_id*/, + const std::string& key) { + DBT key_dbt; + Endpoint endp(key.data(), key.size(), false); + std::string endp_image; + serialize_endpoint(endp, &endp_image); + + toku_fill_dbt(&key_dbt, endp_image.data(), endp_image.size()); + toku::range_buffer range_buf; + range_buf.create(); + range_buf.append(&key_dbt, &key_dbt); + lt->release_locks((TXNID)txn, &range_buf); + range_buf.destroy(); +} + +void RangeTreeLockManager::UnLock(PessimisticTransaction* txn, + ColumnFamilyId column_family_id, + const std::string& key, Env*) { + auto lt = get_locktree_by_cfid(column_family_id); + range_lock_mgr_release_lock_int(lt, txn, column_family_id, key); + toku::lock_request::retry_all_lock_requests( + lt, nullptr /* lock_wait_needed_callback */); +} + +void RangeTreeLockManager::UnLock(PessimisticTransaction* txn, + const LockTracker& tracker, Env*) { + const RangeTreeLockTracker* range_tracker = + static_cast<const RangeTreeLockTracker*>(&tracker); + + RangeTreeLockTracker* range_trx_tracker = + static_cast<RangeTreeLockTracker*>(txn->tracked_locks_.get()); + + bool all_keys = (range_trx_tracker == range_tracker); + + // tracked_locks_->range_list may hold nullptr if the transaction has never + // acquired any locks. 
+ RangeLockList* range_list = ((RangeTreeLockTracker*)range_tracker)->getList(); + + if (range_list) { + { + MutexLock l(&range_list->mutex_); + /* + The lt->release_locks() call below will walk range_list->buffer_. We + need to prevent lock escalation callback from replacing + range_list->buffer_ while we are doing that. + + Additional complication here is internal mutex(es) in the locktree + (let's call them latches): + - Lock escalation first obtains latches on the lock tree + - Then, it calls RangeTreeLockManager::on_escalate to replace + transaction's range_list->buffer_. = Access to that buffer must be + synchronized, so it will want to acquire the range_list->mutex_. + + While in this function we would want to do the reverse: + - Acquire range_list->mutex_ to prevent access to the range_list. + - Then, lt->release_locks() call will walk through the range_list + - and acquire latches on parts of the lock tree to remove locks from + it. + + How do we avoid the deadlock? The idea is that here we set + releasing_locks_=true, and release the mutex. + All other users of the range_list must: + - Acquire the mutex, then check that releasing_locks_=false. + (the code in this function doesnt do that as there's only one thread + that releases transaction's locks) + */ + range_list->releasing_locks_ = true; + } + + // Don't try to call release_locks() if the buffer is empty! if we are + // not holding any locks, the lock tree might be in the STO-mode with + // another transaction, and our attempt to release an empty set of locks + // will cause an assertion failure. + for (auto it : range_list->buffers_) { + if (it.second->get_num_ranges()) { + toku::locktree* lt = get_locktree_by_cfid(it.first); + lt->release_locks((TXNID)txn, it.second.get(), all_keys); + + it.second->destroy(); + it.second->create(); + + toku::lock_request::retry_all_lock_requests(lt, nullptr); + } + } + range_list->clear(); // TODO: need this? + range_list->releasing_locks_ = false; + } +} + +int RangeTreeLockManager::compare_dbt_endpoints(void* arg, const DBT* a_key, + const DBT* b_key) { + const char* a = (const char*)a_key->data; + const char* b = (const char*)b_key->data; + + size_t a_len = a_key->size; + size_t b_len = b_key->size; + + size_t min_len = std::min(a_len, b_len); + + // Compare the values. The first byte encodes the endpoint type, its value + // is either SUFFIX_INFIMUM or SUFFIX_SUPREMUM. + Comparator* cmp = (Comparator*)arg; + int res = cmp->Compare(Slice(a + 1, min_len - 1), Slice(b + 1, min_len - 1)); + if (!res) { + if (b_len > min_len) { + // a is shorter; + if (a[0] == SUFFIX_INFIMUM) + return -1; //"a is smaller" + else { + // a is considered padded with 0xFF:FF:FF:FF... + return 1; // "a" is bigger + } + } else if (a_len > min_len) { + // the opposite of the above: b is shorter. + if (b[0] == SUFFIX_INFIMUM) + return 1; //"b is smaller" + else { + // b is considered padded with 0xFF:FF:FF:FF... + return -1; // "b" is bigger + } + } else { + // the lengths are equal (and the key values, too) + if (a[0] < b[0]) + return -1; + else if (a[0] > b[0]) + return 1; + else + return 0; + } + } else + return res; +} + +namespace { +void UnrefLockTreeMapsCache(void* ptr) { + // Called when a thread exits or a ThreadLocalPtr gets destroyed. 
+ auto lock_tree_map_cache = + static_cast<std::unordered_map<uint32_t, locktree*>*>(ptr); + delete lock_tree_map_cache; +} +} // anonymous namespace + +RangeTreeLockManager::RangeTreeLockManager( + std::shared_ptr<TransactionDBMutexFactory> mutex_factory) + : mutex_factory_(mutex_factory), + ltree_lookup_cache_(new ThreadLocalPtr(&UnrefLockTreeMapsCache)), + dlock_buffer_(10) { + ltm_.create(on_create, on_destroy, on_escalate, NULL, mutex_factory_); +} + +void RangeTreeLockManager::SetRangeDeadlockInfoBufferSize( + uint32_t target_size) { + dlock_buffer_.Resize(target_size); +} + +void RangeTreeLockManager::Resize(uint32_t target_size) { + SetRangeDeadlockInfoBufferSize(target_size); +} + +std::vector<RangeDeadlockPath> +RangeTreeLockManager::GetRangeDeadlockInfoBuffer() { + return dlock_buffer_.PrepareBuffer(); +} + +std::vector<DeadlockPath> RangeTreeLockManager::GetDeadlockInfoBuffer() { + std::vector<DeadlockPath> res; + std::vector<RangeDeadlockPath> data = GetRangeDeadlockInfoBuffer(); + // report left endpoints + for (auto it = data.begin(); it != data.end(); ++it) { + std::vector<DeadlockInfo> path; + + for (auto it2 = it->path.begin(); it2 != it->path.end(); ++it2) { + path.push_back( + {it2->m_txn_id, it2->m_cf_id, it2->m_exclusive, it2->m_start.slice}); + } + res.push_back(DeadlockPath(path, it->deadlock_time)); + } + return res; +} + +/* + @brief Lock Escalation Callback function + + @param txnid Transaction whose locks got escalated + @param lt Lock Tree where escalation is happening (currently there is + only one) + @param buffer Escalation result: list of locks that this transaction now + owns in this lock tree. + @param void* Callback context +*/ + +void RangeTreeLockManager::on_escalate(TXNID txnid, const locktree* lt, + const range_buffer& buffer, void*) { + auto txn = (PessimisticTransaction*)txnid; + + RangeLockList* trx_locks = + ((RangeTreeLockTracker*)txn->tracked_locks_.get())->getList(); + + MutexLock l(&trx_locks->mutex_); + if (trx_locks->releasing_locks_) { + /* + Do nothing. The transaction is releasing its locks, so it will not care + about having a correct list of ranges. (In TokuDB, + toku_db_txn_escalate_callback() makes use of this property, too) + */ + return; + } + + // TODO: are we tracking this mem: + // lt->get_manager()->note_mem_released(trx_locks->ranges.buffer->total_memory_size()); + + uint32_t cf_id = (uint32_t)lt->get_dict_id().dictid; + + auto it = trx_locks->buffers_.find(cf_id); + it->second->destroy(); + it->second->create(); + + toku::range_buffer::iterator iter(&buffer); + toku::range_buffer::iterator::record rec; + while (iter.current(&rec)) { + it->second->append(rec.get_left_key(), rec.get_right_key()); + iter.next(); + } + // TODO: same as above: + // lt->get_manager()->note_mem_used(ranges.buffer->total_memory_size()); +} + +RangeTreeLockManager::~RangeTreeLockManager() { + // TODO: at this point, synchronization is not needed, right? + + autovector<void*> local_caches; + ltree_lookup_cache_->Scrape(&local_caches, nullptr); + for (auto cache : local_caches) { + delete static_cast<LockTreeMap*>(cache); + } + + for (auto it : ltree_map_) { + ltm_.release_lt(it.second); + } + ltm_.destroy(); +} + +RangeLockManagerHandle::Counters RangeTreeLockManager::GetStatus() { + LTM_STATUS_S ltm_status_test; + ltm_.get_status(<m_status_test); + Counters res; + + // Searching status variable by its string name is how Toku's unit tests + // do it (why didn't they make LTM_ESCALATION_COUNT constant visible?) 
+ // lookup keyname in status + for (int i = 0; i < LTM_STATUS_S::LTM_STATUS_NUM_ROWS; i++) { + TOKU_ENGINE_STATUS_ROW status = <m_status_test.status[i]; + if (strcmp(status->keyname, "LTM_ESCALATION_COUNT") == 0) { + res.escalation_count = status->value.num; + continue; + } + if (strcmp(status->keyname, "LTM_SIZE_CURRENT") == 0) { + res.current_lock_memory = status->value.num; + } + } + return res; +} + +void RangeTreeLockManager::AddColumnFamily(const ColumnFamilyHandle* cfh) { + uint32_t column_family_id = cfh->GetID(); + + InstrumentedMutexLock l(<ree_map_mutex_); + if (ltree_map_.find(column_family_id) == ltree_map_.end()) { + DICTIONARY_ID dict_id = {.dictid = column_family_id}; + toku::comparator cmp; + cmp.create(compare_dbt_endpoints, (void*)cfh->GetComparator()); + toku::locktree* ltree = ltm_.get_lt(dict_id, cmp, + /* on_create_extra*/ nullptr); + // This is ok to because get_lt has copied the comparator: + cmp.destroy(); + ltree_map_.emplace(column_family_id, ltree); + } else { + // column_family already exists in lock map + // assert(false); + } +} + +void RangeTreeLockManager::RemoveColumnFamily(const ColumnFamilyHandle* cfh) { + uint32_t column_family_id = cfh->GetID(); + // Remove lock_map for this column family. Since the lock map is stored + // as a shared ptr, concurrent transactions can still keep using it + // until they release their references to it. + + // TODO^ if one can drop a column family while a transaction is holding a + // lock in it, is column family's comparator guaranteed to survive until + // all locks are released? (we depend on this). + { + InstrumentedMutexLock l(<ree_map_mutex_); + + auto lock_maps_iter = ltree_map_.find(column_family_id); + assert(lock_maps_iter != ltree_map_.end()); + + ltm_.release_lt(lock_maps_iter->second); + + ltree_map_.erase(lock_maps_iter); + } // lock_map_mutex_ + + // TODO: why do we delete first and clear the caches second? Shouldn't this be + // done in the reverse order? (if we do it in the reverse order, how will we + // prevent somebody from re-populating the cache?) + + // Clear all thread-local caches. We collect a vector of caches but we dont + // really need them. + autovector<void*> local_caches; + ltree_lookup_cache_->Scrape(&local_caches, nullptr); +} + +toku::locktree* RangeTreeLockManager::get_locktree_by_cfid( + uint32_t column_family_id) { + // First check thread-local cache + if (ltree_lookup_cache_->Get() == nullptr) { + ltree_lookup_cache_->Reset(new LockTreeMap()); + } + + auto ltree_map_cache = static_cast<LockTreeMap*>(ltree_lookup_cache_->Get()); + + auto it = ltree_map_cache->find(column_family_id); + if (it != ltree_map_cache->end()) { + // Found lock map for this column family. + return it->second; + } + + // Not found in local cache, grab mutex and check shared LockMaps + InstrumentedMutexLock l(<ree_map_mutex_); + + it = ltree_map_.find(column_family_id); + if (it == ltree_map_.end()) { + return nullptr; + } else { + // Found lock map. Store in thread-local cache and return. 
+ // std::shared_ptr<LockMap>& lock_map = lock_map_iter->second; + ltree_map_cache->insert({column_family_id, it->second}); + return it->second; + } + + return nullptr; +} + +struct LOCK_PRINT_CONTEXT { + RangeLockManagerHandle::RangeLockStatus* data; // Save locks here + uint32_t cfh_id; // Column Family whose tree we are traversing +}; + +// Report left endpoints of the acquired locks +LockManager::PointLockStatus RangeTreeLockManager::GetPointLockStatus() { + PointLockStatus res; + LockManager::RangeLockStatus data = GetRangeLockStatus(); + // report left endpoints + for (auto it = data.begin(); it != data.end(); ++it) { + auto& val = it->second; + res.insert({it->first, {val.start.slice, val.ids, val.exclusive}}); + } + return res; +} + +static void push_into_lock_status_data(void* param, const DBT* left, + const DBT* right, TXNID txnid_arg, + bool is_shared, TxnidVector* owners) { + struct LOCK_PRINT_CONTEXT* ctx = (LOCK_PRINT_CONTEXT*)param; + struct RangeLockInfo info; + + info.exclusive = !is_shared; + + deserialize_endpoint(left, &info.start); + deserialize_endpoint(right, &info.end); + + if (txnid_arg != TXNID_SHARED) { + TXNID txnid = ((PessimisticTransaction*)txnid_arg)->GetID(); + info.ids.push_back(txnid); + } else { + for (auto it : *owners) { + TXNID real_id = ((PessimisticTransaction*)it)->GetID(); + info.ids.push_back(real_id); + } + } + ctx->data->insert({ctx->cfh_id, info}); +} + +LockManager::RangeLockStatus RangeTreeLockManager::GetRangeLockStatus() { + LockManager::RangeLockStatus data; + { + InstrumentedMutexLock l(<ree_map_mutex_); + for (auto it : ltree_map_) { + LOCK_PRINT_CONTEXT ctx = {&data, it.first}; + it.second->dump_locks((void*)&ctx, push_into_lock_status_data); + } + } + return data; +} + +} // namespace ROCKSDB_NAMESPACE +#endif // OS_WIN +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h new file mode 100644 index 000000000..b53fe90a1 --- /dev/null +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h @@ -0,0 +1,128 @@ +// +// Range Lock Manager that uses PerconaFT's locktree library +// +#pragma once +#ifndef ROCKSDB_LITE +#ifndef OS_WIN + +// For DeadlockInfoBuffer: +#include "util/thread_local.h" +#include "utilities/transactions/lock/point/point_lock_manager.h" +#include "utilities/transactions/lock/range/range_lock_manager.h" + +// Lock Tree library: +#include "lib/locktree/lock_request.h" +#include "lib/locktree/locktree.h" +#include "range_tree_lock_tracker.h" + +namespace ROCKSDB_NAMESPACE { + +using namespace toku; + +typedef DeadlockInfoBufferTempl<RangeDeadlockPath> RangeDeadlockInfoBuffer; + +/* + A Range Lock Manager that uses PerconaFT's locktree library +*/ +class RangeTreeLockManager : public RangeLockManagerBase, + public RangeLockManagerHandle { + public: + LockManager* getLockManager() override { return this; } + + void AddColumnFamily(const ColumnFamilyHandle* cfh) override; + void RemoveColumnFamily(const ColumnFamilyHandle* cfh) override; + + void Resize(uint32_t) override; + std::vector<DeadlockPath> GetDeadlockInfoBuffer() override; + + std::vector<RangeDeadlockPath> GetRangeDeadlockInfoBuffer() override; + void SetRangeDeadlockInfoBufferSize(uint32_t target_size) override; + + // Get a lock on a range + // @note only exclusive locks are currently supported (requesting a + // non-exclusive lock will get an exclusive one) + using LockManager::TryLock; + Status 
TryLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const Endpoint& start_endp, const Endpoint& end_endp, Env* env, + bool exclusive) override; + + void UnLock(PessimisticTransaction* txn, const LockTracker& tracker, + Env* env) override; + void UnLock(PessimisticTransaction* txn, ColumnFamilyId column_family_id, + const std::string& key, Env* env) override; + void UnLock(PessimisticTransaction*, ColumnFamilyId, const Endpoint&, + const Endpoint&, Env*) override{ + // TODO: range unlock does nothing... + }; + + RangeTreeLockManager( + std::shared_ptr<TransactionDBMutexFactory> mutex_factory); + + ~RangeTreeLockManager(); + + int SetMaxLockMemory(size_t max_lock_memory) override { + return ltm_.set_max_lock_memory(max_lock_memory); + } + + size_t GetMaxLockMemory() override { return ltm_.get_max_lock_memory(); } + + Counters GetStatus() override; + + bool IsPointLockSupported() const override { + // One could have acquired a point lock (it is reduced to range lock) + return true; + } + + PointLockStatus GetPointLockStatus() override; + + // This is from LockManager + LockManager::RangeLockStatus GetRangeLockStatus() override; + + // This has the same meaning as GetRangeLockStatus but is from + // RangeLockManagerHandle + RangeLockManagerHandle::RangeLockStatus GetRangeLockStatusData() override { + return GetRangeLockStatus(); + } + + bool IsRangeLockSupported() const override { return true; } + + const LockTrackerFactory& GetLockTrackerFactory() const override { + return RangeTreeLockTrackerFactory::Get(); + } + + private: + toku::locktree_manager ltm_; + + std::shared_ptr<TransactionDBMutexFactory> mutex_factory_; + + // Map from cf_id to locktree*. Can only be accessed while holding the + // ltree_map_mutex_. + using LockTreeMap = std::unordered_map<uint32_t, locktree*>; + LockTreeMap ltree_map_; + + InstrumentedMutex ltree_map_mutex_; + + // Per-thread cache of ltree_map_. 
+ // (uses the same approach as TransactionLockMgr::lock_maps_cache_) + std::unique_ptr<ThreadLocalPtr> ltree_lookup_cache_; + + RangeDeadlockInfoBuffer dlock_buffer_; + + // Get the lock tree which stores locks for Column Family with given cf_id + toku::locktree* get_locktree_by_cfid(uint32_t cf_id); + + static int compare_dbt_endpoints(void* arg, const DBT* a_key, + const DBT* b_key); + + // Callbacks + static int on_create(locktree*, void*) { return 0; /* no error */ } + static void on_destroy(locktree*) {} + static void on_escalate(TXNID txnid, const locktree* lt, + const range_buffer& buffer, void* extra); +}; + +void serialize_endpoint(const Endpoint& endp, std::string* buf); + +} // namespace ROCKSDB_NAMESPACE +#endif // OS_WIN +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc new file mode 100644 index 000000000..f510c1f09 --- /dev/null +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc @@ -0,0 +1,65 @@ +// +// A lock tracker to be used together with RangeTreeLockManager +// +#ifndef ROCKSDB_LITE +#ifndef OS_WIN + +#include "range_tree_lock_tracker.h" + +#include "range_tree_lock_manager.h" + +namespace ROCKSDB_NAMESPACE { + +RangeLockList *RangeTreeLockTracker::getOrCreateList() { + RangeLockList *res; + if ((res = getList())) return res; + + // Doesn't exist, create + range_list.reset(new RangeLockList()); + return getList(); +} + +void RangeTreeLockTracker::Track(const PointLockRequest &lock_req) { + DBT key_dbt; + std::string key; + serialize_endpoint(Endpoint(lock_req.key, false), &key); + toku_fill_dbt(&key_dbt, key.data(), key.size()); + RangeLockList *rl = getOrCreateList(); + rl->append(lock_req.column_family_id, &key_dbt, &key_dbt); +} + +void RangeTreeLockTracker::Track(const RangeLockRequest &lock_req) { + DBT start_dbt, end_dbt; + std::string start_key, end_key; + + serialize_endpoint(lock_req.start_endp, &start_key); + serialize_endpoint(lock_req.end_endp, &end_key); + + toku_fill_dbt(&start_dbt, start_key.data(), start_key.size()); + toku_fill_dbt(&end_dbt, end_key.data(), end_key.size()); + + RangeLockList *rl = getOrCreateList(); + rl->append(lock_req.column_family_id, &start_dbt, &end_dbt); +} + +PointLockStatus RangeTreeLockTracker::GetPointLockStatus( + ColumnFamilyId /*cf_id*/, const std::string & /*key*/) const { + // TODO: what to do here if we are holding a range lock that is embedding the + // point? + + // "Cheat" and return the status which says the point is not locked. + PointLockStatus p; + p.locked = false; + p.exclusive = true; + p.seq = 0; + return p; +} + +void RangeTreeLockTracker::Clear() { + // This will delete the RangeLockList and cause a proper cleanup + range_list = nullptr; +} + +} // namespace ROCKSDB_NAMESPACE +#endif // OS_WIN +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h new file mode 100644 index 000000000..8e9e0852d --- /dev/null +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.h @@ -0,0 +1,148 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once + +#include <memory> +#include <string> +#include <unordered_map> + +#include "util/mutexlock.h" +#include "utilities/transactions/lock/lock_tracker.h" + +// Range Locking: +#include "lib/locktree/lock_request.h" +#include "lib/locktree/locktree.h" + +namespace ROCKSDB_NAMESPACE { + +class RangeLockList; + +/* + Storage for locks that are currently held by a transaction. + + Locks are kept in toku::range_buffer because toku::locktree::release_locks() + accepts that as an argument. + + Note: the list of locks may differ slighly from the contents of the lock + tree, due to concurrency between lock acquisition, lock release, and lock + escalation. See MDEV-18227 and RangeTreeLockManager::UnLock for details. + This property is currently harmless. +*/ +class RangeLockList /*: public PessimisticTransaction::Lock--Storage */ { + public: + virtual ~RangeLockList() { clear(); } + + void clear() { + for (auto it : buffers_) { + it.second->destroy(); + } + buffers_.clear(); + } + + RangeLockList() : releasing_locks_(false) {} + + void append(uint32_t cf_id, const DBT* left_key, const DBT* right_key) { + MutexLock l(&mutex_); + // there's only one thread that calls this function. + // the same thread will do lock release. + assert(!releasing_locks_); + auto it = buffers_.find(cf_id); + if (it == buffers_.end()) { + // create a new one + it = buffers_ + .emplace(cf_id, std::shared_ptr<toku::range_buffer>( + new toku::range_buffer())) + .first; + it->second->create(); + } + it->second->append(left_key, right_key); + } + + std::unordered_map<uint32_t, std::shared_ptr<toku::range_buffer>> buffers_; + + // Synchronization. See RangeTreeLockManager::UnLock for details + port::Mutex mutex_; + bool releasing_locks_; +}; + +// Tracks range locks + +class RangeTreeLockTracker : public LockTracker { + public: + RangeTreeLockTracker() : range_list(nullptr) {} + + RangeTreeLockTracker(const RangeTreeLockTracker&) = delete; + RangeTreeLockTracker& operator=(const RangeTreeLockTracker&) = delete; + + void Track(const PointLockRequest&) override; + void Track(const RangeLockRequest&) override; + + bool IsPointLockSupported() const override { + // This indicates that we don't implement GetPointLockStatus() + return false; + } + bool IsRangeLockSupported() const override { return true; } + + // a Not-supported dummy implementation. + UntrackStatus Untrack(const RangeLockRequest& /*lock_request*/) override { + return UntrackStatus::NOT_TRACKED; + } + + UntrackStatus Untrack(const PointLockRequest& /*lock_request*/) override { + return UntrackStatus::NOT_TRACKED; + } + + // "If this method is not supported, leave it as a no-op." + void Merge(const LockTracker&) override {} + + // "If this method is not supported, leave it as a no-op." + void Subtract(const LockTracker&) override {} + + void Clear() override; + + // "If this method is not supported, returns nullptr." + virtual LockTracker* GetTrackedLocksSinceSavePoint( + const LockTracker&) const override { + return nullptr; + } + + PointLockStatus GetPointLockStatus(ColumnFamilyId column_family_id, + const std::string& key) const override; + + // The return value is only used for tests + uint64_t GetNumPointLocks() const override { return 0; } + + ColumnFamilyIterator* GetColumnFamilyIterator() const override { + return nullptr; + } + + KeyIterator* GetKeyIterator( + ColumnFamilyId /*column_family_id*/) const override { + return nullptr; + } + + // Non-override (TODO: make private!) 
+ RangeLockList* getList() { return range_list.get(); } + RangeLockList* getOrCreateList(); + + private: + std::shared_ptr<RangeLockList> range_list; +}; + +class RangeTreeLockTrackerFactory : public LockTrackerFactory { + public: + static const RangeTreeLockTrackerFactory& Get() { + static const RangeTreeLockTrackerFactory instance; + return instance; + } + + LockTracker* Create() const override { return new RangeTreeLockTracker(); } + + private: + RangeTreeLockTrackerFactory() {} +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 14f507b81..05adc1919 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -330,8 +330,14 @@ class TransactionBaseImpl : public Transaction { // Optimistic Transactions will keep note the requested locks (not actually // locked), and do conflict checking until commit time based on the tracked // lock requests. + // + // Declared as public because + // - RangeLocking's lock escalation may replace it + // - RangeTreeLockManager::UnLock needs access to it + public: std::unique_ptr<LockTracker> tracked_locks_; + protected: // Stack of the Snapshot saved at each save point. Saved snapshots may be // nullptr if there was no snapshot at the time SetSavePoint() was called. std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint,
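---

Usage note (editor's sketch, not part of the patch): the flow below is distilled from the BasicRangeLocking and MyRocksLikeUpdate tests in range_locking_test.cc above. The database path and the main() scaffolding are illustrative only; the API calls (NewRangeLockManager, lock_mgr_handle, GetRangeLock, Put with assume_tracked) are the ones the patch adds or exercises.

#include <cassert>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"

using namespace ROCKSDB_NAMESPACE;

int main() {
  Options options;
  options.create_if_missing = true;

  // Plug the range lock manager into the TransactionDB. Passing nullptr
  // lets the manager create the default mutex factory internally.
  TransactionDBOptions txn_db_options;
  std::shared_ptr<RangeLockManagerHandle> range_lock_mgr(
      NewRangeLockManager(nullptr));
  txn_db_options.lock_mgr_handle = range_lock_mgr;

  TransactionDB* db = nullptr;
  Status s = TransactionDB::Open(options, txn_db_options,
                                 "/tmp/range_locking_demo" /* hypothetical */,
                                 &db);
  assert(s.ok());

  Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
  auto cf = db->DefaultColumnFamily();

  // Lock the [a; c] range up front; writes within the range may then pass
  // assume_tracked=true so that Put() does not go through TryLock() again
  // (this is the access pattern the MyRocksLikeUpdate test verifies).
  s = txn->GetRangeLock(cf, Endpoint("a"), Endpoint("c"));
  assert(s.ok());
  s = txn->Put(cf, Slice("b"), Slice("value"), /*assume_tracked=*/true);
  assert(s.ok());

  s = txn->Commit();
  assert(s.ok());
  delete txn;
  delete db;
  return 0;
}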
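---

Encoding note (editor's illustration, not code from the patch): serialize_endpoint() prepends a one-byte marker, SUFFIX_INFIMUM (0x0) or SUFFIX_SUPREMUM (0x1), to the user key, and compare_dbt_endpoints() orders a shorter endpoint by its own marker, as if the key were padded with 0x00 bytes (infimum) or 0xFF bytes (supremum). The standalone sketch below replays that ordering rule, specialized to a plain bytewise key comparator (the real code dispatches to the column family's Comparator):

#include <algorithm>
#include <cassert>
#include <cstring>
#include <string>

static const char SUFFIX_INFIMUM = 0x0;
static const char SUFFIX_SUPREMUM = 0x1;

// Mirror of serialize_endpoint(): one marker byte, then the key bytes.
static std::string Encode(const std::string& key, bool inf_suffix) {
  std::string buf(1, inf_suffix ? SUFFIX_SUPREMUM : SUFFIX_INFIMUM);
  buf.append(key);
  return buf;
}

// Same rule as compare_dbt_endpoints(), for bytewise key order: compare the
// key bytes first; if one key is a prefix of the other, the shorter endpoint
// sorts low when it carries the infimum marker and high when it carries the
// supremum marker; equal keys order by the marker byte alone.
static int CompareEndpoints(const std::string& a, const std::string& b) {
  size_t min_len = std::min(a.size(), b.size());
  int res = memcmp(a.data() + 1, b.data() + 1, min_len - 1);
  if (res != 0) return res;
  if (b.size() > min_len) return (a[0] == SUFFIX_INFIMUM) ? -1 : 1;
  if (a.size() > min_len) return (b[0] == SUFFIX_INFIMUM) ? 1 : -1;
  return (a[0] < b[0]) ? -1 : (a[0] > b[0] ? 1 : 0);
}

int main() {
  // The infimum endpoint of "a" sorts before its supremum endpoint.
  assert(CompareEndpoints(Encode("a", false), Encode("a", true)) < 0);
  // The supremum endpoint of "a" sorts after the longer key "ab": the
  // shorter endpoint is treated as padded with 0xFF bytes.
  assert(CompareEndpoints(Encode("a", true), Encode("ab", false)) > 0);
  return 0;
}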