Proposal: MapReduce library (single machine)

MapReduce is a programming model from Google that is designed for scalable data processing. Google's implementation is built for scalability over many thousands of commodity machines, but there is value in using the idiom on multi-core processors to partition processing, make efficient use of the available CPU, and avoid the complexities of multi-threaded development.

I have implemented a MapReduce runtime library in C++ using Boost and would like to see if there is interest from you all in submitting the library to Boost. I have preliminary documentation on my website at http://www.craighenderson.co.uk/mapreduce/

Regards
-- Craig Henderson <http://craighenderson.co.uk/>
http://www.siteupdatenotification.com

MapReduce is a programming model from Google that is designed for scalable data processing. Google's implementation is for scalability over many thousands of commodity machines, but there is value in using the idiom on multi-core processors to partition processing to efficiently use the CPU available, and avoid the complexities of multi-threaded development.
Broad scope comment: MapReduce is merely parallel map and fold operations, which is quite limited in terms of expressivity compared to broader-level parallel patterns like parallel DP or algorithmic skeletons. See:

* Cole, M., Algorithmic skeletons: structured management of parallel computation, MIT Press, 1989.
* Skillicorn, D. B., Architecture-Independent Parallel Computation, IEEE Computer, 1990, 23, 38-50.
* Cole, M., Algorithmic skeletons, in Research Directions in Parallel Functional Programming, Springer, 1999.
* Sérot, J., Ginhac, D., Skeletons for parallel image processing: an overview of the SKIPPER project, Parallel Computing, 2002, 28, 1685-1708.

for some seminal bibliography Google "forgot" to mention, and

* Falcou, J., Sérot, J., Formal semantics applied to the implementation of a skeleton-based parallel programming library, ParCo 2007, Aachen, Sept. 2007.

for some recent advances using Boost.

Actual comment: Do you have actual performance data for non-trivial tasks? More precisely, I know from experience that this kind of implementation may suffer from some C++-induced overhead. How do you compare to hand-written pthread or boost::thread code? I'm interested to see how this performs.

Actual comment: Do you have actual performance data for non-trivial tasks? More precisely, I know from experience that this kind of implementation may suffer from some C++-induced overhead. How do you compare to hand-written pthread or boost::thread code?
I'm interested to see how this performs
I have designed the class infrastructure to be as flexible as possible using templates. Job scheduling is a particular interest of mine, and is a policy that can be specified. The current 'library' includes two schedulers: mapreduce::schedule_policy::cpu_parallel, as in the example, which maximises the use of the CPU cores in the machine, and mapreduce::schedule_policy::sequential, which runs one Map task followed by one Reduce task. This is useful for debugging the algorithms.

What I haven't shown in the documentation is that intermediates::local_disk<> takes three parameters, the last two being defaulted. These are for sorting and merging the intermediate results. The current implementation uses a crude system() call to the OS, which of course needs improving. Interestingly, it is the sorting that takes much of the time in my tests so far.

So, to answer your question, I don't have specific performance metrics and comparisons that I can share with you at this time. The principle for the library is that everything is templated (policy-based) so it can be swapped around and re-implemented to best suit the needs of the application. The supplied implementations provide the framework and a decent implementation of the policies, but will not be optimal for all users.
-- Craig

Craig Henderson wrote:
I have designed the class infrastructure to be as flexible as possible using templates. Job scheduling is a particular interest of mine, and is a policy that can be specified. The current 'library' includes two schedulers, mapreduce::schedule_policy::cpu_parallel as in the example which maximises the use of the CPU cores in the machine, and mapreduce::schedule_policy::sequential which runs one Map task followed by one Reduce task. This is useful for debugging the algorithms.
So, to answer your question, I don't have specific performance metrics and comparisons that I can share with you at this time. The principle for the library is that everything is templated (policy-based) so it can be swapped around and re-implemented to best suit the needs of the application. The supplied implementations provide the framework and a decent implementation of the policies, but will not be optimal for all users.

Well, some figures would be nice, to at least check we don't go slower than on a single CPU ;) A simple scalability test could already be enough.

Do you have different kinds of parallel scheduling like OpenMP has: static, dynamic, etc.?

The other quirks I have are:
* it seems there is a lot of work to be done to take one user function and turn it into something your library can manage.
* it seems we have to write some loops ourselves at some point in the mapper and reducer. Can't this be leveraged somehow? What an end-user may want to write is the single element->element sequential function for map and the element->element->element fold function to be used on top of the element list.

-- ___________________________________________
Joel Falcou - Assistant Professor
PARALL Team - LRI - Universite Paris Sud XI
Tel : (+33)1 69 15 66 35

