Hello everyone! I am writing a client for a protocol which goes something like this:
The client connects to the server and submits "orders" -- every time it submits an order it gets back an "ack". However, the server might asynchronously send a "fill" at any time. So I want to be listening for a fill even while the client might be doing other things (for example, listening to another server for a one-way data stream). The client then chooses whether or not to submit an order based on the information it gets in the data stream, or it might choose to submit an order when it receives a fill.
I thought the way to do it would be to call
async_read_until() and have it invoke my fill handler when it receives something.
Then the function that submits orders would call
write()
and then block waiting for the ack message using
read_until()
However, when I implement that, the client never seems to notice that a fill message is sent. It receives the acks just fine, and it successfully reads the datafeed coming in from the other tcp connection. I thought maybe the read_until() would cancel the async_read_until() so after I received the ack message I called async_read_until() again, but that didn't work either...
(The server works as it should, I tested it with telnet.)
Is it possible to mix sync/async reads in this way? Can anyone tell if I am doing something wrong? Do I need to do something else to make this work?
Thanks, I appreciate your help!
Here is the code in question:
(as an aside, is it bad form to call connect() in the constructor? Should I call connect() and setup the async_read_until() in a separate start() method?)
// Builds the connection object: connects the agent and feed sockets to the
// same IP address (on agentPort_ and feedPort_ respectively), then starts one
// asynchronous line read ("\r\n"-delimited) on each socket and an asynchronous
// wait on the timer.
//
// Both connect() calls are the synchronous overloads without an error_code
// argument, so the constructor does not return until both connections are
// established; Boost.Asio reports failures from these overloads by throwing.
// The handlers are bound to `this`, so the object has to stay alive while
// those asynchronous operations are outstanding.
MarketConnectionTCP::MarketConnectionTCP(boost::asio::io_service &io_service_, std::string ip_,
        unsigned short agentPort_, unsigned short feedPort_) :
    _io_service(io_service_), _agentSocket(_io_service), _feedSocket(_io_service),
    _timer(_io_service, boost::posix_time::seconds(1))
{
    // One host, two ports: resolve the textual address once and derive both
    // endpoints from it before touching the network.
    const boost::asio::ip::address host =
            boost::asio::ip::address::from_string(ip_);
    const tcp::endpoint agentTarget(host, agentPort_);
    const tcp::endpoint feedTarget(host, feedPort_);

    // Agent connection (orders / acks / fills).
    Logger::stream() << "Trying to connect to agent:" << agentTarget
            << std::endl;
    _agentSocket.connect(agentTarget);
    Logger::stream() << "Agent connected:" << _agentSocket.remote_endpoint()
            << std::endl;

    // Feed connection (one-way data stream).
    Logger::stream() << "Trying to connect to feed:" << feedTarget
            << std::endl;
    _feedSocket.connect(feedTarget);
    Logger::stream() << "Feed connected:" << _feedSocket.remote_endpoint()
            << std::endl;

    // Arm the first line read on the agent socket; completion is delivered to
    // handleReadFill.
    boost::asio::async_read_until(_agentSocket, _fillBuf, "\r\n",
            boost::bind(&MarketConnectionTCP::handleReadFill, this,
                    boost::asio::placeholders::error));
    Logger::stream() << "Listening on agent connection:"
            << _agentSocket.remote_endpoint() << std::endl;

    // Arm the first line read on the feed socket; completion is delivered to
    // handleReadFeed.
    boost::asio::async_read_until(_feedSocket, _feedBuf, "\r\n",
            boost::bind(&MarketConnectionTCP::handleReadFeed, this,
                    boost::asio::placeholders::error));
    Logger::stream() << "Listening on feed connection:"
            << _feedSocket.remote_endpoint() << std::endl;

    // Start waiting on the timer; completion is delivered to handleTimer.
    _timer.async_wait(boost::bind(&MarketConnectionTCP::handleTimer, this,
            boost::asio::placeholders::error));
    Logger::stream() << "Timer is waiting" << std::endl;
}
// Sends `order` on the agent socket and blocks until the ack line arrives.
//
// The order is serialized with operator<< followed by "\r\n" and written with
// the synchronous boost::asio::write(). The function then reads synchronously
// up to the next "\r\n" and extracts an OrderID from that data.
//
// Returns the OrderID extracted from the ack.
// write() and read_until() are called without an error_code argument, so
// Boost.Asio reports I/O failures from them by throwing.
//
// This function intentionally does NOT start an async_read_until() on the
// agent socket. A fill read on _agentSocket/_fillBuf is started by the
// constructor and re-armed by handleReadFill after every completion, so one
// is already outstanding (or about to be re-armed by the handler that is
// calling us). Starting another here would leave several composed reads
// pending on the same socket and the same streambuf, which Boost.Asio does
// not allow and which corrupts _fillBuf once data arrives.
//
// NOTE(review): read_until() may pull more bytes than the ack line into
// ackBuf; anything after the delimiter (e.g. a fill sent right behind the
// ack) is discarded when ackBuf goes out of scope. It also runs while the
// asynchronous fill read is still pending on the same socket. Fixing both
// properly requires routing acks and fills through a single read loop on one
// buffer -- TODO confirm the wire format allows telling acks from fills.
OrderID MarketConnectionTCP::sendOrder(const Order &order)
{
    // Serialize the order as one "\r\n"-terminated line.
    boost::asio::streambuf orderBuf;
    std::ostream orderStream(&orderBuf);
    orderStream << order << "\r\n";

    // This blocks until we get an ack with the order id
    Logger::stream() << "Sending order to:" << _agentSocket.remote_endpoint()
            << ":" << order << std::endl;
    boost::asio::write(_agentSocket, orderBuf);

    Logger::stream() << "Waiting for ack from:"
            << _agentSocket.remote_endpoint() << std::endl;
    boost::asio::streambuf ackBuf;
    std::istream ackStream(&ackBuf);
    boost::asio::read_until(_agentSocket, ackBuf, "\r\n");

    OrderID id;
    ackStream >> id;
    Logger::stream() << "Received ack from:" << _agentSocket.remote_endpoint()
            << ":" << id << std::endl;

    return id;
}
// Completion handler for the "\r\n"-delimited asynchronous read on the agent
// socket.
//
// On error the object deletes itself and nothing else is done. Otherwise a
// Fill is extracted from _fillBuf; a valid fill is logged and passed to
// receiveFill(), an invalid one is only logged. In both non-error cases the
// next asynchronous read on the agent socket is started before returning.
//
// err: completion status supplied by Boost.Asio for the finished read.
void MarketConnectionTCP::handleReadFill(const boost::system::error_code& err)
{
    if (err)
    {
        // Any error ends this object's life; no members may be touched
        // after this point.
        delete this;
        return;
    }

    // Parse one Fill out of whatever the read left in the shared buffer.
    std::istream incoming(&_fillBuf);
    Fill parsed;
    incoming >> parsed;

    if (parsed.isValid())
    {
        Logger::stream() << "Fill received on:"
                << _agentSocket.remote_endpoint() << ":" << parsed
                << std::endl;
        receiveFill(parsed);
    }
    else
    {
        Logger::stream() << "Invalid fill received on:"
                << _agentSocket.remote_endpoint() << std::endl;
    }

    // Re-arm: wait for the next line on the agent socket.
    boost::asio::async_read_until(_agentSocket, _fillBuf, "\r\n",
            boost::bind(&MarketConnectionTCP::handleReadFill, this,
                    boost::asio::placeholders::error));
    Logger::stream() << "Listening on agent connection:"
            << _agentSocket.remote_endpoint() << std::endl;
}