
Where in the sandbox is mapreduce? Thanks for all your comments! I'll get back to you once I can run the prime calculator. On Fri, Aug 21, 2009 at 3:52 PM, Craig Henderson<cdm.henderson@googlemail.com> wrote:
-----Original Message----- From: boost-users-bounces@lists.boost.org [mailto:boost-users- bounces@lists.boost.org] On Behalf Of Christian Henning Sent: 21 August 2009 15:05 To: boost-users@lists.boost.org Subject: Re: [Boost-users] [mapreduce] Prim Calculator
Hi Craig, what you're suggesting is exactly what I do. For reasons I don't understand I'm getting the following assertion:
Running Parallel Prime_Calculator MapReduce...Assertion failed: map_key != typename map_task_type::key_type(), file c:\chh\boost\boost\mapreduce\job.hpp, line 210
I don't understand why that happens? Can you help me out here.
Hi Christian The assertion is an incorrect error condition which meant that the key could not be a default constructed valued (in the case of your numeric key, zero). I have updated the sandbox - thanks for finding this. Get the latest version of job.hpp & in_memory.hpp from the sandbox.
There are a couple other problems in your implementation, too. * number_source constructor should initialize _current to be start, not zero. * using a global to hold prime_numbers isn't thread-safe, and adding locking will introduce contention. The MapReduce library provides mechanisms to avoid this. In reduce_task::reduce, replace the line copy( it, ite, back_inserter( prime_numbers )); with for_each(it, ite, boost::bind(&Runtime::emit, &runtime, _1, 0)); which will emit the final results.
To display the results: for (prime_calculator::job::const_result_iterator it=job.begin_results(); it!=job.end_results(); ++it) std::cout << it->first << " ";
For performance, the datasource should return a range of integers so each task tests many prime numbers rather than one at a time. This will reduce lock contention in retrieving map keys.
Finally, 0 & 1 are not prime numbers - this is a bug in your is_prime function.
Regards -- Craig
#include <algorithm> #include <cmath> #include <numeric>
#include <boost/mapreduce.hpp>
namespace prime_calculator {
std::size_t is_prime( std::size_t number ) { long n = static_cast<long>( number );
if( number == 0 ) return 1;
n = std::abs( n ); std::size_t sqrt_number = static_cast< std::size_t >( std::sqrt( static_cast< double >( n )));
for( std::size_t i = 2; i <= sqrt_number; i++ ) { if( n % i == 0 ) return 0; }
return 1; }
template< typename MapTask > class number_source : boost::noncopyable { public:
number_source( std::size_t start , std::size_t end ) : _start( start ) , _end ( end ) , _current( start ) // CH {}
const bool setup_key( typename MapTask::key_type& key ) const { if( _current < _end ) { key = _current; return true; } else { return false; } }
const bool get_data( typename MapTask::key_type& key , typename MapTask::value_type& value ) { if( _current < _end ) { value.first = _current; value.second = _current; _current++;
return true; } else { return false; } }
private:
std::size_t _start; std::size_t _end; std::size_t _current; };
struct map_task : public boost::mapreduce::map_task< std::size_t // MapKey , std::pair< std::size_t , std::size_t > // MapValue > { template<typename Runtime> static void map( Runtime& runtime , const std::size_t& /*key*/ , value_type& value ) { runtime.emit_intermediate( is_prime( value.first ), value.first ); } };
//std::vector< std::size_t > prime_numbers; // CH
struct reduce_task : public boost::mapreduce::reduce_task< std::size_t , unsigned > { template< typename Runtime , typename It > static void reduce( Runtime& runtime , const std::size_t& key , It it , const It ite ) { if( key > 0 ) { // copy( it, ite, back_inserter( prime_numbers )); // CH for_each(it, ite, boost::bind(&Runtime::emit, &runtime, _1, 0)); // CH } } };
typedef boost::mapreduce::job< prime_calculator::map_task , prime_calculator::reduce_task , boost::mapreduce::null_combiner , prime_calculator::number_source< prime_calculator::map_task >
job;
} // namespace prime_calculator
int main(int argc, char* argv[]) { boost::mapreduce::specification spec;
boost::mapreduce::results result; prime_calculator::job::datasource_type datasource( 0, 1000 );
spec.map_tasks = 0; spec.reduce_tasks = std::max( 1U, boost::thread::hardware_concurrency() );
std::cout << "\nRunning Parallel Prime_Calculator MapReduce..."; prime_calculator::job job( datasource, spec ); job.run< boost::mapreduce::schedule_policy::cpu_parallel<prime_calculator::job> >( result ); std::cout << "\nMapReduce Finished.";
for (prime_calculator::job::const_result_iterator it=job.begin_results(); it!=job.end_results(); ++it) std::cout << it->first << " ";
return 0; }
_______________________________________________ Boost-users mailing list Boost-users@lists.boost.org http://lists.boost.org/mailman/listinfo.cgi/boost-users