[Thread] Timed join returning true before thread terminated

Hello,

I'm working on a multi-threaded application that uses boost threads. The threads are deployed in a cascading mechanism such as:

Main thread creates thread1
thread 1 creates threads 2, 3 and 4
thread 2 creates threads 5-14

The shutdown mechanism is to send a thread interrupt and then do a timed join to wait for the child thread[s] to finish their shutdown. So:

Main sends a thread interrupt to thread1, and then waits for X seconds
thread 1 sends a thread interrupt to thread 2 and does a timed wait, then thread 3 and waits, then thread 4 and waits
thread 2 does the same for each of its children: send a thread interrupt and then wait.

The problem is that on rare occasions (1 out of 474 attempts in my last test cycle), thread 1 will return early from the timed_join, and it returns true, indicating the child thread is dead, but it's not. I have timed logging that shows when a specific thread's shutdown activities start and stop, and I can see that thread 1 isn't waiting 10 seconds for the child to exit, and I can see the child is still running.

Any suggestions for this? The only thing I can think of right now is that maybe thread1 is getting an interrupt for something that is causing it to leave its timed_join early? I haven't looked into the boost code for this yet. I'm hoping that this is something others have encountered and already solved? Or maybe other debugging tips could be provided?

Thanks in advance,
-=John
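The design above relies on one contract: boost::thread::timed_join() returns true only once the child thread has actually finished, and false if the timeout expires first. For readers without Boost, here is a minimal sketch that reproduces that contract with C++11 `std::thread`, which has no timed join of its own. The `timed_joinable_thread` class is hypothetical (not from this program and not from Boost); the thread body signals a condition variable as its last action. Note one deliberate difference: Boost's timed_join() is also an interruption point, and this sketch is not.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical wrapper (not part of Boost or of the program in this thread):
// timed_join() returns true only after the thread body has finished, and
// false if the timeout expires first. Unlike boost::thread::timed_join(),
// it is not an interruption point.
class timed_joinable_thread {
public:
    explicit timed_joinable_thread(std::function<void()> body)
        : done_(false),
          thread_([this, body] {
              body();
              // Publish completion under the mutex so a waiter cannot miss it.
              std::lock_guard<std::mutex> lock(mutex_);
              done_ = true;
              done_cv_.notify_all();
          }) {}

    // The lambda captures `this`, so the object must never be copied or moved.
    timed_joinable_thread(const timed_joinable_thread&) = delete;
    timed_joinable_thread& operator=(const timed_joinable_thread&) = delete;

    ~timed_joinable_thread() {
        if (thread_.joinable()) {
            thread_.join();  // never leave a joinable std::thread behind
        }
    }

    bool timed_join(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        const bool finished =
            done_cv_.wait_for(lock, timeout, [this] { return done_; });
        lock.unlock();  // never block in join() while holding the mutex
        if (finished && thread_.joinable()) {
            thread_.join();  // the body has already returned, so this is brief
        }
        return finished;
    }

private:
    // Declared before thread_ so they exist before the thread starts.
    std::mutex mutex_;
    std::condition_variable done_cv_;
    bool done_;
    std::thread thread_;
};
```

Usage follows the shutdown pattern in the post: `if (!child.timed_join(std::chrono::seconds(10))) { /* log and escalate */ }`. A true result here can only mean the body really returned, which is the guarantee the original poster expected from Boost.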

On 09/03/12 04:32, John Rocha wrote:
Hello,
I'm working on a multi-threaded application that uses boost threads. The threads are deployed in a cascading mechanism such as:
Main thread creates thread1
thread 1 creates threads 2, 3 and 4
thread 2 creates threads 5-14
The shutdown mechanism is to send a thread interrupt and then do a timed join to wait for the child thread[s] to finish their shutdown.
So Main sends a thread interrupt to thread1, and then waits for X seconds
thread 1 sends a thread interrupt to thread 2, and does a timed wait, then thread 3 and wait then thread 4 and wait
thread 2 does the same for each of its children: send a thread interrupt and then wait.
How and when does thread1 start the shutdown processing?
The problem is that on rare occasions (1 out of 474 attempts in my last test cycle), thread 1 will return early from the timed_join, and it returns true, indicating the child thread is dead, but it's not.
I have timed logging that shows when a specific thread's shutdown activities start and stop, and I can see that thread 1 isn't waiting 10 seconds for the child to exit, and I can see the child is still running.
Any suggestions for this? The only thing I can think of right now is that maybe thread1 is getting an interrupt for something that is causing it to leave its timed_join early? I haven't looked into the boost code for this yet. I'm hoping that this is something others have encountered and already solved?
Or maybe other debugging tips could be provided?
Could you post a simple example? This would help to better analyze the issue.
Best, Vicente

On 09/03/12 03:32, John Rocha wrote:
The problem is that on rare occasions (1 out of 474 attempts in my last test cycle), thread 1 will return early from the timed_join, and it returns true, indicating the child thread is dead, but it's not.
Which compiler and OS are you running?

Anthony
-- 
Author of C++ Concurrency in Action http://www.stdthread.co.uk/book/
just::thread C++11 thread library http://www.stdthread.co.uk
Just Software Solutions Ltd http://www.justsoftwaresolutions.co.uk
15 Carrallack Mews, St Just, Cornwall, TR19 7UL, UK. Company No. 5478976

Hello Anthony,

Compiler: gcc (GCC) 4.1.2 20070115 (prerelease) (SUSE Linux)
OS: SUSE Linux Enterprise Server 10 SP1 (i586)
    Linux <hostname> 2.6.16.46-0.12-bigsmp #1 SMP Thu May 17 14:00:09 UTC 2007 i686 i686 i386 GNU/Linux
Boost Version: 1.43.0

Additional Machine Information (root:# more /proc/cpuinfo, condensed)

All 8 logical processors report the same values for these fields:
  vendor_id     : GenuineIntel
  cpu family    : 6
  model         : 26
  model name    : Intel(R) Xeon(R) CPU E5520 @ 2.27GHz
  stepping      : 5
  cpu MHz       : 1596.000
  cache size    : 8192 KB
  siblings      : 4
  cpu cores     : 4
  fdiv_bug      : no
  hlt_bug       : no
  f00f_bug      : no
  coma_bug      : no
  fpu           : yes
  fpu_exception : yes
  cpuid level   : 11
  wp            : yes
  flags         : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe nx rdtscp lm constant_tsc pni monitor ds_cpl vmx est tm2 cx16 xtpr dca popcnt lahf_lm

Per-processor fields:
  processor  physical id  core id  bogomips
  0          0            0        4525.99
  1          0            1        4522.08
  2          0            2        4522.10
  3          0            3        4522.09
  4          1            0        4522.12
  5          1            1        4522.16
  6          1            2        4522.11
  7          1            3        4522.11

Thank you,
-=John

On 3/9/2012 1:17 AM, Anthony Williams wrote:
On 09/03/12 03:32, John Rocha wrote:
The problem is that on rare occasions (1 out of 474 attempts in my last test cycle), thread 1 will return early from the timed_join, and it returns true, indicating the child thread is dead, but it's not.
Which compiler and OS are you running?
Anthony

Hello,

I have been able to extract the thread start/stop logic from our code base into a standalone program that illustrates the problem. Even this is still sort of long, 900 lines or so.

The code's thread startup/shutdown model sort of mimics a process startup/shutdown model.

The startup sequence/logic is:
  Main creates SQ.
  SQ creates lookup, audit and notify.
  lookup creates workers 1-10.

The shutdown logic is:
  main sends thread interrupt to SQ and joins
    SQ sends thread interrupt to lookup and joins
      lookup sends thread interrupt to W1 and joins
        W1 shutdown
      lookup sends thread interrupt to W2 and joins
        W2 shutdown
      .....
      lookup sends thread interrupt to W10 and joins
        W10 shutdown
      lookup shutdown
    SQ sends thread interrupt to audit and joins
      audit shutdown
    SQ sends thread interrupt to notify and joins
      notify shutdown
    SQ shutdown
  main finishes its shutdown

I've created a bash script that runs the program and then checks for a core. It repeats until a core is found (or I eventually kill it).

Sometimes it will core with a seg fault. Sometimes it will core with an abort because condvars are being destroyed while the mutex is still locked or someone is still waiting on the condvar. Sometimes it will core citing 'pure virtual method called' because a base class invokes a derived function on an object that has been deleted from underneath it.

The root cause for all of these appears to be that one of the thread join() calls is returning before the thread has actually finished. I say this based on debug output, shared below. The premature return leads to objects being destroyed that are still being used by other threads.

Moreover, I have found that if I use a raw pthread_join() in my shutdown logic and then follow that with a boost::thread::join(), then it always works (stopped after 1000 successful runs). I was working on the premise that boost::thread::join() does more than just the join.
So pthread_join() waited for the thread to exit, and then boost::thread::join() took care of all the accounting for the thread object. This is obviously a hack, but it is an interesting observation.

I am running on a Linux server with a concurrency level of 8, as indicated by the hardware_concurrency() method.

root@psbu-jrr2-lnx:# uname -a
Linux psbu-jrr2-lnx 2.6.16.46-0.12-bigsmp #1 SMP Thu May 17 14:00:09 UTC 2007 i686 i686 i386 GNU/Linux
root@psbu-jrr2-lnx:# g++ --version
g++ (GCC) 4.1.2 20070115 (prerelease) (SUSE Linux)
Copyright (C) 2006 Free Software Foundation, Inc.
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

Boost version 1.43

Annotated log output is below.

TIME            THRD#   LOG INFORMATION
--------------- ------- -----------------------------------------------
11:37:56.119156 (30773) shutdown():main shutting down SQ
11:37:56.119176 (30773) shutdown():main waiting on SQ thread
11:37:56.119195 (30774) operator()():Thread SQ(0xbfc8dd78) RX shutdown interrupt.
11:37:56.119216 (30774) thread_shutdown():SQ starts thread_shutdown
11:37:56.119228 (30774) shutdown():SQ shutting down LU
11:37:56.119240 (30774) shutdown():SQ waiting on LU thread
11:37:56.119324 (30777) operator()():Thread LU(0x8085188) RX shutdown interrupt.
11:37:56.119342 (30777) thread_shutdown():LU starts thread_shutdown
11:37:56.119354 (30777) shutdown():LU shutting down Worker#1
11:37:56.119368 (30777) shutdown():LU waiting on Worker#1 thread
11:37:56.119395 (30773) shutdown():main shutdown SQ in time 00:00:00.000143
11:37:56.119414 (30778) operator()():Thread Worker#1(0x8084da8) RX shutdown interrupt.
11:37:56.119441 (30777) shutdown():LU shutdown Worker#1 in time 00:00:00.000070
11:37:56.119454 (30777) shutdown():LU shutting down Worker#2
11:37:56.119469 (30777) shutdown():LU waiting on Worker#2 thread
11:37:56.119499 (30779) operator()():Thread Worker#2(0x8085690) RX shutdown interrupt.
11:37:56.119603 (30773) ~sq_thread_c():Thread SQ (0xbfc8dd78)

From this we see that main (30773) invokes SQ.shutdown(), which sends the SQ thread a thread_interrupt and then waits (via join) for the SQ thread to complete.

The SQ thread (30774) receives the thread_interrupt, which causes it to 'throw' out of its thread_main() and get caught by the base class operator() method, which then invokes the SQ.thread_shutdown() method. That starts by invoking the shutdown() method on its first child, the lookup thread. This sends a thread_interrupt to the lookup thread and then does a join() until the lookup thread has exited.

The lookup thread (30777) receives the thread_interrupt, which causes it to 'throw' out of its thread_main() and get caught by the base class operator() method, which then invokes the lookup.thread_shutdown() method. That starts by invoking the shutdown() method on its first child, worker #1, and then does a join on worker #1.

What SHOULD have happened is another cascading action where worker #1 gets the interrupt, throws out to operator(), which then invokes its thread_shutdown(), etc. However, we don't see any such logs from worker #1. Instead, the main thread (30773) suddenly prints a log saying that the serial queue thread shutdown is complete, meaning that SQ.join() returned.

Worker thread #1 does shut down, which then advances the lookup (LU) thread to worker #2. Then main starts destructing all of the objects, some of which are thread objects for threads that are still running. This then leads to the 'pure virtual method called' issue because a base class is invoked after its object has been destroyed, so its vtable is gone.
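The analysis above rests on one invariant of the cascade: a thread's shutdown can only be reported complete after all of its children have finished. When scanning many runs of logs like this one, that check can be mechanized. The following is a minimal sketch, assuming the completion lines ("X shutdown Y in time ...") have already been reduced to an ordered list of "Y finished" events; the function name `first_premature_finish` and that extraction step are hypothetical, and the thread names simply mirror the example program.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical log checker (not part of the original program). `children`
// maps each thread name to the threads it shuts down; `finished_in_order`
// lists thread names in the order their shutdown was reported complete.
// Returns the first thread reported finished while one of its children had
// not yet finished, or an empty string if the cascade order held.
std::string first_premature_finish(
    const std::map<std::string, std::vector<std::string>>& children,
    const std::vector<std::string>& finished_in_order)
{
    std::set<std::string> finished;
    for (const std::string& name : finished_in_order) {
        auto it = children.find(name);
        if (it != children.end()) {
            for (const std::string& child : it->second) {
                if (finished.count(child) == 0) {
                    return name;  // parent completed before this child
                }
            }
        }
        finished.insert(name);
    }
    return std::string();
}
```

Applied to the log above, "main shutdown SQ" appears before "LU shutdown Worker#1", so the event order starts SQ, Worker#1 and the checker reports SQ, which is exactly the violation described.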
===============================================================================
#include <sys/syscall.h>
#include <unistd.h>
#include <pthread.h>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <deque>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include <boost/foreach.hpp>
#include <boost/shared_ptr.hpp>

using boost::posix_time::time_duration;
using boost::posix_time::microsec_clock;
using boost::posix_time::to_simple_string;
using boost::posix_time::ptime;

//////////////////////////////////////////////////////////////////////
// Simulation of the logging mechanism we have. We don't use printf, it's
// actually a mutex controlled input into a buffer, which is then periodically
// drained by a separate thread into a file. For simplicity I just changed it
// into mutex controlled output.
//
boost::mutex output_ctl;

#define EE_LOG_MSG(level, fmt, args...) {                          \
    char buf[1024];                                                \
    snprintf(buf, sizeof(buf), fmt, ##args);                       \
    boost::lock_guard<boost::mutex> lock(output_ctl);              \
    std::cout << to_simple_string(microsec_clock().local_time())   \
              << " (" << gettid() << ") "                          \
              << __FUNCTION__ << "():" << buf << "\n";             \
}

#define EE_LOG_ECODE(level, err, fmt, args...) {                   \
    char buf[1024];                                                \
    snprintf(buf, sizeof(buf), fmt, ##args);                       \
    boost::lock_guard<boost::mutex> lock(output_ctl);              \
    std::cout << to_simple_string(microsec_clock().local_time())   \
              << " (" << gettid() << ") "                          \
              << __FUNCTION__ << "():" << buf << "Error=["         \
              << strerror(err) << "]\n";                           \
}

// get the thread id (as seen by 'top' or 'ps')
pid_t gettid()
{
    return syscall(__NR_gettid);
}

//////////////////////////////////////////////////////////////////////
// Need to simulate the code that waits for all threads to start up, just use
// simple barriers.
//
// 10 worker threads
//  1 lookup thread
//  1 queue thread
//  1 audit thread
//  1 notify thread
//  1 main thread
//---
// 15 threads
boost::barrier init_done(15);

//////////////////////////////////////////////////////////////////////
// class used to exchange data from one thread to another.
//
// History from this file indicates that it originally used raw pthread_xxxx
// calls, and then later switched to boost::mutex/condvar
//
template <class T, class Container = std::deque<T> >
class mt_queue_c
{
public:
    explicit mt_queue_c(unsigned int max_size,
                        const Container &cont = Container())
        : m_c(cont), m_max_size(max_size), m_drop_total(0),
          m_drop_oflow(0), m_queue_enabled(true) {}
    virtual ~mt_queue_c() {};

    int enqueue(const T &elem, std::string *ps_error = NULL);
    int dequeue(T &dest, std::string *ps_error = NULL);
    void off();
    void on();

protected:
    Container m_c;
    unsigned int m_max_size;
    unsigned int m_drop_total;
    unsigned int m_drop_oflow;
    boost::timed_mutex m_access_mtx;
    boost::condition_variable_any m_data_cv;
    bool m_queue_enabled;

    void _make_error_string(std::string *p_s_err, const char *p_src,
                            const char *p_func, int errval);
};

// make_error_string()
// Helper function for integrating with the _make_error_string method. It
// automatically pushes on the __FUNCTION__ value.
//
#define make_error_string(p_s_err, p_src, errval) \
    _make_error_string(p_s_err, p_src, __FUNCTION__, errval)

template <class T, class Container>
inline void mt_queue_c<T,Container>::_make_error_string
(std::string *p_s_err, const char *p_src, const char *p_func, int errval)
{
    std::stringstream ss_err;

    if (!p_s_err) {
        return;
    }
    if (!p_src) {
        p_src = "bad p_src parm";
    }
    if (!p_func) {
        p_func = "bad p_func parm";
    }

    ss_err << " Error-Response: (" << errval << ") " << strerror(errval);

    *p_s_err = "[";
    *p_s_err += p_func;
    *p_s_err += "()]: ";
    *p_s_err += p_src;
    *p_s_err += ss_err.str();
}

template <class T, class Container>
inline int mt_queue_c<T,Container>::enqueue
(const T &elem, std::string *ps_error /* = NULL */)
{
    const char *p_err = "";
    int ret_err;

    boost::lock_guard<boost::timed_mutex> lock(m_access_mtx);

    // if the max size is non-zero then enforce the size limit
    if (m_max_size && m_c.size() >= m_max_size) {
        p_err = "Queue limit reached. Cannot enqueue this element.";
        ret_err = ENOSPC; // closest match
        goto enqueue_done;
    }

    m_c.push_back(elem);
    m_data_cv.notify_one();
    ret_err = 0;

enqueue_done:
    if (ret_err) {
        make_error_string(ps_error, p_err, ret_err);
        if (ENOSPC == ret_err) {
            m_drop_oflow++;
        }
        m_drop_total++;
    }

    return (ret_err);
}

template <class T, class Container>
inline int mt_queue_c<T,Container>::dequeue
(T &dest, std::string *ps_error /* = NULL */)
{
    const char *p_err = "";
    int ret_err;

    boost::unique_lock<boost::timed_mutex> lock(m_access_mtx);

    while (!m_c.size() || !m_queue_enabled) {
        m_data_cv.wait(lock);
    }

    if (m_c.empty()) {
        p_err = "Woke up but queue is empty.";
        ret_err = ENOENT; // closest existing match. Error no entity.
        goto dequeue_done;
    }

    dest = m_c.front();
    m_c.pop_front();
    ret_err = 0;

dequeue_done:
    if (ret_err) {
        make_error_string(ps_error, p_err, ret_err);
    }

    return (ret_err);
}

template <class T, class Container>
inline void mt_queue_c<T,Container>::off ()
{
    boost::lock_guard<boost::timed_mutex> lock(m_access_mtx);
    m_queue_enabled = false;
}

template <class T, class Container>
inline void mt_queue_c<T,Container>::on ()
{
    boost::lock_guard<boost::timed_mutex> lock(m_access_mtx);
    m_queue_enabled = true;
    if (m_c.size()) {
        m_data_cv.notify_all();
    }
}

//////////////////////////////////////////////////////////////////////
// We actually exchange much more complex data. I just simplify it to ints so
// that information is exchanged, and threads run.
typedef mt_queue_c<int> t_msg_queue;

//////////////////////////////////////////////////////////////////////
class thread_base_c
{
public:
    thread_base_c(const std::string &name, int prio, int sched)
        : m_name(name), m_priority(prio), m_sched(sched), m_thread(),
          m_shutdown(false)
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    virtual ~thread_base_c ()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    // Functor. This is the API that the boost thread library will invoke as
    // the new thread's main.
    void operator()()
    {
        std::string s_err;

        // set my scheduling and priority levels
        if (!setscheduling()) {
            return;
        }

        // call the thread's init routine
        if (!thread_init()) {
            s_err = "Thread:";
            s_err += m_name + " thread_init() method failed.";
            EE_LOG_MSG(EE_ERROR, s_err.c_str());
            thread_shutdown();
            return;
        }

        // call the thread's main loop, watching for boost::interrupt which
        // means it's time for us to shutdown
        try {
            init_done.wait();
            if (!thread_main()) {
                s_err = "Thread:";
                s_err += m_name + " thread_main() method failed.";
                EE_LOG_MSG(EE_ERROR, s_err.c_str());
                thread_shutdown();
                return;
            }
        } catch (boost::thread_interrupted &e) {
            // We've received a boost thread interrupt from our parent thread,
            // it's time to shut down, invoke our shutdown routine and then
            // return.
            EE_LOG_MSG(EE_TRACE, "Thread %s(%p) RX shutdown interrupt.",
                       m_name.c_str(), this);
            thread_shutdown();
            return;
        }
    }

    void create_thread()
    {
        if (m_thread.get_id() != boost::thread::id()) {
            EE_LOG_MSG(EE_ERROR, "Thread already exists for this object: %s (%p)",
                       m_name.c_str(), this);
            return;
        }

        // create a thread using our object and store it
        m_thread = boost::thread(boost::ref(*this));
    }

    void shutdown(const std::string &s_caller)
    {
        EE_LOG_MSG(EE_TRACE, "%s shutting down %s", s_caller.c_str(),
                   m_name.c_str());
        ptime start_time(microsec_clock::local_time());

        m_shutdown = true;
        m_thread.interrupt();

        EE_LOG_MSG(EE_TRACE, "%s waiting on %s thread", s_caller.c_str(),
                   m_name.c_str());

        // If this is uncommented, it appears to work -- at least I haven't
        // seen any problems in 2000 test cycles.
        // int error(pthread_join(m_thread.native_handle(), NULL));
        // if (error) {
        //     EE_LOG_ECODE(EE_ERROR, error, "pthread_join(%s) error for %s.",
        //                  m_name.c_str(), s_caller.c_str());
        // }

        m_thread.join();

        time_duration how_long(microsec_clock::local_time() - start_time);
        EE_LOG_MSG(EE_TRACE, "%s shutdown %s in time %s", s_caller.c_str(),
                   m_name.c_str(), to_simple_string(how_long).c_str());
    }

protected:
    virtual bool thread_init() = 0;
    virtual bool thread_main() = 0;
    virtual void thread_shutdown() = 0;

    bool setscheduling()
    {
        std::stringstream ss_result;
        struct sched_param old_sched;
        struct sched_param new_sched;
        pthread_t tid;
        int errval;
        int policy;

        tid = pthread_self();
        errval = pthread_getschedparam(tid, &policy, &old_sched);
        if (errval) {
            EE_LOG_ECODE(EE_ERROR, errval,
                         "Cannot get scheduling/priority parameters.");
            return (false);
        }

        // initialize new_sched to what's established, and then change to our
        // new values.
        new_sched = old_sched;
        new_sched.sched_priority = m_priority;

        errval = pthread_setschedparam(tid, m_sched, &new_sched);
        if (errval) {
            EE_LOG_ECODE(EE_ERROR, errval,
                         "Cannot set the scheduling/priority parameters.");
            return (false);
        }

        ss_result << "Scheduling and priorities changed for thread '" << m_name
                  << "' from: (" << policy << ")" << get_sched_str(policy)
                  << "/" << old_sched.sched_priority << " to: (" << m_sched
                  << ")" << get_sched_str(m_sched) << "/" << m_priority;
        EE_LOG_MSG(EE_DEBUG(0), ss_result.str().c_str());

        return (true);
    }

# define CASE_RETVAL(x) case x: return (#x)

    char const* get_sched_str(int sched_type)
    {
        switch (sched_type) {
            CASE_RETVAL(SCHED_OTHER);
            CASE_RETVAL(SCHED_FIFO);
            CASE_RETVAL(SCHED_RR);
            CASE_RETVAL(SCHED_BATCH);
        }
        return ("Unknown");
    }

    inline void check_for_shutdown ()
    {
        boost::this_thread::interruption_point();

        if (m_shutdown) {
            throw boost::thread_interrupted();
        }
    }

    std::string m_name;      // Name of the object instance. Used in
                             // debugging and error messages.
    int m_priority;          // Scheduling priority of the object thread.
    int m_sched;             // Scheduling type of the object thread
                             // see pthread_setschedparam() for further info.

    // Our Thread
    boost::thread m_thread;

    // Has the thread been asked to shutdown?
    bool m_shutdown;

private:
    // don't allow these, because boost::thread cannot be copied
    thread_base_c& operator=(const thread_base_c&);
};

//////////////////////////////////////////////////////////////////////
class worker_thread_c : public thread_base_c
{
public:
    worker_thread_c(const std::string name, int prio, int sched,
                    t_msg_queue &sq)
        : thread_base_c(name, prio, sched), m_work_q(sq)
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    virtual ~worker_thread_c()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    }

    virtual bool thread_init() { return (true); }

    virtual bool thread_main()
    {
        while (1) {
            int qval;
            check_for_shutdown();
            m_work_q.dequeue(qval);
            // EE_LOG_MSG(EE_TRACE, "RX: %d", qval);
        }

        // we should never break out and return. Returning false will ensure
        // the infra logs an error for this.
        return (false);
    }

    virtual void thread_shutdown() {};

protected:
    t_msg_queue &m_work_q;
};

//////////////////////////////////////////////////////////////////////
class lookup_thread_c : public thread_base_c
{
public:
    lookup_thread_c (int prio, int sched, t_msg_queue &sq)
        : thread_base_c("LU", prio, sched), m_serial_q(sq), m_work_q(40000),
          v_work_thread_pointers()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    virtual ~lookup_thread_c()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
        v_work_thread_pointers.clear();
    }

    virtual bool thread_init()
    {
        const int num_workers(10);
        std::stringstream ss_worker_name;
        int idx;

        try {
            // spawn the worker threads
            for (idx = 1; idx <= num_workers; ++idx) {
                // create the thread's name
                ss_worker_name.str("");
                ss_worker_name << "Worker#" << idx;

                P_SHR_WORKER_T p_worker(new worker_thread_c(ss_worker_name.str(),
                                                            18, SCHED_RR,
                                                            m_work_q));
                p_worker->create_thread();
                v_work_thread_pointers.push_back(p_worker);
            }
        } catch (std::exception &e) {
            EE_LOG_MSG(EE_ERROR, "Failed to allocate/initialize data for look-Up "
                       "thread. Response is: '%s'", e.what());
            return (false);
        }

        return (true);
    }

    virtual bool thread_main()
    {
        while (1) {
            int sq_val;
            check_for_shutdown();
            m_serial_q.dequeue(sq_val);
            // EE_LOG_MSG(EE_TRACE, "RX: %d", sq_val);
            m_work_q.enqueue(sq_val);
        }

        // we should never break out and return. Returning false will ensure
        // the infra logs an error for this.
        return (false);
    }

    virtual void thread_shutdown()
    {
        EE_LOG_MSG(EE_TRACE, "%s starts %s", m_name.c_str(), __FUNCTION__);

        // shutdown the down stream queue
        m_work_q.off();

        // tell our children to shutdown
        BOOST_FOREACH(P_SHR_WORKER_T p_worker, v_work_thread_pointers) {
            p_worker->shutdown(m_name);
        }

        EE_LOG_MSG(EE_TRACE, "%s end %s", m_name.c_str(), __FUNCTION__);
    }

protected:
    t_msg_queue &m_serial_q;
    t_msg_queue m_work_q;

    typedef boost::shared_ptr<worker_thread_c> P_SHR_WORKER_T;
    std::vector<P_SHR_WORKER_T> v_work_thread_pointers;
};

//////////////////////////////////////////////////////////////////////
class audit_thread_c : public thread_base_c
{
public:
    audit_thread_c(int prio, int sched)
        : thread_base_c("Audit", prio, sched)
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    virtual ~audit_thread_c()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    }

    virtual bool thread_init() { return (true); };

    virtual bool thread_main()
    {
        while (1) {
            check_for_shutdown();
            sleep (1);
        }

        // we should never break out and return. Returning false will ensure
        // the infra logs an error for this.
        return (false);
    }

    virtual void thread_shutdown() {};
};

//////////////////////////////////////////////////////////////////////
class notify_thread_c : public thread_base_c
{
public:
    notify_thread_c(int prio, int sched)
        : thread_base_c("Notify", prio, sched)
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    virtual ~notify_thread_c()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    }

    virtual bool thread_init() {return (true);};

    virtual bool thread_main()
    {
        while (1) {
            check_for_shutdown();
            sleep(1);
        }

        // we should never break out and return. Returning false will ensure
        // the infra logs an error for this.
        return (false);
    }

    virtual void thread_shutdown() {};
};

//////////////////////////////////////////////////////////////////////
class sq_thread_c : public thread_base_c
{
public:
    sq_thread_c (int prio, int sched, t_msg_queue &serial_in)
        : thread_base_c("SQ", prio, sched), m_main_in(serial_in),
          m_serial_q(2000), p_audit(), p_notify(), p_lu()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    virtual ~sq_thread_c()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    }

    virtual bool thread_init()
    {
        try {
            p_audit.reset(new audit_thread_c(18, SCHED_RR));
            p_audit->create_thread();

            p_notify.reset(new notify_thread_c(18, SCHED_RR));
            p_notify->create_thread();

            p_lu.reset(new lookup_thread_c(18, SCHED_RR, m_serial_q));
            p_lu->create_thread();
        } catch (std::exception &ex) {
            EE_LOG_MSG(EE_ERROR, "Failed to allocate/initialize data for SQ thread. "
                       "Response is: '%s'", ex.what());
            return (false);
        }

        return (true);
    }

    virtual bool thread_main()
    {
        while (1) {
            int sq_val;
            check_for_shutdown();
            m_main_in.dequeue(sq_val);
            m_serial_q.enqueue(sq_val);
            // EE_LOG_MSG(EE_TRACE, "%s RX: %d", m_name.c_str(), sq_val);
        }

        // we should never break out and return. Returning false will ensure
        // the infra logs an error for this.
        return (false);
    }

    virtual void thread_shutdown()
    {
        EE_LOG_MSG(EE_TRACE, "%s starts %s", m_name.c_str(), __FUNCTION__);

        // shut down the downstream queue
        m_serial_q.off();

        // tell the subordinate threads to shut down
        if (p_lu) {
            p_lu->shutdown(m_name);
        } else {
            EE_LOG_MSG(EE_ERROR, "Unexpected null lookup thread pointer "
                       "during shutdown of %s", m_name.c_str());
        }

        if (p_notify) {
            p_notify->shutdown(m_name);
        } else {
            EE_LOG_MSG(EE_ERROR, "Unexpected null notify thread pointer "
                       "during shutdown of %s", m_name.c_str());
        }

        if (p_audit) {
            p_audit->shutdown(m_name);
        } else {
            EE_LOG_MSG(EE_ERROR, "Unexpected null audit thread "
                       "pointer during shutdown of %s", m_name.c_str());
        }

        EE_LOG_MSG(EE_TRACE, "%s end %s", m_name.c_str(), __FUNCTION__);
    }

protected:
    t_msg_queue &m_main_in;
    t_msg_queue m_serial_q;

    // Child thread management variables
    boost::shared_ptr<audit_thread_c> p_audit;
    boost::shared_ptr<notify_thread_c> p_notify;
    boost::shared_ptr<lookup_thread_c> p_lu;
};

int main ()
{
    t_msg_queue main_out(2000);

    EE_LOG_MSG(EE_TRACE, "Concurrency Level: %d",
               boost::thread::hardware_concurrency());

    sq_thread_c serial_Q_Thread_Main(19, SCHED_RR, main_out);
    serial_Q_Thread_Main.create_thread();

    // the real thread startup has a more complex init synchronization. This is
    // a quick hack. Wait until all threads have reached the barrier, then have
    // main sleep a bit so that all child threads can actually progress past
    // the barrier.
    init_done.wait();
    usleep(10 * 1000);

    EE_LOG_MSG(EE_TRACE, "All threads are up, pumping 1000 entries into the queue.");
    for (int idx(0); idx < 1000; ++idx) {
        main_out.enqueue(idx);
    }

    serial_Q_Thread_Main.shutdown(__FUNCTION__);

    return (0);
}

On 14/03/12 20:09, John Rocha wrote:
I have been able to extract the thread start/stop logic from our code base into a standalone program that illustrates the problem. Even this is still sort of long, 900 lines or so.
Thanks for the example.

Your problem is that you are overlaying TWO interruption mechanisms: boost::thread::interrupt() and your own m_shutdown flag. thread::join() is an interruption point, so if your thread sees the m_shutdown flag before the boost::thread::interrupt(), then it will pick up the interrupt when it calls join() on its own worker threads.

I would suggest that you avoid the use of m_shutdown, since it is redundant. Also, wrap your calls to join() in a scope with a boost::this_thread::disable_interruption object so that the join cannot be interrupted.
void shutdown(const std::string &s_caller)
{
    EE_LOG_MSG(EE_TRACE, "%s shutting down %s",
               s_caller.c_str(), m_name.c_str());
    ptime start_time(microsec_clock::local_time());
    m_shutdown = true;
    m_thread.interrupt();
    m_thread.join();
    ...
inline void check_for_shutdown ()
{
    boost::this_thread::interruption_point();
    if (m_shutdown) {
        throw boost::thread_interrupted();
    }
}
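A single-threaded model makes this diagnosis concrete. It is a sketch in standard C++, not Boost.Thread's implementation: `interrupt_flag`, `interruption_point()` and `model_join()` are hypothetical stand-ins for the per-thread interrupt flag, `boost::this_thread::interruption_point()` and `boost::thread::join()`, and `replay_race()` simply replays the interleaving in which the custom `m_shutdown` check fires before Boost's own request has been consumed.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for boost::thread_interrupted.
struct thread_interrupted {};

// Hypothetical model of one thread's "interrupt requested" flag.
struct interrupt_flag {
    bool requested = false;
};

// Model of an interruption point: if a request is pending, consume it
// and throw.
void interruption_point(interrupt_flag &f)
{
    if (f.requested) {
        f.requested = false;
        throw thread_interrupted();
    }
}

// Model of join(): in Boost.Thread, join() is itself an interruption point.
void model_join(interrupt_flag &f)
{
    interruption_point(f);
    // ...a real join() would now block until the child thread exits...
}

// Replays the losing interleaving on one thread: the parent sets
// m_shutdown and calls interrupt() just after the child has passed its
// interruption_point() check.
std::string replay_race()
{
    interrupt_flag flag;
    bool m_shutdown = false;

    interruption_point(flag);          // child: nothing pending yet

    m_shutdown = true;                 // parent: custom flag...
    flag.requested = true;             // ...plus interrupt()

    try {
        if (m_shutdown)                // child: the custom check fires first,
            throw thread_interrupted();// so the real request stays pending
    } catch (const thread_interrupted &) {
        // Shutdown handler: interrupt and join the grandchild.
        try {
            model_join(flag);          // the pending request fires here
            return "joined child";
        } catch (const thread_interrupted &) {
            return "join interrupted"; // thread exits before its child
        }
    }
    return "no shutdown";
}
```

Because the custom check threw first, the real request is still pending when the shutdown handler reaches its join(), which is exactly where the model throws a second time.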
Anthony -- Author of C++ Concurrency in Action http://www.stdthread.co.uk/book/ just::thread C++11 thread library http://www.stdthread.co.uk Just Software Solutions Ltd http://www.justsoftwaresolutions.co.uk 15 Carrallack Mews, St Just, Cornwall, TR19 7UL, UK. Company No. 5478976

Hello,
First I want to thank you for helping with this. How did you come to this answer so quickly? Is this a technique or tool that I can learn, or was this wisdom from having worked with your library for so long?
Next, I'd like to ensure that I fully understand what was occurring. Can you please confirm that I've got this right? I've read and thought about your answer and looked at the code, and I believe these are the details.
Main
- uses SQ.shutdown() to shut down the SQ thread, which:
  - sets the SQ.m_shutdown flag
  - sends the SQ thread a boost thread interrupt
  - joins on the thread
SQ Thread
This thread is running under its thread_main() and __happens__ to be in the is_shutdown() method, after it checked the interruption_point() but BEFORE it checks m_shutdown. SQ.m_shutdown was just changed to true, so is_shutdown() sees that and throws a boost::thread_interrupted exception. However, it should also be noted that the boost thread data has its internal "interrupt requested" flag set too.
Now the SQ thread "throws" out of thread_main(); the exception is caught by the base class's operator() method, which enters the SQ.thread_shutdown method.
SQ.thread_shutdown needs to shut down its lookup child thread, so it invokes the LU.shutdown method.
The LU.shutdown method is just like above:
- sets the LU.m_shutdown flag
- sends the LU thread a boost thread interrupt
- joins on the LU thread
However, join() is an interruption point, and I haven't "cleared the interrupt" for the boost thread yet. Therefore, when the SQ thread invokes join(), it checks whether there are any outstanding interrupts it needs to honor. There are, so it throws another thread_interrupted exception, which exits the join(). I don't catch it, so the SQ thread exits prematurely due to my faulty logic.
Yes, I know, a wordy explanation for the brilliant summary you gave me. I just want to double-check that I've got the details down correctly.
One final word. I feel ungrateful for bringing this up, and I'm still looking into this. However, I've encountered a new symptom. I've added the boost::this_thread::disable_interruption object to the beginning of my thread_base_c::shutdown() function and I've removed all references to m_shutdown, as you recommended.
However, now I occasionally get a deadlock during the shutdown. I call it a deadlock when the shutdown process stalls for longer than 5 minutes. I've run the test multiple times and I see the deadlock on rare occasions. I've run seven different test cycles, with the deadlock occurring at a different point in each: 189, 398, 797, 999, 1282, 1527, 3416 (not in that order).
This could be an artifact of my simulation, and I'm just now starting to crawl through the gdb output; nicely enough, I can attach to the running process and see its current running state.
Do you have any debugging insights or tips that I should apply to this investigation?
Thank you for all your help,
-=John

On 15/03/12 16:47, John Rocha wrote:
First I want to thank you for helping with this. How did you come to this answer so quickly? Is this a technique or tool that I can learn, or was this wisdom from having worked with your library for so long?
I tried it out, saw the problem manifest, trapped it in gdb, and examined the code. I guess it's just experience.
Main
- uses SQ.shutdown() to shut down the SQ thread, which:
  - sets the SQ.m_shutdown flag
  - sends the SQ thread a boost thread interrupt
  - joins on the thread
SQ Thread
This thread is running under its thread_main() and __happens__ to be in the is_shutdown() method, after it checked the interruption_point() but BEFORE it checks m_shutdown. SQ.m_shutdown was just changed to true, so is_shutdown() sees that and throws a boost::thread_interrupted exception. However, it should also be noted that the boost thread data has its internal "interrupt requested" flag set too.
Now the SQ thread "throws" out of thread_main(); the exception is caught by the base class's operator() method, which enters the SQ.thread_shutdown method.
SQ.thread_shutdown needs to shut down its lookup child thread, so it invokes the LU.shutdown method.
The LU.shutdown method is just like above:
- sets the LU.m_shutdown flag
- sends the LU thread a boost thread interrupt
- joins on the LU thread
However, join() is an interruption point, and I haven't "cleared the interrupt" for the boost thread yet. Therefore, when the SQ thread invokes join(), it checks whether there are any outstanding interrupts it needs to honor. There are, so it throws another thread_interrupted exception, which exits the join(). I don't catch it, so the SQ thread exits prematurely due to my faulty logic.
Yes, that matches my understanding.
One final word. I feel ungrateful for bringing this up, and I'm still looking into this. However, I've encountered a new symptom. I've added the boost::this_thread::disable_interruption object to the beginning of my thread_base_c::shutdown() function and I've removed all references to m_shutdown -- as you recommended.
However, now I occasionally get a deadlock during the shutdown. I call it a deadlock when the shutdown process stalls for longer than 5 minutes. I've run the test multiple times and I see the deadlock on rare occasions. I've run seven different test cycles, with the deadlock occurring at a different point in each: 189, 398, 797, 999, 1282, 1527, 3416 (not in that order).
This could be an artifact of my simulation, and I'm just now starting to crawl through the gdb output; nicely enough, I can attach to the running process and see its current running state.
Do you have any debugging insights or tips that I should apply for this investigation?
My first thought is to check that the code is not blocked in a non-interruptible call.
Anthony

On 15/03/12 18:25, Anthony Williams wrote:
On 15/03/12 16:47, John Rocha wrote:
First I want to thank you for helping with this. How did you come to this answer so quickly? Is this a technique or tool that I can learn, or was this wisdom from having worked with your library for so long?
I tried it out, saw the problem manifest, trapped it in gdb, and examined the code. I guess it's just experience.
Main
- uses SQ.shutdown() to shut down the SQ thread, which:
  - sets the SQ.m_shutdown flag
  - sends the SQ thread a boost thread interrupt
  - joins on the thread
SQ Thread
This thread is running under its thread_main() and __happens__ to be in the is_shutdown() method, after it checked the interruption_point() but BEFORE it checks m_shutdown. SQ.m_shutdown was just changed to true, so is_shutdown() sees that and throws a boost::thread_interrupted exception. However, it should also be noted that the boost thread data has its internal "interrupt requested" flag set too.
Now the SQ thread "throws" out of thread_main(); the exception is caught by the base class's operator() method, which enters the SQ.thread_shutdown method.
SQ.thread_shutdown needs to shut down its lookup child thread, so it invokes the LU.shutdown method.
The LU.shutdown method is just like above:
- sets the LU.m_shutdown flag
- sends the LU thread a boost thread interrupt
- joins on the LU thread
However, join() is an interruption point, and I haven't "cleared the interrupt" for the boost thread yet. Therefore, when the SQ thread invokes join(), it checks whether there are any outstanding interrupts it needs to honor. There are, so it throws another thread_interrupted exception, which exits the join(). I don't catch it, so the SQ thread exits prematurely due to my faulty logic.
Yes, that matches my understanding.
Hi,
does this mean that there are some restrictions on the code that can be executed in a boost::thread_interrupted catch handler, or some guidelines that the user must follow? Is it legal for the user to throw a boost::thread_interrupted exception themselves? Is there a way to clear the interrupted flag so that a join() in the boost::thread_interrupted catch handler is not interrupted?
What was wrong with the user code that made the library crash? Which precondition of the library was violated? Maybe some assertions would help so that the error is identified as soon as possible.
Best, Vicente

Hello Vicente,
I'd like to point out that the library didn't crash. My code did. This was caused by faulty logic on my part: a race condition with a very small window of opportunity.
I had a cascading startup and shutdown design, for example (please pardon my verbose answer):
Startup:
--------
- main spawns T1 and waits for T1 to indicate it is done with init.
- T1 spawns T2 and waits for T2 to indicate it is done with init.
- T2 spawns T3 and waits for T3 to indicate it is done with init.
- T3 spawns T4 and waits for T4 to indicate it is done with init.
- T4 comes up, finishes its init, informs T3 that its init is done, and enters its main loop.
- T3 finishes its init, signals T2 that its init is done, and enters its main loop.
- T2 finishes its init, signals T1 that its init is done, and enters its main loop.
- T1 finishes its init, signals main that its init is done, and enters its main loop.
- The system is now running, so main blocks until it receives a terminate signal (-2, i.e. SIGINT).
Shutdown:
---------
This is pretty much the reverse of the startup.
- Signal -2 is received, which unblocks main.
- main tells T1 to shut down and then join()s on the thread, waiting for it to finish.
- T1 tells T2 to shut down and then join()s on the thread, waiting for it to finish.
- T2 tells T3 to shut down and then join()s on the thread, waiting for it to finish.
- T3 tells T4 to shut down and then join()s on the thread, waiting for it to finish.
- T4 does its shutdown logic and then exits the thread.
- T3 wakes from join(), finishes its shutdown logic, and exits the thread.
- T2 wakes from join(), finishes its shutdown logic, and exits the thread.
- T1 wakes from join(), finishes its shutdown logic, and exits the thread.
- main finishes its shutdown and the program exits. As part of that shutdown it invokes the destructor for its T1 object. Part of T1's destruction is to destruct the T2 object, which destructs T3, which destructs T4.
With this understanding of the overall architecture, we can focus on my faulty shutdown logic.
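The shutdown half of this cascade rests on one invariant: no thread exits before all of its descendants have. Here is a minimal std::thread sketch of that invariant, under stated assumptions: start-up synchronization is omitted, a plain `std::atomic<bool>` stop flag stands in for boost::thread::interrupt(), and `node`, `exit_log` and `run_cascade()` are hypothetical names rather than anything from the original program.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Mutex-protected record of the order in which threads finished.
struct exit_log {
    std::mutex mtx;
    std::vector<std::string> order;
    void add(const std::string &name) {
        std::lock_guard<std::mutex> lk(mtx);
        order.push_back(name);
    }
};

// Hypothetical node in a T1 -> T2 -> T3 -> T4 chain.
class node {
public:
    node(int level, int max_level, exit_log &log)
        : level_(level), max_level_(max_level), log_(log) {}

    void start() { thread_ = std::thread([this] { run(); }); }

    // Called by the parent: request shutdown, then wait for the subtree.
    void shutdown() {
        stop_ = true;
        thread_.join();
    }

private:
    void run() {
        if (level_ < max_level_) {                 // spawn the next level
            child_.reset(new node(level_ + 1, max_level_, log_));
            child_->start();
        }
        while (!stop_)                             // "main loop"
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        if (child_)
            child_->shutdown();                    // never exit before the child
        log_.add("T" + std::to_string(level_));
    }

    int level_;
    int max_level_;
    exit_log &log_;
    std::atomic<bool> stop_{false};
    std::unique_ptr<node> child_;
    std::thread thread_;
};

// Plays the role of main(): start T1, let it run, then shut it down.
std::vector<std::string> run_cascade() {
    exit_log log;
    node t1(1, 4, log);
    t1.start();
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    t1.shutdown();                                 // returns only after T4..T1 exit
    return log.order;
}
```

With this ordering, the exit log always reads T4, T3, T2, T1, so by the time main destroys T1's object no descendant thread can still be touching it.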
main() told T1 to shut down, but it used two methods to inform T1. The first was the thread interrupt; the second was to set a shutdown flag in T1's object. However, when main calls T1.interrupt(), that also causes a flag to be set in the boost thread object.
T1 happened to be in its T1.check_for_shutdown() routine, after the boost::this_thread::interruption_point() check but before the "if shutdown flag" check:
    T1.check_for_shutdown()
        // this will throw if the boost interruption-requested flag is set
        boost::this_thread::interruption_point();
        // **** the code is here when main() ran T1.shutdown() ****
        if (T1.m_shutdown) {
            throw boost::thread_interrupted();
        }
So I detected the shutdown with MY logic (m_shutdown), not with an interruption point. Consequently the boost thread's interruption is still pending, and will remain pending until another interruption point is hit.
In my code, the thread_interrupted exception is caught by a handler that is waiting for it, which then invokes the T1.thread_shutdown() routine. For T1, that routine sends a boost interrupt to T2 and then joins on T2.
BUT join() is an interruption point, so my code would now act upon that pending interruption, throwing ANOTHER thread_interrupted, exiting join(), finishing off the shutdown logic, and then exiting the thread. So my code would terminate T1 before all of its child threads had terminated.
Now main legally unblocks from join(), since T1 exited, and then it invokes the destructor on T1, which cascades destruction onto the T1-T4 objects. THIS is what leads to my segmentation faults, "pure virtual method called" errors, etc.: there are threads still alive running code based on those objects and accessing their data, and the objects were just deleted out from underneath them.
Can the user throw a boost::thread_interrupted exception? To be honest, this wasn't the problem. Throwing it doesn't affect the thread's "do I have an interrupt pending" flag.
I could have thrown my own custom exception and I still would have encountered this problem. The problem is that my "check_for_shutdown" logic assumed that when it exited no interrupts would be pending.
Is there a way to clear the interrupted flag so that the join is not interrupted? I would argue that clearing the flag isn't correct. One could add logic such as:
    try {
        boost::this_thread::interruption_point();
    } catch (boost::thread_interrupted &) {
    }
    join();
which would clear the flag. However, while I am blocked in join(), some other thread could send me another interrupt, which would break me out of join(). Not what I wanted. I feel that Anthony's suggestion of blocking interrupts for this method is the appropriate way to go.
I don't think any assertions could help, but maybe a documentation improvement? Or maybe it's there and I didn't read carefully enough?
After looking at the boost codebase and this learning exchange, I learned that when the interrupt() method is invoked, the thread notes the interrupt, and the interrupt stays pending until an interruption point is hit. Moreover, only one interrupt can be pending at a time. For example, two calls to interrupt() set the thread's interrupt flag only once; it's either on or off. So the next interruption point will clear that flag. It's like old-style signals.
Regards,
-=John
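The flag semantics John describes can be checked with a small model. This is a sketch under stated assumptions: a single `bool` per thread stands in for Boost's internal state, and `interrupt_state`, `discard_pending()` and the counting helpers are hypothetical names. It shows two `interrupt()` calls coalescing into a single throw, and the try/catch trick clearing a pending request; as John points out, the trick cannot stop a fresh interrupt that arrives while a real join() is blocked, which this model does not capture.

```cpp
#include <cassert>

// Hypothetical stand-in for boost::thread_interrupted.
struct thread_interrupted {};

// Hypothetical model of one thread's interruption state. It is a single
// bool, so several interrupt() calls before the next interruption point
// coalesce into one request.
struct interrupt_state {
    bool requested = false;
    void interrupt() { requested = true; }
};

// Model of an interruption point: acting on a request consumes it.
void interruption_point(interrupt_state &s)
{
    if (s.requested) {
        s.requested = false;
        throw thread_interrupted();
    }
}

// The "clear the pending interrupt" trick quoted above.
void discard_pending(interrupt_state &s)
{
    try {
        interruption_point(s);
    } catch (const thread_interrupted &) {
        // swallowed on purpose
    }
}

// Returns how many of `points` consecutive interruption points throw.
int count_throws(interrupt_state &s, int points)
{
    int thrown = 0;
    for (int i = 0; i < points; ++i) {
        try {
            interruption_point(s);
        } catch (const thread_interrupted &) {
            ++thrown;
        }
    }
    return thrown;
}

// Two interrupt() calls, then three interruption points.
int throws_after_double_interrupt()
{
    interrupt_state s;
    s.interrupt();
    s.interrupt();
    return count_throws(s, 3);
}

// interrupt(), then the try/catch trick, then one more interruption point.
int throws_after_discard()
{
    interrupt_state s;
    s.interrupt();
    discard_pending(s);
    return count_throws(s, 1);
}
```

In the model, `throws_after_double_interrupt()` yields 1 and `throws_after_discard()` yields 0, matching the "either on or off" behaviour described above.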

On 15/03/12 20:25, John Rocha wrote:
On 3/15/2012 11:41 AM, Vicente J. Botet Escriba wrote:
Hi,
does this mean that there are some restrictions on the code that can be executed in a boost::thread_interrupted catch handler, or some guidelines that the user must follow? Is it legal for the user to throw a boost::thread_interrupted exception themselves? Is there a way to clear the interrupted flag so that a join() in the boost::thread_interrupted catch handler is not interrupted?
What was wrong with the user code that made the library crash? Which precondition of the library was violated? Maybe some assertions would help so that the error is identified as soon as possible.
Best, Vicente
Hello Vicente,
I'd like to point out that the library didn't crash. My code did. This was caused by faulty logic on my part: a race condition with a very small window of opportunity.
Thanks for the clarification. From a quick read of your post I thought it was the library that crashed. My bad.
main() told T1 to shut down, but it used two methods to inform T1. The first was the thread interrupt; the second was to set a shutdown flag in T1's object. However, when main calls T1.interrupt(), that also causes a flag to be set in the boost thread object.
T1 happened to be in its T1.check_for_shutdown() routine, after the boost::this_thread::interruption_point() check but before the "if shutdown flag" check.
    T1.check_for_shutdown()
        // this will throw if the boost interruption-requested flag is set
        boost::this_thread::interruption_point();
        // **** the code is here when main() ran T1.shutdown() ****
        if (T1.m_shutdown) {
            throw boost::thread_interrupted();
        }
So I detected the shutdown with MY logic (m_shutdown), not with an interruption point. Consequently the boost thread's interruption is still pending, and will remain pending until another interruption point is hit.
In my code, the thread_interrupted exception is caught by a handler that is waiting for it, which then invokes the T1.thread_shutdown() routine. For T1, that routine sends a boost interrupt to T2 and then joins on T2.
BUT join() is an interruption point, so my code would now act upon that pending interruption, throwing ANOTHER thread_interrupted, exiting join(), finishing off the shutdown logic, and then exiting the thread.
So my code would terminate T1 before all of its child threads had terminated.
Now main legally unblocks from join(), since T1 exited, and then it invokes the destructor on T1, which cascades destruction onto the T1-T4 objects.
I don't know if there is a possible improvement here, as destroying a Tx object implies destroying a thread instance. The Boost.Thread implementation detaches a joinable thread on destruction, while under the C++11 standard the std::thread destructor calls std::terminate(). I don't know if a call to terminate() would show the issue more clearly in your case.
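A complementary safeguard to the detach-versus-terminate question is to make the owner's destruction wait for its thread. Below is a minimal sketch in standard C++, assuming a hypothetical `joining_thread` wrapper (it is not part of Boost.Thread as discussed in this thread; C++20's `std::jthread` also joins in its destructor). Declaration order does the work: the wrapper is declared after the data its thread reads, so it is destroyed, and therefore joined, first.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <utility>

// Hypothetical RAII wrapper: joins in its destructor, so the scope that
// owns it cannot end while the thread is still running.
class joining_thread {
public:
    template <class F>
    explicit joining_thread(F &&f) : t_(std::forward<F>(f)) {}
    joining_thread(const joining_thread &) = delete;
    joining_thread &operator=(const joining_thread &) = delete;
    ~joining_thread() {
        if (t_.joinable())
            t_.join();
    }
private:
    std::thread t_;
};

// The worker reads `data`. Because `worker` is declared after `data`, it is
// destroyed (joined) first, so `data` outlives every access to it.
int run_owner()
{
    std::atomic<int> result{0};
    {
        int data = 41;
        joining_thread worker([&] { result = data + 1; });
    }   // ~joining_thread() joins here, before `data` goes away
    return result.load();
}
```

Joining on destruction does not remove the premature-exit bug discussed here; it only guarantees that an object cannot be deleted while a thread it owns is still running code that uses it.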
THIS is what leads to my segmentation faults, "pure virtual method called" errors, etc.: there are threads still alive running code based on those objects and accessing their data, and the objects were just deleted out from underneath them.
Can the user throw a boost::thread_interrupted exception? To be honest, this wasn't the problem. Throwing it doesn't affect the thread's "do I have an interrupt pending" flag. I could have thrown my own custom exception and I still would have encountered this problem. The problem is that my "check_for_shutdown" logic assumed that when it exited no interrupts would be pending.
I see.
Is there a way to clear the interrupted flag so that the join is not interrupted? I would argue that clearing the flag isn't correct. One could add logic such as:
    try {
        boost::this_thread::interruption_point();
    } catch (boost::thread_interrupted &) {
    }
    join();
Which would clear the flag.
Thanks for the trick.
However, while I am blocked in join(), some other thread could send me another interrupt, which would break me out of join(). Not what I wanted. I feel that Anthony's suggestion of blocking interrupts for this method is the appropriate way to go.
I agree that disabling interrupts is the correct way to avoid new interruptions. But you need to clear them before.
I don't think any assertions could help, but maybe a documentation improvement? Or maybe it's there and I didn't read carefully enough?
I agree now. Asserts could not help in this case. IIUC, a boost::thread_interrupted catch handler should clear the interrupt flag (as the boost::thread_interrupted exception can be thrown by user code) and disable interrupts before attempting to use any interruption point. Are there other ways?
Best, Vicente

On 15/03/12 22:44, Vicente J. Botet Escriba wrote:
I agree that disabling interrupts is the correct way to avoid new interruptions. But you need to clear them before.
Disabling interrupts doesn't stop the flag being set, it just stops the interruption points acting on it. There is thus no need to clear the flag. If you want to call code with an interruption point without being interrupted, disable interruptions. Whether or not the flag is set beforehand, or set during the call is now irrelevant. It may or may not be set at the end of the block either way.
Anthony
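Anthony's distinction (the request is recorded, but interruption points ignore it while interruption is disabled) can be modelled in a few lines. This is a sketch, not Boost.Thread's code: `disable_guard` only imitates the RAII behaviour of `boost::this_thread::disable_interruption`, and `interrupt_state`, `throws_at_point()` and `run_trace()` are hypothetical names. A request raised inside the guarded scope survives it and is acted on at the first enabled interruption point afterwards.

```cpp
#include <cassert>

// Hypothetical stand-in for boost::thread_interrupted.
struct thread_interrupted {};

// Hypothetical single-thread model of the interruption state.
struct interrupt_state {
    bool requested = false;
    int disabled = 0;   // nesting depth of active disable guards
};

// Model of an interruption point: acts only while interruption is enabled.
void interruption_point(interrupt_state &s)
{
    if (s.disabled == 0 && s.requested) {
        s.requested = false;
        throw thread_interrupted();
    }
}

// RAII guard in the spirit of boost::this_thread::disable_interruption.
class disable_guard {
public:
    explicit disable_guard(interrupt_state &s) : s_(s) { ++s_.disabled; }
    ~disable_guard() { --s_.disabled; }
    disable_guard(const disable_guard &) = delete;
    disable_guard &operator=(const disable_guard &) = delete;
private:
    interrupt_state &s_;
};

bool throws_at_point(interrupt_state &s)
{
    try {
        interruption_point(s);
        return false;
    } catch (const thread_interrupted &) {
        return true;
    }
}

struct trace {
    bool threw_while_disabled;
    bool still_requested;
    bool threw_after_guard;
};

trace run_trace()
{
    interrupt_state s;
    trace t{};
    {
        disable_guard g(s);
        s.requested = true;                          // interrupt() arrives now
        t.threw_while_disabled = throws_at_point(s); // e.g. the guarded join()
        t.still_requested = s.requested;             // flag is still set
    }
    t.threw_after_guard = throws_at_point(s);        // first enabled point
    return t;
}
```

So the guarded join() is never interrupted, and there is no need to clear the flag first; whatever is pending simply waits for the next interruption point outside the guard.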

On 16/03/12 09:51, Anthony Williams wrote:
On 15/03/12 22:44, Vicente J. Botet Escriba wrote:
I agree that disabling interrupts is the correct way to avoid new interruptions. But you need to clear them before.
Disabling interrupts doesn't stop the flag being set, it just stops the interruption points acting on it. There is thus no need to clear the flag. If you want to call code with an interruption point without being interrupted, disable interruptions. Whether or not the flag is set beforehand, or set during the call is now irrelevant. It may or may not be set at the end of the block either way.
Thanks for the correction. Vicente

Hello Anthony,
I followed your advice to verify where the threads were blocked. I analyzed the GDB data and found that the LU thread was blocked in a join() on worker thread #7 (having sent it an interrupt), and that worker thread #7 was blocked on a boost::condition_variable_any.
I then carefully looked at my boost thread source code, for version 1.43 of boost, and I believe there is a race condition between the boost thread's interrupt() and condvar wait() methods. It's a very small window, but I've been able to reproduce the issue with logs.
Can you please read the following to find any flaws in my logic, or confirm my suspicion? Also, if this is a defect in the library, what should I do next? Do I raise a defect somewhere?
Thank you,
-=John
Consider the following: thread T1 is invoking T2.interrupt(), while thread T2 is invoking wait() on one of its condvars. Suppose the timing is as follows, where on an SMP system T1 and T2 are running simultaneously (or this could happen with inopportune time slices):
    T2: wait(boost::mutex &m)
    T2: res=0
    T2: detail::interruption_checker check_for_interruption(&cond)
    T2: lock_guard<mutex> guard(thread_info->data_mutex);
    T1: interrupt()
    T1: detail::thread_data_ptr const local_thread_info=get_thread_info();
    T1: if (local_thread_info)
    T1: lock_guard<mutex> lk(local_thread_info->data_mutex)   // T1 blocks because T2 has the mutex
    T2: check_for_interruption()
    T2: if (thread_info->interrupt_requested)
    T2: check_for_interruption returns
    T2: thread_info->current_cond=cond
    T2: interruption_checker returns; this releases the data_mutex that T1 is blocked on.
    T1: local_thread_info->interrupt_requested=true;
    T1: if (local_thread_info->current_cond)   // this was set right above
    T1: pthread_cond_broadcast(local_thread_info->current_cond)
    T1: interrupt() returns
    T2: boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex);
    T2: m.unlock();
    T2: res=pthread_cond_wait(&cond,&internal_mutex);
This would result in the condvar blocking and ignoring the interruption request, because the condvar broadcast happened AFTER the interrupt check, but before the other thread actually blocked on the condvar.
I confirmed that pthread_cond_broadcast(cv) only unblocks threads that are CURRENTLY blocked on the cv, both from the man page and from a test program. The man page indicates:
    The pthread_cond_broadcast() and pthread_cond_signal() functions shall have no effect if there are no threads currently blocked on cond.
And the test I created to verify this was basically:
    std::cout << "Calling broadcast for CV\n";
    pthread_cond_broadcast(&cv);
    std::cout << "Calling wait\n";
    pthread_mutex_lock(&mtx);
    pthread_cond_wait(&cv, &mtx);
    std::cout << "Done waiting\n";
I confirmed that the above locks up and never proceeds.
Finally, I changed my test to run over and over while being monitored by valgrind/helgrind, and updated the library to add some trace statements, along with some more trace statements in my original code. I was able to confirm my hypothesis.
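The interleaving above is the classic lost wakeup: as the quoted man page says, a broadcast has no effect when nobody is blocked yet, so "check the flag, then block" must be atomic with respect to "set the flag, then broadcast". The usual remedy is for both sides to use one mutex and for the waiter to re-check a predicate under it. Below is a sketch of that pattern with std::condition_variable and hypothetical names (`waiter_state`, `wait_for_interrupt()`, `interrupt()`, `run_rounds()`); it is not Boost.Thread's internal code and makes no claim about how Boost addressed this report.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical per-thread state, guarded by ONE mutex that both the
// interrupter and the waiter use.
struct waiter_state {
    std::mutex m;
    std::condition_variable cv;
    bool interrupt_requested = false;
};

// Waiter: the flag check and the block happen under the same mutex, and
// wait() releases it atomically, so a notify cannot slip in between.
void wait_for_interrupt(waiter_state &s)
{
    std::unique_lock<std::mutex> lk(s.m);
    s.cv.wait(lk, [&] { return s.interrupt_requested; });
}

// Interrupter: set the flag while holding the same mutex, then notify.
void interrupt(waiter_state &s)
{
    {
        std::lock_guard<std::mutex> lk(s.m);
        s.interrupt_requested = true;
    }
    s.cv.notify_all();
}

// Runs many rounds with no deliberate ordering between the two threads;
// with this pattern no round can hang, whichever thread wins the race.
int run_rounds(int rounds)
{
    int done = 0;
    for (int i = 0; i < rounds; ++i) {
        waiter_state s;
        std::thread w([&] { wait_for_interrupt(s); });
        interrupt(s);
        w.join();
        ++done;
    }
    return done;
}
```

If the interrupter wins the race, the waiter sees the flag before blocking; if the waiter wins, it is already inside wait() when the notify arrives. The window in the trace exists because the flag check and the block on the condvar are not covered by a single mutex that the interrupter also holds.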
Relevant log information is below. I've done some post-processing of the logs as follows:
- I removed the leading date "2012-Mar-16"
- I prefixed each line with the 'thread name', which is easier to follow than the TID
- I stripped out a lot of the 'noise' entries that don't add any relevant data
    Iteration #5 with /views/TEST/BOOST_INC_LIB_DEBUG/lib
    Start the test wait for it to complete
    main : 09:18:37.130338 (32675) main():Concurrency Level: 8
    main : 09:18:37.130496 (32675) thread_base_c():Thread SQ (0xbfb26414)
    main : 09:18:37.130518 (32675) sq_thread_c():Thread SQ (0xbfb26414)
    SQ : 09:18:37.130592 (32676) operator()():Thread SQ is born
    ....
    SQ : 09:18:37.130662 (32676) thread_base_c():Thread Audit (0x808c110)
    SQ : 09:18:37.130674 (32676) audit_thread_c():Thread Audit (0x808c110)
    SQ : 09:18:37.130700 (32676) thread_base_c():Thread Notify (0x808cd80)
    SQ : 09:18:37.130715 (32676) notify_thread_c():Thread Notify (0x808cd80)
    Audit : 09:18:37.130739 (32677) operator()():Thread Audit is born
    Notify: 09:18:37.130757 (32678) operator()():Thread Notify is born
    SQ : 09:18:37.130835 (32676) thread_base_c():Thread LU (0x808cf68)
    SQ : 09:18:37.130880 (32676) lookup_thread_c():Thread LU (0x808cf68)
    LU : 09:18:37.130937 (32679) operator()():Thread LU is born
    ....
    ....
    ....
    LU : 09:18:37.131117 (32679) thread_base_c():Thread Worker#1 (0x808ca28)
    LU : 09:18:37.131140 (32679) worker_thread_c():Thread Worker#1 (0x808ca28)
    LU : 09:18:37.131188 (32679) thread_base_c():Thread Worker#2 (0x808d8a0)
    LU : 09:18:37.131202 (32679) worker_thread_c():Thread Worker#2 (0x808d8a0)
    LU : 09:18:37.131227 (32679) thread_base_c():Thread Worker#3 (0x808da90)
    LU : 09:18:37.131240 (32679) worker_thread_c():Thread Worker#3 (0x808da90)
    LU : 09:18:37.131267 (32679) thread_base_c():Thread Worker#4 (0x808dc90)
    LU : 09:18:37.131280 (32679) worker_thread_c():Thread Worker#4 (0x808dc90)
    W1 : 09:18:37.131305 (32680) operator()():Thread Worker#1 is born
    ....
    W4 : 09:18:37.131344 (32683) operator()():Thread Worker#4 is born
    ....
    W3 : 09:18:37.131394 (32682) operator()():Thread Worker#3 is born
    W2 : 09:18:37.131416 (32681) operator()():Thread Worker#2 is born
    ....
    ....
    LU : 09:18:37.131690 (32679) thread_base_c():Thread Worker#5 (0x808dcc8)
    LU : 09:18:37.131727 (32679) worker_thread_c():Thread Worker#5 (0x808dcc8)
    LU : 09:18:37.131775 (32679) thread_base_c():Thread Worker#6 (0x808e0f0)
    LU : 09:18:37.131791 (32679) worker_thread_c():Thread Worker#6 (0x808e0f0)
    W5 : 09:18:37.131804 (32684) operator()():Thread Worker#5 is born
    W6 : 09:18:37.131825 (32685) operator()():Thread Worker#6 is born
    ....
    ....
    LU : 09:18:37.131900 (32679) thread_base_c():Thread Worker#7 (0x808e250)
    LU : 09:18:37.131926 (32679) worker_thread_c():Thread Worker#7 (0x808e250)
    LU : 09:18:37.131969 (32679) thread_base_c():Thread Worker#8 (0x808e4e0)
    LU : 09:18:37.131990 (32679) worker_thread_c():Thread Worker#8 (0x808e4e0)
    LU : 09:18:37.132024 (32679) thread_base_c():Thread Worker#9 (0x808e6e0)
    LU : 09:18:37.132038 (32679) worker_thread_c():Thread Worker#9 (0x808e6e0)
    W7 : 09:18:37.132062 (32686) operator()():Thread Worker#7 is born
    W8 : 09:18:37.132076 (32687) operator()():Thread Worker#8 is born
    ....
    LU : 09:18:37.132110 (32679) thread_base_c():Thread Worker#10 (0x808ebe0)
    ....
    W9 : 09:18:37.132142 (32688) operator()():Thread Worker#9 is born
    LU : 09:18:37.132167 (32679) worker_thread_c():Thread Worker#10 (0x808ebe0)
    ....
    W10 : 09:18:37.132228 (32689) operator()():Thread Worker#10 is born
    ...
    ...
    ...
    LU : 09:18:37.150406 (32679) shutdown():LU shutting down Worker#6
    LU : 09:18:37.150428 (32679) interrupt before lock
    LU : 09:18:37.150443 (32679) interrupt after lock
    LU : 09:18:37.150462 (32679) interrupt before broadcast
    LU : 09:18:37.150475 (32679) interrupt after broadcast
    W8 : 09:18:37.150488 (32687) wait after condvar wait
    W8 : 09:18:37.150510 (32687) wait before checker
    W8 : 09:18:37.150529 (32687) wait after checker
    W10 : 09:18:37.150565 (32689) wait after condvar wait
    LU : 09:18:37.150594 (32679) shutdown():LU waiting on Worker#6 thread
    W6 : 09:18:37.150622 (32685) wait after condvar wait
    W7 : 09:18:37.150642 (32686) wait after condvar wait
    W9 : 09:18:37.150665 (32688) wait after condvar wait
    W8 : 09:18:37.150690 (32687) wait before condvar wait
    W10 : 09:18:37.150746 (32689) wait before checker
    W10 : 09:18:37.150771 (32689) wait after checker
    W10 : 09:18:37.150793 (32689) wait before condvar wait
    W6 : 09:18:37.150835 (32685) operator()():Thread Worker#6(0x808e0f0) RX shutdown interrupt.
    W7 : 09:18:37.150904 (32686) wait before checker <<<<<<
    W7 : 09:18:37.150923 (32686) wait after checker <<<<<<
    LU : 09:18:37.150941 (32679) shutdown():LU shutdown Worker#6 in time 00:00:00.000493
    LU : 09:18:37.150970 (32679) shutdown():LU shutting down Worker#7
    W9 : 09:18:37.150996 (32688) wait before checker
    W9 : 09:18:37.151019 (32688) wait after checker
    LU : 09:18:37.151036 (32679) interrupt before lock <<<<<<
    LU : 09:18:37.151050 (32679) interrupt after lock <<<<<<
    LU : 09:18:37.151061 (32679) interrupt before broadcast <<<<<<
    LU : 09:18:37.151073 (32679) interrupt after broadcast <<<<<<
    LU : 09:18:37.151085 (32679) shutdown():LU waiting on Worker#7 thread
    W7 : 09:18:37.151097 (32686) wait before condvar wait <<<<<<
    W9 : 09:18:37.151115 (32688) wait before condvar wait
    W10 : 09:18:37.151131 (32689) wait after condvar wait
    W10 : 09:18:37.151145 (32689) wait before checker
    W10 : 09:18:37.151156 (32689) wait after checker
    W8 : 09:18:37.151169 (32687) wait after condvar wait
    W10 : 09:18:37.151186 (32689) wait before condvar wait
    W8 : 09:18:37.151200 (32687) wait before checker
    W8 : 09:18:37.151214 (32687) wait after checker
    W8 : 09:18:37.151225 (32687) wait before condvar wait
From the above, we can see:
- At 09:18:37.150923 worker thread #7 exited the checker code; this was part of the condvar broadcast done when W6 was shutting down.
- W7 doesn't run again until 09:18:37.151097, just before the condvar wait.
- However, the LU thread tickles W7's condvar just before W7 waits on it, at 09:18:37.151061 and 09:18:37.151073.
===============================================================================
Updated test program. I moved jrr_debug out of my program and into the boost thread library so that I could use it there as well as in my test program. It was a hack to allow me to integrate logging with my test program.
===============================================================================
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <deque>
#include <sstream>
#include <string>
#include <sched.h>   // SCHED_RR
#include <unistd.h>  // usleep()

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include <boost/foreach.hpp>

using boost::posix_time::time_duration;
using boost::posix_time::microsec_clock;
using boost::posix_time::to_simple_string;
using boost::posix_time::ptime;

//////////////////////////////////////////////////////////////////////
// Simulation of the logging mechanism we have. We don't use printf; it's
// actually a mutex-controlled input into a buffer, which is then periodically
// drained by a separate thread into a file. For simplicity I just changed it
// into mutex-controlled output. jrr_debug_out() is provided by the locally
// patched boost thread library mentioned above.
//
#define EE_LOG_MSG(level, fmt, args...) {                        \
    /* prefix the format with the function name */               \
    char fbuf[1024];                                             \
    snprintf(fbuf, sizeof(fbuf), "%s():%s", __FUNCTION__, fmt);  \
                                                                 \
    char buf[1024];                                              \
    snprintf(buf, sizeof(buf), fbuf, ##args);                    \
    jrr_debug_out(buf);                                          \
}

#define EE_LOG_ECODE(level, err, fmt, args...) {                 \
    /* prefix the format with the function name */               \
    char fbuf[1024];                                             \
    snprintf(fbuf, sizeof(fbuf), "%s():%s", __FUNCTION__, fmt);  \
                                                                 \
    char buf[1024];                                              \
    snprintf(buf, sizeof(buf), fbuf, ##args);                    \
    jrr_debug_out(buf, err);                                     \
}

//////////////////////////////////////////////////////////////////////
// Need to simulate the code that waits for all threads to start up, just use
// simple barriers.
//
//   10 worker threads
//    1 lookup thread
//    1 queue thread
//    1 audit thread
//    1 notify thread
//    1 main thread
//   ---
//   15 threads
boost::barrier init_done(15);

//////////////////////////////////////////////////////////////////////
// class used to exchange data from one thread to another.
//
// History from this file indicates that it originally used raw pthread_xxxx
// calls, and then later switched to boost::mutex/condvar
//
template <class T, class Container = std::deque<T> >
class mt_queue_c {
public:
    explicit mt_queue_c(unsigned int max_size,
                        const Container &cont = Container())
        : m_c(cont), m_max_size(max_size), m_drop_total(0),
          m_drop_oflow(0), m_queue_enabled(true)
    {}

    virtual ~mt_queue_c() {};

    int enqueue(const T &elem, std::string *ps_error = NULL);
    int dequeue(T &dest, std::string *ps_error = NULL);
    void off();
    void on();

protected:
    Container m_c;
    unsigned int m_max_size;
    unsigned int m_drop_total;
    unsigned int m_drop_oflow;
    boost::timed_mutex m_access_mtx;
    boost::condition_variable_any m_data_cv;
    bool m_queue_enabled;

    void _make_error_string(std::string *p_s_err, const char *p_src,
                            const char *p_func, int errval);
};

// make_error_string()
// Helper function for integrating with the _make_error_string method. It
// automatically pushes on the __FUNCTION__ value.
//
#define make_error_string(p_s_err, p_src, errval) \
    _make_error_string(p_s_err, p_src, __FUNCTION__, errval)

template <class T, class Container>
inline void
mt_queue_c<T,Container>::_make_error_string
(std::string *p_s_err, const char *p_src, const char *p_func, int errval)
{
    std::stringstream ss_err;

    if (!p_s_err) {
        return;
    }
    if (!p_src) {
        p_src = "bad p_src parm";
    }
    if (!p_func) {
        p_func = "bad p_func parm";
    }

    ss_err << " Error-Response: (" << errval << ") " << strerror(errval);

    *p_s_err = "[";
    *p_s_err += p_func;
    *p_s_err += "()]: ";
    *p_s_err += p_src;
    *p_s_err += ss_err.str();
}

template <class T, class Container>
inline int
mt_queue_c<T,Container>::enqueue
(const T &elem, std::string *ps_error /* = NULL */)
{
    const char *p_err = "";
    int ret_err;

    boost::lock_guard<boost::timed_mutex> lock(m_access_mtx);

    // if the max size is non-zero then enforce the size limit
    if (m_max_size && m_c.size() >= m_max_size) {
        p_err = "Queue limit reached. Cannot enqueue this element.";
        ret_err = ENOSPC; // closest match
        goto enqueue_done;
    }

    m_c.push_back(elem);
    m_data_cv.notify_one();
    ret_err = 0;

enqueue_done:
    if (ret_err) {
        make_error_string(ps_error, p_err, ret_err);
        if (ENOSPC == ret_err) {
            m_drop_oflow++;
        }
        m_drop_total++;
    }

    return (ret_err);
}

template <class T, class Container>
inline int
mt_queue_c<T,Container>::dequeue
(T &dest, std::string *ps_error /* = NULL */)
{
    const char *p_err = "";
    int ret_err;

    boost::unique_lock<boost::timed_mutex> lock(m_access_mtx);

    while (!m_c.size() || !m_queue_enabled) {
        m_data_cv.wait(lock);
    }

    if (m_c.empty()) {
        p_err = "Woke up but queue is empty.";
        ret_err = ENOENT; // closest existing match. Error no entity.
        goto dequeue_done;
    }

    dest = m_c.front();
    m_c.pop_front();
    ret_err = 0;

dequeue_done:
    if (ret_err) {
        make_error_string(ps_error, p_err, ret_err);
    }

    return (ret_err);
}

template <class T, class Container>
inline void
mt_queue_c<T,Container>::off ()
{
    boost::lock_guard<boost::timed_mutex> lock(m_access_mtx);
    m_queue_enabled = false;
}

template <class T, class Container>
inline void
mt_queue_c<T,Container>::on ()
{
    boost::lock_guard<boost::timed_mutex> lock(m_access_mtx);
    m_queue_enabled = true;
    if (m_c.size()) {
        m_data_cv.notify_all();
    }
}

//////////////////////////////////////////////////////////////////////
// We actually exchange much more complex data. I just simplify it to ints so
// that information is exchanged, and threads run.
typedef mt_queue_c<int> t_msg_queue;

//////////////////////////////////////////////////////////////////////
class thread_base_c {
public:
    thread_base_c(const std::string &name, int prio, int sched)
        : m_name(name), m_priority(prio), m_sched(sched), m_thread()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    virtual ~thread_base_c ()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    // Functor. This is the API that the boost thread library will invoke as
    // the new thread's main.
    void operator()()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s is born", m_name.c_str());
        std::string s_err;

        // set my scheduling and priority levels
        if (!setscheduling()) {
            return;
        }

        // call the thread's init routine
        if (!thread_init()) {
            s_err = "Thread:";
            s_err += m_name + " thread_init() method failed.";
            EE_LOG_MSG(EE_ERROR, s_err.c_str());
            thread_shutdown();
            return;
        }

        // call the thread's main loop, watching for boost::interrupt which
        // means it's time for us to shut down
        try {
            init_done.wait();
            if (!thread_main()) {
                s_err = "Thread:";
                s_err += m_name + " thread_main() method failed.";
                EE_LOG_MSG(EE_ERROR, s_err.c_str());
                thread_shutdown();
                return;
            }
        }
        catch (boost::thread_interrupted &e) {
            // We've received a boost thread interrupt from our parent thread,
            // it's time to shut down, invoke our shutdown routine and then
            // return.
            EE_LOG_MSG(EE_TRACE, "Thread %s(%p) RX shutdown interrupt.",
                       m_name.c_str(), this);
            thread_shutdown();
            return;
        }
    }

    void create_thread()
    {
        if (m_thread.get_id() != boost::thread::id()) {
            EE_LOG_MSG(EE_ERROR, "Thread already exists for this object: %s (%p)",
                       m_name.c_str(), this);
            return;
        }

        // create a thread using our object and store it
        m_thread = boost::thread(boost::ref(*this));
    }

    void shutdown(const std::string &s_caller)
    {
        // We're already shutting down. So we don't need another
        // thread_interrupt to tell us to shut down (again).
        boost::this_thread::disable_interruption raii_interrupt_disable;

        EE_LOG_MSG(EE_TRACE, "%s shutting down %s", s_caller.c_str(),
                   m_name.c_str());
        ptime start_time(microsec_clock::local_time());
        m_thread.interrupt();

        EE_LOG_MSG(EE_TRACE, "%s waiting on %s thread", s_caller.c_str(),
                   m_name.c_str());
        m_thread.join();

        time_duration how_long(microsec_clock::local_time() - start_time);
        EE_LOG_MSG(EE_TRACE, "%s shutdown %s in time %s", s_caller.c_str(),
                   m_name.c_str(), to_simple_string(how_long).c_str());
    }

protected:
    virtual bool thread_init() = 0;
    virtual bool thread_main() = 0;
    virtual void thread_shutdown() = 0;

    bool setscheduling()
    {
        std::stringstream ss_result;
        struct sched_param old_sched;
        struct sched_param new_sched;
        pthread_t tid;
        int errval;
        int policy;

        tid = pthread_self();

        errval = pthread_getschedparam(tid, &policy, &old_sched);
        if (errval) {
            EE_LOG_ECODE(EE_ERROR, errval,
                         "Cannot get scheduling/priority parameters.");
            return (false);
        }

        // initialize new_sched to what's established, and then change to our
        // new values.
        new_sched = old_sched;
        new_sched.sched_priority = m_priority;

        errval = pthread_setschedparam(tid, m_sched, &new_sched);
        if (errval) {
            EE_LOG_ECODE(EE_ERROR, errval,
                         "Cannot set the scheduling/priority parameters.");
            return (false);
        }

        ss_result << "Scheduling and priorities changed for thread '" << m_name
                  << "' from: (" << policy << ")" << get_sched_str(policy)
                  << "/" << old_sched.sched_priority
                  << " to: (" << m_sched << ")" << get_sched_str(m_sched)
                  << "/" << m_priority;
        EE_LOG_MSG(EE_DEBUG(0), ss_result.str().c_str());

        return (true);
    }

#   define CASE_RETVAL(x) case x: return (#x)

    char const* get_sched_str(int sched_type)
    {
        switch (sched_type) {
            CASE_RETVAL(SCHED_OTHER);
            CASE_RETVAL(SCHED_FIFO);
            CASE_RETVAL(SCHED_RR);
            CASE_RETVAL(SCHED_BATCH);
        }
        return ("Unknown");
    }

    inline void check_for_shutdown ()
    {
        boost::this_thread::interruption_point();
    }

    std::string m_name;     // Name of the object instance. Used in
                            // debugging and error messages.
    int m_priority;         // Scheduling priority of the object thread.
    int m_sched;            // Scheduling type of the object thread;
                            // see pthread_setschedparam() for further info.

    // Our Thread
    boost::thread m_thread;

private:
    // don't allow these, because boost::thread cannot be copied
    thread_base_c& operator=(const thread_base_c&);
};

//////////////////////////////////////////////////////////////////////
class worker_thread_c : public thread_base_c {
public:
    worker_thread_c(const std::string name, int prio, int sched,
                    t_msg_queue &sq)
        : thread_base_c(name, prio, sched), m_work_q(sq)
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    virtual ~worker_thread_c()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    }

    virtual bool thread_init()
    {
        return (true);
    }

    virtual bool thread_main()
    {
        while (1) {
            int qval;

            check_for_shutdown();
            m_work_q.dequeue(qval);
            // EE_LOG_MSG(EE_TRACE, "RX: %d", qval);
        }

        // we should never break out and return. Returning false will ensure
        // the infra logs an error for this.
        return (false);
    }

    virtual void thread_shutdown() {};

protected:
    t_msg_queue &m_work_q;
};

//////////////////////////////////////////////////////////////////////
class lookup_thread_c : public thread_base_c {
public:
    lookup_thread_c (int prio, int sched, t_msg_queue &sq)
        : thread_base_c("LU", prio, sched), m_serial_q(sq), m_work_q(40000),
          v_work_thread_pointers()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    virtual ~lookup_thread_c()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
        v_work_thread_pointers.clear();
    }

    virtual bool thread_init()
    {
        const int num_workers(10);
        std::stringstream ss_worker_name;
        int idx;

        try {
            // spawn the worker threads
            for (idx = 1; idx <= num_workers; ++idx) {
                // create the thread's name
                ss_worker_name.str("");
                ss_worker_name << "Worker#" << idx;

                P_SHR_WORKER_T p_worker(new worker_thread_c(ss_worker_name.str(),
                                                            18, SCHED_RR,
                                                            m_work_q));
                p_worker->create_thread();
                v_work_thread_pointers.push_back(p_worker);
            }
        }
        catch (std::exception &e) {
            EE_LOG_MSG(EE_ERROR, "Failed to allocate/initialize data for look-Up "
                       "thread. Response is: '%s'", e.what());
            return (false);
        }

        return (true);
    }

    virtual bool thread_main()
    {
        while (1) {
            int sq_val;

            check_for_shutdown();
            m_serial_q.dequeue(sq_val);
            // EE_LOG_MSG(EE_TRACE, "RX: %d", sq_val);
            m_work_q.enqueue(sq_val);
        }

        // we should never break out and return. Returning false will ensure
        // the infra logs an error for this.
        return (false);
    }

    virtual void thread_shutdown()
    {
        EE_LOG_MSG(EE_TRACE, "%s starts %s", m_name.c_str(), __FUNCTION__);

        // shut down the downstream queue
        m_work_q.off();

        // tell our children to shut down
        BOOST_FOREACH(P_SHR_WORKER_T p_worker, v_work_thread_pointers) {
            p_worker->shutdown(m_name);
        }

        EE_LOG_MSG(EE_TRACE, "%s end %s", m_name.c_str(), __FUNCTION__);
    }

protected:
    t_msg_queue &m_serial_q;
    t_msg_queue m_work_q;

    typedef boost::shared_ptr<worker_thread_c> P_SHR_WORKER_T;
    std::vector<P_SHR_WORKER_T> v_work_thread_pointers;
};

//////////////////////////////////////////////////////////////////////
class audit_thread_c : public thread_base_c {
public:
    audit_thread_c(int prio, int sched)
        : thread_base_c("Audit", prio, sched)
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    virtual ~audit_thread_c()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    }

    virtual bool thread_init() { return (true); };

    virtual bool thread_main()
    {
        while (1) {
            check_for_shutdown();
            sleep (1);
        }

        // we should never break out and return. Returning false will ensure
        // the infra logs an error for this.
        return (false);
    }

    virtual void thread_shutdown() {};
};

//////////////////////////////////////////////////////////////////////
class notify_thread_c : public thread_base_c {
public:
    notify_thread_c(int prio, int sched)
        : thread_base_c("Notify", prio, sched)
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    virtual ~notify_thread_c()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    }

    virtual bool thread_init() { return (true); };

    virtual bool thread_main()
    {
        while (1) {
            check_for_shutdown();
            sleep(1);
        }

        // we should never break out and return. Returning false will ensure
        // the infra logs an error for this.
        return (false);
    }

    virtual void thread_shutdown() {};
};

//////////////////////////////////////////////////////////////////////
class sq_thread_c : public thread_base_c {
public:
    sq_thread_c (int prio, int sched, t_msg_queue &serial_in)
        : thread_base_c("SQ", prio, sched), m_main_in(serial_in),
          m_serial_q(2000), p_audit(), p_notify(), p_lu()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    };

    virtual ~sq_thread_c()
    {
        EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
    }

    virtual bool thread_init()
    {
        try {
            p_audit.reset(new audit_thread_c(18, SCHED_RR));
            p_audit->create_thread();

            p_notify.reset(new notify_thread_c(18, SCHED_RR));
            p_notify->create_thread();

            p_lu.reset(new lookup_thread_c(18, SCHED_RR, m_serial_q));
            p_lu->create_thread();
        }
        catch (std::exception &ex) {
            EE_LOG_MSG(EE_ERROR, "Failed to allocate/initialize data for SQ thread. "
                       "Response is: '%s'", ex.what());
            return (false);
        }

        return (true);
    }

    virtual bool thread_main()
    {
        while (1) {
            int sq_val;

            check_for_shutdown();
            m_main_in.dequeue(sq_val);
            m_serial_q.enqueue(sq_val);
            // EE_LOG_MSG(EE_TRACE, "%s RX: %d", m_name.c_str(), sq_val);
        }

        // we should never break out and return. Returning false will ensure
        // the infra logs an error for this.
        return (false);
    }

    virtual void thread_shutdown()
    {
        EE_LOG_MSG(EE_TRACE, "%s starts %s", m_name.c_str(), __FUNCTION__);

        // shut down the downstream queue
        m_serial_q.off();

        // tell the subordinate threads to shut down
        if (p_lu) {
            p_lu->shutdown(m_name);
        }
        else {
            EE_LOG_MSG(EE_ERROR, "Unexpected null lookup thread pointer "
                       "during shutdown of %s", m_name.c_str());
        }

        if (p_notify) {
            p_notify->shutdown(m_name);
        }
        else {
            EE_LOG_MSG(EE_ERROR, "Unexpected null notify thread pointer "
                       "during shutdown of %s", m_name.c_str());
        }

        if (p_audit) {
            p_audit->shutdown(m_name);
        }
        else {
            EE_LOG_MSG(EE_ERROR, "Unexpected null audit thread "
                       "pointer during shutdown of %s", m_name.c_str());
        }

        EE_LOG_MSG(EE_TRACE, "%s end %s", m_name.c_str(), __FUNCTION__);
    }

protected:
    t_msg_queue &m_main_in;
    t_msg_queue m_serial_q;

    // Child thread management variables
    boost::shared_ptr<audit_thread_c> p_audit;
    boost::shared_ptr<notify_thread_c> p_notify;
    boost::shared_ptr<lookup_thread_c> p_lu;
};

int main ()
{
    t_msg_queue main_out(2000);

    EE_LOG_MSG(EE_TRACE, "Concurrency Level: %d",
               boost::thread::hardware_concurrency());

    sq_thread_c serial_Q_Thread_Main(19, SCHED_RR, main_out);
    serial_Q_Thread_Main.create_thread();

    // the real thread startup has a more complex init synchronization. This is
    // a quick hack. Wait until all threads have reached the barrier, then have
    // main sleep a bit so that all child threads can actually progress past
    // the barrier.
    init_done.wait();
    usleep(10 * 1000);

    EE_LOG_MSG(EE_TRACE, "All threads are up, pumping 1000 entries into the queue.");
    for (int idx(0); idx < 1000; ++idx) {
        main_out.enqueue(idx);
    }

    serial_Q_Thread_Main.shutdown(__FUNCTION__);

    return (0);
}

===============================================================================
Diffs of the changes that were made to the 1.43 version of the boost thread library
===============================================================================
Index: boost-1.43.0/boost/thread/pthread/condition_variable.hpp
===================================================================
*** boost/thread/pthread/.CC/cache/condition_variable.hpp@@/main/server/1 Fri Mar 16 10:58:00 2012
--- boost/thread/pthread/condition_variable.hpp Fri Mar 16 08:32:13 2012
***************
*** 12,17 ****
--- 12,19 ----
  #include <boost/config/abi_prefix.hpp>
+ extern void jrr_debug_out(const char *);
+
  namespace boost
  {
      inline void condition_variable::wait(unique_lock<mutex>& m)
***************
*** 72,87 ****
--- 74,95 ----
              BOOST_VERIFY(!pthread_cond_destroy(&cond));
          }
+
          template<typename lock_type>
          void wait(lock_type& m)
          {
              int res=0;
              {
+                 jrr_debug_out(" wait before checker");
                  detail::interruption_checker check_for_interruption(&cond);
+                 jrr_debug_out(" wait after checker");
                  {
                      boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex);
                      m.unlock();
+                     jrr_debug_out(" wait before condvar wait");
                      res=pthread_cond_wait(&cond,&internal_mutex);
+                     jrr_debug_out(" wait after condvar wait");
+
                  }
                  m.lock();
              }
Index: boost-1.43.0/boost/thread/thread.hpp
===================================================================
*** boost/thread/.CC/cache/thread.hpp@@/main/server/1 Fri Mar 16 10:58:00 2012
--- boost/thread/thread.hpp Thu Mar 15 17:45:08 2012
***************
*** 23,27 ****
--- 23,33 ----
  #include <boost/thread/detail/thread_interruption.hpp>
  #include <boost/thread/detail/thread_group.hpp>
+ extern void jrr_debug_out(const char *);
+
+ extern void jrr_debug_out(const char *buffer,
+                           int err);
+
  #endif
Index: boost-1.43.0/libs/thread/src/pthread/thread.cpp
===================================================================
*** libs/thread/src/pthread/.CC/cache/thread.cpp@@/main/server/1 Fri Mar 16 10:58:00 2012
--- libs/thread/src/pthread/thread.cpp Fri Mar 16 08:04:42 2012
***************
*** 25,30 ****
--- 25,32 ----
  #include "timeconv.inl"
+
+
  namespace boost
  {
      namespace detail
***************
*** 408,423 ****
--- 410,430 ----
          }
      }
+
      void thread::interrupt()
      {
          detail::thread_data_ptr const local_thread_info=get_thread_info();
          if(local_thread_info)
          {
+             jrr_debug_out("interrupt before lock");
              lock_guard<mutex> lk(local_thread_info->data_mutex);
+             jrr_debug_out("interrupt after lock");
              local_thread_info->interrupt_requested=true;
              if(local_thread_info->current_cond)
              {
+                 jrr_debug_out("interrupt before broadcast");
                  BOOST_VERIFY(!pthread_cond_broadcast(local_thread_info->current_cond));
+                 jrr_debug_out("interrupt after broadcast");
              }
          }
      }
***************
*** 605,607 ****
--- 612,651 ----
  }
+
+
+
+
+ #include <boost/thread.hpp>
+ #include <boost/date_time/posix_time/posix_time.hpp>
+ #include <sys/syscall.h>
+ #include <iostream>
+
+ using boost::posix_time::time_duration;
+ using boost::posix_time::microsec_clock;
+ using boost::posix_time::to_simple_string;
+ using boost::posix_time::ptime;
+
+ boost::mutex output_ctl;
+
+
+
+ // get the thread id (as seen by 'top' or 'ps'
+ static pid_t gettid() {
+     return syscall(__NR_gettid);
+ }
+
+
+ void jrr_debug_out (const char *buffer) {
+     boost::lock_guard<boost::mutex> lock(output_ctl);
+     std::cout << to_simple_string(microsec_clock().local_time())
+               << " (" << gettid() << ") " << buffer << "\n";
+ }
+
+ void jrr_debug_out (const char *buffer,
+                     int err) {
+     boost::lock_guard<boost::mutex> lock(output_ctl);
+     std::cout << to_simple_string(microsec_clock().local_time())
+               << " (" << gettid() << ") " << buffer << "Error=["
+               << strerror(err) << "]\n";
+ }

Hi John,

On Fri, Mar 16, 2012 at 12:00 PM, John Rocha <jrr@cisco.com> wrote:

I followed your advice to verify where the threads were blocked. I analyzed the GDB data and found that the LU thread was blocked in join() for worker thread #7 (having sent it an interrupt) and that worker thread #7 was blocked on a boost::condition_variable_any.

I then carefully looked at my boost thread source code, for version 1.43 of boost, and I believe there is a race condition between the boost thread's interrupt() and condvar::wait() methods. It's a very small window, but I've been able to reproduce the issue with logs.

Can you please read the following to find any flaws with my logic, or confirm my suspicion? Also, if this is a defect with the library, what should I do next? Do I raise a defect somewhere?

This bug: https://svn.boost.org/trac/boost/ticket/4978 bit us at my work; it was merged in 1.47. Since you're using 1.43, perhaps it's the same issue? The patch was pretty trivial for us to apply, for what that's worth.

HTH,
Nate

Nice! Thanks! And NUTS!!!! It appears that 1.43 is very different from the version that the patch was created for: in 1.43, wait is a template, while in the patch, wait is an inline. We probably need to move up to boost 1.47, but I doubt our QA division would accept that, since we're supposed to be close to done.

It's nice to know that I'm not nuts!

-=John

_______________________________________________
Boost-users mailing list
Boost-users@lists.boost.org
http://lists.boost.org/mailman/listinfo.cgi/boost-users
participants (4)
- Anthony Williams
- John Rocha
- Nathan Crookston
- Vicente J. Botet Escriba