I didn't get a response to my email, but I figured out how to use
POSIX message queues with boost::asio::io_service. Under Linux, a
message queue descriptor is a file descriptor, so it can be used with
select/epoll. I used boost::asio::posix::stream_descriptor to wrap
the message queue descriptor. You can pass
boost::asio::null_buffers() to the async_read_some() and
async_write_some() calls. These calls normally transfer data, but
with null_buffers() they transfer nothing. Instead, you get a
callback when select/epoll indicates that the message queue can be
read from or written to. In the callbacks, you do the actual
mq_send() and mq_receive() calls.
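To illustrate the readiness idea without asio, here is a minimal sketch that uses plain poll() on a message queue descriptor. It relies on the Linux-specific behavior that an mqd_t is a file descriptor. The helper names queueReadable() and roundTrip(), and the queue name passed in, are made up for this example and are not part of any library.

```cpp
#include <cstring>
#include <string>

#include <fcntl.h>
#include <mqueue.h>
#include <poll.h>
#include <sys/stat.h>

// Returns true if poll() reports the queue as readable within timeoutMs.
// Linux-specific: mqd_t is assumed to be a pollable file descriptor.
bool queueReadable(mqd_t mq, int timeoutMs)
{
    pollfd pfd;
    pfd.fd = mq;
    pfd.events = POLLIN;
    pfd.revents = 0;
    return poll(&pfd, 1, timeoutMs) == 1 && (pfd.revents & POLLIN) != 0;
}

// Hypothetical helper: sends one message (at most 64 bytes) through a
// fresh queue and returns the number of bytes received back.
// Returns -2 if the queue cannot be opened, -1 on any other failure.
long roundTrip(const std::string& name, const std::string& text)
{
    mq_unlink(name.c_str());

    mq_attr attr;
    std::memset(&attr, 0, sizeof(attr));
    attr.mq_maxmsg = 4;
    attr.mq_msgsize = 64;

    mqd_t mq = mq_open(name.c_str(), O_CREAT | O_RDWR | O_NONBLOCK,
                       S_IRUSR | S_IWUSR, &attr);
    if (mq == (mqd_t)-1)
        return -2;

    long result = -1;
    if (!queueReadable(mq, 0)                              // empty: not readable
        && mq_send(mq, text.data(), text.size(), 0) == 0   // enqueue one message
        && queueReadable(mq, 1000))                        // now readable
    {
        char buf[64];
        unsigned int prio = 0;
        result = mq_receive(mq, buf, sizeof(buf), &prio);
    }

    mq_close(mq);
    mq_unlink(name.c_str());
    return result;
}
```

Calling roundTrip("/mq_poll_demo", "hello") should return the message length when POSIX message queues are available. Older glibc versions need -lrt at link time. With asio, the null_buffers() operations play the role that poll() plays here.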
I've written some test code and included it below. There is one
message queue, a reader thread, and a writer thread. The writer
thread uses a timer to send a message through the message queue every
100 ms. The reader thread wakes up immediately to process the
messages.
-- Sergio
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <fcntl.h>
#include <mqueue.h>
#include <sys/stat.h>
using namespace std;
const std::string MSG_QUEUE_NAME = "/test_msg_queue";
struct message
{
unsigned char buffer[128];
};
class Writer
{
public:
Writer(boost::asio::io_service& ioService)
: ioService(ioService),
streamDescriptor(ioService),
timer(ioService)
{
struct mq_attr mattr;
mattr.mq_maxmsg = 10;
mattr.mq_msgsize = sizeof(struct message);
mqid = mq_open(MSG_QUEUE_NAME.c_str(), O_CREAT | O_WRONLY |
O_NONBLOCK, S_IREAD | S_IWRITE, &mattr);
cout << "writer mqid = " << mqid << endl;
streamDescriptor.assign(mqid);
streamDescriptor.async_write_some(
boost::asio::null_buffers(),
boost::bind(&Writer::handleWrite,
this,
boost::asio::placeholders::error));
}
void handleWrite(boost::system::error_code ec)
{
timer.expires_from_now(boost::posix_time::milliseconds(100));
timer.async_wait(boost::bind(&Writer::handleTimer, this,
boost::asio::placeholders::error));
}
void handleTimer(const boost::system::error_code& ec)
{
message msg = {}; // zero-fill so no uninitialized bytes are sent
int sendRet = mq_send(mqid, (const char*)&msg, sizeof(message), 0);
cout << "sendRet = " << sendRet << endl;
streamDescriptor.async_write_some(
boost::asio::null_buffers(),
boost::bind(&Writer::handleWrite,
this,
boost::asio::placeholders::error));
}
private:
boost::asio::io_service& ioService;
boost::asio::posix::stream_descriptor streamDescriptor;
boost::asio::deadline_timer timer;
mqd_t mqid;
};
void writerThread()
{
try
{
boost::asio::io_service ioService;
Writer writer(ioService);
ioService.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
}
class Reader
{
public:
Reader(boost::asio::io_service& ioService)
: ioService(ioService),
streamDescriptor(ioService)
{
struct mq_attr mattr;
mattr.mq_maxmsg = 10;
mattr.mq_msgsize = sizeof(struct message);
mqid = mq_open(MSG_QUEUE_NAME.c_str(), O_CREAT | O_RDONLY |
O_NONBLOCK, S_IREAD | S_IWRITE, &mattr);
cout << "reader mqid = " << mqid << endl;
streamDescriptor.assign(mqid);
streamDescriptor.async_read_some(
boost::asio::null_buffers(),
boost::bind(&Reader::handleRead,
this,
boost::asio::placeholders::error));
}
void handleRead(boost::system::error_code ec)
{
unsigned int pri = 0;
message msg;
// The queue is non-blocking, so this returns -1 (EAGAIN) if it is empty.
ssize_t receiveRet = mq_receive(mqid, (char *) &msg, sizeof(msg), &pri);
cout << "receiveRet = " << receiveRet << endl;
streamDescriptor.async_read_some(
boost::asio::null_buffers(),
boost::bind(&Reader::handleRead,
this,
boost::asio::placeholders::error));
}
private:
boost::asio::io_service& ioService;
boost::asio::posix::stream_descriptor streamDescriptor;
mqd_t mqid;
};
void readerThread()
{
try
{
boost::asio::io_service ioService;
Reader reader(ioService);
ioService.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
}
int main(int argc, char* argv[])
{
mq_unlink(MSG_QUEUE_NAME.c_str());
boost::thread writer(writerThread);
boost::thread reader(readerThread);
writer.join();
reader.join();
return 0;
}
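One possible refinement of handleRead() above: several messages may be waiting by the time the callback runs, so it can be useful to keep calling mq_receive() until the non-blocking queue reports EAGAIN. The sketch below shows that loop on its own, without asio. The names drainQueue() and sendThenDrain() are hypothetical and exist only for this illustration.

```cpp
#include <cerrno>
#include <cstddef>
#include <cstring>
#include <vector>

#include <fcntl.h>
#include <mqueue.h>
#include <sys/stat.h>

// Receives messages until the non-blocking queue is empty (EAGAIN).
// Returns the number of messages read, or -1 on any other error.
// msgSize must be at least the queue's mq_msgsize.
int drainQueue(mqd_t mq, std::size_t msgSize)
{
    std::vector<char> buf(msgSize);
    int count = 0;
    for (;;)
    {
        unsigned int prio = 0;
        ssize_t n = mq_receive(mq, buf.data(), buf.size(), &prio);
        if (n >= 0) { ++count; continue; }   // got one, try for another
        if (errno == EAGAIN) return count;   // queue is empty now
        if (errno == EINTR) continue;        // interrupted, retry
        return -1;
    }
}

// Hypothetical self-check: queues `sent` small messages (sent <= 8) and
// drains them. Returns the drained count, -2 if the queue cannot be
// opened, or -1 on any other failure.
int sendThenDrain(const char* name, int sent)
{
    mq_unlink(name);

    mq_attr attr;
    std::memset(&attr, 0, sizeof(attr));
    attr.mq_maxmsg = 8;
    attr.mq_msgsize = 32;

    mqd_t mq = mq_open(name, O_CREAT | O_RDWR | O_NONBLOCK,
                       S_IRUSR | S_IWUSR, &attr);
    if (mq == (mqd_t)-1)
        return -2;

    int result = 0;
    for (int i = 0; i < sent && result == 0; ++i)
        if (mq_send(mq, "x", 1, 0) != 0)
            result = -1;

    if (result == 0)
        result = drainQueue(mq, 32);

    mq_close(mq);
    mq_unlink(name);
    return result;
}
```

In the Reader class, a loop like drainQueue() would run inside handleRead() before the next async_read_some() call is issued.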
On Tue, Aug 25, 2009 at 8:52 AM, Sergio Martinez wrote:
I'm working on a project where we want a single thread to
asynchronously interact with POSIX message queues, TCP sockets and
drivers. From what I've read, boost::asio::io_service provides a
common framework for dealing with asynchronous IO. It looks like
there's an asio library for TCP, and posix::stream_descriptor could
be used for interacting with a driver file descriptor. What I don't
see is support for POSIX message queues.
The io_service documentation states the following: "The io_service
class also includes facilities intended for developers of custom
asynchronous services."
I was thinking of adding a custom service for POSIX message queues.
Has anyone already done this? Is there any fundamental reason why it
could not be done (e.g. message queues are not byte streams)?
Thanks,
Sergio