[Commits] f286a3586f1: Apply patch: Implement iterator class

revision-id: f286a3586f1fec1f9c7bad5314136e176ea30653 (percona-202102-52-gf286a3586f1)
parent(s): 0f4980afd8faa61b23a3008d3cba726881072174
author: Sergei Petrunia
committer: Sergei Petrunia
timestamp: 2021-05-17 17:42:54 +0300
message:
Apply patch: Implement iterator class

Summary: This abstracts basic iteration over keys (with TTL filtering) into
the `Rdb_iterator_base` class. Logic that does ICP or primary key locking is
not included.

Test Plan: mtr

Reviewers: luqun, herman, yzha, #mysql_eng

Subscribers: pgl

Differential Revision: https://phabricator.intern.facebook.com/D25933169

---
 mysql-test/suite/rocksdb/r/check_flags.result |   2 +-
 mysql-test/suite/rocksdb/t/check_flags.test   |   2 +-
 storage/rocksdb/CMakeLists.txt                |   1 +
 storage/rocksdb/ha_rocksdb.cc                 | 676 +++++++-------------------
 storage/rocksdb/ha_rocksdb.h                  |  94 ++--
 storage/rocksdb/ha_rocksdb_proto.h            |   2 +-
 storage/rocksdb/nosql_access.cc               |   4 +-
 storage/rocksdb/rdb_converter.h               |   2 +-
 storage/rocksdb/rdb_iterator.cc               | 359 ++++++++++++++
 storage/rocksdb/rdb_iterator.h                | 121 +++++
 10 files changed, 703 insertions(+), 560 deletions(-)

diff --git a/mysql-test/suite/rocksdb/r/check_flags.result b/mysql-test/suite/rocksdb/r/check_flags.result
index 8ff4153707e..9fe20b968a6 100644
--- a/mysql-test/suite/rocksdb/r/check_flags.result
+++ b/mysql-test/suite/rocksdb/r/check_flags.result
@@ -34,7 +34,7 @@ KILL QUERY $conn1_id;
 set debug_sync='now SIGNAL go';
 ERROR 70100: Query execution was interrupted
 set debug_sync='RESET';
-set debug_sync='rocksdb.check_flags_inwdi SIGNAL parked WAIT_FOR go';
+set debug_sync='rocksdb.check_flags_nwd SIGNAL parked WAIT_FOR go';
 SELECT kp1 FROM t3 ORDER BY kp1;
 set debug_sync='now WAIT_FOR parked';
 KILL QUERY $conn1_id;
diff --git a/mysql-test/suite/rocksdb/t/check_flags.test b/mysql-test/suite/rocksdb/t/check_flags.test
index 58dc1f4f8da..c100dce8afc 100644
--- a/mysql-test/suite/rocksdb/t/check_flags.test
+++ b/mysql-test/suite/rocksdb/t/check_flags.test
@@ -91,7 +91,7 @@ set debug_sync='RESET';

 connection conn1;
-set debug_sync='rocksdb.check_flags_inwdi SIGNAL parked WAIT_FOR go';
+set debug_sync='rocksdb.check_flags_nwd SIGNAL parked WAIT_FOR go';
 send SELECT kp1 FROM t3 ORDER BY kp1;

 connection default;
diff --git a/storage/rocksdb/CMakeLists.txt b/storage/rocksdb/CMakeLists.txt
index 3fc21fb97cc..135a6af62df 100644
--- a/storage/rocksdb/CMakeLists.txt
+++ b/storage/rocksdb/CMakeLists.txt
@@ -125,6 +125,7 @@ SET(ROCKSDB_SOURCES
   ha_rocksdb.cc ha_rocksdb.h ha_rocksdb_proto.h
   logger.h
   rdb_datadic.cc rdb_datadic.h
+  rdb_iterator.cc rdb_iterator.h
   rdb_cf_options.cc rdb_cf_options.h
   rdb_cf_manager.cc rdb_cf_manager.h
   rdb_converter.cc rdb_converter.h
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
index 2f41ba40a17..c484dfb5894 100644
--- a/storage/rocksdb/ha_rocksdb.cc
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -94,6 +94,7 @@
 #include "./rdb_datadic.h"
 #include "./rdb_i_s.h"
 #include "./rdb_index_merge.h"
+#include "./rdb_iterator.h"
 #include "./rdb_mutex_wrapper.h"
 #include "./rdb_psi.h"
 #include "./rdb_threads.h"
@@ -2975,8 +2976,9 @@ class Rdb_transaction {
   }

   int set_status_error(THD *const thd, const rocksdb::Status &s,
-                       const Rdb_key_def &kd, Rdb_tbl_def *const tbl_def,
-                       Rdb_table_handler *const table_handler) {
+                       const Rdb_key_def &kd, const Rdb_tbl_def *const tbl_def,
+                       Rdb_table_handler *const table_handler
+                           MY_ATTRIBUTE((unused))) {
     DBUG_ASSERT(!s.ok());
     DBUG_ASSERT(tbl_def != nullptr);

@@ -2993,7 +2995,8 @@ class Rdb_transaction {
                                                 rocksdb_rollback_on_timeout);
m_detailed_error.copy(timeout_message( "index", tbl_def->full_tablename().c_str(), kd.get_name().c_str())); - table_handler->m_lock_wait_timeout_counter.inc(); + /* TODO(yzha) - row stats are gone in 8.0 + table_handler->m_lock_wait_timeout_counter.inc(); */ rocksdb_row_lock_wait_timeouts++; return HA_ERR_LOCK_WAIT_TIMEOUT; @@ -3002,7 +3005,8 @@ class Rdb_transaction { if (s.IsDeadlock()) { my_core::thd_mark_transaction_to_rollback(thd, 1 /* whole transaction */); m_detailed_error = String(); - table_handler->m_deadlock_counter.inc(); + /* TODO(yzha) - row stats are gone in 8.0 + table_handler->m_deadlock_counter.inc(); */ rocksdb_row_lock_deadlocks++; return HA_ERR_LOCK_DEADLOCK; } else if (s.IsBusy()) { @@ -3017,7 +3021,8 @@ class Rdb_transaction { user_host_buff, thd->query()); } m_detailed_error = String(" (snapshot conflict)", system_charset_info); - table_handler->m_deadlock_counter.inc(); + /* TODO(yzha) - row stats are gone in 8.0 + table_handler->m_deadlock_counter.inc(); */ return HA_ERR_ROCKSDB_STATUS_BUSY; } @@ -3530,7 +3535,7 @@ class Rdb_transaction { rocksdb::Iterator *get_iterator( rocksdb::ColumnFamilyHandle *const column_family, bool skip_bloom_filter, - bool fill_cache, const rocksdb::Slice &eq_cond_lower_bound, + const rocksdb::Slice &eq_cond_lower_bound, const rocksdb::Slice &eq_cond_upper_bound, bool read_current = false, bool create_snapshot = true) { // Make sure we are not doing both read_current (which implies we don't @@ -3542,6 +3547,7 @@ class Rdb_transaction { if (create_snapshot) acquire_snapshot(true); rocksdb::ReadOptions options = m_read_opts; + const bool fill_cache = !THDVAR(get_thd(), skip_fill_cache); if (skip_bloom_filter) { const bool enable_iterate_bounds = @@ -6787,7 +6793,7 @@ static void dbug_change_status_to_corrupted(rocksdb::Status *status) { // If the iterator is not valid it might be because of EOF but might be due // to IOError or corruption. The good practice is always check it. // https://github.com/facebook/rocksdb/wiki/Iterator#error-handling -inline bool is_valid_iterator(rocksdb::Iterator *scan_it) { +bool is_valid_iterator(rocksdb::Iterator *scan_it) { if (scan_it->Valid()) { return true; } else { @@ -6915,6 +6921,11 @@ ulonglong ha_rocksdb::load_auto_incr_value_from_index() { const int save_active_index = active_index; active_index = table->s->next_number_index; const uint8 save_table_status = table->m_status; + + std::unique_ptr<Rdb_iterator> save_iterator(new Rdb_iterator_base( + ha_thd(), m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def)); + std::swap(m_iterator, save_iterator); + ulonglong last_val = 0; Rdb_transaction *const tx = get_or_create_tx(table->in_use); @@ -6966,7 +6977,7 @@ ulonglong ha_rocksdb::load_auto_incr_value_from_index() { (Why don't we use index_init/index_end? class handler defines index_init as private, for some reason). */ - release_scan_iterator(); + std::swap(m_iterator, save_iterator); return last_val; } @@ -7012,41 +7023,44 @@ int ha_rocksdb::load_hidden_pk_value() { active_index = MAX_KEY; const uint8 save_table_status = table->m_status; + std::unique_ptr<Rdb_iterator> save_iterator(new Rdb_iterator_base( + ha_thd(), m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def)); + std::swap(m_iterator, save_iterator); + Rdb_transaction *const tx = get_or_create_tx(table->in_use); const bool is_new_snapshot = !tx->has_snapshot(); longlong hidden_pk_id = 1; + longlong old = 0; + int rc = 0; // Do a lookup. 
if (!index_last(table->record[0])) { /* Decode PK field from the key */ - auto err = read_hidden_pk_id_from_rowkey(&hidden_pk_id); - if (err) { - if (is_new_snapshot) { - tx->release_snapshot(); - } - return err; + rc = read_hidden_pk_id_from_rowkey(&hidden_pk_id); + if (rc) { + goto exit; } hidden_pk_id++; } - longlong old = m_tbl_def->m_hidden_pk_val; + old = m_tbl_def->m_hidden_pk_val; while (old < hidden_pk_id && !m_tbl_def->m_hidden_pk_val.compare_exchange_weak(old, hidden_pk_id)) { } +exit: if (is_new_snapshot) { tx->release_snapshot(); } table->m_status = save_table_status; active_index = save_active_index; + std::swap(m_iterator, save_iterator); - release_scan_iterator(); - - return HA_EXIT_SUCCESS; + return rc; } /* Get PK value from m_tbl_def->m_hidden_pk_info. */ @@ -7125,11 +7139,6 @@ ha_rocksdb::ha_rocksdb(my_core::handlerton *const hton, my_core::TABLE_SHARE *const table_arg) : handler(hton, table_arg), m_table_handler(nullptr), - m_scan_it(nullptr), - m_scan_it_skips_bloom(false), - m_scan_it_snapshot(nullptr), - m_scan_it_lower_bound(nullptr), - m_scan_it_upper_bound(nullptr), m_tbl_def(nullptr), m_pk_descr(nullptr), m_key_descr_arr(nullptr), @@ -7137,7 +7146,6 @@ ha_rocksdb::ha_rocksdb(my_core::handlerton *const hton, m_pk_packed_tuple(nullptr), m_sk_packed_tuple(nullptr), m_end_key_packed_tuple(nullptr), - m_sk_match_prefix(nullptr), m_sk_packed_tuple_old(nullptr), m_dup_sk_packed_tuple(nullptr), m_dup_sk_packed_tuple_old(nullptr), @@ -7203,12 +7211,13 @@ bool ha_rocksdb::init_with_fields() { rows within a transaction, etc, because the compaction filter ignores snapshots when filtering keys. */ -bool ha_rocksdb::should_hide_ttl_rec(const Rdb_key_def &kd, - const rocksdb::Slice &ttl_rec_val, - const int64_t curr_ts) { +bool rdb_should_hide_ttl_rec(const Rdb_key_def &kd, + const rocksdb::Slice &ttl_rec_val, + Rdb_transaction *tx) { DBUG_ASSERT(kd.has_ttl()); DBUG_ASSERT(kd.m_ttl_rec_offset != UINT_MAX); - THD *thd = ha_thd(); + THD *thd = tx->get_thd(); + const int64_t curr_ts = tx->m_snapshot_timestamp; /* Curr_ts can only be 0 if there are no snapshots open. 
@@ -7223,7 +7232,7 @@ bool ha_rocksdb::should_hide_ttl_rec(const Rdb_key_def &kd, DBUG_ASSERT(false); push_warning_printf(thd, Sql_condition::SL_WARNING, ER_WRONG_ARGUMENTS, "TTL read filtering called with no snapshot."); - update_row_stats(ROWS_UNFILTERED_NO_SNAPSHOT); + rdb_update_global_stats(ROWS_UNFILTERED_NO_SNAPSHOT, 1); return false; } @@ -7263,7 +7272,7 @@ bool ha_rocksdb::should_hide_ttl_rec(const Rdb_key_def &kd, bool is_hide_ttl = ts + kd.m_ttl_duration + read_filter_ts <= static_cast<uint64>(curr_ts); if (is_hide_ttl) { - update_row_stats(ROWS_FILTERED); + rdb_update_global_stats(ROWS_FILTERED, 1); /* increment examined row count when rows are skipped */ thd->inc_examined_row_count(1); @@ -7381,8 +7390,6 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg, m_sk_packed_tuple = reinterpret_cast<uchar *>( my_malloc(PSI_NOT_INSTRUMENTED, max_packed_sk_len, MYF(0))); - m_sk_match_prefix = reinterpret_cast<uchar *>( - my_malloc(PSI_NOT_INSTRUMENTED, max_packed_sk_len, MYF(0))); m_sk_packed_tuple_old = reinterpret_cast<uchar *>( my_malloc(PSI_NOT_INSTRUMENTED, max_packed_sk_len, MYF(0))); m_end_key_packed_tuple = reinterpret_cast<uchar *>( @@ -7390,11 +7397,6 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg, m_pack_buffer = reinterpret_cast<uchar *>( my_malloc(PSI_NOT_INSTRUMENTED, max_packed_sk_len, MYF(0))); - m_scan_it_lower_bound = reinterpret_cast<uchar *>( - my_malloc(PSI_NOT_INSTRUMENTED, max_packed_sk_len, MYF(0))); - m_scan_it_upper_bound = reinterpret_cast<uchar *>( - my_malloc(PSI_NOT_INSTRUMENTED, max_packed_sk_len, MYF(0))); - /* If inplace alter is happening, allocate special buffers for unique secondary index duplicate checking. @@ -7408,8 +7410,7 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg, if (m_pk_packed_tuple == nullptr || m_sk_packed_tuple == nullptr || m_sk_packed_tuple_old == nullptr || m_end_key_packed_tuple == nullptr || - m_pack_buffer == nullptr || m_scan_it_upper_bound == nullptr || - m_scan_it_lower_bound == nullptr || + m_pack_buffer == nullptr || (alloc_alter_buffers && (m_dup_sk_packed_tuple == nullptr || m_dup_sk_packed_tuple_old == nullptr))) { // One or more of the above allocations failed. Clean up and exit @@ -7428,9 +7429,6 @@ void ha_rocksdb::free_key_buffers() { my_free(m_sk_packed_tuple); m_sk_packed_tuple = nullptr; - my_free(m_sk_match_prefix); - m_sk_match_prefix = nullptr; - my_free(m_sk_packed_tuple_old); m_sk_packed_tuple_old = nullptr; @@ -7446,12 +7444,6 @@ void ha_rocksdb::free_key_buffers() { my_free(m_dup_sk_packed_tuple_old); m_dup_sk_packed_tuple_old = nullptr; - my_free(m_scan_it_lower_bound); - m_scan_it_lower_bound = nullptr; - - my_free(m_scan_it_upper_bound); - m_scan_it_upper_bound = nullptr; - release_blob_buffer(); } @@ -7576,6 +7568,7 @@ int ha_rocksdb::close(void) { m_pk_descr = nullptr; m_key_descr_arr = nullptr; m_converter = nullptr; + m_iterator = nullptr; free_key_buffers(); if (m_table_handler != nullptr) { @@ -8738,173 +8731,6 @@ bool ha_rocksdb::check_keyread_allowed(bool &pk_can_be_decoded, return true; } -int ha_rocksdb::read_key_exact(const Rdb_key_def &kd, - rocksdb::Iterator *const iter, - const rocksdb::Slice &key_slice, - const int64_t ttl_filter_ts) { - THD *thd = ha_thd(); - /* - We are looking for the first record such that - index_tuple= lookup_tuple. - lookup_tuple may be a prefix of the index. 
- */ - rocksdb_smart_seek(kd.m_is_reverse_cf, iter, key_slice); - - while (iter->Valid() && kd.value_matches_prefix(iter->key(), key_slice)) { - if (thd && thd->killed) { - return HA_ERR_QUERY_INTERRUPTED; - } - /* - If TTL is enabled we need to check if the given key has already expired - from the POV of the current transaction. If it has, try going to the next - key. - */ - if (kd.has_ttl() && should_hide_ttl_rec(kd, iter->value(), ttl_filter_ts)) { - rocksdb_smart_next(kd.m_is_reverse_cf, iter); - continue; - } - - return HA_EXIT_SUCCESS; - } - - /* - Got a record that is not equal to the lookup value, or even a record - from another table.index. - */ - return HA_ERR_KEY_NOT_FOUND; -} - -int ha_rocksdb::read_before_key(const Rdb_key_def &kd, - const bool full_key_match, - const rocksdb::Slice &key_slice) { - THD *thd = ha_thd(); - /* - We are looking for the first record such that - - index_tuple $LT lookup_tuple - - with HA_READ_BEFORE_KEY, $LT = '<', - with HA_READ_PREFIX_LAST_OR_PREV, $LT = '<=' - with HA_READ_PREFIX_LAST, $LT = '==' - - Symmetry with read_after_key is possible if rocksdb supported prefix seeks. - */ - rocksdb_smart_seek(!kd.m_is_reverse_cf, m_scan_it, key_slice); - - while (is_valid_iterator(m_scan_it)) { - if (thd && thd->killed) { - return HA_ERR_QUERY_INTERRUPTED; - } - /* - We are using full key and we've hit an exact match. - */ - if ((full_key_match && - kd.value_matches_prefix(m_scan_it->key(), key_slice))) { - rocksdb_smart_next(!kd.m_is_reverse_cf, m_scan_it); - continue; - } - - return HA_EXIT_SUCCESS; - } - - return HA_ERR_KEY_NOT_FOUND; -} - -int ha_rocksdb::read_after_key(const Rdb_key_def &kd, - const rocksdb::Slice &key_slice) { - /* - We are looking for the first record such that - - index_tuple $GT lookup_tuple - - with HA_READ_AFTER_KEY, $GT = '>', - with HA_READ_KEY_OR_NEXT, $GT = '>=' - with HA_READ_KEY_EXACT, $GT = '==' - */ - rocksdb_smart_seek(kd.m_is_reverse_cf, m_scan_it, key_slice); - - return is_valid_iterator(m_scan_it) ? HA_EXIT_SUCCESS : HA_ERR_KEY_NOT_FOUND; -} - -int ha_rocksdb::position_to_correct_key(const Rdb_key_def &kd, - const enum ha_rkey_function &find_flag, - const bool full_key_match, - const rocksdb::Slice &key_slice, - bool *const move_forward) { - int rc = 0; - - *move_forward = true; - - switch (find_flag) { - case HA_READ_KEY_EXACT: - case HA_READ_AFTER_KEY: - case HA_READ_KEY_OR_NEXT: - rc = read_after_key(kd, key_slice); - break; - case HA_READ_BEFORE_KEY: - case HA_READ_PREFIX_LAST: - case HA_READ_PREFIX_LAST_OR_PREV: - *move_forward = false; - rc = read_before_key(kd, full_key_match, key_slice); - break; - case HA_READ_KEY_OR_PREV: - case HA_READ_PREFIX: - /* These flags are not used by the SQL layer, so we don't support them - * yet. */ - rc = HA_ERR_UNSUPPORTED; - break; - default: - DBUG_ASSERT(0); - break; - } - - return rc; -} - -int ha_rocksdb::calc_eq_cond_len(const Rdb_key_def &kd, - const enum ha_rkey_function &find_flag, - const rocksdb::Slice &slice, - const int bytes_changed_by_succ, - const key_range *const end_key) { - if (find_flag == HA_READ_KEY_EXACT) return slice.size(); - - if (find_flag == HA_READ_PREFIX_LAST) { - /* - We have made the kd.successor(m_sk_packed_tuple) call above. - - The slice is at least Rdb_key_def::INDEX_NUMBER_SIZE bytes long. 
- */ - return slice.size() - bytes_changed_by_succ; - } - - if (end_key) { - uint end_key_packed_size = 0; - end_key_packed_size = - kd.pack_index_tuple(table, m_pack_buffer, m_end_key_packed_tuple, - end_key->key, end_key->keypart_map); - - /* - Calculating length of the equal conditions here. 4 byte index id is - included. - Example1: id1 BIGINT, id2 INT, id3 BIGINT, PRIMARY KEY (id1, id2, id3) - WHERE id1=1 AND id2=1 AND id3>=2 => eq_cond_len= 4+8+4= 16 - WHERE id1=1 AND id2>=1 AND id3>=2 => eq_cond_len= 4+8= 12 - Example2: id1 VARCHAR(30), id2 INT, PRIMARY KEY (id1, id2) - WHERE id1 = 'AAA' and id2 < 3; => eq_cond_len=13 (varchar used 9 bytes) - */ - rocksdb::Slice end_slice(reinterpret_cast<char *>(m_end_key_packed_tuple), - end_key_packed_size); - return slice.difference_offset(end_slice); - } - - /* - On range scan without any end key condition, there is no - eq cond, and eq cond length is the same as index_id size (8 bytes). - Example1: id1 BIGINT, id2 INT, id3 BIGINT, PRIMARY KEY (id1, id2, id3) - WHERE id1>=1 AND id2 >= 2 and id2 <= 5 => eq_cond_len= 4 - */ - return Rdb_key_def::INDEX_ID_SIZE; -} /** @note @@ -9088,7 +8914,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key, next/prev anyway. To avoid correctness issues, just free the iterator. */ - release_scan_iterator(); + m_iterator->reset(); DBUG_RETURN(rc); } else { /* @@ -9124,7 +8950,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key, rc = get_row_by_rowid(buf, m_last_rowkey.ptr(), m_last_rowkey.length()); - release_scan_iterator(); + m_iterator->reset(); DBUG_RETURN(rc); } @@ -9143,7 +8969,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key, update_row_stats(ROWS_READ); } - release_scan_iterator(); + m_iterator->reset(); DBUG_RETURN(rc); } } @@ -9153,28 +8979,23 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key, key, keypart_map); } - if (find_flag == HA_READ_KEY_EXACT || find_flag == HA_READ_PREFIX_LAST) { - m_sk_match_length = packed_size; - memcpy(m_sk_match_prefix, m_sk_packed_tuple, packed_size); - } else { - kd.get_infimum_key(m_sk_match_prefix, &m_sk_match_length); - } - - int bytes_changed_by_succ = 0; - if (find_flag == HA_READ_PREFIX_LAST_OR_PREV || - find_flag == HA_READ_PREFIX_LAST || find_flag == HA_READ_AFTER_KEY) { - /* See below */ - bytes_changed_by_succ = kd.successor(m_sk_packed_tuple, packed_size); - } - rocksdb::Slice slice(reinterpret_cast<const char *>(m_sk_packed_tuple), packed_size); - const uint eq_cond_len = - calc_eq_cond_len(kd, find_flag, slice, bytes_changed_by_succ, end_range); + rocksdb::Slice end_slice; + if (end_range && find_flag != HA_READ_KEY_EXACT && + find_flag != HA_READ_PREFIX_LAST) { + uint end_key_packed_size = 0; + end_key_packed_size = + kd.pack_index_tuple(table, m_pack_buffer, m_end_key_packed_tuple, + end_range->key, end_range->keypart_map); + end_slice = + rocksdb::Slice((char *)m_end_key_packed_tuple, end_key_packed_size); + } Rdb_transaction *const tx = get_or_create_tx(table->in_use); const bool is_new_snapshot = !tx->has_snapshot(); + // Loop as long as we get a deadlock error AND we end up creating the // snapshot here (i.e. it did not exist prior to this) for (;;) { @@ -9187,15 +9008,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key, This will open the iterator and position it at a record that's equal or greater than the lookup tuple. 
*/ - setup_scan_iterator(kd, &slice, eq_cond_len); - - /* - Once we are positioned on from above, move to the position we really - want: See storage/rocksdb/rocksdb-range-access.txt - */ - bool move_forward; - rc = position_to_correct_key(kd, find_flag, using_full_key, slice, - &move_forward); + rc = m_iterator->seek(find_flag, slice, using_full_key, end_slice); if (rc) { break; @@ -9206,7 +9019,10 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key, then we have all the rows we need. For a secondary key we now need to lookup the primary key. */ - rc = index_next_with_direction_intern(buf, move_forward, true); + bool direction = (find_flag == HA_READ_KEY_EXACT) || + (find_flag == HA_READ_AFTER_KEY) || + (find_flag == HA_READ_KEY_OR_NEXT); + rc = index_next_with_direction_intern(buf, direction, true); if (!should_recreate_snapshot(rc, is_new_snapshot)) { break; /* Exit the loop */ @@ -9214,7 +9030,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key, // release the snapshot and iterator so they will be regenerated tx->release_snapshot(); - release_scan_iterator(); + m_iterator->reset(); } if (!rc) { @@ -9247,7 +9063,14 @@ int ha_rocksdb::index_read_map(uchar *const buf, const uchar *const key, DBUG_ENTER_FUNC(); ha_statistic_increment(&System_status_var::ha_read_key_count); - DBUG_RETURN(index_read_intern(buf, key, keypart_map, find_flag)); + int rc = index_read_intern(buf, key, keypart_map, find_flag); + + // The SQL layer generally expects HA_ERR_KEY_NOT_FOUND for this call. + if (rc == HA_ERR_END_OF_FILE) { + rc = HA_ERR_KEY_NOT_FOUND; + } + + DBUG_RETURN(rc); } /** @@ -9324,7 +9147,7 @@ int ha_rocksdb::check(THD *const thd MY_ATTRIBUTE((__unused__)), table_name, rows, res); goto error; } - rocksdb::Slice key = m_scan_it->key(); + rocksdb::Slice key = m_iterator->key(); sec_key_copy.copy(key.data(), key.size(), &my_charset_bin); rowkey_copy.copy(m_last_rowkey.ptr(), m_last_rowkey.length(), &my_charset_bin); @@ -9470,16 +9293,9 @@ rocksdb::Status ha_rocksdb::get_for_update( Rdb_transaction *const tx, const Rdb_key_def &key_descr, const rocksdb::Slice &key, rocksdb::PinnableSlice *const value) const { DBUG_ASSERT(m_lock_rows != RDB_LOCK_NONE); - + DBUG_ASSERT(value == nullptr); bool exclusive = m_lock_rows != RDB_LOCK_READ; - bool do_validate = my_core::thd_tx_isolation(ha_thd()) > ISO_READ_COMMITTED; - rocksdb::Status s = - tx->get_for_update(key_descr, key, value, exclusive, do_validate); - -#ifndef DBUG_OFF - ++rocksdb_num_get_for_update_calls; -#endif - return s; + return rdb_tx_get_for_update(tx, key_descr, key, value, exclusive); } bool ha_rocksdb::is_blind_delete_enabled() { @@ -9522,9 +9338,6 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid, DBUG_ASSERT(!debug_sync_set_action(thd, STRING_WITH_LEN(act))); };); - bool found; - rocksdb::Status s; - /* Pretend row found without looking up */ if (skip_lookup) { /* TODO(yzha) - rows stas are gone in 8.0 @@ -9535,11 +9348,9 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid, DBUG_RETURN(0); } - if (m_lock_rows == RDB_LOCK_NONE) { - tx->acquire_snapshot(true); - s = tx->get(m_pk_descr->get_cf(), key_slice, &m_retrieved_record); - } else if (m_insert_with_update && m_dup_key_found && - m_pk_descr->get_keyno() == m_dupp_errkey) { + if (m_insert_with_update && m_dup_key_found && + m_pk_descr->get_keyno() == m_dupp_errkey) { + DBUG_ASSERT(m_lock_rows == RDB_LOCK_WRITE); DBUG_ASSERT(m_dup_key_tuple.length() == key_slice.size()); 
DBUG_ASSERT( memcmp(m_dup_key_tuple.ptr(), key_slice.data(), key_slice.size()) == 0); @@ -9548,43 +9359,26 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid, // m_dup_key_retrieved_record during write_row already, so just move it // over. m_retrieved_record = std::move(m_dup_key_retrieved_record); - s = rocksdb::Status::OK(); + rc = HA_EXIT_SUCCESS; } else { - s = get_for_update(tx, *m_pk_descr, key_slice, &m_retrieved_record); + tx->acquire_snapshot(false); + Rdb_iterator_base iter(ha_thd(), m_pk_descr, m_pk_descr, m_tbl_def); + rc = iter.get(&key_slice, &m_retrieved_record, m_lock_rows, skip_ttl_check); } - DBUG_EXECUTE_IF("rocksdb_return_status_corrupted", - dbug_change_status_to_corrupted(&s);); - - if (!s.IsNotFound() && !s.ok()) { - DBUG_RETURN(tx->set_status_error(table->in_use, s, *m_pk_descr, m_tbl_def, - m_table_handler)); - } - found = !s.IsNotFound(); - - table->m_status = STATUS_NOT_FOUND; - if (found) { - /* If we found the record, but it's expired, pretend we didn't find it. */ - if (!skip_ttl_check && m_pk_descr->has_ttl() && - should_hide_ttl_rec(*m_pk_descr, m_retrieved_record, - tx->m_snapshot_timestamp)) { - DBUG_RETURN(HA_ERR_KEY_NOT_FOUND); - } - + if (!rc) { m_last_rowkey.copy((const char *)rowid, rowid_size, &my_charset_bin); rc = convert_record_from_storage_format(&key_slice, buf); if (!rc) { table->m_status = 0; } - } else { - /* - Note: we don't need to unlock the row. It is intentional that we keep - locks on rows that don't exist. - */ - rc = HA_ERR_KEY_NOT_FOUND; } + /* + Note: we don't need to unlock the row. It is intentional that we keep + locks on rows that don't exist. + */ DBUG_RETURN(rc); } @@ -9615,23 +9409,9 @@ int ha_rocksdb::records_from_index(ha_rows *num_rows, uint index) { int ha_rocksdb::get_row_by_sk(uchar *buf, const Rdb_key_def &kd, const rocksdb::Slice *key) { DBUG_ENTER_FUNC(); - Rdb_transaction *const tx = get_or_create_tx(table->in_use); - - auto s = tx->get(kd.get_cf(), *key, &m_retrieved_record); - - if (!s.IsNotFound() && !s.ok()) { - DBUG_RETURN( - tx->set_status_error(table->in_use, s, kd, m_tbl_def, m_table_handler)); - } - if (s.IsNotFound()) { - DBUG_RETURN(HA_ERR_KEY_NOT_FOUND); - } - - if (kd.has_ttl() && - should_hide_ttl_rec(kd, m_retrieved_record, tx->m_snapshot_timestamp)) { - DBUG_RETURN(HA_ERR_KEY_NOT_FOUND); - } + int rc = m_iterator->get(key, &m_retrieved_record, RDB_LOCK_NONE); + if (rc) DBUG_RETURN(rc); const uint size = kd.get_primary_key_tuple(table, *m_pk_descr, key, m_pk_packed_tuple); @@ -9641,7 +9421,7 @@ int ha_rocksdb::get_row_by_sk(uchar *buf, const Rdb_key_def &kd, m_last_rowkey.copy((const char *)m_pk_packed_tuple, size, &my_charset_bin); - int rc = secondary_index_read(active_index, buf, &m_retrieved_record); + rc = secondary_index_read(active_index, buf, &m_retrieved_record); if (!rc) { table->m_status = 0; } @@ -9700,9 +9480,6 @@ int ha_rocksdb::index_next_with_direction_intern(uchar *const buf, THD *thd = ha_thd(); int rc = 0; const Rdb_key_def &kd = *m_key_descr_arr[active_index_pos()]; - Rdb_transaction *const tx = get_or_create_tx(thd); - rocksdb::Slice prefix_tuple(reinterpret_cast<char *>(m_sk_match_prefix), - m_sk_match_length); table->m_status = STATUS_NOT_FOUND; /* TODO(yzha) - row stats are gone in 8.0 @@ -9715,8 +9492,8 @@ int ha_rocksdb::index_next_with_direction_intern(uchar *const buf, break; } - DBUG_ASSERT(m_scan_it); - if (m_scan_it == nullptr) { + DBUG_ASSERT(m_iterator != nullptr); + if (m_iterator == nullptr) { rc = HA_ERR_INTERNAL_ERROR; break; } @@ 
-9725,31 +9502,18 @@ int ha_rocksdb::index_next_with_direction_intern(uchar *const buf, skip_next = false; } else { if (move_forward) { - rocksdb_smart_next(kd.m_is_reverse_cf, m_scan_it); + rc = m_iterator->next(); } else { - rocksdb_smart_prev(kd.m_is_reverse_cf, m_scan_it); + rc = m_iterator->prev(); } } - if (!is_valid_iterator(m_scan_it)) { - rc = HA_ERR_END_OF_FILE; + if (rc == HA_ERR_END_OF_FILE) { break; } - const rocksdb::Slice &key = m_scan_it->key(); - const rocksdb::Slice &value = m_scan_it->value(); - - // Outside our range, return EOF. - if (!kd.value_matches_prefix(key, prefix_tuple)) { - rc = HA_ERR_END_OF_FILE; - break; - } - - // Record is not visible due to TTL, move to next record. - if (m_pk_descr->has_ttl() && - should_hide_ttl_rec(kd, value, tx->m_snapshot_timestamp)) { - continue; - } + const rocksdb::Slice &key = m_iterator->key(); + const rocksdb::Slice &value = m_iterator->value(); if (active_index == table->s->primary_key) { if (m_lock_rows != RDB_LOCK_NONE) { @@ -9958,15 +9722,14 @@ bool ha_rocksdb::skip_unique_check() const { use_read_free_rpl(); } -bool ha_rocksdb::commit_in_the_middle() { +bool commit_in_the_middle(THD *thd) { // It does not make sense to use write unprepared with commit in the middle, // since both handle large transactions by flushing the write batches onto // disk. // // For the two to work together, we would need to assign a new xid after // committing. - return (THDVAR(table->in_use, bulk_load) || - THDVAR(table->in_use, commit_in_the_middle)) && + return (THDVAR(thd, bulk_load) || THDVAR(thd, commit_in_the_middle)) && rocksdb_write_policy != rocksdb::TxnDBWritePolicy::WRITE_UNPREPARED; } @@ -9976,7 +9739,7 @@ bool ha_rocksdb::commit_in_the_middle() { @retval false if bulk commit was skipped or succeeded */ bool ha_rocksdb::do_bulk_commit(Rdb_transaction *const tx) { - return commit_in_the_middle() && + return commit_in_the_middle(table->in_use) && tx->get_write_count() >= THDVAR(table->in_use, bulk_load_size) && tx->flush_batch(); } @@ -10200,6 +9963,10 @@ void ha_rocksdb::set_last_rowkey( } } +void ha_rocksdb::set_last_rowkey(const char *str, size_t len) { + m_last_rowkey.copy(str, len, &my_charset_bin); +} + /** Collect update data for primary key @@ -10294,28 +10061,18 @@ int ha_rocksdb::check_and_lock_unique_pk(const uint key_id, 2) T1 Get(empty) -> T1 Put(insert, not committed yet) -> T2 Get(empty) -> T2 Put(insert, blocked) -> T1 commit -> T2 commit(overwrite) */ - const rocksdb::Status s = - get_for_update(row_info.tx, *m_pk_descr, row_info.new_pk_slice, - ignore_pk_unique_check ? nullptr : pslice); - if (!s.ok() && !s.IsNotFound()) { - return row_info.tx->set_status_error( - table->in_use, s, *m_key_descr_arr[key_id], m_tbl_def, m_table_handler); - } + Rdb_iterator_base iter(ha_thd(), m_key_descr_arr[key_id], m_pk_descr, + m_tbl_def); - bool key_found = ignore_pk_unique_check ? false : !s.IsNotFound(); + int rc = iter.get(&row_info.new_pk_slice, + ignore_pk_unique_check ? nullptr : pslice, m_lock_rows); - /* - If the pk key has ttl, we may need to pretend the row wasn't - found if it is already expired. - */ - DBUG_ASSERT(row_info.tx->has_snapshot() && - row_info.tx->m_snapshot_timestamp != 0); - if (key_found && m_pk_descr->has_ttl() && - should_hide_ttl_rec(*m_pk_descr, *pslice, - row_info.tx->m_snapshot_timestamp)) { - key_found = false; + if (rc && rc != HA_ERR_KEY_NOT_FOUND) { + return rc; } + bool key_found = ignore_pk_unique_check ? 
false : (rc == HA_EXIT_SUCCESS); + if (key_found && row_info.old_data == nullptr && m_insert_with_update) { // In INSERT ON DUPLICATE KEY UPDATE ... case, if the insert failed // due to a duplicate key, remember the last key and skip the check @@ -10441,53 +10198,30 @@ int ha_rocksdb::check_and_lock_sk( The bloom filter may need to be disabled for this lookup. */ - uchar lower_bound_buf[Rdb_key_def::INDEX_ID_SIZE]; - uchar upper_bound_buf[Rdb_key_def::INDEX_ID_SIZE]; - rocksdb::Slice lower_bound_slice; - rocksdb::Slice upper_bound_slice; + Rdb_iterator_base iter(ha_thd(), m_key_descr_arr[key_id], m_pk_descr, + m_tbl_def); + int rc = HA_EXIT_SUCCESS; - const rocksdb::Status s = - get_for_update(row_info.tx, kd, new_slice, - all_parts_used ? &m_retrieved_record : nullptr); - if (!s.ok() && !s.IsNotFound()) { - return row_info.tx->set_status_error(table->in_use, s, kd, m_tbl_def, - m_table_handler); + rc = iter.get(&new_slice, all_parts_used ? &m_retrieved_record : nullptr, + m_lock_rows); + if (rc && rc != HA_ERR_KEY_NOT_FOUND) { + return rc; } - rocksdb::Iterator *iter = nullptr; + if (!all_parts_used) { + rc = iter.seek(HA_READ_KEY_EXACT, new_slice, false /* full_key_match */, + new_slice, true /* read current */); - if (all_parts_used) { - *found = !s.IsNotFound(); - if (*found && kd.has_ttl() && - should_hide_ttl_rec(kd, m_retrieved_record, - row_info.tx->m_snapshot_timestamp)) { - *found = false; + if (rc && rc != HA_ERR_END_OF_FILE) { + return rc; } - } else { - const bool total_order_seek = !check_bloom_and_set_bounds( - ha_thd(), kd, new_slice, Rdb_key_def::INDEX_ID_SIZE, lower_bound_buf, - upper_bound_buf, &lower_bound_slice, &upper_bound_slice); - const bool fill_cache = !THDVAR(ha_thd(), skip_fill_cache); - - iter = row_info.tx->get_iterator(kd.get_cf(), total_order_seek, fill_cache, - lower_bound_slice, upper_bound_slice, - true /* read current data */, - false /* acquire snapshot */); - /* - Need to scan the transaction to see if there is a duplicate key. - Also need to scan RocksDB and verify the key has not been deleted - in the transaction. - */ - DBUG_ASSERT(row_info.tx->has_snapshot() && - row_info.tx->m_snapshot_timestamp != 0); - *found = - !read_key_exact(kd, iter, new_slice, row_info.tx->m_snapshot_timestamp); } - int rc = HA_EXIT_SUCCESS; + *found = (rc == HA_EXIT_SUCCESS); + rc = HA_EXIT_SUCCESS; if (*found && m_insert_with_update) { - const rocksdb::Slice &rkey = all_parts_used ? new_slice : iter->key(); + const rocksdb::Slice &rkey = all_parts_used ? new_slice : iter.key(); uint pk_size = kd.get_primary_key_tuple(table, *m_pk_descr, &rkey, m_pk_packed_tuple); if (pk_size == RDB_INVALID_KEY_LEN) { @@ -10503,7 +10237,6 @@ int ha_rocksdb::check_and_lock_sk( } } - delete iter; return rc; } @@ -11040,10 +10773,12 @@ int ha_rocksdb::update_write_row(const uchar *const old_data, 0x0000b3eb003f65c5e78857, and lower bound would be 0x0000b3eb003f65c5e78859. These cover given eq condition range. 
*/ -void ha_rocksdb::setup_iterator_bounds( - const Rdb_key_def &kd, const rocksdb::Slice &eq_cond, size_t bound_len, - uchar *const lower_bound, uchar *const upper_bound, - rocksdb::Slice *lower_bound_slice, rocksdb::Slice *upper_bound_slice) { +static void setup_iterator_bounds(const Rdb_key_def &kd, + const rocksdb::Slice &eq_cond, + size_t bound_len, uchar *const lower_bound, + uchar *const upper_bound, + rocksdb::Slice *lower_bound_slice, + rocksdb::Slice *upper_bound_slice) { // If eq_cond is shorter than Rdb_key_def::INDEX_NUMBER_SIZE, we should be // able to get better bounds just by using index id directly. if (eq_cond.size() <= Rdb_key_def::INDEX_ID_SIZE) { @@ -11070,91 +10805,6 @@ void ha_rocksdb::setup_iterator_bounds( } } -/* - Open a cursor -*/ - -void ha_rocksdb::setup_scan_iterator(const Rdb_key_def &kd, - rocksdb::Slice *const slice, - const uint eq_cond_len) { - DBUG_ASSERT(slice->size() >= eq_cond_len); - - Rdb_transaction *const tx = get_or_create_tx(table->in_use); - - bool skip_bloom = true; - - const rocksdb::Slice eq_cond(slice->data(), eq_cond_len); - // The size of m_scan_it_lower_bound (and upper) is technically - // max_packed_sk_len as calculated in ha_rocksdb::alloc_key_buffers. Rather - // than recalculating that number, we pass in the max of eq_cond_len and - // Rdb_key_def::INDEX_NUMBER_SIZE which is guaranteed to be smaller than - // max_packed_sk_len, hence ensuring no buffer overrun. - // - // See ha_rocksdb::setup_iterator_bounds on how the bound_len parameter is - // used. - if (check_bloom_and_set_bounds( - ha_thd(), kd, eq_cond, - std::max(eq_cond_len, (uint)Rdb_key_def::INDEX_ID_SIZE), - m_scan_it_lower_bound, m_scan_it_upper_bound, - &m_scan_it_lower_bound_slice, &m_scan_it_upper_bound_slice)) { - skip_bloom = false; - } - - /* - In some cases, setup_scan_iterator() is called multiple times from - the same query but bloom filter can not always be used. - Suppose the following query example. id2 is VARCHAR(30) and PRIMARY KEY - (id1, id2). - select count(*) from t2 WHERE id1=100 and id2 IN ('00000000000000000000', - '100'); - In this case, setup_scan_iterator() is called twice, the first time is for - (id1, id2)=(100, '00000000000000000000') and the second time is for (100, - '100'). - If prefix bloom filter length is 24 bytes, prefix bloom filter can be used - for the - first condition but not for the second condition. - If bloom filter condition is changed, currently it is necessary to destroy - and - re-create Iterator. - */ - if (m_scan_it_skips_bloom != skip_bloom) { - release_scan_iterator(); - } - - /* - SQL layer can call rnd_init() multiple times in a row. - In that case, re-use the iterator, but re-position it at the table start. 
- */ - if (!m_scan_it) { - const bool fill_cache = !THDVAR(ha_thd(), skip_fill_cache); - if (commit_in_the_middle()) { - DBUG_ASSERT(m_scan_it_snapshot == nullptr); - m_scan_it_snapshot = rdb->GetSnapshot(); - - auto read_opts = rocksdb::ReadOptions(); - // TODO(mung): set based on WHERE conditions - read_opts.total_order_seek = true; - read_opts.snapshot = m_scan_it_snapshot; - m_scan_it = rdb->NewIterator(read_opts, kd.get_cf()); - } else { - m_scan_it = tx->get_iterator(kd.get_cf(), skip_bloom, fill_cache, - m_scan_it_lower_bound_slice, - m_scan_it_upper_bound_slice); - } - m_scan_it_skips_bloom = skip_bloom; - } -} - -void ha_rocksdb::release_scan_iterator() { - delete m_scan_it; - m_scan_it = nullptr; - - if (m_scan_it_snapshot) { - rdb->ReleaseSnapshot(m_scan_it_snapshot); - m_scan_it_snapshot = nullptr; - } -} - /** @return HA_EXIT_SUCCESS OK @@ -11241,6 +10891,10 @@ int ha_rocksdb::index_init(uint idx, bool sorted MY_ATTRIBUTE((__unused__))) { Rdb_transaction *const tx = get_or_create_tx(thd); DBUG_ASSERT(tx != nullptr); + active_index = idx; + m_iterator.reset(new Rdb_iterator_base( + thd, m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def)); + // If m_lock_rows is not RDB_LOCK_NONE then we will be doing a get_for_update // when accessing the index, so don't acquire the snapshot right away. // Otherwise acquire the snapshot immediately. @@ -11257,8 +10911,7 @@ int ha_rocksdb::index_end() { DBUG_ENTER_FUNC(); m_need_build_decoder = false; - - release_scan_iterator(); + m_iterator = nullptr; active_index = MAX_KEY; in_range_check_pushed_down = false; @@ -15141,14 +14794,14 @@ bool rdb_dbug_set_ttl_ignore_pk() { return rocksdb_debug_ttl_ignore_pk; } #endif void rdb_update_global_stats(const operation_type &type, uint count, - bool is_system_table) { + Rdb_tbl_def *td) { DBUG_ASSERT(type < ROWS_MAX); if (count == 0) { return; } - if (is_system_table) { + if (td && td->m_is_mysql_system_table) { global_stats.system_rows[type].add(count); } else { global_stats.rows[type].add(count); @@ -16132,13 +15785,33 @@ const rocksdb::ReadOptions &rdb_tx_acquire_snapshot(Rdb_transaction *tx) { rocksdb::Iterator *rdb_tx_get_iterator( Rdb_transaction *tx, rocksdb::ColumnFamilyHandle *const column_family, - bool skip_bloom_filter, bool fill_cache, - const rocksdb::Slice &lower_bound_slice, + bool skip_bloom_filter, const rocksdb::Slice &lower_bound_slice, const rocksdb::Slice &upper_bound_slice, bool read_current, bool create_snapshot) { - return tx->get_iterator(column_family, skip_bloom_filter, fill_cache, - lower_bound_slice, upper_bound_slice, read_current, - create_snapshot); + return tx->get_iterator(column_family, skip_bloom_filter, lower_bound_slice, + upper_bound_slice, read_current, create_snapshot); +} + +rocksdb::Iterator *rdb_tx_get_iterator( + THD *thd, rocksdb::ColumnFamilyHandle *const cf, bool skip_bloom_filter, + const rocksdb::Slice &eq_cond_lower_bound, + const rocksdb::Slice &eq_cond_upper_bound, + const rocksdb::Snapshot **snapshot, bool read_current, + bool create_snapshot) { + if (commit_in_the_middle(thd)) { + DBUG_ASSERT(*snapshot == nullptr); + *snapshot = rdb->GetSnapshot(); + + auto read_opts = rocksdb::ReadOptions(); + // TODO(mung): set based on WHERE conditions + read_opts.total_order_seek = true; + read_opts.snapshot = *snapshot; + return rdb->NewIterator(read_opts, cf); + } else { + Rdb_transaction *tx = get_tx_from_thd(thd); + return tx->get_iterator(cf, skip_bloom_filter, eq_cond_lower_bound, + eq_cond_upper_bound, read_current, create_snapshot); + } } bool 
rdb_tx_started(Rdb_transaction *tx) { return tx->is_tx_started(); } @@ -16150,6 +15823,22 @@ rocksdb::Status rdb_tx_get(Rdb_transaction *tx, return tx->get(column_family, key, value); } +rocksdb::Status rdb_tx_get_for_update(Rdb_transaction *tx, + const Rdb_key_def &kd, + const rocksdb::Slice &key, + rocksdb::PinnableSlice *const value, + bool exclusive) { + bool do_validate = + my_core::thd_tx_isolation(tx->get_thd()) > ISO_READ_COMMITTED; + rocksdb::Status s = + tx->get_for_update(kd, key, value, exclusive, do_validate); + +#ifndef DBUG_OFF + ++rocksdb_num_get_for_update_calls; +#endif + return s; +} + void rdb_tx_multi_get(Rdb_transaction *tx, rocksdb::ColumnFamilyHandle *const column_family, const size_t num_keys, const rocksdb::Slice *keys, @@ -16158,6 +15847,12 @@ void rdb_tx_multi_get(Rdb_transaction *tx, tx->multi_get(column_family, num_keys, keys, values, statuses, sorted_input); } +int rdb_tx_set_status_error(Rdb_transaction *tx, const rocksdb::Status &s, + const Rdb_key_def &kd, + const Rdb_tbl_def *const tbl_def) { + return tx->set_status_error(tx->get_thd(), s, kd, tbl_def, nullptr); +} + /**************************************************************************** * Multi-Range-Read implementation based on RocksDB's MultiGet() call ***************************************************************************/ @@ -16685,8 +16380,7 @@ int ha_rocksdb::multi_range_read_next(char **range_info) { /* If we found the record, but it's expired, pretend we didn't find it. */ if (m_pk_descr->has_ttl() && - should_hide_ttl_rec(*m_pk_descr, m_retrieved_record, - tx->m_snapshot_timestamp)) { + rdb_should_hide_ttl_rec(*m_pk_descr, m_retrieved_record, tx)) { continue; } diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index 9c3c3927498..369af0bc9c4 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -77,6 +77,7 @@ namespace myrocks { class Rdb_converter; +class Rdb_iterator; class Rdb_key_def; class Rdb_tbl_def; class Rdb_transaction; @@ -134,6 +135,8 @@ enum table_cardinality_scan_type { SCAN_TYPE_FULL_TABLE, }; +enum Rdb_lock_type { RDB_LOCK_NONE, RDB_LOCK_READ, RDB_LOCK_WRITE }; + class Mrr_rowid_source; uint32_t rocksdb_perf_context_level(THD *const thd); @@ -148,20 +151,6 @@ class ha_rocksdb : public my_core::handler { Rdb_table_handler *m_table_handler; ///< Open table handler - /* Iterator used for range scans and for full table/index scans */ - rocksdb::Iterator *m_scan_it; - - /* Whether m_scan_it was created with skip_bloom=true */ - bool m_scan_it_skips_bloom; - - const rocksdb::Snapshot *m_scan_it_snapshot; - - /* Buffers used for upper/lower bounds for m_scan_it. */ - uchar *m_scan_it_lower_bound; - uchar *m_scan_it_upper_bound; - rocksdb::Slice m_scan_it_lower_bound_slice; - rocksdb::Slice m_scan_it_upper_bound_slice; - Rdb_tbl_def *m_tbl_def; /* Primary Key encoder from KeyTupleFormat to StorageFormat */ @@ -197,13 +186,6 @@ class ha_rocksdb : public my_core::handler { Rdb_string_writer m_sk_tails; Rdb_string_writer m_pk_unpack_info; - /* - ha_rockdb->index_read_map(.. HA_READ_KEY_EXACT or similar) will save here - mem-comparable form of the index lookup tuple. - */ - uchar *m_sk_match_prefix; - uint m_sk_match_length; - /* Second buffers, used by UPDATE. 
*/ uchar *m_sk_packed_tuple_old; Rdb_string_writer m_sk_tails_old; @@ -221,6 +203,8 @@ class ha_rocksdb : public my_core::handler { /* class to convert between Mysql format and RocksDB format*/ std::unique_ptr<Rdb_converter> m_converter; + std::unique_ptr<Rdb_iterator> m_iterator; + /* Pointer to the original TTL timestamp value (8 bytes) during UPDATE. */ @@ -269,7 +253,7 @@ class ha_rocksdb : public my_core::handler { uint m_total_blob_buffer_allocated = 0; /* Type of locking to apply to rows */ - enum { RDB_LOCK_NONE, RDB_LOCK_READ, RDB_LOCK_WRITE } m_lock_rows; + Rdb_lock_type m_lock_rows; /* true means we're doing an index-only read. false means otherwise. */ bool m_keyread_only; @@ -327,17 +311,6 @@ class ha_rocksdb : public my_core::handler { int secondary_index_read(const int keyno, uchar *const buf, const rocksdb::Slice *value) MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); - static void setup_iterator_bounds(const Rdb_key_def &kd, - const rocksdb::Slice &eq_cond, - size_t bound_len, uchar *const lower_bound, - uchar *const upper_bound, - rocksdb::Slice *lower_bound_slice, - rocksdb::Slice *upper_bound_slice); - static bool can_use_bloom_filter(THD *thd, const Rdb_key_def &kd, - const rocksdb::Slice &eq_cond); - void setup_scan_iterator(const Rdb_key_def &kd, rocksdb::Slice *slice, - const uint eq_cond_len) MY_ATTRIBUTE((__nonnull__)); - void release_scan_iterator(void); rocksdb::Status get_for_update(Rdb_transaction *const tx, const Rdb_key_def &kd, @@ -373,7 +346,6 @@ class ha_rocksdb : public my_core::handler { MY_ATTRIBUTE((__warn_unused_result__)); bool is_blind_delete_enabled(); bool skip_unique_check() const; - bool commit_in_the_middle() MY_ATTRIBUTE((__warn_unused_result__)); bool do_bulk_commit(Rdb_transaction *const tx) MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); bool has_hidden_pk(const TABLE *const table) const @@ -382,6 +354,7 @@ class ha_rocksdb : public my_core::handler { void update_row_stats(const operation_type &type, ulonglong count = 1); void set_last_rowkey(const uchar *const old_data); + void set_last_rowkey(const char *str, size_t len); int alloc_key_buffers(const TABLE *const table_arg, const Rdb_tbl_def *const tbl_def_arg, @@ -666,6 +639,8 @@ class ha_rocksdb : public my_core::handler { THD *thd, const Rdb_key_def &kd, const rocksdb::Slice &eq_cond, size_t bound_len, uchar *const lower_bound, uchar *const upper_bound, rocksdb::Slice *lower_bound_slice, rocksdb::Slice *upper_bound_slice); + static bool can_use_bloom_filter(THD *thd, const Rdb_key_def &kd, + const rocksdb::Slice &eq_cond); private: // true <=> The scan uses the default MRR implementation, just redirect all @@ -680,6 +655,7 @@ class ha_rocksdb : public my_core::handler { friend class Mrr_rowid_source; friend class Mrr_pk_scan_rowid_source; friend class Mrr_sec_key_rowid_source; + friend class Rdb_iterator; // MRR parameters and output values rocksdb::Slice *mrr_keys; @@ -778,11 +754,6 @@ class ha_rocksdb : public my_core::handler { int compare_keys(const KEY *const old_key, const KEY *const new_key) const MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); - bool should_hide_ttl_rec(const Rdb_key_def &kd, - const rocksdb::Slice &ttl_rec_val, - const int64_t curr_ts) - MY_ATTRIBUTE((__warn_unused_result__)); - int index_read_intern(uchar *const buf, const uchar *const key, key_part_map keypart_map, enum ha_rkey_function find_flag) @@ -834,29 +805,6 @@ class ha_rocksdb : public my_core::handler { const bool pk_changed) MY_ATTRIBUTE((__warn_unused_result__)); - int 
read_key_exact(const Rdb_key_def &kd, rocksdb::Iterator *const iter, - const rocksdb::Slice &key_slice, - const int64_t ttl_filter_ts) - MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); - int read_before_key(const Rdb_key_def &kd, const bool using_full_key, - const rocksdb::Slice &key_slice) - MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); - int read_after_key(const Rdb_key_def &kd, const rocksdb::Slice &key_slice) - MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); - int position_to_correct_key(const Rdb_key_def &kd, - const enum ha_rkey_function &find_flag, - const bool full_key_match, - const rocksdb::Slice &key_slice, - bool *const move_forward) - MY_ATTRIBUTE((__warn_unused_result__)); - - int calc_eq_cond_len(const Rdb_key_def &kd, - const enum ha_rkey_function &find_flag, - const rocksdb::Slice &slice, - const int bytes_changed_by_succ, - const key_range *const end_key) - MY_ATTRIBUTE((__warn_unused_result__)); - Rdb_tbl_def *get_table_if_exists(const char *const tablename) MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); void read_thd_vars(THD *const thd) MY_ATTRIBUTE((__nonnull__)); @@ -1196,15 +1144,28 @@ const rocksdb::ReadOptions &rdb_tx_acquire_snapshot(Rdb_transaction *tx); rocksdb::Iterator *rdb_tx_get_iterator( Rdb_transaction *tx, rocksdb::ColumnFamilyHandle *const column_family, - bool skip_bloom, bool fill_cache, const rocksdb::Slice &lower_bound_slice, + bool skip_bloom, const rocksdb::Slice &lower_bound_slice, const rocksdb::Slice &upper_bound_slice, bool read_current = false, bool create_snapshot = true); +rocksdb::Iterator *rdb_tx_get_iterator( + THD *thd, rocksdb::ColumnFamilyHandle *const cf, bool skip_bloom_filter, + const rocksdb::Slice &eq_cond_lower_bound, + const rocksdb::Slice &eq_cond_upper_bound, + const rocksdb::Snapshot **snapshot, bool read_current = false, + bool create_snapshot = true); + rocksdb::Status rdb_tx_get(Rdb_transaction *tx, rocksdb::ColumnFamilyHandle *const column_family, const rocksdb::Slice &key, rocksdb::PinnableSlice *const value); +rocksdb::Status rdb_tx_get_for_update(Rdb_transaction *tx, + const Rdb_key_def &kd, + const rocksdb::Slice &key, + rocksdb::PinnableSlice *const value, + bool exclusive); + void rdb_tx_multi_get(Rdb_transaction *tx, rocksdb::ColumnFamilyHandle *const column_family, const size_t num_keys, const rocksdb::Slice *keys, @@ -1244,7 +1205,14 @@ inline void rocksdb_smart_prev(bool seek_backward, // https://github.com/facebook/rocksdb/wiki/Iterator#error-handling bool is_valid_iterator(rocksdb::Iterator *scan_it); +bool rdb_should_hide_ttl_rec(const Rdb_key_def &kd, + const rocksdb::Slice &ttl_rec_val, + Rdb_transaction *tx); + bool rdb_tx_started(Rdb_transaction *tx); +int rdb_tx_set_status_error(Rdb_transaction *tx, const rocksdb::Status &s, + const Rdb_key_def &kd, + const Rdb_tbl_def *const tbl_def); extern std::atomic<uint64_t> rocksdb_select_bypass_executed; extern std::atomic<uint64_t> rocksdb_select_bypass_rejected; diff --git a/storage/rocksdb/ha_rocksdb_proto.h b/storage/rocksdb/ha_rocksdb_proto.h index e72d666a781..58eee57ac27 100644 --- a/storage/rocksdb/ha_rocksdb_proto.h +++ b/storage/rocksdb/ha_rocksdb_proto.h @@ -105,7 +105,7 @@ bool rdb_sync_wal_supported(); enum operation_type : int; void rdb_update_global_stats(const operation_type &type, uint count, - bool is_system_table = false); + Rdb_tbl_def *td = nullptr); class Rdb_dict_manager; Rdb_dict_manager *rdb_get_dict_manager(void) diff --git a/storage/rocksdb/nosql_access.cc b/storage/rocksdb/nosql_access.cc index 
1788486ee9f..f13b94b32f1 100644 --- a/storage/rocksdb/nosql_access.cc +++ b/storage/rocksdb/nosql_access.cc @@ -678,8 +678,8 @@ class select_exec { bool use_bloom, const rocksdb::Slice &lower_bound, const rocksdb::Slice &upper_bound) { - return rdb_tx_get_iterator(m_tx, cf, !use_bloom, true /* fill_cache */, - lower_bound, upper_bound); + return rdb_tx_get_iterator(m_tx, cf, !use_bloom, lower_bound, + upper_bound); } rocksdb::Status get(rocksdb::ColumnFamilyHandle *cf, diff --git a/storage/rocksdb/rdb_converter.h b/storage/rocksdb/rdb_converter.h index 2e6f1ed9689..e121215c2e8 100644 --- a/storage/rocksdb/rdb_converter.h +++ b/storage/rocksdb/rdb_converter.h @@ -173,12 +173,12 @@ class Rdb_converter { } const MY_BITMAP *get_lookup_bitmap() { return &m_lookup_bitmap; } + private: int decode_value_header_for_pk(Rdb_string_reader *reader, const std::shared_ptr<Rdb_key_def> &pk_def, rocksdb::Slice *unpack_slice); - private: void setup_field_encoders(); void get_storage_type(Rdb_field_encoder *const encoder, const uint kp); diff --git a/storage/rocksdb/rdb_iterator.cc b/storage/rocksdb/rdb_iterator.cc new file mode 100644 index 00000000000..529cd6dacae --- /dev/null +++ b/storage/rocksdb/rdb_iterator.cc @@ -0,0 +1,359 @@ +/* + Copyright (c) 2020, Facebook, Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "./rdb_iterator.h" + +#include "scope_guard.h" + +namespace myrocks { + +Rdb_iterator::~Rdb_iterator() {} + +Rdb_iterator_base::Rdb_iterator_base(THD *thd, + const std::shared_ptr<Rdb_key_def> kd, + const std::shared_ptr<Rdb_key_def> pkd, + const Rdb_tbl_def *tbl_def) + : m_kd(kd), + m_pkd(pkd), + m_tbl_def(tbl_def), + m_thd(thd), + m_scan_it(nullptr), + m_scan_it_skips_bloom(false), + m_scan_it_snapshot(nullptr), + m_scan_it_lower_bound(nullptr), + m_scan_it_upper_bound(nullptr), + m_prefix_buf(nullptr) {} + +Rdb_iterator_base::~Rdb_iterator_base() { + release_scan_iterator(); + my_free(m_scan_it_lower_bound); + m_scan_it_lower_bound = nullptr; + my_free(m_scan_it_upper_bound); + m_scan_it_upper_bound = nullptr; + my_free(m_prefix_buf); + m_prefix_buf = nullptr; +} + +int Rdb_iterator_base::read_before_key(const bool full_key_match, + const rocksdb::Slice &key_slice) { + /* + We are looking for the first record such that + + index_tuple $LT lookup_tuple + + with HA_READ_BEFORE_KEY, $LT = '<', + with HA_READ_PREFIX_LAST_OR_PREV, $LT = '<=' + with HA_READ_PREFIX_LAST, $LT = '==' + + Symmetry with read_after_key is possible if rocksdb supported prefix seeks. + */ + rocksdb_smart_seek(!m_kd->m_is_reverse_cf, m_scan_it, key_slice); + + while (is_valid_iterator(m_scan_it)) { + if (thd_killed(m_thd)) { + return HA_ERR_QUERY_INTERRUPTED; + } + /* + We are using full key and we've hit an exact match. 
+ */ + if ((full_key_match && + m_kd->value_matches_prefix(m_scan_it->key(), key_slice))) { + rocksdb_smart_next(!m_kd->m_is_reverse_cf, m_scan_it); + continue; + } + + return HA_EXIT_SUCCESS; + } + + return HA_ERR_END_OF_FILE; +} + +int Rdb_iterator_base::read_after_key(const rocksdb::Slice &key_slice) { + /* + We are looking for the first record such that + + index_tuple $GT lookup_tuple + + with HA_READ_AFTER_KEY, $GT = '>', + with HA_READ_KEY_OR_NEXT, $GT = '>=' + with HA_READ_KEY_EXACT, $GT = '==' + */ + rocksdb_smart_seek(m_kd->m_is_reverse_cf, m_scan_it, key_slice); + + return is_valid_iterator(m_scan_it) ? HA_EXIT_SUCCESS : HA_ERR_END_OF_FILE; +} + +void Rdb_iterator_base::release_scan_iterator() { + delete m_scan_it; + m_scan_it = nullptr; + + if (m_scan_it_snapshot) { + auto rdb = rdb_get_rocksdb_db(); + rdb->ReleaseSnapshot(m_scan_it_snapshot); + m_scan_it_snapshot = nullptr; + } +} + +void Rdb_iterator_base::setup_scan_iterator(const rocksdb::Slice *const slice, + const uint eq_cond_len, + bool read_current) { + DBUG_ASSERT(slice->size() >= eq_cond_len); + + bool skip_bloom = true; + + const rocksdb::Slice eq_cond(slice->data(), eq_cond_len); + + // The size of m_scan_it_lower_bound (and upper) is technically + // max_packed_sk_len as calculated in ha_rocksdb::alloc_key_buffers. Rather + // than recalculating that number, we pass in the max of eq_cond_len and + // Rdb_key_def::INDEX_NUMBER_SIZE which is guaranteed to be smaller than + // max_packed_sk_len, hence ensuring no buffer overrun. + // + // See setup_iterator_bounds on how the bound_len parameter is + // used. + if (ha_rocksdb::check_bloom_and_set_bounds( + m_thd, *m_kd, eq_cond, + std::max(eq_cond_len, (uint)Rdb_key_def::INDEX_ID_SIZE), + m_scan_it_lower_bound, m_scan_it_upper_bound, + &m_scan_it_lower_bound_slice, &m_scan_it_upper_bound_slice)) { + skip_bloom = false; + } + + /* + In some cases, setup_scan_iterator() is called multiple times from + the same query but bloom filter can not always be used. + Suppose the following query example. id2 is VARCHAR(30) and PRIMARY KEY + (id1, id2). + select count(*) from t2 WHERE id1=100 and id2 IN ('00000000000000000000', + '100'); + In this case, setup_scan_iterator() is called twice, the first time is for + (id1, id2)=(100, '00000000000000000000') and the second time is for (100, + '100'). + If prefix bloom filter length is 24 bytes, prefix bloom filter can be used + for the + first condition but not for the second condition. + If bloom filter condition is changed, currently it is necessary to destroy + and + re-create Iterator. + */ + if (m_scan_it_skips_bloom != skip_bloom) { + release_scan_iterator(); + } + + /* + SQL layer can call rnd_init() multiple times in a row. + In that case, re-use the iterator, but re-position it at the table start. + */ + if (!m_scan_it) { + m_scan_it = rdb_tx_get_iterator( + m_thd, m_kd->get_cf(), skip_bloom, m_scan_it_lower_bound_slice, + m_scan_it_upper_bound_slice, &m_scan_it_snapshot, read_current, + !read_current); + m_scan_it_skips_bloom = skip_bloom; + } +} + +int Rdb_iterator_base::calc_eq_cond_len(enum ha_rkey_function find_flag, + const rocksdb::Slice &start_key, + const int bytes_changed_by_succ, + const rocksdb::Slice &end_key) { + if (find_flag == HA_READ_KEY_EXACT) return start_key.size(); + + if (find_flag == HA_READ_PREFIX_LAST) { + /* + We have made the kd.successor(m_sk_packed_tuple) call above. + + The slice is at least Rdb_key_def::INDEX_NUMBER_SIZE bytes long. 
+ */ + return start_key.size() - bytes_changed_by_succ; + } + + if (!end_key.empty()) { + /* + Calculating length of the equal conditions here. 4 byte index id is + included. + Example1: id1 BIGINT, id2 INT, id3 BIGINT, PRIMARY KEY (id1, id2, id3) + WHERE id1=1 AND id2=1 AND id3>=2 => eq_cond_len= 4+8+4= 16 + WHERE id1=1 AND id2>=1 AND id3>=2 => eq_cond_len= 4+8= 12 + Example2: id1 VARCHAR(30), id2 INT, PRIMARY KEY (id1, id2) + WHERE id1 = 'AAA' and id2 < 3; => eq_cond_len=13 (varchar used 9 bytes) + */ + return start_key.difference_offset(end_key); + } + + /* + On range scan without any end key condition, there is no + eq cond, and eq cond length is the same as index_id size (4 bytes). + Example1: id1 BIGINT, id2 INT, id3 BIGINT, PRIMARY KEY (id1, id2, id3) + WHERE id1>=1 AND id2 >= 2 and id2 <= 5 => eq_cond_len= 4 + */ + return Rdb_key_def::INDEX_ID_SIZE; +} + +int Rdb_iterator_base::next_with_direction(bool move_forward, bool skip_next) { + int rc = 0; + const auto &kd = *m_kd; + Rdb_transaction *const tx = get_tx_from_thd(m_thd); + + for (;;) { + DEBUG_SYNC(m_thd, "rocksdb.check_flags_nwd"); + if (thd_killed(m_thd)) { + rc = HA_ERR_QUERY_INTERRUPTED; + break; + } + + DBUG_ASSERT(m_scan_it != nullptr); + if (m_scan_it == nullptr) { + rc = HA_ERR_INTERNAL_ERROR; + break; + } + + if (skip_next) { + skip_next = false; + } else { + if (move_forward) { + rocksdb_smart_next(kd.m_is_reverse_cf, m_scan_it); + } else { + rocksdb_smart_prev(kd.m_is_reverse_cf, m_scan_it); + } + } + + if (!is_valid_iterator(m_scan_it)) { + rc = HA_ERR_END_OF_FILE; + break; + } + + const rocksdb::Slice &key = m_scan_it->key(); + const rocksdb::Slice &value = m_scan_it->value(); + + // Outside our range, return EOF. + if (!kd.value_matches_prefix(key, m_prefix_tuple)) { + rc = HA_ERR_END_OF_FILE; + break; + } + + // Record is not visible due to TTL, move to next record. + if (m_pkd->has_ttl() && rdb_should_hide_ttl_rec(kd, value, tx)) { + continue; + } + + break; + } + + return rc; +} + +int Rdb_iterator_base::seek(enum ha_rkey_function find_flag, + const rocksdb::Slice start_key, bool full_key_match, + const rocksdb::Slice end_key, bool read_current) { + int rc = 0; + + uint prefix_key_len; + + if (!m_prefix_buf) { + const uint packed_len = m_kd->max_storage_fmt_length(); + m_scan_it_lower_bound = reinterpret_cast<uchar *>( + my_malloc(PSI_NOT_INSTRUMENTED, packed_len, MYF(0))); + m_scan_it_upper_bound = reinterpret_cast<uchar *>( + my_malloc(PSI_NOT_INSTRUMENTED, packed_len, MYF(0))); + m_prefix_buf = reinterpret_cast<uchar *>( + my_malloc(PSI_NOT_INSTRUMENTED, packed_len, MYF(0))); + } + + if (find_flag == HA_READ_KEY_EXACT || find_flag == HA_READ_PREFIX_LAST) { + memcpy(m_prefix_buf, start_key.data(), start_key.size()); + prefix_key_len = start_key.size(); + } else { + m_kd->get_infimum_key(m_prefix_buf, &prefix_key_len); + } + m_prefix_tuple = rocksdb::Slice((char *)m_prefix_buf, prefix_key_len); + + int bytes_changed_by_succ = 0; + uchar *start_key_buf = (uchar *)start_key.data(); + // We need to undo mutating the start key in case of retries using the same + // buffer. 
+ auto start_key_guard = create_scope_guard([this, start_key_buf, start_key] { + this->m_kd->predecessor(start_key_buf, start_key.size()); + }); + if (find_flag == HA_READ_PREFIX_LAST_OR_PREV || + find_flag == HA_READ_PREFIX_LAST || find_flag == HA_READ_AFTER_KEY) { + bytes_changed_by_succ = m_kd->successor(start_key_buf, start_key.size()); + } else { + start_key_guard.commit(); + } + + const uint eq_cond_len = + calc_eq_cond_len(find_flag, start_key, bytes_changed_by_succ, end_key); + + /* + This will open the iterator and position it at a record that's equal or + greater than the lookup tuple. + */ + setup_scan_iterator(&start_key, eq_cond_len, read_current); + + /* + Once we are positioned on from above, move to the position we really + want: See storage/rocksdb/rocksdb-range-access.txt + */ + bool direction = (find_flag == HA_READ_KEY_EXACT) || + (find_flag == HA_READ_AFTER_KEY) || + (find_flag == HA_READ_KEY_OR_NEXT); + if (direction) { + rc = read_after_key(start_key); + } else { + rc = read_before_key(full_key_match, start_key); + } + + if (rc) { + return rc; + } + + rc = next_with_direction(direction, true); + return rc; +} + +int Rdb_iterator_base::get(const rocksdb::Slice *key, + rocksdb::PinnableSlice *value, Rdb_lock_type type, + bool skip_ttl_check) { + int rc = HA_EXIT_SUCCESS; + Rdb_transaction *const tx = get_tx_from_thd(m_thd); + rocksdb::Status s; + if (type == RDB_LOCK_NONE) { + s = rdb_tx_get(tx, m_kd->get_cf(), *key, value); + } else { + s = rdb_tx_get_for_update(tx, *m_kd, *key, value, type == RDB_LOCK_WRITE); + } + + DBUG_EXECUTE_IF("rocksdb_return_status_corrupted", + { s = rocksdb::Status::Corruption(); }); + + if (!s.IsNotFound() && !s.ok()) { + return rdb_tx_set_status_error(tx, s, *m_kd, m_tbl_def); + } + + if (s.IsNotFound()) { + return HA_ERR_KEY_NOT_FOUND; + } + + if (!skip_ttl_check && m_kd->has_ttl() && + rdb_should_hide_ttl_rec(*m_kd, *value, tx)) { + return HA_ERR_KEY_NOT_FOUND; + } + + return rc; +} + +} // namespace myrocks diff --git a/storage/rocksdb/rdb_iterator.h b/storage/rocksdb/rdb_iterator.h new file mode 100644 index 00000000000..2a0f5bd5760 --- /dev/null +++ b/storage/rocksdb/rdb_iterator.h @@ -0,0 +1,121 @@ +/* + Copyright (c) 2020, Facebook, Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#pragma once + +// MySQL header files +#include "sql/debug_sync.h" +#include "sql/handler.h" + +// MyRocks header files +#include "./ha_rocksdb.h" +#include "./ha_rocksdb_proto.h" +#include "./rdb_converter.h" +#include "./rdb_datadic.h" + +namespace myrocks { + +class Rdb_iterator { + public: + virtual ~Rdb_iterator() = 0; + + /* + direction specifies which logical direction the table is scanned in. + start_key is inclusive if scanning forwards, but exclusive when scanning + backwards. 
full_key_match indicates whether the seek key may match the full + + Once rocksdb supports prefix seeks, the API can be simplified since + full_key_match is no longer needed. + */ + virtual int seek(enum ha_rkey_function find_flag, + const rocksdb::Slice start_key, bool full_key_match, + const rocksdb::Slice end_key, bool read_current = false) = 0; + virtual int get(const rocksdb::Slice *key, rocksdb::PinnableSlice *value, + Rdb_lock_type type, bool skip_ttl_check = false) = 0; + virtual int next() = 0; + virtual int prev() = 0; + virtual rocksdb::Slice key() = 0; + virtual rocksdb::Slice value() = 0; + virtual void reset() = 0; +}; + +class Rdb_iterator_base : public Rdb_iterator { + private: + int read_before_key(const bool full_key_match, + const rocksdb::Slice &key_slice); + int read_after_key(const rocksdb::Slice &key_slice); + void release_scan_iterator(); + void setup_scan_iterator(const rocksdb::Slice *const slice, + const uint eq_cond_len, bool read_current); + int calc_eq_cond_len(enum ha_rkey_function find_flag, + const rocksdb::Slice &start_key, + const int bytes_changed_by_succ, + const rocksdb::Slice &end_key); + int next_with_direction(bool move_forward, bool skip_next); + + public: + Rdb_iterator_base(THD *thd, const std::shared_ptr<Rdb_key_def> kd, + const std::shared_ptr<Rdb_key_def> pkd, + const Rdb_tbl_def *tbl_def); + + ~Rdb_iterator_base() override; + + int seek(enum ha_rkey_function find_flag, const rocksdb::Slice start_key, + bool full_key_match, const rocksdb::Slice end_key, + bool read_current) override; + int get(const rocksdb::Slice *key, rocksdb::PinnableSlice *value, + Rdb_lock_type type, bool skip_ttl_check = false) override; + + int next() override { return next_with_direction(true, false); } + + int prev() override { return next_with_direction(false, false); } + + rocksdb::Slice key() override { return m_scan_it->key(); } + + rocksdb::Slice value() override { return m_scan_it->value(); } + + void reset() override { release_scan_iterator(); } + + protected: + friend class Rdb_iterator; + const std::shared_ptr<Rdb_key_def> m_kd; + + // Rdb_key_def of the primary key + const std::shared_ptr<Rdb_key_def> m_pkd; + + const Rdb_tbl_def *m_tbl_def; + + THD *m_thd; + + /* Iterator used for range scans and for full table/index scans */ + rocksdb::Iterator *m_scan_it; + + /* Whether m_scan_it was created with skip_bloom=true */ + bool m_scan_it_skips_bloom; + + const rocksdb::Snapshot *m_scan_it_snapshot; + + /* Buffers used for upper/lower bounds for m_scan_it. */ + uchar *m_scan_it_lower_bound; + uchar *m_scan_it_upper_bound; + rocksdb::Slice m_scan_it_lower_bound_slice; + rocksdb::Slice m_scan_it_upper_bound_slice; + + uchar *m_prefix_buf; + rocksdb::Slice m_prefix_tuple; +}; + +} // namespace myrocks
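
The read_after_key()/read_before_key() comments in the patch describe how the iterator is positioned relative to the lookup tuple. Below is a minimal sketch of the same positioning against the plain rocksdb::Iterator API, for a forward-ordered (non-reverse) column family only; the database path and keys are made up for illustration, and the rocksdb_smart_seek/rocksdb_smart_next wrappers are not modelled.

#include <cassert>
#include <memory>

#include "rocksdb/db.h"

int main() {
  rocksdb::DB *db = nullptr;
  rocksdb::Options opts;
  opts.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(opts, "/tmp/rdb_seek_demo", &db);
  assert(s.ok());

  db->Put(rocksdb::WriteOptions(), "key1", "v1");
  db->Put(rocksdb::WriteOptions(), "key3", "v3");

  std::unique_ptr<rocksdb::Iterator> it(
      db->NewIterator(rocksdb::ReadOptions()));

  // "read after key" (">=") positioning: first record not smaller than the
  // lookup tuple. With the data above this lands on "key3".
  it->Seek("key2");
  assert(it->Valid());

  // "read before key" positioning: last record not greater than the lookup
  // tuple. With the data above this lands on "key1".
  it->SeekForPrev("key2");
  assert(it->Valid());

  it.reset();  // iterators must be released before the DB is closed
  delete db;
  return 0;
}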
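
setup_scan_iterator() decides whether the prefix bloom filter may be used for a given lookup and installs lower/upper bounds on the iterator. The sketch below shows the underlying RocksDB knobs only; the helper name and the rule for skipping the bloom filter (comparing the equal-condition length against an assumed fixed prefix-extractor length) are illustrative, not the exact check performed by check_bloom_and_set_bounds().

#include <cstddef>

#include "rocksdb/db.h"
#include "rocksdb/slice.h"

// Open an iterator restricted to [*lower, *upper) and fall back to a
// total-order seek when the fixed ("equal condition") part of the lookup key
// does not cover the prefix the bloom filter was built on, so the filter
// cannot be trusted for this scan.
rocksdb::Iterator *open_bounded_iterator(rocksdb::DB *db,
                                         const rocksdb::Slice *lower,
                                         const rocksdb::Slice *upper,
                                         size_t eq_cond_len,
                                         size_t prefix_extractor_len) {
  rocksdb::ReadOptions ro;
  ro.iterate_lower_bound = lower;  // bound slices must outlive the iterator
  ro.iterate_upper_bound = upper;
  ro.total_order_seek = (eq_cond_len < prefix_extractor_len);
  return db->NewIterator(ro);
}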
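
In the range-scan case, calc_eq_cond_len() reduces to the length of the shared prefix of the encoded start and end keys (the patch uses rocksdb::Slice::difference_offset() for this). A standalone illustration with made-up key bytes:

#include <algorithm>
#include <cstddef>
#include <string>

// Length of the common prefix of two encoded keys; bytes up to this offset
// form the equal condition shared by every key in the scanned range.
static size_t common_prefix_len(const std::string &a, const std::string &b) {
  size_t n = std::min(a.size(), b.size());
  size_t i = 0;
  while (i < n && a[i] == b[i]) ++i;
  return i;
}

int main() {
  // Hypothetical encoding: 4-byte index id, a 4-byte equal column, then a
  // column the range condition applies to.
  std::string start_key = std::string("\x00\x00\x01\x07", 4) + "AAAA" + "\x02";
  std::string end_key = std::string("\x00\x00\x01\x07", 4) + "AAAA" + "\x05";
  size_t eq_cond_len = common_prefix_len(start_key, end_key);  // 4 + 4 = 8
  return eq_cond_len == 8 ? 0 : 1;
}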
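
next_with_direction() advances the underlying iterator while skipping records that fall outside the index prefix or are hidden by TTL. A simplified, forward-only version of the same loop shape over a plain rocksdb::Iterator is sketched below; the visibility check is left as a caller-supplied predicate because the TTL record format is not part of this excerpt.

#include <functional>

#include "rocksdb/db.h"
#include "rocksdb/slice.h"

// Advance until the iterator sits on a key inside `prefix` whose value passes
// `is_visible`, mirroring the skip_next / prefix-check / TTL-skip structure of
// next_with_direction(). Returns true when positioned on a usable record,
// false when the range is exhausted.
bool next_visible(rocksdb::Iterator *it, const rocksdb::Slice &prefix,
                  const std::function<bool(const rocksdb::Slice &)> &is_visible,
                  bool skip_next) {
  for (;;) {
    if (!skip_next) it->Next();
    skip_next = false;

    if (!it->Valid()) return false;                     // end of data
    if (!it->key().starts_with(prefix)) return false;   // left the index range
    if (!is_visible(it->value())) continue;             // hidden record (TTL)

    return true;
  }
}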
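
For reference, a hedged sketch of how a caller might drive the new interface declared in rdb_iterator.h. Only the seek()/next()/key()/value() signatures come from the header above; the wrapper function, its arguments, and the choice of find_flag are hypothetical.

#include "./rdb_iterator.h"

namespace myrocks {

// Hypothetical caller: walk every record whose secondary key starts with
// prefix_slice, using only the Rdb_iterator interface.
int scan_prefix(Rdb_iterator_base *iter, const rocksdb::Slice &prefix_slice) {
  int rc = iter->seek(HA_READ_KEY_EXACT, prefix_slice,
                      /* full_key_match */ false, prefix_slice,
                      /* read_current */ false);
  while (rc == HA_EXIT_SUCCESS) {
    // key()/value() return slices owned by the underlying rocksdb iterator.
    const rocksdb::Slice key = iter->key();
    const rocksdb::Slice value = iter->value();
    (void)key;
    (void)value;  // ... unpack into the table record here ...
    rc = iter->next();
  }
  // Running off the end of the prefix range is the normal termination case.
  return rc == HA_ERR_END_OF_FILE ? HA_EXIT_SUCCESS : rc;
}

}  // namespace myrocks

Note that with HA_READ_KEY_EXACT the seek() implementation above copies the start key into m_prefix_buf, so next() stops with HA_ERR_END_OF_FILE as soon as a key no longer matches that prefix.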