Hi,
I wrote a small test project that makes use of boost::thread
to split jobs into parts that are delegated to threads.
A class "Job" contains a state machine (IDLE, EXECUTING, ABORTED, ...).
That class can be derived from, and its methods "execute" and "report"
can be overridden.
A class JobProvider can be derived from to feed new data to a Job.
A template "Foreman" can then start several threads and hand
work to the Jobs, which get their data from a "JobProvider" and
report back when they are done.
In the example, MyJobProvider provides numbers from 3 to 100.
MyJob::execute calculates if these numbers are prime.
MyJob::report prints out the numbers that are prime.
All calls to "report" are protected by a mutex, so only one of them
runs at a time; the "execute" methods run in parallel.
The performance is poor; can anybody give me a hint why?
Best regards,
Torsten.
######## job.h:
#ifndef JOB_H
#define JOB_H 1
#include
// Mutex serializing the trace output produced by QWE; defined in qwe.cc.
extern boost::mutex io_mutex;
// Debug trace: prints the current file and line while holding io_mutex.
// The body is kept as a plain brace block so `QWE` can still be written
// without a trailing semicolon. Each continued line ends in a backslash;
// the format string must stay on one line, otherwise the literal is
// unterminated and the macro does not compile.
#define QWE { \
    boost::mutex::scoped_lock lck(io_mutex); \
    printf("file %s line %i\n", __FILE__, __LINE__); \
}
class Job {
boost::mutex job_mutex;
boost::mutex cond_mutex;
boost::mutex* m_report_mutex;
boost::condition* m_condition;
boost::thread* thr;
bool m_stopped;
int m_state;
void setState(int state);
protected:
bool is_stopped(void);
public:
enum {
CMD_IDLE = 212,
CMD_EXEC,
CMD_KILL
};
enum {
STATE_IDLE = 37,
STATE_EXEC_PREPARE,
STATE_EXEC,
STATE_EXEC_ABORTED,
STATE_REPORT_PREPARE,
STATE_REPORT,
STATE_DEAD
};
void set_foreman_data(boost::mutex* report_mutex, boost::condition*
condition);
void init(void);
virtual void execute_handler(void);
void report_handler(void);
void fork(void);
int getState(void);
void doCommand(int command);
virtual void execute(void);
virtual void report(void);
};
// Source of work for Job objects. provide() is expected to load the next
// piece of work into the given job; its return value signals whether work
// was available (NOTE(review): no definition of this base method is
// visible here - derived providers declare their own overload).
struct JobProvider {
    bool provide(Job& j);
};
#endif
######## job.cc:
#include
#include
#include "job.h"
// Returns a snapshot of the state machine's current state, read while
// holding job_mutex.
int Job::getState(void)
{
    int snapshot;
    {
        boost::mutex::scoped_lock guard(job_mutex);
        snapshot = m_state;
    }
    return snapshot;
}
void Job::setState(int state)
{
boost::mutex::scoped_lock lock(job_mutex);
m_state = state;
}
// Applies a command to the state machine. Only three transitions exist;
// every other (state, command) pair is silently ignored:
//   EXEC or EXEC_PREPARE + CMD_KILL -> EXEC_ABORTED (and marks stopped)
//   IDLE                 + CMD_KILL -> DEAD         (and marks stopped)
//   IDLE                 + CMD_EXEC -> EXEC_PREPARE
void Job::doCommand(int cmd)
{
    boost::mutex::scoped_lock guard(job_mutex);

    const bool idle = (m_state == STATE_IDLE);
    const bool running = (m_state == STATE_EXEC_PREPARE || m_state == STATE_EXEC);

    switch(cmd)
    {
    case CMD_KILL:
        if(running)
        {
            m_state = STATE_EXEC_ABORTED;
            m_stopped = true;
        }
        else if(idle)
        {
            m_state = STATE_DEAD;
            m_stopped = true;
        }
        break;

    case CMD_EXEC:
        if(idle)
        {
            m_state = STATE_EXEC_PREPARE;
        }
        break;

    default:
        // Unknown or unsupported command: no transition.
        break;
    }
}
bool Job::is_stopped(void)
{
boost::mutex::scoped_lock lock(job_mutex);
return m_stopped;
}
void Job::execute_handler(void)
{
int state;
do {
state = getState();
switch(state)
{
case STATE_EXEC_PREPARE:
execute();
state = getState();
if(state != STATE_EXEC_ABORTED)
{
boost::mutex::scoped_lock lock(*m_report_mutex);
report();
m_condition->notify_one();
setState(STATE_IDLE);
}
break;
case STATE_DEAD:
case STATE_EXEC_ABORTED:
return;
}
} while(1);
}
void Job::init(void)
{
m_state = STATE_IDLE;
}
// Starts the worker thread, which runs execute_handler() on this object.
// A pointer to member must be written as &Class::member; the bare name
// `execute_handler` is not a valid expression for boost::bind. Because
// execute_handler() is virtual, the call through the bound pointer still
// dispatches to a derived override.
// NOTE(review): the thread object stored in `thr` is not joined or deleted
// by any code visible in this file.
void Job::fork(void)
{
    thr = new boost::thread(boost::bind(&Job::execute_handler, this));
}
void Job::report_handler(void)
{
boost::mutex::scoped_lock lock(*m_report_mutex);
report();
}
// Stores the two synchronization objects the job shares with its owner:
// the mutex taken around report() and the condition notified afterwards.
// Only the pointers are kept; nothing is copied or owned.
void Job::set_foreman_data(boost::mutex* report_mutex,
                           boost::condition* condition)
{
    this->m_condition = condition;
    this->m_report_mutex = report_mutex;
}
// Default work step: intentionally empty. Derived jobs override this.
void Job::execute(void) {}
// Default report step: intentionally empty. Derived jobs override this.
void Job::report(void) {}
######## foreman.h:
#ifndef FOREMAN_H
#define FOREMAN_H 1
#include
#include
#include "job.h"
// Owns an array of JOBB worker objects and feeds them from a provider.
//
// The template parameter list was missing from this declaration (the
// members below use JOBB and JOBB_PROVIDER, so both must be parameters).
// NOTE(review): the parameter order <JOBB, JOBB_PROVIDER> is reconstructed
// from the member order - confirm against existing instantiations.
//
//   JOBB           job type; must offer the Job interface used by the
//                  member functions (getState, doCommand, init, fork,
//                  set_foreman_data).
//   JOBB_PROVIDER  provider type; must offer `bool provide(JOBB&)`.
template <class JOBB, class JOBB_PROVIDER>
class Foreman {
    JOBB* jb;                      // array of m_threads jobs, owned
    JOBB_PROVIDER* m_provider;     // source of work, not owned
    int m_threads;                 // number of jobs / worker threads
    bool m_finished;               // set by handler() when it is done
    boost::mutex report_mutex;     // shared with every job for report()
    boost::mutex m_cond_mutex;     // locked by handler() around its wait
    boost::condition m_condition;  // notified by jobs after reporting
    boost::thread* me;             // only referenced from commented-out code
    // Dispatch loop: hands work to idle jobs until the provider runs dry.
    void handler(void);
public:
    Foreman(int threads, JOBB_PROVIDER* provider);
    ~Foreman();
    // Starts the worker threads and runs the dispatch loop.
    void spawn(void);
    void cleanup(void);
    void abort(void);
    // Returns the flag set by the dispatch loop.
    bool is_finished(void);
};
template
Foreman::Foreman(int threads, JOBB_PROVIDER* provider)
: m_threads(threads), m_provider(provider)
{
jb = new JOBB[m_threads];
}
template
Foreman::~Foreman()
{
delete[] jb;
}
template
void Foreman::handler(void)
{
bool all_done;
do
{
boost::mutex::scoped_lock lock(m_cond_mutex);
all_done = true;
for(int i = 0; i < m_threads; ++i)
{
int state = jb[i].getState();
if(state == Job::STATE_IDLE)
{
if(m_provider->provide(jb[i]))
{
jb[i].doCommand(Job::CMD_EXEC);
all_done = false;
}
else
{
m_finished = true;
}
}
else
{
all_done = false;
}
}
if(m_finished || all_done)
{
return;
}
m_condition.wait(lock);
} while(1); // (!m_finished || !all_done);
}
template
void Foreman::spawn(void)
{
for(int i = 0; i < m_threads; ++i)
{
jb[i].set_foreman_data(&report_mutex, &m_condition);
jb[i].init();
jb[i].fork();
}
m_finished = false;
// me = new boost::thread(boost::bind(handler, this));
handler();
}
template
void Foreman::cleanup(void)
{
// me->join();
}
template
void Foreman::abort(void)
{
}
template
bool Foreman::is_finished(void)
{
return m_finished;
}
#endif
######## my_job.h:
#ifndef MY_JOB_H
#define MY_JOB_H 1
#include "job.h"
// Job that tests a single integer for primality: execute() does the
// trial division, report() prints the number if it turned out prime.
class MyJob : public Job
{
public:
    MyJob();

