diff --recursive -u base5067/innobase/btr/btr0cur.c m5067/innobase/btr/btr0cur.c --- base5067/innobase/btr/btr0cur.c 2008-08-04 05:19:12.000000000 -0700 +++ m5067/innobase/btr/btr0cur.c 2008-09-08 06:53:12.000000000 -0700 @@ -312,8 +312,8 @@ #ifdef UNIV_SEARCH_PERF_STAT info->n_searches++; -#endif - if (btr_search_latch.writer == RW_LOCK_NOT_LOCKED +#endif + if (rw_lock_get_writer(&btr_search_latch) == RW_LOCK_NOT_LOCKED && latch_mode <= BTR_MODIFY_LEAF && info->last_hash_succ && !estimate #ifdef PAGE_CUR_LE_OR_EXTENDS diff --recursive -u base5067/innobase/btr/btr0sea.c m5067/innobase/btr/btr0sea.c --- base5067/innobase/btr/btr0sea.c 2008-08-04 05:19:12.000000000 -0700 +++ m5067/innobase/btr/btr0sea.c 2008-09-08 06:53:12.000000000 -0700 @@ -746,8 +746,8 @@ rw_lock_s_lock(&btr_search_latch); } - ut_ad(btr_search_latch.writer != RW_LOCK_EX); - ut_ad(btr_search_latch.reader_count > 0); + ut_ad(rw_lock_get_writer(&btr_search_latch) != RW_LOCK_EX); + ut_ad(rw_lock_get_reader_count(&btr_search_latch) > 0); rec = ha_search_and_get_data(btr_search_sys->hash_index, fold); diff --recursive -u base5067/innobase/buf/buf0buf.c m5067/innobase/buf/buf0buf.c --- base5067/innobase/buf/buf0buf.c 2008-08-04 05:19:12.000000000 -0700 +++ m5067/innobase/buf/buf0buf.c 2008-09-08 06:53:12.000000000 -0700 @@ -1226,7 +1226,7 @@ if (mode == BUF_GET_NOWAIT) { if (rw_latch == RW_S_LATCH) { - success = rw_lock_s_lock_func_nowait(&(block->lock), + success = rw_lock_s_lock_nowait(&(block->lock), file, line); fix_type = MTR_MEMO_PAGE_S_FIX; } else { @@ -1359,8 +1359,7 @@ ut_ad(!ibuf_inside() || ibuf_page(block->space, block->offset)); if (rw_latch == RW_S_LATCH) { - success = rw_lock_s_lock_func_nowait(&(block->lock), - file, line); + success = rw_lock_s_lock_nowait(&(block->lock), file, line); fix_type = MTR_MEMO_PAGE_S_FIX; } else { success = rw_lock_x_lock_func_nowait(&(block->lock), @@ -1507,8 +1506,7 @@ ut_ad(!ibuf_inside() || (mode == BUF_KEEP_OLD)); if (rw_latch == RW_S_LATCH) { - success = 
rw_lock_s_lock_func_nowait(&(block->lock), - file, line); + success = rw_lock_s_lock_nowait(&(block->lock), file, line); fix_type = MTR_MEMO_PAGE_S_FIX; } else { success = rw_lock_x_lock_func_nowait(&(block->lock), diff --recursive -u base5067/innobase/configure m5067/innobase/configure --- base5067/innobase/configure 2008-08-04 05:23:02.000000000 -0700 +++ m5067/innobase/configure 2008-09-08 06:53:12.000000000 -0700 @@ -20520,6 +20520,76 @@ fi done +# as http://lists.mysql.com/commits/40686 does +echo "$as_me:$LINENO: checking whether the compiler provides atomic builtins" >&5 +echo $ECHO_N "checking whether the compiler provides atomic builtins... $ECHO_C" >&6 +if test "${mysql_cv_atomic_builtins+set}" = set; then + echo $ECHO_N "(cached) $ECHO_C" >&6 +else + if test "$cross_compiling" = yes; then + { { echo "$as_me:$LINENO: error: cannot run test program while cross compiling +See \`config.log' for more details." >&5 +echo "$as_me: error: cannot run test program while cross compiling +See \`config.log' for more details." >&2;} + { (exit 1); exit 1; }; } +else + cat >conftest.$ac_ext <<_ACEOF +/* confdefs.h. */ +_ACEOF +cat confdefs.h >>conftest.$ac_ext +cat >>conftest.$ac_ext <<_ACEOF +/* end confdefs.h. */ + + int main() + { + int foo= -10; int bar= 10; + __sync_fetch_and_add(&foo, bar); + if (foo) + return -1; + bar= __sync_lock_test_and_set(&foo, bar); + if (bar || foo != 10) + return -1; + bar= __sync_val_compare_and_swap(&bar, foo, 15); + if (bar) + return -1; + return 0; + } + +_ACEOF +rm -f conftest$ac_exeext +if { (eval echo "$as_me:$LINENO: \"$ac_link\"") >&5 + (eval $ac_link) 2>&5 + ac_status=$? + echo "$as_me:$LINENO: \$? = $ac_status" >&5 + (exit $ac_status); } && { ac_try='./conftest$ac_exeext' + { (eval echo "$as_me:$LINENO: \"$ac_try\"") >&5 + (eval $ac_try) 2>&5 + ac_status=$? + echo "$as_me:$LINENO: \$? 
= $ac_status" >&5 + (exit $ac_status); }; }; then + mysql_cv_atomic_builtins=yes +else + echo "$as_me: program exited with status $ac_status" >&5 +echo "$as_me: failed program was:" >&5 +sed 's/^/| /' conftest.$ac_ext >&5 + +( exit $ac_status ) +mysql_cv_atomic_builtins=no +fi +rm -f core *.core gmon.out bb.out conftest$ac_exeext conftest.$ac_objext conftest.$ac_ext +fi +fi +echo "$as_me:$LINENO: result: $mysql_cv_atomic_builtins" >&5 +echo "${ECHO_T}$mysql_cv_atomic_builtins" >&6 + +if test "x$mysql_cv_atomic_builtins" = xyes; then + +cat >>confdefs.h <<\_ACEOF +#define HAVE_ATOMIC_BUILTINS 1 +_ACEOF + +fi + #AC_CHECK_FUNCS(readdir_r) MySQL checks that it has also the right args. # Some versions of Unix only take 2 arguments. #AC_C_INLINE Already checked in MySQL diff --recursive -u base5067/innobase/configure.in m5067/innobase/configure.in --- base5067/innobase/configure.in 2008-08-04 05:19:12.000000000 -0700 +++ m5067/innobase/configure.in 2008-09-08 06:53:12.000000000 -0700 @@ -42,6 +42,31 @@ AC_CHECK_FUNCS(sched_yield) AC_CHECK_FUNCS(fdatasync) AC_CHECK_FUNCS(localtime_r) + +# as http://lists.mysql.com/commits/40686 does +AC_CACHE_CHECK([whether the compiler provides atomic builtins], + [mysql_cv_atomic_builtins], [AC_TRY_RUN([ + int main() + { + int foo= -10; int bar= 10; + __sync_fetch_and_add(&foo, bar); + if (foo) + return -1; + bar= __sync_lock_test_and_set(&foo, bar); + if (bar || foo != 10) + return -1; + bar= __sync_val_compare_and_swap(&bar, foo, 15); + if (bar) + return -1; + return 0; + } +], [mysql_cv_atomic_builtins=yes], [mysql_cv_atomic_builtins=no])]) + +if test "x$mysql_cv_atomic_builtins" = xyes; then + AC_DEFINE(HAVE_ATOMIC_BUILTINS, 1, + [Define to 1 if compiler provides atomic builtins.]) +fi + #AC_CHECK_FUNCS(readdir_r) MySQL checks that it has also the right args. # Some versions of Unix only take 2 arguments. 
#AC_C_INLINE Already checked in MySQL diff --recursive -u base5067/innobase/ib_config.h m5067/innobase/ib_config.h --- base5067/innobase/ib_config.h 2008-08-04 05:24:29.000000000 -0700 +++ m5067/innobase/ib_config.h 2008-09-08 06:53:12.000000000 -0700 @@ -4,6 +4,9 @@ /* Define to 1 if you have the <aio.h> header file. */ #define HAVE_AIO_H 1 +/* Define to 1 if compiler provides atomic builtins. */ +#define HAVE_ATOMIC_BUILTINS 1 + /* Define to 1 if you have the <dlfcn.h> header file. */ #define HAVE_DLFCN_H 1 diff --recursive -u base5067/innobase/ib_config.h.in m5067/innobase/ib_config.h.in --- base5067/innobase/ib_config.h.in 2008-08-04 05:22:51.000000000 -0700 +++ m5067/innobase/ib_config.h.in 2008-09-08 06:53:12.000000000 -0700 @@ -3,6 +3,9 @@ /* Define to 1 if you have the <aio.h> header file. */ #undef HAVE_AIO_H +/* Define to 1 if compiler provides atomic builtins. */ +#undef HAVE_ATOMIC_BUILTINS + /* Define to 1 if you have the <dlfcn.h> header file. */ #undef HAVE_DLFCN_H diff --recursive -u base5067/innobase/include/buf0buf.ic m5067/innobase/include/buf0buf.ic --- base5067/innobase/include/buf0buf.ic 2008-08-04 05:19:13.000000000 -0700 +++ m5067/innobase/include/buf0buf.ic 2008-09-08 06:53:12.000000000 -0700 @@ -518,7 +518,7 @@ #ifdef UNIV_SYNC_DEBUG ibool ret; - ret = rw_lock_s_lock_func_nowait(&(block->debug_latch), file, line); + ret = rw_lock_s_lock_nowait(&(block->debug_latch), file, line); ut_ad(ret == TRUE); ut_ad(mutex_own(&block->mutex)); diff --recursive -u base5067/innobase/include/os0sync.h m5067/innobase/include/os0sync.h --- base5067/innobase/include/os0sync.h 2008-08-04 05:19:14.000000000 -0700 +++ m5067/innobase/include/os0sync.h 2008-09-08 06:53:12.000000000 -0700 @@ -260,7 +260,30 @@ os_fast_mutex_free( /*===============*/ os_fast_mutex_t* fast_mutex); /* in: mutex to free */ - + +#ifdef HAVE_ATOMIC_BUILTINS +/************************************************************** +Atomic compare-and-swap for InnoDB. Currently requires GCC atomic builtins. 
*/ +UNIV_INLINE +ibool +os_compare_and_swap( +/*================*/ + /* out: true if swapped */ + volatile lint* ptr, /* in: pointer to target */ + lint oldVal, /* in: value to compare to */ + lint newVal); /* in: value to swap in */ +/************************************************************** +Atomic increment for InnoDB. Currently requires GCC atomic builtins. */ +UNIV_INLINE +lint +os_atomic_increment( +/*================*/ + /* out: resulting value */ + volatile lint* ptr, /* in: pointer to target */ + lint amount); /* in: amount of increment */ + +#endif /* HAVE_ATOMIC_BUILTINS */ + #ifndef UNIV_NONINL #include "os0sync.ic" #endif diff --recursive -u base5067/innobase/include/os0sync.ic m5067/innobase/include/os0sync.ic --- base5067/innobase/include/os0sync.ic 2008-08-04 05:19:14.000000000 -0700 +++ m5067/innobase/include/os0sync.ic 2008-09-08 06:53:12.000000000 -0700 @@ -44,3 +44,33 @@ #endif #endif } + +#ifdef HAVE_ATOMIC_BUILTINS +/************************************************************** +Atomic compare-and-swap for InnoDB. Currently requires GCC atomic builtins. */ +UNIV_INLINE +ibool +os_compare_and_swap( +/*================*/ + /* out: true if swapped */ + volatile lint* ptr, /* in: pointer to target */ + lint oldVal, /* in: value to compare to */ + lint newVal) /* in: value to swap in */ +{ + return __sync_bool_compare_and_swap(ptr, oldVal, newVal); +} + +/************************************************************** +Atomic increment for InnoDB. Currently requires GCC atomic builtins. 
*/ +UNIV_INLINE +lint +os_atomic_increment( +/*================*/ + /* out: resulting value */ + volatile lint* ptr, /* in: pointer to target */ + lint amount) /* in: amount of increment */ +{ + return __sync_add_and_fetch(ptr, amount); +} + +#endif /* HAVE_ATOMIC_BUILTINS */ diff --recursive -u base5067/innobase/include/srv0srv.h m5067/innobase/include/srv0srv.h --- base5067/innobase/include/srv0srv.h 2008-08-04 05:19:15.000000000 -0700 +++ m5067/innobase/include/srv0srv.h 2008-09-08 06:53:12.000000000 -0700 @@ -541,6 +541,8 @@ ulint innodb_buffer_pool_read_ahead_rnd; ulint innodb_dblwr_pages_written; ulint innodb_dblwr_writes; + ibool innodb_have_atomic_builtins; + ibool innodb_heap_enabled; ulint innodb_log_waits; ulint innodb_log_write_requests; ulint innodb_log_writes; @@ -580,4 +582,3 @@ extern ulint srv_n_threads_active[]; #endif - diff --recursive -u base5067/innobase/include/sync0rw.h m5067/innobase/include/sync0rw.h --- base5067/innobase/include/sync0rw.h 2008-08-04 05:19:15.000000000 -0700 +++ m5067/innobase/include/sync0rw.h 2008-09-08 06:53:12.000000000 -0700 @@ -47,14 +47,14 @@ there may be waiters for the event */ #endif /* UNIV_SYNC_DEBUG */ -extern ulint rw_s_system_call_count; -extern ulint rw_s_spin_wait_count; -extern ulint rw_s_exit_count; -extern ulint rw_s_os_wait_count; -extern ulint rw_x_system_call_count; -extern ulint rw_x_spin_wait_count; -extern ulint rw_x_os_wait_count; -extern ulint rw_x_exit_count; +extern ib_longlong rw_s_spin_wait_count; +extern ib_longlong rw_s_spin_round_count; +extern ib_longlong rw_s_exit_count; +extern ib_longlong rw_s_os_wait_count; +extern ib_longlong rw_x_spin_wait_count; +extern ib_longlong rw_x_spin_round_count; +extern ib_longlong rw_x_os_wait_count; +extern ib_longlong rw_x_exit_count; /********************************************************************** Creates, or rather, initializes an rw-lock object in a specified memory @@ -115,9 +115,8 @@ 
/****************************************************************** NOTE! The following macros should be used in rw s-locking, not the corresponding function. */ - -#define rw_lock_s_lock_nowait(M) rw_lock_s_lock_func_nowait(\ - (M), __FILE__, __LINE__) +#define rw_lock_s_lock_nowait(M, F, L) rw_lock_s_lock_low(\ + (M), 0, (F), (L)) /********************************************************************** NOTE! Use the corresponding macro, not directly this function, except if you supply the file name and line number. Lock an rw-lock in shared mode @@ -135,18 +134,6 @@ const char* file_name,/* in: file name where lock requested */ ulint line); /* in: line where requested */ /********************************************************************** -NOTE! Use the corresponding macro, not directly this function, except if -you supply the file name and line number. Lock an rw-lock in shared mode -for the current thread if the lock can be acquired immediately. */ -UNIV_INLINE -ibool -rw_lock_s_lock_func_nowait( -/*=======================*/ - /* out: TRUE if success */ - rw_lock_t* lock, /* in: pointer to rw-lock */ - const char* file_name,/* in: file name where lock requested */ - ulint line); /* in: line where requested */ -/********************************************************************** NOTE! Use the corresponding macro, not directly this function! Lock an rw-lock in exclusive mode for the current thread if the lock can be obtained immediately. */ @@ -338,6 +325,23 @@ rw_lock_get_reader_count( /*=====================*/ rw_lock_t* lock); +/********************************************************************** +Decrements lock_word the specified amount if it is greater than 0. +This is used by both s_lock and x_lock operations. 
*/ +UNIV_INLINE +ibool +rw_lock_lock_word_decr( + /* out: TRUE if decr occurs */ + rw_lock_t* lock, /* in: rw-lock */ + ulint amount); /* in: amount to decrement */ +/********************************************************************** +Increments lock_word the specified amount and returns new value. */ +UNIV_INLINE +lint +rw_lock_lock_word_incr( + /* out: lock_word after increment */ + rw_lock_t* lock, /* in: rw-lock */ + ulint amount); /* in: amount to increment */ #ifdef UNIV_SYNC_DEBUG /********************************************************************** Checks if the thread has locked the rw-lock in the specified mode, with @@ -408,56 +412,38 @@ rw_lock_debug_t* info); /* in: debug struct */ #endif /* UNIV_SYNC_DEBUG */ +/* We decrement lock_word by this amount for each x_lock. It is also the +start value for the lock_word, meaning that it limits the maximum number +of concurrent read locks before the rw_lock breaks. The current value of +0x00100000 allows 1,048,575 concurrent readers and 2047 recursive writers.*/ +#define X_LOCK_DECR 0x00100000 + /* NOTE! The structure appears here only for the compiler to know its size. Do not use its fields directly! The structure used in the spin lock implementation of a read-write lock. Several threads may have a shared lock simultaneously in this lock, but only one writer may have an exclusive lock, in which case no shared locks are allowed. To prevent starving of a writer -blocked by readers, a writer may queue for the lock by setting the writer -field. Then no new readers are allowed in. */ +blocked by readers, a writer may queue for the lock by decrementing lock_word: +no new readers will be let in while the thread waits for readers to exit. */ struct rw_lock_struct { - os_event_t event; /* Used by sync0arr.c for thread queueing */ - -#ifdef __WIN__ - os_event_t wait_ex_event; /* This windows specific event is - used by the thread which has set the - lock state to RW_LOCK_WAIT_EX. 
The - rw_lock design guarantees that this - thread will be the next one to proceed - once the current the event gets - signalled. See LEMMA 2 in sync0sync.c */ -#endif - - ulint reader_count; /* Number of readers who have locked this - lock in the shared mode */ - ulint writer; /* This field is set to RW_LOCK_EX if there - is a writer owning the lock (in exclusive - mode), RW_LOCK_WAIT_EX if a writer is - queueing for the lock, and - RW_LOCK_NOT_LOCKED, otherwise. */ - os_thread_id_t writer_thread; - /* Thread id of a possible writer thread */ - ulint writer_count; /* Number of times the same thread has - recursively locked the lock in the exclusive - mode */ - mutex_t mutex; /* The mutex protecting rw_lock_struct */ - ulint pass; /* Default value 0. This is set to some + volatile lint lock_word; + /* Holds the state of the lock. */ + volatile ulint waiters;/* 1: there are waiters */ + volatile ulint pass; /* Default value 0. This is set to some value != 0 given by the caller of an x-lock operation, if the x-lock is to be passed to another thread to unlock (which happens in asynchronous i/o). */ - ulint waiters; /* This ulint is set to 1 if there are - waiters (readers or writers) in the global - wait array, waiting for this rw_lock. - Otherwise, == 0. */ - ibool writer_is_wait_ex; - /* This is TRUE if the writer field is - RW_LOCK_WAIT_EX; this field is located far - from the memory update hotspot fields which - are at the start of this struct, thus we can - peek this field without causing much memory - bus traffic */ + volatile os_thread_id_t writer_thread; + /* Thread id of writer thread */ + os_event_t event; /* Used by sync0arr.c for thread queueing */ + os_event_t wait_ex_event; + /* Event for next-writer to wait on. A thread + must decrement lock_word before waiting. 
*/ +#ifndef HAVE_ATOMIC_BUILTINS + mutex_t mutex; /* The mutex protecting rw_lock_struct */ +#endif /* HAVE_ATOMIC_BUILTINS */ UT_LIST_NODE_T(rw_lock_t) list; /* All allocated rw locks are put into a list */ @@ -466,10 +452,12 @@ /* In the debug version: pointer to the debug info list of the lock */ #endif /* UNIV_SYNC_DEBUG */ + ulint count_os_wait; /* Count of os_waits. May not be accurate */ ulint level; /* Level in the global latching order; default SYNC_LEVEL_NONE */ const char* cfile_name;/* File name where lock created */ ulint cline; /* Line where created */ + /* last s-lock file/line is not guaranteed to be correct */ const char* last_s_file_name;/* File name where last s-locked */ const char* last_x_file_name;/* File name where last x-locked */ ulint last_s_line; /* Line number where last time s-locked */ diff --recursive -u base5067/innobase/include/sync0rw.ic m5067/innobase/include/sync0rw.ic --- base5067/innobase/include/sync0rw.ic 2008-08-04 05:19:15.000000000 -0700 +++ m5067/innobase/include/sync0rw.ic 2008-09-08 06:53:12.000000000 -0700 @@ -62,40 +62,48 @@ { lock->waiters = flag; } + +/********************************************************************** +Returns the write-status of the lock - this function made more sense +with the old rw_lock implementation. + */ UNIV_INLINE ulint rw_lock_get_writer( /*===============*/ rw_lock_t* lock) { - return(lock->writer); -} -UNIV_INLINE -void -rw_lock_set_writer( -/*===============*/ - rw_lock_t* lock, - ulint flag) -{ - lock->writer = flag; + lint lock_word = lock->lock_word; + if(lock_word > 0) { + /* return NOT_LOCKED in s-lock state, like the writer + member of the old lock implementation. 
*/ + return RW_LOCK_NOT_LOCKED; + } else if (((-lock_word) % X_LOCK_DECR) == 0) { + return RW_LOCK_EX; + } else { + ut_ad(lock_word > -X_LOCK_DECR); + return RW_LOCK_WAIT_EX; + } } + UNIV_INLINE ulint rw_lock_get_reader_count( /*=====================*/ rw_lock_t* lock) { - return(lock->reader_count); -} -UNIV_INLINE -void -rw_lock_set_reader_count( -/*=====================*/ - rw_lock_t* lock, - ulint count) -{ - lock->reader_count = count; + lint lock_word = lock->lock_word; + if(lock_word > 0) { + /* s-locked, no x-waiters */ + return(X_LOCK_DECR - lock_word); + } else if (lock_word < 0 && lock_word > -X_LOCK_DECR) { + /* s-locked, with x-waiters */ + return (ulint)(-lock_word); + } + return 0; } + +#ifndef HAVE_ATOMIC_BUILTINS UNIV_INLINE mutex_t* rw_lock_get_mutex( @@ -104,6 +112,7 @@ { return(&(lock->mutex)); } +#endif /********************************************************************** Returns the value of writer_count for the lock. Does not reserve the lock @@ -115,7 +124,87 @@ /* out: value of writer_count */ rw_lock_t* lock) /* in: rw-lock */ { - return(lock->writer_count); + lint lock_copy = lock->lock_word; + /* If there is a reader, lock_word is not divisible by X_LOCK_DECR */ + if(lock_copy > 0 || (-lock_copy) % X_LOCK_DECR != 0) { + return 0; + } + return ((-lock_copy) / X_LOCK_DECR) + 1; +} + +/********************************************************************** +Two different implementations for decrementing the lock_word of a rw_lock: +one for systems supporting atomic operations, one for others. This +does not support recursive x-locks: they should be handled by the caller and +need not be atomic since they are performed by the current lock holder. +Returns true if the decrement was made, false if not. 
*/ +UNIV_INLINE +ibool +rw_lock_lock_word_decr( + /* out: TRUE if decr occurs */ + rw_lock_t* lock, /* in: rw-lock */ + ulint amount) /* in: amount of decrement */ +{ + +#ifdef HAVE_ATOMIC_BUILTINS + + lint local_lock_word = lock->lock_word; + while (local_lock_word > 0) { + if(os_compare_and_swap(&(lock->lock_word), + local_lock_word, + local_lock_word - amount)) { + return TRUE; + } + local_lock_word = lock->lock_word; + } + return(FALSE); + +#else /* HAVE_ATOMIC_BUILTINS */ + + ibool success = FALSE; + mutex_enter(&(lock->mutex)); + if(lock->lock_word > 0) { + lock->lock_word -= amount; + success = TRUE; + } + mutex_exit(&(lock->mutex)); + return success; + +#endif /* HAVE_ATOMIC_BUILTINS */ + +} + +/********************************************************************** +Two different implementations for incrementing the lock_word of a rw_lock: +one for systems supporting atomic operations, one for others. +Returns the value of lock_word after increment. */ +UNIV_INLINE +lint +rw_lock_lock_word_incr( + /* out: lock->lock_word after increment */ + rw_lock_t* lock, /* in: rw-lock */ + ulint amount) /* in: amount of increment */ +{ + +#ifdef HAVE_ATOMIC_BUILTINS + + return(os_atomic_increment(&(lock->lock_word), amount)); + +#else /* HAVE_ATOMIC_BUILTINS */ + + lint local_lock_word; + + mutex_enter(&(lock->mutex)); + + lock->lock_word += amount; + local_lock_word = lock->lock_word; + + mutex_exit(&(lock->mutex)); + + return local_lock_word; + +#endif /* HAVE_ATOMIC_BUILTINS */ + } /********************************************************************** @@ -133,26 +222,21 @@ const char* file_name, /* in: file name where lock requested */ ulint line) /* in: line where requested */ { -#ifdef UNIV_SYNC_DEBUG - ut_ad(mutex_own(rw_lock_get_mutex(lock))); -#endif /* UNIV_SYNC_DEBUG */ - /* Check if the writer field is free */ - - if (UNIV_LIKELY(lock->writer == RW_LOCK_NOT_LOCKED)) { - /* Set the shared lock by incrementing the reader count */ - lock->reader_count++; + /* 
TODO: study performance of UNIV_LIKELY branch prediction hints. */ + if (!rw_lock_lock_word_decr(lock, 1)) { + /* Locking did not succeed */ + return(FALSE); + } #ifdef UNIV_SYNC_DEBUG - rw_lock_add_debug_info(lock, pass, RW_LOCK_SHARED, file_name, - line); + rw_lock_add_debug_info(lock, pass, RW_LOCK_SHARED, file_name, line); #endif - lock->last_s_file_name = file_name; - lock->last_s_line = line; - - return(TRUE); /* locking succeeded */ - } + /* These debugging values are not set safely: they may be incorrect + or even refer to a line that is invalid for the file name. */ + lock->last_s_file_name = file_name; + lock->last_s_line = line; - return(FALSE); /* locking did not succeed */ + return(TRUE); /* locking succeeded */ } /********************************************************************** @@ -167,11 +251,10 @@ const char* file_name, /* in: file name where requested */ ulint line) /* in: line where lock requested */ { - ut_ad(lock->writer == RW_LOCK_NOT_LOCKED); - ut_ad(rw_lock_get_reader_count(lock) == 0); - - /* Set the shared lock by incrementing the reader count */ - lock->reader_count++; + ut_ad(lock->lock_word == X_LOCK_DECR); + + /* Indicate there is a new reader by decrementing lock_word */ + lock->lock_word--; lock->last_s_file_name = file_name; lock->last_s_line = line; @@ -194,14 +277,13 @@ ulint line) /* in: line where lock requested */ { ut_ad(rw_lock_validate(lock)); - ut_ad(rw_lock_get_reader_count(lock) == 0); - ut_ad(rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED); + ut_ad(lock->lock_word == X_LOCK_DECR); + + lock->lock_word -= X_LOCK_DECR; - rw_lock_set_writer(lock, RW_LOCK_EX); lock->writer_thread = os_thread_get_curr_id(); - lock->writer_count++; lock->pass = 0; - + lock->last_x_file_name = file_name; lock->last_x_line = line; @@ -241,15 +323,12 @@ ut_ad(!rw_lock_own(lock, RW_LOCK_SHARED)); /* see NOTE above */ #endif /* UNIV_SYNC_DEBUG */ - mutex_enter(rw_lock_get_mutex(lock)); - - if (UNIV_LIKELY(rw_lock_s_lock_low(lock, pass, 
file_name, line))) { - mutex_exit(rw_lock_get_mutex(lock)); + /* TODO: study performance of UNIV_LIKELY branch prediction hints. */ + if (rw_lock_s_lock_low(lock, pass, file_name, line)) { return; /* Success */ } else { /* Did not succeed, try spin wait */ - mutex_exit(rw_lock_get_mutex(lock)); rw_lock_s_lock_spin(lock, pass, file_name, line); @@ -259,86 +338,67 @@ /********************************************************************** NOTE! Use the corresponding macro, not directly this function! Lock an -rw-lock in shared mode for the current thread if the lock can be acquired -immediately. */ +rw-lock in exclusive mode for the current thread if the lock can be +obtained immediately. */ UNIV_INLINE ibool -rw_lock_s_lock_func_nowait( +rw_lock_x_lock_func_nowait( /*=======================*/ /* out: TRUE if success */ rw_lock_t* lock, /* in: pointer to rw-lock */ const char* file_name,/* in: file name where lock requested */ ulint line) /* in: line where requested */ { - ibool success = FALSE; - - mutex_enter(rw_lock_get_mutex(lock)); + os_thread_id_t curr_thread = os_thread_get_curr_id(); - if (lock->writer == RW_LOCK_NOT_LOCKED) { - /* Set the shared lock by incrementing the reader count */ - lock->reader_count++; + ibool success; -#ifdef UNIV_SYNC_DEBUG - rw_lock_add_debug_info(lock, 0, RW_LOCK_SHARED, file_name, - line); -#endif +#ifdef HAVE_ATOMIC_BUILTINS + success = os_compare_and_swap(&(lock->lock_word), X_LOCK_DECR, 0); +#else - lock->last_s_file_name = file_name; - lock->last_s_line = line; - + success = FALSE; + mutex_enter(&(lock->mutex)); + if(lock->lock_word == X_LOCK_DECR) { + lock->lock_word = 0; success = TRUE; } + mutex_exit(&(lock->mutex)); - mutex_exit(rw_lock_get_mutex(lock)); - - return(success); -} - -/********************************************************************** -NOTE! Use the corresponding macro, not directly this function! Lock an -rw-lock in exclusive mode for the current thread if the lock can be -obtained immediately. 
*/ -UNIV_INLINE -ibool -rw_lock_x_lock_func_nowait( -/*=======================*/ - /* out: TRUE if success */ - rw_lock_t* lock, /* in: pointer to rw-lock */ - const char* file_name,/* in: file name where lock requested */ - ulint line) /* in: line where requested */ -{ - ibool success = FALSE; - os_thread_id_t curr_thread = os_thread_get_curr_id(); - mutex_enter(rw_lock_get_mutex(lock)); - - if (UNIV_UNLIKELY(rw_lock_get_reader_count(lock) != 0)) { - } else if (UNIV_LIKELY(rw_lock_get_writer(lock) - == RW_LOCK_NOT_LOCKED)) { - rw_lock_set_writer(lock, RW_LOCK_EX); +#endif + if(success) { lock->writer_thread = curr_thread; lock->pass = 0; - relock: - lock->writer_count++; - -#ifdef UNIV_SYNC_DEBUG - rw_lock_add_debug_info(lock, 0, RW_LOCK_EX, file_name, line); -#endif - lock->last_x_file_name = file_name; - lock->last_x_line = line; + } else if (!(lock->pass) && + os_thread_eq(lock->writer_thread, curr_thread)) { + /* Must verify pass first: otherwise another thread can + call move_ownership, suddenly allowing recursive locks + after we have verified that our thread_id matches + (though move_ownership has since changed it). */ + + /* Relock: this lock_word modification is safe since no other + threads can modify (lock, unlock, or reserve) lock_word while + there is an exclusive writer and this is the writer thread. 
*/ + lock->lock_word -= X_LOCK_DECR; - success = TRUE; - } else if (rw_lock_get_writer(lock) == RW_LOCK_EX - && lock->pass == 0 - && os_thread_eq(lock->writer_thread, curr_thread)) { - goto relock; + ut_ad(((-lock->lock_word) % X_LOCK_DECR) == 0); + + } else { + /* Failure */ + return(FALSE); } - mutex_exit(rw_lock_get_mutex(lock)); +#ifdef UNIV_SYNC_DEBUG + rw_lock_add_debug_info(lock, 0, RW_LOCK_EX, file_name, line); +#endif + + lock->last_x_file_name = file_name; + lock->last_x_line = line; ut_ad(rw_lock_validate(lock)); - return(success); + return(TRUE); } /********************************************************************** @@ -354,38 +414,19 @@ #endif ) { - mutex_t* mutex = &(lock->mutex); - ibool sg = FALSE; - - /* Acquire the mutex protecting the rw-lock fields */ - mutex_enter(mutex); - - /* Reset the shared lock by decrementing the reader count */ - - ut_a(lock->reader_count > 0); - lock->reader_count--; + ut_ad((lock->lock_word % X_LOCK_DECR) != 0); #ifdef UNIV_SYNC_DEBUG rw_lock_remove_debug_info(lock, pass, RW_LOCK_SHARED); #endif - - /* If there may be waiters and this was the last s-lock, - signal the object */ - if (UNIV_UNLIKELY(lock->waiters) - && lock->reader_count == 0) { - sg = TRUE; - - rw_lock_set_waiters(lock, 0); - } - - mutex_exit(mutex); + /* Increment lock_word to indicate 1 less reader */ + if(rw_lock_lock_word_incr(lock, 1) == 0) { - if (UNIV_UNLIKELY(sg)) { -#ifdef __WIN__ + /* wait_ex waiter exists. It may not be asleep, but we signal + anyway. 
We do not wake other waiters, because they can't + exist without wait_ex waiter and wait_ex waiter goes first.*/ os_event_set(lock->wait_ex_event); -#endif - os_event_set(lock->event); sync_array_object_signalled(sync_primary_wait_array); } @@ -405,15 +446,13 @@ /*====================*/ rw_lock_t* lock) /* in: rw-lock */ { - /* Reset the shared lock by decrementing the reader count */ - - ut_ad(lock->reader_count > 0); - - lock->reader_count--; + ut_ad(lock->lock_word < X_LOCK_DECR); #ifdef UNIV_SYNC_DEBUG rw_lock_remove_debug_info(lock, 0, RW_LOCK_SHARED); #endif + /* Decrease reader count by incrementing lock_word */ + lock->lock_word++; ut_ad(!lock->waiters); ut_ad(rw_lock_validate(lock)); @@ -435,42 +474,32 @@ #endif ) { - ibool sg = FALSE; - - /* Acquire the mutex protecting the rw-lock fields */ - mutex_enter(&(lock->mutex)); - - /* Reset the exclusive lock if this thread no longer has an x-mode - lock */ - - ut_ad(lock->writer_count > 0); + ut_ad((lock->lock_word % X_LOCK_DECR) == 0); - lock->writer_count--; - - if (lock->writer_count == 0) { - rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED); - } + /* Must reset writer_thread while we still have the lock. + If we are not the last unlocker, we correct it later in the function, + which is harmless since we still hold the lock. */ + /* TODO: are there any risks of a thread id == -1 on any platform? */ + os_thread_id_t local_writer_thread = lock->writer_thread; + lock->writer_thread = -1; #ifdef UNIV_SYNC_DEBUG rw_lock_remove_debug_info(lock, pass, RW_LOCK_EX); #endif - - /* If there may be waiters, signal the lock */ - if (UNIV_UNLIKELY(lock->waiters) - && lock->writer_count == 0) { - sg = TRUE; - rw_lock_set_waiters(lock, 0); - } - - mutex_exit(&(lock->mutex)); + if(rw_lock_lock_word_incr(lock, X_LOCK_DECR) == X_LOCK_DECR) { + /* Lock is now free. May have to signal read/write waiters. + We do not need to signal wait_ex waiters, since they cannot + exist when there is a writer. 
*/ + if(lock->waiters) { + rw_lock_set_waiters(lock, 0); + os_event_set(lock->event); + sync_array_object_signalled(sync_primary_wait_array); + } - if (UNIV_UNLIKELY(sg)) { -#ifdef __WIN__ - os_event_set(lock->wait_ex_event); -#endif - os_event_set(lock->event); - sync_array_object_signalled(sync_primary_wait_array); + } else { + /* We still hold x-lock, so we correct writer_thread. */ + lock->writer_thread = local_writer_thread; } ut_ad(rw_lock_validate(lock)); @@ -492,18 +521,14 @@ /* Reset the exclusive lock if this thread no longer has an x-mode lock */ - ut_ad(lock->writer_count > 0); - - lock->writer_count--; - - if (lock->writer_count == 0) { - rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED); - } + ut_ad((lock->lock_word % X_LOCK_DECR) == 0); #ifdef UNIV_SYNC_DEBUG rw_lock_remove_debug_info(lock, 0, RW_LOCK_EX); #endif + lock->lock_word += X_LOCK_DECR; + ut_ad(!lock->waiters); ut_ad(rw_lock_validate(lock)); diff --recursive -u base5067/innobase/include/sync0sync.h m5067/innobase/include/sync0sync.h --- base5067/innobase/include/sync0sync.h 2008-08-04 05:19:15.000000000 -0700 +++ m5067/innobase/include/sync0sync.h 2008-09-08 06:53:33.000000000 -0700 @@ -247,7 +247,7 @@ NOT to be used outside this module except in debugging! Gets the value of the lock word. 
*/ UNIV_INLINE -ulint +byte mutex_get_lock_word( /*================*/ mutex_t* mutex); /* in: mutex */ @@ -460,9 +460,12 @@ struct mutex_struct { os_event_t event; /* Used by sync0arr.c for the wait queue */ - ulint lock_word; /* This ulint is the target of the atomic - test-and-set instruction in Win32 */ -#if !defined(_WIN32) || !defined(UNIV_CAN_USE_X86_ASSEMBLER) + byte lock_word; /* This byte is the target of the atomic + test-and-set instruction in Win32 and + x86 32/64 with GCC 4.1.0 or later version */ +#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER) +#elif defined(HAVE_ATOMIC_BUILTINS) +#else os_fast_mutex_t os_fast_mutex; /* In other systems we use this OS mutex in place of lock_word */ diff --recursive -u base5067/innobase/include/sync0sync.ic m5067/innobase/include/sync0sync.ic --- base5067/innobase/include/sync0sync.ic 2008-08-04 05:19:15.000000000 -0700 +++ m5067/innobase/include/sync0sync.ic 2008-09-08 06:54:12.000000000 -0700 @@ -58,7 +58,7 @@ Performs an atomic test-and-set instruction to the lock_word field of a mutex. 
*/ UNIV_INLINE -ulint +byte mutex_test_and_set( /*===============*/ /* out: the previous value of lock_word: 0 or @@ -66,18 +66,18 @@ mutex_t* mutex) /* in: mutex */ { #if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER) - ulint res; - ulint* lw; /* assembler code is used to ensure that + byte res; + byte* lw; /* assembler code is used to ensure that lock_word is loaded from memory */ ut_ad(mutex); - ut_ad(sizeof(ulint) == 4); + ut_ad(sizeof(byte) == 1); lw = &(mutex->lock_word); __asm MOV ECX, lw __asm MOV EDX, 1 - __asm XCHG EDX, DWORD PTR [ECX] - __asm MOV res, EDX + __asm XCHG DL, BYTE PTR [ECX] + __asm MOV res, DL /* The fence below would prevent this thread from reading the data structure protected by the mutex before the test-and-set operation is @@ -100,6 +100,13 @@ TAS(&mutex->lock_word, 1, res); return(res); +#elif defined(HAVE_ATOMIC_BUILTINS) + /* GNUC 4.1.0 and later versions provide built-in functions + for atomic memory access. see online doc for details at + http://gcc.gnu.org/onlinedocs/gcc-4.1.0/gcc/Atomic-Builtins.html + TODO: add wrapper for __sync_lock_test_and_set in os0sync.ic + */ + return __sync_lock_test_and_set(&(mutex->lock_word), 1); #else ibool ret; @@ -113,7 +120,7 @@ mutex->lock_word = 1; } - return(ret); + return((byte)ret); #endif } @@ -127,7 +134,7 @@ mutex_t* mutex) /* in: mutex */ { #if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER) - ulint* lw; /* assembler code is used to ensure that + byte* lw; /* assembler code is used to ensure that lock_word is loaded from memory */ ut_ad(mutex); @@ -135,11 +142,23 @@ __asm MOV EDX, 0 __asm MOV ECX, lw - __asm XCHG EDX, DWORD PTR [ECX] + __asm XCHG DL, BYTE PTR [ECX] #elif defined(not_defined) && defined(__GNUC__) && defined(UNIV_INTEL_X86) ulint res; TAS(&mutex->lock_word, 0, res); + +#elif defined(HAVE_ATOMIC_BUILTINS) + /* GNUC 4.1.0 and later versions provide built-in functions + for atomic memory access. 
see online doc for details at + http://gcc.gnu.org/onlinedocs/gcc-4.1.0/gcc/Atomic-Builtins.html + */ + /* __sync_lock_release(&(mutex->lock_word)); */ + /* TODO: we should try to get __sync_lock_release work properly */ + /* In theory __sync_lock_release should be used to release the lock. + Unfortunately, it does not work properly alone. The workaround is + that more conservative __sync_lock_test_and_set is used instead. */ + __sync_lock_test_and_set(&(mutex->lock_word), 0); #else mutex->lock_word = 0; @@ -150,12 +169,12 @@ /********************************************************************** Gets the value of the lock word. */ UNIV_INLINE -ulint +byte mutex_get_lock_word( /*================*/ mutex_t* mutex) /* in: mutex */ { -volatile ulint* ptr; /* declared volatile to ensure that +volatile byte* ptr; /* declared volatile to ensure that lock_word is loaded from memory */ ut_ad(mutex); diff --recursive -u base5067/innobase/include/univ.i m5067/innobase/include/univ.i --- base5067/innobase/include/univ.i 2008-08-04 05:19:15.000000000 -0700 +++ m5067/innobase/include/univ.i 2008-09-08 06:53:12.000000000 -0700 @@ -166,6 +166,9 @@ /* Maximum number of parallel threads in a parallelized operation */ #define UNIV_MAX_PARALLELISM 32 +/* Disable the custom malloc pool, libc malloc is fine. 
*/ +#define UNIV_DISABLE_MEM_POOL + /* UNIVERSAL TYPE DEFINITIONS ========================== diff --recursive -u base5067/innobase/mem/mem0pool.c m5067/innobase/mem/mem0pool.c --- base5067/innobase/mem/mem0pool.c 2008-08-04 05:19:16.000000000 -0700 +++ m5067/innobase/mem/mem0pool.c 2008-09-08 06:53:12.000000000 -0700 @@ -326,6 +326,9 @@ minus MEM_AREA_EXTRA_SIZE */ mem_pool_t* pool) /* in: memory pool */ { +#ifdef UNIV_DISABLE_MEM_POOL + return malloc(size); +#else /* UNIV_DISABLE_MEM_POOL */ mem_area_t* area; ulint n; ibool ret; @@ -399,6 +402,7 @@ ut_ad(mem_pool_validate(pool)); return((void*)(MEM_AREA_EXTRA_SIZE + ((byte*)area))); +#endif /* UNIV_DISABLE_MEM_POOL */ } /************************************************************************ @@ -451,6 +455,9 @@ buffer */ mem_pool_t* pool) /* in: memory pool */ { +#ifdef UNIV_DISABLE_MEM_POOL + free(ptr); +#else /* UNIV_DISABLE_MEM_POOL */ mem_area_t* area; mem_area_t* buddy; void* new_ptr; @@ -557,6 +564,7 @@ mutex_exit(&(pool->mutex)); ut_ad(mem_pool_validate(pool)); +#endif /* UNIV_DISABLE_MEM_POOL */ } /************************************************************************ diff --recursive -u base5067/innobase/row/row0sel.c m5067/innobase/row/row0sel.c --- base5067/innobase/row/row0sel.c 2008-08-04 05:19:17.000000000 -0700 +++ m5067/innobase/row/row0sel.c 2008-09-08 06:53:12.000000000 -0700 @@ -1178,7 +1178,7 @@ rw_lock_s_lock(&btr_search_latch); search_latch_locked = TRUE; - } else if (btr_search_latch.writer_is_wait_ex) { + } else if (rw_lock_get_writer(&btr_search_latch) == RW_LOCK_WAIT_EX) { /* There is an x-latch request waiting: release the s-latch for a moment; as an s-latch here is often @@ -3156,7 +3156,7 @@ /* PHASE 0: Release a possible s-latch we are holding on the adaptive hash index latch if there is someone waiting behind */ - if (UNIV_UNLIKELY(btr_search_latch.writer != RW_LOCK_NOT_LOCKED) + if (UNIV_UNLIKELY(rw_lock_get_writer(&btr_search_latch) != RW_LOCK_NOT_LOCKED) && 
trx->has_search_latch) { /* There is an x-latch request on the adaptive hash index: diff --recursive -u base5067/innobase/srv/srv0srv.c m5067/innobase/srv/srv0srv.c --- base5067/innobase/srv/srv0srv.c 2008-08-04 05:19:17.000000000 -0700 +++ m5067/innobase/srv/srv0srv.c 2008-09-08 06:53:12.000000000 -0700 @@ -1807,6 +1807,16 @@ export_vars.innodb_buffer_pool_pages_total= buf_pool->curr_size; export_vars.innodb_buffer_pool_pages_misc= buf_pool->max_size - UT_LIST_GET_LEN(buf_pool->LRU) - UT_LIST_GET_LEN(buf_pool->free); +#ifdef HAVE_ATOMIC_BUILTINS + export_vars.innodb_have_atomic_builtins = 1; +#else + export_vars.innodb_have_atomic_builtins = 0; +#endif +#ifdef UNIV_DISABLE_MEM_POOL + export_vars.innodb_heap_enabled = 0; +#else + export_vars.innodb_heap_enabled = 1; +#endif export_vars.innodb_page_size= UNIV_PAGE_SIZE; export_vars.innodb_log_waits= srv_log_waits; export_vars.innodb_os_log_written= srv_os_log_written; diff --recursive -u base5067/innobase/srv/srv0start.c m5067/innobase/srv/srv0start.c --- base5067/innobase/srv/srv0start.c 2008-08-04 05:19:17.000000000 -0700 +++ m5067/innobase/srv/srv0start.c 2008-09-08 06:53:12.000000000 -0700 @@ -1030,6 +1030,17 @@ fprintf(stderr, "InnoDB: !!!!!!!!!!!!!! 
UNIV_SIMULATE_AWE switched on !!!!!!!!!!!!!!!!!\n"); #endif + +#ifdef UNIV_DISABLE_MEM_POOL + fprintf(stderr, + "InnoDB: The InnoDB memory heap has been disabled.\n"); +#endif + +#ifdef HAVE_ATOMIC_BUILTINS + fprintf(stderr, + "InnoDB: Mutex and rw_lock use GCC atomic builtins.\n"); +#endif + if (srv_sizeof_trx_t_in_ha_innodb_cc != (ulint)sizeof(trx_t)) { fprintf(stderr, "InnoDB: Error: trx_t size is %lu in ha_innodb.cc but %lu in srv0start.c\n" diff --recursive -u base5067/innobase/sync/sync0arr.c m5067/innobase/sync/sync0arr.c --- base5067/innobase/sync/sync0arr.c 2008-08-04 05:19:17.000000000 -0700 +++ m5067/innobase/sync/sync0arr.c 2008-09-08 06:55:36.000000000 -0700 @@ -297,27 +297,24 @@ } /*********************************************************************** -Puts the cell event in reset state. */ -static -ib_longlong -sync_cell_event_reset( +Returns the event that the thread owning the cell waits for. */ + +UNIV_INLINE +os_event_t +sync_cell_get_event( /*==================*/ - /* out: value of signal_count - at the time of reset. */ - ulint type, /* in: lock type mutex/rw_lock */ - void* object) /* in: the rw_lock/mutex object */ + /* out: event for which wait is done */ + sync_cell_t* cell) /* in: non-empty sync array cell */ { + ulint type = cell->request_type; if (type == SYNC_MUTEX) { - return(os_event_reset(((mutex_t *) object)->event)); -#ifdef __WIN__ + return ((mutex_t*) cell->wait_object)->event; } else if (type == RW_LOCK_WAIT_EX) { - return(os_event_reset( - ((rw_lock_t *) object)->wait_ex_event)); -#endif + return ((rw_lock_t*) cell->wait_object)->wait_ex_event; } else { - return(os_event_reset(((rw_lock_t *) object)->event)); + return ((rw_lock_t*) cell->wait_object)->event; } -} +} /********************************************************************** Reserves a wait array cell for waiting for an object. 
@@ -334,6 +331,7 @@ ulint* index) /* out: index of the reserved cell */ { sync_cell_t* cell; + os_event_t event; ulint i; ut_a(object); @@ -372,8 +370,8 @@ /* Make sure the event is reset and also store the value of signal_count at which the event was reset. */ - cell->signal_count = sync_cell_event_reset(type, - object); + event = sync_cell_get_event(cell); + cell->signal_count = os_event_reset(event); cell->reservation_time = time(NULL); @@ -413,18 +411,7 @@ ut_a(!cell->waiting); ut_ad(os_thread_get_curr_id() == cell->thread); - if (cell->request_type == SYNC_MUTEX) { - event = ((mutex_t*) cell->wait_object)->event; -#ifdef __WIN__ - /* On windows if the thread about to wait is the one which - has set the state of the rw_lock to RW_LOCK_WAIT_EX, then - it waits on a special event i.e.: wait_ex_event. */ - } else if (cell->request_type == RW_LOCK_WAIT_EX) { - event = ((rw_lock_t*) cell->wait_object)->wait_ex_event; -#endif - } else { - event = ((rw_lock_t*) cell->wait_object)->event; - } + event = sync_cell_get_event(cell); cell->waiting = TRUE; @@ -464,6 +451,7 @@ mutex_t* mutex; rw_lock_t* rwlock; ulint type; + ulint writer; type = cell->request_type; @@ -492,9 +480,7 @@ (ulong) mutex->waiters); } else if (type == RW_LOCK_EX -#ifdef __WIN__ || type == RW_LOCK_WAIT_EX -#endif || type == RW_LOCK_SHARED) { fputs(type == RW_LOCK_EX ? "X-lock on" : "S-lock on", file); @@ -505,21 +491,24 @@ " RW-latch at %p created in file %s line %lu\n", rwlock, rwlock->cfile_name, (ulong) rwlock->cline); - if (rwlock->writer != RW_LOCK_NOT_LOCKED) { + writer = rw_lock_get_writer(rwlock); + if (writer != RW_LOCK_NOT_LOCKED) { fprintf(file, "a writer (thread id %lu) has reserved it in mode %s", (ulong) os_thread_pf(rwlock->writer_thread), - rwlock->writer == RW_LOCK_EX + writer == RW_LOCK_EX ? 
" exclusive\n" : " wait exclusive\n"); } fprintf(file, - "number of readers %lu, waiters flag %lu\n" + "number of readers %lu, waiters flag %lu, " + "lock_word: %ld\n" "Last time read locked in file %s line %lu\n" "Last time write locked in file %s line %lu\n", - (ulong) rwlock->reader_count, + (ulong) rw_lock_get_reader_count(rwlock), (ulong) rwlock->waiters, + rwlock->lock_word, rwlock->last_s_file_name, (ulong) rwlock->last_s_line, rwlock->last_x_file_name, @@ -773,20 +762,24 @@ return(TRUE); } - } else if (cell->request_type == RW_LOCK_EX - || cell->request_type == RW_LOCK_WAIT_EX) { + } else if (cell->request_type == RW_LOCK_EX) { lock = cell->wait_object; - if (rw_lock_get_reader_count(lock) == 0 - && rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) { + /* X_LOCK_DECR is the unlocked state */ + if (lock->lock_word == X_LOCK_DECR) { return(TRUE); } - if (rw_lock_get_reader_count(lock) == 0 - && rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX - && os_thread_eq(lock->writer_thread, cell->thread)) { + } else if (cell->request_type == RW_LOCK_WAIT_EX) { + + lock = cell->wait_object; + + ut_ad(lock->lock_word <= 0); + + /* lock_word == 0 means all readers have left */ + if (lock->lock_word == 0) { return(TRUE); } @@ -794,8 +787,9 @@ } else if (cell->request_type == RW_LOCK_SHARED) { lock = cell->wait_object; - if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) { - + /* lock_word > 0 means no writer or reserved writer */ + if (lock->lock_word > 0) { + return(TRUE); } } @@ -839,11 +833,15 @@ /*========================*/ sync_array_t* arr) /* in: wait array */ { +#ifdef HAVE_ATOMIC_BUILTINS + __sync_fetch_and_add(&(arr->sg_count),1); +#else sync_array_enter(arr); arr->sg_count++; sync_array_exit(arr); +#endif } /************************************************************************** @@ -859,6 +857,7 @@ sync_cell_t* cell; ulint count; ulint i; + os_event_t event; sync_array_enter(arr); @@ -868,36 +867,19 @@ while (count < arr->n_reserved) { cell = 
sync_array_get_nth_cell(arr, i); + i++; - if (cell->wait_object != NULL) { - - count++; - - if (sync_arr_cell_can_wake_up(cell)) { - - if (cell->request_type == SYNC_MUTEX) { - mutex_t* mutex; + if (cell->wait_object == NULL) { + continue; + } - mutex = cell->wait_object; - os_event_set(mutex->event); -#ifdef __WIN__ - } else if (cell->request_type - == RW_LOCK_WAIT_EX) { - rw_lock_t* lock; + count++; - lock = cell->wait_object; - os_event_set(lock->wait_ex_event); -#endif - } else { - rw_lock_t* lock; + if (sync_arr_cell_can_wake_up(cell)) { - lock = cell->wait_object; - os_event_set(lock->event); - } - } + event = sync_cell_get_event(cell); + os_event_set(event); } - - i++; } sync_array_exit(arr); @@ -1014,4 +996,3 @@ sync_array_exit(arr); } - diff --recursive -u base5067/innobase/sync/sync0rw.c m5067/innobase/sync/sync0rw.c --- base5067/innobase/sync/sync0rw.c 2008-08-04 05:19:17.000000000 -0700 +++ m5067/innobase/sync/sync0rw.c 2008-09-08 06:55:12.000000000 -0700 @@ -15,17 +15,96 @@ #include "mem0mem.h" #include "srv0srv.h" -ulint rw_s_system_call_count = 0; -ulint rw_s_spin_wait_count = 0; -ulint rw_s_os_wait_count = 0; +/* + IMPLEMENTATION OF THE RW_LOCK + ============================= +The status of a rw_lock is held in lock_word. The initial value of lock_word is +X_LOCK_DECR. lock_word is decremented by 1 for each s-lock and by X_LOCK_DECR +for each x-lock. This describes the lock state for each value of lock_word: + +lock_word == X_LOCK_DECR: Unlocked. +0 < lock_word < X_LOCK_DECR: Read locked, no waiting writers. + (X_LOCK_DECR - lock_word) is the + number of readers that hold the lock. +lock_word == 0: Write locked +-X_LOCK_DECR < lock_word < 0: Read locked, with a waiting writer. + (-lock_word) is the number of readers + that hold the lock. +lock_word <= -X_LOCK_DECR: Recursively write locked. 
lock_word has been + decremented by X_LOCK_DECR once for each lock, + so the number of locks is: + ((-lock_word) / X_LOCK_DECR) + 1 +When lock_word <= -X_LOCK_DECR, we also know that lock_word % X_LOCK_DECR == 0: +other values of lock_word are invalid. + +The lock_word is always read and updated atomically and consistently, so that +it always represents the state of the lock, and the state of the lock changes +with a single atomic operation. This lock_word holds all of the information +that a thread needs in order to determine if it is eligible to gain the lock +or if it must spin or sleep. The one exception to this is that writer_thread +must be verified before recursive write locks: to solve this scenario, we make +writer_thread readable by all threads, but only writeable by the x-lock holder. + +The other members of the lock obey the following rules to remain consistent: + +pass: This is only set to 1 to prevent recursive x-locks. It must + be set as specified by the x_lock caller after the lock_word + indicates that the thread holds the lock, but before that + thread resumes execution. It must be reset to 0 during the + final x_unlock, but before the lock_word status is updated. + When an x_lock or move_ownership call wishes to change + pass, it must first update the writer_thread appropriately. +writer_thread: Must be set to the writer's thread_id after the lock_word + indicates that the thread holds the lock, but before that + thread resumes execution. It must be reset to -1 during the + final x_unlock, but before the lock_word status is updated. + This ensures that when the lock_word indicates that an x_lock + is held, the only legitimate values for writer_thread are -1 + (x_lock function hasn't completed) or the writer's thread_id. +waiters: May be set to 1 anytime, but to avoid unnecessary wake-up + signals, it should only be set to 1 when there are threads + waiting on event. 
Must be 1 when a writer starts waiting to + ensure the current x-locking thread sends a wake-up signal + during unlock. May only be reset to 0 immediately before + a wake-up signal is sent to event. +event: Threads wait on event for read or write lock when another + thread has an x-lock or an x-lock reservation (wait_ex). A + thread may only wait on event after performing the following + actions in order: + (1) Record the counter value of event (with os_event_reset). + (2) Set waiters to 1. + (3) Verify lock_word <= 0. + (1) must come before (2) to ensure the signal is not missed. + (2) must come before (3) to ensure a signal is sent. + These restrictions force the above ordering. + Immediately before sending the wake-up signal, we should: + (1) Verify lock_word == X_LOCK_DECR (unlocked) + (2) Reset waiters to 0. +wait_ex_event: A thread may only wait on the wait_ex_event after it has + performed the following actions in order: + (1) Decrement lock_word by X_LOCK_DECR. + (2) Record counter value of wait_ex_event (os_event_reset, + called from sync_array_reserve_cell). + (3) Verify that lock_word < 0. + (1) must come first to ensure no other threads become reader + or next writer, and to notify the unlocker that a signal must be sent. + (2) must come before (3) to ensure the signal is not missed. + These restrictions force the above ordering. 
+ Immediately before sending the wake-up signal, we should: + Verify lock_word == 0 (waiting thread holds x_lock) +*/ + +ib_longlong rw_s_spin_wait_count = 0; +ib_longlong rw_s_spin_round_count = 0; +ib_longlong rw_s_os_wait_count = 0; + +ib_longlong rw_s_exit_count = 0; + +ib_longlong rw_x_spin_wait_count = 0; +ib_longlong rw_x_spin_round_count = 0; +ib_longlong rw_x_os_wait_count = 0; -ulint rw_s_exit_count = 0; - -ulint rw_x_system_call_count = 0; -ulint rw_x_spin_wait_count = 0; -ulint rw_x_os_wait_count = 0; - -ulint rw_x_exit_count = 0; +ib_longlong rw_x_exit_count = 0; /* The global list of rw-locks */ rw_lock_list_t rw_lock_list; @@ -99,6 +178,7 @@ object is created, then the following call initializes the sync system. */ +#ifndef HAVE_ATOMIC_BUILTINS mutex_create(rw_lock_get_mutex(lock)); mutex_set_level(rw_lock_get_mutex(lock), SYNC_NO_ORDER_CHECK); @@ -108,13 +188,12 @@ lock->mutex.cmutex_name = cmutex_name; lock->mutex.mutex_type = 1; #endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */ +#endif /* HAVE_ATOMIC_BUILTINS */ + lock->lock_word = X_LOCK_DECR; rw_lock_set_waiters(lock, 0); - rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED); - lock->writer_count = 0; - rw_lock_set_reader_count(lock, 0); - - lock->writer_is_wait_ex = FALSE; + lock->writer_thread = -1; + lock->pass = 0; #ifdef UNIV_SYNC_DEBUG UT_LIST_INIT(lock->debug_list); @@ -126,15 +205,13 @@ lock->cfile_name = cfile_name; lock->cline = cline; + lock->count_os_wait = 0; lock->last_s_file_name = "not yet reserved"; lock->last_x_file_name = "not yet reserved"; lock->last_s_line = 0; lock->last_x_line = 0; lock->event = os_event_create(NULL); - -#ifdef __WIN__ lock->wait_ex_event = os_event_create(NULL); -#endif mutex_enter(&rw_lock_list_mutex); @@ -161,20 +238,18 @@ #ifdef UNIV_DEBUG ut_a(rw_lock_validate(lock)); #endif /* UNIV_DEBUG */ - ut_a(rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED); + ut_a(lock->lock_word == X_LOCK_DECR); ut_a(rw_lock_get_waiters(lock) == 0); - ut_a(rw_lock_get_reader_count(lock) == 
0); - + lock->magic_n = 0; +#ifndef HAVE_ATOMIC_BUILTINS mutex_free(rw_lock_get_mutex(lock)); +#endif /* HAVE_ATOMIC_BUILTINS */ mutex_enter(&rw_lock_list_mutex); os_event_free(lock->event); - -#ifdef __WIN__ os_event_free(lock->wait_ex_event); -#endif if (UT_LIST_GET_PREV(list, lock)) { ut_a(UT_LIST_GET_PREV(list, lock)->magic_n == RW_LOCK_MAGIC_N); @@ -199,19 +274,12 @@ { ut_a(lock); - mutex_enter(rw_lock_get_mutex(lock)); + ulint waiters = rw_lock_get_waiters(lock); + lint lock_word = lock->lock_word; ut_a(lock->magic_n == RW_LOCK_MAGIC_N); - ut_a((rw_lock_get_reader_count(lock) == 0) - || (rw_lock_get_writer(lock) != RW_LOCK_EX)); - ut_a((rw_lock_get_writer(lock) == RW_LOCK_EX) - || (rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX) - || (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED)); - ut_a((rw_lock_get_waiters(lock) == 0) - || (rw_lock_get_waiters(lock) == 1)); - ut_a((lock->writer != RW_LOCK_EX) || (lock->writer_count > 0)); - - mutex_exit(rw_lock_get_mutex(lock)); + ut_a(waiters == 0 || waiters == 1); + ut_a(lock_word > -X_LOCK_DECR ||(-lock_word) % X_LOCK_DECR == 0); return(TRUE); } @@ -232,18 +300,16 @@ ulint line) /* in: line where requested */ { ulint index; /* index of the reserved wait cell */ - ulint i; /* spin round count */ + ulint i = 0; /* spin round count */ - ut_ad(rw_lock_validate(lock)); + ut_ad(rw_lock_validate(lock)); + rw_s_spin_wait_count++; /* Count calls to this function */ lock_loop: - rw_s_spin_wait_count++; - /* Spin waiting for the writer field to become free */ - i = 0; - while (rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED - && i < SYNC_SPIN_ROUNDS) { + /* Spin waiting for the writer field to become free */ + while (i < SYNC_SPIN_ROUNDS && lock->lock_word <= 0) { if (srv_spin_wait_delay) { ut_delay(ut_rnd_interval(0, srv_spin_wait_delay)); } @@ -262,28 +328,32 @@ lock->cfile_name, (ulong) lock->cline, (ulong) i); } - mutex_enter(rw_lock_get_mutex(lock)); - - /* We try once again to obtain the lock */ - + /* We try once again to 
obtain the lock */ if (TRUE == rw_lock_s_lock_low(lock, pass, file_name, line)) { - mutex_exit(rw_lock_get_mutex(lock)); + rw_s_spin_round_count += i; return; /* Success */ } else { - /* If we get here, locking did not succeed, we may - suspend the thread to wait in the wait array */ - rw_s_system_call_count++; + if (i < SYNC_SPIN_ROUNDS) { + goto lock_loop; + } - sync_array_reserve_cell(sync_primary_wait_array, - lock, RW_LOCK_SHARED, - file_name, line, - &index); + rw_s_spin_round_count += i; + sync_array_reserve_cell(sync_primary_wait_array, + lock, RW_LOCK_SHARED, + file_name, line, + &index); + + /* Set waiters before checking lock_word to ensure wake-up + signal is sent. This may lead to some unnecessary signals. */ rw_lock_set_waiters(lock, 1); - mutex_exit(rw_lock_get_mutex(lock)); + if (TRUE == rw_lock_s_lock_low(lock, pass, file_name, line)) { + sync_array_free_cell(sync_primary_wait_array, index); + return; /* Success */ + } if (srv_print_latch_waits) { fprintf(stderr, @@ -292,13 +362,16 @@ lock, lock->cfile_name, (ulong) lock->cline); } - rw_s_system_call_count++; + /* these stats may not be accurate */ + lock->count_os_wait++; rw_s_os_wait_count++; - sync_array_wait_event(sync_primary_wait_array, index); + /* Wait for wake-up signal before resuming lock loop. 
*/ + sync_array_wait_event(sync_primary_wait_array, index); - goto lock_loop; - } + i = 0; + goto lock_loop; + } } /********************************************************************** @@ -318,114 +391,151 @@ { ut_ad(rw_lock_is_locked(lock, RW_LOCK_EX)); +#ifdef HAVE_ATOMIC_BUILTINS + os_thread_id_t local_writer_thread = lock->writer_thread; + os_thread_id_t new_writer_thread = os_thread_get_curr_id(); + while (TRUE) { + if (local_writer_thread != -1) { + if(os_compare_and_swap( + &(lock->writer_thread), + local_writer_thread, + new_writer_thread)) { + break; + } + } + local_writer_thread = lock->writer_thread; + } + lock->pass = 0; +#else /* HAVE_ATOMIC_BUILTINS */ mutex_enter(&(lock->mutex)); - lock->writer_thread = os_thread_get_curr_id(); - lock->pass = 0; - mutex_exit(&(lock->mutex)); +#endif /* HAVE_ATOMIC_BUILTINS */ } /********************************************************************** -Low-level function for acquiring an exclusive lock. */ +Function for the next writer to call. Waits for readers to exit. 
+The caller must have already decremented lock_word by X_LOCK_DECR.*/ UNIV_INLINE -ulint -rw_lock_x_lock_low( -/*===============*/ - /* out: RW_LOCK_NOT_LOCKED if did - not succeed, RW_LOCK_EX if success, - RW_LOCK_WAIT_EX, if got wait reservation */ - rw_lock_t* lock, /* in: pointer to rw-lock */ +void +rw_lock_x_lock_wait( +/*================*/ + rw_lock_t* lock, /* in: pointer to rw-lock */ +#ifdef UNIV_SYNC_DEBUG ulint pass, /* in: pass value; != 0, if the lock will be passed to another thread to unlock */ +#endif const char* file_name,/* in: file name where lock requested */ ulint line) /* in: line where requested */ { -#ifdef UNIV_SYNC_DEBUG - ut_ad(mutex_own(rw_lock_get_mutex(lock))); -#endif /* UNIV_SYNC_DEBUG */ - if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) { + ulint index; + ulint i = 0; - if (rw_lock_get_reader_count(lock) == 0) { - - rw_lock_set_writer(lock, RW_LOCK_EX); - lock->writer_thread = os_thread_get_curr_id(); - lock->writer_count++; - lock->pass = pass; - -#ifdef UNIV_SYNC_DEBUG - rw_lock_add_debug_info(lock, pass, RW_LOCK_EX, - file_name, line); -#endif - lock->last_x_file_name = file_name; - lock->last_x_line = line; - - /* Locking succeeded, we may return */ - return(RW_LOCK_EX); - } else { - /* There are readers, we have to wait */ - rw_lock_set_writer(lock, RW_LOCK_WAIT_EX); - lock->writer_thread = os_thread_get_curr_id(); - lock->pass = pass; - lock->writer_is_wait_ex = TRUE; + ut_ad(lock->lock_word <= 0); + + while (lock->lock_word < 0) { + if (srv_spin_wait_delay) { + ut_delay(ut_rnd_interval(0, srv_spin_wait_delay)); + } + if(i < SYNC_SPIN_ROUNDS) { + i++; + continue; + } + /* If there is still a reader, then go to sleep.*/ + rw_x_spin_round_count += i; + i = 0; + sync_array_reserve_cell(sync_primary_wait_array, + lock, + RW_LOCK_WAIT_EX, + file_name, line, + &index); + /* Check lock_word to ensure wake-up isn't missed.*/ + if(lock->lock_word < 0) { + + /* these stats may not be accurate */ + lock->count_os_wait++; + 
rw_x_os_wait_count++; + + /* Add debug info as it is needed to detect possible + deadlock. We must add info for WAIT_EX thread for + deadlock detection to work properly. */ #ifdef UNIV_SYNC_DEBUG rw_lock_add_debug_info(lock, pass, RW_LOCK_WAIT_EX, - file_name, line); + file_name, line); #endif - return(RW_LOCK_WAIT_EX); + sync_array_wait_event(sync_primary_wait_array, index); +#ifdef UNIV_SYNC_DEBUG + rw_lock_remove_debug_info(lock, pass, + RW_LOCK_WAIT_EX); +#endif + /* It is possible to wake when lock_word < 0. + We must pass the while-loop check to proceed.*/ + } else { + sync_array_free_cell(sync_primary_wait_array, + index); } + } + rw_x_spin_round_count += i; +} - } else if ((rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX) - && os_thread_eq(lock->writer_thread, - os_thread_get_curr_id())) { - - if (rw_lock_get_reader_count(lock) == 0) { +/********************************************************************** +Low-level function for acquiring an exclusive lock. */ +UNIV_INLINE +ibool +rw_lock_x_lock_low( +/*===============*/ + /* out: RW_LOCK_NOT_LOCKED if did + not succeed, RW_LOCK_EX if success. */ + rw_lock_t* lock, /* in: pointer to rw-lock */ + ulint pass, /* in: pass value; != 0, if the lock will + be passed to another thread to unlock */ + const char* file_name,/* in: file name where lock requested */ + ulint line) /* in: line where requested */ +{ + os_thread_id_t curr_thread = os_thread_get_curr_id(); + ut_ad(curr_thread != -1); /* We use -1 as the unlocked value. */ - rw_lock_set_writer(lock, RW_LOCK_EX); - lock->writer_count++; - lock->pass = pass; - lock->writer_is_wait_ex = FALSE; + if(rw_lock_lock_word_decr(lock, X_LOCK_DECR)) { + ut_ad(lock->writer_thread == -1); + /* Decrement occurred: we are writer or next-writer. 
*/ + lock->writer_thread = curr_thread; + lock->pass = pass; + rw_lock_x_lock_wait(lock, #ifdef UNIV_SYNC_DEBUG - rw_lock_remove_debug_info(lock, pass, RW_LOCK_WAIT_EX); - rw_lock_add_debug_info(lock, pass, RW_LOCK_EX, - file_name, line); + pass, #endif - - lock->last_x_file_name = file_name; - lock->last_x_line = line; - - /* Locking succeeded, we may return */ - return(RW_LOCK_EX); - } + file_name, line); - return(RW_LOCK_WAIT_EX); - - } else if ((rw_lock_get_writer(lock) == RW_LOCK_EX) - && os_thread_eq(lock->writer_thread, - os_thread_get_curr_id()) - && (lock->pass == 0) - && (pass == 0)) { + } else { - lock->writer_count++; + /* Decrement failed: relock or failed lock */ + /* Must verify pass first: otherwise another thread can + call move_ownership suddenly allowing recursive locks. + and after we have verified our thread_id matches + (though move_ownership has since changed it).*/ + if(!pass && !(lock->pass) && + os_thread_eq(lock->writer_thread, curr_thread)) { + /* Relock */ + lock->lock_word -= X_LOCK_DECR; + } else { + /* Another thread locked before us */ + return(FALSE); + } + } #ifdef UNIV_SYNC_DEBUG - rw_lock_add_debug_info(lock, pass, RW_LOCK_EX, file_name, - line); -#endif - - lock->last_x_file_name = file_name; - lock->last_x_line = line; + rw_lock_add_debug_info(lock, pass, RW_LOCK_EX, + file_name, line); +#endif /* UNIV_SYNC_DEBUG */ - /* Locking succeeded, we may return */ - return(RW_LOCK_EX); - } + lock->last_x_file_name = file_name; + lock->last_x_line = line; - /* Locking did not succeed */ - return(RW_LOCK_NOT_LOCKED); + return(TRUE); } /********************************************************************** @@ -448,30 +558,30 @@ ulint line) /* in: line where requested */ { ulint index; /* index of the reserved wait cell */ - ulint state; /* lock state acquired */ ulint i; /* spin round count */ - + ibool spinning = FALSE; + ut_ad(rw_lock_validate(lock)); + i = 0; + lock_loop: - /* Acquire the mutex protecting the rw-lock fields */ - 
mutex_enter_fast(&(lock->mutex)); - state = rw_lock_x_lock_low(lock, pass, file_name, line); - - mutex_exit(&(lock->mutex)); - - if (state == RW_LOCK_EX) { + if (rw_lock_x_lock_low(lock, pass, file_name, line)) { + rw_x_spin_round_count += i; return; /* Locking succeeded */ - } else if (state == RW_LOCK_NOT_LOCKED) { - - /* Spin waiting for the writer field to become free */ - i = 0; + } else { - while (rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED - && i < SYNC_SPIN_ROUNDS) { + if (!spinning) { + spinning = TRUE; + rw_x_spin_wait_count++; + } + + /* Spin waiting for the lock_word to become free */ + while (i < SYNC_SPIN_ROUNDS + && lock->lock_word <= 0) { if (srv_spin_wait_delay) { ut_delay(ut_rnd_interval(0, srv_spin_wait_delay)); @@ -481,28 +591,12 @@ } if (i == SYNC_SPIN_ROUNDS) { os_thread_yield(); + } else { + goto lock_loop; } - } else if (state == RW_LOCK_WAIT_EX) { - - /* Spin waiting for the reader count field to become zero */ - i = 0; + } - while (rw_lock_get_reader_count(lock) != 0 - && i < SYNC_SPIN_ROUNDS) { - if (srv_spin_wait_delay) { - ut_delay(ut_rnd_interval(0, - srv_spin_wait_delay)); - } - - i++; - } - if (i == SYNC_SPIN_ROUNDS) { - os_thread_yield(); - } - } else { - i = 0; /* Eliminate a compiler warning */ - ut_error; - } + rw_x_spin_round_count += i; if (srv_print_latch_waits) { fprintf(stderr, @@ -511,39 +605,20 @@ lock->cfile_name, (ulong) lock->cline, (ulong) i); } - rw_x_spin_wait_count++; - - /* We try once again to obtain the lock. 
Acquire the mutex protecting - the rw-lock fields */ - - mutex_enter(rw_lock_get_mutex(lock)); - - state = rw_lock_x_lock_low(lock, pass, file_name, line); - - if (state == RW_LOCK_EX) { - mutex_exit(rw_lock_get_mutex(lock)); - - return; /* Locking succeeded */ - } - - rw_x_system_call_count++; - - sync_array_reserve_cell(sync_primary_wait_array, + sync_array_reserve_cell(sync_primary_wait_array, lock, -#ifdef __WIN__ - /* On windows RW_LOCK_WAIT_EX signifies - that this thread should wait on the - special wait_ex_event. */ - (state == RW_LOCK_WAIT_EX) - ? RW_LOCK_WAIT_EX : -#endif RW_LOCK_EX, file_name, line, &index); + /* Waiters must be set before checking lock_word, to ensure signal + is sent. This could lead to a few unnecessary wake-up signals. */ rw_lock_set_waiters(lock, 1); - mutex_exit(rw_lock_get_mutex(lock)); + if (rw_lock_x_lock_low(lock, pass, file_name, line)) { + sync_array_free_cell(sync_primary_wait_array, index); + return; /* Locking succeeded */ + } if (srv_print_latch_waits) { fprintf(stderr, @@ -552,12 +627,15 @@ lock->cfile_name, (ulong) lock->cline); } - rw_x_system_call_count++; + /* these stats may not be accurate */ + lock->count_os_wait++; rw_x_os_wait_count++; - sync_array_wait_event(sync_primary_wait_array, index); + /* Wait here for wake-up signal before resuming lock-attempt loop. */ + sync_array_wait_event(sync_primary_wait_array, index); - goto lock_loop; + i = 0; + goto lock_loop; } #ifdef UNIV_SYNC_DEBUG @@ -718,7 +796,7 @@ ut_ad(lock); ut_ad(rw_lock_validate(lock)); - mutex_enter(&(lock->mutex)); + rw_lock_debug_mutex_enter(); info = UT_LIST_GET_FIRST(lock->debug_list); @@ -728,7 +806,7 @@ && (info->pass == 0) && (info->lock_type == lock_type)) { - mutex_exit(&(lock->mutex)); + rw_lock_debug_mutex_exit(); /* Found! 
*/ return(TRUE); @@ -736,7 +814,7 @@ info = UT_LIST_GET_NEXT(list, info); } - mutex_exit(&(lock->mutex)); + rw_lock_debug_mutex_exit(); return(FALSE); } @@ -758,22 +836,18 @@ ut_ad(lock); ut_ad(rw_lock_validate(lock)); - mutex_enter(&(lock->mutex)); - if (lock_type == RW_LOCK_SHARED) { - if (lock->reader_count > 0) { + if (rw_lock_get_reader_count(lock) > 0) { ret = TRUE; } } else if (lock_type == RW_LOCK_EX) { - if (lock->writer == RW_LOCK_EX) { + if (rw_lock_get_writer(lock) == RW_LOCK_EX) { ret = TRUE; } } else { ut_error; } - mutex_exit(&(lock->mutex)); - return(ret); } @@ -801,11 +875,10 @@ count++; - mutex_enter(&(lock->mutex)); - - if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED) - || (rw_lock_get_reader_count(lock) != 0) - || (rw_lock_get_waiters(lock) != 0)) { +#ifndef HAVE_ATOMIC_BUILTINS + mutex_enter(rw_lock_get_mutex(lock)); +#endif + if (lock->lock_word != X_LOCK_DECR) { fprintf(stderr, "RW-LOCK: %p ", lock); @@ -814,15 +887,17 @@ } else { putc('\n', stderr); } - + info = UT_LIST_GET_FIRST(lock->debug_list); - while (info != NULL) { + while (info != NULL) { rw_lock_debug_print(info); info = UT_LIST_GET_NEXT(list, info); } } +#ifndef HAVE_ATOMIC_BUILTINS + mutex_exit(rw_lock_get_mutex(lock)); +#endif - mutex_exit(&(lock->mutex)); lock = UT_LIST_GET_NEXT(list, lock); } @@ -845,22 +920,26 @@ "RW-LATCH INFO\n" "RW-LATCH: %p ", lock); - if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED) - || (rw_lock_get_reader_count(lock) != 0) - || (rw_lock_get_waiters(lock) != 0)) { +#ifndef HAVE_ATOMIC_BUILTINS + mutex_enter(rw_lock_get_mutex(lock)); +#endif + if (lock->lock_word != X_LOCK_DECR) { if (rw_lock_get_waiters(lock)) { fputs(" Waiters for the lock exist\n", stderr); } else { putc('\n', stderr); } - + info = UT_LIST_GET_FIRST(lock->debug_list); while (info != NULL) { rw_lock_debug_print(info); info = UT_LIST_GET_NEXT(list, info); } } +#ifndef HAVE_ATOMIC_BUILTINS + mutex_exit(rw_lock_get_mutex(lock)); +#endif } 
/************************************************************************* @@ -909,14 +988,11 @@ lock = UT_LIST_GET_FIRST(rw_lock_list); while (lock != NULL) { - mutex_enter(rw_lock_get_mutex(lock)); - if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED) - || (rw_lock_get_reader_count(lock) != 0)) { + if (lock->lock_word != X_LOCK_DECR) { count++; } - mutex_exit(rw_lock_get_mutex(lock)); lock = UT_LIST_GET_NEXT(list, lock); } diff --recursive -u base5067/innobase/sync/sync0sync.c m5067/innobase/sync/sync0sync.c --- base5067/innobase/sync/sync0sync.c 2008-08-04 05:19:17.000000000 -0700 +++ m5067/innobase/sync/sync0sync.c 2008-09-08 06:54:53.000000000 -0700 @@ -143,13 +143,11 @@ /* The number of system calls made in this module. Intended for performance monitoring. */ -ulint mutex_system_call_count = 0; - /* Number of spin waits on mutexes: for performance monitoring */ -ulint mutex_spin_round_count = 0; -ulint mutex_spin_wait_count = 0; -ulint mutex_os_wait_count = 0; +ib_longlong mutex_spin_round_count = 0; +ib_longlong mutex_spin_wait_count = 0; +ib_longlong mutex_os_wait_count = 0; ulint mutex_exit_count = 0; /* The global array of wait cells for implementation of the database's own @@ -240,6 +238,8 @@ { #if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER) mutex_reset_lock_word(mutex); +#elif defined(HAVE_ATOMIC_BUILTINS) + mutex_reset_lock_word(mutex); #else os_fast_mutex_init(&(mutex->os_fast_mutex)); mutex->lock_word = 0; @@ -325,7 +325,9 @@ os_event_free(mutex->event); -#if !defined(_WIN32) || !defined(UNIV_CAN_USE_X86_ASSEMBLER) +#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER) +#elif defined(HAVE_ATOMIC_BUILTINS) +#else os_fast_mutex_free(&(mutex->os_fast_mutex)); #endif /* If we free the mutex protecting the mutex list (freeing is @@ -421,6 +423,12 @@ #endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */ ut_ad(mutex); + /* This update is not thread safe, but we don't mind if the count isn't + * exact. 
Moved out of ifdef that follows because we are willing to sacrifice + * the cost of counting this as the data is valuable. Count the number of + * calls to mutex_spin_wait. */ + mutex_spin_wait_count++; + mutex_loop: i = 0; @@ -433,7 +441,6 @@ spin_loop: #if defined UNIV_DEBUG && !defined UNIV_HOTBACKUP - mutex_spin_wait_count++; mutex->count_spin_loop++; #endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */ @@ -502,8 +509,6 @@ sync_array_reserve_cell(sync_primary_wait_array, mutex, SYNC_MUTEX, file_name, line, &index); - mutex_system_call_count++; - /* The memory order of the array reservation and the change in the waiters field is important: when we suspend a thread, we first reserve the cell and then set waiters field to 1. When threads are @@ -551,7 +556,6 @@ mutex->cfile_name, (ulong) mutex->cline, (ulong) i); #endif - mutex_system_call_count++; mutex_os_wait_count++; #ifndef UNIV_HOTBACKUP @@ -1360,20 +1364,30 @@ FILE* file) /* in: file where to print */ { #ifdef UNIV_SYNC_DEBUG - fprintf(stderr, "Mutex exits %lu, rws exits %lu, rwx exits %lu\n", + fprintf(stderr, "Mutex exits %llu, rws exits %llu, rwx exits %llu\n", mutex_exit_count, rw_s_exit_count, rw_x_exit_count); #endif fprintf(file, -"Mutex spin waits %lu, rounds %lu, OS waits %lu\n" -"RW-shared spins %lu, OS waits %lu; RW-excl spins %lu, OS waits %lu\n", - (ulong) mutex_spin_wait_count, - (ulong) mutex_spin_round_count, - (ulong) mutex_os_wait_count, - (ulong) rw_s_spin_wait_count, - (ulong) rw_s_os_wait_count, - (ulong) rw_x_spin_wait_count, - (ulong) rw_x_os_wait_count); +"Mutex spin waits %llu, rounds %llu, OS waits %llu\n" +"RW-shared spins %llu, OS waits %llu; RW-excl spins %llu, OS waits %llu\n", + mutex_spin_wait_count, + mutex_spin_round_count, + mutex_os_wait_count, + rw_s_spin_wait_count, + rw_s_os_wait_count, + rw_x_spin_wait_count, + rw_x_os_wait_count); + + fprintf(file, + "Spin rounds per wait: %.2f mutex, %.2f RW-shared, " + "%.2f RW-excl\n", + (double) mutex_spin_round_count / + 
(mutex_spin_wait_count ? mutex_spin_wait_count : 1), + (double) rw_s_spin_round_count / + (rw_s_spin_wait_count ? rw_s_spin_wait_count : 1), + (double) rw_x_spin_round_count / + (rw_x_spin_wait_count ? rw_x_spin_wait_count : 1)); } /*********************************************************************** diff --recursive -u base5067/libmysqld/ha_innodb.cc m5067/libmysqld/ha_innodb.cc --- base5067/libmysqld/ha_innodb.cc 2008-08-04 05:20:03.000000000 -0700 +++ m5067/libmysqld/ha_innodb.cc 2008-09-08 06:53:12.000000000 -0700 @@ -284,6 +284,10 @@ (char*) &export_vars.innodb_dblwr_writes, SHOW_LONG}, {"log_waits", (char*) &export_vars.innodb_log_waits, SHOW_LONG}, + {"have_atomic_builtins", + (char*) &export_vars.innodb_have_atomic_builtins, SHOW_BOOL}, + {"heap_enabled", + (char*) &export_vars.innodb_heap_enabled, SHOW_BOOL}, {"log_write_requests", (char*) &export_vars.innodb_log_write_requests, SHOW_LONG}, {"log_writes", @@ -6463,6 +6467,7 @@ Protocol *protocol= thd->protocol; List field_list; mutex_t* mutex; + rw_lock_t* lock; #ifdef UNIV_DEBUG ulint rw_lock_count= 0; ulint rw_lock_count_spin_loop= 0; @@ -6547,6 +6552,30 @@ mutex_exit_noninline(&mutex_list_mutex); + mutex_enter_noninline(&rw_lock_list_mutex); + + lock = UT_LIST_GET_FIRST(rw_lock_list); + + while (lock != NULL) + { + if (lock->count_os_wait) + { + protocol->prepare_for_resend(); + protocol->store(lock->cfile_name, system_charset_info); + protocol->store((ulonglong)lock->cline); + protocol->store((ulonglong)lock->count_os_wait); + + if (protocol->write()) + { + mutex_exit_noninline(&rw_lock_list_mutex); + DBUG_RETURN(1); + } + } + lock = UT_LIST_GET_NEXT(list, lock); + } + + mutex_exit_noninline(&rw_lock_list_mutex); + #ifdef UNIV_DEBUG protocol->prepare_for_resend(); protocol->store("rw_lock_mutexes", system_charset_info); diff --recursive -u base5067/sql/ha_innodb.cc m5067/sql/ha_innodb.cc --- base5067/sql/ha_innodb.cc 2008-08-04 05:20:03.000000000 -0700 +++ m5067/sql/ha_innodb.cc 2008-09-08 
06:53:12.000000000 -0700 @@ -284,6 +284,10 @@ (char*) &export_vars.innodb_dblwr_writes, SHOW_LONG}, {"log_waits", (char*) &export_vars.innodb_log_waits, SHOW_LONG}, + {"have_atomic_builtins", + (char*) &export_vars.innodb_have_atomic_builtins, SHOW_BOOL}, + {"heap_enabled", + (char*) &export_vars.innodb_heap_enabled, SHOW_BOOL}, {"log_write_requests", (char*) &export_vars.innodb_log_write_requests, SHOW_LONG}, {"log_writes", @@ -6463,6 +6467,7 @@ Protocol *protocol= thd->protocol; List field_list; mutex_t* mutex; + rw_lock_t* lock; #ifdef UNIV_DEBUG ulint rw_lock_count= 0; ulint rw_lock_count_spin_loop= 0; @@ -6547,6 +6552,30 @@ mutex_exit_noninline(&mutex_list_mutex); + mutex_enter_noninline(&rw_lock_list_mutex); + + lock = UT_LIST_GET_FIRST(rw_lock_list); + + while (lock != NULL) + { + if (lock->count_os_wait) + { + protocol->prepare_for_resend(); + protocol->store(lock->cfile_name, system_charset_info); + protocol->store((ulonglong)lock->cline); + protocol->store((ulonglong)lock->count_os_wait); + + if (protocol->write()) + { + mutex_exit_noninline(&rw_lock_list_mutex); + DBUG_RETURN(1); + } + } + lock = UT_LIST_GET_NEXT(list, lock); + } + + mutex_exit_noninline(&rw_lock_list_mutex); + #ifdef UNIV_DEBUG protocol->prepare_for_resend(); protocol->store("rw_lock_mutexes", system_charset_info);