[asio] suitable chunked circular buffer implementation?
Hi folks,

I'm trying (in vain so far) to find a suitable circular buffer that allows chunked reads/writes, which I can use with Boost.Asio. This is my use case:

- I have 2 threads. Thread 1 reads data from a socket (boost::asio::ip::tcp::socket) and writes it into a buffer (boost::asio::mutable_buffer). Thread 2 reads the buffer and parses out individual messages.
- Thread 2 shouldn't care what Thread 1 is doing, specifically whether it has completed (or is in the process of) more writes than Thread 2 has read. All Thread 2 should receive is a small packet of information (the interface being perhaps only iterator begin() and size_t size()) describing the chunk of data that was written into the buffer.
- Reading from the iterator should be safe (i.e. it should wrap when it reaches the end of the buffer).
- Writing into the buffer shouldn't invalidate iterators that were created previously (and passed to the 2nd thread).

I have an implementation from before I started using Boost.Asio, which is below. It's quite clearly not standard-compliant, but it worked for me.

- Is there anything out there that works as I've described?
- Is there an alternative method I should pursue rather than "chunked circular buffers"?
- Will I be able to make something like the code below work with Boost.Asio, or are there issues I'm unaware of? (I ask because I find it surprising that something like this doesn't already exist.)

Any and all help greatly appreciated.
TIA
Steve

#ifndef _ring_buf_h_
#define _ring_buf_h_

#include <errno.h>
#include <stdint.h>
#include <string.h>     // memset
#include <sys/uio.h>    // readv, iovec
#include <algorithm>    // std::min
#include <boost/thread/mutex.hpp>

namespace ring {

typedef unsigned char byte;
static const uint32_t RING_SIZE = 2 * 1024 * 1024; // 2 meg

class iterator
{
private:
    const byte* buf;
    int cur;
    size_t buf_size;

public:
    iterator() : buf(), cur(), buf_size() {}
    iterator(const byte* _buf, int _cur, size_t _buf_size)
        : buf(_buf), cur(_cur), buf_size(_buf_size) {}

    const byte& operator*() const       { return buf[cur % RING_SIZE]; }
    const byte& operator[](int n) const { return buf[(cur + n) % RING_SIZE]; }

    iterator& operator+=(int n) { cur += n; buf_size -= n; return *this; }

    // pre-inc
    iterator& operator++() { ++cur; --buf_size; return *this; }

    // post-inc
    iterator operator++(int) { iterator tmp(*this); ++cur; --buf_size; return tmp; }

    bool operator==(iterator b) const { return cur == b.cur; }
    bool operator!=(iterator b) const { return cur != b.cur; }

    size_t size() const { return buf_size; }
    int offset() const  { return cur; }
};

//----------------------------------------------------------------

inline iterator operator+(iterator a, int b) { return a += b; }
inline iterator operator+(int b, iterator a) { return a += b; }

class buffer
{
    mutable boost::mutex mtx;
    byte buf[RING_SIZE];
    unsigned beg;
    unsigned end; // may wrap; wrapping is blessed for unsigned types only

public:
    buffer() : beg(), end() { memset(buf, 0, RING_SIZE); }

    // create an iterator object whose size = unread data in buf
    iterator data() const
    {
        boost::mutex::scoped_lock l(mtx);
        return iterator(buf, beg % RING_SIZE, end - beg);
    }

    // amount of unread data in the ring
    unsigned size() const
    {
        boost::mutex::scoped_lock l(mtx);
        return end - beg;
    }

    // frees up space in the ring (called once data has been read)
    void adv(unsigned n)
    {
        boost::mutex::scoped_lock l(mtx);
        beg += n;
    }

    // read data from a file descriptor into the buffer
    // returns:
    //   -errno for error
    //   0 for EOF
    //   > 0 for successful reads
    int read(int fd)
    {
        mtx.lock();
        unsigned free = RING_SIZE - (end - beg);
        if (!free)
        {
            mtx.unlock();
            return -ENOMEM; // must never get here
        }
        unsigned free_end = std::min(free, RING_SIZE - end % RING_SIZE);
        unsigned free_beg = free - free_end;
        iovec io[] =
        {
            { (char*)buf + end % RING_SIZE, free_end },
            { (char*)buf, free_beg }
        };
        mtx.unlock();

        ssize_t n = readv(fd, io, sizeof io / sizeof *io);
        if (n == -1)
            return -errno;

        mtx.lock();
        end += n;
        mtx.unlock();
        return n;
    }
};

//----------------------------------------------------------------

} // namespace ring

#endif
participants (1)
-
Steve Lorimer