Hello everyone,
I am using the boost library now for a couple of weeks.
Unfortunately I've run into some problems with my C++ code and I
would like to make sure that these problems are not caused due to
incorrect use of boost. So basically I've written a small class
which is suppossed to take messages and send them asynchronously.
If there are more messages than can be processed, they queue up.
The code is mainly based on the tutorial code provided on the
boost website:
http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/tutorial/tutdaytime3.html
I have 2 questions:
1. The constructor launches the threads, from this point on I want
to be able to queue up messages, which are processed afer that by
using the function MessageService::write. Is there any possible
problem which could occur due to buffer or thread issues? Because
sometimes the messages are not processed until the end and when
they are processed they are corrupted.
2. Sometimes the messages are not processed when the application
would quit. I have to make sure, that the messages are sent at
least at the end. Sometimes there are 10.000+ messages queued up,
so this might take a bit time. For this I've wrote
MessageService::await(). It seems really bogus to me. I am
checking the pending messages left for the following reason:
When the host I am connecting to does not listen on the port I am
connecting to, I don't receive an error by the functions I am
using. So the messages are queued up and nothing happens. I don't
know why. Have I forgot any checking whether the connection could
be established?
In the hope my question is not too generic, I would be really
grateful for any advice. I am sorry I could not provide any
specific errors, but this is due to the reason all the problems
evolve on the client side by receiving corrupted messages.
- Konrad
/*
* MessageService.cpp
*
* Created on: 04.06.2011
* Author: Konrad Johannes Reiche
*/
#include "MessageService.h"
using namespace google::protobuf::io;
using boost::asio::ip::tcp;
MessageService::MessageService(std::string ip, std::string port) :
work(io_service), resolver(io_service), socket(io_service) {
messageQueue = new std::deque<AgentMessage>;
tcp::resolver::query query(ip, port);
endpoint_iterator = resolver.resolve(query);
tcp::endpoint endpoint = *endpoint_iterator;
socket.async_connect(endpoint,
boost::bind(&MessageService::handle_connect,
this, boost::asio::placeholders::error,
++endpoint_iterator));
boost::thread t(boost::bind(&boost::asio::io_service::run,
&io_service));
}
void MessageService::await() {
while (!messageQueue->empty()) {
signal(SIGINT, exit);
int messagesLeft = messageQueue->size();
sleep(3);
std::cout << "Pending Profiler Agents Messages: "
<< messageQueue->size() << std::endl;
if (messagesLeft == messageQueue->size()) {
std::cout << "Connection Error" <<
std::endl;
break;
}
}
}
void MessageService::write(AgentMessage agentMessage, long
systemTime,
int JVM_ID) {
agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle());
agentMessage.set_jvm_id(JVM_ID);
agentMessage.set_systemtime(systemTime);
io_service.post(boost::bind(&MessageService::do_write, this,
agentMessage));
}
void MessageService::do_close() {
socket.close();
}
void MessageService::transmitMessage(AgentMessage agentMessage) {
boost::asio::streambuf b;
std::ostream os(&b);
ZeroCopyOutputStream *raw_output = new
OstreamOutputStream(&os);
CodedOutputStream *coded_output = new
CodedOutputStream(raw_output);
coded_output->WriteVarint32(agentMessage.ByteSize());
agentMessage.SerializeToCodedStream(coded_output);
delete coded_output;
delete raw_output;
boost::system::error_code ignored_error;
boost::asio::async_write(socket, b.data(), boost::bind(
&MessageService::handle_write, this,
boost::asio::placeholders::error));
}
void MessageService::do_write(AgentMessage agentMessage) {
bool write_in_progress = !messageQueue->empty();
messageQueue->push_back(agentMessage);
if (!write_in_progress) {
transmitMessage(agentMessage);
}
}
void MessageService::handle_write(const boost::system::error_code
&error) {
if (!error) {
messageQueue->pop_front();
if (!messageQueue->empty()) {
transmitMessage(messageQueue->front());
}
} else {
std::cout << error << std::endl;
do_close();
}
}
void MessageService::handle_connect(const
boost::system::error_code &error,
tcp::resolver::iterator endpoint_iterator) {
// can be used to receive commands from the Java profiler
interface
}
MessageService::~MessageService() {
// TODO Auto-generated destructor stub
}