[asio] How to write custom async event provider

Hi, I'm very impressed with Asio library proposal. What I'm missing is a description (or even tutorial) about creating custom asynchronous providers. I have an object which don't sends events, but I have to ask it for events. I have to create a thread for for this task. What I want to do is an Asio wrapper for this object that will create the worker thread behind and will dispatch events through Asio demuxer. Can you point to an documentation, example or something what will help me? Thank you in advance. Regards, Vaclav

Hi Vaclav, --- Vaclav Vesely <vaclav.vesely@email.cz> wrote:
I'm very impressed with Asio library proposal. What I'm missing is a description (or even tutorial) about creating custom asynchronous providers.
I have an object which don't sends events, but I have to ask it for events. I have to create a thread for for this task. What I want to do is an Asio wrapper for this object that will create the worker thread behind and will dispatch events through Asio demuxer.
Can you point to an documentation, example or something what will help me? Thank you in advance.
Have a look at the example in src/examples/services, in particular the logger-related classes. The logger_service uses a background thread, with a private demuxer object to dispatch the work. One thing that you said you wanted, but this example doesn't do, is dispatching the events through the main demuxer. However that should just be a matter of calling demuxer.post() with an appropriate handler object. You might also want to take a look at the implementation of the ipv4::host_resolver. The host_resolver_service emulates asynchronous host resolution using a background thread to call blocking functions like gethostbyname. Cheers, Chris

Christopher Kohlhoff wrote:
--- Vaclav Vesely <vaclav.vesely@email.cz> wrote:
I'm missing is a description (or even tutorial) about creating custom asynchronous providers. ...
Have a look at the example in src/examples/services, in particular the logger-related classes. The logger_service uses a background thread, with a private demuxer object to dispatch the work. ...
Thank you, it helped me much. I've written ASIO background_thread class which calls arbitrary functions in a background thread. If you find it useful, you can add it to examples. After some refining it may be even added to the ASIO library. Moreover background_thread has async_run_loop member function which calls the work function as long as the complete handler returns true. It's common usage of asynchronous classes (for example sockets, where handler for async_read will probably start a new read operation). It would be handy to cover this technique generally for all ASIO classes. Regards, Vaclav #include <iostream> #include "boost/asio.hpp" #include "boost/bind.hpp" #include "boost/date_time/posix_time/posix_time.hpp" #include "boost/thread.hpp" using namespace boost; using namespace std; //----------------------------------------------------------------------------- template<typename Demuxer> class background_thread { public: background_thread(Demuxer& demuxer): m_demuxer(demuxer), m_work_demuxer(), m_shutdown(false) { } ~background_thread() { m_shutdown = true; } private: template<typename Function, typename Handler> class run_proc { public: run_proc(Demuxer& demuxer, Function function, Handler handler): m_demuxer(demuxer), m_function(function), m_handler(handler), m_work(m_demuxer) { } void operator()() { m_function(); m_demuxer.post(m_handler); } private: Demuxer& m_demuxer; Function m_function; Handler m_handler; typename Demuxer::work m_work; }; public: template<typename Function, typename Handler> void async_run(Function function, Handler handler) { m_work_demuxer.post(run_proc<Function, Handler>( m_demuxer, function, handler)); { mutex::scoped_lock lock(m_mutex); m_has_work = true; m_has_work_condition.notify_one(); } start_work_thread(); } private: template<typename Function, typename Handler> class run_loop_handler { public: run_loop_handler(background_thread& thread, Function function, Handler handler): m_thread(thread), m_function(function), m_handler(handler) { } void operator()() { if(m_handler()) m_thread.async_run(m_function, *this); } private: background_thread& m_thread; Function m_function; Handler m_handler; }; public: template<typename Function, typename Handler> void async_run_loop(Function function, Handler handler) { async_run(function, run_loop_handler<Function, Handler>(*this, function, handler)); } private: void work_thread_proc() { while(!m_shutdown) { { mutex::scoped_lock lock(m_mutex); while(!m_has_work) m_has_work_condition.wait(lock); m_has_work = false; } m_work_demuxer.run(); m_work_demuxer.reset(); } } void start_work_thread() { mutex::scoped_lock lock(m_mutex); if(!m_work_thread) { m_work_thread.reset(new thread( bind(&background_thread::work_thread_proc, this))); } } private: Demuxer& m_demuxer; mutex m_mutex; asio::demuxer m_work_demuxer; scoped_ptr<thread> m_work_thread; bool m_has_work; condition m_has_work_condition; bool m_shutdown; }; //----------------------------------------------------------------------------- mutex cout_mutex; int wait(int seconds) { { mutex::scoped_lock lock(cout_mutex); cout << "Wait for " << seconds << " seconds" << endl; } xtime xt; xtime_get(&xt, boost::TIME_UTC); xt.sec += seconds; thread::sleep(xt); static int count = 0; return ++count; } void wait_finished() { { mutex::scoped_lock lock(cout_mutex); cout << "Waiting finished." << endl; } } bool wait_three_times() { static int count = 3; { mutex::scoped_lock lock(cout_mutex); cout << "Countdown " << count << endl; } return (--count > 0); } //----------------------------------------------------------------------------- void main() { asio::demuxer demuxer; background_thread<asio::demuxer> bg_thread(demuxer); bg_thread.async_run(bind(wait, 1), wait_finished); demuxer.run(); bg_thread.async_run_loop(bind(wait, 1), wait_three_times); demuxer.reset(); demuxer.run(); } //-----------------------------------------------------------------------------

