
While reviewing the current implementation of Boost.Thread in an effort to split the code into platform-specific parts for easier maintenance, I also looked into the current implementation of condition variables. Although I am aware of the past discussions about this topic to some degree, I would like to offer another solution for review.

The idea is based on the PulseEvent and SetEvent solutions in "Strategies for Implementing POSIX Condition Variables on Win32" by Douglas C. Schmidt, http://www.cs.wustl.edu/~schmidt/win32-cv-1.html . The most "concise and intuitive" solution there is described as suffering from the so-called "lost wakeup bug". Nevertheless, I used it as a starting point.

To overcome the lost wakeup bug I introduced the concept of a gate, together with stateful (manual-reset) events. The gate is closed whenever a broadcast or signal is underway. A thread entering wait also has to pass through the gate, and each successful entry is recorded. The trick is that the gate can be closed by one thread (the signaling one) and reopened by another (the waiting one). This can be achieved by using a semaphore instead of a mutex for the gate: a semaphore can be acquired and released from different threads.

Although I think I was careful enough, it may well be that I made a mistake. Also, since the idea is so simple and needs so little code, I wonder whether someone else has already come up with this solution. I would be very glad if someone could take a look at the code and show me its pitfalls.

For my draft I used VC7.1 on W2K. The code has no error handling yet. main() implements a small test, simulating a queue that is served by three threads.
Code follows:

#include <windows.h>
#include <boost/thread/thread.hpp>

using namespace boost;

typedef CRITICAL_SECTION pthread_mutex_t;

void pthread_mutex_init(pthread_mutex_t* mutex)
{
    InitializeCriticalSection(mutex);
}

void pthread_mutex_destroy(pthread_mutex_t* mutex)
{
    DeleteCriticalSection(mutex);
}

void pthread_mutex_lock(pthread_mutex_t* mutex)
{
    EnterCriticalSection(mutex);
}

void pthread_mutex_unlock(pthread_mutex_t* mutex)
{
    LeaveCriticalSection(mutex);
}

// the condition variable implementation starts here
// -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- -->8--

#define SIGNAL 0
#define BROADCAST 1

typedef struct {
    HANDLE events_[2];
    HANDLE gate_;
    int waiters_;
} pthread_cond_t;

void pthread_cond_init(pthread_cond_t *cv)
{
    cv->events_[SIGNAL] = CreateEvent(NULL, FALSE, FALSE, NULL);
    cv->events_[BROADCAST] = CreateEvent(NULL, TRUE, FALSE, NULL);
    // use a semaphore instead of mutex since
    // they can be used cross thread
    cv->gate_ = CreateSemaphore(NULL, 1, 1, NULL);
    cv->waiters_ = 0;
}

void pthread_cond_destroy(pthread_cond_t* cv)
{
    CloseHandle(cv->events_[SIGNAL]);
    CloseHandle(cv->events_[BROADCAST]);
    CloseHandle(cv->gate_);
}

void pthread_cond_wait(
    pthread_cond_t* cv,
    pthread_mutex_t* external_mutex)
{
    // we may only enter when no wakeups active
    // this will prevent the lost wakeup
    WaitForSingleObject(cv->gate_, INFINITE);
    ++cv->waiters_; // count waiters passing through
    ReleaseSemaphore(cv->gate_, 1, NULL);
    LeaveCriticalSection(external_mutex);
    switch (WaitForMultipleObjects(2, cv->events_, FALSE, INFINITE)) {
    // on unblocking of the thread the gate is closed
    case WAIT_OBJECT_0 + SIGNAL:
        --cv->waiters_; // one is leaving
        ReleaseSemaphore(cv->gate_, 1, NULL); // reopen
        // no need to reset event, it is automatic
        break;
    case WAIT_OBJECT_0 + BROADCAST:
        if (0 == --cv->waiters_) {
            ResetEvent(cv->events_[BROADCAST]);
            // last leaving, reopen the gate
            ReleaseSemaphore(cv->gate_, 1, NULL);
        }
        break;
    }
    EnterCriticalSection(external_mutex);
}

void pthread_cond_broadcast(pthread_cond_t *cv)
{
    WaitForSingleObject(cv->gate_, INFINITE); // close gate
    if (cv->waiters_) // if no waiters just reopen gate
        SetEvent(cv->events_[BROADCAST]); // wake all
    else
        ReleaseSemaphore(cv->gate_, 1, NULL);
}

void pthread_cond_signal(pthread_cond_t *cv)
{
    WaitForSingleObject(cv->gate_, INFINITE); // close gate
    if (cv->waiters_) // if no waiters just reopen gate
        SetEvent(cv->events_[SIGNAL]); // wake one
    else
        ReleaseSemaphore(cv->gate_, 1, NULL);
}

// -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- -->8--
// the condition variable implementation ends here

// a small example for testing follows:
// 3 worker threads try to consume input and update output
// main thread generates input and then waits until all
// input has been processed by checking the output

pthread_mutex_t m;
pthread_cond_t ci, co;
bool run;
int input, output, total;
int done1, done2, done3;

void do_work1()
{
    pthread_mutex_lock(&m);
    while (run) {
        while (0 == input && run)
            pthread_cond_wait(&ci, &m);
        if (!run) break;
        --input; ++output; ++done1;
        pthread_cond_signal(&co);
    }
    pthread_mutex_unlock(&m);
}

void do_work2()
{
    pthread_mutex_lock(&m);
    while (run) {
        while (0 == input && run)
            pthread_cond_wait(&ci, &m);
        if (!run) break;
        --input; ++output; ++done2;
        pthread_cond_signal(&co);
    }
    pthread_mutex_unlock(&m);
}

void do_work3()
{
    pthread_mutex_lock(&m);
    while (run) {
        while (0 == input && run)
            pthread_cond_wait(&ci, &m);
        if (!run) break;
        --input; ++output; ++done3;
        pthread_cond_signal(&co);
    }
    pthread_mutex_unlock(&m);
}

int main(int argc, char* argv[])
{
    pthread_mutex_init(&m);
    pthread_cond_init(&ci);
    pthread_cond_init(&co);
    done1 = done2 = done3 = 0;
    total = input = output = 0;
    run = true;

    boost::thread t1(do_work1);
    boost::thread t2(do_work2);
    boost::thread t3(do_work3);

    for (int n = 0; n < 1000; ++n) {
        pthread_mutex_lock(&m);
        input += 5;
        total += 5;
        pthread_cond_broadcast(&ci);
        pthread_mutex_unlock(&m);
    }

    pthread_mutex_lock(&m);
    while (output < total)
        pthread_cond_wait(&co, &m);
    run = false;
    pthread_cond_broadcast(&ci);
    pthread_mutex_unlock(&m);

    t1.join();
    t2.join();
    t3.join();

    pthread_cond_destroy(&co);
    pthread_cond_destroy(&ci);
    pthread_mutex_destroy(&m);
    return 0;
}

Regards, Roland
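The property the gate relies on, that one thread may close it and a different thread may reopen it, can be illustrated in isolation. The following is only a minimal sketch in portable C++11 rather than Win32, so that it can be run anywhere; `gate_semaphore` and `gate_can_be_reopened_by_other_thread` are hypothetical names, not part of the posted code, and the semaphore is deliberately built from standard primitives just to show the ownership rule.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical minimal counting semaphore (not a Win32 or Boost API).
class gate_semaphore {
public:
    gate_semaphore(int initial, int maximum)
        : count_(initial), max_(maximum) {}

    // Close the gate: block until a token is available, then take it.
    void acquire() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return count_ > 0; });
        --count_;
    }

    // Reopen the gate: any thread may do this, not only the one that closed it.
    void release() {
        std::lock_guard<std::mutex> lk(m_);
        assert(count_ < max_);  // releasing an already open gate is a logic error
        ++count_;
        cv_.notify_one();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
    int max_;
};

// One thread closes the gate, a different thread reopens it.
bool gate_can_be_reopened_by_other_thread() {
    gate_semaphore gate(1, 1);
    gate.acquire();                                   // closed by this thread
    std::thread other([&gate] { gate.release(); });   // reopened elsewhere
    other.join();
    gate.acquire();                                   // succeeds: gate was open again
    gate.release();
    return true;
}
```

A mutex would not allow this hand-over, since unlocking a mutex from a thread that does not own it is not permitted; that is the reason the post uses a semaphore for the gate.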

Roland Schwarz wrote: <snip> I found a small bug in my implementation. The --cv->waiters_ is unprotected. Change this to use InterlockedDecrement(&cv->waiters_) instead.

    // note: InterlockedDecrement takes a LONG volatile*, so waiters_
    // has to be declared as LONG rather than int
    switch (WaitForMultipleObjects(2, cv->events_, FALSE, INFINITE)) {
    // on unblocking of the thread the gate is closed
    case WAIT_OBJECT_0 + SIGNAL:
        InterlockedDecrement(&cv->waiters_); // one is leaving
        ReleaseSemaphore(cv->gate_, 1, NULL); // reopen
        // no need to reset event, it is automatic
        break;
    case WAIT_OBJECT_0 + BROADCAST:
        if (0 == InterlockedDecrement(&cv->waiters_)) {
            ResetEvent(cv->events_[BROADCAST]);
            // last leaving, reopen the gate
            ReleaseSemaphore(cv->gate_, 1, NULL);
        }
        break;
    }
Regards, Roland
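Why the decrement has to be atomic can be shown with a portable stand-in. This is a sketch only: it uses `std::atomic` in place of `InterlockedDecrement`, and `count_last_leavers` is a hypothetical helper name. The point is that with an atomic decrement exactly one of the leaving threads observes the transition to zero, which is the thread that would reset the broadcast event and reopen the gate.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Returns how many threads observed themselves as the last one leaving.
int count_last_leavers(int n_waiters) {
    assert(n_waiters > 0);
    std::atomic<long> waiters(n_waiters);   // stands in for cv->waiters_
    std::atomic<int> last_seen(0);

    std::vector<std::thread> threads;
    for (int i = 0; i < n_waiters; ++i) {
        threads.emplace_back([&] {
            // fetch_sub returns the previous value, so "previous == 1" is the
            // same test as "0 == InterlockedDecrement(&waiters_)".
            if (waiters.fetch_sub(1) == 1)
                ++last_seen;                // only here would the gate be reopened
        });
    }
    for (auto& t : threads) t.join();
    return last_seen.load();
}
```

With a plain `--waiters_` two threads could read the same old value, so either nobody or more than one thread might believe it is the last one leaving.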

Roland Schwarz wrote: ... Your locking is way too strict (with two waiters on CV, you're blocking the second signal across context switch). Your pthread_cond_destroy() doesn't conform to pthreads (you need to lock the gate sema to synchronize destruction with respect to exiting waiters). Uhmm, and what about timed waits and/or cancellation? regards, alexander.

Alexander Terekhov wrote:
Your locking is way too strict (with two waiters on CV, you're blocking the second signal across context switch).
I already suspected something along these lines. Thank you for pointing it out.
Your pthread_cond_destroy() doesn't conform to pthreads (you need to lock the gate sema to synchronize destruction with respect to exiting waiters).
Yes, I know. But the code is sketchy at best so far. Do you think there is a problem in principle? I won't try this until I am confident the main idea is worth extending.
Uhmm, and what about timed waits and/or cancellation?
I plan to extend it to use WaitForMultipleObjectsEx with a timeout != INFINITE, and APCs to cancel the wait. Is that reasonable? Regards, Roland
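One detail a timed wait adds is that the waiter bookkeeping must also be undone when the wait times out. The following is a rough portable sketch of that shape, using `std::condition_variable::wait_for` instead of the Win32 call mentioned above; `timed_waitable` and `wait_ready_for` are hypothetical names, and cancellation is not covered.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

struct timed_waitable {          // hypothetical helper type
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;
    int waiters = 0;             // bookkeeping analogous to waiters_
};

// Returns true if `ready` became true, false if the timeout expired first.
bool wait_ready_for(timed_waitable& w, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lk(w.m);
    ++w.waiters;
    bool ok = w.cv.wait_for(lk, timeout, [&w] { return w.ready; });
    --w.waiters;                 // must happen on the timeout path as well
    assert(w.waiters >= 0);
    return ok;
}
```

In the gate design a timed-out waiter would presumably also have to decide whether it is the one that reopens the gate, which is where most of the extra code is likely to go.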

Alexander Terekhov wrote:
For algorithm, see pthreads-win32.
Thank you for the pointer. I already knew of it, but I have to admit that I did not study the algorithm beforehand, so as to be able to approach the problem from a different angle. (Forgive my Germanic tongue.) I suppose you are referring to Algorithm 8a / IMPL_SEM,UNBLOCK_STRATEGY == UNBLOCK_ALL
Your locking is way too strict (with two waiters on CV, you're blocking the second signal across context switch).
Now that I have tried to compare it to your algorithm, I do not understand what you really mean by this. Could you please elaborate? Do you refer to WaitForMultipleObjects waiting on two events? What does "second signal" refer to? Two callers of pthread_cond_signal? Besides your comment "is way too strict", do you think my code is incorrect too? Timeout handling and cancellation aside, I see my algorithm as semantically equivalent to yours. The main difference is that I am using events to queue up waiters, while you are using a semaphore. Also, I am using two event objects and one semaphore per condition, while you are using three semaphore objects per condition, so the resource cost is roughly the same. I will try to add timeout handling next, to see how much the code size increases. Regards, Roland

Roland Schwarz wrote: [... too strict locking ...]
Could you please elaborate?
Please see http://groups.google.de/group/comp.programming.threads/msg/846010a82d6c51b0 including followups. regards, alexander.

Alexander Terekhov wrote:
Roland Schwarz wrote:
[... too strict locking ...]
Could you please elaborate?
Please see
http://groups.google.de/group/comp.programming.threads/msg/846010a82d6c51b0
including followups.
Thank you, this was very instructive indeed. I.e. the sequence

    t1: Thread A enters wait
    t2: Thread B signals

must result in a guaranteed wakeup, while

    t1: Thread A enters wait
    t2: Thread B signals
    ...
    tx: Thread X signals
    ...
    ty: Thread Y signals
    ...

should not block any successive signallers (X, Y, ...) until another wait is entered. My attempt and, as I have now learned, your first attempt too suffer from exactly this. Thank you, Roland
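The two requirements stated here can be written down as small checks. The sketch below runs them against `std::condition_variable` purely as a reference for the desired behaviour; it does not exercise the gate implementation from this thread, and both function names are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Signallers X, Y, ... with no waiter present: none of them may block.
// Returns the number of signals sent; it would hang if a signaller got stuck.
int signal_without_waiters(int n_signals) {
    assert(n_signals >= 0);
    std::condition_variable cv;
    int sent = 0;
    for (int i = 0; i < n_signals; ++i) {
        cv.notify_one();
        ++sent;
    }
    return sent;
}

// t1: thread A enters wait, t2: thread B signals -> A must wake up.
bool waiter_is_woken() {
    std::mutex m;
    std::condition_variable cv;
    bool flag = false;
    bool woken = false;

    std::thread a([&] {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [&] { return flag; });   // the predicate guards against a missed signal
        woken = true;
    });
    {
        std::lock_guard<std::mutex> lk(m);
        flag = true;
    }
    cv.notify_one();
    a.join();
    return woken;
}
```

In the gate design, by contrast, a signaller that finds a waiter keeps the gate closed until that waiter has actually been scheduled and reopened it, so the next signaller waits across a context switch.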

Roland Schwarz <roland.schwarz@chello.at> writes:
Although I think I was careful enough, it may well be that I made a mistake. Also, since the idea is so simple and needs so little code, I wonder whether someone else has already come up with this solution. I would be very glad if someone could take a look at the code and show me its pitfalls.
With your changes to use InterlockedDecrement, it looks good. Off hand, I can't think of a reason why it would break --- I don't believe it suffers from deadlocks, lost wakeups, spurious wakeups, or unfairness. You just need to extend it to work with other mutex types, now. Anthony -- Anthony Williams Software Developer Just Software Solutions Ltd http://www.justsoftwaresolutions.co.uk

Anthony Williams wrote:
With your changes to use InterlockedDecrement, it looks good. Off hand, I can't think of a reason why it would break --- I don't believe it suffers from deadlocks, lost wakeups, spurious wakeups, or unfairness.
Thanks for looking into it. About spurious wakeups: they are possible, but AFAIK they are considered quite normal in the condition/mutex paradigm. Since you will use condition variables in a loop that re-evaluates the state variables anyway, this does no harm. About unfairness: since I do not try to control the order of the wakeups, this is left to the scheduler and the thread priorities. Of course I do hold off any waiters that enter after a broadcast or signal, but this is by design. But then: in my small example I see that done1, done2 and done3 are considerably unbalanced. I am not sure yet whether this is an indication of unfairness or just an artifact of how I am trying to measure it. Any ideas?
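To separate the measurement from the condition variable under test, the same queue simulation can be run against a reference implementation. This is a sketch assuming `std::condition_variable` as that reference; `run_queue_test` is a hypothetical name and the structure follows the posted main() and do_work functions.

```cpp
#include <array>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Runs the queue simulation and returns the per-worker item counts.
std::array<int, 3> run_queue_test(int rounds, int items_per_round) {
    assert(rounds >= 0 && items_per_round >= 0);
    std::mutex m;
    std::condition_variable ci, co;       // input available / output produced
    bool run = true;
    int input = 0, output = 0, total = 0;
    std::array<int, 3> done = {{0, 0, 0}};

    auto worker = [&](int id) {
        std::unique_lock<std::mutex> lk(m);
        while (run) {
            // Loop form of wait: re-checks the state after every wakeup.
            ci.wait(lk, [&] { return input > 0 || !run; });
            if (!run) break;
            --input; ++output; ++done[id];
            co.notify_one();
        }
    };

    std::vector<std::thread> workers;
    for (int id = 0; id < 3; ++id) workers.emplace_back(worker, id);

    for (int n = 0; n < rounds; ++n) {
        std::lock_guard<std::mutex> lk(m);
        input += items_per_round;
        total += items_per_round;
        ci.notify_all();
    }
    {
        std::unique_lock<std::mutex> lk(m);
        co.wait(lk, [&] { return output >= total; });
        run = false;
        ci.notify_all();
    }
    for (auto& t : workers) t.join();
    return done;
}
```

Note that a worker keeps the mutex locked while it drains the queue and only gives it up inside the wait, so a single worker can consume a whole batch before the others run. That alone may explain part of an imbalance in the counters, independent of the condition variable implementation.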
You just need to extend it to work with other mutex types, now.
This is what currently puzzles me somewhat. The condition documentation is not explicit on whether a try_mutex behaves differently upon return from the wait. It simply says "and then reacquires the lock". On the other hand, it would not be meaningful to return without holding the lock again. I am also a little surprised that try and timed are attributes of the mutex; I would have expected them to be properties of the locks. Regards, Roland

Hi. I'm definitely no expert on this, but there's something that bothers me about conditions in general and this implementation in particular. The Win32 CRITICAL_SECTION used here is a *recursive* mutex, meaning that the LeaveCriticalSection() call inside pthread_cond_wait() doesn't guarantee that the mutex is actually unlocked. Ensuring this requires counting the number of times the mutex was locked before entering pthread_cond_wait(). Is this a problem? Is there any reference on the difference between a condition with a recursive mutex and a condition with a non-recursive mutex? Just some thoughts I wanted to share... Yuval
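The effect described here is easy to reproduce with any recursive mutex. The sketch below uses `std::recursive_mutex` as a portable stand-in for a CRITICAL_SECTION; the function names are hypothetical.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Returns true if another thread could acquire `m` right now.
bool other_thread_can_lock(std::recursive_mutex& m) {
    bool acquired = false;
    std::thread probe([&] {
        if (m.try_lock()) {
            acquired = true;
            m.unlock();
        }
    });
    probe.join();
    return acquired;
}

// Locks `depth` times, unlocks once, and reports whether the mutex is still held.
bool still_held_after_single_unlock(int depth) {
    assert(depth >= 1);
    std::recursive_mutex m;
    for (int i = 0; i < depth; ++i) m.lock();
    m.unlock();                               // what a single LeaveCriticalSection does
    bool held = !other_thread_can_lock(m);
    for (int i = 1; i < depth; ++i) m.unlock();   // release the remaining levels
    return held;
}
```

With a depth of 2 or more, the single unlock leaves the mutex owned, so a signalling thread that needs the same mutex could never get in while the waiter is blocked.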

Yuval Ronen wrote: [...]
pthread_cond_wait(). Is this a problem? Is there any reference to the difference between a condition with recursive mutex to a condition with non-recursive mutex?
http://lists.boost.org/Archives/boost/2004/07/68013.php regards, alexander.

Alexander Terekhov wrote:
Yuval Ronen wrote: [...]
pthread_cond_wait(). Is this a problem? Is there any reference to the difference between a condition with recursive mutex to a condition with non-recursive mutex?
As far as I could understand from this, the POSIX standard specifies that calling pthread_cond_wait() with a recursively locked mutex (lock_count > 1) is forbidden and yields undefined behaviour. Is this reasonable? This question should probably be addressed to the POSIX guys, but since you're an expert, I'd like to hear your opinion. Is there a good reason not to supply full support for condition variables with recursive mutexes? To me it sounds like a reasonable use case. And if using a c.v. with a recursive mutex is a bad idea, then why not forbid it altogether? The current specification of "it's ok to use a recursive mutex as long as it's used as a non-recursive mutex" is especially weird to me... Thanks a lot for your comments, Yuval

Yuval Ronen wrote:
As far as I could understand from this, the POSIX standard specifies that calling pthread_cond_wait() with a recursively locked mutex (lock_count > 1) is forbidden and yields undefined behaviour.
This issue is known: http://aspn.activestate.com/ASPN/Mail/Message/2118928 As there is currently a rewrite going on, this issue will be covered. I currently think a meaningful implementation would throw an exception from the wait (with the mutex still locked). Then either stack unwinding will restore the program invariant, or a user-supplied exception handler might try to remedy the situation. Would this be reasonable? Roland
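The proposal to throw from the wait can be sketched as follows. This is only an illustration under assumptions: `counted_recursive_mutex` and `checked_wait` are hypothetical names, `std::condition_variable_any` stands in for the Boost condition, and `std::logic_error` is an arbitrary choice of exception type.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <stdexcept>

// Hypothetical recursive mutex that tracks its own recursion depth.
class counted_recursive_mutex {
public:
    void lock()   { m_.lock(); ++depth_; }
    void unlock() { assert(depth_ > 0); --depth_; m_.unlock(); }
    int depth() const { return depth_; }   // only meaningful to the current owner
private:
    std::recursive_mutex m_;
    int depth_ = 0;
};

// Waits until pred() holds; refuses to wait if the mutex is held recursively.
template <class Pred>
void checked_wait(std::condition_variable_any& cv,
                  counted_recursive_mutex& m, Pred pred) {
    if (m.depth() != 1)
        throw std::logic_error("wait with recursively locked mutex");
    cv.wait(m, pred);   // unlocks m once while blocked, relocks before returning
}

// Returns true if checked_wait threw and left the mutex locked twice.
bool wait_throws_when_locked_twice() {
    counted_recursive_mutex m;
    std::condition_variable_any cv;
    m.lock();
    m.lock();
    bool threw = false;
    try {
        checked_wait(cv, m, [] { return true; });
    } catch (const std::logic_error&) {
        threw = true;
    }
    bool still_locked = (m.depth() == 2);
    m.unlock();
    m.unlock();
    return threw && still_locked;
}
```

Because the exception leaves the lock depth untouched, scoped locks further up the stack unlock normally during unwinding.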

Roland Schwarz wrote:
Yuval Ronen wrote:
As far as I could understand from this, the POSIX standard specifies that calling pthread_cond_wait() with a recursively locked mutex (lock_count > 1) is forbidden and yields undefined behaviour.
This issue is known, http://aspn.activestate.com/ASPN/Mail/Message/2118928
As there is currently a rewrite going on, this issue will be covered. I currently think a meaningful implementation would throw an exception from the wait (with the mutex still locked). Then either stack unwinding will restore the program invariant, or a user-supplied exception handler might try to remedy the situation.
Would this be reasonable?
My intuition is to make it work, meaning that wait() will unlock lock_count() times, down to a fully unlocked state, and then lock it again the same number of times. Why shouldn't it work? However, the link that Alexander Terekhov gave points out that the POSIX people think it's a bad idea, and the link you gave points out that the consensus on this mailing list is also that it's a bad idea, so maybe I'm wrong after all... ;-) I guess I'll have to read the thread where this consensus emerged and see if I agree or not. And even if I agree, throwing seems weird. As I mentioned, if a combination of a condition variable and a recursive mutex is a bad thing, then make it not compile at all. Isn't this better? Allowing usage of a recursive mutex as long as it's not being used recursively is the opposite of logical, to me. Yuval
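The "make it work" variant, unlocking all levels and restoring them afterwards, could look roughly like this. It is a sketch only, with hypothetical names (`counted_recursive_mutex`, `full_unlock_wait`) and `std::condition_variable_any` as a stand-in for the Boost condition.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical recursive mutex that tracks its own recursion depth.
class counted_recursive_mutex {
public:
    void lock()   { m_.lock(); ++depth_; }
    void unlock() { assert(depth_ > 0); --depth_; m_.unlock(); }
    int depth() const { return depth_; }   // only meaningful to the current owner
private:
    std::recursive_mutex m_;
    int depth_ = 0;
};

// Fully releases m for the duration of the wait, then restores the depth.
template <class Pred>
void full_unlock_wait(std::condition_variable_any& cv,
                      counted_recursive_mutex& m, Pred pred) {
    const int saved = m.depth();
    assert(saved >= 1);
    for (int i = 1; i < saved; ++i) m.unlock();   // drop to depth 1
    cv.wait(m, pred);                             // releases the last level while blocked
    for (int i = 1; i < saved; ++i) m.lock();     // restore the original depth
}

// Waits with the mutex locked twice; returns the depth seen after the wait.
int depth_after_nested_wait() {
    counted_recursive_mutex m;
    std::condition_variable_any cv;
    bool ready = false;

    m.lock();
    m.lock();                                     // depth 2
    std::thread setter([&] {
        m.lock();                                 // blocks until the wait releases m
        ready = true;
        m.unlock();
        cv.notify_one();
    });
    full_unlock_wait(cv, m, [&] { return ready; });
    int depth = m.depth();
    m.unlock();
    m.unlock();
    setter.join();
    return depth;
}
```

The mechanics are simple, but every outer scope that believes it holds the lock has in fact given it up for the duration of the wait, so data it was working on may have changed in between.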

Yuval Ronen wrote:
My intuition is to make it work, meaning that wait() will unlock lock_count() times, down to a fully unlocked state, and then lock it again the same number of times. Why shouldn't it work?
Ok, I'll try to explain, although I am afraid I am not too good at this. You are right in believing that it can be made to work. But you are about to create extremely dangerous code by doing so. Once you put these semantics to work, _every_ subroutine call from within a locked scope acquires wait semantics, i.e. the state of your locked memory might have changed. Most programmers would not expect this. E.g.

    ...
    {
        scoped_lock l(m);
        // do something to the locked data
        foo(); // call a member function which _might_ wait
        // do something else to the locked data
        // ouch, the locking was removed in between, data has
        // unexpectedly changed. I did not expect this, did you?
    }

Ok, this is no good, but why then allow recursive mutex anyways? I find it harder to come up with a convincing example, but basically I think forbidding it entirely is simply too strict. It is possible to write correct and safe programs with recursive_mutex. I'll try:

    class deep_thought {
        recursive_mutex m;
        condition c;
        int answer;
    public:
        void think() {
            scoped_lock l(m);
            the_answer_is(42);
            cout << show_me_the_answer();
        }
        void the_answer_is(int n) {
            scoped_lock l(m);
            answer = n;
        }
        int show_me_the_answer() {
            scoped_lock l(m);
            while (answer < 42)
                c.wait(l);
            return answer;
        }
    };

You have a single memory location to protect: answer. So you have to use a single mutex. You have three public functions that can be called from other threads, and you found it convenient to also use them from the same thread by calling into them recursively. So your only choice is to use a recursive mutex. Everything is ok as long as you do not call "show_me_the_answer" with the mutex already locked. But wait: even this is correct if the answer is already 42 or larger, because then no wait takes place. Now correctness has become a runtime property. And this is why I think a reasonable reaction is to throw an exception. You caused a runtime error.

Of course, better programming style would have chosen a different grouping of the functions, so that the recursive_mutex could have been avoided entirely. But you also have things like reinterpret_cast<>, which is usually avoided whenever possible. Still, it is there because there are times when it is useful; you simply have to put a lot more thought into it when using it.
And even if I agree, throwing seems weird. As I mentioned, if a combination of a condition variable and a recursive mutex is a bad thing, then make it not compile at all. Isn't this better? Allowing usage of a recursive mutex as long as it's not being used recursively is the opposite of logical, to me.
I hope my example was helpful to show you that making it not compile is way too restrictive. Roland

My intuition is to make it work, meaning that wait() will unlock lock_count() times, down to a fully unlocked state, and then lock it again the same number of times. Why shouldn't it work?
Ok, I'll try to explain, although I am afraid I am not too good at this. You are right in believing that it can be made to work. But you are about to create extremely dangerous code by doing so.
Once you put these semantics to work, _every_ subroutine call from within a locked scope acquires wait semantics, i.e. the state of your locked memory might have changed. Most programmers would not expect this.
E.g.
    ...
    {
        scoped_lock l(m);
        // do something to the locked data
        foo(); // call a member function which _might_ wait
        // do something else to the locked data
        // ouch, the locking was removed in between, data has
        // unexpectedly changed. I did not expect this, did you?
    }

I can't see how this example shows that a condition variable with a recursive mutex is dangerous. How would this example be any different if the mutex were non-recursive? Calling foo(), which might wait, *always* has the potential of releasing the mutex, so a recursive mutex doesn't change anything here. And it is actually a good thing that it doesn't change anything. This way the behaviour is expected, and all is dandy.
Ok, this is no good, but why then allow recursive mutex anyways?
I find it harder to come up with a convincing example, but basically I think forbidding it entirely is simply too strict. It is possible to write correct and safe programs with recursive_mutex. I'll try:
<snip example of why a condition variable with a recursive mutex can be useful> I disagree that the runtime correctness issue in this example is relevant to the condition variable discussion. This example has some logic about what can or can't be the answer, and tries to use the condition variable to validate it. I find this very confusing. If I wanted to say that an answer smaller than 42 is a runtime error, then I'd just write if (answer < 42) throw ...; which is much simpler and easier to understand. IMHO, condition variables are meant for completely different purposes than checking the runtime correctness of application business logic.
Of course, better programming style would have chosen a different grouping of the functions, so that the recursive_mutex could have been avoided entirely. But you also have things like reinterpret_cast<>, which is usually avoided whenever possible. Still, it is there because there are times when it is useful; you simply have to put a lot more thought into it when using it.
Ok, then. If it's considered ok, then allow it, as I can't find any good reason to say that a recursive mutex is ok, but a condition variable with such a mutex is not ok. Of course, anything I say is completely my humble opinion, and may very well turn out to be complete nonsense. Yuval
participants (4): Alexander Terekhov, Anthony Williams, Roland Schwarz, Yuval Ronen