[Threads] Simple active object wrapper, take 2

This is a reworking of the wrapper to make any object active (that is, to execute all its methods in its own thread of execution). Changes since the previous post: 1. Extensible - the proxy implementation method can now be extended to any number of parameters. 2. Future values - where the client can, if desired, postpone the wait to acquire the result of invoking a method on an active method by explicitly capturing a handle to the result for later inspection. 3. Policy for void proxies - the implementer of the class which wraps a non- active class, can specify whether a client must wait for a void method to complete exceution or not. This would be useful if a wrapped method had side effects that a client would expect to see after invoking the method. Comments welcome. Matt (Sorry this is so long, I can only post in text mode via gmane) // test_active_object.cpp -------------------------------------------------- #include "active_object.hpp" using std::cin; using std::endl; using ActiveWrapper::active; // Active-object wrapper using ActiveWrapper::future; // Future value encapsulator using ActiveWrapper::output; // std::cout locking object using ActiveWrapper::sleep; // 3-second pause to make effects observable // An example class to test active behaviour struct Object { int non_void_with_param(int i) { { output() << "\tInside Object::non_void_with_param" << endl; } sleep(); return (i*i); } const char* non_void_without_param(void) { { output() << "\tInside Object::non_void_without_param" << endl; } sleep(); return "Hello"; } void void_with_param(double) { { output() << "\tInside Object::void_with_param" << endl; } sleep(); } void void_without_param(void) { { output() << "\tInside Object::void_without_param" << endl; } sleep(); } }; // An active version of the Object class struct ActiveObject : private active<Object> { ActiveObject() : object(), non_void_with_param(&object, &Object::non_void_with_param, this), non_void_without_param(&object, &Object::non_void_without_param, this), void_with_param(&object, &Object::void_with_param, this), void_without_param(&object, &Object::void_without_param, this) { } // Proxy declarations for all methods of the active object proxy<int (int)> non_void_with_param; proxy<const char* (void)> non_void_without_param; proxy<void (double), true> void_with_param; proxy<void (void)> void_without_param; private: Object object; }; int main(int, char**) { { ActiveObject object; { output() << "Invoking methods from main thread" << endl; } // Implicit wait until task is completed by acquiring result int i = object.non_void_with_param(5); { output() << "Result of non_void_with_param: " << i << endl; } // Explicitly bypass wait for result future<const char*> c = object.non_void_without_param(); { output() << "Returned from non_void_without_param" << endl; } // Block until task completion by accessing result { output() << "Result of non_void_without_param: " << c << endl; } // Block until task completion, due to proxy policy parameter object.void_with_param(3.1415927); { output() << "Returned from void_with_param" << endl; } // Explicitly bypass completion-wait policy future<void> v = object.void_with_param(1e2); { output() << "Not waiting for void_with_param" << endl; } // Non-blocking call due to default policy object.void_without_param(); { output() << "Returned from void_without_param" << endl; } // Block until active object thread closes on destruction { output() << "Waiting for completion of all tasks" << endl; } } { output() << "Press <Enter> to continue..." << endl; } cin.get(); return 0; } // active_object.hpp ------------------------------------------------------- #include <iomanip> #include <iostream> #include <queue> #include <boost/any.hpp> #include <boost/bind.hpp> #include <boost/function.hpp> #include <boost/optional.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <boost/thread/xtime.hpp> #include <boost/tuple/tuple.hpp> #include <boost/utility.hpp> namespace ActiveWrapper { // Lock the output stream to prevent interleaving struct output { output() : lock(mutex) {} static unsigned count; private: static boost::mutex mutex; boost::mutex::scoped_lock lock; }; unsigned output::count; boost::mutex output::mutex; template <typename T> std::ostream& operator<< (const output& out, T t) { ++out.count; return std::cout << std::setw(2) << std::setfill(' ') << out.count << ": " << t; } // Pause to demonstrate effect int sleep(void) { boost::xtime time; boost::xtime_get(&time, boost::TIME_UTC); time.sec += 3; boost::thread::sleep(time); } namespace detail { // A generic thread-safe queue template <typename T> struct safe_queue { safe_queue(void) : terminated(false) { } bool enqueue (T t) { if (terminated) return false; boost::mutex::scoped_lock lock(mutex); queue.push(t); not_empty.notify_one(); return true; } boost::optional<T> dequeue(void) { boost::mutex::scoped_lock lock(mutex); if (terminated) return boost::optional<T>(); while (queue.empty()) { not_empty.wait(lock); if (terminated) return boost::optional<T>(); } T t = queue.front(); queue.pop(); return t; } void terminate(void) { boost::mutex::scoped_lock lock(mutex); terminated = true; not_empty.notify_all(); } private: bool terminated; std::queue<T> queue; boost::mutex mutex; boost::condition not_empty; }; // Object used to wait for the completion of a task struct synch_type { synch_type(boost::optional<boost::any*> a, boost::mutex* m, boost::condition* c) : result(a), mutex(m), condition(c), completed(false) {} ~synch_type() { if (result) delete *result; delete mutex; delete condition; } void wait_for_completion(void) { boost::mutex::scoped_lock lock(*mutex); if (completed == false) { if (result) { while ((*result)->empty()) (*condition).wait(lock); } else { (*condition).wait(lock); } completed = true; } } void notify(void) { boost::mutex::scoped_lock lock(*mutex); (*condition).notify_all(); completed = true; } boost::optional<boost::any*> get_result(void) { return result; } private: boost::optional<boost::any*> result; boost::mutex* mutex; boost::condition* condition; bool completed; }; // A reference-counted pointer to a synch type typedef boost::shared_ptr<synch_type> synch_ptr; // A type to enforce waiting policy on synchronisation struct waiter_type { waiter_type(synch_ptr s, bool force) : synch(s), force_wait(force), waited_for_completion(false) {} ~waiter_type() { if (force_wait) { wait_for_completion(); } } void wait_for_completion(void) { if (waited_for_completion == false) { synch->wait_for_completion(); waited_for_completion = true; } } boost::optional<boost::any*> result(void) { waited_for_completion = true; return synch->get_result(); } private: synch_ptr synch; bool force_wait; bool waited_for_completion; }; } // An encapsulation of a result that might require waiting for template <typename T> struct future { future(boost::shared_ptr<detail::waiter_type> w) : waiter(w) {} operator T& (void) { if (!value) { waiter->wait_for_completion(); boost::optional<boost::any*> result = waiter->result(); value = boost::any_cast<T>(*(*result)); } return *value; } protected: boost::optional<T> value; boost::shared_ptr<detail::waiter_type> waiter; }; template <> struct future<void> { future(boost::shared_ptr<detail::waiter_type> w) : waiter(w) {}; protected: boost::shared_ptr<detail::waiter_type> waiter; }; // Forward reference required by proxy_impl template <typename base> class active; namespace detail { // The type of tasks that are executed by the active object wrapper typedef boost::function<boost::any (void)> task_type; // A task and associated notification machinery typedef boost::tuple<task_type, boost::shared_ptr<synch_type> > task_descriptor; // A thead safe queue of tasks to be performed typedef safe_queue<task_descriptor> task_queue; // Dispatch to task queue, returning a facility to wait for the result template <typename result_type> future<result_type> invoke(task_queue* queue, bool force_wait, task_type task) { synch_ptr synch(new synch_type(new boost::any(), new boost::mutex(), new boost::condition())); task_descriptor td(task, synch); queue->enqueue(td); boost::shared_ptr<waiter_type> waiter (new waiter_type(synch, force_wait)); return future<result_type>(waiter); } template <> future<void> invoke<void>(task_queue* queue, bool force_wait, task_type task) { synch_ptr synch(new synch_type(boost::optional<boost::any*>(), new boost::mutex(), new boost::condition())); task_descriptor td(task, synch); queue->enqueue(td); boost::shared_ptr<waiter_type> waiter (new waiter_type(synch, force_wait)); return future<void>(waiter); } template <typename base, typename signature, bool force_wait> class proxy_impl; // Handler for non-void functions with one parameter template <typename base, typename R, typename T1, bool force_wait> class proxy_impl<base, R (T1), force_wait> { typedef R result_type; typedef T1 arg1_type; typedef result_type (base::*method_type)(arg1_type); typedef boost::function<result_type (arg1_type)> wrapper_type; typedef boost::function<future<result_type> (task_type)> invocation_type; static boost::any wrap(wrapper_type task, arg1_type arg1) { return task(arg1); } invocation_type invocation; wrapper_type task; public: proxy_impl(base* object, method_type method, active<base>* active) { task = boost::bind(method, object, _1); invocation = boost::bind(&invoke<result_type>, active->queue(), force_wait, _1); } future<result_type> operator()(arg1_type arg1) { return invocation(boost::bind(&wrap, task, arg1)); } }; // Handler for void functions with one parameter template <typename base, typename T1, bool force_wait> class proxy_impl<base, void (T1), force_wait> { typedef void result_type; typedef T1 arg1_type; typedef result_type (base::*method_type)(arg1_type); typedef boost::function<result_type (arg1_type)> wrapper_type; typedef boost::function<future<result_type> (task_type)> invocation_type; static boost::any wrap(wrapper_type task, arg1_type arg1) { task(arg1); return boost::any(); } invocation_type invocation; wrapper_type task; public: proxy_impl(base* object, method_type method, active<base>* active) { task = boost::bind(method, object, _1); invocation = boost::bind(&invoke<result_type>, active->queue(), force_wait, _1); } future<result_type> operator()(arg1_type arg1) { return invocation(boost::bind(&wrap, task, arg1)); } }; // Handler for non-void functions with no parameters template <typename base, typename R, bool force_wait> class proxy_impl<base, R (void), force_wait> { typedef R result_type; typedef result_type (base::*method_type)(void); typedef boost::function<result_type (void)> wrapper_type; typedef boost::function<future<result_type> (task_type)> invocation_type; static boost::any wrap(wrapper_type task) { return task(); } invocation_type invocation; wrapper_type task; public: proxy_impl(base* object, method_type method, active<base>* active) { task = boost::bind(method, object); invocation = boost::bind(&invoke<result_type>, active->queue(), force_wait, _1); } future<result_type> operator()(void) { return invocation(boost::bind(&wrap, task)); } }; // Handler for void functions without parameters template <typename base, bool force_wait> class proxy_impl<base, void (void), force_wait> { typedef void result_type; typedef result_type (base::*method_type)(void); typedef boost::function<result_type (void)> wrapper_type; typedef boost::function<future<result_type> (task_type)> invocation_type; static boost::any wrap(wrapper_type task) { task(); return boost::any(); } invocation_type invocation; wrapper_type task; public: proxy_impl(base* object, method_type method, active<base>* active) { task = boost::bind(method, object); invocation = boost::bind(&invoke<result_type>, active->queue(), force_wait, _1); } future<result_type> operator()(void) { return invocation(boost::bind(&wrap, task)); } }; } // Wrapper class to supply active behaviour to other objects template <typename base> class active { void thread_function(void) { while (true) { boost::optional<detail::task_descriptor> td = tasks.dequeue(); if (!td) break; detail::task_type task = td->template get<0>(); detail::synch_ptr synch = td->template get<1>(); boost::optional<boost::any*> result = synch->get_result(); if (result) { *(*result) = task(); } else { task(); } synch->notify(); } } detail::task_queue tasks; boost::thread thread; protected: active(void) : thread(boost::bind(&active<base>::thread_function, this)) { } ~active() { tasks.terminate(); thread.join(); } template <typename signature, bool force_wait = false> struct proxy : public detail::proxy_impl<base, signature, force_wait> { typedef detail::proxy_impl<base, signature, force_wait> impl_type; typedef typename impl_type::method_type method_type; proxy(base* object, method_type method, active<base>* active) : impl_type(object, method, active) {} }; public: detail::task_queue* queue(void) { return &tasks; } }; };
participants (1)
-
Matthew Vogt