[Maria-developers] MDEV-3792: review of the handler part of the cassandra engine
by Sergei Golubchik 20 Dec '12
Hi, Sergey,
The handler is quite ok. I've only had a few, mostly cosmetic, comments.
> === modified file 'cmake/build_configurations/mysql_release.cmake'
> --- cmake/build_configurations/mysql_release.cmake 2012-06-06 12:15:29 +0000
> +++ cmake/build_configurations/mysql_release.cmake 2012-11-12 15:11:23 +0000
> @@ -46,6 +46,8 @@ SET(FEATURE_SET_large 5)
> SET(FEATURE_SET_xlarge 6)
> SET(FEATURE_SET_community 7)
>
> +SET(WITH_CASSANDRA_STORAGE_ENGINE ON)
> +
This probably should be reverted before pushing into main, right?
> IF(FEATURE_SET)
> STRING(TOLOWER ${FEATURE_SET} feature_set)
> SET(num ${FEATURE_SET_${feature_set}})
>
> === modified file 'sql/sql_join_cache.cc'
> --- sql/sql_join_cache.cc 2012-03-24 17:21:22 +0000
> +++ sql/sql_join_cache.cc 2012-11-12 15:11:23 +0000
> @@ -4543,7 +4546,7 @@ bool JOIN_CACHE_BKAH::prepare_look_for_m
> {
> last_matching_rec_ref_ptr= next_matching_rec_ref_ptr= 0;
> if (no_association &&
> - (curr_matching_chain= get_matching_chain_by_join_key()))
> + !(curr_matching_chain= get_matching_chain_by_join_key())) //psergey: added '!'
Is that something that should be pushed into the main branch, or is it
a temporary hack for Cassandra that should be reverted?
> return 1;
> last_matching_rec_ref_ptr= get_next_rec_ref(curr_matching_chain);
> return 0;
>
> === added file 'storage/cassandra/CMakeLists.txt'
> --- storage/cassandra/CMakeLists.txt 1970-01-01 00:00:00 +0000
> +++ storage/cassandra/CMakeLists.txt 2012-11-12 15:11:23 +0000
> @@ -0,0 +1,25 @@
> +
> +SET(cassandra_sources
> + ha_cassandra.cc
> + ha_cassandra.h
> + cassandra_se.h
> + cassandra_se.cc
> + gen-cpp/Cassandra.cpp
> + gen-cpp/cassandra_types.h
> + gen-cpp/cassandra_types.cpp
> + gen-cpp/cassandra_constants.h
> + gen-cpp/cassandra_constants.cpp
> + gen-cpp/Cassandra.h)
> +
> +#INCLUDE_DIRECTORIES(BEFORE ${Boost_INCLUDE_DIRS})
> +
> +#INCLUDE_DIRECTORIES(AFTER /usr/local/include/thrift)
> +INCLUDE_DIRECTORIES(AFTER /home/buildbot/build/thrift-inst/include/thrift/)
This needs to be fixed before pushing into the main tree.
I suppose you'll need to detect here whether Thrift is available,
and disable the engine otherwise.
> +
> +#
> +STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
> +STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
> +#LINK_DIRECTORIES(/home/psergey/cassandra/thrift/lib)
> +
> +MYSQL_ADD_PLUGIN(cassandra ${cassandra_sources} STORAGE_ENGINE LINK_LIBRARIES thrift)
> +# was: STORAGE_ENGINE MANDATORY
>
> === added file 'storage/cassandra/cassandra_se.cc'
> --- storage/cassandra/cassandra_se.cc 1970-01-01 00:00:00 +0000
> +++ storage/cassandra/cassandra_se.cc 2012-11-12 15:11:23 +0000
> @@ -0,0 +1,809 @@
> +
> +// Cassandra includes:
> +#include <inttypes.h>
> +#include <netinet/in.h>
> +#include <sys/time.h>
> +#include <stdio.h>
> +#include <stdarg.h>
> +
> +#include "Thrift.h"
> +#include "transport/TSocket.h"
> +#include "transport/TTransport.h"
> +#include "transport/TBufferTransports.h"
> +#include "protocol/TProtocol.h"
> +#include "protocol/TBinaryProtocol.h"
> +#include "gen-cpp/Cassandra.h"
> +// cassandra includes end
> +
> +#include "cassandra_se.h"
> +
> +struct st_mysql_lex_string
> +{
> + char *str;
> + size_t length;
> +};
> +
> +using namespace std;
> +using namespace apache::thrift;
> +using namespace apache::thrift::transport;
> +using namespace apache::thrift::protocol;
> +using namespace org::apache::cassandra;
> +
> +
> +void Cassandra_se_interface::print_error(const char *format, ...)
> +{
> + va_list ap;
> + va_start(ap, format);
> + // it's not a problem if output was truncated
> + vsnprintf(err_buffer, sizeof(err_buffer), format, ap);
my_vsnprintf(), please (see the sketch after the function).
> + va_end(ap);
> +}
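For illustration, the my_vsnprintf() variant would look roughly like this
(a sketch; my_vsnprintf() takes the same buffer/size/format/va_list
arguments as vsnprintf()):

  void Cassandra_se_interface::print_error(const char *format, ...)
  {
    va_list ap;
    va_start(ap, format);
    /* truncation is still not a problem; my_vsnprintf() additionally
       understands the server's extended specifiers, e.g. %`s */
    my_vsnprintf(err_buffer, sizeof(err_buffer), format, ap);
    va_end(ap);
  }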
> +
...
> === added file 'storage/cassandra/ha_cassandra.h'
> --- storage/cassandra/ha_cassandra.h 1970-01-01 00:00:00 +0000
> +++ storage/cassandra/ha_cassandra.h 2012-11-12 15:11:23 +0000
> @@ -0,0 +1,328 @@
> +/*
> + Copyright (c) 2012, Monty Program Ab
> +
> + This program is free software; you can redistribute it and/or modify
> + it under the terms of the GNU General Public License as published by
> + the Free Software Foundation; version 2 of the License.
> +
> + This program is distributed in the hope that it will be useful,
> + but WITHOUT ANY WARRANTY; without even the implied warranty of
> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + GNU General Public License for more details.
> +
> + You should have received a copy of the GNU General Public License
> + along with this program; if not, write to the Free Software
> + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
> +#ifdef USE_PRAGMA_INTERFACE
> +#pragma interface /* gcc class implementation */
> +#endif
> +
> +
> +#include "my_global.h" /* ulonglong */
> +#include "thr_lock.h" /* THR_LOCK, THR_LOCK_DATA */
> +#include "handler.h" /* handler */
> +#include "my_base.h" /* ha_rows */
> +
> +#include "cassandra_se.h"
> +
> +/** @brief
> + CASSANDRA_SHARE is a structure that will be shared among all open handlers.
> + This example implements the minimum of what you will probably need.
> +*/
> +typedef struct st_cassandra_share {
> + char *table_name;
> + uint table_name_length,use_count;
> + mysql_mutex_t mutex;
> + THR_LOCK lock;
> +} CASSANDRA_SHARE;
> +
> +class ColumnDataConverter;
> +
> +struct ha_table_option_struct;
> +
> +
> +struct st_dynamic_column_value;
> +
> +typedef bool (* CAS2DYN_CONVERTER)(const char *cass_data,
> + int cass_data_len,
> + struct st_dynamic_column_value *value);
> +typedef bool (* DYN2CAS_CONVERTER)(struct st_dynamic_column_value *value,
> + char **cass_data,
> + int *cass_data_len,
> + void *buf, void **freemem);
> +struct cassandra_type_def
> +{
> + const char *name;
> + CAS2DYN_CONVERTER cassandra_to_dynamic;
> + DYN2CAS_CONVERTER dynamic_to_cassandra;
> +};
> +
> +typedef struct cassandra_type_def CASSANDRA_TYPE_DEF;
> +
> +enum cassandtra_type_enum {CT_BIGINT, CT_INT, CT_COUNTER, CT_FLOAT, CT_DOUBLE,
> + CT_BLOB, CT_ASCII, CT_TEXT, CT_TIMESTAMP, CT_UUID, CT_BOOLEAN, CT_VARINT,
> + CT_DECIMAL};
> +
> +typedef enum cassandtra_type_enum CASSANDRA_TYPE;
> +
> +
> +
> +/** @brief
> + Class definition for the storage engine
> +*/
> +class ha_cassandra: public handler
> +{
> + friend class Column_name_enumerator_impl;
> + THR_LOCK_DATA lock; ///< MySQL lock
> + CASSANDRA_SHARE *share; ///< Shared lock info
> +
> + Cassandra_se_interface *se;
> +
> + /* description of static part of the table definition */
> + ColumnDataConverter **field_converters;
> + uint n_field_converters;
> +
> + CASSANDRA_TYPE_DEF *default_type_def;
> + /* description of dynamic columns part */
> + CASSANDRA_TYPE_DEF *special_type_field_converters;
> + LEX_STRING *special_type_field_names;
> + uint n_special_type_fields;
> + DYNAMIC_ARRAY dynamic_values, dynamic_names;
> + DYNAMIC_STRING dynamic_rec;
> +
> + ColumnDataConverter *rowkey_converter;
> +
> + bool setup_field_converters(Field **field, uint n_fields);
> + void free_field_converters();
> +
> + int read_cassandra_columns(bool unpack_pk);
> + int check_table_options(struct ha_table_option_struct* options);
> +
> + bool doing_insert_batch;
> + ha_rows insert_rows_batched;
> +
> + uint dyncol_field;
> + bool dyncol_set;
> +
> + /* Used to produce 'wrong column %s at row %lu' warnings */
> + ha_rows insert_lineno;
> + void print_conversion_error(const char *field_name,
> + char *cass_value, int cass_value_len);
> + int connect_and_check_options(TABLE *table_arg);
> +public:
> + ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
> + ~ha_cassandra()
> + {
> + free_field_converters();
> + delete se;
> + }
> +
> + /** @brief
> + The name that will be used for display purposes.
> + */
> + const char *table_type() const { return "CASSANDRA"; }
> +
> + /** @brief
> + The name of the index type that will be used for display.
> + Don't implement this method unless you really have indexes.
> + */
> + const char *index_type(uint inx) { return "HASH"; }
> +
> + /** @brief
> + The file extensions.
> + */
> + const char **bas_ext() const;
> +
> + /** @brief
> + This is a list of flags that indicate what functionality the storage engine
> + implements. The current table flags are documented in handler.h
> + */
> + ulonglong table_flags() const
> + {
> + /*
> + HA_BINLOG_STMT_CAPABLE
> + We are saying that this engine is just statement capable to have
> + an engine that can only handle statement-based logging. This is
> + used in testing.
> + HA_REC_NOT_IN_SEQ
> + If we don't set it, filesort crashes, because it assumes rowids are
> + 1..8 byte numbers
> + */
> + return HA_BINLOG_STMT_CAPABLE |
> + HA_REC_NOT_IN_SEQ;
HA_NO_TRANSACTIONS is unset. Is this engine transactional?
HA_PARTIAL_COLUMN_READ is unset. Do you always work with all columns, ignoring read_set and write_set?
HA_TABLE_SCAN_ON_INDEX is unset. Do you store the data MyISAM-style, with index and data in separate files?
HA_FAST_KEY_READ is unset. Is reading keys in order faster in Cassandra than random reads?
HA_REQUIRE_PRIMARY_KEY is unset, but a primary key is required.
HA_PRIMARY_KEY_IN_READ_INDEX is unset, but as far as I can see, primary key columns are always returned (and are needed for position()).
HA_PRIMARY_KEY_REQUIRED_FOR_POSITION is unset, but a primary key is required for position().
HA_NO_AUTO_INCREMENT is unset. Is auto-increment supported?
A sketch of what the combined set might look like follows the quoted function.
> +
> + }
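If the answers were "not transactional, primary key always present and
required, no auto-increment", the combined set might look something like
this (a sketch to illustrate the flags in question, not a verified set):

  return HA_BINLOG_STMT_CAPABLE |
         HA_REC_NOT_IN_SEQ |
         HA_NO_TRANSACTIONS |                   /* no commit/rollback */
         HA_REQUIRE_PRIMARY_KEY |               /* PK is mandatory */
         HA_PRIMARY_KEY_IN_READ_INDEX |         /* PK columns always read */
         HA_PRIMARY_KEY_REQUIRED_FOR_POSITION | /* position() needs the PK */
         HA_NO_AUTO_INCREMENT;                  /* if it isn't supported */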
> +
> + /** @brief
> + This is a bitmap of flags that indicates how the storage engine
> + implements indexes. The current index flags are documented in
> + handler.h. If you do not implement indexes, just return zero here.
> +
> + @details
> + part is the key part to check. First key part is 0.
> + If all_parts is set, MySQL wants to know the flags for the combined
> + index, up to and including 'part'.
> + */
> + ulong index_flags(uint inx, uint part, bool all_parts) const
> + {
> + return 0;
> + }
> +
> + /** @brief
> + unireg.cc will call max_supported_record_length(), max_supported_keys(),
> + max_supported_key_parts(), uint max_supported_key_length()
> + to make sure that the storage engine can handle the data it is about to
> + send. Return *real* limits of your storage engine here; MySQL will do
> + min(your_limits, MySQL_limits) automatically.
> + */
> + uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; }
> +
> + /* Support only one Primary Key, for now */
> + uint max_supported_keys() const { return 1; }
> + uint max_supported_key_parts() const { return 1; }
> +
> + /** @brief
> + unireg.cc will call this to make sure that the storage engine can handle
> + the data it is about to send. Return *real* limits of your storage engine
> + here; MySQL will do min(your_limits, MySQL_limits) automatically.
> +
> + @details
> + There is no need to implement ..._key_... methods if your engine doesn't
> + support indexes.
> + */
> + uint max_supported_key_length() const { return 16*1024; /* just to return something*/ }
> +
> + int index_init(uint idx, bool sorted);
> +
> + int index_read_map(uchar * buf, const uchar * key,
> + key_part_map keypart_map,
> + enum ha_rkey_function find_flag);
> +
> + /** @brief
> + Called in test_quick_select to determine if indexes should be used.
> + */
> + virtual double scan_time() { return (double) (stats.records+stats.deleted) / 20.0+10; }
Does that make any sense? (I understand that it's copy-paste.)
stats.deleted, for example, is never set.
> +
> + /** @brief
> + This method will never be called if you do not implement indexes.
> + */
> + virtual double read_time(uint, uint, ha_rows rows)
> + { return (double) rows / 20.0+1; }
Same question.
> +
> + virtual void start_bulk_insert(ha_rows rows);
> + virtual int end_bulk_insert();
> +
> + virtual int reset();
> +
> +
> + int multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
> + uint n_ranges, uint mode, HANDLER_BUFFER *buf);
> + int multi_range_read_next(range_id_t *range_info);
> + ha_rows multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
> + void *seq_init_param,
> + uint n_ranges, uint *bufsz,
> + uint *flags, COST_VECT *cost);
> + ha_rows multi_range_read_info(uint keyno, uint n_ranges, uint keys,
> + uint key_parts, uint *bufsz,
> + uint *flags, COST_VECT *cost);
> + int multi_range_read_explain_info(uint mrr_mode, char *str, size_t size);
> +
> +private:
> + bool source_exhausted;
> + bool mrr_start_read();
> + int check_field_options(Field **fields);
> + int read_dyncol(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
> + String *valcol, char **freenames);
> + int write_dynamic_row(DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals);
> + void static free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
> + char *free_names);
> + CASSANDRA_TYPE_DEF * get_cassandra_field_def(char *cass_name,
> + int cass_name_length);
> +public:
> +
> + /*
> + Everything below are methods that we implement in ha_example.cc.
Maybe it'd be better to remove the references to ha_example.cc (and to the
example engine in general) before pushing this into the main tree?
> +
> + Most of these methods are not obligatory, skip them and
> + MySQL will treat them as not implemented
> + */
> + /** @brief
> + We implement this in ha_example.cc; it's a required method.
> + */
> + int open(const char *name, int mode, uint test_if_locked); // required
> +
> + /** @brief
> + We implement this in ha_example.cc; it's a required method.
> + */
> + int close(void); // required
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int write_row(uchar *buf);
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int update_row(const uchar *old_data, uchar *new_data);
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int delete_row(const uchar *buf);
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int index_next(uchar *buf);
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int index_prev(uchar *buf);
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int index_first(uchar *buf);
Why have unimplemented methods here?
I'd rather remove them.
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int index_last(uchar *buf);
> +
> + /** @brief
> + Unlike index_init(), rnd_init() can be called two consecutive times
> + without rnd_end() in between (it only makes sense if scan=1). In this
> + case, the second call should prepare for the new table scan (e.g if
> + rnd_init() allocates the cursor, the second call should position the
> + cursor to the start of the table; no need to deallocate and allocate
> + it again. This is a required method.
> + */
> + int rnd_init(bool scan); //required
> + int rnd_end();
> + int rnd_next(uchar *buf); ///< required
> + int rnd_pos(uchar *buf, uchar *pos); ///< required
> + void position(const uchar *record); ///< required
> + int info(uint); ///< required
> + int extra(enum ha_extra_function operation);
> + int external_lock(THD *thd, int lock_type); ///< required
> + int delete_all_rows(void);
> + ha_rows records_in_range(uint inx, key_range *min_key,
> + key_range *max_key);
> + int delete_table(const char *from);
> + int create(const char *name, TABLE *form,
> + HA_CREATE_INFO *create_info); ///< required
> + bool check_if_incompatible_data(HA_CREATE_INFO *info,
> + uint table_changes);
> +
> + THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
> + enum thr_lock_type lock_type); ///< required
> +};
>
> === added file 'storage/cassandra/ha_cassandra.cc'
> --- storage/cassandra/ha_cassandra.cc 1970-01-01 00:00:00 +0000
> +++ storage/cassandra/ha_cassandra.cc 2012-11-12 15:11:23 +0000
> @@ -0,0 +1,2674 @@
> +/*
> + Copyright (c) 2012, Monty Program Ab
> +
> + This program is free software; you can redistribute it and/or modify
> + it under the terms of the GNU General Public License as published by
> + the Free Software Foundation; version 2 of the License.
> +
> + This program is distributed in the hope that it will be useful,
> + but WITHOUT ANY WARRANTY; without even the implied warranty of
> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + GNU General Public License for more details.
> +
> + You should have received a copy of the GNU General Public License
> + along with this program; if not, write to the Free Software
> + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
> +
> +#ifdef USE_PRAGMA_IMPLEMENTATION
> +#pragma implementation // gcc: Class implementation
> +#endif
> +
> +#include <mysql/plugin.h>
> +#include "ha_cassandra.h"
> +#include "sql_class.h"
> +
> +#define DYNCOL_USUAL 20
> +#define DYNCOL_DELTA 100
> +#define DYNCOL_USUAL_REC 1024
> +#define DYNCOL_DELTA_REC 1024
> +
> +static handler *cassandra_create_handler(handlerton *hton,
> + TABLE_SHARE *table,
> + MEM_ROOT *mem_root);
> +
> +extern int dynamic_column_error_message(enum_dyncol_func_result rc);
> +
> +handlerton *cassandra_hton;
> +
> +
> +/*
> + Hash used to track the number of open tables; variable for example share
> + methods
> +*/
> +static HASH cassandra_open_tables;
> +
> +/* The mutex used to init the hash; variable for example share methods */
> +mysql_mutex_t cassandra_mutex;
> +
> +
> +/**
> + Structure for CREATE TABLE options (table options).
> + It needs to be called ha_table_option_struct.
> +
> + The option values can be specified in the CREATE TABLE at the end:
> + CREATE TABLE ( ... ) *here*
> +*/
> +
> +struct ha_table_option_struct
> +{
> + const char *thrift_host;
> + int thrift_port;
> + const char *keyspace;
> + const char *column_family;
> +};
> +
> +
> +ha_create_table_option cassandra_table_option_list[]=
> +{
> + /*
> + one option that takes an arbitrary string
> + */
> + HA_TOPTION_STRING("thrift_host", thrift_host),
> + HA_TOPTION_NUMBER("thrift_port", thrift_port, 9160, 1, 65535, 0),
> + HA_TOPTION_STRING("keyspace", keyspace),
> + HA_TOPTION_STRING("column_family", column_family),
> + HA_TOPTION_END
> +};
> +
> +/**
> + Structure for CREATE TABLE options (field options).
> +*/
> +
> +struct ha_field_option_struct
> +{
> + bool dyncol_field;
> +};
> +
> +ha_create_table_option cassandra_field_option_list[]=
> +{
> + /*
> + Collect all other columns as dynamic here,
> + the valid values are YES/NO, ON/OFF, 1/0.
> + The default is 0, that is true, yes, on.
Really? Looks like a typo: 0 is false, 1 is true.
> + */
> + HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0),
> + HA_FOPTION_END
> +};
> +
> +static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
> + "Number of rows in an INSERT batch",
> + NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
> +
> +static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG,
> + "Number of rows in a multiget(MRR) batch",
> + NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
> +
> +static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
> + "Number of rows in an rnd_read (full scan) batch",
> + NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
> +
> +static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
> + "Number of times to retry Cassandra calls that failed due to timeouts or "
> + "network communication problems. The default, 0, means not to retry.",
> + NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0);
> +
> +/* These match values in enum_cassandra_consistency_level */
> +const char *cassandra_consistency_level[] =
> +{
> + "ONE",
> + "QUORUM",
> + "LOCAL_QUORUM",
> + "EACH_QUORUM",
> + "ALL",
> + "ANY",
> + "TWO",
> + "THREE",
> + NullS
> +};
> +
> +TYPELIB cassandra_consistency_level_typelib= {
> + array_elements(cassandra_consistency_level) - 1, "",
> + cassandra_consistency_level, NULL
> +};
> +
> +
> +static MYSQL_THDVAR_ENUM(write_consistency, PLUGIN_VAR_RQCMDARG,
> + "Cassandra consistency level to use for write operations", NULL, NULL,
> + ONE, &cassandra_consistency_level_typelib);
> +
> +static MYSQL_THDVAR_ENUM(read_consistency, PLUGIN_VAR_RQCMDARG,
> + "Cassandra consistency level to use for read operations", NULL, NULL,
> + ONE, &cassandra_consistency_level_typelib);
> +
> +
> +mysql_mutex_t cassandra_default_host_lock;
> +static char* cassandra_default_thrift_host = NULL;
> +static char cassandra_default_host_buf[256]="";
> +
> +static void
> +cassandra_default_thrift_host_update(THD *thd,
> + struct st_mysql_sys_var* var,
> + void* var_ptr, /*!< out: where the
> + formal string goes */
> + const void* save) /*!< in: immediate result
> + from check function */
> +{
> + const char *new_host= *((char**)save);
> + const size_t max_len= sizeof(cassandra_default_host_buf);
> +
> + mysql_mutex_lock(&cassandra_default_host_lock);
> +
> + if (new_host)
> + {
> + strncpy(cassandra_default_host_buf, new_host, max_len-1);
> + cassandra_default_host_buf[max_len-1]= 0;
> + cassandra_default_thrift_host= cassandra_default_host_buf;
> + }
> + else
> + {
> + cassandra_default_host_buf[0]= 0;
> + cassandra_default_thrift_host= NULL;
> + }
> +
> + *((const char**)var_ptr)= cassandra_default_thrift_host;
> +
> + mysql_mutex_unlock(&cassandra_default_host_lock);
> +}
> +
> +
> +static MYSQL_SYSVAR_STR(default_thrift_host, cassandra_default_thrift_host,
> + PLUGIN_VAR_RQCMDARG,
> + "Default host for Cassandra thrift connections",
> + /*check*/NULL,
> + cassandra_default_thrift_host_update,
> + /*default*/NULL);
> +
> +static struct st_mysql_sys_var* cassandra_system_variables[]= {
> + MYSQL_SYSVAR(insert_batch_size),
> + MYSQL_SYSVAR(multiget_batch_size),
> + MYSQL_SYSVAR(rnd_batch_size),
> +
> + MYSQL_SYSVAR(default_thrift_host),
> + MYSQL_SYSVAR(write_consistency),
> + MYSQL_SYSVAR(read_consistency),
> + MYSQL_SYSVAR(failure_retries),
> + NULL
> +};
> +
> +
> +static SHOW_VAR cassandra_status_variables[]= {
> + {"row_inserts",
> + (char*) &cassandra_counters.row_inserts, SHOW_LONG},
> + {"row_insert_batches",
> + (char*) &cassandra_counters.row_insert_batches, SHOW_LONG},
> +
> + {"multiget_reads",
> + (char*) &cassandra_counters.multiget_reads, SHOW_LONG},
> + {"multiget_keys_scanned",
> + (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
> + {"multiget_rows_read",
> + (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG},
> +
> + {"timeout_exceptions",
> + (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
> + {"unavailable_exceptions",
> + (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG},
> + {NullS, NullS, SHOW_LONG}
> +};
> +
> +
> +Cassandra_status_vars cassandra_counters;
> +Cassandra_status_vars cassandra_counters_copy;
> +
> +
> +/**
> + @brief
> + Function we use in the creation of our hash to get key.
> +*/
> +
> +static uchar* cassandra_get_key(CASSANDRA_SHARE *share, size_t *length,
> + my_bool not_used __attribute__((unused)))
> +{
> + *length=share->table_name_length;
> + return (uchar*) share->table_name;
> +}
> +
> +#ifdef HAVE_PSI_INTERFACE
> +static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_CASSANDRA_SHARE_mutex;
> +
> +static PSI_mutex_info all_cassandra_mutexes[]=
> +{
> + { &ex_key_mutex_example, "cassandra", PSI_FLAG_GLOBAL},
> + { &ex_key_mutex_CASSANDRA_SHARE_mutex, "CASSANDRA_SHARE::mutex", 0}
> +};
> +
> +static void init_cassandra_psi_keys()
> +{
> + const char* category= "cassandra";
> + int count;
> +
> + if (PSI_server == NULL)
> + return;
> +
> + count= array_elements(all_cassandra_mutexes);
> + PSI_server->register_mutex(category, all_cassandra_mutexes, count);
> +}
> +#endif
> +
> +static int cassandra_init_func(void *p)
> +{
> + DBUG_ENTER("cassandra_init_func");
> +
> +#ifdef HAVE_PSI_INTERFACE
> + init_cassandra_psi_keys();
> +#endif
> +
> + cassandra_hton= (handlerton *)p;
> + mysql_mutex_init(ex_key_mutex_example, &cassandra_mutex, MY_MUTEX_INIT_FAST);
> + (void) my_hash_init(&cassandra_open_tables,system_charset_info,32,0,0,
> + (my_hash_get_key) cassandra_get_key,0,0);
> +
> + cassandra_hton->state= SHOW_OPTION_YES;
> + cassandra_hton->create= cassandra_create_handler;
> + /*
> + Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE
> + TABLE to create an *empty* table from scratch. Cassandra table won't be
> + emptied if re-created.
> + */
HTON_ALTER_NOT_SUPPORTED is not set. ALTER works?
HTON_TEMPORARY_NOT_SUPPORTED is not set. CREATE TEMPORARY TABLE works?
HTON_NO_PARTITION is not set. Partitioning works?
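If any of these are in fact unsupported, the init function would set the
corresponding handlerton flags, e.g. (a sketch):

  cassandra_hton->flags= HTON_ALTER_NOT_SUPPORTED |
                         HTON_TEMPORARY_NOT_SUPPORTED |
                         HTON_NO_PARTITION;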
> + cassandra_hton->flags= 0;
> + cassandra_hton->table_options= cassandra_table_option_list;
> + cassandra_hton->field_options= cassandra_field_option_list;
> +
> + mysql_mutex_init(0 /* no instrumentation */,
Why no instrumentation?
> + &cassandra_default_host_lock, MY_MUTEX_INIT_FAST);
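It could be instrumented like the other mutexes, roughly (the key name
below is made up):

  static PSI_mutex_key ex_key_mutex_cassandra_default_host;
  /* add to all_cassandra_mutexes[]:
       { &ex_key_mutex_cassandra_default_host,
         "cassandra_default_host_lock", PSI_FLAG_GLOBAL },
     and then: */
  mysql_mutex_init(ex_key_mutex_cassandra_default_host,
                   &cassandra_default_host_lock, MY_MUTEX_INIT_FAST);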
> +
> + DBUG_RETURN(0);
> +}
> +
> +
> +static int cassandra_done_func(void *p)
> +{
> + int error= 0;
> + DBUG_ENTER("cassandra_done_func");
> + if (cassandra_open_tables.records)
> + error= 1;
> + my_hash_free(&cassandra_open_tables);
> + mysql_mutex_destroy(&cassandra_mutex);
> + mysql_mutex_destroy(&cassandra_default_host_lock);
> + DBUG_RETURN(error);
> +}
> +
> +
> +/**
> + @brief
> + Example of simple lock controls. The "share" it creates is a
> + structure we will pass to each cassandra handler. Do you have to have
> + one of these? Well, you have pieces that are used for locking, and
> + they are needed to function.
> +*/
> +
> +static CASSANDRA_SHARE *get_share(const char *table_name, TABLE *table)
> +{
> + CASSANDRA_SHARE *share;
> + uint length;
> + char *tmp_name;
> +
> + mysql_mutex_lock(&cassandra_mutex);
> + length=(uint) strlen(table_name);
> +
> + if (!(share=(CASSANDRA_SHARE*) my_hash_search(&cassandra_open_tables,
> + (uchar*) table_name,
> + length)))
> + {
> + if (!(share=(CASSANDRA_SHARE *)
> + my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
> + &share, sizeof(*share),
> + &tmp_name, length+1,
> + NullS)))
> + {
> + mysql_mutex_unlock(&cassandra_mutex);
> + return NULL;
> + }
> +
> + share->use_count=0;
> + share->table_name_length=length;
> + share->table_name=tmp_name;
> + strmov(share->table_name,table_name);
> + if (my_hash_insert(&cassandra_open_tables, (uchar*) share))
> + goto error;
> + thr_lock_init(&share->lock);
> + mysql_mutex_init(ex_key_mutex_CASSANDRA_SHARE_mutex,
> + &share->mutex, MY_MUTEX_INIT_FAST);
> + }
> + share->use_count++;
> + mysql_mutex_unlock(&cassandra_mutex);
> +
> + return share;
> +
> +error:
> + mysql_mutex_destroy(&share->mutex);
> + my_free(share);
> +
> + return NULL;
> +}
> +
> +
> +/**
> + @brief
> + Free lock controls. We call this whenever we close a table. If the table had
> + the last reference to the share, then we free memory associated with it.
> +*/
> +
> +static int free_share(CASSANDRA_SHARE *share)
> +{
> + mysql_mutex_lock(&cassandra_mutex);
> + if (!--share->use_count)
> + {
> + my_hash_delete(&cassandra_open_tables, (uchar*) share);
> + thr_lock_delete(&share->lock);
> + mysql_mutex_destroy(&share->mutex);
> + my_free(share);
> + }
> + mysql_mutex_unlock(&cassandra_mutex);
> +
> + return 0;
> +}
> +
> +
> +static handler* cassandra_create_handler(handlerton *hton,
> + TABLE_SHARE *table,
> + MEM_ROOT *mem_root)
> +{
> + return new (mem_root) ha_cassandra(hton, table);
> +}
> +
> +
> +ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
> + :handler(hton, table_arg),
> + se(NULL), field_converters(NULL),
> + special_type_field_converters(NULL),
> + special_type_field_names(NULL), n_special_type_fields(0),
> + rowkey_converter(NULL),
> + dyncol_field(0), dyncol_set(0)
> +{}
> +
> +
> +static const char *ha_cassandra_exts[] = {
> + NullS
> +};
> +
> +const char **ha_cassandra::bas_ext() const
> +{
> + return ha_cassandra_exts;
> +}
> +
> +
> +int ha_cassandra::connect_and_check_options(TABLE *table_arg)
> +{
> + ha_table_option_struct *options= table_arg->s->option_struct;
> + int res;
> + DBUG_ENTER("ha_cassandra::connect_and_check_options");
> +
> + if ((res= check_field_options(table_arg->s->field)) ||
> + (res= check_table_options(options)))
> + DBUG_RETURN(res);
> +
> + se= create_cassandra_se();
> + se->set_column_family(options->column_family);
> + const char *thrift_host= options->thrift_host? options->thrift_host:
> + cassandra_default_thrift_host;
> + if (se->connect(thrift_host, options->thrift_port, options->keyspace))
> + {
> + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
> + DBUG_RETURN(HA_ERR_NO_CONNECTION);
> + }
> +
> + if (setup_field_converters(table_arg->field, table_arg->s->fields))
> + {
> + DBUG_RETURN(HA_ERR_NO_CONNECTION);
> + }
> +
> + DBUG_RETURN(0);
> +}
> +
> +
> +int ha_cassandra::check_field_options(Field **fields)
> +{
> + Field **field;
> + uint i;
> + DBUG_ENTER("ha_cassandra::check_field_options");
> + for (field= fields, i= 0; *field; field++, i++)
> + {
> + ha_field_option_struct *field_options= (*field)->option_struct;
> + if (field_options && field_options->dyncol_field)
> + {
> + if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB)
> + {
> + my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name);
> + DBUG_RETURN(HA_WRONG_CREATE_OPTION);
> + }
> + dyncol_set= 1;
> + dyncol_field= i;
> + bzero(&dynamic_values, sizeof(dynamic_values));
> + bzero(&dynamic_names, sizeof(dynamic_names));
> + bzero(&dynamic_rec, sizeof(dynamic_rec));
> + }
> + }
> + DBUG_RETURN(0);
> +}
> +
> +
> +int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
> +{
> + DBUG_ENTER("ha_cassandra::open");
> +
> + if (!(share = get_share(name, table)))
> + DBUG_RETURN(1);
> + thr_lock_data_init(&share->lock,&lock,NULL);
> +
> + DBUG_ASSERT(!se);
> + /*
> + Don't do the following on open: it prevents SHOW CREATE TABLE when the server
> + has gone away.
> + */
> + /*
> + int res;
> + if ((res= connect_and_check_options(table)))
> + {
> + DBUG_RETURN(res);
> + }
> + */
> +
> + info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
> + insert_lineno= 0;
> +
> + DBUG_RETURN(0);
> +}
> +
> +
> +int ha_cassandra::close(void)
> +{
> + DBUG_ENTER("ha_cassandra::close");
> + delete se;
> + se= NULL;
> + free_field_converters();
> + DBUG_RETURN(free_share(share));
> +}
> +
> +
> +int ha_cassandra::check_table_options(ha_table_option_struct *options)
> +{
> + if (!options->thrift_host && (!cassandra_default_thrift_host ||
> + !cassandra_default_thrift_host[0]))
> + {
> + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
> + "thrift_host table option must be specified, or "
> + "@@cassandra_default_thrift_host must be set");
> + return HA_WRONG_CREATE_OPTION;
> + }
> +
> + if (!options->keyspace || !options->column_family)
> + {
> + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
> + "keyspace and column_family table options must be specified");
> + return HA_WRONG_CREATE_OPTION;
> + }
> + return 0;
> +}
> +
> +
> +/**
> + @brief
> + create() is called to create a table. The variable name will have the name
> + of the table.
> +
> + @details
> + When create() is called you do not need to worry about
> + opening the table. Also, the .frm file will have already been
> + created so adjusting create_info is not necessary. You can overwrite
> + the .frm file at this point if you wish to change the table
> + definition, but there are no methods currently provided for doing
> + so.
> +
> + Called from handle.cc by ha_create_table().
> +
> + @see
> + ha_create_table() in handle.cc
> +*/
> +
> +int ha_cassandra::create(const char *name, TABLE *table_arg,
> + HA_CREATE_INFO *create_info)
> +{
> + int res;
> + DBUG_ENTER("ha_cassandra::create");
> +
> + Field **pfield= table_arg->s->field;
> + if (!((*pfield)->flags & NOT_NULL_FLAG))
> + {
> + my_error(ER_WRONG_COLUMN_NAME, MYF(0), "First column must be NOT NULL");
> + DBUG_RETURN(HA_WRONG_CREATE_OPTION);
> + }
This is unnecessary: the second check already guarantees that the first
column is NOT NULL, so this if() is redundant.
> +
> + if (table_arg->s->keys != 1 || table_arg->s->primary_key !=0 ||
> + table_arg->key_info[0].key_parts != 1 ||
> + table_arg->key_info[0].key_part[0].fieldnr != 1)
> + {
> + my_error(ER_WRONG_COLUMN_NAME, MYF(0),
> + "Table must have PRIMARY KEY defined over the first column");
> + DBUG_RETURN(HA_WRONG_CREATE_OPTION);
> + }
...
> +CASSANDRA_TYPE get_cassandra_type(const char *validator)
> +{
> + CASSANDRA_TYPE rc;
> + switch(validator[32])
> + {
> + case 'L':
> + rc= CT_BIGINT;
> + break;
> + case 'I':
> + rc= (validator[35] == '3' ? CT_INT : CT_VARINT);
> + rc= CT_INT;
> + break;
> + case 'C':
> + rc= CT_COUNTER;
> + break;
> + case 'F':
> + rc= CT_FLOAT;
> + break;
> + case 'D':
> + switch (validator[33])
> + {
> + case 'o':
> + rc= CT_DOUBLE;
> + break;
> + case 'a':
> + rc= CT_TIMESTAMP;
> + break;
> + case 'e':
> + rc= CT_DECIMAL;
> + break;
> + default:
> + rc= CT_BLOB;
> + break;
> + }
> + break;
> + case 'B':
> + rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB);
> + break;
> + case 'A':
> + rc= CT_ASCII;
> + break;
> + case 'U':
> + rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID);
> + break;
> + default:
> + rc= CT_BLOB;
> + }
> + DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0);
> + return rc;
> +}
> +
> +ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
> +{
> + ColumnDataConverter *res= NULL;
> +
> + switch(field->type()) {
> + case MYSQL_TYPE_TINY:
> + if (!strcmp(validator_name, validator_boolean))
Why do you strcmp here, while you only check selected characters in
get_cassandra_type()? You could've called get_cassandra_type() for
validator_name instead, and compared enum values rather than strings
(see the sketch after this case).
> + {
> + res= new TinyintDataConverter;
> + break;
> + }
> + /* fall through: */
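That is, something along these lines (a sketch of the suggested enum
comparison):

  case MYSQL_TYPE_TINY:
    if (get_cassandra_type(validator_name) == CT_BOOLEAN)
    {
      res= new TinyintDataConverter;
      break;
    }
    /* fall through: */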
> + case MYSQL_TYPE_SHORT:
> + case MYSQL_TYPE_LONGLONG:
> + {
> + bool is_counter= false;
> + if (!strcmp(validator_name, validator_bigint) ||
> + !strcmp(validator_name, validator_timestamp) ||
> + (is_counter= !strcmp(validator_name, validator_counter)))
> + res= new BigintDataConverter(!is_counter);
> + break;
> + }
> + case MYSQL_TYPE_FLOAT:
> + if (!strcmp(validator_name, validator_float))
> + res= new FloatDataConverter;
> + break;
> +
> + case MYSQL_TYPE_DOUBLE:
> + if (!strcmp(validator_name, validator_double))
> + res= new DoubleDataConverter;
> + break;
> +
> + case MYSQL_TYPE_TIMESTAMP:
> + if (!strcmp(validator_name, validator_timestamp))
> + res= new TimestampDataConverter;
> + break;
> +
> + case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
> + if (!strcmp(validator_name, validator_uuid) &&
> + field->real_type() == MYSQL_TYPE_STRING &&
> + field->field_length == 36)
> + {
> + // UUID maps to CHAR(36), its text representation
> + res= new UuidDataConverter;
> + break;
> + }
> + /* fall through: */
> + case MYSQL_TYPE_VAR_STRING:
> + case MYSQL_TYPE_VARCHAR:
> + {
> + /*
> + Cassandra's "varint" type is a binary-encoded arbitary-length
> + big-endian number.
> + - It can be mapped to VARBINARY(N), with sufficiently big N.
> + - If the value does not fit into N bytes, it is an error. We should not
> + truncate it, because that is just as good as returning garbage.
> + - varint should not be mapped to BINARY(N), because BINARY(N) values
> + are zero-padded, which will work as multiplying the value by
> + 2^k for some value of k.
> + */
> + if (field->type() == MYSQL_TYPE_VARCHAR &&
> + field->binary() &&
> + (!strcmp(validator_name, validator_varint) ||
> + !strcmp(validator_name, validator_decimal)))
> + {
> + res= new StringCopyConverter(field->field_length);
> + break;
> + }
> +
> + if (!strcmp(validator_name, validator_blob) ||
> + !strcmp(validator_name, validator_ascii) ||
> + !strcmp(validator_name, validator_text))
> + {
> + res= new StringCopyConverter((size_t)-1);
> + }
> + break;
> + }
> + case MYSQL_TYPE_LONG:
> + if (!strcmp(validator_name, validator_int))
> + res= new Int32DataConverter;
> + break;
> +
> + default:;
> + }
> + return res;
> +}
> +
> +
> +bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
> +{
> + char *col_name;
> + int col_name_len;
> + char *col_type;
> + int col_type_len;
> + size_t ddl_fields= se->get_ddl_size();
> + const char *default_type= se->get_default_validator();
> + uint max_non_default_fields;
> + DBUG_ENTER("ha_cassandra::setup_field_converters");
> + DBUG_ASSERT(default_type);
> +
> + DBUG_ASSERT(!field_converters);
> + DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);
> +
> + /*
> + We always should take into account that in case of using dynamic columns
> + sql description contain one field which does not described in
> + Cassandra DDL also key field is described separately. So that
> + is why we use "n_fields - dyncol_set - 1" or "ddl_fields + 2".
> + */
> + max_non_default_fields= ddl_fields + 2 - n_fields;
> + if (ddl_fields < (n_fields - dyncol_set - 1))
> + {
> + se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> +
> + /* allocate memory in one chunk */
> + size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
> + (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
> + (dyncol_set ? max_non_default_fields : 0);
> + if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
> + DBUG_RETURN(true);
> + bzero(field_converters, memsize);
> + n_field_converters= n_fields;
> +
> + if (dyncol_set)
> + {
> + special_type_field_converters=
> + (CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
> + special_type_field_names=
> + ((LEX_STRING*)(special_type_field_converters + max_non_default_fields));
> + }
> +
> + if (dyncol_set)
> + {
> + if (init_dynamic_array(&dynamic_values,
> + sizeof(DYNAMIC_COLUMN_VALUE),
> + DYNCOL_USUAL, DYNCOL_DELTA))
> + DBUG_RETURN(true);
> + else
> + if (init_dynamic_array(&dynamic_names,
> + sizeof(LEX_STRING),
> + DYNCOL_USUAL, DYNCOL_DELTA))
> + {
> + delete_dynamic(&dynamic_values);
> + DBUG_RETURN(true);
> + }
> + else
> + if (init_dynamic_string(&dynamic_rec, NULL,
> + DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
> + {
> + delete_dynamic(&dynamic_values);
> + delete_dynamic(&dynamic_names);
> + DBUG_RETURN(true);
> + }
> +
> + /* Dynamic column field has special processing */
> + field_converters[dyncol_field]= NULL;
> +
> + default_type_def= cassandra_types + get_cassandra_type(default_type);
> + }
> +
> + se->first_ddl_column();
> + uint n_mapped= 0;
> + while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
> + &col_type_len))
> + {
> + Field **field;
> + uint i;
> + /* Mapping for the 1st field is already known */
> + for (field= field_arg + 1, i= 1; *field; field++, i++)
> + {
> + if ((!dyncol_set || dyncol_field != i) &&
> + !strcmp((*field)->field_name, col_name))
> + {
> + n_mapped++;
> + ColumnDataConverter **conv= field_converters + (*field)->field_index;
> + if (!(*conv= map_field_to_validator(*field, col_type)))
> + {
> + se->print_error("Failed to map column %s to datatype %s",
> + (*field)->field_name, col_type);
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> + (*conv)->field= *field;
> + }
> + }
> + if (dyncol_set && !(*field)) // is needed and not found
> + {
> + DBUG_PRINT("info",("Field not found: %s", col_name));
> + if (strcmp(col_type, default_type))
> + {
> + DBUG_PRINT("info",("Field '%s' non-default type: '%s'",
> + col_name, col_type));
> + special_type_field_names[n_special_type_fields].length= col_name_len;
> + special_type_field_names[n_special_type_fields].str= col_name;
> + special_type_field_converters[n_special_type_fields]=
> + cassandra_types[get_cassandra_type(col_type)];
> + n_special_type_fields++;
> + }
> + }
> + }
> +
> + if (n_mapped != n_fields - 1 - dyncol_set)
> + {
> + Field *first_unmapped= NULL;
> + /* Find the first field */
> + for (uint i= 1; i < n_fields;i++)
> + {
> + if (!field_converters[i])
> + {
> + first_unmapped= field_arg[i];
> + break;
> + }
> + }
> + DBUG_ASSERT(first_unmapped);
> +
> + se->print_error("Field `%s` could not be mapped to any field in Cassandra",
Generally (here and everywhere) use %`s instead of `%s`.
> + first_unmapped->field_name);
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> +
> + /*
> + Setup type conversion for row_key.
> + */
> + se->get_rowkey_type(&col_name, &col_type);
> + if (col_name && strcmp(col_name, (*field_arg)->field_name))
> + {
> + se->print_error("PRIMARY KEY column must match Cassandra's name '%s'",
> + col_name);
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> + if (!col_name && strcmp("rowkey", (*field_arg)->field_name))
> + {
> + se->print_error("target column family has no key_alias defined, "
> + "PRIMARY KEY column must be named 'rowkey'");
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> +
> + if (col_type != NULL)
> + {
> + if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type)))
> + {
> + se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> + rowkey_converter->field= *field_arg;
> + }
> + else
> + {
> + se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> +
> + DBUG_RETURN(false);
> +}
> +
> +
> +void ha_cassandra::free_field_converters()
> +{
> + delete rowkey_converter;
> + rowkey_converter= NULL;
> +
> + if (dyncol_set)
> + {
> + delete_dynamic(&dynamic_values);
> + delete_dynamic(&dynamic_names);
> + dynstr_free(&dynamic_rec);
> + }
> + if (field_converters)
> + {
> + for (uint i=0; i < n_field_converters; i++)
> + if (field_converters[i])
> + {
> + DBUG_ASSERT(!dyncol_set || i == dyncol_field);
> + delete field_converters[i];
> + }
> + my_free(field_converters);
> + field_converters= NULL;
> + }
> +}
> +
> +
> +int ha_cassandra::index_init(uint idx, bool sorted)
> +{
> + int ires;
> + if (!se && (ires= connect_and_check_options(table)))
> + return ires;
> + return 0;
> +}
> +
> +void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
> +
> +int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
> + key_part_map keypart_map,
> + enum ha_rkey_function find_flag)
> +{
> + int rc= 0;
> + DBUG_ENTER("ha_cassandra::index_read_map");
> +
> + if (find_flag != HA_READ_KEY_EXACT)
> + DBUG_RETURN(HA_ERR_WRONG_COMMAND);
Did you verify that the server expects HA_ERR_WRONG_COMMAND from
this method and handles it correctly?
Or perhaps find_flag is always HA_READ_KEY_EXACT here,
because of your index_flags()?
> +
> + uint key_len= calculate_key_len(table, active_index, key, keypart_map);
> + store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
> +
> + char *cass_key;
> + int cass_key_len;
> + my_bitmap_map *old_map;
> +
> + old_map= dbug_tmp_use_all_columns(table, table->read_set);
> +
> + if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
> + {
> + /* We get here when making lookups like uuid_column='not-an-uuid' */
> + dbug_tmp_restore_column_map(table->read_set, old_map);
> + DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
> + }
> +
> + dbug_tmp_restore_column_map(table->read_set, old_map);
> +
> + bool found;
> + if (se->get_slice(cass_key, cass_key_len, &found))
> + {
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
Generally it's better not to use my_error() directly, but to only
return an error code, and return your se->error_str() from
ha_cassandra::get_error_message() (a sketch follows the quoted function).
> + rc= HA_ERR_INTERNAL_ERROR;
> + }
> +
> + /* TODO: what if we're not reading all columns?? */
> + if (!found)
> + rc= HA_ERR_KEY_NOT_FOUND;
> + else
> + rc= read_cassandra_columns(false);
> +
> + DBUG_RETURN(rc);
> +}
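To illustrate the get_error_message() suggestion above, roughly
(a sketch; get_error_message() is the standard handler hook for
engine-specific error texts):

  bool ha_cassandra::get_error_message(int error, String *buf)
  {
    if (error == HA_ERR_INTERNAL_ERROR && se)
      buf->copy(se->error_str(), (uint32) strlen(se->error_str()),
                system_charset_info);
    return FALSE; /* not a temporary error */
  }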
> +
> +
> +void ha_cassandra::print_conversion_error(const char *field_name,
> + char *cass_value,
> + int cass_value_len)
> +{
> + char buf[32];
> + char *p= cass_value;
> + size_t i= 0;
> + for (; (i < (int)sizeof(buf)-1) && (p < cass_value + cass_value_len); p++)
Why do you cast here?
> + {
> + buf[i++]= map2number[(*p >> 4) & 0xF];
> + buf[i++]= map2number[*p & 0xF];
> + }
> + buf[i]=0;
> +
> + se->print_error("Unable to convert value for field `%s` from Cassandra's data"
> + " format. Source data is %d bytes, 0x%s%s",
> + field_name, cass_value_len, buf,
> + (i == sizeof(buf) - 1)? "..." : "");
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +}
> +
> +
> +void free_strings(DYNAMIC_COLUMN_VALUE *vals, uint num)
> +{
> + for (uint i= 0; i < num; i++)
> + if (vals[i].type == DYN_COL_STRING &&
> + !vals[i].x.string.nonfreeable)
> + my_free(vals[i].x.string.value.str);
> +}
> +
> +
> +CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name,
> + int cass_name_len)
> +{
> + CASSANDRA_TYPE_DEF *type= default_type_def;
> + for(uint i= 0; i < n_special_type_fields; i++)
> + {
> + if (cass_name_len == (int)special_type_field_names[i].length &&
> + memcmp(cass_name, special_type_field_names[i].str,
> + cass_name_len) == 0)
> + {
> + type= special_type_field_converters + i;
> + break;
> + }
> + }
> + return type;
> +}
> +
> +int ha_cassandra::read_cassandra_columns(bool unpack_pk)
> +{
> + char *cass_name;
> + char *cass_value;
> + int cass_value_len, cass_name_len;
> + Field **field;
> + int res= 0;
> + ulong total_name_len= 0;
> +
> + /*
> + cassandra_to_mariadb() calls will use field->store(...) methods, which
> + require that the column is in the table->write_set
> + */
> + my_bitmap_map *old_map;
> + old_map= dbug_tmp_use_all_columns(table, table->write_set);
> +
> + /* Start with all fields being NULL */
> + for (field= table->field + 1; *field; field++)
> + (*field)->set_null();
> +
> + while (!se->get_next_read_column(&cass_name, &cass_name_len,
> + &cass_value, &cass_value_len))
> + {
> + // map to our column. todo: use hash or something..
> + bool found= 0;
> + for (field= table->field + 1; *field; field++)
> + {
> + uint fieldnr= (*field)->field_index;
> + if ((!dyncol_set || dyncol_field != fieldnr) &&
> + !strcmp((*field)->field_name, cass_name))
> + {
> + found= 1;
> + (*field)->set_notnull();
> + if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
> + cass_value_len))
> + {
> + print_conversion_error((*field)->field_name, cass_value,
> + cass_value_len);
> + res=1;
> + goto err;
> + }
> + break;
> + }
> + }
> + if (dyncol_set && !found)
> + {
> + DYNAMIC_COLUMN_VALUE val;
> + LEX_STRING nm;
> + CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name,
> + cass_name_len);
> + nm.str= cass_name;
> + nm.length= cass_name_len;
> + if (nm.length > MAX_NAME_LENGTH)
> + {
> + se->print_error("Unable to convert value for field `%s`"
> + " from Cassandra's data format. Name"
> + " length exceed limit of %u: '%s'",
> + table->field[dyncol_field]->field_name,
> + (uint)MAX_NAME_LENGTH, cass_name);
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + res=1;
> + goto err;
> + }
> + total_name_len+= cass_name_len;
> + if (nm.length > MAX_TOTAL_NAME_LENGTH)
> + {
> + se->print_error("Unable to convert value for field `%s`"
> + " from Cassandra's data format. Sum of all names"
> + " length exceed limit of %lu",
> + table->field[dyncol_field]->field_name,
> + cass_name, (uint)MAX_TOTAL_NAME_LENGTH);
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + res=1;
> + goto err;
> + }
> +
> + if ((res= (*(type->cassandra_to_dynamic))(cass_value,
> + cass_value_len, &val)) ||
> + insert_dynamic(&dynamic_names, (uchar *) &nm) ||
> + insert_dynamic(&dynamic_values, (uchar *) &val))
> + {
> + if (res)
> + {
> + print_conversion_error(cass_name, cass_value, cass_value_len);
> + }
> + free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
> + dynamic_values.elements);
> + // EOM shouldm be already reported if happened
> + res=1;
> + goto err;
> + }
> + }
> + }
> +
> + dynamic_rec.length= 0;
> + if (dyncol_set)
> + {
> + if (dynamic_column_create_many_internal_fmt(&dynamic_rec,
> + dynamic_names.elements,
> + dynamic_names.buffer,
> + (DYNAMIC_COLUMN_VALUE *)
> + dynamic_values.buffer,
> + FALSE,
> + TRUE) < 0)
> + dynamic_rec.length= 0;
> +
> + free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
> + dynamic_values.elements);
> + dynamic_values.elements= dynamic_names.elements= 0;
> + }
> + if (dyncol_set)
Why a separate if()?
> + {
> + if (dynamic_rec.length == 0)
> + table->field[dyncol_field]->set_null();
> + else
> + {
> + Field_blob *blob= (Field_blob *)table->field[dyncol_field];
> + blob->set_notnull();
> + blob->store_length(dynamic_rec.length);
> + *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))=
> + dynamic_rec.str;
> + }
> + }
> +
> + if (unpack_pk)
> + {
> + /* Unpack rowkey to primary key */
> + field= table->field;
> + (*field)->set_notnull();
> + se->get_read_rowkey(&cass_value, &cass_value_len);
> + if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len))
> + {
> + print_conversion_error((*field)->field_name, cass_value, cass_value_len);
> + res=1;
> + goto err;
> + }
> + }
> +
> +err:
> + dbug_tmp_restore_column_map(table->write_set, old_map);
> + return res;
> +}
...
> +int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
> +{
> + int rc;
> + DBUG_ENTER("ha_cassandra::rnd_pos");
> +
> + int save_active_index= active_index;
> + active_index= 0; /* The primary key */
> + rc= index_read_map(buf, pos, key_part_map(1), HA_READ_KEY_EXACT);
> +
> + active_index= save_active_index;
It would be a bit nicer to implement index_read_idx_map()
and call it from here and from index_read_map()
(a sketch follows the quoted function).
> +
> + DBUG_RETURN(rc);
> +}
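The refactoring suggested above, roughly (the signature as in handler.h;
the body would be what index_read_map() does now, using the 'index'
argument instead of active_index):

  int ha_cassandra::index_read_idx_map(uchar *buf, uint index,
                                       const uchar *key,
                                       key_part_map keypart_map,
                                       enum ha_rkey_function find_flag);

  int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
  {
    DBUG_ENTER("ha_cassandra::rnd_pos");
    DBUG_RETURN(index_read_idx_map(buf, 0 /* primary key */, pos,
                                   key_part_map(1), HA_READ_KEY_EXACT));
  }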
> +
...
> +bool ha_cassandra::mrr_start_read()
> +{
> + uint key_len;
> +
> + my_bitmap_map *old_map;
> + old_map= dbug_tmp_use_all_columns(table, table->read_set);
> +
> + se->new_lookup_keys();
> +
> + while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
> + {
> + char *cass_key;
> + int cass_key_len;
> +
> + DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE);
> +
> + uchar *key= (uchar*)mrr_cur_range.start_key.key;
> + key_len= mrr_cur_range.start_key.length;
> + //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS??
> + store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
> +
> + rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
> +
> + // Primitive buffer control
> + if (se->add_lookup_key(cass_key, cass_key_len) >
> + THDVAR(table->in_use, multiget_batch_size))
> + break;
I'd suggest adding a status variable to count how many times
the buffer was refilled (a sketch follows the quoted function).
> + }
> +
> + dbug_tmp_restore_column_map(table->read_set, old_map);
> +
> + return se->multiget_slice();
> +}
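E.g. a new counter in Cassandra_status_vars plus a SHOW_VAR entry for it
(the name below is made up):

  /* in Cassandra_status_vars: */
  ulong multiget_batches_refilled;

  /* in cassandra_status_variables[]: */
  {"multiget_batches_refilled",
   (char*) &cassandra_counters.multiget_batches_refilled, SHOW_LONG},

and then increment it in mrr_start_read() whenever the loop exits because
the batch size limit was hit.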
...
> +/////////////////////////////////////////////////////////////////////////////
> +// Dummy implementations start
> +/////////////////////////////////////////////////////////////////////////////
> +
> +
> +int ha_cassandra::index_next(uchar *buf)
Why did you do all these dummy methods?
> +{
> + int rc;
> + DBUG_ENTER("ha_cassandra::index_next");
> + rc= HA_ERR_WRONG_COMMAND;
> + DBUG_RETURN(rc);
> +}
> +
> +
> +int ha_cassandra::index_prev(uchar *buf)
> +{
> + int rc;
> + DBUG_ENTER("ha_cassandra::index_prev");
> + rc= HA_ERR_WRONG_COMMAND;
> + DBUG_RETURN(rc);
> +}
> +
> +
> +int ha_cassandra::index_first(uchar *buf)
> +{
> + int rc;
> + DBUG_ENTER("ha_cassandra::index_first");
> + rc= HA_ERR_WRONG_COMMAND;
> + DBUG_RETURN(rc);
> +}
> +
> +int ha_cassandra::index_last(uchar *buf)
> +{
> + int rc;
> + DBUG_ENTER("ha_cassandra::index_last");
> + rc= HA_ERR_WRONG_COMMAND;
> + DBUG_RETURN(rc);
> +}
> +
> +
> +ha_rows ha_cassandra::records_in_range(uint inx, key_range *min_key,
> + key_range *max_key)
> +{
> + DBUG_ENTER("ha_cassandra::records_in_range");
> + //DBUG_RETURN(10); // low number to force index usage
> + DBUG_RETURN(HA_POS_ERROR);
> +}
> +
> +
> +class Column_name_enumerator_impl : public Column_name_enumerator
> +{
> + ha_cassandra *obj;
> + uint idx;
> +public:
> + Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {}
> + const char* get_next_name()
> + {
> + if (idx == obj->table->s->fields)
> + return NULL;
> + else
> + return obj->table->field[idx++]->field_name;
> + }
> +};
> +
> +
> +int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
> +{
> + DYNAMIC_ARRAY oldvals, oldnames, vals, names;
> + String oldvalcol, valcol;
> + char *oldfree_names= NULL, *free_names= NULL;
> + my_bitmap_map *old_map;
> + int res;
> + DBUG_ENTER("ha_cassandra::update_row");
> + /* Currently, it is guaranteed that new_data == table->record[0] */
> + DBUG_ASSERT(new_data == table->record[0]);
> + /* For now, just rewrite the full record */
> + se->clear_insert_buffer();
> +
> + old_map= dbug_tmp_use_all_columns(table, table->read_set);
> +
> + char *old_key;
> + int old_key_len;
> + se->get_read_rowkey(&old_key, &old_key_len);
> +
> + /* Get the key we're going to write */
> + char *new_key;
> + int new_key_len;
> + if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len))
> + {
> + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
> + rowkey_converter->field->field_name, insert_lineno);
> + dbug_tmp_restore_column_map(table->read_set, old_map);
> + DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
> + }
> +
> + /*
> + Compare it to the key we've read. For all types that Cassandra supports,
> + binary byte-wise comparison can be used
> + */
> + bool new_primary_key;
> + if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len))
> + new_primary_key= true;
> + else
> + new_primary_key= false;
> +
> + if (dyncol_set)
> + {
> + Field *field= table->field[dyncol_field];
> + /* move to get old_data */
> + my_ptrdiff_t diff;
> + diff= (my_ptrdiff_t) (old_data - new_data);
> + field->move_field_offset(diff); // Points now at old_data
> + if ((res= read_dyncol(&oldvals, &oldnames, &oldvalcol, &oldfree_names)))
> + DBUG_RETURN(res);
> + field->move_field_offset(-diff); // back to new_data
> + if ((res= read_dyncol(&vals, &names, &valcol, &free_names)))
> + {
> + free_dynamic_row(&oldnames, &oldvals, oldfree_names);
> + DBUG_RETURN(res);
> + }
> + }
> +
> + if (new_primary_key)
> + {
> + /*
> + Primary key value changed. This is essentially a DELETE + INSERT.
> + Add a DELETE operation into the batch
> + */
> + Column_name_enumerator_impl name_enumerator(this);
> + se->add_row_deletion(old_key, old_key_len, &name_enumerator,
> + (LEX_STRING *)oldnames.buffer,
> + (dyncol_set ? oldnames.elements : 0));
> + oldnames.elements= oldvals.elements= 0; // they will be deleted
> + }
> +
> + se->start_row_insert(new_key, new_key_len);
> +
> + /* Convert other fields */
> + for (uint i= 1; i < table->s->fields; i++)
I probably would've simply done delete+insert in all cases :)
But since you've gone to the trouble of updating column by column,
it makes little sense to rewrite columns that didn't change.
Could you check table->write_set here and memcmp with the old
column value before updating?
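Something like this minimal sketch, perhaps (it assumes the same
old_data - new_data offset arithmetic as in the dyncol branch above;
note that NULL flags live outside field->ptr, and for VARCHAR-like
types a field->cmp() of the two record images is safer than a raw
memcmp):

  Field *field= table->field[i];
  my_ptrdiff_t rec_diff= (my_ptrdiff_t) (old_data - new_data);
  /* Skip columns this UPDATE did not assign, or whose packed value
     is identical in old_data and new_data. */
  if (!bitmap_is_set(table->write_set, field->field_index) ||
      !memcmp(field->ptr, field->ptr + rec_diff, field->pack_length()))
    continue;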
> + {
> + char *cass_data;
> + int cass_data_len;
> + if (dyncol_set && dyncol_field == i)
> + {
> + DBUG_ASSERT(field_converters[i] == NULL);
> + if ((res= write_dynamic_row(&vals, &names)))
> + goto err;
> + }
> + else
> + {
> + if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
> + {
> + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
> + field_converters[i]->field->field_name, insert_lineno);
> + dbug_tmp_restore_column_map(table->read_set, old_map);
> + DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
> + }
> + se->add_insert_column(field_converters[i]->field->field_name, 0,
> + cass_data, cass_data_len);
> + }
> + }
> + if (dyncol_set)
> + {
> + /* find removed fields */
> + uint i= 0, j= 0;
> + LEX_STRING *onames= (LEX_STRING *)oldnames.buffer;
> + LEX_STRING *nnames= (LEX_STRING *)names.buffer;
> + /* both arrays are sorted */
> + for(; i < oldnames.elements; i++)
> + {
> + int scmp= 0;
> + while (j < names.elements &&
> + (nnames[j].length < onames[i].length ||
> + (nnames[j].length == onames[i].length &&
> + (scmp= memcmp(nnames[j].str, onames[i].str,
> + onames[i].length)) < 0)))
> + j++;
> + if (j < names.elements &&
> + nnames[j].length == onames[i].length &&
> + scmp == 0)
> + j++;
> + else
> + se->add_insert_delete_column(onames[i].str, onames[i].length);
> + }
> + }
> +
> + dbug_tmp_restore_column_map(table->read_set, old_map);
> +
> + res= se->do_insert();
> +
> + if (res)
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +
> +err:
> + if (dyncol_set)
> + {
> + free_dynamic_row(&oldnames, &oldvals, oldfree_names);
> + free_dynamic_row(&names, &vals, free_names);
> + }
> +
> + DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
> +}
> +
> +
> +int ha_cassandra::extra(enum ha_extra_function operation)
> +{
> + DBUG_ENTER("ha_cassandra::extra");
> + DBUG_RETURN(0);
> +}
please. why do you like dummy methods that much? :)
> +
> +
> +/* The following function was copied from ha_blackhole::store_lock: */
> +THR_LOCK_DATA **ha_cassandra::store_lock(THD *thd,
> + THR_LOCK_DATA **to,
> + enum thr_lock_type lock_type)
> +{
> + DBUG_ENTER("ha_cassandra::store_lock");
> + if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
> + {
> + /*
> + Here is where we get into the guts of a row level lock.
> + If TL_UNLOCK is set
> + If we are not doing a LOCK TABLE or DISCARD/IMPORT
> + TABLESPACE, then allow multiple writers
> + */
> +
> + if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
> + lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
> + && !thd_tablespace_op(thd))
1. tablespace op in cassandra? really? too much copy-paste is confusing.
(here and in the comments too)
2. did you test LOCK TABLES?
> + lock_type = TL_WRITE_ALLOW_WRITE;
That makes all changes to Cassandra immediately visible
to concurrently running threads. Okay if that's intentional,
but it needs to be documented, as it is not the SQL semantics.
> +
> + /*
> + In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
> + MySQL would use the lock TL_READ_NO_INSERT on t2, and that
> + would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
> + to t2. Convert the lock to a normal read lock to allow
> + concurrent inserts to t2.
> + */
> +
> + if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
> + lock_type = TL_READ;
This also breaks SBR for INSERT ... SELECT. Same as above - okay if
intentional, but it should be thoroughly documented.
> +
> + lock.type= lock_type;
> + }
> + *to++= &lock;
> + DBUG_RETURN(to);
> +}
> +
> +
> +int ha_cassandra::external_lock(THD *thd, int lock_type)
> +{
> + DBUG_ENTER("ha_cassandra::external_lock");
> + DBUG_RETURN(0);
> +}
> +
> +int ha_cassandra::delete_table(const char *name)
> +{
> + DBUG_ENTER("ha_cassandra::delete_table");
> + /*
> + Cassandra table is just a view. Dropping it doesn't affect the underlying
> + column family.
> + */
> + DBUG_RETURN(0);
> +}
more dummies...
> +
> +
> +/**
> + check_if_incompatible_data() called if ALTER TABLE can't detect otherwise
> + if new and old definition are compatible
> +
> + @details If there are no other explicit signs like changed number of
> + fields this function will be called by compare_tables()
> + (sql/sql_tables.cc) to decide should we rewrite whole table or only .frm
> + file.
> +
> +*/
> +
> +bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
> + uint table_changes)
> +{
> + DBUG_ENTER("ha_cassandra::check_if_incompatible_data");
> + /* Checked, we intend to have this empty for Cassandra SE. */
> + DBUG_RETURN(COMPATIBLE_DATA_YES);
> +}
> +
> +
> +/////////////////////////////////////////////////////////////////////////////
> +// Dummy implementations end
> +/////////////////////////////////////////////////////////////////////////////
::update_row and ::store_lock are hardly dummies :)
> +
> +static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
> +{
> + cassandra_counters_copy= cassandra_counters;
> +
> + var->type= SHOW_ARRAY;
> + var->value= (char *) &cassandra_status_variables;
what's the point? If you don't do anything in this function,
you can just as easily list all status variables below
in the func_status[] array.
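If the counters copy really isn't needed, a sketch of the alternative
(the member names below are made up for illustration - use the real
cassandra_counters fields):

  static struct st_mysql_show_var func_status[]=
  {
    /* Each entry is read directly by SHOW STATUS - no SHOW_FUNC hop,
       no copy into cassandra_counters_copy. */
    {"Cassandra_row_inserts",
     (char *) &cassandra_counters.row_inserts, SHOW_LONG},
    {"Cassandra_multiget_reads",
     (char *) &cassandra_counters.multiget_reads, SHOW_LONG},
    {0, 0, SHOW_UNDEF}
  };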
> + return 0;
> +}
> +
> +
> +struct st_mysql_storage_engine cassandra_storage_engine=
> +{ MYSQL_HANDLERTON_INTERFACE_VERSION };
> +
> +static struct st_mysql_show_var func_status[]=
> +{
> + {"Cassandra", (char *)show_cassandra_vars, SHOW_FUNC},
> + {0,0,SHOW_UNDEF}
> +};
> +
> +maria_declare_plugin(cassandra)
> +{
> + MYSQL_STORAGE_ENGINE_PLUGIN,
> + &cassandra_storage_engine,
> + "CASSANDRA",
> + "Monty Program Ab",
> + "Cassandra storage engine",
> + PLUGIN_LICENSE_GPL,
> + cassandra_init_func, /* Plugin Init */
> + cassandra_done_func, /* Plugin Deinit */
> + 0x0001, /* version number (0.1) */
> + func_status, /* status variables */
> + cassandra_system_variables, /* system variables */
> + "0.1", /* string version */
> + MariaDB_PLUGIN_MATURITY_EXPERIMENTAL /* maturity */
> +}
> +maria_declare_plugin_end;
Regards,
Sergei
Re: [Maria-developers] [Commits] Rev 3480: Post review changes in the interface part 1.
by Sergei Golubchik 19 Dec '12
Hi, Sanja!
A few preliminary comments:
On Dec 19, sanja(a)askmonty.org wrote:
> At file:///home/bell/maria/bzr/work-maria-10.0-cassandra/
> ------------------------------------------------------------
> revno: 3480
> revision-id: sanja(a)askmonty.org-20121219200839-oyxvfz7im9p7wqpw
> parent: sanja(a)montyprogram.com-20121119121604-5h5tu0zn11em0sb3
> committer: sanja(a)askmonty.org
> branch nick: work-maria-10.0-cassandra
> timestamp: Wed 2012-12-19 22:08:39 +0200
> message:
> Post review changes in the interface part 1.
> === modified file 'mysys/ma_dyncol.c'
> --- a/mysys/ma_dyncol.c 2012-09-28 12:27:16 +0000
> +++ b/mysys/ma_dyncol.c 2012-12-19 20:08:39 +0000
> @@ -26,6 +26,39 @@
> SUCH DAMAGE.
> */
>
> +/*
> + Numeric format:
> + ===============
> + * Fixed header part
> + 1 byte flags:
> + 0,1 bits - <offset size> - 1
> + 2-7 bits - 0
> + 2 bytes column counter
> + * Columns directory sorted by column number, each entry contains of:
> + 2 bytes column number
> + <offset size> bytes (1-4) combined offset from beginning of
> + the data segment + 3 bit type
> + * Data of above columns size of data and length depend on type
> +
> + Columns with names:
> + ===================
> + * Fixed header part
> + 1 byte flags:
> + 0,1 bits - <offset size> - 2
> + 2 bit - 1 (mens format with names)
s/mens/means/g
> + 3,4 bits - 01 (mens <names offset size> - 1, now 2 is only supported size)
Eh. I would simply assume that if bit 2 is 1, this also means
offset-2. That is, you have the "old format" as above, and a "new format"
with names, support for recursion (4 bits per type, offset-2), etc.
You only have 6 bits here, so let's use them sparingly and keep things
simple.
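For illustration, decoding the fixed header under that scheme would be
just this (a sketch, not actual ma_dyncol.c code; 'data' points at the
start of the header):

  uchar flags= (uchar) data[0];
  my_bool has_names= (flags & 0x04) != 0; /* bit 2: format with names */
  /* bit 2 set implies an offset-size base of 2, otherwise base 1 */
  uint offset_size= (flags & 0x03) + (has_names ? 2 : 1);
  uint column_count= uint2korr(data + 1); /* 2-byte column counter */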
> + 5-7 bits - 0
> + 2 bytes column counter
> + * Variable header part
> + <names offset size> (2) bytes size of stored names pool
> + * Column directory sorted by names, each consists of
> + <names offset size> (2) bytes offset of name
> + <offset size> bytes (1-4)bytes combined offset from beginning of
> + the data segment + 4 bit type
> + * Names stored one after another
> + * Data of above columns size of data and length depend on type
> +*/
> +
> #include "mysys_priv.h"
> #include <m_string.h>
> #include <ma_dyncol.h>
> === modified file 'storage/cassandra/ha_cassandra.cc'
> --- a/storage/cassandra/ha_cassandra.cc 2012-11-19 12:16:04 +0000
> +++ b/storage/cassandra/ha_cassandra.cc 2012-12-19 20:08:39 +0000
> @@ -893,9 +893,37 @@ public:
> /**
> Converting dynamic columns types to/from casandra types
> */
> +
> +
> +/**
> + Check and initialize (if it is needed) string MEM_ROOT
> +*/
> +static void alloc_strings_memroot(MEM_ROOT *mem_root)
> +{
> + if (mem_root->block_size == 0)
write
if (!alloc_root_inited(mem_root))
> + {
> + /*
> + The mem_root used to allocate UUID (of length 36 + \0) so make
> + appropriate allocated size
> + */
> + init_alloc_root(mem_root,
> + (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10 +
> + ALLOC_ROOT_MIN_BLOCK_SIZE,
> + (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10 +
> + ALLOC_ROOT_MIN_BLOCK_SIZE);
> + }
> +}
> +
> +static void free_strings_memroot(MEM_ROOT *mem_root)
> +{
> + if (mem_root->block_size != 0)
and here
> + free_root(mem_root, MYF(0));
> +}
> +
> @@ -1730,6 +1759,7 @@ int ha_cassandra::read_cassandra_columns
> int res= 0;
> ulong total_name_len= 0;
>
> + strings_root.block_size= 0; // indicates uninitialized MEM_ROOT
Use
clear_alloc_root(&strings_root);
> /*
> cassandra_to_mariadb() calls will use field->store(...) methods, which
> require that the column is in the table->write_set
Regards,
Sergei
19 Dec '12
Hi Sergei! Sorry about the wrong place - I'm new to MariaDB and a bit lost,
since I moved from MySQL development and bug fixing to being a MySQL user,
and now a MariaDB user.
I will add my email to the list and start asking there.
About S/Key - sure! If you could send it, and maybe it could go into MariaDB
by default, that would be very nice! I'm using some Windows and some Linux
servers, so I can't use only pam.d, and some Linux servers don't give me a
unix user to use for it :/ That's why I'm trying to find an OTP solution for
MariaDB, and if there isn't one, to develop one (when I have time).
Well, if you could send your solution we could develop some more ideas for
sure; I won't use it to make money, just to secure myself.
2012/12/19 <serg(a)montyprogram.com>
> If you ask a question - do it on the mailing list. Jira is for
> bug reports and feature requests.
>
--
Roberto Spadim
Spadim Technology / SPAEmpresarial
[Maria-developers] Patches in 10.0 missing in 10.0-base, causing buildbot failures
by Kristian Nielsen 19 Dec '12
Hi,
I was looking at 10.0-base in Buildbot, in particular at the "update2"
failures in all the .deb builds.
I fixed one thing in Buildbot, but it still fails. I suspect it is because of
these two (at least) commits:
sergii(a)pisem.net-20121027121326-q87f5lt6ldxr1em6
sergii(a)pisem.net-20121020163522-uw2apo36c6nn6yi8
These were pushed to 10.0, while it seems they should instead have been pushed
to 10.0-base (where they are missing).
There are probably also other commits with the same problem, e.g. this one,
which is a 5.5 merge according to the commit message:
sergii(a)pisem.net-20121103112851-bvy1mlhirxbkedvf
Any suggestions what to do to fix this and get 10.0-base working in Buildbot?
- Kristian.
Re: [Maria-developers] [Commits] Rev 3449: MDEV-532: Fix some race conditions in test cases. in http://bazaar.launchpad.net/~maria-captains/maria/10.0
by Kristian Nielsen 19 Dec '12
Hi Serg,
Just wanted to be sure you saw this. The async binlogging of the checkpoint
event after MDEV-532 means a few test cases need to wait for the event to be
logged, or they will get races causing failures with result differences in
SHOW BINLOG EVENTS or similar, typically after running FLUSH LOGS.
I fixed the ones I found already. But it is possible that a few others will
pop up, and it is useful that someone other than me knows how to fix them
(so I'm picking on you as the reviewer :-).
As seen in the patch, it's just a matter of including the file
include/wait_for_binlog_checkpoint.inc in the appropriate place.
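For example, as in the rpl_insert_delayed.test hunk below:

  FLUSH LOGS;
  --source include/wait_for_binlog_checkpoint.inc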
- Kristian.
knielsen(a)knielsen-hq.org writes:
> At http://bazaar.launchpad.net/~maria-captains/maria/10.0
>
> ------------------------------------------------------------
> revno: 3449
> revision-id: knielsen(a)knielsen-hq.org-20121217114911-m3qv20xmk7htleez
> parent: igor(a)askmonty.org-20121217004919-xgon81zqncwimr3m
> committer: knielsen(a)knielsen-hq.org
> branch nick: mariadb-10.0-base
> timestamp: Mon 2012-12-17 12:49:11 +0100
> message:
> MDEV-532: Fix some race conditions in test cases.
>
> With MDEV-532, the binlog_checkpoint event is logged asynchronously
> from a binlog background thread. This causes some sporadic failures
> in some test cases whose output depends on order of events in
> binlog.
>
> Fix using an include file that waits until the binlog checkpoint
> event has been logged before proceeding with the test case.
> === modified file 'mysql-test/extra/rpl_tests/rpl_insert_delayed.test'
> --- a/mysql-test/extra/rpl_tests/rpl_insert_delayed.test 2012-09-22 14:11:40 +0000
> +++ b/mysql-test/extra/rpl_tests/rpl_insert_delayed.test 2012-12-17 11:49:11 +0000
> @@ -94,8 +94,10 @@ if (`SELECT @@global.binlog_format = 'S
> #flush the logs before the test
> connection slave;
> FLUSH LOGS;
> + source include/wait_for_binlog_checkpoint.inc;
> connection master;
> FLUSH LOGS;
> + source include/wait_for_binlog_checkpoint.inc;
> }
>
> CREATE TABLE t1(a int, UNIQUE(a));
>
> === modified file 'mysql-test/extra/rpl_tests/rpl_log.test'
> --- a/mysql-test/extra/rpl_tests/rpl_log.test 2011-10-19 19:45:18 +0000
> +++ b/mysql-test/extra/rpl_tests/rpl_log.test 2012-12-17 11:49:11 +0000
> @@ -43,6 +43,7 @@ let $binlog_limit= 1,4;
> source include/show_binlog_events.inc;
> let $binlog_limit=;
> flush logs;
> +--source include/wait_for_binlog_checkpoint.inc
>
> # We need an extra update before doing save_master_pos.
> # Otherwise, an unlikely scenario may occur:
>
> === modified file 'mysql-test/extra/rpl_tests/rpl_show_relaylog_events.inc'
> --- a/mysql-test/extra/rpl_tests/rpl_show_relaylog_events.inc 2011-01-13 14:31:37 +0000
> +++ b/mysql-test/extra/rpl_tests/rpl_show_relaylog_events.inc 2012-12-17 11:49:11 +0000
> @@ -41,8 +41,10 @@ INSERT INTO t1 VALUES (3);
> #
>
> FLUSH LOGS;
> +--source include/wait_for_binlog_checkpoint.inc
> -- connection master
> FLUSH LOGS;
> +--source include/wait_for_binlog_checkpoint.inc
> DROP TABLE t1;
>
> --let $is_relay_log= 0
>
> === added file 'mysql-test/include/wait_for_binlog_checkpoint.inc'
> --- a/mysql-test/include/wait_for_binlog_checkpoint.inc 1970-01-01 00:00:00 +0000
> +++ b/mysql-test/include/wait_for_binlog_checkpoint.inc 2012-12-17 11:49:11 +0000
> @@ -0,0 +1,53 @@
> +# include/wait_for_binlog_checkpoint.inc
> +#
> +# SUMMARY
> +#
> +# Wait until binlog checkpoint has been logged for current binlog file.
> +# This is useful to avoid races with output difference for binlog
> +# checkpoints, as these are logged asynchronously from the binlog
> +# background thread.
> +#
> +# USAGE:
> +#
> +# --source include/wait_for_binlog_checkpoint.inc
> +
> +let $_wait_count= 300;
> +
> +let $_found= 0;
> +
> +while ($_wait_count)
> +{
> + dec $_wait_count;
> + let $_cur_binlog= query_get_value(SHOW MASTER STATUS, File, 1);
> + let $_more= 1;
> + let $_row= 1;
> + while ($_more)
> + {
> + let $_event= query_get_value(SHOW BINLOG EVENTS IN "$_cur_binlog", Event_type, $_row);
> + if ($_event == "No such row")
> + {
> + let $_more= 0;
> + }
> + if ($_event == "Binlog_checkpoint")
> + {
> + let $_info= query_get_value(SHOW BINLOG EVENTS IN "$_cur_binlog", Info, $_row);
> + if (`SELECT INSTR("$_info", "$_cur_binlog") != 0`)
> + {
> + let $_more= 0;
> + let $_wait_count= 0;
> + let $_found= 1;
> + }
> + }
> + inc $_row;
> + }
> + if ($_wait_count)
> + {
> + real_sleep 0.1;
> + }
> +}
> +
> +if (!$_found)
> +{
> + eval SHOW BINLOG EVENTS IN "$_cur_binlog";
> + --die ERROR: failed while waiting for binlog checkpoint $_cur_binlog
> +}
>
> === modified file 'mysql-test/suite/innodb/t/binlog_consistent.test'
> --- a/mysql-test/suite/innodb/t/binlog_consistent.test 2012-02-07 15:22:36 +0000
> +++ b/mysql-test/suite/innodb/t/binlog_consistent.test 2012-12-17 11:49:11 +0000
> @@ -72,6 +72,7 @@ connection con3;
> --echo # Connection con3
> COMMIT;
> FLUSH LOGS;
> +--source include/wait_for_binlog_checkpoint.inc
>
> connection default;
> --echo # Connection default
>
> === modified file 'mysql-test/suite/multi_source/multisource.test'
> --- a/mysql-test/suite/multi_source/multisource.test 2012-10-04 20:36:17 +0000
> +++ b/mysql-test/suite/multi_source/multisource.test 2012-12-17 11:49:11 +0000
> @@ -169,6 +169,7 @@ select * from db2.t1;
>
> --connection master1
> flush logs;
> +--source include/wait_for_binlog_checkpoint.inc
> --save_master_pos
> --connection slave
> --sync_with_master 0, 'master1'
>
> _______________________________________________
> commits mailing list
> commits(a)mariadb.org
> https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits
On 12/17/2012 06:43 AM, Rich Prohaska wrote:
> What do I need to change in my storage engine for extended keys?
> Where should I look?
> Thanks.
Hi,
see the code
/* Currently only InnoDB can use extended keys */
share->set_use_ext_keys_flag(legacy_db_type == DB_TYPE_INNODB);
in open_binary_frm(), table.cc (mariadb)
You have to change it.
There is similar code in MySQL, also in open_binary_frm(), table.cc.
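For example, to also enable it for your engine (a sketch: DB_TYPE_TOKUDB
is assumed here for illustration - use whatever legacy_db_type value your
engine actually registers under):

  /* Let TokuDB, too, use extended keys */
  share->set_use_ext_keys_flag(legacy_db_type == DB_TYPE_INNODB ||
                               legacy_db_type == DB_TYPE_TOKUDB);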
Regards,
Igor.
>
> On Sun, Dec 16, 2012 at 10:58 PM, Igor Babaev <igor(a)askmonty.org> wrote:
>> On 12/14/2012 11:33 AM, Rich Prohaska wrote:
>>> MySQL 5.6.9 changed how the number of key parts are measured. They
>>> replaced one simple counter (key_parts) with four counters. Glancing
>>> at the code, i assume that this was done to support extended keys. If
>>> so, the mariadb implementation of extended keys is a lot simpler. We
>>> have to maintain the tokudb storage engine on both mysql and mariadb.
>>> Is the mariadb design going to change to match the 5.6 implementation,
>>> or are the designs now separate?
>>> Thanks
>>> Rich Prohaska
>>
>> Hi,
>>
>> MariaDB supports extended keys starting with first version of 5.5.
>> See https://kb.askmonty.org/en/extended-keys/.
>>
>> See also blogs:
>>
>> http://igors-notes.blogspot.com/2011/12/3-way-join-that-touches-only-indexe…
>>
>> http://s.petrunia.net/blog/?p=74
>>
>> The interesting thing is that Oracle essentially pulled MariaDB
>> implementation for this feature into a mysql version that was in RC.
>> Yet one hardly can find anything about the feature in the changelog or
>> in the manual.
>>
>> Regards,
>> Igor.
>>
>>
>>
>>>
>>> _______________________________________________
>>> Mailing list: https://launchpad.net/~maria-developers
>>> Post to : maria-developers(a)lists.launchpad.net
>>> Unsubscribe : https://launchpad.net/~maria-developers
>>> More help : https://help.launchpad.net/ListHelp
>>
Re: [Maria-developers] [Commits] Rev 3592: Test commit for diff options in file:///H:/bzr/5.5/
by Sergei Golubchik 17 Dec '12
Hi, Vladislav!
On Dec 17, Vladislav Vaintroub wrote:
> revno: 3592
> revision-id: wlad(a)montyprogram.com-20121217203725-f5e0mguvmjvb97su
> committer: Vladislav Vaintroub <wlad(a)montyprogram.com>
> branch nick: 5.5
> timestamp: Mon 2012-12-17 21:37:25 +0100
> message:
> Test commit for diff options
> === modified file 'sql/mysqld.cc'
> --- a/sql/mysqld.cc 2012-11-22 09:19:31 +0000
> +++ b/sql/mysqld.cc 2012-12-17 20:37:25 +0000
> @@ -2441,21 +2441,6 @@ void dec_connection_count(THD *thd)
The patch looks ok to me.
But I'd prefer a better changeset comment :)
Regards,
Sergei
MySQL 5.6.9 changed how the number of key parts is measured. They
replaced one simple counter (key_parts) with four counters. Glancing
at the code, I assume that this was done to support extended keys. If
so, the mariadb implementation of extended keys is a lot simpler. We
have to maintain the tokudb storage engine on both mysql and mariadb.
Is the mariadb design going to change to match the 5.6 implementation,
or are the designs now separate?
Thanks
Rich Prohaska
Re: [Maria-developers] [Commits] Rev 3435: MDEV-532: Async InnoDB commit checkpoint. in http://bazaar.launchpad.net/~maria-captains/maria/10.0
by Kristian Nielsen 14 Dec '12
Hi Serg,
As we discussed under review of MDEV-232, here is a separate patch that makes
InnoDB/XtraDB commit checkpointing more asynchronous.
See MDEV-532 for further description of this task.
I hope you will review this, at your convenience.
- Kristian.
knielsen(a)knielsen-hq.org writes:
> At http://bazaar.launchpad.net/~maria-captains/maria/10.0
>
> ------------------------------------------------------------
> revno: 3435
> revision-id: knielsen(a)knielsen-hq.org-20120914124453-zsap6hjclq3vrb6n
> parent: knielsen(a)knielsen-hq.org-20120913123129-kaujy4cw0jc9o08k
> committer: knielsen(a)knielsen-hq.org
> branch nick: work-10.0-mdev225-181-232
> timestamp: Fri 2012-09-14 14:44:53 +0200
> message:
> MDEV-532: Async InnoDB commit checkpoint.
>
> Make the commit checkpoint inside InnoDB asynchronous.
> Implement a background thread in binlog to do the writing and flushing of
> binlog checkpoint events to disk.
> === modified file 'mysql-test/suite/binlog/r/binlog_checkpoint.result'
> --- a/mysql-test/suite/binlog/r/binlog_checkpoint.result 2012-09-13 12:31:29 +0000
> +++ b/mysql-test/suite/binlog/r/binlog_checkpoint.result 2012-09-14 12:44:53 +0000
> @@ -70,8 +70,14 @@ show binlog events in 'master-bin.000003
> Log_name Pos Event_type Server_id End_log_pos Info
> master-bin.000003 # Format_desc # # SERVER_VERSION, BINLOG_VERSION
> master-bin.000003 # Binlog_checkpoint # # master-bin.000001
> +SET DEBUG_SYNC= "RESET";
> +SET @old_dbug= @@global.DEBUG_DBUG;
> +SET GLOBAL debug_dbug="+d,binlog_background_checkpoint_processed";
> SET DEBUG_SYNC= "now SIGNAL con2_continue";
> con1 is still pending, no new binlog checkpoint should have been logged.
> +SET DEBUG_SYNC= "now WAIT_FOR binlog_background_checkpoint_processed";
> +SET GLOBAL debug_dbug= @old_dbug;
> +SET DEBUG_SYNC= "RESET";
> show binlog events in 'master-bin.000003' from <binlog_start>;
> Log_name Pos Event_type Server_id End_log_pos Info
> master-bin.000003 # Format_desc # # SERVER_VERSION, BINLOG_VERSION
>
> === modified file 'mysql-test/suite/binlog/r/binlog_xa_recover.result'
> --- a/mysql-test/suite/binlog/r/binlog_xa_recover.result 2012-09-13 12:31:29 +0000
> +++ b/mysql-test/suite/binlog/r/binlog_xa_recover.result 2012-09-14 12:44:53 +0000
> @@ -118,7 +118,11 @@ master-bin.00000<binlog_start> # Table_m
> master-bin.00000<binlog_start> # Write_rows # # table_id: # flags: STMT_END_F
> master-bin.00000<binlog_start> # Xid # # COMMIT /* XID */
> SET DEBUG_SYNC= "now SIGNAL con10_cont";
> +SET @old_dbug= @@global.DEBUG_DBUG;
> +SET GLOBAL debug_dbug="+d,binlog_background_checkpoint_processed";
> SET DEBUG_SYNC= "now SIGNAL con12_cont";
> +SET DEBUG_SYNC= "now WAIT_FOR binlog_background_checkpoint_processed";
> +SET GLOBAL debug_dbug= @old_dbug;
> SET DEBUG_SYNC= "now SIGNAL con11_cont";
> Checking that master-bin.000004 is the last binlog checkpoint
> show binlog events in 'master-bin.00000<binlog_start>' from <binlog_start>;
>
> === modified file 'mysql-test/suite/binlog/t/binlog_checkpoint.test'
> --- a/mysql-test/suite/binlog/t/binlog_checkpoint.test 2012-09-13 12:31:29 +0000
> +++ b/mysql-test/suite/binlog/t/binlog_checkpoint.test 2012-09-14 12:44:53 +0000
> @@ -71,6 +71,12 @@ SET DEBUG_SYNC= "now WAIT_FOR con2_ready
> --let $binlog_file= master-bin.000003
> --source include/show_binlog_events.inc
>
> +# We need to sync the test case with the background processing of the
> +# commit checkpoint, otherwise we get nondeterministic results.
> +SET DEBUG_SYNC= "RESET";
> +SET @old_dbug= @@global.DEBUG_DBUG;
> +SET GLOBAL debug_dbug="+d,binlog_background_checkpoint_processed";
> +
> SET DEBUG_SYNC= "now SIGNAL con2_continue";
>
> connection con2;
> @@ -78,6 +84,12 @@ reap;
>
> connection default;
> --echo con1 is still pending, no new binlog checkpoint should have been logged.
> +# Make sure commit checkpoint is processed before we check that no checkpoint
> +# event has been binlogged.
> +SET DEBUG_SYNC= "now WAIT_FOR binlog_background_checkpoint_processed";
> +SET GLOBAL debug_dbug= @old_dbug;
> +SET DEBUG_SYNC= "RESET";
> +
> --let $binlog_file= master-bin.000003
> --source include/show_binlog_events.inc
>
>
> === modified file 'mysql-test/suite/binlog/t/binlog_xa_recover.test'
> --- a/mysql-test/suite/binlog/t/binlog_xa_recover.test 2012-09-13 12:31:29 +0000
> +++ b/mysql-test/suite/binlog/t/binlog_xa_recover.test 2012-09-14 12:44:53 +0000
> @@ -14,8 +14,24 @@ CREATE TABLE t1 (a INT PRIMARY KEY, b ME
> # Insert some data to force a couple binlog rotations (3), so we get some
> # normal binlog checkpoints before starting the test.
> INSERT INTO t1 VALUES (100, REPEAT("x", 4100));
> +# Wait for the master-bin.000002 binlog checkpoint to appear.
> +--let $wait_for_all= 0
> +--let $show_statement= SHOW BINLOG EVENTS IN "master-bin.000002"
> +--let $field= Info
> +--let $condition= = "master-bin.000002"
> +--source include/wait_show_condition.inc
> INSERT INTO t1 VALUES (101, REPEAT("x", 4100));
> +--let $wait_for_all= 0
> +--let $show_statement= SHOW BINLOG EVENTS IN "master-bin.000003"
> +--let $field= Info
> +--let $condition= = "master-bin.000003"
> +--source include/wait_show_condition.inc
> INSERT INTO t1 VALUES (102, REPEAT("x", 4100));
> +--let $wait_for_all= 0
> +--let $show_statement= SHOW BINLOG EVENTS IN "master-bin.000004"
> +--let $field= Info
> +--let $condition= = "master-bin.000004"
> +--source include/wait_show_condition.inc
>
> # Now start a bunch of transactions that span multiple binlog
> # files. Leave then in the state prepared-but-not-committed in the engine
> @@ -153,10 +169,19 @@ SET DEBUG_SYNC= "now SIGNAL con10_cont";
> connection con10;
> reap;
> connection default;
> +
> +# We need to sync the test case with the background processing of the
> +# commit checkpoint, otherwise we get nondeterministic results.
> +SET @old_dbug= @@global.DEBUG_DBUG;
> +SET GLOBAL debug_dbug="+d,binlog_background_checkpoint_processed";
> +
> SET DEBUG_SYNC= "now SIGNAL con12_cont";
> connection con12;
> reap;
> connection default;
> +SET DEBUG_SYNC= "now WAIT_FOR binlog_background_checkpoint_processed";
> +SET GLOBAL debug_dbug= @old_dbug;
> +
> SET DEBUG_SYNC= "now SIGNAL con11_cont";
> connection con11;
> reap;
> @@ -210,7 +235,20 @@ RESET MASTER;
> # crash recovery fails due to the error insert used for previous test.
> INSERT INTO t1 VALUES (21, REPEAT("x", 4100));
> INSERT INTO t1 VALUES (22, REPEAT("x", 4100));
> +# Wait for the master-bin.000003 binlog checkpoint to appear.
> +--let $wait_for_all= 0
> +--let $show_statement= SHOW BINLOG EVENTS IN "master-bin.000003"
> +--let $field= Info
> +--let $condition= = "master-bin.000003"
> +--source include/wait_show_condition.inc
> INSERT INTO t1 VALUES (23, REPEAT("x", 4100));
> +# Wait for the last (master-bin.000004) binlog checkpoint to appear.
> +--let $wait_for_all= 0
> +--let $show_statement= SHOW BINLOG EVENTS IN "master-bin.000004"
> +--let $field= Info
> +--let $condition= = "master-bin.000004"
> +--source include/wait_show_condition.inc
> +
> --write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
> wait-binlog_xa_recover.test
> EOF
>
> === modified file 'mysql-test/suite/perfschema/r/all_instances.result'
> --- a/mysql-test/suite/perfschema/r/all_instances.result 2012-06-22 09:46:28 +0000
> +++ b/mysql-test/suite/perfschema/r/all_instances.result 2012-09-14 12:44:53 +0000
> @@ -76,6 +76,7 @@ wait/synch/mutex/sql/Master_info::run_lo
> wait/synch/mutex/sql/Master_info::sleep_lock
> wait/synch/mutex/sql/MDL_map::mutex
> wait/synch/mutex/sql/MDL_wait::LOCK_wait_status
> +wait/synch/mutex/sql/MYSQL_BIN_LOG::LOCK_binlog_thread
> wait/synch/mutex/sql/MYSQL_BIN_LOG::LOCK_index
> wait/synch/mutex/sql/MYSQL_BIN_LOG::LOCK_xid_list
> wait/synch/mutex/sql/MYSQL_RELAY_LOG::LOCK_index
> @@ -129,6 +130,8 @@ wait/synch/cond/sql/Master_info::sleep_c
> wait/synch/cond/sql/Master_info::start_cond
> wait/synch/cond/sql/Master_info::stop_cond
> wait/synch/cond/sql/MDL_context::COND_wait_status
> +wait/synch/cond/sql/MYSQL_BIN_LOG::COND_binlog_thread
> +wait/synch/cond/sql/MYSQL_BIN_LOG::COND_binlog_thread_end
> wait/synch/cond/sql/MYSQL_BIN_LOG::COND_queue_busy
> wait/synch/cond/sql/MYSQL_BIN_LOG::COND_xid_list
> wait/synch/cond/sql/MYSQL_BIN_LOG::update_cond
>
> === modified file 'mysql-test/suite/perfschema/r/relaylog.result'
> --- a/mysql-test/suite/perfschema/r/relaylog.result 2012-06-22 09:46:28 +0000
> +++ b/mysql-test/suite/perfschema/r/relaylog.result 2012-09-14 12:44:53 +0000
> @@ -56,8 +56,11 @@ where event_name like "%MYSQL_BIN_LOG%"
> and event_name not like "%MYSQL_BIN_LOG::update_cond"
> order by event_name;
> EVENT_NAME COUNT_STAR
> +wait/synch/cond/sql/MYSQL_BIN_LOG::COND_binlog_thread NONE
> +wait/synch/cond/sql/MYSQL_BIN_LOG::COND_binlog_thread_end NONE
> wait/synch/cond/sql/MYSQL_BIN_LOG::COND_queue_busy NONE
> wait/synch/cond/sql/MYSQL_BIN_LOG::COND_xid_list NONE
> +wait/synch/mutex/sql/MYSQL_BIN_LOG::LOCK_binlog_thread MANY
> wait/synch/mutex/sql/MYSQL_BIN_LOG::LOCK_index MANY
> wait/synch/mutex/sql/MYSQL_BIN_LOG::LOCK_xid_list MANY
> "Expect no slave relay log"
> @@ -131,8 +134,11 @@ where event_name like "%MYSQL_BIN_LOG%"
> and event_name not like "%MYSQL_BIN_LOG::update_cond"
> order by event_name;
> EVENT_NAME COUNT_STAR
> +wait/synch/cond/sql/MYSQL_BIN_LOG::COND_binlog_thread MANY
> +wait/synch/cond/sql/MYSQL_BIN_LOG::COND_binlog_thread_end NONE
> wait/synch/cond/sql/MYSQL_BIN_LOG::COND_queue_busy NONE
> -wait/synch/cond/sql/MYSQL_BIN_LOG::COND_xid_list NONE
> +wait/synch/cond/sql/MYSQL_BIN_LOG::COND_xid_list MANY
> +wait/synch/mutex/sql/MYSQL_BIN_LOG::LOCK_binlog_thread MANY
> wait/synch/mutex/sql/MYSQL_BIN_LOG::LOCK_index MANY
> wait/synch/mutex/sql/MYSQL_BIN_LOG::LOCK_xid_list MANY
> "Expect a slave relay log"
>
> === modified file 'sql/debug_sync.cc'
> --- a/sql/debug_sync.cc 2012-03-28 17:26:00 +0000
> +++ b/sql/debug_sync.cc 2012-09-14 12:44:53 +0000
> @@ -984,6 +984,7 @@ static bool debug_sync_eval_action(THD *
> DBUG_ENTER("debug_sync_eval_action");
> DBUG_ASSERT(thd);
> DBUG_ASSERT(action_str);
> + DBUG_PRINT("debug_sync", ("action_str='%s'", action_str));
>
> /*
> Get debug sync point name. Or a special command.
>
> === modified file 'sql/log.cc'
> --- a/sql/log.cc 2012-09-13 12:31:29 +0000
> +++ b/sql/log.cc 2012-09-14 12:44:53 +0000
> @@ -53,6 +53,7 @@
> #include "rpl_handler.h"
> #include "debug_sync.h"
> #include "sql_show.h"
> +#include "my_pthread.h"
>
> /* max size of the log message */
> #define MAX_LOG_BUFFER_SIZE 1024
> @@ -106,6 +107,14 @@ static SHOW_VAR binlog_status_vars_detai
> {NullS, NullS, SHOW_LONG}
> };
>
> +/* Variables for the binlog background thread. */
> +static bool binlog_thread_started= false;
> +static bool binlog_background_thread_stop= false;
> +static MYSQL_BIN_LOG::xid_count_per_binlog *
> + binlog_background_thread_queue= NULL;
> +
> +static bool start_binlog_background_thread();
> +
>
> /**
> purge logs, master and slave sides both, related error code
> @@ -2957,12 +2966,27 @@ void MYSQL_BIN_LOG::cleanup()
> my_free(b);
> }
>
> + /* Wait for the binlog thread to stop. */
> + if (!is_relay_log && binlog_thread_started)
> + {
> + mysql_mutex_lock(&LOCK_binlog_thread);
> + binlog_background_thread_stop= true;
> + mysql_cond_signal(&COND_binlog_thread);
> + while (binlog_background_thread_stop)
> + mysql_cond_wait(&COND_binlog_thread_end, &LOCK_binlog_thread);
> + mysql_mutex_unlock(&LOCK_binlog_thread);
> + binlog_thread_started= false;
> + }
> +
> mysql_mutex_destroy(&LOCK_log);
> mysql_mutex_destroy(&LOCK_index);
> mysql_mutex_destroy(&LOCK_xid_list);
> + mysql_mutex_destroy(&LOCK_binlog_thread);
> mysql_cond_destroy(&update_cond);
> mysql_cond_destroy(&COND_queue_busy);
> mysql_cond_destroy(&COND_xid_list);
> + mysql_cond_destroy(&COND_binlog_thread);
> + mysql_cond_destroy(&COND_binlog_thread_end);
> }
> DBUG_VOID_RETURN;
> }
> @@ -2988,6 +3012,11 @@ void MYSQL_BIN_LOG::init_pthread_objects
> mysql_cond_init(m_key_update_cond, &update_cond, 0);
> mysql_cond_init(m_key_COND_queue_busy, &COND_queue_busy, 0);
> mysql_cond_init(key_BINLOG_COND_xid_list, &COND_xid_list, 0);
> +
> + mysql_mutex_init(key_BINLOG_LOCK_binlog_thread,
> + &LOCK_binlog_thread, MY_MUTEX_INIT_FAST);
> + mysql_cond_init(key_BINLOG_COND_binlog_thread, &COND_binlog_thread, 0);
> + mysql_cond_init(key_BINLOG_COND_binlog_thread_end, &COND_binlog_thread_end, 0);
> }
>
>
> @@ -3085,6 +3114,10 @@ bool MYSQL_BIN_LOG::open(const char *log
> DBUG_ENTER("MYSQL_BIN_LOG::open");
> DBUG_PRINT("enter",("log_type: %d",(int) log_type_arg));
>
> + if (!is_relay_log && !binlog_thread_started &&
> + start_binlog_background_thread())
> + DBUG_RETURN(1);
> +
> if (init_and_set_log_file_name(log_name, new_name, log_type_arg,
> io_cache_type_arg))
> {
> @@ -5540,11 +5573,7 @@ bool general_log_write(THD *thd, enum en
> }
>
>
> -/*
> - I would like to make this function static, but this causes compiler warnings
> - when it is declared as friend function in log.h.
> -*/
> -void
> +static void
> binlog_checkpoint_callback(void *cookie)
> {
> MYSQL_BIN_LOG::xid_count_per_binlog *entry=
> @@ -8116,9 +8145,128 @@ int TC_LOG_BINLOG::unlog(ulong cookie, m
> void
> TC_LOG_BINLOG::commit_checkpoint_notify(void *cookie)
> {
> - mark_xid_done(((xid_count_per_binlog *)cookie)->binlog_id, true);
> + xid_count_per_binlog *entry= static_cast<xid_count_per_binlog *>(cookie);
> + mysql_mutex_lock(&LOCK_binlog_thread);
> + entry->next_in_queue= binlog_background_thread_queue;
> + binlog_background_thread_queue= entry;
> + mysql_cond_signal(&COND_binlog_thread);
> + mysql_mutex_unlock(&LOCK_binlog_thread);
> }
>
> +/*
> + Binlog service thread.
> +
> + This thread is used to log binlog checkpoints in the background, rather than
> + in the context of random storage engine threads that happen to call
> + commit_checkpoint_notify_ha() and may not like the delays while syncing
> + binlog to disk or may not be setup with all my_thread_init() and other
> + necessary stuff.
> +
> + In the future, this thread could also be used to do log rotation in the
> + background, which could eliminate all stalls around binlog rotations.
> +*/
> +pthread_handler_t
> +binlog_background_thread(void *arg __attribute__((unused)))
> +{
> + bool stop;
> + MYSQL_BIN_LOG::xid_count_per_binlog *queue, *next;
> + THD *thd;
> +
> + my_thread_init();
> + thd= new THD;
> + thd->system_thread= SYSTEM_THREAD_BINLOG_BACKGROUND;
> + my_pthread_setspecific_ptr(THR_THD, thd);
> + mysql_mutex_lock(&LOCK_thread_count);
> + thd->thread_id= thread_id++;
> + mysql_mutex_unlock(&LOCK_thread_count);
> +
> + for (;;)
> + {
> + /*
> + Wait until there is something in the queue to process, or we are asked
> + to shut down.
> + */
> + thd_proc_info(thd, "Waiting for background binlog tasks");
> + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_thread);
> + for (;;)
> + {
> + stop= binlog_background_thread_stop;
> + queue= binlog_background_thread_queue;
> + if (stop || queue)
> + break;
> + mysql_cond_wait(&mysql_bin_log.COND_binlog_thread,
> + &mysql_bin_log.LOCK_binlog_thread);
> + }
> + /* Grab the queue, if any. */
> + binlog_background_thread_queue= NULL;
> + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_thread);
> +
> + /* Process any incoming commit_checkpoint_notify() calls. */
> + while (queue)
> + {
> + thd_proc_info(thd, "Processing binlog checkpoint notification");
> + /* Grab next pointer first, as mark_xid_done() may free the element. */
> + next= queue->next_in_queue;
> + mysql_bin_log.mark_xid_done(queue->binlog_id, true);
> + queue= next;
> +
> + DBUG_EXECUTE_IF("binlog_background_checkpoint_processed",
> + DBUG_ASSERT(!debug_sync_set_action(
> + thd,
> + STRING_WITH_LEN("now SIGNAL binlog_background_checkpoint_processed")));
> + );
> + }
> +
> + if (stop)
> + break;
> + }
> +
> + thd_proc_info(thd, "Stopping binlog background thread");
> +
> + mysql_mutex_lock(&LOCK_thread_count);
> + delete thd;
> + mysql_mutex_unlock(&LOCK_thread_count);
> +
> + my_thread_end();
> +
> + /* Signal that we are (almost) stopped. */
> + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_thread);
> + binlog_background_thread_stop= false;
> + mysql_cond_signal(&mysql_bin_log.COND_binlog_thread_end);
> + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_thread);
> +
> + return 0;
> +}
> +
> +#ifdef HAVE_PSI_INTERFACE
> +static PSI_thread_key key_thread_binlog;
> +
> +static PSI_thread_info all_binlog_threads[]=
> +{
> + { &key_thread_binlog, "binlog_background", PSI_FLAG_GLOBAL},
> +};
> +#endif /* HAVE_PSI_INTERFACE */
> +
> +static bool
> +start_binlog_background_thread()
> +{
> + pthread_t th;
> +
> +#ifdef HAVE_PSI_INTERFACE
> + if (PSI_server)
> + PSI_server->register_thread("sql", all_binlog_threads,
> + array_elements(all_binlog_threads));
> +#endif
> +
> + if (mysql_thread_create(key_thread_binlog, &th, NULL,
> + binlog_background_thread, NULL))
> + return 1;
> +
> + binlog_thread_started= true;
> + return 0;
> +}
> +
> +
> int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
> IO_CACHE *first_log,
> Format_description_log_event *fdle)
>
> === modified file 'sql/log.h'
> --- a/sql/log.h 2012-09-13 12:31:29 +0000
> +++ b/sql/log.h 2012-09-14 12:44:53 +0000
> @@ -395,8 +395,6 @@ class MYSQL_QUERY_LOG: public MYSQL_LOG
> #define BINLOG_COOKIE_IS_DUMMY(c) \
> ( ((ulong)(c)>>1) == BINLOG_COOKIE_DUMMY_ID )
>
> -void binlog_checkpoint_callback(void *cookie);
> -
> class binlog_cache_mngr;
> class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
> {
> @@ -451,27 +449,6 @@ class MYSQL_BIN_LOG: public TC_LOG, priv
> };
>
> /*
> - A list of struct xid_count_per_binlog is used to keep track of how many
> - XIDs are in prepared, but not committed, state in each binlog. And how
> - many commit_checkpoint_request()'s are pending.
> -
> - When count drops to zero in a binlog after rotation, it means that there
> - are no more XIDs in prepared state, so that binlog is no longer needed
> - for XA crash recovery, and we can log a new binlog checkpoint event.
> -
> - The list is protected against simultaneous access from multiple
> - threads by LOCK_xid_list.
> - */
> - struct xid_count_per_binlog : public ilink {
> - char *binlog_name;
> - uint binlog_name_len;
> - ulong binlog_id;
> - /* Total prepared XIDs and pending checkpoint requests in this binlog. */
> - long xid_count;
> - xid_count_per_binlog(); /* Give link error if constructor used. */
> - };
> - I_List<xid_count_per_binlog> binlog_xid_count_list;
> - /*
> When this is set, a RESET MASTER is in progress.
>
> Then we should not write any binlog checkpoints into the binlog (that
> @@ -480,7 +457,6 @@ class MYSQL_BIN_LOG: public TC_LOG, priv
> checkpoint arrives - when all have arrived, RESET MASTER will complete.
> */
> bool reset_master_pending;
> - friend void binlog_checkpoint_callback(void *cookie);
>
> /* LOCK_log and LOCK_index are inited by init_pthread_objects() */
> mysql_mutex_t LOCK_index;
> @@ -553,10 +529,35 @@ class MYSQL_BIN_LOG: public TC_LOG, priv
> int write_transaction_or_stmt(group_commit_entry *entry);
> bool write_transaction_to_binlog_events(group_commit_entry *entry);
> void trx_group_commit_leader(group_commit_entry *leader);
> - void mark_xid_done(ulong cookie, bool write_checkpoint);
> - void mark_xids_active(ulong cookie, uint xid_count);
>
> public:
> + /*
> + A list of struct xid_count_per_binlog is used to keep track of how many
> + XIDs are in prepared, but not committed, state in each binlog. And how
> + many commit_checkpoint_request()'s are pending.
> +
> + When count drops to zero in a binlog after rotation, it means that there
> + are no more XIDs in prepared state, so that binlog is no longer needed
> + for XA crash recovery, and we can log a new binlog checkpoint event.
> +
> + The list is protected against simultaneous access from multiple
> + threads by LOCK_xid_list.
> + */
> + struct xid_count_per_binlog : public ilink {
> + char *binlog_name;
> + uint binlog_name_len;
> + ulong binlog_id;
> + /* Total prepared XIDs and pending checkpoint requests in this binlog. */
> + long xid_count;
> + /* For linking in requests to the binlog background thread. */
> + xid_count_per_binlog *next_in_queue;
> + xid_count_per_binlog(); /* Give link error if constructor used. */
> + };
> + I_List<xid_count_per_binlog> binlog_xid_count_list;
> + mysql_mutex_t LOCK_binlog_thread;
> + mysql_cond_t COND_binlog_thread;
> + mysql_cond_t COND_binlog_thread_end;
> +
> using MYSQL_LOG::generate_name;
> using MYSQL_LOG::is_open;
>
> @@ -712,6 +713,8 @@ class MYSQL_BIN_LOG: public TC_LOG, priv
> bool appendv(const char* buf,uint len,...);
> bool append(Log_event* ev);
>
> + void mark_xids_active(ulong cookie, uint xid_count);
> + void mark_xid_done(ulong cookie, bool write_checkpoint);
> void make_log_name(char* buf, const char* log_ident);
> bool is_active(const char* log_file_name);
> bool can_purge_log(const char *log_file_name);
>
> === modified file 'sql/mysqld.cc'
> --- a/sql/mysqld.cc 2012-09-13 12:31:29 +0000
> +++ b/sql/mysqld.cc 2012-09-14 12:44:53 +0000
> @@ -724,6 +724,7 @@ PSI_mutex_key key_LOCK_des_key_file;
> #endif /* HAVE_OPENSSL */
>
> PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
> + key_BINLOG_LOCK_binlog_thread,
> key_delayed_insert_mutex, key_hash_filo_lock, key_LOCK_active_mi,
> key_LOCK_connection_count, key_LOCK_crypt, key_LOCK_delayed_create,
> key_LOCK_delayed_insert, key_LOCK_delayed_status, key_LOCK_error_log,
> @@ -766,6 +767,7 @@ static PSI_mutex_info all_server_mutexes
>
> { &key_BINLOG_LOCK_index, "MYSQL_BIN_LOG::LOCK_index", 0},
> { &key_BINLOG_LOCK_xid_list, "MYSQL_BIN_LOG::LOCK_xid_list", 0},
> + { &key_BINLOG_LOCK_binlog_thread, "MYSQL_BIN_LOG::LOCK_binlog_thread", 0},
> { &key_RELAYLOG_LOCK_index, "MYSQL_RELAY_LOG::LOCK_index", 0},
> { &key_delayed_insert_mutex, "Delayed_insert::mutex", 0},
> { &key_hash_filo_lock, "hash_filo::lock", 0},
> @@ -834,6 +836,7 @@ PSI_cond_key key_PAGE_cond, key_COND_act
> #endif /* HAVE_MMAP */
>
> PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
> + key_BINLOG_COND_binlog_thread, key_BINLOG_COND_binlog_thread_end,
> key_COND_cache_status_changed, key_COND_manager,
> key_COND_rpl_status, key_COND_server_started,
> key_delayed_insert_cond, key_delayed_insert_cond_client,
> @@ -863,6 +866,8 @@ static PSI_cond_info all_server_conds[]=
> #endif /* HAVE_MMAP */
> { &key_BINLOG_COND_xid_list, "MYSQL_BIN_LOG::COND_xid_list", 0},
> { &key_BINLOG_update_cond, "MYSQL_BIN_LOG::update_cond", 0},
> + { &key_BINLOG_COND_binlog_thread, "MYSQL_BIN_LOG::COND_binlog_thread", 0},
> + { &key_BINLOG_COND_binlog_thread_end, "MYSQL_BIN_LOG::COND_binlog_thread_end", 0},
> { &key_BINLOG_COND_queue_busy, "MYSQL_BIN_LOG::COND_queue_busy", 0},
> { &key_RELAYLOG_update_cond, "MYSQL_RELAY_LOG::update_cond", 0},
> { &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0},
>
> === modified file 'sql/mysqld.h'
> --- a/sql/mysqld.h 2012-09-13 12:31:29 +0000
> +++ b/sql/mysqld.h 2012-09-14 12:44:53 +0000
> @@ -226,6 +226,7 @@ extern PSI_mutex_key key_LOCK_des_key_fi
> #endif
>
> extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
> + key_BINLOG_LOCK_binlog_thread,
> key_delayed_insert_mutex, key_hash_filo_lock, key_LOCK_active_mi,
> key_LOCK_connection_count, key_LOCK_crypt, key_LOCK_delayed_create,
> key_LOCK_delayed_insert, key_LOCK_delayed_status, key_LOCK_error_log,
> @@ -257,6 +258,7 @@ extern PSI_cond_key key_PAGE_cond, key_C
> #endif /* HAVE_MMAP */
>
> extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
> + key_BINLOG_COND_binlog_thread, key_BINLOG_COND_binlog_thread_end,
> key_COND_cache_status_changed, key_COND_manager,
> key_COND_rpl_status, key_COND_server_started,
> key_delayed_insert_cond, key_delayed_insert_cond_client,
>
> === modified file 'sql/rpl_rli.cc'
> --- a/sql/rpl_rli.cc 2012-09-13 12:31:29 +0000
> +++ b/sql/rpl_rli.cc 2012-09-14 12:44:53 +0000
> @@ -58,6 +58,7 @@ Relay_log_info::Relay_log_info(bool is_s
> {
> DBUG_ENTER("Relay_log_info::Relay_log_info");
>
> + relay_log.is_relay_log= TRUE;
> #ifdef HAVE_PSI_INTERFACE
> relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
> key_RELAYLOG_update_cond,
> @@ -206,8 +207,6 @@ a file name for --relay-log-index option
> name_warning_sent= 1;
> }
>
> - rli->relay_log.is_relay_log= TRUE;
> -
> /*
> note, that if open() fails, we'll still have index file open
> but a destructor will take care of that
>
> === modified file 'sql/sql_class.h'
> --- a/sql/sql_class.h 2012-09-13 12:31:29 +0000
> +++ b/sql/sql_class.h 2012-09-14 12:44:53 +0000
> @@ -1244,7 +1244,8 @@ enum enum_thread_type
> SYSTEM_THREAD_SLAVE_SQL= 4,
> SYSTEM_THREAD_NDBCLUSTER_BINLOG= 8,
> SYSTEM_THREAD_EVENT_SCHEDULER= 16,
> - SYSTEM_THREAD_EVENT_WORKER= 32
> + SYSTEM_THREAD_EVENT_WORKER= 32,
> + SYSTEM_THREAD_BINLOG_BACKGROUND= 64
> };
>
> inline char const *
>
> === modified file 'storage/innobase/handler/ha_innodb.cc'
> --- a/storage/innobase/handler/ha_innodb.cc 2012-09-13 12:31:29 +0000
> +++ b/storage/innobase/handler/ha_innodb.cc 2012-09-14 12:44:53 +0000
> @@ -106,6 +106,7 @@ static ulong commit_threads = 0;
> static mysql_mutex_t commit_threads_m;
> static mysql_cond_t commit_cond;
> static mysql_mutex_t commit_cond_m;
> +static mysql_mutex_t pending_checkpoint_mutex;
> static bool innodb_inited = 0;
>
> #define INSIDE_HA_INNOBASE_CC
> @@ -222,11 +223,13 @@ static mysql_pfs_key_t innobase_share_mu
> static mysql_pfs_key_t commit_threads_m_key;
> static mysql_pfs_key_t commit_cond_mutex_key;
> static mysql_pfs_key_t commit_cond_key;
> +static mysql_pfs_key_t pending_checkpoint_mutex_key;
>
> static PSI_mutex_info all_pthread_mutexes[] = {
> {&commit_threads_m_key, "commit_threads_m", 0},
> {&commit_cond_mutex_key, "commit_cond_mutex", 0},
> - {&innobase_share_mutex_key, "innobase_share_mutex", 0}
> + {&innobase_share_mutex_key, "innobase_share_mutex", 0},
> + {&pending_checkpoint_mutex_key, "pending_checkpoint_mutex", 0}
> };
>
> static PSI_cond_info all_innodb_conds[] = {
> @@ -2592,6 +2595,9 @@ innobase_init(
> mysql_mutex_init(commit_cond_mutex_key,
> &commit_cond_m, MY_MUTEX_INIT_FAST);
> mysql_cond_init(commit_cond_key, &commit_cond, NULL);
> + mysql_mutex_init(pending_checkpoint_mutex_key,
> + &pending_checkpoint_mutex,
> + MY_MUTEX_INIT_FAST);
> innodb_inited= 1;
> #ifdef MYSQL_DYNAMIC_PLUGIN
> if (innobase_hton != p) {
> @@ -2639,6 +2645,7 @@ innobase_end(
> mysql_mutex_destroy(&commit_threads_m);
> mysql_mutex_destroy(&commit_cond_m);
> mysql_cond_destroy(&commit_cond);
> + mysql_mutex_destroy(&pending_checkpoint_mutex);
> }
>
> DBUG_RETURN(err);
> @@ -3008,6 +3015,16 @@ innobase_rollback_trx(
> DBUG_RETURN(convert_error_code_to_mysql(error, 0, NULL));
> }
>
> +
> +struct pending_checkpoint {
> + struct pending_checkpoint *next;
> + handlerton *hton;
> + void *cookie;
> + ib_uint64_t lsn;
> +};
> +static struct pending_checkpoint *pending_checkpoint_list;
> +static struct pending_checkpoint *pending_checkpoint_list_end;
> +
> /*****************************************************************//**
> Handle a commit checkpoint request from server layer.
> We simply flush the redo log immediately and do the notify call.*/
> @@ -3017,8 +3034,113 @@ innobase_checkpoint_request(
> handlerton *hton,
> void *cookie)
> {
> - log_buffer_flush_to_disk();
> - commit_checkpoint_notify_ha(hton, cookie);
> + ib_uint64_t lsn;
> + ib_uint64_t flush_lsn;
> + struct pending_checkpoint * entry;
> +
> + /* Do the allocation outside of lock to reduce contention. The normal
> + case is that not everything is flushed, so we will need to enqueue. */
> + entry = static_cast<struct pending_checkpoint *>
> + (my_malloc(sizeof(*entry), MYF(MY_WME)));
> + if (!entry) {
> + sql_print_error("Failed to allocate %u bytes."
> + " Commit checkpoint will be skipped.",
> + static_cast<unsigned>(sizeof(*entry)));
> + return;
> + }
> +
> + entry->next = NULL;
> + entry->hton = hton;
> + entry->cookie = cookie;
> +
> + mysql_mutex_lock(&pending_checkpoint_mutex);
> + lsn = log_get_lsn();
> + flush_lsn = log_get_flush_lsn();
> + if (lsn > flush_lsn) {
> + /* Put the request in queue.
> + When the log gets flushed past the lsn, we will remove the
> + entry from the queue and notify the upper layer. */
> + entry->lsn = lsn;
> + if (pending_checkpoint_list_end) {
> + pending_checkpoint_list_end->next = entry;
> + } else {
> + pending_checkpoint_list = entry;
> + }
> + pending_checkpoint_list_end = entry;
> + entry = NULL;
> + }
> + mysql_mutex_unlock(&pending_checkpoint_mutex);
> +
> + if (entry) {
> + /* We are already flushed. Notify the checkpoint immediately. */
> + commit_checkpoint_notify_ha(entry->hton, entry->cookie);
> + my_free(entry);
> + }
> +}
> +
> +/*****************************************************************//**
> +Log code calls this whenever log has been written and/or flushed up
> +to a new position. We use this to notify upper layer of a new commit
> +checkpoint when necessary.*/
> +extern "C" UNIV_INTERN
> +void
> +innobase_mysql_log_notify(
> +/*===============*/
> + ib_uint64_t write_lsn, /*!< in: LSN written to log file */
> + ib_uint64_t flush_lsn) /*!< in: LSN flushed to disk */
> +{
> + struct pending_checkpoint * pending;
> + struct pending_checkpoint * entry;
> + struct pending_checkpoint * last_ready;
> +
> + /* It is safe to do a quick check for NULL first without lock.
> + Even if we should race, we will at most skip one checkpoint and
> + take the next one, which is harmless. */
> + if (!pending_checkpoint_list)
> + return;
> +
> + mysql_mutex_lock(&pending_checkpoint_mutex);
> + pending = pending_checkpoint_list;
> + if (!pending)
> + {
> + mysql_mutex_unlock(&pending_checkpoint_mutex);
> + return;
> + }
> +
> + last_ready = NULL;
> + for (entry = pending; entry != NULL; entry = entry -> next)
> + {
> + if (entry->lsn > flush_lsn)
> + break;
> + last_ready = entry;
> + }
> +
> + if (last_ready)
> + {
> + /* We found some pending checkpoints that are now flushed to
> + disk. So remove them from the list. */
> + pending_checkpoint_list = entry;
> + if (!entry)
> + pending_checkpoint_list_end = NULL;
> + }
> +
> + mysql_mutex_unlock(&pending_checkpoint_mutex);
> +
> + if (!last_ready)
> + return;
> +
> + /* Now that we have released the lock, notify upper layer about all
> + commit checkpoints that have now completed. */
> + for (;;) {
> + entry = pending;
> + pending = pending->next;
> +
> + commit_checkpoint_notify_ha(entry->hton, entry->cookie);
> +
> + my_free(entry);
> + if (entry == last_ready)
> + break;
> + }
> }
>
> /*****************************************************************//**
>
> === modified file 'storage/innobase/include/ha_prototypes.h'
> --- a/storage/innobase/include/ha_prototypes.h 2011-04-26 17:55:52 +0000
> +++ b/storage/innobase/include/ha_prototypes.h 2012-09-14 12:44:53 +0000
> @@ -136,6 +136,17 @@ innobase_mysql_print_thd(
> uint max_query_len); /*!< in: max query length to print, or 0 to
> use the default max length */
>
> +/*****************************************************************//**
> +Log code calls this whenever log has been written and/or flushed up
> +to a new position. We use this to notify upper layer of a new commit
> +checkpoint when necessary.*/
> +UNIV_INTERN
> +void
> +innobase_mysql_log_notify(
> +/*===============*/
> + ib_uint64_t write_lsn, /*!< in: LSN written to log file */
> + ib_uint64_t flush_lsn); /*!< in: LSN flushed to disk */
> +
> /**************************************************************//**
> Converts a MySQL type to an InnoDB type. Note that this function returns
> the 'mtype' of InnoDB. InnoDB differentiates between MySQL's old <= 4.1
>
> === modified file 'storage/innobase/include/log0log.h'
> --- a/storage/innobase/include/log0log.h 2012-06-07 13:44:26 +0000
> +++ b/storage/innobase/include/log0log.h 2012-09-14 12:44:53 +0000
> @@ -151,6 +151,13 @@ UNIV_INLINE
> ib_uint64_t
> log_get_lsn(void);
> /*=============*/
> +/************************************************************//**
> +Gets the last lsn that is fully flushed to disk.
> +@return last flushed lsn */
> +UNIV_INLINE
> +ib_uint64_t
> +log_get_flush_lsn(void);
> +/*=============*/
> /****************************************************************
> Gets the log group capacity. It is OK to read the value without
> holding log_sys->mutex because it is constant.
>
> === modified file 'storage/innobase/include/log0log.ic'
> --- a/storage/innobase/include/log0log.ic 2011-04-05 07:18:43 +0000
> +++ b/storage/innobase/include/log0log.ic 2012-09-14 12:44:53 +0000
> @@ -411,6 +411,25 @@ log_get_lsn(void)
> return(lsn);
> }
>
> +/************************************************************//**
> +Gets the last lsn that is fully flushed to disk.
> +@return last flushed lsn */
> +UNIV_INLINE
> +ib_uint64_t
> +log_get_flush_lsn(void)
> +/*=============*/
> +{
> + ib_uint64_t lsn;
> +
> + mutex_enter(&(log_sys->mutex));
> +
> + lsn = log_sys->flushed_to_disk_lsn;
> +
> + mutex_exit(&(log_sys->mutex));
> +
> + return(lsn);
> +}
> +
> /****************************************************************
> Gets the log group capacity. It is OK to read the value without
> holding log_sys->mutex because it is constant.
>
> === modified file 'storage/innobase/log/log0log.c'
> --- a/storage/innobase/log/log0log.c 2012-03-21 03:48:12 +0000
> +++ b/storage/innobase/log/log0log.c 2012-09-14 12:44:53 +0000
> @@ -1353,6 +1353,8 @@ log_write_up_to(
> ulint loop_count = 0;
> #endif /* UNIV_DEBUG */
> ulint unlock;
> + ib_uint64_t write_lsn;
> + ib_uint64_t flush_lsn;
>
> if (recv_no_ibuf_operations) {
> /* Recovery is running and no operations on the log files are
> @@ -1530,8 +1532,13 @@ log_write_up_to(
>
> log_flush_do_unlocks(unlock);
>
> + write_lsn = log_sys->write_lsn;
> + flush_lsn = log_sys->flushed_to_disk_lsn;
> +
> mutex_exit(&(log_sys->mutex));
>
> + innobase_mysql_log_notify(write_lsn, flush_lsn);
> +
> return;
>
> do_waits:
>
> === modified file 'storage/xtradb/handler/ha_innodb.cc'
> --- a/storage/xtradb/handler/ha_innodb.cc 2012-09-13 12:31:29 +0000
> +++ b/storage/xtradb/handler/ha_innodb.cc 2012-09-14 12:44:53 +0000
> @@ -120,6 +120,7 @@ static ulong commit_threads = 0;
> static mysql_mutex_t commit_threads_m;
> static mysql_cond_t commit_cond;
> static mysql_mutex_t commit_cond_m;
> +static mysql_mutex_t pending_checkpoint_mutex;
> static bool innodb_inited = 0;
>
>
> @@ -253,11 +254,13 @@ static mysql_pfs_key_t innobase_share_mu
> static mysql_pfs_key_t commit_threads_m_key;
> static mysql_pfs_key_t commit_cond_mutex_key;
> static mysql_pfs_key_t commit_cond_key;
> +static mysql_pfs_key_t pending_checkpoint_mutex_key;
>
> static PSI_mutex_info all_pthread_mutexes[] = {
> {&commit_threads_m_key, "commit_threads_m", 0},
> {&commit_cond_mutex_key, "commit_cond_mutex", 0},
> - {&innobase_share_mutex_key, "innobase_share_mutex", 0}
> + {&innobase_share_mutex_key, "innobase_share_mutex", 0},
> + {&pending_checkpoint_mutex_key, "pending_checkpoint_mutex", 0}
> };
>
> static PSI_cond_info all_innodb_conds[] = {
> @@ -3060,6 +3063,9 @@ innobase_init(
> mysql_mutex_init(commit_cond_mutex_key,
> &commit_cond_m, MY_MUTEX_INIT_FAST);
> mysql_cond_init(commit_cond_key, &commit_cond, NULL);
> + mysql_mutex_init(pending_checkpoint_mutex_key,
> + &pending_checkpoint_mutex,
> + MY_MUTEX_INIT_FAST);
> innodb_inited= 1;
> #ifdef MYSQL_DYNAMIC_PLUGIN
> if (innobase_hton != p) {
> @@ -3107,6 +3113,7 @@ innobase_end(
> mysql_mutex_destroy(&commit_threads_m);
> mysql_mutex_destroy(&commit_cond_m);
> mysql_cond_destroy(&commit_cond);
> + mysql_mutex_destroy(&pending_checkpoint_mutex);
> }
>
> DBUG_RETURN(err);
> @@ -3500,6 +3507,16 @@ innobase_rollback_trx(
> DBUG_RETURN(convert_error_code_to_mysql(error, 0, NULL));
> }
>
> +
> +struct pending_checkpoint {
> + struct pending_checkpoint *next;
> + handlerton *hton;
> + void *cookie;
> + ib_uint64_t lsn;
> +};
> +static struct pending_checkpoint *pending_checkpoint_list;
> +static struct pending_checkpoint *pending_checkpoint_list_end;
> +
> /*****************************************************************//**
> Handle a commit checkpoint request from server layer.
> We simply flush the redo log immediately and do the notify call.*/
> @@ -3509,8 +3526,113 @@ innobase_checkpoint_request(
> handlerton *hton,
> void *cookie)
> {
> - log_buffer_flush_to_disk();
> - commit_checkpoint_notify_ha(hton, cookie);
> + ib_uint64_t lsn;
> + ib_uint64_t flush_lsn;
> + struct pending_checkpoint * entry;
> +
> + /* Do the allocation outside of lock to reduce contention. The normal
> + case is that not everything is flushed, so we will need to enqueue. */
> + entry = static_cast<struct pending_checkpoint *>
> + (my_malloc(sizeof(*entry), MYF(MY_WME)));
> + if (!entry) {
> + sql_print_error("Failed to allocate %u bytes."
> + " Commit checkpoint will be skipped.",
> + static_cast<unsigned>(sizeof(*entry)));
> + return;
> + }
> +
> + entry->next = NULL;
> + entry->hton = hton;
> + entry->cookie = cookie;
> +
> + mysql_mutex_lock(&pending_checkpoint_mutex);
> + lsn = log_get_lsn();
> + flush_lsn = log_get_flush_lsn();
> + if (lsn > flush_lsn) {
> + /* Put the request in queue.
> + When the log gets flushed past the lsn, we will remove the
> + entry from the queue and notify the upper layer. */
> + entry->lsn = lsn;
> + if (pending_checkpoint_list_end) {
> + pending_checkpoint_list_end->next = entry;
> + } else {
> + pending_checkpoint_list = entry;
> + }
> + pending_checkpoint_list_end = entry;
> + entry = NULL;
> + }
> + mysql_mutex_unlock(&pending_checkpoint_mutex);
> +
> + if (entry) {
> + /* We are already flushed. Notify the checkpoint immediately. */
> + commit_checkpoint_notify_ha(entry->hton, entry->cookie);
> + my_free(entry);
> + }
> +}
> +
> +/*****************************************************************//**
> +Log code calls this whenever log has been written and/or flushed up
> +to a new position. We use this to notify upper layer of a new commit
> +checkpoint when necessary.*/
> +extern "C" UNIV_INTERN
> +void
> +innobase_mysql_log_notify(
> +/*===============*/
> + ib_uint64_t write_lsn, /*!< in: LSN written to log file */
> + ib_uint64_t flush_lsn) /*!< in: LSN flushed to disk */
> +{
> + struct pending_checkpoint * pending;
> + struct pending_checkpoint * entry;
> + struct pending_checkpoint * last_ready;
> +
> + /* It is safe to do a quick check for NULL first without lock.
> + Even if we should race, we will at most skip one checkpoint and
> + take the next one, which is harmless. */
> + if (!pending_checkpoint_list)
> + return;
> +
> + mysql_mutex_lock(&pending_checkpoint_mutex);
> + pending = pending_checkpoint_list;
> + if (!pending)
> + {
> + mysql_mutex_unlock(&pending_checkpoint_mutex);
> + return;
> + }
> +
> + last_ready = NULL;
> + for (entry = pending; entry != NULL; entry = entry -> next)
> + {
> + if (entry->lsn > flush_lsn)
> + break;
> + last_ready = entry;
> + }
> +
> + if (last_ready)
> + {
> + /* We found some pending checkpoints that are now flushed to
> + disk. So remove them from the list. */
> + pending_checkpoint_list = entry;
> + if (!entry)
> + pending_checkpoint_list_end = NULL;
> + }
> +
> + mysql_mutex_unlock(&pending_checkpoint_mutex);
> +
> + if (!last_ready)
> + return;
> +
> + /* Now that we have released the lock, notify upper layer about all
> + commit checkpoints that have now completed. */
> + for (;;) {
> + entry = pending;
> + pending = pending->next;
> +
> + commit_checkpoint_notify_ha(entry->hton, entry->cookie);
> +
> + my_free(entry);
> + if (entry == last_ready)
> + break;
> + }
> }
>
> /*****************************************************************//**
>
> === modified file 'storage/xtradb/include/ha_prototypes.h'
> --- a/storage/xtradb/include/ha_prototypes.h 2012-02-21 19:51:56 +0000
> +++ b/storage/xtradb/include/ha_prototypes.h 2012-09-14 12:44:53 +0000
> @@ -136,6 +136,17 @@ innobase_mysql_print_thd(
> uint max_query_len); /*!< in: max query length to print, or 0 to
> use the default max length */
>
> +/*****************************************************************//**
> +Log code calls this whenever log has been written and/or flushed up
> +to a new position. We use this to notify upper layer of a new commit
> +checkpoint when necessary.*/
> +UNIV_INTERN
> +void
> +innobase_mysql_log_notify(
> +/*===============*/
> + ib_uint64_t write_lsn, /*!< in: LSN written to log file */
> + ib_uint64_t flush_lsn); /*!< in: LSN flushed to disk */
> +
> /**************************************************************//**
> Converts a MySQL type to an InnoDB type. Note that this function returns
> the 'mtype' of InnoDB. InnoDB differentiates between MySQL's old <= 4.1
>
> === modified file 'storage/xtradb/include/log0log.h'
> --- a/storage/xtradb/include/log0log.h 2012-08-27 16:13:17 +0000
> +++ b/storage/xtradb/include/log0log.h 2012-09-14 12:44:53 +0000
> @@ -151,6 +151,13 @@ UNIV_INLINE
> ib_uint64_t
> log_get_lsn(void);
> /*=============*/
> +/************************************************************//**
> +Gets the last lsn that is fully flushed to disk.
> +@return last flushed lsn */
> +UNIV_INLINE
> +ib_uint64_t
> +log_get_flush_lsn(void);
> +/*=============*/
> /****************************************************************
> Gets the log group capacity. It is OK to read the value without
> holding log_sys->mutex because it is constant.
>
> === modified file 'storage/xtradb/include/log0log.ic'
> --- a/storage/xtradb/include/log0log.ic 2011-07-14 19:22:41 +0000
> +++ b/storage/xtradb/include/log0log.ic 2012-09-14 12:44:53 +0000
> @@ -411,6 +411,25 @@ log_get_lsn(void)
> return(lsn);
> }
>
> +/************************************************************//**
> +Gets the last lsn that is fully flushed to disk.
> +@return last flushed lsn */
> +UNIV_INLINE
> +ib_uint64_t
> +log_get_flush_lsn(void)
> +/*=============*/
> +{
> + ib_uint64_t lsn;
> +
> + mutex_enter(&(log_sys->mutex));
> +
> + lsn = log_sys->flushed_to_disk_lsn;
> +
> + mutex_exit(&(log_sys->mutex));
> +
> + return(lsn);
> +}
> +
> /****************************************************************
> Gets the log group capacity. It is OK to read the value without
> holding log_sys->mutex because it is constant.
>
> === modified file 'storage/xtradb/log/log0log.c'
> --- a/storage/xtradb/log/log0log.c 2012-08-27 16:13:17 +0000
> +++ b/storage/xtradb/log/log0log.c 2012-09-14 12:44:53 +0000
> @@ -1390,6 +1390,8 @@ log_write_up_to(
> ulint loop_count = 0;
> #endif /* UNIV_DEBUG */
> ulint unlock;
> + ib_uint64_t write_lsn;
> + ib_uint64_t flush_lsn;
>
> if (recv_no_ibuf_operations) {
> /* Recovery is running and no operations on the log files are
> @@ -1568,8 +1570,13 @@ log_write_up_to(
>
> log_flush_do_unlocks(unlock);
>
> + write_lsn = log_sys->write_lsn;
> + flush_lsn = log_sys->flushed_to_disk_lsn;
> +
> mutex_exit(&(log_sys->mutex));
>
> + innobase_mysql_log_notify(write_lsn, flush_lsn);
> +
> return;
>
> do_waits:
>

[Maria-developers] review of the MDEV-377 Name support for dynamic columns
by Sergei Golubchik 14 Dec '12
Hi, Sanja,
See my review comments below:
> === modified file 'include/ma_dyncol.h'
> --- include/ma_dyncol.h 2011-09-22 09:04:00 +0000
> +++ include/ma_dyncol.h 2012-11-12 15:11:23 +0000
> @@ -39,6 +39,12 @@
> */
> #define MAX_DYNAMIC_COLUMN_LENGTH 0X1FFFFFFFL
>
> +/*
> + Limits of implementation
> +*/
> +#define MAX_NAME_LENGTH 255
If we store only offsets, this limit can go away
> +#define MAX_TOTAL_NAME_LENGTH 65535
I'd suggest storing the offset length in the header, just as
you do for data offsets. You might hard-code the fixed offset size of 2 for now,
but if it's in the header, you can change it later, if needed.
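A rough sketch of both ideas together (the entry layout here is hypothetical,
just to show the shape): if an entry stores only the name offset, the name
length is the distance to the next offset, so the per-name limit disappears:

    static uint name_length(DYN_HEADER *hdr, uint i)
    {
      /* hypothetical: a 2-byte name offset is the first field of entry i */
      uint off=  uint2korr(hdr->header + i * hdr->entry_size);
      uint next= (i + 1 < hdr->column_count)
                 ? uint2korr(hdr->header + (i + 1) * hdr->entry_size)
                 : hdr->nmpool_size;   /* the last name ends at pool end */
      return next - off;
    }

And if that 2-byte width is read from the header instead of being hard-coded,
the 65535 total can be lifted later without a format break.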
> +
> /* NO and OK is the same used just to show semantics */
> #define ER_DYNCOL_NO ER_DYNCOL_OK
>
> @@ -81,6 +88,7 @@ struct st_dynamic_column_value
> struct {
> LEX_STRING value;
> CHARSET_INFO *charset;
> + my_bool nonfreeable;
Yuck. That doesn't look correct. Dynamic column code does not allocate
or free values. The concept of "nonfreeable" values simply has
no place in the dynamic column universe.
I realize that ha_cassandra needs to know whether to free certain strings
or not. But that's an internal problem of ha_cassandra; it has nothing
to do with dynamic columns, besides the fact that those strings
are sometimes stored in the struct st_dynamic_column_value.
> } string;
> struct {
> decimal_digit_t buffer[DECIMAL_BUFF_LENGTH];
> @@ -103,6 +111,20 @@ dynamic_column_create_many(DYNAMIC_COLUM
> DYNAMIC_COLUMN_VALUE *values);
>
> enum enum_dyncol_func_result
> +dynamic_column_create_many_fmt(DYNAMIC_COLUMN *str,
> + uint column_count,
> + uchar *column_keys,
> + DYNAMIC_COLUMN_VALUE *values,
> + my_bool names);
I think this should rather be
dynamic_column_create_many_named(DYNAMIC_COLUMN *str,
uint column_count,
LEX_STRING *column_keys,
DYNAMIC_COLUMN_VALUE *values);
> +enum enum_dyncol_func_result
> +dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str,
> + uint column_count,
> + void *column_keys,
> + DYNAMIC_COLUMN_VALUE *values,
> + my_bool new_str,
> + my_bool string_keys);
*internal* should not be in include/, it's not part of the API
> +
> +enum enum_dyncol_func_result
> dynamic_column_update(DYNAMIC_COLUMN *org, uint column_nr,
> DYNAMIC_COLUMN_VALUE *value);
> enum enum_dyncol_func_result
> @@ -110,16 +132,30 @@ dynamic_column_update_many(DYNAMIC_COLUM
> uint add_column_count,
> uint *column_numbers,
> DYNAMIC_COLUMN_VALUE *values);
> +enum enum_dyncol_func_result
> +dynamic_column_update_many_fmt(DYNAMIC_COLUMN *str,
> + uint add_column_count,
> + void *column_keys,
> + DYNAMIC_COLUMN_VALUE *values,
> + my_bool string_keys);
same as above, make it dynamic_column_update_many_named
>
> enum enum_dyncol_func_result
> dynamic_column_delete(DYNAMIC_COLUMN *org, uint column_nr);
>
> enum enum_dyncol_func_result
> dynamic_column_exists(DYNAMIC_COLUMN *org, uint column_nr);
> +enum enum_dyncol_func_result
> +dynamic_column_exists_str(DYNAMIC_COLUMN *str, LEX_STRING *name);
> +enum enum_dyncol_func_result
> +dynamic_column_exists_fmt(DYNAMIC_COLUMN *str, void *key, my_bool string_keys);
remove the second one, and rename the first s/_str/_named/
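That is, the only name-based prototype to keep would be:

    enum enum_dyncol_func_result
    dynamic_column_exists_named(DYNAMIC_COLUMN *str, LEX_STRING *name);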
>
> /* List of not NULL columns */
> enum enum_dyncol_func_result
> dynamic_column_list(DYNAMIC_COLUMN *org, DYNAMIC_ARRAY *array_of_uint);
> +enum enum_dyncol_func_result
> +dynamic_column_list_str(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array_of_lexstr);
> +enum enum_dyncol_func_result
> +dynamic_column_list_fmt(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array, my_bool string_keys);
Same.
>
> /*
> if the column do not exists it is NULL
> @@ -127,10 +163,36 @@ dynamic_column_list(DYNAMIC_COLUMN *org,
> enum enum_dyncol_func_result
> dynamic_column_get(DYNAMIC_COLUMN *org, uint column_nr,
> DYNAMIC_COLUMN_VALUE *store_it_here);
> +enum enum_dyncol_func_result
> +dynamic_column_get_str(DYNAMIC_COLUMN *str, LEX_STRING *name,
> + DYNAMIC_COLUMN_VALUE *store_it_here);
s/_str/_named
> +
> +my_bool dynamic_column_has_names(DYNAMIC_COLUMN *str);
> +
> +enum enum_dyncol_func_result
> +dynamic_column_check(DYNAMIC_COLUMN *str);
> +
> +enum enum_dyncol_func_result
> +dynamic_column_json(DYNAMIC_COLUMN *str, DYNAMIC_STRING *json);
>
> #define dynamic_column_initialize(A) memset((A), 0, sizeof(*(A)))
> #define dynamic_column_column_free(V) dynstr_free(V)
>
> +/* conversion of values to 3 base types */
> +enum enum_dyncol_func_result
> +dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
> + CHARSET_INFO *cs, my_bool quote);
Instead of my_bool quote, better pass a char quote_char:
when it's 0 - no quoting; otherwise it's the quote character, " or `.
To make it less error-prone, you can create an enum:
NOT_QUOTED=0, QUOTED='`', ANSI_QUOTED='"'
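Spelled out (a sketch using exactly the values above):

    enum enum_dyncol_quote
    {
      NOT_QUOTED=  0,
      QUOTED=      '`',
      ANSI_QUOTED= '"'
    };

    enum enum_dyncol_func_result
    dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
                           CHARSET_INFO *cs, enum enum_dyncol_quote quote);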
> +enum enum_dyncol_func_result
> +dynamic_column_val_long(longlong *ll, DYNAMIC_COLUMN_VALUE *val);
> +enum enum_dyncol_func_result
> +dynamic_column_val_double(double *dbl, DYNAMIC_COLUMN_VALUE *val);
> +
> +
> +enum enum_dyncol_func_result
> +dynamic_column_vals(DYNAMIC_COLUMN *str,
> + DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals,
> + char **free_names);
Eh. Maybe dynamic_column_unpack() ?
> +
> /***************************************************************************
> Internal functions, don't use if you don't know what you are doing...
> ***************************************************************************/
>
> === modified file 'mysql-test/t/dyncol.test'
> --- mysql-test/t/dyncol.test 2012-02-29 20:55:04 +0000
> +++ mysql-test/t/dyncol.test 2012-11-12 15:11:23 +0000
> @@ -550,3 +550,125 @@ select hex(COLUMN_CREATE(0, COLUMN_GET(@
>
> select hex(COLUMN_CREATE(0, COLUMN_GET(COLUMN_CREATE(0, 0.0 as decimal), 0 as decimal)));
> select hex(COLUMN_CREATE(0, 0.0 as decimal));
> +
> +--echo #
> +--echo # test of symbolic names
> +--echo #
> +--echo # creation test (names)
> +set names utf8;
> +select hex(column_create("адын", 1212));
> +select hex(column_create("1212", 1212));
> +select hex(column_create(1212, 2, "www", 3));
> +select hex(column_create("1212", 2, "www", 3));
> +select hex(column_create("1212", 2, 3, 3));
> +select hex(column_create("1212", 2, "адын", 1, 3, 3));
To test quoting, add a column with a ` (backtick) in the name,
and a few other strange names - with a single quote, double quote, backslash,
space, dot, comma, whatever.
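For example (hypothetical test lines; mind the SQL escaping):

    select hex(column_create("back`tick", 1, "with space", 2));
    select hex(column_create("single'quote", 3, "double\"quote", 4));
    select hex(column_create("back\\slash", 5, "dot.and,comma", 6));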
> +set names default;
...
> === modified file 'sql/sql_yacc.yy'
> --- sql/sql_yacc.yy 2012-06-13 23:28:47 +0000
> +++ sql/sql_yacc.yy 2012-11-12 15:11:23 +0000
> @@ -8799,6 +8801,13 @@ function_call_nonkeyword:
> MYSQL_YYABORT;
> }
> |
> + COLUMN_CHECK_SYM '(' expr ')'
> + {
> + $$= new (YYTHD->mem_root) Item_func_dyncol_check($3);
> + if ($$ == NULL)
> + MYSQL_YYABORT;
> + }
> + |
This should be removed completely. See, for example, how EXP function
is created (hint: in item_create.cc) and do the same
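A sketch of the item_create.cc route (class shape and func_array entry written
from memory of that file's conventions, so check against the real ones):

    class Create_func_dyncol_check : public Create_func_arg1
    {
    public:
      virtual Item *create(THD *thd, Item *arg1)
      {
        return new (thd->mem_root) Item_func_dyncol_check(arg1);
      }
      static Create_func_dyncol_check s_singleton;
    protected:
      Create_func_dyncol_check() {}
      virtual ~Create_func_dyncol_check() {}
    };
    Create_func_dyncol_check Create_func_dyncol_check::s_singleton;

    /* ... and in func_array: */
    { { C_STRING_WITH_LEN("COLUMN_CHECK") }, BUILDER(Create_func_dyncol_check)},

Then COLUMN_CHECK_SYM doesn't need to be a token in the grammar at all.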
> COLUMN_EXISTS_SYM '(' expr ',' expr ')'
> {
> $$= new (YYTHD->mem_root) Item_func_dyncol_exists($3, $5);
> @@ -8820,6 +8829,13 @@ function_call_nonkeyword:
> MYSQL_YYABORT;
> }
> |
> + COLUMN_JSON_SYM '(' expr ')'
> + {
> + $$= create_func_dyncol_json(YYTHD, $3);
> + if ($$ == NULL)
> + MYSQL_YYABORT;
> + }
> + |
same. please, remove the dedicated rule for this and most of other
COLUMN_xxx functions.
> COLUMN_GET_SYM '(' expr ',' expr AS cast_type ')'
> {
> LEX *lex= Lex;
>
> === modified file 'mysys/ma_dyncol.c'
> --- mysys/ma_dyncol.c 2011-11-22 17:04:38 +0000
> +++ mysys/ma_dyncol.c 2012-11-12 15:11:23 +0000
> @@ -37,19 +41,42 @@
> */
> /* mask to get above bits */
> #define DYNCOL_FLG_OFFSET 3
> +#define DYNCOL_FLG_NAMES 4
> /* All known flags mask */
> -#define DYNCOL_FLG_KNOWN 3
> +#define DYNCOL_FLG_KNOWN 7
> +
> +/* formats */
> +#define DYNCOL_FMT_NUM 0
> +#define DYNCOL_FMT_STR 1
why not an enum?
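e.g. a two-value enum would also let the compiler catch a switch that misses
a format:

    enum dyncol_format {DYNCOL_FMT_NUM= 0, DYNCOL_FMT_STR= 1};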
>
> /* dynamic column size reserve */
> #define DYNCOL_SYZERESERVE 80
>
> +#define DYNCOL_OFFSET_ERROR 0xffffffff
> +
> /* length of fixed string header 1 byte - flags, 2 bytes - columns counter */
> #define FIXED_HEADER_SIZE 3
> +/*
> + length of fixed string header with names
> + 1 byte - flags, 2 bytes - columns counter, 2 bytes - name pool size
> +*/
> +#define FIXED_HEADER_SIZE_NM 5
>
> #define COLUMN_NUMBER_SIZE 2
> +/* 1 byte name length + 2 bytes offset from the name pool */
> +#define COLUMN_NAMEPTR_SIZE 3
>
> #define MAX_OFFSET_LENGTH 5
>
> +#define DYNCOL_NUM_CHAR 6
> +
> +my_bool dynamic_column_has_names(DYNAMIC_COLUMN *str)
> +{
> + if (str->length < 1)
> + return FALSE;
> + return test(str->str[0] & DYNCOL_FLG_NAMES);
> +}
> +
> static enum enum_dyncol_func_result
> dynamic_column_time_store(DYNAMIC_COLUMN *str,
> MYSQL_TIME *value);
> @@ -62,6 +89,311 @@ dynamic_column_time_read_internal(DYNAMI
> static enum enum_dyncol_func_result
> dynamic_column_date_read_internal(DYNAMIC_COLUMN_VALUE *store_it_here,
> uchar *data, size_t length);
> +static enum enum_dyncol_func_result
> +dynamic_column_get_internal(DYNAMIC_COLUMN *str,
> + DYNAMIC_COLUMN_VALUE *store_it_here,
> + uint num_key, LEX_STRING *str_key);
> +static enum enum_dyncol_func_result
> +dynamic_column_exists_internal(DYNAMIC_COLUMN *str, uint num_key,
> + LEX_STRING *str_key);
> +enum enum_dyncol_func_result
> +dynamic_column_update_many_fmt(DYNAMIC_COLUMN *str,
> + uint add_column_count,
> + void *column_keys,
> + DYNAMIC_COLUMN_VALUE *values,
> + my_bool string_keys);
> +static int plan_sort_num(const void *a, const void *b);
> +static int plan_sort_str(const void *a, const void *b);
> +
> +/*
> + Structure to hold information about dynamic columns record and
> + iterate through it.
> +*/
> +
> +struct st_dyn_header
> +{
> + uchar *header, *nmpool, *dtpool, *data_end;
> + size_t offset_size;
> + size_t entry_size;
> + size_t header_size;
> + size_t nmpool_size;
> + size_t data_size;
> + /* DYNCOL_FMT_NUM - numeric columns, DYNCOL_FMT_STR - column names */
> + uint format;
> + uint column_count;
> +
> + uchar *entry, *data, *name;
> + size_t offset;
> + uint length;
> + enum enum_dynamic_column_type type;
> +};
> +
> +typedef struct st_dyn_header DYN_HEADER;
> +
> +static inline my_bool read_fixed_header(DYN_HEADER *hdr,
> + DYNAMIC_COLUMN *str);
> +static void set_fixed_header(DYNAMIC_COLUMN *str,
> + uint offset_size,
> + uint column_count);
> +static my_bool type_and_offset_store(uchar *place, size_t offset_size,
> + DYNAMIC_COLUMN_TYPE type,
> + size_t offset);
> +
> +/*
> + Calculate entry size (E) and header size (H) by offset size (O) and column
> + count (C) and fixed part of entry size (F).
> +*/
> +
> +#define calc_param(E,H,F,O,C) do { \
> + (*(E))= (O) + F; \
> + (*(H))= (*(E)) * (C); \
> +}while(0);
> +
> +
> +/**
> + Name pool size functions, for numeric format it is 0
> +*/
> +
> +size_t name_size_num(void *keys __attribute__((unused)),
> + uint i __attribute__((unused)))
> +{
> + return 0;
> +}
> +
> +
> +/**
> + Name pool size functions.
> +*/
> +size_t name_size_str(void *keys, uint i)
> +{
> + return ((LEX_STRING *) keys)[i].length;
> +}
> +
> +
> +/**
> + Comparator function for references on column numbers for qsort
> + (numeric format)
> +*/
> +
> +static int column_sort_num(const void *a, const void *b)
> +{
> + return **((uint **)a) - **((uint **)b);
> +}
> +
> +
> +/**
> + Comparator function for references on column numbers for qsort
> + (names format)
> +*/
> +
> +static int column_sort_str(const void *a, const void *b)
> +{
> + LEX_STRING *s1= *((LEX_STRING **)a);
> + LEX_STRING *s2= *((LEX_STRING **)b);
> + int rc= s1->length - s2->length;
> + if (rc == 0)
> + rc= memcmp((void *)s1->str, (void *)s2->str, (size_t) s1->length);
> + return rc;
> +}
> +
> +
> +/**
> + Check limit function (numeric format)
> +*/
> +
> +static my_bool check_limit_num(const void *val)
> +{
> + return **((uint **)val) > UINT_MAX16;
> +}
> +
> +
> +/**
> + Check limit function (names format)
> +*/
> +
> +static my_bool check_limit_str(const void *val)
> +{
> + return (*((LEX_STRING **)val))->length > MAX_NAME_LENGTH;
> +}
> +
> +
> +/**
> + Write numeric format static header part.
> +*/
> +
> +void set_fixed_header_num(DYNAMIC_COLUMN *str, DYN_HEADER *hdr)
> +{
> + set_fixed_header(str, hdr->offset_size, hdr->column_count);
> + hdr->header= (uchar *)str->str + FIXED_HEADER_SIZE;
> + hdr->nmpool= hdr->dtpool= hdr->header + hdr->header_size;
> +}
> +
> +
> +/**
> + Write names format static header part.
> +*/
> +
> +void set_fixed_header_str(DYNAMIC_COLUMN *str, DYN_HEADER *hdr)
> +{
> + set_fixed_header(str, hdr->offset_size, hdr->column_count);
There's no need to write the column count there; you save two bytes per record.
The column count can be derived as (offset of the first name - size of the fixed header) / (size of the fixed per-column header).
> + str->str[0]|= DYNCOL_FLG_NAMES;
> + int2store(str->str + 3, hdr->nmpool_size);
> + hdr->header= (uchar *)str->str + FIXED_HEADER_SIZE_NM;
> + hdr->nmpool= hdr->header + hdr->header_size;
> + hdr->dtpool= hdr->nmpool + hdr->nmpool_size;
> +}
> +
> +
> +/**
> + Write numeric format header entry
> + 2 bytes - column number
> + 1-4 bytes - data offset combined with type
> +
> + @param hdr descriptor of dynamic column record
> + @param column_key pointer to uint (column number)
> + @param value value which will be written (only type used)
> + @param offset offset of the data
> +*/
> +
> +my_bool put_header_entry_num(DYN_HEADER *hdr,
this and many other functions should be static
> + void *column_key,
> + DYNAMIC_COLUMN_VALUE *value,
> + size_t offset)
> +{
> + uint *column_number= (uint *)column_key;
> + int2store(hdr->entry, *column_number);
> + DBUG_ASSERT(hdr->nmpool_size == 0);
> + if (type_and_offset_store(hdr->entry, hdr->offset_size,
> + value->type,
> + offset))
> + return TRUE;
> + hdr->entry= hdr->entry + hdr->entry_size;
> + return FALSE;
> +}
> +
> +
> +/**
> + Write names format header entry
> + 1 byte - name length
> + 2 bytes - name offset in the name pool
as discussed on the irc, please store either lengths or offsets, but not both
> + 1-4 bytes - data offset combined with type
> +
> + @param hdr descriptor of dynamic column record
> + @param column_key pointer to LEX_STRING (column name)
> + @param value value which will be written (only type used)
> + @param offset offset of the data
> +*/
> +
> +my_bool put_header_entry_str(DYN_HEADER *hdr,
> + void *column_key,
> + DYNAMIC_COLUMN_VALUE *value,
> + size_t offset)
> +{
> + LEX_STRING *column_name= (LEX_STRING *)column_key;
> + DBUG_ASSERT(column_name->length <= MAX_NAME_LENGTH);
> + hdr->entry[0]= column_name->length;
> + DBUG_ASSERT(hdr->name - hdr->nmpool < (long) 0x10000L);
> + int2store(hdr->entry + 1, hdr->name - hdr->nmpool);
> + memcpy(hdr->name, column_name->str, column_name->length);
> + DBUG_ASSERT(hdr->nmpool_size != 0 || column_name->length == 0);
> + if (type_and_offset_store(hdr->entry + 1, hdr->offset_size,
> + value->type,
> + offset))
> + return TRUE;
> + hdr->entry+= hdr->entry_size;
> + hdr->name+= column_name->length;
> + return FALSE;
> +}
> +
> +
> +/**
> + Format descriptor, contain constants and function references for
> + format processing
> +*/
> +
> +struct st_service_funcs
> +{
> + /* size of fixed header */
> + uint fixed_hdr;
> + /* size of fixed part of header entry */
> + uint fixed_hdr_entry;
> +
> + /*size of array element which stores keys */
> + uint key_size_in_array;
> +
> + size_t (*name_size)
> + (void *, uint);
> + int (*column_sort)
> + (const void *a, const void *b);
> + my_bool (*check_limit)
> + (const void *val);
> + void (*set_fixed_hdr)
> + (DYNAMIC_COLUMN *str, DYN_HEADER *hdr);
> + my_bool (*put_header_entry)(DYN_HEADER *hdr,
> + void *column_key,
> + DYNAMIC_COLUMN_VALUE *value,
> + size_t offset);
> + int (*plan_sort)(const void *a, const void *b);
> +};
> +
> +
> +/**
> + Actual our 2 format descriptors
> +*/
> +
> +static struct st_service_funcs fmt_data[2]=
> +{
> + {
> + FIXED_HEADER_SIZE,
> + COLUMN_NUMBER_SIZE,
> + sizeof(uint),
> + &name_size_num,
> + &column_sort_num,
> + &check_limit_num,
> + &set_fixed_header_num,
> + &put_header_entry_num,
> + &plan_sort_num
> + },
> + {
> + FIXED_HEADER_SIZE_NM,
> + COLUMN_NAMEPTR_SIZE,
> + sizeof(LEX_STRING),
> + &name_size_str,
> + &column_sort_str,
> + &check_limit_str,
> + &set_fixed_header_str,
> + &put_header_entry_str,
> + &plan_sort_str
> + }
> +};
> +
> +
> +/**
> + Read dynamic column record header and fill the descriptor
> +
> + @param hdr dynamic columns record descriptor to fill
> + @param str dynamic columns record
> +
> + @return ER_DYNCOL_* return code
> +*/
> +
> +enum enum_dyncol_func_result
> +init_read_hdr(DYN_HEADER *hdr, DYNAMIC_COLUMN *str)
> +{
> + if (read_fixed_header(hdr, str))
> + return ER_DYNCOL_FORMAT;
> + hdr->header= (uchar*)str->str + fmt_data[hdr->format].fixed_hdr;
> + calc_param(&hdr->entry_size, &hdr->header_size,
> + fmt_data[hdr->format].fixed_hdr_entry, hdr->offset_size,
> + hdr->column_count);
> + hdr->nmpool= hdr->header + hdr->header_size;
> + hdr->dtpool= hdr->nmpool + hdr->nmpool_size;
> + hdr->data_size= str->length - fmt_data[hdr->format].fixed_hdr -
> + hdr->header_size - hdr->nmpool_size;
> + hdr->data_end= (uchar*)str->str + str->length;
> + return ER_DYNCOL_OK;
> +}
> +
>
> /**
> Initialize dynamic column string with (make it empty but correct format)
> @@ -1640,83 +2132,763 @@ dynamic_column_list(DYNAMIC_COLUMN *str,
>
>
> /**
> + List not-null columns in the packed string (any format)
> +
> + @param str The packed string
> + @param array_of_lexstr Where to put reference on created array
> +
> + @return ER_DYNCOL_* return code
> +*/
> +
> +enum enum_dyncol_func_result
> +dynamic_column_list_str(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array_of_lexstr)
> +{
> + DYN_HEADER header;
> + uchar *read;
> + struct st_service_funcs *fmt;
> + uint i;
> + enum enum_dyncol_func_result rc;
> +
> + bzero(array_of_lexstr, sizeof(*array_of_lexstr)); /* In case of errors */
> + if (str->length == 0)
> + return ER_DYNCOL_OK; /* no columns */
> +
> + if ((rc= init_read_hdr(&header, str)) < 0)
> + return rc;
> +
> + fmt= fmt_data + header.format;
> +
> + if (header.entry_size * header.column_count + fmt->fixed_hdr >
> + str->length)
> + return ER_DYNCOL_FORMAT;
> +
> + if (init_dynamic_array(array_of_lexstr, sizeof(LEX_STRING),
> + header.column_count, 0))
> + return ER_DYNCOL_RESOURCE;
> +
> + for (i= 0, read= header.header;
> + i < header.column_count;
> + i++, read+= header.entry_size)
> + {
> + LEX_STRING tmp;
> + if (header.format == DYNCOL_FMT_NUM)
> + {
> + uint nm= uint2korr(read);
> + tmp.str= my_malloc(DYNCOL_NUM_CHAR, MYF(0));
ouch. do you really need to do header.column_count small mallocs here?
> + if (!tmp.str)
> + return ER_DYNCOL_RESOURCE;
> + tmp.length= snprintf(tmp.str, DYNCOL_NUM_CHAR, "%u", nm);
longlong2str or longlong10_to_str instead of snprintf
> + }
> + else
> + {
> + tmp.length= read[0];
> + tmp.str= my_malloc(tmp.length + 1, MYF(0));
ouch. do you really need to do header.column_count small mallocs here?
> + if(!tmp.str)
> + return ER_DYNCOL_RESOURCE;
> + memcpy(tmp.str, (const void *)header.nmpool + uint2korr(read + 1),
> + tmp.length);
> + tmp.str[tmp.length]= '\0'; // just for safety
> + }
> + /* Insert can't never fail as it's pre-allocated above */
> + (void) insert_dynamic(array_of_lexstr, (uchar *)&tmp);
> + }
> + return ER_DYNCOL_OK;
> +}
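To illustrate the two mallocs flagged above - one possible shape, assuming the
callers can free a single pool (the patch's own dynamic_column_vals() with its
free_names out-parameter already follows that pattern); error handling elided:

    /* pass 1: worst-case size of all names (numbers print in at most
       DYNCOL_NUM_CHAR chars, string names are read[0] bytes long) */
    size_t pool_size= 0;
    for (i= 0, read= header.header; i < header.column_count;
         i++, read+= header.entry_size)
      pool_size+= (header.format == DYNCOL_FMT_NUM ?
                   DYNCOL_NUM_CHAR : read[0]) + 1;
    char *pool= my_malloc(pool_size, MYF(0));
    /* pass 2: print/copy each name into 'pool' (longlong10_to_str for
       the numeric format), point every LEX_STRING into it, and hand
       'pool' back so the caller frees everything in one go */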
> +
> +/**
> Find the place of the column in the header or place where it should be put
>
> - @param num Number of the column
> - @param header Pointer to the header
> - @param entry_size Size of a header entry
> - @param column_count Number of columns in the packed string
> - @param entry Return pointer to the entry or next entry
> + @param hdr descriptor of dynamic column record
> + @param key Name or number of column to fetch
> + (depends on string_key)
> + @param string_key True if we gave pointer to LEX_STRING.
>
> @retval TRUE found
> @retval FALSE pointer set to the next row
> */
>
> static my_bool
> -find_place(uint num, uchar *header, size_t entry_size,
> - uint column_count, uchar **entry)
> +find_place(DYN_HEADER *hdr, void *key, my_bool string_keys)
> {
> uint mid, start, end, val;
> int flag;
> + LEX_STRING str;
> + char buff[DYNCOL_NUM_CHAR];
> + my_bool need_conversion= ((string_keys ? DYNCOL_FMT_STR : DYNCOL_FMT_NUM) !=
> + hdr->format);
> LINT_INIT(flag); /* 100 % safe */
UNINIT_VAR is preferable, when possible (but it doesn't work with structures)
> + /* new format can't be numeric if the old one is names */
> + DBUG_ASSERT(string_keys ||
> + hdr->format == DYNCOL_FMT_NUM);
>
> start= 0;
> - end= column_count -1;
> + end= hdr->column_count -1;
> mid= 1;
> while (start != end)
> {
> - uint val;
> - mid= (start + end) / 2;
> - val= uint2korr(header + mid * entry_size);
> - if ((flag= CMP_NUM(num, val)) <= 0)
> - end= mid;
> - else
> - start= mid + 1;
> + uint val;
> + mid= (start + end) / 2;
> + hdr->entry= hdr->header + mid * hdr->entry_size;
> + if (!string_keys)
> + {
> + val= uint2korr(hdr->entry);
> + flag= CMP_NUM(*((uint *)key), val);
> + }
> + else
> + {
> + if (need_conversion)
> + {
> + str.str= backwritenum(buff + sizeof(buff), uint2korr(hdr->entry));
> + str.length= (buff + sizeof(buff)) - str.str;
> + }
> + else
> + {
> + DBUG_ASSERT(hdr->format == DYNCOL_FMT_STR);
> + str.length= hdr->entry[0];
> + str.str= (char *)hdr->nmpool + uint2korr(hdr->entry + 1);
> + }
> + flag= ((LEX_STRING *) key)->length - str.length;
> + if (flag == 0)
> + flag= memcmp(((LEX_STRING *) key)->str, str.str, str.length);
> + }
> + if (flag <= 0)
> + end= mid;
> + else
> + start= mid + 1;
> }
> + hdr->entry= hdr->header + start * hdr->entry_size;
> if (start != mid)
> {
> - val= uint2korr(header + start * entry_size);
> - flag= CMP_NUM(num, val);
> + if (!string_keys)
> + {
> + val= uint2korr(hdr->entry);
> + flag= CMP_NUM(*((uint *)key), val);
> + }
> + else
> + {
> + if (need_conversion)
> + {
> + str.str= backwritenum(buff + sizeof(buff), uint2korr(hdr->entry));
> + str.length= (buff + sizeof(buff)) - str.str;
> + }
> + else
> + {
> + DBUG_ASSERT(hdr->format == DYNCOL_FMT_STR);
> + str.length= hdr->entry[0];
> + str.str= (char*) hdr->nmpool + uint2korr(hdr->entry + 1);
> + }
> + flag= ((LEX_STRING *) key)->length - str.length;
> + if (flag == 0)
> + flag= memcmp(((LEX_STRING *) key)->str, str.str, str.length);
> + }
> }
> - *entry= header + start * entry_size;
> if (flag > 0)
> - *entry+= entry_size; /* Point at next bigger key */
> + hdr->entry+= hdr->entry_size; /* Point at next bigger key */
> return flag == 0;
> }
>
>
> /*
> - Description of plan of adding/removing/updating a packed string
> + It is internal structure which describes plan of chenging the record
s/chenging/changing/
What do you mean by "a plan"?
> + of dynamic columns
> */
>
> typedef enum {PLAN_REPLACE, PLAN_ADD, PLAN_DELETE, PLAN_NOP} PLAN_ACT;
>
> struct st_plan {
> DYNAMIC_COLUMN_VALUE *val;
> - uint *num;
> + void *key;
> uchar *place;
> size_t length;
> - int hdelta, ddelta;
> + int hdelta, ddelta, ndelta;
> + uint mv_offset, mv_length, mv_end;
> PLAN_ACT act;
> };
> typedef struct st_plan PLAN;
>
>
> -static int plan_sort(const void *a, const void *b)
> +/**
> + Sort function for plan by column number
> +*/
> +
> +static int plan_sort_num(const void *a, const void *b)
> {
> - return ((PLAN *)a)->num[0] - ((PLAN *)b)->num[0];
> + return *((uint *)((PLAN *)a)->key) - *((uint *)((PLAN *)b)->key);
> +}
> +
> +
> +/**
> + Sort function for plan by column name
> +*/
> +
> +static int plan_sort_str(const void *a, const void *b)
> +{
> + int res= (((LEX_STRING *)((PLAN *)a)->key)->length -
> + ((LEX_STRING *)((PLAN *)b)->key)->length);
> + if (res == 0)
> + res= memcmp(((LEX_STRING *)((PLAN *)a)->key)->str,
> + ((LEX_STRING *)((PLAN *)b)->key)->str,
> + ((LEX_STRING *)((PLAN *)a)->key)->length);
> + return res;
> +}
> +
> +#define DELTA_CHECK(S, D, C) \
> + if ((S) == 0) \
> + (S)= (D); \
> + else if (((S) > 0 && (D) < 0) || \
> + ((S) < 0 && (D) > 0)) \
> + { \
> + (C)= TRUE; \
> + }
> +
> +/**
> + Update dynamic column by copying in a new record (string).
> +
> + @param str Dynamic column record to change
> + @param plan Plan of changing the record
> + @param add_column_count number of records in the plan array.
> + @param hdr descriptor of old dynamic column record
> + @param new_hdr descriptor of new dynamic column record
> + @param convert need conversion from numeric to names format
> +
> + @return ER_DYNCOL_* return code
> +*/
> +
> +enum enum_dyncol_func_result
> +dynamic_column_update_copy(DYNAMIC_COLUMN *str, PLAN *plan,
> + uint add_column_count,
> + DYN_HEADER *hdr, DYN_HEADER *new_hdr,
> + my_bool convert)
> +{
> + DYNAMIC_COLUMN tmp;
> + struct st_service_funcs *fmt= fmt_data + hdr->format,
> + *new_fmt= fmt_data + new_hdr->format;
> + uint i, j, k;
> + size_t all_headers_size;
> +
> + if (dynamic_column_init_str(&tmp,
> + (new_fmt->fixed_hdr + new_hdr->header_size +
> + new_hdr->nmpool_size +
> + new_hdr->data_size + DYNCOL_SYZERESERVE)))
> + {
> + return ER_DYNCOL_RESOURCE;
> + }
> + bzero(tmp.str, new_fmt->fixed_hdr);
> + (*new_fmt->set_fixed_hdr)(&tmp, new_hdr);
> + /* Adjust tmp to contain whole the future header */
> + tmp.length= new_fmt->fixed_hdr + new_hdr->header_size + new_hdr->nmpool_size;
> +
> +
> + /*
> + Copy data to the new string
> + i= index in array of changes
> + j= index in packed string header index
> + */
> + new_hdr->entry= new_hdr->header;
> + new_hdr->name= new_hdr->nmpool;
> + all_headers_size= new_fmt->fixed_hdr +
> + new_hdr->header_size + new_hdr->nmpool_size;
> + for (i= 0, j= 0; i < add_column_count || j < hdr->column_count; i++)
> + {
> + size_t first_offset;
> + uint start= j, end;
> + LINT_INIT(first_offset);
> +
> + /*
> + Search in i and j for the next column to add from i and where to
> + add.
> + */
> +
> + while (i < add_column_count && plan[i].act == PLAN_NOP)
> + i++; /* skip NOP */
> +
> + if (i == add_column_count)
> + j= end= hdr->column_count;
> + else
> + {
> + /*
> + old data portion. We don't need to check that j < column_count
> + as plan[i].place is guaranteed to have a pointer inside the
> + data.
> + */
> + while (hdr->header + j * hdr->entry_size < plan[i].place)
> + j++;
> + end= j;
> + if ((plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
> + j++; /* data at 'j' will be removed */
> + }
> +
> + /*
> + Adjust all headers since last loop.
> + We have to do this as the offset for data has moved
> + */
> + for (k= start; k < end; k++)
> + {
> + uchar *read= hdr->header + k * hdr->entry_size;
> + void *key;
> + LEX_STRING name;
> + size_t offs;
> + uint nm;
> + DYNAMIC_COLUMN_TYPE tp;
> + char buff[DYNCOL_NUM_CHAR];
> +
> + if (hdr->format == DYNCOL_FMT_NUM)
> + {
> + if (convert)
> + {
> + name.str= backwritenum(buff + sizeof(buff), uint2korr(read));
> + name.length= (buff + sizeof(buff)) - name.str;
> + key= &name;
> + }
> + else
> + {
> + nm= uint2korr(read); /* Column nummber */
> + key= &nm;
> + }
> + }
> + else
> + {
> + name.length= read[0];
> + name.str= (char *) hdr->nmpool + uint2korr(read + 1);
> + key= &name;
> + }
> + if (type_and_offset_read(&tp, &offs,
> + read + fmt->fixed_hdr_entry, hdr->offset_size))
> + goto err;
> + if (k == start)
> + first_offset= offs;
> + else if (offs < first_offset)
> + goto err;
> +
> + offs+= plan[i].ddelta;
> + {
> + DYNAMIC_COLUMN_VALUE val;
> + val.type= tp; // only the type used in the header
> + if ((*new_fmt->put_header_entry)(new_hdr, key, &val, offs))
> + goto err;
> + }
> + }
> +
> + /* copy first the data that was not replaced in original packed data */
> + if (start < end)
> + {
> + size_t data_size;
> + /* Add old data last in 'tmp' */
> + hdr->entry= hdr->header + start * hdr->entry_size;
> + data_size=
> + hdr_interval_length(hdr, hdr->header + end * hdr->entry_size);
> + if (data_size == DYNCOL_OFFSET_ERROR ||
> + (long) data_size < 0 ||
> + data_size > hdr->data_size - first_offset)
> + goto err;
> +
> + memcpy(tmp.str + tmp.length, (char *)hdr->dtpool + first_offset,
> + data_size);
> + tmp.length+= data_size;
> + }
> +
> + /* new data adding */
> + if (i < add_column_count)
> + {
> + if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
> + {
> + if ((*new_fmt->put_header_entry)(new_hdr, plan[i].key,
> + plan[i].val,
> + tmp.length - all_headers_size))
> + goto err;
> + data_store(&tmp, plan[i].val); /* Append new data */
> + }
> + }
> + }
> + dynamic_column_column_free(str);
> + *str= tmp;
> + return ER_DYNCOL_OK;
> +err:
> + dynamic_column_column_free(&tmp);
> + return ER_DYNCOL_FORMAT;
> +}
> +
> +enum enum_dyncol_func_result
> +dynamic_column_update_move_left(DYNAMIC_COLUMN *str, PLAN *plan,
> + size_t offset_size,
> + size_t entry_size,
> + size_t header_size,
> + size_t new_offset_size,
> + size_t new_entry_size,
> + size_t new_header_size,
> + uint column_count,
> + uint new_column_count,
> + uint add_column_count,
> + uchar *header_end,
> + size_t max_offset)
> +{
> + uchar *write;
> + uchar *header_base= (uchar *)str->str + FIXED_HEADER_SIZE;
> + uint i, j, k;
> + size_t curr_offset;
> +
> + write= (uchar *)str->str + FIXED_HEADER_SIZE;
> + set_fixed_header(str, new_offset_size, new_column_count);
> +
> + /*
> + Move headers first.
> + i= index in array of changes
> + j= index in packed string header index
> + */
> + for (curr_offset= 0, i= 0, j= 0;
> + i < add_column_count || j < column_count;
> + i++)
> + {
> + size_t first_offset;
> + uint start= j, end;
> + LINT_INIT(first_offset);
> +
> + /*
> + Search in i and j for the next column to add from i and where to
> + add.
> + */
> +
> + while (i < add_column_count && plan[i].act == PLAN_NOP)
> + i++; /* skip NOP */
> +
> + if (i == add_column_count)
> + j= end= column_count;
> + else
> + {
> + /*
> + old data portion. We don't need to check that j < column_count
> + as plan[i].place is guaranteed to have a pointer inside the
> + data.
> + */
> + while (header_base + j * entry_size < plan[i].place)
> + j++;
> + end= j;
> + if ((plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
> + j++; /* data at 'j' will be removed */
> + }
> + plan[i].mv_end= end;
> +
> + {
> + DYNAMIC_COLUMN_TYPE tp;
> + if (type_and_offset_read(&tp, &first_offset,
> + header_base + start * entry_size +
> + COLUMN_NUMBER_SIZE, offset_size))
> + return ER_DYNCOL_FORMAT;
> + }
> + /* find data to be moved */
> + if (start < end)
> + {
> + size_t data_size=
> + get_length_interval(header_base + start * entry_size,
> + header_base + end * entry_size,
> + header_end, offset_size, max_offset);
> + if (data_size == DYNCOL_OFFSET_ERROR ||
> + (long) data_size < 0 ||
> + data_size > max_offset - first_offset)
> + {
> + str->length= 0; // just something valid
> + return ER_DYNCOL_FORMAT;
> + }
> + DBUG_ASSERT(curr_offset == first_offset + plan[i].ddelta);
> + plan[i].mv_offset= first_offset;
> + plan[i].mv_length= data_size;
> + curr_offset+= data_size;
> + }
> + else
> + {
> + plan[i].mv_length= 0;
> + plan[i].mv_offset= curr_offset;
> + }
> +
> + if (plan[i].ddelta == 0 && offset_size == new_offset_size &&
> + plan[i].act != PLAN_DELETE)
> + write+= entry_size * (end - start);
> + else
> + {
> + /*
> + Adjust all headers since last loop.
> + We have to do this as the offset for data has moved
> + */
> + for (k= start; k < end; k++)
> + {
> + uchar *read= header_base + k * entry_size;
> + size_t offs;
> + uint nm;
> + DYNAMIC_COLUMN_TYPE tp;
> +
> + nm= uint2korr(read); /* Column nummber */
> + if (type_and_offset_read(&tp, &offs, read + COLUMN_NUMBER_SIZE,
> + offset_size))
> + return ER_DYNCOL_FORMAT;
> +
> + if (k > start && offs < first_offset)
> + {
> + str->length= 0; // just something valid
> + return ER_DYNCOL_FORMAT;
> + }
> +
> + offs+= plan[i].ddelta;
> + int2store(write, nm);
> + /* write rest of data at write + COLUMN_NUMBER_SIZE */
> + type_and_offset_store(write, new_offset_size, tp, offs);
> + write+= new_entry_size;
> + }
> + }
> +
> + /* new data adding */
> + if (i < add_column_count)
> + {
> + if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
> + {
> + int2store(write, *((uint *)plan[i].key));
> + type_and_offset_store(write, new_offset_size,
> + plan[i].val[0].type,
> + curr_offset);
> + write+= new_entry_size;
> + curr_offset+= plan[i].length;
> + }
> + }
> + }
> +
> + /*
> + Move data.
> + i= index in array of changes
> + j= index in packed string header index
> + */
> + str->length= (FIXED_HEADER_SIZE + new_header_size);
> + for (i= 0, j= 0;
> + i < add_column_count || j < column_count;
> + i++)
> + {
> + uint start= j, end;
> +
> + /*
> + Search in i and j for the next column to add from i and where to
> + add.
> + */
> +
> + while (i < add_column_count && plan[i].act == PLAN_NOP)
> + i++; /* skip NOP */
> +
> + j= end= plan[i].mv_end;
> + if (i != add_column_count &&
> + (plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
> + j++;
> +
> + /* copy first the data that was not replaced in original packed data */
> + if (start < end && plan[i].mv_length)
> + {
> + memmove((header_base + new_header_size +
> + plan[i].mv_offset + plan[i].ddelta),
> + header_base + header_size + plan[i].mv_offset,
> + plan[i].mv_length);
> + }
> + str->length+= plan[i].mv_length;
> +
> + /* new data adding */
> + if (i < add_column_count)
> + {
> + if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
> + {
> + data_store(str, plan[i].val); /* Append new data */
> + }
> + }
> + }
> + return ER_DYNCOL_OK;
> +}
> +
> +enum enum_dyncol_func_result
> +dynamic_column_update_move_right(DYNAMIC_COLUMN *str, PLAN *plan,
> + size_t offset_size,
> + size_t entry_size,
> + size_t header_size,
> + size_t new_offset_size,
> + size_t new_entry_size,
> + size_t new_header_size,
> + uint column_count,
> + uint new_column_count,
> + uint add_column_count,
> + uchar *header_end,
> + size_t max_offset)
> +{
> + uchar *write;
> + uchar *header_base= (uchar *)str->str + FIXED_HEADER_SIZE;
> + uint i, j, k;
> + size_t curr_offset;
> +
> + write= (uchar *)str->str + FIXED_HEADER_SIZE;
> + set_fixed_header(str, new_offset_size, new_column_count);
> +
> + /*
> + Move data first.
> + i= index in array of changes
> + j= index in packed string header index
> + */
> + for (curr_offset= 0, i= 0, j= 0;
> + i < add_column_count || j < column_count;
> + i++)
> + {
> + size_t first_offset;
> + uint start= j, end;
> + LINT_INIT(first_offset);
> +
> + /*
> + Search in i and j for the next column to add from i and where to
> + add.
> + */
> +
> + while (i < add_column_count && plan[i].act == PLAN_NOP)
> + i++; /* skip NOP */
> +
> + if (i == add_column_count)
> + j= end= column_count;
> + else
> + {
> + /*
> + old data portion. We don't need to check that j < column_count
> + as plan[i].place is guaranteed to have a pointer inside the
> + data.
> + */
> + while (header_base + j * entry_size < plan[i].place)
> + j++;
> + end= j;
> + if ((plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
> + j++; /* data at 'j' will be removed */
> + }
> + plan[i].mv_end= end;
> +
> + {
> + DYNAMIC_COLUMN_TYPE tp;
> + type_and_offset_read(&tp, &first_offset,
> + header_base + start * entry_size + COLUMN_NUMBER_SIZE, offset_size);
> + }
> + /* find data to be moved */
> + if (start < end)
> + {
> + size_t data_size=
> + get_length_interval(header_base + start * entry_size,
> + header_base + end * entry_size,
> + header_end, offset_size, max_offset);
> + if (data_size == DYNCOL_OFFSET_ERROR ||
> + (long) data_size < 0 ||
> + data_size > max_offset - first_offset)
> + {
> + str->length= 0; // just something valid
> + return ER_DYNCOL_FORMAT;
> + }
> + DBUG_ASSERT(curr_offset == first_offset + plan[i].ddelta);
> + plan[i].mv_offset= first_offset;
> + plan[i].mv_length= data_size;
> + curr_offset+= data_size;
> + }
> + else
> + {
> + plan[i].mv_length= 0;
> + plan[i].mv_offset= curr_offset;
> + }
> +
> + if (plan[i].ddelta == 0 && offset_size == new_offset_size &&
> + plan[i].act != PLAN_DELETE)
> + write+= entry_size * (end - start);
> + else
> + {
> + /*
> + Adjust all headers since last loop.
> + We have to do this as the offset for data has moved
> + */
> + for (k= start; k < end; k++)
> + {
> + uchar *read= header_base + k * entry_size;
> + size_t offs;
> + uint nm;
> + DYNAMIC_COLUMN_TYPE tp;
> +
> + nm= uint2korr(read); /* Column nummber */
> + type_and_offset_read(&tp, &offs, read + COLUMN_NUMBER_SIZE, offset_size);
> + if (k > start && offs < first_offset)
> + {
> + str->length= 0; // just something valid
> + return ER_DYNCOL_FORMAT;
> + }
> +
> + offs+= plan[i].ddelta;
> + int2store(write, nm);
> + /* write rest of data at write + COLUMN_NUMBER_SIZE */
> + if (type_and_offset_store(write, new_offset_size, tp, offs))
> + {
> + str->length= 0; // just something valid
> + return ER_DYNCOL_FORMAT;
> + }
> + write+= new_entry_size;
> + }
> + }
> +
> + /* new data adding */
> + if (i < add_column_count)
> + {
> + if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
> + {
> + int2store(write, *((uint *)plan[i].key));
> + if (type_and_offset_store(write, new_offset_size,
> + plan[i].val[0].type,
> + curr_offset))
> + {
> + str->length= 0; // just something valid
> + return ER_DYNCOL_FORMAT;
> + }
> + write+= new_entry_size;
> + curr_offset+= plan[i].length;
> + }
> + }
> + }
> +
> + /*
> + Move headers.
> + i= index in array of changes
> + j= index in packed string header index
> + */
> + str->length= (FIXED_HEADER_SIZE + new_header_size);
> + for (i= 0, j= 0;
> + i < add_column_count || j < column_count;
> + i++)
> + {
> + uint start= j, end;
> +
> + /*
> + Search in i and j for the next column to add from i and where to
> + add.
> + */
> +
> + while (i < add_column_count && plan[i].act == PLAN_NOP)
> + i++; /* skip NOP */
> +
> + j= end= plan[i].mv_end;
> + if (i != add_column_count &&
> + (plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
> + j++;
> +
> + /* copy first the data that was not replaced in original packed data */
> + if (start < end && plan[i].mv_length)
> + {
> + memmove((header_base + new_header_size +
> + plan[i].mv_offset + plan[i].ddelta),
> + header_base + header_size + plan[i].mv_offset,
> + plan[i].mv_length);
> + }
> + str->length+= plan[i].mv_length;
> +
> + /* new data adding */
> + if (i < add_column_count)
> + {
> + if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
> + {
> + data_store(str, plan[i].val); /* Append new data */
> + }
> + }
> + }
> + return ER_DYNCOL_OK;
> }
>
> -#define DELTA_CHECK(S, D, C) \
> - if ((S) == 0) \
> - (S)= (D); \
> - else if (((S) > 0 && (D) < 0) || \
> - ((S) < 0 && (D) > 0)) \
> - { \
> - (C)= TRUE; \
> - break; \
> - } \
> -
>
> /**
> Update the packed string with the given columns
> @@ -1826,26 +3044,36 @@ dynamic_column_update_many(DYNAMIC_COLUM
>
> /* Set common variables for all plans */
> plan[i].ddelta= data_delta;
> + plan[i].ndelta= name_delta;
> /* get header delta in entries */
> plan[i].hdelta= header_delta;
> plan[i].length= 0; /* Length if NULL */
>
> - if (find_place(plan[i].num[0],
> - (uchar *)str->str + FIXED_HEADER_SIZE,
> - entry_size, column_count, &entry))
> + if (find_place(&header, plan[i].key, string_keys))
> {
> - size_t entry_data_size;
> + size_t entry_data_size, entry_name_size= 0;
>
> /* Data existed; We have to replace or delete it */
>
> - entry_data_size= get_length(entry, header_end,
> - offset_size, max_offset);
> - if ((long) entry_data_size < 0)
> + entry_data_size= hdr_interval_length(&header, header.entry +
> + header.entry_size);
> + if (entry_data_size == DYNCOL_OFFSET_ERROR ||
> + (long) entry_data_size < 0)
> {
> rc= ER_DYNCOL_FORMAT;
> goto end;
> }
>
> + //get_length(header.entry, header.dtpool, header.offset_size,
> + //header.data_size);
did you forget to delete this?
> + if (new_header.format == DYNCOL_FMT_STR)
> + {
> + if (header.format == DYNCOL_FMT_STR)
> + entry_name_size= header.entry[0];
> + else
> + entry_name_size= numlen(uint2korr(header.entry));
> + }
> +
> if (plan[i].val->type == DYN_COL_NULL)
> {
> /* Inserting a NULL means delete the old data */
> @@ -1891,219 +3124,759 @@ dynamic_column_update_many(DYNAMIC_COLUM
> goto end;
> }
> data_delta+= plan[i].length;
> + if (new_header.format == DYNCOL_FMT_STR)
> + name_delta+= ((LEX_STRING *)plan[i].key)->length;
> }
> }
> - plan[i].place= entry;
> + plan[i].place= header.entry;
> }
> plan[add_column_count].hdelta= header_delta;
> plan[add_column_count].ddelta= data_delta;
> - new_column_count= column_count + header_delta;
> + plan[add_column_count].act= PLAN_NOP;
> + plan[add_column_count].place= header.dtpool;
> +
> + new_header.column_count= header.column_count + header_delta;
>
> /*
> Check if it is only "increasing" or only "decreasing" plan for (header
> and data separately).
> */
> - data_size= str->length - header_size - FIXED_HEADER_SIZE;
> - new_data_size= data_size + data_delta;
> - if ((new_offset_size= dynamic_column_offset_bytes(new_data_size)) >=
> + new_header.data_size= header.data_size + data_delta;
> + new_header.nmpool_size= new_header.nmpool_size + name_delta;
> + DBUG_ASSERT(new_header.format != DYNCOL_FMT_NUM ||
> + new_header.nmpool_size == 0);
> + if ((new_header.offset_size=
> + dynamic_column_offset_bytes(new_header.data_size)) >=
> MAX_OFFSET_LENGTH)
> {
> rc= ER_DYNCOL_LIMIT;
> goto end;
> }
>
> -#ifdef NOT_IMPLEMENTED
> - /* if (new_offset_size != offset_size) then we have to rewrite header */
> - header_delta_sign= new_offset_size - offset_size;
> + copy= ((header.format != new_header.format) ||
> + (new_header.format == DYNCOL_FMT_STR));
> + /* if (new_header.offset_size!=offset_size) then we have to rewrite header */
> + header_delta_sign=
> + ((int)new_header.offset_size + new_fmt->fixed_hdr_entry) -
> + ((int)header.offset_size + fmt->fixed_hdr_entry);
> data_delta_sign= 0;
> - for (i= 0; i < add_column_count; i++)
> + // plan[add_column_count] contains last deltas.
> + for (i= 0; i <= add_column_count && !copy; i++)
> {
> /* This is the check for increasing/decreasing */
> DELTA_CHECK(header_delta_sign, plan[i].hdelta, copy);
> DELTA_CHECK(data_delta_sign, plan[i].ddelta, copy);
> }
> -#endif
> - calc_param(&new_entry_size, &new_header_size,
> - new_offset_size, new_column_count);
> + calc_param(&new_header.entry_size, &new_header.header_size,
> + new_fmt->fixed_hdr_entry,
> + new_header.offset_size, new_header.column_count);
>
> /*
> - The following code always make a copy. In future we can do a more
> - optimized version when data is only increasing / decreasing.
> + Need copy because:
> + 1. Header/data parts moved in different directions.
> + 2. There is no enough allocated space in the string.
> + 3. Header and data moved in different directions.
> */
> + if (copy || /*1*/
forgot to delete /*1*/ ?
> + str->max_length < str->length + header_delta + data_delta || /*2*/
> + ((header_delta_sign < 0 && data_delta_sign > 0) ||
> + (header_delta_sign > 0 && data_delta_sign < 0))) /*3*/
> + rc= dynamic_column_update_copy(str, plan, add_column_count,
> + &header, &new_header,
> + convert);
> + else
> + if (header_delta_sign < 0)
> + rc= dynamic_column_update_move_left(str, plan, header.offset_size,
> + header.entry_size,
> + header.header_size,
> + new_header.offset_size,
> + new_header.entry_size,
> + new_header.header_size,
> + header.column_count,
> + new_header.column_count,
> + add_column_count, header.dtpool,
> + header.data_size);
> + else
> + /*
> + rc= dynamic_column_update_move_right(str, plan, offset_size,
> + entry_size, header_size,
> + new_header.offset_size,
> + new_header.entry_size,
> + new_heder.header_size, column_count,
> + new_header.column_count,
> + add_column_count, header_end,
> + header.data_size);
> + */
> + rc= dynamic_column_update_copy(str, plan, add_column_count,
> + &header, &new_header,
> + convert);
> +end:
> + my_free(alloc_plan);
> + return rc;
> +
> +create_new_string:
> + /* There is no columns from before, so let's just add the new ones */
> + rc= ER_DYNCOL_OK;
> + my_free(alloc_plan);
> + if (not_null != 0)
> + rc= dynamic_column_create_many_internal_fmt(str, add_column_count,
> + (uint*)column_keys, values,
> + str->str == NULL,
> + string_keys);
> + goto end;
> +}
> +
> +
> +/**
> + Update the packed string with the given column
> +
> + @param str String where to write the data
> + @param column_number Array of columns number
> + @param values Array of columns values
> +
> + @return ER_DYNCOL_* return code
> +*/
> +
> +
> +int dynamic_column_update(DYNAMIC_COLUMN *str, uint column_nr,
> + DYNAMIC_COLUMN_VALUE *value)
> +{
> + return dynamic_column_update_many(str, 1, &column_nr, value);
> +}
> +
> +
> +enum enum_dyncol_func_result
> +dynamic_column_check(DYNAMIC_COLUMN *str)
> +{
> + struct st_service_funcs *fmt;
> + enum enum_dyncol_func_result rc= ER_DYNCOL_FORMAT;
> + DYN_HEADER header;
> + uint i;
> + size_t data_offset= 0, name_offset= 0;
> + size_t prev_data_offset= 0, prev_name_offset= 0;
> + LEX_STRING name= {0,0}, prev_name= {0,0};
> + uint num= 0, prev_num= 0;
> + void *key, *prev_key;
> + enum enum_dynamic_column_type type= DYN_COL_NULL, prev_type= DYN_COL_NULL;
> +
> + DBUG_ENTER("dynamic_column_check");
>
> - /*if (copy) */
> + if (str->length == 0)
> {
> - DYNAMIC_COLUMN tmp;
> - uchar *header_base= (uchar *)str->str + FIXED_HEADER_SIZE,
> - *write;
> - if (dynamic_column_init_str(&tmp,
> - (FIXED_HEADER_SIZE + new_header_size +
> - new_data_size + DYNCOL_SYZERESERVE)))
> - {
> - rc= ER_DYNCOL_RESOURCE;
> - goto end;
> - }
> - write= (uchar *)tmp.str + FIXED_HEADER_SIZE;
> - /* Adjust tmp to contain whole the future header */
> - tmp.length= FIXED_HEADER_SIZE + new_header_size;
> - set_fixed_header(&tmp, new_offset_size, new_column_count);
> - data_delta= 0;
> + DBUG_PRINT("info", ("empty string is OK"));
> + DBUG_RETURN(ER_DYNCOL_OK);
> + }
>
> - /*
> - Copy data to the new string
> - i= index in array of changes
> - j= index in packed string header index
> - */
> + bzero(&header, sizeof(header));
>
> - for (i= 0, j= 0; i < add_column_count || j < column_count; i++)
> - {
> - size_t first_offset;
> - uint start= j, end;
> - LINT_INIT(first_offset);
> + /* Check that header is OK */
> + if (read_fixed_header(&header, str))
> + {
> + DBUG_PRINT("info", ("Reading fixed string header failed"));
> + goto end;
> + }
> + fmt= fmt_data + header.format;
> + calc_param(&header.entry_size, &header.header_size,
> + fmt->fixed_hdr_entry, header.offset_size,
> + header.column_count);
> + /* headers are out of string length (no space for data and part of headers) */
> + if (fmt->fixed_hdr + header.header_size + header.nmpool_size > str->length)
> + {
> + DBUG_PRINT("info", ("Fixed header: %u Header size: %u "
> + "Name pool size: %u but Strig length: %u",
> + (uint)fmt->fixed_hdr,
> + (uint)header.header_size,
> + (uint)header.nmpool_size,
> + (uint)str->length));
> + goto end;
> + }
> + header.header= (uchar*)str->str + fmt->fixed_hdr;
> + header.nmpool= header.header + header.header_size;
> + header.dtpool= header.nmpool + header.nmpool_size;
> + header.data_size= str->length - fmt->fixed_hdr -
> + header.header_size - header.nmpool_size;
>
> - /*
> - Search in i and j for the next column to add from i and where to
> - add.
> - */
> + /* read and check headers */
> + if (header.format == DYNCOL_FMT_NUM)
> + {
> + key= #
> + prev_key= &prev_num;
> + }
> + else
> + {
> + key= &name;
> + prev_key= &prev_name;
> + }
> + for (i= 0, header.entry= header.header;
> + i < header.column_count;
> + i++, header.entry+= header.entry_size)
> + {
>
> - while (i < add_column_count && plan[i].act == PLAN_NOP)
> - i++; /* skip NOP */
> - if (i == add_column_count)
> - j= end= column_count;
> - else
> + if (header.format == DYNCOL_FMT_NUM)
> + {
> + num= uint2korr(header.entry);
> + }
> + else
> + {
> + DBUG_ASSERT(header.format == DYNCOL_FMT_STR);
> + name.length= header.entry[0];
> + name_offset= uint2korr(header.entry + 1);
> + name.str= (char *)header.nmpool + name_offset;
> + }
> + if (type_and_offset_read(&type, &data_offset,
> + header.entry + fmt->fixed_hdr_entry,
> + header.offset_size))
> + goto end;
> +
> + DBUG_ASSERT(type != DYN_COL_NULL);
> + if (data_offset > header.data_size)
> + {
> + DBUG_PRINT("info", ("Field order: %u Data offset: %u"
> + " > Data pool size: %u",
> + (uint)i,
> + (uint)name_offset,
> + (uint)header.nmpool_size));
> + goto end;
> + }
> + if (name_offset > header.nmpool_size)
> + {
> + DBUG_PRINT("info", ("Field order: %u Name offset: %u"
> + " > Name pool size: %u",
> + (uint)i,
> + (uint)name_offset,
> + (uint)header.nmpool_size));
> + goto end;
> + }
> + if (prev_type != DYN_COL_NULL)
> + {
> + /* It is not first entry */
> + if (prev_data_offset >= data_offset)
> {
> - /*
> - old data portion. We don't need to check that j < column_count
> - as plan[i].place is guaranteed to have a pointer inside the
> - data.
> - */
> - while (header_base + j * entry_size < plan[i].place)
> - j++;
> - end= j;
> - if ((plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
> - j++; /* data at 'j' will be removed */
> + DBUG_PRINT("info", ("Field order: %u Previous data offset: %u"
> + " >= Current data offset: %u",
> + (uint)i,
> + (uint)prev_data_offset,
> + (uint)data_offset));
> + goto end;
> }
> -
> - if (plan[i].ddelta == 0 && offset_size == new_offset_size)
> + if (prev_name_offset > name_offset)
> {
> - uchar *read= header_base + start * entry_size;
> - DYNAMIC_COLUMN_TYPE tp;
> - /*
> - It's safe to copy the header unchanged. This is usually the
> - case for the first header block before any changed data.
> - */
> - if (start < end) /* Avoid memcpy with 0 */
> - {
> - size_t length= entry_size * (end - start);
> - memcpy(write, read, length);
> - write+= length;
> - }
> - /* Read first_offset */
> - type_and_offset_read(&tp, &first_offset, read, offset_size);
> + DBUG_PRINT("info", ("Field order: %u Previous name offset: %u"
> + " > Current name offset: %u",
> + (uint)i,
> + (uint)prev_data_offset,
> + (uint)data_offset));
> + goto end;
> }
> - else
> + if ((*fmt->column_sort)(&prev_key, &key) >= 0)
> {
> - /*
> - Adjust all headers since last loop.
> - We have to do this as the offset for data has moved
> - */
> - for (k= start; k < end; k++)
> - {
> - uchar *read= header_base + k * entry_size;
> - size_t offs;
> - uint nm;
> - DYNAMIC_COLUMN_TYPE tp;
> + DBUG_PRINT("info", ("Field order: %u Previous key >= Current key",
> + (uint)i));
> + goto end;
> + }
> + }
> + prev_num= num;
> + prev_name= name;
> + prev_data_offset= data_offset;
> + prev_name_offset= name_offset;
> + prev_type= type;
> + }
> +
> + /* check data, which we can */
> + for (i= 0, header.entry= header.header;
> + i < header.column_count;
> + i++, header.entry+= header.entry_size)
> + {
> + DYNAMIC_COLUMN_VALUE store;
> +    /* offsets were already validated by the previous pass */
> + type_and_offset_read(&header.type, &header.offset,
> + header.entry + fmt->fixed_hdr_entry,
> + header.offset_size);
> + header.length=
> + hdr_interval_length(&header, header.entry + header.entry_size);
> + header.data= header.dtpool + header.offset;
> + switch ((header.type)) {
> + case DYN_COL_INT:
> + rc= dynamic_column_sint_read(&store, header.data, header.length);
> + break;
> + case DYN_COL_UINT:
> + rc= dynamic_column_uint_read(&store, header.data, header.length);
> + break;
> + case DYN_COL_DOUBLE:
> + rc= dynamic_column_double_read(&store, header.data, header.length);
> + break;
> + case DYN_COL_STRING:
> + rc= dynamic_column_string_read(&store, header.data, header.length);
> + break;
> + case DYN_COL_DECIMAL:
> + rc= dynamic_column_decimal_read(&store, header.data, header.length);
> + break;
> + case DYN_COL_DATETIME:
> + rc= dynamic_column_date_time_read(&store, header.data,
> + header.length);
> + break;
> + case DYN_COL_DATE:
> + rc= dynamic_column_date_read(&store, header.data, header.length);
> + break;
> + case DYN_COL_TIME:
> + rc= dynamic_column_time_read(&store, header.data, header.length);
> + break;
> + case DYN_COL_NULL:
> + default:
> + rc= ER_DYNCOL_FORMAT;
> + goto end;
> + }
> + if (rc != ER_DYNCOL_OK)
> + {
> + DBUG_ASSERT(rc < 0);
> + DBUG_PRINT("info", ("Field order: %u Can't read data: %i",
> + (uint)i, (int) rc));
> + goto end;
> + }
> + }
>
> - nm= uint2korr(read); /* Column nummber */
> - type_and_offset_read(&tp, &offs, read, offset_size);
> - if (k == start)
> - first_offset= offs;
> - else if (offs < first_offset)
> + rc= ER_DYNCOL_OK;
> +end:
> + DBUG_RETURN(rc);
> +}
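btw, for anyone reading the patch top-down: this two-pass check (key and
offset ordering first, then a trial decode of every value) is what the JSON
and array converters below rely on. a minimal sketch of driving it, assuming
the function above is exposed as the public dynamic_column_check() entry
point (its signature isn't quoted here):

    DYNAMIC_COLUMN col;
    /* ... col filled by dynamic_column_create_many() or read from disk ... */
    if (dynamic_column_check(&col) != ER_DYNCOL_OK)
    {
      /* the packed image is corrupt or truncated: reject it */
    }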
> +
> +
> +enum enum_dyncol_func_result
> +dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
> + CHARSET_INFO *cs, my_bool quote)
> +{
> + char buff[40];
> + int len;
> + switch (val->type) {
> + case DYN_COL_INT:
> + len= snprintf(buff, sizeof(buff), "%lld", val->x.long_value);
> + if (dynstr_append_mem(str, buff, len))
> + return ER_DYNCOL_RESOURCE;
> + break;
> + case DYN_COL_UINT:
> + len= snprintf(buff, sizeof(buff), "%llu", val->x.ulong_value);
> + if (dynstr_append_mem(str, buff, len))
> + return ER_DYNCOL_RESOURCE;
> + break;
> + case DYN_COL_DOUBLE:
> + len= snprintf(buff, sizeof(buff), "%lg", val->x.double_value);
> + if (dynstr_realloc(str, len + (quote ? 2 : 0)))
> + return ER_DYNCOL_RESOURCE;
> + if (quote)
> + str->str[str->length++]= '"';
> + dynstr_append_mem(str, buff, len);
> + if (quote)
> + str->str[str->length++]= '"';
> + break;
> + case DYN_COL_STRING:
> + {
> + char *alloc= NULL;
> + char *from= val->x.string.value.str;
> + uint bufflen;
> + my_bool conv= !my_charset_same(val->x.string.charset, cs);
> + my_bool rc;
> + len= val->x.string.value.length;
> + bufflen= (len * (conv ? cs->mbmaxlen : 1));
> + if (dynstr_realloc(str, bufflen))
> + return ER_DYNCOL_RESOURCE;
> +
> +    /* make sure the value is converted to the destination charset */
> +    if (conv)
> + {
> + uint dummy_errors;
> + if (!quote)
> {
> - dynamic_column_column_free(&tmp);
> - rc= ER_DYNCOL_FORMAT;
> - goto end;
> + /* convert to the destination */
> + str->length+= copy_and_convert_extended(str->str, bufflen,
> + cs,
> + from, len,
> + val->x.string.charset,
> + &dummy_errors);
> + return ER_DYNCOL_OK;
> }
> -
> - offs+= plan[i].ddelta;
> - int2store(write, nm);
> - /* write rest of data at write + COLUMN_NUMBER_SIZE */
> - type_and_offset_store(write, new_offset_size, tp, offs);
> - write+= new_entry_size;
> + if ((alloc= (char *)my_malloc(bufflen, MYF(0))))
> + {
> + len=
> + copy_and_convert_extended(alloc, bufflen, cs,
> + from, len, val->x.string.charset,
> + &dummy_errors);
> + from= alloc;
> + }
> + else
> + return ER_DYNCOL_RESOURCE;
> }
> + if (quote)
> + rc= dynstr_append_quoted(str, from, len);
> + else
> + rc= dynstr_append_mem(str, from, len);
> + if (alloc)
> + my_free(alloc);
> + if (rc)
> + return ER_DYNCOL_RESOURCE;
> + break;
> }
> + case DYN_COL_DECIMAL:
> + len= sizeof(buff);
> + decimal2string(&val->x.decimal.value, buff, &len,
> + 0, val->x.decimal.value.frac,
> + '0');
> + if (dynstr_append_mem(str, buff, len))
> + return ER_DYNCOL_RESOURCE;
> + break;
> + case DYN_COL_DATETIME:
> + case DYN_COL_DATE:
> + case DYN_COL_TIME:
> + len= my_TIME_to_str(&val->x.time_value, buff, AUTO_SEC_PART_DIGITS);
> + if (dynstr_realloc(str, len + (quote ? 2 : 0)))
> + return ER_DYNCOL_RESOURCE;
> + if (quote)
> + str->str[str->length++]= '"';
> + dynstr_append_mem(str, buff, len);
> + if (quote)
> + str->str[str->length++]= '"';
> + break;
> + case DYN_COL_NULL:
> + if (dynstr_append_mem(str, "null", 4))
> + return ER_DYNCOL_RESOURCE;
> + break;
> + default:
> + return(ER_DYNCOL_FORMAT);
> + }
> + return(ER_DYNCOL_OK);
> +}
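to make the quote flag concrete (it exists for the JSON writer below), a
sketch with invented values:

    DYNAMIC_STRING out;
    DYNAMIC_COLUMN_VALUE v;

    if (init_dynamic_string(&out, NULL, 32, 32))
    {
      /* out of memory */
    }
    v.type= DYN_COL_DOUBLE;
    v.x.double_value= 1.5;
    /* quote == TRUE wraps the result in double quotes:
       out.str becomes "1.5", quotes included */
    dynamic_column_val_str(&out, &v, &my_charset_utf8_general_ci, TRUE);
    dynstr_free(&out);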
>
> - /* copy first the data that was not replaced in original packed data */
> - if (start < end)
> +
> +enum enum_dyncol_func_result
> +dynamic_column_val_long(longlong *ll, DYNAMIC_COLUMN_VALUE *val)
> +{
> + enum enum_dyncol_func_result rc= ER_DYNCOL_OK;
> + *ll= 0;
> + switch (val->type) {
> + case DYN_COL_INT:
> + *ll= val->x.long_value;
> + break;
> + case DYN_COL_UINT:
> + *ll= (longlong)val->x.ulong_value;
> +    if (val->x.ulong_value > LONGLONG_MAX)
> + rc= ER_DYNCOL_TRUNCATED;
> + break;
> + case DYN_COL_DOUBLE:
> + *ll= (longlong)val->x.double_value;
> + if (((double) *ll) != val->x.double_value)
> + rc= ER_DYNCOL_TRUNCATED;
> + break;
> + case DYN_COL_STRING:
> {
> - /* Add old data last in 'tmp' */
> - size_t data_size=
> - get_length_interval(header_base + start * entry_size,
> - header_base + end * entry_size,
> - header_end, offset_size, max_offset);
> - if ((long) data_size < 0 ||
> - data_size > max_offset - first_offset)
> + longlong i= 0, sign= 1;
> + char *src= val->x.string.value.str;
> + uint len= val->x.string.value.length;
> +
> + while (len && my_isspace(&my_charset_latin1, *src)) src++,len--;
> +
> + if (len)
> {
> - dynamic_column_column_free(&tmp);
> - rc= ER_DYNCOL_FORMAT;
> - goto end;
> +        if (*src == '-')
> +        {
> +          sign= -1;
> +          src++; len--;
> +        }
> +        else if (*src == '+')
> +        {
> +          src++; len--;
> +        }
> +        while (len && my_isdigit(&my_charset_latin1, *src))
> +        {
> +          i= i * 10 + (*src - '0');
> +          src++; len--;
> +        }
> }
> -
> - memcpy(tmp.str + tmp.length, (char *)header_end + first_offset,
> - data_size);
> - tmp.length+= data_size;
> + else
> + rc= ER_DYNCOL_TRUNCATED;
> + if (len)
> + rc= ER_DYNCOL_TRUNCATED;
> + *ll= i * sign;
> + break;
> }
> + case DYN_COL_DECIMAL:
> + if (decimal2longlong(&val->x.decimal.value, ll) != E_DEC_OK)
> + rc= ER_DYNCOL_TRUNCATED;
> + break;
> + case DYN_COL_DATETIME:
> +    *ll= (val->x.time_value.year * 10000000000LL +
> +          val->x.time_value.month * 100000000LL +
> + val->x.time_value.day * 1000000 +
> + val->x.time_value.hour * 10000 +
> + val->x.time_value.minute * 100 +
> + val->x.time_value.second) *
> + (val->x.time_value.neg ? -1 : 1);
> + break;
> + case DYN_COL_DATE:
> + *ll= (val->x.time_value.year * 10000 +
> + val->x.time_value.month * 100 +
> + val->x.time_value.day) *
> + (val->x.time_value.neg ? -1 : 1);
> + break;
> + case DYN_COL_TIME:
> + *ll= (val->x.time_value.hour * 10000 +
> + val->x.time_value.minute * 100 +
> + val->x.time_value.second) *
> + (val->x.time_value.neg ? -1 : 1);
> + break;
> + case DYN_COL_NULL:
> + rc= ER_DYNCOL_TRUNCATED;
> + break;
> + default:
> + return(ER_DYNCOL_FORMAT);
> + }
> + return(rc);
> +}
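the date/time cases are plain decimal concatenation, e.g. (invented values):

    longlong ll;
    DYNAMIC_COLUMN_VALUE v;

    v.type= DYN_COL_DATETIME;
    v.x.time_value.neg= 0;
    v.x.time_value.year= 2012;
    v.x.time_value.month= 12;
    v.x.time_value.day= 20;
    v.x.time_value.hour= 15;
    v.x.time_value.minute= 30;
    v.x.time_value.second= 45;
    /* 2012*10^10 + 12*10^8 + 20*10^6 + 15*10^4 + 30*10^2 + 45 */
    dynamic_column_val_long(&ll, &v);   /* ll == 20121220153045 */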
> +
>
> - /* new data adding */
> - if (i < add_column_count)
> +enum enum_dyncol_func_result
> +dynamic_column_val_double(double *dbl, DYNAMIC_COLUMN_VALUE *val)
> +{
> + enum enum_dyncol_func_result rc= ER_DYNCOL_OK;
> + *dbl= 0;
> + switch (val->type) {
> + case DYN_COL_INT:
> + *dbl= (double)val->x.long_value;
> + if (((longlong) *dbl) != val->x.long_value)
> + rc= ER_DYNCOL_TRUNCATED;
> + break;
> + case DYN_COL_UINT:
> + *dbl= (double)val->x.ulong_value;
> + if (((ulonglong) *dbl) != val->x.ulong_value)
> + rc= ER_DYNCOL_TRUNCATED;
> + break;
> + case DYN_COL_DOUBLE:
> + *dbl= val->x.double_value;
> + break;
> + case DYN_COL_STRING:
> {
> - if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
> - {
> - int2store(write, plan[i].num[0]);
> - type_and_offset_store(write, new_offset_size,
> - plan[i].val[0].type,
> - tmp.length -
> - (FIXED_HEADER_SIZE + new_header_size));
> - write+= new_entry_size;
> - data_store(&tmp, plan[i].val); /* Append new data */
> - }
> - data_delta= plan[i].ddelta;
> + char *str, *end;
> +      if (!(str= my_malloc(val->x.string.value.length + 1, MYF(0))))
> +        return ER_DYNCOL_RESOURCE;
> +      memcpy(str, val->x.string.value.str, val->x.string.value.length);
> +      str[val->x.string.value.length]= '\0';
> +      *dbl= strtod(str, &end);
> +      if (*end != '\0')
> +        rc= ER_DYNCOL_TRUNCATED;
> +      my_free(str);
> +      break;
> }
> - }
> - dynamic_column_column_free(str);
> - *str= tmp;
> + case DYN_COL_DECIMAL:
> + if (decimal2double(&val->x.decimal.value, dbl) != E_DEC_OK)
> + rc= ER_DYNCOL_TRUNCATED;
> + break;
> + case DYN_COL_DATETIME:
> +    *dbl= (double)(val->x.time_value.year * 10000000000LL +
> +                   val->x.time_value.month * 100000000LL +
> + val->x.time_value.day * 1000000 +
> + val->x.time_value.hour * 10000 +
> + val->x.time_value.minute * 100 +
> + val->x.time_value.second) *
> + (val->x.time_value.neg ? -1 : 1);
> + break;
> + case DYN_COL_DATE:
> + *dbl= (double)(val->x.time_value.year * 10000 +
> + val->x.time_value.month * 100 +
> + val->x.time_value.day) *
> + (val->x.time_value.neg ? -1 : 1);
> + break;
> + case DYN_COL_TIME:
> + *dbl= (double)(val->x.time_value.hour * 10000 +
> + val->x.time_value.minute * 100 +
> + val->x.time_value.second) *
> + (val->x.time_value.neg ? -1 : 1);
> + break;
> + case DYN_COL_NULL:
> + rc= ER_DYNCOL_TRUNCATED;
> + break;
> + default:
> + return(ER_DYNCOL_FORMAT);
> }
> + return(rc);
> +}
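and the string case flags truncation for any trailing garbage left
unconsumed by strtod(), e.g. (invented input):

    double d;
    DYNAMIC_COLUMN_VALUE v;

    v.type= DYN_COL_STRING;
    v.x.string.value.str= (char *)"3.14abc";
    v.x.string.value.length= 7;
    v.x.string.charset= &my_charset_latin1;
    /* strtod() stops at 'a': d == 3.14, return is ER_DYNCOL_TRUNCATED */
    dynamic_column_val_double(&d, &v);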
>
> - rc= ER_DYNCOL_OK;
>
> -end:
> - my_free(plan);
> - return rc;
> +/**
> + Convert to JSON
>
> -create_new_string:
> - /* There is no columns from before, so let's just add the new ones */
> - rc= ER_DYNCOL_OK;
> - if (not_null != 0)
> - rc= dynamic_column_create_many_internal(str, add_column_count,
> - column_numbers, values,
> - str->str == NULL);
> - goto end;
> + @param str The packed string
> +  @param json Where to put the JSON result
> +
> + @return ER_DYNCOL_* return code
> +*/
> +
> +enum enum_dyncol_func_result
> +dynamic_column_json(DYNAMIC_COLUMN *str, DYNAMIC_STRING *json)
> +{
> + DYN_HEADER header;
> + uint i;
> + enum enum_dyncol_func_result rc;
> +
> + bzero(json, sizeof(DYNAMIC_STRING)); /* In case of errors */
> + if (str->length == 0)
> + return ER_DYNCOL_OK; /* no columns */
> +
> + if ((rc= init_read_hdr(&header, str)) < 0)
> + return rc;
> +
> + if (header.entry_size * header.column_count + FIXED_HEADER_SIZE >
> + str->length)
> + return ER_DYNCOL_FORMAT;
> +
> + if (init_dynamic_string(json, NULL, str->length * 2, 100))
> + return ER_DYNCOL_RESOURCE;
> +
> + if (dynstr_append_mem(json, "[", 1))
> + return ER_DYNCOL_RESOURCE;
> + rc= ER_DYNCOL_RESOURCE;
> + for (i= 0, header.entry= header.header;
> + i < header.column_count;
> + i++, header.entry+= header.entry_size)
> + {
> + DYNAMIC_COLUMN_VALUE val;
> + if (i != 0 && dynstr_append_mem(json, ",", 1))
> + goto err;
> + header.length=
> + hdr_interval_length(&header, header.entry + header.entry_size);
> + header.data= header.dtpool + header.offset;
> + /*
> +      Check that the found data is within the valid range. This can happen if
> + we get data with wrong offsets.
> + */
> + if (header.length == DYNCOL_OFFSET_ERROR ||
> + header.length > INT_MAX || header.offset > header.data_size)
> + {
> + rc= ER_DYNCOL_FORMAT;
> + goto err;
> + }
> + if ((rc= dynamic_column_get_value(&header, &val)) < 0 ||
> + dynstr_append_mem(json, "{", 1))
> + goto err;
> + if (header.format == DYNCOL_FMT_NUM)
> + {
> + uint nm= uint2korr(header.entry);
> + if (dynstr_realloc(json, DYNCOL_NUM_CHAR + 3))
> + goto err;
> + json->str[json->length++]= '"';
> + json->length+= (snprintf(json->str + json->length,
> + DYNCOL_NUM_CHAR, "%u", nm));
> + }
> + else
> + {
> + uint len= header.entry[0];
> + if (dynstr_realloc(json, len + 3))
> + goto err;
> + json->str[json->length++]= '"';
> +      memcpy(json->str + json->length,
> +             header.nmpool + uint2korr(header.entry + 1), len);
> + json->length+= len;
> + }
> + json->str[json->length++]= '"';
> + json->str[json->length++]= ':';
> + if ((rc= dynamic_column_val_str(json, &val,
> + &my_charset_utf8_general_ci, TRUE)) < 0 ||
> + dynstr_append_mem(json, "}", 1))
> + goto err;
> + }
> + if (dynstr_append_mem(json, "]", 1))
> + return ER_DYNCOL_RESOURCE;
> + return ER_DYNCOL_OK;
> +
> +err:
> + json->length= 0;
> + return rc;
> }
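usage-wise this comes out as, e.g. (sketch; col is assumed to hold a valid
packed image):

    DYNAMIC_STRING json;

    if (dynamic_column_json(&col, &json) == ER_DYNCOL_OK)
    {
      /* prints something like [{"1":123},{"2":"abc"}]; note that
         every key, numeric or named, is emitted quoted */
      printf("%s\n", json.str);
      dynstr_free(&json);
    }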
>
>
> /**
> - Update the packed string with the given column
> + Convert to DYNAMIC_COLUMN_VALUE values and names (LEX_STING) dynamic array
>
> - @param str String where to write the data
> - @param column_number Array of columns number
> - @param values Array of columns values
> + @param str The packed string
> + @param names Where to put names
> + @param vals Where to put values
> +  @param free_names Where to return the allocated names buffer, if any (the caller must free it)
>
> @return ER_DYNCOL_* return code
> */
>
> -
> -int dynamic_column_update(DYNAMIC_COLUMN *str, uint column_nr,
> - DYNAMIC_COLUMN_VALUE *value)
> +enum enum_dyncol_func_result
> +dynamic_column_vals(DYNAMIC_COLUMN *str,
> + DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals,
> + char **free_names)
> {
> - return dynamic_column_update_many(str, 1, &column_nr, value);
> + DYN_HEADER header;
> + char *nm;
> + uint i;
> + enum enum_dyncol_func_result rc;
> +
> + *free_names= 0;
> + bzero(names, sizeof(DYNAMIC_ARRAY)); /* In case of errors */
> + bzero(vals, sizeof(DYNAMIC_ARRAY)); /* In case of errors */
> + if (str->length == 0)
> + return ER_DYNCOL_OK; /* no columns */
> +
> + if ((rc= init_read_hdr(&header, str)) < 0)
> + return rc;
> +
> + if (header.entry_size * header.column_count + FIXED_HEADER_SIZE >
> + str->length)
> + return ER_DYNCOL_FORMAT;
> +
> + if (init_dynamic_array(names, sizeof(LEX_STRING),
> + header.column_count, 0) ||
> + init_dynamic_array(vals, sizeof(DYNAMIC_COLUMN_VALUE),
> + header.column_count, 0) ||
> + (header.format == DYNCOL_FMT_NUM &&
> +       !(*free_names= (char *)my_malloc(DYNCOL_NUM_CHAR * header.column_count, MYF(0)))))
why do you need a special malloc() for the names? you could keep them
in the buffer of the 'names' dynarray itself; that would be a lot more
convenient for the caller (no free_names to care about).
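something along these lines (untested sketch; the element type is invented
here, and the names array would then be inited with
sizeof(DYNCOL_NAME_ELT) instead of sizeof(LEX_STRING)):

    typedef struct st_dyncol_name_elt
    {
      LEX_STRING s;                   /* points into buf after the fixup */
      char buf[DYNCOL_NUM_CHAR + 1];  /* rendered "%u" of a numeric key */
    } DYNCOL_NAME_ELT;

    ...
    DYNCOL_NAME_ELT elt;
    elt.s.str= NULL;                  /* fixed up below */
    elt.s.length= snprintf(elt.buf, sizeof(elt.buf), "%u", num);
    if (insert_dynamic(names, (uchar *)&elt))
      goto err;
    ...
    /* after the last insert the array buffer no longer moves, so each
       LEX_STRING can safely point at its own embedded buffer */
    for (i= 0; i < names->elements; i++)
    {
      DYNCOL_NAME_ELT *e= dynamic_element(names, i, DYNCOL_NAME_ELT *);
      e->s.str= e->buf;
    }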
> + {
> + rc= ER_DYNCOL_RESOURCE;
> + goto err;
> + }
> + nm= *free_names;
> +
> + for (i= 0, header.entry= header.header;
> + i < header.column_count;
> + i++, header.entry+= header.entry_size)
> + {
> + DYNAMIC_COLUMN_VALUE val;
> + LEX_STRING name;
> + header.length=
> + hdr_interval_length(&header, header.entry + header.entry_size);
> + header.data= header.dtpool + header.offset;
> + /*
> +      Check that the found data is within the valid range. This can happen if
> + we get data with wrong offsets.
> + */
> + if (header.length == DYNCOL_OFFSET_ERROR ||
> + header.length > INT_MAX || header.offset > header.data_size)
> + {
> + rc= ER_DYNCOL_FORMAT;
> + goto err;
> + }
> + if ((rc= dynamic_column_get_value(&header, &val)) < 0)
> + goto err;
> +
> + if (header.format == DYNCOL_FMT_NUM)
> + {
> + uint num= uint2korr(header.entry);
> + name.str= nm;
> + name.length= snprintf(nm, DYNCOL_NUM_CHAR, "%u", num);
> + nm+= name.length + 1;
> + }
> + else
> + {
> + name.length= header.entry[0];
> + name.str= (char *)header.nmpool + uint2korr(header.entry + 1);
> + }
> +    /* the arrays were preallocated above, so these inserts cannot fail */
> + (void) insert_dynamic(names, (uchar *)&name);
> + (void) insert_dynamic(vals, (uchar *)&val);
> + }
> + return ER_DYNCOL_OK;
> +
> +err:
> + delete_dynamic(names);
> + delete_dynamic(vals);
> + if (*free_names)
> + my_free(*free_names);
> + *free_names= 0;
> + return rc;
> }
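for the record, the caller-side protocol as the patch stands (sketch; col
is assumed to hold a valid packed image):

    DYNAMIC_ARRAY names, vals;
    char *free_names;
    uint i;

    if (dynamic_column_vals(&col, &names, &vals, &free_names) == ER_DYNCOL_OK)
    {
      for (i= 0; i < names.elements; i++)
      {
        LEX_STRING *n= dynamic_element(&names, i, LEX_STRING *);
        DYNAMIC_COLUMN_VALUE *v= dynamic_element(&vals, i,
                                                 DYNAMIC_COLUMN_VALUE *);
        /* ... use n->str / v->type here ... */
      }
      delete_dynamic(&names);
      delete_dynamic(&vals);
      if (free_names)
        my_free(free_names);
    }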
>
Regards,
Sergei