Hi Vaclav, --- Vaclav Vesely <vaclav.vesely@email.cz> wrote:
Thank you, it helped me much.
No problem!
I've written ASIO background_thread class which calls arbitrary functions in a background thread. If you find it useful, you can add it to examples. After some refining it may be even added to the ASIO library.
Yes, thanks, this use case could make a good example.
Moreover background_thread has async_run_loop member function which calls the work function as long as the complete handler returns true. It's common usage of asynchronous classes (for example sockets, where handler for async_read will probably start a new read operation). It would be handy to cover this technique generally for all ASIO classes.
Actually I think there may be a way to simplify your code here - assuming I correctly understand what you're trying to do. The code seems to use the data members m_has_work and m_has_work_condition to keep restarting the demuxer::run() function whenever it has new work to do. The worker thread waits on the condition if the demuxer doesn't have any work to do. However I think this is duplicating what is already done for you inside the demuxer. You should be able to achieve the same thing by giving the m_work_demuxer some additional work to do. This work will keep the m_work_demuxer's run() function going even if there is nothing else to do. What I mean is something like: - add a new auto_ptr<asio::demuxer::work> data member to the background_thread class. - in the background_thread constructor, initialise the data member like so: m_dummy_work(new asio::demuxer::work(m_work_demuxer)) - in the background_thread destructor, destroy the work and wait for the thread to exit: m_dummy_work.reset(); m_work_thread->join(); With this change, the work_thread_proc function only needs to call demuxer::run(), and you no longer need the m_has_work, m_has_work_condition and m_shutdown data members. Cheers, Chris

On 3/23/06, Christopher Kohlhoff <chris@kohlhoff.com> wrote:
Hi Vaclav,
--- Vaclav Vesely <vaclav.vesely@email.cz> wrote:
Thank you, it helped me much.
No problem!
I've written ASIO background_thread class which calls arbitrary functions in a background thread. If you find it useful, you can add it to examples. After some refining it may be even added to the ASIO library.
Yes, thanks, this use case could make a good example.
But is it network or even IO related? Wouldn't it be better to have a generic async/event driven design that's not tightly coupled with networking?

