[Commits] 269ee6f6076: Issue #790: MultiGet-based MRR: Code cleanup
revision-id: 269ee6f6076173a579e721d35b3834d8433271c3 (fb-prod201903-161-g269ee6f6076) parent(s): 4eb7a964f21d743d816d41870efd9133d4198469 author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2019-09-14 18:43:28 +0300 message: Issue #790: MultiGet-based MRR: Code cleanup Code cleanup. --- storage/rocksdb/ha_rocksdb.cc | 193 ++++++++++++++++++++++++++++-------------- storage/rocksdb/ha_rocksdb.h | 4 - 2 files changed, 130 insertions(+), 67 deletions(-) diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index e490684e79b..cd2a195f088 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -15114,16 +15114,23 @@ void rdb_tx_multi_get(Rdb_transaction *tx, tx->multi_get(column_family, num_keys, keys, values, statuses, sorted_input); } -//// + /**************************************************************************** - * DS-MRR implementation + * Multi-Range-Read implementation based on RocksDB's MultiGet() call ***************************************************************************/ /* - A MultiGet-MRR is applicable if - - using the clustered PK - - all ranges are single-point lookups using full PK. + Check if MultiGet-MRR can be used to scan given list of ranges. 
+ + @param seq List of ranges to scan + @param bufsz OUT How much buffer space will be required + @param flags INOUT Properties of the scan to be done + + @return + HA_POS_ERROR - The scan cannot be done at all + Other value - Number of expected output rows */ + ha_rows ha_rocksdb::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq, void *seq_init_param, uint n_ranges, uint *bufsz, @@ -15132,9 +15139,9 @@ ha_rows ha_rocksdb::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq, ha_rows res; THD *thd = table->in_use; - // MultiGet-MRR is allowed only with these settings: - // optimizer_switch='mrr=on,mrr_cost_based=off' - + // We allow MultiGet-MRR only with these settings: + // optimizer_switch='mrr=on,mrr_cost_based=off' + // mrr_cost_based is not supported bool mrr_enabled = thd->optimizer_switch_flag(OPTIMIZER_SWITCH_MRR) && !thd->optimizer_switch_flag(OPTIMIZER_SWITCH_MRR_COST_BASED); @@ -15142,10 +15149,17 @@ ha_rows ha_rocksdb::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq, res = handler::multi_range_read_info_const(keyno, seq, seq_init_param, n_ranges, bufsz, flags, cost); - if (res == HA_POS_ERROR || m_lock_rows != RDB_LOCK_NONE || !mrr_enabled) + if (res == HA_POS_ERROR) return res; // Not possible to do the scan + + // Use the default MRR implementation if @@optimizer_switch value tells us + // to, or if the query needs to do a locking read. + if (!mrr_enabled || m_lock_rows != RDB_LOCK_NONE) return res; if (keyno == table->s->primary_key) { + // We need all ranges to be single-point lookups using full PK values. 
+ // (Range scans, like "pk BETWEEN 10 and 20" or restrictions on PK prefix + // cannot be used) bool all_eq_ranges = true; KEY_MULTI_RANGE range; range_seq_t seq_it; @@ -15155,15 +15169,18 @@ ha_rows ha_rocksdb::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq, all_eq_ranges = false; break; } + if (table->in_use->killed) return HA_POS_ERROR; } if (all_eq_ranges) { + // Indicate that we will use MultiGet-MRR *flags &= ~HA_MRR_USE_DEFAULT_IMPL; *flags |= HA_MRR_SUPPORT_SORTED; *bufsz = mrr_get_length_per_rec() * res * 1.1 + 1; } } else { - // Secondary keys are supported if the scan is non-index_only + // For scans on secondary keys, we use MultiGet when we read the PK values. + // We only need PK values when the scan is non-index-only. if (!(*flags & HA_MRR_INDEX_ONLY)) { *flags &= ~HA_MRR_SUPPORT_SORTED; // Non-sorted mode *flags &= ~HA_MRR_USE_DEFAULT_IMPL; @@ -15194,7 +15211,6 @@ ha_rows ha_rocksdb::multi_range_read_info(uint keyno, uint n_ranges, uint keys, } if (keyno != table->s->primary_key && !(*flags & HA_MRR_INDEX_ONLY)) { - // Secondary key mode *flags &= ~HA_MRR_USE_DEFAULT_IMPL; *flags &= ~HA_MRR_SUPPORT_SORTED; // Non-sorted mode } @@ -15202,17 +15218,23 @@ ha_rows ha_rocksdb::multi_range_read_info(uint keyno, uint n_ranges, uint keys, return 0; // "0" means ok, despite the ha_rows return type. } -/* - The source of rowids for the MRR scan. -*/ + +// +// Source of Rowids for the MRR scan +// class Mrr_rowid_source { public: - // This will get the next rowid tuple. In on-disk format. + // Get the next rowid, in the on-disk mem-comparable form. Also, get the + // "range pointer" associated with the rowid (it is returned in *range_ptr). 
virtual int get_next_rowid(uchar *buf, char **range_ptr) = 0 ; virtual bool eof() = 0; virtual ~Mrr_rowid_source() {} }; + +// +// Rowid source that produces rowids by enumerating a sequence of ranges +// class Mrr_pk_scan_rowid_source : public Mrr_rowid_source { range_seq_t mrr_seq_it; bool mrr_ranges_eof; // true means we've got eof when enumerating the ranges. @@ -15247,6 +15269,11 @@ class Mrr_pk_scan_rowid_source : public Mrr_rowid_source { virtual bool eof() override { return mrr_ranges_eof; } }; + +// +// Rowid source that produces rowids by doing an index-only scan on a +// secondary index and returning rowids from the index records +// class Mrr_sec_key_rowid_source : public Mrr_rowid_source { ha_rocksdb *self; int got_err; @@ -15256,7 +15283,7 @@ class Mrr_sec_key_rowid_source : public Mrr_rowid_source { int init(RANGE_SEQ_IF *seq, void *seq_init_param, uint n_ranges, uint mode) { - self->m_keyread_only = true; // TODO: is this ugly or fine? + self->m_keyread_only = true; self->mrr_enabled_keyread = true; return self->handler::multi_range_read_init(seq, seq_init_param, n_ranges, mode, nullptr); @@ -15277,10 +15304,7 @@ class Mrr_sec_key_rowid_source : public Mrr_rowid_source { }; -/** - * Multi Range Read interface, DS-MRR calls - */ - +// Initialize an MRR scan int ha_rocksdb::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param, uint n_ranges, uint mode, HANDLER_BUFFER *buf) { @@ -15315,18 +15339,22 @@ int ha_rocksdb::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param, if (active_index == table->s->primary_key) { mrr_rowid_reader = new Mrr_pk_scan_rowid_source(this, seq_init_param, n_ranges, mode); - mrr_n_rowids = n_ranges; } else { auto reader = new Mrr_sec_key_rowid_source(this); reader->init(seq, seq_init_param, n_ranges, mode); mrr_rowid_reader = reader; - mrr_n_rowids = SSIZE_MAX-1; // TODO: get rid of this } + // note: here, we must NOT return HA_ERR_END_OF_FILE even if we know there + // are no matches. 
We should return 0 here and return HA_ERR_END_OF_FILE + // from the first multi_range_read_next() call. + res = mrr_fill_buffer(); return res; } +// Return the amount of buffer space that MRR scan requires for each record +// returned uint ha_rocksdb::mrr_get_length_per_rec() { return sizeof(rocksdb::Slice) + sizeof(rocksdb::Status) + sizeof(rocksdb::PinnableSlice) + @@ -15335,80 +15363,115 @@ uint ha_rocksdb::mrr_get_length_per_rec() { } +template <typename T> void align_ptr(char **p) { + if (((size_t)p) % alignof(T)) { + *p += alignof(T) - ((size_t)p) % alignof(T); + } +} + +/* + We've got a buffer in mrr_buf, and in order to call RocksDB's MultiGet, we + need to use this space to construct several arrays of the same size N: + + rocksdb::Slice[N] - lookup keys + rocksdb::Status[N] - return statuses + rocksdb::PinnableSlice[N] - return rows (*) + char*[N] - "ptr" value of KEY_MULTI_RANGE. This tells the + SQL layer which lookup key the returned record + matches with (**) + {PK lookup value}[N] - The rowid (Primary Key) to lookup. The + corresponding rocksdb::Slice object points to + this key. + + (*) The memory for rows is allocated somewhere inside RocksDB, there's no + way to make it use the user-supplied buffer. + (**) The engine could specify HA_MRR_NO_ASSOCIATION which would mean "we + cannot tell which key the returned records match" we don't do this. + + The PK lookup value is in mem-comparable encoding. It may have variable + length (this is the case when table's PRIMARY KEY has VARCHAR() columns). + Currently, we optimize for fixed-size primary keys and consume + m_pk_descr->max_storage_fmt_length() bytes for each lookup value. One can + develop a solution for variable-length PKs but this is not a priority. + + Note that the buffer may be much larger than necessary. For range scans, + @@rnd_buffer_size=256K is passed, even if there will be only a few lookup + values. 
+*/ + int ha_rocksdb::mrr_fill_buffer() { mrr_free_rows(); mrr_read_index = 0; - // Assume mrr_buf.buffer is aligned. - // Assume mrr_buf.buffer_end is aligned. - // This should agree with the code in mrr_get_length_per_rec(): ssize_t element_size = sizeof(rocksdb::Slice) + sizeof(rocksdb::Status) + sizeof(rocksdb::PinnableSlice) + sizeof(char *) + // this for KEY_MULTI_RANGE::ptr m_pk_descr->max_storage_fmt_length(); - // We can fit into the buffer this many elements: + // The buffer has space for this many elements: ssize_t n_elements = (mrr_buf.buffer_end - mrr_buf.buffer) / element_size; - // The "+1" is there to: - // - try to use a bit more space but get finished in one sweep (and avoid - // zero-sized second sweep) - // - for safety: we don't want 0-sized buffers - n_elements = std::min(n_elements, (ssize_t)mrr_n_rowids + 1); - if (n_elements < 1) { DBUG_ASSERT(0); return HA_ERR_INTERNAL_ERROR; // error } - // TODO: why are we allocating/de-allocating every time buffer is refilled? + char *buf = (char *)mrr_buf.buffer; - mrr_keys = new (buf) rocksdb::Slice[n_elements]; + + align_ptr<rocksdb::Slice>(&buf); + mrr_keys = (rocksdb::Slice*)buf; buf += sizeof(rocksdb::Slice) * n_elements; - mrr_statuses = new (buf) rocksdb::Status[n_elements]; + align_ptr<rocksdb::Status>(&buf); + mrr_statuses = (rocksdb::Status*)buf; buf += sizeof(rocksdb::Status) * n_elements; - mrr_values = new (buf) rocksdb::PinnableSlice[n_elements]; + align_ptr<rocksdb::PinnableSlice>(&buf); + mrr_values = (rocksdb::PinnableSlice*)buf; buf += sizeof(rocksdb::PinnableSlice) * n_elements; + align_ptr<char*>(&buf); mrr_range_ptrs = (char **)buf; buf += sizeof(char *) * n_elements; - ssize_t elem = -1; + if (buf + m_pk_descr->max_storage_fmt_length() >= (char*)mrr_buf.buffer_end) { + // a VERY unlikely scenario: we were given a really small buffer, + // (probably for just one rowid), and also we had to use some bytes for + // alignment. 
As a result, there's no buffer space left to hold even one + // rowid. Return an error immediately to avoid looping. + DBUG_ASSERT(0); + return HA_ERR_INTERNAL_ERROR; // error + } + + ssize_t elem = 0; + + mrr_n_elements = elem; int key_size; char *range_ptr; while ((key_size = mrr_rowid_reader->get_next_rowid((uchar*)buf, &range_ptr)) > 0 ) { DEBUG_SYNC(table->in_use, "rocksdb.mrr_fill_buffer.loop"); - if (table->in_use->killed) { - return HA_ERR_QUERY_INTERRUPTED; - } + if (table->in_use->killed) return HA_ERR_QUERY_INTERRUPTED; - elem++; - mrr_keys[elem] = rocksdb::Slice(buf, key_size); + new (&mrr_keys[elem]) rocksdb::Slice(buf, key_size); + new (&mrr_statuses[elem]) rocksdb::Status; + new (&mrr_values[elem]) rocksdb::PinnableSlice; mrr_range_ptrs[elem] = range_ptr; buf += key_size; - if (elem == n_elements - 1) { - // the arrays are full. bail out + elem++; + mrr_n_elements= elem; + + if ((elem == n_elements) || + (buf + m_pk_descr->max_storage_fmt_length() >= (char*)mrr_buf.buffer_end)) { + // No more buffer space break; } } - // now, mrr_keys[elem] holds the last valid element (except when elem==-1) - - mrr_n_elements = elem + 1; - - mrr_n_rowids -= mrr_n_elements; - if (mrr_n_rowids < 0) { - // a kind of "default batch size". 
This would be used when we were doing a - // clustered PK scan and ran over the range count - // (TODO: avoid array-based new[] and get rid of this altogether) - mrr_n_rowids = 2000; - } - if (mrr_n_elements == 0) return 0; // nothing to scan + if (mrr_n_elements == 0) return HA_ERR_END_OF_FILE; // nothing to scan Rdb_transaction *const tx = get_or_create_tx(table->in_use); @@ -15431,7 +15494,12 @@ void ha_rocksdb::mrr_free() { } void ha_rocksdb::mrr_free_rows() { - for (ssize_t i = 0; i < mrr_n_elements; i++) mrr_values[i].Reset(); + for (ssize_t i = 0; i < mrr_n_elements; i++) { + mrr_values[i].Reset(); + mrr_values[i].~PinnableSlice(); + mrr_statuses[i].~Status(); + // no need to free mrr_keys + } mrr_n_elements = 0; // We can't rely on the data from HANDLER_BUFFER once the scan is over, so: mrr_values = nullptr; @@ -15449,16 +15517,15 @@ int ha_rocksdb::multi_range_read_next(char **range_info) { mrr_free_rows(); return HA_ERR_END_OF_FILE; } - int res; + if (table->in_use->killed) return HA_ERR_QUERY_INTERRUPTED; + + int res; if ((res = mrr_fill_buffer())) { + if (res == HA_ERR_END_OF_FILE) + table->status = STATUS_NOT_FOUND; return res; } - - if (!mrr_n_elements) { - table->status = STATUS_NOT_FOUND; // not sure if this is necessary? - return HA_ERR_END_OF_FILE; - } } // Skip the "is not found" errors if (mrr_statuses[mrr_read_index].ok()) break; diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index c531b7a0340..6d3fa75e3fa 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -693,10 +693,6 @@ class ha_rocksdb : public my_core::handler { // if true, MRR code has enabled keyread (and should disable it back) bool mrr_enabled_keyread; - // Expected number of rowids that are left to scan. - // This number is used to avoid allocating huge arrays in mrr_fill_buffer - ssize_t mrr_n_rowids; - int mrr_fill_buffer(); void mrr_free_rows(); void mrr_free();
participants (1)
-
Sergei Petrunia