
While reviewing the current implementation of Boost.Thread, in an effort to split the code into platform-specific parts for easier maintenance, I also looked into the current implementation of condition variables. Although I know the past discussions about this topic to some degree, I would nevertheless like to offer another solution for review. The idea is based on the PulseEvent and SetEvent solution from "Strategies for Implementing POSIX Condition Variables on Win32" by Douglas C. Schmidt (http://www.cs.wustl.edu/~schmidt/win32-cv-1.html). The most "concise and intuitive" solution there is described as suffering from the so-called "lost wakeup bug". Nevertheless, I used it as a starting point. To overcome the lost wakeup bug I introduced the concept of a gate, and used stateful events (SetEvent instead of PulseEvent: an auto-reset event for signal and a manual-reset event for broadcast). The gate is closed whenever a broadcast or signal is under way. On entry to wait, a thread also has to pass through the gate, and every successful entry is counted. The trick is that the gate can be closed by one thread (the signaling one) and reopened by another (a waiting one). This can be achieved by using a semaphore instead of a mutex for the gate: unlike a mutex, a semaphore has no owning thread, so it can be acquired by one thread and released by another. Although I think I was careful enough, I may well have made a mistake. Also, since the idea is so simple and needs so little code, I wonder whether someone else has already come up with this solution. I would be very glad if someone could take a look at the code and point out its pitfalls. For my draft I used VC7.1 on W2K. The code does not contain any error handling yet. main() implements a small test that simulates a queue served by three threads.
Code follows: #include <windows.h> #include <boost/thread/thread.hpp> using namespace boost; typedef CRITICAL_SECTION pthread_mutex_t; void pthread_mutex_init(pthread_mutex_t* mutex) { InitializeCriticalSection(mutex); } void pthread_mutex_destroy(pthread_mutex_t* mutex) { DeleteCriticalSection(mutex); } void pthread_mutex_lock(pthread_mutex_t* mutex) { EnterCriticalSection(mutex); } void pthread_mutex_unlock(pthread_mutex_t* mutex) { LeaveCriticalSection(mutex); } // the condition variable implementation starts here // -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- #define SIGNAL 0 #define BROADCAST 1 typedef struct { HANDLE events_[2]; HANDLE gate_; int waiters_; } pthread_cond_t; void pthread_cond_init(pthread_cond_t *cv) { cv->events_[SIGNAL] = CreateEvent(NULL,FALSE,FALSE,NULL); cv->events_[BROADCAST] = CreateEvent(NULL,TRUE,FALSE,NULL); // use a semaphore instead of mutex since // they can be used cross thread cv->gate_ = CreateSemaphore(NULL,1,1,NULL); cv->waiters_ = 0; } void pthread_cond_destroy(pthread_cond_t* cv) { CloseHandle(cv->events_[SIGNAL]); CloseHandle(cv->events_[BROADCAST]); CloseHandle(cv->gate_); } void pthread_cond_wait( pthread_cond_t* cv, pthread_mutex_t* external_mutex) { // we may only enter when no wakeups active // this will prevent the lost wakeup WaitForSingleObject(cv->gate_, INFINITE); ++cv->waiters_; // count waiters passing through ReleaseSemaphore(cv->gate_,1,NULL); LeaveCriticalSection(external_mutex); switch(WaitForMultipleObjects( 2, cv->events_,FALSE,INFINITE)) { // on unblocking of the thread the gate is closed case WAIT_OBJECT_0+SIGNAL: --cv->waiters_; // one is leaving ReleaseSemaphore(cv->gate_,1,NULL);//reopen // no need to reset event, it is automatic break; case WAIT_OBJECT_0+BROADCAST: if (0 == --cv->waiters_) { ResetEvent( cv->events_[BROADCAST]); // last leaving, reopen the gate ReleaseSemaphore(cv->gate_,1,NULL); } break; } EnterCriticalSection(external_mutex); } void pthread_cond_broadcast 
(pthread_cond_t *cv) { WaitForSingleObject(cv->gate_, INFINITE);//close gate if (cv->waiters_) // if no waiters just reopen gate SetEvent (cv->events_[BROADCAST]); // wake all else ReleaseSemaphore(cv->gate_,1,NULL); } void pthread_cond_signal (pthread_cond_t *cv) { WaitForSingleObject(cv->gate_, INFINITE);//close gate if (cv->waiters_) // if no waiters just reopen gate SetEvent (cv->events_[SIGNAL]); // wake one else ReleaseSemaphore(cv->gate_,1,NULL); } // -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- // the condition variable implementation ends here // a small example for testing follows: // 3 worker threads try to consume input and update output // main thread generates input and then waits until all // input has been processed by checking the output pthread_mutex_t m; pthread_cond_t ci, co; bool run; int input, output, total; int done1, done2, done3; void do_work1() { pthread_mutex_lock(&m); while (run) { while (0 == input && run) pthread_cond_wait(&ci, &m); if (!run) break; --input; ++output; ++done1; pthread_cond_signal(&co); } pthread_mutex_unlock(&m); } void do_work2() { pthread_mutex_lock(&m); while (run) { while (0 == input && run) pthread_cond_wait(&ci, &m); if (!run) break; --input; ++output; ++done2; pthread_cond_signal(&co); } pthread_mutex_unlock(&m); } void do_work3() { pthread_mutex_lock(&m); while (run) { while (0 == input && run) pthread_cond_wait(&ci, &m); if (!run) break; --input; ++output; ++done3; pthread_cond_signal(&co); } pthread_mutex_unlock(&m); } int main(int argc, char* argv) { pthread_mutex_init(&m); pthread_cond_init(&ci); pthread_cond_init(&co); done1 = done2 = done3 = 0; total = input = output = 0; run = true; boost::thread t1(do_work1); boost::thread t2(do_work2); boost::thread t3(do_work3); for (int n = 0; n<1000; ++n) { pthread_mutex_lock(&m); input+=5; total+=5; pthread_cond_broadcast(&ci); pthread_mutex_unlock(&m); } pthread_mutex_lock(&m); while (output < total) pthread_cond_wait(&co, &m); run = false; 
pthread_cond_broadcast(&ci); pthread_mutex_unlock(&m); t1.join(); t2.join(); t3.join(); pthread_cond_destroy(&co); pthread_cond_destroy(&ci); pthread_mutex_destroy(&m); return 0; } Regards, Roland