Hi! Here is the first part of the review. I have half of one file (rpl_parallel.cc) left to review, but I wanted you to have a chance to read what I have done so far. I plan to finish the review tomorrow and start working on the code during the weekend.
"Kristian" == Kristian Nielsen
writes:
Kristian> Hi Monty,
Kristian> Here is a copy of the status mail I sent earlier. The status is mostly still
Kristian> current.
Kristian> The code is in this tree:
Kristian> lp:~maria-captains/maria/10.0-knielsen
Kristian> In that tree, you can extract the parallel replication code as a single patch
Kristian> with a command like this:
Kristian> bzr diff -rrevid:knielsen@knielsen-hq.org-20130608103621-g91eaielv4n22aha
Kristian> In the forwarded mail, I describe getting a 4.5 times speedup on a quad-core
Kristian> laptop, so that's quite promising.
=== modified file 'mysql-test/r/mysqld--help.result'
--- mysql-test/r/mysqld--help.result 2013-05-28 11:28:31 +0000
+++ mysql-test/r/mysqld--help.result 2013-08-29 08:18:22 +0000
<cut>
@@ -783,6 +794,11 @@
--slave-net-timeout=#
Number of seconds to wait for more data from any
master/slave connection before aborting the read
+ --slave-parallel-threads=#
+ If non-zero, number of threads to spawn to apply in
+ parallel events on the slave that were group-committed on
+ the master or were logged with GTID in different
+ replication domains.
What happens if --slave-parallel-threads is 0?
Does this mean that there will be no parallel threads?
Is this the same as 1?
If it is the same as 1, why not let the option take values starting from 1?
IRC:
K> yes, if --slave-parallel-threads is 0, then the feature is disabled. The old code is used, and the SQL thread applies the events itself
K> if --slave-parallel-threads is > 0, then a pool of threads is spawned. The SQL threads do not apply events themselves, they just put them in queues for the parallel threads to apply
M> do we ever need to do this with only one thread?
K> (we might want to give an error if the user tries to do this, as it does not make much sense)
M> So why not have the value from 1 to N
M> where 1 is the old code and > 1 uses the new code?
K> That might make sense
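Here is a minimal sketch of the two interpretations discussed above. ApplyMode, mode_current() and mode_proposed() are hypothetical names used only for illustration. They are not server code.

```cpp
#include <cassert>

// Hypothetical: who applies the replicated events.
enum class ApplyMode { SqlThreadApplies, ParallelPool };

// Semantics in the patch: 0 disables the feature; any N > 0 spawns a pool
// of N worker threads, including N == 1.
ApplyMode mode_current(unsigned long slave_parallel_threads)
{
  return slave_parallel_threads == 0 ? ApplyMode::SqlThreadApplies
                                     : ApplyMode::ParallelPool;
}

// Semantics proposed on IRC: the option starts at 1, where 1 means the old
// code path and only values > 1 use the thread pool.
ApplyMode mode_proposed(unsigned long slave_parallel_threads)
{
  assert(slave_parallel_threads >= 1);  // 0 would be rejected as out of range
  return slave_parallel_threads == 1 ? ApplyMode::SqlThreadApplies
                                     : ApplyMode::ParallelPool;
}
```

With the proposed range, a single worker thread can no longer be configured. That matches the point above that a pool of one thread does not make much sense.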
=== modified file 'mysql-test/suite/perfschema/r/dml_setup_instruments.result'
--- mysql-test/suite/perfschema/r/dml_setup_instruments.result 2013-03-26 09:35:34 +0000
+++ mysql-test/suite/perfschema/r/dml_setup_instruments.result 2013-08-29 08:18:22 +0000
@@ -38,14 +38,14 @@ order by name limit 10;
NAME ENABLED TIMED
wait/synch/cond/sql/COND_flush_thread_cache YES YES
wait/synch/cond/sql/COND_manager YES YES
+wait/synch/cond/sql/COND_parallel_entry YES YES
+wait/synch/cond/sql/COND_prepare_ordered YES YES
wait/synch/cond/sql/COND_queue_state YES YES
wait/synch/cond/sql/COND_rpl_status YES YES
+wait/synch/cond/sql/COND_rpl_thread YES YES
+wait/synch/cond/sql/COND_rpl_thread_pool YES YES
wait/synch/cond/sql/COND_server_started YES YES
wait/synch/cond/sql/COND_thread_cache YES YES
-wait/synch/cond/sql/COND_thread_count YES YES
-wait/synch/cond/sql/Delayed_insert::cond YES YES
-wait/synch/cond/sql/Delayed_insert::cond_client YES YES
-wait/synch/cond/sql/Event_scheduler::COND_state YES YES
Any idea why the above lines were deleted?
I don't see anything in your patch that could affect that.
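One possible explanation is worth checking. The hunk header shows the query ends with 'order by name limit 10', and both the old and the new result have exactly 10 rows. The four new COND_ instruments sort before the last four old rows, which would push those rows past the limit. The sketch below checks this with a plain byte-wise sort, assuming it gives the same order as the column collation for these names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Emulate "ORDER BY name LIMIT n" with a byte-wise sort.
std::vector<std::string> order_by_name_limit(std::vector<std::string> names,
                                             std::size_t limit)
{
  std::sort(names.begin(), names.end());
  if (names.size() > limit)
    names.resize(limit);
  return names;
}

// The ten rows of the old result (prefix "wait/synch/cond/sql/" omitted).
const std::vector<std::string> old_rows= {
  "COND_flush_thread_cache", "COND_manager", "COND_queue_state",
  "COND_rpl_status", "COND_server_started", "COND_thread_cache",
  "COND_thread_count", "Delayed_insert::cond", "Delayed_insert::cond_client",
  "Event_scheduler::COND_state"};

// Condition instruments added by the patch.
const std::vector<std::string> added_rows= {
  "COND_parallel_entry", "COND_prepare_ordered", "COND_rpl_thread",
  "COND_rpl_thread_pool"};

// What the query would return once the new instruments exist.
std::vector<std::string> expected_new_result()
{
  std::vector<std::string> all(old_rows);
  all.insert(all.end(), added_rows.begin(), added_rows.end());
  return order_by_name_limit(all, 10);
}
```

If this is the cause, the four removed lines are expected and not a side effect of the patch.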
<cut>
=== added file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- mysql-test/suite/rpl/t/rpl_parallel.test 1970-01-01 00:00:00 +0000
+++ mysql-test/suite/rpl/t/rpl_parallel.test 2013-08-29 08:18:22 +0000
I didn't find any test results for rpl_parallel.test or rpl_parallel2 in
your tree. Did you forget to commit them?
I also missed a comment in these files describing what they test.
=== modified file 'sql/log.cc'
--- sql/log.cc 2013-06-06 15:51:28 +0000
+++ sql/log.cc 2013-08-29 08:18:22 +0000
@@ -6541,44 +6543,201 @@ MYSQL_BIN_LOG::write_transaction_to_binl
}
Could you please document in detail the arguments and return value
of the following function?
- I spent 3 hours trying to understand this function in detail, and
a full description of the arguments would have helped a bit.
bool
-MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
+MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
+ wait_for_commit *wfc)
{
+ group_commit_entry *orig_queue;
+ wait_for_commit *list, *cur, *last;
+
/*
To facilitate group commit for the binlog, we first queue up ourselves in
the group commit queue. Then the first thread to enter the queue waits for
the LOCK_log mutex, and commits for everyone in the queue once it gets the
lock. Any other threads in the queue just wait for the first one to finish
the commit and wake them up.
Do you still use LOCK_log for this?
It's a bit confusing to mention LOCK_log here when you take
LOCK_prepare_ordered in the function.
If LOCK_log is required, shouldn't there be an assert for it in this function?
+
+ To support in-order parallel replication with group commit, after we add
+ some transaction to the queue, we check if there were other transactions
+ already prepared to commit but just waiting for the first one to commit.
+ If so, we add those to the queue as well, transitively for all waiters.
*/
entry->thd->clear_wakeup_ready();
mysql_mutex_lock(&LOCK_prepare_ordered);
<cut>
+ orig_queue= group_commit_queue;
+
+ /*
+ Iteratively process everything added to the queue, looking for waiters,
+ and their waiters, and so on. If a waiter is ready to commit, we
+ immediately add it to the queue; if not we just wake it up.
+
+ This would be natural to do with recursion, but we want to avoid
+ potentially unbounded recursion blowing the C stack, so we use the list
+ approach instead.
Add here:
cur->next_subsequent_commit is at this point an empty list. 'last' will
always point to the last element of the list. We use this list
to simulate recursion.
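The iterative approach can be sketched as follows. Node and visit_all() are hypothetical and simplified. The real loop only appends waiters that are ready to commit and wakes up the others. It also reuses next_subsequent_commit as the work-list link, while this sketch keeps that link in a separate field.

```cpp
#include <cassert>
#include <vector>

// Hypothetical node, loosely modelled on wait_for_commit. Each node waits
// for at most one other node, so it appears in at most one waiter list.
struct Node
{
  explicit Node(int i) : id(i) {}
  int id;
  Node *waiters= nullptr;       // head of the list of nodes waiting for us
  Node *next_waiter= nullptr;   // next node in the same waitee's list
  Node *next_in_work= nullptr;  // link used only by the work list below
};

// Visit 'start' and, transitively, its waiters, their waiters and so on,
// without recursion. 'last' always points to the tail of the work list;
// newly found waiters are appended there and handled in later iterations,
// so no end-of-list NULL is needed.
std::vector<int> visit_all(Node *start)
{
  std::vector<int> order;
  Node *cur= start;
  Node *last= start;
  for (;;)
  {
    order.push_back(cur->id);   // "process" cur
    for (Node *w= cur->waiters; w; w= w->next_waiter)
    {
      last->next_in_work= w;    // append w to the work list
      last= w;
    }
    if (cur == last)
      break;                    // work list exhausted
    cur= cur->next_in_work;
  }
  return order;
}
```

The stack depth stays constant however long the chain of waiters is. That is the property the patch comment asks for.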
I found it confusing that here you treat next_subsequent_commit as
a way to avoid recursion, but you use this list for other things in
register_wait_for_prior_commit().
Suggestion:
- If you want to reuse the memory for the list, please use a union over
next_subsequent_commit and give it two names, so that the code does not
use the same name for two different lists.
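Here is a sketch of that suggestion with hypothetical names (wait_for_commit_sketch, next_in_group_commit_queue). In C++ only the union member that was last written should be read, so each code path must consistently use its own name.

```cpp
#include <cassert>

// Hypothetical, reduced version of wait_for_commit. The anonymous union
// gives one pointer of storage two names, one per use of the link.
struct wait_for_commit_sketch
{
  union
  {
    // Link in a waitee's subsequent_commits_list
    // (used by register_wait_for_prior_commit()).
    wait_for_commit_sketch *next_subsequent_commit;
    // Link in the work list of queue_for_group_commit().
    wait_for_commit_sketch *next_in_group_commit_queue;
  };
  wait_for_commit_sketch *subsequent_commits_list;
};

// The second name costs no memory.
static_assert(sizeof(wait_for_commit_sketch) ==
                  2 * sizeof(wait_for_commit_sketch *),
              "union shares storage between the two link names");
```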
+ */
+ list= wfc;
+ cur= list;
+ last= list;
+ for (;;)
+ {
+ /* Add the entry to the group commit queue. */
+ entry->next= group_commit_queue;
+ group_commit_queue= entry;
+
+ if (entry->cache_mngr->using_xa)
+ {
+ DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
+ run_prepare_ordered(entry->thd, entry->all);
+ DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
+ }
+
+ if (!cur)
+ break; // Can happen if initial entry has no wait_for_commit
+
Add a comment explaining what the following if is testing.
(As several lists are involved, the code is not that easy to understand.)
+ if (cur->subsequent_commits_list)
+ {
+ bool have_lock;
+ wait_for_commit *waiter;
+
+ mysql_mutex_lock(&cur->LOCK_wait_commit);
+ have_lock= true;
+ waiter= cur->subsequent_commits_list;
+ /* Check again, now safely under lock. */
+ if (waiter)
+ {
+ /* Grab the list of waiters and process it. */
+ cur->subsequent_commits_list= NULL;
+ do
+ {
+ wait_for_commit *next= waiter->next_subsequent_commit;
+ group_commit_entry *entry2=
+ (group_commit_entry *)waiter->opaque_pointer;
+ if (entry2)
+ {
+ /*
+ This is another transaction ready to be written to the binary
+ log. We can put it into the queue directly, without needing a
+ separate context switch to the other thread. We just set a flag
+ so that the other thread will know when it wakes up that it was
+ already processed.
+
+ So put it at the end of the list to be processed in a subsequent
+ iteration of the outer loop.
+ */
+ entry2->queued_by_other= true;
+ last->next_subsequent_commit= waiter;
+ last= waiter;
+ /*
+ As a small optimisation, we do not actually need to set
+ waiter->next_subsequent_commit to NULL, as we can use the
+ pointer `last' to check for end-of-list.
+ */
+ }
+ else
+ {
+ /*
+ Wake up the waiting transaction.
+
+ For this, we need to set the "wakeup running" flag and release
+ the waitee lock to avoid a deadlock, see comments on
+ THD::wakeup_subsequent_commits2() for details.
+ */
+ if (have_lock)
+ {
+ cur->wakeup_subsequent_commits_running= true;
+ mysql_mutex_unlock(&cur->LOCK_wait_commit);
+ have_lock= false;
Move have_lock= false to just after the 'if (have_lock)':
- It is easier to see that you are really resetting the variable.
- It is faster, as there is no need to reload the variable from the stack
after the mysql_mutex_unlock() call.
- Nowadays I try to always reset flag variables in my code just after
testing them, to achieve the above.
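Here is a small sketch of that pattern, with std::mutex standing in for LOCK_wait_commit. wake_waiters() is a hypothetical function. It returns how many times the lock was released, so the "exactly once" property can be checked.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical reduction of the waiter loop. 'lock' is held on entry and
// must be released exactly once: before the first wakeup if there is one,
// otherwise at the end. Returns how many times it was released.
int wake_waiters(std::mutex &lock, int waiters)
{
  bool have_lock= true;     // the caller has locked 'lock'
  int releases= 0;
  for (int i= 0; i < waiters; i++)
  {
    if (have_lock)
    {
      have_lock= false;     // reset right after testing it
      lock.unlock();
      releases++;
    }
    // ... wake up waiter i here, without holding the lock ...
  }
  if (have_lock)
  {
    lock.unlock();
    releases++;
  }
  return releases;
}
```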
+ }
+ waiter->wakeup();
+ }
+ waiter= next;
+ } while (waiter);
+ }
You can save one line, one level of 'if' nesting and one 'if' test by doing this:
Move 'cur->subsequent_commits_list= NULL;' before the if.
Change 'if (waiter)' to 'while (waiter)'.
Remove 'do' and the corresponding 'while (waiter);'.
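Here are both shapes side by side, as a sketch over a hypothetical list type (Waiter and Waitee are illustrative only). They process the same elements and leave the list empty. The suggested form simply stores NULL unconditionally, which is harmless when the list is already empty.

```cpp
#include <cassert>

struct Waiter { Waiter *next; };                    // hypothetical list node
struct Waitee { Waiter *subsequent_commits_list; }; // hypothetical list owner

// Shape in the patch: test, detach, then do/while.
int drain_original(Waitee *cur)
{
  int n= 0;
  Waiter *waiter= cur->subsequent_commits_list;
  if (waiter)
  {
    cur->subsequent_commits_list= nullptr;
    do
    {
      Waiter *next= waiter->next;
      n++;                                  // process waiter
      waiter= next;
    } while (waiter);
  }
  return n;
}

// Suggested shape: detach unconditionally, then a plain while loop.
int drain_suggested(Waitee *cur)
{
  int n= 0;
  Waiter *waiter= cur->subsequent_commits_list;
  cur->subsequent_commits_list= nullptr;    // harmless if already NULL
  while (waiter)
  {
    Waiter *next= waiter->next;
    n++;                                    // process waiter
    waiter= next;
  }
  return n;
}
```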
+ if (have_lock)
+ mysql_mutex_unlock(&cur->LOCK_wait_commit);
+ }
+ if (cur == last)
+ break;
Add a comment here describing what is in the next_subsequent_commit list.
+ cur= cur->next_subsequent_commit;
+ entry= (group_commit_entry *)cur->opaque_pointer;
+ DBUG_ASSERT(entry != NULL);
}
+
+ /* Now we need to clear the wakeup_subsequent_commits_running flags. */
+ if (list)
+ {
+ for (;;)
+ {
+ if (list->wakeup_subsequent_commits_running)
+ {
+ mysql_mutex_lock(&list->LOCK_wait_commit);
+ list->wakeup_subsequent_commits_running= false;
+ mysql_mutex_unlock(&list->LOCK_wait_commit);
Why do we need the mutex above?
As far as I can find, the only place where we test this variable is in
wait_for_commit::register_wait_for_prior_commit().
This thread doesn't know whether the other thread is just before
mysql_mutex_lock(&waitee->LOCK_wait_commit), inside the lock or after
the lock, so the exact moment at which we reset the variable should not matter.
But somehow this sounds strange. What happens if the above variable is
reset just before mysql_mutex_lock(&waitee->LOCK_wait_commit) in
wait_for_commit::register_wait_for_prior_commit()?
Isn't the waitee added to the list when it shouldn't be?
+ }
+ if (list == last)
+ break;
+ list= list->next_subsequent_commit;
+ }
+ }
+
+ if (opt_binlog_commit_wait_count > 0)
+ mysql_cond_signal(&COND_prepare_ordered);
mysql_mutex_unlock(&LOCK_prepare_ordered);
DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered");
+ return orig_queue == NULL;
+}
+
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
+{
+ wait_for_commit *wfc;
+ bool is_leader;
+
+ wfc= entry->thd->wait_for_commit_ptr;
+ entry->queued_by_other= false;
+ if (wfc && wfc->waiting_for_commit)
+ {
Add a comment, something like:
/* We have to wait for the threads in wfc->waiting_for_commit */
+ mysql_mutex_lock(&wfc->LOCK_wait_commit);
+ /* Do an extra check here, this time safely under lock. */
+ if (wfc->waiting_for_commit)
+ {
+ wfc->opaque_pointer= entry;
+ do
+ {
+ mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
+ } while (wfc->waiting_for_commit);
+ wfc->opaque_pointer= NULL;
+ }
+ mysql_mutex_unlock(&wfc->LOCK_wait_commit);
+ }
<cut>
@@ -6597,7 +6756,10 @@ MYSQL_BIN_LOG::write_transaction_to_binl
if (next)
{
- next->thd->signal_wakeup_ready();
+ if (next->queued_by_other)
+ next->thd->wait_for_commit_ptr->wakeup();
What does the above code mean? (Please add a comment.)
Why do we wake up the thread we were waiting for instead of ourselves?
+ else
+ next->thd->signal_wakeup_ready();
}
else
{
<cut>
@@ -6968,6 +7140,48 @@ MYSQL_BIN_LOG::write_transaction_or_stmt
return 0;
}
+
+void
+MYSQL_BIN_LOG::wait_for_sufficient_commits()
+{
+ size_t count;
+ group_commit_entry *e;
+ group_commit_entry *last_head;
+ struct timespec wait_until;
+
+ mysql_mutex_assert_owner(&LOCK_log);
+ mysql_mutex_assert_owner(&LOCK_prepare_ordered);
+
+ count= 0;
+ for (e= last_head= group_commit_queue; e; e= e->next)
+ ++count;
+ if (count >= opt_binlog_commit_wait_count)
+ return;
If the queue is usually longer than opt_binlog_commit_wait_count, this may be better, as it stops counting once the limit is reached:
for (e= last_head= group_commit_queue, count=0; e ; e= e->next)
if (++count >= opt_binlog_commit_wait_count)
return;
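Here is the same idea as a standalone sketch (Entry and enough_entries() are hypothetical). It assumes the wanted count is greater than zero. For an empty queue the early return never fires, whereas the original 'count >= opt_binlog_commit_wait_count' test would succeed when the option is 0.

```cpp
#include <cassert>
#include <cstddef>

struct Entry { Entry *next; };   // hypothetical queue node

// Count queue entries, stopping as soon as 'wanted' have been seen.
// Returns true if the queue already holds at least 'wanted' entries;
// 'count' receives the number counted. Assumes wanted > 0.
bool enough_entries(const Entry *queue, std::size_t wanted,
                    std::size_t &count)
{
  count= 0;
  for (const Entry *e= queue; e; e= e->next)
    if (++count >= wanted)
      return true;
  return false;
}
```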
+ mysql_mutex_unlock(&LOCK_log);
Here you are going to take LOCK_log and LOCK_prepare_ordered in the
wrong order; safe_mutex() should have noticed this.
In other code you first take LOCK_log and then
LOCK_prepare_ordered. If someone else calls
trx_group_commit_leader() while you are in this code, you could get a
mutex deadlock.
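To illustrate the kind of check involved, here is a hypothetical lock-order checker. It is similar in spirit to safe_mutex but is not its implementation. It models the sequence in wait_for_sufficient_commits(): both locks are held, LOCK_log is released, and LOCK_log is then taken again while LOCK_prepare_ordered is still held. One possible fix, assumed here rather than taken from the patch, is to release LOCK_prepare_ordered before re-taking LOCK_log and then take LOCK_prepare_ordered again.

```cpp
#include <cassert>
#include <vector>

struct RankedLock { int rank; };   // hypothetical: lower rank = take first

// Hypothetical checker: a thread may only acquire a lock whose rank is
// higher than every lock it already holds.
class LockOrderChecker
{
  std::vector<int> held;           // ranks currently held
public:
  // Records the acquisition; returns false if it violates the order.
  bool acquire(const RankedLock &l)
  {
    bool ok= true;
    for (int r : held)
      if (r >= l.rank)
        ok= false;
    held.push_back(l.rank);
    return ok;
  }
  void release(const RankedLock &l)
  {
    for (auto it= held.begin(); it != held.end(); ++it)
      if (*it == l.rank)
      {
        held.erase(it);
        return;
      }
  }
};
```

With ranks LOCK_log = 1 and LOCK_prepare_ordered = 2, the checker rejects the re-lock at the end of the function.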
+ set_timespec_nsec(wait_until, (ulonglong)1000*opt_binlog_commit_wait_usec);
+
+ for (;;)
+ {
+ int err;
+ group_commit_entry *head;
+
+ err= mysql_cond_timedwait(&COND_prepare_ordered, &LOCK_prepare_ordered,
+ &wait_until);
+ if (err == ETIMEDOUT)
+ break;
+ head= group_commit_queue;
+ for (e= head; e && e != last_head; e= e->next)
+ ++count;
+ if (count >= opt_binlog_commit_wait_count)
+ break;
+ last_head= head;
+ }
+
+ mysql_mutex_lock(&LOCK_log);
+}
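For comparison, here is the same kind of timed wait expressed with the C++ standard library, as a sketch. CommitQueue and wait_for_sufficient() are hypothetical, and a plain counter replaces the walk over new queue entries. The sketch shows that the deadline is computed once before waiting, as set_timespec_nsec() does above, so early or spurious wakeups do not extend the total wait.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical stand-ins for group_commit_queue and COND_prepare_ordered.
struct CommitQueue
{
  std::mutex lock;
  std::condition_variable cond;   // notified when a transaction is queued
  std::size_t queued= 0;
};

// Wait until at least 'wanted' transactions are queued or 'usec'
// microseconds have passed, whichever comes first. The deadline is fixed
// before waiting, so spurious or early wakeups do not extend the total wait.
// Returns the number of queued transactions when the wait ends.
std::size_t wait_for_sufficient(CommitQueue &q, std::size_t wanted,
                                long usec)
{
  const auto deadline= std::chrono::steady_clock::now() +
                       std::chrono::microseconds(usec);
  std::unique_lock<std::mutex> guard(q.lock);
  q.cond.wait_until(guard, deadline, [&] { return q.queued >= wanted; });
  return q.queued;
}
```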
<cut>
=== modified file 'sql/log_event.cc'
--- sql/log_event.cc 2013-06-06 15:51:28 +0000
+++ sql/log_event.cc 2013-08-29 08:18:22 +0000
<cut>
@@ -6101,6 +6110,18 @@ Gtid_log_event::Gtid_log_event(const cha
domain_id= uint4korr(buf);
buf+= 4;
flags2= *buf;
+ if (flags2 & FL_GROUP_COMMIT_ID)
+ {
+ if (event_len < (uint)header_size + GTID_HEADER_LEN + 2)
+ {
+ seq_no= 0; // So is_valid() returns false
+ return;
+ }
+ ++buf;
+ commit_id= uint8korr(buf);
+ }
+ else
+ commit_id= 0;
}
Setting commit_id= 0 first, instead of in the else branch, will generate smaller and
faster code (setting a variable is faster than a jmp).
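Here is a sketch of the suggested shape. It uses a hypothetical flag value and a simplified layout: one flags byte, then an optional 8-byte little-endian commit id. The real event has more fields before flags2.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical flag value, for illustration only.
const unsigned char FL_GROUP_COMMIT_ID_SKETCH= 0x02;

// Little-endian 8-byte read, like uint8korr().
static std::uint64_t read_le64(const unsigned char *p)
{
  std::uint64_t v= 0;
  for (int i= 7; i >= 0; i--)
    v= (v << 8) | p[i];
  return v;
}

// Returns false if the buffer is too short for the announced commit id.
bool parse_commit_id(const unsigned char *buf, std::size_t len,
                     std::uint64_t &commit_id)
{
  commit_id= 0;                  // default first, no else branch needed
  if (len < 1)
    return false;
  unsigned char flags2= buf[0];
  if (flags2 & FL_GROUP_COMMIT_ID_SKETCH)
  {
    if (len < 1 + 8)
      return false;
    commit_id= read_le64(buf + 1);
  }
  return true;
}
```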
=== modified file 'sql/log_event.h'
--- sql/log_event.h 2013-06-06 15:51:28 +0000
+++ sql/log_event.h 2013-08-29 08:18:22 +0000
@@ -1317,9 +1317,9 @@ class Log_event
@see do_apply_event
*/
- int apply_event(Relay_log_info const *rli)
+ int apply_event(struct rpl_group_info *rgi)
What is the simple rule for knowing when to use Relay_log_info and when to
use rpl_group_info?
If I understand things correctly, Relay_log_info should only be used for
direct manipulation of data in the relay log.
By the way, you should probably change 'struct rpl_group_info' to just
rpl_group_info; right now you are mixing both forms in the code.
=== modified file 'sql/log_event_old.cc'
--- sql/log_event_old.cc 2013-02-19 10:45:29 +0000
+++ sql/log_event_old.cc 2013-08-29 08:18:22 +0000
@@ -67,7 +68,7 @@ Old_rows_log_event::do_apply_event(Old_r
do_apply_event(). We still check here to prevent future coding
errors.
*/
- DBUG_ASSERT(rli->sql_thd == ev_thd);
+ DBUG_ASSERT(rgi->thd == ev_thd);
Should we change all tests of rli->sql_thd to rgi->thd?
You have at least this test in log_event.cc and slave.cc.
I assume that rli should not have a sql_thd member anymore?
<cut>
=== added file 'sql/rpl_parallel.cc'
--- sql/rpl_parallel.cc 1970-01-01 00:00:00 +0000
+++ sql/rpl_parallel.cc 2013-08-29 08:18:22 +0000
@@ -0,0 +1,699 @@
+#include "my_global.h"
+#include "rpl_parallel.h"
+#include "slave.h"
+#include "rpl_mi.h"
+
+
+/*
+ Code for optional parallel execution of replicated events on the slave.
+
+ ToDo list:
+
+ - Review every field in Relay_log_info, and all code that accesses it.
+ Split out the necessary parts into rpl_group_info, to avoid conflicts
+ between parallel execution of events. (Such as deferred events ...)
+
+ - Error handling. If we fail in one of multiple parallel executions, we
+ need to make a best effort to complete prior transactions and roll back
+ following transactions, so slave binlog position will be correct.
+ And all the retry logic for temporary errors like deadlock.
+
+ - Stopping the slave needs to handle stopping all parallel executions. And
+ the logic in sql_slave_killed() that waits for current event group to
+ complete needs to be extended appropriately...
+
+ - Audit the use of Relay_log_info::data_lock. Make sure it is held
+ correctly in all needed places also when using parallel replication.
+
+ - We need some user-configurable limit on how far ahead the SQL thread will
+ fetch and queue events for parallel execution (otherwise if slave gets
+ behind we will fill up memory with pending malloc()'ed events).
+
+ - Fix update of relay-log.info and master.info. In non-GTID replication,
+ they must be serialised to preserve correctness. In GTID replication, we
+ should not update them at all except at slave thread stop.
+
+ - All the waits (eg. in struct wait_for_commit and in
+ rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill,
+ everything needs to be correctly rolled back and stopped in all threads,
+ to ensure a consistent slave replication state.
I can add kill checking to all functions that wait for a mutex.
I will do it by adding enter_cond()/exit_cond() to all functions
that wait in mysql_cond_wait().
+
+ - Handle the case of a partial event group. This occurs when the master
+ crashes in the middle of writing the event group to the binlog. The
+ slave rolls back the transaction; parallel execution needs to be able
+ to deal with this wrt. commit_orderer and such.
+
+ - Relay_log_info::is_in_group(). This needs to be handled correctly in all
+ callers. I think it needs to be split into two, one version in
+ Relay_log_info to be used from next_event() in slave.cc, one to be used in
+ per-transaction stuff.
+
+ - We should fail if we connect to the master with opt_slave_parallel_threads
+ greater than zero and master does not support GTID. Just to avoid a bunch
+ of potential problems, we won't be able to do any parallel replication
+ in this case anyway.
If the master doesn't support GTID, I think that instead of stopping,
we should just issue a warning in the log and continue with normal
replication.
Otherwise we would have problems when using multi-source replication
if a single slave doesn't support GTID.
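Here is a sketch of the suggested behaviour for a single master connection. ApplyDecision, decide_apply() and the warning text are hypothetical. The decision is made per connection, so one master without GTID does not stop the others in a multi-source setup.

```cpp
#include <cassert>
#include <string>

// Hypothetical result of the decision for one master connection.
struct ApplyDecision
{
  bool parallel;
  std::string warning;   // empty if nothing needs to be logged
};

// Without GTID support on the master, fall back to serial apply and
// report a warning instead of stopping the slave.
ApplyDecision decide_apply(unsigned long slave_parallel_threads,
                           bool master_supports_gtid)
{
  if (slave_parallel_threads == 0)
    return {false, ""};
  if (!master_supports_gtid)
    return {false, "Master does not support GTID; "
                   "parallel replication disabled for this connection"};
  return {true, ""};
}
```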
+*/
+
+struct rpl_parallel_thread_pool global_rpl_thread_pool;
+
+
+static void
+rpt_handle_event(rpl_parallel_thread::queued_event *qev,
+ struct rpl_parallel_thread *rpt)
+{
+ int err;
+ struct rpl_group_info *rgi= qev->rgi;
+ Relay_log_info *rli= rgi->rli;
+ THD *thd= rgi->thd;
+
+ thd->rli_slave= rli;
+ thd->rpl_filter = rli->mi->rpl_filter;
+ /* ToDo: Get rid of rli->group_info, it is not thread safe. */
+ rli->group_info= rgi;
+
+ /* ToDo: Access to thd, and what about rli, split out a parallel part? */
+ mysql_mutex_lock(&rli->data_lock);
+ err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
We should also get rid of the thd argument and change the function to
return bool, so that we can return an error.
+ /* ToDo: error handling. */
+}
+
+
+pthread_handler_t
+handle_rpl_parallel_thread(void *arg)
+{
<cut>
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->thd= thd;
+
+ while (rpt->delay_start)
+ mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
A much easier way to achieve the wait is to have the caller take the lock
with mysql_mutex_lock(&rpt->LOCK_rpl_thread) and then just release it
when it's time for this thread to continue:
- No need for signaling or for checking whether the thread got the signal
- No need for a cond wait here
- No need for a delay_start flag
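Here is a sketch of this hand-off, with std::mutex and std::thread standing in for the server's mutex and thread API. Worker, worker_main() and start_worker_and_join() are hypothetical. A std::mutex must be unlocked by the thread that locked it, and that holds here because the creator both takes and releases the lock.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical worker state; 'lock' plays the role of LOCK_rpl_thread.
struct Worker
{
  std::mutex lock;
  int config= 0;         // set up by the creator before the worker may run
  int seen_config= -1;   // what the worker observed once it started
};

// Worker body: blocking on the mutex is the entire "wait until started"
// logic; there is no delay_start flag and no condition variable.
static void worker_main(Worker *w)
{
  std::lock_guard<std::mutex> guard(w->lock);
  w->seen_config= w->config;
}

// Creator: hold the lock while creating and setting up the worker, then
// release it to let the worker continue. unique_lock also releases the
// lock if std::thread construction throws.
int start_worker_and_join(Worker &w, int config)
{
  std::unique_lock<std::mutex> guard(w.lock);
  std::thread t(worker_main, &w);
  w.config= config;      // safe: the worker is blocked on w.lock
  guard.unlock();        // the worker may now run
  t.join();
  return w.seen_config;
}
```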
+ rpt->running= true;
+
+ while (!rpt->stop && !thd->killed)
+ {
+ rpl_parallel_thread *list;
+
+ old_msg= thd->proc_info;
+ thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
+ "Waiting for work from SQL thread");
+ while (!rpt->stop && !thd->killed && !(events= rpt->event_queue))
+ mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
Add a comment: /* Mark that this thread is now executing */
+ rpt->free= false;
+ rpt->event_queue= rpt->last_in_queue= NULL;
+ thd->exit_cond(old_msg);
+
Note that events may be undefined here if thd->killed was set above
(in the first iteration of the loop).
It would be good to check for thd->killed or rpt->stop here and then abort
the loop.
Another question: do we really need rpt->stop?
Isn't it enough to use thd->killed for this?
Otherwise we would need to check for both rpt->stop and thd->killed in
every loop that may be aborted.
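Here is a sketch of the suggested check, with hypothetical types standing in for rpl_parallel_thread. It runs after the wait loop and decides whether to exit before 'events' is used.

```cpp
#include <cassert>

// Hypothetical queued event and worker state.
struct QueuedEvent { QueuedEvent *next; };

struct WorkerState
{
  bool stop= false;
  bool killed= false;
  QueuedEvent *event_queue= nullptr;
};

// Called after the wait loop has ended (under the lock in the real code).
// Returns the batch to execute, or nullptr if the worker must exit.
// Checking stop/killed first means 'events' is never used uninitialised.
QueuedEvent *take_events(WorkerState &s)
{
  if (s.stop || s.killed)
    return nullptr;          // leave any queued events for the cleanup path
  QueuedEvent *events= s.event_queue;
  s.event_queue= nullptr;
  return events;
}
```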
+ more_events:
+ while (events)
+ {
+ struct rpl_parallel_thread::queued_event *next= events->next;
+ Log_event_type event_type= events->ev->get_type_code();
+ rpl_group_info *rgi= events->rgi;
+ rpl_parallel_entry *entry= rgi->parallel_entry;
+ uint64 wait_for_sub_id;
+ uint64 wait_start_sub_id;
+ bool end_of_group;
+
Add a comment explaining why this is handled here and not in rpt_handle_event()
+ if (event_type == GTID_EVENT)
+ {
+ in_event_group= true;
Add a comment explaining what the following variable stands for
+ group_standalone=
+ (0 != (static_cast