Do you have different kinds of parallel scheduling like OpenMP has: static, dynamic, etc.?
I've already answered this in other threads... the scheduling is implemented in a policy class so other threading approaches can be used. Current implementations are Sequential (single thread Map followed by Reduce phases), and CPU Parallel to maximize CPU core utilization.
So, to answer your question, I don't have specific performance metrics ... Well, some figures would be nice, to at least check we don't go slower than on a single CPU ;) A simple scalability test could already be enough.
I'm running some tests and will update the site with performance comparisons shortly
* it seems there is a lot of work to be done to take one user function and turn it into something your library can manage.
Can you expand on this a bit? Sure, there is some scaffolding for defining types and constructing objects, but the WordCount example is just 5 lines for the Map and 4 lines for the Reduce - that sounds quite lightweight to me :) Seriously, though, I'd like to understand your concern about 'a lot of work', and hear suggestions on reducing the overhead.
* it seems we have to write some loops ourselves at some point in the mapper and reducer. Can't this be leveraged somehow? What an end-user may want to write is the single element->element sequential function for map and the element->element->element fold function to be used on top of the element list.
The idea of MapReduce is to map (k1,v1) --> list(k2,v2) and then reduce (k2,list(v2)) --> list(v2). This inevitably requires iteration over collections. A generic Map & Reduce task could be written to delegate to sequential functions as you suggest, but I see this as an extension to the library rather than a core component. Thanks -- Craig

Craig Henderson wrote:
I've already answered this in other threads... the scheduling is implemented in a policy class so other threading approaches can be used. Current implementations are Sequential (single thread Map followed by Reduce phases), and CPU Parallel to maximize CPU core utilization.
I saw that. The question was: for your parallel scheduler, how do you generate workload for each processor?
I'm running some tests and will update the site with performance comparisons shortly
The idea of MapReduce is to map (k1,v1) --> list(k2,v2) and then reduce (k2,list(v2)) --> list(v2). This inevitably requires iteration over collections. A generic Map & Reduce task could be written to delegate to sequential functions as you suggest, but I see this as an extension to the library rather than a core component.

Well, canonically, running a map function only requires the (k1,v1)->(k2,v2) function. The sequence iteration is leveraged by the map skeleton. Similarly for Reduce, where only a fold-like function is strictly needed. Having to specify how to iterate over the sequence is unneeded IMHO and adds clutter to what you need to write. I don't see an actual improvement here if I still have to iterate over my data myself and just use your tool to generate the scheduling; I can do it by hand with a thread_pool and it won't be more verbose.

An "optimal" way to have this would be:

map_reduce<SomeSchedulingPolicy>(input_seq, output_seq, map_func, reduce_func)

with xxx_seq conforming to some IterableSequence concept and xxx_func being function objects or PFOs conforming to the standard map/fold prototype. Introspection on the types and the presence of given methods/functions then helps in finding how to iterate over the sequence (using type_traits and such) and generates the appropriate, optimized iteration code calling map and fold where it should.

I'm running some tests and will update the site with performance comparisons shortly
Great
I've posted metrics from three runs of WordCount on a ~10Gb dataset at http://www.craighenderson.co.uk/mapreduce/ Scalability is not linear, as you would expect, as there is contention in reading the files from 8 or 16 threads simultaneously. This is where multi-machine MapReduce clearly comes into its own - assuming the data is distributed with a decent replication filesystem. -- Craig

Joel Falcou wrote: how do you generate workload for each processor?
The current implementation is rudimentary in that it creates n map tasks on threads and each map task is fed data to process. It does not change the thread affinity and does not use any knowledge of the data - size, locality (to reduce disk access contention, for example). The code for the CPU Parallel scheduler is below. Scheduling is a particular interest of mine, and will be the basis of a lot of future work. This is just the start.

// Map Tasks
time_t start_time = time(NULL);
boost::thread_group map_threads;
unsigned const map_tasks = (spec.map_tasks==0) ? num_cpus : std::min(num_cpus, spec.map_tasks);
for (unsigned loop=0; loop<map_tasks; ++loop)
{
    boost::shared_ptr<results> this_result(new results);
    all_results.push_back(this_result);
    boost::thread *thread = new boost::thread(detail::run_next_map_task,
                                              boost::ref(job),
                                              boost::ref(*this_result),
                                              boost::ref(m));
    map_threads.add_thread(thread);
}
map_threads.join_all();
result.map_runtime = time(NULL) - start_time;

