[Asio] Socket Read/Write Thread-Safety

Hi, I'm implementing a full-duplex protocol over TCP/IP using Boost.Asio. Multiple threads will be calling 'io_service::run_one()', so I have to serialize my completion handlers. Since I need to be reading and writing on a socket at the same time, can I use the 'boost::asio::async_read(...)' and 'boost::asio::async_write(...)' functions? The documentation states that these are implemented in terms of calls to the stream's async_read_some and async_write_some functions, and the 'basic_stream_socket' class isn't thread-safe. Thanks, Timothy

Your statements are correct, but I'm not sure I got the question. If you ask whether you may call async_read and async_write simultaneously on the same socket, then the answer is yes, of course.

The 'boost::asio::async_read(...)' and 'boost::asio::async_write(...)' functions might call async_read_some and async_write_some concurrently in some scenarios, as they do not synchronize access to the stream object that I'm passing to them by reference. A look at their implementations confirms this: if the stream's async_read_some or async_write_some didn't satisfy the completion condition, the composed operation tries again, using the same reference to the stream object. With multiple threads calling io_service::run_one(), the result can be one thread calling async_read_some while another is calling async_write_some at the same time. This seems to prevent me from using async_read and async_write at all, so I'm wondering if the stream has some undocumented thread-safety guarantees, or if there's a proper way to achieve the same effect.
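To make the retry behaviour described above concrete, here is a minimal sketch in standard C++ of a composed read that keeps calling read_some on the same stream reference until the completion condition holds. It is synchronous and uses no Asio at all; ToyStream and read_all are hypothetical names invented for this illustration, not Asio's classes or its actual implementation.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical stand-in for a stream socket: each read_some call hands back
// at most `chunk` bytes, the way a real socket may return a short read.
struct ToyStream {
    std::string pending;      // bytes still "on the wire"
    std::size_t chunk = 1;    // maximum bytes returned per read_some call
    std::size_t calls = 0;    // how many times read_some was invoked

    std::size_t read_some(char* out, std::size_t max_len) {
        ++calls;
        const std::size_t n = std::min({max_len, chunk, pending.size()});
        std::memcpy(out, pending.data(), n);
        pending.erase(0, n);
        return n;
    }
};

// Composed read: call read_some on the same stream reference again and again
// until the completion condition (buffer full) holds or the stream runs dry.
std::size_t read_all(ToyStream& stream, char* out, std::size_t len) {
    std::size_t total = 0;
    while (total < len) {
        const std::size_t n = stream.read_some(out + total, len - total);
        if (n == 0) break;    // nothing left: treat as end of stream
        total += n;
    }
    return total;
}
```

With ten pending bytes and a chunk size of three, read_all needs four read_some calls to fill a ten-byte buffer. In the asynchronous case each of those intermediate calls is issued from whichever thread happens to run the previous intermediate handler, which is the source of the concern raised above.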

On Fri, Jan 28, 2011 at 1:48 PM, Timothy Liang wrote:
> Hi, I'm implementing a full-duplex protocol over TCP/IP using Boost.Asio. Multiple threads will be calling 'io_service::run_one()', so I have to serialize my completion handlers. Since I need to be reading and writing on a socket at the same time, can I use the 'boost::asio::async_read(...)' and 'boost::asio::async_write(...)' functions? The documentation states that these are implemented in terms of calls to the stream's async_read_some and async_write_some functions, and the 'basic_stream_socket' class isn't thread-safe.
I think you want multiple threads calling io_service::run instead of run_one. To serialize your connection-related completion handlers you'd want to wrap them in a Boost.Asio strand.
HTH
-- Dean Michael Berris
about.me/deanberris

> I think you want multiple threads calling io_service::run instead of run_one.
I need to test for a program interruption signal, so I can't use io_service::run.
> To serialize your connection-related completion handlers you'd want to wrap them in a Boost.Asio strand.
I'm not concerned about my own completion handlers. The handlers in Asio's composed operations are the problem, and I have no way to wrap those in strands.

> http://www.boost.org/doc/libs/1_45_0/doc/html/boost_asio/overview/core/stran...
Ah! It also says, "The io_service::strand::wrap() function creates a new completion handler that defines asio_handler_invoke so that the function object is executed through the strand." And the composed operations have a hook on their intermediate handlers that calls asio_handler_invoke with the completion handler's context. So I simply need to pass the wrapped handler object directly to the composed operations as the completion handler. Cool.
Now I'm beginning to wonder if this will work with my design. I want my program to run handlers as concurrently as possible. I'm currently using a single io_service and setting up a thread pool like the HTTP Server 3 example. Since the io_service doesn't know if a handler would be blocked by a strand, it could needlessly assign an about-to-be-blocked handler to a free thread. How do I fix that?

On Mon, Jan 31, 2011 at 7:30 PM, Timothy Liang wrote:
> Ah! It also says, "The io_service::strand::wrap() function creates a new completion handler that defines asio_handler_invoke so that the function object is executed through the strand." And the composed operations have a hook on their intermediate handlers that call asio_handler_invoke with the completion handler's context. So I simply need to pass the wrapped handler object directly to the composed operations as the completion handler. Cool.
> Now I'm beginning to wonder if this will work with my design. I want my program to run handlers as concurrently as possible. I'm currently using a single io_service and setting up a thread pool like the HTTP Server 3 example. Since the io_service doesn't know if a handler would be blocked by a strand, it could needlessly assign an about-to-be-blocked handler to a free thread. How do I fix that?
It doesn't do that. The io_service is smart enough to see that a handler belongs to a strand which may be blocked, so it moves on to the next available scheduled handler. I do this all the time, and I haven't seen it be the bottleneck for anything I've developed with Boost.Asio, from the early days right up to now.
Give it a shot and then measure to identify which part of the solution is causing you problems.
HTH
-- Dean Michael Berris
about.me/deanberris
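One rough way to picture why a strand need not tie up a thread is the toy model below, written in standard C++ only. It is an illustration of the behaviour described above and not Asio's implementation; ToyPool, ToyStrand and all of their members are hypothetical names. A worker that invokes a wrapped handler while another thread is already running the strand simply queues the handler and returns, so it is free to pick up other work.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an io_service with a thread pool.
class ToyPool {
public:
    void post(std::function<void()> task) {
        std::lock_guard<std::mutex> lock(mutex_);
        tasks_.push_back(std::move(task));
    }

    // Runs the queued tasks on `threads` workers until the queue is empty.
    void run(std::size_t threads) {
        std::vector<std::thread> workers;
        for (std::size_t i = 0; i < threads; ++i)
            workers.emplace_back([this] { while (run_one()) {} });
        for (auto& w : workers) w.join();
    }

private:
    bool run_one() {
        std::function<void()> task;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (tasks_.empty()) return false;
            task = std::move(tasks_.front());
            tasks_.pop_front();
        }
        task();  // executed outside the pool lock
        return true;
    }

    std::mutex mutex_;
    std::deque<std::function<void()>> tasks_;
};

// Hypothetical stand-in for a strand: handlers run one at a time, in order.
class ToyStrand {
public:
    // Returns a handler that, when invoked, runs `f` through this strand.
    std::function<void()> wrap(std::function<void()> f) {
        return [this, f] { dispatch(f); };
    }

    void dispatch(std::function<void()> f) {
        std::unique_lock<std::mutex> lock(mutex_);
        queue_.push_back(std::move(f));
        if (draining_) return;  // strand is busy: hand the handler off and leave
        draining_ = true;
        while (!queue_.empty()) {
            std::function<void()> next = std::move(queue_.front());
            queue_.pop_front();
            lock.unlock();
            next();             // no lock held while the handler runs
            lock.lock();
        }
        draining_ = false;
    }

private:
    std::mutex mutex_;
    std::deque<std::function<void()>> queue_;
    bool draining_ = false;
};
```

Posting a thousand wrapped handlers that increment a plain int and running the pool on four threads yields exactly a thousand increments, with never more than one handler inside the strand at a time. Whether the real strand is the bottleneck in a given program is still something to measure, as suggested above.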

Wow, you're right! Strands clearly do more than they appear to do. Thanks.

Hi All,
I'm implementing a TCP socket to be used with client and server derived classes. I'm testing the code with Boost.Test but I get the following error:
"The I/O operation has been aborted because of either a thread exit or an application request"
Here is the test code:
    std::string host = "localhost";
    unsigned short port = 20001;
    //Server* server = new Server();
    //server->acceptAsyncOnThread(port);
    Server server;
    server.acceptAsyncOnThread(port);
    Client client;
    TcpSessionPtr connection = client.connect(host, port);
    bool accepted = false;
    boost::timer timer;
    while (!accepted && timer.elapsed() < 1.0) {
        //accepted = server->hasAccepted();
        accepted = server.hasAccepted();
    }
    //delete server;
    BOOST_CHECK_EQUAL( accepted, true );
Can anybody please explain to me the reason for that error? Moreover, do you expect that using the commented-out code (i.e. the Server instance is created on the heap and deleted before the BOOST_CHECK_EQUAL call) may solve the problem? Why?
Regards
Gianni

On 4/5/2011 3:36 PM, Igor R wrote:
>> "The I/O operation has been aborted because of either a thread exit or an application request"
> Perhaps the thread that invoked the async operation exited before the operation was completed?
I'm not an expert, so I'll try to explain my scenario. The server runs an async accept on a thread with basically the following code:
    boost::asio::ip::tcp::endpoint endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), iPort);
    acceptor.open(endpoint.protocol());
    acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
    acceptor.bind(endpoint);
    acceptor.listen();
    TcpSessionPtr connection(new TcpSession(io_service));
    acceptor.async_accept(connection->getSocket(),
        boost::bind(&BoostTcpSocket::handleAccept, this, connection, boost::asio::placeholders::error));
In the handleAccept() callback a new async accept is started so that the server can accept other clients. Here is the code:
    TcpSessionPtr connection(new TcpSession(io_service));
    acceptor.async_accept(connection->getSocket(),
        boost::bind(&BoostTcpSocket::handleAccept, this, connection, boost::asio::placeholders::error));
So it sounds reasonable that, when the application ends, an async accept operation may still be pending. Now, how could I cancel a pending operation? Is there another way to solve the problem?
Regards
Gianni

> The server runs an async accept on a thread with basically the following code: boost::asio::ip::tcp::endpoint endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), iPort); acceptor.open(endpoint.protocol()); acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); acceptor.bind(endpoint); acceptor.listen(); TcpSessionPtr connection(new TcpSession(io_service)); acceptor.async_accept(connection->getSocket(), boost::bind(&BoostTcpSocket::handleAccept, this, connection, boost::asio::placeholders::error));
Do you mean that the above code does not run in the io_service::run() thread? When does this thread exit, just after it executes the above code?
> So, it sounds reasonable that if the application ends an async accept operation may be pending. Now, how could I cancel a pending operation? Is there another way to solve the problem?
You can close() your acceptor, but its destructor closes it anyway.

On 4/5/2011 8:32 PM, Igor R wrote:
>> The server runs an async accept on a thread with basically the following code: boost::asio::ip::tcp::endpoint endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), iPort); acceptor.open(endpoint.protocol()); acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); acceptor.bind(endpoint); acceptor.listen(); TcpSessionPtr connection(new TcpSession(io_service)); acceptor.async_accept(connection->getSocket(), boost::bind(&BoostTcpSocket::handleAccept, this, connection, boost::asio::placeholders::error));
> Do you mean that the above code runs not in io_service::run() thread? When does this thread exit, just after it executes the above code?
No, I'm sorry, I forgot one line at the end:
    thread = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service));
thread is a boost::thread member variable of the TcpSocket class I'm trying to implement. io_service and acceptor are also member variables of the TcpSocket class.
> You can close() your acceptor, but its destructor closes it anyway.
Yes, you are right. In fact, in the TcpSocket destructor I already call:
    if (acceptor.is_open()) acceptor.close();
Regards
Gianni

> No, I'm sorry I forgot one line at the end:
> thread = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service));
Ok, but this line just creates a thread and passes io_service::run() to it. It doesn't block your current thread, where you issued async_accept. Ensure that the thread where you *call* async_accept() doesn't end before the operation is complete.

On 4/6/2011 10:03 AM, Igor R wrote:
>> No, I'm sorry I forgot one line at the end:
>> thread = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service));
> Ok, but this line just creates a thread and passes to it io_service::run() -- it doesn't block your current thread, where you issued async_accept. Ensure that the thread where you *call* async_accept() doesn't ends before the operation is complete.
Igor, thanks for your patience. You are right, the piece of code I posted does not block the thread. Maybe it's better to go back to the test I posted at first:
    Server server;
    server.acceptAsyncOnThread(port);
    Client client;
    TcpSessionPtr connection = client.connect(host, port);
    bool accepted = false;
    boost::timer timer;
    while (!accepted && timer.elapsed() < 1.0) {
        accepted = server.accepted();
    }
Here the Server class derives from TcpSocket, and acceptAsyncOnThread() basically calls the code I posted in the previous email, including the thread creation. server.accepted() returns true once the async_accept callback has been called (and that part does work correctly). The test thread is held in a loop with a timer, as you can see from the code. I would be glad if you could find the error in that code.
Regards
Gianni
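The test above polls a flag that is written by the thread running the accept callback and read by the test thread. The sketch below shows that polling pattern in standard C++ only. It rests on assumptions that are not taken from the posted code: the flag is a std::atomic<bool>, std::chrono stands in for boost::timer, and a plain background thread stands in for the server. wait_for_flag and demo_accept are hypothetical names, and the sketch does not attempt to diagnose the reported error.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Polls `flag` until it becomes true or `timeout` elapses; returns the value
// seen last. An atomic flag makes the cross-thread read well defined.
bool wait_for_flag(const std::atomic<bool>& flag,
                   std::chrono::milliseconds timeout) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!flag.load() && std::chrono::steady_clock::now() < deadline)
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    return flag.load();
}

// Hypothetical stand-in for the server: a background thread sets the flag
// after `delay`, as an accept callback would on the io_service thread.
bool demo_accept(std::chrono::milliseconds delay,
                 std::chrono::milliseconds timeout) {
    std::atomic<bool> accepted(false);
    std::thread worker([&accepted, delay] {
        std::this_thread::sleep_for(delay);
        accepted.store(true);
    });
    const bool seen = wait_for_flag(accepted, timeout);
    worker.join();  // the worker must finish before `accepted` is destroyed
    return seen;
}
```

Calling demo_accept with a short delay and a one-second timeout reports the flag as set, while a delay longer than the timeout reports it as not set. The join before returning keeps the flag alive for as long as the background thread can still touch it.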

participants (4)
- Dean Michael Berris
- Gianni Ambrosio
- Igor R
- Timothy Liang