Hi, Nikita, Despite the email subject, it's a review of everything in bb-10.10-MDEV-16440, *except* 9c57bbfef92. That is 37e1ccbf678..c16b2429f02 without notifiable_work_zone.h I'll send a separate email about notifiable_work_zone.h On Sep 08, Nikita Malyavin wrote:
revision-id: b96afca238a (mariadb-10.6.1-472-gb96afca238a) parent(s): 37e1ccbf678 author: Nikita Malyavin committer: Nikita Malyavin timestamp: 2022-06-19 14:52:30 +0300 message:
MDEV-16440 merge 5.7 P_S sysvars instrumentation and tables
diff --git a/mysql-test/suite/perfschema/t/variables_stress.test b/mysql-test/suite/perfschema/t/variables_stress.test new file mode 100644 index 00000000000..be3012bf8af --- /dev/null +++ b/mysql-test/suite/perfschema/t/variables_stress.test @@ -0,0 +1,71 @@
stress tests generally don't belong in mtr and aren't reliable enough for the mtr. Also, you forgot to check in the result file for it. on the other hand, looking at the patch I got the impression that this stress test helped to find a lot of bugs.
+let $i = 64; +enable_connect_log; +while ($i) { + echo i = $i; + connect(con_$i, localhost, root); +# set debug_sync = "tp_hanger wait_for banger"; + send set debug_dbug = "+d,tp_wanger"; + dec $i; +} +connection default; + +SELECT THREAD_ID, PROCESSLIST_ID FROM performance_schema.threads; +set debug_sync = "now signal banger"; +connect(banger_1, localhost, root); +let $banger1= `SELECT THREAD_ID FROM performance_schema.threads WHERE PROCESSLIST_ID = CONNECTION_ID()`; +send select count(*) from performance_schema.variables_by_thread where variable_name="time_zone"; +connect(banger_2, localhost, root); +let $banger2= `SELECT THREAD_ID FROM performance_schema.threads WHERE PROCESSLIST_ID = CONNECTION_ID()`; +send select count(*) from performance_schema.variables_by_thread where variable_name="time_zone"; +connect(banger_3, localhost, root); +let $banger3= `SELECT THREAD_ID FROM performance_schema.threads WHERE PROCESSLIST_ID = CONNECTION_ID()`; +send select count(*) from performance_schema.variables_by_thread where variable_name="time_zone"; +connect(banger_4, localhost, root); +let $banger4= `SELECT THREAD_ID FROM performance_schema.threads WHERE PROCESSLIST_ID = CONNECTION_ID()`; +send select count(*) from performance_schema.variables_by_thread where variable_name="time_zone"; +connect(banger_5, localhost, root); +let $banger5= `SELECT THREAD_ID FROM performance_schema.threads WHERE PROCESSLIST_ID = CONNECTION_ID()`; + +send select count(*) from performance_schema.variables_by_thread where variable_name="time_zone"; + +let $i = 2000; +let $j=65; +while ($i) { + echo i = $i; + connection banger_1; + reap; + send select count(*) from performance_schema.variables_by_thread where variable_name="time_zone"; + connection banger_2; + reap; + send select count(*) from performance_schema.variables_by_thread where variable_name="time_zone"; + connection banger_3; + reap; + send select count(*) from performance_schema.variables_by_thread where variable_name="time_zone"; + connection banger_4; + reap; + send select count(*) from performance_schema.variables_by_thread where variable_name="time_zone"; + connection banger_5; + reap; + send select count(*) from performance_schema.variables_by_thread where variable_name="time_zone"; + dec $i; + + let $threads= `show status where variable_name = 'threads_connected'`; + if ($threads < 63) { + connect(con_$i, localhost, root); + send set debug_dbug = "+d,tp_wanger"; + inc $j; + } +} +reap; +select * from performance_schema.variables_by_thread where variable_name="time_zone"; +echo $banger1; +echo $banger2; +echo $banger3; +echo $banger4; +echo $banger5; +# eval select * from performance_schema.variables_by_thread where variable_name like "%id" and thread_id=197; +connection default; +# set global debug_dbug = ""; +set debug_sync = reset; + +show status where variable_name = 'threads_connected'; diff --git a/sql/sql_plugin.h b/sql/sql_plugin.h index d4df8c6468f..3d02d1c7509 100644 --- a/sql/sql_plugin.h +++ b/sql/sql_plugin.h @@ -186,6 +186,10 @@ extern SHOW_COMP_OPTION plugin_status(const char *name, size_t len, int type); extern bool check_valid_path(const char *path, size_t length); extern void plugin_mutex_init();
+template <class T> class Dynamic_array;
you can simply include sql_array.h, can't you?
+ +void plugin_lock_by_sys_var_array(THD *thd, Dynamic_array<SHOW_VAR> *vars); + typedef my_bool (plugin_foreach_func)(THD *thd, plugin_ref plugin, void *arg); diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 0597b086b7b..ec86389d08f 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1296,6 +1296,9 @@ dispatch_command_return do_command(THD *thd, bool blocking) goto out; }
+ if (unlikely(thd->apc_target.have_apc_requests())) + thd->apc_target.process_apc_requests();
1. why here? 2. there generally shouldn't be direct checks of apc requests, the code should do check_killed() often enough. So if you really need a check here (see question 1), add if (thd->check_killed())...
+ packet= (char*) net->read_pos; /* 'packet_length' contains length of data, as it was stored in packet diff --git a/vio/viopipe.c b/vio/viopipe.c index aeaad311b7e..5106c6d6513 100644 --- a/vio/viopipe.c +++ b/vio/viopipe.c @@ -53,8 +53,9 @@ static size_t wait_overlapped_result(Vio *vio, int timeout) } else { + DWORD last_error= GetLastError(); /* Error or timeout, cancel the pending I/O operation. */ - CancelIo(vio->hPipe); + CancelIoEx(vio->hPipe, &vio->overlapped);
why, CancelIoEx resets last error, but CancelIo doesn't?
/* If the wait timed out, set error code to indicate a diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index ed68e31c755..25844d1f3ef 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -131,6 +131,11 @@ void TP_pool_win::resume(TP_connection* c) SubmitThreadpoolWork(((TP_connection_win*)c)->work); }
+int TP_pool_win::wake(TP_connection *c) +{ + return 0; +}
How is that supposed to work? How do you wake up threadpool threads on Windows?
+ #define CHECK_ALLOC_ERROR(op) \ do \ { \ diff --git a/mysys/my_thr_init.c b/mysys/my_thr_init.c index cde34e5bdb9..89f190ee989 100644 --- a/mysys/my_thr_init.c +++ b/mysys/my_thr_init.c @@ -432,9 +432,21 @@ const char *my_thread_name(void) extern void **my_thread_var_dbug() { struct st_my_thread_var *tmp; + int last_error; if (!my_thread_global_init_done) return NULL; +#ifdef WIN32 + last_error= GetLastError(); +#else + last_error= errno; +#endif + tmp= my_thread_var; +#ifdef WIN32 + SetLastError(last_error); +#else + errno= last_error; +#endif
huh? my_thread_var can change errno?
return tmp && tmp->init ? &tmp->dbug : 0; } #endif /* DBUG_OFF */ diff --git a/storage/perfschema/pfs_variable.h b/storage/perfschema/pfs_variable.h index e59b02f2af8..f509e97048c 100644 --- a/storage/perfschema/pfs_variable.h +++ b/storage/perfschema/pfs_variable.h @@ -212,7 +212,7 @@ class Find_THD_variable : public Find_THD_Impl return false;
/* Hold this lock to keep THD during materialization. */ - mysql_mutex_lock(&thd->LOCK_thd_data); + mysql_mutex_lock(&thd->LOCK_thd_kill);
see commits 736a54f49c72 and c6c2a2b8d463
return true; } void set_unsafe_thd(THD *unsafe_thd) { m_unsafe_thd= unsafe_thd; } diff --git a/sql/threadpool_generic.h b/sql/threadpool_generic.h index b7a35b7cbf0..e16ec4a4966 100644 --- a/sql/threadpool_generic.h +++ b/sql/threadpool_generic.h @@ -88,9 +89,17 @@ struct TP_connection_generic :public TP_connection ulonglong abs_wait_timeout; ulonglong enqueue_time; TP_file_handle fd; + /** + Designates whether fd is currently connected to the poll denoted by + thread_group->pollfd. See also change_group. + */ bool bound_to_poll_descriptor; int waiting; bool fix_group; +#ifndef WIN32 + Notifiable_work_zone work_zone; + bool leave_work_zone() final; +#endif
Do you need this #ifndef? WIN32 is *never* defined, it's _WIN32. and _WIN32 is never defined *here*, on Windows we use TP_connection_win.
#ifdef _WIN32 win_aiosocket win_sock{}; void init_vio(st_vio *vio) override diff --git a/sql/sql_class.cc b/sql/sql_class.cc index a0ba1d7e5a3..bfe49cddbf4 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1909,13 +1910,13 @@ void THD::awake_no_mutex(killed_state state_to_set) }
/* Broadcast a condition to kick the target if it is waiting on it. */ -void THD::abort_current_cond_wait(bool force) +void THD::abort_current_cond_wait(bool force, bool mark_abort)
a bit illogical, these values aren't orthogonal. you want a 3-state enum, like ABORT_NONE, ABORT_CONNECTIONS_BUT_NOT_SYSTEM_THREADS, ABORT_ALL
{ mysql_mutex_assert_owner(&LOCK_thd_kill); if (mysys_var) { mysql_mutex_lock(&mysys_var->mutex); - if (!system_thread || force) // Don't abort locks + if (mark_abort && (!system_thread || force)) // Don't abort locks mysys_var->abort=1;
/* @@ -4373,6 +4374,14 @@ my_bool thd_net_is_killed(THD *thd) return thd && thd->killed ? 1 : 0; }
+/* returns true if any APC was processed */ +void thd_net_process_apc_requests(THD *thd)
even though it's called from net_serv.cc, there's nothing about NET here, so calling it thd_net_process_apc_requests is misleading
+{ + if (unlikely(thd->apc_target.have_apc_requests())) + thd->apc_target.process_apc_requests();
you also call it from the threadpool_common and a couple of times you have these two lines verbatim in other places (e.g. sql_parse.cc) why couldn't you call thd->check_killed() instead?
+ DEBUG_SYNC(thd, "net_after_apc"); +} +
void thd_increment_bytes_received(void *thd, size_t length) { diff --git a/sql/my_apc.h b/sql/my_apc.h index cc98e36bbe4..ae60bdc2763 100644 --- a/sql/my_apc.h +++ b/sql/my_apc.h @@ -81,7 +83,7 @@ class Apc_target */ inline bool have_apc_requests() { - return MY_TEST(apc_calls); + return MY_TEST(apc_calls.load(std::memory_order_acquire));
why?
}
inline bool is_enabled() { return enabled; } @@ -95,14 +97,19 @@ class Apc_target virtual ~Apc_call() {} };
+ class Call_request; /* Make a call in the target thread (see function definition for details) */ - bool make_apc_call(THD *caller_thd, Apc_call *call, int timeout_sec, bool *timed_out); + bool make_apc_call(THD *caller_thd, Apc_call *call, + int timeout_sec, bool *timed_out); + + void enqueue_request(Call_request *request_buff, Apc_call *call); + int wait_for_completion(THD *caller_thd, Call_request *request, + int timeout_sec);
You wrote
In pfs_variable.cc we will have to make an additional step between enqueue_request and wait_for_completion. These two functions will be called directly and therefore both should have a public interface
and I don't think I quite agree with that. Your "additional step" seems to be abort_current_cond_wait() and notify_apc(). It's not an exotic additional step that only pfs_variable.cc needs, anyone, who makes an apc call needs the target thread to wake up and process it. It's something that should be part of apc code, not done only in pfs_variable.cc And this case you likely won't need enqueue_request and wait_for_completion as a public api
#ifndef DBUG_OFF int n_calls_processed; /* Number of calls served by this target */ #endif private: - class Call_request;
/* Non-zero value means we're enabled. It's an int, not bool, because one can diff --git a/sql/threadpool.h b/sql/threadpool.h index 7737d056b4a..08e85d362e7 100644 --- a/sql/threadpool.h +++ b/sql/threadpool.h @@ -112,6 +119,8 @@ struct TP_connection
/* Read for the next client command (async) with specified timeout */ virtual int start_io() = 0; + IF_WIN(,virtual) bool leave_work_zone(){ return true; } + IF_WIN(Notifiable_work_zone work_zone;,) // Dummy object.
Ugh, this is unreadable. IF_WIN was supposed to be used in expressions. And why do you need virtual methods here, again?
virtual void wait_begin(int type)= 0; virtual void wait_end() = 0; diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc index eb08441a4d5..f61dd90606b 100644 --- a/sql/threadpool_generic.cc +++ b/sql/threadpool_generic.cc @@ -114,7 +114,8 @@ struct pool_timer_t
static pool_timer_t pool_timer;
-static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection); +IF_DBUG(,static)
you can remove this IF_DBUG, I don't see you use queue_put outside of this file.
+void queue_put(thread_group_t *thread_group, TP_connection_generic *connection); static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt); static int wake_thread(thread_group_t *thread_group,bool due_to_stall); static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall=false); @@ -1485,8 +1520,41 @@ static int change_group(TP_connection_generic *c, }
+#ifndef WIN32 +bool TP_connection_generic::leave_work_zone() +{ + auto leave_state= work_zone.try_leave(); + while (unlikely(leave_state == Notifiable_work_zone::SIGNAL)) + { + if (thd->apc_target.have_apc_requests()) + thd->apc_target.process_apc_requests(); + leave_state= work_zone.try_leave(); + } + return leave_state == Notifiable_work_zone::OWNER; +} +#endif + +#ifndef DBUG_OFF +#include <random> +static thread_local std::mt19937 mt_random{std::random_device()()}; +static thread_local std::uniform_int_distribution<int> kill_dist{1, 100000}; +void queue_put(thread_group_t *thread_group, TP_connection_generic *connection); +void tp_send(TP_connection_type* c); +#endif + int TP_connection_generic::start_io() { + if(DBUG_IF("tp_wanger")) + { + bool survive= kill_dist(mt_random) != 555;
does that compile? you define kill_dist under #ifndef DBUG_OFF. You can fix this with, like #ifndef DBUG_OFF if (DBUG_IF("tp_wanger")) { static thread_local std::mt19937 ... etc } #endif
+ if (survive) + { + state= TP_STATE_INTERRUPTED; + tp_send(this); + } + return survive ? 0 : 1; + } + /* Usually, connection will stay in the same group for the entire connection's life. However, we do allow group_count to diff --git a/vio/viosocket.c b/vio/viosocket.c index 002ff274b74..45c3d80a33d 100644 --- a/vio/viosocket.c +++ b/vio/viosocket.c @@ -909,20 +916,53 @@ static my_bool socket_peek_read(Vio *vio, uint *bytes) */
#ifndef _WIN32 +#ifdef _GNU_SOURCE +#define PFD_SIZE 1 +static inline int vio_poll(struct pollfd pfd[], nfds_t nr, int timeout) +{ + struct timespec tm, *tm_arg= NULL; + sigset_t signals; + /* Convert the timeout, in milliseconds, to seconds and microseconds. */ + if (timeout >= 0) + { + tm.tv_sec= timeout / 1000; + tm.tv_nsec= (timeout % 1000) * 1000 * 1000; + tm_arg= &tm; + } + sigemptyset(&signals); + + return ppoll(pfd, 1, tm_arg, &signals); +} +#else +#define PFD_SIZE 2 +#define vio_poll poll +#endif
why do you need that? How is your vio_poll() different from poll() ?
+ int vio_io_wait(Vio *vio, enum enum_vio_io_event event, int timeout) { int ret; short revents __attribute__((unused)) = 0; - struct pollfd pfd; + const short revents_read = MY_POLL_SET_IN | MY_POLL_SET_ERR | POLLRDHUP; + struct pollfd pfd[PFD_SIZE]; my_socket sd= mysql_socket_getfd(vio->mysql_socket); + int pfd_size; MYSQL_SOCKET_WAIT_VARIABLES(locker, state) /* no ';' */ DBUG_ENTER("vio_io_wait"); DBUG_PRINT("enter", ("timeout: %d", timeout));
- memset(&pfd, 0, sizeof(pfd)); + memset(pfd, 0, sizeof(pfd));
- pfd.fd= sd; + pfd[0].fd= sd;
+#ifndef _GNU_SOURCE + my_socket self_pipe= threadlocal_get_self_pipe(); + if (self_pipe) + { + pfd[1].fd= self_pipe; + pfd[1].events= MY_POLL_SET_IN; + } +#endif + pfd_size= PFD_SIZE;
really? I'd expect it to be essentially pfd_size= self_pipe ? 2 : 1;
/* Set the poll bitmask describing the type of events. The error flags are only valid in the revents bitmask. @@ -970,16 +1023,16 @@ int vio_io_wait(Vio *vio, enum enum_vio_io_event event, int timeout) }
#else - int vio_io_wait(Vio *vio, enum enum_vio_io_event event, int timeout) { int ret; - struct timeval tm; - my_socket fd= mysql_socket_getfd(vio->mysql_socket); - fd_set readfds, writefds, exceptfds; MYSQL_SOCKET_WAIT_VARIABLES(locker, state) /* no ';' */ DBUG_ENTER("vio_io_wait");
+ /* Not read , e.g connect or write - use select() based wait */ + struct timeval tm; + my_socket fd= mysql_socket_getfd(vio->mysql_socket); + fd_set readfds, writefds, exceptfds;
it's C, you cannot mix declarations and code
/* Convert the timeout, in milliseconds, to seconds and microseconds. */ if (timeout >= 0) { @@ -1013,14 +1066,17 @@ int vio_io_wait(Vio *vio, enum enum_vio_io_event event, int timeout) ret= select(0, &readfds, &writefds, &exceptfds, (timeout >= 0) ? &tm : NULL);
END_SOCKET_WAIT(locker, timeout); /* Set error code to indicate a timeout error. */ if (ret == 0) WSASetLastError(SOCKET_ETIMEDOUT);
/* Error or timeout? */ - if (ret <= 0) + if (ret <= 0){
indentation
+ DBUG_PRINT("vio", ("vio_io_wait: error return: %d. Last error: %d", + ret, socket_errno)); DBUG_RETURN(ret); + }
/* The requested I/O event is ready? */ switch (event) diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc index 9beb3cca8f8..1c8975cfb23 100644 --- a/sql/sql_connect.cc +++ b/sql/sql_connect.cc @@ -23,7 +23,53 @@ #include "mariadb.h" #include "mysqld.h" #include "sql_priv.h" -#ifndef _WIN32 +#ifdef _WIN32 +#ifdef _WIN32_WINNT +#undef _WIN32_WINNT +#endif + +#define _WIN32_WINNT 0x0500 +#include "windows.h" // QueueUserAPC2 +#define WINDOWS_KERNEL32_DLLNAME_W "kernel32" + +// https://github.com/dotnet/runtime/pull/55649/files + +// These declarations are for a new special user-mode APC feature introduced in Windows. These are not yet available in Windows +// SDK headers, so some names below are prefixed with "CLONE_" to avoid conflicts in the future. Once the prefixed declarations +// become available in the Windows SDK headers, the prefixed declarations below can be removed in favor of the SDK ones. + +//enum CLONE_QUEUE_USER_APC_FLAGS +//{ +// CLONE_QUEUE_USER_APC_FLAGS_NONE = 0x0, +// CLONE_QUEUE_USER_APC_FLAGS_SPECIAL_USER_APC = 0x1, +// +// CLONE_QUEUE_USER_APC_CALLBACK_DATA_CONTEXT = 0x10000 +//}; +//typedef BOOL (WINAPI *QueueUserAPC2Proc)(PAPCFUNC ApcRoutine, HANDLE Thread, ULONG_PTR Data, CLONE_QUEUE_USER_APC_FLAGS Flags); +//void c() +//{ +// HMODULE hKernel32 = LoadLibraryExA(WINDOWS_KERNEL32_DLLNAME_W, NULL, LOAD_LIBRARY_SEARCH_SYSTEM32); +// +// // See if QueueUserAPC2 exists +// QueueUserAPC2Proc pfnQueueUserAPC2Proc = (QueueUserAPC2Proc)GetProcAddress(hKernel32, "QueueUserAPC2"); +// if (pfnQueueUserAPC2Proc == nullptr) +// { +// abort(); +// } +// +// // See if QueueUserAPC2 supports the special user-mode APC with a callback that includes the interrupted CONTEXT. A special +// // user-mode APC can interrupt a thread that is in user mode and not in a non-alertable wait. +// if (!(*pfnQueueUserAPC2Proc)(EmptyApcCallback, GetCurrentThread(), 0, SpecialUserModeApcWithContextFlags)) +// { +// return; +// } +// +// return pfnQueueUserAPC2Proc; +//} +// +//static QueueUserAPC2Proc pfnQueueUserAPC2 = InitializeSpecialUserModeApc();
what's that big commented block for? and win-something defines/includes - still needed?
+ +#else #include <netdb.h> // getservbyname, servent #endif #include "sql_audit.h" @@ -1353,6 +1399,107 @@ bool thd_is_connection_alive(THD *thd) return FALSE; }
+#if !defined(_WIN32) +static void self_pipe_write(); +#if !defined(_GNU_SOURCE) +class Thread_apc_context; +static thread_local Thread_apc_context *_THR_APC_CTX= NULL;
may be it should be part of THD (rather, Apc_target) or mysys_var? that is, accessible as current_thd->...->self_pipe or my_thread_var->self_pipe the first one (in Apc_target) seems quite logical
+#endif + +class Thread_apc_context +{ +public: +#ifndef _GNU_SOURCE + my_socket self_pipe[2]{}; +#endif + + bool setup_thread_apc() + { +#if defined(_GNU_SOURCE) + struct sigaction act {}; + act.sa_handler= [](int) -> void {self_pipe_write();};
why? it doesn't do anything, as far as I can see
+ act.sa_flags= 0; + int ret= sigaction(SIG_APC_NOTIFY, &act, NULL); + DBUG_ASSERT(ret == 0); + + sigset_t signals; + ret|= sigemptyset(&signals); + DBUG_ASSERT(ret == 0); + ret|= sigaddset(&signals, SIG_APC_NOTIFY); + DBUG_ASSERT(ret == 0); + + ret|= pthread_sigmask(SIG_BLOCK, &signals, NULL); + DBUG_ASSERT(ret == 0); +#else + // Self-pipe trick. Create a new pipe and store it thread-locally + // It can be accessed from Vio later. See also vio_io_wait() + // From the other end, it is accessed through a threadlocal + // _THR_APC_CTX, from a SIG_APC_NOTIFY signal handler. + int ret= pipe(self_pipe); + + struct sigaction act {}; + act.sa_handler= [](int) -> void { self_pipe_write(); }; + act.sa_flags= 0; + ret|= sigaction(SIG_APC_NOTIFY, &act, NULL);
why? you send a signal and convert it to a self-pipe write. why not to write to the pipe in the first place? like Apc_target::wake() which will send a signal or write to a pipe, as appropriate. And TP_pool::wake() will call thd->apc_target->wake();
+ return ret == 0; +#endif + return true; + } + bool inited; + Thread_apc_context() + { + inited = setup_thread_apc(); +#ifndef _GNU_SOURCE + if (inited) + _THR_APC_CTX= this; +#endif + } +#ifndef _GNU_SOURCE + ~Thread_apc_context() + { + _THR_APC_CTX= NULL; + closesocket(self_pipe[0]); + closesocket(self_pipe[1]); + } +#endif +}; + +#ifdef _GNU_SOURCE +static void self_pipe_write() +{ + // No self-pipe is actually used. Instead, ppoll is used to wake up on signal. +} +#else + +my_socket threadlocal_get_self_pipe() +{ + return _THR_APC_CTX ? _THR_APC_CTX->self_pipe[0] : 0; +} + +static void self_pipe_write() +{ + DBUG_ASSERT(IF_WIN(0, pthread_self() == select_thread) || _THR_APC_CTX); + if (!_THR_APC_CTX) + return; // SIGUSR1 is used in thr_alarm, which can wake up main thread. + char buf[4]{}; + send(_THR_APC_CTX->self_pipe[1], buf, sizeof buf, 0); +} +#endif +#endif /* !_WIN32 */ + +bool thread_per_connection_notify_apc(THD *thd) +{ +#ifdef WIN32 + auto vio= thd->net.vio; + HANDLE h= + (vio->type == VIO_TYPE_NAMEDPIPE) ? vio->hPipe : (HANDLE)vio->mysql_socket.fd; + return CancelIoEx(h, NULL); +#else + bool ret = pthread_kill(thd->mysys_var->pthread_self, SIG_APC_NOTIFY) == 0; + DBUG_ASSERT(ret || errno == EAGAIN); + return ret; +#endif +}
1. I'd prefer APC framework implementation to stay in my_apc.cc. not being spread over three (?) different files. 2. how is it diferent from ::wake() ?
void do_handle_one_connection(CONNECT *connect, bool put_in_cache) { diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index f72f46b3a6b..8f24bc9e4f7 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -320,6 +346,10 @@ static void threadpool_remove_connection(THD *thd) end_connection(thd); close_connection(thd, 0); unlink_thd(thd); + // The rest of APC requests should be processed after unlinking. + // This guarantees that no new APC requests can be added. + // TODO: better notify the requestor with some KILLED state here. + thd->apc_target.process_apc_requests();
not sure it guarantees much. One can find the thd before the unlink and add a new request after the unlink. or is it impossible?
PSI_CALL_delete_current_thread(); // before THD is destroyed delete thd;
diff --git a/sql/my_apc.cc b/sql/my_apc.cc index 9777deb399a..a89d9c8bc19 100644 --- a/sql/my_apc.cc +++ b/sql/my_apc.cc @@ -47,11 +47,12 @@ void Apc_target::init(mysql_mutex_t *target_mutex) void Apc_target::enqueue_request(Call_request *qe) { mysql_mutex_assert_owner(LOCK_thd_kill_ptr); - if (apc_calls) + Call_request *old_apc_calls= apc_calls;
eh, what? you modify apc_calls without a lock somewhere?
+ if (old_apc_calls) { - Call_request *after= apc_calls->prev; - qe->next= apc_calls; - apc_calls->prev= qe; + Call_request *after= old_apc_calls->prev; + qe->next= old_apc_calls; + old_apc_calls->prev= qe;
qe->prev= after; after->next= qe; @@ -100,89 +109,118 @@ void init_show_explain_psi_keys(void) if (PSI_server == NULL) return;
- PSI_server->register_cond("sql", show_explain_psi_conds, - array_elements(show_explain_psi_conds)); + PSI_server->register_cond("sql", apc_request_psi_conds, + array_elements(apc_request_psi_conds)); + PSI_server->register_mutex("sql", apc_request_psi_locks, + array_elements(apc_request_psi_locks)); } #endif
- -/* - Make an APC (Async Procedure Call) to another thread. - - @detail - Make an APC call: schedule it for execution and wait until the target - thread has executed it. - - - The caller is responsible for making sure he's not posting request - to the thread he's calling this function from. - - - The caller must have locked target_mutex. The function will release it. - - @retval FALSE - Ok, the call has been made - @retval TRUE - Call wasnt made (either the target is in disabled state or - timeout occurred) -*/ - -bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call, - int timeout_sec, bool *timed_out) +/** + Wait gracefully until the request is completed. + @retval 0 -- Success + @retval 1 -- Timeout + */ +int Apc_target::wait_for_completion(THD *caller_thd, Call_request *apc_request, + int timeout_sec) { - bool res= TRUE; - *timed_out= FALSE; - - if (enabled) - { - /* Create and post the request */ - Call_request apc_request; - apc_request.call= call; - apc_request.processed= FALSE; - mysql_cond_init(key_show_explain_request_COND, &apc_request.COND_request, - NULL); - enqueue_request(&apc_request); - apc_request.what="enqueued by make_apc_call"; - struct timespec abstime; const int timeout= timeout_sec; set_timespec(abstime, timeout);
+ DBUG_EXECUTE_IF("apc_timeout", set_timespec_nsec(abstime, 1000000);); + int res = 1; int wait_res= 0; PSI_stage_info old_stage; - caller_thd->ENTER_COND(&apc_request.COND_request, LOCK_thd_kill_ptr, + + mysql_mutex_unlock(LOCK_thd_kill_ptr);
I don't understand how this works. After you release LOCK_thd_kill_ptr, the THD might be destroyed. How do you detect that? By the wait timing out? Who will destroy the Call_request?
+ + mysql_mutex_lock(&apc_request->LOCK_request); + caller_thd->ENTER_COND(&apc_request->COND_request, &apc_request->LOCK_request, &stage_show_explain, &old_stage); /* todo: how about processing other errors here? */ - while (!apc_request.processed && (wait_res != ETIMEDOUT)) + while (!apc_request->processed && (wait_res != ETIMEDOUT)) { - /* We own LOCK_thd_kill_ptr */ - wait_res= mysql_cond_timedwait(&apc_request.COND_request, - LOCK_thd_kill_ptr, &abstime); - // &apc_request.LOCK_request, &abstime); + wait_res= mysql_cond_timedwait(&apc_request->COND_request, + &apc_request->LOCK_request, &abstime); if (caller_thd->killed) break; + + if (caller_thd->apc_target.have_apc_requests()) + { + mysql_mutex_unlock(&apc_request->LOCK_request); + caller_thd->apc_target.process_apc_requests(); + mysql_mutex_lock(&apc_request->LOCK_request); + } }
- if (!apc_request.processed) + if (!apc_request->processed) { /* The wait has timed out, or this thread was KILLed. - Remove the request from the queue (ok to do because we own - LOCK_thd_kill_ptr) + We can't delete it from the queue, because LOCK_thd_kill_ptr is already + released. It can't be reacquired because of the ordering with + apc_request->LOCK_request. + However, apc_request->processed is guarded by this lock. + Set processed= TRUE and transfer the ownership to the processor thread. + It should free the resources itself. + apc_request cannot be referred after unlock anymore in this case. */ - apc_request.processed= TRUE; - dequeue_request(&apc_request); - *timed_out= TRUE; - res= TRUE; + apc_request->processed= TRUE; + res= 1; } else { /* Request was successfully executed and dequeued by the target thread */ - res= FALSE; + res= 0; } - /* - exit_cond() will call mysql_mutex_unlock(LOCK_thd_kill_ptr) for us: - */ + + /* EXIT_COND() will call mysql_mutex_unlock(LOCK_request) for us */ caller_thd->EXIT_COND(&old_stage);
- /* Destroy all APC request data */ - mysql_cond_destroy(&apc_request.COND_request); + return res; +} + +/** Create and post the request */ +void Apc_target::enqueue_request(Call_request *request_buff, Apc_call *call) +{ + request_buff->call= call; + request_buff->processed= FALSE; + enqueue_request(request_buff); + request_buff->what="enqueued by make_apc_call"; +} + +/** + Make an APC (Async Procedure Call) to another thread. + + @detail + Make an APC call: schedule it for execution and wait until the target + thread has executed it. + + - The caller is responsible for making sure he's not posting request + to the thread he's calling this function from. + + - The caller must have locked target_mutex. The function will release it. + + @retval FALSE - Ok, the call has been made + @retval TRUE - Call wasnt made (either the target is in disabled state or + timeout occurred) +*/ + +bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call, + int timeout_sec, bool *timed_out) +{ + bool res= TRUE; + *timed_out= FALSE; + + if (enabled) + { + Call_request *apc_request= new Call_request; + enqueue_request(apc_request, call); + res= wait_for_completion(caller_thd, apc_request, timeout_sec); + *timed_out= res; + if (!*timed_out) + delete apc_request; } else { diff --git a/storage/perfschema/pfs_variable.cc b/storage/perfschema/pfs_variable.cc index 17b7dfc200c..698ff1eca6c 100644 --- a/storage/perfschema/pfs_variable.cc +++ b/storage/perfschema/pfs_variable.cc @@ -164,7 +167,11 @@ int PFS_system_variable_cache::do_materialize_global(void) during materialization. */ if (!m_external_init) + { + mysql_mutex_lock(&LOCK_plugin); init_show_var_array(OPT_GLOBAL, true); + mysql_mutex_unlock(&LOCK_plugin);
1. why not move the locking inside init_show_var_array()? 2. init_show_var_array() locks LOCK_system_variables_hash - is that not enough?
+ }
/* Resolve the value for each SHOW_VAR in the array, add to cache. */ for (SHOW_VAR *show_var= m_show_var_array.front(); @@ -230,37 +236,26 @@ int PFS_system_variable_cache::do_materialize_all(THD *unsafe_thd) m_materialized= false; m_cache.clear();
- /* Block plugins from unloading. */ - mysql_mutex_lock(&LOCK_plugin_delete);
also please remove all references of LOCK_plugin_delete
- /* Build array of SHOW_VARs from system variable hash. Do this within LOCK_plugin_delete to ensure that the hash table remains unchanged while this thread is materialized. */ if (!m_external_init) + { + mysql_mutex_lock(&LOCK_plugin); init_show_var_array(OPT_SESSION, false); + mysql_mutex_unlock(&LOCK_plugin); + }
/* Get and lock a validated THD from the thread manager. */ if ((m_safe_thd= get_THD(unsafe_thd)) != NULL) { DEBUG_SYNC(m_current_thd, "materialize_session_variable_array_THD_locked"); - for (SHOW_VAR *show_var= m_show_var_array.front(); - show_var->value && (show_var != m_show_var_array.end()); show_var++) - { - /* Resolve value, convert to text, add to cache. */ - System_variable system_var(m_safe_thd, show_var, m_query_scope, false); - m_cache.push(system_var); - } - - /* Release lock taken in get_THD(). */ - mysql_mutex_unlock(&m_safe_thd->LOCK_thd_data); + ret= make_call(&PFS_system_variable_cache::refresh_vars, 1);
m_materialized= true; - ret= 0; } - - mysql_mutex_unlock(&LOCK_plugin_delete); return ret; }
@@ -312,6 +307,105 @@ void PFS_system_variable_cache::free_mem_root(void) } } } +class PFS_system_variable_cache_apc: public Apc_target::Apc_call +{ +public: + typedef PFS_system_variable_cache::Request_func Request; + PFS_system_variable_cache_apc(PFS_system_variable_cache *pfs, Request func, + uint param) + : m_pfs(pfs), m_func(func), m_param(param) {} +private: + PFS_system_variable_cache *m_pfs; + Request m_func; + uint m_param; + + void call_in_target_thread() override + { + call(m_pfs, m_func, m_param); + } +public: + static void call(PFS_system_variable_cache *pfs, Request func, uint param) + { + THD *safe_thd= pfs->safe_thd(); + + DBUG_ASSERT(pfs->query_scope() == OPT_SESSION); + + mysql_mutex_lock(&LOCK_global_system_variables); + if (!safe_thd->variables.dynamic_variables_ptr || + global_system_variables.dynamic_variables_head > + safe_thd->variables.dynamic_variables_head) + { + mysql_prlock_rdlock(&LOCK_system_variables_hash); + sync_dynamic_session_variables(safe_thd, false); + mysql_prlock_unlock(&LOCK_system_variables_hash); + } + mysql_mutex_unlock(&LOCK_global_system_variables); + + (pfs->*func)(param); + } +}; + +void PFS_system_variable_cache::refresh_vars(uint all) +{ + for (SHOW_VAR *show_var= m_show_var_array.front(); + show_var->value && (show_var != m_show_var_array.end()); show_var++) + { + sys_var *value= (sys_var *)show_var->value; + + /* Match the system variable scope to the target scope. */ + if (all || match_scope(value->scope())) + { + /* Resolve value, convert to text, add to cache. */ + System_variable system_var(m_safe_thd, show_var, m_query_scope, false); + m_cache.push(system_var); + } + } +} +void PFS_system_variable_cache::refresh_one_var(uint index) +{ + SHOW_VAR *show_var= &m_show_var_array.at(index); + + if (show_var && show_var->value && + (show_var != m_show_var_array.end())) + { + sys_var *value= (sys_var *)show_var->value; + + /* Match the system variable scope to the target scope. */ + if (match_scope(value->scope())) + { + /* Resolve value, convert to text, add to cache. */ + System_variable system_var(m_safe_thd, show_var, m_query_scope, false); + m_cache.push(system_var); + } + } +} + + +#define MAKE_CALL_MAX_RETRIES 3
unused
+ +int PFS_system_variable_cache::make_call(Request_func func, uint param) +{ + int ret= 0; + THD *requestor_thd= m_current_thd; + if (requestor_thd == m_safe_thd) + { + mysql_mutex_unlock(&m_safe_thd->LOCK_thd_kill); + PFS_system_variable_cache_apc::call(this, func, param); + } + else + { + PFS_system_variable_cache_apc apc_call(this, func, param); + auto *request= new Apc_target::Call_request; + m_safe_thd->apc_target.enqueue_request(request, &apc_call); + m_safe_thd->abort_current_cond_wait(false, false); + m_safe_thd->scheduler->notify_apc(m_safe_thd); + DEBUG_SYNC(requestor_thd, "apc_after_notify"); + ret= m_safe_thd->apc_target.wait_for_completion(requestor_thd, request, 10); + if (ret == 0) + delete request; + } + return ret; +}
/** Build a SESSION system variable cache for a pfs_thread.
Regards, Sergei Chief Architect, MariaDB Server and security@mariadb.org