Boost.ASIO C++ 20 coroutine equivalent of PPL task_completion_event or JavaScript Promise.resolve?
I'm trying to write a simple ASIO send/receiver that uses message queueing. The goal is to have a number of virtual streams over a single TCP connection. This is pretty simple using callbacks, but I can't figure out the coroutines alternative. Here is my simplified sample code. Note that I'm using asio::experimental::promise the same way I would use std::promise which obviously doesn't work, but it illustrates what I'm trying to do. What is the simplest solution to my problem? #pragma once #include "asio.hpp" #include "asio/co_spawn.hpp" #include "asio/detached.hpp" #include "asio/experimental/co_composed.hpp" #include "asio/experimental/promise.hpp" #include "asio/use_awaitable.hpp" #include <coroutine> #include <cstdint> #include <map> #include <memory> #include <queue> struct Message { uint64_t id; uint64_t streamId; uint64_t priority; }; class NetworkConnection { public: NetworkConnection(asio::io_context &ioContext) : ioContext_(ioContext), sendTimer_(ioContext_), receiveTimer_(ioContext_){}; asio::awaitable<std::unique_ptr<Message>> receive(uint64_t streamId) { auto promise = std::make_shared< asio::experimental::promise<std::unique_ptr<Message>>>(); auto future = promise->get_future(); receives_[streamId] = promise; if (!receiving_) { receiving_ = true; co_spawn(ioContext_, startReceiving(), asio::detached); } co_return co_await future; } asio::awaitable<void> send(std::unique_ptr<Message> message) { auto promise = std::make_shared<asio::experimental::promise<void>>(); auto future = promise->get_future(); sends_[message->id] = promise; if (!sending_) { sending_ = true; co_spawn(ioContext_, startSending(), asio::detached); } co_await future; co_return; } private: asio::io_context &ioContext_; asio::steady_timer receiveTimer_; asio::steady_timer sendTimer_; std::map<uint64_t, std::shared_ptr< asio::experimental::promise<std::unique_ptr<Message>>>> receives_; std::map<uint64_t, std::shared_ptr<asio::experimental::promise<void>>> sends_; std::queue<std::unique_ptr<Message>> sendQueue_; bool receiving_ = false; bool sending_ = false; void completeSend(uint64_t messageId) { auto it = sends_.find(messageId); if (it != sends_.end()) { it->second->set_value(); // This resumes the send operation sends_.erase(it); // Clean up } } void completeReceive(std::unique_ptr<Message> message) { auto it = receives_.find(message->streamId); if (it != receives_.end()) { it->second->set_value( std::move(message)); // This resumes the send operation sends_.erase(it); // Clean up } } asio::awaitable<void> startReceiving() { while (!receives_.empty()) { // Simulate receiving a message sendTimer_.expires_after(std::chrono::seconds(1)); co_await sendTimer_.async_wait(asio::use_awaitable); auto message = std::make_unique<Message>(); message->id = 1; message->streamId = 1; message->priority = 1; completeReceive(std::move(message)); } receiving_ = false; co_return; } asio::awaitable<void> startSending() { while (!sends_.empty()) { sendTimer_.expires_after(std::chrono::seconds(1)); auto message = std::move(sendQueue_.front()); sendQueue_.pop(); co_await sendTimer_.async_wait(asio::use_awaitable); completeSend(message->id); } sending_ = false; co_return; } }; class Stream { public: Stream(std::shared_ptr<NetworkConnection> connection) : connection_(std::move(connection)) {} asio::awaitable<std::unique_ptr<Message>> receive() { co_return co_await connection_->receive(id_); } asio::awaitable<void> send(std::unique_ptr<Message> message) { message->streamId = id_; co_return co_await connection_->send(std::move(message)); } private: uint64_t id_; std::shared_ptr<NetworkConnection> connection_; };
I'm trying to write a simple ASIO send/receiver that uses message queueing. The goal is to have a number of virtual streams over a single TCP connection. This is pretty simple using callbacks, but I can't figure out the coroutines alternative. Here is my simplified sample code. Note that I'm using asio::experimental::promise the same way I would use std::promise which obviously doesn't work, but it illustrates what I'm trying to do. What is the simplest solution to my problem?
Have you considered utilizing asio::experimental::basic_channel? https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/reference/experime... By the way, there is no synchronization mechanism in Asio akin to a future/promise pair, if you're open to external solutions, you might want to explore these options: https://github.com/ashtum/saf https://github.com/ashtum/oneshot Regards, Mohammad
participants (2)
-
Mohammad Nejati [ashtum]
-
Willem Mitchell