[Maria-developers] bzr commit into mariadb 5.1, with Maria 2.0:maria/2.0 branch (sanja:2722)
#At lp:maria/2.0 2722 sanja@askmonty.org 2009-03-04 Group commit (only for review). modified: mysql-test/suite/maria/r/maria3.result mysys/my_init.c storage/maria/ha_maria.cc storage/maria/ma_init.c storage/maria/ma_loghandler.c storage/maria/ma_loghandler.h tests/fork_big2.pl per-file messages: mysql-test/suite/maria/r/maria3.result new maria variables. mysys/my_init.c my_atomic_initialize() call is commented out because it creates a problem with the linker, but it should be here. storage/maria/ha_maria.cc Group commit controls added. storage/maria/ma_init.c Ending soft sync thread. storage/maria/ma_loghandler.c Group commit added. storage/maria/ma_loghandler.h Functions for controlling group commit. tests/fork_big2.pl Testing script fixed (now can run under Linux). === modified file 'mysql-test/suite/maria/r/maria3.result' --- a/mysql-test/suite/maria/r/maria3.result 2008-10-09 20:03:54 +0000 +++ b/mysql-test/suite/maria/r/maria3.result 2009-03-04 16:13:55 +0000 @@ -264,6 +264,8 @@ Variable_name Value maria_block_size 8192 maria_checkpoint_interval 30 maria_force_start_after_recovery_failures 0 +maria_group_commit none +maria_group_commit_rate 800 maria_log_file_size 4294959104 maria_log_purge_type immediate maria_max_sort_file_size 9223372036854775807 === modified file 'mysys/my_init.c' --- a/mysys/my_init.c 2008-12-10 09:02:25 +0000 +++ b/mysys/my_init.c 2009-03-04 16:13:55 +0000 @@ -40,6 +40,7 @@ static void netware_init(); #else #define netware_init() #endif +#include <my_atomic.h> my_bool my_init_done= 0; uint mysys_usage_id= 0; /* Incremented for each my_init() */ @@ -82,6 +83,10 @@ my_bool my_init(void) if (my_progname) my_progname_short= my_progname + dirname_length(my_progname); +/* + if (my_atomic_initialize()) + return 1; +*/ #if defined(THREAD) && defined(SAFE_MUTEX) safe_mutex_global_init(); /* Must be called early */ #endif === modified file 'storage/maria/ha_maria.cc' --- a/storage/maria/ha_maria.cc 2009-01-16 16:18:17 +0000 +++ b/storage/maria/ha_maria.cc 2009-03-04
16:13:55 +0000 @@ -101,22 +101,40 @@ TYPELIB maria_translog_purge_type_typeli array_elements(maria_translog_purge_type_names) - 1, "", maria_translog_purge_type_names, NULL }; + +/* transactional log directory sync */ const char *maria_sync_log_dir_names[]= { "NEVER", "NEWFILE", "ALWAYS", NullS }; - TYPELIB maria_sync_log_dir_typelib= { array_elements(maria_sync_log_dir_names) - 1, "", maria_sync_log_dir_names, NULL }; +/* transactional log group commit */ +const char *maria_group_commit_names[]= +{ + "none", "hard", "soft", NullS +}; +TYPELIB maria_group_commit_typelib= +{ + array_elements(maria_group_commit_names) - 1, "", + maria_group_commit_names, NULL +}; + /** Interval between background checkpoints in seconds */ static ulong checkpoint_interval; static void update_checkpoint_interval(MYSQL_THD thd, struct st_mysql_sys_var *var, void *var_ptr, const void *save); +static void update_maria_group_commit(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save); +static void update_maria_group_commit_rate(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save); /** After that many consecutive recovery failures, remove logs */ static ulong force_start_after_recovery_failures; static void update_log_file_size(MYSQL_THD thd, @@ -163,6 +181,22 @@ static MYSQL_SYSVAR_ULONG(log_file_size, NULL, update_log_file_size, TRANSLOG_FILE_SIZE, TRANSLOG_MIN_FILE_SIZE, 0xffffffffL, TRANSLOG_PAGE_SIZE); +static MYSQL_SYSVAR_ENUM(group_commit, group_commit, + PLUGIN_VAR_RQCMDARG, + "Specifies maria group commit mode. 
" + "Possible values are \"none\" (no group commit), " + "\"hard\" (with waiting to actual commit), " + "\"soft\" (no wait for commit (DANGEROUS!!!))", + NULL, update_maria_group_commit, + TRANSLOG_GCOMMIT_NONE, &maria_group_commit_typelib); + +static MYSQL_SYSVAR_ULONG(group_commit_rate, group_commit_rate, + PLUGIN_VAR_RQCMDARG, + "If group commits switched on commit will happens with about every " + "100/maria_group_commit_rate second. 0 is special value which switch " + "rate off", + NULL, update_maria_group_commit_rate, 800, 0, UINT_MAX, 1); + static MYSQL_SYSVAR_ENUM(log_purge_type, log_purge_type, PLUGIN_VAR_RQCMDARG, "Specifies how maria transactional log will be purged. " @@ -3247,6 +3281,8 @@ static struct st_mysql_sys_var* system_v MYSQL_SYSVAR(block_size), MYSQL_SYSVAR(checkpoint_interval), MYSQL_SYSVAR(force_start_after_recovery_failures), + MYSQL_SYSVAR(group_commit), + MYSQL_SYSVAR(group_commit_rate), MYSQL_SYSVAR(page_checksum), MYSQL_SYSVAR(log_dir_path), MYSQL_SYSVAR(log_file_size), @@ -3277,6 +3313,97 @@ static void update_checkpoint_interval(M } /** + @brief Updates group commit mode +*/ + +static void update_maria_group_commit(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + ulong value= (ulong)*((long *)var_ptr); + DBUG_ENTER("update_maria_group_commit"); + DBUG_PRINT("enter", ("old value: %lu new value %lu rate %lu", + value, (ulong)(*(long *)save), group_commit_rate)); + /* old value */ + switch (value) + { + case TRANSLOG_GCOMMIT_NONE: + break; + case TRANSLOG_GCOMMIT_HARD: + translog_hard_group_commit(FALSE); + break; + case TRANSLOG_GCOMMIT_SOFT: + translog_soft_sync(FALSE); + if (group_commit_rate) + translog_soft_sync_end(); + break; + default: + DBUG_ASSERT(0); /* impossible */ + } + value= *(ulong *)var_ptr= (ulong)(*(long *)save); + translog_sync(); + /* new value */ + switch (value) + { + case TRANSLOG_GCOMMIT_NONE: + break; + case TRANSLOG_GCOMMIT_HARD: + translog_hard_group_commit(TRUE); + 
break; + case TRANSLOG_GCOMMIT_SOFT: + translog_soft_sync(TRUE); + /* variable change made under global lock so we can just read it */ + if (group_commit_rate) + translog_soft_sync_start(); + break; + default: + DBUG_ASSERT(0); /* impossible */ + } + DBUG_VOID_RETURN; +} + +/** + @brief Updates group commit rate +*/ + +static void update_maria_group_commit_rate(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + ulong new_value= (ulong)*((long *)save); + DBUG_ENTER("update_maria_group_commit_rate"); + DBUG_PRINT("enter", ("old value: %lu new value %lu group commit %lu", + *(ulong *)var_ptr, new_value, group_commit)); + if (new_value && + ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value + + TRANSLOG_RATE_BASE / 2) / + TRANSLOG_RATE_BASE) == 0) + new_value= 0; /* protection against too small value */ + /* variable change made under global lock so we can just read it */ + switch (group_commit) + { + case TRANSLOG_GCOMMIT_NONE: + *(ulong *)var_ptr= new_value; + translog_set_group_commit_rate(new_value); + break; + case TRANSLOG_GCOMMIT_HARD: + *(ulong *)var_ptr= new_value; + translog_set_group_commit_rate(new_value); + break; + case TRANSLOG_GCOMMIT_SOFT: + if (*(ulong *)var_ptr) + translog_soft_sync_end(); + translog_set_group_commit_rate(new_value); + if ((*(ulong *)var_ptr= new_value)) + translog_soft_sync_start(); + break; + default: + DBUG_ASSERT(0); /* impossible */ + } + DBUG_VOID_RETURN; +} + +/** @brief Updates the transaction log file limit. 
*/ === modified file 'storage/maria/ma_init.c' --- a/storage/maria/ma_init.c 2008-10-09 20:03:54 +0000 +++ b/storage/maria/ma_init.c 2009-03-04 16:13:55 +0000 @@ -82,6 +82,11 @@ void maria_end(void) maria_inited= maria_multi_threaded= FALSE; ft_free_stopwords(); ma_checkpoint_end(); + if (translog_status == TRANSLOG_OK) + { + translog_soft_sync_end(); + translog_sync(); + } if ((trid= trnman_get_max_trid()) > max_trid_in_control_file) { /* === modified file 'storage/maria/ma_loghandler.c' --- a/storage/maria/ma_loghandler.c 2009-01-16 09:38:02 +0000 +++ b/storage/maria/ma_loghandler.c 2009-03-04 16:13:55 +0000 @@ -18,6 +18,7 @@ #include "ma_blockrec.h" /* for some constants and in-write hooks */ #include "ma_key_recover.h" /* For some in-write hooks */ #include "ma_checkpoint.h" +#include "ma_servicethread.h" /* On Windows, neither my_open() nor my_sync() work for directories. @@ -47,6 +48,15 @@ #include <m_ctype.h> #endif +/** @brief mutex of the soft sync service thread control structure */ +static pthread_mutex_t LOCK_soft_sync; +/** @brief condition variable of the soft sync service thread control structure */ +static pthread_cond_t COND_soft_sync; +/** @brief control structure for the soft sync background thread */ +static MA_SERVICE_THREAD_CONTROL soft_sync_control= + {THREAD_DEAD, FALSE, &LOCK_soft_sync, &COND_soft_sync}; + + /* transaction log file descriptor */ typedef struct st_translog_file { @@ -124,6 +134,11 @@ struct st_translog_buffer /* Previous buffer offset to detect it flush finish */ TRANSLOG_ADDRESS prev_buffer_offset; /* + If the buffer was forced to close it save value of its horizon + otherwise LSN_IMPOSSIBLE + */ + TRANSLOG_ADDRESS pre_force_close_horizon; + /* How much is written (or will be written when copy_to_buffer_in_progress become 0) to this buffer */ @@ -304,6 +319,7 @@ struct st_translog_descriptor */ pthread_mutex_t log_flush_lock; pthread_cond_t log_flush_cond; + pthread_cond_t new_goal_cond; /* Protects changing of headers of finished files (max_lsn) */ pthread_mutex_t
file_header_lock; @@ -343,13 +359,40 @@ static struct st_translog_descriptor log ulong log_purge_type= TRANSLOG_PURGE_IMMIDIATE; ulong log_file_size= TRANSLOG_FILE_SIZE; ulong sync_log_dir= TRANSLOG_SYNC_DIR_NEWFILE; +ulong group_commit= TRANSLOG_GCOMMIT_NONE; +ulong group_commit_rate= 0; /* Marker for end of log */ static uchar end_of_log= 0; #define END_OF_LOG &end_of_log +static my_atomic_rwlock_t soft_sync_rwl; +static my_atomic_rwlock_t hgroup_commit_rwl; +/** + Switch for "soft" sync (no real sync() but periodical sync by service + thread) +*/ +static volatile uint32 soft_sync= FALSE; +/** + Switch for "hard" group commit mode +*/ +static uint32 hgroup_commit= FALSE; +/** + File numbers interval which have to be sync() +*/ +static uint32 soft_sync_min= 0; +static uint32 soft_sync_max= 0; +static my_atomic_rwlock_t group_commit_wait_rwl; +/** + stores interval in nanoseconds/TRANSLOG_RATE_BASE (to + fit into uint32) +*/ +static uint32 group_commit_wait= 0; enum enum_translog_status translog_status= TRANSLOG_UNINITED; +/* time of last flush */ +static ulonglong flush_start= 0; + /* chunk types */ #define TRANSLOG_CHUNK_LSN 0x00 /* 0 chunk refer as LSN (head or tail */ #define TRANSLOG_CHUNK_FIXED (1 << 6) /* 1 (pseudo)fixed record (also LSN) */ @@ -978,12 +1021,17 @@ static TRANSLOG_FILE *get_logfile_by_num static TRANSLOG_FILE *get_current_logfile() { TRANSLOG_FILE *file; + DBUG_ENTER("get_current_logfile"); rw_rdlock(&log_descriptor.open_files_lock); + DBUG_PRINT("info", ("max_file: %lu min_file: %lu open_files: %lu", + (ulong) log_descriptor.max_file, + (ulong) log_descriptor.min_file, + (ulong) log_descriptor.open_files.elements)); DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 == log_descriptor.open_files.elements); file= *dynamic_element(&log_descriptor.open_files, 0, TRANSLOG_FILE **); rw_unlock(&log_descriptor.open_files_lock); - return (file); + DBUG_RETURN(file); } uchar NEAR maria_trans_file_magic[]= @@ -1421,7 +1469,9 @@ LSN 
translog_get_file_max_lsn_stored(uin static my_bool translog_buffer_init(struct st_translog_buffer *buffer, int num) { DBUG_ENTER("translog_buffer_init"); - buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE; + buffer->pre_force_close_horizon= + buffer->prev_last_lsn= buffer->last_lsn= + LSN_IMPOSSIBLE; DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx", (ulong) buffer)); @@ -2042,7 +2092,8 @@ static void translog_start_buffer(struct (ulong) LSN_OFFSET(log_descriptor.horizon), (ulong) LSN_OFFSET(log_descriptor.horizon))); DBUG_ASSERT(buffer_no == buffer->buffer_no); - buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE; + buffer->pre_force_close_horizon= + buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE; DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx", (ulong) buffer)); buffer->offset= log_descriptor.horizon; @@ -3495,6 +3546,8 @@ my_bool translog_init_with_table(const c log_descriptor.open_flags= O_BINARY | O_RDONLY; else log_descriptor.open_flags= O_BINARY | O_RDWR; + my_atomic_rwlock_init(&soft_sync_rwl); + my_atomic_rwlock_init(&group_commit_wait_rwl); if (pthread_mutex_init(&log_descriptor.sent_to_disk_lock, MY_MUTEX_INIT_FAST) || pthread_mutex_init(&log_descriptor.file_header_lock, @@ -3508,6 +3561,7 @@ my_bool translog_init_with_table(const c pthread_mutex_init(&log_descriptor.dirty_buffer_mask_lock, MY_MUTEX_INIT_FAST) || pthread_cond_init(&log_descriptor.log_flush_cond, 0) || + pthread_cond_init(&log_descriptor.new_goal_cond, 0) || my_rwlock_init(&log_descriptor.open_files_lock, NULL) || my_init_dynamic_array(&log_descriptor.open_files, @@ -3909,7 +3963,6 @@ my_bool translog_init_with_table(const c log_descriptor.flushed= log_descriptor.horizon; log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset; log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */ - log_descriptor.previous_flush_horizon= log_descriptor.horizon; /* Now 'flushed' is set to 'horizon' value, but 'horizon' is 
(potentially) address of the next LSN and we want indicate that all LSNs that are @@ -3992,6 +4045,10 @@ my_bool translog_init_with_table(const c It is beginning of the log => there is no LSNs in the log => There is no harm in leaving it "as-is". */ + log_descriptor.previous_flush_horizon= log_descriptor.horizon; + DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(log_descriptor. + previous_flush_horizon))); DBUG_RETURN(0); } file_no--; @@ -4067,6 +4124,9 @@ my_bool translog_init_with_table(const c translog_free_record_header(&rec); } } + log_descriptor.previous_flush_horizon= log_descriptor.horizon; + DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(log_descriptor.previous_flush_horizon))); DBUG_RETURN(0); err: ma_message_no_user(0, "log initialization failed"); @@ -4154,6 +4214,9 @@ void translog_destroy() pthread_mutex_destroy(&log_descriptor.log_flush_lock); pthread_mutex_destroy(&log_descriptor.dirty_buffer_mask_lock); pthread_cond_destroy(&log_descriptor.log_flush_cond); + pthread_cond_destroy(&log_descriptor.new_goal_cond); + my_atomic_rwlock_destroy(&soft_sync_rwl); + my_atomic_rwlock_destroy(&group_commit_wait_rwl); rwlock_destroy(&log_descriptor.open_files_lock); delete_dynamic(&log_descriptor.open_files); delete_dynamic(&log_descriptor.unfinished_files); @@ -7383,18 +7446,20 @@ static void translog_force_current_buffe */ DBUG_PRINT("info", ("left: %u", (uint) left)); + old_buffer->pre_force_close_horizon= + old_buffer->offset + old_buffer->size; /* decrease offset */ new_buff_beginning-= log_descriptor.bc.current_page_fill; current_page_fill= log_descriptor.bc.current_page_fill; memset(log_descriptor.bc.ptr, TRANSLOG_FILLER, left); - log_descriptor.bc.buffer->size+= left; + old_buffer->size+= left; DBUG_PRINT("info", ("Finish Page buffer #%u: 0x%lx " "Size: %lu", - (uint) log_descriptor.bc.buffer->buffer_no, - (ulong) log_descriptor.bc.buffer, - (ulong) log_descriptor.bc.buffer->size)); - 
DBUG_ASSERT(log_descriptor.bc.buffer->buffer_no == + (uint) old_buffer->buffer_no, + (ulong) old_buffer, + (ulong) old_buffer->size)); + DBUG_ASSERT(old_buffer->buffer_no == log_descriptor.bc.buffer_no); } else @@ -7425,6 +7490,10 @@ static void translog_force_current_buffe log_descriptor.bc.buffer->offset= new_buff_beginning; log_descriptor.bc.write_counter= write_counter; log_descriptor.bc.previous_offset= previous_offset; + new_buffer->prev_last_lsn= BUFFER_MAX_LSN(old_buffer); + DBUG_PRINT("info", ("prev_last_lsn set to (%lu,0x%lx) buffer: 0x%lx", + LSN_IN_PARTS(new_buffer->prev_last_lsn), + (ulong) new_buffer)); /* Advances this log pointer, increases writers and let other threads to @@ -7552,6 +7621,7 @@ void translog_flush_set_new_goal_and_wai { log_descriptor.next_pass_max_lsn= lsn; log_descriptor.max_lsn_requester= pthread_self(); + pthread_cond_broadcast(&log_descriptor.new_goal_cond); } while (log_descriptor.flush_in_progress) { @@ -7563,66 +7633,65 @@ void translog_flush_set_new_goal_and_wai /** - @brief Flush the log up to given LSN (included) - - @param lsn log record serial number up to which (inclusive) - the log has to be flushed - - @return Operation status - @retval 0 OK - @retval 1 Error + @brief sync() range of files (inclusive) and directory (by request) + @param min from file + @param max to file + @param sync_dir need sync directory */ -my_bool translog_flush(TRANSLOG_ADDRESS lsn) +static my_bool translog_sync_files(uint32 min, uint32 max, + my_bool sync_dir) { - LSN sent_to_disk= LSN_IMPOSSIBLE; - TRANSLOG_ADDRESS flush_horizon; - uint fn, i; - dirty_buffer_mask_t dirty_buffer_mask; - uint8 last_buffer_no, start_buffer_no; + uint fn; my_bool rc= 0; - DBUG_ENTER("translog_flush"); - DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn))); - DBUG_ASSERT(translog_status == TRANSLOG_OK || - translog_status == TRANSLOG_READONLY); - LINT_INIT(sent_to_disk); + DBUG_ENTER("translog_sync_files"); + DBUG_PRINT("info", ("min: %lu max: 
%lu sync dir: %d", + (ulong) min, (ulong) max, (int) sync_dir)); + DBUG_ASSERT(min <= max); - pthread_mutex_lock(&log_descriptor.log_flush_lock); - DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)", - LSN_IN_PARTS(log_descriptor.flushed))); - if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0) - { - pthread_mutex_unlock(&log_descriptor.log_flush_lock); - DBUG_RETURN(0); - } - if (log_descriptor.flush_in_progress) + flush_start= my_getsystime(); + for (fn= min; fn <= max; fn++) { - translog_flush_set_new_goal_and_wait(lsn); - if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self())) + TRANSLOG_FILE *file= get_logfile_by_number(fn); + DBUG_ASSERT(file != NULL); + if (!file->is_sync) { - /* fix lsn if it was horizon */ - if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0) - lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer); - translog_flush_wait_for_end(lsn); - pthread_mutex_unlock(&log_descriptor.log_flush_lock); - DBUG_RETURN(0); + if (my_sync(file->handler.file, MYF(MY_WME))) + { + rc= 1; + translog_stop_writing(); + DBUG_RETURN(rc); + } + file->is_sync= 1; } - log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; } - log_descriptor.flush_in_progress= 1; - flush_horizon= log_descriptor.previous_flush_horizon; - DBUG_PRINT("info", ("flush_in_progress is set")); - pthread_mutex_unlock(&log_descriptor.log_flush_lock); - translog_lock(); - if (log_descriptor.is_everything_flushed) - { - DBUG_PRINT("info", ("everything is flushed")); - rc= (translog_status == TRANSLOG_READONLY); - translog_unlock(); - goto out; - } + if (sync_dir) + rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)); + + DBUG_RETURN(rc); +} + + +/* + @brief Flushes buffers up to lsn + + @param lsn LSN to which we should flush + @param sent_to_disk returns 'sent to disk' position + @param flush_horizon returns horizon of the flush + + @note See comment to translog_flush().
+*/ + +void translog_flush_buffers(TRANSLOG_ADDRESS *lsn, + TRANSLOG_ADDRESS *sent_to_disk, + TRANSLOG_ADDRESS *flush_horizon) +{ + dirty_buffer_mask_t dirty_buffer_mask; + uint i; + uint8 last_buffer_no, start_buffer_no; + DBUG_ENTER("translog_flush_buffers"); /* We will recheck information when will lock buffers one by @@ -7647,15 +7716,15 @@ my_bool translog_flush(TRANSLOG_ADDRESS /* if LSN up to which we have to flush bigger then maximum LSN of previous buffer and at least one LSN was saved in the current buffer (last_lsn != - LSN_IMPOSSIBLE) then we better finish the current buffer. + LSN_IMPOSSIBLE) then we have to close the current buffer. */ - if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 && + if (cmp_translog_addr(*lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 && log_descriptor.bc.buffer->last_lsn != LSN_IMPOSSIBLE) { struct st_translog_buffer *buffer= log_descriptor.bc.buffer; - lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */ + *lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */ DBUG_PRINT("info", ("LSN to flush fixed to last lsn: (%lu,0x%lx)", - LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn))); + LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn))); last_buffer_no= log_descriptor.bc.buffer_no; log_descriptor.is_everything_flushed= 1; translog_force_current_buffer_to_finish(); @@ -7667,8 +7736,10 @@ my_bool translog_flush(TRANSLOG_ADDRESS TRANSLOG_BUFFERS_NO); translog_unlock(); } - sent_to_disk= translog_get_sent_to_disk(); - if (cmp_translog_addr(lsn, sent_to_disk) > 0) + + /* flush buffers */ + *sent_to_disk= translog_get_sent_to_disk(); + if (cmp_translog_addr(*lsn, *sent_to_disk) > 0) { DBUG_PRINT("info", ("Start buffer #: %u last buffer #: %u", @@ -7688,44 +7759,215 @@ my_bool translog_flush(TRANSLOG_ADDRESS LSN_IN_PARTS(buffer->last_lsn), (buffer->file ? 
"dirty" : "closed"))); - if (buffer->prev_last_lsn <= lsn && + if (buffer->prev_last_lsn <= *lsn && buffer->file != NULL) { - DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size); - flush_horizon= buffer->offset + buffer->size; + DBUG_ASSERT(*flush_horizon <= buffer->offset + buffer->size); + *flush_horizon= (buffer->pre_force_close_horizon ? + buffer->pre_force_close_horizon : + buffer->offset + buffer->size); + DBUG_PRINT("info", ("flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(*flush_horizon))); + DBUG_ASSERT(*flush_horizon <= log_descriptor.horizon); translog_buffer_flush(buffer); } translog_buffer_unlock(buffer); i= (i + 1) % TRANSLOG_BUFFERS_NO; } while (i != last_buffer_no); - sent_to_disk= translog_get_sent_to_disk(); + *sent_to_disk= translog_get_sent_to_disk(); } - /* sync files from previous flush till current one */ - for (fn= LSN_FILE_NO(log_descriptor.flushed); fn <= LSN_FILE_NO(lsn); fn++) + DBUG_VOID_RETURN; +} + +/** + @brief Flush the log up to given LSN (included) + + @param lsn log record serial number up to which (inclusive) + the log has to be flushed + + @return Operation status + @retval 0 OK + @retval 1 Error + + @note + - Non group commit logic: Commits made in passes. Thread which started + flush first is performing actual flush, other threads sets new goal (LSN) + of the next pass (if it is maximum) and waits for the pass end or just + wait for the pass end. + - If hard group commit switched on and rate set to zero the first thread + repeat pass if finds at the end of pass new goals (just take them as its + new goal). The process can not loop forever because we have limited number + of threads. + - If hard group commit switched ON and rate is not zero then at the end of + the pass first tread waits for new goals but not more then need to have + actual rate close to set value. If time exceed it stop pass and let other + thread start new pass if time is not over take new goal and repeats the + pass. 
+ - If soft group commit switched on everything work as without group commit + but this procedure do not perform real sync(). If rate is not zero real + sync() will be performed by service thread with the rate and if there was + new LSN appeared. + + @note Terminology: + 'sent to disk' means written to disk but not sync()ed, + 'flushed' mean sent to disk and synced(). +*/ + +my_bool translog_flush(TRANSLOG_ADDRESS lsn) +{ + struct timespec abstime; + ulonglong flush_interval; + ulonglong time_spent; + LSN sent_to_disk= LSN_IMPOSSIBLE; + TRANSLOG_ADDRESS flush_horizon; + my_bool rc= 0; + my_bool hgroup_commit_at_start; + + DBUG_ENTER("translog_flush"); + DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn))); + DBUG_ASSERT(translog_status == TRANSLOG_OK || + translog_status == TRANSLOG_READONLY); + LINT_INIT(sent_to_disk); + + pthread_mutex_lock(&log_descriptor.log_flush_lock); + DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)", + LSN_IN_PARTS(log_descriptor.flushed))); + if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0) { - TRANSLOG_FILE *file= get_logfile_by_number(fn); - DBUG_ASSERT(file != NULL); - if (!file->is_sync) + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + DBUG_RETURN(0); + } + if (log_descriptor.flush_in_progress) + { + translog_lock(); + /* fix lsn if it was horizon */ + if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0) + lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer); + translog_unlock(); + translog_flush_set_new_goal_and_wait(lsn); + if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self())) { - if (my_sync(file->handler.file, MYF(MY_WME))) + translog_flush_wait_for_end(lsn); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + DBUG_RETURN(0); + } + log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; + } + log_descriptor.flush_in_progress= 1; + flush_horizon= log_descriptor.previous_flush_horizon; + DBUG_PRINT("info", ("flush_in_progress is set, flush_horizon: 
(%lu,0x%lx)", + LSN_IN_PARTS(flush_horizon))); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + + my_atomic_rwlock_wrlock(&hgroup_commit_rwl); + hgroup_commit_at_start= my_atomic_load32(&hgroup_commit); + my_atomic_rwlock_wrunlock(&hgroup_commit_rwl); + if (hgroup_commit_at_start) + { + my_atomic_rwlock_rdlock(&group_commit_wait_rwl); + flush_interval= my_atomic_load32(&group_commit_wait) * TRANSLOG_RATE_BASE; + my_atomic_rwlock_rdunlock(&group_commit_wait_rwl); + } + + translog_lock(); + if (log_descriptor.is_everything_flushed) + { + DBUG_PRINT("info", ("everything is flushed")); + rc= (translog_status == TRANSLOG_READONLY); + translog_unlock(); + goto out; + } + + for (;;) + { + translog_flush_buffers(&lsn, &sent_to_disk, &flush_horizon); + + if (!hgroup_commit_at_start) + break; /* flush pass is ended */ + +retest: + if (flush_interval != 0 && + (my_getsystime() - flush_start) >= flush_interval) + break; /* flush pass is ended */ + + pthread_mutex_lock(&log_descriptor.log_flush_lock); + if (log_descriptor.next_pass_max_lsn != LSN_IMPOSSIBLE) + { + /* take next goal */ + lsn= log_descriptor.next_pass_max_lsn; + log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; + /* prevent other thread from continue */ + log_descriptor.max_lsn_requester= pthread_self(); + DBUG_PRINT("info", ("flush took next goal: (%lu,0x%lx)", + LSN_IN_PARTS(lsn))); + } + else + { + if (flush_interval == 0 || + (time_spent= (my_getsystime() - flush_start)) >= flush_interval) { - rc= 1; - translog_stop_writing(); - sent_to_disk= LSN_IMPOSSIBLE; - goto out; + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + break; } - file->is_sync= 1; + DBUG_PRINT("info", ("flush waits: %llu interval: %llu spent: %llu", + flush_interval - time_spent, + flush_interval, time_spent)); + /* wait time or next goal */ + set_timespec_nsec(abstime, flush_interval - time_spent); + pthread_cond_timedwait(&log_descriptor.new_goal_cond, + &log_descriptor.log_flush_lock, + &abstime); + 
pthread_mutex_unlock(&log_descriptor.log_flush_lock); + DBUG_PRINT("info", ("retest conditions")); + goto retest; } + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + + /* next flush pass */ + DBUG_PRINT("info", ("next flush pass")); + translog_lock(); } - if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && - (LSN_FILE_NO(log_descriptor.previous_flush_horizon) != - LSN_FILE_NO(flush_horizon) || - ((LSN_OFFSET(log_descriptor.previous_flush_horizon) - 1) / - TRANSLOG_PAGE_SIZE) != - ((LSN_OFFSET(flush_horizon) - 1) / TRANSLOG_PAGE_SIZE))) - rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)); + /* + sync() files from previous flush till current one + + We read soft_sync unprotected because it is actually 1 bit + value and we do not care much about "a bit old value" speed + of test is much more valuable here. + */ + if (!soft_sync || hgroup_commit_at_start) + { + if ((rc= + translog_sync_files(LSN_FILE_NO(log_descriptor.flushed), + LSN_FILE_NO(lsn), + sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && + (LSN_FILE_NO(log_descriptor. + previous_flush_horizon) != + LSN_FILE_NO(flush_horizon) || + ((LSN_OFFSET(log_descriptor. 
+ previous_flush_horizon) - 1) / + TRANSLOG_PAGE_SIZE) != + ((LSN_OFFSET(flush_horizon) - 1) / + TRANSLOG_PAGE_SIZE))))) + { + sent_to_disk= LSN_IMPOSSIBLE; + goto out; + } + /* keep values for soft sync() and forced sync() actual */ + my_atomic_rwlock_wrlock(&soft_sync_rwl); + my_atomic_store32(&soft_sync_min, LSN_FILE_NO(lsn)); + my_atomic_store32(&soft_sync_max, LSN_FILE_NO(lsn)); + my_atomic_rwlock_wrunlock(&soft_sync_rwl); + } + else + { + my_atomic_rwlock_wrlock(&soft_sync_rwl); + my_atomic_store32(&soft_sync_max, LSN_FILE_NO(lsn)); + my_atomic_rwlock_wrunlock(&soft_sync_rwl); + } + + DBUG_ASSERT(flush_horizon <= log_descriptor.horizon); log_descriptor.previous_flush_horizon= flush_horizon; out: pthread_mutex_lock(&log_descriptor.log_flush_lock); @@ -7733,7 +7975,7 @@ out: log_descriptor.flushed= sent_to_disk; log_descriptor.flush_in_progress= 0; DBUG_PRINT("info", ("flush_in_progress is dropped")); - pthread_mutex_unlock(&log_descriptor.log_flush_lock);\ + pthread_mutex_unlock(&log_descriptor.log_flush_lock); pthread_cond_broadcast(&log_descriptor.log_flush_cond); DBUG_RETURN(rc); } @@ -8103,6 +8345,7 @@ LSN translog_first_theoretical_lsn() my_bool translog_purge(TRANSLOG_ADDRESS low) { uint32 last_need_file= LSN_FILE_NO(low); + uint32 min_unsync; TRANSLOG_ADDRESS horizon= translog_get_horizon(); int rc= 0; DBUG_ENTER("translog_purge"); @@ -8110,6 +8353,12 @@ my_bool translog_purge(TRANSLOG_ADDRESS DBUG_ASSERT(translog_status == TRANSLOG_OK || translog_status == TRANSLOG_READONLY); + my_atomic_rwlock_wrlock(&soft_sync_rwl); + min_unsync= my_atomic_load32(&soft_sync_min); + my_atomic_rwlock_wrunlock(&soft_sync_rwl); + if (min_unsync < last_need_file) + last_need_file= min_unsync; + pthread_mutex_lock(&log_descriptor.purger_lock); if (LSN_FILE_NO(log_descriptor.last_lsn_checked) < last_need_file) { @@ -8346,6 +8595,160 @@ my_bool translog_log_debug_info(TRN *trn } + +/** + Sets soft sync mode + + @param mode TRUE if we need switch soft sync on else off +*/ + 
+void translog_soft_sync(my_bool mode) +{ + my_atomic_rwlock_wrlock(&soft_sync_rwl); + my_atomic_store32(&soft_sync, (uint32) mode); + my_atomic_rwlock_wrunlock(&soft_sync_rwl); +} + + +/** + Sets hard group commit + + @param mode TRUE if we need switch hard group commit on else off +*/ + +void translog_hard_group_commit(my_bool mode) +{ + my_atomic_rwlock_wrlock(&hgroup_commit_rwl); + my_atomic_store32(&hgroup_commit, (uint32) mode); + my_atomic_rwlock_wrunlock(&hgroup_commit_rwl); +} + + +/** + @brief forced log sync (used when we are switching modes) +*/ + +void translog_sync() +{ + uint32 max= get_current_logfile()->number; + uint32 min; + DBUG_ENTER("ma_translog_sync"); + + my_atomic_rwlock_rdlock(&soft_sync_rwl); + min= my_atomic_load32(&soft_sync_min); + my_atomic_rwlock_rdunlock(&soft_sync_rwl); + if (!min) + min= max; + + translog_sync_files(min, max, sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS); + + DBUG_VOID_RETURN; +} + + +/** + @brief set rate for group commit + + @param rate rate to set. 
+ + @note internally it stores interval in nanoseconds/TRANSLOG_RATE_BASE (to + fit into uint32) +*/ + +void translog_set_group_commit_rate(uint32 rate) +{ + DBUG_ENTER("translog_set_group_commit_rate"); + ulonglong wait_time; + if (rate) + { + wait_time= ((TRANSLOG_RATE_BASE * 1000000000ULL / rate + + TRANSLOG_RATE_BASE / 2) / + TRANSLOG_RATE_BASE); + if (wait_time == 0) + wait_time= 1; /* protection from getting special value */ + } + else + wait_time= 0; + my_atomic_rwlock_wrlock(&group_commit_wait_rwl); + my_atomic_store32(&group_commit_wait, (uint32)wait_time); + my_atomic_rwlock_wrunlock(&group_commit_wait_rwl); + DBUG_PRINT("info", ("rate: %lu wait: %llu", + (ulong)rate, (ulonglong)wait_time)); + DBUG_VOID_RETURN; +} + + +/** + @brief syncing service thread +*/ + +static pthread_handler_t +ma_soft_sync_background( void *arg __attribute__((unused))) +{ + + my_thread_init(); + DBUG_ENTER("ma_soft_sync_background"); + for(;;) + { + ulonglong prev_loop= my_getsystime(); + ulonglong time, sleep; + uint32 min, max; + my_atomic_rwlock_rdlock(&soft_sync_rwl); + min= my_atomic_load32(&soft_sync_min); + max= my_atomic_load32(&soft_sync_max); + my_atomic_store32(&soft_sync_min, max); + my_atomic_rwlock_rdunlock(&soft_sync_rwl); + + my_atomic_rwlock_rdlock(&group_commit_wait_rwl); + sleep= my_atomic_load32(&group_commit_wait) * TRANSLOG_RATE_BASE; + my_atomic_rwlock_rdunlock(&group_commit_wait_rwl); + translog_sync_files(min, max, FALSE); + time= my_getsystime() - prev_loop; + if (time > sleep) + sleep= 0; + else + sleep-= time; + if (my_service_thread_sleep(&soft_sync_control, sleep)) + break; + } + my_service_thread_signal_end(&soft_sync_control); + my_thread_end(); + DBUG_RETURN(0); +} + + +/** + @brief Starts syncing thread +*/ + +int translog_soft_sync_start(void) +{ + pthread_t th; + int res= 0; + DBUG_ENTER("translog_soft_sync_start"); + if (ma_service_thread_control_init(&soft_sync_control)) + res= 1; + else if (!(res= pthread_create(&th, NULL, 
ma_soft_sync_background, NULL))) + soft_sync_control.status= THREAD_RUNNING; + DBUG_RETURN(res); +} + + +/** + @brief Stops syncing thread +*/ + +void translog_soft_sync_end(void) +{ + DBUG_ENTER("translog_soft_sync_end"); + if (soft_sync_control.inited) + { + ma_service_thread_control_end(&soft_sync_control); + } + DBUG_VOID_RETURN; +} + + #ifdef MARIA_DUMP_LOG #include <my_getopt.h> extern void translog_example_table_init(); === modified file 'storage/maria/ma_loghandler.h' --- a/storage/maria/ma_loghandler.h 2009-01-15 22:25:53 +0000 +++ b/storage/maria/ma_loghandler.h 2009-03-04 16:13:55 +0000 @@ -343,6 +343,13 @@ enum enum_translog_status }; extern enum enum_translog_status translog_status; +void translog_soft_sync(my_bool mode); +void translog_hard_group_commit(my_bool mode); +int translog_soft_sync_start(void); +void translog_soft_sync_end(void); +void translog_sync(); +void translog_set_group_commit_rate(uint32 rate); + /* all the rest added because of recovery; should we make ma_loghandler_for_recovery.h ? 
@@ -441,6 +448,18 @@ extern LOG_DESC log_record_type_descript typedef enum { + TRANSLOG_GCOMMIT_NONE, + TRANSLOG_GCOMMIT_HARD, + TRANSLOG_GCOMMIT_SOFT +} enum_maria_group_commit; +extern ulong group_commit; +extern ulong group_commit_rate; +/** + group commit interval is TRANSLOG_RATE_BASE/<rate> seconds +*/ +#define TRANSLOG_RATE_BASE 100 +typedef enum +{ TRANSLOG_PURGE_IMMIDIATE, TRANSLOG_PURGE_EXTERNAL, TRANSLOG_PURGE_ONDEMAND === modified file 'tests/fork_big2.pl' --- a/tests/fork_big2.pl 2006-02-12 21:26:30 +0000 +++ b/tests/fork_big2.pl 2009-03-04 16:13:55 +0000 @@ -16,21 +16,21 @@ package main; $opt_skip_create=$opt_skip_in=$opt_verbose=$opt_fast_insert= $opt_lock_tables=$opt_debug=$opt_skip_drop=$opt_fast=$opt_force=0; -$opt_thread_factor=1; -$opt_insert=1; -$opt_select=6;$opt_join=4; -$opt_select_count=$opt_join_count=0; -$opt_update=1;$opt_delete=0; -$opt_flush=$opt_check=$opt_repair=$opt_alter=0; -$opt_join_range=100; +$opt_thread_factor=1; +$opt_insert=1; +$opt_select=6;$opt_join=4; +$opt_select_count=$opt_join_count=0; +$opt_update=1;$opt_delete=0; +$opt_flush=$opt_check=$opt_repair=$opt_alter=0; +$opt_join_range=100; $opt_resize_interval=0; $opt_time=0; $opt_host=$opt_user=$opt_password=""; $opt_db="test"; $opt_verbose=$opt_debug=$opt_lock_tables=$opt_fast_insert=$opt_fast=$opt_skip_in=$opt_force=undef; # Ignore warnings from these -GetOptions("host=s","db=s","user=s","password=s","loop-count=i","skip-create","skip-in","skip-drop", - "verbose","fast-insert","lock-tables","debug","fast","force","thread-factor=i", - "insert=i", "select=i", "join=i", "select-count=i", "join-count=i", "update=i", "delete=i", +GetOptions("host=s","db=s","user=s","password=s","loop-count=i","skip-create","skip-in","skip-drop", + "verbose","fast-insert","lock-tables","debug","fast","force","thread-factor=i", + "insert=i", "select=i", "join=i", "select-count=i", "join-count=i", "update=i", "delete=i", "flush=i", "check=i", "repair=i", "alter=i", "resize-interval=i", 
"max-join_range=i", "time=i") || die "Aborted"; print "Test of multiple connections that test the following things:\n"; @@ -48,20 +48,20 @@ srand 100; # Make random numbers repea #### #### Start timeing and start test -#### - +#### + $opt_insert*=$opt_thread_factor; -$opt_select*=$opt_thread_factor; -$opt_join*=$opt_thread_factor; -$opt_select_count*=$opt_thread_factor; -$opt_join_count*=$opt_thread_factor; -$opt_update*=$opt_thread_factor; -$opt_delete*=$opt_thread_factor; - -if ($opt_time == 0 && $opt_insert == 0) -{ - $opt_insert=1; -} +$opt_select*=$opt_thread_factor; +$opt_join*=$opt_thread_factor; +$opt_select_count*=$opt_thread_factor; +$opt_join_count*=$opt_thread_factor; +$opt_update*=$opt_thread_factor; +$opt_delete*=$opt_thread_factor; + +if ($opt_time == 0 && $opt_insert == 0) +{ + $opt_insert=1; +} $start_time=new Benchmark; $dbh = DBI->connect("DBI:mysql:$opt_db:$opt_host", @@ -100,71 +100,71 @@ $|= 1; # Autoflush #### #### Start the tests #### -if ($opt_time != 0) -{ - test_abort() if (($pid=fork()) == 0); $work{$pid}="abort"; +if ($opt_time != 0) +{ + test_abort() if (($pid=fork()) == 0); $work{$pid}="abort"; } for ($i=0 ; $i < $opt_insert ; $i ++) { test_insert() if (($pid=fork()) == 0); $work{$pid}="insert"; -} +} $threads=$i; -for ($i=0 ; $i < $opt_select ; $i ++) -{ - test_select() if (($pid=fork()) == 0); $work{$pid}="select"; -} -$threads+=$i; -for ($i=0 ; $i < $opt_join ; $i ++) -{ - test_join() if (($pid=fork()) == 0); $work{$pid}="join"; -} -$threads+=$i; +for ($i=0 ; $i < $opt_select ; $i ++) +{ + test_select() if (($pid=fork()) == 0); $work{$pid}="select"; +} +$threads+=$i; +for ($i=0 ; $i < $opt_join ; $i ++) +{ + test_join() if (($pid=fork()) == 0); $work{$pid}="join"; +} +$threads+=$i; for ($i=0 ; $i < $opt_select_count ; $i ++) { test_select_count() if (($pid=fork()) == 0); $work{$pid}="select_count"; } -$threads+=$i; -for ($i=0 ; $i < $opt_join_count ; $i ++) -{ - test_join_count() if (($pid=fork()) == 0); $work{$pid}="join_count"; 
-} -$threads+=$i; -for ($i=0 ; $i < $opt_update ; $i ++) -{ - test_update() if (($pid=fork()) == 0); $work{$pid}="update"; -} -$threads+=$i; -for ($i=0 ; $i < $opt_delete ; $i ++) -{ - test_delete() if (($pid=fork()) == 0); $work{$pid}="delete"; -} -$threads+=$i; -for ($i=0 ; $i < $opt_flush ; $i ++) -{ - test_flush() if (($pid=fork()) == 0); $work{$pid}="flush"; -} -$threads+=$i; -for ($i=0 ; $i < $opt_check ; $i ++) -{ - test_check() if (($pid=fork()) == 0); $work{$pid}="check"; -} -$threads+=$i; -for ($i=0 ; $i < $opt_repair ; $i ++) -{ - test_repair() if (($pid=fork()) == 0); $work{$pid}="repair"; -} -$threads+=$i; -for ($i=0 ; $i < $opt_alter ; $i ++) -{ - test_alter() if (($pid=fork()) == 0); $work{$pid}="alter"; -} -$threads+=$i; +$threads+=$i; +for ($i=0 ; $i < $opt_join_count ; $i ++) +{ + test_join_count() if (($pid=fork()) == 0); $work{$pid}="join_count"; +} +$threads+=$i; +for ($i=0 ; $i < $opt_update ; $i ++) +{ + test_update() if (($pid=fork()) == 0); $work{$pid}="update"; +} +$threads+=$i; +for ($i=0 ; $i < $opt_delete ; $i ++) +{ + test_delete() if (($pid=fork()) == 0); $work{$pid}="delete"; +} +$threads+=$i; +for ($i=0 ; $i < $opt_flush ; $i ++) +{ + test_flush() if (($pid=fork()) == 0); $work{$pid}="flush"; +} +$threads+=$i; +for ($i=0 ; $i < $opt_check ; $i ++) +{ + test_check() if (($pid=fork()) == 0); $work{$pid}="check"; +} +$threads+=$i; +for ($i=0 ; $i < $opt_repair ; $i ++) +{ + test_repair() if (($pid=fork()) == 0); $work{$pid}="repair"; +} +$threads+=$i; +for ($i=0 ; $i < $opt_alter ; $i ++) +{ + test_alter() if (($pid=fork()) == 0); $work{$pid}="alter"; +} +$threads+=$i; if ($opt_resize_interval != 0) { test_resize() if (($pid=fork()) == 0); $work{$pid}="resize"; $threads+=1; } - + print "Started $threads threads\n"; $errors=0; @@ -172,17 +172,17 @@ $running_insert_threads=$opt_insert; while (($pid=wait()) != -1) { $ret=$?/256; - print "thread '" . $work{$pid} . 
"' finished with exit code $ret\n"; - if ($opt_time == 0) + print "thread '" . $work{$pid} . "' finished with exit code $ret\n"; + if ($opt_time == 0) { if ($work{$pid} =~ /^insert/) { if (!--$running_insert_threads) - { - + { + # Time to stop other threads signal_abort(); - } + } } } $errors++ if ($ret != 0); @@ -214,17 +214,17 @@ print "Total time: " . exit(0); -# -# Sleep and then abort other threads -# - -sub test_abort -{ - sleep($opt_time); - signal_abort(); - exit(0); -} - +# +# Sleep and then abort other threads +# + +sub test_abort +{ + sleep($opt_time); + signal_abort(); + exit(0); +} + # # Insert records in the table @@ -363,58 +363,58 @@ sub test_join $dbh->disconnect; $dbh=0; print "Test_join: Executed $count joins\n"; exit(0); -} - -# -# select records -# Do continously joins between the first and second for range and count selected rows -# - -sub test_join_count -{ - my ($dbh, $i, $j, $count, $loop); - - $dbh = DBI->connect("DBI:mysql:$opt_db:$opt_host", - $opt_user, $opt_password, - { PrintError => 0}) || die $DBI::errstr; - - $count_query=make_count_query($numtables); - $count=0; - $loop=9999; - $sum=0; - - srand(); - - $i=0; - while (($i++ % 10) || !test_if_abort($dbh)) - { - if ($loop++ >= 10) - { - $loop=0; - $row_counts=simple_query($dbh, $count_query); - } - for ($j=0 ; $j < $numtables-1 ; $j++) - { - my ($id1)= int rand $row_counts->[$j]; - my ($id2)= int rand $row_counts->[$j]; - if ($id1 > $id2) - { - my $id0=$id1; $id1=$id2; $id2=$id0; - if ($id2-$id1 > $opt_join_range) - { - $id2=$id1+$opt_join_range; - } - } - my ($t1,$t2)= ($testtables[$j]->[0],$testtables[$j+1]->[0]); - $row=simple_query($dbh, "select count(*) from $t1, $t2 where $t1.id=$t2.id and $t1.id between $id1 and $id2"); - $sum+=$row->[0]; - $count++; - } - } - $dbh->disconnect; $dbh=0; - print "Test_join_count: Executed $count joins: total $sum rows\n"; - exit(0); -} +} + +# +# select records +# Do continously joins between the first and second for range and count selected 
rows +# + +sub test_join_count +{ + my ($dbh, $i, $j, $count, $loop); + + $dbh = DBI->connect("DBI:mysql:$opt_db:$opt_host", + $opt_user, $opt_password, + { PrintError => 0}) || die $DBI::errstr; + + $count_query=make_count_query($numtables); + $count=0; + $loop=9999; + $sum=0; + + srand(); + + $i=0; + while (($i++ % 10) || !test_if_abort($dbh)) + { + if ($loop++ >= 10) + { + $loop=0; + $row_counts=simple_query($dbh, $count_query); + } + for ($j=0 ; $j < $numtables-1 ; $j++) + { + my ($id1)= int rand $row_counts->[$j]; + my ($id2)= int rand $row_counts->[$j]; + if ($id1 > $id2) + { + my $id0=$id1; $id1=$id2; $id2=$id0; + if ($id2-$id1 > $opt_join_range) + { + $id2=$id1+$opt_join_range; + } + } + my ($t1,$t2)= ($testtables[$j]->[0],$testtables[$j+1]->[0]); + $row=simple_query($dbh, "select count(*) from $t1, $t2 where $t1.id=$t2.id and $t1.id between $id1 and $id2"); + $sum+=$row->[0]; + $count++; + } + } + $dbh->disconnect; $dbh=0; + print "Test_join_count: Executed $count joins: total $sum rows\n"; + exit(0); +} #
participants (1)
-
sanja@askmonty.org