ASIO sockets - 2nd write fails
I am having a problem writing a TCP socket message using boost::asio async_*.

I am new to using boost asio sockets. After reading what I could find (this list, samples, other forums), I have added boost::asio socket use to my application.

What works is: accept, receive, parse and reply to a message.

After my client sends a message and receives a response, it remains connected and starts an async receive.

Several seconds later my server tries to send another message to the client (by calling server::write()). The code runs OK up to connection::handle_writeMessage(), where the error is: 2719 “The file handle supplied is not valid”.

I thought I read (links below) that using strand::wrap() to queue the async_* calls and io_service::post() to post the initial function call would allow a thread that didn’t call io_service::run() to initiate the sending of a message.

http://stackoverflow.com/questions/4078484/using-boost-sockets-do-i-need-onl...
http://stackoverflow.com/questions/4090567/better-understanding-boosts-chat-...
http://groups.google.com/group/boost-list/browse_thread/thread/124a643a06d67...

Did I interpret this incorrectly? Did I code it wrong?

Thank you for your time and advice.

To develop my code I started with the http3 and chat server samples. Changes include:
- server::threads is a class member
- server::run() returns without waiting for threads to stop, so I can start other threads in my program
- added server::stop() to stop threads.
- server::write() is called from another thread.
- connection::do_writeMessage() and connection::handle_writeMessage() are from the chat server sample.

A summary of my server code is below. It's long, but should be familiar from the samples.

Server.hpp
std::vector<boost::shared_ptr<boost::thread> > threads;

server::server
  : thread_pool_size_(2), // 1 thread for wait, read, reply. 1 for unsolicited writes.
    acceptor_(io_service_),
    new_connection_(new connection(io_service_, request_handler_)),
    request_handler_()
{
  async_accept(new_connection_->socket(),
      boost::bind(&server::handle_accept, this,
        boost::asio::placeholders::error));
}

void server::run()
{
  // 1 thread for wait, read, reply. 1 for unsolicited writes.
  // Create a pool of threads to run all of the io_services.
  //std::vector<boost::shared_ptr<boost::thread> > threads;
  for (std::size_t i = 0; i < thread_pool_size_; ++i)
  {
    boost::shared_ptr<boost::thread> thread(new boost::thread(
        boost::bind(&boost::asio::io_service::run, &io_service_)));
    threads.push_back(thread);
  }
}

void server::stop()
{
  io_service_.stop();
  // Wait for all threads in the pool to exit.
  for (std::size_t i = 0; i < threads.size(); ++i)
  {
    threads[i]->join();
  }
}

server::handle_accept (const boost::system::error_code& e)
{
  if (!e)
  {
    // read, parse and reply
    new_connection_->start();
    new_connection_.reset(new connection(io_service_, request_handler_));
    // ready to accept another connection
    cout << "waiting for new accept ..." << endl;
    acceptor_.async_accept(new_connection_->socket(),
        boost::bind(&server::handle_accept, this,
          boost::asio::placeholders::error));
  }
}

// public
// called from another thread in my program to send an unsolicited message to my connected client
void server::write(const string& msg)
{
  io_service_.post(boost::bind(&connection::do_writeMessage, new_connection_, msg));
}

- - - - - - - - - -

connection::connection(boost::asio::io_service& io_service,
    request_handler& handler)
  : strand_(io_service),
    socket_(io_service),
    request_handler_(handler)
{
  _running = true;
}

void connection::start()
{
  buffer_.fill (0);
  socket_.async_read_some(boost::asio::buffer(buffer_),
      strand_.wrap(
        boost::bind(&connection::handle_read, shared_from_this(),
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred)
      )
  );
}

void connection::handle_read(const boost::system::error_code& e,
    std::size_t bytes_transferred)
{
  if (!e)
  {
    // parse what we have so far. request_parser_.parse() is my code and it works correctly.
    // when successful result==true, request
    boost::tribool result;
    request_.reset();
    boost::tie(result, boost::tuples::ignore) = request_parser_.parse(
        request_, buffer_.data(), buffer_.data() + bytes_transferred);
    if (result)
    {
      // we read an entire message
      // this is my code to process the message and put fill in the reply_ buffer.
      request_handler_.handle_request(request_, reply_);
      // send the reply
      boost::asio::async_write(socket_, reply_.to_buffers(),
          strand_.wrap(
            boost::bind(&connection::handle_writeResponse, shared_from_this(),
              boost::asio::placeholders::error)
          )
      );
    }
    else if (!result)
    {
      // there was an error reading the message
      boost::asio::async_write(socket_, reply_.to_buffers(),
          strand_.wrap(
            boost::bind(&connection::handle_writeResponse, shared_from_this(),
              boost::asio::placeholders::error)
          )
      );
    }
    else
    {
      // read some ok. need to read some more.
      socket_.async_read_some(boost::asio::buffer(buffer_),
          strand_.wrap(
            boost::bind(&connection::handle_read, shared_from_this(),
              boost::asio::placeholders::error,
              boost::asio::placeholders::bytes_transferred)
          )
      );
    }
  }
  else
  {
    cout << "connection::handle_read: error: " << e.value() << " " << e.message() << endl;
  }
  // If an error occurs then no new asynchronous operations are started. This
  // means that all shared_ptr references to the connection object will
  // disappear and the object will be destroyed automatically after this
  // handler returns. The connection class's destructor closes the socket.
}

void connection::handle_writeResponse(const boost::system::error_code& e)
{
  if (!e)
  {
    // test. This message is received by my client.
    do_writeMessage ("hello world");
    // ready to read more
    start();
  }
  else
  {
    cout << "connection::handle_writeMessage: error sending to client: " << e.value() << " " << e.message() << endl;
  }
  // No new asynchronous operations are started. This means that all shared_ptr
  // references to the connection object will disappear and the object will be
  // destroyed automatically after this handler returns. The connection class's
  // destructor closes the socket.
}

- - - - - - - - - -

// call to this function is posted to io_service from server::write
// public
void connection::do_writeMessage(string msg)
{
  bool write_in_progress = !_messagesToWrite.empty();
  _messagesToWrite.push_back(msg);
  // if there are any in the queue, handle_writeMessage is already sending them.
  if (write_in_progress)
  {
    cout << "connection::do_writeMessage: write in progress." << endl;
  }
  else
  {
    // start the async write.
    boost::asio::async_write(socket_,
        boost::asio::buffer(_messagesToWrite.front().data(), _messagesToWrite.front().length()),
        strand_.wrap(
          boost::bind(&connection::handle_writeMessage, shared_from_this()/*this*/,
            boost::asio::placeholders::error)
        )
    );
  }
}

// async write is done. if no error and more messages to write, write them, too.
void connection::handle_writeMessage(const boost::system::error_code& error)
{
  if (!error)
  {
    cout << "connection::handle_writeMessage: message written ok." << endl;
    // remove the message we just wrote
    _messagesToWrite.pop_front();
    // any more?
    if (!_messagesToWrite.empty())
    {
      // write the next one and call this function when that's done.
      boost::asio::async_write(socket_,
          boost::asio::buffer(_messagesToWrite.front().data(), _messagesToWrite.front().length()),
          strand_.wrap(
            boost::bind(&connection::handle_writeMessage, shared_from_this()/*this*/,
              boost::asio::placeholders::error)
          )
      );
    }
    else
    {
      cout << "connection::handle_writeMessage: no more messages" << endl;
    }
  }
  else
  {
    cout << "connection::handle_writeMessage: error: " << error.value() << " " << error.message() << endl;
  }
  // let this async chain end.
}

- - - - - - - - - -

main()
{
  pTCPServer = new http::server3::server (argv[TCPPortNumber]);
  if (pTCPServer)
  {
    pTCPServer->run();
  }
  // start thread that will initiate 2nd message
  … … …
}
Several seconds later my server tries to send another message to the client (by calling server::write()). The code runs OK up to connection::handle_writeMessage(), where the error is: 2719 “The file handle supplied is not valid”.
FWIW, this usually means that the socket is already closed.
Hi Bill - without looking at your code in detail (so I'm not sure where the bug is), you might want to consider having a single thread running the io_service, and having the callbacks / completion handlers forward any significant work through a wait queue to a thread pool. This isolates all of the Asio interaction from the code doing the app work, which is going to greatly simplify your thread safety and your Asio-related networking code. The networking code then does not have to deal with multiple threads, strands, mutex locks, and OS-level socket thread safety, and the app code only has to mutex-protect the data that is shared between multiple threads.

There are many wait queue examples available, or I can share one I adapted from an Anthony Williams (Boost Thread author) example.

Make sure your completion handler lifetimes are correct, which it looks like you've handled with "shared_from_this". Otherwise, as Igor already mentioned, you probably have a socket that is already closed.

Cliff
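For readers unfamiliar with the term, a wait queue of the kind Cliff mentions could look roughly like the sketch below. This is not Cliff's code and not the Anthony Williams example; it is a minimal illustration written against the C++11 standard library (an assumption, since the thread itself uses Boost.Thread), and the names wait_queue, push, wait_and_pop and close are hypothetical.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical helper, not part of Boost.Asio: a minimal blocking queue.
// The single io_service thread pushes work items from its completion
// handlers; worker threads in the pool pop and process them.
template <typename T>
class wait_queue {
public:
    // Called by the producer (e.g. a completion handler). Holds the lock
    // only long enough to enqueue the item.
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        ready_.notify_one();
    }

    // Called by worker threads. Blocks until an item is available or the
    // queue has been closed. Returns false once closed and fully drained.
    bool wait_and_pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return closed_ || !queue_.empty(); });
        if (queue_.empty()) {
            return false;  // closed and nothing left to process
        }
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    // Wakes all waiting workers so they can exit after draining the queue.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        ready_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> queue_;
    bool closed_ = false;
};
```

With something like this, each worker thread would loop on wait_and_pop() until it returns false, and results that need to go back to a client would be handed to the io_service thread (for example via io_service::post()) rather than written to the socket from the worker.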
On Fri, May 6, 2011 at 9:57 PM, bill comment <billcomment@gmail.com> wrote:
// public
// called from another thread in my program to send an unsolicited message to my connected client
void server::write(const string& msg)
{
  io_service_.post(boost::bind(&connection::do_writeMessage, new_connection_, msg));
}

'new_connection_' never refers to the connected client here. As soon as a connection is established, handle_accept() replaces new_connection_ with a new connection object, so by the time server::write() runs it always points to a connection whose socket has not been accepted yet, i.e. an invalid socket.
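The point about new_connection_ can be illustrated with a small model. The sketch below deliberately leaves out Boost.Asio: connection here is a stand-in struct whose connected flag models an accepted socket, and active_, on_accept() and write_to_pending() are hypothetical names, not part of the posted code or of the Asio samples. It only shows one possible way to keep a handle on accepted connections separately from the one that is still waiting in async_accept.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Stand-in for the real connection class (assumption: no real sockets).
struct connection {
    bool connected = false;          // models "socket has been accepted"
    std::vector<std::string> sent;   // records messages that were "written"

    bool write(const std::string& msg) {
        if (!connected) {
            return false;            // models the invalid-handle error
        }
        sent.push_back(msg);
        return true;
    }
};

class server {
public:
    server() : new_connection_(std::make_shared<connection>()) {}

    // Models handle_accept(): remember the accepted connection before
    // new_connection_ is replaced with a fresh, unconnected object.
    std::shared_ptr<connection> on_accept() {
        new_connection_->connected = true;
        std::shared_ptr<connection> accepted = new_connection_;
        active_.push_back(accepted);  // weak_ptr: does not extend lifetime
        new_connection_ = std::make_shared<connection>();
        return accepted;
    }

    // Models a corrected server::write(): send to accepted connections.
    std::size_t write(const std::string& msg) {
        std::size_t delivered = 0;
        for (const std::weak_ptr<connection>& weak : active_) {
            if (std::shared_ptr<connection> c = weak.lock()) {
                if (c->write(msg)) {
                    ++delivered;
                }
            }
        }
        return delivered;
    }

    // Models what the posted server::write() effectively does.
    bool write_to_pending(const std::string& msg) {
        return new_connection_->write(msg);
    }

private:
    std::shared_ptr<connection> new_connection_;
    std::vector<std::weak_ptr<connection> > active_;
};
```

In this model write_to_pending() fails both before and after a client connects, while write() reaches the accepted connection for as long as something else keeps it alive. In the real server the list of accepted connections would be touched both by handle_accept() and by the thread calling server::write(), so it would need a mutex or would have to be accessed only from io_service handlers. Since two threads run the io_service, it may also be worth dispatching do_writeMessage() through the connection's strand_ rather than a plain io_service_.post(), because a plain post is not serialized with the strand-wrapped handlers.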
_______________________________________________ Boost-users mailing list Boost-users@lists.boost.org http://lists.boost.org/mailman/listinfo.cgi/boost-users
participants (4): bill comment, Cliff Green, Igor R, 肖锋