Hi!
"sanja" == sanja <sanja@askmonty.org> writes:
sanja> #At lp:maria/5.2 sanja> 2725 sanja@askmonty.org 2009-07-07 sanja> Group commit and small optimisation of flush log (in case of page without CRC and sector protection) added. (for review) <cut> sanja> === modified file 'storage/maria/ha_maria.cc' <cut> sanja> +static void update_maria_group_commit(MYSQL_THD thd, sanja> + struct st_mysql_sys_var *var, sanja> + void *var_ptr, const void *save) sanja> +{ sanja> + ulong value= (ulong)*((long *)var_ptr); sanja> + DBUG_ENTER("update_maria_group_commit"); sanja> + DBUG_PRINT("enter", ("old value: %lu new value %lu rate %lu", sanja> + value, (ulong)(*(long *)save), sanja> + maria_group_commit_rate)); sanja> + /* old value */ sanja> + switch (value) { sanja> + case TRANSLOG_GCOMMIT_NONE: sanja> + break; sanja> + case TRANSLOG_GCOMMIT_HARD: sanja> + translog_hard_group_commit(FALSE); sanja> + break; sanja> + case TRANSLOG_GCOMMIT_SOFT: sanja> + translog_soft_sync(FALSE); sanja> + if (maria_group_commit_rate) sanja> + translog_soft_sync_end(); sanja> + break; sanja> + default: sanja> + DBUG_ASSERT(0); /* impossible */ sanja> + } sanja> + value= *(ulong *)var_ptr= (ulong)(*(long *)save); sanja> + translog_sync(); sanja> + /* new value */ sanja> + switch (value) { sanja> + case TRANSLOG_GCOMMIT_NONE: sanja> + break; sanja> + case TRANSLOG_GCOMMIT_HARD: sanja> + translog_hard_group_commit(TRUE); sanja> + break; sanja> + case TRANSLOG_GCOMMIT_SOFT: sanja> + translog_soft_sync(TRUE); sanja> + /* variable change made under global lock so we can just read it */ sanja> + if (maria_group_commit_rate) sanja> + translog_soft_sync_start(); sanja> + break; sanja> + default: sanja> + DBUG_ASSERT(0); /* impossible */ sanja> + } sanja> + DBUG_VOID_RETURN; sanja> +} A small optimization: wouldn't it be good to read the new value at start and just return if value didn't change ? 
sanja> +/** sanja> + @brief Updates group commit rate sanja> +*/ sanja> + sanja> +static void update_maria_group_commit_rate(MYSQL_THD thd, sanja> + struct st_mysql_sys_var *var, sanja> + void *var_ptr, const void *save) sanja> +{ sanja> + ulong new_value= (ulong)*((long *)save); sanja> + ulong *value_ptr= (ulong*) var_ptr; sanja> + DBUG_ENTER("update_maria_group_commit_rate"); sanja> + DBUG_PRINT("enter", ("old value: %lu new value %lu group commit %lu", sanja> + *value_ptr, new_value, maria_group_commit)); sanja> + if (new_value && sanja> + ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value + sanja> + TRANSLOG_RATE_BASE / 2) / sanja> + TRANSLOG_RATE_BASE) == 0) sanja> + new_value= 0; /* protection against too small value */ Note that the ULL suffix is not portable to older Windows versions. You have to use ULL(1000000000). The formula is also a bit complex to understand. It's also the same as: ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value + TRANSLOG_RATE_BASE / 2) / TRANSLOG_RATE_BASE) == 0) -> ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value + TRANSLOG_RATE_BASE / 2) < TRANSLOG_RATE_BASE) -> ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value < TRANSLOG_RATE_BASE / 2) -> ((1000000000ULL / new_value < 1/2)) -> new_value/2 > 1000000000ULL -> new_value > 2000000000ULL Which doesn't look correct as this doesn't have TRANSLOG_RATE_BASE anywhere. The idea (as discussed on IRC) is to ensure that: wait_time= ((TRANSLOG_RATE_BASE * 1000000000ULL / rate + TRANSLOG_RATE_BASE / 2) / TRANSLOG_RATE_BASE); And that wait_time * TRANSLOG_RATE_BASE fits in a uint32. After some discussion on IRC Sanja and I decided to change the code and instead use microseconds to wait directly. 
(We have already spent way too much time trying to understand the above and similar code) That means that in this case we should limit the number of microseconds to 1 minute (syncing less often than once a minute would be stupid). This limit you probably don't have to check here as it would be automatically checked by getopt. <cut> sanja> +++ b/storage/maria/ma_init.c 2009-07-07 00:37:23 +0000 sanja> @@ -82,6 +82,11 @@ void maria_end(void) sanja> maria_inited= maria_multi_threaded= FALSE; sanja> ft_free_stopwords(); sanja> ma_checkpoint_end(); sanja> + if (translog_status == TRANSLOG_OK) sanja> + { sanja> + translog_soft_sync_end(); sanja> + translog_sync(); sanja> + } Why do we have to call translog_sync() when we end usage of maria ? Note that in maria_init() you should set soft_sync and other variables as in case of internal mysqld restart (like for embedded server) you can't trust the initial value for variables. sanja> +++ b/storage/maria/ma_loghandler.c 2009-07-07 00:37:23 +0000 sanja> @@ -18,6 +18,7 @@ sanja> #include "ma_blockrec.h" /* for some constants and in-write hooks */ sanja> #include "ma_key_recover.h" /* For some in-write hooks */ sanja> #include "ma_checkpoint.h" sanja> +#include "ma_servicethread.h" Please call the file 'ma_service_thread.h' <cut> sanja> + /* sanja> + How much data was skipped during moving page from previous buffer sanja> + to this one (it is optimisation of forcing buffer to finish sanja> + */ sanja> + uint skipped_data; Change comment to: /* When moving from one log buffer to another, we write the last of the previous buffer to file and then move to start using the new log buffer. In the case of a partly filled last page, this page is not moved to the start of the new buffer but instead we set the 'skip_data' variable to tell us how much data at the beginning of the buffer is not relevant. */ Change also variable to 'skiped_data' or 'not_initialized_data' (Over time we should change all the old skipp variables to skip...) 
sanja> @@ -1069,6 +1121,7 @@ static my_bool translog_write_file_heade sanja> static my_bool translog_max_lsn_to_header(File file, LSN lsn) sanja> { sanja> uchar lsn_buff[LSN_STORE_SIZE]; sanja> + my_bool rc; sanja> DBUG_ENTER("translog_max_lsn_to_header"); sanja> DBUG_PRINT("enter", ("File descriptor: %ld " sanja> "lsn: (%lu,0x%lx)", sanja> @@ -1077,11 +1130,13 @@ static my_bool translog_max_lsn_to_heade sanja> lsn_store(lsn_buff, lsn); sanja> - DBUG_RETURN(my_pwrite(file, lsn_buff, sanja> - LSN_STORE_SIZE, sanja> - (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE), sanja> - log_write_flags) != 0 || sanja> - my_sync(file, MYF(MY_WME)) != 0); sanja> + if (!(rc= (my_pwrite(file, lsn_buff, sanja> + LSN_STORE_SIZE, sanja> + (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE), sanja> + log_write_flags) != 0 || sanja> + my_sync(file, MYF(MY_WME)) != 0))) sanja> + translog_syncs++; sanja> + DBUG_RETURN(rc); sanja> } Good to not count syncs in case of errors, but as this is such an unlikely occurrence it would have been ok to always increment the variable and thus avoid one 'if' and branch. 
(This is after all a statistical variable) No need to change (if you don't want to :) sanja> @@ -2557,6 +2621,8 @@ static my_bool translog_buffer_flush(str sanja> disk sanja> */ sanja> file= buffer->file; sanja> + skipped_data= buffer->skipped_data; sanja> + DBUG_ASSERT(skipped_data < TRANSLOG_PAGE_SIZE); sanja> for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE; sanja> i < buffer->size; sanja> i+= TRANSLOG_PAGE_SIZE, pg++) sanja> @@ -2573,13 +2639,16 @@ static my_bool translog_buffer_flush(str sanja> DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size); sanja> if (translog_status != TRANSLOG_OK && translog_status != TRANSLOG_SHUTDOWN) sanja> DBUG_RETURN(1); sanja> - if (pagecache_inject(log_descriptor.pagecache, sanja> + if (pagecache_write_part(log_descriptor.pagecache, sanja> &file->handler, pg, 3, sanja> buffer->buffer + i, sanja> PAGECACHE_PLAIN_PAGE, sanja> PAGECACHE_LOCK_LEFT_UNLOCKED, sanja> - PAGECACHE_PIN_LEFT_UNPINNED, 0, sanja> - LSN_IMPOSSIBLE)) sanja> + PAGECACHE_PIN_LEFT_UNPINNED, sanja> + PAGECACHE_WRITE_DONE, 0, sanja> + LSN_IMPOSSIBLE, sanja> + skipped_data, sanja> + TRANSLOG_PAGE_SIZE - skipped_data)) sanja> { sanja> DBUG_PRINT("error", sanja> ("Can't write page (%lu,0x%lx) to pagecache, error: %d", sanja> @@ -2589,10 +2658,12 @@ static my_bool translog_buffer_flush(str sanja> translog_stop_writing(); sanja> DBUG_RETURN(1); sanja> } sanja> + skipped_data= 0; sanja> } sanja> file->is_sync= 0; sanja> - if (my_pwrite(file->handler.file, buffer->buffer, sanja> - buffer->size, LSN_OFFSET(buffer->offset), sanja> + if (my_pwrite(file->handler.file, buffer->buffer + buffer->skipped_data, sanja> + buffer->size - buffer->skipped_data, sanja> + LSN_OFFSET(buffer->offset) + buffer->skipped_data, sanja> log_write_flags)) sanja> { sanja> DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu " sanja> @@ -2985,6 +3056,7 @@ restart: sanja> uchar *from, *table= NULL; sanja> int is_last_unfinished_page; sanja> uint last_protected_sector= 0; 
sanja> + uint skipped_data= curr_buffer->skipped_data; sanja> TRANSLOG_FILE file_copy; sanja> uint8 ver= curr_buffer->ver; sanja> translog_wait_for_writers(curr_buffer); sanja> @@ -2997,7 +3069,25 @@ restart: sanja> } sanja> DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset)); sanja> from= curr_buffer->buffer + (addr - curr_buffer->offset); sanja> - memcpy(buffer, from, TRANSLOG_PAGE_SIZE); sanja> + if (skipped_data > (addr - curr_buffer->offset)) Change test to: /* If first page in buffer and skipped_data != 0 */ if (skipped_data && addr == curr_buffer->offset) As this is easier to understand. sanja> + { sanja> + /* sanja> + We read page part of which is not present in buffer, sanja> + so we should read absent part from file (page cache actually) sanja> + */ sanja> + file= get_logfile_by_number(file_no); sanja> + DBUG_ASSERT(file != NULL); sanja> + buffer= pagecache_read(log_descriptor.pagecache, &file->handler, sanja> + LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE, sanja> + 3, buffer, sanja> + PAGECACHE_PLAIN_PAGE, sanja> + PAGECACHE_LOCK_LEFT_UNLOCKED, sanja> + NULL); I assume it's ok to not lock the page because: - The log handler has its own page cache. - There is only one thread that can access the log cache at a time? If this is correct, then please add this as a comment so that we know that we have to fix this if the above changes. sanja> + } sanja> + else sanja> + skipped_data= 0; /* Read after skipped in buffer data */ sanja> + memcpy(buffer + skipped_data, from + skipped_data, sanja> + TRANSLOG_PAGE_SIZE - skipped_data); Took me some time to understand why we set skipped_data above. It could help if we add a comment after the pagecache_read: /* Now we have correct data in buffer up to 'skipped_data'. The following memcpy() will move the data from the internal buffer that was not yet on disk. 
*/ sanja> +static my_bool translog_sync_files(uint32 min, uint32 max, <cut> sanja> + flush_interval= group_commit_wait; sanja> + if (flush_interval) sanja> + flush_start= my_micro_time(); sanja> + for (fn= min; fn <= max; fn++) sanja> { sanja> + TRANSLOG_FILE *file= get_logfile_by_number(fn); sanja> + DBUG_ASSERT(file != NULL); Actually, no reason to have assert here as following if will die anyway if file is NULL. sanja> + if (!file->is_sync) <cut> sanja> +my_bool translog_flush(TRANSLOG_ADDRESS lsn) sanja> +{ <cut> sanja> + if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0) sanja> + sanja> + Remove above two empty lines sanja> + { sanja> + pthread_mutex_unlock(&log_descriptor.log_flush_lock); sanja> + DBUG_RETURN(0); sanja> } <cut> sanja> + for (;;) sanja> + { sanja> + /* Following function flushes buffers and makes translog_unlock() */ sanja> + translog_flush_buffers(&lsn, &sent_to_disk, &flush_horizon); sanja> + sanja> + if (!hgroup_commit_at_start) sanja> + break; /* flush pass is ended */ sanja> + sanja> +retest: sanja> + if (flush_interval != 0 && sanja> + (my_micro_time() - flush_start) >= flush_interval) sanja> + break; /* flush pass is ended */ sanja> + sanja> + pthread_mutex_lock(&log_descriptor.log_flush_lock); sanja> + if (log_descriptor.next_pass_max_lsn != LSN_IMPOSSIBLE) sanja> + { sanja> + /* take next goal */ sanja> + lsn= log_descriptor.next_pass_max_lsn; sanja> + log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; sanja> + /* prevent other thread from continue */ sanja> + log_descriptor.max_lsn_requester= pthread_self(); sanja> + DBUG_PRINT("info", ("flush took next goal: (%lu,0x%lx)", sanja> + LSN_IN_PARTS(lsn))); sanja> + } sanja> + else sanja> { sanja> + if (flush_interval == 0 || sanja> + (time_spent= (my_micro_time() - flush_start)) >= flush_interval) sanja> { sanja> + pthread_mutex_unlock(&log_descriptor.log_flush_lock); sanja> + break; sanja> } sanja> + DBUG_PRINT("info", ("flush waits: %llu interval: %llu spent: %llu", sanja> + 
flush_interval - time_spent, sanja> + flush_interval, time_spent)); sanja> + /* wait time or next goal */ sanja> + set_timespec_nsec(abstime, flush_interval - time_spent); sanja> + pthread_cond_timedwait(&log_descriptor.new_goal_cond, sanja> + &log_descriptor.log_flush_lock, sanja> + &abstime); sanja> + pthread_mutex_unlock(&log_descriptor.log_flush_lock); sanja> + DBUG_PRINT("info", ("retest conditions")); sanja> + goto retest; sanja> } If you invert the above if to: if (log_descriptor.next_pass_max_lsn == LSN_IMPOSSIBLE) and move the above code up inside the if, the code will be clearer and you can remove the else part. Another optimization, assuming that log_descriptor.log_flush_lock is never held over a slow operation, then you could move the setting of time_spent to the first call to my_micro_time() and then just have the second test as: if (flush_interval == 0) break; It's not likely that in the few cases where we don't get the mutex_lock() at once, that there will be a significant difference in the microtime value. (The extra call to microtime() may make things slower than what we win by trying to do calculations 100 % exact). sanja> + pthread_mutex_unlock(&log_descriptor.log_flush_lock); sanja> + sanja> + /* next flush pass */ sanja> + DBUG_PRINT("info", ("next flush pass")); sanja> + translog_lock(); sanja> } sanja> + /* sanja> + sync() files from previous flush till current one sanja> + */ sanja> + if (!soft_sync || hgroup_commit_at_start) sanja> + { sanja> + if ((rc= sanja> + translog_sync_files(LSN_FILE_NO(log_descriptor.flushed), sanja> + LSN_FILE_NO(lsn), sanja> + sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && sanja> + (LSN_FILE_NO(log_descriptor. sanja> + previous_flush_horizon) != sanja> + LSN_FILE_NO(flush_horizon) || sanja> + (LSN_OFFSET(log_descriptor. 
sanja> + previous_flush_horizon) / sanja> + TRANSLOG_PAGE_SIZE) != sanja> + (LSN_OFFSET(flush_horizon) / sanja> + TRANSLOG_PAGE_SIZE))))) sanja> + { sanja> + sent_to_disk= LSN_IMPOSSIBLE; sanja> + pthread_mutex_lock(&log_descriptor.log_flush_lock); sanja> + goto out; sanja> + } sanja> + /* keep values for soft sync() and forced sync() actual */ sanja> + { sanja> + uint32 fileno= LSN_FILE_NO(lsn); sanja> + my_atomic_rwlock_wrlock(&soft_sync_rwl); sanja> + my_atomic_store32(&soft_sync_min, fileno); sanja> + my_atomic_store32(&soft_sync_max, fileno); sanja> + my_atomic_rwlock_wrunlock(&soft_sync_rwl); Don't understand why my_atomic_rwlock_wrlock is enough protection here. Assuming we are only using atomic operations, doesn't the above code give a chance that someone could read unrelated soft_sync_min and soft_sync_max values? sanja> + } sanja> + } sanja> + else sanja> + { sanja> + my_atomic_rwlock_wrlock(&soft_sync_rwl); sanja> + my_atomic_store32(&soft_sync_max, LSN_FILE_NO(lsn)); sanja> + my_atomic_rwlock_wrunlock(&soft_sync_rwl); Do we really need a lock to store one variable? what is the problem with just doing: soft_sync_max= LSN_FILE_NO(lsn); As after all, only one thread can be here at once. 
sanja> @@ -8113,6 +8428,8 @@ LSN translog_first_theoretical_lsn() sanja> my_bool translog_purge(TRANSLOG_ADDRESS low) sanja> { sanja> uint32 last_need_file= LSN_FILE_NO(low); sanja> + uint32 min_unsync; sanja> + int soft; sanja> TRANSLOG_ADDRESS horizon= translog_get_horizon(); sanja> int rc= 0; sanja> DBUG_ENTER("translog_purge"); sanja> @@ -8120,12 +8437,23 @@ my_bool translog_purge(TRANSLOG_ADDRESS sanja> DBUG_ASSERT(translog_status == TRANSLOG_OK || sanja> translog_status == TRANSLOG_READONLY); sanja> + soft= soft_sync; sanja> + DBUG_PRINT("info", ("min_unsync: %lu", (ulong) min_unsync)); sanja> + if (soft && min_unsync < last_need_file) sanja> + { sanja> + last_need_file= min_unsync; sanja> + DBUG_PRINT("info", ("last_need_file set to %lu", (ulong)last_need_file)); sanja> + } The above must be a bug as you are never giving min_unsync a value. <cut> sanja> +void translog_sync() sanja> +{ sanja> + uint32 max= get_current_logfile()->number; sanja> + uint32 min; sanja> + DBUG_ENTER("ma_translog_sync"); sanja> + sanja> + my_atomic_rwlock_rdlock(&soft_sync_rwl); sanja> + min= my_atomic_load32(&soft_sync_min); sanja> + my_atomic_rwlock_rdunlock(&soft_sync_rwl); I don't understand why you need to do atomic operations above: - You are only reading one value - get_current_logfile() is read without a mutex, so the values are already independent. 
<cut> sanja> +void translog_set_group_commit_rate(uint32 rate) sanja> +{ sanja> + DBUG_ENTER("translog_set_group_commit_rate"); sanja> + ulonglong wait_time; sanja> + if (rate) sanja> + { sanja> + wait_time= ((TRANSLOG_RATE_BASE * 1000000000ULL / rate + sanja> + TRANSLOG_RATE_BASE / 2) / sanja> + TRANSLOG_RATE_BASE); sanja> + if (wait_time == 0) sanja> + wait_time= 1; /* protection from getting special value */ sanja> + } sanja> + else sanja> + wait_time= 0; sanja> + group_commit_wait= wait_time; sanja> + DBUG_PRINT("info", ("rate: %lu wait: %llu", sanja> + (ulong)rate, (ulonglong)wait_time)); sanja> + DBUG_VOID_RETURN; sanja> +} Change to use microseconds instead of rate. sanja> + sanja> +/** sanja> + @brief syncing service thread sanja> +*/ sanja> + sanja> +static pthread_handler_t sanja> +ma_soft_sync_background( void *arg __attribute__((unused))) sanja> +{ sanja> + sanja> + my_thread_init(); sanja> + { sanja> + DBUG_ENTER("ma_soft_sync_background"); sanja> + for(;;) sanja> + { sanja> + ulonglong prev_loop= my_micro_time(); sanja> + ulonglong time, sleep; sanja> + uint32 min, max; sanja> + my_atomic_rwlock_rdlock(&soft_sync_rwl); sanja> + min= my_atomic_load32(&soft_sync_min); sanja> + max= my_atomic_load32(&soft_sync_max); sanja> + my_atomic_store32(&soft_sync_min, max); sanja> + my_atomic_rwlock_rdunlock(&soft_sync_rwl); I still don't understand what ensures that the above operations are safe from other threads as my_atomic_rwlock_rdlock() and my_atomic_rwlock_rdunlock() may be dummy operations. Please check the code and logic with Serg (if you haven't already done that). sanja> + sanja> + sleep= group_commit_wait * TRANSLOG_RATE_BASE; sanja> + translog_sync_files(min, max, FALSE); It would be good to have a test above so that, if there is nothing to sync, we don't call translog_sync_files(). 
sanja> + time= my_micro_time() - prev_loop; sanja> + if (time > sleep) sanja> + sleep= 0; sanja> + else sanja> + sleep-= time; sanja> + if (my_service_thread_sleep(&soft_sync_control, sleep)) sanja> + break; sanja> + } sanja> + my_service_thread_signal_end(&soft_sync_control); sanja> + my_thread_end(); sanja> + DBUG_RETURN(0); sanja> + } sanja> +} <cut> Regards, Monty