What you want to achieve seems doable with:
https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/reference/experime...

There is an example of it in the Asio repository as well. You might consider using the concurrent version if you are using a multi-threaded executor.

On Fri, Dec 15, 2023, 7:12 PM accelerator0099 via Boost-users <boost-users@lists.boost.org> wrote:
Consider the producer-consumer model:
producer -+- consumer1
          +- consumer2
          +- consumer3
          +- consumer4
          +- consumer5
The producer periodically pushes items into a queue, and the consumers wait on that queue for items.
This is easy to implement in a synchronous, multi-threaded environment: the producer locks the queue, pushes an item into it, and notifies one consumer through a condition variable. Consumers waiting on that condition variable are woken one by one; each pops an item and consumes it.
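For reference, the synchronous version described above might look roughly like the sketch below. It uses only the standard library; the names sync_queue and run_sync_demo are made up for illustration and do not come from any library.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical blocking queue: push() locks, enqueues, and wakes one waiter.
template <class T>
class sync_queue {
    std::deque<T> items_;
    std::mutex mtx_;
    std::condition_variable cv_;
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock{mtx_};
            items_.push_back(std::move(value));
        }
        cv_.notify_one();  // wake one waiting consumer
    }

    T pop() {
        std::unique_lock<std::mutex> lock{mtx_};
        // The predicate guards against spurious wakeups.
        cv_.wait(lock, [this] { return !items_.empty(); });
        assert(!items_.empty());
        T value = std::move(items_.front());
        items_.pop_front();
        return value;
    }
};

// One producer, five consumers; each consumer takes exactly one item.
// Returns the sum of everything the consumers received.
int run_sync_demo() {
    sync_queue<int> queue;
    std::mutex sum_mtx;
    int sum = 0;

    std::vector<std::thread> consumers;
    for (int id = 0; id != 5; ++id) {
        consumers.emplace_back([&] {
            int item = queue.pop();  // blocks until an item arrives
            std::lock_guard<std::mutex> lock{sum_mtx};
            sum += item;
        });
    }
    std::thread producer{[&] {
        for (int i = 0; i != 5; ++i) queue.push(i);
    }};

    producer.join();
    for (auto& t : consumers) t.join();
    return sum;
}
```

Calling run_sync_demo() from main() pushes the items 0 through 4 and has each of the five consumers pop one of them. Which consumer gets which item is not deterministic, but every item is consumed exactly once.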
How can this be done in an asynchronous environment?
Consumers call async_pop() on that queue, posting their completion tokens into the executor. When items are available, the executor notifies one consumer by calling its completion token.
Is that right?
How does the producer tell the executor, "The asynchronous operation is done; you should call that completion token"?
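One way to picture the mechanism being asked about is the sketch below. It is not Asio code and makes no claim about how Asio implements this internally: toy_executor, handler_queue and run_handler_demo are hypothetical names, the "executor" is just a list of ready callbacks, and everything is single-threaded. The idea it illustrates is that the queue itself keeps the handlers of waiting consumers, and push() is the point where a parked handler is paired with an item and posted to the executor.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Toy single-threaded "executor": ready callbacks drained by run().
class toy_executor {
    std::deque<std::function<void()>> ready_;
public:
    void post(std::function<void()> fn) { ready_.push_back(std::move(fn)); }
    void run() {
        while (!ready_.empty()) {
            auto fn = std::move(ready_.front());
            ready_.pop_front();
            fn();
        }
    }
};

// Hypothetical queue that stores waiting handlers (T must be copyable here
// because std::function requires copyable callables).
template <class T>
class handler_queue {
    toy_executor& ex_;
    std::deque<T> items_;
    std::deque<std::function<void(T)>> waiters_;

    // "The operation is done": schedule the handler with its result.
    void complete(std::function<void(T)> handler, T value) {
        ex_.post([h = std::move(handler), v = std::move(value)]() mutable {
            h(std::move(v));
        });
    }
public:
    explicit handler_queue(toy_executor& ex) : ex_{ex} {}

    void async_pop(std::function<void(T)> handler) {
        if (items_.empty()) {
            waiters_.push_back(std::move(handler));  // park until a push
            return;
        }
        T value = std::move(items_.front());
        items_.pop_front();
        complete(std::move(handler), std::move(value));
    }

    void push(T value) {
        if (waiters_.empty()) {
            items_.push_back(std::move(value));  // nobody waiting: store it
            return;
        }
        auto handler = std::move(waiters_.front());  // oldest waiter first
        waiters_.pop_front();
        complete(std::move(handler), std::move(value));
    }
};

// Three consumers wait first, then three items are pushed.
// Returns (consumer id, item) pairs in the order the handlers ran.
std::vector<std::pair<int, int>> run_handler_demo() {
    toy_executor ex;
    handler_queue<int> queue{ex};
    std::vector<std::pair<int, int>> log;

    for (int id = 0; id != 3; ++id)
        queue.async_pop([&log, id](int item) { log.emplace_back(id, item); });
    for (int i = 0; i != 3; ++i) queue.push(10 + i);

    assert(log.empty());  // handlers only run once the executor runs
    ex.run();
    return log;
}
```

No polling timer is involved: a consumer's handler sits in waiters_ until push() hands it an item. In a multi-threaded setting the two deques would additionally need synchronization, which this sketch leaves out.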
My implementation of a possible async queue (based on timers):

    // Assumed: the header name was lost from the original message.
    #include <boost/asio.hpp>
    #include <chrono>
    #include <cstddef>
    #include <deque>
    #include <iostream>
    #include <utility>

    using namespace std::literals;
    namespace io = boost::asio;
    using io::awaitable;
    auto& uawait = io::use_awaitable;

    // Suspend the calling coroutine for the given duration.
    // The duration is taken by value so the coroutine frame owns its copy.
    template<class D>
    awaitable<void> delay(D dur) {
        io::steady_timer tm{ co_await io::this_coro::executor };
        tm.expires_after(dur);
        co_await tm.async_wait(uawait);
    }

    // Not thread-safe: intended for a single-threaded io_context.
    template<class T>
    class aqueue {
        std::deque<T> mq;
    public:
        using reference = T&;
        using const_reference = const T&;

        std::size_t size() const noexcept { return mq.size(); }
        bool empty() const noexcept { return mq.empty(); }

        // Poll the queue every millisecond until an item is available.
        awaitable<T> async_pop() {
            while (empty()) {
                co_await delay(1ms);
            }
            auto t = std::move(mq.front());
            mq.pop_front();
            co_return t;
        }

        // Never suspends: the item is appended immediately.
        awaitable<void> async_push(T t) {
            mq.push_back(std::move(t));
            co_return;
        }
    };

    class consumer {
    public:
        const int id;
        aqueue<int>& queue;

        consumer(int _i, aqueue<int>& _q) : id{ _i }, queue{ _q } {}

        // Each consumer takes exactly one item and then finishes.
        awaitable<void> operator()() {
            std::cout << "Consumer " << id << " started\n";
            auto i = co_await queue.async_pop();
            std::cout << "Consumer " << id << " got " << i << '\n';
        }
    };

    int main() {
        io::io_context ctx;
        aqueue<int> aq;

        // Push one item per second, five items in total.
        auto producer = [&]() -> awaitable<void> {
            std::cout << "Producer started\n";
            for (int i{}; i != 5; ++i) {
                co_await delay(1s);
                co_await aq.async_push(i);
            }
        };

        io::co_spawn(ctx, producer, io::detached);
        for (int i{}; i != 5; ++i)
            io::co_spawn(ctx, consumer{ i, aq }, io::detached);

        std::cout << "RUN\n";
        ctx.run();
        return 0;
    }
Is using timers a proper way to do this? I think there should be a better way.