revision-id: 8e230333ad43098b8460240386b82e5d67a974c0 (fb-prod201903-168-g8e230333ad4) parent(s): 6f58201e1a4a18b20cd1d414b5ad3f057a622245 author: Sergei Petrunia committer: Sergei Petrunia timestamp: 2019-09-21 00:55:01 +0300 message: Issue #790, MyRocks/MRR: code cleanup - Mrr_sec_key_rowid_source::get_next_rowid() does an index scan on the secondary index. This may produce an error (e.g. "data corrupted" or another kind of RocksDB error). Don't swallow this error, return it all the way up to the SQL layer. - Do TTL filtering (if we find an expired row, skip it). --- storage/rocksdb/ha_rocksdb.cc | 126 ++++++++++++++++++++++-------------------- 1 file changed, 67 insertions(+), 59 deletions(-) diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 10db1e9398b..15601fd3f82 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -15230,7 +15230,7 @@ class Mrr_rowid_source { public: // Get the next rowid, in the on-disk mem-comparable form. Also, get the // "range pointer" associated with the rowid (it is returned in *range_ptr).
- virtual int get_next_rowid(uchar *buf, char **range_ptr) = 0 ; + virtual int get_next_rowid(uchar *buf, int *size, char **range_ptr) = 0; virtual bool eof() = 0; virtual ~Mrr_rowid_source() {} }; @@ -15250,14 +15250,14 @@ class Mrr_pk_scan_rowid_source : public Mrr_rowid_source { mrr_seq_it = self->mrr_funcs.init(seq_init_param, n_ranges, mode); } - int get_next_rowid(uchar *buf, char **range_ptr) override { + int get_next_rowid(uchar *buf, int *size, char **range_ptr) override { if (mrr_ranges_eof) - return -1; // At eof already + return HA_ERR_END_OF_FILE; // At eof already KEY_MULTI_RANGE range; if ((mrr_ranges_eof = self->mrr_funcs.next(mrr_seq_it, &range))) - return -1; // Got eof now + return HA_ERR_END_OF_FILE; // Got eof now key_part_map all_parts_map = (key_part_map(1) << self->m_pk_descr->get_key_parts()) - 1; @@ -15265,9 +15265,11 @@ class Mrr_pk_scan_rowid_source : public Mrr_rowid_source { DBUG_ASSERT(range.end_key.keypart_map == all_parts_map); *range_ptr = range.ptr; - return self->m_pk_descr->pack_index_tuple(self->table, self->m_pack_buffer, - buf, range.start_key.key, - all_parts_map); + *size = self->m_pk_descr->pack_index_tuple(self->table, + self->m_pack_buffer, + buf, range.start_key.key, + all_parts_map); + return 0; } virtual bool eof() override { return mrr_ranges_eof; } @@ -15280,9 +15282,9 @@ class Mrr_pk_scan_rowid_source : public Mrr_rowid_source { // class Mrr_sec_key_rowid_source : public Mrr_rowid_source { ha_rocksdb *self; - int got_err; + int err; public: - Mrr_sec_key_rowid_source(ha_rocksdb *self_arg) : self(self_arg), got_err(0) { + Mrr_sec_key_rowid_source(ha_rocksdb *self_arg) : self(self_arg), err(0) { } int init(RANGE_SEQ_IF *seq, void *seq_init_param, @@ -15293,18 +15295,18 @@ class Mrr_sec_key_rowid_source : public Mrr_rowid_source { mode, nullptr); } - int get_next_rowid(uchar *buf, char **range_ptr) override { - if (got_err) - return got_err; + int get_next_rowid(uchar *buf, int *size, char **range_ptr) override { + if 
(err) + return err; - got_err = self->handler::multi_range_read_next(range_ptr); - if (!got_err) { + err = self->handler::multi_range_read_next(range_ptr); + if (!err) { memcpy(buf, self->m_last_rowkey.ptr(), self->m_last_rowkey.length()); - return self->m_last_rowkey.length(); + *size = self->m_last_rowkey.length(); } - return -1; + return err; } - virtual bool eof() override { return got_err != 0; } + virtual bool eof() override { return err != 0; } }; @@ -15421,8 +15423,10 @@ int ha_rocksdb::mrr_fill_buffer() { ssize_t n_elements = (mrr_buf.buffer_end - mrr_buf.buffer) / element_size; if (n_elements < 1) { + // We shouldn't get here as multi_range_read_init() has logic to fall back + // to the default MRR implementation in this case. DBUG_ASSERT(0); - return HA_ERR_INTERNAL_ERROR; // error + return HA_ERR_INTERNAL_ERROR; } char *buf = (char *)mrr_buf.buffer; @@ -15457,8 +15461,9 @@ int ha_rocksdb::mrr_fill_buffer() { mrr_n_elements = elem; int key_size; char *range_ptr; - while ((key_size = mrr_rowid_reader->get_next_rowid((uchar*)buf, &range_ptr)) > 0 ) { - + int err; + while (!(err = mrr_rowid_reader->get_next_rowid((uchar*)buf, &key_size, + &range_ptr))) { DEBUG_SYNC(table->in_use, "rocksdb.mrr_fill_buffer.loop"); if (table->in_use->killed) return HA_ERR_QUERY_INTERRUPTED; @@ -15471,13 +15476,15 @@ int ha_rocksdb::mrr_fill_buffer() { elem++; mrr_n_elements= elem; - if ((elem == n_elements) || - (buf + m_pk_descr->max_storage_fmt_length() >= (char*)mrr_buf.buffer_end)) { + if ((elem == n_elements) || (buf + m_pk_descr->max_storage_fmt_length() >= + (char*)mrr_buf.buffer_end)) { // No more buffer space break; } } + if (err && err != HA_ERR_END_OF_FILE) return err; + if (mrr_n_elements == 0) return HA_ERR_END_OF_FILE; // nothing to scan Rdb_transaction *const tx = get_or_create_tx(table->in_use); @@ -15517,53 +15524,54 @@ int ha_rocksdb::multi_range_read_next(char **range_info) { return handler::multi_range_read_next(range_info); } - while (1) { - if 
(mrr_read_index >= mrr_n_elements) { - if (mrr_rowid_reader->eof() || !mrr_n_elements) { - table->status = STATUS_NOT_FOUND; // not sure if this is necessary? - mrr_free_rows(); - return HA_ERR_END_OF_FILE; - } + Rdb_transaction *&tx = get_tx_from_thd(table->in_use); + int rc; - if (table->in_use->killed) return HA_ERR_QUERY_INTERRUPTED; + for (;;) { // Runs again only after a TTL-expired row is skipped below + while (1) { + if (mrr_read_index >= mrr_n_elements) { + if (mrr_rowid_reader->eof() || !mrr_n_elements) { + table->status = STATUS_NOT_FOUND; // not sure if this is necessary? + mrr_free_rows(); + return HA_ERR_END_OF_FILE; + } - int res; - if ((res = mrr_fill_buffer())) { - if (res == HA_ERR_END_OF_FILE) - table->status = STATUS_NOT_FOUND; - return res; + if (table->in_use->killed) return HA_ERR_QUERY_INTERRUPTED; + + if ((rc = mrr_fill_buffer())) { + if (rc == HA_ERR_END_OF_FILE) table->status = STATUS_NOT_FOUND; + return rc; + } } + // Skip the "is not found" errors + if (mrr_statuses[mrr_read_index].ok()) break; + mrr_read_index++; } - // Skip the "is not found" errors - if (mrr_statuses[mrr_read_index].ok()) break; - mrr_read_index++; - } - // Ok, mrr_read_index points to the next row - size_t cur_key = mrr_read_index++; + size_t cur_key = mrr_read_index++; + + const rocksdb::Slice &rowkey = mrr_keys[cur_key]; + m_last_rowkey.copy((const char *)rowkey.data(), rowkey.size(), + &my_charset_bin); + + *range_info = mrr_range_ptrs[cur_key]; + m_retrieved_record.Reset(); + m_retrieved_record.PinSlice(mrr_values[cur_key], &mrr_values[cur_key]); - // get the next row out -#if 0 /* If we found the record, but it's expired, pretend we didn't find it.
*/ - if (!skip_ttl_check && m_pk_descr->has_ttl() && + if (m_pk_descr->has_ttl() && should_hide_ttl_rec(*m_pk_descr, m_retrieved_record, tx->m_snapshot_timestamp)) { - DBUG_RETURN(HA_ERR_KEY_NOT_FOUND); + m_retrieved_record.Reset(); + mrr_values[cur_key].Reset(); + continue; } -#endif - const rocksdb::Slice &rowkey = mrr_keys[cur_key]; - m_last_rowkey.copy((const char *)rowkey.data(), rowkey.size(), - &my_charset_bin); - - *range_info = mrr_range_ptrs[cur_key]; - - m_retrieved_record.Reset(); - m_retrieved_record.PinSlice(mrr_values[cur_key], &mrr_values[cur_key]); - int rc = convert_record_from_storage_format(&rowkey, table->record[0]); - - m_retrieved_record.Reset(); - mrr_values[cur_key].Reset(); - table->status = rc ? STATUS_NOT_FOUND : 0; + rc = convert_record_from_storage_format(&rowkey, table->record[0]); + m_retrieved_record.Reset(); + mrr_values[cur_key].Reset(); + table->status = rc ? STATUS_NOT_FOUND : 0; + break; // Row produced (rc == 0) or conversion failed: stop looping + } return rc; }