Hi!
"knielsen" == knielsen <knielsen@knielsen-hq.org> writes:
knielsen> At http://bazaar.launchpad.net/~maria-captains/maria/5.2 knielsen> ------------------------------------------------------------ knielsen> revno: 3020 knielsen> revision-id: knielsen@knielsen-hq.org-20110920104925-huxke80yuq1zrvcx knielsen> parent: monty@askmonty.org-20110816160131-4kju8si1nqqnohxg knielsen> committer: knielsen@knielsen-hq.org knielsen> branch nick: tmp2 knielsen> timestamp: Tue 2011-09-20 12:49:25 +0200 knielsen> message: knielsen> MWL#192: Non-blocking client API for libmysqlclient. knielsen> All client functions that can block on I/O have alternate _start() and knielsen> _cont() versions that do not block but return control back to the knielsen> application, which can then issue I/O wait in its own fashion and later knielsen> call back into the library to continue the operation. knielsen> Works behind the scenes by spawning a co-routine/fiber to run the knielsen> blocking operation and suspend it while waiting for I/O. This knielsen> co-routine/fiber use is invisible to applications. knielsen> For i368/x86_64 on GCC, uses very fast assembler co-routine support. On knielsen> Windows uses native Win32 Fibers. Falls back to POSIX ucontext on other knielsen> platforms. Assembler routines for more platforms are relatively easy to knielsen> add by extending mysys/my_context.c, eg. similar to the Lua lcoco knielsen> library. knielsen> For testing, mysqltest and mysql_client_test are extended with the knielsen> option --non-blocking-api. This causes the programs to use the knielsen> non-blocking API for database access. mysql-test-run.pl has a similar knielsen> option --non-blocking-api that uses this, as well as additional knielsen> testcases. knielsen> An example program tests/async_queries.c is included that uses the new knielsen> non-blocking API with libevent to show how, in a single-threaded knielsen> program, to issue many queries in parallel against a database. <cut> +++ client/async_example.c 2011-09-20 10:49:25 +0000 @@ -0,0 +1,207 @@ +/* + Copyright 2011 Kristian Nielsen and Monty Program Ab. + + Experiments with non-blocking libmysql. + + This is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + This is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this. If not, see <http://www.gnu.org/licenses/>. +*/ Please change license to LGPL for this and the other new files (so we can add them to our new LGPL client library) + + +#ifndef __WIN__ +#include <poll.h> +#else +#include <WinSock2.h> +#endif You don't need to incldue Winsock2.h; This is automaticly included by mysql.h. +#include <stdlib.h> +#include <stdio.h> +#include <mysql.h> <cut> === modified file 'client/mysqltest.cc' --- client/mysqltest.cc 2011-05-18 13:17:26 +0000 +++ client/mysqltest.cc 2011-09-20 10:49:25 +0000 @@ -60,6 +60,12 @@ #define SIGNAL_FMT "signal %d" #endif === modified file 'include/Makefile.am' --- include/Makefile.am 2011-05-03 16:10:10 +0000 +++ include/Makefile.am 2011-09-20 10:49:25 +0000 @@ -46,7 +46,7 @@ atomic/rwlock.h atomic/x86-gcc.h \ atomic/generic-msvc.h \ atomic/gcc_builtins.h my_libwrap.h my_stacktrace.h \ - wqueue.h waiting_threads.h + wqueue.h waiting_threads.h my_context.h EXTRA_DIST = mysql.h.pp mysql/plugin_auth.h.pp mysql/client_plugin.h.pp CMakeLists.txt === added file 'include/my_context.h' --- include/my_context.h 1970-01-01 00:00:00 +0000 +++ include/my_context.h 2011-09-20 10:49:25 +0000 @@ -0,0 +1,222 @@ +/* + Copyright 2011 Kristian Nielsen + + Experiments with non-blocking libmysql. + + This is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + This is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this. If not, see <http://www.gnu.org/licenses/>. +*/ + +/* + Simple API for spawning a co-routine, to be used for async libmysqlclient. + + Idea is that by implementing this interface using whatever facilities are + available for given platform, we can use the same code for the generic + libmysqlclient-async code. + + (This particular implementation uses Posix ucontext swapcontext().) +*/ + +#ifdef __WIN__ +#define MY_CONTEXT_USE_WIN32_FIBERS 1 +#elif defined(__GNUC__) && __GNUC__ >= 3 && defined(__x86_64__) +#define MY_CONTEXT_USE_X86_64_GCC_ASM +#elif defined(__GNUC__) && __GNUC__ >= 3 && defined(__i386__) +#define MY_CONTEXT_USE_I386_GCC_ASM +#else +#define MY_CONTEXT_USE_UCONTEXT +#endif + +#ifdef MY_CONTEXT_USE_WIN32_FIBERS +struct my_context { + void (*user_func)(void *); + void *user_arg; + void *app_fiber; + void *lib_fiber; + int return_value; +#ifndef DBUG_OFF + void *dbug_state; +#endif +}; +#endif + + +#ifdef MY_CONTEXT_USE_UCONTEXT +#include <ucontext.h> + +struct my_context { + void (*user_func)(void *); + void *user_data; + void *stack; + size_t stack_size; + ucontext_t base_context; + ucontext_t spawned_context; + int active; +#ifdef HAVE_VALGRIND_VALGRIND_H + unsigned int valgrind_stack_id; +#endif +#ifndef DBUG_OFF + void *dbug_state; +#endif +}; +#endif + + +#ifdef MY_CONTEXT_USE_X86_64_GCC_ASM +#include <stdint.h> + +struct my_context { + uint64_t save[9]; + void *stack_top; + void *stack_bot; +#ifdef HAVE_VALGRIND_VALGRIND_H + unsigned int valgrind_stack_id; +#endif +#ifndef DBUG_OFF + void *dbug_state; +#endif +}; +#endif + + +#ifdef MY_CONTEXT_USE_I386_GCC_ASM +#include <stdint.h> + +struct my_context { + uint64_t save[7]; + void *stack_top; + void *stack_bot; +#ifdef HAVE_VALGRIND_VALGRIND_H + unsigned int valgrind_stack_id; +#endif +#ifndef DBUG_OFF + void *dbug_state; +#endif +}; +#endif + + +/* + Initialize an asynchroneous context object. + Returns 0 on success, non-zero on failure. +*/ +extern int my_context_init(struct my_context *c, size_t stack_size); + +/* Free an asynchroneous context object, deallocating any resources used. */ +extern void my_context_destroy(struct my_context *c); + +/* + Spawn an asynchroneous context. The context will run the supplied user + function, passing the supplied user data pointer. + + The context must have been initialised with my_context_init() prior to + this call. + + The user function may call my_context_yield(), which will cause this + function to return 1. Then later my_context_continue() may be called, which + will resume the asynchroneous context by returning from the previous + my_context_yield() call. + + When the user function returns, this function returns 0. + + In case of error, -1 is returned. +*/ +extern int my_context_spawn(struct my_context *c, void (*f)(void *), void *d); + +/* + Suspend an asynchroneous context started with my_context_spawn. + + When my_context_yield() is called, execution immediately returns from the + last my_context_spawn() or my_context_continue() call. Then when later + my_context_continue() is called, execution resumes by returning from this + my_context_yield() call. + + Returns 0 if ok, -1 in case of error. +*/ +extern int my_context_yield(struct my_context *c); + +/* + Resume an asynchroneous context. The context was spawned by + my_context_spawn(), and later suspended inside my_context_yield(). + + The asynchroneous context may be repeatedly suspended with + my_context_yield() and resumed with my_context_continue(). + + Each time it is suspended, this function returns 1. When the originally + spawned user function returns, this function returns 0. + + In case of error, -1 is returned. +*/ +extern int my_context_continue(struct my_context *c); + + +struct mysql_async_context { + /* + This is set to the value that should be returned from foo_start() or + foo_cont() when a call is suspended. + It is also set to the event(s) that triggered when a suspended call is + resumed, eg. whether we woke up due to connection completed or timeout + in mysql_real_connect_cont(). + */ + unsigned int ret_status; + /* + This is set to the result of the whole asynchronous operation when it + completes. It uses a union, as different calls have different return + types. + */ + union { + void *r_ptr; + const void *r_const_ptr; + int r_int; + my_bool r_my_bool; + } ret_result; + /* + The timeout value, for suspended calls that need to wake up on a timeout + (eg. mysql_real_connect_start(). + */ + unsigned int timeout_value; + /* + This flag is set when we are executing inside some asynchronous call + foo_start() or foo_cont(). It is used to decide whether to use the + synchronous or asynchronous version of calls that may block such as + recv(). + + Note that this flag is not set when a call is suspended, eg. after + returning from foo_start() and before re-entering foo_cont(). + */ + my_bool active; + /* + This flag is set when an asynchronous operation is in progress, but + suspended. Ie. it is set when foo_start() or foo_cont() returns because + the operation needs to block, suspending the operation. + + It is used to give an error (rather than crash) if the application + attempts to call some foo_cont() method when no suspended operation foo is + in progress. + */ + my_bool suspended; + /* + If non-NULL, this is a pointer to a callback hook that will be invoked with + the user data argument just before the context is suspended, and just after + it is resumed. + */ + void (*suspend_resume_hook)(my_bool suspend, void *user_data); + void *suspend_resume_hook_user_data; + /* + This is used to save the execution contexts so that we can suspend an + operation and switch back to the application context, to resume the + suspended context later when the application re-invokes us with + foo_cont(). + */ + struct my_context async_context; +}; === modified file 'include/my_dbug.h' --- include/my_dbug.h 2011-02-20 16:51:43 +0000 +++ include/my_dbug.h 2011-09-20 10:49:25 +0000 @@ -83,6 +83,8 @@ extern void _db_unlock_file_(void); extern FILE *_db_fp_(void); extern void _db_flush_(); +extern void dbug_swap_code_state(void **code_state_store); +extern void dbug_free_code_state(void **code_state_store); Please define a macros DBUG_FREE_CODE_STATE / DBUG_SWAP_CODE_STATE that are mapped to the aboe if DBUG is defined and to "do {} while (0)" otherwise. This allows you to replace: #ifndef DBUG_OFF dbug_free_code_state(&c->dbug_state); #endif with DBUG_FREE_CODE_STATE(&c->dbug_state); (It's always nice to not have a lot of #ifdef in the code). +++ include/mysql.h 2011-09-20 10:49:25 +0000 void *extension; } MYSQL_PARAMETERS; +/* + Flag bits, the asynchronous methods return a combination of these ORed + together to let the application know when to resume the suspended operation. +*/ +typedef enum { + MYSQL_WAIT_READ= 1, /* Wait for data to be available on socket to read */ + /* mysql_get_socket_fd() will return socket descriptor*/ + MYSQL_WAIT_WRITE= 2, /* Wait for socket to be ready to write data */ + MYSQL_WAIT_EXCEPT= 4, /* Wait for select() to mark exception on socket */ + MYSQL_WAIT_TIMEOUT= 8 /* Wait until timeout occurs. Value of timeout can be */ + /* obtained from mysql_get_timeout_value() */ +} MYSQL_ASYNC_STATUS; + Why have enum's instead of defines ? (As these are bits and you are not going to use 'switch' on these, defines makes more sense). #ifdef USE_OLD_FUNCTIONS MYSQL * STDCALL mysql_connect(MYSQL *mysql, const char *host, const char *user, const char *passwd); +int STDCALL mysql_connect_start(MYSQL **ret, MYSQL *mysql, + const char *host, const char *user, + const char *passwd); +int STDCALL mysql_connect_cont(MYSQL **ret, MYSQL *mysql, + int status); int STDCALL mysql_create_db(MYSQL *mysql, const char *DB); +int STDCALL mysql_create_db_start(int *ret, MYSQL *mysql, + const char *DB); +int STDCALL mysql_create_db_cont(int *ret, MYSQL *mysql, + int status); +int STDCALL mysql_drop_db(MYSQL *mysql, const char *DB); +int STDCALL mysql_drop_db_start(int *ret, MYSQL *mysql, + const char *DB); +int STDCALL mysql_drop_db_cont(int *ret, MYSQL *mysql, int status); int STDCALL mysql_drop_db(MYSQL *mysql, const char *DB); #define mysql_reload(mysql) mysql_refresh((mysql),REFRESH_GRANT) #endif If USE_OLD_FUNCTIONS is defined, you don't have to provide the new api. These are deprecated functions that we don't want people to use so there is no reason to provide 'better' versions of them... === modified file 'include/sql_common.h' --- include/sql_common.h 2010-04-01 09:04:26 +0000 +++ include/sql_common.h 2011-09-20 10:49:25 +0000 @@ -26,11 +26,18 @@ extern const char *cant_connect_sqlstate; extern const char *not_error_sqlstate; + +struct mysql_async_context; + struct st_mysql_options_extention { char *plugin_dir; char *default_auth; }; +struct st_mysql_extension { + struct mysql_async_context *async_context; +}; + Why provide 'st_mysql_extension' in a public file? Isn't it enough to have this only in a local file in the client directory? (We don't want anyone to access this structure in any clients) === modified file 'include/violite.h' --- include/violite.h 2010-01-29 10:42:31 +0000 +++ include/violite.h 2011-09-20 10:49:25 +0000 @@ -194,6 +194,8 @@ char *read_pos; /* start of unfetched data in the read buffer */ char *read_end; /* end of unfetched data */ + struct mysql_async_context *async_context; /* For non-blocking API */ + uint read_timeout, write_timeout; /* function pointers. They are similar for socket/SSL/whatever */ void (*viodelete)(Vio*); int (*vioerrno)(Vio*); Hm, I haven't thought about that it's safe to add new things to struct st_vio without breaking the interface. That's good and useful to know! <cut> <cut> === added file 'mysql-test/t/non_blocking_api.test' --- mysql-test/t/non_blocking_api.test 1970-01-01 00:00:00 +0000 +++ mysql-test/t/non_blocking_api.test 2011-09-20 10:49:25 +0000 @@ -0,0 +1,18 @@ +# Test mixing the use of blocking and non-blocking API in a single connection. + +--enable_non_blocking_api +connect (con_nonblock,localhost,root,,test); +--disable_non_blocking_api +connect (con_normal,localhost,root,,test); + Usually all test starts with drop table if exists t1; This is mostly useful when you want to run the tests against an external server. Not critical but good practice to do... <cut> === added file 'mysys/my_context.c' --- mysys/my_context.c 1970-01-01 00:00:00 +0000 +++ mysys/my_context.c 2011-09-20 10:49:25 +0000 @@ -0,0 +1,749 @@ +/* + Copyright 2011 Kristian Nielsen + + Experiments with non-blocking libmysql. + Add the information text after the copyright. (Is the comment still relevant?) + This is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. <cut> + Implementation of async context spawning using Posix ucontext and + swapcontext(). +*/ + +#include <stdio.h> +#include <errno.h> + You can remove the above as mysys_priv.h will include these anyway. +#include "mysys_priv.h" +#include "my_context.h" + +#ifdef HAVE_VALGRIND_VALGRIND_H +#include <valgrind/valgrind.h> +#endif Shouldn't this also be depending on the HAVE_valgrind define? <cut> +int +my_context_init(struct my_context *c, size_t stack_size) +{ + if (2*sizeof(int) < sizeof(void *)) + { + fprintf(stderr, + "Error: Unable to store pointer in 2 ints on this architecture\n"); + return -1; + } Please make the above a compile time error instead. #if SIZEOF_CHARP < SIZEOF_INT*2 #error Error: Unable to store pointer in 2 ints on this architecture #endif I think it would be a good idea to do here: bzero(*c, sizeof(*c)); This is better than assuming that 'c' is zeroed on entry. + if (!(c->stack= malloc(stack_size))) + return -1; /* Out of memory */ + c->stack_size= stack_size; +#ifdef HAVE_VALGRIND_VALGRIND_H + c->valgrind_stack_id= + VALGRIND_STACK_REGISTER(c->stack, ((unsigned char *)(c->stack))+stack_size); +#endif +#ifndef DBUG_OFF + c->dbug_state= NULL; +#endif If we do the bzero, you can remove the above + return 0; +} <cut> +int +my_context_init(struct my_context *c, size_t stack_size) +{ + if (!(c->stack_bot= malloc(stack_size))) + return -1; /* Out of memory */ Add bzero of 'c' here. <cut> +int +my_context_init(struct my_context *c, size_t stack_size) +{ Add bzero() here. <cut> +int +my_context_init(struct my_context *c, size_t stack_size) +{ Add bzero here. +#ifndef DBUG_OFF + c->dbug_state= NULL; +#endif + c->lib_fiber= CreateFiber(stack_size, my_context_trampoline, c); + if (c->lib_fiber) + return 0; + else + return -1; Remove else or do: return c->lib_fiber ? 0 : -1; +} <cut> === modified file 'sql-common/client.c' --- sql-common/client.c 2011-03-18 15:03:43 +0000 +++ sql-common/client.c 2011-09-20 10:49:25 +0000 @@ -108,6 +108,7 @@ #include "client_settings.h" #include <sql_common.h> #include <mysql/client_plugin.h> +#include "my_context.h" Should probably be: #include <my_context.h> +#define mysql_extension_get(MYSQL, X) \ + ((MYSQL)->extension ? (MYSQL)->extension->X : NULL) +#define mysql_extension_set(MYSQL, X, VAL) \ + if (!(MYSQL)->extension) \ + (MYSQL)->extension= (struct st_mysql_extension *) \ + my_malloc(sizeof(struct st_mysql_extension), \ + MYF(MY_WME | MY_ZEROFILL)); \ + (MYSQL)->extension->X= VAL; + In 5.3 and 5.5 we have the following macro: #define extension_set(OPTS, X, VAL) \ if (!(OPTS)->extension) \ (OPTS)->extension= (struct st_mysql_options_extention *) \ my_malloc(sizeof(struct st_mysql_options_extention), \ MYF(MY_WME | MY_ZEROFILL)); \ (OPTS)->extension->X= VAL; Why not also store your new extension in mysql->option->extension instead of mysql->extensions ? This would allow you to reuse the existing macros.. +/* + Fetch the context for asynchronous API calls, allocating a new one if + necessary. +*/ +#define STACK_SIZE (4096*15) + +struct mysql_async_context * +mysql_get_async_context(MYSQL *mysql) +{ + struct mysql_async_context *b; + if ((b= mysql_extension_get(mysql, async_context))) + return b; + + if (!(b= (struct mysql_async_context *) + my_malloc(sizeof(*b), MYF(MY_ZEROFILL)))) + { + set_mysql_error(mysql, CR_OUT_OF_MEMORY, unknown_sqlstate); + return NULL; + } Remove the MY_ZEROFILL and let my_context_init() do it. (Makes the function cleaner). + if (my_context_init(&b->async_context, STACK_SIZE)) + { + my_free(b, MYF(0)); + return NULL; + } <cut> /************************************************************************** Get column lengths of the current row @@ -2537,6 +2577,26 @@ } +static int +connect_sync_or_async(MYSQL *mysql, NET *net, my_socket fd, + const struct sockaddr *name, uint namelen) +{ + extern int my_connect_async(struct mysql_async_context *b, my_socket fd, + const struct sockaddr *name, uint namelen, + uint timeout); Please move the above definitioon to header file that is also read by the file the defines the function. (Otherwise someone can change the function without you getting a compiler error). + struct mysql_async_context *actxt= mysql_extension_get(mysql, async_context); + + if (actxt && actxt->active) + { + my_bool old_mode; + vio_blocking(net->vio, FALSE, &old_mode); + return my_connect_async(actxt, fd, name, namelen, + mysql->options.connect_timeout); + } + else Remove else + return my_connect(fd, name, namelen, mysql->options.connect_timeout); +} + <cut> my_bool mysql_reconnect(MYSQL *mysql) { <cut> + if ((ctxt= mysql_extension_get(mysql, async_context)) && ctxt->active) + { + hook_data.orig_mysql= mysql; + hook_data.new_mysql= &tmp_mysql; + hook_data.orig_vio= mysql->net.vio; + my_context_install_suspend_resume_hook(ctxt, my_suspend_hook, &hook_data); + } if (!mysql_real_connect(&tmp_mysql,mysql->host,mysql->user,mysql->passwd, mysql->db, mysql->port, mysql->unix_socket, mysql->client_flag | CLIENT_REMEMBER_OPTIONS)) { + if (ctxt) + my_context_install_suspend_resume_hook(ctxt, NULL, NULL); Change the above to: res= mysql_real_connect(&tmp_mysql,mysql->host,mysql->user,mysql->passwd, mysql->db, mysql->port, mysql->unix_socket, mysql->client_flag | CLIENT_REMEMBER_OPTIONS); if (ctxt) my_context_install_suspend_resume_hook(ctxt, NULL, NULL); if (!res) .... This way you only need to call my_context_install_suspend_resume_hook(ctxt, NULL, NULL) once. mysql->net.last_errno= tmp_mysql.net.last_errno; strmov(mysql->net.last_error, tmp_mysql.net.last_error); strmov(mysql->net.sqlstate, tmp_mysql.net.sqlstate); @@ -3109,13 +3220,18 @@ if (mysql_set_character_set(&tmp_mysql, mysql->charset->csname)) { DBUG_PRINT("error", ("mysql_set_character_set() failed")); + tmp_mysql.extension= NULL; bzero((char*) &tmp_mysql.options,sizeof(tmp_mysql.options)); mysql_close(&tmp_mysql); + if (ctxt) + my_context_install_suspend_resume_hook(ctxt, NULL, NULL); Please remove the above call. mysql->net.last_errno= tmp_mysql.net.last_errno; strmov(mysql->net.last_error, tmp_mysql.net.last_error); strmov(mysql->net.sqlstate, tmp_mysql.net.sqlstate); DBUG_RETURN(1); } + if (ctxt) + my_context_install_suspend_resume_hook(ctxt, NULL, NULL); Please remove the above call. <cut> +static void +mysql_close_free_extension(MYSQL *mysql) +{ + if (mysql->extension) + { + if (mysql->extension->async_context) + { + my_context_destroy(&mysql->extension->async_context->async_context); + my_free(mysql->extension->async_context, MYF(0)); + } + my_free(mysql->extension, MYF(0)); + mysql->extension= NULL; + } +} I still think it would be better to use mysql->option.extension as in this case you can do the above with 2 lines of code and we would not need another function. <discussions over IRC about this....> The reason for not using mysql->options.extension was only becasue mysql_real_connect() freed the options on failure. This code is not needed as they will be freed by mysql_close() anyway. So I suggest you remove the code: if (!(client_flag & CLIENT_REMEMBER_OPTIONS)) mysql_close_free_options(mysql); from mysql_real_connect() and change to use mysql->options.extension instead static void mysql_close_free(MYSQL *mysql) { my_free((uchar*) mysql->host_info,MYF(MY_ALLOW_ZERO_PTR)); @@ -3304,6 +3439,33 @@ (As some clients call this after mysql_real_connect() fails) */ +/* + mysql_close() can actually block, at least in theory, if the socket buffer + is full when sending the COM_QUIT command. + + On the other hand, the latter part of mysql_close() needs to free the stack + used for non-blocking operation of blocking stuff, so that later part can + _not_ be done non-blocking. + + Therefore, mysql_pre_close() is used to run the parts of mysql_close() that + may block. It can be called before mysql_close(), and in that case + mysql_close() is guaranteed not to need to block. +*/ Consider changing the name to 'mysql_slow_part_of_close()' and make it static if possible. +void mysql_pre_close(MYSQL *mysql) +{ + if (!mysql) + return; Remove test; impossible with current code, see belove in mysql_close. + /* If connection is still up, send a QUIT message */ + if (mysql->net.vio != 0) + { + free_old_query(mysql); + mysql->status=MYSQL_STATUS_READY; /* Force command */ + mysql->reconnect=0; + simple_command(mysql,COM_QUIT,(uchar*) 0,0,1); + end_server(mysql); /* Sets mysql->net.vio= 0 */ + } +} + void STDCALL mysql_close(MYSQL *mysql) { DBUG_ENTER("mysql_close"); @@ -3311,16 +3473,9 @@ + +my_socket STDCALL +mysql_get_socket(const MYSQL *mysql) +{ + if (mysql->net.vio) + return mysql->net.vio->sd; + else remove else + return INVALID_SOCKET; +} +++ sql-common/mysql_async.c 2011-09-20 10:49:25 +0000 +#ifdef __WIN__ +/* + Windows does not support MSG_DONTWAIT for send()/recv(). So we need to ensure + that the socket is non-blocking at the start of every operation. +*/ +#define WIN_SET_NONBLOCKING(mysql) { \ + my_bool old_mode__; \ + if ((mysql)->net.vio) vio_blocking((mysql)->net.vio, FALSE, &old_mode__); \ + } old_mode__ -> old_mode +#else +#define WIN_SET_NONBLOCKING(mysql) +#endif + +extern struct mysql_async_context *mysql_get_async_context(MYSQL *mysql); Move to header file. +/* Asynchronous connect(); socket must already be set non-blocking. */ +int +my_connect_async(struct mysql_async_context *b, my_socket fd, + const struct sockaddr *name, uint namelen, uint timeout) +{ + int res; +#ifdef __WIN__ + int s_err_size; +#else + socklen_t s_err_size; +#endif It would be nice to just define socklen_t for windows, but this isn't critical. You could have a DBUG_ASSERT here to check if socket is really in non blocking. + + /* + Start to connect asynchronously. + If this will block, we suspend the call and return control to the + application context. The application will then resume us when the socket + polls ready for write, indicating that the connection attempt completed. + */ + res= connect(fd, name, namelen); +#ifdef __WIN__ + if (res != 0) + { + int wsa_err= WSAGetLastError(); + if (wsa_err != WSAEWOULDBLOCK) + return res; +#else + if (res < 0) To not have only one { inside the #ifdef / #else I suggest that you move if (res != 0) { before the +#ifdef __WIN__. This works on both windows and Unix This will make the number of { balance + { + if (errno != EINPROGRESS && errno != EALREADY && errno != EAGAIN) + return res; +#endif + b->timeout_value= timeout; + b->ret_status= MYSQL_WAIT_WRITE | + (timeout ? MYSQL_WAIT_TIMEOUT : 0); +#ifdef __WIN__ + b->ret_status|= MYSQL_WAIT_EXCEPT; +#endif How about setting b->ret_status to 0 before res= connect() ? Then you can move the above code to the previous #ifdef __WIN__ and change the setting of b->ret_status to use |= This removes the extra ifdef __WIN__ and also ensures that ret_status is 0 if evertything went well. (not required but still better than a random value when trying to understand what's going in). + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); + my_context_yield(&b->async_context); + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); + if (b->ret_status & MYSQL_WAIT_TIMEOUT) + return -1; + + s_err_size= sizeof(int); Better to use (safer and more descriptive): s_err_size= sizeof(res); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*) &res, &s_err_size) != 0) + return -1; <cut> + +ssize_t +my_recv_async(struct mysql_async_context *b, int fd, + unsigned char *buf, size_t size, uint timeout) +{ + ssize_t res; + + for (;;) + { + res= recv(fd, buf, size, +#ifdef __WIN__ + 0 +#else + MSG_DONTWAIT +#endif + ); Better to do before function start: #ifdef __WIN__ #define MSG_DONTWAIT 0 #endif and remove the above #ifdef in middle of call. Another option is to use the IF_WIN() macro: IF_WIN(0, MSG_DONTWAIT) + if (res >= 0 || +#ifdef __WIN__ + WSAGetLastError() != WSAEWOULDBLOCK +#else + (errno != EAGAIN && errno != EINTR) +#endif Please define macro: #define IS_BLOCKING_ERROR IF_WIN(WSAGetLastError() != WSAEWOULDBLOCK, (errno != EAGAIN && errno != EINTR)) and use that instead of having an ifdef in the middle of the code. + ) + return res; + b->ret_status= MYSQL_WAIT_READ; + if (timeout) + { + b->ret_status|= MYSQL_WAIT_TIMEOUT; + b->timeout_value= timeout; + } + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); + my_context_yield(&b->async_context); + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); The following call was quite confusion as we are checking a variable that we where just setting. <phone call> The reasons is that b->ret_status is both an in an out parameter. We agree to do one of the following to make the code easier to understand: - Change to use different bits for in and out status. - Change to use different variables for in and out status. + if (b->ret_status & MYSQL_WAIT_TIMEOUT) + return -1; + } +} + Add empty row here +ssize_t +my_send_async(struct mysql_async_context *b, int fd, + const unsigned char *buf, size_t size, uint timeout) +{ + ssize_t res; + + for (;;) + { + res= send(fd, buf, size, +#ifdef __WIN__ + 0 +#else + MSG_DONTWAIT +#endif + ); + if (res >= 0 || +#ifdef __WIN__ + WSAGetLastError() != WSAEWOULDBLOCK +#else + (errno != EAGAIN && errno != EINTR) +#endif + ) See comments for previous function how to get rid of the ifdefs' <cut> +#ifdef HAVE_OPENSSL +int +my_ssl_read_async(struct mysql_async_context *b, SSL *ssl, + void *buf, int size) +{ + int res, ssl_err; + + for (;;) + { + res= SSL_read(ssl, buf, size); + if (res >= 0) + return res; + ssl_err= SSL_get_error(ssl, res); + if (ssl_err == SSL_ERROR_WANT_READ) + b->ret_status= MYSQL_WAIT_READ; + else if (ssl_err == SSL_ERROR_WANT_WRITE) + b->ret_status= MYSQL_WAIT_WRITE; + else + return res; + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); + my_context_yield(&b->async_context); + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); + } +} + +int +my_ssl_write_async(struct mysql_async_context *b, SSL *ssl, + const void *buf, int size) +{ + int res, ssl_err; + + for (;;) + { + res= SSL_write(ssl, buf, size); + if (res >= 0) + return res; + ssl_err= SSL_get_error(ssl, res); + if (ssl_err == SSL_ERROR_WANT_READ) + b->ret_status= MYSQL_WAIT_READ; + else if (ssl_err == SSL_ERROR_WANT_WRITE) + b->ret_status= MYSQL_WAIT_WRITE; + else + return res; + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); + my_context_yield(&b->async_context); + if (b->suspend_resume_hook) + (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); + } +} 90 % of the above functions are the same: I would do it this way: static my_bool my_ssl_async_check_result(int *res, struct mysql_async_context *b) { int ssl_err; if (*res >= 0) return 1; ssl_err= SSL_get_error(ssl, *res); if (ssl_err == SSL_ERROR_WANT_READ) b->ret_status= MYSQL_WAIT_READ; else if (ssl_err == SSL_ERROR_WANT_WRITE) b->ret_status= MYSQL_WAIT_WRITE; else return 1; if (b->suspend_resume_hook) (*b->suspend_resume_hook)(TRUE, b->suspend_resume_hook_user_data); my_context_yield(&b->async_context); if (b->suspend_resume_hook) (*b->suspend_resume_hook)(FALSE, b->suspend_resume_hook_user_data); return 0; } And then: my_ssl_read_async(struct mysql_async_context *b, SSL *ssl, void *buf, int size) { int res; for (;;) { res= SSL_read(ssl, buf, size); if (my_ssl_async_check_result(res, b)) return res; } } <cut> +#endif /* HAVE_OPENSSL */ + +unsigned int STDCALL +mysql_get_timeout_value(const MYSQL *mysql) +{ + if (mysql->extension && mysql->extension->async_context) + return mysql->extension->async_context->timeout_value; + else Remove else + return 0; +} + Add empty line +/* + Now create non-blocking definitions for all the calls that may block. + + Each call FOO gives rise to FOO_start() that prepares the MYSQL object for + doing non-blocking calls that can suspend operation mid-way, and then starts + the call itself. And a FOO_start_internal trampoline to assist with running + the real call in a co-routine that can be suspended. And a FOO_cont() that + can continue a suspended operation. +*/ + +#define MK_ASYNC_CALLS(call__, decl_args__, invoke_args__, cont_arg__, mysql_val__, parms_mysql_val__, parms_assign__, ret_type__, err_val__, ok_val__, extra1__) \ +static void \ +call__ ## _start_internal(void *d) \ +{ \ As discussed on IRC, it would be good to have the functions declarations outside of the macro; This makes it easier to make breakpoints in the code and help debugging. + struct call__ ## _params *parms; \ + ret_type__ ret; \ + struct mysql_async_context *b; \ + \ + parms= (struct call__ ## _params *)d; \ + b= (parms_mysql_val__)->extension->async_context; \ + \ + ret= call__ invoke_args__; \ + b->ret_result. ok_val__ = ret; \ + b->ret_status= 0; \ +} \ +int STDCALL \ +call__ ## _start decl_args__ \ +{ \ + int res; \ + struct mysql_async_context *b; \ + struct call__ ## _params parms; \ + \ + extra1__ \ + if (!(b= mysql_get_async_context((mysql_val__)))) \ + { \ + *ret= err_val__; \ + return 0; \ + } \ Instead of calling mysql_get_async_context, another option would be to do: mysql_options(mysql, MYSQL_OPT_ASYNC_IO, (char*) &STACK_SIZE) And if STACK_SIZE is 0, then we allocate our default stack size then you can change the above code to: b= mysql_val__->options.extension->async_context; If it doesn't exist, it's ok to crash on access. (Same as if mysql is 0) + parms_assign__ \ + \ + b->active= 1; \ + res= my_context_spawn(&b->async_context, call__ ## _start_internal, &parms);\ + b->active= 0; \ Set also b->suspended= 0 here. (Makes the rest of the code shorter) + if (res < 0) \ + { \ + set_mysql_error((mysql_val__), CR_OUT_OF_MEMORY, unknown_sqlstate); \ How do we know the reason is out of memory? (Just curious) + b->suspended= 0; \ + *ret= err_val__; \ + return 0; \ + } \ + else if (res > 0) \ Remove else + { \ + /* Suspended. */ \ + b->suspended= 1; \ + return b->ret_status; \ + } \ + else \ Remove else and {} + { \ + /* Finished. */ \ + b->suspended= 0; \ + *ret= b->ret_result. ok_val__; \ + return 0; \ + } \ +} \ If you change to test for (res > 0) first, you can combine the two return 0 to one row and thus remove some of the { }. Makes the code shorter (at least to read): b->active= b->suspended= 0; if (res > 0) { b->suspended= 1; return b->ret_status; } if (res < 0) { set_mysql_error((mysql_val__), CR_OUT_OF_MEMORY, unknown_sqlstate); *ret= err_val__; } else *ret= b->ret_result. ok_val__; /* Finished. */ return 0; +int STDCALL \ +call__ ## _cont(ret_type__ *ret, cont_arg__, int ready_status) \ +{ \ + int res; \ + struct mysql_async_context *b; \ + \ + b= (mysql_val__)->extension->async_context; \ + if (!b || !b->suspended) \ Remove test for !b; You can assume it's exists. (We can never protect aginst wrong usage) + { \ + set_mysql_error((mysql_val__), CR_COMMANDS_OUT_OF_SYNC, unknown_sqlstate);\ + *ret= err_val__; \ + return 0; \ + } \ + \ + b->active= 1; \ + b->ret_status= ready_status; \ + res= my_context_continue(&b->async_context); \ + b->active= 0; \ + if (res < 0) \ + { \ + set_mysql_error((mysql_val__), CR_OUT_OF_MEMORY, unknown_sqlstate); \ + b->suspended= 0; \ + *ret= err_val__; \ + return 0; \ + } \ + else if (res > 0) \ + { \ + /* Suspended. */ \ + return b->ret_status; \ + } \ + else \ + { \ + /* Finished. */ \ + b->suspended= 0; \ + *ret= b->ret_result. ok_val__; \ + return 0; \ + } \ You can do the same 'suspend and check for error' optimization I did in previous function to reduce the above code with 4 lines. +} + +#define MK_ASYNC_CALLS_VOID_RETURN(call__, decl_args__, invoke_args__, cont_arg__, mysql_val__, parms_mysql_val__, parms_assign__, extra1__) \ +static void \ +call__ ## _start_internal(void *d) \ +{ \ + struct call__ ## _params *parms; \ + struct mysql_async_context *b; \ + \ + parms= (struct call__ ## _params *)d; \ + b= (parms_mysql_val__)->extension->async_context; \ + \ + call__ invoke_args__; \ + b->ret_status= 0; \ +} \ +int STDCALL \ +call__ ## _start decl_args__ \ +{ \ + int res; \ + struct mysql_async_context *b; \ + struct call__ ## _params parms; \ + \ + extra1__ \ + if (!(b= mysql_get_async_context((mysql_val__)))) \ + { \ + return 0; \ + } \ You can remove test if we know that b always exists. At least you can remove '{' + parms_assign__ \ + \ + b->active= 1; \ + res= my_context_spawn(&b->async_context, call__ ## _start_internal, &parms);\ + b->active= 0; \ + if (res < 0) \ + { \ + set_mysql_error((mysql_val__), CR_OUT_OF_MEMORY, unknown_sqlstate); \ + b->suspended= 0; \ + return 0; \ + } \ + else if (res > 0) \ + { \ + /* Suspended. */ \ + b->suspended= 1; \ + return b->ret_status; \ + } \ + else \ + { \ + /* Finished. */ \ + b->suspended= 0; \ + return 0; \ + } \ +} \ See previous comment of how to remove 4 code lines. +int STDCALL \ +call__ ## _cont(cont_arg__, int ready_status) \ +{ \ + int res; \ + struct mysql_async_context *b; \ + \ + b= (mysql_val__)->extension->async_context; \ + if (!b || !b->suspended) \ Assume !b exists + { \ + set_mysql_error((mysql_val__), CR_COMMANDS_OUT_OF_SYNC, unknown_sqlstate);\ + return 0; \ + } \ + \ + b->active= 1; \ + b->ret_status= ready_status; \ + res= my_context_continue(&b->async_context); \ + b->active= 0; \ + if (res < 0) \ + { \ + set_mysql_error((mysql_val__), CR_OUT_OF_MEMORY, unknown_sqlstate); \ + b->suspended= 0; \ + return 0; \ + } \ + else if (res > 0) \ + { \ + /* Suspended. */ \ + return b->ret_status; \ + } \ + else \ + { \ + /* Finished. */ \ + b->suspended= 0; \ + return 0; \ + } \ +} Remove extra lines above. Add comment here (and for other similar structs): /* This struct is used by the next MK_ASYNC_CALL */ + +struct mysql_real_connect_params { + MYSQL *mysql; + const char *host; + const char *user; + const char *passwd; + const char *db; + unsigned int port; + const char *unix_socket; + unsigned long client_flags; +}; +MK_ASYNC_CALLS( + mysql_real_connect, + (MYSQL **ret, MYSQL *mysql, const char *host, const char *user, + const char *passwd, const char *db, unsigned int port, + const char *unix_socket, unsigned long client_flags), + (parms->mysql, parms->host, parms->user, parms->passwd, parms->db, + parms->port, parms->unix_socket, parms->client_flags), <cut> +mysql_close_start(MYSQL *sock) +{ + int res; + + /* It is legitimate to have NULL sock argument, which will do nothing. */ + if (sock) + { + res= mysql_pre_close_start(sock); + /* If we need to block, return now and do the rest in mysql_close_cont(). */ + if (res) + return res; + } + mysql_close(sock); + return 0; +} +int STDCALL +mysql_close_cont(MYSQL *sock, int ready_status) +{ + int res; + + res= mysql_pre_close_cont(sock, ready_status); + if (res) + return res; + mysql_close(sock); + return 0; +} You should use the above mysql_close functions also in client/async_example.cc +#ifdef USE_OLD_FUNCTIONS Forget these! We should remove them soon anyway... <cut> === added file 'tests/async_queries.c' +void +add_query(const char *q) +{ + struct query_entry *e; + char *q2; + size_t len; + + e= malloc(sizeof(*e)); + q2= strdup(q); Would be better if you used my_malloc / my_strdup here. (To get things safemalloc checked). + if (!e || !q2) + fatal(NULL, "Out of memory"); + + /* Remove any trailing newline. */ + len= strlen(q2); + if (q2[len] == '\n') + q2[len--]= '\0'; + if (q2[len] == '\r') + q2[len--]= '\0'; + + e->next= NULL; + e->query= q2; + e->index= query_counter++; + *tail_ptr= e; + tail_ptr= &e->next; +} <cut> === modified file 'vio/viosocket.c' --- vio/viosocket.c 2011-05-14 16:42:07 +0000 +++ vio/viosocket.c 2011-09-20 10:49:25 +0000 @@ -21,6 +21,7 @@ */ #include "vio_priv.h" +#include "my_context.h" int vio_errno(Vio *vio __attribute__((unused))) { @@ -31,18 +32,34 @@ size_t vio_read(Vio * vio, uchar* buf, size_t size) { size_t r; + extern ssize_t my_recv_async(struct mysql_async_context *b, int fd, + unsigned char *buf, size_t size, uint timeout); Move to include file. DBUG_ENTER("vio_read"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf, (uint) size)); /* Ensure nobody uses vio_read_buff and vio_read simultaneously */ DBUG_ASSERT(vio->read_end == vio->read_pos); + if (vio->async_context && vio->async_context->active) + r= my_recv_async(vio->async_context, vio->sd, buf, size, vio->read_timeout); + else + { <cut> === modified file 'vio/viossl.c' @@ -21,6 +21,7 @@ */ #include "vio_priv.h" +#include "my_context.h" #ifdef HAVE_OPENSSL @@ -90,11 +91,16 @@ size_t vio_ssl_read(Vio *vio, uchar* buf, size_t size) { size_t r; + extern int my_ssl_read_async(struct mysql_async_context *b, SSL *ssl, + void *buf, int size); Move to extern file (Same for other extern declarations in this file). <cut> Good and intresting work! Regards, Monty