Olaf van der Spek wrote:
--- Vaclav Vesely <vaclav.vesely@email.cz> wrote:
I've written ASIO background_thread class which calls arbitrary functions in a background thread. If you find it useful, you can add it to examples. After some refining it may be even added to the ASIO library.
But is it network or even IO related? Wouldn't it be better to have a generic async/event driven design that's not tightly coupled with networking?
IMHO ASIO defines generic async design (demuxers) which is not coupled with networking. And in ASIO there already is not-IO ASIO object - deadline_timer. Maybe the library can be divided into general Async and specialized Async.Io. But it has been already accepted in this form. Regards, Vaclav

Christopher Kohlhoff wrote:
--- Vaclav Vesely <vaclav.vesely@email.cz> wrote: ...
I've written ASIO background_thread class which calls arbitrary functions in a background thread. If you find it useful, you can add it to examples. After some refining it may be even added to the ASIO library. ... Actually I think there may be a way to simplify your code here - assuming I correctly understand what you're trying to do.
The code seems to use the data members m_has_work and m_has_work_condition to keep restarting the demuxer::run() function whenever it has new work to do. The worker thread waits on the condition if the demuxer doesn't have any work to do. ... Not exactly. The worker thread stops when there is no work to do and async_run it's again if it's not running.
However I think now, that your solution with a work thread permanently running is better. The reworked example is attached. Regards, Vaclav #include <iostream> #include "boost/asio.hpp" #include "boost/bind.hpp" #include "boost/date_time/posix_time/posix_time.hpp" #include "boost/thread.hpp" //----------------------------------------------------------------------------- template<typename Demuxer> class async_object { public: typedef Demuxer demuxer_type; public: async_object(demuxer_type& demuxer): m_demuxer(demuxer) { } demuxer_type& demuxer() { return m_demuxer; } protected: demuxer_type& m_demuxer; }; //----------------------------------------------------------------------------- template<typename Demuxer> class background_worker: public async_object<Demuxer> { public: background_worker(Demuxer& demuxer): async_object(demuxer), m_work_demuxer(), m_dummy_work(new boost::asio::demuxer::work(m_work_demuxer)) { } ~background_worker() { m_dummy_work.reset(); if(m_work_thread) m_work_thread->join(); } private: template<typename Function, typename Handler> class run_proc { public: run_proc(demuxer_type& demuxer, Function function, Handler handler): m_demuxer(demuxer), m_work(demuxer), m_function(function), m_handler(handler) { } void operator()() { m_function(); m_demuxer.post(m_handler); } private: demuxer_type& m_demuxer; typename demuxer_type::work m_work; Function m_function; Handler m_handler; }; public: template<typename Function, typename Handler> void async_run(Function function, Handler handler) { m_work_demuxer.post(run_proc<Function, Handler>(m_demuxer, function, handler)); if(!m_work_thread) { m_work_thread.reset(new boost::thread(boost::bind( &background_worker::work_thread_proc, this))); } } private: void work_thread_proc() { m_work_demuxer.run(); } private: boost::asio::demuxer m_work_demuxer; boost::scoped_ptr<boost::thread> m_work_thread; boost::scoped_ptr<boost::asio::demuxer::work> m_dummy_work; }; //----------------------------------------------------------------------------- boost::mutex cout_mutex; void wait(int seconds) { { boost::mutex::scoped_lock lock(cout_mutex); std::cout << "Wait for " << seconds << " seconds" << std::endl; } boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); xt.sec += seconds; boost::thread::sleep(xt); } void wait_finished() { { boost::mutex::scoped_lock lock(cout_mutex); std::cout << "Waiting finished." << std::endl; } } //----------------------------------------------------------------------------- void main() { boost::asio::demuxer demuxer; background_worker<boost::asio::demuxer> bg_thread(demuxer); bg_thread.async_run(boost::bind(wait, 1), wait_finished); bg_thread.async_run(boost::bind(wait, 1), wait_finished); demuxer.run(); bg_thread.async_run(boost::bind(wait, 1), wait_finished); demuxer.reset(); demuxer.run(); } //-----------------------------------------------------------------------------
participants (3)
-
Christopher Kohlhoff
-
Olaf van der Spek
-
Vaclav Vesely