[thread] new thread_pool and dispatcher classes

Here are a few classes which I think are very useful:

1. A thread_pool class. I think there's not much I can tell you about it that you don't already know. It has threads, and it accepts tasks that are pushed into a queue and executed in turn when a thread becomes available. It can also be suspended (stop handling tasks, but keep the threads alive) and resumed.
2. Two 'dispatcher' classes, one for plain threads and one for thread_pools. They accept a single task, or several tasks, to be executed either by dedicated threads (the thread_dispatcher class) or by a thread_pool (the thread_pool_dispatcher class). These classes provide a wait() method that blocks until all tasks are done.

IMO, these classes fill a hole in the Boost.Thread library, and can be a basis for the future<> template (I have to admit I didn't follow the future<> discussion, so I hope there's nothing here that contradicts any consensus reached in that discussion, if any).

A few notes:

1. The semaphore class is there because it was needed for thread_pool_dispatcher, but I think it will be a good general-purpose addition to Boost.Thread. If I'm the only one who thinks so, it can be moved into thread_pool_dispatcher as an implementation detail.
2. The thread_task typedef should really be removed. Instead, a 'task' typedef should be added to boost::thread, and all occurrences of 'thread_task' should be replaced with 'thread::task'.
3. I'm not sure about the name 'dispatcher', but I couldn't find anything I'm happy with, something that captures the essence of the classes. Any suggestions are welcome.
4. The thread_pool class could benefit from two additional features:
   4.A. add_threads(size_t) and remove_threads(size_t). I think I can add those if they are found useful.
   4.B. pre_loop and post_loop hooks, executed by each worker thread in the thread_pool immediately after creation and before termination. I actually have a use case for those, but I don't want to distract the discussion from the main points.
5. The asserts in thread_pool::~thread_pool, thread_pool::suspend, thread_pool::resume, thread_dispatcher::~thread_dispatcher and thread_pool_dispatcher::~thread_pool_dispatcher are meant to catch user errors. These asserts are questionable: I'm not sure whether those use cases really are errors, and if so, whether assert is the right way to deal with them. I chose to be a hard-liner for now, with the option to soften later.
6. The other asserts are there to catch errors in my code.
7. Some or all of the asserts should perhaps be replaced by BOOST_ASSERT. I'm not sure of the Boost convention here.

I hope it will be found useful!

#ifndef BOOST_THREAD_SEMAPHORE_HPP
#define BOOST_THREAD_SEMAPHORE_HPP

#include <cstdlib>
#include <cassert>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

namespace boost {

class semaphore
{
public:
    semaphore(std::size_t count)
        : m_count(count), m_mutex(), m_condition()
    {
    }

    // Blocks until the count is positive, then decrements it.
    void lock()
    {
        mutex::scoped_lock sl(m_mutex);
        while (m_count == 0)
            m_condition.wait(sl);
        assert(m_count > 0);
        m_count--;
    }

    // Increments the count and wakes one waiter, if any.
    void unlock()
    {
        mutex::scoped_lock sl(m_mutex);
        m_count++;
        m_condition.notify_one();
    }

private:
    std::size_t m_count;
    mutex m_mutex;
    condition m_condition;
};

} // namespace boost

#endif

#ifndef BOOST_THREAD_DISPATCHER_HPP
#define BOOST_THREAD_DISPATCHER_HPP

#include <cassert>
#include <set>
#include <algorithm>
#include <boost/noncopyable.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/thread_task.hpp>

namespace boost {

// Runs each task on its own dedicated thread; wait() joins them all.
class thread_dispatcher : private noncopyable
{
public:
    thread_dispatcher(const thread_task &t)
        : m_threads()
    {
        create_thread(t);
    }

    template <typename FwdIter>
    thread_dispatcher(FwdIter task_begin, FwdIter task_end)
        : m_threads()
    {
        std::for_each(task_begin, task_end,
            bind(&thread_dispatcher::create_thread, this, _1));
    }

    ~thread_dispatcher()
    {
        // wait() must have been called before destruction.
        assert(m_threads.empty());
    }

    void wait()
    {
        std::for_each(m_threads.begin(), m_threads.end(),
            bind(&thread::join, _1));
        m_threads.clear();
    }

private:
    std::set< shared_ptr<thread> > m_threads;

    void create_thread(const thread_task &t)
    {
        m_threads.insert(shared_ptr<thread>(new thread(t)));
    }
};

} // namespace boost

#endif

#ifndef BOOST_THREAD_POOL_HPP
#define BOOST_THREAD_POOL_HPP

#include <cstdlib>
#include <cassert>
#include <set>
#include <deque>
#include <algorithm>
#include <boost/noncopyable.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/thread_task.hpp>

namespace boost {

class thread_pool : private noncopyable
{
    enum state { active_, suspended_, terminating_ };

public:
    typedef function<void ()> task;

    thread_pool(std::size_t thread_count)
        : m_mutex(), m_condition(), m_threads(), m_tasks(), m_state(active_)
    {
        thread_task f = bind(&thread_pool::thread_func, this);
        while (thread_count > 0)
        {
            m_threads.insert(shared_ptr<thread>(new thread(f)));
            thread_count--;
        }
    }

    // Pending tasks are still executed before the worker threads exit.
    ~thread_pool()
    {
        {
            mutex::scoped_lock sl(m_mutex);
            assert(m_state == active_);
            m_state = terminating_;
            m_condition.notify_all();
        }
        std::for_each(m_threads.begin(), m_threads.end(),
            bind(&thread::join, _1));
    }

    bool suspended() const
    {
        mutex::scoped_lock sl(m_mutex);
        assert(m_state == active_ || m_state == suspended_);
        return m_state == suspended_;
    }

    void suspend()
    {
        mutex::scoped_lock sl(m_mutex);
        assert(m_state == active_);
        m_state = suspended_;
    }

    void resume()
    {
        mutex::scoped_lock sl(m_mutex);
        assert(m_state == suspended_);
        m_state = active_;
        m_condition.notify_all();
    }

    void enqueue(const task &t)
    {
        mutex::scoped_lock sl(m_mutex);
        assert(m_state == active_ || m_state == suspended_);
        m_tasks.push_back(t);
        m_condition.notify_one();
    }

    void discard_pending_tasks()
    {
        mutex::scoped_lock sl(m_mutex);
        assert(m_state == active_ || m_state == suspended_);
        m_tasks.clear();
    }

private:
    mutable mutex m_mutex;
    condition m_condition;
    std::set< shared_ptr<thread> > m_threads;
    std::deque< task > m_tasks;
    state m_state;

    // Worker loop: an empty task from get_task() means "terminate".
    void thread_func()
    {
        task t;
        while (t = get_task(), t)
        {
            t();
            t.clear();
        }
    }

    task get_task()
    {
        mutex::scoped_lock sl(m_mutex);
        while (m_state == suspended_ || (m_state == active_ && m_tasks.empty()))
            m_condition.wait(sl);
        assert(m_state != suspended_);
        if (!m_tasks.empty())
        {
            task t = m_tasks.front();
            m_tasks.pop_front();
            return t;
        }
        assert(m_state == terminating_ && m_tasks.empty());
        return task();
    }
};

} // namespace boost

#endif

#ifndef BOOST_THREAD_POOL_DISPATCHER_HPP
#define BOOST_THREAD_POOL_DISPATCHER_HPP

#include <cstdlib>
#include <cassert>
#include <algorithm>
#include <boost/noncopyable.hpp>
#include <boost/bind.hpp>
#include <boost/thread/semaphore.hpp>
#include <boost/thread/thread_pool.hpp>

namespace boost {

// Enqueues tasks on an existing pool; wait() blocks until all of them have run.
class thread_pool_dispatcher : private noncopyable
{
public:
    thread_pool_dispatcher(const thread_pool::task &t, thread_pool &pool)
        : m_task_count(0), m_sem(0)
    {
        enqueue(t, pool);
    }

    template <typename FwdIter>
    thread_pool_dispatcher(FwdIter task_begin, FwdIter task_end, thread_pool &pool)
        : m_task_count(0), m_sem(0)
    {
        std::for_each(task_begin, task_end,
            bind(&thread_pool_dispatcher::enqueue, this, _1, ref(pool)));
    }

    ~thread_pool_dispatcher()
    {
        // wait() must have been called before destruction.
        assert(m_task_count == 0);
    }

    void wait()
    {
        while (m_task_count > 0)
        {
            m_sem.lock();
            m_task_count--;
        }
    }

private:
    // Only touched by the dispatching thread; pool threads signal via m_sem.
    std::size_t m_task_count;
    semaphore m_sem;

    void enqueue(const thread_pool::task &t, thread_pool &pool)
    {
        m_task_count++;
        pool.enqueue(bind(&thread_pool_dispatcher::execute_and_notify, this, t));
    }

    void execute_and_notify(const thread_pool::task &t)
    {
        t();
        m_sem.unlock();
    }
};

} // namespace boost

#endif

#ifndef BOOST_THREAD_TASK_HPP
#define BOOST_THREAD_TASK_HPP

#include <boost/function.hpp>

namespace boost {

typedef function<void ()> thread_task;

} // namespace boost

#endif
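For anyone who wants to experiment with the same design without Boost, the core idea (a fixed set of worker threads pulling from a FIFO queue, plus a per-batch object whose wait() blocks until its own tasks have finished) can be sketched with the C++11 standard library. This is a hypothetical reduction, not the code posted above: the names simple_pool and batch are illustrative, suspend/resume is left out, and a counter guarded by a condition variable stands in for the semaphore.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical std-only analogue of thread_pool: fixed workers, FIFO queue.
class simple_pool {
public:
    explicit simple_pool(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i)
            workers_.emplace_back([this] { loop(); });
    }
    simple_pool(const simple_pool&) = delete;
    simple_pool& operator=(const simple_pool&) = delete;

    // Like ~thread_pool: pending tasks are drained, then the workers exit.
    ~simple_pool() {
        {
            std::lock_guard<std::mutex> lk(m_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_) w.join();
    }

    void enqueue(std::function<void()> t) {
        {
            std::lock_guard<std::mutex> lk(m_);
            tasks_.push_back(std::move(t));
        }
        cv_.notify_one();
    }

private:
    void loop() {
        for (;;) {
            std::function<void()> t;
            {
                std::unique_lock<std::mutex> lk(m_);
                cv_.wait(lk, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopping and nothing left
                t = std::move(tasks_.front());
                tasks_.pop_front();
            }
            t();  // run outside the lock
        }
    }

    std::mutex m_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
    std::vector<std::thread> workers_;
    bool stopping_ = false;
};

// Hypothetical analogue of thread_pool_dispatcher: counts its own tasks and
// lets wait() block until every one of them has run.
class batch {
public:
    ~batch() {
        std::lock_guard<std::mutex> lk(m_);
        assert(pending_ == 0);  // wait() must be called first, as in the post
    }

    void dispatch(simple_pool& pool, std::function<void()> t) {
        {
            std::lock_guard<std::mutex> lk(m_);
            ++pending_;
        }
        pool.enqueue([this, t] {
            t();
            // Notify while holding the lock so the batch cannot be destroyed
            // between the decrement and the notification.
            std::lock_guard<std::mutex> lk(m_);
            if (--pending_ == 0) cv_.notify_all();
        });
    }

    void wait() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return pending_ == 0; });
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::size_t pending_ = 0;
};
```

Several batch objects can share one pool, and each wait() only blocks on the tasks dispatched through that batch, which is the property the dispatcher classes are meant to add on top of a plain pool.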

On 5/6/07, Yuval Ronen <ronen_yuval@yahoo.com> wrote:
<snip>
namespace boost {

class semaphore
{
public:
    semaphore(std::size_t count) : m_count(count), m_mutex(), m_condition() { }

    void lock()
    {
        mutex::scoped_lock sl(m_mutex);
        while (m_count == 0)
            m_condition.wait(sl);
        assert(m_count > 0);
        m_count--;
    }

    void unlock()
    {
        mutex::scoped_lock sl(m_mutex);
        m_count++;
        m_condition.notify_one();
    }

private:
    std::size_t m_count;
    mutex m_mutex;
    condition m_condition;
};

} // namespace boost
<snip>

Could we use more conventional method names, such as wait() and post(), instead of lock() and unlock(), please? Even acquire() and release() sound better. In addition, I'd suggest two more member methods: try_wait/try_acquire(size_t timeout) and count().

Greg
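A sketch of the interface suggested above, assuming C++11 std::mutex and std::condition_variable rather than the Boost.Thread primitives of the time, and assuming the timeout is in milliseconds (the message only says size_t timeout). The class and its exact signatures are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical counting semaphore using the method names suggested above.
// The timeout unit (milliseconds) is an assumption.
class semaphore {
public:
    explicit semaphore(std::size_t count) : count_(count) {}

    // Block until the count is positive, then decrement it.
    void wait() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return count_ > 0; });
        assert(count_ > 0);
        --count_;
    }

    // Like wait(), but give up after timeout_ms; returns false on timeout.
    bool try_wait(std::size_t timeout_ms) {
        std::unique_lock<std::mutex> lk(m_);
        if (!cv_.wait_for(lk, std::chrono::milliseconds(timeout_ms),
                          [this] { return count_ > 0; }))
            return false;
        --count_;
        return true;
    }

    // Increment the count and wake one waiter, if any.
    void post() {
        std::lock_guard<std::mutex> lk(m_);
        ++count_;
        cv_.notify_one();
    }

    // Snapshot of the current count; it may be stale as soon as it returns.
    std::size_t count() const {
        std::lock_guard<std::mutex> lk(m_);
        return count_;
    }

private:
    mutable std::mutex m_;
    std::condition_variable cv_;
    std::size_t count_;
};
```

Because try_wait() uses the predicate overload of wait_for(), spurious wakeups are handled and a positive count is consumed immediately without waiting.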

Gregory Dai wrote:
On 5/6/07, Yuval Ronen <ronen_yuval@yahoo.com> wrote:
<snip>
Could we use more conventional method names such as wait() and post() instead of lock() and unlock(), please? Even acquire() and release() sound better. In addition, I'd suggest two more member methods: try_wait/try_acquire(size_t timeout) and count().
lock() and unlock() make the semaphore usable with scoped_lock (in the code I posted I didn't actually use the semaphore with a scoped_lock, but I think it's a common use case). As for the additions you suggest: I didn't write a full-fledged semaphore, just what I needed to implement the other classes. I agree that if it becomes publicly available, these rough edges should be smoothed.
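The point about scoped locking can be illustrated with the standard library: std::lock_guard accepts any type that provides lock() and unlock() (the BasicLockable requirements), so a counting semaphore with those member names can bound how many threads are inside a region at once. This is a hypothetical C++11 reimplementation that keeps the member names of the posted boost::semaphore; the class name sem_lockable, the helper max_concurrency() and the 5 ms sleep are illustrative assumptions.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Same interface as the posted semaphore: lock() acquires, unlock() releases.
// That is exactly what std::lock_guard needs.
class sem_lockable {
public:
    explicit sem_lockable(std::size_t count) : count_(count) {}
    void lock() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return count_ > 0; });
        assert(count_ > 0);
        --count_;
    }
    void unlock() {
        std::lock_guard<std::mutex> lk(m_);
        ++count_;
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::size_t count_;
};

// Starts `threads` workers but lets at most `limit` of them into the guarded
// region at once; returns the highest concurrency actually observed.
int max_concurrency(std::size_t threads, std::size_t limit) {
    sem_lockable sem(limit);
    std::mutex stats_m;
    int inside = 0, peak = 0;
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < threads; ++i)
        workers.emplace_back([&] {
            std::lock_guard<sem_lockable> guard(sem);  // scoped acquire/release
            {
                std::lock_guard<std::mutex> lk(stats_m);
                peak = std::max(peak, ++inside);
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            std::lock_guard<std::mutex> lk(stats_m);
            --inside;  // runs before guard releases the semaphore
        });
    for (auto& t : workers) t.join();
    return peak;
}
```

With Greg's wait()/post() names the same class would no longer satisfy BasicLockable, which is the trade-off being discussed; providing both sets of names is one possible compromise.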

Yuval Ronen wrote:
<snip>
_______________________________________________
Unsubscribe & other changes: http://lists.boost.org/mailman/listinfo.cgi/boost

We already have a thread pool and dispatcher in Boost.Asio, although it is a very simple one.

Atry wrote:
Yuval Ronen wrote:
<snip>
We already have thread pool and dispatcher in Boost.Asio, although it is a very simple one.
Oh, I didn't know that. Thanks for the pointer! Do they provide the same functionality (in general, not down to the last feature)? I will take a look at them. IMO, if they are like mine, they should reside in Boost.Thread rather than in Asio.

On Mon, 07 May 2007 07:18:33 +0300, "Yuval Ronen" <ronen_yuval@yahoo.com> said:
Atry wrote:
We already have thread pool and dispatcher in Boost.Asio, although it is a very simple one.
Oh, I didn't know that. Thanks for the pointer! Do they provide the same functionality (in general, not up to the last feature)? I will take a look at them. IMO, if they are like mine, they should reside in Boost.Thread rather than in Asio.
Asio doesn't include any functionality to spawn threads (in a pool or otherwise). However, the io_service class can be used to dispatch work items in a thread pool using a combination of io_service::post() and io_service::run(). It's the same mechanism that is used to dispatch completion handlers for asynchronous operations. Cheers, Chris

Christopher Kohlhoff wrote:
<snip>
Asio doesn't include any functionality to spawn threads (in a pool or otherwise). However, the io_service class can be used to dispatch work items in a thread pool using a combination of io_service::post() and io_service::run(). It's the same mechanism that is used to dispatch completion handlers for asynchronous operations.
Cheers, Chris
Yes. But in most cases only a fixed number of threads is needed, so I think io_service is enough. And I have another question: do we need to implement another message queue or dispatcher whenever we need a different event handler for a special case, such as a GUI or a thread pool? Or should we have only one public interface (like io_service) for an "event queue", and repost any native event/message to it?

Christopher Kohlhoff wrote:
<snip>
Asio doesn't include any functionality to spawn threads (in a pool or otherwise). However, the io_service class can be used to dispatch work items in a thread pool using a combination of io_service::post() and io_service::run(). It's the same mechanism that is used to dispatch completion handlers for asynchronous operations.
It seems that if my thread_pool didn't spawn its own threads but provided a run() method instead, it would become similar (though not identical) to asio::io_service. This is an interesting idea, and I'm quite fond of it. At first sight it seems my dispatcher classes have the benefit of being able to wait on multiple tasks (handlers) being executed concurrently. Is that true? If so, do you see that as an important advantage? Do you think this functionality (of thread_pool/dispatchers/asio::io_service) should be in Boost.Thread? IMHO, it has more to do with "multi-threading policy" than with "I/O (sync or async)".
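The idea of a thread_pool that provides run() instead of spawning threads can be sketched as follows. This is a hypothetical C++11 class (work_queue, post() and run() are illustrative names), not asio::io_service itself; it only mimics the behaviour described in this thread: callers lend threads by calling run(), and run() returns once no work is queued or executing, so joining those threads waits for every task, including tasks posted by other tasks.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical pool that owns no threads: callers lend threads via run().
// run() returns once nothing is queued and no task is still executing,
// because an executing task may post follow-up work.
class work_queue {
public:
    void post(std::function<void()> t) {
        std::lock_guard<std::mutex> lk(m_);
        tasks_.push_back(std::move(t));
        ++outstanding_;
        cv_.notify_one();
    }

    void run() {
        std::unique_lock<std::mutex> lk(m_);
        for (;;) {
            cv_.wait(lk, [this] { return !tasks_.empty() || outstanding_ == 0; });
            if (tasks_.empty()) return;  // outstanding_ == 0: all work is done
            std::function<void()> t = std::move(tasks_.front());
            tasks_.pop_front();
            lk.unlock();
            t();  // may call post() to chain more work
            lk.lock();
            assert(outstanding_ > 0);
            if (--outstanding_ == 0) cv_.notify_all();  // release idle run() callers
        }
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
    std::size_t outstanding_ = 0;  // queued + currently executing
};
```

Counting executing tasks, not just queued ones, is what keeps run() from returning early while a running task is about to post its successor.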

On Mon, 07 May 2007 20:06:39 +0300, "Yuval Ronen" <ronen_yuval@yahoo.com> said:
Do you think that this functionality (of thread_pool/dispatchers/asio::io_service) should be in Boost.Thread? IMHO, it sounds as if it has more to do with "multi-threading policy" than "I/O (sync or async)".
The run()/post() functionality of io_service cannot be moved out of asio, because the implementation of it is tied to the underlying asynchronous I/O mechanism. However, I'm sure you could distil a "dispatcher" concept from it and reimplement this concept to provide a range of different execution guarantees (e.g. FIFO, LIFO etc). Cheers, Chris
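One way to read the suggestion of distilling a "dispatcher" concept with different execution guarantees is to make the ordering a policy parameter. The sketch below is a hypothetical, minimal illustration (dispatcher, fifo_order and lifo_order are invented names, and nothing here is taken from Asio's implementation): post() stores work and run() drains it in the order the policy dictates.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical ordering policies: both keep tasks in a deque and differ only
// in which end the next task is taken from.
struct fifo_order {
    template <class Q> static typename Q::value_type take(Q& q) {
        assert(!q.empty());
        auto t = std::move(q.front());
        q.pop_front();
        return t;
    }
};
struct lifo_order {
    template <class Q> static typename Q::value_type take(Q& q) {
        assert(!q.empty());
        auto t = std::move(q.back());
        q.pop_back();
        return t;
    }
};

// A "dispatcher" reduced to its concept: post() stores work, run() executes
// it in the order chosen by the policy until nothing is left.
template <class Order>
class dispatcher {
public:
    void post(std::function<void()> t) {
        std::lock_guard<std::mutex> lk(m_);
        tasks_.push_back(std::move(t));
    }
    void run() {
        for (;;) {
            std::function<void()> t;
            {
                std::lock_guard<std::mutex> lk(m_);
                if (tasks_.empty()) return;
                t = Order::take(tasks_);
            }
            t();  // executed without holding the lock
        }
    }
private:
    std::mutex m_;
    std::deque<std::function<void()>> tasks_;
};
```

Other guarantees (priorities, per-strand serialization) would slot in as further policies without changing post() or run().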

On Mon, 07 May 2007 20:06:39 +0300, "Yuval Ronen" <ronen_yuval@yahoo.com> said:
At first sight it seems my dispatcher classes have the benefit of being able to wait on multiple tasks (handlers) being executed concurrently. Is that true? If so, do you see that as an important advantage?
I forgot to add that, with asio::io_service, you can wait for all concurrently executing tasks to finish by simply waiting for the io_service::run() call to exit. This is an important feature when trying to code a clean shutdown of the (chains of) tasks in a thread pool. Cheers, Chris
participants (4)
- Atry
- Christopher Kohlhoff
- Gregory Dai
- Yuval Ronen