Hi,
so here are the pieces of code that cause the problem:
The following routine is the main loop of the master process. It
dispatches the data to and from the worker processes:
template<typename Float>
void My_complex<Float>::dispatch()
{
  // Main loop of the master process (rank 0). Hands queued balls to workers
  // that ask for a job, enqueues balls that workers send back, and records
  // which workers have finished. Runs until the queue is empty and no worker
  // is still working, then suspends all workers.
  FC_ASSERT( comm.rank() == 0 );
  Master master_process( comm );
  while( !points_to_explore->empty() ||
         master_process.some_working() ) {
    int command;
    // listen() yields the rank of the worker that sent a request; a result
    // of 0 (the master's own rank) is treated as "nothing received".
    // NOTE(review): assumes listen() returns 0 when no message is pending.
    const int w_rank = master_process.listen( command );
    if( w_rank == 0 ) {
      // Idle: back off before polling again. Sleeping only here (and not
      // after every handled message) keeps the master from throttling
      // itself to one message per second.
      sleep(1);
      continue;
    }
    switch( command ) {
    case MW::ask_for_job:
      if( !points_to_explore->empty() ) {
        Ball<Float> *the_ball = points_to_explore->top();
        points_to_explore->pop();
        // The reported data corruption is observed on this path.
        try {
          master_process.send_work( w_rank, *the_ball );
        } catch( ... ) {
          // The ball was already popped: release it before propagating.
          delete the_ball;
          throw;
        }
        delete the_ball;
      } else {
        master_process.worker_try_again( w_rank );
      }
      break;
    case MW::return_result: {
      // The same problem also appears here; the transfer code is
      // essentially the same with the roles of master and worker exchanged.
      Ball<Float> result( dim, S, S[0], 0 );
      master_process.get_result( w_rank, result );
      // enqueue() stores the heap copy in points_to_explore; it is deleted
      // after being popped and sent in the ask_for_job case above.
      enqueue( new Ball<Float>( result ) );
      break;
    }
    case MW::job_done:
      master_process.free_worker( w_rank );
      break;
    }
  }
  master_process.suspend_all_workers();
}
At the same time, the worker processes run the following routine. They
receive a ball from the master and explore it. Newly found balls are sent
back to the master during exploration.
template<typename Float>
void My_complex<Float>::working_horse( std::ostream &s )
{
  // Main loop of a worker process (any rank except 0).
  // - Repeatedly fetches a job (a ball) from the master via get_work().
  // - Explores the ball with explore_cell().
  // - Signals completion with done().
  // Returns once get_work() reports false (the worker was suspended).
  FC_ASSERT( comm.rank() != 0 );
  worker_process = new Worker_type( comm );
  // Receive buffer reused for every job; get_work() deserializes into it.
  Ball<Float> data( dim, S, S[0], 0 );
  while( worker_process->get_work( data ) ) {
    // NOTE(review): ownership of this heap copy is not visible here. If
    // explore_cell() does not take ownership, it leaks once per job.
    Ball<Float> *the_ball = new Ball<Float>( data );
    explore_cell( the_ball, the_ball, s );
    worker_process->done();
  }
  delete worker_process;
  // Do not leave the member pointing at freed memory.
  worker_process = 0;
}
Communication from master to worker is implemented in the next snippet;
this is the code that generates the output provided in my last mail.
The idea is the following: since the boost::mpi::communicator::send()
and boost::mpi::communicator::recv() routines refuse to work with my
serialization, I implemented the communication using the C bindings of
MPI. The data is serialized into a text_[io]archive over a
std::stringstream (binary_[io]archive isn't working), and the string
extracted from this stringstream is sent over the communication channel.
On the receiving side, the data sometimes arrives corrupted.
#include
#include <string>
#include <sstream>
#include
#include
#include
#include
#include <cstdlib>
// Archive types used to (de)serialize job payloads for the hand-written MPI
// transfer. Text archives are used because binary archives were reported not
// to work in this setup.
typedef boost::archive::text_oarchive Oarchive;
typedef boost::archive::text_iarchive Iarchive;
template
void Master::send_work( const int w,
const Jobtype &data )
{
FC_DEBUG_OUTPUT( << "Master sends data to worker " << w << "\n" );
/*
* comm.send( w, MW::send_job_data_tag, data );
*/
{
std::stringstream ss;
{
Oarchive oa(ss);
oa << data;
}
int size = ss.str().size();
comm.send( w, MW::send_job_data_tag );
FC_DEBUG_OUTPUT( << "Master sends size of data to worker "
<< w << "\n" );
MPI_Send( &size, 1, MPI_INT, w,
MW::send_job_data_size, MPI_Comm(comm) );
FC_DEBUG_OUTPUT( << "Master sends the data to worker "
<< w << "\n" );
FC_DEBUG_OUTPUT( << ss.str() << "\n" );
char *buf = const_cast( ss.str().c_str() );
MPI_Send( buf , size, MPI_CHAR,
w, MW::send_job_data_tag, MPI_Comm(comm) );
}
if( working[w] == MW::worker_free ) {
working[w] = MW::worker_working;
++num_working_workers;
}
}
template
bool Worker::get_work( Jobtype &data )
{
FC_DEBUG_OUTPUT( << "Worker process " << comm.rank()
<< " is trying to get some work.\n" );
while( true ) {
int ask_for_job = MW::ask_for_job;
comm.send( 0, MW::listen_tag, ask_for_job );
mpi::status status = comm.probe( 0, mpi::any_tag );
if( status.tag() == MW::try_again_tag ) {
FC_DEBUG_OUTPUT( << "Worker " << comm.rank()
<< " keeps on waiting...\n" );
sleep(1); //usleep(1);
continue;
}
if( status.tag() == MW::suspend_tag ) {
FC_DEBUG_OUTPUT( << "Worker " << comm.rank()
<< " is suspended.\n" );
comm.recv( 0, MW::suspend_tag );
return false;
}
/*
* comm.recv( 0, MW::send_job_data_tag, data );
*/
if( status.tag() == MW::send_job_data_tag ) {
comm.recv( 0, MW::send_job_data_tag );
FC_DEBUG_OUTPUT( << "Worker " << comm.rank()
<< " is receiving job from Master\n" );
int size;
MPI_Status mstatus;
MPI_Recv( &size, 1, MPI_INT, 0,
MW::send_job_data_size, MPI_Comm(comm), &mstatus );
char *buf = static_cast( malloc( size ) );
MPI_Recv( buf, size, MPI_CHAR, 0,
MW::send_job_data_tag, MPI_Comm(comm), &mstatus );
std::string s( buf );
std::stringstream ss( s );
FC_DEBUG_OUTPUT( << ss.str() << "\n" );
{
Iarchive ia( ss );
ia >> data;
}
free( buf );
FC_DEBUG_OUTPUT( << "Worker " << comm.rank()
<< " received job.\n" );
return true;
}
}
}
Once again the serialization:
void Ball<Float>::serialize( Archive & ar, const unsigned int version )
{
// Boost.Serialization hook for Ball: the same function saves and loads the
// members below, in exactly this order. The archive layout depends on that
// order, so do not reorder the lines. `version` is not used.
// NOTE(review): the template header (Float / Archive parameters) is not
// shown in this snippet.
ar & BOOST_SERIALIZATION_NVP( membership ); //vector<bool>
ar & BOOST_SERIALIZATION_NVP( members ); //vector<int>
ar & BOOST_SERIALIZATION_NVP( r ); //int
ar & BOOST_SERIALIZATION_NVP( is_infinity ); //bool
ar & BOOST_SERIALIZATION_NVP( up_to_date ); //bool
// Serializes the object QR points to, not the pointer. On load, QR must
// already point to a valid object (presumably allocated by the Ball
// constructor; confirm).
ar & BOOST_SERIALIZATION_NVP( *QR ); //own type, see below
}
void Subspan<Float>::serialize( Archive & ar,
const unsigned int version )
{
ar & BOOST_SERIALIZATION_NVP( membership ); // vector<bool>
ar & BOOST_SERIALIZATION_NVP( members ); // vector<int>
for( int i=0; i
On 20 Sep 2010, at 17:49, Martin Huenniger wrote:
Hi Matthias,
yes, "//and so on" does further serializations. Here is the code; there are some redundancies kept for later design decisions:
Could you please send a stripped-down but complete example that exhibits your problem? That way it will be easiest to find out what is going on.
Matthias