revision-id: 6618bee92d82872739fac5c115e70077da238fb4 (fb-prod202002-126-g6618bee92d8) parent(s): cf38ba95e9a5c4fda9351cf981b5bd55f42db61c author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2020-08-24 16:58:09 +0300 message: Post-rebase fixes: apply the patch lost during rebase: commit 43613926c1b82427c83a02759fc6785282fbfce2 Author: Sergei Petrunia <psergey@askmonty.org> Date: Mon Jan 7 19:52:48 2019 +0300 Range Locking: Make range locks on secondary indexes inhibit DMLs --- mysql-test/suite/rocksdb/r/range_locking.result | 45 +++++++++++++++++++ storage/rocksdb/ha_rocksdb.cc | 57 +++++++++++++++++++------ storage/rocksdb/ha_rocksdb.h | 4 +- 3 files changed, 91 insertions(+), 15 deletions(-) diff --git a/mysql-test/suite/rocksdb/r/range_locking.result b/mysql-test/suite/rocksdb/r/range_locking.result index dfac4ef5123..3641fb1943e 100644 --- a/mysql-test/suite/rocksdb/r/range_locking.result +++ b/mysql-test/suite/rocksdb/r/range_locking.result @@ -161,6 +161,51 @@ connection default; disconnect con1; drop table t0,t1; # +# Test that locks on ranges on non-unique secondary keys inhibit +# modifications of the contents of these ranges +# +create table t0(a int); +insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9); +create table t1 ( +kp1 int not null, +kp2 int not null, +a int, +key(kp1, kp2) comment 'default' +) engine=rocksdb; +insert into t1 select 1, a, 1234 from t0; +insert into t1 values (2, 3, 1234); +insert into t1 values (2, 5, 1234); +insert into t1 values (2, 7, 1234); +insert into t1 select 3, a, 1234 from t0; +connect con1,localhost,root,,; +connection con1; +begin; +explain +select * from t1 where kp1=2 for update; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 ref kp1 kp1 4 const # NULL +select * from t1 where kp1=2 for update; +kp1 kp2 a +2 3 1234 +2 5 1234 +2 7 1234 +connection default; +begin; +insert into t1 values (2, 9, 9999); +ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on index: test.t1.kp1 +delete from t1 where kp1=2 and kp2=5; +ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on index: test.t1.kp1 +update t1 set kp1=333 where kp1=2 and kp2=3; +ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on index: test.t1.kp1 +update t1 set kp1=2 where kp1=1 and kp2=8; +ERROR HY000: Lock wait timeout exceeded; try restarting transaction: Timeout on index: test.t1.kp1 +rollback; +connection con1; +rollback; +disconnect con1; +connection default; +drop table t0,t1; +# # Transaction isolation test # create table t1 (pk int primary key, a int) engine=rocksdb; diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index cd02ee1f8a3..053fe4a99f6 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -2580,6 +2580,7 @@ class Rdb_transaction { bool m_is_delayed_snapshot = false; bool m_is_two_phase = false; + std::unordered_set<Rdb_tbl_def*> modified_tables; private: @@ -2614,7 +2615,6 @@ class Rdb_transaction { virtual rocksdb::WriteBatchBase *get_write_batch() = 0; virtual bool commit_no_binlog() = 0; - /* @detail This function takes in the WriteBatch of the transaction to add @@ -2866,10 +2866,7 @@ class Rdb_transaction { return lock_range(cf, endp, endp); } - virtual bool prepare(const rocksdb::TransactionName &name) = 0; - - - + virtual bool prepare() = 0; bool commit_or_rollback() { bool res; @@ -3488,7 +3485,7 @@ class Rdb_transaction_impl : public Rdb_transaction { virtual bool is_writebatch_trx() const override { return false; } /* - Both start and end endpoint may may be prefixes. + Both start and end endpoint may be prefixes. Both bounds are inclusive. */ rocksdb::Status lock_range(rocksdb::ColumnFamilyHandle *const cf, @@ -3496,8 +3493,6 @@ class Rdb_transaction_impl : public Rdb_transaction { const rocksdb::Endpoint &end_endp) override { ++m_lock_count; return m_rocksdb_tx->GetRangeLock(cf, start_endp, end_endp); - ) override - { } private: void release_tx(void) { @@ -3968,7 +3963,6 @@ class Rdb_writebatch_impl : public Rdb_transaction { rocksdb::Status lock_range(rocksdb::ColumnFamilyHandle *const cf, const rocksdb::Endpoint &left_endp, const rocksdb::Endpoint &right_endp) override { - { return rocksdb::Status::OK(); } @@ -4088,8 +4082,8 @@ class Rdb_writebatch_impl : public Rdb_transaction { } void set_name() override {} - void start_stmt(bool is_dml_statement) override {} + void start_stmt(bool is_dml_statement) override {} void rollback_stmt() override { if (m_batch) rollback_to_stmt_savepoint(); @@ -8589,7 +8583,7 @@ int ha_rocksdb::index_read_map(uchar *const buf, const uchar *const key, } -void ha_rocksdb::set_range_lock(Rdb_transaction *tx, +int ha_rocksdb::set_range_lock(Rdb_transaction *tx, const Rdb_key_def &kd, const enum ha_rkey_function &find_flag, const rocksdb::Slice &slice_arg, @@ -8605,7 +8599,7 @@ void ha_rocksdb::set_range_lock(Rdb_transaction *tx, *use_locking_iterator= false; if (m_lock_rows == RDB_LOCK_NONE || !rocksdb_use_range_locking) { - return; + return 0; } bool no_end_endpoint= false; @@ -10662,6 +10656,15 @@ int ha_rocksdb::update_write_sk(const TABLE *const table_arg, old_key_slice = rocksdb::Slice( reinterpret_cast<const char *>(m_sk_packed_tuple_old), old_packed_size); + /* Range locking: lock the index tuple being deleted */ + if (rocksdb_use_range_locking) { + auto s= row_info.tx->lock_singlepoint_range(kd.get_cf(), old_key_slice); + if (!s.ok()) { + return (row_info.tx->set_status_error(table->in_use, s, kd, + m_tbl_def, m_table_handler)); + } + } + row_info.tx->get_indexed_write_batch()->SingleDelete(kd.get_cf(), old_key_slice); @@ -10677,6 +10680,14 @@ int ha_rocksdb::update_write_sk(const TABLE *const table_arg, if (bulk_load_sk && row_info.old_data == nullptr) { rc = bulk_load_key(row_info.tx, kd, new_key_slice, new_value_slice, true); } else { + /* Range locking: lock the index tuple being inserted */ + if (rocksdb_use_range_locking) { + auto s= row_info.tx->lock_singlepoint_range(kd.get_cf(), new_key_slice); + if (!s.ok()) { + return (row_info.tx->set_status_error(table->in_use, s, kd, + m_tbl_def, m_table_handler)); + } + } row_info.tx->get_indexed_write_batch()->Put(kd.get_cf(), new_key_slice, new_value_slice); } @@ -10774,6 +10785,10 @@ int ha_rocksdb::update_write_row(const uchar *const old_data, DBUG_RETURN(rc); } + // Range Locking: do we have a lock on the old PK value here? + // - we have read the row we are about to update, right? (except for some + // RBR mode? (in which we won't want to acquire locks anyway?)) + /* For UPDATEs, if the key has changed, we need to obtain a lock. INSERTs always require locking. @@ -10879,6 +10894,7 @@ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd, Rdb_transaction *const tx = get_or_create_tx(table->in_use); bool skip_bloom = true; + const rocksdb::Slice eq_cond(slice->data(), eq_cond_len); // The size of m_scan_it_lower_bound (and upper) is technically // max_packed_sk_len as calculated in ha_rocksdb::alloc_key_buffers. Rather @@ -10913,7 +10929,6 @@ void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd, and re-create Iterator. */ - if (m_scan_it_skips_bloom != skip_bloom || use_locking_iterator) { release_scan_iterator(); } @@ -11262,6 +11277,9 @@ int ha_rocksdb::delete_row(const uchar *const buf) { Rdb_transaction *const tx = get_or_create_tx(table->in_use); ulonglong bytes_written = 0; + // Range Locking: we are certain that the PK record is already locked here, + // right? + const uint index = pk_index(table, m_tbl_def); rocksdb::Status s = delete_or_singledelete(index, tx, m_pk_descr->get_cf(), key_slice); @@ -11315,6 +11333,19 @@ int ha_rocksdb::delete_row(const uchar *const buf) { nullptr, false, hidden_pk_id); rocksdb::Slice secondary_key_slice( reinterpret_cast<const char *>(m_sk_packed_tuple), packed_size); + + /* + For point locking, Deleting on secondary key doesn't need any locks. + Range locking must set locks + */ + if (rocksdb_use_range_locking) { + auto s= tx->lock_singlepoint_range(kd.get_cf(), secondary_key_slice); + if (!s.ok()) { + return (tx->set_status_error(table->in_use, s, kd, m_tbl_def, + m_table_handler)); + } + } + tx->get_indexed_write_batch()->SingleDelete(kd.get_cf(), secondary_key_slice); bytes_written += secondary_key_slice.size(); diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index dbc7920efff..364492cda89 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -329,8 +329,8 @@ class ha_rocksdb : public my_core::handler { const bool use_all_keys, const uint eq_cond_len, bool use_locking_iterator) MY_ATTRIBUTE((__nonnull__)); - //psergey: - void set_range_lock(Rdb_transaction *tx, + + int set_range_lock(Rdb_transaction *tx, const Rdb_key_def &kd, const enum ha_rkey_function &find_flag, const rocksdb::Slice &slice,