[Maria-developers] MDEV-3792: review of the handler part of the cassandra engine
Hi, Sergey,

The handler is quite ok. I've only had a few, mostly cosmetic, comments.
=== modified file 'cmake/build_configurations/mysql_release.cmake' --- cmake/build_configurations/mysql_release.cmake 2012-06-06 12:15:29 +0000 +++ cmake/build_configurations/mysql_release.cmake 2012-11-12 15:11:23 +0000 @@ -46,6 +46,8 @@ SET(FEATURE_SET_large 5) SET(FEATURE_SET_xlarge 6) SET(FEATURE_SET_community 7)
+SET(WITH_CASSANDRA_STORAGE_ENGINE ON) +
This probably should be reverted before pushing into main, right?
IF(FEATURE_SET) STRING(TOLOWER ${FEATURE_SET} feature_set) SET(num ${FEATURE_SET_${feature_set}})
=== modified file 'sql/sql_join_cache.cc' --- sql/sql_join_cache.cc 2012-03-24 17:21:22 +0000 +++ sql/sql_join_cache.cc 2012-11-12 15:11:23 +0000 @@ -4543,7 +4546,7 @@ bool JOIN_CACHE_BKAH::prepare_look_for_m { last_matching_rec_ref_ptr= next_matching_rec_ref_ptr= 0; if (no_association && - (curr_matching_chain= get_matching_chain_by_join_key())) + !(curr_matching_chain= get_matching_chain_by_join_key())) //psergey: added '!'
Is that something that should be pushed into the main branch, or is it a temporary hack for Cassandra that should be reverted?
return 1; last_matching_rec_ref_ptr= get_next_rec_ref(curr_matching_chain); return 0;
=== added file 'storage/cassandra/CMakeLists.txt' --- storage/cassandra/CMakeLists.txt 1970-01-01 00:00:00 +0000 +++ storage/cassandra/CMakeLists.txt 2012-11-12 15:11:23 +0000 @@ -0,0 +1,25 @@ + +SET(cassandra_sources + ha_cassandra.cc + ha_cassandra.h + cassandra_se.h + cassandra_se.cc + gen-cpp/Cassandra.cpp + gen-cpp/cassandra_types.h + gen-cpp/cassandra_types.cpp + gen-cpp/cassandra_constants.h + gen-cpp/cassandra_constants.cpp + gen-cpp/Cassandra.h) + +#INCLUDE_DIRECTORIES(BEFORE ${Boost_INCLUDE_DIRS}) + +#INCLUDE_DIRECTORIES(AFTER /usr/local/include/thrift) +INCLUDE_DIRECTORIES(AFTER /home/buildbot/build/thrift-inst/include/thrift/)
this needs to be fixed before pushing into the main tree, I suppose; you'll need to detect here whether Thrift is available and disable the engine otherwise
+ +# +STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) +STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) +#LINK_DIRECTORIES(/home/psergey/cassandra/thrift/lib) + +MYSQL_ADD_PLUGIN(cassandra ${cassandra_sources} STORAGE_ENGINE LINK_LIBRARIES thrift) +# was: STORAGE_ENGINE MANDATORY
=== added file 'storage/cassandra/cassandra_se.cc' --- storage/cassandra/cassandra_se.cc 1970-01-01 00:00:00 +0000 +++ storage/cassandra/cassandra_se.cc 2012-11-12 15:11:23 +0000 @@ -0,0 +1,809 @@ + +// Cassandra includes: +#include <inttypes.h> +#include <netinet/in.h> +#include <sys/time.h> +#include <stdio.h> +#include <stdarg.h> + +#include "Thrift.h" +#include "transport/TSocket.h" +#include "transport/TTransport.h" +#include "transport/TBufferTransports.h" +#include "protocol/TProtocol.h" +#include "protocol/TBinaryProtocol.h" +#include "gen-cpp/Cassandra.h" +// cassandra includes end + +#include "cassandra_se.h" + +struct st_mysql_lex_string +{ + char *str; + size_t length; +}; + +using namespace std; +using namespace apache::thrift; +using namespace apache::thrift::transport; +using namespace apache::thrift::protocol; +using namespace org::apache::cassandra; + + +void Cassandra_se_interface::print_error(const char *format, ...) +{ + va_list ap; + va_start(ap, format); + // it's not a problem if output was truncated + vsnprintf(err_buffer, sizeof(err_buffer), format, ap);
my_vsnprintf please
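For illustration, a minimal sketch of the suggested change, assuming the usual my_vsnprintf() declaration from m_string.h (same truncation behaviour as vsnprintf, plus the server's extended format specifiers):

    #include <m_string.h>  /* my_vsnprintf */

    void Cassandra_se_interface::print_error(const char *format, ...)
    {
      va_list ap;
      va_start(ap, format);
      /* truncation is still not a problem here */
      my_vsnprintf(err_buffer, sizeof(err_buffer), format, ap);
      va_end(ap);
    }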
+ va_end(ap); +} + ... === added file 'storage/cassandra/ha_cassandra.h' --- storage/cassandra/ha_cassandra.h 1970-01-01 00:00:00 +0000 +++ storage/cassandra/ha_cassandra.h 2012-11-12 15:11:23 +0000 @@ -0,0 +1,328 @@ +/* + Copyright (c) 2012, Monty Program Ab + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#ifdef USE_PRAGMA_INTERFACE +#pragma interface /* gcc class implementation */ +#endif + + +#include "my_global.h" /* ulonglong */ +#include "thr_lock.h" /* THR_LOCK, THR_LOCK_DATA */ +#include "handler.h" /* handler */ +#include "my_base.h" /* ha_rows */ + +#include "cassandra_se.h" + +/** @brief + CASSANDRA_SHARE is a structure that will be shared among all open handlers. + This example implements the minimum of what you will probably need. +*/ +typedef struct st_cassandra_share { + char *table_name; + uint table_name_length,use_count; + mysql_mutex_t mutex; + THR_LOCK lock; +} CASSANDRA_SHARE; + +class ColumnDataConverter; + +struct ha_table_option_struct; + + +struct st_dynamic_column_value; + +typedef bool (* CAS2DYN_CONVERTER)(const char *cass_data, + int cass_data_len, + struct st_dynamic_column_value *value); +typedef bool (* DYN2CAS_CONVERTER)(struct st_dynamic_column_value *value, + char **cass_data, + int *cass_data_len, + void *buf, void **freemem); +struct cassandra_type_def +{ + const char *name; + CAS2DYN_CONVERTER cassandra_to_dynamic; + DYN2CAS_CONVERTER dynamic_to_cassandra; +}; + +typedef struct cassandra_type_def CASSANDRA_TYPE_DEF; + +enum cassandtra_type_enum {CT_BIGINT, CT_INT, CT_COUNTER, CT_FLOAT, CT_DOUBLE, + CT_BLOB, CT_ASCII, CT_TEXT, CT_TIMESTAMP, CT_UUID, CT_BOOLEAN, CT_VARINT, + CT_DECIMAL}; + +typedef enum cassandtra_type_enum CASSANDRA_TYPE; + + + +/** @brief + Class definition for the storage engine +*/ +class ha_cassandra: public handler +{ + friend class Column_name_enumerator_impl; + THR_LOCK_DATA lock; ///< MySQL lock + CASSANDRA_SHARE *share; ///< Shared lock info + + Cassandra_se_interface *se; + + /* description of static part of the table definition */ + ColumnDataConverter **field_converters; + uint n_field_converters; + + CASSANDRA_TYPE_DEF *default_type_def; + /* description of dynamic columns part */ + CASSANDRA_TYPE_DEF *special_type_field_converters; + LEX_STRING *special_type_field_names; + uint n_special_type_fields; + DYNAMIC_ARRAY dynamic_values, dynamic_names; + DYNAMIC_STRING dynamic_rec; + + ColumnDataConverter *rowkey_converter; + + bool setup_field_converters(Field **field, uint n_fields); + void free_field_converters(); + + int read_cassandra_columns(bool unpack_pk); + int check_table_options(struct ha_table_option_struct* options); + + bool doing_insert_batch; + ha_rows insert_rows_batched; + + uint dyncol_field; + bool dyncol_set; + + /* Used to produce 'wrong column %s at row %lu' warnings */ + ha_rows insert_lineno; + void print_conversion_error(const char *field_name, + char *cass_value, int cass_value_len); + int 
connect_and_check_options(TABLE *table_arg); +public: + ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg); + ~ha_cassandra() + { + free_field_converters(); + delete se; + } + + /** @brief + The name that will be used for display purposes. + */ + const char *table_type() const { return "CASSANDRA"; } + + /** @brief + The name of the index type that will be used for display. + Don't implement this method unless you really have indexes. + */ + const char *index_type(uint inx) { return "HASH"; } + + /** @brief + The file extensions. + */ + const char **bas_ext() const; + + /** @brief + This is a list of flags that indicate what functionality the storage engine + implements. The current table flags are documented in handler.h + */ + ulonglong table_flags() const + { + /* + HA_BINLOG_STMT_CAPABLE + We are saying that this engine is just statement capable to have + an engine that can only handle statement-based logging. This is + used in testing. + HA_REC_NOT_IN_SEQ + If we don't set it, filesort crashes, because it assumes rowids are + 1..8 byte numbers + */ + return HA_BINLOG_STMT_CAPABLE | + HA_REC_NOT_IN_SEQ;
HA_NO_TRANSACTIONS is unset. Is this engine transactional?
HA_PARTIAL_COLUMN_READ is unset. Do you always work with all columns, ignoring read_set and write_set?
HA_TABLE_SCAN_ON_INDEX is unset. Do you store the data MyISAM-style, index and data in separate files?
HA_FAST_KEY_READ is unset. Is reading keys in order faster in Cassandra than random reads?
HA_REQUIRE_PRIMARY_KEY is unset, but a primary key is required.
HA_PRIMARY_KEY_IN_READ_INDEX is unset, but as far as I see, primary key columns are always returned (and are needed for position()).
HA_PRIMARY_KEY_REQUIRED_FOR_POSITION is unset, but a primary key is required for position().
HA_NO_AUTO_INCREMENT is unset. Is auto-increment supported?
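For illustration only, a hypothetical combination, if the answers happen to be "not transactional, PK is mandatory and always read, no auto-increment" (keep only the flags whose answer actually warrants them):

    return HA_BINLOG_STMT_CAPABLE |
           HA_REC_NOT_IN_SEQ |
           HA_NO_TRANSACTIONS |                   /* not transactional */
           HA_REQUIRE_PRIMARY_KEY |               /* a PK is mandatory */
           HA_PRIMARY_KEY_IN_READ_INDEX |         /* PK columns always returned */
           HA_PRIMARY_KEY_REQUIRED_FOR_POSITION | /* position() needs the PK */
           HA_NO_AUTO_INCREMENT;                  /* no auto-increment support */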
+ + } + + /** @brief + This is a bitmap of flags that indicates how the storage engine + implements indexes. The current index flags are documented in + handler.h. If you do not implement indexes, just return zero here. + + @details + part is the key part to check. First key part is 0. + If all_parts is set, MySQL wants to know the flags for the combined + index, up to and including 'part'. + */ + ulong index_flags(uint inx, uint part, bool all_parts) const + { + return 0; + } + + /** @brief + unireg.cc will call max_supported_record_length(), max_supported_keys(), + max_supported_key_parts(), uint max_supported_key_length() + to make sure that the storage engine can handle the data it is about to + send. Return *real* limits of your storage engine here; MySQL will do + min(your_limits, MySQL_limits) automatically. + */ + uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; } + + /* Support only one Primary Key, for now */ + uint max_supported_keys() const { return 1; } + uint max_supported_key_parts() const { return 1; } + + /** @brief + unireg.cc will call this to make sure that the storage engine can handle + the data it is about to send. Return *real* limits of your storage engine + here; MySQL will do min(your_limits, MySQL_limits) automatically. + + @details + There is no need to implement ..._key_... methods if your engine doesn't + support indexes. + */ + uint max_supported_key_length() const { return 16*1024; /* just to return something*/ } + + int index_init(uint idx, bool sorted); + + int index_read_map(uchar * buf, const uchar * key, + key_part_map keypart_map, + enum ha_rkey_function find_flag); + + /** @brief + Called in test_quick_select to determine if indexes should be used. + */ + virtual double scan_time() { return (double) (stats.records+stats.deleted) / 20.0+10; }
does that make any sense? (I understand that it's copy-paste.) stats.deleted, for example, is never set.
+ + /** @brief + This method will never be called if you do not implement indexes. + */ + virtual double read_time(uint, uint, ha_rows rows) + { return (double) rows / 20.0+1; }
same question
+ + virtual void start_bulk_insert(ha_rows rows); + virtual int end_bulk_insert(); + + virtual int reset(); + + + int multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param, + uint n_ranges, uint mode, HANDLER_BUFFER *buf); + int multi_range_read_next(range_id_t *range_info); + ha_rows multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq, + void *seq_init_param, + uint n_ranges, uint *bufsz, + uint *flags, COST_VECT *cost); + ha_rows multi_range_read_info(uint keyno, uint n_ranges, uint keys, + uint key_parts, uint *bufsz, + uint *flags, COST_VECT *cost); + int multi_range_read_explain_info(uint mrr_mode, char *str, size_t size); + +private: + bool source_exhausted; + bool mrr_start_read(); + int check_field_options(Field **fields); + int read_dyncol(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names, + String *valcol, char **freenames); + int write_dynamic_row(DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals); + void static free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names, + char *free_names); + CASSANDRA_TYPE_DEF * get_cassandra_field_def(char *cass_name, + int cass_name_length); +public: + + /* + Everything below are methods that we implement in ha_example.cc.
maybe it'd be better to remove the references to ha_example.cc (and the example engine in general) before pushing this into the main tree?
+ + Most of these methods are not obligatory, skip them and + MySQL will treat them as not implemented + */ + /** @brief + We implement this in ha_example.cc; it's a required method. + */ + int open(const char *name, int mode, uint test_if_locked); // required + + /** @brief + We implement this in ha_example.cc; it's a required method. + */ + int close(void); // required + + /** @brief + We implement this in ha_example.cc. It's not an obligatory method; + skip it and and MySQL will treat it as not implemented. + */ + int write_row(uchar *buf); + + /** @brief + We implement this in ha_example.cc. It's not an obligatory method; + skip it and and MySQL will treat it as not implemented. + */ + int update_row(const uchar *old_data, uchar *new_data); + + /** @brief + We implement this in ha_example.cc. It's not an obligatory method; + skip it and and MySQL will treat it as not implemented. + */ + int delete_row(const uchar *buf); + + /** @brief + We implement this in ha_example.cc. It's not an obligatory method; + skip it and and MySQL will treat it as not implemented. + */ + int index_next(uchar *buf); + + /** @brief + We implement this in ha_example.cc. It's not an obligatory method; + skip it and and MySQL will treat it as not implemented. + */ + int index_prev(uchar *buf); + + /** @brief + We implement this in ha_example.cc. It's not an obligatory method; + skip it and and MySQL will treat it as not implemented. + */ + int index_first(uchar *buf);
why have unimplemented methods here? I'd rather remove them.
+ + /** @brief + We implement this in ha_example.cc. It's not an obligatory method; + skip it and and MySQL will treat it as not implemented. + */ + int index_last(uchar *buf); + + /** @brief + Unlike index_init(), rnd_init() can be called two consecutive times + without rnd_end() in between (it only makes sense if scan=1). In this + case, the second call should prepare for the new table scan (e.g if + rnd_init() allocates the cursor, the second call should position the + cursor to the start of the table; no need to deallocate and allocate + it again. This is a required method. + */ + int rnd_init(bool scan); //required + int rnd_end(); + int rnd_next(uchar *buf); ///< required + int rnd_pos(uchar *buf, uchar *pos); ///< required + void position(const uchar *record); ///< required + int info(uint); ///< required + int extra(enum ha_extra_function operation); + int external_lock(THD *thd, int lock_type); ///< required + int delete_all_rows(void); + ha_rows records_in_range(uint inx, key_range *min_key, + key_range *max_key); + int delete_table(const char *from); + int create(const char *name, TABLE *form, + HA_CREATE_INFO *create_info); ///< required + bool check_if_incompatible_data(HA_CREATE_INFO *info, + uint table_changes); + + THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, + enum thr_lock_type lock_type); ///< required +};
=== added file 'storage/cassandra/ha_cassandra.cc' --- storage/cassandra/ha_cassandra.cc 1970-01-01 00:00:00 +0000 +++ storage/cassandra/ha_cassandra.cc 2012-11-12 15:11:23 +0000 @@ -0,0 +1,2674 @@ +/* + Copyright (c) 2012, Monty Program Ab + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifdef USE_PRAGMA_IMPLEMENTATION +#pragma implementation // gcc: Class implementation +#endif + +#include <mysql/plugin.h> +#include "ha_cassandra.h" +#include "sql_class.h" + +#define DYNCOL_USUAL 20 +#define DYNCOL_DELTA 100 +#define DYNCOL_USUAL_REC 1024 +#define DYNCOL_DELTA_REC 1024 + +static handler *cassandra_create_handler(handlerton *hton, + TABLE_SHARE *table, + MEM_ROOT *mem_root); + +extern int dynamic_column_error_message(enum_dyncol_func_result rc); + +handlerton *cassandra_hton; + + +/* + Hash used to track the number of open tables; variable for example share + methods +*/ +static HASH cassandra_open_tables; + +/* The mutex used to init the hash; variable for example share methods */ +mysql_mutex_t cassandra_mutex; + + +/** + Structure for CREATE TABLE options (table options). + It needs to be called ha_table_option_struct. + + The option values can be specified in the CREATE TABLE at the end: + CREATE TABLE ( ... ) *here* +*/ + +struct ha_table_option_struct +{ + const char *thrift_host; + int thrift_port; + const char *keyspace; + const char *column_family; +}; + + +ha_create_table_option cassandra_table_option_list[]= +{ + /* + one option that takes an arbitrary string + */ + HA_TOPTION_STRING("thrift_host", thrift_host), + HA_TOPTION_NUMBER("thrift_port", thrift_port, 9160, 1, 65535, 0), + HA_TOPTION_STRING("keyspace", keyspace), + HA_TOPTION_STRING("column_family", column_family), + HA_TOPTION_END +}; + +/** + Structure for CREATE TABLE options (field options). +*/ + +struct ha_field_option_struct +{ + bool dyncol_field; +}; + +ha_create_table_option cassandra_field_option_list[]= +{ + /* + Collect all other columns as dynamic here, + the valid values are YES/NO, ON/OFF, 1/0. + The default is 0, that is true, yes, on.
really? looks like a typo. 0 is false, 1 is true.
+ */ + HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0), + HA_FOPTION_END +}; + +static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG, + "Number of rows in an INSERT batch", + NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0); + +static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG, + "Number of rows in a multiget(MRR) batch", + NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0); + +static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG, + "Number of rows in an rnd_read (full scan) batch", + NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0); + +static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG, + "Number of times to retry Cassandra calls that failed due to timeouts or " + "network communication problems. The default, 0, means not to retry.", + NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0); + +/* These match values in enum_cassandra_consistency_level */ +const char *cassandra_consistency_level[] = +{ + "ONE", + "QUORUM", + "LOCAL_QUORUM", + "EACH_QUORUM", + "ALL", + "ANY", + "TWO", + "THREE", + NullS +}; + +TYPELIB cassandra_consistency_level_typelib= { + array_elements(cassandra_consistency_level) - 1, "", + cassandra_consistency_level, NULL +}; + + +static MYSQL_THDVAR_ENUM(write_consistency, PLUGIN_VAR_RQCMDARG, + "Cassandra consistency level to use for write operations", NULL, NULL, + ONE, &cassandra_consistency_level_typelib); + +static MYSQL_THDVAR_ENUM(read_consistency, PLUGIN_VAR_RQCMDARG, + "Cassandra consistency level to use for read operations", NULL, NULL, + ONE, &cassandra_consistency_level_typelib); + + +mysql_mutex_t cassandra_default_host_lock; +static char* cassandra_default_thrift_host = NULL; +static char cassandra_default_host_buf[256]=""; + +static void +cassandra_default_thrift_host_update(THD *thd, + struct st_mysql_sys_var* var, + void* var_ptr, /*!< out: where the + formal string goes */ + const void* save) /*!< in: immediate result + from check function */ +{ + const char *new_host= *((char**)save); + const size_t max_len= sizeof(cassandra_default_host_buf); + + mysql_mutex_lock(&cassandra_default_host_lock); + + if (new_host) + { + strncpy(cassandra_default_host_buf, new_host, max_len-1); + cassandra_default_host_buf[max_len-1]= 0; + cassandra_default_thrift_host= cassandra_default_host_buf; + } + else + { + cassandra_default_host_buf[0]= 0; + cassandra_default_thrift_host= NULL; + } + + *((const char**)var_ptr)= cassandra_default_thrift_host; + + mysql_mutex_unlock(&cassandra_default_host_lock); +} + + +static MYSQL_SYSVAR_STR(default_thrift_host, cassandra_default_thrift_host, + PLUGIN_VAR_RQCMDARG, + "Default host for Cassandra thrift connections", + /*check*/NULL, + cassandra_default_thrift_host_update, + /*default*/NULL); + +static struct st_mysql_sys_var* cassandra_system_variables[]= { + MYSQL_SYSVAR(insert_batch_size), + MYSQL_SYSVAR(multiget_batch_size), + MYSQL_SYSVAR(rnd_batch_size), + + MYSQL_SYSVAR(default_thrift_host), + MYSQL_SYSVAR(write_consistency), + MYSQL_SYSVAR(read_consistency), + MYSQL_SYSVAR(failure_retries), + NULL +}; + + +static SHOW_VAR cassandra_status_variables[]= { + {"row_inserts", + (char*) &cassandra_counters.row_inserts, SHOW_LONG}, + {"row_insert_batches", + (char*) &cassandra_counters.row_insert_batches, SHOW_LONG}, + + {"multiget_reads", + (char*) &cassandra_counters.multiget_reads, SHOW_LONG}, + {"multiget_keys_scanned", + (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG}, + 
{"multiget_rows_read", + (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG}, + + {"timeout_exceptions", + (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG}, + {"unavailable_exceptions", + (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG}, + {NullS, NullS, SHOW_LONG} +}; + + +Cassandra_status_vars cassandra_counters; +Cassandra_status_vars cassandra_counters_copy; + + +/** + @brief + Function we use in the creation of our hash to get key. +*/ + +static uchar* cassandra_get_key(CASSANDRA_SHARE *share, size_t *length, + my_bool not_used __attribute__((unused))) +{ + *length=share->table_name_length; + return (uchar*) share->table_name; +} + +#ifdef HAVE_PSI_INTERFACE +static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_CASSANDRA_SHARE_mutex; + +static PSI_mutex_info all_cassandra_mutexes[]= +{ + { &ex_key_mutex_example, "cassandra", PSI_FLAG_GLOBAL}, + { &ex_key_mutex_CASSANDRA_SHARE_mutex, "CASSANDRA_SHARE::mutex", 0} +}; + +static void init_cassandra_psi_keys() +{ + const char* category= "cassandra"; + int count; + + if (PSI_server == NULL) + return; + + count= array_elements(all_cassandra_mutexes); + PSI_server->register_mutex(category, all_cassandra_mutexes, count); +} +#endif + +static int cassandra_init_func(void *p) +{ + DBUG_ENTER("cassandra_init_func"); + +#ifdef HAVE_PSI_INTERFACE + init_cassandra_psi_keys(); +#endif + + cassandra_hton= (handlerton *)p; + mysql_mutex_init(ex_key_mutex_example, &cassandra_mutex, MY_MUTEX_INIT_FAST); + (void) my_hash_init(&cassandra_open_tables,system_charset_info,32,0,0, + (my_hash_get_key) cassandra_get_key,0,0); + + cassandra_hton->state= SHOW_OPTION_YES; + cassandra_hton->create= cassandra_create_handler; + /* + Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE + TABLE to create an *empty* table from scratch. Cassandra table won't be + emptied if re-created. + */
HTON_ALTER_NOT_SUPPORTED is not set. ALTER works?
HTON_TEMPORARY_NOT_SUPPORTED is not set. CREATE TEMPORARY TABLE works?
HTON_NO_PARTITION is not set. Partitioning works?
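For example, if none of the three actually work, something like this (hypothetical; set only the flags for operations that are really unsupported):

    cassandra_hton->flags= HTON_ALTER_NOT_SUPPORTED |
                           HTON_TEMPORARY_NOT_SUPPORTED |
                           HTON_NO_PARTITION;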
+ cassandra_hton->flags= 0; + cassandra_hton->table_options= cassandra_table_option_list; + cassandra_hton->field_options= cassandra_field_option_list; + + mysql_mutex_init(0 /* no instrumentation */,
why no instrumentation?
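A sketch of instrumenting it, following the PSI pattern already used in this file (ex_key_mutex_default_host is a hypothetical name):

    static PSI_mutex_key ex_key_mutex_default_host;

    static PSI_mutex_info all_cassandra_mutexes[]=
    {
      { &ex_key_mutex_example, "cassandra", PSI_FLAG_GLOBAL},
      { &ex_key_mutex_CASSANDRA_SHARE_mutex, "CASSANDRA_SHARE::mutex", 0},
      { &ex_key_mutex_default_host, "cassandra_default_host_lock",
        PSI_FLAG_GLOBAL}
    };

    /* ...and in cassandra_init_func(): */
    mysql_mutex_init(ex_key_mutex_default_host,
                     &cassandra_default_host_lock, MY_MUTEX_INIT_FAST);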
+ &cassandra_default_host_lock, MY_MUTEX_INIT_FAST); + + DBUG_RETURN(0); +} + + +static int cassandra_done_func(void *p) +{ + int error= 0; + DBUG_ENTER("cassandra_done_func"); + if (cassandra_open_tables.records) + error= 1; + my_hash_free(&cassandra_open_tables); + mysql_mutex_destroy(&cassandra_mutex); + mysql_mutex_destroy(&cassandra_default_host_lock); + DBUG_RETURN(error); +} + + +/** + @brief + Example of simple lock controls. The "share" it creates is a + structure we will pass to each cassandra handler. Do you have to have + one of these? Well, you have pieces that are used for locking, and + they are needed to function. +*/ + +static CASSANDRA_SHARE *get_share(const char *table_name, TABLE *table) +{ + CASSANDRA_SHARE *share; + uint length; + char *tmp_name; + + mysql_mutex_lock(&cassandra_mutex); + length=(uint) strlen(table_name); + + if (!(share=(CASSANDRA_SHARE*) my_hash_search(&cassandra_open_tables, + (uchar*) table_name, + length))) + { + if (!(share=(CASSANDRA_SHARE *) + my_multi_malloc(MYF(MY_WME | MY_ZEROFILL), + &share, sizeof(*share), + &tmp_name, length+1, + NullS))) + { + mysql_mutex_unlock(&cassandra_mutex); + return NULL; + } + + share->use_count=0; + share->table_name_length=length; + share->table_name=tmp_name; + strmov(share->table_name,table_name); + if (my_hash_insert(&cassandra_open_tables, (uchar*) share)) + goto error; + thr_lock_init(&share->lock); + mysql_mutex_init(ex_key_mutex_CASSANDRA_SHARE_mutex, + &share->mutex, MY_MUTEX_INIT_FAST); + } + share->use_count++; + mysql_mutex_unlock(&cassandra_mutex); + + return share; + +error: + mysql_mutex_destroy(&share->mutex); + my_free(share); + + return NULL; +} + + +/** + @brief + Free lock controls. We call this whenever we close a table. If the table had + the last reference to the share, then we free memory associated with it. +*/ + +static int free_share(CASSANDRA_SHARE *share) +{ + mysql_mutex_lock(&cassandra_mutex); + if (!--share->use_count) + { + my_hash_delete(&cassandra_open_tables, (uchar*) share); + thr_lock_delete(&share->lock); + mysql_mutex_destroy(&share->mutex); + my_free(share); + } + mysql_mutex_unlock(&cassandra_mutex); + + return 0; +} + + +static handler* cassandra_create_handler(handlerton *hton, + TABLE_SHARE *table, + MEM_ROOT *mem_root) +{ + return new (mem_root) ha_cassandra(hton, table); +} + + +ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg) + :handler(hton, table_arg), + se(NULL), field_converters(NULL), + special_type_field_converters(NULL), + special_type_field_names(NULL), n_special_type_fields(0), + rowkey_converter(NULL), + dyncol_field(0), dyncol_set(0) +{} + + +static const char *ha_cassandra_exts[] = { + NullS +}; + +const char **ha_cassandra::bas_ext() const +{ + return ha_cassandra_exts; +} + + +int ha_cassandra::connect_and_check_options(TABLE *table_arg) +{ + ha_table_option_struct *options= table_arg->s->option_struct; + int res; + DBUG_ENTER("ha_cassandra::connect_and_check_options"); + + if ((res= check_field_options(table_arg->s->field)) || + (res= check_table_options(options))) + DBUG_RETURN(res); + + se= create_cassandra_se(); + se->set_column_family(options->column_family); + const char *thrift_host= options->thrift_host? 
options->thrift_host: + cassandra_default_thrift_host; + if (se->connect(thrift_host, options->thrift_port, options->keyspace)) + { + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str()); + DBUG_RETURN(HA_ERR_NO_CONNECTION); + } + + if (setup_field_converters(table_arg->field, table_arg->s->fields)) + { + DBUG_RETURN(HA_ERR_NO_CONNECTION); + } + + DBUG_RETURN(0); +} + + +int ha_cassandra::check_field_options(Field **fields) +{ + Field **field; + uint i; + DBUG_ENTER("ha_cassandra::check_field_options"); + for (field= fields, i= 0; *field; field++, i++) + { + ha_field_option_struct *field_options= (*field)->option_struct; + if (field_options && field_options->dyncol_field) + { + if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB) + { + my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name); + DBUG_RETURN(HA_WRONG_CREATE_OPTION); + } + dyncol_set= 1; + dyncol_field= i; + bzero(&dynamic_values, sizeof(dynamic_values)); + bzero(&dynamic_names, sizeof(dynamic_names)); + bzero(&dynamic_rec, sizeof(dynamic_rec)); + } + } + DBUG_RETURN(0); +} + + +int ha_cassandra::open(const char *name, int mode, uint test_if_locked) +{ + DBUG_ENTER("ha_cassandra::open"); + + if (!(share = get_share(name, table))) + DBUG_RETURN(1); + thr_lock_data_init(&share->lock,&lock,NULL); + + DBUG_ASSERT(!se); + /* + Don't do the following on open: it prevents SHOW CREATE TABLE when the server + has gone away. + */ + /* + int res; + if ((res= connect_and_check_options(table))) + { + DBUG_RETURN(res); + } + */ + + info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST); + insert_lineno= 0; + + DBUG_RETURN(0); +} + + +int ha_cassandra::close(void) +{ + DBUG_ENTER("ha_cassandra::close"); + delete se; + se= NULL; + free_field_converters(); + DBUG_RETURN(free_share(share)); +} + + +int ha_cassandra::check_table_options(ha_table_option_struct *options) +{ + if (!options->thrift_host && (!cassandra_default_thrift_host || + !cassandra_default_thrift_host[0])) + { + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), + "thrift_host table option must be specified, or " + "@@cassandra_default_thrift_host must be set"); + return HA_WRONG_CREATE_OPTION; + } + + if (!options->keyspace || !options->column_family) + { + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), + "keyspace and column_family table options must be specified"); + return HA_WRONG_CREATE_OPTION; + } + return 0; +} + + +/** + @brief + create() is called to create a table. The variable name will have the name + of the table. + + @details + When create() is called you do not need to worry about + opening the table. Also, the .frm file will have already been + created so adjusting create_info is not necessary. You can overwrite + the .frm file at this point if you wish to change the table + definition, but there are no methods currently provided for doing + so. + + Called from handle.cc by ha_create_table(). + + @see + ha_create_table() in handle.cc +*/ + +int ha_cassandra::create(const char *name, TABLE *table_arg, + HA_CREATE_INFO *create_info) +{ + int res; + DBUG_ENTER("ha_cassandra::create"); + + Field **pfield= table_arg->s->field; + if (!((*pfield)->flags & NOT_NULL_FLAG)) + { + my_error(ER_WRONG_COLUMN_NAME, MYF(0), "First column must be NOT NULL"); + DBUG_RETURN(HA_WRONG_CREATE_OPTION); + }
this is unnecessary. the second check guarantees that the first column is NOT NULL, that is, this if() is redundant
+ + if (table_arg->s->keys != 1 || table_arg->s->primary_key !=0 || + table_arg->key_info[0].key_parts != 1 || + table_arg->key_info[0].key_part[0].fieldnr != 1) + { + my_error(ER_WRONG_COLUMN_NAME, MYF(0), + "Table must have PRIMARY KEY defined over the first column"); + DBUG_RETURN(HA_WRONG_CREATE_OPTION); + } ... +CASSANDRA_TYPE get_cassandra_type(const char *validator) +{ + CASSANDRA_TYPE rc; + switch(validator[32]) + { + case 'L': + rc= CT_BIGINT; + break; + case 'I': + rc= (validator[35] == '3' ? CT_INT : CT_VARINT); + rc= CT_INT; + break; + case 'C': + rc= CT_COUNTER; + break; + case 'F': + rc= CT_FLOAT; + break; + case 'D': + switch (validator[33]) + { + case 'o': + rc= CT_DOUBLE; + break; + case 'a': + rc= CT_TIMESTAMP; + break; + case 'e': + rc= CT_DECIMAL; + break; + default: + rc= CT_BLOB; + break; + } + break; + case 'B': + rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB); + break; + case 'A': + rc= CT_ASCII; + break; + case 'U': + rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID); + break; + default: + rc= CT_BLOB; + } + DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0); + return rc; +} + +ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name) +{ + ColumnDataConverter *res= NULL; + + switch(field->type()) { + case MYSQL_TYPE_TINY: + if (!strcmp(validator_name, validator_boolean))
why do you strcmp here, while you only check selected characters in get_cassandra_type()? you could've called get_cassandra_type() on validator_name and compared enum values instead of strings.
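A sketch of that rewrite (assuming get_cassandra_type() is safe to call on every validator name that can reach this function):

    ColumnDataConverter *res= NULL;
    CASSANDRA_TYPE vtype= get_cassandra_type(validator_name); /* classify once */

    switch (field->type()) {
    case MYSQL_TYPE_FLOAT:
      if (vtype == CT_FLOAT)
        res= new FloatDataConverter;
      break;
    case MYSQL_TYPE_DOUBLE:
      if (vtype == CT_DOUBLE)
        res= new DoubleDataConverter;
      break;
    /* ...the remaining cases follow the same pattern... */
    }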
+ { + res= new TinyintDataConverter; + break; + } + /* fall through: */ + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_LONGLONG: + { + bool is_counter= false; + if (!strcmp(validator_name, validator_bigint) || + !strcmp(validator_name, validator_timestamp) || + (is_counter= !strcmp(validator_name, validator_counter))) + res= new BigintDataConverter(!is_counter); + break; + } + case MYSQL_TYPE_FLOAT: + if (!strcmp(validator_name, validator_float)) + res= new FloatDataConverter; + break; + + case MYSQL_TYPE_DOUBLE: + if (!strcmp(validator_name, validator_double)) + res= new DoubleDataConverter; + break; + + case MYSQL_TYPE_TIMESTAMP: + if (!strcmp(validator_name, validator_timestamp)) + res= new TimestampDataConverter; + break; + + case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings. + if (!strcmp(validator_name, validator_uuid) && + field->real_type() == MYSQL_TYPE_STRING && + field->field_length == 36) + { + // UUID maps to CHAR(36), its text representation + res= new UuidDataConverter; + break; + } + /* fall through: */ + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_VARCHAR: + { + /* + Cassandra's "varint" type is a binary-encoded arbitary-length + big-endian number. + - It can be mapped to VARBINARY(N), with sufficiently big N. + - If the value does not fit into N bytes, it is an error. We should not + truncate it, because that is just as good as returning garbage. + - varint should not be mapped to BINARY(N), because BINARY(N) values + are zero-padded, which will work as multiplying the value by + 2^k for some value of k. + */ + if (field->type() == MYSQL_TYPE_VARCHAR && + field->binary() && + (!strcmp(validator_name, validator_varint) || + !strcmp(validator_name, validator_decimal))) + { + res= new StringCopyConverter(field->field_length); + break; + } + + if (!strcmp(validator_name, validator_blob) || + !strcmp(validator_name, validator_ascii) || + !strcmp(validator_name, validator_text)) + { + res= new StringCopyConverter((size_t)-1); + } + break; + } + case MYSQL_TYPE_LONG: + if (!strcmp(validator_name, validator_int)) + res= new Int32DataConverter; + break; + + default:; + } + return res; +} + + +bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields) +{ + char *col_name; + int col_name_len; + char *col_type; + int col_type_len; + size_t ddl_fields= se->get_ddl_size(); + const char *default_type= se->get_default_validator(); + uint max_non_default_fields; + DBUG_ENTER("ha_cassandra::setup_field_converters"); + DBUG_ASSERT(default_type); + + DBUG_ASSERT(!field_converters); + DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1); + + /* + We always should take into account that in case of using dynamic columns + sql description contain one field which does not described in + Cassandra DDL also key field is described separately. So that + is why we use "n_fields - dyncol_set - 1" or "ddl_fields + 2". + */ + max_non_default_fields= ddl_fields + 2 - n_fields; + if (ddl_fields < (n_fields - dyncol_set - 1)) + { + se->print_error("Some of SQL fields were not mapped to Cassandra's fields"); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + + /* allocate memory in one chunk */ + size_t memsize= sizeof(ColumnDataConverter*) * n_fields + + (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))* + (dyncol_set ? 
max_non_default_fields : 0); + if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0)))) + DBUG_RETURN(true); + bzero(field_converters, memsize); + n_field_converters= n_fields; + + if (dyncol_set) + { + special_type_field_converters= + (CASSANDRA_TYPE_DEF *)(field_converters + n_fields); + special_type_field_names= + ((LEX_STRING*)(special_type_field_converters + max_non_default_fields)); + } + + if (dyncol_set) + { + if (init_dynamic_array(&dynamic_values, + sizeof(DYNAMIC_COLUMN_VALUE), + DYNCOL_USUAL, DYNCOL_DELTA)) + DBUG_RETURN(true); + else + if (init_dynamic_array(&dynamic_names, + sizeof(LEX_STRING), + DYNCOL_USUAL, DYNCOL_DELTA)) + { + delete_dynamic(&dynamic_values); + DBUG_RETURN(true); + } + else + if (init_dynamic_string(&dynamic_rec, NULL, + DYNCOL_USUAL_REC, DYNCOL_DELTA_REC)) + { + delete_dynamic(&dynamic_values); + delete_dynamic(&dynamic_names); + DBUG_RETURN(true); + } + + /* Dynamic column field has special processing */ + field_converters[dyncol_field]= NULL; + + default_type_def= cassandra_types + get_cassandra_type(default_type); + } + + se->first_ddl_column(); + uint n_mapped= 0; + while (!se->next_ddl_column(&col_name, &col_name_len, &col_type, + &col_type_len)) + { + Field **field; + uint i; + /* Mapping for the 1st field is already known */ + for (field= field_arg + 1, i= 1; *field; field++, i++) + { + if ((!dyncol_set || dyncol_field != i) && + !strcmp((*field)->field_name, col_name)) + { + n_mapped++; + ColumnDataConverter **conv= field_converters + (*field)->field_index; + if (!(*conv= map_field_to_validator(*field, col_type))) + { + se->print_error("Failed to map column %s to datatype %s", + (*field)->field_name, col_type); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + (*conv)->field= *field; + } + } + if (dyncol_set && !(*field)) // is needed and not found + { + DBUG_PRINT("info",("Field not found: %s", col_name)); + if (strcmp(col_type, default_type)) + { + DBUG_PRINT("info",("Field '%s' non-default type: '%s'", + col_name, col_type)); + special_type_field_names[n_special_type_fields].length= col_name_len; + special_type_field_names[n_special_type_fields].str= col_name; + special_type_field_converters[n_special_type_fields]= + cassandra_types[get_cassandra_type(col_type)]; + n_special_type_fields++; + } + } + } + + if (n_mapped != n_fields - 1 - dyncol_set) + { + Field *first_unmapped= NULL; + /* Find the first field */ + for (uint i= 1; i < n_fields;i++) + { + if (!field_converters[i]) + { + first_unmapped= field_arg[i]; + break; + } + } + DBUG_ASSERT(first_unmapped); + + se->print_error("Field `%s` could not be mapped to any field in Cassandra",
generally (here and everywhere) use %`s instead of `%s`: the %`s specifier backtick-quotes the identifier for you.
+ first_unmapped->field_name); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + + /* + Setup type conversion for row_key. + */ + se->get_rowkey_type(&col_name, &col_type); + if (col_name && strcmp(col_name, (*field_arg)->field_name)) + { + se->print_error("PRIMARY KEY column must match Cassandra's name '%s'", + col_name); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + if (!col_name && strcmp("rowkey", (*field_arg)->field_name)) + { + se->print_error("target column family has no key_alias defined, " + "PRIMARY KEY column must be named 'rowkey'"); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + + if (col_type != NULL) + { + if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type))) + { + se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + rowkey_converter->field= *field_arg; + } + else + { + se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)"); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(true); + } + + DBUG_RETURN(false); +} + + +void ha_cassandra::free_field_converters() +{ + delete rowkey_converter; + rowkey_converter= NULL; + + if (dyncol_set) + { + delete_dynamic(&dynamic_values); + delete_dynamic(&dynamic_names); + dynstr_free(&dynamic_rec); + } + if (field_converters) + { + for (uint i=0; i < n_field_converters; i++) + if (field_converters[i]) + { + DBUG_ASSERT(!dyncol_set || i == dyncol_field); + delete field_converters[i]; + } + my_free(field_converters); + field_converters= NULL; + } +} + + +int ha_cassandra::index_init(uint idx, bool sorted) +{ + int ires; + if (!se && (ires= connect_and_check_options(table))) + return ires; + return 0; +} + +void store_key_image_to_rec(Field *field, uchar *ptr, uint len); + +int ha_cassandra::index_read_map(uchar *buf, const uchar *key, + key_part_map keypart_map, + enum ha_rkey_function find_flag) +{ + int rc= 0; + DBUG_ENTER("ha_cassandra::index_read_map"); + + if (find_flag != HA_READ_KEY_EXACT) + DBUG_RETURN(HA_ERR_WRONG_COMMAND);
did you verify that the server expects HA_ERR_WRONG_COMMAND from this method and correctly handles it? or, perhaps, find_flag is always HA_READ_KEY_EXACT here, because of your index_flags() ?
+ + uint key_len= calculate_key_len(table, active_index, key, keypart_map); + store_key_image_to_rec(table->field[0], (uchar*)key, key_len); + + char *cass_key; + int cass_key_len; + my_bitmap_map *old_map; + + old_map= dbug_tmp_use_all_columns(table, table->read_set); + + if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len)) + { + /* We get here when making lookups like uuid_column='not-an-uuid' */ + dbug_tmp_restore_column_map(table->read_set, old_map); + DBUG_RETURN(HA_ERR_KEY_NOT_FOUND); + } + + dbug_tmp_restore_column_map(table->read_set, old_map); + + bool found; + if (se->get_slice(cass_key, cass_key_len, &found)) + { + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
generally it's better not to use my_error() directly, but only return an error code, and return your se->error_str() from ha_cassandra::get_error_message()
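A minimal sketch of that approach (a matching declaration would go into ha_cassandra.h; the charset handling is a guess):

    bool ha_cassandra::get_error_message(int error, String *buf)
    {
      if (error == HA_ERR_INTERNAL_ERROR && se)
      {
        buf->copy(se->error_str(), (uint32)strlen(se->error_str()),
                  system_charset_info);
        return false; /* not a temporary error */
      }
      return handler::get_error_message(error, buf);
    }

The call sites then shrink to just "rc= HA_ERR_INTERNAL_ERROR;".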
+ rc= HA_ERR_INTERNAL_ERROR; + } + + /* TODO: what if we're not reading all columns?? */ + if (!found) + rc= HA_ERR_KEY_NOT_FOUND; + else + rc= read_cassandra_columns(false); + + DBUG_RETURN(rc); +} + + +void ha_cassandra::print_conversion_error(const char *field_name, + char *cass_value, + int cass_value_len) +{ + char buf[32]; + char *p= cass_value; + size_t i= 0; + for (; (i < (int)sizeof(buf)-1) && (p < cass_value + cass_value_len); p++)
why do you cast here?
+ { + buf[i++]= map2number[(*p >> 4) & 0xF]; + buf[i++]= map2number[*p & 0xF]; + } + buf[i]=0; + + se->print_error("Unable to convert value for field `%s` from Cassandra's data" + " format. Source data is %d bytes, 0x%s%s", + field_name, cass_value_len, buf, + (i == sizeof(buf) - 1)? "..." : ""); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); +} + + +void free_strings(DYNAMIC_COLUMN_VALUE *vals, uint num) +{ + for (uint i= 0; i < num; i++) + if (vals[i].type == DYN_COL_STRING && + !vals[i].x.string.nonfreeable) + my_free(vals[i].x.string.value.str); +} + + +CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name, + int cass_name_len) +{ + CASSANDRA_TYPE_DEF *type= default_type_def; + for(uint i= 0; i < n_special_type_fields; i++) + { + if (cass_name_len == (int)special_type_field_names[i].length && + memcmp(cass_name, special_type_field_names[i].str, + cass_name_len) == 0) + { + type= special_type_field_converters + i; + break; + } + } + return type; +} + +int ha_cassandra::read_cassandra_columns(bool unpack_pk) +{ + char *cass_name; + char *cass_value; + int cass_value_len, cass_name_len; + Field **field; + int res= 0; + ulong total_name_len= 0; + + /* + cassandra_to_mariadb() calls will use field->store(...) methods, which + require that the column is in the table->write_set + */ + my_bitmap_map *old_map; + old_map= dbug_tmp_use_all_columns(table, table->write_set); + + /* Start with all fields being NULL */ + for (field= table->field + 1; *field; field++) + (*field)->set_null(); + + while (!se->get_next_read_column(&cass_name, &cass_name_len, + &cass_value, &cass_value_len)) + { + // map to our column. todo: use hash or something.. + bool found= 0; + for (field= table->field + 1; *field; field++) + { + uint fieldnr= (*field)->field_index; + if ((!dyncol_set || dyncol_field != fieldnr) && + !strcmp((*field)->field_name, cass_name)) + { + found= 1; + (*field)->set_notnull(); + if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value, + cass_value_len)) + { + print_conversion_error((*field)->field_name, cass_value, + cass_value_len); + res=1; + goto err; + } + break; + } + } + if (dyncol_set && !found) + { + DYNAMIC_COLUMN_VALUE val; + LEX_STRING nm; + CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name, + cass_name_len); + nm.str= cass_name; + nm.length= cass_name_len; + if (nm.length > MAX_NAME_LENGTH) + { + se->print_error("Unable to convert value for field `%s`" + " from Cassandra's data format. Name" + " length exceed limit of %u: '%s'", + table->field[dyncol_field]->field_name, + (uint)MAX_NAME_LENGTH, cass_name); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + res=1; + goto err; + } + total_name_len+= cass_name_len; + if (nm.length > MAX_TOTAL_NAME_LENGTH) + { + se->print_error("Unable to convert value for field `%s`" + " from Cassandra's data format. 
Sum of all names" + " length exceed limit of %lu", + table->field[dyncol_field]->field_name, + cass_name, (uint)MAX_TOTAL_NAME_LENGTH); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + res=1; + goto err; + } + + if ((res= (*(type->cassandra_to_dynamic))(cass_value, + cass_value_len, &val)) || + insert_dynamic(&dynamic_names, (uchar *) &nm) || + insert_dynamic(&dynamic_values, (uchar *) &val)) + { + if (res) + { + print_conversion_error(cass_name, cass_value, cass_value_len); + } + free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer, + dynamic_values.elements); + // EOM shouldm be already reported if happened + res=1; + goto err; + } + } + } + + dynamic_rec.length= 0; + if (dyncol_set) + { + if (dynamic_column_create_many_internal_fmt(&dynamic_rec, + dynamic_names.elements, + dynamic_names.buffer, + (DYNAMIC_COLUMN_VALUE *) + dynamic_values.buffer, + FALSE, + TRUE) < 0) + dynamic_rec.length= 0; + + free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer, + dynamic_values.elements); + dynamic_values.elements= dynamic_names.elements= 0; + } + if (dyncol_set)
why a separate if()?
+ { + if (dynamic_rec.length == 0) + table->field[dyncol_field]->set_null(); + else + { + Field_blob *blob= (Field_blob *)table->field[dyncol_field]; + blob->set_notnull(); + blob->store_length(dynamic_rec.length); + *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))= + dynamic_rec.str; + } + } + + if (unpack_pk) + { + /* Unpack rowkey to primary key */ + field= table->field; + (*field)->set_notnull(); + se->get_read_rowkey(&cass_value, &cass_value_len); + if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len)) + { + print_conversion_error((*field)->field_name, cass_value, cass_value_len); + res=1; + goto err; + } + } + +err: + dbug_tmp_restore_column_map(table->write_set, old_map); + return res; +} ... +int ha_cassandra::rnd_pos(uchar *buf, uchar *pos) +{ + int rc; + DBUG_ENTER("ha_cassandra::rnd_pos"); + + int save_active_index= active_index; + active_index= 0; /* The primary key */ + rc= index_read_map(buf, pos, key_part_map(1), HA_READ_KEY_EXACT); + + active_index= save_active_index;
a bit nicer would be to implement index_read_idx_map() and call it both from here and from index_read_map()
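Roughly like this (a sketch; index_read_map() would then become a call to index_read_idx_map(buf, active_index, ...)):

    int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
    {
      DBUG_ENTER("ha_cassandra::rnd_pos");
      /* index 0 is the primary key; no save/restore of active_index needed */
      DBUG_RETURN(index_read_idx_map(buf, 0, pos, key_part_map(1),
                                     HA_READ_KEY_EXACT));
    }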
+ + DBUG_RETURN(rc); +} + ... +bool ha_cassandra::mrr_start_read() +{ + uint key_len; + + my_bitmap_map *old_map; + old_map= dbug_tmp_use_all_columns(table, table->read_set); + + se->new_lookup_keys(); + + while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range))) + { + char *cass_key; + int cass_key_len; + + DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE); + + uchar *key= (uchar*)mrr_cur_range.start_key.key; + key_len= mrr_cur_range.start_key.length; + //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS?? + store_key_image_to_rec(table->field[0], (uchar*)key, key_len); + + rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len); + + // Primitive buffer control + if (se->add_lookup_key(cass_key, cass_key_len) > + THDVAR(table->in_use, multiget_batch_size)) + break;
I'd suggest adding a status variable to count how many times the buffer was refilled
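E.g. along these lines, following the existing counter pattern (multiget_batches is a hypothetical name):

    /* in Cassandra_status_vars: */
    ulong multiget_batches; /* how many times the MRR key buffer was filled */

    /* in mrr_start_read(), before calling se->multiget_slice(): */
    cassandra_counters.multiget_batches++;

    /* in cassandra_status_variables[]: */
    {"multiget_batches",
      (char*) &cassandra_counters.multiget_batches, SHOW_LONG},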
+ } + + dbug_tmp_restore_column_map(table->read_set, old_map); + + return se->multiget_slice(); +} ... +///////////////////////////////////////////////////////////////////////////// +// Dummy implementations start +///////////////////////////////////////////////////////////////////////////// + + +int ha_cassandra::index_next(uchar *buf)
Why did you add all these dummy methods?
+{ + int rc; + DBUG_ENTER("ha_cassandra::index_next"); + rc= HA_ERR_WRONG_COMMAND; + DBUG_RETURN(rc); +} + + +int ha_cassandra::index_prev(uchar *buf) +{ + int rc; + DBUG_ENTER("ha_cassandra::index_prev"); + rc= HA_ERR_WRONG_COMMAND; + DBUG_RETURN(rc); +} + + +int ha_cassandra::index_first(uchar *buf) +{ + int rc; + DBUG_ENTER("ha_cassandra::index_first"); + rc= HA_ERR_WRONG_COMMAND; + DBUG_RETURN(rc); +} + +int ha_cassandra::index_last(uchar *buf) +{ + int rc; + DBUG_ENTER("ha_cassandra::index_last"); + rc= HA_ERR_WRONG_COMMAND; + DBUG_RETURN(rc); +} + + +ha_rows ha_cassandra::records_in_range(uint inx, key_range *min_key, + key_range *max_key) +{ + DBUG_ENTER("ha_cassandra::records_in_range"); + //DBUG_RETURN(10); // low number to force index usage + DBUG_RETURN(HA_POS_ERROR); +} + + +class Column_name_enumerator_impl : public Column_name_enumerator +{ + ha_cassandra *obj; + uint idx; +public: + Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {} + const char* get_next_name() + { + if (idx == obj->table->s->fields) + return NULL; + else + return obj->table->field[idx++]->field_name; + } +}; + + +int ha_cassandra::update_row(const uchar *old_data, uchar *new_data) +{ + DYNAMIC_ARRAY oldvals, oldnames, vals, names; + String oldvalcol, valcol; + char *oldfree_names= NULL, *free_names= NULL; + my_bitmap_map *old_map; + int res; + DBUG_ENTER("ha_cassandra::update_row"); + /* Currently, it is guaranteed that new_data == table->record[0] */ + DBUG_ASSERT(new_data == table->record[0]); + /* For now, just rewrite the full record */ + se->clear_insert_buffer(); + + old_map= dbug_tmp_use_all_columns(table, table->read_set); + + char *old_key; + int old_key_len; + se->get_read_rowkey(&old_key, &old_key_len); + + /* Get the key we're going to write */ + char *new_key; + int new_key_len; + if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len)) + { + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0), + rowkey_converter->field->field_name, insert_lineno); + dbug_tmp_restore_column_map(table->read_set, old_map); + DBUG_RETURN(HA_ERR_AUTOINC_ERANGE); + } + + /* + Compare it to the key we've read. For all types that Cassandra supports, + binary byte-wise comparison can be used + */ + bool new_primary_key; + if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len)) + new_primary_key= true; + else + new_primary_key= false; + + if (dyncol_set) + { + Field *field= table->field[dyncol_field]; + /* move to get old_data */ + my_ptrdiff_t diff; + diff= (my_ptrdiff_t) (old_data - new_data); + field->move_field_offset(diff); // Points now at old_data + if ((res= read_dyncol(&oldvals, &oldnames, &oldvalcol, &oldfree_names))) + DBUG_RETURN(res); + field->move_field_offset(-diff); // back to new_data + if ((res= read_dyncol(&vals, &names, &valcol, &free_names))) + { + free_dynamic_row(&oldnames, &oldvals, oldfree_names); + DBUG_RETURN(res); + } + } + + if (new_primary_key) + { + /* + Primary key value changed. This is essentially a DELETE + INSERT. + Add a DELETE operation into the batch + */ + Column_name_enumerator_impl name_enumerator(this); + se->add_row_deletion(old_key, old_key_len, &name_enumerator, + (LEX_STRING *)oldnames.buffer, + (dyncol_set ? oldnames.elements : 0)); + oldnames.elements= oldvals.elements= 0; // they will be deleted + } + + se->start_row_insert(new_key, new_key_len); + + /* Convert other fields */ + for (uint i= 1; i < table->s->fields; i++)
I probably would've simply done delete+insert in all cases :) But as you've gone to the trouble of updating column by column, it really makes little sense to rewrite columns that didn't change. Could you check table->write_set here and memcmp with the old column value before updating?
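A sketch of such a check (assuming Field::cmp_binary_offset() is usable here; the offset mirrors the move_field_offset() arithmetic used above):

    my_ptrdiff_t diff= (my_ptrdiff_t) (old_data - new_data);
    for (uint i= 1; i < table->s->fields; i++)
    {
      Field *field= table->field[i];
      if (!bitmap_is_set(table->write_set, field->field_index))
        continue;                   /* column not assigned by this UPDATE */
      if (!field->cmp_binary_offset(diff))
        continue;                   /* value did not actually change */
      /* ...convert and add the column to the insert batch as before... */
    }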
+ { + char *cass_data; + int cass_data_len; + if (dyncol_set && dyncol_field == i) + { + DBUG_ASSERT(field_converters[i] == NULL); + if ((res= write_dynamic_row(&vals, &names))) + goto err; + } + else + { + if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len)) + { + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0), + field_converters[i]->field->field_name, insert_lineno); + dbug_tmp_restore_column_map(table->read_set, old_map); + DBUG_RETURN(HA_ERR_AUTOINC_ERANGE); + } + se->add_insert_column(field_converters[i]->field->field_name, 0, + cass_data, cass_data_len); + } + } + if (dyncol_set) + { + /* find removed fields */ + uint i= 0, j= 0; + LEX_STRING *onames= (LEX_STRING *)oldnames.buffer; + LEX_STRING *nnames= (LEX_STRING *)names.buffer; + /* both array are sorted */ + for(; i < oldnames.elements; i++) + { + int scmp= 0; + while (j < names.elements && + (nnames[j].length < onames[i].length || + (nnames[j].length == onames[i].length && + (scmp= memcmp(nnames[j].str, onames[i].str, + onames[i].length)) < 0))) + j++; + if (j < names.elements && + nnames[j].length == onames[i].length && + scmp == 0) + j++; + else + se->add_insert_delete_column(onames[i].str, onames[i].length); + } + } + + dbug_tmp_restore_column_map(table->read_set, old_map); + + res= se->do_insert(); + + if (res) + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + +err: + if (dyncol_set) + { + free_dynamic_row(&oldnames, &oldvals, oldfree_names); + free_dynamic_row(&names, &vals, free_names); + } + + DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0); +} + + +int ha_cassandra::extra(enum ha_extra_function operation) +{ + DBUG_ENTER("ha_cassandra::extra"); + DBUG_RETURN(0); +}
please. why do you like dummy methods that much? :)
+ + +/* The following function was copied from ha_blackhole::store_lock: */ +THR_LOCK_DATA **ha_cassandra::store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type) +{ + DBUG_ENTER("ha_cassandra::store_lock"); + if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) + { + /* + Here is where we get into the guts of a row level lock. + If TL_UNLOCK is set + If we are not doing a LOCK TABLE or DISCARD/IMPORT + TABLESPACE, then allow multiple writers + */ + + if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && + lock_type <= TL_WRITE) && !thd_in_lock_tables(thd) + && !thd_tablespace_op(thd))
1. tablespace op in cassandra? really? too much copy-paste is confusing (here and in the comments too).
2. did you test LOCK TABLES?
+ lock_type = TL_WRITE_ALLOW_WRITE;
that makes all changes to cassandra immediately visible to concurrently running threads. okay if that's intentional, but it needs to be documented, as it is not the SQL semantics.
+ + /* + In queries of type INSERT INTO t1 SELECT ... FROM t2 ... + MySQL would use the lock TL_READ_NO_INSERT on t2, and that + would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts + to t2. Convert the lock to a normal read lock to allow + concurrent inserts to t2. + */ + + if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd)) + lock_type = TL_READ;
this also breaks SBR for INSERT ... SELECT. same as above - okay if intentional, but should be thoroughly documented.
+ + lock.type= lock_type; + } + *to++= &lock; + DBUG_RETURN(to); +} + + +int ha_cassandra::external_lock(THD *thd, int lock_type) +{ + DBUG_ENTER("ha_cassandra::external_lock"); + DBUG_RETURN(0); +} + +int ha_cassandra::delete_table(const char *name) +{ + DBUG_ENTER("ha_cassandra::delete_table"); + /* + Cassandra table is just a view. Dropping it doesn't affect the underlying + column family. + */ + DBUG_RETURN(0); +}
more dummies...
+ + +/** + check_if_incompatible_data() called if ALTER TABLE can't detect otherwise + if new and old definition are compatible + + @details If there are no other explicit signs like changed number of + fields this function will be called by compare_tables() + (sql/sql_tables.cc) to decide should we rewrite whole table or only .frm + file. + +*/ + +bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info, + uint table_changes) +{ + DBUG_ENTER("ha_cassandra::check_if_incompatible_data"); + /* Checked, we intend to have this empty for Cassandra SE. */ + DBUG_RETURN(COMPATIBLE_DATA_YES); +} + + +///////////////////////////////////////////////////////////////////////////// +// Dummy implementations end +/////////////////////////////////////////////////////////////////////////////
::update_row and ::store_lock are hardly dummies :)
+ +static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff) +{ + cassandra_counters_copy= cassandra_counters; + + var->type= SHOW_ARRAY; + var->value= (char *) &cassandra_status_variables;
what's the point? If you don't do anything in this function, you can just as easily list all status variables below in the func_status[] array.
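That is, a sketch (note the names then need the Cassandra_ prefix spelled out):

    static struct st_mysql_show_var func_status[]=
    {
      {"Cassandra_row_inserts",
        (char*) &cassandra_counters.row_inserts, SHOW_LONG},
      {"Cassandra_row_insert_batches",
        (char*) &cassandra_counters.row_insert_batches, SHOW_LONG},
      /* ...and so on for the remaining counters... */
      {0, 0, SHOW_UNDEF}
    };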
+ return 0; +} + + +struct st_mysql_storage_engine cassandra_storage_engine= +{ MYSQL_HANDLERTON_INTERFACE_VERSION }; + +static struct st_mysql_show_var func_status[]= +{ + {"Cassandra", (char *)show_cassandra_vars, SHOW_FUNC}, + {0,0,SHOW_UNDEF} +}; + +maria_declare_plugin(cassandra) +{ + MYSQL_STORAGE_ENGINE_PLUGIN, + &cassandra_storage_engine, + "CASSANDRA", + "Monty Program Ab", + "Cassandra storage engine", + PLUGIN_LICENSE_GPL, + cassandra_init_func, /* Plugin Init */ + cassandra_done_func, /* Plugin Deinit */ + 0x0001, /* version number (0.1) */ + func_status, /* status variables */ + cassandra_system_variables, /* system variables */ + "0.1", /* string version */ + MariaDB_PLUGIN_MATURITY_EXPERIMENTAL /* maturity */ +} +maria_declare_plugin_end;
Regards,
Sergei
Hi Sergei,

On Fri, Dec 14, 2012 at 12:26:24PM +0100, Sergei Golubchik wrote:
The handler is quite ok. I've only had a few mostly cosmetic comments.
Addressed most of them, but some remain. Can we discuss them on IRC?
=== modified file 'cmake/build_configurations/mysql_release.cmake' --- cmake/build_configurations/mysql_release.cmake 2012-06-06 12:15:29 +0000 +++ cmake/build_configurations/mysql_release.cmake 2012-11-12 15:11:23 +0000 @@ -46,6 +46,8 @@ SET(FEATURE_SET_large 5) SET(FEATURE_SET_xlarge 6) SET(FEATURE_SET_community 7)
+SET(WITH_CASSANDRA_STORAGE_ENGINE ON) +
This probably should be reverted before pushing into main, right?
Yes. I suppose this will be changed to some other setting that builds Cassandra in certain configurations.
IF(FEATURE_SET) STRING(TOLOWER ${FEATURE_SET} feature_set) SET(num ${FEATURE_SET_${feature_set}})
=== modified file 'sql/sql_join_cache.cc' --- sql/sql_join_cache.cc 2012-03-24 17:21:22 +0000 +++ sql/sql_join_cache.cc 2012-11-12 15:11:23 +0000 @@ -4543,7 +4546,7 @@ bool JOIN_CACHE_BKAH::prepare_look_for_m { last_matching_rec_ref_ptr= next_matching_rec_ref_ptr= 0; if (no_association && - (curr_matching_chain= get_matching_chain_by_join_key())) + !(curr_matching_chain= get_matching_chain_by_join_key())) //psergey: added '!'
Is that something that should be pushed into the main branch, or is it a temporary hack for cassandra that should be reverted?
No. It is a fix for a typo bug in the BKA code. The bug can only be observed when BKA is used with a "no-association" storage engine, like Cassandra SE or Falcon. Probably the 'psergey:' comment should be removed.
return 1; last_matching_rec_ref_ptr= get_next_rec_ref(curr_matching_chain); return 0;
=== added file 'storage/cassandra/CMakeLists.txt' --- storage/cassandra/CMakeLists.txt 1970-01-01 00:00:00 +0000 +++ storage/cassandra/CMakeLists.txt 2012-11-12 15:11:23 +0000 @@ -0,0 +1,25 @@ + +SET(cassandra_sources + ha_cassandra.cc + ha_cassandra.h + cassandra_se.h + cassandra_se.cc + gen-cpp/Cassandra.cpp + gen-cpp/cassandra_types.h + gen-cpp/cassandra_types.cpp + gen-cpp/cassandra_constants.h + gen-cpp/cassandra_constants.cpp + gen-cpp/Cassandra.h) + +#INCLUDE_DIRECTORIES(BEFORE ${Boost_INCLUDE_DIRS}) + +#INCLUDE_DIRECTORIES(AFTER /usr/local/include/thrift) +INCLUDE_DIRECTORIES(AFTER /home/buildbot/build/thrift-inst/include/thrift/)
this needs to be fixed before pushing into the main tree I suppose, you'll need to detect here if thrift is available and disable the engine otherwise
Agree. We need to figure out how to make CMake check for Thrift.
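For instance, a sketch of such a check (the FIND_PATH/FIND_LIBRARY variable names and the status message are illustrative, not a tested configuration):

  # Look for Thrift; build the engine only if it is found.
  FIND_PATH(Thrift_INCLUDE_DIR NAMES Thrift.h PATH_SUFFIXES thrift)
  FIND_LIBRARY(Thrift_LIBRARY NAMES thrift)
  IF(Thrift_INCLUDE_DIR AND Thrift_LIBRARY)
    INCLUDE_DIRECTORIES(AFTER ${Thrift_INCLUDE_DIR})
    MYSQL_ADD_PLUGIN(cassandra ${cassandra_sources} STORAGE_ENGINE
                     LINK_LIBRARIES ${Thrift_LIBRARY})
  ELSE()
    MESSAGE(STATUS "Thrift not found, the Cassandra storage engine will not be built")
  ENDIF()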
+ +# +STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) +STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) +#LINK_DIRECTORIES(/home/psergey/cassandra/thrift/lib) + +MYSQL_ADD_PLUGIN(cassandra ${cassandra_sources} STORAGE_ENGINE LINK_LIBRARIES thrift) +# was: STORAGE_ENGINE MANDATORY
...
=== added file 'storage/cassandra/ha_cassandra.h' --- storage/cassandra/ha_cassandra.h 1970-01-01 00:00:00 +0000 +++ storage/cassandra/ha_cassandra.h 2012-11-12 15:11:23 +0000 ... + ulonglong table_flags() const + { + /* + HA_BINLOG_STMT_CAPABLE + We are saying that this engine is just statement capable to have + an engine that can only handle statement-based logging. This is + used in testing. + HA_REC_NOT_IN_SEQ + If we don't set it, filesort crashes, because it assumes rowids are + 1..8 byte numbers + */ + return HA_BINLOG_STMT_CAPABLE | + HA_REC_NOT_IN_SEQ;
HA_NO_TRANSACTIONS is unset. Is this engine transactional?

No. I've added the flag.

HA_PARTIAL_COLUMN_READ is unset. Do you always work with all columns, ignoring read_set and write_set?

Currently, yes.

HA_TABLE_SCAN_ON_INDEX is unset. Do you store the data MyISAM-style, index and data in separate files? HA_FAST_KEY_READ is unset. Is reading keys in order faster in cassandra than random reads?

TODO: Not sure about the above two flags, let's discuss them.

HA_REQUIRE_PRIMARY_KEY is unset, but a primary key is required.

Added the flag.

HA_PRIMARY_KEY_IN_READ_INDEX is unset, but as far as I see, primary key columns are always returned (and needed for position()).

Added the flag.

HA_PRIMARY_KEY_REQUIRED_FOR_POSITION is unset, but a primary key is required for position().

Added the flag.

HA_NO_AUTO_INCREMENT is unset. Is auto-increment supported?

Added the flag.
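Putting the flags agreed on above together, table_flags() would then presumably read like this (a sketch; HA_TABLE_SCAN_ON_INDEX and HA_FAST_KEY_READ are left out pending the discussion):

  ulonglong table_flags() const
  {
    return HA_BINLOG_STMT_CAPABLE |       /* statement-based logging only */
           HA_REC_NOT_IN_SEQ |            /* rowids are not 1..8 byte numbers */
           HA_NO_TRANSACTIONS |           /* the engine is not transactional */
           HA_REQUIRE_PRIMARY_KEY |       /* a primary key is required */
           HA_PRIMARY_KEY_IN_READ_INDEX |
           HA_PRIMARY_KEY_REQUIRED_FOR_POSITION |
           HA_NO_AUTO_INCREMENT;          /* auto-increment is not supported */
  }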
+ + } ... + uint max_supported_key_length() const { return 16*1024; /* just to return something*/ } + + int index_init(uint idx, bool sorted); + + int index_read_map(uchar * buf, const uchar * key, + key_part_map keypart_map, + enum ha_rkey_function find_flag); + + /** @brief + Called in test_quick_select to determine if indexes should be used. + */ + virtual double scan_time() { return (double) (stats.records+stats.deleted) / 20.0+10; }
does that make any sense? (I understand that it's copy-paste) stats.deleted, for example, is never set.
I've added "stats.deleted= 0" to ha_cassandra::info(). I'd prefer to keep this default impementation for the function.
+ + /** @brief + This method will never be called if you do not implement indexes. + */ + virtual double read_time(uint, uint, ha_rows rows) + { return (double) rows / 20.0+1; }
same question
...
=== added file 'storage/cassandra/ha_cassandra.cc' --- storage/cassandra/ha_cassandra.cc 1970-01-01 00:00:00 +0000 +++ storage/cassandra/ha_cassandra.cc 2012-11-12 15:11:23 +0000 ... + +static int cassandra_init_func(void *p) +{ + DBUG_ENTER("cassandra_init_func"); + +#ifdef HAVE_PSI_INTERFACE + init_cassandra_psi_keys(); +#endif + + cassandra_hton= (handlerton *)p; + mysql_mutex_init(ex_key_mutex_example, &cassandra_mutex, MY_MUTEX_INIT_FAST); + (void) my_hash_init(&cassandra_open_tables,system_charset_info,32,0,0, + (my_hash_get_key) cassandra_get_key,0,0); + + cassandra_hton->state= SHOW_OPTION_YES; + cassandra_hton->create= cassandra_create_handler; + /* + Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE + TABLE to create an *empty* table from scratch. Cassandra table won't be + emptied if re-created. + */
HTON_ALTER_NOT_SUPPORTED is not set. ALTER works?
Yes. A Cassandra table is a view into the column family. We support instant ALTER TABLE, as long as the new DDL makes sense.
HTON_TEMPORARY_NOT_SUPPORTED is not set. CREATE TEMPORARY TABLE works?

HTON_NO_PARTITION is not set. Partitioning works?
+ cassandra_hton->flags= 0; + cassandra_hton->table_options= cassandra_table_option_list; + cassandra_hton->field_options= cassandra_field_option_list; + + mysql_mutex_init(0 /* no instrumentation */,
why no instrumentation?
+ &cassandra_default_host_lock, MY_MUTEX_INIT_FAST); + + DBUG_RETURN(0); +} + + ...
+ +ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name) +{ + ColumnDataConverter *res= NULL; + + switch(field->type()) { + case MYSQL_TYPE_TINY: + if (!strcmp(validator_name, validator_boolean))
why do you strcmp here, while you only check selected characters in get_cassandra_type()? you could've called get_cassandra_type() for validator_name instead and compared enum values instead of strings.
This function (written by me) was here before get_cassandra_type() was written by Sanja. I don't particularly like the idea of checking characters: what if the next version of Cassandra gets a new datatype? The code in this function will refuse to work with it and produce a meaningful error message. The code in Sanja's function will silently map it to some datatype, and the conversion will not make sense.

Besides, map_field_to_validator() is not exactly a hotspot function. Establishing a connection to Cassandra currently requires:
- making a TCP connection
- requesting Thrift-packed DDL for the column family from Cassandra
- receiving the mentioned DDL via Thrift (which populates the data structures)
- checking the needed columns

Apparently, map_field_to_validator() is not the bottleneck. If connection setup is an issue, we need a connection pool. Why optimize map_field_to_validator to the point of unreadability?
+ { + res= new TinyintDataConverter; + break; + } + /* fall through: */ + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_LONGLONG: + { + bool is_counter= false; + if (!strcmp(validator_name, validator_bigint) || + !strcmp(validator_name, validator_timestamp) || + (is_counter= !strcmp(validator_name, validator_counter))) + res= new BigintDataConverter(!is_counter); + break; + } + case MYSQL_TYPE_FLOAT: + if (!strcmp(validator_name, validator_float)) + res= new FloatDataConverter; + break; + + case MYSQL_TYPE_DOUBLE: + if (!strcmp(validator_name, validator_double)) + res= new DoubleDataConverter; + break; + + case MYSQL_TYPE_TIMESTAMP: + if (!strcmp(validator_name, validator_timestamp)) + res= new TimestampDataConverter; + break; + + case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings. + if (!strcmp(validator_name, validator_uuid) && + field->real_type() == MYSQL_TYPE_STRING && + field->field_length == 36) + { + // UUID maps to CHAR(36), its text representation + res= new UuidDataConverter; + break; + } + /* fall through: */ + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_VARCHAR: + { + /* + Cassandra's "varint" type is a binary-encoded arbitary-length + big-endian number. + - It can be mapped to VARBINARY(N), with sufficiently big N. + - If the value does not fit into N bytes, it is an error. We should not + truncate it, because that is just as good as returning garbage. + - varint should not be mapped to BINARY(N), because BINARY(N) values + are zero-padded, which will work as multiplying the value by + 2^k for some value of k. + */ + if (field->type() == MYSQL_TYPE_VARCHAR && + field->binary() && + (!strcmp(validator_name, validator_varint) || + !strcmp(validator_name, validator_decimal))) + { + res= new StringCopyConverter(field->field_length); + break; + } + + if (!strcmp(validator_name, validator_blob) || + !strcmp(validator_name, validator_ascii) || + !strcmp(validator_name, validator_text)) + { + res= new StringCopyConverter((size_t)-1); + } + break; + } + case MYSQL_TYPE_LONG: + if (!strcmp(validator_name, validator_int)) + res= new Int32DataConverter; + break; + + default:; + } + return res; +} + + ...
+ +int ha_cassandra::index_init(uint idx, bool sorted) +{ + int ires; + if (!se && (ires= connect_and_check_options(table))) + return ires; + return 0; +} + +void store_key_image_to_rec(Field *field, uchar *ptr, uint len); + +int ha_cassandra::index_read_map(uchar *buf, const uchar *key, + key_part_map keypart_map, + enum ha_rkey_function find_flag) +{ + int rc= 0; + DBUG_ENTER("ha_cassandra::index_read_map"); + + if (find_flag != HA_READ_KEY_EXACT) + DBUG_RETURN(HA_ERR_WRONG_COMMAND);
did you verify that the server expects HA_ERR_WRONG_COMMAND from this method and correctly handles it? or, perhaps, find_flag is always HA_READ_KEY_EXACT here, because of your index_flags()?
The latter is true. I don't expect that this function is ever invoked with find_flag!=HA_READ_KEY_EXACT.
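If so, the invariant could be documented right at the early return, e.g. (a sketch):

  /* index_flags() only advertises exact lookups, so nothing else can arrive */
  DBUG_ASSERT(find_flag == HA_READ_KEY_EXACT);
  if (find_flag != HA_READ_KEY_EXACT)
    DBUG_RETURN(HA_ERR_WRONG_COMMAND);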
+ + uint key_len= calculate_key_len(table, active_index, key, keypart_map); + store_key_image_to_rec(table->field[0], (uchar*)key, key_len); + + char *cass_key; + int cass_key_len; + my_bitmap_map *old_map; + + old_map= dbug_tmp_use_all_columns(table, table->read_set); + + if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len)) + { + /* We get here when making lookups like uuid_column='not-an-uuid' */ + dbug_tmp_restore_column_map(table->read_set, old_map); + DBUG_RETURN(HA_ERR_KEY_NOT_FOUND); + } + + dbug_tmp_restore_column_map(table->read_set, old_map); + + bool found; + if (se->get_slice(cass_key, cass_key_len, &found)) + { + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
generally it's better not to use my_error() directly, but only return an error code, and return your se->error_str() from ha_cassandra::get_error_message() (see the sketch after the quoted code)
+ rc= HA_ERR_INTERNAL_ERROR; + } + + /* TODO: what if we're not reading all columns?? */ + if (!found) + rc= HA_ERR_KEY_NOT_FOUND; + else + rc= read_cassandra_columns(false); + + DBUG_RETURN(rc); +} + ...
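A sketch of that suggestion, assuming the stock handler::get_error_message(int, String *) hook (the choice of HA_ERR_INTERNAL_ERROR as the carrier code is just one option):

  /* in index_read_map(): return the code, without calling my_error() */
  if (se->get_slice(cass_key, cass_key_len, &found))
    rc= HA_ERR_INTERNAL_ERROR;

  /* and let the server fetch the text when reporting the error: */
  bool ha_cassandra::get_error_message(int error, String *buf)
  {
    if (error == HA_ERR_INTERNAL_ERROR)
    {
      buf->copy(se->error_str(), (uint32) strlen(se->error_str()),
                system_charset_info);
      return false;
    }
    return handler::get_error_message(error, buf);
  }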
+int ha_cassandra::rnd_pos(uchar *buf, uchar *pos) +{ + int rc; + DBUG_ENTER("ha_cassandra::rnd_pos"); + + int save_active_index= active_index; + active_index= 0; /* The primary key */ + rc= index_read_map(buf, pos, key_part_map(1), HA_READ_KEY_EXACT); + + active_index= save_active_index;
a bit nicer would be to implement index_read_idx_map() and call it from here and from index_read_map()
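That is, something along these lines (a sketch; bodies abbreviated, handler::index_read_idx_map() takes the index number explicitly):

  int ha_cassandra::index_read_idx_map(uchar *buf, uint index, const uchar *key,
                                       key_part_map keypart_map,
                                       enum ha_rkey_function find_flag)
  {
    /* the current body of index_read_map(), using 'index' in place of
       active_index */
  }

  int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
                                   key_part_map keypart_map,
                                   enum ha_rkey_function find_flag)
  {
    return index_read_idx_map(buf, active_index, key, keypart_map, find_flag);
  }

  int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
  {
    /* index 0 is the primary key; no need to save/restore active_index */
    return index_read_idx_map(buf, 0, pos, key_part_map(1), HA_READ_KEY_EXACT);
  }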
+ + DBUG_RETURN(rc); +} + ... +bool ha_cassandra::mrr_start_read() +{ + uint key_len; + + my_bitmap_map *old_map; + old_map= dbug_tmp_use_all_columns(table, table->read_set); + + se->new_lookup_keys(); + + while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range))) + { + char *cass_key; + int cass_key_len; + + DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE); + + uchar *key= (uchar*)mrr_cur_range.start_key.key; + key_len= mrr_cur_range.start_key.length; + //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS?? + store_key_image_to_rec(table->field[0], (uchar*)key, key_len); + + rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len); + + // Primitive buffer control + if (se->add_lookup_key(cass_key, cass_key_len) > + THDVAR(table->in_use, multiget_batch_size)) + break;
I'd suggest adding a status variable to count how many times the buffer was refilled
Agree. I'd prefer to do this in the next version.
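For reference, a sketch of what that could look like (the counter name is hypothetical):

  /* hypothetical addition to the cassandra_counters structure: */
  ulong multiget_batches; /* how many times the lookup-key buffer was refilled */

  /* in mrr_start_read(), where the batch limit is hit: */
  if (se->add_lookup_key(cass_key, cass_key_len) >
      THDVAR(table->in_use, multiget_batch_size))
  {
    cassandra_counters.multiget_batches++;
    break;
  }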
+ } + + dbug_tmp_restore_column_map(table->read_set, old_map); + + return se->multiget_slice(); +} ... + + +/* The following function was copied from ha_blackhole::store_lock: */ +THR_LOCK_DATA **ha_cassandra::store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type) +{ + DBUG_ENTER("ha_cassandra::store_lock"); + if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) + { + /* + Here is where we get into the guts of a row level lock. + If TL_UNLOCK is set + If we are not doing a LOCK TABLE or DISCARD/IMPORT + TABLESPACE, then allow multiple writers + */ + + if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && + lock_type <= TL_WRITE) && !thd_in_lock_tables(thd) + && !thd_tablespace_op(thd))
1. tablespace op in cassandra? really? too much copy-paste is confusing. (here and in the comments too) 2. did you test LOCK TABLES?
What is a meaningful usage scenario of Cassandra SE with LOCK TABLES? It is a distributed engine. Do locks matter at all?
+ lock_type = TL_WRITE_ALLOW_WRITE;
that makes all changes to cassandra immediately visible to concurrently running threads. okay, if that's intentional, but it needs to be documented, as it is not the SQL semantics.
It is intentional; Cassandra storage semantics differ from SQL.
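Then a short comment at the conversion would do, e.g. (wording is only a suggestion):

  /*
    Deliberate deviation from SQL locking semantics: Cassandra has no
    transactions, and a write becomes visible to concurrently running
    threads immediately, so we allow multiple concurrent writers.
  */
  lock_type = TL_WRITE_ALLOW_WRITE;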
+ + /* + In queries of type INSERT INTO t1 SELECT ... FROM t2 ... + MySQL would use the lock TL_READ_NO_INSERT on t2, and that + would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts + to t2. Convert the lock to a normal read lock to allow + concurrent inserts to t2. + */ + + if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd)) + lock_type = TL_READ;
this also breaks SBR for INSERT ... SELECT. same as above - okay if intentional, but should be thoroughly documented.
I am not certain about this. Let's discuss it.
+ + lock.type= lock_type; + } + *to++= &lock; + DBUG_RETURN(to); +} + + +int ha_cassandra::external_lock(THD *thd, int lock_type) +{ + DBUG_ENTER("ha_cassandra::external_lock"); + DBUG_RETURN(0); +} + +int ha_cassandra::delete_table(const char *name) +{ + DBUG_ENTER("ha_cassandra::delete_table"); + /* + Cassandra table is just a view. Dropping it doesn't affect the underlying + column family. + */ + DBUG_RETURN(0); +}
more dummies...
handler::delete_table needs this implementation. I've removed all the other dummies, as well as the "dummy implementations start/end" comments. ...
+static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff) +{ + cassandra_counters_copy= cassandra_counters; + + var->type= SHOW_ARRAY; + var->value= (char *) &cassandra_status_variables;
what's the point? If you don't do anything in this function, you can just as easily list all status variables below in the func_status[] array.
+ return 0; +} + + +struct st_mysql_storage_engine cassandra_storage_engine= +{ MYSQL_HANDLERTON_INTERFACE_VERSION }; + +static struct st_mysql_show_var func_status[]= +{ + {"Cassandra", (char *)show_cassandra_vars, SHOW_FUNC}, + {0,0,SHOW_UNDEF} +}; + +maria_declare_plugin(cassandra) +{ + MYSQL_STORAGE_ENGINE_PLUGIN, + &cassandra_storage_engine, + "CASSANDRA", + "Monty Program Ab", + "Cassandra storage engine", + PLUGIN_LICENSE_GPL, + cassandra_init_func, /* Plugin Init */ + cassandra_done_func, /* Plugin Deinit */ + 0x0001, /* version number (0.1) */ + func_status, /* status variables */ + cassandra_system_variables, /* system variables */ + "0.1", /* string version */ + MariaDB_PLUGIN_MATURITY_EXPERIMENTAL /* maturity */ +} +maria_declare_plugin_end;
BR
 Sergei
--
Sergei Petrunia, Software Developer
Monty Program AB, http://askmonty.org
Blog: http://s.petrunia.net/blog
On Thu, Dec 20, 2012 at 05:15:02PM +0400, Sergei Petrunia wrote:
+static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff) +{ + cassandra_counters_copy= cassandra_counters; + + var->type= SHOW_ARRAY; + var->value= (char *) &cassandra_status_variables;
what's the point? If you don't do anything in this function, you can just as easily list all status variables below in the func_status[] array.
InnoDB doesn't do this. I've tried your suggestion (it is doable) and figured out why: if I list all the status variables directly, FLUSH STATUS resets them. Is that a good thing? InnoDB counters are not reset by FLUSH STATUS, and neither are other engines' counters.
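For comparison, the direct variant would look roughly like this (the counter names are illustrative), and these entries are exactly what FLUSH STATUS would reset:

  static struct st_mysql_show_var func_status[]=
  {
    {"Cassandra_row_inserts",
     (char*) &cassandra_counters.row_inserts, SHOW_LONG},
    {"Cassandra_multiget_reads",
     (char*) &cassandra_counters.multiget_reads, SHOW_LONG},
    {0, 0, SHOW_UNDEF}
  };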
+ return 0; +} + + +struct st_mysql_storage_engine cassandra_storage_engine= +{ MYSQL_HANDLERTON_INTERFACE_VERSION }; + +static struct st_mysql_show_var func_status[]= +{ + {"Cassandra", (char *)show_cassandra_vars, SHOW_FUNC}, + {0,0,SHOW_UNDEF} +}; + +maria_declare_plugin(cassandra) +{ + MYSQL_STORAGE_ENGINE_PLUGIN, + &cassandra_storage_engine, + "CASSANDRA", + "Monty Program Ab", + "Cassandra storage engine", + PLUGIN_LICENSE_GPL, + cassandra_init_func, /* Plugin Init */ + cassandra_done_func, /* Plugin Deinit */ + 0x0001, /* version number (0.1) */ + func_status, /* status variables */ + cassandra_system_variables, /* system variables */ + "0.1", /* string version */ + MariaDB_PLUGIN_MATURITY_EXPERIMENTAL /* maturity */ +} +maria_declare_plugin_end;
BR
 Sergei
--
Sergei Petrunia, Software Developer
Monty Program AB, http://askmonty.org
Blog: http://s.petrunia.net/blog
Hi, Sergei!

On Dec 20, Sergei Petrunia wrote:
On Fri, Dec 14, 2012 at 12:26:24PM +0100, Sergei Golubchik wrote:
=== modified file 'sql/sql_join_cache.cc' --- sql/sql_join_cache.cc 2012-03-24 17:21:22 +0000 +++ sql/sql_join_cache.cc 2012-11-12 15:11:23 +0000 @@ -4543,7 +4546,7 @@ bool JOIN_CACHE_BKAH::prepare_look_for_m { last_matching_rec_ref_ptr= next_matching_rec_ref_ptr= 0; if (no_association && - (curr_matching_chain= get_matching_chain_by_join_key())) + !(curr_matching_chain= get_matching_chain_by_join_key())) //psergey: added '!'
Is that something that should be pushed into the main branch, or is it a temporary hack for cassandra that should be reverted?
No. It is a fix for a typo bug in the BKA code. The bug can only be observed when BKA is used with a "no-association" storage engine, like Cassandra SE or Falcon. Probably the 'psergey:' comment should be removed.
Yes, please remove the comment.
+ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name) +{ + ColumnDataConverter *res= NULL; + + switch(field->type()) { + case MYSQL_TYPE_TINY: + if (!strcmp(validator_name, validator_boolean))
why do you strcmp here, while you only check selected characters in get_cassandra_type()? you could've called get_cassandra_type() for validator_name instead and compared enum values instead of strings.
This function (written by me) was here before the get_cassandra_type() was written by Sanja.
I don't particularly like the idea of checking characters: what if the next version of Cassandra gets a new datatype? The code in this function will refuse to work with it, and produce a meaningful error message.
Agree. I actually commented on it in my review, but then removed it, thinking that if it's a bottleneck and you wanted it as fast as possible, then let's keep it ugly and fast.
Apparently, map_field_to_validator() is not the bottleneck. If connection setup is an issue, we need a connection pool. Why optimize map_field_to_validator to the point of unreadability?
No point whatsoever :)
+void store_key_image_to_rec(Field *field, uchar *ptr, uint len); + +int ha_cassandra::index_read_map(uchar *buf, const uchar *key, + key_part_map keypart_map, + enum ha_rkey_function find_flag) +{ + int rc= 0; + DBUG_ENTER("ha_cassandra::index_read_map"); + + if (find_flag != HA_READ_KEY_EXACT) + DBUG_RETURN(HA_ERR_WRONG_COMMAND);
did you verify that the server expects HA_ERR_WRONG_COMMAND from this method and correctly handles it? or, perhaps, find_flag is always HA_READ_KEY_EXACT here, because of your index_flags()?
The latter is true. I don't expect that this function is ever invoked with find_flag!=HA_READ_KEY_EXACT.
please add a DBUG_ASSERT to document that.

Regards,
Sergei