-----Original Message-----
From: boost-users-bounces@lists.boost.org [mailto:boost-users-bounces@lists.boost.org] On Behalf Of Christian Henning
Sent: 21 August 2009 15:05
To: boost-users@lists.boost.org
Subject: Re: [Boost-users] [mapreduce] Prim Calculator
Hi Craig, what you're suggesting is exactly what I do. For reasons I don't understand I'm getting the following assertion:
Running Parallel Prime_Calculator MapReduce...Assertion failed: map_key != typename map_task_type::key_type(), file c:\chh\boost\boost\mapreduce\job.hpp, line 210
I don't understand why that happens. Can you help me out here?
Hi Christian

The assertion was an incorrect error condition: it required that the key not be a default-constructed value (zero, in the case of your numeric key). I have updated the sandbox - thanks for finding this. Get the latest versions of job.hpp & in_memory.hpp from the sandbox.

There are a couple of other problems in your implementation, too.

* The number_source constructor should initialize _current to start, not zero.
* Using a global to hold prime_numbers isn't thread-safe, and adding locking will introduce contention. The MapReduce library provides mechanisms to avoid this. In reduce_task::reduce, replace the line

      copy( it, ite, back_inserter( prime_numbers ));

  with

      for_each(it, ite, boost::bind(&Runtime::emit, &runtime, _1, 0));

  which will emit the final results. To display the results:

      for (prime_calculator::job::const_result_iterator it=job.begin_results(); it!=job.end_results(); ++it)
          std::cout << it->first << " ";

For performance, the datasource should return a range of integers so that each task tests many numbers for primality rather than one at a time. This will reduce lock contention in retrieving map keys.

Finally, 0 & 1 are not prime numbers - this is a bug in your is_prime function.
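The last two points can be sketched together, independently of the library. This is only an illustration under stated assumptions: range_type, split_range and primes_in are hypothetical names and are not part of Boost.MapReduce. The idea is that the datasource hands out half-open ranges as keys and each map task tests every number in its range, with 0 and 1 rejected by the primality test.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper names; none of these are part of Boost.MapReduce.
typedef std::pair<std::size_t, std::size_t> range_type;   // half-open [first, last)

// Trial division. 0 and 1 are rejected up front.
bool is_prime(std::size_t n)
{
    if (n < 2)
        return false;
    for (std::size_t i = 2; i <= n / i; ++i)   // same as i*i <= n, without overflow
        if (n % i == 0)
            return false;
    return true;
}

// Split [start, end) into consecutive ranges of at most `chunk` numbers.
// A range-based datasource could hand out one of these per map key.
std::vector<range_type> split_range(std::size_t start, std::size_t end, std::size_t chunk)
{
    assert(chunk > 0);
    std::vector<range_type> ranges;
    while (start < end)
    {
        std::size_t const last = (end - start > chunk) ? start + chunk : end;
        ranges.push_back(range_type(start, last));
        start = last;
    }
    return ranges;
}

// What a single map task would do with one range: test every number in it.
std::vector<std::size_t> primes_in(range_type const& r)
{
    std::vector<std::size_t> found;
    for (std::size_t n = r.first; n < r.second; ++n)
        if (is_prime(n))
            found.push_back(n);
    return found;
}
```

With split_range(0, 1000, 256) the datasource would hand out four keys instead of a thousand, so the lock protecting key retrieval is taken far less often. The chunk size is a tuning assumption, not a value taken from the library.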
Regards
-- Craig

#include <algorithm>
#include <cmath>
#include <iostream>
#include <numeric>

#include <boost/bind.hpp>
#include <boost/noncopyable.hpp>
#include <boost/mapreduce.hpp>

namespace prime_calculator {

// Returns 1 if number is prime, 0 otherwise. The result is used as the
// intermediate key, so all primes are grouped under key 1.
std::size_t is_prime( std::size_t number )
{
    if( number < 2 )    // 0 and 1 are not prime
        return 0;

    std::size_t sqrt_number = static_cast< std::size_t >( std::sqrt( static_cast< double >( number )));
    for( std::size_t i = 2; i <= sqrt_number; i++ )
    {
        if( number % i == 0 )
            return 0;
    }
    return 1;
}

// Datasource that hands out the integers in [start, end) one at a time.
template< typename MapTask >
class number_source : boost::noncopyable
{
public:
    number_source( std::size_t start
                 , std::size_t end )
    : _start( start )
    , _end ( end )
    , _current( start ) // CH
    {}

    bool setup_key( typename MapTask::key_type& key ) const
    {
        if( _current < _end )
        {
            key = _current;
            return true;
        }
        else
        {
            return false;
        }
    }

    bool get_data( typename MapTask::key_type& key
                 , typename MapTask::value_type& value )
    {
        if( _current < _end )
        {
            value.first = _current;
            value.second = _current;
            _current++;
            return true;
        }
        else
        {
            return false;
        }
    }

private:
    std::size_t _start;
    std::size_t _end;
    std::size_t _current;
};

struct map_task : public boost::mapreduce::map_task< std::size_t // MapKey
                                                   , std::pair< std::size_t
                                                              , std::size_t > // MapValue
                                                   >
{
    // Emits (is_prime(n), n) for the number n carried in value.first.
    template<typename Runtime>
    static void map( Runtime& runtime
                   , const std::size_t& /*key*/
                   , value_type& value )
    {
        runtime.emit_intermediate( is_prime( value.first ), value.first );
    }
};

//std::vector< std::size_t > prime_numbers; // CH

struct reduce_task : public boost::mapreduce::reduce_task< std::size_t
                                                         , unsigned >
{
    // Only key 1 (the primes) produces final results.
    template< typename Runtime
            , typename It >
    static void reduce( Runtime& runtime
                      , const std::size_t& key
                      , It it
                      , const It ite )
    {
        if( key > 0 )
        {
            // copy( it, ite, back_inserter( prime_numbers )); // CH
            std::for_each(it, ite, boost::bind(&Runtime::emit, &runtime, _1, 0)); // CH
        }
    }
};

typedef boost::mapreduce::job< prime_calculator::map_task
                             , prime_calculator::reduce_task
                             , boost::mapreduce::null_combiner
                             , prime_calculator::number_source< prime_calculator::map_task >
                             > job;

} // namespace prime_calculator

int main(int argc, char* argv[])
{
    boost::mapreduce::specification spec;
    boost::mapreduce::results result;

    prime_calculator::job::datasource_type datasource( 0, 1000 );

    spec.map_tasks = 0;
    spec.reduce_tasks = std::max( 1U, boost::thread::hardware_concurrency() );

    std::cout << "\nRunning Parallel Prime_Calculator MapReduce...";
    prime_calculator::job job( datasource, spec );
    job.run< boost::mapreduce::schedule_policy::cpu_parallel<prime_calculator::job> >( result );
    std::cout << "\nMapReduce Finished.";

    // Each final result's key is a prime number.
    for (prime_calculator::job::const_result_iterator it=job.begin_results(); it!=job.end_results(); ++it)
        std::cout << it->first << " ";

    return 0;
}