Interested in bounded buffer class for threaded producer/consumer communication?

Hi all, I seem to have come across a producer/consumer situation a few times now, and everytime I do, I seem to want a slightly different functionality for a bounded buffer-kind of class that is not generally provided. I am looking for such a class that is also efficient for large amounts of data, and so I wonder if there does already exists a class in boost to efficiently push work between producers and consumers when they are running in different threads, i.e., that is thread safe. Asio seems to have a lot of communicating mechanisms, but I did not yet come across such a mechanism (pipe/channel/lane/unidirectional stream/bounded buffer... I'll call it a 'lane' for now) between two threads. If it does not exist yet in boost, I wonder if there is a general interest for such a class. The situation, is that - one or more threads produce data 'packages' of some type T, until the buffer is full. - one or more other threads have to wait for the availability of these packages, and once available, get one and process them. Googling around there are numerous suggestions (e.g. blog entries) for an implementation of thread safe queues that synchronize writing/reading and wake readers when data is available. However, I think there are fewer implementations that both use a circular buffer and will also block the producer when the buffer is 'full', and even less that are efficient for large amounts of data. Typical examples for this situation occur in signal processing. For example, it could be that one thread reads packets of work from the hard disk or some other device, write it to the lane-object, while one or a few different other threads try to read packets from the lane-object for processing. In a particular case, I had a consumer which needed a certain amount of packets at a time. The lane holds a (circular) buffer of a given size, and if it is full, producing threads will be blocked. Consuming threads will be blocked if the buffer is empty. There is a bounded_buffer example given to implement such a thing with the circular buffer. It does exactly what I suggest, but I assume it is not so efficient for pushing large amounts of data over, as each individual object requires a push, thus a lock if implemented as in bounded_buffer, and involves unnecessary updates of the counters (involving a decrement() each time) on each push, which is not necessary if you know you are pushing a number of elements at once. I assume this could be fixed by wrapping the insert(range) method instead of the individual push functions. What I found most useful however, is when the 'main' interface of such a class consists of these methods: template<typename T> class lane { public: lane(size_t capacity); void write(const T& element); // (or push if you wish) void write(const T* elements, size_t n); // or as a data range void write_end(); bool read(const T& element); // false if write_end() has been called and empty() size_t read(const T* elements, size_t n); // returns n, unless write_end() has been called and is empty() size(), empty(), etc. } ..and that's basically how I implemented it right now. The lane is just like the circular buffer in boost, and owns the elements, and thus copies the data once on write to its own buffer and once on read. The write_end() method will change the state of the lane, such that blocked reading threads will no longer be blocked, i.e., it assumes the producer(s) is finished and the consumer should finish as well after the last data has been read. No element ever gets overwritten before being read. In my case I never needed it to be an actually "real" container, thus valid iterators and begin() and end()'s are not necessary. With such a class, setting up an efficient multi-threaded consumer/producer scenario, that can be easily end by either the producer or yet another separate (e.g., user interface) thread, is really easy. As said, especially in signal processing application (why it is somewhat low-leveler), this kind of class is very nice to have. I guess there are many flavours of such a 'lane', e.g. with deque's or circular_buffers, with or without owning data, with or without bounding the producer and with or without providing efficient large data pushers. In my case, the above interface seemed to have been the solution a few times, but I don't know how common it really is. So, I'm curious if this problem frequently arises, if it's solved already and/or if there would be interest in such a class. Regards, André Offringa

André Offringa wrote:
Hi all,
I seem to have come across a producer/consumer situation a few times now, and everytime I do, I seem to want a slightly different functionality for a bounded buffer-kind of class that is not generally provided. I am looking for such a class that is also efficient for large amounts of data, and so I wonder if there does already exists a class in boost to efficiently push work between producers and consumers when they are running in different threads, i.e., that is thread safe. Asio seems to have a lot of communicating mechanisms, but I did not yet come across such a mechanism (pipe/channel/lane/unidirectional stream/bounded buffer... I'll call it a 'lane' for now) between two threads.
If it does not exist yet in boost, I wonder if there is a general interest for such a class.
I used an example from circular_buffer library. Worked great and only a few minutes to insert into my app. Robert Ramey

On 12/13/2011 02:43 AM, Robert Ramey wrote:
André Offringa wrote:
Hi all,
I seem to have come across a producer/consumer situation a few times now, and everytime I do, I seem to want a slightly different functionality for a bounded buffer-kind of class that is not generally provided. I am looking for such a class that is also efficient for large amounts of data, and so I wonder if there does already exists a class in boost to efficiently push work between producers and consumers when they are running in different threads, i.e., that is thread safe. [..] I used an example from circular_buffer library. Worked great and only a few minutes to insert into my app.
Hi Robert, Thanks for your reply. Just to clarify, that example was actually the one I was referring to my mail: "There is a bounded_buffer example given to implement such a thing with the circular buffer. It does exactly what I suggest, but I assume it is not so efficient for pushing large amounts of data over, as each individual object requires a push, thus a lock implemented as in bounded_buffer, and involves unnecessary updates of the counters (involving a decrement() each time) on each push, which is not necessary if you know you are pushing a number of elements at once. I assume this could be fixed by wrapping the insert(range) method instead of the individual push functions." Apart from not providing an efficient push of a range, it misses the write_end() functionality that I mentioned. Hence the question if that would be something which is generally useful, or if it is just in my case. Eitherway, I agree that the bounded_buffer example is on itself already useful in many situations. Regards, André

boos.task has a bounded and a unbounded buffer - unfortunately I've to wait until boost.context will pass the mini-review before I can submit the refactored version of boost.task. Oliver -------- Original-Nachricht --------
Datum: Tue, 13 Dec 2011 13:23:26 +0100 Von: "André Offringa" <offringa@gmail.com> An: boost@lists.boost.org Betreff: Re: [boost] Interested in bounded buffer class for threaded producer/consumer communication?
André Offringa wrote:
Hi all,
I seem to have come across a producer/consumer situation a few times now, and everytime I do, I seem to want a slightly different functionality for a bounded buffer-kind of class that is not generally provided. I am looking for such a class that is also efficient for large amounts of data, and so I wonder if there does already exists a class in boost to efficiently push work between producers and consumers when they are running in different threads, i.e., that is
On 12/13/2011 02:43 AM, Robert Ramey wrote: thread
safe. [..] I used an example from circular_buffer library. Worked great and only a few minutes to insert into my app.
Hi Robert,
Thanks for your reply. Just to clarify, that example was actually the one I was referring to my mail:
"There is a bounded_buffer example given to implement such a thing with the circular buffer. It does exactly what I suggest, but I assume it is not so efficient for pushing large amounts of data over, as each individual object requires a push, thus a lock implemented as in bounded_buffer, and involves unnecessary updates of the counters (involving a decrement() each time) on each push, which is not necessary if you know you are pushing a number of elements at once. I assume this could be fixed by wrapping the insert(range) method instead of the individual push functions."
Apart from not providing an efficient push of a range, it misses the write_end() functionality that I mentioned. Hence the question if that would be something which is generally useful, or if it is just in my case.
Eitherway, I agree that the bounded_buffer example is on itself already useful in many situations.
Regards, André
_______________________________________________ Unsubscribe & other changes: http://lists.boost.org/mailman/listinfo.cgi/boost
-- Empfehlen Sie GMX DSL Ihren Freunden und Bekannten und wir belohnen Sie mit bis zu 50,- Euro! https://freundschaftswerbung.gmx.de

I seem to have come across a producer/consumer situation a few times now, and everytime I do, I seem to want a slightly different functionality for a bounded buffer-kind of class that is not generally provided. I am looking for such a class that is also efficient for large amounts of data, and so I wonder if there does already exists a class in boost to efficiently push work between producers and consumers when they are running in different threads, i.e., that is thread safe. Asio seems to have a lot of communicating mechanisms, but I did not yet come across such a mechanism (pipe/channel/lane/unidirectional stream/bounded buffer... I'll call it a 'lane' for now) between two threads.
If it does not exist yet in boost, I wonder if there is a general interest for such a class.
boost.lockfree contains some producer/consumer data structures, it has a different focus than your proposal, but it might be a building block for more high-level data structures for inter-thread communication. tim
participants (4)
-
André Offringa
-
Oliver Kowalke
-
Robert Ramey
-
Tim Blechmann