[asio] Problem implementing a line-based protocol

I am trying to write a server and client that use a line-based text protocol for communication. It works rather well now, but there is one weird problem: one of my messages loses its first 4 characters. (This is reproducible, but I don't think I do anything special in that one case.)

boost::asio::async_read_until is used to read the line into a boost::asio::streambuf.
The socket is a boost::asio::local::stream_protocol::socket.

    std::string msg;
    std::istream is(&buffer);
    std::getline(is, msg);

I looked at buffer.size() and the size reported to the handler function and compared them to the length of the message. They look consistent, so I don't think the read call read some characters past the \n (although that would be the only logical explanation).

* there are no multiple read operations on the socket
* I reuse the same buffer for multiple async_read_until calls

Does anyone have a code snippet that can help me figure out what could be stuck and lost in the buffer, or where else the problem could be?
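The extraction step described above can be tried in isolation. The following is a minimal sketch, not the poster's code: it assumes std::stringbuf as a stand-in for boost::asio::streambuf (both are std::streambuf implementations), and the names pop_line and demo_two_lines are hypothetical. It shows that std::getline removes one line and its delimiter and leaves any following bytes in the buffer for the next call.

```cpp
#include <cassert>
#include <istream>
#include <sstream>
#include <string>

// Hypothetical helper: pull one '\n'-terminated line out of a stream buffer.
// std::getline consumes the delimiter but does not store it, so the returned
// string is one character shorter than the number of bytes removed.
std::string pop_line(std::streambuf& buffer) {
    std::string msg;
    std::istream is(&buffer);   // the istream reads directly from the buffer
    std::getline(is, msg);
    return msg;
}

// Usage sketch: two lines arrive in a single read.
// std::stringbuf is an assumption standing in for boost::asio::streambuf.
void demo_two_lines() {
    std::stringbuf buffer("LOGON_ALGO:0\nJOBDONE:0\n");
    assert(pop_line(buffer) == "LOGON_ALGO:0");
    assert(pop_line(buffer) == "JOBDONE:0");   // second line is still intact
}
```

If a line comes out shortened in a test like this, the extraction code is the place to look; if not, the bytes were probably missing before this step.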

On 01/11/2011 12:45 PM, Hannes Brandstätter-Müller wrote: [snip]
Does anyone have a code snippet that can help me figure out what could be stuck and lost in the buffer, or where else the problem could be?
IIRC, this might have to do with istream / streambuf not doing what you may expect. Have you tried using streambuf's commit / consume member functions? Cheers, Rutger

On Tue, Jan 11, 2011 at 13:32, Rutger ter Borg wrote:
On 01/11/2011 12:45 PM, Hannes Brandstätter-Müller wrote: [snip]
Does anyone have a code snippet that can help me figure out what could
be stuck and lost in the buffer, or where else the problem could be?
IIRC, this might have to do with istream / streambuf not doing what you may expect. Have you tried using streambuf's commit / consume member functions?
Cheers,
Rutger
That I did not try, but I just tried another, simpler program that does the communication the same way. It worked perfectly, and even showed the buffer size to be bigger (as almost all the messages were sent simultaneously), and they were processed line by line, as intended.

I'm now totally at a loss as to what could cause that. The most noticeable difference between the programs is that the non-working one is compiled with the MPI (openmpi) compiler wrapper (which adds some libraries and calls GCC) instead of plain g++.

Confused, getting a headache,
Hannes

On 01/11/2011 01:50 PM, Hannes Brandstätter-Müller wrote:
That I did not try, but I just tried another, simpler program that does the communication the same way. It worked perfectly, and even showed the buffer size to be bigger (as almost all the messages were sent simultaneously) and they were processed line by line, as intended.
Could you provide example code? Rutger

On Tue, Jan 11, 2011 at 14:09, Rutger ter Borg wrote:
On 01/11/2011 01:50 PM, Hannes Brandstätter-Müller wrote:
That I did not try, but I just tried another, simpler program that does the communication the same way. It worked perfectly, and even showed the buffer size to be bigger (as almost all the messages were sent simultaneously) and they were processed line by line, as intended.
Could you provide example code?
    void start_read()
    {
        // Set a deadline for the read operation.
        m_input_deadline.expires_from_now(boost::posix_time::seconds(30));

        // Start an asynchronous operation to read a newline-delimited message.
        logging::instance().log_debug("Starting Read", this);
        boost::asio::async_read_until(m_socket, m_input_buffer, '\n',
            boost::bind(&birfh_ipc_session::handle_read, shared_from_this(), _1, _2));
    }

    void handle_read(const boost::system::error_code& ec, std::size_t size)
    {
        if (stopped()) {
            return;
        }

        if (!ec) {
            logging::instance().log_debug("Got Buffer Size: " + base::as_string(m_input_buffer.size()), this);
            logging::instance().log_debug("Got Message Size: " + base::as_string(size), this);

            // Extract the newline-delimited message from the buffer.
            std::string msg;
            std::istream is(&m_input_buffer);
            std::getline(is, msg);
            logging::instance().log_debug("Got Message Size (read): " + base::as_string(msg.length()), this);

            if (!msg.empty()) {
                logging::instance().log_debug("Got Message: " + msg, this);
                m_message_queue.push_back(birfh_message(msg));
            } else {
                // We received a heartbeat message from the client. If there's nothing
                // else being sent or ready to be sent, send a heartbeat right back.
                logging::instance().log_debug("Heartbeat received", this);
                if (m_output_queue.empty()) {
                    m_output_queue.push_back("\n");
                    // Signal that the output queue contains messages. Modifying the
                    // expiry will wake the output actor, if it is waiting on the timer.
                    m_non_empty_output_queue.expires_at(boost::posix_time::neg_infin);
                }
            }
            start_read();
        } else {
            m_message_queue.push_back(my_message(my_protocol::exception_tag, false, ec.message()));
            stop();
        }
    }

Log file (this is how it should work, with the small test case):

    Starting Read
    Session started.
    Got Buffer Size: 512
    Got Message Size: 39
    Got Message Size (read): 38
    Got Message: LOGON_ALGO:0:birfh-hannes:1337:11626:1
    Starting Read
    Got Buffer Size: 473
    Got Message Size: 200
    Got Message Size (read): 199
    Got Message: PARAMETERNAMES:0:22 serialization::archive 7 0 0 10 0 0 0 4 test 4 blub 4 test 4 blub 4 test 4 blub 4 test 4 blub 4 test 4 blub 4 test 4 blub 4 test 4 blub 4 test 4 blub 4 test 4 blub 4 test 4 blub:2
    Starting Read
    Got Buffer Size: 273
    Got Message Size: 19
    Got Message Size (read): 18
    Got Message: JOBDONE:0:0:1337:3

This is what happens with the other program (again, log):

    Starting Read
    Session started.
    Got Buffer Size: 39
    Got Message Size: 39
    Got Message Size (read): 38
    Got Message: LOGON_ALGO:0:birfh-hannes:1337:12175:1
    Starting Read
    Got Buffer Size: 132
    Got Message Size: 132
    Got Message Size (read): 131
    Got Message: METERNAMES:0:22 serialization::archive 7 0 0 5 0 0 0 6 InSize 4 5809 7 InCount 1 5 6 MaxMut 1 2 7 WinSize 2 20 7 GC-Cont 2 -1:2
    Starting Read
    Got Buffer Size: 19
    Got Message Size: 19
    Got Message Size (read): 18
    Got Message: JOBDONE:0:0:1337:3

As you might notice, the PARAMETERNAMES is missing a "PARA".

Hannes
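The numbers in the working log can be checked with a little arithmetic. The sketch below uses hypothetical helper names (remaining_after, line_length, check_working_log) and only values copied from the log. It assumes, as the logs suggest, that the size passed to the handler counts the trailing '\n' while the string produced by std::getline does not.

```cpp
#include <cassert>
#include <cstddef>

// Bytes left in the buffer once a line (including its '\n') has been consumed.
constexpr std::size_t remaining_after(std::size_t buffer_size, std::size_t message_size) {
    return buffer_size - message_size;
}

// Length of the extracted string when the reported size includes the '\n'.
constexpr std::size_t line_length(std::size_t message_size) {
    return message_size - 1;
}

// Values taken from the working log in the message above.
void check_working_log() {
    assert(remaining_after(512, 39) == 473);   // buffer size seen by the 2nd read
    assert(remaining_after(473, 200) == 273);  // buffer size seen by the 3rd read
    assert(line_length(39) == 38);
    assert(line_length(200) == 199);
    assert(line_length(19) == 18);
}
```

In the failing log the reported size (132) is again exactly one more than the extracted length (131), and the buffer held nothing beyond that line. Read that way, the four characters were already absent when the handler ran. This is an inference from the logged numbers, not something confirmed in the thread.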

On 01/11/2011 02:23 PM, Hannes Brandstätter-Müller wrote:
Could you provide example code?
As you might notice, the PARAMETERNAMES is missing a "PARA"
Hannes
I think you should make sure the istream is destructed before the underlying streambuf is touched again (in this case, through start_read). E.g., just put everything from the declaration of msg up to just before the call to start_read in its own scope.

HtH,
Rutger
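One way to read this suggestion is sketched below. It is not the poster's class: session_sketch, its members, and the use of std::stringbuf in place of boost::asio::streambuf are assumptions for illustration, and start_read only counts calls instead of starting an asynchronous read. This variant keeps msg outside the block so the code after it can still use the string. The thread does not say whether the change resolved the problem.

```cpp
#include <cassert>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for the session class in the thread.
struct session_sketch {
    std::stringbuf input_buffer;         // stands in for m_input_buffer
    std::vector<std::string> messages;   // stands in for m_message_queue
    int reads_started = 0;

    // The real code would call boost::asio::async_read_until here.
    void start_read() { ++reads_started; }

    void handle_read() {
        std::string msg;
        {
            // The istream exists only inside this block...
            std::istream is(&input_buffer);
            std::getline(is, msg);
        }   // ...and is destroyed here, before the buffer is used again.

        if (!msg.empty()) {
            messages.push_back(msg);
        }
        start_read();   // the buffer is handed to the next read only now
    }
};
```

An empty line (the heartbeat in the original code) leaves the message list unchanged but still restarts the read.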

I posted about a week ago that I had some problems with asio when sending and receiving a line-based message protocol. I rewrote some parts of the code to clean it up and to see whether any more errors were hidden in the code duplications that were in there. I no longer get the truncated message mentioned before, but now it seems that some lines are lost completely.

Setup:
* Self-written server using boost::asio::read_until with '\n' as the delimiter. Works flawlessly even if there are multiple lines in the input...
* Self-written client using async_write and a loop of io_service.run_one(), checking the resulting ec to enable timeouts.

When the client is compiled with plain GCC, it works flawlessly, at least in my tests. But when I use MPI, in particular the calls to MPI::Init() and MPI::Finalize(), and compile with the mpic++ wrapper, some messages do not arrive at the server. The server is unchanged. The messages that do not arrive use boost serialization for some data, but one message that uses it does get through.

I have tried with openmpi and mpich2. I am now seriously out of ideas what else to try.

Hannes
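When lines go missing between a client and a server, one generic step is to log the exact bytes on both sides and compare them. The helper below is a hypothetical sketch (escape_for_log is not part of asio or of the poster's code). It makes control characters visible, so a delimiter or other unexpected byte inside a payload would show up in the log. Whether anything like that is happening here is not established in the thread.

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Hypothetical logging helper: render control and non-ASCII bytes visibly so
// the raw bytes written by the client can be compared with those received.
std::string escape_for_log(const std::string& raw) {
    std::string out;
    for (unsigned char c : raw) {
        if (c == '\n') {
            out += "\\n";
        } else if (c == '\r') {
            out += "\\r";
        } else if (c == '\\') {
            out += "\\\\";
        } else if (c < 0x20 || c >= 0x7f) {
            char hex[5];   // "\xNN" plus terminating NUL
            std::snprintf(hex, sizeof hex, "\\x%02x", static_cast<unsigned>(c));
            out += hex;
        } else {
            out += static_cast<char>(c);
        }
    }
    return out;
}
```

Logging escape_for_log(payload) just before the write on the client and just after the read on the server gives two strings that can be compared directly.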
Participants (2): Hannes Brandstätter-Müller, Rutger ter Borg