Hello,
I have been able to extract the thread start/stop logic from our code base into
a standalone program that illustrates the problem. Even this is still sort of
long, 900 lines or so.
The code's thread startup/shutdown model sort of mimics a process
startup/shutdown model.
The startup sequence/logic is
Main creates SQ.
SQ creates lookup, audit and notify.
lookup creates workers 1-10
The shutdown logic is
main sends thread interrupt to SQ and joins
SQ sends thread interrupt to lookup and joins
lookup sends thread interrupt to W1 and joins
W1 shutdown
lookup sends thread interrupt to W2 and joins
W2 shutdown
.....
lookup sends thread interrupt to W10 and joins
W10 shutdown
lookup shutdown
SQ sends thread interrupt to audit and joins
audit shutdown
SQ sends thread interrupt to notify and joins
notify shutdown
SQ shutdown
main finishes its shutdown
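
The shutdown order listed above can be sketched roughly as follows. This is
only an illustration, not the program's code: it assumes std::thread and an
atomic stop flag in place of boost::thread and its interrupt, and the names
node_c and run_cascade are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for one thread object in the hierarchy. An atomic
// stop flag replaces the Boost interrupt used in the real program.
class node_c {
public:
    node_c(const std::string &name,
           std::vector<std::string> &log, std::mutex &log_mtx)
        : m_name(name), m_log(log), m_log_mtx(log_mtx), m_stop(false) {}

    // Register a child; must be called before start().
    node_c *add_child(const std::string &name) {
        m_children.emplace_back(new node_c(name, m_log, m_log_mtx));
        return m_children.back().get();
    }

    void start() { m_thread = std::thread(&node_c::run, this); }

    // Ask the thread to stop, then block until it has really exited.
    void shutdown() {
        assert(m_thread.joinable());
        m_stop.store(true);
        m_thread.join();
    }

private:
    void run() {
        // The parent thread creates its children.
        for (auto &child : m_children) child->start();
        while (!m_stop.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        // Children are stopped one at a time, in order, each with a join.
        for (auto &child : m_children) child->shutdown();
        std::lock_guard<std::mutex> lock(m_log_mtx);
        m_log.push_back(m_name + " shutdown");
    }

    std::string m_name;
    std::vector<std::string> &m_log;
    std::mutex &m_log_mtx;
    std::atomic<bool> m_stop;
    std::vector<std::unique_ptr<node_c> > m_children;
    std::thread m_thread;
};

// Builds the SQ -> {lookup -> W1..W10, audit, notify} tree, starts it, shuts
// it down, and returns the order in which the threads reported shutdown.
std::vector<std::string> run_cascade() {
    std::vector<std::string> log;
    std::mutex log_mtx;
    node_c sq("SQ", log, log_mtx);
    node_c *lookup = sq.add_child("lookup");
    for (int i = 1; i <= 10; ++i) lookup->add_child("W" + std::to_string(i));
    sq.add_child("audit");
    sq.add_child("notify");
    sq.start();
    sq.shutdown();   // plays the role of main's interrupt + join
    return log;
}
```

Because every parent joins a child before moving on to the next one, the
order returned by run_cascade() is deterministic as long as each join really
waits for its thread to exit, which is the property in question here.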
I've created a bash script that runs the program and then checks for a core. It
repeats until a core is found (or I eventually kill it).
Sometimes it will core with a seg fault.
Sometimes it will core with an abort because condvars are being destroyed and
the mutex is still locked or someone is still waiting on the condvar.
Sometimes it will core citing 'pure virtual method called' because a base class
invokes a derived function on an object that has been deleted from
underneath it.
The root cause for all of these appears to be that one of the thread join()
calls is returning before the thread has actually finished. I say this based on
debug output -- shared below. The premature return leads to objects being
destroyed that are still being used by other threads.
Moreover, I have found that if I call a raw pthread_join() in my shutdown logic
and then follow it with a boost::thread::join(), it always works (I stopped
after 1000 successful runs). I was working on the premise that
boost::thread::join() does more than just the join: pthread_join() waits for
the thread to exit, and then boost::thread::join() takes care of all the
accounting for the thread object. This is obviously a hack, but it is an
interesting observation.
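
For reference, the guarantee the workaround relies on can be shown with raw
pthreads alone. This is a minimal sketch with hypothetical names (demo_state,
demo_thread_main, join_waits_for_exit); it says nothing about what
boost::thread::join() does internally.

```cpp
#include <pthread.h>
#include <unistd.h>
#include <cassert>
#include <cstddef>

// Hypothetical state shared with the demo thread.
struct demo_state {
    bool finished;
};

// Thread entry point: pretend to do some shutdown work, then mark completion
// as the very last action before the thread function returns.
static void *demo_thread_main(void *arg) {
    assert(arg != NULL);
    demo_state *st = static_cast<demo_state *>(arg);
    usleep(20 * 1000);
    st->finished = true;
    return NULL;
}

// Returns true if the thread function had run to completion by the time
// pthread_join() returned.
bool join_waits_for_exit() {
    demo_state st = { false };
    pthread_t tid;
    if (pthread_create(&tid, NULL, demo_thread_main, &st) != 0) {
        return false;
    }
    if (pthread_join(tid, NULL) != 0) {
        return false;
    }
    return st.finished;
}
```

pthread_join() does not return successfully until the target thread has
terminated, so st.finished is always set by the time it is read.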
I am running on a Linux server with a concurrency level of 8, as indicated by
the hardware_concurrency() method.
root@psbu-jrr2-lnx:# uname -a
Linux psbu-jrr2-lnx 2.6.16.46-0.12-bigsmp #1 SMP Thu May 17 14:00:09 UTC 2007 i686 i686 i386 GNU/Linux
root@psbu-jrr2-lnx:# g++ --version
g++ (GCC) 4.1.2 20070115 (prerelease) (SUSE Linux)
Copyright (C) 2006 Free Software Foundation, Inc.
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
Boost version 1.43
Annotated log output is below.
TIME THRD# LOG INFORMATION
-------------- ------ -----------------------------------------------
11:37:56.119156 (30773) shutdown():main shutting down SQ
11:37:56.119176 (30773) shutdown():main waiting on SQ thread
11:37:56.119195 (30774) operator()():Thread SQ(0xbfc8dd78) RX shutdown interrupt.
11:37:56.119216 (30774) thread_shutdown():SQ starts thread_shutdown
11:37:56.119228 (30774) shutdown():SQ shutting down LU
11:37:56.119240 (30774) shutdown():SQ waiting on LU thread
11:37:56.119324 (30777) operator()():Thread LU(0x8085188) RX shutdown interrupt.
11:37:56.119342 (30777) thread_shutdown():LU starts thread_shutdown
11:37:56.119354 (30777) shutdown():LU shutting down Worker#1
11:37:56.119368 (30777) shutdown():LU waiting on Worker#1 thread
11:37:56.119395 (30773) shutdown():main shutdown SQ in time 00:00:00.000143
11:37:56.119414 (30778) operator()():Thread Worker#1(0x8084da8) RX shutdown interrupt.
11:37:56.119441 (30777) shutdown():LU shutdown Worker#1 in time 00:00:00.000070
11:37:56.119454 (30777) shutdown():LU shutting down Worker#2
11:37:56.119469 (30777) shutdown():LU waiting on Worker#2 thread
11:37:56.119499 (30779) operator()():Thread Worker#2(0x8085690) RX shutdown interrupt.
11:37:56.119603 (30773) ~sq_thread_c():Thread SQ (0xbfc8dd78)
From this we see that main (30773) invokes SQ.shutdown(), which sends the SQ
thread a thread_interrupt and then waits (via join) for the SQ thread to
complete.
The SQ thread (30774) receives the thread_interrupt, which causes it to
'throw' out of its thread_main() and get caught by the base class operator()
method, which then invokes the SQ.thread_shutdown() method. That starts by
invoking the shutdown() method on its first child -- the lookup thread. This
causes a thread_interrupt to be sent to the lookup thread and then does a
join() until the lookup thread has exited.
The lookup thread (30777) receives the thread_interrupt, which causes it to
'throw' out of its thread_main() and get caught by the base class operator()
method, which then invokes the lookup.thread_shutdown() method. That starts
by invoking the shutdown() method on its first child, worker #1, and then
does a join on worker #1.
What SHOULD have happened is another cascading action where worker #1 gets
the interrupt, throws out to operator(), which then invokes its
shutdown(), etc. However, we don't see any such logs from worker #1.
Instead, the main thread (30773) suddenly prints a log that says the serial
queue (SQ) thread shutdown is complete, meaning that the join() on the SQ
thread returned. Worker #1 then does shut down, which advances the lookup (LU)
thread to worker #2.
Then main starts destructing all of the objects -- some of which are thread
objects for threads that are still running. This then leads to the 'pure
virtual method called' issue because a base class is invoked after its object
has been destroyed, so its vtable is gone.
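
The control flow described above (interrupt, 'throw' out of thread_main(),
catch in the base class operator(), then thread_shutdown()) can be sketched
without any threads. The names interrupted_t, base_c, demo_c and run_demo are
hypothetical, and interrupted_t merely stands in for boost::thread_interrupted.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for boost::thread_interrupted.
struct interrupted_t {};

class base_c {
public:
    explicit base_c(std::vector<std::string> &events) : m_events(events) {}
    virtual ~base_c() {}

    // Same shape as the base class operator()() in the program: run the
    // derived main loop and turn an interrupt into a call to
    // thread_shutdown().
    void operator()() {
        try {
            thread_main();
        } catch (const interrupted_t &) {
            m_events.push_back("RX shutdown interrupt");
            thread_shutdown();
        }
    }

protected:
    virtual void thread_main() = 0;
    virtual void thread_shutdown() = 0;
    std::vector<std::string> &m_events;
};

class demo_c : public base_c {
public:
    explicit demo_c(std::vector<std::string> &events) : base_c(events) {}

protected:
    virtual void thread_main() {
        m_events.push_back("thread_main running");
        // An interruption point would throw like this once interrupted.
        throw interrupted_t();
    }
    virtual void thread_shutdown() {
        m_events.push_back("thread_shutdown");
    }
};

// Runs the functor once on the calling thread and returns what happened.
std::vector<std::string> run_demo() {
    std::vector<std::string> events;
    demo_c d(events);
    d();
    assert(events.size() == 3);
    return events;
}
```

Note that both virtual calls go through the object after the exception is
caught, so the object must outlive the thread that is running operator()().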
===============================================================================
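// Header names below are reconstructed from what the code uses.
#include <sys/syscall.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <deque>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <boost/thread.hpp>
#include <boost/ref.hpp>
#include <boost/foreach.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>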
using boost::posix_time::time_duration;
using boost::posix_time::microsec_clock;
using boost::posix_time::to_simple_string;
using boost::posix_time::ptime;
//////////////////////////////////////////////////////////////////////
// Simulation of the logging mechanism we have. We don't use printf, it's
// actually a mutex controlled input into a buffer, which is then periodically
// drained by a separate thread into a file. For simplicity I just changed it
// into mutex controlled output.
//
boost::mutex output_ctl;
#define EE_LOG_MSG(level, fmt, args...) { \
char buf[1024]; \
snprintf(buf, sizeof(buf), fmt, ##args); \
boost::lock_guard<boost::mutex> lock(output_ctl); \
std::cout << to_simple_string(microsec_clock().local_time()) \
<< " (" << gettid() << ") " \
<< __FUNCTION__ << "():" << buf << "\n"; \
}
#define EE_LOG_ECODE(level, err, fmt, args...) { \
char buf[1024]; \
snprintf(buf, sizeof(buf), fmt, ##args); \
boost::lock_guard<boost::mutex> lock(output_ctl); \
std::cout << to_simple_string(microsec_clock().local_time()) \
<< " (" << gettid() << ") " \
<< __FUNCTION__ << "():" << buf << "Error=[" \
<< strerror(err) << "]\n"; \
}
// get the thread id (as seen by 'top' or 'ps')
pid_t gettid() {
return syscall(__NR_gettid);
}
//////////////////////////////////////////////////////////////////////
// Need to simulate the code that waits for all threads to start up, just use
// simple barriers.
//
// 10 worker threads
// 1 lookup thread
// 1 queue thread
// 1 audit thread
// 1 notify thread
// 1 main thread
//---
// 15 threads
boost::barrier init_done(15);
//////////////////////////////////////////////////////////////////////
// class used to exchange data from one thread to another.
//
// History from this file indicates that it originally used raw pthread_xxxx
// calls, and then later switched to boost::mutex/condvar
//
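// Container default is an assumption; it needs push_back/front/pop_front.
template <typename T, typename Container = std::deque<T> >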
class mt_queue_c {
public:
explicit mt_queue_c(unsigned int max_size,
const Container &cont = Container()) :
m_c(cont),
m_max_size(max_size),
m_drop_total(0),
m_drop_oflow(0),
m_queue_enabled(true)
{}
virtual ~mt_queue_c() {};
int enqueue(const T &elem,
std::string *ps_error = NULL);
int dequeue(T &dest,
std::string *ps_error = NULL);
void off();
void on();
protected:
Container m_c;
unsigned int m_max_size;
unsigned int m_drop_total;
unsigned int m_drop_oflow;
boost::timed_mutex m_access_mtx;
boost::condition_variable_any m_data_cv;
bool m_queue_enabled;
void _make_error_string(std::string *p_s_err,
const char *p_src,
const char *p_func,
int errval);
};
// make_error_string()
// Helper function for integrating with the _make_error_string method. It
// automatically pushes on the __FUNCTION__ value.
//
#define make_error_string(p_s_err, p_src, errval) \
_make_error_string(p_s_err, p_src, __FUNCTION__, errval)
template <typename T, typename Container> inline void
mt_queue_c<T, Container>::_make_error_string (std::string *p_s_err,
const char *p_src,
const char *p_func,
int errval)
{
std::stringstream ss_err;
if (!p_s_err) {
return;
}
if (!p_src) {
p_src = "bad p_src parm";
}
if (!p_func) {
p_func = "bad p_func parm";
}
ss_err << " Error-Response: (" << errval << ") " << strerror(errval);
*p_s_err = "[";
*p_s_err += p_func;
*p_s_err += "()]: ";
*p_s_err += p_src;
*p_s_err += ss_err.str();
}
template <typename T, typename Container> inline int
mt_queue_c<T, Container>::enqueue (const T &elem,
std::string *ps_error /* = NULL */)
{
const char *p_err = "";
int ret_err;
boost::lock_guard<boost::timed_mutex> lock(m_access_mtx);
// if the max size is non-zero then enforce the size limit
if (m_max_size && m_c.size() >= m_max_size) {
p_err = "Queue limit reached. Cannot enqueue this element.";
ret_err = ENOSPC; // closest match
goto enqueue_done;
}
m_c.push_back(elem);
m_data_cv.notify_one();
ret_err = 0;
enqueue_done:
if (ret_err) {
make_error_string(ps_error, p_err, ret_err);
if (ENOSPC == ret_err) {
m_drop_oflow++;
}
m_drop_total++;
}
return (ret_err);
}
template <typename T, typename Container> inline int
mt_queue_c<T, Container>::dequeue (T &dest,
std::string *ps_error /* = NULL */)
{
const char *p_err = "";
int ret_err;
boost::unique_lock<boost::timed_mutex> lock(m_access_mtx);
while (!m_c.size() || !m_queue_enabled) {
m_data_cv.wait(lock);
}
if (m_c.empty()) {
p_err = "Woke up but queue is empty.";
ret_err = ENOENT; // closest existing match. Error no entity.
goto dequeue_done;
}
dest = m_c.front();
m_c.pop_front();
ret_err = 0;
dequeue_done:
if (ret_err) {
make_error_string(ps_error, p_err, ret_err);
}
return (ret_err);
}
template <typename T, typename Container> inline void
mt_queue_c<T, Container>::off ()
{
boost::lock_guard<boost::timed_mutex> lock(m_access_mtx);
m_queue_enabled = false;
}
template <typename T, typename Container> inline void
mt_queue_c<T, Container>::on ()
{
boost::lock_guard<boost::timed_mutex> lock(m_access_mtx);
m_queue_enabled = true;
if (m_c.size()) {
m_data_cv.notify_all();
}
}
//////////////////////////////////////////////////////////////////////
// We actually exchange much more complex data. I just simplify it to ints so
// that information is exchanged, and threads run.
typedef mt_queue_c<int> t_msg_queue;
//////////////////////////////////////////////////////////////////////
class thread_base_c {
public:
thread_base_c(const std::string &name,
int prio,
int sched) :
m_name(name),
m_priority(prio),
m_sched(sched),
m_thread(),
m_shutdown(false)
{
EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
};
virtual ~thread_base_c () {
EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
};
// Functor. This is the API that the boost thread library will invoke as
// the new thread's main.
void operator()() {
std::string s_err;
// set my scheduling and priority levels
if (!setscheduling()) {
return;
}
// call the threads init routine
if (!thread_init()) {
s_err = "Thread:";
s_err += m_name + " thread_init() method failed.";
EE_LOG_MSG(EE_ERROR, s_err.c_str());
thread_shutdown();
return;
}
// call the thread's main loop, watching for boost::thread_interrupted,
// which means it's time for us to shut down
try {
init_done.wait();
if (!thread_main()) {
s_err = "Thread:";
s_err += m_name + " thread_main() method failed.";
EE_LOG_MSG(EE_ERROR, s_err.c_str());
thread_shutdown();
return;
}
}
catch (boost::thread_interrupted &e) {
// We've received a boost thread interrupt from our parent thread,
// it's time to shut down, invoke our shutdown routine and then
// return.
EE_LOG_MSG(EE_TRACE, "Thread %s(%p) RX shutdown interrupt.",
m_name.c_str(), this);
thread_shutdown();
return;
}
}
void create_thread() {
if (m_thread.get_id() != boost::thread::id()) {
EE_LOG_MSG(EE_ERROR,
"Thread already exists for this object: %s (%p)",
m_name.c_str(), this);
return;
}
// create a thread using our object and store it
m_thread = boost::thread(boost::ref(*this));
}
void shutdown(const std::string &s_caller) {
EE_LOG_MSG(EE_TRACE, "%s shutting down %s",
s_caller.c_str(), m_name.c_str());
ptime start_time(microsec_clock::local_time());
m_shutdown = true;
m_thread.interrupt();
EE_LOG_MSG(EE_TRACE, "%s waiting on %s thread",
s_caller.c_str(), m_name.c_str());
// If this is uncommented, it appears to work -- at least I haven't
// seen any problems in 2000 test cycles.
// int error(pthread_join(m_thread.native_handle(), NULL));
// if (error) {
// EE_LOG_ECODE(EE_ERROR, error, "pthread_join(%s) error for %s.",
// m_name.c_str(), s_caller.c_str());
// }
m_thread.join();
time_duration how_long(microsec_clock::local_time() - start_time);
EE_LOG_MSG(EE_TRACE, "%s shutdown %s in time %s",
s_caller.c_str(), m_name.c_str(),
to_simple_string(how_long).c_str());
}
protected:
virtual bool thread_init() = 0;
virtual bool thread_main() = 0;
virtual void thread_shutdown() = 0;
bool setscheduling() {
std::stringstream ss_result;
struct sched_param old_sched;
struct sched_param new_sched;
pthread_t tid;
int errval;
int policy;
tid = pthread_self();
errval = pthread_getschedparam(tid, &policy, &old_sched);
if (errval) {
EE_LOG_ECODE(EE_ERROR, errval,
"Cannot get scheduling/priority parameters.");
return (false);
}
// initialize new_sched to whats established, and then change to our
// new values.
new_sched = old_sched;
new_sched.sched_priority = m_priority;
errval = pthread_setschedparam(tid, m_sched, &new_sched);
if (errval) {
EE_LOG_ECODE(EE_ERROR, errval,
"Cannot set the scheduling/priority parameters.");
return (false);
}
ss_result << "Scheduling and priorities changed for thread '" << m_name
<< "' from: (" << policy << ")" << get_sched_str(policy)
<< "/" << old_sched.sched_priority
<< " to: (" << m_sched << ")" << get_sched_str(m_sched)
<< "/" << m_priority;
EE_LOG_MSG(EE_DEBUG(0), ss_result.str().c_str());
return (true);
}
# define CASE_RETVAL(x) case x: return (#x)
char const* get_sched_str(int sched_type)
{
switch (sched_type) {
CASE_RETVAL(SCHED_OTHER);
CASE_RETVAL(SCHED_FIFO);
CASE_RETVAL(SCHED_RR);
CASE_RETVAL(SCHED_BATCH);
}
return ("Unknown");
}
inline void check_for_shutdown () {
boost::this_thread::interruption_point();
if (m_shutdown) {
throw boost::thread_interrupted();
}
}
std::string m_name; // Name of the object instance. Used in
// debugging and error messages.
int m_priority; // Scheduling priority of the object thread.
int m_sched; // Scheduling type of the object thread
// see pthread_setschedparam() for further info.
// Our Thread
boost::thread m_thread;
// Has the thread been asked to shutdown?
bool m_shutdown;
private:
// don't allow these, because boost::thread cannot be copied
thread_base_c& operator=(const thread_base_c&);
};
//////////////////////////////////////////////////////////////////////
class worker_thread_c : public thread_base_c {
public:
worker_thread_c(const std::string name,
int prio,
int sched,
t_msg_queue &sq) :
thread_base_c(name, prio, sched),
m_work_q(sq)
{
EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
};
virtual ~worker_thread_c() {
EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
}
virtual bool thread_init() {
return (true);
}
virtual bool thread_main() {
while (1) {
int qval;
check_for_shutdown();
m_work_q.dequeue(qval);
// EE_LOG_MSG(EE_TRACE, "RX: %d", qval);
}
// we should never break out and return. Returning false will ensure
// the infra logs an error for this.
return (false);
}
virtual void thread_shutdown() {};
protected:
t_msg_queue &m_work_q;
};
//////////////////////////////////////////////////////////////////////
class lookup_thread_c : public thread_base_c {
public:
lookup_thread_c (int prio,
int sched,
t_msg_queue &sq) :
thread_base_c("LU", prio, sched),
m_serial_q(sq),
m_work_q(40000),
v_work_thread_pointers()
{
EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
};
virtual ~lookup_thread_c() {
EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
v_work_thread_pointers.clear();
}
virtual bool thread_init() {
const int num_workers(10);
std::stringstream ss_worker_name;
int idx;
try {
// spawn the worker threads
for (idx = 1; idx <= num_workers; ++idx) {
// create the threads name
ss_worker_name.str("");
ss_worker_name << "Worker#" << idx;
P_SHR_WORKER_T
p_worker(new worker_thread_c(ss_worker_name.str(),
18, SCHED_RR, m_work_q));
p_worker->create_thread();
v_work_thread_pointers.push_back(p_worker);
}
}
catch (std::exception &e) {
EE_LOG_MSG(EE_ERROR,
"Failed to allocate/initialize data for look-Up "
"thread. Response is: '%s'", e.what());
return (false);
}
return (true);
}
virtual bool thread_main() {
while (1) {
int sq_val;
check_for_shutdown();
m_serial_q.dequeue(sq_val);
// EE_LOG_MSG(EE_TRACE, "RX: %d", sq_val);
m_work_q.enqueue(sq_val);
}
// we should never break out and return. Returning false will ensure
// the infra logs an error for this.
return (false);
}
virtual void thread_shutdown() {
EE_LOG_MSG(EE_TRACE, "%s starts %s",m_name.c_str(), __FUNCTION__);
// shutdown the down stream queue
m_work_q.off();
// tell our children to shutdown
BOOST_FOREACH(P_SHR_WORKER_T p_worker, v_work_thread_pointers) {
p_worker->shutdown(m_name);
}
EE_LOG_MSG(EE_TRACE, "%s end %s",m_name.c_str(), __FUNCTION__);
}
protected:
t_msg_queue &m_serial_q;
t_msg_queue m_work_q;
typedef boost::shared_ptr<worker_thread_c> P_SHR_WORKER_T;
std::vector<P_SHR_WORKER_T> v_work_thread_pointers;
};
//////////////////////////////////////////////////////////////////////
class audit_thread_c : public thread_base_c {
public:
audit_thread_c(int prio,
int sched) :
thread_base_c("Audit", prio, sched)
{
EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
};
virtual ~audit_thread_c() {
EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
}
virtual bool thread_init() { return (true); };
virtual bool thread_main() {
while (1) {
check_for_shutdown();
sleep (1);
}
// we should never break out and return. Returning false will ensure
// the infra logs an error for this.
return (false);
}
virtual void thread_shutdown() {};
};
//////////////////////////////////////////////////////////////////////
class notify_thread_c : public thread_base_c {
public:
notify_thread_c(int prio,
int sched) :
thread_base_c("Notify", prio, sched)
{
EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
};
virtual ~notify_thread_c() {
EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
}
virtual bool thread_init() {return (true);};
virtual bool thread_main() {
while (1) {
check_for_shutdown();
sleep(1);
}
// we should never break out and return. Returning false will ensure
// the infra logs an error for this.
return (false);
}
virtual void thread_shutdown() {};
};
//////////////////////////////////////////////////////////////////////
class sq_thread_c : public thread_base_c {
public:
sq_thread_c (int prio,
int sched,
t_msg_queue &serial_in) :
thread_base_c("SQ", prio, sched),
m_main_in(serial_in),
m_serial_q(2000),
p_audit(),
p_notify(),
p_lu()
{
EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
};
virtual ~sq_thread_c() {
EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
}
virtual bool thread_init() {
try {
p_audit.reset(new audit_thread_c(18, SCHED_RR));
p_audit->create_thread();
p_notify.reset(new notify_thread_c(18, SCHED_RR));
p_notify->create_thread();
p_lu.reset(new lookup_thread_c(18, SCHED_RR, m_serial_q));
p_lu->create_thread();
}
catch (std::exception &ex) {
EE_LOG_MSG(EE_ERROR,
"Failed to allocate/initialize data for SQ thread. "
"Response is: '%s'", ex.what());
return (false);
}
return (true);
}
virtual bool thread_main() {
while (1) {
int sq_val;
check_for_shutdown();
m_main_in.dequeue(sq_val);
m_serial_q.enqueue(sq_val);
// EE_LOG_MSG(EE_TRACE, "%s RX: %d", m_name.c_str(), sq_val);
}
// we should never break out and return. Returning false will ensure
// the infra logs an error for this.
return (false);
}
virtual void thread_shutdown() {
EE_LOG_MSG(EE_TRACE, "%s starts %s",m_name.c_str(), __FUNCTION__);
// shut down the downstream queue
m_serial_q.off();
// tell the subordinate threads to shut down
if (p_lu) {
p_lu->shutdown(m_name);
} else {
EE_LOG_MSG(EE_ERROR,"Unexpected null lookup thread pointer "
"during shutdown of %s", m_name.c_str());
}
if (p_notify) {
p_notify->shutdown(m_name);
} else {
EE_LOG_MSG(EE_ERROR, "Unexpected null notify thread pointer "
"during shutdown of %s", m_name.c_str());
}
if (p_audit) {
p_audit->shutdown(m_name);
} else {
EE_LOG_MSG(EE_ERROR, "Unexpected null audit thread "
"pointer during shutdown of %s", m_name.c_str());
}
EE_LOG_MSG(EE_TRACE, "%s end %s",m_name.c_str(), __FUNCTION__);
}
protected:
t_msg_queue &m_main_in;
t_msg_queue m_serial_q;
// Child thread management variables
boost::shared_ptr<audit_thread_c> p_audit;
boost::shared_ptr<notify_thread_c> p_notify;
boost::shared_ptr<lookup_thread_c> p_lu;
};
int
main ()
{
t_msg_queue main_out(2000);
EE_LOG_MSG(EE_TRACE, "Concurrency Level: %d",
boost::thread::hardware_concurrency());
sq_thread_c serial_Q_Thread_Main(19, SCHED_RR, main_out);
serial_Q_Thread_Main.create_thread();
// the real thread startup has a more complex init synchronization. This is
// a quick hack. Wait until all threads have reached the barrier, then have
// main sleep a bit so that all child threads can actually progress past
// the barrier.
init_done.wait();
usleep(10 * 1000);
EE_LOG_MSG(EE_TRACE,
"All threads are up, pumping 1000 entries into the queue.");
for (int idx(0); idx < 1000; ++idx) {
main_out.enqueue(idx);
}
serial_Q_Thread_Main.shutdown(__FUNCTION__);
return (0);
}