
Joel Falcou wrote:
> how do you generate workload for each processor?
The current implementation is rudimentary in that it creates n map tasks on
threads, and each map task is fed data to process. It does not set thread
affinity, and it does not use any knowledge of the data, such as size or
locality (to reduce disk access contention, for example). The code for the
CPU Parallel scheduler is below. Scheduling is a particular interest of mine
and will be the basis of a lot of future work; this is just the start.

    // Map Tasks: one worker thread per map task, capped at the number of CPUs
    time_t start_time = time(NULL);
    boost::thread_group map_threads;
    unsigned const map_tasks =
        (spec.map_tasks == 0) ? num_cpus : std::min(num_cpus, spec.map_tasks);
    for (unsigned loop=0; loop<map_tasks; ++loop)
    {
        boost::shared_ptr<results> this_result(new results);
        all_results.push_back(this_result);

        boost::thread *thread =
            new boost::thread(detail::run_next_map_task,
                              boost::ref(job),
                              boost::ref(*this_result),
                              boost::ref(m));
        map_threads.add_thread(thread);
    }
    map_threads.join_all();
    result.map_runtime = time(NULL) - start_time;

    // Reduce Tasks: one worker thread per partition, capped at the number of CPUs
    start_time = time(NULL);
    boost::thread_group reduce_threads;
    unsigned const reduce_tasks =
        std::min<unsigned const>(num_cpus, job.number_of_partitions());
    unsigned partition = 0;
    for (unsigned loop=0; loop<reduce_tasks; ++loop)
    {
        boost::shared_ptr<results> this_result(new results);
        all_results.push_back(this_result);

        boost::thread *thread =
            new boost::thread(detail::run_next_reduce_task,
                              boost::ref(job),
                              boost::ref(partition),
                              boost::ref(*this_result),
                              boost::ref(m));
        reduce_threads.add_thread(thread);
    }
    reduce_threads.join_all();
    result.reduce_runtime = time(NULL) - start_time;

-- Craig
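
(For anyone who wants to experiment with the affinity point above: a minimal
sketch of pinning each worker thread to a core, assuming Linux/glibc
(pthread_setaffinity_np) and Boost.Thread's native_handle(); the
pin_thread_to_cpu helper is hypothetical and not part of the library.)

    #include <pthread.h>   // pthread_setaffinity_np (glibc extension)
    #include <sched.h>     // cpu_set_t, CPU_ZERO, CPU_SET
    #include <boost/thread.hpp>

    // Hypothetical helper: pin a Boost thread to a single CPU core.
    // Returns false if the affinity call fails.
    bool pin_thread_to_cpu(boost::thread &t, unsigned cpu)
    {
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(cpu, &cpuset);
        return pthread_setaffinity_np(t.native_handle(),
                                      sizeof(cpu_set_t), &cpuset) == 0;
    }

    // e.g. after creating each map thread in the loop above:
    //     pin_thread_to_cpu(*thread, loop % num_cpus);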