[Maria-developers] Request for a discussion: A fine-grained concurrent ring buffer mode for IO_CACHE
Hello, dear MariaDB community!

I am glad to present the design for concurrency improvements to IO_CACHE's SEQ_READ_APPEND mode. The Jira task is MDEV-24676.

A student, Vladislav Kakurin, applied to GSoC for this task and was accepted, so he will be implementing it this summer.

The discussion is very important here: I could have made mistakes, and Vladislav could miss them, yet the success of the task may depend on it. We hope to have the multi-producer mode implemented by the end of GSoC.

So please don't hesitate to ask for clarifications.

Note that the reader/writer and consumer/producer pairs are used interchangeably in the text.

The text is attached as a PDF, but for convenient quoting it is also included in the email body with reduced markup and no illustrations.

Sincerely,
Nikita Malyavin
Software Engineer of MariaDB

====================================================================
A fine-grained concurrent ring buffer mode for IO_CACHE

1. Rationale.
---------------------------------------------------
The current implementation of IO_CACHE’s SEQ_READ_APPEND mode is coarse-grained on its write buffer: every read and every write is protected by append_buffer_lock.

    int _my_b_seq_read(IO_CACHE *info, ...)
    {
      lock_append_buffer(info);
      ... // read logic
      unlock_append_buffer(info);
      return Count ? 1 : 0;
    }

    int my_b_append(IO_CACHE *info, ...)
    {
      lock_append_buffer(info);
      ... // append logic
      unlock_append_buffer(info);
      return 0;
    }

Although the separate read buffer is read-only and is therefore accessed wait-free, the write buffer can suffer contention with medium-sized transactions.

The design described hereafter aims to solve this issue. An extension for a parallel multi-producer workflow is additionally provided, an API extension to support the multi-producer approach is proposed, and multi-consumer operation is discussed.

2. The single-producer, single-consumer case.
---------------------------------------------------
Idea.
The memcpy operations of the consumer and the producer never overlap, so they can be performed without holding a lock.

Overflow and emptiness.
We cannot begin writing into an area that is still being read. Therefore, the reader should not update the pointers before it finishes reading. This means that we should lock at the beginning to read the metadata atomically, and again at the end to write back the new reader state.

The reverse also holds: we cannot read from an area that is still being written, so such a read should finish with an EMPTY error (currently _my_b_seq_read just returns 1).

When we reach a “buffer is full” condition, we can flip the read and write (append) buffers if we were reading from the append buffer. Otherwise, the append buffer is flushed.

The algorithm.
The following pseudocode describes the single-consumer, single-producer approach. It is assumed that reading from the read buffer is handled in the usual way. io->total_size and io->read_buffer are assumed to be accessed atomically.

    io_err_t read(IO_CACHE *io, uchar *buffer, size_t sz)
    {
      if (sz > io->total_size)
        return E_IO_EMPTY;

      size_t sz_read = 0;
      uchar *read_buffer = io->read_buffer;   // unlocked access
      if (io->read_pos points to read_buffer)
        sz_read = read_from_read_buffer(io, buffer, sz);
      buffer += sz_read;
      sz -= sz_read;
      io->total_size -= sz_read;
      if (sz == 0)
        return 0;

      // else copy from append buffer
      lock(io->append_buffer_lock);
      // copy the metadata to local variables
      uchar *read_pos = io->read_pos;
      read_buffer = io->read_buffer;
      uchar *append_start_pos = io->append_start_pos;
      size_t append_size = io->append_size;
      uchar *append_pos = io->append_pos;
      // etc, if needed
      unlock(io->append_buffer_lock);

      read from append buffer;

      lock(io->append_buffer_lock);
      // update the variables
      io->append_size -= sz;
      io->append_start_pos += sz;
      if (io->append_start_pos >= io->append_buf + io->cache_size)
        io->append_start_pos -= io->cache_size;
      unlock(io->append_buffer_lock);
      io->total_size -= sz;
      return 0;
    }

The first part of read() tries to read from the read-only buffer. If that buffer is empty, it moves on to the volatile append buffer.

All the metadata is copied to the stack in the first critical section, before the data is copied. It is written back in the second critical section, after the data is copied.

    io_err_t write(IO_CACHE *io, uchar *buffer, size_t sz)
    {
      lock(io->append_buffer_lock);
      if (append_buffer is full and io->total_size <= io->append_size)
        swap(io->append_buffer, io->read_buffer);
      else
        flush the append buffer if needed;
      write to disk directly, if the data is too large;
      uchar *write_pos = io->write_pos;
      unlock(io->append_buffer_lock);

      write to append buffer;

      lock(io->append_buffer_lock);
      io->write_pos = new_write_pos;
      unlock(io->append_buffer_lock);
      io->total_size += sz;
      return 0;
    }

An important note here is that we access io->read_buffer in the reader’s thread without the lock (these accesses are marked bold in the attached PDF). However, this access happens only once, at the beginning, and is safe: only the writer changes read_buffer.
The writer can change it only once during one read(). If io->read_buffer is treated as read-only, it will not flip again and stays consistent until io->total_size is changed:

    io->total_size -= sz_read;

Then the lock is taken. It should be fine to read from a flipped buffer at that stage.

3. Multi-producer concurrency
---------------------------------------------------
Idea.
Writes start from io->write_start, which is updated immediately. Reads are possible only up to io->read_end, which is updated as soon as the writes are finished.

Medium-grained approach.
io->write_start is updated immediately to allow parallel writes. However, we cannot update io->read_end immediately after this thread’s write ends, because earlier writes can still be in progress. We should wait for them, i.e. we wait while (io->read_end != local_write_start).

Algorithm (medium-grained).
The medium-grained approach modifies the write() function as follows (the changed lines and locks are marked bold in the attached PDF):

    io_err_t write(IO_CACHE *io, uchar *buffer, size_t sz)
    {
      lock(io->append_buffer_lock);
      if (buffer flip or flush is needed)
        wait until all the writes are finished;
      if (append_buffer is full && io->write_total_size <= io->append_size)
        swap(io->append_buffer, io->read_buffer);
      else
        flush the append buffer if needed;
      write to disk directly, if the data is too large;
      uchar *local_write_start = io->write_start;
      io->write_total_size += sz;
      io->write_start += sz;
      if (io->write_start >= io->append_buffer + io->cache_size)
        io->write_start -= io->cache_size;
      unlock(io->append_buffer_lock);

      write to append buffer;

      lock(io->write_event_lock);
      while (local_write_start != io->read_end)
        cond_wait(io->write_event, io->write_event_lock);
      unlock(io->write_event_lock);

      lock(io->append_buffer_lock);
      io->read_end = new_read_end;
      unlock(io->append_buffer_lock);
      cond_signal(io->write_event);
      io->total_size += sz;
      return 0;
    }

The read function needs mostly cosmetic changes.

Fine graining.
The writers still wait for each other to finish. The approach described here defers the waiting through a helping pattern by introducing progress slots.

Each time a writer begins, it allocates a slot in a dedicated fixed-size array. When the writer finishes its job, it checks whether it is the leftmost one (relative to its read_end value). If it is, it updates read_end for itself and for all the consecutive writers that have already finished.

Slot allocation is controlled by a semaphore to prevent overflow. Therefore, only a fixed number of producers can work simultaneously.

The slot array is made of elements of the private cache_slot_t structure:

    struct cache_slot_t
    {
      bool vacant: 1;
      bool finished: 1;
      uint next: size_bits(uint) - 2;
      uint pos;
    };

A slot is acquired whenever a write begins, by searching the array for a cell with vacant=1. When one is found, vacant=0 and finished=0 are set. The last_slot variable holds the slot index of the latest write: slots[last_slot].next is set to the new index, and last_slot itself is updated.

The following example demonstrates how the slots work:

+----------------------------------------------------------------+
|            |   write1   |   write2   |   write3   |            |
+----------------------------------------------------------------+
             A            B            C            D

+---------------------------------------------------+
| vacant=0   | vacant=1   | vacant=0   | vacant=0   |
| finished=0 |            | finished=1 | finished=1 |
| next=2     |            | next=3     | next=?     |
| pos=A      |            | pos=B      | pos=C      |
+---------------------------------------------------+

Three writes were started in parallel. write2 and write3 are finished, but write1 is still running. When it finishes, it will hop through slot.next while vacant==0 and finished==1 and pos != io->write_start. Therefore, read_end will be updated to C if no other write begins in parallel.

If another write begins in parallel before write1 finishes, it allocates slots[1] and sets pos=D.
slots[3].next would be set to 1, and last_slot would be updated from 3 to 1.

The expected complexity of the slot run-through is O(1). For slot acquisition the same bound is not as obvious, and no effort was spent on proving it (it is only obvious that it is O(slots)).

4. Arbitrary data sources support
---------------------------------------------------
A widespread use case is pouring data from another IO_CACHE source (such as a statement or transaction cache). The operation may require several consecutive write() calls under an external lock:

    lock(write_lock);
    uchar buffer[SIZE];
    while (cache_out is not empty)
    {
      read(cache_out, buffer, SIZE);
      write(cache_in, buffer, SIZE);
    }
    unlock(write_lock);

This case defeats all of the parallel design described above. However, let’s make API changes that allow blocks of a known size to be written in parallel:

    /**
      Allocates the slot of a requested size for a writer.
      Returns new slot id.
    */
    slot_id_t append_allocate(IO_CACHE*, size_t block_size);

    /**
      Frees the slot and propagates the data to be available for reading.
    */
    void append_commit(IO_CACHE*, slot_id_t);

These two functions just decompose our write() function: append_allocate would include the first critical section and append_commit would include the second one.

The use case changes slightly (the slot is allocated on the destination cache, cache_in, for the amount of data held in the source, cache_out):

    slot_id_t slot = append_allocate(cache_in, append_tell(cache_out));
    uchar buffer[SIZE];
    while (cache_out is not empty)
    {
      read(cache_out, buffer, SIZE);
      write(cache_in, buffer, SIZE);
    }
    append_commit(cache_in, slot);

5. Multi-consumerness
---------------------------------------------------
We currently have no cases with several readers working in parallel in SEQ_READ_APPEND mode. It is only used by the replication thread to read out the log, where it is delegated to a dedicated worker.

The first problem is that parallel readout would require additional coordination -- the order of event application can be important.
Another problem is that variable-sized blocks require at least two consecutive reads if the structure is not known. If the length is stored, a block can be read out with exactly two reads (the first reads the length, the second reads the body).

The slot allocation strategy can be applied, and an API similar to the new write API can be added:

    /** Lock the cache and allocate the read slot */
    slot_id_t read_allocate_lock(IO_CACHE*);

    /** Allocate a read zone of the requested size and unlock the cache */
    void read_allocate_unlock(IO_CACHE*, slot_id_t, size_t size);

    /** Finish reading; deallocate the read slot */
    void read_commit(IO_CACHE*, slot_id_t);

The reading API needs one function more than the writing API -- the allocation is split into two phases: a locking phase (to compute the block length) and the actual requesting phase.

This approach has several disadvantages:
1. The read buffer access is no longer lock-free.
2. read_allocate_lock leaves the IO_CACHE in a locked state, which can potentially be misused.

Additionally, two SX locks can be used (one for readers and one for writers) for extra parallelism.
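
To make the progress-slot bookkeeping from section 3 easier to discuss, here is a minimal single-threaded C sketch of it. It is an illustration under assumptions, not the proposed implementation: the names slot_ring_t, ring_init, slot_begin and slot_finish are hypothetical, each slot stores explicit start and end offsets instead of a single pos, offsets grow linearly (no ring wrap-around), and all locking and the semaphore are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SLOT_COUNT 4
#define NO_SLOT    SLOT_COUNT  /* sentinel: no such slot */

/* Hypothetical model of cache_slot_t with plain members. */
typedef struct {
  bool vacant;
  bool finished;
  unsigned next;   /* slot of the write that started right after this one */
  size_t start;    /* first offset of this write (assumption) */
  size_t end;      /* one past the last offset of this write (assumption) */
} slot_t;

typedef struct {
  slot_t slots[SLOT_COUNT];
  unsigned last_slot;  /* slot of the latest started write, or NO_SLOT */
  size_t write_start;  /* where the next write begins */
  size_t read_end;     /* data below this offset is visible to the reader */
} slot_ring_t;

void ring_init(slot_ring_t *r)
{
  for (unsigned i = 0; i < SLOT_COUNT; i++) {
    r->slots[i].vacant = true;
    r->slots[i].finished = false;
    r->slots[i].next = NO_SLOT;
    r->slots[i].start = r->slots[i].end = 0;
  }
  r->last_slot = NO_SLOT;
  r->write_start = r->read_end = 0;
}

/* Reserve sz bytes for a writer. Returns the slot index,
   or NO_SLOT if every slot is busy. */
unsigned slot_begin(slot_ring_t *r, size_t sz)
{
  assert(sz > 0);
  for (unsigned i = 0; i < SLOT_COUNT; i++) {
    slot_t *s = &r->slots[i];
    if (!s->vacant)
      continue;
    s->vacant = false;
    s->finished = false;
    s->next = NO_SLOT;
    s->start = r->write_start;
    s->end = s->start + sz;
    r->write_start = s->end;          /* lets the next writer start at once */
    if (r->last_slot != NO_SLOT)
      r->slots[r->last_slot].next = i;
    r->last_slot = i;
    return i;
  }
  return NO_SLOT;
}

/* Mark a write as finished. If it is the leftmost unpublished write,
   publish it and every consecutive finished write after it.
   Returns the resulting read_end. */
size_t slot_finish(slot_ring_t *r, unsigned id)
{
  assert(id < SLOT_COUNT && !r->slots[id].vacant);
  r->slots[id].finished = true;
  if (r->slots[id].start != r->read_end)
    return r->read_end;               /* an earlier write is still running */
  unsigned i = id;
  while (i != NO_SLOT && !r->slots[i].vacant && r->slots[i].finished) {
    slot_t *s = &r->slots[i];
    r->read_end = s->end;             /* help: publish this finished write */
    s->vacant = true;
    if (r->last_slot == i)
      r->last_slot = NO_SLOT;
    i = s->next;
  }
  return r->read_end;
}
```

With three writes of 10, 20 and 30 bytes started in that order, finishing the second and the third leaves read_end at 0, and finishing the first then moves it to 60 in a single run through the slots. In a threaded version both functions would presumably run inside the critical sections described in section 3, with the memcpy happening between them without the lock.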
Hi Nikita, On Tue, May 18, 2021 at 04:06:25PM +0300, Nikita Malyavin wrote:
> I am glad to present the design for IO_CACHE's SEQ_READ_APPEND mode concurrency improvements. Jira task is MDEV-24676.
So, this is targeting only the scenario with multiple producers and one consumer?

I see SEQ_READ_APPEND is only used for Relay Log on the slave. Afaiu, the relay log has only one producer, the network io thread. Do I miss something here? Please clarify.

BR
 Sergei
--
Sergei Petrunia, Software Developer
MariaDB Corporation | Skype: sergefp | Blog: http://petrunia.net
Hello Sergei,

On Wed, 19 May 2021 at 16:22, Sergey Petrunia <sergey@mariadb.com> wrote:
> Hi Nikita,
> So, this is targeting only the scenario with multiple producers and one consumer?

Mostly, yes. Because of the log format the transactions can't be read out in parallel. (Or does GTID_EVENT or something else store the length of transaction in the log?)
> I see SEQ_READ_APPEND is only used for Relay Log on the slave. Afaiu, the relay log has only one producer, the network io thread. Do I miss something here? Please clarify.
Right, so only the single-producer case applies to replication. We have one scenario on ES with multiple producers though.
-- Yours truly, Nikita Malyavin
participants (2)
- Nikita Malyavin
- Sergey Petrunia