Re: [Maria-developers] [Maria-discuss] Pipeline-style slave parallel applier
andrei.elkin@pp.inet.fi writes:
Your comments, thoughts and critical notes are most welcome!
Thanks for the interesting idea and detailed description, Andrei! I have written some initial comments inline, below:
entering the binlog group (ordered) commit module. They can end up in the binlog as A,B or B,A, and regardless of that (Gtid, actually) order they can also be acknowledged to the user in either order.
The latter fact implies that the slave does not have to commit the group's sub-transactions/branches in GTID order. It could be done in any order, including no order at all, such as committing the whole epoch as one commit.
However, MariaDB GTID requires that transactions be binlogged in the same order on the slave as on the master. Otherwise it is not possible to move a lower-level GTID slave from one master to the other, as the slave position only includes a single GTID in MariaDB.
conservative scheduler offers a decent parallelization method, but it still suffers from the aforementioned uneven branch sizes and the inter-thread communication caused by ordering. The optimistic scheduler remedies this to some degree, but not fully (an optimistically executed group may be unbalanced too), and it even concedes some risk of performance degradation (rollback is costly).
Jean François Gagné's comprehensive benchmarking of parallel replication actually has some very interesting data on this. He ran a real Booking.com workload with literally thousands of worker threads and had a huge rollback ratio (I remember it as up around 40%), and still saw speedup and good scalability. Apparently rollback isn't necessarily so costly; maybe because replication servers often have spare cores idling anyway, and because the aborted transaction acts as a pre-fetcher, improving I/O parallelism.

But the fact that thousands of threads still improved speed suggests exactly that there was a problem of uneven transaction size - a few large transactions spread far apart, so lots of transaction lookahead was needed. I do not know if those transactions were single large queries or contained many individual statements, but in the latter case this is exactly what this idea could help with!
There is an idea, reminiscent of pipelining, to handle the epoch (the master binlog group) as a single transaction, but in parallel. That is, to execute its statements with multiple workers, this time scheduling them fairly, and to "atomically" commit their work.
So this is a very interesting idea. Once the transactions are recorded in the binlog, we have a serial stream of statements that will reproduce the original data on the master. And they will do this regardless of whether we remove some of the COMMITs/BEGINs - or if we add some. (I wonder if there are any exceptions to this, I can't think of any off the top of my head). So instead of trying to parallelise transactions, just do all the individual statements in parallel, disregarding transaction boundaries (with some restrictions to ensure consistency, as you describe below).

I wonder how much this will benefit real-life workloads. There seems to be a lot of potential. For some workloads it will not help - if most transactions are single-statement, or if the multi-statement transactions have dependencies between the statements. But that doesn't seem likely to be the typical scenario.

But why tie this to the master binlog group? It seems to fit perfectly to the optimistic parallel replication mode, where different transactions are run in parallel speculatively, and any conflicts are detected and resolved. In fact, it seems the exact same mechanism will work to detect conflicts between the more fine-grained per-statement parallelisation.

The conservative parallel replication I consider obsolete. It causes a lot of hassle for the user to try to get good-sized group commits on the master to ensure parallel replication opportunities on the slave. And it doesn't even ensure consistency; there are corner cases (in InnoDB and elsewhere) that can cause conflicts to appear anyway on the slave, so the rollback+retry mechanism is needed anyway for conservative, just like for optimistic.
As to the fairness part, workers get assigned one statement at a time from the epoch transaction (sub-transaction BEGIN and COMMIT removed). Say an epoch 'E' consists of 's' sub-transactions
E := { T_1, T_2, ... T_s }
For each m from 1 to s, sub-transaction T_m gets scheduled to the workers

           W_1  W_2  ...  W_k
            |    |         |
            V    V         V
  T_m := { e_1, e_2,  ...  e_l }
Hereinafter I use TeX math notation: '^' - a superscript attached to an event to designate its parent transaction's sequence number (Gtid seq_no), '_' - a subscript to enumerate an object within its compound object.
e_i stands for the T-ransaction's statements, W_t:s are workers. Effectively the epoch breaks into modified branches executed by workers on a one-to-one basis:
E := { T'_1, T'_2, ... T'_k }
here 'k' is the number of workers engaged and T' indicates a modified transaction. The branches are executed until they are ready to prepare, an event which is triggered by the scheduling of the last statement of the last original sub-transaction.
It's clear that the sizes of the modified branches are even this time.
Well... each worker has exactly one statement at a time, but each statement can be very different in execution cost. But more even than transaction-based scheduling, certainly.
The worker of the last statement coordinates the 2pc.
Thinking more technically, workers can consume/pull from the current epoch, presented as a queue which is pushed into by a statement producer.
Here is a possible state of the queue when the m:th statement of T_n is about to be pushed:

  e^n_m ->
           [ e^n_m-1, ..., e^n_2, e^n_1; ...; e^1_l1, ... e^1_2, e^1_1 ]
           |-------- T_n ---------|  T_n-1,...,T_2  |------ T_1 ------|

A pull by a consumer at this point would return e^1_1.
How do you imagine integrating this in the current parallel replication scheduling algorithm?

In the current algorithm, the producer (the SQL thread) assigns work to worker threads in a round-robin way. There is a lot of care taken to minimise contention on a single global queue-lock or similar, and I think that is an important reason why Jean François' tests were able to scale so well to many thousand worker threads.

I wonder if the existing scheduling could not be used directly for this idea also - but it needs to be extended of course to schedule individual statements rather than whole transactions, with some way to let worker threads coordinate transaction start/end between them.

What are your thoughts on this?
Very likely, by the epoch prepare event *all* the branches will already be ready for prepare().
Hm, any particular reason you think so? I would imagine the opposite, that individual statements will be scheduled quickly, and it will be quite random in which order they happen to become ready in?
This pipelining parallelization method can work for a single "large" transaction in the binlog group, and it could also drag in parallelizable input transactions from later groups if we additionally create master-side dependency tracking (think of the mysql logical timestamp method). Also notice that the optimistic scheduler is orthogonal to this method, so the two can be combined.

Consistency concern
-------------------

The epoch parallel execution (scheduling) must respect intra-transaction statement dependencies (e.g. FIFO execution order of operations over the same object [record, table]). There are no inter-transaction dependencies, at least in theory.
In fact, there do exist such inter-transaction dependencies in a number of corner cases, at least if we want to enforce the same commit order on slave as on master, which is required for MariaDB GTID. But that is already handled in current code, by detecting the dependency inside the storage engine (the thd_rpl_deadlock_check() mechanism). I wonder if the exact same thd_rpl_deadlock_check() mechanism would not work as well for this per-statement scheduling idea? It would be interesting to see a proof-of-concept based on this...
The notion of a leader, the two-phase-commit leader, remains roughly the same as in the existing conservative scheduler. This time, beyond initiating commits of the epoch branches, it also takes care to advance the slave gtid execution status accordingly. It could be implemented as the very last statement of its own T' branch.

Hm, right, interesting, agree that this seems a good fit.

In fact, if the existing mechanism can be slightly extended, maybe it can already do much of what is needed to ensure consistency. Eg. suppose we have 3 original transactions each with two statements:

  e^1_1 e^1_2 e^2_1 e^2_2 e^3_1 e^3_2

Suppose the first 3 of these are ready to commit at some point:

  e^1_1 e^1_2 e^2_1

The leader can check that it doesn't try to commit only part of an original transaction. In this case it sees that e^2_1 is missing e^2_2, and it can commit only the first part and leave the partial transaction for a following group commit:

  e^2_1

This way, we already ensure that a crash will not leave us with an inconsistent storage engine state on the slave. Because we never commit part of an original master transaction - all parts of it are always group-committed together. Seems promising, though that code is also heavily optimised for scalability, so will require some care.

Hm, one case needs to be handled though - when there are more statements in one original master transaction than there are worker threads - otherwise we end up with a deadlock.
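Just to make that check concrete, here is a rough sketch of the bookkeeping I have in mind (purely illustrative; the names and data structures are made up, not actual server code):

  #include <cstddef>
  #include <map>
  #include <set>
  #include <utility>

  // Leader-side check: of the statements that are ready, pick only whole
  // original master transactions, and only a gap-free prefix of them, so a
  // partial original transaction is never group-committed.
  //   ready:      (trx_seq_no, stmt_index) pairs whose execution has finished
  //   stmt_count: number of statements in each original transaction
  std::set<unsigned long> committable_transactions(
      const std::set<std::pair<unsigned long, unsigned long>> &ready,
      const std::map<unsigned long, std::size_t> &stmt_count) {
    std::map<unsigned long, std::size_t> done;
    for (const auto &r : ready)
      ++done[r.first];
    std::set<unsigned long> result;
    for (const auto &tc : stmt_count) {   // transactions in GTID (seq_no) order
      auto it = done.find(tc.first);
      if (it == done.end() || it->second < tc.second)
        break;                            // first incomplete trx ends the prefix
      result.insert(tc.first);
    }
    return result;
  }

With the example above (e^1_1, e^1_2, e^2_1 ready, each of the three transactions having two statements), this would return only transaction 1 and leave e^2_1 for a following group commit.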
It's clear that causal dependencies between a user write and a read (to find the effect of that write) can be tracked by WAIT_FOR_GTID() as currently.
A plain two-phase commit does admit the possibility of catching, on the slave, a combination of the old (pre-epoch) and new (post-epoch) versions of some objects of the same transaction, if the user hits it with a non-locking SELECT in a "tiny" time window between the commit():s of two branches, like in the following example.
Let t1,t2 be some tables and let the binlog group consist of just one transaction

  T_1 := { update(t1), update(t2) }

On the slave, let it be modified into two parallel ones:

  T'_1 := { update(t1) } || T'_2 := { update(t2) }

After both respond OK to the 2pc prepare() and the leader then initiates commit(), there could be 4 possible result sets for a contemporary SELECT

  SET @@session.tx_isolation='REPEATABLE-READ';
  SELECT * from t1 as t_1, t2 as t_2;
  => ...
Well, due to the enforced commit order, it will not be possible to see the case where update(t2) is committed but update(t1) is not.

In fact, if the group commit is extended as suggested above (to only group-commit the statements of an original transaction together), then I think it is possible for the user to avoid this problem entirely using

  START TRANSACTION WITH CONSISTENT SNAPSHOT

In MariaDB, this ensures a consistent read view against 2pc transactions (reader will always see all of the 2pc commit or none of it). And IIRC, it works by holding the same lock (LOCK_commit_ordered) that is used by the group commit leader. So if the user has some special case where this minor read-inconsistency is sensitive, the START TRANSACTION WITH CONSISTENT SNAPSHOT mechanism can be used to avoid it completely.
While this is perhaps more than imperfect, I believe it may be relevant only to remote corner cases. More importantly, we can still refine that by doing some *adjustments* in the Engine; I briefly studied myself what would constitute a sub-transaction in Innodb, "ideologically" endorsed and guided by Marko Makela. The idea is to hold the branch's locks even after the branch commits, and delegate their release to the leader.
Recoverability
--------------
When the slave is configured to log into the binary log, we must make sure to roll back any prepared epoch branches at server recovery, unless the last branch (that of the leader) is also there, prepared.
Now when it does binlog, the problem is that the modified branches' binlog images can't just be flushed one after another, especially if the user specifies to log on the slave uniformly with the master, that is, to preserve the original sub-transaction structures and their boundaries.
"if the user specified" - but this is always a requirement in MariaDB, right? Otherwise GTID order is broken?
In such a case there are multiple options; here are some:
- use the relay-logged image of the original sub-transactions (may be limited to the master and slave version combination though);
I did not follow what this means - can you elaborate?
- reconstruct the original sub-transactions' binlog images. That is, binlog events "belonging" to an original sub-transaction will be cached cooperatively by the workers that took a share of the original sub-transaction's execution.
Optionally, at least, the caching could be set to allow out-of-order insertion while the statements are parallelizable, like in the above T1(t1,t2) example. Otherwise the ordering will be preserved by implementing the shared cache as a queue with an interface to insert an item into the middle.
Isn't it possible to do the binlog caching locally, each worker caching just the binlogging for its own statement? If we make sure they group-commit together, they will be written in sequence into the binlog, so they can just write each their part one after the other?
- exotics, like creating a slave-side "merge" GTID which would embed the original "commit". (The quotes here allude to the Git way.)
While it's a serious subject, I am leaving out mariabackup for the moment. I believe it will be feasible to adjust its logic to account as prepared only those transactions that are prepared in all their branches.

More about pipeline style optimization
--------------------------------------

As mentioned, workers can be made self-serving, assigning themselves events for execution as soon as they become available. That leads one to think of a producer-consumer model, potentially lock-free queues, a dynamic # of workers to stay in balance with the receiver's pace, etc.
So are you planning to implement a completely new parallel replication scheduler, rather than build on the existing one?
I think *eventually* we will need to assign the producer role to the IO (receiver) thread, and convert the current SQL (driver) thread into a (relay-) logger (we may still have to have such a logger for semisync replication; otherwise the relay logger can be opted out). Here is a picture

  IO_thread.push()
      |               +---- worker.pull()
      V               V
  e^s_i -> [ {e^s_i-1, ... e^s_1}, T_s-1, ... ]
                 ^
                 |
              SQL.pull()

where on the right-hand side the two consumers handle a transaction, and when both of them are done with it (a worker has read its last event and the logger has written it to the relay-log) the queue element becomes garbage-collectable.
So overall, this was an interesting idea to see! I never thought before of doing parallelisation beyond the original transaction boundaries from the master. But now that you have described it, it actually seems quite a natural extension of the optimistic parallel replication, and it fits very well with the fundamental principle in MariaDB replication of preserving strict GTID ordering, which might work to handle much of the consistency requirements.

It would be quite interesting to see this in practice on real workloads and with benchmarks - I could imagine very large improvements for some workloads. Though I suppose you shouldn't underestimate the work required to complete something like this in production quality (the devil is in the detail, once you get to nasty things like temporary tables and non-transactional engines and other horrors that are part of the MySQL/MariaDB ecosystem).

Hope this helps,

 - Kristian.
Kristian, salute!
Thanks for the interesting idea and detailed description, Andrei! I have written some initial comments inline, below:
It was a great piece of input; a few notes are taken. I stumbled at one point, though, where I struggled to figure out what apparently misled you (see your example of a strayed e^2_2 event). I did my best to amend it, but your turn is needed.
entering the binlog group (ordered) commit module. They can end up in the binlog as A,B or B,A, and regardless of that (Gtid, actually) order they can also be acknowledged to the user in either order.
The latter fact implies that the slave does not have to commit the group's sub-transactions/branches in GTID order. It could be done in any order, including no order at all, such as committing the whole epoch as one commit.
However, MariaDB GTID requires that transactions be binlogged in the same order on the slave as on the master. Otherwise it is not possible to move a lower-level GTID slave from one master to the other, as the slave position only includes a single GTID in MariaDB.
And that's one of two technical obstacles that the proposed method needs to address; however, we shall face it. Assume out-of-order commit of the grouped original transactions. GTID gaps would exist for some little time, but as long as there are no crashes, the slave's conservative (that is, respecting the group boundaries) execution would fill them; and in case of crash recovery the Slave could specify the gaps at reconnecting (unless those transactions are in the relay-log) for retransmission. They would be logged out of order in the log-bin case, and may appear as a set (mathematically). However, it's a range as the union of gtids, and it - the master range - can be preserved as described along the replication chain. (I've not called here for doing that. We continue to treat gtids as gap-free ranges for now.)
conservative scheduler offers a decent parallelization method, but it still suffers from the aforementioned uneven branch sizes and the inter-thread communication caused by ordering. The optimistic scheduler remedies this to some degree, but not fully (an optimistically executed group may be unbalanced too), and it even concedes some risk of performance degradation (rollback is costly).
Jean François Gagné's comprehensive benchmarking of parallel replication actually has some very interesting data on this. He ran a real Booking.com workload with literally thousands of worker threads and had a huge rollback ratio (I remember it as up around 40%), and still saw speedup and good scalability. Apparently rollback isn't necessarily so costly; maybe because replication servers often have spare cores idling anyway, and because the aborted transaction acts as a pre-fetcher, improving I/O parallelism.
But the fact that thousands of threads still improved speed suggests exactly that there was a problem of uneven transaction size - a few large transactions spread far apart, so lots of transaction lookahead was needed. I do not know if those transactions were single large queries or contained many individual statements, but in the latter case this is exactly what this idea could help with!
Right, those 40% of rollbacks we are going to trade for extra productivity :-).
There is an idea, reminiscent of pipelining, to handle the epoch (the master binlog group) as a single transaction, but in parallel. That is, to execute its statements with multiple workers, this time scheduling them fairly, and to "atomically" commit their work.
So this is a very interesting idea. Once the transactions are recorded in the binlog, we have a serial stream of statements that will reproduce the original data on the master. And they will do this regardless of whether we remove some of the COMMITs/BEGINs - or if we add some. (I wonder if there are any exceptions to this, I can't think of any off the top of my head). So instead of trying to parallelise transactions, just do all the individual statements in parallel, disregarding transaction boundaries (with some restrictions to ensure consistency, as you describe below).
I wonder how much this will benefit real-life workloads. There seems to be a lot of potential. For some workloads it will not help - if most transactions are single-statement, or if the multi-statement transactions have dependencies between the statements. But that doesn't seem likely to be the typical scenario.
My lovely example is a stream of epochs with a 1:1000 ratio between the smallest and largest transactions in the group, feasible with 1000 master connections. The Amdahl's law asymptotic estimate is a 1 / (1 - 0.999) speedup. I wish we'll have it done ... and gain even more exactly in this case.
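Spelled out with standard Amdahl's law, p being the parallelizable fraction and N the number of workers (just the arithmetic behind that estimate):

  S(N) = 1 / ((1 - p) + p/N),   S(N) -> 1/(1 - p)  as  N -> \infty,

so with p = 0.999 the asymptotic speedup is 1/(1 - 0.999) = 1000.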
But why tie this to the master binlog group? It seems to fit perfectly to the optimistic parallel replication mode, where different transactions are run in parallel speculatively, and any conflicts are detected and resolved. In fact, it seems the exact same mechanism will work to detect conflicts between the more fine-grained per-statement parallelisation.
Very true. On one hand we can go in blind, "speculatively", ready to pay the rollback price; on the other hand the Master already does detect (some) conflicts, and the actual conflict-free range of transactions is wider than a mere single binlog group (mind the reference to the logical clock that can "measure" dependencies).
The conservative parallel replication I consider obsolete. It causes a lot of hassle for the user to try to get good-sized group commits on the master to ensure parallel replication opportunities on the slave. And it doesn't even ensure consistency; there are corner cases (in InnoDB and elsewhere) that can cause conflicts to appear anyway on the slave, so the rollback+retry mechanism is needed anyway for conservative, just like for optimistic.
Indeed; the last time I saw that it was an asymmetry of lock conflicts in Innodb, in mysql parallel replication though. Such known threats can be evaded through reordering of the vulnerable statements.
As to the fairness part, workers get assigned one statement at a time from the epoch transaction (sub-transaction BEGIN and COMMIT removed). Say an epoch 'E' consists of 's' sub-transactions
E := { T_1, T_2, ... T_s }
For each m from 1 to s, sub-transaction T_m gets scheduled to the workers

           W_1  W_2  ...  W_k
            |    |         |
            V    V         V
  T_m := { e_1, e_2,  ...  e_l }

Hereinafter I use TeX math notation: '^' - a superscript attached to an event to designate its parent transaction's sequence number (Gtid seq_no), '_' - a subscript to enumerate an object within its compound object.
e_i stands for the T-ransaction's statements, W_t:s are workers. Effectively the epoch breaks into modified branches executed by workers on a one-to-one basis:
E := { T'_1, T'_2, ... T'_k }
here 'k' is the number of workers engaged and T' indicates a modified transaction. The branches are executed until they are ready to prepare, an event which is triggered by the scheduling of the last statement of the last original sub-transaction.
It's clear that the sizes of the modified branches are even this time.
Well... each worker has exactly one statement at a time, but each statement can be very different in execution cost. But more even than transaction-based scheduling, certainly.
Let me elaborate; the claim is still not far-fetched. The row-based events can be further split apart, so the difference in prepare readiness is at most one ha_{write,update,delete}_row() call, which was still scheduled ahead and is executed in parallel with the very last statement, the leader's update of `mysql.gtid_slave_pos`. This chase for ultimate row-level granularity may be deferred though.
The worker of the last statement coordinates the 2pc.
Thinking more technically, workers can consume/pull from the current epoch, presented as a queue which is pushed into by a statement producer.
Here is a possible state of the queue when the m:th statement of T_n is about to be pushed:

  e^n_m ->
           [ e^n_m-1, ..., e^n_2, e^n_1; ...; e^1_l1, ... e^1_2, e^1_1 ]
           |-------- T_n ---------|  T_n-1,...,T_2  |------ T_1 ------|

A pull by a consumer at this point would return e^1_1.
How do you imagine integrating this in the current parallel replication scheduling algorithm?
In the current algorithm, the producer (the SQL thread) assigns work to worker threads in a round-robin way. There is a lot of care taken to minimise contention on a single global queue-lock or similar, and I think that is an important reason why Jean François' tests were able to scale so well to many thousand worker threads.
I wonder if the existing scheduling could not be used directly for this idea also - but it needs to be extended of course to schedule individual statements rather than whole transactions, with some way to let worker threads coordinate transaction start/end between them.
What are your thoughts on this?
I thought to reuse as much as possible from the conservative scheduler. We could (and the plan is to do so for initial comparative benchmarking) leave the driver thread as the assigner. Although the assignment role of the producer is clearly better off taken over by the workers. What I meant in the original mail is that the assignment mechanism can be improved to avoid any chance of worker idling, but more importantly to facilitate fairness. The workers go to pick up pieces of work as soon as they become available. The optimistic scheduler's assignment is close to that, but it still assigns only round-robin, that is speculatively.

I've been considering a shared circular buffer as the communication channel between a producer (it could actually be the IO thread, read below) and the workers (consumers). It should be lock-free, potentially, or at least most of the time (why wouldn't it be Boost.Circular Buffer..). The push by the producer (the driver as of current, but the IO thread can do it as well, I expand on that below) and the pull by a worker should *not* be computationally more costly than the current push/pull to/from the private worker queues; the cost just must be less because of the fairness of the model. Conservative, optimistic and "fair" schedulers can all use it.
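To make the intended push/pull interface concrete, here is a minimal sketch (my own illustration, not existing code; stmt_event and epoch_queue are made-up names, and a mutex stands in for what would really want to be a lock-free ring):

  #include <condition_variable>
  #include <cstddef>
  #include <mutex>
  #include <vector>

  // Placeholder for one replicated statement of the epoch.
  struct stmt_event {
    unsigned long seq_no;   // parent transaction's Gtid seq_no (the '^' index)
    unsigned long index;    // statement number within the transaction (the '_' index)
  };

  // Minimal bounded circular buffer shared by one producer and many workers.
  class epoch_queue {
  public:
    explicit epoch_queue(std::size_t capacity) : buf_(capacity) {}

    void push(const stmt_event &e) {           // producer: driver or IO thread
      std::unique_lock<std::mutex> lk(m_);
      not_full_.wait(lk, [this] { return count_ < buf_.size(); });
      buf_[(head_ + count_) % buf_.size()] = e;
      ++count_;
      not_empty_.notify_one();
    }

    stmt_event pull() {                        // consumer: any free worker
      std::unique_lock<std::mutex> lk(m_);
      not_empty_.wait(lk, [this] { return count_ > 0; });
      stmt_event e = buf_[head_];
      head_ = (head_ + 1) % buf_.size();
      --count_;
      not_full_.notify_one();
      return e;
    }

  private:
    std::vector<stmt_event> buf_;
    std::size_t head_ = 0, count_ = 0;
    std::mutex m_;
    std::condition_variable not_empty_, not_full_;
  };

The point is only that push() and pull() stay O(1) and contention-bounded; whether it ends up being Boost.Circular Buffer, a hand-rolled lock-free ring, or the existing per-worker queues with a different assignment rule is an implementation detail.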
Very likely, by the epoch prepare event *all* the branches will already be ready for prepare().
Hm, any particular reason you think so? I would imagine the opposite, that individual statements will be scheduled quickly, and it will be quite random in which order they happen to become ready in?
Like I said above, I meant the Row format statements, which we can technically refine further into sub-statements, each of the cost of one handler call.
This pipelining parallelization method can work for a single "large" transaction in the binlog group, and it could also drag in parallelizable input transactions from later groups if we additionally create master-side dependency tracking (think of the mysql logical timestamp method). Also notice that the optimistic scheduler is orthogonal to this method, so the two can be combined.

Consistency concern
-------------------

The epoch parallel execution (scheduling) must respect intra-transaction statement dependencies (e.g. FIFO execution order of operations over the same object [record, table]). There are no inter-transaction dependencies, at least in theory.
In fact, there do exist such inter-transaction dependencies in a number of corner cases,
True,
at least if we want to enforce the same commit order on slave as on master, which is required for MariaDB GTID. But that is already handled in current code, by detecting the dependency inside the storage engine (the thd_rpl_deadlock_check() mechanism).
so we shall continue with this method, and at the same time keep a "repository" of corner cases, in order to consult it at assignment decisions (to schedule events suspected of a hidden dependency to one worker).
I wonder if the exact same thd_rpl_deadlock_check() mechanism would not work as well for this per-statement scheduling idea? It would be interesting to see a proof-of-concept based on this...
So the question is how we would recover from a deadlock now involving modified transactions, correct? I am yet to check closer thd_rpl_deadlock_check(), perhaps that's why I can't see what could be required in addition to the existing retry mechanism, which is to retry the whole group. But it is certainly simpler (as no binlogging action can take place yet) than crash recovery, which I highlighted in some detail.
The notion of a leader, the two-phase-commit leader, remains roughly the same as in the existing conservative scheduler. This time, beyond initiating commits of the epoch branches, it also takes care to advance the slave gtid execution status accordingly. It could be implemented as the very last statement of its own T' branch.
Hm, right, interesting, agree that this seems a good fit.
In fact, if the existing mechanism can be slightly extended, maybe it can already do much of what is needed to ensure consistency. Eg. suppose we have 3 original transactions each with two statements:
e^1_1 e^1_2 e^2_1 e^2_2 e^3_1 e^3_2
Suppose the first 3 of these are ready to commit at some point:
e^1_1 e^1_2 e^2_1
The leader can check that it doesn't try to commit only part of an original transaction.
Probably I need to correct your initial perception here (otherwise I am lost as to how e^2_1 got separated). In this case you mean that the three (original) T:s (E) are mapped into a modified

  T'_1 = {e^1_1 e^1_2 e^2_1}

and some more, e.g. just one T'_2, whose union is

  T'_2 \cup T'_1 = T_1 \cup T_2 \cup T_3 = E

and the leader can't initiate 2p-commit without T'_2.

In this case it sees that e^2_1 is missing e^2_2, and it can commit only the first part and leave the partial transaction for a following group commit:

  e^2_1

This way, we already ensure that a crash will not leave us with an inconsistent storage engine state on the slave. Because we never commit part of an original master transaction - all parts of it are always group-committed together. Seems promising, though that code is also heavily optimised for scalability, so will require some care.

Or you're trying to extend the method to collapse a few master groups.. (But in that case the leader would also initiate commit at the boundary of that "big" epoch.)

Hm, one case needs to be handled though - when there are more statements in one original master transaction than there are worker threads - otherwise we end up with a deadlock.

Sorry, previously I omitted one important detail. A Worker pulls the tail statement from the shared queue not only for itself. When the pull result is a dependent statement whose parent has been, or is being, processed by another worker, that statement will be redirected to that worker. Does this dismiss your deadlock point?
It's clear that causal dependencies between a user write and a read (to find the effect of that write) can be tracked by WAIT_FOR_GTID() as currently.
A plain two-phase commit does admit the possibility of catching, on the slave, a combination of the old (pre-epoch) and new (post-epoch) versions of some objects of the same transaction, if the user hits it with a non-locking SELECT in a "tiny" time window between the commit():s of two branches, like in the following example.
Let t1,t2 be some tables and let the binlog group consist of just one transaction

  T_1 := { update(t1), update(t2) }

On the slave, let it be modified into two parallel ones:

  T'_1 := { update(t1) } || T'_2 := { update(t2) }

After both respond OK to the 2pc prepare() and the leader then initiates commit(), there could be 4 possible result sets for a contemporary SELECT

  SET @@session.tx_isolation='REPEATABLE-READ';
  SELECT * from t1 as t_1, t2 as t_2;
  => ...
Well, due to the enforced commit order, it will not be possible to see the case where update(t2) is committed but update(t1) is not.
In fact, if the group commit is extended as suggested above (to only group-commit the statements of an original transaction together),
(always meant!)
then I think it is possible for the user to avoid this problem entirely using
START TRANSACTION WITH CONSISTENT SNAPSHOT
In MariaDB, this ensures a consistent read view against 2pc transactions (reader will always see all of the 2pc commit or none of it).
I missed involving this, which was recently a novelty (to myself)!
And IIRC, it works by holding the same lock (LOCK_commit_ordered) that is used by the group commit leader. So if the user has some special case where this minor read-inconsistency is sensitive, the START TRANSACTION WITH CONSISTENT SNAPSHOT mechanism can be used to avoid it completely.
Just underscores how good the SNAPSHOT feature is then :-)!
While this is perhaps more than imperfect, I believe it may be relevant only to remote corner cases. More importantly, we can still refine that by doing some *adjustments* in the Engine; I briefly studied myself what would constitute a sub-transaction in Innodb, "ideologically" endorsed and guided by Marko Makela. The idea is to hold the branch's locks even after the branch commits, and delegate their release to the leader.

Recoverability
--------------

When the slave is configured to log into the binary log, we must make sure to roll back any prepared epoch branches at server recovery, unless the last branch (that of the leader) is also there, prepared.
Now when it does binlog, the problem is that the modified branches' binlog images can't just be flushed one after another, especially if the user specifies to log on the slave uniformly with the master, that is, to preserve the original sub-transaction structures and their boundaries.
"if the user specified" - but this is always a requirement in MariaDB, right? Otherwise GTID order is broken?
It meant the possibility of gaps that I advertise above, where the gaps are a property of execution, but the integrity of master groups is never violated regardless of how long their travel along the replication chain is.
In such a case there are multiple options; here are some:
- use the relay-logged image of the original sub-transactions (may be limited to the master and slave version combination though);
I did not follow what this means - can you elaborate?
With some effort we can make use of the relay-log both for recovery and as a binlog for further replication. Above I hinted at the latter. So why wouldn't the slave worker "appropriate" the master binlog images? :-) As long as the master and slave server versions are the same, this optimization must be feasible.
- reconstruct the original sub-transactions' binlog images. That is, binlog events "belonging" to an original sub-transaction will be cached cooperatively by the workers that took a share of the original sub-transaction's execution.
Optionally, at least, the caching could be set to allow out-of-order insertion while the statements are parallelizable, like in the above T1(t1,t2) example. Otherwise the ordering will be preserved by implementing the shared cache as a queue with an interface to insert an item into the middle.
Isn't it possible to do the binlog caching locally, each worker caching just the binlogging for its own statement? If we make sure they group-commit together, they will be written in sequence into the binlog, so they can just write each their part one after the other?
Well, the aimed-for assignment policy is 'to the first available worker'. Done that way, the T':s - the modified branches - are determined dynamically, so you can see that logging them as they are would only satisfy the epoch boundaries; internally the original T:s would be interleaved. However, all input events in the epoch are totally enumerated, and each input event maps to an output one that inherits the enumeration, so the leader - which flushes to the binlog file - would be able to reconstruct a compatible slave binlog epoch.
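To illustrate that reassembly step only (made-up names, not server code), the leader would essentially do:

  #include <algorithm>
  #include <utility>
  #include <vector>

  // Hypothetical cached binlog image of one executed statement.
  struct cached_event {
    unsigned long trx_seq_no;          // original transaction (Gtid seq_no)
    unsigned long stmt_index;          // statement position within that transaction
    std::vector<unsigned char> image;  // binlog bytes produced by the worker
  };

  // Leader at epoch flush time: merge the per-worker caches and restore the
  // original sub-transaction order before writing to the binlog file.
  std::vector<cached_event>
  reassemble_epoch(std::vector<std::vector<cached_event>> per_worker_caches) {
    std::vector<cached_event> all;
    for (auto &cache : per_worker_caches)
      for (auto &ev : cache)
        all.push_back(std::move(ev));
    std::sort(all.begin(), all.end(),
              [](const cached_event &a, const cached_event &b) {
                return a.trx_seq_no != b.trx_seq_no ? a.trx_seq_no < b.trx_seq_no
                                                    : a.stmt_index < b.stmt_index;
              });
    return all;  // wrap in the original BEGIN ... COMMIT boundaries when writing
  }

Whether the enumeration travels in the event header or in a side structure is of course an open question.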
- exotics, like creating a slave-side "merge" GTID which would embed the original "commit". (The quotes here allude to the Git way.)
While it's a serious subject, I am leaving out mariabackup for the moment. I believe it will be feasible to adjust its logic to account as prepared only those transactions that are prepared in all their branches.

More about pipeline style optimization
--------------------------------------

As mentioned, workers can be made self-serving, assigning themselves events for execution as soon as they become available. That leads one to think of a producer-consumer model, potentially lock-free queues, a dynamic # of workers to stay in balance with the receiver's pace, etc.
So are you planning to implement a completely new parallel replication scheduler, rather than build on the existing one?
The actual agenda is of multiple activities that could be prioritized (if at all) for different phases/times. It consists of:

- a new type of scheduler that features 2pc rather than the orderly committing. It requires *a* changing the granularity of input to the workers and *b* handling statement dependencies. The SQL thread may remain as the current scheduler;

and of a number of optimizations potentially useful to the current schedulers too, including:

- changing the assigning to be self-directed by the Workers; effectively that leaves the SQL thread the role of relay-log reader, or it could be turned into the relay-logger altogether (to relieve the IO thread from that burden, why is explained below); also the relay-logging could be turned into a user option, in which case, or regardless of it:

- the IO thread turns from the logger into a mere event producer that queues into a memory buffer from which consumers such as the relay-logger (the SQL thread) and the worker threads pull. This apparently streamlines event execution, and in combination with the scheduler's fairness it should give us, I dare say, the maximum that replication is capable of. When the always-limited buffer size comes into play, the IO thread will yield its producer role e.g. to the logger thread (the SQL thread), which then takes care of queuing into the shared buffer by reading into it from the relay-log.

I think *eventually* we will need to assign the producer role to the IO (receiver) thread, and convert the current SQL (driver) thread into a (relay-) logger (we may still have to have such a logger for semisync replication; otherwise the relay logger can be opted out). Here is a picture

  IO_thread.push()
      |               +---- worker.pull()
      V               V
  e^s_i -> [ {e^s_i-1, ... e^s_1}, T_s-1, ... ]
                 ^
                 |
              SQL.pull()

where on the right-hand side the two consumers handle a transaction, and when both of them are done with it (a worker has read its last event and the logger has written it to the relay-log) the queue element becomes garbage-collectable.
So overall, this was an interesting idea to see! I never thought before of doing parallelisation beyond the original transaction boundaries from the master. But now that you have described it, it actually seems quite a natural extension of the optimistic parallel replication, and it fits very well with the fundamental principle in MariaDB replication of preserving strict GTID ordering, which might work to handle much of the consistency requirements.
It would be quite interesting to see this in practice on real workloads and with benchmarks - I could imagine very large improvements for some workloads. Though I suppose you shouldn't underestimate the work required to complete something like this in production quality (the devil is in the detail, once you get to nasty things like temporary tables and non-transactional engines and other horrors that are part of the MySQL/MariaDB ecosystem).
You're absolutely right. We should handle that, first of all - together, secondly with proper prioritization, and finally there's a modest hope on my side that these ideas represent a natural evolution out of bloody :-) experience with the best of two (actually more! :-) parallel replication realms.

Cheers,

Andrei
andrei.elkin@pp.inet.fi writes:
I am yet to check closer thd_rpl_deadlock_check(), perhaps that's why
Oh. I definitely think you should get well acquainted with existing parallel replication in MariaDB to undertake a project as ambitious as this one. There are a number of central concepts and algorithms that synergise together, including (off the top of my head):

- binlog group commit, which uses the commit_ordered mechanism to efficiently perform commits for multiple threads and coordinate with storage engines.

- wait_for_prior_commit, which works with the binlog group commit to scalably handle dependencies between replication workers.

- thd_rpl_deadlock_check(), which works with supporting storage engines to detect conflicts between transactions replicated in parallel.

- rpl_parallel_entry and group_commit_orderer, which efficiently coordinate scheduling between worker threads (Jean François' benchmarks show this scaling to 5000 threads).

- And behind it all the strict ordering of commits, which underlies many assumptions around parallel replication as well as GTID and allows things like optimistic/aggressive parallel replication in a way that is completely transparent to applications.

One thing about pipeline-style slave parallel applier is still not clear to me. How will conflicts between individual statements be handled? Eg. suppose we have this group commit (epoch) on the master:

  BEGIN;                          /* T1 */
  INSERT INTO t1 VALUES (...);    /* S1 */
  COMMIT;
  BEGIN;                          /* T2 */
  UPDATE t1 SET b=10 WHERE a=1;   /* S2 */
  UPDATE t1 SET b=20 WHERE a=1;   /* S3 */
  COMMIT;

So the idea is to schedule these two transactions (T1 T2) to three different worker threads, right? Each thread gets one of (S1 S2 S3). But we must not commit S3 before S2, obviously, or we end up with an incorrect result. So how will this dependency be handled?

I was assuming that this would be using the same mechanism as optimistic parallel replication - in fact, I thought the point was that this is precisely a clever generalisation of optimistic parallel replication, realising that transaction boundaries need not be respected. And since the epoch does not help with the dependency between S2 and S3, I was confused about the point of using it to detect the lack of dependency between T1 and T2...

But if you say that you had not considered using the thd_rpl_deadlock_check mechanism, then I am confused as to how to detect that S3 needs to wait for S2?
Assume out-of-order commit of the grouped original transactions. GTID gaps would exist for some little time, but as long as there are no crashes, the slave's conservative (that is, respecting the group boundaries) execution would fill them; and in case of crash recovery the Slave could specify the gaps at reconnecting (unless those transactions are in the relay-log) for retransmission. They would be logged out of order in the log-bin case, and may appear as a set (mathematically). However, it's a range as the union of gtids, and it - the master range - can be preserved as described along the replication chain.
I don't understand how MariaDB GTID can work unless the slave binlogs the GTIDs in the exact same order as the master. At any time, a lower-level slave may connect, and you have to find the right point at which to start sending it events. There is only a single GTID to start from.

Remember, it is not guaranteed that the GTIDs are in strict sequence number ordering from the master! Things like master-master ring or transactions executed directly on the slave can break ordering when gtid_strict_mode is not set.

Suppose you have GTIDs 0-1-100,0-1-99 in the log and a lower-level slave connects at GTID 0-1-100. How will you know whether you need to send 0-1-99 to the slave (because of out-of-order master) or if you need to skip it (because we did parallel execution out-of-order)?

Or if binlogging is changed so that it uses the slave relay logs instead (identical to master binlogs), but replicated transactions are committed to storage engines out of order. Then how do we remember which parts are committed in storage engines, so we can recover from a crash?

In current MariaDB parallel replication, the enforced commit ordering ensures that a single GTID position is enough to recover the slave from a crash, as well as to connect a lower-level slave. It also guarantees to users that they can enable parallel replication without introducing any new read-inconsistencies.
Right, those 40% of rollbacks we are going to trade for extra productivity :-).
But the point is - those 40% rollbacks were beneficial. The main cost of the transaction was the I/O to read affected pages from disk - so running future transactions in parallel is a win even if we know that they need to be rolled back and retried later. I/O latency is perhaps the most important slave bottleneck to address with parallel replication, and speculative apply can work as a pre-fetcher.
Very true. On one hand we can go in blind, "speculatively", ready to pay the rollback price; on the other hand the Master already does detect (some) conflicts, and the actual conflict-free range of transactions is wider than a mere single binlog group (mind the reference to the logical clock that can "measure" dependencies).
Can you btw. point me to a detailed description of how logical clock works? My impression is that this has still limited scope for parallelisation if the master has fast commits, but not knowing the details I may be missing something...
Although the assignment role of the producer is clearly better off taken over by the workers. What I meant in the original mail is that the assignment mechanism can be improved to avoid any chance of worker idling, but more importantly to facilitate fairness. The workers go to pick up pieces of work as soon as they become available. The optimistic scheduler's assignment is close to that, but it still assigns only round-robin, that is speculatively.
I don't understand "the assignment role of the producer is clearly better off taken over by the workers". How would this reduce workers idling? I do not think the current parallel replication scheduler leaves workers idle? On the other hand, if workers are to individually pull work from a shared queue, then they will all be contending for access to the queue (imagine 5000 threads doing this at once...). The idea with the existing scheduler is that this does not happen - each worker has its own queue, so contention is reduced. (Just to be clear, I'm trying to understand here, the current MariaDB scheduler is obviously not perfect).
In fact, if the existing mechanism can be slightly extended, maybe it can already do much of what is needed to ensure consistency. Eg. suppose we have 3 original transactions each with two statements:
e^1_1 e^1_2 e^2_1 e^2_2 e^3_1 e^3_2
Suppose the first 3 of these are ready to commit at some point:
e^1_1 e^1_2 e^2_1
The leader can check that it doesn't try to commit only part of an original transaction.
Probably I need to correct your initial perception here (otherwise I am lost as to how e^2_1 got separated). In this case you mean that the three (original) T:s (E) are mapped into a modified
T'_1 = {e^1_1 e^1_2 e^2_1}
and some more, e.g. just one T'_2, whose union is
T'_2 \cup T'_1 = T_1 \cup T_2 \cup T_3 = E
and the leader can't initiate 2p-commit without T'_2.
To illustrate what I mean, take my earlier example, with two transactions T1 and T2 containing individual statements S1, S2, and S3. My understanding is that on the slave, we will start 3 transactions for S1, S2, and S3, each running in its own worker thread. Right?

So suppose S1 and S2 finish and are ready to complete, but S3 is still running. Now, we cannot just commit S1 and S2, as that would leave us in an inconsistent state (in case of crash for example). So we should either commit just S1 at this point, or we could wait for S3 and then group commit all of S1, S2, and S3 together. Right?

Or is there some mechanism by which two different threads could run S2 and S3 in parallel, but still within a single transaction (inside InnoDB eg?) - I imagine this is not the case.
Sorry, previously I omitted one important detail. A Worker pulls the tail statement from the shared queue not only for itself. When the pull result is a dependent statement whose parent has been, or is being, processed by another worker, that statement will be redirected to that worker.
Right - but how will you determine whether there is such a dependency? It seems very hard to do in the general case at pull time (where we haven't even parsed the statement yet)?
With some effort we can make use of the relay-log both for recovery and as a binlog for further replication. Above I hinted at the latter. So why wouldn't the slave worker "appropriate" the master binlog images? :-)
As long as the master and slave server versions are the same, this optimization must be feasible.
Aha, I see. This is something like a built-in binlog server. I agree this seems feasible. Surely it is a separate project from this (of course one may benefit from the other).

In a sense, this is the "natural" way to do things. Why go to all the trouble of trying to re-produce the original binlog from executing individual statements, when we already have an (almost) copy directly from the master?

Still, this may not be as easy as that... for example, how do we interleave the multiple binlogs from multi-source replication in a way that is consistent with the actual execution order? And what about extra transactions executed directly on the slave?

Thanks,

 - Kristian.
Kristian,
andrei.elkin@pp.inet.fi writes:
I am yet to check closer thd_rpl_deadlock_check(), perhaps that's why
Oh.
I definitely think you should get well acquainted with existing parallel replication in MariaDB to undertake a project as ambitious as this one. There are a number of central concepts and algorithms that synergise together, including (off the top of my head):
- binlog group commit, which uses the commit_ordered mechanism to efficiently perform commits for multiple threads and coordinate with storage engines.
- wait_for_prior_commit, which works with the binlog group commit to scalably handle dependencies between replication workers.
- thd_rpl_deadlock_check(), which works with supporting storage engines to detect conflicts between transactions replicated in parallel.
- rpl_parallel_entry and group_commit_orderer, which efficiently coordinate scheduling between worker threads (Jean François' benchmarks show this scaling to 5000 threads).
- And behind it all the strict ordering of commits, which underlies many assumptions around parallel replication as well as GTID and allows things like optimistic/aggressive parallel replication in a way that is completely transparent to applications.
Thanks for summing them all up together! Apparently one has to understand how these various parts interact. And when it comes to thd_rpl_deadlock_check(), let me learn along the way. While the feature, as you put it in the comments, is largely about the optimistic schedulers, it still does apply to some corner cases in the conservative one. You mention 'different optimizer decision', which hints at SBR specifics. Am I correct? However, I am aware of conflicts in Innodb caused by the mere different timing of lock acquisition by two parallel branches. So, regardless of the format, that clearly puts deadlock/lock-waiting on the current agenda.

And yet I am not sure *how* thd_rpl_deadlock_check() could be reused. We can notice that in the pipeline method any attempt to start waiting for a lock granted to another branch needs interception and erroring out. We are bound to this because the granted lock is to be released only after 2pc. Once the statement errors out it will be reassigned to the lock grantee's worker. If along this way the group still could not be finished, we could roll back all branches and turn to the conservative scheduler, and eventually to the sequential one. I say more relating to thd_rpl_deadlock_check() below, following up your optimistic thinking of scheduling conflicting statements that way.
One thing about pipeline-style slave parallel applier is still not clear to me. How will conflicts between individual statements be handled? Eg. suppose we have this group commit (epoch) on the master:
  BEGIN;                          /* T1 */
  INSERT INTO t1 VALUES (...);    /* S1 */
  COMMIT;
  BEGIN;                          /* T2 */
  UPDATE t1 SET b=10 WHERE a=1;   /* S2 */
  UPDATE t1 SET b=20 WHERE a=1;   /* S3 */
  COMMIT;
So the idea is to schedule these two transactions (T1 T2) to three different worker threads, right? Each thread gets one of (S1 S2 S3). But we must not commit S3 before S2, obviously, or we end up with an incorrect result. So how will this dependency be handled?
This is dependency tracking specifics. In the RBR case (the whole group is row-format) S2 -> S3 can currently only be pessimistically estimated, via reading the table name from the Table_map. In the SBR case that you meant, Query-log-events would have to carry table names. We could refine the estimate with the help of enriched logging on the master to collect PK values into the event. So, having learned of S2 -> S3 at assignment time, in the first approximation I suggest we schedule T2 as one transaction.
I was assuming that this would be using the same mechanism as optimistic parallel replication - in fact, I thought the point was that this is precisely a clever generalisation of optimistic parallel replication,
You have a very nice point, and the dumb pessimistic method is refined by optimistic execution of *one* statement (whose cost to roll back is insignificant)! [Why wouldn't it be the only option...] More than that, somewhat similarly to waiting for a prior trx to commit, now we would wait for prior statements if they (may) depend on each other. When an earlier-ordered statement finds a locked row it should eliminate the lower-ranked current owner (it could take until the owner reaches the wait-for-prior-stmt commit). Otherwise it waits.
realising that transaction boundaries need not be respected. And since the epoch does not help with the dependency between S2 and S3, I was confused about the point of using it to detect the lack of dependency between T1 and T2...
But if you say that you had not considered using the thd_rpl_deadlock_check mechanism, then I am confused as to how to detect that S3 needs to wait for S2?
Here is how it would go the simple way, which must make sense because statically S2 -> S3 just can't be a common pattern. To the dependency tracking specifically: each event is associated with a descriptor of the data it modifies (e.g. the db.table pair right now, db.table.record_id later). When e_1 gets assigned it initiates a slave branch. Each branch remembers the descriptors of all its events, so e_1 contributes to that. Next e_2 comes in; the first action is to check whether its data is handled in e_1's branch. When not, a second branch is forked off. The process goes on up to a maximum number of branches. When that is reached, the following events get assigned to the branches through data descriptor checking first and then "round robin" if needed. Round-robin is just for the concept; the actual plan is to serve it to the first available worker, or almost the first available in the common-case sense (meaning my *conservative* handling of S2 -> S3, but I'm transitioning to start off with your optimization at once).
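To pin the rule down, a minimal sketch (my own illustration; data_descriptor, branch and assign_event are made-up names, and a plain set stands in for whatever the workers would really share):

  #include <cstddef>
  #include <set>
  #include <string>
  #include <vector>

  // Descriptor of the data a statement modifies: "db.table" for now,
  // refinable to db.table.record_id later.
  using data_descriptor = std::string;

  struct branch {
    int worker_id;
    std::set<data_descriptor> touched;   // descriptors of all events in the branch
  };

  // Assign one incoming event: reuse the branch that already touched the same
  // data (the dependency case), otherwise fork a new branch up to the maximum,
  // otherwise fall back to the least-loaded branch as a stand-in for
  // "first available worker".
  int assign_event(std::vector<branch> &branches, const data_descriptor &d,
                   std::size_t max_branches) {
    for (std::size_t i = 0; i < branches.size(); ++i)
      if (branches[i].touched.count(d))
        return (int)i;                                // dependent: same worker
    if (branches.size() < max_branches) {
      branches.push_back(branch{(int)branches.size(), {d}});
      return (int)branches.size() - 1;                // fork a new branch
    }
    std::size_t best = 0;
    for (std::size_t i = 1; i < branches.size(); ++i)
      if (branches[i].touched.size() < branches[best].touched.size())
        best = i;
    branches[best].touched.insert(d);
    return (int)best;
  }

With S2 and S3 both touching t1, the check on the touched descriptors sends S3 to the same branch/worker as S2, which is the conservative handling I mean above.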
Assume out-of-order commit of the grouped original transactions. GTID gaps would exist for some little time, but as long as there are no crashes, the slave's conservative (that is, respecting the group boundaries) execution would fill them; and in case of crash recovery the Slave could specify the gaps at reconnecting (unless those transactions are in the relay-log) for retransmission. They would be logged out of order in the log-bin case, and may appear as a set (mathematically). However, it's a range as the union of gtids, and it - the master range - can be preserved as described along the replication chain.
I don't understand how MariaDB GTID can work unless the slave binlogs the GTIDs in the exact same order as the master. At any time, a lower-level slave may connect, and you have to find the right point at which to start sending it events. There is only a single GTID to start from.
This is off-topic to the main discussion, but I could not resist the urge to elaborate (so I indent the reply).

  Consider the mysql method then. Roughly speaking, a slave supplies a sort of low-water mark of the gtids that it has without any gaps, plus a sequence of gtids atop of that which are responsible for the gaps. That's the general case; normally, when the slave was stopped gracefully, there won't be any gaps.

  My statement is that the master group as a range is an invariant which holds (as long as it does not break up) along the replication chain. When the master group consists of 'k' sub-transactions, and assuming no interruption of its replication including the applying, its image on a slave is the same 'k' number of the same sub-transactions:

    [T_1, T_2, ... T_k] -> ... replicate ... -> [T_l1, T_l2, ..., T_lk]

  here 1 <= li <= k, and the internal indexes l1 ... lk are unordered. The master group can break up, but the fragments inherit the property that sub-transactions commit in any order.

  So in the out-of-order case a slave that was gracefully stopped (and thus has applied the last master group fully) requests to resume replication from a state like [T_l1, T_l2, ..., T_lk], which can be described by a single maximum gtid (max(li)). In the general case of gaps, as said, a slave state description like

    T_1, T_2, ... T_l^(wm), gap_l+1, T_l+2 ...

  requires T_l^(wm) - the watermark (describing the left-hand side of the last master group's committed sub-transactions) - and the sequence of T_l+2 ... that makes up the rhs. When the gaps are delivered and filled by execution, the slave gtid state is again described by the single max(li), which is the end of the master group commit recovery.
Remember, it is not guaranteed that the GTIDs are in strict sequence number ordering from the master! Things like master-master ring or transactions executed directly on the slave can break ordering when gtid_strict_mode is not set.
As you could see in my description there is some strictness implied. Namely, if we enumerate the master groups, their sequence number grows monotonically (abstracting from the group breakup for simplicity).
Suppose you have GTIDS 0-1-100,0-1-99 in the log and a lower-level slave connects at GTID 0-1-100. How will you know whether you need to send 0-1-99 to the slave (because of out-of-order master) or if you need to skip it (because we did parallel execution out-of-order)?
If I got your point right, in my case the slave would come with its watermark gtid which should be less or equal to that of master in order to master to accept the slave. So the slave requesting to start feading from 0-1-100 (watermark) would mean it has 0-1-99 applied.
Or if binlogging is changed so that it uses the slave relay logs instead (identical to master binlogs), but replicated transactions are committed to storage engines out of order. Then how do we remember which parts are committed in storage engines, so we can recover from a crash?
This is also a practical question to the main agenda! In the out-of-order but preserving original sub-transaction case the group sub-transactions are logged into binlog as a permutable range. There should be a method to locate the first and last sub-transactions in there, regardless of occurred ordering. In mysql such method bases on the low watermark and tracking file:pos of it. In the pipeline it would be natural to maintain the last group end-of file:pos like the watermark is always at the beginning of the current group. I paid some thinking to pipeline recovery to conclude positive about feasibility all necessary steps. I shall describe that in a separate document.
In current MariaDB parallel replication, the enforced commit ordering ensures that a single GTID position is enough to recover the slave from a crash, as well as to connect a lower-level slave. It also guarantees to users that they can enable parallel replication without introducing any new read-inconsistencies.
Right, those 40% of rollbacks we are going to trade with extra productivity :-).
But the point is - those 40% rollbacks were beneficial. The main cost of the transaction was the the I/O to read affected pages from disk - so running future transactions in parallel is a win even if we know that they need to be rolled back and retried later.
I got it now. We still can have this effect when run the dependent statements optimistically.
I/O latency is perhaps the most important slave bottleneck to address with parallel replication, and speculative apply can work as a pre-fetcher.
Very true. On one hand we can go blind "speculatively" direct ready to pay rollback price, on the other already Master does detect (some) conflicts, and the actual conflict-free range of transactions is wider than a mere one binlog group (mind a reference to the logical clock that can "measure" dependencies).
Can you btw. point me to a detailed description of how logical clock works?
No detailed description that I am aware of (partly through my personal guilt: my blog was ready, but somehow it never saw the light of the day). But it is not that difficult to grasp from the following observation. When a transaction is prepared it is assigned with a sequence number as a logical timestamp from a shared logical clock (to step the clock). Also at this point another logical timestamp is acquired with describes the low watermark of the last committed. The latter clock is *updated* rather than steps monotonically by the value of being committed transaction (if they were to commit in order this commit clock would also step). The two timestamp comprise a pluralization range which meaning is that the transaction of the lhs is parallelizable with any in the range up to the rhs; it's something like this in the binlog: #190207 00:08:31 ... last_committed = 1201 sequence_number = 1203
My impression is that this has still limited scope for parallellisation if the master has fast commits, but not knowing the details I may be missing something...
I agree with your intuition. The group size must be proportional to the ratio of cost(prepare+commit) / cost(body).
Although the assignment role of the producer is clearly better off be taken by the workers. What I meant in the original mail that the assignment mechanism can be improved to avoid any chance of worker idling but more importantly to facilitate fairness. The workers go to pick pieces of work at once they become available. The optimistic scheduler assignment is close to that but it still assigns only round robin that is speculatively.
I don't understand "the assignment role of the producer is clearly better off be taken by the workers". How would this reduce workers idling? I do not think the current parallel replication scheduler leaves workers idle?
The main difference of pushing in round-robin vs pulling from a shared queue is that the first activity is speculative (hoping the worker will pick up soon). With unfair load SHOW PROCESSLIST proves that wrong with a common "Waiting for work from SQL thread" On the other hand the pulling is volunteer and assuming that the queue always has something the worker becomes busy non stop until prepare at least (perhaps we could involve it into as well). Optimistic version this method would be to leave the prepared modified branch to the hands of the leader and switch to the next group's events.
On the other hand, if workers are to individually pull work from a shared queue, then they will all be contending for access to the queue (imagine 5000 threads doing this at once...). The idea with the existing scheduler is that this does not happen - each worker has its own queue, so contention is reduced.
Notice that contention time lasts for as "long" as it takes to *mark* a queue item as taken. So it can't be of an issue. There must be a purging mechanism, but it does not overlap with access the hot part of the queue.
(Just to be clear, I'm trying to understand here, the current MariaDB scheduler is obviously not perfect).
I read that too and very distinctively (and I meant the first part mostly :-)).
In fact, if the existing mechanism can be slightly extended, maybe it can already do much of what is needed to ensure consistency. Eg. suppose we have 3 original transactions each with two statements:
e^1_1 e^1_2 e^2_1 e^2_2 e^3_1 e^3_2
Suppose the first 3 of these are ready to commit at some point:
e^1_1 e^1_2 e^2_1
The leader can check that it doesn't try to commit only part of an original transaction.
Probably I will correct your initial perception here (otherwise I am lost in how e^2_1 got separated). In this case you mean three (original) T:s (E) are mapped into a modified
T'_1 = {e^1_1 e^1_2 e^2_1}
and some more e.g just one T'_2 unions of
T'_2 \cup T'_1 = T_1 \cup T_2 \cup T_3 = E
As the leader can't initiate 2p-commit without T'_2.
To illustrate what I mean, take my earlier example, with two transactions T1 and T2 containing individual statements S1, S2, and S3.
My understanding is that on the slave, we will start 3 transactions for S1, S2, and S3, each running in its own worker threads. Right?
So suppose S1 and S2 finish and are ready to complete, but S3 is still running. Now, we cannot just commit S1 and S2, as that would leave us in an inconsistent state (in case of crash for example). So we should either commit just S1 at this point. Or we could wait for S3 and then group commit all of S1, S2, and S3 together. Right?
True. My preference is to wait for S3 which preserves the master group with all bonuses of doing that.
Or is there some mechanism by which two different threads could run S2 and S3 in parallel, but still within a single transactions (inside InnoDB eg?) - I imagine this is not the case.
We are certainly going to that direction. Let me shy off detailing in already a day long mail :-), anyway it's better off to show a patch that templates an idea.
Sorry, previously I omitted one important detail. A Worker pulls from the share queue its tail statement not only for itself. When the pull result is a dependent statement whose parent has been or is in processing by another worker, that statement will be redirected to that worker.
Right - but how will you determine whether there is such a dependency? It seems very hard to do in the general case at pull time (where we haven't even parsed the statement yet)?
Not really if we will be content with pessimistic estimates. But I already admitted your optimistic "encouragement"!
With some efforts we can make use of relay-log for both recovery and as binlog for further replication. In above I hinted at the latter. So why won't the slave worker "appropriate" the master binlog images? :-)
While the master and slave server version are the same this optimization must be feasible.
Aha, I see. This is something like a built-in binlog server.
I agree this seems feasible. Surely it is a separate project from this (of course one may benefit from the other).
In a sense, this is the "natural" way to do things. Why go to all the trouble of trying to re-produce the original binlog from executing individual statements, when we already have an (almost) copy directly from the master?
It's a shame it's not there already long time ago :-)
Still, this may not be as easy as that... for example, how do we interleave the multiple binlogs from multi-source replication in a way that is consistent with the actual execution order? And what about extra transactions executed directly on the slave?
Binlog groups from multiple sources, including the slave local ones, must remain as the original groups at binary logging. This can fail only through a crash. Recovery has not been scrutinized here much, it needs a description that I owe. "Lastly" I need to detail any possible group split and its consequence. Many thanks for this round of discussion and I am all ears to hear more! Andrei
Thanks,
- Kristian.
Let me expand on a couple of points in the last reply.
Kristian,
andrei.elkin@pp.inet.fi writes:
I have yet to take a closer look at thd_rpl_deadlock_check(), perhaps that's why
Oh.
I definitely think you should get well acquainted with existing parallel replication in MariaDB to undertake a project as ambitious as this one. There are a number of central concepts and algorithms that work in synergy, including (off the top of my head):
- binlog group commit, which uses the commit_ordered mechanism to efficiently perform commits for multiple threads and coordinate with storage engines.
- wait_for_prior_commit, which works with the binlog group commit to scalably handle dependencies between replication workers.
- thd_rpl_deadlock_check(), which works with supporting storage engines to detect conflicts between transactions replicated in parallel.
- rpl_parallel_entry and group_commit_orderer, which efficiently coordinates scheduling between worker threads (Jean François' benchmarks show this scaling to 5000 threads).
- And behind it all the strict ordering of commits, which underlies many assumptions around parallel replication as well as GTID and allows things like optimistic/aggressive parallel replication in a way that is completely transparent to applications.
Thanks for summing them all up! Clearly one has to understand how these various parts interact.
And when it comes to thd_rpl_deadlock_check(), let me learn it along the way. While the feature, as you put it in the comments, is largely about the optimistic schedulers, it still applies to some corner cases in the conservative one. You mention a 'different optimizer decision', which hints at SBR specifics. Am I correct?
However, I am aware of conflicts in InnoDB caused merely by different timing of lock acquisition between two parallel branches. So regardless of the format, deadlocks and lock waiting are clearly on the current agenda. And yet I am not sure *how* thd_rpl_deadlock_check() could be reused.
Notice that in the pipeline method, any attempt to start waiting for a lock granted to another branch needs to be intercepted and turned into an error. We are bound to this because the granted lock will be released only after 2PC.
When the statement errors out, it will be reassigned to the worker holding the lock. If the group still cannot be finished this way, we could roll back all branches and fall back to the conservative scheduler, and eventually to the sequential one.
I will say more about thd_rpl_deadlock_check() below, following up on your optimistic idea of scheduling conflicting statements that way.
One thing about pipeline-style slave parallel applier is still not clear to me. How will conflicts between individual statements be handled? Eg. suppose we have this group commit (epoch) on the master:
BEGIN; /* T1 */
INSERT INTO t1 VALUES (...); /* S1 */
COMMIT;
BEGIN; /* T2 */
UPDATE t1 SET b=10 WHERE a=1; /* S2 */
UPDATE t1 SET b=20 WHERE a=1; /* S3 */
COMMIT;
So the idea is to schedule these two transactions (T1 T2) to three different worker threads, right? Each thread gets one of (S1 S2 S3). But we must not commit S3 before S2, obviously, or we end up with incorrect result. So how will this dependency be handled?
This is a matter of dependency tracking. In the RBR case (the whole group is row-format), S2 -> S3 can currently only be estimated pessimistically, by reading the table name from the Table_map event. In the SBR case that you meant, Query-log-events would have to carry table names. We could refine the estimate with the help of enriched logging on the master that collects PK values into the event.
So, having learned of S2->S3 at assignment time, in the first approximation I suggest we schedule T2 as one transaction.
Naturally the first approximation admits unfairness when the input is dependent. Yet we can save that for what are, statistically, corner cases. Either (though it's not binary after all) by running optimistically as you've suggested, or by reinforcing master logging to track *intra*-transaction dependencies. Practically, that means recording a set of 'db.table.record_id' arrays, one array per 'db.table'. Each array would be sorted by 'record_id', as part of a replication optimizer, to build up non-dependent branches of sizes as equal as possible. Also, the mere number of noticed dependencies could be checked by the slave to switch its execution to conservative mode.
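To make that last idea concrete, here is a minimal standalone sketch (plain illustrative C++, not MariaDB code; all names are hypothetical): the row changes of one transaction are folded into per-table record-id arrays, sorted, and the count of repeated record ids - the intra-transaction dependencies - is what the slave could use to decide between pipelined and conservative execution of the group.

  #include <algorithm>
  #include <iostream>
  #include <map>
  #include <string>
  #include <vector>

  struct RowChange { std::string table; long record_id; };

  int main()
  {
    // One original transaction as a flat list of row changes.
    std::vector<RowChange> trx = {{"db.t1", 1}, {"db.t1", 2}, {"db.t2", 7},
                                  {"db.t1", 1}, {"db.t2", 8}};

    // Per-table record-id arrays, as the master could log them.
    std::map<std::string, std::vector<long>> per_table;
    for (const RowChange &rc : trx)
      per_table[rc.table].push_back(rc.record_id);

    size_t dependencies = 0;
    for (auto &p : per_table)
    {
      std::sort(p.second.begin(), p.second.end());
      // A repeated record id within one table means the statements touching
      // it depend on each other and must stay in one branch.
      dependencies += p.second.size() -
          (std::unique(p.second.begin(), p.second.end()) - p.second.begin());
    }
    std::cout << "intra-transaction dependencies: " << dependencies << "\n";
    // A possible slave policy: too many dependencies => apply this group with
    // the conservative (or sequential) scheduler instead of the pipeline.
    std::cout << (dependencies > 1 ? "conservative" : "pipeline") << " mode\n";
  }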
I was assuming that this would be using the same mechanism as optimistic parallel replication - in fact, I thought the point was that this is precisely a clever generalisation of optimistic parallel replication,
You have a very nice point, and the dumb pessimistic method is refined by optimistic execution of *one* statement (whose rollback cost is insignificant)!
[Why not make it the only option...]
More than that, somewhat similarly to waiting for a prior transaction to commit, we would now wait for prior statements if they (may) depend on each other. When an earlier-ordered statement finds a locked row, it should evict the lower-ranked current owner (which may require letting the owner reach its wait-for-prior-statement commit). Otherwise it waits.
This obviously requires cross-group event enumeration and awareness of potential dependency on an earlier scheduled event (of a smaller sequence number).
realising that transaction boundaries need not be respected. And since the epoch does not help with the dependency between S2 and S3, I was confused about the point of using it to detect the lack of dependency between T1 and T2...
But if you say that you had not considered using the thd_rpl_deadlock_check mechanism, then I am confused as to how to detect that S3 needs to wait for S2?
Here is how it would go the simple way, which must make sense because, just statistically, S2->S3 can't be a common pattern.
As to the dependency tracking specifically, each event is associated with a descriptor of the data it modifies (e.g. a db.table pair right now, db.table.record_id later).
When e_1 gets assigned, it initiates a slave branch. Each branch remembers the descriptors of all its events, so e_1 contributes to that. When the next event e_2 comes in, the first action is to check whether its data is already handled in e_1's branch. If not, a second branch is forked off. The process goes on up to a maximum number of branches. Once that is reached, the following events get assigned to the existing branches: first by data-descriptor checking, and then "round robin" if needed. Round-robin is just for the concept; the actual plan is to serve an event to the first available worker, or almost the first available in the common-case sense (meaning my *conservative* handling of S2->S3, but I'm switching to start off with your optimization at once).
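A minimal sketch of that assignment loop (standalone illustrative C++, not server code, and the names are made up): each branch remembers the descriptors it has seen, a new event goes to a branch that already owns its descriptor, otherwise a new branch is forked until the maximum is reached, after which the event falls to the least-loaded ("first available") branch.

  #include <iostream>
  #include <set>
  #include <string>
  #include <vector>

  struct Branch
  {
    std::set<std::string> descriptors; // "db.table" now, "db.table.record_id" later
    int load = 0;
  };

  // Return the branch an event with the given data descriptor is assigned to.
  static int assign(std::vector<Branch> &branches, const std::string &desc,
                    size_t max_branches)
  {
    // 1. A branch that already handled this data keeps the dependent event.
    for (size_t i = 0; i < branches.size(); i++)
      if (branches[i].descriptors.count(desc))
        return int(i);
    // 2. Otherwise fork a new branch while we are under the limit.
    if (branches.size() < max_branches)
    {
      branches.push_back(Branch());
      return int(branches.size() - 1);
    }
    // 3. Otherwise fall back to the least loaded ("first available") branch.
    int best = 0;
    for (size_t i = 1; i < branches.size(); i++)
      if (branches[i].load < branches[best].load)
        best = int(i);
    return best;
  }

  int main()
  {
    std::vector<Branch> branches;
    std::vector<std::string> events = {"db.t1", "db.t2", "db.t1", "db.t3", "db.t4"};
    for (const std::string &e : events)
    {
      int b = assign(branches, e, 3 /* maximum number of branches */);
      branches[b].descriptors.insert(e);
      branches[b].load++;
      std::cout << "event on " << e << " -> branch " << b << "\n";
    }
  }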
Assuming out-of-order commit of the grouped original transactions, GTID gaps would exist for a little while, but as long as there are no crashes, the slave's conservative execution (respecting the group boundaries, that is) would fill them; and in the case of crash recovery, the slave could specify the gaps at reconnect (unless those transactions are already in the relay log) for retransmission. They would be logged out of order in the log-bin case, and may appear as a set (mathematically). However, the union of the gtids is a range, and it - the master range - can be preserved along the replication chain as described.
I don't understand how MariaDB GTID can work unless the slave binlogs the GTIDs in the exact same order as the master. At any time, a lower-level slave may connect, and you have to find the right point at which to start sending it events. There is only a single GTID to start from.
This is off-topic to the main discussion, but I could not resist the urge to elaborate (so I indent the reply).
Consider the MySQL method then. Roughly speaking, a slave supplies a sort of low-water mark of gtids that it has applied without any gaps, and a sequence of gtids on top of that which account for the gaps. That's the general case; normally, when the slave was stopped gracefully, there won't be any gaps.
My statement is that the master group, as a range, is an invariant which holds (as long as the group does not break up) along the replication chain. When the master group consists of 'k' sub-transactions, and assuming no interruption of its replication, including the applying, its image on a slave is the same 'k' sub-transactions:
[T_1, T_2, ... T_k] -> ... replicate ... -> [T_l1, T_l2, ..., T_lk]
here 1 <= l_i <= k, and the internal indexes l_i <> l_j are unordered.
The master group can break up, but the fragments inherit the property that their sub-transactions may commit in any order.
So in the out-of-order case, a slave that was gracefully stopped (and thus has applied the last master group fully) requests to resume replication from a state like
[T_l1, T_l2, ..., T_lk]
which can be described by a single maximum gtid, max(l_i). In the general case with gaps, as said above, a slave state description like
T_1, T_2, ..., T_l^(wm), gap_(l+1), T_(l+2), ...
requires T_l^(wm) - the watermark (describing the left-hand side, the gap-free prefix, of the last master group's committed sub-transactions) - and the sequence of T_(l+2), ... that form the right-hand side. Once the gaps are delivered and filled by execution, the slave gtid state is again described by the single max(l_i), which is the end of the master group commit recovery.
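A tiny worked example of that state description (illustrative standalone C++, not the actual GTID code): from the set of applied sequence numbers the watermark is the highest value with no gap below it, the rest of the state is the applied numbers above it, and filling the gaps collapses the state back to the single maximum.

  #include <iostream>
  #include <set>

  int main()
  {
    // Sub-transactions of a master group applied so far, out of order;
    // 95 is where the previous, fully applied group ended.
    std::set<long> applied = {96, 97, 98, 100, 101};
    long watermark = 95;

    // Low watermark: the highest seqno with nothing missing below it.
    while (applied.count(watermark + 1))
      watermark++;                                    // ends at 98

    long last = *applied.rbegin();
    std::cout << "watermark: " << watermark << "\n";
    std::cout << "applied above the watermark:";
    for (long s : applied)
      if (s > watermark)
        std::cout << " " << s;                        // 100 101
    std::cout << "\ngaps to request on reconnect:";
    for (long s = watermark + 1; s <= last; s++)
      if (!applied.count(s))
        std::cout << " " << s;                        // 99
    std::cout << "\n";
    // Once 99 is delivered and applied, the whole state is again the single
    // maximum seqno (101) - the end of the master group commit recovery.
  }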
Remember, it is not guaranteed that the GTIDs are in strict sequence number ordering from the master! Things like master-master ring or transactions executed directly on the slave can break ordering when gtid_strict_mode is not set.
As you can see from my description, some strictness is implied. Namely, if we enumerate the master groups, their sequence numbers grow monotonically (abstracting from group breakup for simplicity).
Suppose you have GTIDS 0-1-100,0-1-99 in the log and a lower-level slave connects at GTID 0-1-100. How will you know whether you need to send 0-1-99 to the slave (because of out-of-order master) or if you need to skip it (because we did parallel execution out-of-order)?
If I got your point right, in my case the slave would come with its watermark gtid, which must be less than or equal to that of the master in order for the master to accept the slave. So a slave requesting to start feeding from 0-1-100 (the watermark) would mean it has 0-1-99 applied.
Or if binlogging is changed so that it uses the slave relay logs instead (identical to master binlogs), but replicated transactions are committed to storage engines out of order. Then how do we remember which parts are committed in storage engines, so we can recover from a crash?
This is also a practical question for the main agenda! In the out-of-order case that preserves the original sub-transactions, the group's sub-transactions are logged into the binlog as a permutable range.
There should be a method to locate the first and last sub-transactions in there, regardless of the order that actually occurred. In MySQL such a method is based on the low watermark and tracking its file:pos. In the pipeline it would be natural to maintain the end-of-group file:pos of the last group, so that the watermark is always at the beginning of the current group.
I have given some thought to pipeline recovery and conclude positively about the feasibility of all the necessary steps. I shall describe that in a separate document.
In current MariaDB parallel replication, the enforced commit ordering ensures that a single GTID position is enough to recover the slave from a crash, as well as to connect a lower-level slave. It also guarantees to users that they can enable parallel replication without introducing any new read-inconsistencies.
Right, those 40% of rollbacks we are going to trade for extra productivity :-).
But the point is - those 40% rollbacks were beneficial. The main cost of the transaction was the I/O to read affected pages from disk - so running future transactions in parallel is a win even if we know that they need to be rolled back and retried later.
I get it now. We can still have this effect when we run the dependent statements optimistically.
I/O latency is perhaps the most important slave bottleneck to address with parallel replication, and speculative apply can work as a pre-fetcher.
Very true. On the one hand we can go in blind, "speculatively", ready to pay the rollback price; on the other hand, the master already detects (some) conflicts, and the actual conflict-free range of transactions is wider than a single binlog group (see the reference to the logical clock that can "measure" dependencies).
Can you btw. point me to a detailed description of how logical clock works?
No detailed description that I am aware of (partly through my personal guilt: my blog post was ready, but somehow it never saw the light of day). But it is not that difficult to grasp from the following observation.
When a transaction is prepared, it is assigned a sequence number as a logical timestamp from a shared logical clock (stepping the clock). Also at this point another logical timestamp is acquired, which describes the low watermark of the last committed transaction. The latter clock is *updated* to the value of the committing transaction rather than stepping monotonically (if transactions were to commit in order, this commit clock would also step). The two timestamps comprise a parallelization range, whose meaning is that the transaction on the lhs is parallelizable with any transaction in the range up to the rhs; it looks something like this in the binlog:
#190207 00:08:31 ... last_committed = 1201 sequence_number = 1203
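As far as I understand it (so this is a hedged, standalone sketch and not the server implementation), the bookkeeping amounts to two counters: sequence_number steps at prepare time, last_committed is bumped to the sequence number of whatever commits, and two transactions may be applied in parallel on the slave when the later one's last_committed is smaller than the earlier one's sequence_number.

  #include <algorithm>
  #include <iostream>

  struct Trx { long sequence_number; long last_committed; };

  struct LogicalClock
  {
    long seq = 0;        // steps at every prepare
    long committed = 0;  // low watermark of committed transactions

    Trx prepare() { return Trx{++seq, committed}; }
    void commit(const Trx &t) { committed = std::max(committed, t.sequence_number); }
  };

  // The later transaction can be applied in parallel with the earlier one if
  // it was prepared before the earlier one had committed.
  static bool can_run_in_parallel(const Trx &earlier, const Trx &later)
  {
    return later.last_committed < earlier.sequence_number;
  }

  int main()
  {
    LogicalClock clock;
    Trx t1 = clock.prepare();   // sequence_number=1 last_committed=0
    Trx t2 = clock.prepare();   // sequence_number=2 last_committed=0
    clock.commit(t1);
    clock.commit(t2);
    Trx t3 = clock.prepare();   // sequence_number=3 last_committed=2
    std::cout << can_run_in_parallel(t1, t2) << "\n"; // 1: overlapped on the master
    std::cout << can_run_in_parallel(t2, t3) << "\n"; // 0: t3 saw t2 already committed
  }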
My impression is that this still has limited scope for parallelisation if the master has fast commits, but not knowing the details I may be missing something...
I agree with your intuition. The group size must be proportional to the ratio of cost(prepare+commit) / cost(body).
The assignment role of the producer, though, is clearly better off taken over by the workers. What I meant in the original mail is that the assignment mechanism can be improved to avoid any chance of a worker idling, but more importantly to facilitate fairness. The workers go and pick up pieces of work as soon as they become available. The optimistic scheduler's assignment is close to that, but it still assigns only round-robin, that is, speculatively.
I don't understand "the assignment role of the producer is clearly better off be taken by the workers". How would this reduce workers idling? I do not think the current parallel replication scheduler leaves workers idle?
The main difference between pushing round-robin and pulling from a shared queue is that the former is speculative (hoping the worker will pick the work up soon). With an unfair load, SHOW PROCESSLIST proves that wrong with a common
"Waiting for work from SQL thread"
which currently happens with the conservative scheduler (for the reason of unequal group (sub-)transaction sizes). In the pipeline case the driver thread would remain in the speculative mood, whereas the worker's self-assignment is always precise.
On the other hand, the pulling is voluntary, and assuming the queue always has something in it, the worker stays busy non-stop at least until prepare (perhaps we could involve it in that as well). An optimistic version of this method would be to leave the prepared modified branch in the hands of the leader and switch to the next group's events.
On the other hand, if workers are to individually pull work from a shared queue, then they will all be contending for access to the queue (imagine 5000 threads doing this at once...). The idea with the existing scheduler is that this does not happen - each worker has its own queue, so contention is reduced.
Notice that the contention lasts only as "long" as it takes to *mark* a queue item as taken. So it can't be much of an issue. There must be a purging mechanism, but it does not overlap with access to the hot part of the queue.
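A sketch of what I mean (standalone illustrative C++, not the actual worker queue): the only shared step is flipping a per-item "taken" flag with one atomic compare-exchange, so the critical section is tiny, and purging of consumed items can happen separately from the hot part of the queue.

  #include <atomic>
  #include <iostream>
  #include <thread>
  #include <vector>

  struct Item
  {
    int payload = 0;
    std::atomic<bool> taken{false};
  };

  // A worker *marks* the first free item as taken; the contended step is a
  // single compare-exchange, not the execution of the work itself.
  static Item *claim(std::vector<Item> &queue)
  {
    for (Item &it : queue)
    {
      bool expected = false;
      if (it.taken.compare_exchange_strong(expected, true))
        return &it;
    }
    return nullptr;   // queue exhausted
  }

  int main()
  {
    std::vector<Item> queue(100);
    for (int i = 0; i < 100; i++)
      queue[i].payload = i;

    std::atomic<int> done{0};
    auto worker = [&]() {
      while (Item *it = claim(queue))
        done.fetch_add(it->payload >= 0);  // "execute" the claimed statement
    };
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; i++)
      workers.emplace_back(worker);
    for (auto &t : workers)
      t.join();
    std::cout << done.load() << " items processed exactly once\n";
  }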
(Just to be clear, I'm trying to understand here, the current MariaDB scheduler is obviously not perfect).
I read it that way too, very distinctly (and I meant the first part mostly :-)).
In fact, if the existing mechanism can be slightly extended, maybe it can already do much of what is needed to ensure consistency. Eg. suppose we have 3 original transactions each with two statements:
e^1_1 e^1_2 e^2_1 e^2_2 e^3_1 e^3_2
Suppose the first 3 of these are ready to commit at some point:
e^1_1 e^1_2 e^2_1
The leader can check that it doesn't try to commit only part of an original transaction.
Probably I should correct your initial perception here (otherwise I am lost as to how e^2_1 got separated). In this case you mean that the three (original) T:s (together E) are mapped into a modified
T'_1 = {e^1_1 e^1_2 e^2_1}
and some more, e.g. just one T'_2, such that
T'_2 \cup T'_1 = T_1 \cup T_2 \cup T_3 = E
since the leader can't initiate the 2-phase commit without T'_2.
To illustrate what I mean, take my earlier example, with two transactions T1 and T2 containing individual statements S1, S2, and S3.
My understanding is that on the slave, we will start 3 transactions for S1, S2, and S3, each running in its own worker threads. Right?
So suppose S1 and S2 finish and are ready to complete, but S3 is still running. Now, we cannot just commit S1 and S2, as that would leave us in an inconsistent state (in case of crash for example). So we should either commit just S1 at this point. Or we could wait for S3 and then group commit all of S1, S2, and S3 together. Right?
True. My preference is to wait for S3, which preserves the master group with all the bonuses of doing so.
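For illustration only (hypothetical names, nothing from the binlog group commit code), the leader's rule can be thought of as a barrier: every branch of the epoch reports that it is prepared, and only when the count reaches the number of branches does the leader run the single commit step for all of them.

  #include <chrono>
  #include <condition_variable>
  #include <iostream>
  #include <mutex>
  #include <thread>
  #include <vector>

  struct Epoch
  {
    int branches = 0;
    int prepared = 0;
    std::mutex m;
    std::condition_variable cv;

    // Called by a worker once its branch (e.g. S1, S2 or S3) is prepared.
    void branch_prepared()
    {
      std::unique_lock<std::mutex> lk(m);
      if (++prepared == branches)
        cv.notify_one();                  // wake the leader
    }
    // The leader commits the whole master group in one go, never a part of it.
    void leader_commit()
    {
      std::unique_lock<std::mutex> lk(m);
      cv.wait(lk, [this] { return prepared == branches; });
      std::cout << "group-committing all " << branches << " branches\n";
    }
  };

  int main()
  {
    Epoch epoch;
    epoch.branches = 3;
    std::vector<std::thread> workers;
    for (int i = 0; i < 3; i++)
      workers.emplace_back([&epoch, i] {
        // S1, S2 and S3 finish at different times.
        std::this_thread::sleep_for(std::chrono::milliseconds(10 * (i + 1)));
        epoch.branch_prepared();
      });
    epoch.leader_commit();               // waits for the slowest branch (S3)
    for (auto &t : workers)
      t.join();
  }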
Or is there some mechanism by which two different threads could run S2 and S3 in parallel, but still within a single transaction (inside InnoDB eg?) - I imagine this is not the case.
We are certainly going in that direction. Let me shy away from detailing it in an already day-long mail :-); anyway, it's better to show a patch that templates the idea.
Sorry, previously I omitted one important detail. A worker pulls the tail statement from the shared queue not only for itself. When the pulled statement is a dependent one whose parent has been, or is being, processed by another worker, that statement will be redirected to that worker.
Right - but how will you determine whether there is such a dependency? It seems very hard to do in the general case at pull time (where we haven't even parsed the statement yet)?
Not really, if we are content with pessimistic estimates. But I have already accepted your optimistic "encouragement"!
With some effort we can make use of the relay log both for recovery and as a binlog for further replication. Above I hinted at the latter. So why wouldn't the slave worker "appropriate" the master binlog images? :-)
As long as the master and slave server versions are the same, this optimization must be feasible.
Aha, I see. This is something like a built-in binlog server.
I agree this seems feasible. Surely it is a separate project from this (of course one may benefit from the other).
In a sense, this is the "natural" way to do things. Why go to all the trouble of trying to re-produce the original binlog from executing individual statements, when we already have an (almost) copy directly from the master?
It's a shame it wasn't there a long time ago already :-)
Still, this may not be as easy as that... for example, how do we interleave the multiple binlogs from multi-source replication in a way that is consistent with the actual execution order? And what about extra transactions executed directly on the slave?
Binlog groups from multiple sources, including the slave-local ones, must remain as the original groups at binary logging. This can fail only through a crash. Recovery has not been scrutinized much here; it needs a description that I owe.
"Lastly" I need to detail any possible group split and its consequence.
Many thanks for this round of discussion and I am all ears to hear more!
Andrei
Thanks,
- Kristian.
Actually, the optimistic statement scheduling is restricted to the Write_log_event / INSERT Query_log_event types. Unlike inter-transaction conflicts, which can be cured through proper victimization and retry, this time it's the intra-transaction case, where retrying won't help when the 2nd event is separated from its parent. That means the parent and the child must be in the same modified branch. Only Delete vs Delete and Write vs Write are pairwise non-conflicting in RBR.

I've briefly explained the idea of the table-level pessimistic dependency approximation done solely by the slave in the row-format case. Similarly, to cover the statement format we would have to make the master collect the involved tables, including triggered ones (MySQL does so for database names). Another approach is to switch to the conservative mode when the master group contains at least one Query-log-event. This fact can be passed with the first Gtid-log-event of the group (a master binlogging change is required). So in your example
BEGIN; /* T1 */
INSERT INTO t1 VALUES (...); /* S1 */
COMMIT;
BEGIN; /* T2 */
UPDATE t1 SET b=10 WHERE a=1; /* S2 */
UPDATE t1 SET b=20 WHERE a=1; /* S3 */
COMMIT;
the updates are doomed to be scheduled in the same branch. Cheers, Andrei
Salute, Kristian.
andrei.elkin@pp.inet.fi writes:
Notice that in the pipeline method, any attempt to start waiting for a lock granted to another branch needs to be intercepted and turned into an error. We are bound to this because the granted lock will be released only after 2PC.
So this is essentially what thd_rpl_deadlock_check() is used for. It allows the parallel replication code to see all lock waits in storage engines. Suppose T2 occurs after T1 in the binlog. If T2 waits for T1 that is fine, T2 would in any case need to wait for T1 to commit first. But if T1 waits for T2, that is not allowed, T1 must commit first. So we kill T2, allowing T1 to continue. T2 is then re-tried once T1 committed.
So I think there might be a quick'n'dirty way to get a proof-of-concept of the idea of replicating in parallel different statements in a single transaction. In rpl_parallel::do_event(), just duplicate all (transactional) GTID and commit/XID events. Then optimistic parallel replication will just run all the statements individually in parallel, and in case of any dependencies roll back and retry.
Though retry_event_group will need to be hacked as well so it knows to re-try only its own single statement in an event group. And maybe something else will break... but if it works, it could be a very quick way to do some benchmarking on various workloads to get an idea of the potential benefit, before starting to tackle all the difficult consistency requirements etc.
Just to add, this type of deadlock does not necessarily involve ordered committing as a resource. It is also possible due to various engine "features" (e.g. the "asymmetric" lock conflict observable in InnoDB). Notice that this sort of conflict is *never* about modifying the same data, merely about accessing its location.

When it comes to the pipeline method, let me show that it is actually deadlock-free when elaborated in the following way. We respect intra-sub-transaction dependencies: all of a sub-transaction's statements on the same table are scheduled to the same branch. That is, if a sub-transaction T(t1,t2) dealt with two tables, there could be at most 2 slave branches B1(t1), B2(t2). And if there are two sub-transactions T1(t1,t2), T2(t1,t2), there could be up to four: B1(t1), B2(t2), B3(t1), B4(t2). In theory B1,B3 and B2,B4 are pairwise non-conflicting, but we must suspect that in practice they do conflict. We rate as a conflict any lock request (e.g. by B3) that would not be granted because it is already granted to another branch (that would be B1).

On the *concept* level, when that happens, B3's statement indeed returns with an error, its work - the statement's (!) - is rolled back, and the statement is passed to B1. Let's *refine* that right away, as replaying the whole statement in B1 is not proven to succeed when it consists of multiple record operations. Instead of B3 erroring out, its problematic sub-statement is split out and passed on; B3 goes on to the following sub-statement or statements. B1 will find the sub-statement queued to it for execution, and it *must*^{#f1} succeed. It is implied (though not so obvious) that it is safe to extract a sub-statement from the Rows-log-event statement. The sub-statement is passed as a dynamically created single-record Rows-log-event.

To sum up, there would hardly exist any deadlock and retry. And despite the inability to "scatter" the original transaction's statements, the fair load distribution remains promising, even in apparently unfavorable cases.

The described lock-wait conflict resolution hints at viable sub-statement-level parallelization. That is, when a Rows-log-event contains a big number of costly sub-statements, and noticing that the sub-statements are independent, the event could be executed in parallel by splitting its statement into sub-statement ranges and delegating them to different branches (workers). Example: LOAD DATA's Write_log_events.
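A schematic sketch of that hand-off (standalone illustrative C++; the real splitting would of course operate on the Rows-log-event format itself): when a branch hits a row whose lock is owned by another branch, only that single-row operation is carved out and queued to the owning branch, and the rest of the statement continues where it was.

  #include <iostream>
  #include <map>
  #include <string>
  #include <vector>

  // One multi-row statement (think: one Rows-log-event) as a list of row keys.
  struct RowsStatement { std::vector<std::string> rows; };

  int main()
  {
    std::map<std::string, int> lock_owner = {{"t1:7", 1}};  // row t1:7 held by branch B1
    int self = 3;                                           // we are branch B3

    RowsStatement stmt{{"t1:5", "t1:7", "t1:9"}};
    std::vector<std::string> keep;                          // stays in B3
    std::map<int, std::vector<std::string>> handoff;        // single-record events for other branches

    for (const std::string &row : stmt.rows)
    {
      auto it = lock_owner.find(row);
      if (it != lock_owner.end() && it->second != self)
        handoff[it->second].push_back(row);  // split out for the lock-owning branch
      else
      {
        lock_owner[row] = self;              // acquire the lock and apply locally
        keep.push_back(row);
      }
    }

    std::cout << "B" << self << " applies:";
    for (const auto &r : keep) std::cout << " " << r;
    std::cout << "\n";
    for (const auto &p : handoff)
    {
      std::cout << "hand off to B" << p.first << " as a single-record event:";
      for (const auto &r : p.second) std::cout << " " << r;
      std::cout << "\n";
    }
  }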
When the statement errors out, it will be reassigned to the worker holding the lock. If the group still cannot be finished this way, we could roll back all branches and fall back to the conservative scheduler, and eventually to the sequential one.
Footnote #f1: [this remains as last resort]
So this is the idea that the scheduler will first analyse event groups for dependencies, and then schedule them in a conflict-free way to worker threads. The main problem to attack then becomes one of discovering dependencies in a scalable way.
For a long time, this is also how I thought of the problem. It's not just conservative mode, there were many ideas for doing this, some going back more than 10 years. There were also patches for this, for example a very interesting patch (not by me, should be in mailing list archive) that analyzed row-events and scheduled conflicting transactions to the same branch.
I vaguely remember this type of project for mysqlbinlog.
But eventually I've reached the conclusion that optimistic mode is just the better approach. The main reason is that it provides very good opportunities for parallel apply (and hence speedup), while _also_ providing very strong consistency guarantees for existing (and new) applications. In more detail:
1. Optimistic replication has the best potential to discover opportunities for parallel execution - it can run _anything_ in parallel that does not conflict.
It's universal, admitted.
It can even run many transactions in parallel that _do_ conflict! Even if transactions T1 and T2 have a conflict, there is still a chance that T1 will grab the conflicting lock first, and the first part of T2 can successfully replicate in parallel with T1.
2. The in-order commit provides very strong and robust consistency guarantees, both on data integrity (eg. in case of crash), and on read-consistency (a query on slave will never see a state of data that did not also exist on master). It is important not to under-estimate how important this point can be for users to have confidence in using the parallel replication.
3. When dependencies do prevent parallel execution of T1 and T2 (and we have to kill and re-try T2), optimistic parallel replication still acts as a pre-fetcher to speed up I/O-bound workloads. I/O latencies are so large as to often dwarf any CPU bottleneck, and disk systems often require multiple outstanding I/O requests to scale well.
4. Optimistic parallel replication re-uses existing mechanisms - it uses the normal query execution path in storage engines to discover conflicts, and normal thread kill/rollback is used to handle conflicts. It avoids all the complexity of trying to duplicate the exact conflict semantics in the dependency-checking algorithms. It avoids the need to be overly conservative in the dependency-checking to ensure safety (eg. it trivially is able to parallelise non-conflicting statement-based transactions on the same single table).
And in the dependency-checking approach, if you let just one corner case through that you need to handle during execution - then you end up implementing the full optimistic conflict handling _anyway_ (as I learned the hard way :-).
Specifically on the outlined paragraph, you must mean the necessity of rollback and replay in case something "unexpectedly" slips out of control? [For that part, I left the conservative method as a backup.]
There are a few limitations to optimistic parallel replication that I first thought would be quite severe. But experience seems to show they are not; here are my thoughts on what makes this so:
1. Optimistic parallel replication increases the rollback rate, sometimes dramatically so. I think the point is that this rollback also happens in parallel. Even a small increase (eg. 10%) in replication throughput can make the difference between a slave that keeps up and one that lags behind, and this is well worth the cost of increasing CPU usage with a core or two, modern servers usually have plenty cores to spare. And rollback+retry typically does not incur much extra I/O overhead (eg. pre-fetching).
2. In-order commit means that look-ahead past a long-running transaction is limited by how many pending commits we can have outstanding. But as Jean-François demonstrated in his benchmarks, optimistic parallel replication can have thousands of outstanding commits and still scale. If the slave is keeping up (and hopefully it is!), there probably are not thousands of pending transactions to work on in parallel anyway. And even approaches that do allow out-of-order commit probably need some limit on look-ahead anyway
(eg. I think MySQL uses a "checkpoint" mechanism.)
To cope with gaps in the parallelization range ('pluralization' in one of my mails, thanks to my ispell's "creative" selection :-)), you mean?
3. In-order parallel replication is limited to inter-transaction parallelism, it cannot do intra-transaction parallel apply.
It could still be approximated, though, by collapsing sub-transactions to create roughly equal-sized branches.
Thus if the workload mainly consists of large transactions with conflicts between them, speedup is limited to prefetching. Well, this is exactly the limitation that your idea is addressing! Which is one reason I found it so interesting to learn about.
I pointed out earlier that the pipeline is orthogonal to the optimistic mode; it clashes with the conservative mode's in-order commit. The optimistic mode of course fares best with sub-transactions of equal sizes, which pipelining aims to provide. So we should expect maximum performance from combining the two.
I really think parallel replication should *just*work* out-of-the-box for the "normal" user. This is 2018, AMD server CPUs come with 32 cores, the other day I saw a raspberry-pi-lookalike board with 6 cores. A decent, robust implementation of parallel replication needs to be enabled by default. The user shouldn't need to review all of their applications to check if they can tolerate different read consistencies from those on the master, let alone re-write them to spread queries over multiple tables or databases.
The optimistic parallel replication is the first method I have seen that can properly satisfy this - give decent speedup on most real-life workloads, in a way that is transparent to applications. Users can just enable optimistic parallel replication, and if it fails in some way, it is a bug that should be fixed.
(How about enabling it by default in 10.4, BTW?)
Personally I have no objection. It's quite stable already.
Of course, 10.3 MariaDB parallel replication does not solve all problems, far from it!
I agree that better dependency analysis in the scheduler can be usefully combined with optimistic parallel replication, not just the conservative mode.
For example, DDL currently stalls optimistic parallel replication completely, and the user must manually set GTID domain id and control dependencies to replicate DDL in parallel with other transactions. Optimistic also does not do much for non-transactional DML. Improved dependency tracking and scheduling could improve this.
Analysis could also determine events that are very likely to conflict, and this way reduce unnecessary rollback. There is the "optimistic" vs. "aggressive" --slave-parallel-mode distinction to enable/disable such heuristics, but few are currently implemented.
Right, dependency tracking on the master is a follow-up in that direction.
While the feature, as you put it in the comments, is largely about the optimistic schedulers, it still applies to some corner cases in the conservative one. You mention a 'different optimizer decision', which hints at SBR specifics.
Yes, the thd_rpl_deadlock_check() feature was originally implemented to fix various corner cases in conservative parallel replication. I think this mostly/only occurs when commit order on the slave is forced to be the same as on master.
The "different optimiser decision" is the theoretical case where the optimiser chooses to use an index on the master but a table scan on the slave. Or chooses different indexes on master and slave (could also apply to RBR perhaps). In this case different locks can be taken on master and slave and it seems likely that order-conflicts could result.
[ack]
Here is how it would go the simple way, which must make sense because, just statistically, S2->S3 can't be a common pattern.
The concrete example was silly, of course, updating the same row twice in a single transaction. But for example foreign key conflicts will be common. Eg.:
BEGIN;
INSERT INTO parent_table(key,val) VALUES (10, "hello");
INSERT INTO child_table(key, parent_key, val) VALUES (1, 10, "xizzy");
INSERT INTO child_table(key, parent_key, val) VALUES (2, 10, "red");
INSERT INTO child_table(key, parent_key, val) VALUES (3, 10, "cube");
COMMIT;
where parent_key is a foreign key referencing "key" in parent_table.
The FK dependency type is something of a challenge, as it applies to RBR (the pipeline's main client). Even though a few ideas exist around it, initially it can be handled by marking the master group to prevent pipelining on the slave.
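A first-cut sketch of that marking (standalone illustrative C++; in reality the check would run on the master, which has the FK metadata at hand): if any table modified in the group has an FK parent that is also modified within the same group, the group gets flagged - for instance in its first Gtid-log-event - and the slave applies it without pipelining.

  #include <iostream>
  #include <map>
  #include <set>
  #include <string>
  #include <vector>

  int main()
  {
    // FK metadata: child table -> parent table it references.
    std::map<std::string, std::string> fk_parent = {{"child_table", "parent_table"}};

    // Tables modified by the sub-transactions of one master binlog group.
    std::vector<std::set<std::string>> group = {
        {"parent_table", "child_table"},   // the example transaction above
        {"t1"}};

    std::set<std::string> modified;
    for (const auto &trx : group)
      modified.insert(trx.begin(), trx.end());

    bool no_pipeline = false;
    for (const auto &table : modified)
    {
      auto it = fk_parent.find(table);
      if (it != fk_parent.end() && modified.count(it->second))
        no_pipeline = true;                // parent and child touched in the same group
    }
    std::cout << (no_pipeline ? "mark group: no pipelining on the slave"
                              : "group is fine for pipelining") << "\n";
  }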
On the other hand, the pulling is voluntary, and assuming the queue always has something in it, the worker stays busy non-stop at least until prepare (perhaps we could involve it in that as well). An optimistic version of this method would be to leave the prepared modified branch in the hands of the leader and switch to the next group's events.
Right.
In fact, this is a limitation of the existing scheduling algorithm that already has an effect which is seen for multi-source replication and for replication with multiple GTID domain ids. In this case it is possible for the SQL driver thread to completely fill up the worker queues with events from one source / one domain. And the other sources/domains will stall until the first one catches up. Which is not great. And thus we have the work-around with slave_domain_parallel_threads which is rather crude and inflexible.
A more flexible scheduler would also be needed to optimise the thread usage related to wait_for_prior_commit().
Indeed. This also remains in the optimistic pipelining.
Currently this method blocks the worker thread, and thus for long lookahead in optimistic parallel replication, large number of worker threads are needed, most of them idle as you also pointed out. It might be possible to leave the THD in the group commit queue and let the thread switch to work on another transaction meanwhile. This is something I never really looked into, but I think there are similar mechanisms already existing for thread pool operation. Not sure how much can be gained nowadays from avoiding large number of idle threads, but this again would require a better scheduling algorithm.
Or is there some mechanism by which two different threads could run S2 and S3 in parallel, but still within a single transaction (inside InnoDB eg?) - I imagine this is not the case.
We are certainly going in that direction. Let me shy away from detailing it in an already day-long mail :-); anyway, it's better to show a patch that templates the idea.
Right, running a single transaction (or even single statement) on multiple threads is also something interesting to look at - also outside of replication of course, but yes, probably best left for another discussion ;-)
Now I think I'm fully equipped to write it all down in MDEV-16404, so we'll be discussing specific items and stages. Thanks so far! Andrei
- Kristian.