[asio] Problem with async_read/async_read_some not reacting to incoming data
Hei,

We're rewriting a client with Boost.Asio but have run into some problems when stress-testing it. The client fetches text and graphics from a server over a single connection that is open at all times. When the client is receiving large amounts of graphics data it will, after a while, suddenly stop receiving data, and eventually it hits our timeouts; the last call issued is an async_read/async_read_some. We keep the server well fed with requests, so there should be graphics forthcoming without pause. The problem has been seen on win32, linux and darwin when testing on a gigabit network fetching raw 1080i graphics (4 MB for each field), and is most frequent on darwin. This is naturally an absolute show-stopper for us.

So we are at a bit of a loss as to what is going wrong and why async_read/async_read_some stops reacting in the middle of the fetch queue, despite Wireshark showing that the data is arriving. When using compression on the data the problem is harder to reproduce, which might suggest a race condition somewhere. But our code uses just a single thread for the io_service, and all async communication is triggered from this io thread, which holds a work object to keep the io_service spinning. We also make sure there is at most one async_read and one async_write in flight at a time, roughly similar to the chat_client sample.

Has anyone seen something similar, or have any input on how best to figure out what goes wrong? Are there invariants that say you cannot read and write at the same time?

Some symptoms are the same in each test. When we get the last image from the socket, the buffer size is zero afterwards, and the next async_read request uses transfer_at_least(1). The async_read never calls the handler for the completion of this byte, so Nagle would have kicked in. It is also fairly hard to strip this down to a small example using a mock server. I have included some stripped-down code below in case it helps in spotting something that we can't see.

Cheers,
Stig

Stacktrace during timeout of the asio thread:

(gdb) bt
#0  0x91302f66 in kevent ()
#1  0x003e8010 in boost::asio::detail::kqueue_reactor<false>::run ()
#2  0x0045f70a in boost::asio::detail::task_io_service<boost::asio::detail::kqueue_reactor<false> >::do_one ()
#3  0x0045f8c3 in boost::asio::detail::task_io_service<boost::asio::detail::kqueue_reactor<false> >::run ()
#4  0x0045f972 in boost::asio::io_service::run ()
#5  0x003f8bbc in vcl::connection_pool::implementation::run ()
#6  0x003fa2fa in boost::_mfi::mf0<void, vcl::connection_pool::implementation>::operator() ()
#7  0x003fa356 in boost::_bi::list1<boost::_bi::value<vcl::connection_pool::implementation*> >::operator()<boost::_mfi::mf0<void, vcl::connection_pool::implementation>, boost::_bi::list0> ()
#8  0x003fa399 in boost::_bi::bind_t<void, boost::_mfi::mf0<void, vcl::connection_pool::implementation>, boost::_bi::list1<boost::_bi::value<vcl::connection_pool::implementation*> > >::operator() ()
#9  0x003fa62e in boost::detail::thread_data<boost::_bi::bind_t<void, boost::_mfi::mf0<void, vcl::connection_pool::implementation>, boost::_bi::list1<boost::_bi::value<vcl::connection_pool::implementation*> > > >::run ()
#10 0x003b97ce in thread_proxy ()
#11 0x913036f5 in _pthread_start ()
#12 0x913035b2 in thread_start ()
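(For completeness: the io thread itself is not part of the snippets below. It is essentially the usual one-thread-one-io_service pattern with a work object; the following is only an illustrative sketch of that pattern, not our actual connection_pool code.)

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

// Illustrative sketch only (not the real connection_pool): one io_service,
// kept alive by a work object and run by a single dedicated thread.  All
// async calls are routed onto this thread, as in send()/io_send() below.
class io_thread_sketch {
public:
    io_thread_sketch()
        : m_work(m_io),
          m_thread(boost::bind(&io_thread_sketch::run, this)) {}

    ~io_thread_sketch() {
        m_io.stop();        // allow run() to return even though m_work exists
        m_thread.join();
    }

    boost::asio::io_service& io() { return m_io; }

private:
    void run() { m_io.run(); }   // the loop the gdb backtrace above is sitting in

    boost::asio::io_service m_io;
    boost::asio::io_service::work m_work;
    boost::thread m_thread;
};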
The connection header file:

#ifndef VCL_ASIO_CONNECTION_HPP
#define VCL_ASIO_CONNECTION_HPP

#include <vcl/connection.hpp>
#include <queue>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>

namespace soul {
    struct logger_base;
    typedef boost::shared_ptr<logger_base> logger;
}

using boost::asio::ip::tcp;

namespace vcl {

    class connection_manager;
    class dispatcher;
    class protocol;
    class response;

    typedef boost::shared_ptr<connection_manager> connection_manager_ptr;
    typedef boost::weak_ptr<connection_manager> connection_manager_wptr;
    typedef boost::shared_ptr<dispatcher> dispatcher_ptr;
    typedef boost::shared_ptr<protocol> protocol_ptr;
    typedef boost::weak_ptr<protocol> protocol_wptr;
    typedef boost::shared_ptr<response> response_ptr;
    typedef std::queue<request_ptr> req_queue;

    enum read_status {
        rs_no_valid_data,
        rs_read_one_block,
        rs_error_occured
    };

    class asio_connection : public connection,
                            public boost::enable_shared_from_this<asio_connection>
    {
    public:
        asio_connection(const soul::logger &logger,
                        boost::asio::io_service& io_service,
                        connection_manager_wptr cmgr,
                        const dispatcher_ptr &disp,
                        const vbl::host_repr_ptr &host,
                        const protocol_ptr &cp);
        virtual ~asio_connection();

        void handle_resolve(const boost::system::error_code& error,
                            tcp::resolver::iterator endpoint_iterator);
        void handle_connect(const boost::system::error_code& error,
                            tcp::resolver::iterator endpoint_iterator);

        virtual void lose_connection();
        virtual void start_connecting();
        virtual vbl::host_repr_ptr get_host();
        virtual bool send(const request_ptr &req);

        static connection_ptr connect(const std::string &host,
                                      unsigned short port,
                                      const protocol_ptr &cp);

    private:
        void handle_write_request(const boost::system::error_code& error);
        unsigned handle_capture_request();
        read_status handle_text_request();
        read_status handle_icon_request(unsigned &next_amount);
        void start_reading(unsigned bytes = 1);
        void handle_read_info(const boost::system::error_code& error, std::size_t tf);
        read_status _handle_read_info(unsigned &next_amount);
        void io_send(const request_ptr &req);
        bool io_dispatch_response(response_ptr myresponse);
        bool io_dispatch_error(const boost::system::error_code& error);
        std::string prepared_request(const request_ptr &msg) const;

    private:
        soul::logger local_logger;
        boost::asio::io_service& m_io_service;
        connection_manager_wptr m_manager;
        dispatcher_ptr m_dispatcher;
        tcp::socket m_socket;
        vbl::host_repr_ptr m_host;
        tcp::resolver::iterator m_host_iterator;
        protocol_wptr wcp;
        req_queue outgoing_requests;
        req_queue incoming_requests;
        boost::asio::streambuf inbuffer;
        request_ptr currently_reading;
    };
}

#endif /* VCL_ASIO_CONNECTION_HPP */

And the stripped-down communication part of the connection object:

#include "connection.hpp"
#include "../logging.hpp"
#include "../internal_response.hpp"
#include <vcl/protocol.hpp>
#include <vcl/request.hpp>
#include "../manager.hpp"
#include <vbl/host.hpp>
#include <vbl/exceptions.hpp>
#include <vcl/callback.hpp>
#include <vcl/dispatcher.hpp>
#include <boost/bind.hpp>

namespace vcl {

asio_connection::asio_connection(const soul::logger &logger,
                                 boost::asio::io_service& io_service,
                                 connection_manager_wptr cmgr,
                                 const dispatcher_ptr &disp,
                                 const vbl::host_repr_ptr &host,
                                 const protocol_ptr &cp)
    : local_logger(logger),
      m_io_service(io_service),
      m_manager(cmgr),
      m_dispatcher(disp),
      m_socket(io_service),
      m_host(host),
      wcp(cp)
{
}

asio_connection::~asio_connection()
{
}

void asio_connection::start_connecting()
{
    tcp::resolver resolver(m_io_service);
    tcp::resolver::query query(m_host->get_hostname(), m_host->get_port_as_string());
    boost::system::error_code ec;
    m_host_iterator = resolver.resolve(query, ec);
    handle_resolve(ec, m_host_iterator);
}

void asio_connection::handle_resolve(const boost::system::error_code& error,
                                     tcp::resolver::iterator endpoint_iterator)
{
    // THIS CAN BE ANY THREAD
    if (!error) {
        tcp::endpoint endpoint = *endpoint_iterator;
        m_socket.async_connect(endpoint,
            boost::bind(&asio_connection::handle_connect, shared_from_this(),
                        boost::asio::placeholders::error, ++m_host_iterator));
    } else {
        m_io_service.post(boost::bind(&asio_connection::io_dispatch_error,
                                      shared_from_this(), error));
    }
}

void asio_connection::handle_connect(const boost::system::error_code& error,
                                     tcp::resolver::iterator endpoint_iterator)
{
    if (!error) {
        if (protocol_ptr p = wcp.lock()) {
            p->set_connection(shared_from_this());
            m_dispatcher->dispatch(make_callback("connection made",
                boost::bind(&protocol::connection_made, p)));
        }
        // Now we start reading
        start_reading();
    } else {
        io_dispatch_error(error);
        lose_connection();
    }
}

bool asio_connection::io_dispatch_error(const boost::system::error_code& error)
{
    if (protocol_ptr p = wcp.lock()) {
        m_dispatcher->dispatch(make_callback("Error occured",
            boost::bind(&protocol::error, p, error)));
        return true;
    } else {
        return false;
    }
}

void asio_connection::lose_connection()
{
    /* various cleanup */
}

void asio_connection::io_send(const request_ptr &msg)
{
    // if the request expects an answer, push it in the incoming requests queue
    if (msg->get_type() != noreply_request) {
        incoming_requests.push(msg);
    }

    bool write_in_progress = !outgoing_requests.empty();
    outgoing_requests.push(msg);
    if (!write_in_progress) {
        std::string data(prepared_request(outgoing_requests.front()));
        boost::asio::async_write(m_socket,
            boost::asio::buffer(data.data(), data.size()),
            boost::bind(&asio_connection::handle_write_request, shared_from_this(),
                        boost::asio::placeholders::error));
    }
}

void asio_connection::handle_write_request(const boost::system::error_code& error)
{
    if (!error) {
        outgoing_requests.pop();
        if (!outgoing_requests.empty()) {
            std::string data(prepared_request(outgoing_requests.front()));
            boost::asio::async_write(m_socket,
                boost::asio::buffer(data.data(), data.size()),
                boost::bind(&asio_connection::handle_write_request, shared_from_this(),
                            boost::asio::placeholders::error));
        }
    } else {
        lose_connection();
    }
}

bool asio_connection::send(const request_ptr &msg)
{
    if (!msg.get())
        return false;

    // make sure we do the real stuff on the io-thread
    m_io_service.post(boost::bind(&asio_connection::io_send, shared_from_this(), msg));
    return true;
}

void asio_connection::start_reading(unsigned bytes)
{
    boost::asio::async_read(m_socket, inbuffer,
        boost::asio::transfer_at_least(bytes),
        boost::bind(&asio_connection::handle_read_info, shared_from_this(),
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}

bool asio_connection::io_dispatch_response(response_ptr myresponse)
{
    if (protocol_ptr p = wcp.lock()) {
        // make a callback and see what happens
        m_dispatcher->dispatch(make_callback("response received",
            boost::bind(&protocol::dispatch_response, p, myresponse)));
        return true;
    } else {
        return false;
    }
}

void asio_connection::handle_read_info(const boost::system::error_code& error, std::size_t tf)
{
    if (!error) {
        unsigned next_amount = 0;
        read_status rs = _handle_read_info(next_amount);
        while (rs == rs_read_one_block)
            rs = _handle_read_info(next_amount);
        if (next_amount < 2)
            next_amount = 1;
        start_reading(next_amount);
    } else if (error.value() == boost::system::errc::operation_canceled) {
        // ignore this error for now, resolve it later
        lose_connection();
    } else {
        lose_connection();
    }
}

}
Has anyone seen something similar or have any input on how best to figure out what goes wrong?
We use asio for very intensive streaming/playback, and we have never encountered such an issue.
Are there invariants that say you cannot read and write at the same time?
No; you can have one async_read and one async_write outstanding on the same socket at the same time.
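For example, something along these lines is perfectly fine (illustrative snippet only; on_read/on_write are hypothetical handlers, not from your code):

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <string>

using boost::asio::ip::tcp;

// Hypothetical handlers, just to make the snippet complete.
void on_read(const boost::system::error_code&, std::size_t) {}
void on_write(boost::shared_ptr<std::string>, const boost::system::error_code&) {}

// One read and one write may be in flight simultaneously on the same socket;
// asio imposes no invariant against full-duplex use.
void start_full_duplex(tcp::socket& socket,
                       boost::asio::streambuf& in,
                       boost::shared_ptr<std::string> out)
{
    boost::asio::async_read(socket, in, boost::asio::transfer_at_least(1),
        boost::bind(&on_read,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));

    boost::asio::async_write(socket, boost::asio::buffer(*out),
        boost::bind(&on_write, out,   // binding the shared_ptr keeps the buffer alive
                    boost::asio::placeholders::error));
}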
Some symptoms are the same in each test. When we get the last image from the socket, the buffer size is zero afterwards, and the next async_read request uses transfer_at_least(1). The async_read never calls the handler for the completion of this byte, so Nagle would have kicked in.
One of the main invariants in asio is that every async_read ends with a call to its handler. So you have to check two points: 1) ensure that async_read was really called, and 2) ensure that the data you're waiting for was *not* already received during the previous call of async_read.
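A quick way to check both points is to add some logging around the read path. Something like the following (illustrative only; it reuses the member names from your posted code, and needs an #include <iostream>):

void asio_connection::start_reading(unsigned bytes)
{
    // (1) prove that the read really was issued, and with which threshold
    std::cerr << "start_reading: transfer_at_least(" << bytes
              << "), inbuffer.size()=" << inbuffer.size() << std::endl;

    boost::asio::async_read(m_socket, inbuffer,
        boost::asio::transfer_at_least(bytes),
        boost::bind(&asio_connection::handle_read_info, shared_from_this(),
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}

void asio_connection::handle_read_info(const boost::system::error_code& error, std::size_t tf)
{
    // (2) see whether the "missing" data was in fact delivered by an earlier read:
    // with transfer_at_least a single completion can hand you far more than the
    // minimum, so anything already sitting in inbuffer must be consumed before
    // waiting for more bytes from the socket.
    std::cerr << "handle_read_info: error=" << error.message()
              << ", bytes_transferred=" << tf
              << ", inbuffer.size()=" << inbuffer.size() << std::endl;

    // ... rest of the handler as posted ...
}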
Stig Sandø wrote:
Hei,
We're rewriting a client with Boost.Asio but have run into some problems when stress-testing it. The client fetches text and graphics from a server over a single connection that is open at all times. When the client is receiving large amounts of graphics data it will, after a while, suddenly stop receiving data, and eventually it hits our timeouts; the last call issued is an async_read/async_read_some. We keep the server well fed with requests, so there should be graphics forthcoming without pause. The problem has been seen on win32, linux and darwin when testing on a gigabit network fetching raw 1080i graphics (4 MB for each field), and is most frequent on darwin. This is naturally an absolute show-stopper for us.
So we are at a bit of a loss as to what is going wrong and why async_read/async_read_some stops reacting in the middle of the fetch queue, despite Wireshark showing that the data is arriving. When using compression on the data the problem is harder to reproduce, which might suggest a race condition somewhere. But our code uses just a single thread for the io_service, and all async communication is triggered from this io thread, which holds a work object to keep the io_service spinning. We also make sure there is at most one async_read and one async_write in flight at a time, roughly similar to the chat_client sample.
I would be suspicious of the 'incoming_requests' queue: where is data being popped from that queue? If it is not popped from within the context of the io_service thread, then it is not thread safe.
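If the consumer does pop from another thread, one option is to route the pop through the io_service, mirroring how send() already posts io_send(). A sketch, with hypothetical helper names (pop_finished_request is not in the posted code):

// Sketch: keep all queue access on the io_service thread by posting the pop,
// just like asio_connection::send() posts io_send().  The helper names here
// are hypothetical.
void asio_connection::pop_finished_request()
{
    m_io_service.post(boost::bind(&asio_connection::io_pop_finished_request,
                                  shared_from_this()));
}

void asio_connection::io_pop_finished_request()
{
    // Runs on the io_service thread, so it cannot race with io_send()'s push().
    if (!incoming_requests.empty())
        incoming_requests.pop();
}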
Has anyone seen something similar, or have any input on how best to figure out what goes wrong? Are there invariants that say you cannot read and write at the same time? Some symptoms are the same in each test. When we get the last image from the socket, the buffer size is zero afterwards, and the next async_read request uses transfer_at_least(1). The async_read never calls the handler for the completion of this byte, so Nagle would have kicked in. It is also fairly hard to strip this down to a small example using a mock server.
I have included some stripped-down code below in case it helps in spotting something that we can't see.
Cheers, Stig
[snip ...] HTH -- Bill Somerville
participants (3)
- Bill Somerville
- Igor R
- Stig Sandø