
Here are a few classes which I think are very useful: 1. A thread_pool class. I think there's not much I can tell you that you don't know about it. It's something that has threads, and can accept tasks to be pushed into a queue, and executed in turn when a thread is available. It can also be suspended (stop handling tasks, but keep the threads alive) and resumed. 2. Two 'dispatcher' classes, one for plain threads, and one for thread_pools. They accept a single task, or several tasks, to be executed, either by dedicated threads (the thread_dispatcher class), or by a thread_pool (the thread_pool_dispatcher class). These classes provide a wait() method that block until all tasks are done. IMO, these classes fill a hole in the Boost.Thread library, and can be a basic for the future<> template (I have to admit I didn't follow the future<> discussion, so I hope there's nothing here to contradict any consensus reached in that discussion, if any). A few notes: 1. The semahpore class is there because it was needed for the thread_pool_dispather, but I think it will be a good general purpose addition to Boost.Thread. If I'm the only one thinking so, it can be moved as implementation detail of thread_pool_dispather. 2. The thread_task typedef should really be removed. Instead, a 'task' typedef should be added to boost::thread. So all occurances of 'thread_task' should be replaced with 'thread::task'. 3. I'm not sure about the name 'dispatcher', but couldn't find anything I'm happy with. Something that catches the essence of the classes. Any suggestions are welcome. 4. The thread_pool class can benefit from two additional features: 4.A. add_threads(size_t) and remove_threads(size_t). I think I can add those if found useful. 4.B. pre_loop and post_loop hooks to be executed by each of the worker threads in the thread_pool, immediately after creation, and before termination. I actually have a use case for those, but I don't want to distract the discussion from the main things. 5. The asserts in thread_pool::~thread_pool, thread_pool::suspend, thread_pool::resume, thread_dispatcher::~thread_dispatcher, thread_pool_dispatcher::~thread_pool_dispatcher are supposed to catch user errors. These asserts are questionable. I'm not sure whether those use cases are indeed errors, and if so, should assert be the right way to deal with them. I chose to be a hard-liner for now, with an option to soften. 6. The other asserts are there to catch errors in my code. 7. Some or maybe all of the asserts should be replaced by BOOST_ASSERT. I'm not sure of the Boost convention here. I hope it will be found useful! #ifndef BOOST_THREAD_SEMAPHORE_HPP #define BOOST_THREAD_SEMAPHORE_HPP #include <cstdlib> #include <cassert> #include <boost/thread/mutex.hpp> #include <boost/thread/condition.hpp> namespace boost { class semaphore { public: semaphore(std::size_t count) : m_count(count), m_mutex(), m_condition() { } void lock() { mutex::scoped_lock sl(m_mutex); while (m_count == 0) m_condition.wait(sl); assert(m_count > 0); m_count--; } void unlock() { mutex::scoped_lock sl(m_mutex); m_count++; m_condition.notify_one(); } private: std::size_t m_count; mutex m_mutex; condition m_condition; }; } // namespace boost #endif #ifndef BOOST_THREAD_DISPATCHER_HPP #define BOOST_THREAD_DISPATCHER_HPP #include <cassert> #include <set> #include <algorithm> #include <boost/noncopyable.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/thread_task.hpp> namespace boost { class thread_dispatcher : private noncopyable { public: thread_dispatcher(const thread_task &t) : m_threads() { create_thread(t); } template <typename FwdIter> thread_dispatcher(FwdIter task_begin, FwdIter task_end) : m_threads() { std::for_each(task_begin, task_end, bind(&thread_dispatcher::create_thread, this, _1)); } ~thread_dispatcher() { assert(m_threads.empty()); } void wait() { std::for_each(m_threads.begin(), m_threads.end(), bind(&thread::join, _1)); m_threads.clear(); } private: std::set< shared_ptr<thread> > m_threads; void create_thread(const thread_task &t) { m_threads.insert(shared_ptr<thread>(new thread(t))); } }; } // namespace boost #endif #ifndef BOOST_THREAD_POOL_HPP #define BOOST_THREAD_POOL_HPP #include <cstdlib> #include <cassert> #include <set> #include <deque> #include <algorithm> #include <boost/noncopyable.hpp> #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/condition.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/thread_task.hpp> namespace boost { class thread_pool : private noncopyable { enum state { active_, suspended_, terminating_ }; public: typedef function<void ()> task; thread_pool(std::size_t thread_count) : m_mutex(), m_condition(), m_threads(), m_tasks(), m_state(active_) { thread_task f = bind(&thread_pool::thread_func, this); while (thread_count > 0) { m_threads.insert(shared_ptr<thread>(new thread(f))); thread_count--; } } ~thread_pool() { { mutex::scoped_lock sl(m_mutex); assert(m_state == active_); m_state = terminating_; m_condition.notify_all(); } std::for_each(m_threads.begin(), m_threads.end(), bind(&thread::join, _1)); } bool suspended() const { mutex::scoped_lock sl(m_mutex); assert(m_state == active_ || m_state == suspended_); return m_state == suspended_; } void suspend() { mutex::scoped_lock sl(m_mutex); assert(m_state == active_); m_state = suspended_; } void resume() { mutex::scoped_lock sl(m_mutex); assert(m_state == suspended_); m_state = active_; m_condition.notify_all(); } void enqueue(const task &t) { mutex::scoped_lock sl(m_mutex); assert(m_state == active_ || m_state == suspended_); m_tasks.push_back(t); m_condition.notify_one(); } void discard_pending_tasks() { mutex::scoped_lock sl(m_mutex); assert(m_state == active_ || m_state == suspended_); m_tasks.clear(); } private: mutable mutex m_mutex; condition m_condition; std::set< shared_ptr<thread> > m_threads; std::deque< task > m_tasks; state m_state; void thread_func() { task t; while (t = get_task(), t) { t(); t.clear(); } } task get_task() { mutex::scoped_lock sl(m_mutex); while (m_state == suspended_ || m_state == active_ && m_tasks.empty()) m_condition.wait(sl); assert(m_state != suspended_); if (!m_tasks.empty()) { task t = m_tasks.front(); m_tasks.pop_front(); return t; } assert(m_state == terminating_ && m_tasks.empty()); return task(); } }; } // namespace boost #endif #ifndef BOOST_THREAD_POOL_DISPATCHER_HPP #define BOOST_THREAD_POOL_DISPATCHER_HPP #include <cstdlib> #include <cassert> #include <algorithm> #include <boost/noncopyable.hpp> #include <boost/bind.hpp> #include <boost/thread/semaphore.hpp> #include <boost/thread/thread_pool.hpp> namespace boost { class thread_pool_dispatcher : private noncopyable { public: thread_pool_dispatcher(const thread_pool::task &t, thread_pool &pool) : m_task_count(0), m_sem(0) { enqueue(t, pool); } template <typename FwdIter> thread_pool_dispatcher(FwdIter task_begin, FwdIter task_end, thread_pool &pool) : m_task_count(0), m_sem(0) { std::for_each(task_begin, task_end, bind(&thread_pool_dispatcher::enqueue, this, _1, ref(pool))); } ~thread_pool_dispatcher() { assert(m_task_count == 0); } void wait() { while (m_task_count > 0) { m_sem.lock(); m_task_count--; } } private: std::size_t m_task_count; semaphore m_sem; void enqueue(const thread_pool::task &t, thread_pool &pool) { m_task_count++; pool.enqueue(bind(&thread_pool_dispatcher::execute_and_notify, this, t)); } void execute_and_notify(const thread_pool::task &t) { t(); m_sem.unlock(); } }; } // namespace boost #endif #ifndef BOOST_THREAD_TASK_HPP #define BOOST_THREAD_TASK_HPP #include <boost/function.hpp> namespace boost { typedef function<void ()> thread_task; } // namespace boost #endif