    // Selects the number the next execute() call examines.
    void setJobStuff(int suspect);

    virtual void execute(void);
    virtual void report(void);

private:
    int m_suspect;   // number under test
    bool is_prime;   // result of the last completed execute()
};
// Provider that hands out consecutive integers to MyJob objects.
class MyJobProvider : public JobProvider
{
public:
    MyJobProvider();

    // Loads the next candidate into `j`; the return value tells the caller
    // whether to run the job.
    bool provide(MyJob& j);

private:
    int m_actual;   // next number to hand out
};
#endif
######## my_job.cc:
#include <iostream>
#include
#include "my_job.h"
#include "job.h"
// Starts with no candidate (0) and a defined "not prime" result, so that
// report() is safe to call before the first execute() or after an
// execute() that was stopped early. is_prime used to be left
// uninitialized here.
MyJob::MyJob()
    : m_suspect(0), is_prime(false)
{
}
void MyJob::setJobStuff(int suspect)
{
m_suspect = suspect;
}
// Decides by trial division whether m_suspect is prime and stores the
// answer in is_prime. If the job is stopped part-way through, the function
// returns without touching is_prime.
//
// Values below 2 are now classified as not prime. Previously the loop body
// never ran for 0 and 1 (including a freshly constructed job, whose
// candidate is 0), so they were reported as prime, and a negative value
// was passed to sqrt().
//
// NOTE(review): is_stopped() takes a mutex on every iteration; for large
// candidates, polling it only every N iterations would cut that cost.
void MyJob::execute(void)
{
    if(m_suspect < 2)
    {
        is_prime = false;
        return;
    }
    const int root = static_cast<int>(sqrt(static_cast<double>(m_suspect)));
    for(int divisor = 2; divisor <= root; ++divisor)
    {
        if(is_stopped())
        {
            return;
        }
        if((m_suspect % divisor) == 0)
        {
            is_prime = false;
            return;
        }
    }
    is_prime = true;
}
// Prints the candidate on its own line if the last test found it prime;
// prints nothing otherwise.
void MyJob::report(void)
{
    if(!is_prime)
    {
        return;
    }
    std::cout << m_suspect << " is prime" << std::endl;
}
// The first number handed out is 3.
MyJobProvider::MyJobProvider()
    : m_actual(3)
{
}
// Limit that MyJobProvider::provide compares its running counter against.
#define NN 100
// Loads the next candidate into `j` and advances the counter.
//
// Returns true if `j` was given a number to test, false once all numbers
// up to and including NN have been handed out (in which case `j` is left
// untouched).
//
// The previous version loaded the number first and then returned false
// when the *incremented* counter exceeded NN, so the last candidate (NN
// itself) was loaded but reported as "no work" and never processed.
bool MyJobProvider::provide(MyJob& j)
{
    if(m_actual > NN)
    {
        return false;
    }
    j.setJobStuff(m_actual);
    ++m_actual;
    return true;
}
######## qwe.cc:
#include <iostream>
#include "job.h"
#include "my_job.h"
#include "foreman.h"
// Definition of the mutex declared extern in job.h; the QWE trace macro
// locks it around its printf call.
boost::mutex io_mutex;
void test1(void)
{
MyJobProvider p;
MyJob j;
while(p.provide(j)) {
j.execute();
j.report();
}
}
void test2(void)
{
MyJobProvider p;
Foreman fm(7, &p);
fm.spawn();
do {
} while(!fm.is_finished());
fm.cleanup();
}
// Program entry point: runs the multi-threaded test. The command-line
// arguments are accepted but not consulted.
int main(int argc, char** argv)
{
    (void)argc;
    (void)argv;
    test2();
    return 0;
}