
Peter Dimov wrote:
(This redundant wakeup may not be visible in a benchmark, but it can steal CPU from other busy non-contending threads in a real application.)
To test this, I added "free" threads to the benchmark. These compete on a different mutex. With my latest experiment, this reveals odd/intriguing patterns of the form
2R+4W+0F: 29; 2R+4W+4F: 18 (R = reader threads, W = writer threads, F = "free" threads contending on a separate mutex)
The additional four free threads improve performance!
That was because I had an inefficiency in the algorithm, of course. Still, the free threads do seem to help the scheduler sometimes. I finally managed to match the "naive" implementation with a "fancy" semaphore-based scheme. The basic algorithm is that each unlock unblocks one waiting thread; the twist is that if the thread that wakes up is a reader, it then unblocks all waiters. When a writer wakes up, it doesn't unblock anyone, since it is on its way to exclusive access anyway. I haven't reviewed the code for subtle correctness issues, though; no time for that now. :-/ Here's the code: #ifndef BOOST_DETAIL_RW_MUTEX_HPP_INCLUDED #define BOOST_DETAIL_RW_MUTEX_HPP_INCLUDED // MS compatible compilers support #pragma once #if defined(_MSC_VER) && (_MSC_VER >= 1020) # pragma once #endif // Copyright (c) 2005, 2006 Peter Dimov // // Distributed under the Boost Software License, Version 1.0. // See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) #include <algorithm> #include <cassert> #include <limits.h> #include <windows.h> inline bool atomic_compare_exchange( long * destination, long * comparand, long exchange ) { long ov = *comparand; long nv = InterlockedCompareExchange( destination, exchange, ov ); if( ov == nv ) { return true; } else { *comparand = nv; return false; } } inline bool atomic_compare_exchange_void( void * destination, void * comparand, void const * exchange ) { return atomic_compare_exchange( (long*)destination, (long*)comparand, *(long const*)exchange ); } class rw_mutex { private: rw_mutex( rw_mutex const & ); rw_mutex & operator=( rw_mutex const & ); private: struct state { unsigned writer: 1; unsigned readers: 31; }; state state_; HANDLE sema_wp_; // writer pending HANDLE sema_rw_; // readers+writers blocked long retries_[ 6 ]; long blocked_; public: rw_mutex(): blocked_( 0 ) { state st = { 0 }; state_ = st; sema_wp_ = CreateSemaphore( 0, 0, LONG_MAX, 0 ); sema_rw_ = CreateSemaphore( 0, 0, LONG_MAX, 0 ); 
std::fill( retries_, retries_ + 6, 0 ); } ~rw_mutex() { CloseHandle( sema_wp_ ); CloseHandle( sema_rw_ ); } void unblock_one() { if( (volatile const long&)blocked_ > 0 ) { long k = InterlockedExchangeAdd( &blocked_, -1 ); if( k > 0 ) { ReleaseSemaphore( sema_rw_, 1, 0 ); } else { // undo InterlockedIncrement( &blocked_ ); } } } void unblock_all() { if( (volatile const long&)blocked_ > 0 ) { long k = InterlockedExchange( &blocked_, 0 ); if( k > 0 ) { ReleaseSemaphore( sema_rw_, 1, 0 ); } } } void rdlock() { state st = state_; for( ;; ) { state xchg = st; if( st.writer ) { InterlockedIncrement( &blocked_ ); WaitForSingleObject( sema_rw_, INFINITE ); unblock_all(); st = state_; } else { xchg.readers = st.readers + 1; if( atomic_compare_exchange_void( &state_, &st, &xchg ) ) { return; // got read lock } else { InterlockedIncrement( &retries_[ 0 ] ); } } } } void lock() { state st = state_; for( ;; ) { state xchg = st; if( st.writer ) { InterlockedIncrement( &blocked_ ); WaitForSingleObject( sema_rw_, INFINITE ); st = state_; } else if( st.readers ) { xchg.writer = 1; if( atomic_compare_exchange_void( &state_, &st, &xchg ) ) { WaitForSingleObject( sema_wp_, INFINITE ); return; } else { InterlockedIncrement( &retries_[ 2 ] ); } } else // free { xchg.writer = 1; if( atomic_compare_exchange_void( &state_, &st, &xchg ) ) { return; } else { InterlockedIncrement( &retries_[ 3 ] ); } } } } void rdunlock() { state st = state_; for( ;; ) { state xchg = st; assert( st.readers > 0 ); --xchg.readers; if( atomic_compare_exchange_void( &state_, &st, &xchg ) ) { break; } else { InterlockedIncrement( &retries_[ 4 ] ); } } if( st.writer && st.readers == 1 ) // we were the last reader and a writer is waiting { ReleaseSemaphore( sema_wp_, 1, 0 ); } else { unblock_one(); } } void unlock() { state st = state_; for( ;; ) { state xchg = st; assert( st.readers == 0 ); xchg.writer = 0; //xchg.blocked = 0; if( atomic_compare_exchange_void( &state_, &st, &xchg ) ) { break; } else { 
InterlockedIncrement( &retries_[ 5 ] ); } } unblock_one(); } public: long retries( int ix ) { return InterlockedExchange( &retries_[ ix ], 0 ); } public: // lock classes class scoped_read_lock { private: rw_mutex & mx_; scoped_read_lock( scoped_read_lock const & ); scoped_read_lock & operator=( scoped_read_lock const & ); public: scoped_read_lock( rw_mutex & mx ): mx_( mx ) { mx_.rdlock(); } ~scoped_read_lock() { mx_.rdunlock(); } }; class scoped_write_lock { private: rw_mutex & mx_; scoped_write_lock( scoped_write_lock const & ); scoped_write_lock & operator=( scoped_write_lock const & ); public: scoped_write_lock( rw_mutex & mx ): mx_( mx ) { mx_.lock(); } ~scoped_write_lock() { mx_.unlock(); } }; }; #endif // #ifndef BOOST_DETAIL_RW_MUTEX_HPP_INCLUDED