Craig, I have added your comments and it seems the system to finishes. But when I iterate over the results I have another assertion in in_memory.hpp[141]. Commenting out helps. Christian #include <algorithm> #include <cmath> #include <numeric> #include <boost/mapreduce.hpp> namespace prime_calculator { std::size_t is_prime( std::size_t number ) { long n = static_cast<long>( number ); if( number == 0 || number == 1 ) return 0; n = std::abs( n ); std::size_t sqrt_number = static_cast< std::size_t >( std::sqrt( static_cast< double >( n ))); for( std::size_t i = 2; i <= sqrt_number; i++ ) { if( n % i == 0 ) return 0; } return 1; } template< typename MapTask > class number_source : boost::noncopyable { public: number_source( std::size_t start , std::size_t end ) : _start ( start ) , _end ( end ) , _current( start ) {} const bool setup_key( typename MapTask::key_type& key ) const { if( _current < _end ) { key = _current; return true; } else { return false; } } const bool get_data( typename MapTask::key_type& key , typename MapTask::value_type& value ) { if( _current < _end ) { value.first = _current; value.second = _current; _current++; return true; } else { return false; } } private: std::size_t _start; std::size_t _end; std::size_t _current; }; struct map_task : public boost::mapreduce::map_task< std::size_t // MapKey , std::pair< std::size_t , std::size_t > // MapValue > { template<typename Runtime> static void map( Runtime& runtime , const std::size_t& //key , value_type& value ) { runtime.emit_intermediate( is_prime( value.first ), value.first ); } }; struct reduce_task : public boost::mapreduce::reduce_task< std::size_t , unsigned > { template< typename Runtime , typename It > static void reduce( Runtime& runtime , const std::size_t& key , It it , const It ite ) { if( key > 0 ) { for_each( it , ite , boost::bind( &Runtime::emit , &runtime , _1 , 0 ) ); } } }; typedef boost::mapreduce::job< prime_calculator::map_task , prime_calculator::reduce_task , boost::mapreduce::null_combiner , prime_calculator::number_source< prime_calculator::map_task >
job;
} // namespace prime_calculator int _tmain(int argc, _TCHAR* argv[]) { boost::mapreduce::specification spec; boost::mapreduce::results result; prime_calculator::job::datasource_type datasource( 0, 1000 ); spec.map_tasks = 0; spec.reduce_tasks = std::max( 1U, boost::thread::hardware_concurrency() ); std::cout << "\nRunning Parallel Prime_Calculator MapReduce..." << std::endl; prime_calculator::job job( datasource, spec ); job.run< boost::mapreduce::schedule_policy::cpu_parallel<prime_calculator::job
( result ); std::cout << "\nMapReduce Finished." << std::endl;
for( prime_calculator::job::const_result_iterator it = job.begin_results() ; it!=job.end_results() ; ++it ) { std::cout << it->first << " "; } return 0; }