[Maria-developers] bzr commit into MariaDB 5.1, with Maria 1.5:maria branch (knielsen:2850)
#At lp:maria

2850 knielsen@knielsen-hq.org	2010-06-09
     MWL#116: Fix a couple of races in group commit.
     modified:
       include/atomic/gcc_builtins.h
       include/atomic/x86-gcc.h
       sql/handler.cc

=== modified file 'include/atomic/gcc_builtins.h'
--- a/include/atomic/gcc_builtins.h	2008-02-06 16:55:04 +0000
+++ b/include/atomic/gcc_builtins.h	2010-06-09 11:17:39 +0000
@@ -19,8 +19,9 @@
   v= __sync_lock_test_and_set(a, v);
 #define make_atomic_cas_body(S)                     \
   int ## S sav;                                     \
-  sav= __sync_val_compare_and_swap(a, *cmp, set);   \
-  if (!(ret= (sav == *cmp))) *cmp= sav;
+  int ## S cmp_val= *cmp;                           \
+  sav= __sync_val_compare_and_swap(a, cmp_val, set);\
+  if (!(ret= (sav == cmp_val))) *cmp= sav
 
 #ifdef MY_ATOMIC_MODE_DUMMY
 #define make_atomic_load_body(S)   ret= *a

=== modified file 'include/atomic/x86-gcc.h'
--- a/include/atomic/x86-gcc.h	2007-02-28 16:50:51 +0000
+++ b/include/atomic/x86-gcc.h	2010-06-09 11:17:39 +0000
@@ -38,15 +38,33 @@
 #define asm __asm__
 #endif
 
+/*
+  The atomic operations imply a memory barrier for the CPU, to ensure that all
+  prior writes are flushed from cache, and all subsequent reads reloaded into
+  cache.
+
+  We need to imply a similar memory barrier for the compiler, so that all
+  pending stores (to memory that may be aliased in other parts of the code)
+  will be flushed to memory before the operation, and all reads from such
+  memory be re-loaded. This is achieved by adding the "memory" pseudo-register
+  to the clobber list, see GCC documentation for more explanation.
+
+  The compiler and CPU memory barriers are needed to make sure changes in one
+  thread are made visible in another by the atomic operation.
+*/
 #ifndef MY_ATOMIC_NO_XADD
 #define make_atomic_add_body(S)                                 \
-  asm volatile (LOCK_prefix "; xadd %0, %1;" : "+r" (v) , "+m" (*a))
+  asm volatile (LOCK_prefix "; xadd %0, %1;" : "+r" (v) , "+m" (*a) : : "memory")
 #endif
 #define make_atomic_fas_body(S)                                 \
-  asm volatile ("xchg %0, %1;" : "+r" (v) , "+m" (*a))
+  asm volatile ("xchg %0, %1;" : "+r" (v) , "+m" (*a) : : "memory")
 #define make_atomic_cas_body(S)                                 \
+  int ## S sav;                                                 \
   asm volatile (LOCK_prefix "; cmpxchg %3, %0; setz %2;"        \
-               : "+m" (*a), "+a" (*cmp), "=q" (ret): "r" (set))
+               : "+m" (*a), "=a" (sav), "=q" (ret)              \
+               : "r" (set), "a" (*cmp) : "memory");             \
+  if (!ret)                                                     \
+    *cmp= sav
 
 #ifdef MY_ATOMIC_MODE_DUMMY
 #define make_atomic_load_body(S)   ret=*a
@@ -59,9 +77,9 @@
 #define make_atomic_load_body(S)                                \
   ret=0;                                                        \
   asm volatile (LOCK_prefix "; cmpxchg %2, %0"                  \
-               : "+m" (*a), "+a" (ret): "r" (ret))
+               : "+m" (*a), "+a" (ret) : "r" (ret) : "memory")
 #define make_atomic_store_body(S)                               \
-  asm volatile ("; xchg %0, %1;" : "+m" (*a), "+r" (v))
+  asm volatile ("; xchg %0, %1;" : "+m" (*a), "+r" (v) : : "memory")
 #endif
 
 /* TODO test on intel whether the below helps. on AMD it makes no difference */

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2010-05-26 08:16:18 +0000
+++ b/sql/handler.cc	2010-06-09 11:17:39 +0000
@@ -1103,14 +1103,30 @@ ha_check_and_coalesce_trx_read_only(THD
 static THD *
 enqueue_atomic(THD *thd)
 {
-  my_atomic_rwlock_wrlock(&LOCK_group_commit_queue);
+  THD *orig_queue;
   thd->next_commit_ordered= group_commit_queue;
+  my_atomic_rwlock_wrlock(&LOCK_group_commit_queue);
+  do
+  {
+    /*
+      Save the read value of group_commit_queue in each iteration of the loop.
+      When my_atomic_casptr() returns TRUE, we know that orig_queue is equal
+      to the value of group_commit_queue when we enqueued.
+
+      However, as soon as we enqueue, thd->next_commit_ordered may be
+      invalidated by another thread (the group commit leader). So we need to
+      save the old queue value in a local variable orig_queue like this.
+    */
+    orig_queue= thd->next_commit_ordered;
+  } while (!my_atomic_casptr((void **)(&group_commit_queue),
                              (void **)(&thd->next_commit_ordered),
-                             thd))
-    ;
+                             thd));
   my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue);
-  return thd->next_commit_ordered;
+
+  return orig_queue;
 }
 
 static THD *
@@ -1399,6 +1415,9 @@ int ha_commit_trans(THD *thd, bool all)
     int cookie;
     if (tc_log->use_group_log_xid)
     {
+      // ToDo: if xid==NULL here, we may use is_group_commit_leader uninitialised.
+      // ToDo: Same for cookie below when xid==NULL.
+      // Seems we generally need to check the case xid==NULL.
       if (is_group_commit_leader)
       {
         pthread_mutex_lock(&LOCK_group_commit);
@@ -1434,9 +1453,18 @@ int ha_commit_trans(THD *thd, bool all)
       }
       pthread_mutex_unlock(&LOCK_group_commit);
 
-      /* Wake up everyone except ourself. */
-      while ((queue= queue->next_commit_ordered) != NULL)
-        group_commit_wakeup_other(queue);
+      /* Wake up everyone except ourself. */
+      THD *current= queue->next_commit_ordered;
+      while (current != NULL)
+      {
+        /*
+          Careful not to access current->next_commit_ordered after waking up
+          the other thread! It may change immediately after wakeup.
+        */
+        THD *next= current->next_commit_ordered;
+        group_commit_wakeup_other(current);
+        current= next;
+      }
     }
     else
    {
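
A note for reviewers on the gcc_builtins.h hunk: the old make_atomic_cas_body()
read *cmp twice, once as the comparand passed to the builtin and once in the
comparison that computes ret. If the memory behind *cmp can change between the
two reads, the macro can report success for a CAS that actually failed. The
single-read semantics are what the usual retry loop depends on; here is a
minimal sketch of that pattern (atomic_inc32() is an illustrative name, not
code from the tree):

#include <stdint.h>

/*
  Retry loop built on the fixed CAS semantics: on failure, the expected
  value is updated to what was actually seen, so each attempt judges
  success against the exact value that was passed to the builtin.
*/
static void
atomic_inc32(volatile int32_t *a)
{
  int32_t expected= *a;
  for (;;)
  {
    int32_t seen= __sync_val_compare_and_swap(a, expected, expected + 1);
    if (seen == expected)
      break;              /* CAS succeeded */
    expected= seen;       /* CAS failed; retry with the value we saw */
  }
}

The loop terminates correctly only because success is judged against the same
value that was handed to the CAS, which is exactly what the patched macro now
guarantees.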
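On the x86-gcc.h hunks: the "memory" clobber is the compiler-side half of the
barrier, as the new comment in the patch explains. A minimal illustration of
the publish pattern it protects (all names here are made up for the example;
this is not code from the tree):

#include <stdint.h>

static int32_t data;
static int32_t flag;

/*
  Without the "memory" clobber, GCC would be free to keep `data` in a
  register or sink the store to `data` below the asm, so another thread
  observing flag == 1 could still read stale data.
*/
static void
publish(int32_t v)
{
  int32_t one= 1;
  data= v;                              /* must reach memory first */
  __asm__ __volatile__ ("xchg %0, %1"   /* implicitly LOCKed on x86 */
                        : "+r" (one), "+m" (flag)
                        : : "memory");  /* compiler barrier */
}

The lock prefix (implicit for xchg) gives the CPU ordering; the clobber gives
the matching compiler ordering. Both are needed, which is why the patch adds
"memory" to every operation, not just CAS.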
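The enqueue_atomic() fix is the classic rule for a lock-free stack push: once
the CAS succeeds, the link field of the node we just pushed belongs to whoever
pops the queue (here, the group commit leader), so the pre-push head must be
taken from a local copy rather than re-read from thd->next_commit_ordered.
A standalone sketch of the same pattern using the GCC builtins directly
(struct node and enqueue() are illustrative; the real code goes through
my_atomic_casptr() and its rwlock fallback):

#include <stddef.h>

struct node { struct node *next; };
static struct node *queue_head;

/*
  Lock-free push; returns the head observed just before `n` was linked
  in (NULL means we pushed onto an empty queue, i.e. we are the leader).
*/
static struct node *
enqueue(struct node *n)
{
  struct node *old_head= queue_head;
  for (;;)
  {
    struct node *seen;
    n->next= old_head;
    /*
      Keep old_head in a local: the moment the CAS succeeds, another
      thread may pop the queue and overwrite n->next.
    */
    seen= __sync_val_compare_and_swap(&queue_head, old_head, n);
    if (seen == old_head)
      return old_head;    /* success: report what we saw, not n->next */
    old_head= seen;       /* lost the race; retry with the new head */
  }
}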
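The wakeup-loop fix in ha_commit_trans() is the mirror-image rule on the
dequeue side: a woken THD may proceed, commit, and re-enqueue itself
immediately, so its next_commit_ordered field has to be loaded before
group_commit_wakeup_other() is called. The general shape, as a sketch
(struct node and wake() are illustrative stand-ins):

#include <stddef.h>

struct node { struct node *next; };
void wake(struct node *n);   /* stands in for group_commit_wakeup_other() */

/*
  Walk a list whose nodes return to their owners on wakeup: fetch the
  next pointer *before* handing the node back.
*/
static void
wake_all_after(struct node *leader)
{
  struct node *current= leader->next;
  while (current != NULL)
  {
    struct node *next= current->next;  /* read before waking */
    wake(current);                     /* current is now off limits */
    current= next;
  }
}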