I'm trying (in vain so far) to find a suitable circular_buffer that allows chunked reads/writes which I can use in boost asio.
I have 2 threads. Thread 1 reads data from a socket (boost::asio::ip::tcp::socket) and writes it into a buffer (boost::asio::mutable_buffer).
Thread 2 reads the buffer and parses out individual messages.
Thread 2 shouldn't care about what Thread 1 is doing, specifically whether or not it has completed (or is in the process of) more writes than what Thread 2 has read.
All Thread 2 should receive is a small packet of information (the interface being perhaps only iterator begin() and size_t size()) describing the chunk of data that was written into the buffer. Reading from the iterator should be safe (i.e., it should wrap when it reaches the end of the buffer).
I have an implementation from before I started using boost::asio, which is below. It's quite clearly not standards-compliant, but it worked for me.
Is there an alternative method which I should pursue rather than "chunked circular buffers"?
Will I be able to create something like I've pasted below successfully to work with boost asio, or are there issues I'm unaware of? (The reason I ask this question is because I find it surprising that something like this doesn't exist already)
Any and all help greatly appreciated.
#ifndef _ring_buf_h_
#define _ring_buf_h_
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <sys/types.h>
#include <sys/uio.h>

#include <algorithm>

#include <boost/thread/mutex.hpp>
namespace ring {
// Raw octet type used for the ring's storage and by the iterator.
typedef unsigned char byte;
// Capacity of the ring in bytes. The index arithmetic in this file reduces
// free-running unsigned counters with `% RING_SIZE`; that only stays
// continuous across unsigned wraparound while this value is a power of two,
// so keep it one if it is ever changed.
static const uint32_t RING_SIZE = 2 * 1024 * 1024; // 2 meg
class iterator
{
private:
const byte* buf;
int cur;
size_t buf_size;
public:
iterator() : buf(), cur(), buf_size() {}
iterator(const byte* _buf, int _cur, size_t _buf_size) : buf(_buf), cur(_cur), buf_size(_buf_size) {}
const byte& operator*() const { return buf[cur % RING_SIZE]; }
const byte& operator[](int n) const { return buf[(cur + n) % RING_SIZE]; }
iterator& operator+=(int n) { cur += n; buf_size -= n; return *this; }
iterator& operator++() { ++cur; --buf_size; return *this; } // post-inc
iterator operator++(int) { iterator tmp(*this); ++cur; --buf_size; return tmp; } // pre-inc
bool operator==(iterator b) const { return cur == b.cur; }
bool operator!=(iterator b) const { return cur != b.cur; }
size_t size() const { return buf_size; }
int offset() const { return cur; }
};
//----------------------------------------------------------------
// iterator + offset: advances a copy of `a` by `b` positions and returns it.
inline iterator operator+(iterator a, int b)
{
    a += b;
    return a;
}
// offset + iterator: commutative counterpart of iterator + offset.
inline iterator operator+(int b, iterator a)
{
    return a + b;
}
// Fixed-capacity byte ring filled from a file descriptor by read() and
// drained through data()/size()/adv().
//
// beg and end are free-running counters: end - beg is the number of unread
// bytes, and each counter is reduced modulo RING_SIZE only when it is turned
// into an array offset. mtx protects the two counters; the bytes themselves
// are read and written without the lock held.
class buffer
{
    mutable boost::mutex mtx; // guards beg and end
    byte buf[RING_SIZE];
    unsigned beg; // bytes released so far (see adv)
    unsigned end; // bytes stored so far; may wrap, wrapping is blessed for unsigned types only

public:
    // Starts empty with zero-filled storage.
    buffer() : beg(), end()
    {
        memset(buf, 0, RING_SIZE);
    }

    // create an iterator object whose size = unread data in buf
    // (a snapshot taken under the lock; bytes stored afterwards are not
    // reflected in the returned iterator)
    iterator data() const
    {
        boost::mutex::scoped_lock l(mtx);
        return iterator(buf, beg % RING_SIZE, end - beg);
    }

    // number of unread bytes currently held in the ring
    // (this is the used space, not the free space)
    unsigned size() const
    {
        boost::mutex::scoped_lock l(mtx);
        return end - beg;
    }

    // frees up space in the ring (called once data has been read)
    // n is not validated: passing more than size() pushes beg past end and
    // corrupts the unread/free accounting, so callers must not do that.
    void adv(unsigned n)
    {
        boost::mutex::scoped_lock l(mtx);
        beg += n;
    }

    // read data from a file descriptor into the buffer
    // returns:
    // -errno for error (-ENOMEM if the ring has no free space)
    // 0 for EOF
    // > 0 for successful reads (number of bytes stored)
    //
    // The lock is deliberately released around readv() so that the consumer
    // side is not blocked while waiting on the descriptor. Because of that,
    // two overlapping read() calls would target the same free region.
    int read(int fd)
    {
        unsigned tail;     // array offset where new data starts
        unsigned free_end; // free bytes from tail up to the end of the array
        unsigned free_beg; // free bytes at the start of the array (wrapped part)
        {
            // Snapshot the free region. scoped_lock releases the mutex on
            // every exit path, including the early return below.
            boost::mutex::scoped_lock l(mtx);
            const unsigned avail = RING_SIZE - (end - beg);
            if (!avail)
                return -ENOMEM; // must never get here
            tail = end % RING_SIZE;
            free_end = std::min(avail, RING_SIZE - tail);
            free_beg = avail - free_end;
        }

        // Scatter into the (up to) two free segments; a zero-length second
        // segment simply receives nothing.
        char* const storage = reinterpret_cast<char*>(buf);
        iovec io[] = { { storage + tail, free_end }, { storage, free_beg } };
        const ssize_t n = readv(fd, io, sizeof io / sizeof *io);
        if (n == -1)
            return -errno;

        {
            // Publish the newly stored bytes to readers.
            boost::mutex::scoped_lock l(mtx);
            end += n;
        }
        return n;
    }
};
//----------------------------------------------------------------
} // namespace ring
#endif