Hi!
On Sun, Oct 9, 2016 at 1:45 AM, Andrea <montyw(a)askmonty.org> wrote:
>
>
>
> --- Forwarded message ---
> From: Oleksandr Byelkin <sanja(a)montyprogram.com>
> Date: 8 October 2016 1:01:58 p.m.
> Subject: REVIEW: Fwd: [Commits] 8cd4778: MDEV-9114: Bulk operations (Array
> binding)
> To: Michael Widenius <monty(a)mariadb.com>
>
> Hi, Monty!
>
>
> It is the patch. There left 2 "TODO" marks about embedded I think that
> best variant leave there it as is, but maybe you have other opinion.
Embedded is ok to not have bulk insert (for now), but we should give
an error, not an assert if this is used!
Review of bulk operations patch
> diff --git a/include/mysql.h.pp b/include/mysql.h.pp
> index 857f5b9..c985792 100644
> --- a/include/mysql.h.pp
> +++ b/include/mysql.h.pp
> @@ -11,11 +11,17 @@ enum enum_server_command
> COM_STMT_RESET, COM_SET_OPTION, COM_STMT_FETCH, COM_DAEMON,
> COM_MDB_GAP_BEG,
> COM_MDB_GAP_END=250,
> - COM_SLAVE_WORKER,
> - COM_SLAVE_IO,
> - COM_SLAVE_SQL,
> - COM_MULTI,
> - COM_END
> + COM_SLAVE_WORKER=251,
> + COM_SLAVE_IO=252,
> + COM_SLAVE_SQL=253,
> + COM_MULTI=254,
> + COM_END=255
> +};
Why the explicit numbering?
(Just curious, as this shouldn't be needed.)
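To illustrate why the explicit values should be redundant, here is a minimal standalone sketch. The enum and its EX_ names are hypothetical and only mirror the tail of enum_server_command; in C and C++ an enumerator without an initializer takes the previous value plus one.

```cpp
#include <cassert>

// Hypothetical copy of the tail of enum_server_command, with no explicit
// values after the COM_MDB_GAP_END equivalent.
enum example_server_command
{
  EX_COM_MDB_GAP_END= 250,
  EX_COM_SLAVE_WORKER,
  EX_COM_SLAVE_IO,
  EX_COM_SLAVE_SQL,
  EX_COM_MULTI,
  EX_COM_END
};

// Each implicit enumerator is the previous one plus one.
static_assert(EX_COM_SLAVE_WORKER == 251, "implicit value follows 250");
static_assert(EX_COM_MULTI == 254, "implicit value is 254");
static_assert(EX_COM_END == 255, "implicit value is 255");
```

The explicit numbers can still be useful as documentation of the wire values, which may be the reason they were added.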
> +enum enum_indicator_type
> +{
> + STMT_INDICATOR_NONE= 0,
> + STMT_INDICATOR_NULL,
> + STMT_INDICATOR_DEFAULT
> };
Please add a comment explaining what this enum is used for.
Note that enum_indicator_type isn't used in this commit.
(I have a fix for this later on.)
You should probably just rename it to indicator_type.
(One can always use 'enum indicator_type' if one wants to point out in the
code that it's an enum.)
> diff --git a/sql/item.cc b/sql/item.cc
> index 61635ea..41a7aaf 100644
> --- a/sql/item.cc
> +++ b/sql/item.cc
> @@ -812,6 +818,13 @@ bool Item_ident::collect_outer_ref_processor(void *param)
> return FALSE;
> }
>
> +void Item_ident::set_default_value_target(Item *item)
> +{
> + if ((default_value_target= item) && fixed &&
Please add a comment explaining under which circumstances fixed could be false.
Also add a comment explaining what it means when set_default_value_source()
isn't called. Will it be called later, or isn't it needed?
> + type() == FIELD_ITEM)
> + default_value_target->set_default_value_source(((Item_field *)this)->
> + field);
> +}
> @@ -7242,6 +7300,7 @@ bool Item_ref::fix_fields(THD *thd, Item **reference)
> Item_field* fld;
> if (!(fld= new (thd->mem_root) Item_field(thd, from_field)))
> goto error;
> + fld->set_default_value_target(default_value_target);
Add a comment like: /* Note that fld->fixed isn't set here */
> @@ -8428,36 +8487,38 @@ bool Item_default_value::eq(const Item *item, bool binary_cmp) const
> bool Item_default_value::fix_fields(THD *thd, Item **items)
> {
> Item *real_arg;
> - Item_field *field_arg;
> Field *def_field;
> DBUG_ASSERT(fixed == 0);
>
> - if (!arg)
> + if (!arg && !arg_fld)
> {
> fixed= 1;
> return FALSE;
> }
> - if (!arg->fixed && arg->fix_fields(thd, &arg))
> - goto error;
> + if (arg)
> + {
> + if (!arg->fixed && arg->fix_fields(thd, &arg))
> + goto error;
>
>
> - real_arg= arg->real_item();
> - if (real_arg->type() != FIELD_ITEM)
> - {
> - my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), arg->name);
> - goto error;
> - }
> + real_arg= arg->real_item();
> + if (real_arg->type() != FIELD_ITEM)
> + {
> + my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), arg->name);
> + goto error;
> + }
>
> - field_arg= (Item_field *)real_arg;
> - if ((field_arg->field->flags & NO_DEFAULT_VALUE_FLAG))
> + arg_fld= ((Item_field *)real_arg)->field;
> + }
What is the reason for having arg_fld (type Field *) instead of
field_arg (type Item_field *)?
arg_fld is a bad name for a class variable as it doesn't tell us
what the variable actually contains. A variable name in a function is
not that critical to get right, as one can understand what it contains
by looking at its usage. For a class variable this isn't the case.
field_in_table or just field would be a bit clearer (but not yet perfect).
I looked at the usage of arg_fld and don't understand why this needs to be
part of the class and not just a local variable.
As def_field is basically a copy of arg_fld, another possible name
for arg_fld would be def_field_org.
> --- a/sql/item.h
> +++ b/sql/item.h
> @@ -1872,6 +1872,9 @@ class Item: public Value_source,
> {
> marker &= ~EXTRACTION_MASK;
> }
> +
> + virtual void set_default_value_source(Field *fld) {};
> + virtual bool set_default_if_needed() { return FALSE; };
> };
>
>
> @@ -2351,6 +2354,8 @@ class Item_ident :public Item_result_field
> const char *orig_table_name;
> const char *orig_field_name;
>
remove the extra empty line here
> + Item *default_value_target;
> +
> @@ -2727,6 +2736,15 @@ class Item_param :public Item_basic_value,
> DECIMAL_VALUE
> } state;
>
> + Field *default_value_ref;
> + Item_default_value *default_value_source;
> + /*
> + Used for bulk protocol. Indicates if we should expect
> + indicators byte before value of the parameter
> + */
> + my_bool indicators;
> + uint indicator;
> @@ -4980,14 +5011,19 @@ class Item_default_value : public Item_field
> void calculate();
> public:
> Item *arg;
> + Field *arg_fld;
Why is the above needed ?
How is it used ?
> +++ b/sql/sql_base.cc
> @@ -7810,9 +7810,10 @@ fill_record(THD *thd, TABLE *table_arg, List<Item> &fields, List<Item> &values,
> if (table->next_number_field &&
> rfield->field_index == table->next_number_field->field_index)
> table->auto_increment_field_not_null= TRUE;
> - if (rfield->vcol_info &&
> - value->type() != Item::DEFAULT_VALUE_ITEM &&
> - value->type() != Item::NULL_ITEM &&
> + Item::Type type= value->type();
> + if (rfield->vcol_info &&
> + type != Item::DEFAULT_VALUE_ITEM &&
> + type != Item::NULL_ITEM &&
> table->s->table_category != TABLE_CATEGORY_TEMPORARY)
The above code is slower than the original, as it now always calls
value->type(), even when rfield->vcol_info is not set.
Better to do:
  if (rfield->vcol_info)
  {
    Item::Type type= value->type();
    if (type != Item::DEFAULT_VALUE_ITEM &&
        type != Item::NULL_ITEM &&
        table->s->table_category != TABLE_CATEGORY_TEMPORARY)
    ...
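A small standalone sketch of the difference, counting calls to type(). All Example_ and EX_ names are hypothetical stand-ins and not the server classes; the table_category test is left out to keep it short.

```cpp
#include <cassert>

enum Example_type { EX_DEFAULT_VALUE_ITEM, EX_NULL_ITEM, EX_OTHER_ITEM };

// Stand-in for Item that counts how often type() is called.
struct Example_item
{
  Example_type item_type;
  int type_calls;
  explicit Example_item(Example_type t) : item_type(t), type_calls(0) {}
  Example_type type() { type_calls++; return item_type; }
};

// Shape of the patched code: type() is called before vcol_info is tested.
bool warn_needed_patched(bool has_vcol_info, Example_item *value)
{
  Example_type type= value->type();
  return has_vcol_info &&
         type != EX_DEFAULT_VALUE_ITEM &&
         type != EX_NULL_ITEM;
}

// Shape of the suggested code: type() is only called for virtual columns.
bool warn_needed_suggested(bool has_vcol_info, Example_item *value)
{
  if (!has_vcol_info)
    return false;
  Example_type type= value->type();
  return type != EX_DEFAULT_VALUE_ITEM && type != EX_NULL_ITEM;
}
```

For a field without vcol_info, the first form makes one call to type() and the second makes none, which is the common case for most tables.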
> @@ -7820,6 +7821,8 @@ fill_record(THD *thd, TABLE *table_arg, List<Item> &fields, List<Item> &values,
> ER_THD(thd, ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN),
> rfield->field_name, table->s->table_name.str);
> }
> + if (value->set_default_if_needed())
> + goto err;
For which use case is the above needed?
(I would like to understand the exact use case and preferably have it
documented in the code, as it's far from clear why this is needed.)
Normal default handling should be handled by the following code:
  if (!update && table_arg->default_field &&
      table_arg->update_default_fields(0, ignore_errors))
    goto err;
Why doesn't this work for bulk operations?
If the value is 'default', then just not storing it into the field should
let the above code store the default value into the field.
> @@ -8060,9 +8063,10 @@ fill_record(THD *thd, TABLE *table, Field **ptr, List<Item> &values,
> value=v++;
> if (field->field_index == autoinc_index)
> table->auto_increment_field_not_null= TRUE;
> - if (field->vcol_info &&
> - value->type() != Item::DEFAULT_VALUE_ITEM &&
> - value->type() != Item::NULL_ITEM &&
> + Item::Type type= value->type();
> + if (field->vcol_info &&
> + type != Item::DEFAULT_VALUE_ITEM &&
> + type != Item::NULL_ITEM &&
> table->s->table_category != TABLE_CATEGORY_TEMPORARY)
Move the call to value->type() down (see the comment above on similar code).
> @@ -8070,6 +8074,8 @@ fill_record(THD *thd, TABLE *table, Field **ptr, List<Item> &values,
> ER_THD(thd, ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN),
> field->field_name, table->s->table_name.str);
> }
> + if (value->set_default_if_needed())
> + goto err;
Why is the above code needed ?
> +++ b/sql/sql_class.cc
> @@ -5766,6 +5767,17 @@ int THD::decide_logging_format(TABLE_LIST *tables)
> !(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
> !binlog_filter->db_ok(db)))
> {
> +
> + if (is_bulk_op())
> + {
> + if (wsrep_binlog_format() == BINLOG_FORMAT_STMT)
> + {
> + my_error(ER_BINLOG_NON_SUPPORTED_BULK, MYF(0));
> + DBUG_PRINT("info",
> + ("decision: no logging since an error was generated"));
no logging -> aborting execution
> diff --git a/sql/sql_class.h b/sql/sql_class.h
> index 51642ec..b444b36 100644
> --- a/sql/sql_class.h
> +++ b/sql/sql_class.h
> @@ -2465,6 +2465,8 @@ class THD :public Statement,
> */
> Query_arena *stmt_arena;
>
> + void *bulk_param;
> +
> /*
> map for tables that will be updated for a multi-table update query
> statement, for other query statements, this will be zero.
> @@ -3440,6 +3442,12 @@ class THD :public Statement,
> To raise this flag, use my_error().
> */
> inline bool is_error() const { return m_stmt_da->is_error(); }
> + void set_bulk_execution(void *bulk)
> + {
> + bulk_param= bulk;
> + m_stmt_da->set_bulk_execution(MY_TEST(bulk));
> + }
> + bool is_bulk_op() const { return m_stmt_da->is_bulk_op(); }
Why not instead do:
bool is_bulk_op() const { return MY_TEST(bulk_param); }
If this isn't the same it would be good to know why.
> index 1d234c5..34d608f 100644
> --- a/sql/sql_error.cc
> +++ b/sql/sql_error.cc
> @@ -320,7 +320,7 @@ Sql_condition::set_sqlstate(const char* sqlstate)
> }
>
> Diagnostics_area::Diagnostics_area(bool initialize)
> - : m_main_wi(0, false, initialize)
> + : is_bulk_execution(0), m_main_wi(0, false, initialize)
> {
Please add a comment explaining why you need is_bulk_execution both in THD
and in Diagnostics_area. If they are the same and this is just a cache, it
would be good to add an assert in set_ok_status() to verify this.
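A rough sketch of the kind of consistency check I have in mind. Example_thd and Example_da are simplified, hypothetical stand-ins for THD and Diagnostics_area; in the server the check would be a DBUG_ASSERT in set_ok_status(), assuming the Diagnostics_area can reach the THD state there.

```cpp
#include <cassert>

// Stand-in for Diagnostics_area: holds the cached flag.
struct Example_da
{
  bool is_bulk_execution= false;
  void set_bulk_execution(bool bulk) { is_bulk_execution= bulk; }
  bool is_bulk_op() const { return is_bulk_execution; }
};

// Stand-in for THD: holds the source of truth (bulk_param).
struct Example_thd
{
  void *bulk_param= nullptr;
  Example_da da;

  void set_bulk_execution(void *bulk)
  {
    bulk_param= bulk;
    da.set_bulk_execution(bulk != nullptr);
  }

  // The condition one would assert: the cache matches bulk_param.
  bool bulk_state_consistent() const
  {
    return da.is_bulk_op() == (bulk_param != nullptr);
  }
};
```

If the two can legitimately differ, the check fails and that difference is what the comment should document.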
> @@ -376,7 +377,7 @@ Diagnostics_area::set_ok_status(ulonglong affected_rows,
> const char *message)
> {
> DBUG_ENTER("set_ok_status");
> - DBUG_ASSERT(! is_set());
> + DBUG_ASSERT(!is_set() || (m_status == DA_OK_BULK && is_bulk_op()));
> /*
> In production, refuse to overwrite an error or a custom response
> with an OK packet.
> @@ -384,14 +385,23 @@ Diagnostics_area::set_ok_status(ulonglong affected_rows,
> if (is_error() || is_disabled())
> return;
>
> - m_statement_warn_count= current_statement_warn_count();
> - m_affected_rows= affected_rows;
Add here the following comment:
/*
When running a bulk operation, m_status will be DA_OK for the first operation
and set to DA_OK_BULK for all following operations
*/
> + if (m_status == DA_OK_BULK)
> + {
> + DBUG_ASSERT(is_bulk_op());
You don't need the above assert as you have already tested this a few lines
above.
> + m_statement_warn_count+= current_statement_warn_count();
> + m_affected_rows+= affected_rows;
> + }
> + else
> + {
> + m_statement_warn_count= current_statement_warn_count();
> + m_affected_rows= affected_rows;
> + m_status= (is_bulk_op() ? DA_OK_BULK : DA_OK);
> + }
> --- a/sql/protocol.cc
> +++ b/sql/protocol.cc
> @@ -572,6 +572,7 @@ void Protocol::end_statement()
> thd->get_stmt_da()->statement_warn_count());
> break;
> case Diagnostics_area::DA_OK:
> + case Diagnostics_area::DA_OK_BULK:
> error= send_ok(thd->server_status,
> thd->get_stmt_da()->statement_warn_count(),
> thd->get_stmt_da()->affected_rows(),
For bulk operations, why do we send a packet for every array element?
Isn't it enough to send a single summary at the end?
> diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
> @@ -637,6 +638,66 @@ static void save_insert_query_plan(THD* thd, TABLE_LIST *table_list)
> }
>
Please add a detailed comment explaining what the following function does.
>
> +inline static void set_defaults_relation(Item *fld, Item *val)
> +{
> + Item::Type type= fld->type();
> + if (type == Item::FIELD_ITEM)
> + {
> + Item_field *item_field= (Item_field *)fld;
Why test fixed?
Shouldn't all items be fixed this late in the game?
> + if (item_field->fixed)
> + val->set_default_value_source(item_field->field);
> + else
> + item_field->set_default_value_target(val);
> + }
> + else if (type == Item::REF_ITEM)
> + {
> + Item_ref *item_field= (Item_ref *)fld;
> + // may turn to Item_field after fix_fields()
> + if (!item_field->fixed)
> + item_field->set_default_value_target(val);
> + }
> +}
> +
Please add a detailed comment explaining what the following function does.
For example, does this only need to be executed when using bulk operations?
> +void setup_deault_parameters(TABLE_LIST *table, List<Item> *fields,
> + List<Item> *values)
Typo. should be setup_default_parameters
> +{
> +
> + List_iterator_fast<Item> itv(*values);
> + Item *val;
> + if (fields->elements)
> + {
> + List_iterator_fast<Item> itf(*fields);
> + Item *fld;
> + while((fld= itf++) && (val= itv++))
> + {
> + set_defaults_relation(fld->real_item(), val);
> + }
> + }
> + else if (table != NULL)
I checked both calls to setup_default_parameters and it looks impossible that
table could be NULL when this function is called. (Both calls have
table_list->next_local= 0 just before the call.)
Instead of the above test, please add a DBUG_ASSERT(table) at the
beginning of this function.
> + {
> + if (table->view)
> + {
> + Field_iterator_view field_it;
> + field_it.set(table);
> + for (; !field_it.end_of_fields() && (val= itv++); field_it.next())
> + {
> + set_defaults_relation(field_it.item()->real_item(), val);
> + }
> + }
> + else
> + {
I assume this is for multi-table update?
If yes, please add a comment.
> + Field_iterator_table_ref field_it;
> + field_it.set(table);
> + for (; !field_it.end_of_fields() && (val= itv++); field_it.next())
> + {
> + Field *fld= field_it.field();
> + val->set_default_value_source(fld);
> + }
> + }
> + }
> +}
I have to admit that I am not completely sure when one should use
set_default_value_source, set_defaults_relation or set_default_value_target.
Can you please document somewhere in the code the purpose of the above
functions and when they should be used?
> @@ -770,6 +833,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
> if (setup_fields(thd, Ref_ptr_array(), *values, MARK_COLUMNS_READ, 0, 0))
> goto abort;
> switch_to_nullable_trigger_fields(*values, table);
Why do we loop over all values an extra time below?
If this is only needed for bulk operations, then we should have
some extra tests there!
> + setup_deault_parameters(table_list, &fields, values);
> @@ -885,105 +949,113 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
> goto values_loop_end;
> }
> }
> -
> - while ((values= its++))
> + for (ulong iteration= 0; iteration < bulk_iterations; iteration++)
Change to do-while, as we always execute this loop at least once.
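Something along these lines, as a standalone sketch. example_run_rows() is a hypothetical stand-in for the loop in mysql_insert(); it assumes bulk_iterations is at least 1 for normal (non-bulk) execution.

```cpp
#include <cassert>

// Counts how many times the loop body runs for a given bulk_iterations.
unsigned long example_run_rows(unsigned long bulk_iterations)
{
  unsigned long iteration= 0;
  unsigned long processed= 0;
  do
  {
    processed++;                       // the values loop would go here
  } while (++iteration < bulk_iterations);
  return processed;
}
```

Note that the two forms only differ if bulk_iterations can be 0: the for loop would then run zero times and the do-while once, so the change is only safe if that case cannot happen.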
> {
> - if (fields.elements || !value_count)
> +
> + if (iteration && bulk_parameters_set(thd))
> + goto abort;
Wouldn't a better name be 'get_bulk_arguments_from_client()'?
This would more closely reflect what the function does.
Can you please send me a copy of your sql_insert.cc? It was very hard
to review some parts of this file as there were many indentation
changes. I tried to apply your patch to my 10.2 version, but it didn't
apply.
> @@ -1445,6 +1517,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
> /* Prepare the fields in the statement. */
> if (values)
> {
> +
Remove extra empty line
> /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
> DBUG_ASSERT (!select_lex->group_list.elements);
>
> @@ -1463,6 +1536,10 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
> check_insert_fields(thd, context->table_list, fields, *values,
> !insert_into_view, 0, &map));
>
> + setup_deault_parameters(table_list, &fields, values);
Is the above call always needed?
> diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc
> @@ -960,11 +973,65 @@ static bool insert_params(Prepared_statement *stmt, uchar *null_array,
> }
>
>
> +static bool insert_bulk_params(Prepared_statement *stmt,
> + uchar **read_pos, uchar *data_end,
> + bool reset)
> +{
> + Item_param **begin= stmt->param_array;
> + Item_param **end= begin + stmt->param_count;
> +
> + DBUG_ENTER("insert_params");
> +
> + for (Item_param **it= begin; it < end; ++it)
> + {
> + Item_param *param= *it;
> + if (reset)
> + param->reset();
> + if (param->state != Item_param::LONG_DATA_VALUE)
> + {
> + if (param->indicators)
> + param->indicator= *((*read_pos)++);
> + else
> + param->indicator= STMT_INDICATOR_NONE;
> + if ((*read_pos) > data_end)
> + DBUG_RETURN(1);
> + switch (param->indicator) {
move { to next line
> + case STMT_INDICATOR_NONE:
> + if ((*read_pos) >= data_end)
> + DBUG_RETURN(1);
> + param->set_param_func(param, read_pos, (uint) (data_end - (*read_pos)));
> + if (param->state == Item_param::NO_VALUE)
> + DBUG_RETURN(1);
> + break;
> + case STMT_INDICATOR_NULL:
> + param->set_null();
> + break;
> + case STMT_INDICATOR_DEFAULT:
> + if (param->set_default(TRUE))
> + DBUG_RETURN(1);
If all the new default handling exists only to support the case above, we
should be able to do this much more simply by not marking the field as having
been given a value and letting the normal default handling take over.
> + break;
> + }
> + }
> + /*
> + A long data stream was supplied for this parameter marker.
> + This was done after prepare, prior to providing a placeholder
> + type (the types are supplied at execute). Check that the
> + supplied type of placeholder can accept a data stream.
> + */
> + else
Move the comment after the else. Also check that the comment is accurate, as
there are no checks below.
> + DBUG_RETURN(1); // long is not supported here
> + }
> + DBUG_RETURN(0);
> @@ -2982,12 +3056,16 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length)
> thd->profiling.set_query_source(stmt->query(), stmt->query_length());
> #endif
> DBUG_PRINT("exec_query", ("%s", stmt->query()));
> - DBUG_PRINT("info",("stmt: 0x%lx", (long) stmt));
> + DBUG_PRINT("info",("stmt: 0x%lx iterations: %lu", (long) stmt, iterations));
>
When changing a DBUG_PRINT that contains 0x%lx, please change to %p
and remove the cast to long.
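As an illustration of the format change, here is a sketch that uses plain snprintf() as a stand-in for the DBUG_PRINT macro. example_format_stmt_info() is a hypothetical helper; the point is only the "%p" conversion, which takes the pointer directly instead of a value cast to long.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>

// Formats the same information as the DBUG_PRINT above, using %p for the
// statement pointer. Returns the number of characters snprintf() reports.
int example_format_stmt_info(char *buf, std::size_t size,
                             void *stmt, unsigned long iterations)
{
  return std::snprintf(buf, size, "stmt: %p  iterations: %lu",
                       stmt, iterations);
}
```

The exact text produced by %p is implementation-defined, but unlike the cast to long it does not truncate pointers on platforms where long is 32 bits and pointers are 64 bits.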
<cut>
> +my_bool Prepared_statement::set_bulk_parameters(bool reset)
> +{
> + DBUG_ENTER("Prepared_statement::set_bulk_parameters");
> + DBUG_PRINT("info", ("iteration: %lu", iterations));
> + if (iterations)
> + {
> +#ifndef EMBEDDED_LIBRARY
> + if ((*set_bulk_params)(this, &packet, packet_end, reset))
> +#else
> + DBUG_ASSERT(0); //TODO: support bulk parameters for embedded server
> +#endif
Please give an error instead when someone tries to do a bulk operation on
the client side in the embedded server. We shouldn't crash because
something isn't supported.
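Roughly like the sketch below. Everything in it is a hypothetical stand-in: the real code would call my_error() with a suitable error code inside the #ifdef EMBEDDED_LIBRARY branch, while here a runtime flag and a made-up error constant are used so that the example is self-contained.

```cpp
#include <cassert>

// Made-up error code and error sink, standing in for my_error().
const int EX_ER_BULK_NOT_SUPPORTED_IN_EMBEDDED= 1;
static int example_last_error= 0;

static void example_report_error(int code) { example_last_error= code; }

// Returns true on failure, following the usual server convention.
// 'embedded' stands in for the EMBEDDED_LIBRARY compile-time switch.
bool example_set_bulk_parameters(bool embedded, unsigned long iterations)
{
  if (!iterations)
    return false;                      // nothing to read
  if (embedded)
  {
    // Report an error to the caller instead of asserting.
    example_report_error(EX_ER_BULK_NOT_SUPPORTED_IN_EMBEDDED);
    return true;
  }
  // Non-embedded: the parameters would be read from the packet here.
  return false;
}
```

The same pattern would apply to the setup_conversion_functions() call in execute_bulk_loop().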
> +bool
> +Prepared_statement::execute_bulk_loop(String *expanded_query,
> + bool open_cursor,
> + uchar *packet_arg,
> + uchar *packet_end_arg,
> + ulong iterations_arg)
> +{
> + Reprepare_observer reprepare_observer;
> + bool error= 0;
> + packet= packet_arg;
> + packet_end= packet_end_arg;
> + iterations= iterations_arg;
> + start_param= true;
> +#ifndef DBUG_OFF
> + Item *free_list_state= thd->free_list;
> +#endif
> + thd->select_number= select_number_after_prepare;
> + thd->set_bulk_execution((void *)this);
> + /* Check if we got an error when sending long data */
> + if (state == Query_arena::STMT_ERROR)
> + {
> + my_message(last_errno, last_error, MYF(0));
> + thd->set_bulk_execution(0);
> + return TRUE;
> + }
> +
> + if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_SAFE))
> + {
> + my_error(ER_UNSUPPORTED_PS, MYF(0));
> + thd->set_bulk_execution(0);
> + return TRUE;
> + }
> +
> +#ifndef EMBEDDED_LIBRARY
> + if (setup_conversion_functions(this, &packet, packet_end, TRUE))
> +#else
> + DBUG_ASSERT(0); //TODO: support bulk parameters for embedded server
> +#endif
> + {
> + my_error(ER_WRONG_ARGUMENTS, MYF(0),
> + "mysqld_stmt_bulk_execute");
> + reset_stmt_params(this);
> + thd->set_bulk_execution(0);
> + return true;
> + }
> +
> +#ifdef NOT_YET_FROM_MYSQL_5_6
> + if (unlikely(thd->security_ctx->password_expired &&
> + !lex->is_change_password))
> + {
> + my_error(ER_MUST_CHANGE_PASSWORD, MYF(0));
> + thd->set_bulk_execution(0);
> + return true;
> + }
> +#endif
> +
> + while((iterations || start_param) && !error && !thd->is_error())
Please add space after while
Add also a comment how the code works.
As 'iterations' doesn't change at all in the loop, the above while is a bit
strange.
> + {
> + int reprepare_attempt= 0;
> +
Add a comment explaining how the following code works.
I understand that for a bulk insert, the loop that fetches the data
will happen in mysql_insert(). However, I don't understand how this will work
with other bulk operations that don't have CF_SP_BULK_OPTIMIZED set.
> + if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_OPTIMIZED))
> + {
> + if (set_bulk_parameters(TRUE))
> + {
> + thd->set_bulk_execution(0);
> + return true;
> + }
> + }
> +
> +reexecute:
---------------------
Conclusion:
Most of the bulk insert code looks ok. Some features are missing (embedded
server error handling, binlog logging) and must be fixed before
pushing (at least binlog logging is critical).
However, I don't like the default handling in the new code:
- Too much new and complex code (a lot of similar functions that
  work slightly differently)
- A lot of extra overhead from looping over all fields and values, which is
  not needed.
- Totally different DEFAULT handling compared to how things are normally done.
I would prefer that all the new default handling would be removed and instead
we should do things the "normal way":
- If a field is given value of DEFAULT, it should not be given a value. In
this case the normal default handling should be able to take over and
handle the case.
If this is impossible, then we should try to ensure that the code will work
exactly as if we would have given column=DEFAULT in SQL.
As we don't need any loops over all values for handling the above case
we shouldn't need that for bulk inserts either.
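To make the idea concrete, here is a very simplified sketch of what I mean by the "normal way". All Example_ and EX_ types are hypothetical and much simpler than Field and Item_param; the second loop only stands in for what the existing update_default_fields() pass already does, so no new per-value pass is implied.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

enum Example_indicator
{
  EX_INDICATOR_NONE, EX_INDICATOR_NULL, EX_INDICATOR_DEFAULT
};

struct Example_param { Example_indicator indicator; int value; };

struct Example_field
{
  int default_value;
  int value;
  bool is_null;
  bool has_explicit_value;
};

void example_fill_record(std::vector<Example_field> &fields,
                         const std::vector<Example_param> &params)
{
  for (std::size_t i= 0; i < fields.size() && i < params.size(); i++)
  {
    Example_field &f= fields[i];
    f.is_null= false;
    f.has_explicit_value= false;
    switch (params[i].indicator)
    {
    case EX_INDICATOR_NONE:
      f.value= params[i].value;
      f.has_explicit_value= true;
      break;
    case EX_INDICATOR_NULL:
      f.is_null= true;
      f.has_explicit_value= true;
      break;
    case EX_INDICATOR_DEFAULT:
      break;                           // store nothing, leave it unset
    }
  }
  // Stand-in for the normal default handling that already exists.
  for (Example_field &f : fields)
    if (!f.has_explicit_value)
      f.value= f.default_value;
}
```

With this shape the DEFAULT indicator needs no special relation between items and fields; it is just the absence of a stored value.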
I will be back working normally on Friday, so we can discuss the DEFAULT
handling on IRC then.
Regards,
Monty