revision-id: 8eb09c8598238945560b9750425b2a64238bec1d (v5.8-1028-g8eb09c859) parent(s): 3abdc95f082d80a3ce76bad7d6d319f8706429b2 author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2019-04-07 20:49:34 +0300 message: Remove Endpoints as a separate data format Before, Range Endpoints were passed as Slice objects, and also the user provided compare_endpoints_func and convert_key_to_endpoint_func. Now, the user passes rocksdb::Endpoint objects. The lock tree stores endpoints, but the encoding is handled internally by the range locking module. The patch is work-in-progress as the endpoint comparison function still uses memcmp. --- include/rocksdb/utilities/transaction.h | 49 ++++++++++- include/rocksdb/utilities/transaction_db.h | 21 ----- utilities/transactions/pessimistic_transaction.cc | 4 +- utilities/transactions/pessimistic_transaction.h | 4 +- .../transactions/pessimistic_transaction_db.cc | 7 +- .../transactions/pessimistic_transaction_db.h | 4 +- utilities/transactions/range_locking_test.cc | 23 +---- utilities/transactions/transaction_lock_mgr.cc | 99 ++++++++++++++++++---- utilities/transactions/transaction_lock_mgr.h | 16 +--- 9 files changed, 144 insertions(+), 83 deletions(-) diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 8a15a3bbd..18ae4726c 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -24,6 +24,53 @@ using TransactionName = std::string; using TransactionID = uint64_t; + +/* + A range endpoint. + + Basic ranges can be defined over rowkeys. A Comparator function defines + ordering; a range endpoint is just a rowkey. + + When one uses lexicographic-like ordering, one may want to request "prefix + ranges". + + == Lexicographic ordering == + A lexicographic-like ordering satisfies these criteria: + + 1. The ordering is prefix-based.
If there are two keys of the form + + key_a = {prefix_a suffix_a} + key_b = {prefix_b suffix_b} + and + prefix_a < prefix_b + then + key_a < key_b. + + An empty string is less than any other string (from this it follows that + for any prefix and non-empty suffix, {prefix, suffix} > {prefix}) + + == Prefix ranges == + With lexicographic-like ordering, one may construct ranges from a + restriction of the form prefix=P: + - the left endpoint would be {P, inf_suffix=false} + - the right endpoint would be {P, inf_suffix=true}. + + (TODO: or should we instead just require that [Reverse]BytewiseComparator + is used?) +*/ + +class Endpoint { + public: + Slice slice; + bool inf_suffix; + + Endpoint(const char* s, bool inf_suffix_arg=false) : + slice(s), inf_suffix(inf_suffix_arg) {} + + Endpoint(const char* s, size_t size, bool inf_suffix_arg=false) : + slice(s, size), inf_suffix(inf_suffix_arg) {} +}; + // Provides notification to the caller of SetSnapshotOnNextOperation when // the actual snapshot gets created class TransactionNotifier { @@ -256,7 +303,7 @@ class Transaction { // Note: range endpoints generally a use a different data format than // ranges.
virtual Status GetRangeLock(ColumnFamilyHandle*, - const Slice&, const Slice&) { + const Endpoint&, const Endpoint&) { return Status::NotSupported(); } diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 06d73bedd..b12c70785 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -33,24 +33,6 @@ enum TxnDBWritePolicy { const uint32_t kInitialMaxDeadlocks = 5; -struct RangeLockingOptions { - typedef void (*convert_key_to_endpoint_func)(const rocksdb::Slice &key, - std::string *endpoint); - - typedef int (*compare_endpoints_func)(const char *a, size_t a_len, - const char *b, size_t b_len); - - // TODO: So, functions to compare ranges are here, while - // functions to compare rowkeys are in per-column family and are in - // rocksdb::ColumnFamilyOptions - // - // TODO: Can we change this to work in a way that does not expose the endpoints - // to the user (like discussed on the meeting?) - // - convert_key_to_endpoint_func cvt_func; - compare_endpoints_func cmp_func; -}; - struct TransactionDBOptions { // Specifies the maximum number of keys that can be locked at the same time // per column family. @@ -115,9 +97,6 @@ struct TransactionDBOptions { // If true, range_locking_opts specifies options on range locking (filling // the struct is mandatory) bool use_range_locking = false; - - // Members are valid if use_range_locking= true. 
- RangeLockingOptions range_locking_opts; }; struct TransactionOptions { diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index 339dcf422..7ca6ab981 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -601,8 +601,8 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, Status PessimisticTransaction::GetRangeLock(ColumnFamilyHandle* column_family, - const Slice& start_endp, - const Slice& end_endp) { + const Endpoint& start_endp, + const Endpoint& end_endp) { ColumnFamilyHandle* cfh = column_family ? column_family : db_impl_->DefaultColumnFamily(); uint32_t cfh_id= GetColumnFamilyID(cfh); diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 50030700d..9fb44c74e 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -113,8 +113,8 @@ class PessimisticTransaction : public TransactionBaseImpl { int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; } virtual Status GetRangeLock(ColumnFamilyHandle* column_family, - const Slice& start_key, - const Slice& end_key); + const Endpoint& start_key, + const Endpoint& end_key); protected: // Refer to // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 270859d1a..06981adf4 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -50,8 +50,7 @@ void PessimisticTransactionDB::init_lock_manager() { new TransactionDBMutexFactoryImpl()); if (txn_db_options_.use_range_locking) { - range_lock_mgr_= new RangeLockMgr(this, txn_db_options_.range_locking_opts, - mutex_factory); + range_lock_mgr_= new RangeLockMgr(this, 
mutex_factory); lock_mgr = range_lock_mgr_; } else { lock_mgr = new TransactionLockMgr(this, txn_db_options_.num_stripes, @@ -399,8 +398,8 @@ Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction *txn, uint32_t cfh_id, - const Slice& start_endp, - const Slice& end_endp) { + const Endpoint& start_endp, + const Endpoint& end_endp) { if (range_lock_mgr_) { return range_lock_mgr_->TryRangeLock(txn, cfh_id, start_endp, end_endp, /*exclusive=*/false); diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index ad6eaf4a9..59da1634c 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -80,8 +80,8 @@ class PessimisticTransactionDB : public TransactionDB { const std::string& key, bool exclusive); Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id, - const Slice& start_endp, - const Slice& end_endp); + const Endpoint& start_endp, + const Endpoint& end_endp); void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys, bool all_keys_hint=false); diff --git a/utilities/transactions/range_locking_test.cc b/utilities/transactions/range_locking_test.cc index 26ee9b84c..aef345be5 100644 --- a/utilities/transactions/range_locking_test.cc +++ b/utilities/transactions/range_locking_test.cc @@ -36,19 +36,6 @@ using std::string; namespace rocksdb { -void range_endpoint_convert_same(const rocksdb::Slice &key, - std::string *res) -{ - res->clear(); - res->append(key.data(), key.size()); -} - -int range_endpoints_compare_default(const char *a, size_t a_len, - const char *b, size_t b_len) -{ - return Slice(a, a_len).compare(Slice(b, b_len)); -} - class RangeLockingTest : public ::testing::Test { public: TransactionDB* db; @@ -65,10 +52,6 @@ class RangeLockingTest : public ::testing::Test { DestroyDB(dbname, options); Status s; 
txn_db_options.use_range_locking = true; - txn_db_options.range_locking_opts.cvt_func = - range_endpoint_convert_same; - txn_db_options.range_locking_opts.cmp_func = - range_endpoints_compare_default; s = TransactionDB::Open(options, txn_db_options, dbname, &db); assert(s.ok()); @@ -101,7 +84,7 @@ TEST_F(RangeLockingTest, BasicRangeLocking) { // Get a range lock { auto s= txn0->GetRangeLock(db->DefaultColumnFamily(), - Slice("a"), Slice("c")); + Endpoint("a"), Endpoint("c")); ASSERT_EQ(s, Status::OK()); } @@ -109,7 +92,7 @@ TEST_F(RangeLockingTest, BasicRangeLocking) { // Check that range Lock inhibits an overlapping range lock { auto s= txn1->GetRangeLock(db->DefaultColumnFamily(), - Slice("b"), Slice("z")); + Endpoint("b"), Endpoint("z")); ASSERT_TRUE(s.IsTimedOut()); } @@ -127,7 +110,7 @@ TEST_F(RangeLockingTest, BasicRangeLocking) { ASSERT_EQ(s, Status::OK()); auto s2= txn1->GetRangeLock(db->DefaultColumnFamily(), - Slice("c"), Slice("e")); + Endpoint("c"), Endpoint("e")); ASSERT_TRUE(s2.IsTimedOut()); } diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index e0802fb25..d6c221cc7 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -790,16 +790,30 @@ public: bool releasing_locks_; }; +static +void serialize_endpoint(const Endpoint &endp, std::string *buf) { + const char SUFFIX_INF= 0x0; + const char SUFFIX_SUP= 0x1; + buf->push_back(endp.inf_suffix ? 
SUFFIX_SUP : SUFFIX_INF); + buf->append(endp.slice.data(), endp.slice.size()); +} + + // Get a range lock on [start_key; end_key] range Status RangeLockMgr::TryRangeLock(PessimisticTransaction* txn, uint32_t column_family_id, - const rocksdb::Slice &start_key, - const rocksdb::Slice &end_key, + const Endpoint &start_endp, + const Endpoint &end_endp, bool /*exclusive*/) { toku::lock_request request; request.create(mutex_factory_); DBT start_key_dbt, end_key_dbt; + std::string start_key; + std::string end_key; + serialize_endpoint(start_endp, &start_key); + serialize_endpoint(end_endp, &end_key); + toku_fill_dbt(&start_key_dbt, start_key.data(), start_key.size()); toku_fill_dbt(&end_key_dbt, end_key.data(), end_key.size()); @@ -896,10 +910,8 @@ Status RangeLockMgr::TryLock(PessimisticTransaction* txn, uint32_t column_family_id, const std::string& key, Env*, bool exclusive) { - std::string endpoint; - convert_key_to_endpoint(rocksdb::Slice(key.data(), key.size()), &endpoint); - rocksdb::Slice endp_slice(endpoint.data(), endpoint.length()); - return TryRangeLock(txn, column_family_id, endp_slice, endp_slice, exclusive); + Endpoint endp(key.data(), key.size(), false); + return TryRangeLock(txn, column_family_id, endp, endp, exclusive); } static void @@ -907,8 +919,12 @@ range_lock_mgr_release_lock_int(toku::locktree *lt, const PessimisticTransaction* txn, uint32_t /*column_family_id*/, const std::string& key) { - DBT key_dbt; - toku_fill_dbt(&key_dbt, key.data(), key.size()); + DBT key_dbt; + Endpoint endp(key.data(), key.size(), false); + std::string endp_image; + serialize_endpoint(endp, &endp_image); + + toku_fill_dbt(&key_dbt, endp_image.data(), endp_image.size()); toku::range_buffer range_buf; range_buf.create(); range_buf.append(&key_dbt, &key_dbt); @@ -917,8 +933,8 @@ range_lock_mgr_release_lock_int(toku::locktree *lt, } void RangeLockMgr::UnLock(PessimisticTransaction* txn, - uint32_t column_family_id, - const std::string& key, Env*) { + uint32_t 
column_family_id, + const std::string& key, Env*) { range_lock_mgr_release_lock_int(lt, txn, column_family_id, key); toku::lock_request::retry_all_lock_requests(lt, nullptr /* lock_wait_needed_callback */); } @@ -993,24 +1009,73 @@ void RangeLockMgr::UnLockAll(const PessimisticTransaction* txn, Env*) { } } + + + int RangeLockMgr::compare_dbt_endpoints(__toku_db*, void *arg, const DBT *a_key, const DBT *b_key) { RangeLockMgr* mgr= (RangeLockMgr*) arg; - return mgr->compare_endpoints((const char*)a_key->data, a_key->size, - (const char*)b_key->data, b_key->size); + // TODO: this should compare endpoints using the user-provided comparator + + // endpoint encoding. + // (just use one from any column family) + + const char *a= (const char*)a_key->data; + const char *b= (const char*)b_key->data; + + size_t a_len= a_key->size; + size_t b_len= b_key->size; + + size_t min_len= std::min(a_len, b_len); + + //compare the values. Skip the first byte as it is the endpoint signifier + + //TODO: use the upper layer' comparison function. + int res= memcmp(a+1, b+1, min_len-1); + if (!res) + { + if (b_len > min_len) + { + // a is shorter; + if (a[0] == 0) + return -1; //"a is smaller" + else + { + // a is considered padded with 0xFF:FF:FF:FF... + return 1; // "a" is bigger + } + } + else if (a_len > min_len) + { + // the opposite of the above: b is shorter. + if (b[0] == 0) + return 1; //"b is smaller" + else + { + // b is considered padded with 0xFF:FF:FF:FF... 
+ return -1; // "b" is bigger + } + } + else + { + // the lengths are equal (and the key values, too) + if (a[0] < b[0]) + return -1; + else if (a[0] > b[0]) + return 1; + else + return 0; + } + } + else + return res; } RangeLockMgr::RangeLockMgr(TransactionDB* txn_db, - const RangeLockingOptions& opts, std::shared_ptr<TransactionDBMutexFactory> mutex_factory) : my_txn_db_(txn_db), mutex_factory_(mutex_factory) { - convert_key_to_endpoint= opts.cvt_func; - compare_endpoints= opts.cmp_func; - ltm.create(on_create, on_destroy, on_escalate, NULL, mutex_factory_); - cmp_.create(compare_dbt_endpoints, (void*)this, NULL); DICTIONARY_ID dict_id = { .dictid = 1 }; lt= ltm.get_lt(dict_id, cmp_, /* on_create_extra*/nullptr); diff --git a/utilities/transactions/transaction_lock_mgr.h b/utilities/transactions/transaction_lock_mgr.h index d6c5479f2..c45ebae3f 100644 --- a/utilities/transactions/transaction_lock_mgr.h +++ b/utilities/transactions/transaction_lock_mgr.h @@ -212,8 +212,8 @@ class RangeLockMgr : // used ATM) Status TryRangeLock(PessimisticTransaction* txn, uint32_t column_family_id, - const rocksdb::Slice &start_key, - const rocksdb::Slice &end_key, + const Endpoint &start_endp, + const Endpoint &end_endp, bool exclusive); void UnLock(const PessimisticTransaction* txn, const TransactionKeyMap* keys, @@ -227,7 +227,6 @@ class RangeLockMgr : const std::string& key, Env* env) override ; RangeLockMgr(TransactionDB* txn_db, - const RangeLockingOptions& opts, std::shared_ptr<TransactionDBMutexFactory> mutex_factory); ~RangeLockMgr(); @@ -240,23 +239,12 @@ class RangeLockMgr : LockStatusData GetLockStatusData() override; - typedef RangeLockingOptions::convert_key_to_endpoint_func convert_key_to_endpoint_func; - typedef RangeLockingOptions::compare_endpoints_func compare_endpoints_func; - private: toku::locktree_manager ltm; toku::locktree *lt; // only one tree for now toku::comparator cmp_; - // Convert rowkey to endpoint (TODO: shouldn't "rowkey=const" translate into - 
// a pair of [start; end] endpoints in general? They translate into the same - // value in our current encoding, but...) - convert_key_to_endpoint_func convert_key_to_endpoint; - - // User-provided endpoint comparison function - compare_endpoints_func compare_endpoints; - TransactionDB* my_txn_db_; std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;