// Reduce Tasks
start_time = time(NULL);
boost::thread_group reduce_threads;
unsigned const reduce_tasks = std::min<unsigned const>(num_cpus, job.number_of_partitions());
unsigned partition = 0;
for (unsigned loop=0; loop<reduce_tasks; ++loop)
{
    boost::shared_ptr<results> this_result(new results);
    all_results.push_back(this_result);
    boost::thread *thread = new boost::thread(detail::run_next_reduce_task,
                                              boost::ref(job),
                                              boost::ref(partition),
                                              boost::ref(*this_result),
                                              boost::ref(m));
    reduce_threads.add_thread(thread);
}
reduce_threads.join_all();
result.reduce_runtime = time(NULL) - start_time;

-- Craig

I have implemented a MapReduce runtime library in C++ using Boost and would like to see if there is interest from you all in submitting the library to Boost. I have preliminary documentation on my website at http://www.craighenderson.co.uk/mapreduce/
Interesting for sure. However, how is the execution back-end handled? What would I have to provide to create a custom job dispatcher? For example, I'd like to use the Vista ThreadPool as well as a custom one for this; is this possible? There is a policy for scheduling, so I guess the answer is yes, but I'd like to see what the requirements are. Otherwise, this seems to be an interesting library, and I think Boost should have such nice parallel programming utilities. Cheers, Anteru

Interesting for sure. However, how is the execution back-end handled? What would I have to provide to create a custom job dispatcher? For example, I'd like to use the Vista ThreadPool as well as a custom one for this; is this possible? There is a policy for scheduling, so I guess the answer is yes, but I'd like to see what the requirements are.
There are two functions provided to run a map and reduce task from a scheduler:

void run_next_map_task(detail::job_interface &job, results &result, boost::mutex &m);
void run_next_reduce_task(detail::job_interface &job, unsigned &partition, results &result, boost::mutex &m);

The scheduler can call these from any thread, so it is just responsible for thread creation and management, providing the mapreduce runtime with timings, and consolidating the results from each thread. The entire cpu_parallel scheduler is just 40 lines (including lines containing only braces). This could probably be refactored further to reduce the implementation overhead, but 40 lines isn't much code.
-- Craig

Hi Craig,

Yes, there is interest! I think this is what would be needed for this to be a useful library:

1) Compare to the C Phoenix library at http://mapreduce.stanford.edu - the library has just been updated to v2 and supports Linux x86_64. It provides many datasets/examples that it could be benchmarked against, and there is also a great paper/video/slides describing the library.

2) Have a plan so that the library can eventually range from working on one multi-core system to distributed. It should work with the great open source projects:
- http://kosmosfs.sf.net (C++ distributed file system)
- http://hypertable.org (C++ distributed db, which already works on top of kfs)
Both kfs and hypertable lack a C++ mapreduce library to work with, and use a Java one.

Do you plan to set up a project/mailing list?

regards

On Sun, Jun 14, 2009 at 11:45 PM, Craig Henderson <cdm.henderson@googlemail.com> wrote:
MapReduce is a programming model from Google that is designed for scalable data processing. Google's implementation is for scalability over many thousands of commodity machines, but there is value in using the idiom on multi-core processors to partition processing to efficiently use the CPU available, and avoid the complexities of multi-threaded development.
I have implemented a MapReduce runtime library in C++ using Boost and would like to see if there is interest from you all in submitting the library to Boost. I have preliminary documentation on my website at http://www.craighenderson.co.uk/mapreduce/

1) compare to the C Phoenix library http://mapreduce.stanford.edu - the library has just been updated to v2 and supports Linux x86_64 (provides many datasets/examples it could be benchmarked against; there is also a great paper/video/slides describing the library)
I have seen Phoenix, but it is not available for my development platform (Windows). I guess the primary difference in design is that my library uses C++ templates to build a type-specific MapReduce runtime, so there is minimal void* casting required.
2) have a plan so that the library can eventually range from working on 1 multi-core system to distributed
This is the eventual goal, and I have designed the library to be extensible so as to be able to do this in the future. With the policy-based design, making use of the open source projects that you mention should be non-intrusive; the library allows the user to supply a handler for the intermediate files (to use kosmosfs or hypertable, for example).
- http://kosmosfs.sf.net (C++ distributed file system) - http://hypertable.org (C++ distributed db, which already
Thanks for the references
Do you plan to set up a project/mailing list?
If there is enough interest, then I will. Regards -- Craig
participants (6)
- Anteru
- Craig Henderson
- joel
- Joel Falcou
- Joel.Falcou@lri.fr
- Jose