
Maybe I'm making it too easy for myself, but I'd see every pipeline stage as a scheduler: say, in Boost.Asynchronous, a work-stealing threadpool scheduler (with one or more threads). Each stage gets a job that transforms the input data and then posts a functor performing the next stage's transformation to the queue of the next stage's scheduler, and so on. Then I'd create a composite scheduler in one line of code to make sure work stealing happens between the stages, and that would be it for the infrastructure.
Purely for the fun, it took me a few minutes to write such pipeline, a simple version using a thread for every stage, then one with work stealing. There are a tons of stuff to improve, for example strings should be moved but I hope you get the idea. Now one "just" needs to write the syntactic sugar to have beautiful pipelines. Cheers, Christophe The simple version is: #include <iostream> #include <string> #include <vector> #include <regex> #include <functional> #include <boost/asynchronous/scheduler/threadpool_scheduler.hpp> #include <boost/asynchronous/queue/lockfree_queue.hpp> #include <boost/asynchronous/scheduler_shared_proxy.hpp> #include <boost/asynchronous/post.hpp> #include <boost/algorithm/string/trim.hpp> using namespace std; struct pipeline { void process(std::vector<std::string> const& input) { // create a scheduler for every stage, use only one thread auto scheduler1 = boost::asynchronous::create_shared_scheduler_proxy( new boost::asynchronous::threadpool_scheduler< boost::asynchronous::lockfree_queue<>
(1));
auto scheduler2 = boost::asynchronous::create_shared_scheduler_proxy( new boost::asynchronous::threadpool_scheduler< boost::asynchronous::lockfree_queue<>
(1));
auto scheduler3 = boost::asynchronous::create_shared_scheduler_proxy( new boost::asynchronous::threadpool_scheduler< boost::asynchronous::lockfree_queue<>
(1));
// job for first stage auto grep = [scheduler2,scheduler3](std::string const& re,std::string const& item) { std::regex regex(re); if (std::regex_match(item, regex)) { // job for second stage auto trim = [scheduler3](std::string const& item) { std::string tc(boost::algorithm::trim_copy(item)); // 3rd stage job, cout boost::asynchronous::post_future(scheduler3, [tc](){std::cout << "->" << tc << endl;}); }; auto trim_ = std::bind(trim, std::move(item)); boost::asynchronous::post_future(scheduler2, trim_); } }; for(auto s : input) { auto grep_error = std::bind(grep, "Error.*", std::move(s)); boost::asynchronous::post_future(scheduler1, grep_error); } } }; int main() { std::vector<std::string> input = { "Error: foobar", "Error. foo", " Warning: barbaz", "Notice: qux", "\tError: abc" }; pipeline p; p.process(input); // we are going to shutdown, schedulers will all block until completely done return 0; } And the stealing version is: #include <iostream> #include <string> #include <vector> #include <regex> #include <functional> #include <boost/asynchronous/scheduler/stealing_multiqueue_threadpool_scheduler.hpp> #include <boost/asynchronous/scheduler/composite_threadpool_scheduler.hpp> #include <boost/asynchronous/queue/lockfree_queue.hpp> #include <boost/asynchronous/scheduler_shared_proxy.hpp> #include <boost/asynchronous/post.hpp> #include <boost/algorithm/string/trim.hpp> using namespace std; struct pipeline { pipeline() { auto scheduler1 = boost::asynchronous::create_shared_scheduler_proxy( new boost::asynchronous::stealing_multiqueue_threadpool_scheduler< boost::asynchronous::lockfree_queue<>
(1));
auto scheduler2 = boost::asynchronous::create_shared_scheduler_proxy( new boost::asynchronous::stealing_multiqueue_threadpool_scheduler< boost::asynchronous::lockfree_queue<>
(1));
auto scheduler3 = boost::asynchronous::create_shared_scheduler_proxy( new boost::asynchronous::stealing_multiqueue_threadpool_scheduler< boost::asynchronous::lockfree_queue<>
(1));
// composite pool made of the previous pools // keeping it alive will ensure automatic work-stealing between pools. m_composite = boost::asynchronous::create_shared_scheduler_proxy( new boost::asynchronous::composite_threadpool_scheduler<> (scheduler1,scheduler2,scheduler3)); } void process(std::vector<std::string> const& input) { auto grep = [this](std::string const& re,std::string const& item) { std::regex regex(re); if (std::regex_match(item, regex)) { auto trim = [this](std::string const& item) { std::string tc(boost::algorithm::trim_copy(item)); // post to third pool of composite boost::asynchronous::post_future(m_composite, [tc](){std::cout << "->" << tc << endl;},"",3); }; auto trim_ = std::bind(trim, std::move(item)); // post to second pool of composite boost::asynchronous::post_future(m_composite, trim_,"",2); } }; for(auto s : input) { auto grep_error = std::bind(grep, "Error.*", std::move(s)); // post to first pool of composite boost::asynchronous::post_future(m_composite, grep_error,"",1); } } private: boost::asynchronous::any_shared_scheduler_proxy<> m_composite; }; int main() { std::vector<std::string> input = { "Error: foobar", "Error. foo", " Warning: barbaz", "Notice: qux", "\tError: abc" }; pipeline p; p.process(input); // we are going to shutdown, schedulers will all block until completely done return 0; }