[Maria-developers] bzr commit into Mariadb 5.2, with Maria 2.0:maria/5.2 branch (sanja:2725)
#At lp:maria/5.2 2725 sanja@askmonty.org 2009-07-07 Group commit and small optimisation of flush log (in case of page without CRC and sector protection) added. (for review) modified: mysql-test/suite/maria/r/maria3.result storage/maria/ha_maria.cc storage/maria/ma_init.c storage/maria/ma_loghandler.c storage/maria/ma_loghandler.h per-file messages: mysql-test/suite/maria/r/maria3.result New variables added storage/maria/ha_maria.cc New variables for group commit and routines to control them added. storage/maria/ma_init.c Service thread stop (if it is present) storage/maria/ma_loghandler.c 2 types of group commit added. If page is not protetcted by CRC or sector protection (i.e. we do not change header when add data to the page) we do not copy beginning of the page during forcing the buffer where the page is last to close. storage/maria/ma_loghandler.h Routines to control group commit added. === modified file 'mysql-test/suite/maria/r/maria3.result' --- a/mysql-test/suite/maria/r/maria3.result 2009-02-19 09:01:25 +0000 +++ b/mysql-test/suite/maria/r/maria3.result 2009-07-07 00:37:23 +0000 @@ -264,6 +264,8 @@ Variable_name Value maria_block_size 8192 maria_checkpoint_interval 30 maria_force_start_after_recovery_failures 0 +maria_group_commit none +maria_group_commit_rate 800 maria_log_file_size 4294959104 maria_log_purge_type immediate maria_max_sort_file_size 9223372036853727232 @@ -285,6 +287,7 @@ Maria_pagecache_read_requests # Maria_pagecache_reads # Maria_pagecache_write_requests # Maria_pagecache_writes # +Maria_transaction_log_syncs # create table t1 (b char(0)); insert into t1 values(NULL),(""); select length(b) from t1; === modified file 'storage/maria/ha_maria.cc' --- a/storage/maria/ha_maria.cc 2009-05-19 09:28:05 +0000 +++ b/storage/maria/ha_maria.cc 2009-07-07 00:37:23 +0000 @@ -101,22 +101,40 @@ TYPELIB maria_translog_purge_type_typeli array_elements(maria_translog_purge_type_names) - 1, "", maria_translog_purge_type_names, NULL }; + +/* transactional log directory sync */ const char *maria_sync_log_dir_names[]= { "NEVER", "NEWFILE", "ALWAYS", NullS }; - TYPELIB maria_sync_log_dir_typelib= { array_elements(maria_sync_log_dir_names) - 1, "", maria_sync_log_dir_names, NULL }; +/* transactional log group commit */ +const char *maria_group_commit_names[]= +{ + "none", "hard", "soft", NullS +}; +TYPELIB maria_group_commit_typelib= +{ + array_elements(maria_group_commit_names) - 1, "", + maria_group_commit_names, NULL +}; + /** Interval between background checkpoints in seconds */ static ulong checkpoint_interval; static void update_checkpoint_interval(MYSQL_THD thd, struct st_mysql_sys_var *var, void *var_ptr, const void *save); +static void update_maria_group_commit(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save); +static void update_maria_group_commit_rate(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save); /** After that many consecutive recovery failures, remove logs */ static ulong force_start_after_recovery_failures; static void update_log_file_size(MYSQL_THD thd, @@ -163,6 +181,24 @@ static MYSQL_SYSVAR_ULONG(log_file_size, NULL, update_log_file_size, TRANSLOG_FILE_SIZE, TRANSLOG_MIN_FILE_SIZE, 0xffffffffL, TRANSLOG_PAGE_SIZE); +static MYSQL_SYSVAR_ENUM(group_commit, maria_group_commit, + PLUGIN_VAR_RQCMDARG, + "Specifies maria group commit mode. 
" + "Possible values are \"none\" (no group commit), " + "\"hard\" (with waiting to actual commit), " + "\"soft\" (no wait for commit (DANGEROUS!!!))", + NULL, update_maria_group_commit, + TRANSLOG_GCOMMIT_NONE, &maria_group_commit_typelib); + +static MYSQL_SYSVAR_ULONG(group_commit_rate, maria_group_commit_rate, + PLUGIN_VAR_RQCMDARG, + "Number of commits per 100 seconds. (in other words one commit for" + "every 100/maria_group_commit_rate second). 0 stands for no waiting" + "for other threads to come and do a commit in \"hard\" mode and no" + " sync()/commit at all in \"soft\" mode. Option has only an effect" + "if maria_group_commit is used", + NULL, update_maria_group_commit_rate, 800, 0, UINT_MAX, 1); + static MYSQL_SYSVAR_ENUM(log_purge_type, log_purge_type, PLUGIN_VAR_RQCMDARG, "Specifies how maria transactional log will be purged. " @@ -3254,6 +3290,8 @@ static struct st_mysql_sys_var* system_v MYSQL_SYSVAR(block_size), MYSQL_SYSVAR(checkpoint_interval), MYSQL_SYSVAR(force_start_after_recovery_failures), + MYSQL_SYSVAR(group_commit), + MYSQL_SYSVAR(group_commit_rate), MYSQL_SYSVAR(page_checksum), MYSQL_SYSVAR(log_dir_path), MYSQL_SYSVAR(log_file_size), @@ -3284,6 +3322,97 @@ static void update_checkpoint_interval(M } /** + @brief Updates group commit mode +*/ + +static void update_maria_group_commit(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + ulong value= (ulong)*((long *)var_ptr); + DBUG_ENTER("update_maria_group_commit"); + DBUG_PRINT("enter", ("old value: %lu new value %lu rate %lu", + value, (ulong)(*(long *)save), + maria_group_commit_rate)); + /* old value */ + switch (value) { + case TRANSLOG_GCOMMIT_NONE: + break; + case TRANSLOG_GCOMMIT_HARD: + translog_hard_group_commit(FALSE); + break; + case TRANSLOG_GCOMMIT_SOFT: + translog_soft_sync(FALSE); + if (maria_group_commit_rate) + translog_soft_sync_end(); + break; + default: + DBUG_ASSERT(0); /* impossible */ + } + value= *(ulong *)var_ptr= (ulong)(*(long *)save); + translog_sync(); + /* new value */ + switch (value) { + case TRANSLOG_GCOMMIT_NONE: + break; + case TRANSLOG_GCOMMIT_HARD: + translog_hard_group_commit(TRUE); + break; + case TRANSLOG_GCOMMIT_SOFT: + translog_soft_sync(TRUE); + /* variable change made under global lock so we can just read it */ + if (maria_group_commit_rate) + translog_soft_sync_start(); + break; + default: + DBUG_ASSERT(0); /* impossible */ + } + DBUG_VOID_RETURN; +} + +/** + @brief Updates group commit rate +*/ + +static void update_maria_group_commit_rate(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + ulong new_value= (ulong)*((long *)save); + ulong *value_ptr= (ulong*) var_ptr; + DBUG_ENTER("update_maria_group_commit_rate"); + DBUG_PRINT("enter", ("old value: %lu new value %lu group commit %lu", + *value_ptr, new_value, maria_group_commit)); + if (new_value && + ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value + + TRANSLOG_RATE_BASE / 2) / + TRANSLOG_RATE_BASE) == 0) + new_value= 0; /* protection against too small value */ + + /* variable change made under global lock so we can just read it */ + switch (maria_group_commit) { + case TRANSLOG_GCOMMIT_NONE: + *value_ptr= new_value; + translog_set_group_commit_rate(new_value); + break; + case TRANSLOG_GCOMMIT_HARD: + *value_ptr= new_value; + translog_set_group_commit_rate(new_value); + break; + case TRANSLOG_GCOMMIT_SOFT: + if (*value_ptr) + translog_soft_sync_end(); + translog_set_group_commit_rate(new_value); + if ((*value_ptr= new_value)) + 
translog_soft_sync_start(); + break; + default: + DBUG_ASSERT(0); /* impossible */ + } + DBUG_VOID_RETURN; +} + +/** @brief Updates the transaction log file limit. */ @@ -3305,6 +3434,7 @@ static SHOW_VAR status_variables[]= { {"Maria_pagecache_reads", (char*) &maria_pagecache_var.global_cache_read, SHOW_LONGLONG}, {"Maria_pagecache_write_requests", (char*) &maria_pagecache_var.global_cache_w_requests, SHOW_LONGLONG}, {"Maria_pagecache_writes", (char*) &maria_pagecache_var.global_cache_write, SHOW_LONGLONG}, + {"Maria_transaction_log_syncs", (char*) &translog_syncs, SHOW_LONGLONG}, {NullS, NullS, SHOW_LONG} }; === modified file 'storage/maria/ma_init.c' --- a/storage/maria/ma_init.c 2008-10-09 20:03:54 +0000 +++ b/storage/maria/ma_init.c 2009-07-07 00:37:23 +0000 @@ -82,6 +82,11 @@ void maria_end(void) maria_inited= maria_multi_threaded= FALSE; ft_free_stopwords(); ma_checkpoint_end(); + if (translog_status == TRANSLOG_OK) + { + translog_soft_sync_end(); + translog_sync(); + } if ((trid= trnman_get_max_trid()) > max_trid_in_control_file) { /* === modified file 'storage/maria/ma_loghandler.c' --- a/storage/maria/ma_loghandler.c 2009-05-19 09:28:05 +0000 +++ b/storage/maria/ma_loghandler.c 2009-07-07 00:37:23 +0000 @@ -18,6 +18,7 @@ #include "ma_blockrec.h" /* for some constants and in-write hooks */ #include "ma_key_recover.h" /* For some in-write hooks */ #include "ma_checkpoint.h" +#include "ma_servicethread.h" /* On Windows, neither my_open() nor my_sync() work for directories. @@ -47,6 +48,15 @@ #include <m_ctype.h> #endif +/** @brief protects checkpoint_in_progress */ +static pthread_mutex_t LOCK_soft_sync; +/** @brief for killing the background checkpoint thread */ +static pthread_cond_t COND_soft_sync; +/** @brief control structure for checkpoint background thread */ +static MA_SERVICE_THREAD_CONTROL soft_sync_control= + {THREAD_DEAD, FALSE, &LOCK_soft_sync, &COND_soft_sync}; + + /* transaction log file descriptor */ typedef struct st_translog_file { @@ -124,10 +134,20 @@ struct st_translog_buffer /* Previous buffer offset to detect it flush finish */ TRANSLOG_ADDRESS prev_buffer_offset; /* + If the buffer was forced to close it save value of its horizon + otherwise LSN_IMPOSSIBLE + */ + TRANSLOG_ADDRESS pre_force_close_horizon; + /* How much is written (or will be written when copy_to_buffer_in_progress become 0) to this buffer */ translog_size_t size; + /* + How much data was skipped during moving page from previous buffer + to this one (it is optimisation of forcing buffer to finish + */ + uint skipped_data; /* File handler for this buffer */ TRANSLOG_FILE *file; /* Threads which are waiting for buffer filling/freeing */ @@ -304,6 +324,7 @@ struct st_translog_descriptor */ pthread_mutex_t log_flush_lock; pthread_cond_t log_flush_cond; + pthread_cond_t new_goal_cond; /* Protects changing of headers of finished files (max_lsn) */ pthread_mutex_t file_header_lock; @@ -344,13 +365,39 @@ static struct st_translog_descriptor log ulong log_purge_type= TRANSLOG_PURGE_IMMIDIATE; ulong log_file_size= TRANSLOG_FILE_SIZE; +/* sync() of log files directory mode */ ulong sync_log_dir= TRANSLOG_SYNC_DIR_NEWFILE; +ulong maria_group_commit= TRANSLOG_GCOMMIT_NONE; +ulong maria_group_commit_rate= 0; /* Marker for end of log */ static uchar end_of_log= 0; #define END_OF_LOG &end_of_log +/** + Switch for "soft" sync (no real sync() but periodical sync by service + thread) +*/ +static volatile my_bool soft_sync= FALSE; +/** + Switch for "hard" group commit mode +*/ +static volatile my_bool 
hard_group_commit= FALSE; +/** + File numbers interval which have to be sync() +*/ +static uint32 soft_sync_min= 0; +static uint32 soft_sync_max= 0; +/** + stores interval in nanoseconds/TRANSLOG_RATE_BASE (to + fit into uint32) +*/ +static uint32 group_commit_wait= 0; enum enum_translog_status translog_status= TRANSLOG_UNINITED; +ulonglong translog_syncs= 0; /* Number of sync()s */ + +/* time of last flush */ +static ulonglong flush_start= 0; /* chunk types */ #define TRANSLOG_CHUNK_LSN 0x00 /* 0 chunk refer as LSN (head or tail */ @@ -980,12 +1027,17 @@ static TRANSLOG_FILE *get_logfile_by_num static TRANSLOG_FILE *get_current_logfile() { TRANSLOG_FILE *file; + DBUG_ENTER("get_current_logfile"); rw_rdlock(&log_descriptor.open_files_lock); + DBUG_PRINT("info", ("max_file: %lu min_file: %lu open_files: %lu", + (ulong) log_descriptor.max_file, + (ulong) log_descriptor.min_file, + (ulong) log_descriptor.open_files.elements)); DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 == log_descriptor.open_files.elements); file= *dynamic_element(&log_descriptor.open_files, 0, TRANSLOG_FILE **); rw_unlock(&log_descriptor.open_files_lock); - return (file); + DBUG_RETURN(file); } uchar NEAR maria_trans_file_magic[]= @@ -1069,6 +1121,7 @@ static my_bool translog_write_file_heade static my_bool translog_max_lsn_to_header(File file, LSN lsn) { uchar lsn_buff[LSN_STORE_SIZE]; + my_bool rc; DBUG_ENTER("translog_max_lsn_to_header"); DBUG_PRINT("enter", ("File descriptor: %ld " "lsn: (%lu,0x%lx)", @@ -1077,11 +1130,13 @@ static my_bool translog_max_lsn_to_heade lsn_store(lsn_buff, lsn); - DBUG_RETURN(my_pwrite(file, lsn_buff, - LSN_STORE_SIZE, - (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE), - log_write_flags) != 0 || - my_sync(file, MYF(MY_WME)) != 0); + if (!(rc= (my_pwrite(file, lsn_buff, + LSN_STORE_SIZE, + (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE), + log_write_flags) != 0 || + my_sync(file, MYF(MY_WME)) != 0))) + translog_syncs++; + DBUG_RETURN(rc); } @@ -1423,7 +1478,9 @@ LSN translog_get_file_max_lsn_stored(uin static my_bool translog_buffer_init(struct st_translog_buffer *buffer, int num) { DBUG_ENTER("translog_buffer_init"); - buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE; + buffer->pre_force_close_horizon= + buffer->prev_last_lsn= buffer->last_lsn= + LSN_IMPOSSIBLE; DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx", (ulong) buffer)); @@ -1435,6 +1492,7 @@ static my_bool translog_buffer_init(stru memset(buffer->buffer, TRANSLOG_FILLER, TRANSLOG_WRITE_BUFFER); /* Buffer size */ buffer->size= 0; + buffer->skipped_data= 0; /* cond of thread which is waiting for buffer filling */ if (pthread_cond_init(&buffer->waiting_filling_buffer, 0)) DBUG_RETURN(1); @@ -1489,7 +1547,10 @@ static my_bool translog_close_log_file(T TODO: sync only we have changed the log */ if (!file->is_sync) + { rc= my_sync(file->handler.file, MYF(MY_WME)); + translog_syncs++; + } rc|= my_close(file->handler.file, MYF(MY_WME)); my_free(file, MYF(0)); return test(rc); @@ -2044,7 +2105,8 @@ static void translog_start_buffer(struct (ulong) LSN_OFFSET(log_descriptor.horizon), (ulong) LSN_OFFSET(log_descriptor.horizon))); DBUG_ASSERT(buffer_no == buffer->buffer_no); - buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE; + buffer->pre_force_close_horizon= + buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE; DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx", (ulong) buffer)); buffer->offset= log_descriptor.horizon; @@ -2052,6 +2114,7 @@ static void 
translog_start_buffer(struct buffer->file= get_current_logfile(); buffer->overlay= 0; buffer->size= 0; + buffer->skipped_data= 0; translog_cursor_init(cursor, buffer, buffer_no); DBUG_PRINT("info", ("file: #%ld (%d) init cursor #%u: 0x%lx " "chaser: %d Size: %lu (%lu)", @@ -2523,6 +2586,7 @@ static my_bool translog_buffer_flush(str TRANSLOG_ADDRESS offset= buffer->offset; TRANSLOG_FILE *file= buffer->file; uint8 ver= buffer->ver; + uint skipped_data; DBUG_ENTER("translog_buffer_flush"); DBUG_PRINT("enter", ("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu", @@ -2557,6 +2621,8 @@ static my_bool translog_buffer_flush(str disk */ file= buffer->file; + skipped_data= buffer->skipped_data; + DBUG_ASSERT(skipped_data < TRANSLOG_PAGE_SIZE); for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE; i < buffer->size; i+= TRANSLOG_PAGE_SIZE, pg++) @@ -2573,13 +2639,16 @@ static my_bool translog_buffer_flush(str DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size); if (translog_status != TRANSLOG_OK && translog_status != TRANSLOG_SHUTDOWN) DBUG_RETURN(1); - if (pagecache_inject(log_descriptor.pagecache, + if (pagecache_write_part(log_descriptor.pagecache, &file->handler, pg, 3, buffer->buffer + i, PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_LEFT_UNLOCKED, - PAGECACHE_PIN_LEFT_UNPINNED, 0, - LSN_IMPOSSIBLE)) + PAGECACHE_PIN_LEFT_UNPINNED, + PAGECACHE_WRITE_DONE, 0, + LSN_IMPOSSIBLE, + skipped_data, + TRANSLOG_PAGE_SIZE - skipped_data)) { DBUG_PRINT("error", ("Can't write page (%lu,0x%lx) to pagecache, error: %d", @@ -2589,10 +2658,12 @@ static my_bool translog_buffer_flush(str translog_stop_writing(); DBUG_RETURN(1); } + skipped_data= 0; } file->is_sync= 0; - if (my_pwrite(file->handler.file, buffer->buffer, - buffer->size, LSN_OFFSET(buffer->offset), + if (my_pwrite(file->handler.file, buffer->buffer + buffer->skipped_data, + buffer->size - buffer->skipped_data, + LSN_OFFSET(buffer->offset) + buffer->skipped_data, log_write_flags)) { DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu " @@ -2985,6 +3056,7 @@ restart: uchar *from, *table= NULL; int is_last_unfinished_page; uint last_protected_sector= 0; + uint skipped_data= curr_buffer->skipped_data; TRANSLOG_FILE file_copy; uint8 ver= curr_buffer->ver; translog_wait_for_writers(curr_buffer); @@ -2997,7 +3069,25 @@ restart: } DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset)); from= curr_buffer->buffer + (addr - curr_buffer->offset); - memcpy(buffer, from, TRANSLOG_PAGE_SIZE); + if (skipped_data > (addr - curr_buffer->offset)) + { + /* + We read page part of which is not present in buffer, + so we should read absent part from file (page cache actually) + */ + file= get_logfile_by_number(file_no); + DBUG_ASSERT(file != NULL); + buffer= pagecache_read(log_descriptor.pagecache, &file->handler, + LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE, + 3, buffer, + PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_LEFT_UNLOCKED, + NULL); + } + else + skipped_data= 0; /* Read after skipped in buffer data */ + memcpy(buffer + skipped_data, from + skipped_data, + TRANSLOG_PAGE_SIZE - skipped_data); /* We can use copy then in translog_page_validator() because it do not put it permanently somewhere. 
@@ -3291,6 +3381,7 @@ static my_bool translog_truncate_log(TRA uint32 next_page_offset, page_rest; uint32 i; File fd; + int rc; TRANSLOG_VALIDATOR_DATA data; char path[FN_REFLEN]; uchar page_buff[TRANSLOG_PAGE_SIZE]; @@ -3316,14 +3407,19 @@ static my_bool translog_truncate_log(TRA TRANSLOG_PAGE_SIZE); page_rest= next_page_offset - LSN_OFFSET(addr); memset(page_buff, TRANSLOG_FILLER, page_rest); - if ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 || - ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) || - (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr), - log_write_flags)) || - my_sync(fd, MYF(MY_WME))) | - my_close(fd, MYF(MY_WME))) || - (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && - sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)))) + rc= ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 || + ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) || + (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr), + log_write_flags)) || + my_sync(fd, MYF(MY_WME))))); + translog_syncs++; + rc|= (fd > 0 && my_close(fd, MYF(MY_WME))); + if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS) + { + rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)); + translog_syncs++; + } + if (rc) DBUG_RETURN(1); /* fix the horizon */ @@ -3511,6 +3607,7 @@ my_bool translog_init_with_table(const c pthread_mutex_init(&log_descriptor.dirty_buffer_mask_lock, MY_MUTEX_INIT_FAST) || pthread_cond_init(&log_descriptor.log_flush_cond, 0) || + pthread_cond_init(&log_descriptor.new_goal_cond, 0) || my_rwlock_init(&log_descriptor.open_files_lock, NULL) || my_init_dynamic_array(&log_descriptor.open_files, @@ -3912,7 +4009,6 @@ my_bool translog_init_with_table(const c log_descriptor.flushed= log_descriptor.horizon; log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset; log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */ - log_descriptor.previous_flush_horizon= log_descriptor.horizon; /* Now 'flushed' is set to 'horizon' value, but 'horizon' is (potentially) address of the next LSN and we want indicate that all LSNs that are @@ -3995,6 +4091,10 @@ my_bool translog_init_with_table(const c It is beginning of the log => there is no LSNs in the log => There is no harm in leaving it "as-is". */ + log_descriptor.previous_flush_horizon= log_descriptor.horizon; + DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(log_descriptor. 
+ previous_flush_horizon))); DBUG_RETURN(0); } file_no--; @@ -4070,6 +4170,9 @@ my_bool translog_init_with_table(const c translog_free_record_header(&rec); } } + log_descriptor.previous_flush_horizon= log_descriptor.horizon; + DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(log_descriptor.previous_flush_horizon))); DBUG_RETURN(0); err: ma_message_no_user(0, "log initialization failed"); @@ -4157,6 +4260,7 @@ void translog_destroy() pthread_mutex_destroy(&log_descriptor.log_flush_lock); pthread_mutex_destroy(&log_descriptor.dirty_buffer_mask_lock); pthread_cond_destroy(&log_descriptor.log_flush_cond); + pthread_cond_destroy(&log_descriptor.new_goal_cond); rwlock_destroy(&log_descriptor.open_files_lock); delete_dynamic(&log_descriptor.open_files); delete_dynamic(&log_descriptor.unfinished_files); @@ -6885,11 +6989,11 @@ int translog_read_record_header_from_buf { translog_size_t res; DBUG_ENTER("translog_read_record_header_from_buffer"); + DBUG_PRINT("info", ("page byte: 0x%x offset: %u", + (uint) page[page_offset], (uint) page_offset)); DBUG_ASSERT(translog_is_LSN_chunk(page[page_offset])); DBUG_ASSERT(translog_status == TRANSLOG_OK || translog_status == TRANSLOG_READONLY); - DBUG_PRINT("info", ("page byte: 0x%x offset: %u", - (uint) page[page_offset], (uint) page_offset)); buff->type= (page[page_offset] & TRANSLOG_REC_TYPE); buff->short_trid= uint2korr(page + page_offset + 1); DBUG_PRINT("info", ("Type %u, Short TrID %u, LSN (%lu,0x%lx)", @@ -7356,27 +7460,27 @@ static void translog_force_current_buffe "Buffer addr: (%lu,0x%lx) " "Page addr: (%lu,0x%lx) " "size: %lu (%lu) Pg: %u left: %u in progress %u", - (uint) log_descriptor.bc.buffer_no, - (ulong) log_descriptor.bc.buffer, - LSN_IN_PARTS(log_descriptor.bc.buffer->offset), + (uint) old_buffer_no, + (ulong) old_buffer, + LSN_IN_PARTS(old_buffer->offset), (ulong) LSN_FILE_NO(log_descriptor.horizon), (ulong) (LSN_OFFSET(log_descriptor.horizon) - log_descriptor.bc.current_page_fill), - (ulong) log_descriptor.bc.buffer->size, + (ulong) old_buffer->size, (ulong) (log_descriptor.bc.ptr -log_descriptor.bc. 
buffer->buffer), (uint) log_descriptor.bc.current_page_fill, (uint) left, - (uint) log_descriptor.bc.buffer-> + (uint) old_buffer-> copy_to_buffer_in_progress)); translog_lock_assert_owner(); LINT_INIT(current_page_fill); - new_buff_beginning= log_descriptor.bc.buffer->offset; - new_buff_beginning+= log_descriptor.bc.buffer->size; /* increase offset */ + new_buff_beginning= old_buffer->offset; + new_buff_beginning+= old_buffer->size; /* increase offset */ DBUG_ASSERT(log_descriptor.bc.ptr !=NULL); DBUG_ASSERT(LSN_FILE_NO(log_descriptor.horizon) == - LSN_FILE_NO(log_descriptor.bc.buffer->offset)); + LSN_FILE_NO(old_buffer->offset)); translog_check_cursor(&log_descriptor.bc); DBUG_ASSERT(left < TRANSLOG_PAGE_SIZE); if (left) @@ -7387,18 +7491,20 @@ static void translog_force_current_buffe */ DBUG_PRINT("info", ("left: %u", (uint) left)); + old_buffer->pre_force_close_horizon= + old_buffer->offset + old_buffer->size; /* decrease offset */ new_buff_beginning-= log_descriptor.bc.current_page_fill; current_page_fill= log_descriptor.bc.current_page_fill; memset(log_descriptor.bc.ptr, TRANSLOG_FILLER, left); - log_descriptor.bc.buffer->size+= left; + old_buffer->size+= left; DBUG_PRINT("info", ("Finish Page buffer #%u: 0x%lx " "Size: %lu", - (uint) log_descriptor.bc.buffer->buffer_no, - (ulong) log_descriptor.bc.buffer, - (ulong) log_descriptor.bc.buffer->size)); - DBUG_ASSERT(log_descriptor.bc.buffer->buffer_no == + (uint) old_buffer->buffer_no, + (ulong) old_buffer, + (ulong) old_buffer->size)); + DBUG_ASSERT(old_buffer->buffer_no == log_descriptor.bc.buffer_no); } else @@ -7509,11 +7615,21 @@ static void translog_force_current_buffe if (left) { - /* - TODO: do not copy beginning of the page if we have no CRC or sector - checks on - */ - memcpy(new_buffer->buffer, data, current_page_fill); + if (log_descriptor.flags & + (TRANSLOG_PAGE_CRC | TRANSLOG_SECTOR_PROTECTION)) + memcpy(new_buffer->buffer, data, current_page_fill); + else + { + /* + This page header does not change if we add more data to the page so + we can not copy it and will not overwrite later + */ + new_buffer->skipped_data= current_page_fill; +#ifndef DBUG_OFF + memset(new_buffer->buffer, 0xa5, current_page_fill); +#endif + DBUG_ASSERT(new_buffer->skipped_data < TRANSLOG_PAGE_SIZE); + } } old_buffer->next_buffer_offset= new_buffer->offset; translog_buffer_lock(new_buffer); @@ -7561,6 +7677,7 @@ void translog_flush_set_new_goal_and_wai { log_descriptor.next_pass_max_lsn= lsn; log_descriptor.max_lsn_requester= pthread_self(); + pthread_cond_broadcast(&log_descriptor.new_goal_cond); } while (flush_no == log_descriptor.flush_no) { @@ -7572,67 +7689,79 @@ void translog_flush_set_new_goal_and_wai /** - @brief Flush the log up to given LSN (included) + @brief sync() range of files (inclusive) and directory (by request) - @param lsn log record serial number up to which (inclusive) - the log has to be flushed + @param min min internal file number to flush + @param max max internal file number to flush + @param sync_dir need sync directory - @return Operation status + return Operation status @retval 0 OK @retval 1 Error - */ -my_bool translog_flush(TRANSLOG_ADDRESS lsn) +static my_bool translog_sync_files(uint32 min, uint32 max, + my_bool sync_dir) { - LSN sent_to_disk= LSN_IMPOSSIBLE; - TRANSLOG_ADDRESS flush_horizon; - uint fn, i; - dirty_buffer_mask_t dirty_buffer_mask; - uint8 last_buffer_no, start_buffer_no; + uint fn; my_bool rc= 0; - DBUG_ENTER("translog_flush"); - DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", 
LSN_IN_PARTS(lsn))); - DBUG_ASSERT(translog_status == TRANSLOG_OK || - translog_status == TRANSLOG_READONLY); - LINT_INIT(sent_to_disk); - - pthread_mutex_lock(&log_descriptor.log_flush_lock); - DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)", - LSN_IN_PARTS(log_descriptor.flushed))); - if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0) + ulonglong flush_interval; + DBUG_ENTER("translog_sync_files"); + DBUG_PRINT("info", ("min: %lu max: %lu sync dir: %d", + (ulong) min, (ulong) max, (int) sync_dir)); + DBUG_ASSERT(min <= max); + + flush_interval= group_commit_wait; + if (flush_interval) + flush_start= my_micro_time(); + for (fn= min; fn <= max; fn++) { - pthread_mutex_unlock(&log_descriptor.log_flush_lock); - DBUG_RETURN(0); - } - if (log_descriptor.flush_in_progress) - { - translog_flush_set_new_goal_and_wait(lsn); - if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self())) + TRANSLOG_FILE *file= get_logfile_by_number(fn); + DBUG_ASSERT(file != NULL); + if (!file->is_sync) { - /* fix lsn if it was horizon */ - if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0) - lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer); - translog_flush_wait_for_end(lsn); - pthread_mutex_unlock(&log_descriptor.log_flush_lock); - DBUG_RETURN(0); + if (my_sync(file->handler.file, MYF(MY_WME))) + { + rc= 1; + translog_stop_writing(); + DBUG_RETURN(rc); + } + translog_syncs++; + file->is_sync= 1; } - log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; } - log_descriptor.flush_in_progress= 1; - flush_horizon= log_descriptor.previous_flush_horizon; - DBUG_PRINT("info", ("flush_in_progress is set")); - pthread_mutex_unlock(&log_descriptor.log_flush_lock); - translog_lock(); - if (log_descriptor.is_everything_flushed) + if (sync_dir) { - DBUG_PRINT("info", ("everything is flushed")); - rc= (translog_status == TRANSLOG_READONLY); - translog_unlock(); - goto out; + if (!(rc= sync_dir(log_descriptor.directory_fd, + MYF(MY_WME | MY_IGNORE_BADFD)))) + translog_syncs++; } + DBUG_RETURN(rc); +} + + +/* + @brief Flushes buffers with LSNs in them less or equal address <lsn> + + @param lsn address up to which all LSNs should be flushed, + can be reset to real last LSN address + @parem sent_to_disk returns 'sent to disk' position + @param flush_horizon returns horizon of the flush + + @note About terminology see comment to translog_flush(). +*/ + +void translog_flush_buffers(TRANSLOG_ADDRESS *lsn, + TRANSLOG_ADDRESS *sent_to_disk, + TRANSLOG_ADDRESS *flush_horizon) +{ + dirty_buffer_mask_t dirty_buffer_mask; + uint i; + uint8 last_buffer_no, start_buffer_no; + DBUG_ENTER("translog_flush_buffers"); + /* We will recheck information when will lock buffers one by one so we can use unprotected read here (this is just for @@ -7656,15 +7785,15 @@ my_bool translog_flush(TRANSLOG_ADDRESS /* if LSN up to which we have to flush bigger then maximum LSN of previous buffer and at least one LSN was saved in the current buffer (last_lsn != - LSN_IMPOSSIBLE) then we better finish the current buffer. + LSN_IMPOSSIBLE) then we have to close the current buffer. 
*/ - if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 && + if (cmp_translog_addr(*lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 && log_descriptor.bc.buffer->last_lsn != LSN_IMPOSSIBLE) { struct st_translog_buffer *buffer= log_descriptor.bc.buffer; - lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */ + *lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */ DBUG_PRINT("info", ("LSN to flush fixed to last lsn: (%lu,0x%lx)", - LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn))); + LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn))); last_buffer_no= log_descriptor.bc.buffer_no; log_descriptor.is_everything_flushed= 1; translog_force_current_buffer_to_finish(); @@ -7676,8 +7805,10 @@ my_bool translog_flush(TRANSLOG_ADDRESS TRANSLOG_BUFFERS_NO); translog_unlock(); } - sent_to_disk= translog_get_sent_to_disk(); - if (cmp_translog_addr(lsn, sent_to_disk) > 0) + + /* flush buffers */ + *sent_to_disk= translog_get_sent_to_disk(); + if (cmp_translog_addr(*lsn, *sent_to_disk) > 0) { DBUG_PRINT("info", ("Start buffer #: %u last buffer #: %u", @@ -7697,53 +7828,237 @@ my_bool translog_flush(TRANSLOG_ADDRESS LSN_IN_PARTS(buffer->last_lsn), (buffer->file ? "dirty" : "closed"))); - if (buffer->prev_last_lsn <= lsn && + if (buffer->prev_last_lsn <= *lsn && buffer->file != NULL) { - DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size); - flush_horizon= buffer->offset + buffer->size; + DBUG_ASSERT(*flush_horizon <= buffer->offset + buffer->size); + *flush_horizon= (buffer->pre_force_close_horizon != LSN_IMPOSSIBLE ? + buffer->pre_force_close_horizon : + buffer->offset + buffer->size); + /* pre_force_close_horizon is reset during new buffer start */ + DBUG_PRINT("info", ("flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(*flush_horizon))); + DBUG_ASSERT(*flush_horizon <= log_descriptor.horizon); + translog_buffer_flush(buffer); } translog_buffer_unlock(buffer); i= (i + 1) % TRANSLOG_BUFFERS_NO; } while (i != last_buffer_no); - sent_to_disk= translog_get_sent_to_disk(); + *sent_to_disk= translog_get_sent_to_disk(); + } + + DBUG_VOID_RETURN; +} + +/** + @brief Flush the log up to given LSN (included) + + @param lsn log record serial number up to which (inclusive) + the log has to be flushed + + @return Operation status + @retval 0 OK + @retval 1 Error + + @note + + - Non group commit logic: Commits made in passes. Thread which started + flush first is performing actual flush, other threads sets new goal (LSN) + of the next pass (if it is maximum) and waits for the pass end or just + wait for the pass end. + + - If hard group commit enabled and rate set to zero: + The first thread sends all changed buffers to disk. This is repeated + as long as there are new LSNs added. The process can not loop + forever because we have limited number of threads and they will wait + for the data to be synced. + Pseudo code: + + do + send changed buffers to disk + while new_goal + sync + + - If hard group commit switched ON and less than rate microseconds has + passed from last sync, then after buffers have been sent to disk + wait until rate microseconds has passed since last sync, do sync and return. + This ensures that if we call sync infrequently we don't do any waits. + + - If soft group commit enabled everything works as with 'non group commit' + but the thread doesn't do any real sync(). If rate is not zero the + sync() will be performed by a service thread with the given rate + when needed (new LSN appears). 
+ + @note Terminology: + 'sent to disk' means written to disk but not sync()ed, + 'flushed' mean sent to disk and synced(). +*/ + +my_bool translog_flush(TRANSLOG_ADDRESS lsn) +{ + struct timespec abstime; + ulonglong flush_interval; + ulonglong time_spent; + LSN sent_to_disk= LSN_IMPOSSIBLE; + TRANSLOG_ADDRESS flush_horizon; + my_bool rc= 0; + my_bool hgroup_commit_at_start; + DBUG_ENTER("translog_flush"); + DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn))); + DBUG_ASSERT(translog_status == TRANSLOG_OK || + translog_status == TRANSLOG_READONLY); + LINT_INIT(sent_to_disk); + + pthread_mutex_lock(&log_descriptor.log_flush_lock); + DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)", + LSN_IN_PARTS(log_descriptor.flushed))); + if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0) + + + { + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + DBUG_RETURN(0); } + if (log_descriptor.flush_in_progress) + { + translog_lock(); + /* fix lsn if it was horizon */ + if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0) + lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer); + translog_unlock(); + translog_flush_set_new_goal_and_wait(lsn); + if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self())) + { + /* + translog_flush_wait_for_end() release log_flush_lock while is + waiting then acquire it again + */ + translog_flush_wait_for_end(lsn); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + DBUG_RETURN(0); + } + log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; + } + log_descriptor.flush_in_progress= 1; + flush_horizon= log_descriptor.previous_flush_horizon; + DBUG_PRINT("info", ("flush_in_progress is set, flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(flush_horizon))); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + + hgroup_commit_at_start= hard_group_commit; + if (hgroup_commit_at_start) + flush_interval= group_commit_wait * TRANSLOG_RATE_BASE; - /* sync files from previous flush till current one */ - for (fn= LSN_FILE_NO(log_descriptor.flushed); fn <= LSN_FILE_NO(lsn); fn++) + translog_lock(); + if (log_descriptor.is_everything_flushed) { - TRANSLOG_FILE *file= get_logfile_by_number(fn); - DBUG_ASSERT(file != NULL); - if (!file->is_sync) + DBUG_PRINT("info", ("everything is flushed")); + translog_unlock(); + pthread_mutex_lock(&log_descriptor.log_flush_lock); + goto out; + } + + for (;;) + { + /* Following function flushes buffers and makes translog_unlock() */ + translog_flush_buffers(&lsn, &sent_to_disk, &flush_horizon); + + if (!hgroup_commit_at_start) + break; /* flush pass is ended */ + +retest: + if (flush_interval != 0 && + (my_micro_time() - flush_start) >= flush_interval) + break; /* flush pass is ended */ + + pthread_mutex_lock(&log_descriptor.log_flush_lock); + if (log_descriptor.next_pass_max_lsn != LSN_IMPOSSIBLE) + { + /* take next goal */ + lsn= log_descriptor.next_pass_max_lsn; + log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; + /* prevent other thread from continue */ + log_descriptor.max_lsn_requester= pthread_self(); + DBUG_PRINT("info", ("flush took next goal: (%lu,0x%lx)", + LSN_IN_PARTS(lsn))); + } + else { - if (my_sync(file->handler.file, MYF(MY_WME))) + if (flush_interval == 0 || + (time_spent= (my_micro_time() - flush_start)) >= flush_interval) { - rc= 1; - translog_stop_writing(); - sent_to_disk= LSN_IMPOSSIBLE; - goto out; + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + break; } - file->is_sync= 1; + DBUG_PRINT("info", ("flush waits: %llu interval: %llu spent: %llu", + flush_interval - 
time_spent, + flush_interval, time_spent)); + /* wait time or next goal */ + set_timespec_nsec(abstime, flush_interval - time_spent); + pthread_cond_timedwait(&log_descriptor.new_goal_cond, + &log_descriptor.log_flush_lock, + &abstime); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + DBUG_PRINT("info", ("retest conditions")); + goto retest; } + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + + /* next flush pass */ + DBUG_PRINT("info", ("next flush pass")); + translog_lock(); } - if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && - (LSN_FILE_NO(log_descriptor.previous_flush_horizon) != - LSN_FILE_NO(flush_horizon) || - ((LSN_OFFSET(log_descriptor.previous_flush_horizon) - 1) / - TRANSLOG_PAGE_SIZE) != - ((LSN_OFFSET(flush_horizon) - 1) / TRANSLOG_PAGE_SIZE))) - rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)); + /* + sync() files from previous flush till current one + */ + if (!soft_sync || hgroup_commit_at_start) + { + if ((rc= + translog_sync_files(LSN_FILE_NO(log_descriptor.flushed), + LSN_FILE_NO(lsn), + sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && + (LSN_FILE_NO(log_descriptor. + previous_flush_horizon) != + LSN_FILE_NO(flush_horizon) || + (LSN_OFFSET(log_descriptor. + previous_flush_horizon) / + TRANSLOG_PAGE_SIZE) != + (LSN_OFFSET(flush_horizon) / + TRANSLOG_PAGE_SIZE))))) + { + sent_to_disk= LSN_IMPOSSIBLE; + pthread_mutex_lock(&log_descriptor.log_flush_lock); + goto out; + } + /* keep values for soft sync() and forced sync() actual */ + { + uint32 fileno= LSN_FILE_NO(lsn); + my_atomic_rwlock_wrlock(&soft_sync_rwl); + my_atomic_store32(&soft_sync_min, fileno); + my_atomic_store32(&soft_sync_max, fileno); + my_atomic_rwlock_wrunlock(&soft_sync_rwl); + } + } + else + { + my_atomic_rwlock_wrlock(&soft_sync_rwl); + my_atomic_store32(&soft_sync_max, LSN_FILE_NO(lsn)); + my_atomic_rwlock_wrunlock(&soft_sync_rwl); + } + + DBUG_ASSERT(flush_horizon <= log_descriptor.horizon); + + pthread_mutex_lock(&log_descriptor.log_flush_lock); log_descriptor.previous_flush_horizon= flush_horizon; out: - pthread_mutex_lock(&log_descriptor.log_flush_lock); if (sent_to_disk != LSN_IMPOSSIBLE) log_descriptor.flushed= sent_to_disk; log_descriptor.flush_in_progress= 0; log_descriptor.flush_no++; DBUG_PRINT("info", ("flush_in_progress is dropped")); - pthread_mutex_unlock(&log_descriptor.log_flush_lock);\ + pthread_mutex_unlock(&log_descriptor.log_flush_lock); pthread_cond_broadcast(&log_descriptor.log_flush_cond); DBUG_RETURN(rc); } @@ -8113,6 +8428,8 @@ LSN translog_first_theoretical_lsn() my_bool translog_purge(TRANSLOG_ADDRESS low) { uint32 last_need_file= LSN_FILE_NO(low); + uint32 min_unsync; + int soft; TRANSLOG_ADDRESS horizon= translog_get_horizon(); int rc= 0; DBUG_ENTER("translog_purge"); @@ -8120,12 +8437,23 @@ my_bool translog_purge(TRANSLOG_ADDRESS DBUG_ASSERT(translog_status == TRANSLOG_OK || translog_status == TRANSLOG_READONLY); + soft= soft_sync; + DBUG_PRINT("info", ("min_unsync: %lu", (ulong) min_unsync)); + if (soft && min_unsync < last_need_file) + { + last_need_file= min_unsync; + DBUG_PRINT("info", ("last_need_file set to %lu", (ulong)last_need_file)); + } + pthread_mutex_lock(&log_descriptor.purger_lock); + DBUG_PRINT("info", ("last_lsn_checked file: %lu:", + (ulong) log_descriptor.last_lsn_checked)); if (LSN_FILE_NO(log_descriptor.last_lsn_checked) < last_need_file) { uint32 i; uint32 min_file= translog_first_file(horizon, 1); DBUG_ASSERT(min_file != 0); /* log is already started */ + DBUG_PRINT("info", ("min_file: %lu:",(ulong) min_file)); 
for(i= min_file; i < last_need_file && rc == 0; i++) { LSN lsn= translog_get_file_max_lsn_stored(i); @@ -8356,6 +8684,153 @@ my_bool translog_log_debug_info(TRN *trn } + +/** + Sets soft sync mode + + @param mode TRUE if we need switch soft sync on else off +*/ + +void translog_soft_sync(my_bool mode) +{ + soft_sync= mode; +} + + +/** + Sets hard group commit + + @param mode TRUE if we need switch hard group commit on else off +*/ + +void translog_hard_group_commit(my_bool mode) +{ + hard_group_commit= mode; +} + + +/** + @brief forced log sync (used when we are switching modes) +*/ + +void translog_sync() +{ + uint32 max= get_current_logfile()->number; + uint32 min; + DBUG_ENTER("ma_translog_sync"); + + my_atomic_rwlock_rdlock(&soft_sync_rwl); + min= my_atomic_load32(&soft_sync_min); + my_atomic_rwlock_rdunlock(&soft_sync_rwl); + if (!min) + min= max; + + translog_sync_files(min, max, sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS); + + DBUG_VOID_RETURN; +} + + +/** + @brief set rate for group commit + + @param rate rate to set. + + @note internally it stores interval in nanoseconds/TRANSLOG_RATE_BASE (to + fit into uint32) +*/ + +void translog_set_group_commit_rate(uint32 rate) +{ + DBUG_ENTER("translog_set_group_commit_rate"); + ulonglong wait_time; + if (rate) + { + wait_time= ((TRANSLOG_RATE_BASE * 1000000000ULL / rate + + TRANSLOG_RATE_BASE / 2) / + TRANSLOG_RATE_BASE); + if (wait_time == 0) + wait_time= 1; /* protection from getting special value */ + } + else + wait_time= 0; + group_commit_wait= wait_time; + DBUG_PRINT("info", ("rate: %lu wait: %llu", + (ulong)rate, (ulonglong)wait_time)); + DBUG_VOID_RETURN; +} + + +/** + @brief syncing service thread +*/ + +static pthread_handler_t +ma_soft_sync_background( void *arg __attribute__((unused))) +{ + + my_thread_init(); + { + DBUG_ENTER("ma_soft_sync_background"); + for(;;) + { + ulonglong prev_loop= my_micro_time(); + ulonglong time, sleep; + uint32 min, max; + my_atomic_rwlock_rdlock(&soft_sync_rwl); + min= my_atomic_load32(&soft_sync_min); + max= my_atomic_load32(&soft_sync_max); + my_atomic_store32(&soft_sync_min, max); + my_atomic_rwlock_rdunlock(&soft_sync_rwl); + + sleep= group_commit_wait * TRANSLOG_RATE_BASE; + translog_sync_files(min, max, FALSE); + time= my_micro_time() - prev_loop; + if (time > sleep) + sleep= 0; + else + sleep-= time; + if (my_service_thread_sleep(&soft_sync_control, sleep)) + break; + } + my_service_thread_signal_end(&soft_sync_control); + my_thread_end(); + DBUG_RETURN(0); + } +} + + +/** + @brief Starts syncing thread +*/ + +int translog_soft_sync_start(void) +{ + pthread_t th; + int res= 0; + DBUG_ENTER("translog_soft_sync_start"); + if (!(res= ma_service_thread_control_init(&soft_sync_control))) + if (!(res= pthread_create(&th, NULL, ma_soft_sync_background, NULL))) + soft_sync_control.status= THREAD_RUNNING; + DBUG_RETURN(res); +} + + +/** + @brief Stops syncing thread +*/ + +void translog_soft_sync_end(void) +{ + DBUG_ENTER("translog_soft_sync_end"); + if (soft_sync_control.inited) + { + ma_service_thread_control_end(&soft_sync_control); + } + DBUG_VOID_RETURN; +} + + #ifdef MARIA_DUMP_LOG #include <my_getopt.h> extern void translog_example_table_init(); === modified file 'storage/maria/ma_loghandler.h' --- a/storage/maria/ma_loghandler.h 2009-01-15 22:25:53 +0000 +++ b/storage/maria/ma_loghandler.h 2009-07-07 00:37:23 +0000 @@ -342,6 +342,14 @@ enum enum_translog_status TRANSLOG_SHUTDOWN /* going to shutdown the loghandler */ }; extern enum enum_translog_status translog_status; +extern ulonglong 
translog_syncs; /* Number of sync()s */ + +void translog_soft_sync(my_bool mode); +void translog_hard_group_commit(my_bool mode); +int translog_soft_sync_start(void); +void translog_soft_sync_end(void); +void translog_sync(); +void translog_set_group_commit_rate(uint32 rate); /* all the rest added because of recovery; should we make @@ -441,6 +449,18 @@ extern LOG_DESC log_record_type_descript typedef enum { + TRANSLOG_GCOMMIT_NONE, + TRANSLOG_GCOMMIT_HARD, + TRANSLOG_GCOMMIT_SOFT +} enum_maria_group_commit; +extern ulong maria_group_commit; +extern ulong maria_group_commit_rate; +/** + group commit interval is TRANSLOG_RATE_BASE/<rate> seconds +*/ +#define TRANSLOG_RATE_BASE 100 +typedef enum +{ TRANSLOG_PURGE_IMMIDIATE, TRANSLOG_PURGE_EXTERNAL, TRANSLOG_PURGE_ONDEMAND
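A worked example of the rate arithmetic above may help when reading the review that follows. maria_group_commit_rate is a number of commits per TRANSLOG_RATE_BASE (100) seconds, and translog_set_group_commit_rate() stores the resulting interval in units of nanoseconds/TRANSLOG_RATE_BASE so it fits in a uint32. The standalone sketch below only reproduces that conversion so the rounding can be checked in isolation; rate_to_wait() and the main() driver are illustrative names, not part of the patch.

    #include <stdio.h>

    #define TRANSLOG_RATE_BASE 100ULL        /* rate = commits per 100 seconds */

    /* Same formula as translog_set_group_commit_rate(): result is the wait
       between syncs expressed in nanoseconds / TRANSLOG_RATE_BASE. */
    static unsigned long long rate_to_wait(unsigned long long rate)
    {
      unsigned long long wait_time;
      if (!rate)
        return 0;                            /* 0 means "no waiting at all" */
      wait_time= (TRANSLOG_RATE_BASE * 1000000000ULL / rate +
                  TRANSLOG_RATE_BASE / 2) / TRANSLOG_RATE_BASE;
      if (wait_time == 0)
        wait_time= 1;                        /* keep away from the special value 0 */
      return wait_time;
    }

    int main(void)
    {
      unsigned long long w= rate_to_wait(800); /* default rate in the patch */
      printf("stored wait: %llu => %llu ns between syncs\n",
             w, w * TRANSLOG_RATE_BASE);
      return 0;
    }

With the default rate of 800 this yields 1250000, i.e. one sync every 0.125 seconds.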
Hi!
"sanja" == sanja <sanja@askmonty.org> writes:
sanja> #At lp:maria/5.2 sanja> 2725 sanja@askmonty.org 2009-07-07 sanja> Group commit and small optimisation of flush log (in case of page without CRC and sector protection) added. (for review) <cut> sanja> === modified file 'storage/maria/ha_maria.cc' <cut> sanja> +static void update_maria_group_commit(MYSQL_THD thd, sanja> + struct st_mysql_sys_var *var, sanja> + void *var_ptr, const void *save) sanja> +{ sanja> + ulong value= (ulong)*((long *)var_ptr); sanja> + DBUG_ENTER("update_maria_group_commit"); sanja> + DBUG_PRINT("enter", ("old value: %lu new value %lu rate %lu", sanja> + value, (ulong)(*(long *)save), sanja> + maria_group_commit_rate)); sanja> + /* old value */ sanja> + switch (value) { sanja> + case TRANSLOG_GCOMMIT_NONE: sanja> + break; sanja> + case TRANSLOG_GCOMMIT_HARD: sanja> + translog_hard_group_commit(FALSE); sanja> + break; sanja> + case TRANSLOG_GCOMMIT_SOFT: sanja> + translog_soft_sync(FALSE); sanja> + if (maria_group_commit_rate) sanja> + translog_soft_sync_end(); sanja> + break; sanja> + default: sanja> + DBUG_ASSERT(0); /* impossible */ sanja> + } sanja> + value= *(ulong *)var_ptr= (ulong)(*(long *)save); sanja> + translog_sync(); sanja> + /* new value */ sanja> + switch (value) { sanja> + case TRANSLOG_GCOMMIT_NONE: sanja> + break; sanja> + case TRANSLOG_GCOMMIT_HARD: sanja> + translog_hard_group_commit(TRUE); sanja> + break; sanja> + case TRANSLOG_GCOMMIT_SOFT: sanja> + translog_soft_sync(TRUE); sanja> + /* variable change made under global lock so we can just read it */ sanja> + if (maria_group_commit_rate) sanja> + translog_soft_sync_start(); sanja> + break; sanja> + default: sanja> + DBUG_ASSERT(0); /* impossible */ sanja> + } sanja> + DBUG_VOID_RETURN; sanja> +} A small optimization: wouldn't it be good to read the new value at start and just return if value didn't change ? sanja> +/** sanja> + @brief Updates group commit rate sanja> +*/ sanja> + sanja> +static void update_maria_group_commit_rate(MYSQL_THD thd, sanja> + struct st_mysql_sys_var *var, sanja> + void *var_ptr, const void *save) sanja> +{ sanja> + ulong new_value= (ulong)*((long *)save); sanja> + ulong *value_ptr= (ulong*) var_ptr; sanja> + DBUG_ENTER("update_maria_group_commit_rate"); sanja> + DBUG_PRINT("enter", ("old value: %lu new value %lu group commit %lu", sanja> + *value_ptr, new_value, maria_group_commit)); sanja> + if (new_value && sanja> + ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value + sanja> + TRANSLOG_RATE_BASE / 2) / sanja> + TRANSLOG_RATE_BASE) == 0) sanja> + new_value= 0; /* protection against too small value */ Note the ULL is not portable to older windows versions. You have to use ULL(1000000000) The forumla is also a bit complex to understand. It's also the same as: ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value + TRANSLOG_RATE_BASE / 2) / TRANSLOG_RATE_BASE) == 0) -> ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value + TRANSLOG_RATE_BASE / 2) < TRANSLOG_RATE_BASE) -> ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value < TRANSLOG_RATE_BASE / 2) -> ((1000000000ULL / new_value < 1/2)) -> new_value/2 > 1000000000ULL -> new_value > 500000000ULL Which doesn't look correct as this doesn't have TRANSLOG_RATE_BASE anywhere. The idea (as dicussed on IRC) is to ensure that: wait_time= ((TRANSLOG_RATE_BASE * 1000000000ULL / rate + TRANSLOG_RATE_BASE / 2) / TRANSLOG_RATE_BASE); And that wait_time * TRANSLOG_RATE_BASE fits in an uint32 After some discussion on IRC Sanja and me decided to change the code and instead use microseconds to wait directly. 
(We already have spent way to much time trying to understand the above and similar code) That means, that in this case we should limit the number of microseconds to 1 minute (syncing less often than once a minute would be stupid); This limit you probably don't have to check here as it would be automaticly checked by getopt. <cut> sanja> +++ b/storage/maria/ma_init.c 2009-07-07 00:37:23 +0000 sanja> @@ -82,6 +82,11 @@ void maria_end(void) sanja> maria_inited= maria_multi_threaded= FALSE; sanja> ft_free_stopwords(); sanja> ma_checkpoint_end(); sanja> + if (translog_status == TRANSLOG_OK) sanja> + { sanja> + translog_soft_sync_end(); sanja> + translog_sync(); sanja> + } Why do we have to call translog_sync() when we end usage of maria ? Note that in maria_init() you should set soft_sync and other variables as in case of internal mysqld restart (like for embedded server) you can't trust the initial value for variables. sanja> +++ b/storage/maria/ma_loghandler.c 2009-07-07 00:37:23 +0000 sanja> @@ -18,6 +18,7 @@ sanja> #include "ma_blockrec.h" /* for some constants and in-write hooks */ sanja> #include "ma_key_recover.h" /* For some in-write hooks */ sanja> #include "ma_checkpoint.h" sanja> +#include "ma_servicethread.h" Please call the file 'ma_service_thread.h' <cut> sanja> + /* sanja> + How much data was skipped during moving page from previous buffer sanja> + to this one (it is optimisation of forcing buffer to finish sanja> + */ sanja> + uint skipped_data; Change comment to: /* When moving from one log buffer to another, we write the last of the previous buffer to file and then move to start using the new log buffer. In the case of a part filed last page, this page is not moved to the start of the new buffer but instead we set the 'skip_data' variable to tell us how much data at the beginning of the buffer is not relevant. */ Change also variable to 'skiped_data' or 'not_initialized_data' (Over time we should change all the old skipp variables to skip...) sanja> @@ -1069,6 +1121,7 @@ static my_bool translog_write_file_heade sanja> static my_bool translog_max_lsn_to_header(File file, LSN lsn) sanja> { sanja> uchar lsn_buff[LSN_STORE_SIZE]; sanja> + my_bool rc; sanja> DBUG_ENTER("translog_max_lsn_to_header"); sanja> DBUG_PRINT("enter", ("File descriptor: %ld " sanja> "lsn: (%lu,0x%lx)", sanja> @@ -1077,11 +1130,13 @@ static my_bool translog_max_lsn_to_heade sanja> lsn_store(lsn_buff, lsn); sanja> - DBUG_RETURN(my_pwrite(file, lsn_buff, sanja> - LSN_STORE_SIZE, sanja> - (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE), sanja> - log_write_flags) != 0 || sanja> - my_sync(file, MYF(MY_WME)) != 0); sanja> + if (!(rc= (my_pwrite(file, lsn_buff, sanja> + LSN_STORE_SIZE, sanja> + (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE), sanja> + log_write_flags) != 0 || sanja> + my_sync(file, MYF(MY_WME)) != 0))) sanja> + translog_syncs++; sanja> + DBUG_RETURN(rc); sanja> } Good to not count syncs in case of errors, but as this is such an unlikely occurrence it would have been ok to always increment the variable and thus avoid one 'if' and branch. 
(This is after all a statistical variable) No need to change (if you don't want to :) sanja> @@ -2557,6 +2621,8 @@ static my_bool translog_buffer_flush(str sanja> disk sanja> */ sanja> file= buffer->file; sanja> + skipped_data= buffer->skipped_data; sanja> + DBUG_ASSERT(skipped_data < TRANSLOG_PAGE_SIZE); sanja> for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE; sanja> i < buffer->size; sanja> i+= TRANSLOG_PAGE_SIZE, pg++) sanja> @@ -2573,13 +2639,16 @@ static my_bool translog_buffer_flush(str sanja> DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size); sanja> if (translog_status != TRANSLOG_OK && translog_status != TRANSLOG_SHUTDOWN) sanja> DBUG_RETURN(1); sanja> - if (pagecache_inject(log_descriptor.pagecache, sanja> + if (pagecache_write_part(log_descriptor.pagecache, sanja> &file->handler, pg, 3, sanja> buffer->buffer + i, sanja> PAGECACHE_PLAIN_PAGE, sanja> PAGECACHE_LOCK_LEFT_UNLOCKED, sanja> - PAGECACHE_PIN_LEFT_UNPINNED, 0, sanja> - LSN_IMPOSSIBLE)) sanja> + PAGECACHE_PIN_LEFT_UNPINNED, sanja> + PAGECACHE_WRITE_DONE, 0, sanja> + LSN_IMPOSSIBLE, sanja> + skipped_data, sanja> + TRANSLOG_PAGE_SIZE - skipped_data)) sanja> { sanja> DBUG_PRINT("error", sanja> ("Can't write page (%lu,0x%lx) to pagecache, error: %d", sanja> @@ -2589,10 +2658,12 @@ static my_bool translog_buffer_flush(str sanja> translog_stop_writing(); sanja> DBUG_RETURN(1); sanja> } sanja> + skipped_data= 0; sanja> } sanja> file->is_sync= 0; sanja> - if (my_pwrite(file->handler.file, buffer->buffer, sanja> - buffer->size, LSN_OFFSET(buffer->offset), sanja> + if (my_pwrite(file->handler.file, buffer->buffer + buffer->skipped_data, sanja> + buffer->size - buffer->skipped_data, sanja> + LSN_OFFSET(buffer->offset) + buffer->skipped_data, sanja> log_write_flags)) sanja> { sanja> DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu " sanja> @@ -2985,6 +3056,7 @@ restart: sanja> uchar *from, *table= NULL; sanja> int is_last_unfinished_page; sanja> uint last_protected_sector= 0; sanja> + uint skipped_data= curr_buffer->skipped_data; sanja> TRANSLOG_FILE file_copy; sanja> uint8 ver= curr_buffer->ver; sanja> translog_wait_for_writers(curr_buffer); sanja> @@ -2997,7 +3069,25 @@ restart: sanja> } sanja> DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset)); sanja> from= curr_buffer->buffer + (addr - curr_buffer->offset); sanja> - memcpy(buffer, from, TRANSLOG_PAGE_SIZE); sanja> + if (skipped_data > (addr - curr_buffer->offset)) Change test to: /* If first page in buffer and skipped_date <> 0 */ if (skipped_data && addr == curr_buffer->offset) As this is easier to understand. sanja> + { sanja> + /* sanja> + We read page part of which is not present in buffer, sanja> + so we should read absent part from file (page cache actually) sanja> + */ sanja> + file= get_logfile_by_number(file_no); sanja> + DBUG_ASSERT(file != NULL); sanja> + buffer= pagecache_read(log_descriptor.pagecache, &file->handler, sanja> + LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE, sanja> + 3, buffer, sanja> + PAGECACHE_PLAIN_PAGE, sanja> + PAGECACHE_LOCK_LEFT_UNLOCKED, sanja> + NULL); I assume it's ok to not lock the page because: - The log handler has it's own page cache. - There is only one thread that can access the log cache at a time ? If this is correct, then please add this as a comment so that we know that we have to fix this if the above changes. 
sanja> + } sanja> + else sanja> + skipped_data= 0; /* Read after skipped in buffer data */ sanja> + memcpy(buffer + skipped_data, from + skipped_data, sanja> + TRANSLOG_PAGE_SIZE - skipped_data); Took me some time to understand why we set skipped_data above. It could help if we add a comment after the pagecache_read: /* Now we have correct data in buffer up to 'skipped_data'. The following memcpy() will move the data from the internal buffer that was not yet on disk. */ sanja> +static my_bool translog_sync_files(uint32 min, uint32 max, <cut> sanja> + flush_interval= group_commit_wait; sanja> + if (flush_interval) sanja> + flush_start= my_micro_time(); sanja> + for (fn= min; fn <= max; fn++) sanja> { sanja> + TRANSLOG_FILE *file= get_logfile_by_number(fn); sanja> + DBUG_ASSERT(file != NULL); Actually, no reason to have assert here as following if will die anyway if file is NULL. sanja> + if (!file->is_sync) <cut> sanja> +my_bool translog_flush(TRANSLOG_ADDRESS lsn) sanja> +{ <cut> sanja> + if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0) sanja> + sanja> + Remove above two empty lines sanja> + { sanja> + pthread_mutex_unlock(&log_descriptor.log_flush_lock); sanja> + DBUG_RETURN(0); sanja> } <cut> sanja> + for (;;) sanja> + { sanja> + /* Following function flushes buffers and makes translog_unlock() */ sanja> + translog_flush_buffers(&lsn, &sent_to_disk, &flush_horizon); sanja> + sanja> + if (!hgroup_commit_at_start) sanja> + break; /* flush pass is ended */ sanja> + sanja> +retest: sanja> + if (flush_interval != 0 && sanja> + (my_micro_time() - flush_start) >= flush_interval) sanja> + break; /* flush pass is ended */ sanja> + sanja> + pthread_mutex_lock(&log_descriptor.log_flush_lock); sanja> + if (log_descriptor.next_pass_max_lsn != LSN_IMPOSSIBLE) sanja> + { sanja> + /* take next goal */ sanja> + lsn= log_descriptor.next_pass_max_lsn; sanja> + log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; sanja> + /* prevent other thread from continue */ sanja> + log_descriptor.max_lsn_requester= pthread_self(); sanja> + DBUG_PRINT("info", ("flush took next goal: (%lu,0x%lx)", sanja> + LSN_IN_PARTS(lsn))); sanja> + } sanja> + else sanja> { sanja> + if (flush_interval == 0 || sanja> + (time_spent= (my_micro_time() - flush_start)) >= flush_interval) sanja> { sanja> + pthread_mutex_unlock(&log_descriptor.log_flush_lock); sanja> + break; sanja> } sanja> + DBUG_PRINT("info", ("flush waits: %llu interval: %llu spent: %llu", sanja> + flush_interval - time_spent, sanja> + flush_interval, time_spent)); sanja> + /* wait time or next goal */ sanja> + set_timespec_nsec(abstime, flush_interval - time_spent); sanja> + pthread_cond_timedwait(&log_descriptor.new_goal_cond, sanja> + &log_descriptor.log_flush_lock, sanja> + &abstime); sanja> + pthread_mutex_unlock(&log_descriptor.log_flush_lock); sanja> + DBUG_PRINT("info", ("retest conditions")); sanja> + goto retest; sanja> } If you invert the above if to: if (log_descriptor.next_pass_max_lsn != LSN_IMPOSSIBLE) and move the above code up inside the if, the code will be clearer and you can remove the else part. Another optimization, assuming that log_descriptor.log_flush_lock() is never hold over a slow operation, then you could move the setting of time_spent to the first call to my_micro_time() and then just have the second test as: if (flush_interval == 0) break; It's not likely that in the few cases where we don't get the mutex_lock() at once, that there will be a significant difference in the microtime value. 
(The extra call to microtime() may make things slower than what we win by trying to do calculations 100 % exact). sanja> + pthread_mutex_unlock(&log_descriptor.log_flush_lock); sanja> + sanja> + /* next flush pass */ sanja> + DBUG_PRINT("info", ("next flush pass")); sanja> + translog_lock(); sanja> } sanja> + /* sanja> + sync() files from previous flush till current one sanja> + */ sanja> + if (!soft_sync || hgroup_commit_at_start) sanja> + { sanja> + if ((rc= sanja> + translog_sync_files(LSN_FILE_NO(log_descriptor.flushed), sanja> + LSN_FILE_NO(lsn), sanja> + sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && sanja> + (LSN_FILE_NO(log_descriptor. sanja> + previous_flush_horizon) != sanja> + LSN_FILE_NO(flush_horizon) || sanja> + (LSN_OFFSET(log_descriptor. sanja> + previous_flush_horizon) / sanja> + TRANSLOG_PAGE_SIZE) != sanja> + (LSN_OFFSET(flush_horizon) / sanja> + TRANSLOG_PAGE_SIZE))))) sanja> + { sanja> + sent_to_disk= LSN_IMPOSSIBLE; sanja> + pthread_mutex_lock(&log_descriptor.log_flush_lock); sanja> + goto out; sanja> + } sanja> + /* keep values for soft sync() and forced sync() actual */ sanja> + { sanja> + uint32 fileno= LSN_FILE_NO(lsn); sanja> + my_atomic_rwlock_wrlock(&soft_sync_rwl); sanja> + my_atomic_store32(&soft_sync_min, fileno); sanja> + my_atomic_store32(&soft_sync_max, fileno); sanja> + my_atomic_rwlock_wrunlock(&soft_sync_rwl); Don't understand why my_atomic_rwlock_wrlock is enough protection here. Assuming we are only using atomic operations, doesn't the above code give a chance that someone could read unrelated soft_sync_min and soft_sync_max values? sanja> + } sanja> + } sanja> + else sanja> + { sanja> + my_atomic_rwlock_wrlock(&soft_sync_rwl); sanja> + my_atomic_store32(&soft_sync_max, LSN_FILE_NO(lsn)); sanja> + my_atomic_rwlock_wrunlock(&soft_sync_rwl); Do we really need a lock to store one variable? what is the problem with just doing: soft_sync_max= LSN_FILE_NO(lsn); As after all, only one thread can be here at once. sanja> @@ -8113,6 +8428,8 @@ LSN translog_first_theoretical_lsn() sanja> my_bool translog_purge(TRANSLOG_ADDRESS low) sanja> { sanja> uint32 last_need_file= LSN_FILE_NO(low); sanja> + uint32 min_unsync; sanja> + int soft; sanja> TRANSLOG_ADDRESS horizon= translog_get_horizon(); sanja> int rc= 0; sanja> DBUG_ENTER("translog_purge"); sanja> @@ -8120,12 +8437,23 @@ my_bool translog_purge(TRANSLOG_ADDRESS sanja> DBUG_ASSERT(translog_status == TRANSLOG_OK || sanja> translog_status == TRANSLOG_READONLY); sanja> + soft= soft_sync; sanja> + DBUG_PRINT("info", ("min_unsync: %lu", (ulong) min_unsync)); sanja> + if (soft && min_unsync < last_need_file) sanja> + { sanja> + last_need_file= min_unsync; sanja> + DBUG_PRINT("info", ("last_need_file set to %lu", (ulong)last_need_file)); sanja> + } The above must be a bug as you are never giving min_unsync a value. <cut> sanja> +void translog_sync() sanja> +{ sanja> + uint32 max= get_current_logfile()->number; sanja> + uint32 min; sanja> + DBUG_ENTER("ma_translog_sync"); sanja> + sanja> + my_atomic_rwlock_rdlock(&soft_sync_rwl); sanja> + min= my_atomic_load32(&soft_sync_min); sanja> + my_atomic_rwlock_rdunlock(&soft_sync_rwl); Don't understand why you need to do atomic operations above: - You are only reading on value - get_current_logfile() is read without a mutex, so the values are already independent. 
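To illustrate the concern raised above about the (soft_sync_min, soft_sync_max) pair: if readers must see a consistent pair, the straightforward alternative is to guard both values with one ordinary mutex rather than relying on my_atomic_rwlock_*(). The sketch below uses plain pthreads and illustrative names; it is only an illustration of the pattern, not a proposal for the final patch.

    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>

    /* One lock protects both ends of the "files to sync" interval, so a reader
       can never observe min from one update and max from another. */
    static pthread_mutex_t sync_range_lock= PTHREAD_MUTEX_INITIALIZER;
    static uint32_t sync_min= 0, sync_max= 0;

    static void set_sync_range(uint32_t min, uint32_t max)
    {
      pthread_mutex_lock(&sync_range_lock);
      sync_min= min;
      sync_max= max;
      pthread_mutex_unlock(&sync_range_lock);
    }

    static void get_sync_range(uint32_t *min, uint32_t *max)
    {
      pthread_mutex_lock(&sync_range_lock);
      *min= sync_min;
      *max= sync_max;
      pthread_mutex_unlock(&sync_range_lock);
    }

    int main(void)
    {
      uint32_t min, max;
      set_sync_range(7, 9);
      get_sync_range(&min, &max);
      printf("sync files %u..%u\n", min, max);
      return 0;
    }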
<cut> sanja> +void translog_set_group_commit_rate(uint32 rate) sanja> +{ sanja> + DBUG_ENTER("translog_set_group_commit_rate"); sanja> + ulonglong wait_time; sanja> + if (rate) sanja> + { sanja> + wait_time= ((TRANSLOG_RATE_BASE * 1000000000ULL / rate + sanja> + TRANSLOG_RATE_BASE / 2) / sanja> + TRANSLOG_RATE_BASE); sanja> + if (wait_time == 0) sanja> + wait_time= 1; /* protection from getting special value */ sanja> + } sanja> + else sanja> + wait_time= 0; sanja> + group_commit_wait= wait_time; sanja> + DBUG_PRINT("info", ("rate: %lu wait: %llu", sanja> + (ulong)rate, (ulonglong)wait_time)); sanja> + DBUG_VOID_RETURN; sanja> +} Change to use microseconds instead of rate. sanja> + sanja> +/** sanja> + @brief syncing service thread sanja> +*/ sanja> + sanja> +static pthread_handler_t sanja> +ma_soft_sync_background( void *arg __attribute__((unused))) sanja> +{ sanja> + sanja> + my_thread_init(); sanja> + { sanja> + DBUG_ENTER("ma_soft_sync_background"); sanja> + for(;;) sanja> + { sanja> + ulonglong prev_loop= my_micro_time(); sanja> + ulonglong time, sleep; sanja> + uint32 min, max; sanja> + my_atomic_rwlock_rdlock(&soft_sync_rwl); sanja> + min= my_atomic_load32(&soft_sync_min); sanja> + max= my_atomic_load32(&soft_sync_max); sanja> + my_atomic_store32(&soft_sync_min, max); sanja> + my_atomic_rwlock_rdunlock(&soft_sync_rwl); Don't still understand what ensures that the above operations are safe from other threads as my_atomic_rwlock_rdlock() and my_atomic_rwlock_rdunlock() may be dummy operations. Please check the code and logic with Serg (if you haven't already done that). sanja> + sanja> + sleep= group_commit_wait * TRANSLOG_RATE_BASE; sanja> + translog_sync_files(min, max, FALSE); It would be good to have a test above that if there is nothing to sync, we don't call translog_sync_files(). sanja> + time= my_micro_time() - prev_loop; sanja> + if (time > sleep) sanja> + sleep= 0; sanja> + else sanja> + sleep-= time; sanja> + if (my_service_thread_sleep(&soft_sync_control, sleep)) sanja> + break; sanja> + } sanja> + my_service_thread_signal_end(&soft_sync_control); sanja> + my_thread_end(); sanja> + DBUG_RETURN(0); sanja> + } sanja> +} <cut> Regards, Monty
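The group commit flow that the patch documents in the translog_flush() comment ("do { send changed buffers to disk } while new_goal; sync") can be easier to follow as a standalone loop. The sketch below only illustrates that control flow with stub helpers and a fake clock; none of these names exist in the loghandler, and the real code uses a condition variable instead of wait_a_bit().

    #include <stdio.h>
    #include <stdbool.h>

    static unsigned long long now_us= 0;  /* fake monotonic clock, microseconds */
    static int pending_goals= 3;          /* pretend three more flush goals arrive */

    static void send_changed_buffers_to_disk(void) { now_us+= 200; puts("flush buffers"); }
    static bool new_goal_arrived(void)             { return pending_goals-- > 0; }
    static void wait_a_bit(void)                   { now_us+= 300; } /* stand-in for the timed wait */
    static void sync_log_files(void)               { puts("sync()"); }

    /* Hard group commit: keep flushing while new goals arrive (or until the
       configured interval has elapsed), then sync once for the whole group. */
    static void hard_group_commit_flush(unsigned long long interval_us)
    {
      unsigned long long start= now_us;
      for (;;)
      {
        send_changed_buffers_to_disk();
        /* With no wait budget, stop as soon as no new goal was requested. */
        if (interval_us == 0 && !new_goal_arrived())
          break;
        if (interval_us)
        {
          /* Collect further goals until the budget is spent, waiting in between. */
          while (now_us - start < interval_us && !new_goal_arrived())
            wait_a_bit();
          if (now_us - start >= interval_us)
            break;
        }
      }
      sync_log_files();                   /* one sync() covers the whole group */
    }

    int main(void)
    {
      hard_group_commit_flush(1000);      /* 1 ms budget */
      return 0;
    }

With the stubbed values this performs four buffer flush passes but only one sync(), which is the point of grouping the commits.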