Now that I've looked at the implementation, it seems like the stream's async_write will use the executor bound to the first message found in the queue. This can yield surprising results. For instance:
cli.async_publish(..., asio::bind_executor(ex1, tok1));
cli.async_publish(..., asio::bind_executor(ex2, tok2)); // might complete on ex1
I think having this limitation is dangerous, as it's not present in any other Asio-based library, and it's not enforced anywhere (as far as I've read).
I think that this restriction can be lifted by making the stream's async_write a child agent of async_run. That would imply changing async_sender::async_send (https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...) to push the message to the queue (as it does today), then notify a writer task (e.g. using a channel or a timer), and then wait for the writer task to complete the write (again using a channel or a timer). If I'm not mistaken, this is how Boost.Redis works.
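For illustration, a rough sketch of that shape using asio::experimental::channel (the names and the coroutine style are mine, not the library's actual internals; it assumes everything runs on a single implicit strand):

#include <boost/asio.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <deque>
#include <memory>
#include <string>
#include <utility>

namespace asio = boost::asio;
using boost::system::error_code;

// Illustrative only: the writer is a child of the "run" operation;
// async_send-style code just enqueues and waits for a completion signal.
struct writer_state {
    using signal = asio::experimental::channel<void(error_code)>;

    asio::ip::tcp::socket stream;
    std::deque<std::pair<std::string, std::shared_ptr<signal>>> queue;
    signal wakeup;

    explicit writer_state(asio::any_io_executor ex) : stream(ex), wakeup(ex, 1) {}

    // What async_send would do: push, notify the writer, await completion.
    asio::awaitable<void> send(std::string payload) {
        auto done = std::make_shared<signal>(co_await asio::this_coro::executor, 1);
        queue.emplace_back(std::move(payload), done);
        wakeup.try_send(error_code{});   // a full buffer is fine: the writer drains the whole queue
        co_await done->async_receive(asio::use_awaitable);
    }

    // The writer loop, spawned by (and owned by) async_run.
    asio::awaitable<void> writer_loop() {
        for (;;) {
            co_await wakeup.async_receive(asio::use_awaitable);
            while (!queue.empty()) {
                auto [payload, done] = std::move(queue.front());
                queue.pop_front();
                co_await asio::async_write(stream, asio::buffer(payload),
                                           asio::use_awaitable);
                done->try_send(error_code{});   // release the waiting sender
            }
        }
    }
};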
You are correct that the second handler might complete on ex1, and from that perspective, the code doesn't fully adhere to typical Asio standards. The issue can indeed be resolved as you suggested, either by using a channel or a timer, or possibly even more efficiently by dispatching the message to the writer queue using the stream's executor via asio::dispatch.
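For what it's worth, the dispatch-based variant could look roughly like this (a sketch; message, stream_executor_, write_queue_, write_in_progress_ and do_write() are made-up names, not the actual members):

// Hop onto the stream's executor before touching the queue, so the
// subsequent async_write chain is always initiated from that executor.
void enqueue_for_write(message msg) {
    asio::dispatch(stream_executor_,
        [this, msg = std::move(msg)]() mutable {
            write_queue_.push_back(std::move(msg));
            if (!write_in_progress_)
                do_write();   // runs entirely on stream_executor_
        });
}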
However, conceptually, the right solution is still unclear. Technically, we understand how it can be done, but the question is more about the intended design. I recall Richard Hodges once explaining the role of a bound executor to a handler, stating that "the final handler, as well as all intermediate handlers of the async operation, will be invoked using the bound executor." While the initiation code of the async function may not use the bound executor, once the internal code calls another internal async function, every subsequent async function, including the final handler, will be executed using the originally bound executor.
I think the point here is the parent-child relationship of the involved async agents. Read operations are children of async_run. When an async operation involving only a read, like async_receive, is invoked, it waits on its channel until the required message arrives. It relies on an internal async operation, but it does not propagate its bound properties to it, because there's no parent-child relationship between them. The channel serves as an isolator. I fully agree that this is a good design. I'd advise you to apply the same logic to writing. At the moment, writing a message does not have a clear parent: if two messages are being written concurrently, the parent is chosen at random. In my opinion, writing should be symmetrical to reading, i.e. a child of async_run. As you do with reading, writing should never be invoked directly by async_publish, but maintained by async_run. This avoids spuriously propagating properties.
async_publish actually supports total cancellation. So, if you write
async_publish(..., asio::cancel_after(5s, cancellation_type::total));
it will work as you expect. Support for the total cancellation type is quite complicated for PUBLISH messages, and I've described the reasons on the Slack channel earlier. For reference, I'll put the explanation here as well:
Oh, you're right. I debugged and got the (false) impression that total signals were being blocked by a filter. Thanks for pointing that out.
12. I've noticed that the implementation doesn't use asio::async_compose, but has its own version. Is there a reason for it? Also, note that cancellable_handler is implemented as always having a bound executor, regardless of what the underlying handler has (note that initiating an operation with bind_executor(ex_default, tok) and with tok is not strictly equivalent, even if get_associated_executor(tok, ex_default) yields the same value).
I wrote a rather lengthy explanation of the reasons why we don't use async_compose in a PDF document shared by Klemens on the mailing list. Here's the link to it in case you've missed it:
I completely did, thanks for pointing it out. Note, however, that async_compose is not tied to coroutine-like behavior - you can implement the same overloading scheme you currently have with async_compose, too. I agree that it's under-documented and that doing custom cancellation handling is non-trivial, so you probably made the right choice.
cancellable_handler uses our internal tracking_executor type, which is the result of the operation

asio::prefer(
    asio::get_associated_executor(handler, ex),
    asio::execution::outstanding_work.tracked
)
It's there to prevent the handler's execution context from running out of work. If the handler doesn't have an associated executor, it will be dispatched using the default mqtt_client executor and, to my understanding, the above tracking executor will prevent the client's execution context from running out of work (although this is not strictly necessary).
Yes, I understand that, and yes, it's required for correctness. You might consider using asio::executor_work_guard, too, which is compatible with old-style executors.
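For illustration, the old-style equivalent would be roughly this (a sketch; track_handler_work is a made-up helper name):

#include <boost/asio.hpp>
namespace asio = boost::asio;

// Keep the handler's execution context alive until the handler has run,
// using a work guard instead of outstanding_work.tracked.
template <typename Handler, typename DefaultExecutor>
auto track_handler_work(Handler& handler, const DefaultExecutor& default_ex) {
    auto handler_ex = asio::get_associated_executor(handler, default_ex);
    return asio::make_work_guard(handler_ex);   // asio::executor_work_guard<...>
    // reset() the returned guard, or let it go out of scope, once the
    // handler has been invoked.
}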
My point here is that, if you want cancellable_handler to propagate the original handler's executor, immediate executor, and allocator, the usual practice is to specialize asio::associated_executor, asio::associated_immediate_executor, and asio::associated_allocator. For instance, supposing that cancellable_handler::handler is the original completion handler passed by the user, a specialization could look roughly like this:
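(Sketch only, assuming cancellable_handler is a class template over the wrapped Handler; just the executor and allocator associators are shown - the immediate executor one follows the same pattern.)

#include <boost/asio.hpp>

template <typename Handler>
struct cancellable_handler {
    Handler handler;   // the original completion handler passed by the user
    // ... cancellation state, operator(), etc. ...
};

namespace boost::asio {

// Forward the associated executor of the wrapped handler.
template <typename Handler, typename Executor>
struct associated_executor<cancellable_handler<Handler>, Executor> {
    using type = associated_executor_t<Handler, Executor>;

    static type get(const cancellable_handler<Handler>& h,
                    const Executor& ex = Executor()) noexcept {
        return get_associated_executor(h.handler, ex);
    }
};

// Forward the associated allocator of the wrapped handler.
template <typename Handler, typename Allocator>
struct associated_allocator<cancellable_handler<Handler>, Allocator> {
    using type = associated_allocator_t<Handler, Allocator>;

    static type get(const cancellable_handler<Handler>& h,
                    const Allocator& a = Allocator()) noexcept {
        return get_associated_allocator(h.handler, a);
    }
};

} // namespace boost::asio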
14. I've measured build times in the linked repo. Using clang-18 with C++23 on my machine, it takes 22s to build the sender in debug mode, and 30s in release mode. This is a little high, IMO. Some observations:
- The client unconditionally includes Boost.Beast websocket, which causes considerable overhead. I'd consider a way to forward-declare it or move it into a separate header.
- Instantiations of async functions are pretty heavyweight. asio::prepend is used a lot, which is not free. Did you experiment with and without it and measure the benefit?
- The Spirit headers/functions also appear in the flame graph (although I expected them to rank much higher).

Have you considered optimizing compile times?
We've already tried to minimize the inclusion of Boost.Beast stream (websocket) headers, but forward declarations didn't work well in this case. The code in connect_op uses if constexpr to generate the necessary boilerplate when the stream is a websocket, and unfortunately, we couldn't find a reasonable solution that avoids including the Boost.Beast stream headers.
Oh. I think this might indicate that connect_op is too strongly coupled to Boost.Beast websocket. I know that some people in Boost are developing another HTTP/websocket library, so you might end up with messy code if that library ever gets traction. I'd advise splitting these chunks of code into a customizable trait. You might choose two traits (tls_handshake_traits and ws_connect_traits) or a single one (connect_traits). For instance:
// Trait declaration
template <class Stream>
struct ws_connect_traits : detail::no_ws_connect_traits {};

// Trait definition for Beast (sketch; lives in a separate header that
// includes Boost.Beast, and the exact interface is up to you)
template <class Stream>
struct ws_connect_traits<boost::beast::websocket::stream<Stream>> {
    template <class Token>
    static auto async_handshake(boost::beast::websocket::stream<Stream>& ws,
                                boost::beast::string_view host,
                                boost::beast::string_view target, Token&& token) {
        return ws.async_handshake(host, target, std::forward<Token>(token));
    }
};
You are correct that the use of asio::prepend, consign, and append is the main contributor to the longer-than-desired compilation times. These constructs are fundamental to our code since we don't use async_compose, and we're unsure how to avoid them in these scenarios.
I'd advise experimenting with a callback wrapper that propagates associated properties with asio::associator; the wrapper would be the type of handler passed to async operations, replacing asio::append and asio::prepend.
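For example, a rough sketch (appending_handler and its members are illustrative names; only the asio::append-style case, appending one pre-bound value to the completion arguments, is shown):

#include <boost/asio.hpp>
#include <utility>

namespace asio = boost::asio;

// The type of handler to be passed to async operations; replaces
// asio::append/asio::prepend for this particular pattern.
template <typename Handler, typename Payload>
struct appending_handler {
    Handler handler;
    Payload payload;

    template <typename... Args>
    void operator()(Args&&... args) {
        std::move(handler)(std::forward<Args>(args)..., std::move(payload));
    }
};

namespace boost::asio {

// One blanket associator specialization forwards all associated properties
// (executor, allocator, immediate executor, cancellation slot, ...).
template <template <typename, typename> class Associator,
          typename Handler, typename Payload, typename Default>
struct associator<Associator, appending_handler<Handler, Payload>, Default>
    : Associator<Handler, Default> {
    static typename Associator<Handler, Default>::type
    get(const appending_handler<Handler, Payload>& h,
        const Default& d = Default()) noexcept {
        return Associator<Handler, Default>::get(h.handler, d);
    }
};

} // namespace boost::asio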