I'm trying to write a simple ASIO sender/receiver that uses message queueing. The goal is to have a number of virtual streams over a single TCP connection. This is pretty simple using callbacks, but I can't figure out the coroutine-based alternative. Here is my simplified sample code. Note that I'm using asio::experimental::promise the same way I would use std::promise, which obviously doesn't work, but it illustrates what I'm trying to do. What is the simplest solution to my problem?
#pragma once
#include <chrono>
#include <coroutine>
#include <cstdint>
#include <map>
#include <memory>
#include <queue>
#include <utility>

#include "asio.hpp"
#include "asio/co_spawn.hpp"
#include "asio/detached.hpp"
#include "asio/experimental/co_composed.hpp"
#include "asio/experimental/promise.hpp"
#include "asio/redirect_error.hpp"
#include "asio/steady_timer.hpp"
#include "asio/use_awaitable.hpp"
// Plain data record exchanged over the connection.
//
// All fields default to zero so that a default-initialized `Message m;` never
// carries indeterminate values. The struct remains an aggregate, so
// `Message{id, streamId, priority}` keeps working.
struct Message {
  uint64_t id = 0;       // Identifier of this message.
  uint64_t streamId = 0; // Virtual stream this message belongs to.
  uint64_t priority = 0; // Priority value; not interpreted by this struct.
};
class NetworkConnection {
public:
NetworkConnection(asio::io_context &ioContext)
: ioContext_(ioContext), sendTimer_(ioContext_),
receiveTimer_(ioContext_){};
asio::awaitable receive(uint64_t streamId) {
auto promise = std::make_shared<
asio::experimental::promise>();
auto future = promise->get_future();
receives_[streamId] = promise;
if (!receiving_) {
receiving_ = true;
co_spawn(ioContext_, startReceiving(), asio::detached);
}
co_return co_await future;
}
asio::awaitable<void> send(std::unique_ptr<Message> message) {
auto promise = std::make_shared();
auto future = promise->get_future();
sends_[message->id] = promise;
if (!sending_) {
sending_ = true;
co_spawn(ioContext_, startSending(), asio::detached);
}
co_await future;
co_return;
}
private:
asio::io_context &ioContext_;
asio::steady_timer receiveTimer_;
asio::steady_timer sendTimer_;
std::map>>
receives_;
std::map> sends_;
std::queue sendQueue_;
bool receiving_ = false;
bool sending_ = false;
void completeSend(uint64_t messageId) {
auto it = sends_.find(messageId);
if (it != sends_.end()) {
it->second->set_value(); // This resumes the send operation
sends_.erase(it); // Clean up
}
}
void completeReceive(std::unique_ptr<Message> message) {
auto it = receives_.find(message->streamId);
if (it != receives_.end()) {
it->second->set_value(
std::move(message)); // This resumes the send operation
sends_.erase(it); // Clean up
}
}
asio::awaitable<void> startReceiving() {
while (!receives_.empty()) {
// Simulate receiving a message
sendTimer_.expires_after(std::chrono::seconds(1));
co_await sendTimer_.async_wait(asio::use_awaitable);
auto message = std::make_unique<Message>();
message->id = 1;
message->streamId = 1;
message->priority = 1;
completeReceive(std::move(message));
}
receiving_ = false;
co_return;
}
asio::awaitable<void> startSending() {
while (!sends_.empty()) {
sendTimer_.expires_after(std::chrono::seconds(1));
auto message = std::move(sendQueue_.front());
sendQueue_.pop();
co_await sendTimer_.async_wait(asio::use_awaitable);
completeSend(message->id);
}
sending_ = false;
co_return;
}
};
// One virtual stream multiplexed over a shared NetworkConnection.
//
// Forwards receive/send to the connection, tagging traffic with this
// stream's id.
class Stream {
public:
  // connection: shared connection that carries this stream's traffic.
  // id: identifier of this stream on the connection. It defaults to 0 so
  //     that single-argument construction keeps working; previously the id
  //     was never initialized and reading it was undefined behavior.
  Stream(std::shared_ptr<NetworkConnection> connection, uint64_t id = 0)
      : id_(id), connection_(std::move(connection)) {}

  // Suspends until the connection delivers a message for this stream.
  asio::awaitable<std::unique_ptr<Message>> receive() {
    co_return co_await connection_->receive(id_);
  }

  // Stamps `message` with this stream's id and sends it over the connection.
  // `message` must not be null.
  asio::awaitable<void> send(std::unique_ptr<Message> message) {
    message->streamId = id_;
    co_await connection_->send(std::move(message));
  }

private:
  uint64_t id_ = 0;
  std::shared_ptr<NetworkConnection> connection_;
};