[Threads] How to keep the number of running threads constant while performing different operations
I have a problem organizing concurrent thread launches. My program
structure is the following:

#include <iostream>
#include <boost/thread.hpp>
On Jun 17, 2011, at 9:05 AM, Alessandro Candini wrote:
I have different threads which have to work on completely different input and output data (no critical sections): one atomic operation per thread, each with a different execution time, but all making intense use of the CPU. Let's say I have 10 operations to perform (10 threads): I would like to run only 2 threads concurrently because of resource consumption.
My problem is that when a thread ends its execution, I would like to immediately start another thread performing operation 3, in order to constantly have 2 threads working, and so on until all operations are done.
How can I achieve this? I thought of inserting my threads into a vector... but I have no idea how to start and join them in order to obtain what I described above.
Look into consumer/producer (http://en.wikipedia.org/wiki/Producer-consumer_problem). At the end of processing, a thread would then add a new work request to the appropriate work queue so that processing of the next stage would commence when a thread is available to do it. Brad -- Brad Howes Calling Team - Skype Prague Skype: br.howes
[...]
Are you implicitly looking for a thread_group? http://www.boost.org/doc/libs/1_46_0/doc/html/thread/thread_management.html#... Regards, Olaf
On Fri, Jun 17, 2011 at 9:05 AM, Alessandro Candini
I have a problem organizing concurrent thread launches. My program structure is the following:

#include <iostream>
#include <boost/thread.hpp>

using namespace std;

class Worker
{
private:
    boost::thread m_Thread;

public:
    Worker()
    {
        // the thread is not-a-thread until we call start()
    }

    void start(unsigned N)
    {
        m_Thread = boost::thread(&Worker::processQueue, this, N);
    }

    void join()
    {
        m_Thread.join();
    }

    void processQueue(unsigned N)
    {
        /* Do some long long stuff... */
    }
};

int main(int argc, char* argv[])
{
    Worker worker_1, worker_2, worker_3, worker_4;

    // How to start threads and join them in order
    // to make constantly two of them running?

    return 0;
}
[...]
Can anyone post me a little example?
Thanks in advance.
I think you should use a thread pool. You can extend the pool with more threads without impacting other parts of the application. Take a look at this thread pool implementation: http://threadpool.sourceforge.net/tutorial/intro.html You only need to package your work into tasks and put them into the queue. As soon as one of the pool's threads becomes free, it grabs the next task. This thread pool implementation also lets you give tasks priorities. With Kind Regards, Ovanes
On Fri, Jun 17, 2011 at 10:12:47AM +0200, Ovanes Markarian wrote:
On Fri, Jun 17, 2011 at 9:05 AM, Alessandro Candini
wrote: [...]
I think you should use a thread pool. [...]
Or you could stick to the things that are in Boost, like Boost.Asio. Create an asio::io_service; create an asio::io_service::work to keep the workers alive; start N threads, each running asio::io_service::run(); enqueue work by invoking asio::io_service::post() with your tasks. When you're done, destroy the 'work' object and the run() calls will terminate when they're done. For managing the N threads, you can use a thread_group. -- Lars Viklund | zao@acc.umu.se
On Fri, Jun 17, 2011 at 11:10 AM, Lars Viklund
On Fri, Jun 17, 2011 at 10:12:47AM +0200, Ovanes Markarian wrote:
On Fri, Jun 17, 2011 at 9:05 AM, Alessandro Candini
wrote: [...] For managing the N threads, you can use a thread_group.
Yes, but then you must implement and debug your own thread pool pattern. A thread_group only creates a number of threads; you still need to solve the producer/consumer problem so that each thread retrieves work tasks from the queue. So the choice is either to use a ready-made thread pool or to implement one yourself... AFAIK Boost has no official thread pool implementation that lives outside an internal impl namespace and guarantees a stable interface. Regards, Ovanes
On Fri, Jun 17, 2011 at 12:20:31PM +0200, Ovanes Markarian wrote:
On Fri, Jun 17, 2011 at 11:10 AM, Lars Viklund
wrote: [...]
We have thread_group, we have Asio. One needs nothing else. I think you're misunderstanding what I suggested. I do not consider doing this remotely bothersome, particularly as you only need features present in Boost itself. It should need no debugging, as it's practically impossible to get wrong. If you for some reason need completion notifications from your workers, have that as part of your task, or wrap your functor in something.

---8<---
asio::io_service io;
boost::scoped_ptr<asio::io_service::work> work(new asio::io_service::work(io));

boost::thread_group tg;
auto N = boost::thread::hardware_concurrency();

for (unsigned i = 0; i < N; ++i)
    tg.create_thread(boost::bind(&asio::io_service::run, boost::ref(io)));

io.post(some_task);
io.post(some_other_task);
// invoke io.post whenever you want to enqueue something.

// time passes, shutdown time has arrived
work.reset();
tg.join_all();
---8<---
-- Lars Viklund | zao@acc.umu.se
On 17/06/11 12:39, Lars Viklund wrote:
On Fri, Jun 17, 2011 at 12:20:31PM +0200, Ovanes Markarian wrote:
On Fri, Jun 17, 2011 at 11:10 AM, Lars Viklund
wrote: [...]
---8<---
asio::io_service io;
boost::scoped_ptr<asio::io_service::work> work(new asio::io_service::work(io));

boost::thread_group tg;
auto N = boost::thread::hardware_concurrency();

for (unsigned i = 0; i < N; ++i)
    tg.create_thread(boost::bind(&asio::io_service::run, boost::ref(io)));

io.post(some_task);
io.post(some_other_task);
// invoke io.post whenever you want to enqueue something.

// time passes, shutdown time has arrived
work.reset();
tg.join_all();
---8<---
Could you please give me a working example, as small as possible, to better understand all this stuff?
-- Alessandro Candini
MEEO S.r.l.
Via Saragat 9
I-44122 Ferrara, Italy
Tel: +39 0532 1861501
Fax: +39 0532 1861637
http://www.meeo.it
Comments inline. On Fri, Jun 17, 2011 at 01:33:02PM +0200, Alessandro Candini wrote:
On 17/06/11 12:39, Lars Viklund wrote:
---8<---
asio::io_service io;
boost::scoped_ptr<asio::io_service::work> work(new asio::io_service::work(io));
An io_service acts as the hub and dispatcher of completion handlers, typically used for network communication but can also be used for invoking functions on any thread that services the io_service. The 'work' object is needed to pretend that there are pending operations that the io_service is not aware of. If an io_service is out of work, it returns from the run() functions, which we do not want to do until we're done.
boost::thread_group tg; auto N = boost::thread::hardware_concurrency();
A thread_group is part of Boost.Thread and lets you create and join groups of threads. hardware_concurrency() returns the number of hardware threads (logical processors) available, so it's a good basis for choosing the number of workers.
for (unsigned i = 0; i < N; ++i)
    tg.create_thread(boost::bind(&asio::io_service::run, boost::ref(io)));

io_service::run() blocks until the service is out of work, and can be run in as many threads as you want. There are also single-shot and polling variants of it.
io.post(some_task); io.post(some_other_task);
io_service::post() takes something callable with the signature `void ()' and invokes it eventually on any thread that is servicing the io_service and isn't busy.
// invoke io.post whenever you want to enqueue something.
// time passes, shutdown time has arrived
work.reset(); tg.join_all(); ---8<---
When we're out of pending handlers, and have destroyed the last work item, the functions will start to return, and we can join all the threads.
Could you please give me a working example as little as possible, to better understand all this stuff?
io.post([when_done, other_data] {
    auto result = compute(other_data);
    when_done(result);
});

This is the form most of my tasks have in the applications where I have a similar setup. You post a task that, when completed, informs whoever cares via a callback. As Jeroen mentioned on IRC, which I forgot to mention in my first message, this assumes that tasks are largely independent. If they're not and you have more concurrent tasks than there are workers, you might end up blocking everything, as a handler runs to completion before returning control to run(). Normally that's not horribly limiting, as the things you tend to block on tend to be asynchronous operations that Asio provides, like reading/writing to sockets, waiting on timers, etc. I recommend that you read the Asio docs, particularly the prose and background bits, and take a look at the examples. Michael Caisse's BoostCon presentation on an Asio Flash XML Server is quite enlightening as well. -- Lars Viklund | zao@acc.umu.se
participants (5)
- Alessandro Candini
- Brad Howes
- Lars Viklund
- Olaf Peter
- Ovanes Markarian