15 Mar
2017
15 Mar
'17
3:23 p.m.
Hi, Sergei! Here is better answer (I had time to dig in the sources) Am 14.03.2017 um 12:27 schrieb Sergei Golubchik: > Hi, Oleksandr! > > On Jan 17, Oleksandr Byelkin wrote: >> revision-id: 23959479689f47bdfe5aa33cb6cd5e1171b5f8a8 (mariadb-10.2.2-128-g23959479689) >> parent(s): 9ea5de30963dd16cec7190b8b2f77858f4a82545 >> committer: Oleksandr Byelkin >> timestamp: 2017-01-17 15:47:17 +0100 >> message: >> >> MDEV-11419: Report all INSERT ID for bulk operation INSERT >> >> Send all Insert IDs of the buld operation to client (JDBC need it) >> >> --- >> include/mysql.h.pp | 3 +- >> include/mysql_com.h | 3 +- >> sql/protocol.cc | 22 +++++++--- >> sql/protocol.h | 6 +++ >> sql/sql_class.cc | 116 ++++++++++++++++++++++++++++++++++++++++++++++++++++ >> sql/sql_class.h | 8 ++++ >> sql/sql_insert.cc | 11 ++++- >> sql/sql_prepare.cc | 21 ++++++++-- > Tests are in C/C repository, I hope? We discussed it, will be the test. > >> diff --git a/include/mysql_com.h b/include/mysql_com.h >> index c399520022d..9fac5edd1bc 100644 >> --- a/include/mysql_com.h >> +++ b/include/mysql_com.h >> @@ -551,7 +551,8 @@ enum enum_cursor_type >> CURSOR_TYPE_NO_CURSOR= 0, >> CURSOR_TYPE_READ_ONLY= 1, >> CURSOR_TYPE_FOR_UPDATE= 2, >> - CURSOR_TYPE_SCROLLABLE= 4 >> + CURSOR_TYPE_SCROLLABLE= 4, >> + INSERT_ID_REQUEST= 128 > That's very weird. Why would "INSERT_ID_REQUEST" be a flag in the > "cursor_type"? > > Don't put this flag into enum_cursor_type It is flags and it happened that they are about cursors, but field of flags can be used for other flags also (IMHO) > >> }; >> >> >> diff --git a/sql/protocol.cc b/sql/protocol.cc >> index f8b68c02fff..608c06da2df 100644 >> --- a/sql/protocol.cc >> +++ b/sql/protocol.cc >> @@ -562,6 +562,7 @@ void Protocol::end_statement() >> >> switch (thd->get_stmt_da()->status()) { >> case Diagnostics_area::DA_ERROR: >> + thd->stop_collecting_insert_id(); >> /* The query failed, send error to log and abort bootstrap. */ >> error= send_error(thd->get_stmt_da()->sql_errno(), >> thd->get_stmt_da()->message(), >> @@ -573,12 +574,21 @@ void Protocol::end_statement() >> break; >> case Diagnostics_area::DA_OK: >> case Diagnostics_area::DA_OK_BULK: >> - error= send_ok(thd->server_status, >> - thd->get_stmt_da()->statement_warn_count(), >> - thd->get_stmt_da()->affected_rows(), >> - thd->get_stmt_da()->last_insert_id(), >> - thd->get_stmt_da()->message(), >> - thd->get_stmt_da()->skip_flush()); >> + if (thd->report_collected_insert_id()) >> + if (thd->is_error()) >> + error= send_error(thd->get_stmt_da()->sql_errno(), >> + thd->get_stmt_da()->message(), >> + thd->get_stmt_da()->get_sqlstate()); >> + else >> + error= send_eof(thd->server_status, >> + thd->get_stmt_da()->statement_warn_count()); >> + else >> + error= send_ok(thd->server_status, >> + thd->get_stmt_da()->statement_warn_count(), >> + thd->get_stmt_da()->affected_rows(), >> + thd->get_stmt_da()->last_insert_id(), >> + thd->get_stmt_da()->message(), >> + thd->get_stmt_da()->skip_flush()); >> break; >> case Diagnostics_area::DA_DISABLED: >> break; >> diff --git a/sql/protocol.h b/sql/protocol.h >> index 6397e3dd5e6..bf74f52fa98 100644 >> --- a/sql/protocol.h >> +++ b/sql/protocol.h >> @@ -30,6 +30,12 @@ class Item_param; >> typedef struct st_mysql_field MYSQL_FIELD; >> typedef struct st_mysql_rows MYSQL_ROWS; >> >> +struct insert_id_desc >> +{ >> + ulonglong first_id; >> + ulonglong sequence; >> +}; >> + >> class Protocol >> { >> protected: >> diff --git a/sql/sql_class.cc b/sql/sql_class.cc >> index 8fabc8f593e..53591c371c0 100644 >> --- a/sql/sql_class.cc >> +++ b/sql/sql_class.cc >> @@ -1474,6 +1474,7 @@ void THD::init(void) >> #endif //EMBEDDED_LIBRARY >> >> apc_target.init(&LOCK_thd_data); >> + insert_ids= NULL; >> DBUG_VOID_RETURN; >> } >> >> @@ -7365,4 +7366,119 @@ bool Discrete_intervals_list::append(Discrete_interval *new_interval) >> DBUG_RETURN(0); >> } >> >> +bool THD::init_collecting_insert_id() >> +{ >> + if (!insert_ids) >> + { >> + void *buff; >> + if (!(my_multi_malloc(MYF(MY_WME), &insert_ids, sizeof(DYNAMIC_ARRAY), >> + &buff, sizeof(insert_id_desc) * 10, >> + NullS)) || >> + my_init_dynamic_array2(insert_ids, sizeof(insert_id_desc), >> + buff, 10, 100, MYF(MY_WME))) > Huh? > 1. You allocate DYNAMIC_ARRAY, on the heap, for every (insert-id > generating) statement. Why? > > 2. You preallocate a buffer, but dynamic array cannot reallocate it, so > on the 11'th element, it'll allocate *another* buffer and won't > use yours. > > Better: put DYNAMIC_ARRAY in the THD, or (even better) allocate it with > thd->alloc. And let DYNAMIC_ARRAY do it's own memory management. Problem with thd->alloc is that MEM_ROOT of THD freed before we can send the data. Idea of preallocation is that it is hardly exceed it in most realistic scenario. >> + { >> + if (insert_ids) >> + my_free(insert_ids); >> + insert_ids= NULL; >> + return TRUE; >> + } >> + collect_auto_increment_increment= variables.auto_increment_increment; >> + } >> + return FALSE; >> +} >> + >> +void THD::stop_collecting_insert_id() >> +{ >> + if (insert_ids) >> + { >> + delete_dynamic(insert_ids); >> + my_free(insert_ids); >> + insert_ids= NULL; >> + } >> +} >> + >> +bool THD::collect_insert_id(ulonglong id) >> +{ >> + if (insert_ids) >> + { >> + if (insert_ids->elements) >> + { >> + insert_id_desc *last= >> + (insert_id_desc *)dynamic_array_ptr(insert_ids, >> + insert_ids->elements - 1); >> + if (id == last->first_id) >> + { >> + return FALSE; // no new insert id >> + } >> + if (id == last->first_id + (last->sequence * >> + collect_auto_increment_increment)) >> + { >> + last->sequence++; >> + return FALSE; >> + } >> + } >> + insert_id_desc el; >> + el.first_id= id; >> + el.sequence= 1; >> + if (insert_dynamic(insert_ids, &el)) >> + { >> + return TRUE; >> + } >> + } >> + return FALSE; >> +} > This definitely needs a comment, explaining what you're doing and how > you're storing your insert id values. OK. /** Collecting sequences of INSERT ID and storing them in the format: <first ID, number of ID in the sequence> with the step of collect_auto_increment_increment */ > >> + >> + >> +bool THD::report_collected_insert_id() >> +{ >> + if (insert_ids) >> + { >> + List<Item> field_list; >> + MEM_ROOT tmp_mem_root; >> + Query_arena arena(&tmp_mem_root, Query_arena::STMT_INITIALIZED), backup; >> + >> + init_alloc_root(arena.mem_root, 2048, 4096, MYF(0)); >> + set_n_backup_active_arena(&arena, &backup); >> + DBUG_ASSERT(mem_root == &tmp_mem_root); >> + >> + field_list.push_back(new (mem_root) >> + Item_int(this, "Id", 0, MY_INT64_NUM_DECIMAL_DIGITS), >> + mem_root); >> + field_list.push_back(new (mem_root) >> + Item_int(this, "Len", 0, MY_INT64_NUM_DECIMAL_DIGITS), >> + mem_root); >> + field_list.push_back(new (mem_root) >> + Item_return_int(this, "Inc", 0, MYSQL_TYPE_LONG), >> + mem_root); >> + >> + if (protocol_binary.send_result_set_metadata(&field_list, >> + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) >> + goto error; >> + >> + for (ulonglong i= 0; i < insert_ids->elements; i++) >> + { >> + insert_id_desc *last= >> + (insert_id_desc *)dynamic_array_ptr(insert_ids, i); >> + if (insert_ids->elements == 1 && last->first_id == 0 && >> + get_stmt_da()->affected_rows() != 1) >> + continue; // No insert IDs >> + protocol_binary.prepare_for_resend(); >> + protocol_binary.store_longlong(last->first_id, TRUE); >> + protocol_binary.store_longlong(last->sequence, TRUE); >> + protocol_binary.store_long(collect_auto_increment_increment); >> + if (protocol_binary.write()) >> + goto error; >> + } >> +error: >> + restore_active_arena(&arena, &backup); >> + DBUG_ASSERT(arena.mem_root == &tmp_mem_root); >> + // no need free Items because they was only constants >> + free_root(arena.mem_root, MYF(0)); >> + stop_collecting_insert_id(); >> + return TRUE; >> + } >> + return FALSE; >> + >> +} >> + >> #endif /* !defined(MYSQL_CLIENT) */ >> diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc >> index 7c0c1b68a2b..20b775c9273 100644 >> --- a/sql/sql_insert.cc >> +++ b/sql/sql_insert.cc >> @@ -1007,6 +1007,12 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, >> } >> its.rewind(); >> iteration++; >> + >> + if (!error && thd->bulk_param) >> + { >> + thd->collect_insert_id(table->file->insert_id_for_cur_row); >> + } > 1. Why are you doing it here, and not per row, inside > `while ((values= its++))` loop? It is what we need INSERT ID as if the command would be executed with the parameters separately, i.e ID correspond to PARAMETER SET. > > 2. What about INSERT ... SELECT? It is not supported by ARRAY binding. > > 3. What about a insert-id generated in one statement but for many > tables? (stored function, trigger, whatever) As above we need what usual INSERT send back to client in single execution. > >> + >> } while (iteration < bulk_iterations); > Regards, > Sergei > Chief Architect MariaDB > and security@mariadb.org > > _______________________________________________ > Mailing list: https://launchpad.net/~maria-developers > Post to : maria-developers@lists.launchpad.net > Unsubscribe : https://launchpad.net/~maria-developers > More help : https://help.launchpad.net/ListHelp