[Commits] 31a395578da: Apply patch: Consolidate rnd_init/rnd_next/rnd_end into index_*
revision-id: 31a395578da893e201ca58a4e20cd43596c9c295 (percona-202102-45-g31a395578da) parent(s): dea55321f0f03df1a29130aa6a2d38a0f9b6e39e author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2021-05-17 16:40:35 +0300 message: Apply patch: Consolidate rnd_init/rnd_next/rnd_end into index_* Summary: Instead of maintaining separate code paths for full table scans, merge them into full index scan code paths. Full table scans are just full index scans on the primary key anyway. Test Plan: mtr Reviewers: luqun, herman, yzha Subscribers: pgl, vinaybhat Differential Revision: https://phabricator.intern.facebook.com/D23358422 --- storage/rocksdb/ha_rocksdb.cc | 153 +++++++++++++++++++----------------------- storage/rocksdb/ha_rocksdb.h | 11 ++- 2 files changed, 77 insertions(+), 87 deletions(-) diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 7f7c6a79a2e..a5afc625139 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -7009,7 +7009,7 @@ void ha_rocksdb::update_auto_incr_val_from_field() { int ha_rocksdb::load_hidden_pk_value() { const int save_active_index = active_index; - active_index = m_tbl_def->m_key_count - 1; + active_index = MAX_KEY; const uint8 save_table_status = table->m_status; Rdb_transaction *const tx = get_or_create_tx(table->in_use); @@ -9218,7 +9218,7 @@ int ha_rocksdb::index_read_map(uchar *const buf, const uchar *const key, } ha_statistic_increment(&System_status_var::ha_read_key_count); - const Rdb_key_def &kd = *m_key_descr_arr[active_index]; + const Rdb_key_def &kd = *m_key_descr_arr[active_index_pos()]; const uint actual_key_parts = kd.get_key_parts(); bool using_full_key = is_using_full_key(keypart_map, actual_key_parts); @@ -9415,7 +9415,7 @@ int ha_rocksdb::index_read_map(uchar *const buf, const uchar *const key, int ha_rocksdb::find_icp_matching_index_rec(const bool move_forward, uchar *const buf) { if (pushed_idx_cond && pushed_idx_cond_keyno == active_index) { - const 
Rdb_key_def &kd = *m_key_descr_arr[active_index]; + const Rdb_key_def &kd = *m_key_descr_arr[active_index_pos()]; THD *thd = ha_thd(); while (1) { @@ -9835,19 +9835,10 @@ int ha_rocksdb::records_from_index(ha_rows *num_rows, uint index) { */ int ha_rocksdb::index_next(uchar *const buf) { DBUG_ENTER_FUNC(); - check_build_decoder(); - bool moves_forward = true; ha_statistic_increment(&System_status_var::ha_read_next_count); - if (m_key_descr_arr[active_index]->m_is_reverse_cf) { - moves_forward = false; - } - - int rc = index_next_with_direction(buf, moves_forward); - if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE; - - DBUG_RETURN(rc); + DBUG_RETURN(index_next_intern(buf)); } /** @@ -9857,19 +9848,10 @@ int ha_rocksdb::index_next(uchar *const buf) { */ int ha_rocksdb::index_prev(uchar *const buf) { DBUG_ENTER_FUNC(); - check_build_decoder(); - bool moves_forward = false; ha_statistic_increment(&System_status_var::ha_read_prev_count); - if (m_key_descr_arr[active_index]->m_is_reverse_cf) { - moves_forward = true; - } - - int rc = index_next_with_direction(buf, moves_forward); - if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE; - - DBUG_RETURN(rc); + DBUG_RETURN(index_prev_intern(buf)); } int ha_rocksdb::index_next_with_direction(uchar *const buf, bool move_forward) { @@ -9877,7 +9859,7 @@ int ha_rocksdb::index_next_with_direction(uchar *const buf, bool move_forward) { int rc; - if (active_index == pk_index(table, m_tbl_def)) { + if (active_index == table->s->primary_key) { rc = rnd_next_with_direction(buf, move_forward); } else { THD *thd = ha_thd(); @@ -9896,7 +9878,7 @@ int ha_rocksdb::index_next_with_direction(uchar *const buf, bool move_forward) { m_scan_it->Prev(); } } - rc = rocksdb_skip_expired_records(*m_key_descr_arr[active_index], + rc = rocksdb_skip_expired_records(*m_key_descr_arr[active_index_pos()], m_scan_it, !move_forward); if (rc != HA_EXIT_SUCCESS) { break; @@ -10017,7 +9999,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, 
bool first) { uint key_size; int rc; - if (is_pk(active_index, table, m_tbl_def)) { + if (active_index == table->s->primary_key) { key = m_pk_packed_tuple; } else { key = m_sk_packed_tuple; @@ -10025,7 +10007,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, bool first) { DBUG_ASSERT(key != nullptr); - const Rdb_key_def &kd = *m_key_descr_arr[active_index]; + const Rdb_key_def &kd = *m_key_descr_arr[active_index_pos()]; bool backwards = (first && kd.m_is_reverse_cf) || (!first && !kd.m_is_reverse_cf); @@ -10166,6 +10148,11 @@ uint ha_rocksdb::pk_index(const TABLE *const table_arg, : table_arg->s->primary_key; } +/* Returns the index into m_key_descr_arr array based on active_index */ +uint ha_rocksdb::active_index_pos() { + return active_index == MAX_KEY ? m_tbl_def->m_key_count - 1 : active_index; +} + /* Returns true if given index number is a primary key */ bool ha_rocksdb::is_pk(const uint index, const TABLE *const table_arg, const Rdb_tbl_def *const tbl_def_arg) { @@ -11292,50 +11279,21 @@ void ha_rocksdb::release_scan_iterator() { } } -void ha_rocksdb::setup_iterator_for_rnd_scan() { - uint key_size; - - int key_start_matching_bytes = - m_pk_descr->get_first_key(m_pk_packed_tuple, &key_size); - - rocksdb::Slice table_key((const char *)m_pk_packed_tuple, key_size); - - setup_scan_iterator(*m_pk_descr, &table_key, false, key_start_matching_bytes); - m_scan_it->Seek(table_key); - m_skip_scan_it_next_call = true; -} - /** @return HA_EXIT_SUCCESS OK other HA_ERR error code (can be SE-specific) */ -int ha_rocksdb::rnd_init(bool scan) { +int ha_rocksdb::rnd_init(bool) { DBUG_ENTER_FUNC(); m_need_build_decoder = true; active_index = table->s->primary_key; - THD *thd = ha_thd(); - if (thd && thd->killed) { - DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED); - } - - Rdb_transaction *const tx = get_or_create_tx(table->in_use); - - if (scan) { - m_rnd_scan_is_new_snapshot = !tx->has_snapshot(); - setup_iterator_for_rnd_scan(); - } else { - /* We don't need any preparations 
for rnd_pos() calls. */ - } - - // If m_lock_rows is on then we will be doing a get_for_update when accessing - // the index, so don't acquire the snapshot right away. Otherwise acquire - // the snapshot immediately. - tx->acquire_snapshot(m_lock_rows == RDB_LOCK_NONE); - - DBUG_RETURN(HA_EXIT_SUCCESS); + m_rnd_scan_started = false; + DBUG_RETURN( + index_init(has_hidden_pk(table) ? MAX_KEY : pk_index(table, m_tbl_def), + false /* sorted */)); } /** @@ -11350,20 +11308,22 @@ int ha_rocksdb::rnd_next(uchar *const buf) { int rc; ha_statistic_increment(&System_status_var::ha_read_rnd_next_count); - for (;;) { - rc = rnd_next_with_direction(buf, true); - if (!should_recreate_snapshot(rc, m_rnd_scan_is_new_snapshot)) { - break; /* exit the loop */ + + /* + Since order does not matter, the scan will go in natural index + order. + */ + bool is_reverse_cf = m_key_descr_arr[active_index_pos()]->m_is_reverse_cf; + if (!m_rnd_scan_started) { + rc = index_read_intern(buf, !is_reverse_cf /* first */); + m_rnd_scan_started = true; + } else { + if (is_reverse_cf) { + rc = index_prev_intern(buf); + } else { + rc = index_next_intern(buf); } - // release the snapshot and iterator and then regenerate them - Rdb_transaction *tx = get_or_create_tx(table->in_use); - tx->release_snapshot(); - release_scan_iterator(); - setup_iterator_for_rnd_scan(); } - - m_rnd_scan_is_new_snapshot = false; - if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE; DBUG_RETURN(rc); @@ -11492,12 +11452,9 @@ int ha_rocksdb::rnd_next_with_direction(uchar *const buf, bool move_forward) { int ha_rocksdb::rnd_end() { DBUG_ENTER_FUNC(); - + DBUG_RETURN(index_end()); m_need_build_decoder = false; - release_scan_iterator(); - - DBUG_RETURN(HA_EXIT_SUCCESS); } void ha_rocksdb::build_decoder() { @@ -11561,6 +11518,34 @@ int ha_rocksdb::index_end() { DBUG_RETURN(HA_EXIT_SUCCESS); } +int ha_rocksdb::index_next_intern(uchar *const buf) { + DBUG_ENTER_FUNC(); + + bool moves_forward = true; + if
(m_key_descr_arr[active_index_pos()]->m_is_reverse_cf) { + moves_forward = false; + } + + int rc = index_next_with_direction(buf, moves_forward); + if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE; + + DBUG_RETURN(rc); +} + +int ha_rocksdb::index_prev_intern(uchar *const buf) { + DBUG_ENTER_FUNC(); + + bool moves_forward = false; + if (m_key_descr_arr[active_index_pos()]->m_is_reverse_cf) { + moves_forward = true; + } + + int rc = index_next_with_direction(buf, moves_forward); + if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE; + + DBUG_RETURN(rc); +} + /** Called by the partition manager for truncating tables. @@ -14025,7 +14010,7 @@ int ha_rocksdb::inplace_populate_sk( } /* - Note: We pass in the currently existing table + tbl_def object here, + Note: We use the currently existing table + tbl_def object here, as the pk index position may have changed in the case of hidden primary keys. */ @@ -14034,14 +14019,14 @@ int ha_rocksdb::inplace_populate_sk( if (res) DBUG_RETURN(res); /* Scan each record in the primary key in order */ - for (res = index_first(table->record[0]); res == 0; - res = index_next(table->record[0])) { + for (res = ha_rnd_next(table->record[0]); res == 0; + res = ha_rnd_next(table->record[0])) { longlong hidden_pk_id = 0; if (hidden_pk_exists && (res = read_hidden_pk_id_from_rowkey(&hidden_pk_id))) { // NO_LINT_DEBUG sql_print_error("Error retrieving hidden pk id."); - ha_index_end(); + ha_rnd_end(); DBUG_RETURN(res); } @@ -14062,7 +14047,7 @@ int ha_rocksdb::inplace_populate_sk( disk in sorted chunks. 
*/ if ((res = rdb_merge.add(key, val))) { - ha_index_end(); + ha_rnd_end(); DBUG_RETURN(res); } } @@ -14070,11 +14055,11 @@ int ha_rocksdb::inplace_populate_sk( if (res != HA_ERR_END_OF_FILE) { // NO_LINT_DEBUG sql_print_error("Error retrieving index entry from primary key."); - ha_index_end(); + ha_rnd_end(); DBUG_RETURN(res); } - ha_index_end(); + ha_rnd_end(); /* Perform an n-way merge of n sorted buffers on disk, then writes all diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index 18c40fc1ca9..ba62521fa79 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -282,8 +282,7 @@ class ha_rocksdb : public my_core::handler { bool m_skip_scan_it_next_call; - /* true means we are accessing the first row after a snapshot was created */ - bool m_rnd_scan_is_new_snapshot; + bool m_rnd_scan_started; /* true means INSERT ON DUPLICATE KEY UPDATE. In such case we can optimize by @@ -331,7 +330,6 @@ class ha_rocksdb : public my_core::handler { MY_ATTRIBUTE((__nonnull__(2, 3), __warn_unused_result__)); int secondary_index_read(const int keyno, uchar *const buf) MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); - void setup_iterator_for_rnd_scan(); static void setup_iterator_bounds(const Rdb_key_def &kd, const rocksdb::Slice &eq_cond, size_t bound_len, uchar *const lower_bound, @@ -539,6 +537,8 @@ class ha_rocksdb : public my_core::handler { const Rdb_tbl_def *const tbl_def_arg) MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); + uint active_index_pos() MY_ATTRIBUTE((__warn_unused_result__)); + static bool is_pk(const uint index, const TABLE *table_arg, const Rdb_tbl_def *tbl_def_arg) MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); @@ -644,6 +644,11 @@ class ha_rocksdb : public my_core::handler { int index_last(uchar *const buf) override MY_ATTRIBUTE((__warn_unused_result__)); + int index_next_intern(uchar *const buf) + MY_ATTRIBUTE((__warn_unused_result__)); + int index_prev_intern(uchar *const buf) + 
MY_ATTRIBUTE((__warn_unused_result__)); + class Item *idx_cond_push(uint keyno, class Item *const idx_cond) override; /* Default implementation from cancel_pushed_idx_cond() suits us
participants (1)
-
psergey