
Hi Ivica,

First of all, thanks for submitting your library to Boost. I've been reading the discussion and reference and have some questions for you:

1. The limitation listed on https://spacetime.mireo.com/async-mqtt5/async_mqtt5/auto_reconnect.html about concealing configuration issues looks concerning, as it can be very frustrating for the user. As far as I know, Boost.Redis solved it by including logging as part of its API, and Boost.MySQL did it by providing an extra diagnostics output argument, populated on operation cancellation, that informs the user of potential causes to the problem. Neither of us are very happy with our solution, to be frank. But I think it's a point that might be worth solving before going into Boost. As an option, I have been looking into abusing Asio handler tracking (https://live.boost.org/doc/libs/1_86_0/doc/html/boost_asio/overview/core/han...) to log custom events. BOOST_ASIO_HANDLER_OPERATION might be a possibility. What are your thoughts regarding this?

2. Quoting the allocators section (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/asio_compliance/allocato...): "[...] associates the allocator of the first packet in the queue to the low-level Boost.Asio function async_write on the transport layer". Does this mean that the queue might use the allocator bound to the first async_publish operation that is issued? If this is the case, wouldn't it be violating the requirement that all memory allocated by an associated allocator is deallocated before the async operation completes?

3. Quoting the executors section (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/asio_compliance/executor...): "The same executor must execute mqtt_client::async_run and all the subsequent async_xxx operations". This sounds like an unusual restriction. What happens if it is violated? Wouldn't it make more sense to use the executor associated with async_run for intermediate handlers, and then the async_xxx associated executor for the final one?

4. I noticed that async_mqtt5 does not implement persistent and clean sessions. I'm no MQTT expert, so I can't evaluate whether these features are relevant or not - I just know that they exist. Is there a reason why you chose to not implement them? If so, I'd suggest adding these reasons somewhere in the docs.

5. According to what I read in the docs, message payloads might contain UTF-8 encoded text or just data blobs, depending on the payload_format_indicator property passed to publish. However, std::string is always used for payloads. As I understand it, if I want to send a binary blob, I need to copy my blob to a string (probably with a reinterpret_cast in the way), and then setting payload_format_indicator. Would it make sense to expose a simpler API taking a blob (e.g. a std::vector<unsigned char>) to make this use case safer?

6. I can't find any docs (other than the examples) on how to use TLS. From the simplest example (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/hello_world_over_tls.htm...) it looks like setting up TLS requires defining some non-trivial template specializations. I'd like these to be documented. On the other hand, the specializations seem to be using only Boost.Asio types, rather than user-defined types. What would happen if two libraries separately define these hooks because they need MQTT over TLS? Wouldn't it make more sense to provide an optional header providing these functions?

7. What's the use of the TlsContext type in mqtt_client? It looks very coupled to asio::ssl::stream.

8. I've noticed that async_receive returns with an error code whenever a reconnection occurs, which forces the user to re-subscribe to topics. This contrasts with how async_publish works, where reconnection is transparent to the user. Is there a reason for this difference? Would it make sense to implement automatic resubscription on reconnection?

Regards, Ruben.

On Fri, Oct 18, 2024 at 5:56 PM Ruben Perez via Boost <boost@lists.boost.org> wrote:
5. According to what I read in the docs, message payloads might contain UTF-8 encoded text or just data blobs, depending on the payload_format_indicator property passed to publish. However, std::string is always used for payloads. As I understand it, if I want to send a binary blob, I need to copy my blob to a string (probably with a reinterpret_cast in the way), and then setting payload_format_indicator. Would it make sense to expose a simpler API taking a blob (e.g. a std::vector<unsigned char>) to make this use case safer?
These days, I'm passing std::span<const std::byte> for binary blobs. This is a C++17 lib, so no std::span, but please do consider std::byte at least. --DD PS: I used https://github.com/tcbrindle/span in C++17. Maybe Boost has a span?

These days, I'm passing std::span<const std::byte> for binary blobs. This is a C++17 lib, so no std::span, but please do consider std::byte at least. --DD
PS: I used https://github.com/tcbrindle/span in C++17. Maybe Boost has a span?
boost::core::span is C++11. I haven't looked at the implementation yet, but I'm assuming that owning types are being used for a reason.

On 18.10.2024., at 17:56, Ruben Perez <rubenperez038@gmail.com> wrote:
Hi Ivica,
First of all, thanks for submitting your library to Boost.
Thanks a lot Ruben for taking a look into the library and especially for your comments below!
I've been reading the discussion and reference and have some questions for you:
1. The limitation listed on https://spacetime.mireo.com/async-mqtt5/async_mqtt5/auto_reconnect.html about concealing configuration issues looks concerning, as it can be very frustrating for the user. As far as I know, Boost.Redis solved it by including logging as part of its API, and Boost.MySQL did it by providing an extra diagnostics output argument, populated on operation cancellation, that informs the user of potential causes to the problem. Neither of us are very happy with our solution, to be frank. But I think it's a point that might be worth solving before going into Boost. As an option, I have been looking into abusing Asio handler tracking (https://live.boost.org/doc/libs/1_86_0/doc/html/boost_asio/overview/core/han...) to log custom events. BOOST_ASIO_HANDLER_OPERATION might be a possibility. What are your thoughts regarding this?
We’ve had extensive internal discussions about logging. We’re fully aware that the lack of logging can be a major issue for users. For example, if you don’t properly set up CA storage for an SSL connection, the TLS handshake will continually fail, and the client will keep trying to reconnect. As a user, you’d have no indication of what went wrong.

Throughout the library, we’ve been careful to ensure that "you don’t get what you didn’t pay for," especially when it comes to performance. In terms of logging, this means we wanted to avoid any runtime overhead associated with logging if you choose not to use it, even simple `if` checks. For this reason, we didn’t want to use something like `bool logging` to enable or disable logging.

Another option was to use a (defaulted) template parameter that would generate logging code if enabled. However, this cluttered the code significantly, as nearly every class would need an additional template parameter. Worse yet, in most cases where logging is needed — such as during reconnections — it was unclear what meaningful, helpful messages to log. The Stream template parameter’s interface can report a variety of errors, and without knowing the exact nature of the Stream, it’s not always possible to interpret them correctly.

Using a macro like MQTT5_ENABLE_LOGGING might be the best option, but for now, we wanted to avoid reducing code readability by adding logging lines throughout the code. In the absence of a better solution, we recommend users add their own logging code directly into the async_mqtt5 source during development. It’s very far from ideal, but we couldn’t come up with a better approach.
2. Quoting the allocators section (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/asio_compliance/allocato...): "[...] associates the allocator of the first packet in the queue to the low-level Boost.Asio function async_write on the transport layer". Does this mean that the queue might use the allocator bound to the first async_publish operation that is issued? If this is the case, wouldn't it be violating the requirement that all memory allocated by an associated allocator is deallocated before the async operation completes?
It's a bit more complicated. The internal queue is a std::vector with an explicit asio::recycling_allocator as the custom allocator. The queue contains references (not values) to the actual MQTT data, along with type-erased completion handlers. The actual MQTT data (packet) is stored in memory that was allocated using the allocator associated with the corresponding handler. When we call async_write on the underlying stream, we supply the entire write queue at once as a ConstBuffers parameter, effectively performing a scatter/gather write. The allocator from the first handler in the queue is associated with the completion handler of async_write. Once async_write completes, we clear the queue and complete each (outer) handler associated with the packets in the queue. The memory allocated for the MQTT packets is also released before the handler is invoked. As a result, all memory allocated by an associated allocator is deallocated before the asynchronous operation completes.
3. Quoting the executors section (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/asio_compliance/executor...): "The same executor must execute mqtt_client::async_run and all the subsequent async_xxx operations". This sounds like an unusual restriction. What happens if it is violated? Wouldn't it make more sense to use the executor associated with async_run for intermediate handlers, and then the async_xxx associated executor for the final one?
The `async_run` function starts the internal stream read loop. The executor used for the read operation is either the one associated with the `async_run` completion handler or the default executor provided to the `mqtt_client` constructor. Asynchronous operations like `async_publish` are composed operations that involve both writing to the stream and reading messages. These operations must be synchronized with the read loop. If we were to use a different executor for `async_publish` than the one used for the internal read loop, we would need to synchronize access to shared data (like read buffers) and, more importantly, to the underlying stream’s `async_read` and `async_write` operations. This is why the same executor must be used for both `async_run` and other asynchronous operations.
4. I noticed that async_mqtt5 does not implement persistent and clean sessions. I'm no MQTT expert, so I can't evaluate whether these features are relevant or not - I just know that they exist. Is there a reason why you chose to not implement them? If so, I'd suggest adding these reasons somewhere in the docs.
We always initiate the connection with Clean Start set to 0. This is due to the automatic reconnect functionality, which cannot be overridden. From the user's perspective, the mqtt_client interface behaves as though the underlying TCP connection is always "on." If the connection drops, the client will do everything it can to restore the session, which is the essence of Clean Start being set to false. Allowing Clean Start to be true would mean all pending async_publish operations would need to be canceled, leaving it up to the user to decide what to do with those packets. In most cases, the user would likely re-send them, especially when using QoS > 0, which is exactly what the client automatically handles when Clean Start is set to false. In short, we think that automatic reconnect and Clean Start = true are not logically compatible.
5. According to what I read in the docs, message payloads might contain UTF-8 encoded text or just data blobs, depending on the payload_format_indicator property passed to publish. However, std::string is always used for payloads. As I understand it, if I want to send a binary blob, I need to copy my blob to a string (probably with a reinterpret_cast in the way), and then setting payload_format_indicator. Would it make sense to expose a simpler API taking a blob (e.g. a std::vector<unsigned char>) to make this use case safer?
We use std::string instead of std::vector<unsigned char> for MQTT message payloads because, in practice, people often use strings (like JSON) as MQTT messages. This is also hinted at in the MQTT specification, which includes the payload_format_indicator with only two possible values: 0 and 1. A value of 0 means the format is unspecified, while 1 indicates a UTF-8 string. Since they gave special significance to UTF-8 strings, it made sense for us to use std::string as the more common format.

However, you don’t need to explicitly set payload_format_indicator to 1 or 0, regardless of what you’re sending. The payload_format_indicator is an optional property that the broker may use to validate the payload. If you omit this property, the broker will treat the payload as an unspecified binary array and won’t perform any validation checks. Therefore, there's no need to explicitly set the payload_format_indicator to 0 in such cases.

To address your related question—why we decided to use an owning type for the message payload—it’s due to the symmetry between async_publish and async_receive. In async_receive, the payload is obtained through the completion handler's arguments. When messages are received, they are stored (moved) into the Asio channel, and the underlying buffer used for stream reading is reused. Since messages might remain in the channel for some time (until the user reads them), their payloads are moved (or copied) away from the operating read buffer.

Yes, async_publish involves an extra payload copy. We were aware that the typical pattern is to keep the buffer alive until the async operation completes. While this extra copy can be avoided at the cost of slightly more complex code, in our performance tests, we found no significant impact from the additional copy. This is likely because copying results in the entire packet occupying a contiguous block of memory, rather than being split into two parts, which would otherwise require scatter/gather writes. We believe this is related to L1 cache efficiency.
6. I can't find any docs (other than the examples) on how to use TLS. From the simplest example (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/hello_world_over_tls.htm...) it looks like setting up TLS requires defining some non-trivial template specializations. I'd like these to be documented. On the other hand, the specializations seem to be using only Boost.Asio types, rather than user-defined types. What would happen if two libraries separately define these hooks because they need MQTT over TLS? Wouldn't it make more sense to provide an optional header providing these functions?
First of all, you're correct—it seems we didn’t properly document the TLS customization points. The example you mentioned (hello_world_over_tls.cpp) includes code that shows what needs to be defined to use a boost::asio::ssl stream. The first mandatory customization point is a specialization of the tls_handshake_type template, which must contain two constexpr values: client and server. The second customization point is the assign_tls_sni function, which will be called when SNI (Server Name Indication) needs to be set.

The TLS stream used by async_mqtt5 isn’t limited to boost::asio::ssl. In fact, we’ve designed it to work with both boost::asio::ssl and Botan Asio streams, to ensure we cover at least two different cases. In this setup, the TlsContext is fully opaque to mqtt_client, meaning that mqtt_client doesn’t make any assumptions about the TlsContext type other than it being move-constructible. The stream itself uses this context, and it must be compatible with the stream. We simply guarantee that the TlsStream instance will remain alive as long as the client is alive.
7. What's the use of the TlsContext type in mqtt_client? It looks very coupled to asio::ssl::stream.
As mentioned above, for boost::asio::ssl you would use boost::asio::ssl::context. For the Botan TLS stream, there's an equivalent context type.
8. I've noticed that async_receive returns with an error code whenever a reconnection occurs, which forces the user to re-subscribe to topics. This contrasts with how async_publish works, where reconnection is transparent to the user. Is there a reason for this difference? Would it make sense to implement automatic resubscription on reconnection?
Unlike the reconnect logic, we wanted to expose potential async_subscribe MQTT errors directly to the user. For example, an attempt to subscribe to a topic might result in an MQTT error like "Quota exceeded." There are nine different subscription error codes, and users may need to make different decisions based on the specific error. That’s why users must explicitly re-subscribe if they choose to do so after receiving an error code from the async_receive function.
With regards, Ivica Siladic

Hi Ivica, Thanks for your detailed answer. I understand and agree with most of your points. I'd like to discuss some of them further:
3. Quoting the executors section (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/asio_compliance/executor...): "The same executor must execute mqtt_client::async_run and all the subsequent async_xxx operations". This sounds like an unusual restriction. What happens if it is violated? Wouldn't it make more sense to use the executor associated with async_run for intermediate handlers, and then the async_xxx associated executor for the final one?
The `async_run` function starts the internal stream read loop. The executor used for the read operation is either the one associated with the `async_run` completion handler or the default executor provided to the `mqtt_client` constructor.
Asynchronous operations like `async_publish` are composed operations that involve both writing to stream and reading messages. These operations must be synchronized with the read loop. If we were to use a different executor for `async_publish` than the one used for the internal read loop, we would need to synchronize access to shared data (like read buffers) and, more importantly, to the underlying stream’s `async_read` and `async_write` operations. This is why the same executor must be used for both `async_run` and other asynchronous operations.
Now that I've looked at the implementation, it seems like the stream's async_write will use the executor bound to the first message found in the queue. This can yield surprising results. For instance:

cli.async_publish(..., asio::bind_executor(ex1, tok1));
cli.async_publish(..., asio::bind_executor(ex2, tok2)); // might complete on ex1

I think having this limitation is dangerous, as it's not present in any other Asio-based library, and it's not enforced anywhere (as far as I've read). I think that this restriction can be lifted by making the stream's async_write a child agent of async_run. That would imply changing async_sender::async_send (https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...) to push the message to the queue (as is today), then notifying a writer task (e.g. using a channel or a timer), then waiting for the writer task to complete the write (again using a channel or a timer). If I'm not mistaken, this is how Boost.Redis works.

Some follow-up questions:

9. I've written two small clients (https://github.com/anarthal/async-mqtt5-test). One simulates reading a couple of sensors, at different frequencies, and publishes measures to a topic. The other subscribes to the topics and prints the measures. In the sender, I've written code like this:

// Wait loop
auto next_tp = std::chrono::steady_clock::now();
while (true) {
    // Read the sensor
    double measure = sensor_to_read.read();

    // Publish the measure. Don't co_await it, because we want to
    // keep reading the sensor even if the network is unavailable
    cli.async_publish<mqtt::qos_e::at_most_once>(
        std::string(sensor_name),
        std::to_string(measure),
        mqtt::retain_e::yes,
        mqtt::publish_props{},
        [](error_code ec) {
            if (ec)
                std::cout << "Error during publish: " << ec << std::endl;
        }
    );

    // Use a timer to wait until the next measure is due (omitted for brevity)
}

There are two things in my code that make me feel uncomfortable:

a. I'd like to somehow detect and break from the loop if async_publish had an immediate error. I could use a channel to solve that.

b. If network problems persist, resource consumption is going to increase until the system collapses. A ring-buffer strategy would be the most common solution here, but I don't see how to do it easily. I could use a channel as a counter semaphore to prevent more than N measurements from being in flight for each sensor.

How would you write this use case? Are there alternative ways I haven't considered?

10. I find the broker function signature (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/ref/mqtt_client/brokers....) surprising. Is there a reason for that particular signature, instead of a vector<string>, for example?

11. Per-operation cancellation in async_publish may yield surprising results. For instance:

cli.async_publish(..., asio::cancel_after(5s));

will end up tearing down the entire client. I'm not saying this is wrong (it actually makes sense), but it can be surprising. As an alternative, if you end up implementing my suggestion for (3), you can make async_publish support total cancellation (as writing is now a child of async_run). I'd also consider stopping the potential retransmission of the PUBLISH packet for QoS 0 (and maybe QoS 1) messages for which a cancellation has been requested (as you do in async_subscribe, for instance).

12. I've noticed that the implementation doesn't use asio::async_compose, but has its own version. Is there a reason for it? Also, note that cancellable_handler is implemented as always having a bound executor, regardless of what the underlying handler has (note that initiating an operation with bind_executor(ex_default, tok) and with tok is not strictly equivalent, even if get_associated_executor(tok, ex_default) yields the same value).

13. I've noticed the usage of std::vector (as opposed to boost::core::span) in many APIs (e.g. https://spacetime.mireo.com/async-mqtt5/async_mqtt5/ref/mqtt_client/async_su...). Is it for safety during deferred initiation?

14. I've measured build times in the linked repo. Using clang-18 with C++23 on my machine, it takes 22s to build the sender in debug mode, and 30s in release mode. This is a little bit high IMO. Some observations here:

- The client is unconditionally including Boost.Beast websocket, which causes a considerable overhead. I'd consider a way to forward-declare it or make it part of a separate header.
- Instantiations of async functions are pretty heavyweight. asio::prepend is used a lot, which is not free. Did you experiment with/without it and measure the benefit?
- The spirit headers/functions also appear in the flame graph (although I thought they would be much higher). Have you considered optimizations of compile times?

Kind regards, Ruben.

Hi Ruben, thanks a lot again for your comments and questions. Our comments are inlined:
On 19.10.2024., at 15:08, Ruben Perez <rubenperez038@gmail.com> wrote:
Hi Ivica,
Thanks for your detailed answer. I understand and agree with most of your points. I'd like to discuss some of them further:
3. Quoting the executors section (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/asio_compliance/executor...): "The same executor must execute mqtt_client::async_run and all the subsequent async_xxx operations". This sounds like an unusual restriction. What happens if it is violated? Wouldn't it make more sense to use the executor associated with async_run for intermediate handlers, and then the async_xxx associated executor for the final one?
The `async_run` function starts the internal stream read loop. The executor used for the read operation is either the one associated with the `async_run` completion handler or the default executor provided to the `mqtt_client` constructor.
Asynchronous operations like `async_publish` are composed operations that involve both writing to stream and reading messages. These operations must be synchronized with the read loop. If we were to use a different executor for `async_publish` than the one used for the internal read loop, we would need to synchronize access to shared data (like read buffers) and, more importantly, to the underlying stream’s `async_read` and `async_write` operations. This is why the same executor must be used for both `async_run` and other asynchronous operations.
Now that I've looked at the implementation, it seems like the stream's async_write will use the executor bound to the first message found in the queue. This can yield surprising results. For instance:
cli.async_publish(..., asio::bind_executor(ex1, tok1));
cli.async_publish(..., asio::bind_executor(ex2, tok2)); // might complete on ex1
I think having this limitation is dangerous, as it's not present in any other Asio-based library, and it's not enforced anywhere (as far as I've read).
I think that this restriction can be lifted by making the stream's async_write a child agent of async_run. That would imply changing async_sender::async_send (https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...) to push the message to the queue (as is today), then notifying a writer task (e.g. using a channel or a timer), then waiting for the writer task to complete the write (again using a channel or a timer). If I'm not mistaken, this is how Boost.Redis works.
You are correct that the second handler might complete on ex1, and from that perspective, the code doesn't fully adhere to typical Asio standards. The issue can indeed be resolved as you suggested, either by using a channel or a timer, or possibly even more efficiently by dispatching the message to the writer queue using the stream's executor via asio::dispatch.

However, conceptually, the right solution is still unclear. Technically, we understand how it can be done, but the question is more about the intended design. I recall Richard Hodges once explaining the role of a bound executor to a handler, stating that "the final handler, as well as all intermediate handlers of the async operation, will be invoked using the bound executor." While the initiation code of the async function may not use the bound executor, once the internal code calls another internal async function, every subsequent async function, including the final handler, will be executed using the originally bound executor.

This leads to a classic problem with asynchronous "services" that need to execute internal tasks using a strand-like executor, ensuring serialized, lock-free behavior. In such cases, it's necessary to call asio::post or asio::dispatch from the initiation function to the inner code, perform the work within the strand, and finally invoke the handler using asio::dispatch. However, Asio operates under the assumption that single-threaded code shouldn't pay the performance cost of multithreaded code. In other words, if we call a series of async functions on Asio objects from the same thread, we are guaranteed to avoid any executor switches during execution. If, however, we intend to use multithreaded code with Asio async objects, it is up to us, the users, to explicitly dispatch the async function calls within a strand, whether implicit or explicit. In our design, mqtt_client essentially operates within an implicit strand.
Regarding your original question—why "The same executor must execute mqtt_client::async_run and all subsequent async operations"—this is why we can take the first executor from the queue. In fact, all executors of the handlers in the queue are the same, including the stream's executor. This would explain why your example would be considered ill-formed based on the documentation.
Some follow-up questions:
9. I've written two small clients (https://github.com/anarthal/async-mqtt5-test). One simulates reading a couple of sensors, at different frequencies, and publishes measures to a topic. The other subscribes to the topics and prints the measures. In the sender, I've written code like this:
// Wait loop
auto next_tp = std::chrono::steady_clock::now();
while (true) {
    // Read the sensor
    double measure = sensor_to_read.read();

    // Publish the measure. Don't co_await it, because we want to
    // keep reading the sensor even if the network is unavailable
    cli.async_publish<mqtt::qos_e::at_most_once>(
        std::string(sensor_name),
        std::to_string(measure),
        mqtt::retain_e::yes,
        mqtt::publish_props{},
        [](error_code ec) {
            if (ec)
                std::cout << "Error during publish: " << ec << std::endl;
        }
    );

    // Use a timer to wait until the next measure is due (omitted for brevity)
}
There are two things in my code that make me feel uncomfortable: a. I'd like to somehow detect and break from the loop if async_publish had an immediate error. I could use a channel to solve that. b. If network problems persist, resource consumption is going to increase until the system collapses. A ring-buffer strategy would be the most common solution here, but I don't see how to do it easily. I could use a channel as a counter semaphore to prevent more than N measurements from being in flight for each sensor.
How would you write this use case? Are there alternative ways I haven't considered?
a) A quick and dirty solution could look like this:

bool should_continue = true;
while (should_continue) {
    // Read the sensor
    double measure = sensor_to_read.read();

    // Publish the measurement. Don't co_await the result, as we want
    // to continue reading the sensor even if the network is unavailable.
    cli.async_publish<mqtt::qos_e::at_most_once>(
        std::string(sensor_name),
        std::to_string(measure),
        mqtt::retain_e::yes,
        mqtt::publish_props{},
        [&should_continue, &timer](error_code ec) {
            if (ec) {
                should_continue = false;
                timer.cancel();
                std::cout << "Error during publish: " << ec << std::endl;
            }
        }
    );

    // Use a timer to wait until the next measurement is due (omitted for brevity)
}

However, in a more realistic scenario, you likely wouldn't use a while (true) loop, since it would prevent clean program exit. A better approach would be to run a "recursive" timer, where async_publish is called from within the timer's completion handler. In the event of an error, the completion handler for async_publish could call cancel() on the timer if needed.

b) Resource consumption will not grow indefinitely. The client will attempt to send up to 65,535 packets, after which async_publish will immediately return an error code (pid_overrun), as it will run out of Packet Identifiers. If you want to impose stricter limits, you can implement a more complex solution on the user's side. For example, you could track the number of bytes (or messages) sent by incrementing a counter each time you call async_publish and decrementing it in the completion handler of async_publish. Before attempting to publish, you should check whether the number of in-flight messages has reached your limit, and if so, simply skip publishing until some messages are acknowledged.
10. I find the broker function signature (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/ref/mqtt_client/brokers....) surprising. Is there a reason for that particular signature, instead of a vector<string>, for example?
In the vast majority of cases, there will be only one broker, so creating a std::vector<std::string> with a single element would introduce unnecessary overhead. If multiple brokers are involved, they are either configured in a .conf file or hard-coded. In the case of a configuration file, it's simpler to pass the entire broker string directly to the mqtt_client rather than parsing the string, splitting it into substrings, and filling a vector. If the brokers are hard-coded, then this is irrelevant anyway.
11. Per-operation cancellation in async_publish may yield surprising results. For instance:
cli.async_publish(..., asio::cancel_after(5s));
will end up tearing down the entire client. I'm not saying this is wrong (it actually makes sense), but it can be surprising.
As an alternative, if you end up implementing my suggestion for (3), you can make async_publish support total cancellation (as writing is now a child of async_run). I'd also consider stopping the potential retransmission of the PUBLISH packet for QoS 0 (and maybe QoS 1) messages for which a cancellation has been requested (as you do in async_subscribe, for instance).
async_publish actually supports total cancellation. So, if you write

async_publish(..., asio::cancel_after(5s, cancellation_type::total));

it will work as you expect. Support for the total cancellation type is quite complicated for PUBLISH messages and I've described the reasons on the Slack channel earlier. For reference, I'll put the explanation here as well:

In MQTT, the implementation of Quality of Service (QoS), which can be 0, 1, or 2, introduces additional considerations. For QoS 0, no acknowledgment from the broker is expected. However, with QoS 1, the broker must send a PUBACK packet. For QoS 2, the client sends a PUBLISH packet, waits for the broker's PUBREC, then sends PUBREL, and finally waits for the broker's PUBCOMP packet.

Now, if the client sends a PUBLISH packet with QoS 2, it must respond with PUBREL after receiving PUBREC from the broker. If the client fails to send PUBREL, the broker will consider the connection corrupted and will close the TCP link. This means that even if a single async_publish operation is canceled with the total cancellation type, the client must continue to communicate with the broker according to MQTT protocol requirements. Otherwise, the connection could close, causing all other messages to be dropped and requiring them to be resent.

A similar issue occurs with Boost.Beast's WebSocket connections: to support total cancellation of an asynchronous WebSocket operation, specific bytes may need to be sent even after cancellation. This behavior necessitates a different approach to cancellation. The outer handler's cancellation slot (if any) cannot be directly propagated to the stream's async_write. Instead, stream cancellation is managed independently of the outer handler, which only supports the terminal cancellation type. This occurs, for example, when the client is stopped with cancel() or when mqtt_client's destructor is called.
When you cancel an async_publish handler with the total cancellation type, the operation intercepts the cancellation, marks itself as "canceled," returns an error code of operation_aborted to the outer handler, and continues to exchange messages with the broker as required by the protocol.
12. I've noticed that the implementation doesn't use asio::async_compose, but has its own version. Is there a reason for it? Also, note that cancellable_handler is implemented as always having a bound executor, regardless of what the underlying handler has (note that initiating an operation with bind_executor(ex_default, tok) and with tok is not strictly equivalent, even if get_associated_executor(tok, ex_default) yields the same value).
I wrote a rather lengthy explanation of the reasons why we don't use async_compose in a PDF document shared by Klemens on the mailing list. Here's the link to it in case you've missed it: https://www.mireo.com/cdn/Async.MQTT5.pdf

cancellable_handler uses our internal tracking_executor type, which is the result of the operation

asio::prefer(
    asio::get_associated_executor(handler, ex),
    asio::execution::outstanding_work.tracked
)

It's there to prevent the handler's execution context from running out of work. If the handler doesn't have an associated executor, it will be dispatched using the default mqtt_client executor and, to my understanding, the above tracking executor will prevent the client's execution context from running out of work (although this is not strictly necessary).
13. I've noticed the usage of std::vector (as opposed to boost::core::span) in many APIs (e.g. https://spacetime.mireo.com/async-mqtt5/async_mqtt5/ref/mqtt_client/async_su...). Is it for safety during deferred initiation?
To be honest, we completely overlooked the existence of boost::core::span. Since std::span is part of C++20 and we wanted to maintain support for C++17, we opted to use const std::vector&. However, it should be perfectly safe to replace all instances of const std::vector& with boost::core::span.
14. I've measured build times in the linked repo. Using clang-18 with C++23 on my machine, it takes 22s to build the sender in debug mode, and 30s in release mode. This is a little bit high IMO. Some observations here: - The client is unconditionally including Boost.Beast websocket, which causes a considerable overhead. I'd consider a way to forward-declare it or make it part of a separate header. - Instantiations of async functions are pretty heavyweight. asio::prepend is used a lot, which is not free. Did you experiment with/without it and measure the benefit? - The spirit headers/functions also appear in the flame graph (although I thought they would be much higher).
Have you considered optimizations of compile times?
We've already tried to minimize the inclusion of Boost.Beast stream (websocket) headers, but forward declarations didn't work well in this case. The code in connect_op uses if constexpr to generate the necessary boilerplate when the stream is a websocket, and unfortunately, we couldn't find a reasonable solution that avoids including the Boost.Beast stream headers.

You are correct that the use of asio::prepend, consign, and append is the main contributor to the longer-than-desired compilation times. These constructs are fundamental to our code since we don't use async_compose, and we're unsure how to avoid them in these scenarios.

As you pointed out, Boost.Spirit doesn't significantly impact compilation times. This is because we use it in a very "lightweight" manner that avoids generating too many expression template instantiations.

We've already done everything we could think of to reduce compilation time. There's certainly some more room, but at the moment we just don't know how to do it.
Kind regards, Ruben.
With regards, Ivica Siladic

Throughout the library, we've been careful to ensure that "you don't pay for what you don't use," especially when it comes to performance. In terms of logging, this means we wanted to avoid any runtime overhead associated with logging if you choose not to use it, even simple `if` checks.
I strongly disagree with you here, in pretty much every scenario I can think of in terms of IO an if check is costless, this includes: resolve, connect, handshake, reads and writes. You can't even measure it since IO operations are orders of magnitude more costly. On the other hand, lack of logging results in a lot of frustration not only for your users but also for the authors as bug reports won't contain enough information for you to investigate the problem. There are just so many things that can go wrong, freeze etc.
In the absence of a better solution, we recommend users add their own logging code directly into the async_mqtt5 source during development. It's very far from ideal, but we couldn't come up with a better approach.
I don't think it is reasonable to expect users to even know where to add log lines in a 15k loc codebase (that uses callbacks to implement its composed operations). This will result in a lot of frustration. It would also only be possible as long as this library is kept header-only, and I think it shouldn't be. What you could do here is to introduce a BOOST_MQTT5_ENABLE_BOOST_LOG macro and add the lines yourself to the code.
However, conceptually, the right solution is still unclear. Technically, we understand how it can be done, but the question is more about the intended design. I recall Richard Hodges once explaining the role of a bound executor to a handler, stating that "the final handler, as well as all intermediate handlers of the async operation, will be invoked using the bound executor." While the initiation code of the async function may not use the bound executor, once the internal code calls another internal async function, every subsequent async function, including the final handler, will be executed using the originally bound executor.
AFAICS this leads into inconsistency since async_read uses the conn.async_run executor while the async_write will use the async_publish executor. That is why I argue that the bound executor in async_publish should only be used to deliver the completion while everything else runs on the connection executor, not on the async_run bound executor, which should also be only used to deliver the completion. Marcelo

Hi Marcelo,
On 20.10.2024., at 12:36, Marcelo Zimbres Silva <mzimbres@gmail.com> wrote:
Throughout the library, we've been careful to ensure that "you don't pay for what you don't use," especially when it comes to performance. In terms of logging, this means we wanted to avoid any runtime overhead associated with logging if you choose not to use it, even simple `if` checks.
I strongly disagree with you here, in pretty much every scenario I can think of in terms of IO an if check is costless, this includes: resolve, connect, handshake, reads and writes. You can't even measure it since IO operations are orders of magnitude more costly.
On the other hand, lack of logging results in a lot of frustration not only for your users but also for the authors as bug reports won't contain enough information for you to investigate the problem. There are just so many things that can go wrong, freeze etc.
In the absence of a better solution, we recommend users add their own logging code directly into the async_mqtt5 source during development. It’s very far from ideal, but we couldn’t come up with a better approach.
I don't think it is reasonable to expect users to even know where to add log lines in a 15k loc codebase (that uses callbacks to implement its composed operations). This will result in a lot of frustration.
It would also only be possible as long as this library is kept header-only, and I think it shouldn't be.
What you could do here is to introduce a BOOST_MQTT5_ENABLE_BOOST_LOG macro and add the lines yourself to the code.
Let me be very clear: the lack of logging is definitely not a good thing, and as you mentioned, debugging network I/O issues can be extremely frustrating. Logging is crucial in such cases. I just want logging to be a NOOP if the developer chooses not to enable it. This can be done quite easily with macros, for instance. However, our goal is to find a more elegant, C++-style solution that achieves the same effect.
However, conceptually, the right solution is still unclear. Technically, we understand how it can be done, but the question is more about the intended design. I recall Richard Hodges once explaining the role of a bound executor to a handler, stating that "the final handler, as well as all intermediate handlers of the async operation, will be invoked using the bound executor." While the initiation code of the async function may not use the bound executor, once the internal code calls another internal async function, every subsequent async function, including the final handler, will be executed using the originally bound executor.
AFAICS this leads into inconsistency since async_read uses the conn.async_run executor while the async_write will use the async_publish executor. That is why I argue that the bound executor in async_publish should only be used to deliver the completion while everything else runs on the connection executor, not on the async_run bound executor, which should also be only used to deliver the completion.
Ok, understood.
Marcelo
Thanks, Ivica Siladic

On Sun, Oct 20, 2024, 20:13 Ivica Siladic via Boost <boost@lists.boost.org> wrote:
I just want logging to be a NOOP if the developer chooses not to enable it. This can be done quite easily with macros, for instance. However, our goal is to find a more elegant, C++-style solution that achieves the same effect.
Perhaps you could expose the no-op logging function as a weak symbol that the user can override.

On Sun, Oct 20, 2024 at 11:13 AM Ivica Siladic via Boost < boost@lists.boost.org> wrote:
What you could do here is to introduce a BOOST_MQTT5_ENABLE_BOOST_LOG macro
I don't like configuration macros, because these create different libraries, effectively creating separate object files which have the potential to be incompatible at link-time. It is simpler for there to be only one version of the library with no configurations.
our goal is to find a more elegant, C++-style solution that achieves the same effect.
How do you feel about this?

template<
    class Stream,
    class Logger = int,
    class TlsContext = std::monostate
>
class mqtt_client
{
    Logger log_; // could use empty base optimization here

public:
    void cancel()
    {
        if constexpr(is_logger<Logger>)
            log_("cancelling");

        auto impl = _impl;
        _impl = impl->dup();
        impl->cancel();
    }
    ...

To be honest though, I think you are being unnecessarily strict with the "zero overhead." There is nothing wrong with adding a single if statement to each function for logging. Really now, are you worried about the performance of adding a branch to a function which invokes an initiating function? I bet if you measure it, you will change your mind. Thanks

On Sun, 20 Oct 2024 at 20:32, Vinnie Falco <vinnie.falco@gmail.com> wrote: I don't like configuration macros. Because these create different
libraries, effectively creating separate object files which have the potential to be incompatible at link-time. It is simpler for there to be only one version of the library with no configurations.
Yes, it is bad and won't work for a library that has to be compiled (and I hope this will be one of the conditions imposed if this lib gets accepted). But IMO even that is better than leaving users with no clue about what is happening. At the bare minimum I would log resolving, connecting, handshaking, reading and writing, for example:

log("resolving");
obj.async_resolve(...);
if (ec) {
    log("resolve failed with: ", ec);
    return;
}
our goal is to find a more elegant, C++-style solution that achieves the same effect.
<snip>
This is the approach I took in Boost.Redis: https://www.boost.org/doc/libs/1_86_0/libs/redis/doc/html/classboost_1_1redi... The template parameter is an annoyance to deal with in type-erased connection types where a definite type is required. It is also questionable to pollute the public API with log types just because we are too afraid of upsetting users by logging into a global state. We have a log library in Boost so why not default to it? Marcelo

On Sun, Oct 20, 2024 at 12:15 PM Marcelo Zimbres Silva <mzimbres@gmail.com> wrote:
It is also questionable to pollute the public API with log types
I don't characterize a template parameter enabling opt-in to a commonly requested feature as "pollution."
...we are too afraid of upsetting users by logging into a global state
A global solution increases the run-time costs even when not logging, which the mqtt5 authors find objectionable. I would in fact prefer a global solution, because I think that paying for a conditional to evaluate a pointer for being not-null is negligible compared to the other costs. For example I would prefer this:

using log_fn_type = void (*)(std::string_view);
log_fn_type gLogger = nullptr;

and inside mqtt5 member functions:

if( gLogger != nullptr )
    (*gLogger)("log text");

We have a log library in Boost so why not default to it?
mqtt5 already has enough dependencies (and would benefit from trimming some of them away). Users who are integrating mqtt5 into their existing code base might prefer a way to use the logging infrastructure which is already there. Thanks

On Sun, Oct 20, 2024, at 9:15 PM, Marcelo Zimbres Silva via Boost wrote:
This is the approach I took in Boost.Redis: https://www.boost.org/doc/libs/1_86_0/libs/redis/doc/html/classboost_1_1redi...
Oh cool. I played around with Redis the other day and failed to figure this out. Yes, not-logging by default really hurts adoption, at least in my case
[upsetting users by logging into a global state.] We have a log library in Boost so why not default to it?
Every single time I try to use Boost Log I run into some permutation of the same three link problems. Boost Log is nice when you need the kitchen sink, but it's way too heavy to be any library's default. just my usual $0.02

Hi Seth,
On 20.10.2024., at 23:52, Seth via Boost <boost@lists.boost.org> wrote:
On Sun, Oct 20, 2024, at 9:15 PM, Marcelo Zimbres Silva via Boost wrote:
This is the approach I took in Boost.Redis: https://www.boost.org/doc/libs/1_86_0/libs/redis/doc/html/classboost_1_1redi...
Oh cool. I played around with Redis the other day and failed to figure this out. Yes, not-logging by default really hurts adoption, at least in my case
Boost.Redis has the cleanest logging approach I've encountered so far, and we will most likely adopt it in Async.MQTT5 as well. The only concern I have is the potential impact on compilation times, which could increase significantly since we would need to templatize almost all existing classes by the logger type. To mitigate this, we might explore using a type-erased logger with the same interface required by Boost.Redis, which could help reduce the compilation overhead. We'll need to experiment and see how it works in practice.
[upsetting users by logging into a global state.] We have a log library in Boost so why not default to it?
Every single time I try to use Boost Log I run into some permutation of the same three link problems. Boost Log is nice when you need the kitchen sink, but it's way too heavy to be any library's default.
just my usual $0.02
Thanks, Ivica Siladic

On Sun, 20 Oct 2024 at 23:53, Seth via Boost <boost@lists.boost.org> wrote:
On Sun, Oct 20, 2024, at 9:15 PM, Marcelo Zimbres Silva via Boost wrote:
This is the approach I took in Boost.Redis:
https://www.boost.org/doc/libs/1_86_0/libs/redis/doc/html/classboost_1_1redi...
Oh cool. I played around with Redis the other day and failed to figure this out. Yes, not-logging by default really hurts adoption, at least in my case
Thanks for the feedback. I agree, logging by default avoids a lot of frustration for newcomers. I will try to change that for the next release, see https://github.com/boostorg/redis/issues/219. Marcelo

Now that I've looked at the implementation, it seems like the stream's async_write will use the executor bound to the first message found in the queue. This can yield surprising results. For instance:
cli.async_publish(..., asio::bind_executor(ex1, tok1));
cli.async_publish(..., asio::bind_executor(ex2, tok2)); // might complete on ex1
I think having this limitation is dangerous, as it's not present in any other Asio-based library, and it's not enforced anywhere (as far as I've read).
I think that this restriction can be lifted by making the stream's async_write a child agent of async_run. That would imply changing async_sender::async_send (https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...) to push the message to the queue (as is today), then notifying a writer task (e.g. using a channel or a timer), then waiting for the writer task to complete the write (again using a channel or a timer). If I'm not mistaken, this is how Boost.Redis works.
You are correct that the second handler might complete on ex1, and from that perspective, the code doesn't fully adhere to typical Asio standards. The issue can indeed be resolved as you suggested, either by using a channel or a timer, or possibly even more efficiently by dispatching the message to the writer queue using the stream's executor via asio::dispatch.
However, conceptually, the right solution is still unclear. Technically, we understand how it can be done, but the question is more about the intended design. I recall Richard Hodges once explaining the role of a bound executor to a handler, stating that "the final handler, as well as all intermediate handlers of the async operation, will be invoked using the bound executor." While the initiation code of the async function may not use the bound executor, once the internal code calls another internal async function, every subsequent async function, including the final handler, will be executed using the originally bound executor.
I think the point here is the parent-child relationship of the involved async agents. Read operations are children of async_run. When an async operation involving only a read, like async_receive, is invoked, it waits on its channel until the required message arrives. It is relying on an internal async operation, but it does not propagate its bound properties to it, because there's no parent-child relationship between them. The channel serves as an isolator. I fully agree on this being a good design.

I'd advise you to apply this same logic to writing. At the moment, writing a message does not have a clear parent. If two messages are being written concurrently, the parent is chosen at random. In my opinion, writing should be symmetrical to reading: a child of async_run. As you do with reading, writing should never be invoked directly by async_publish, but maintained by async_run. This avoids spuriously propagating properties.
async_publish actually supports total cancellation. So, if you write
async_publish(..., asio::cancel_after(5s, cancellation_type::total));
it will work as you expect. Support for the total cancellation type is quite complicated for PUBLISH messages and I've described the reasons on the Slack channel earlier. For reference, I'll put the explanation here as well:
Oh, you're right. I debugged and got the (false) impression that total signals were being blocked by a filter. Thanks for pointing that out.
12. I've noticed that the implementation doesn't use asio::async_compose, but has its own version. Is there a reason for it? Also, note that cancellable_handler is implemented as always having a bound executor, regardless of what the underlying handler has (note that initiating an operation with bind_executor(ex_default, tok) and with tok is not strictly equivalent, even if get_associated_executor(tok, ex_default) yields the same value).
I wrote a rather lengthy explanation of the reasons why we don't use async_compose in a PDF document shared by Klemens on the mailing list. Here's the link to it in case you've missed it:
I completely did, thanks for pointing it out. Note however that async_compose is not tied to "coroutine like" behavior - you can implement the same overloading scheme you currently have with async_compose, too. I agree that it's under-documented, and that doing custom cancellation handling is non-trivial, so you probably made the right choice.
cancellable_handler uses our internal tracking_executor type which is the result of the operation asio::prefer( asio::get_associated_executor(handler, ex), asio::execution::outstanding_work.tracked )
It's here to prevent handler's execution context from running out of job. If handler doesn't have an associated executor it will be dispatched using default mqtt_client executor and, to my understanding, the above tracking executor will prevent client's execution context to run out of job (although this is not strictly necessary).
Yes, I understand that, and yes, it's required for correctness. You might consider using asio::executor_work_guard, too, which is compatible with old-style executors.

My point here is that, if you want cancellable_handler to propagate the original handler's executor, immediate executor and allocator, the usual practice is to specialize asio::associated_executor, asio::associated_immediate_executor, and asio::associated_allocator. For instance, and supposing that cancellable_handler::handler is the original completion handler passed by the user:

template <class Handler, typename DefaultCandidate>
struct associated_executor<cancellable_handler<Handler>, DefaultCandidate>
    : associated_executor<Handler, DefaultCandidate>
{
    static typename associated_executor<Handler, DefaultCandidate>::type
    get(const cancellable_handler<Handler>& h) noexcept
    {
        return associated_executor<Handler, DefaultCandidate>::get(h.handler);
    }

    static auto get(const cancellable_handler<Handler>& h, const DefaultCandidate& c)
    {
        return associated_executor<Handler, DefaultCandidate>::get(h.handler, c);
    }
};

Inheriting from associated_executor<Handler, DefaultCandidate> has the effect that if the original handler didn't have an associated executor, cancellable_handler won't have an associated executor, either. There are some if constexpr's in Asio taking advantage of this to make small optimizations. It probably doesn't make a difference in your case though, so feel free to ignore my comment if you think it's not worth it.
14. I've measured build times in the linked repo. Using clang-18 with C++23 on my machine, it takes 22s to build the sender in debug mode, and 30s in release mode. This is a little bit high IMO. Some observations here: - The client is unconditionally including Boost.Beast websocket, which causes a considerable overhead. I'd consider a way to forward-declare it or make it part of a separate header. - Instantiations of async functions are pretty heavyweight. asio::prepend is used a lot, which is not free. Did you experiment with/without it and measure the benefit? - The spirit headers/functions also appear in the flame graph (although I thought they would be much higher).
Have you considered optimizations of compile times?
We've already tried to minimize the inclusion of Boost.Beast stream (websocket) headers, but forward declarations didn't work well in this case. The code in connect_op uses if constexpr to generate the necessary boilerplate when the stream is a websocket, and unfortunately, we couldn't find a reasonable solution that avoids including the Boost.Beast stream headers.
Oh. I think this might be indicating that connect_op is too strongly coupled to Boost.Beast websocket. I know that some guys in Boost are developing another HTTP/websocket library, so you might end up with messy code if this library ever gets traction.

I'd advise to split these chunks of code into a customizable trait. You might choose two traits (tls_handshake_traits and ws_connect_traits) or a single one (connect_traits). For instance:

// Trait declaration
template <class Stream>
struct ws_connect_traits : detail::no_ws_connect_traits {};

// Trait definition for Beast
template <class Stream>
struct ws_connect_traits<boost::beast::websocket::stream<Stream>>
{
    template <class CompletionToken>
    auto async_handshake(
        boost::beast::websocket::stream<Stream>& ws,
        const authority_path& ap,
        CompletionToken&&)
    {
        // what you currently have in do_ws_handshake
    }
};

You can now split the trait specialization to a separate, optional websocket.hpp header, thus making Beast an optional peer dependency (instead of a hard one). In this header, you can provide helper typedefs to make things less verbose. E.g.

using ws_client = mqtt_client<boost::beast::websocket::stream<boost::asio::ip::tcp::socket>>;

If you combine this with my comment on placing TLS hooks in optional headers, you'll end up with 4 optional headers, with a typedef each (i.e. tcp_client, tcp_tls_client, ws_client, ws_tls_client). I think this is great for usability.
You are correct that the use of asio::prepend, consign, and append is the main contributor to the longer-than-desired compilation times. These constructs are fundamental to our code since we don't use async_compose, and we're unsure how to avoid them in these scenarios.
I'd advise to experiment with a callback wrapper that propagates associated properties with asio::associator. For example:

// The type of handler to be passed to async operations.
// This replaces asio::append and asio::prepend.
template <class Op, class... Args>
struct intermediate_handler
{
    // The operation object, whose associated properties will be propagated
    Op op;

    // The bound arguments. This works like std::bind_front,
    // so bound arguments should come first
    std::tuple<Args...> args;

    // Helper to invoke the handler
    template <class... SuppliedArgs, std::size_t... I>
    void invoke_impl(std::index_sequence<I...>, SuppliedArgs&&... supplied_args)
    {
        std::move(op)(
            std::get<I>(std::move(args))...,
            std::forward<SuppliedArgs>(supplied_args)...
        );
    }

    // Invoke the handler
    template <class... SuppliedArgs>
    void operator()(SuppliedArgs&&... supplied_args)
    {
        invoke_impl(
            std::make_index_sequence<sizeof...(Args)>{},
            std::forward<SuppliedArgs>(supplied_args)...
        );
    }
};

// Factory function
template <class Op, class... Args>
auto make_intermediate_handler(Op&& op, Args&&... args)
{
    return intermediate_handler<std::decay_t<Op>, std::decay_t<Args>...>{
        std::move(op), {std::forward<Args>(args)...}
    };
}

// Ensure that our handler propagates associated properties (executor, allocator...)
template <template <typename, typename> class Associator, class Op, class... Args, class DefaultCandidate>
struct boost::asio::associator<Associator, intermediate_handler<Op, Args...>, DefaultCandidate>
    : Associator<Op, DefaultCandidate>
{
    static typename Associator<Op, DefaultCandidate>::type
    get(const intermediate_handler<Op, Args...>& h) noexcept
    {
        return Associator<Op, DefaultCandidate>::get(h.op);
    }

    static auto get(const intermediate_handler<Op, Args...>& h, const DefaultCandidate& c) noexcept
    {
        return Associator<Op, DefaultCandidate>::get(h.op, c);
    }
};

// Taking connect_op::on_connect as an example, this:
lowest_layer(_stream).async_connect(
    *std::begin(eps),
    asio::append(
        asio::prepend(std::move(*this), on_connect{}),
        *std::begin(eps),
        std::move(ap)
    )
);

// Can be replaced by:
lowest_layer(_stream).async_connect(
    *std::begin(eps),
    make_intermediate_handler(
        std::move(*this),
        on_connect{},
        *std::begin(eps),
        std::move(ap)
    )
);

It probably requires you to re-arrange callback args in some functions, though. I don't know whether this provides substantial gains. But may be worth trying. Regards, Ruben.

Hi Ruben,
On 20.10.2024., at 15:03, Ruben Perez <rubenperez038@gmail.com> wrote:
Now that I've looked at the implementation, it seems like the stream's async_write will use the executor bound to the first message found in the queue. This can yield surprising results. For instance:
cli.async_publish(..., asio::bind_executor(ex1, tok1));
cli.async_publish(..., asio::bind_executor(ex2, tok2)); // might complete on ex1
I think having this limitation is dangerous, as it's not present in any other Asio-based library, and it's not enforced anywhere (as far as I've read).
I think that this restriction can be lifted by making the stream's async_write a child agent of async_run. That would imply changing async_sender::async_send (https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...) to push the message to the queue (as is today), then notifying a writer task (e.g. using a channel or a timer), then waiting for the writer task to complete the write (again using a channel or a timer). If I'm not mistaken, this is how Boost.Redis works.
You are correct that the second handler might complete on ex1, and from that perspective, the code doesn't fully adhere to typical Asio standards. The issue can indeed be resolved as you suggested, either by using a channel or a timer, or possibly even more efficiently by dispatching the message to the writer queue using the stream's executor via asio::dispatch.
However, conceptually, the right solution is still unclear. Technically, we understand how it can be done, but the question is more about the intended design. I recall Richard Hodges once explaining the role of a bound executor to a handler, stating that "the final handler, as well as all intermediate handlers of the async operation, will be invoked using the bound executor." While the initiation code of the async function may not use the bound executor, once the internal code calls another internal async function, every subsequent async function, including the final handler, will be executed using the originally bound executor.
I think the point here is the parent-child relationship of the involved async agents. Read operations are children of async_run. When an async operation involving only a read, like async_receive, is invoked, it waits on its channel until the required message arrives. It is relying on an internal async operation, but it does not propagate its bound properties to it, because there's no parent-child relationship between them. The channel serves as an isolator. I fully agree on this being a good design.
I'd advise you to apply this same logic to writing. At the moment, writing a message does not have a clear parent. If 2 messages are being written concurrently, the parent is chosen at random. In my opinion, writing should be symmetrical to reading: a child of async_run. As you do with reading, writing should never be invoked directly by async_publish, but maintained by async_run. This avoids spuriously propagating properties.
I agree this makes a lot of sense. I'm not sure if using a channel would introduce a significant performance penalty, especially when the user creates a parallel_group with a vector of publish operations for efficiency. We've already done something similar for read operations, so I think it would be acceptable—and it would also provide symmetry, which is a good thing.
async_publish actually supports total cancellation. So, if you write
async_publish(..., asio::cancel_after(5s, cancellation_type::total));
it will work as you expect. Support for the total cancellation type is quite complicated for PUBLISH messages, and I've described the reasons on the Slack channel earlier. For reference, I'll put the explanation here as well:
Oh, you're right. I debugged and got the (false) impression that total signals were being blocked by a filter. Thanks for pointing that out.
12. I've noticed that the implementation doesn't use asio::async_compose, but has its own version. Is there a reason for it? Also, note that cancellable_handler is implemented as always having a bound executor, regardless of what the underlying handler has (note that initiating an operation with bind_executor(ex_default, tok) and with tok is not strictly equivalent, even if get_associated_executor(tok, ex_default) yields the same value).
I wrote a rather lengthy explanation of the reasons why we don't use async_compose in a PDF document shared by Klemens on the mailing list. Here's the link to it in case you've missed it:
I completely did, thanks for pointing it out. Note however that async_compose is not tied to "coroutine like" behavior - you can implement the same overloading scheme you currently have with async_compose, too. I agree that it's under-documented, and that doing custom cancellation handling is non-trivial, so you probably made the right choice.
Thanks!
cancellable_handler uses our internal tracking_executor type, which is the result of the operation:

    asio::prefer(
        asio::get_associated_executor(handler, ex),
        asio::execution::outstanding_work.tracked
    )
It's there to prevent the handler's execution context from running out of work. If the handler doesn't have an associated executor, it will be dispatched using the default mqtt_client executor and, to my understanding, the tracking executor above will prevent the client's execution context from running out of work (although this is not strictly necessary).
Yes, I understand that, and yes, it's required for correctness. You might consider using asio::executor_work_guard, too, which is compatible with old-style executors.
My point here is that, if you want cancellable_handler to propagate the original handler's executor, immediate executor and allocator, the usual practice is to specialize asio::associated_executor, asio::associated_immediate_executor, and asio::associated_allocator. For instance, and supposing that cancellable_handler::handler is the original completion handler passed by the user:
    template <class Handler, class DefaultCandidate>
    struct associated_executor<cancellable_handler<Handler>, DefaultCandidate>
        : associated_executor<Handler, DefaultCandidate>
    {
        static typename associated_executor<Handler, DefaultCandidate>::type
        get(const cancellable_handler<Handler>& h) noexcept
        {
            return associated_executor<Handler, DefaultCandidate>::get(h.handler);
        }

        static auto get(const cancellable_handler<Handler>& h, const DefaultCandidate& c) noexcept
        {
            return associated_executor<Handler, DefaultCandidate>::get(h.handler, c);
        }
    };
Inheriting from associated_executor<Handler, DefaultCandidate> has the effect that if the original handler didn't have an associated executor, cancellable_handler won't have an associated executor, either. There are some if constexpr's in Asio taking advantage of this to make small optimizations. It probably doesn't make a difference in your case though, so feel free to ignore my comment if you think it's not worth it.
Ok, thanks for detailed explanation!
14. I've measured build times in the linked repo. Using clang-18 with C++23 on my machine, it takes 22s to build the sender in debug mode, and 30s in release mode. This is a little bit high IMO. Some observations here:
- The client unconditionally includes Boost.Beast websocket, which causes considerable overhead. I'd consider a way to forward-declare it or make it part of a separate header.
- Instantiations of async functions are pretty heavyweight. asio::prepend is used a lot, which is not free. Did you experiment with/without it and measure the benefit?
- The Spirit headers/functions also appear in the flame graph (although I thought they would be much higher).
Have you considered optimizing compile times?
We've already tried to minimize the inclusion of Boost.Beast stream (websocket) headers, but forward declarations didn't work well in this case. The code in connect_op uses if constexpr to generate the necessary boilerplate when the stream is a websocket, and unfortunately, we couldn't find a reasonable solution that avoids including the Boost.Beast stream headers.
Oh. I think this might be indicating that connect_op is too strongly coupled to Boost.Beast websocket. I know that some guys in Boost are developing another HTTP/websocket library, so you might end up with messy code if this library ever gets traction.
I'd advise to split these chunks of code into a customizable trait. You might choose two traits (tls_handshake_traits and ws_connect_traits) or a single one (connect_traits). For instance:
    // Trait declaration
    template <class Stream>
    struct ws_connect_traits : detail::no_ws_connect_traits {};

    // Trait definition for Beast
    template <class Stream>
    struct ws_connect_traits<boost::beast::websocket::stream<Stream>> {
        template <class CompletionToken>
        auto async_handshake(
            boost::beast::websocket::stream<Stream>& ws,
            const authority_path& ap,
            CompletionToken&&)
        {
            // what you currently have in do_ws_handshake
        }
    };
You can now split the trait specialization to a separate, optional websocket.hpp header, thus making Beast an optional peer dependency (instead of a hard one). In this header, you can provide helper typedefs to make things less verbose. E.g. using ws_client = mqtt_client<boost::beast::websocket::stream<boost::asio::ip::tcp>>;
If you combine this with my comment on placing TLS hooks in optional headers, you'll end up with 4 optional headers, with a typedef each (i.e. tcp_client, tcp_tls_client, ws_client, ws_tls_client). I think this is great for usability.
You're absolutely right that connect_op is too tightly coupled to the Beast WebSocket implementation. However, since it's the only available WebSocket library at the moment, it's difficult to define the correct type traits or specializations without having at least one other WebSocket implementation for reference. Currently, the if constexpr statements in connect_op explicitly instantiate Beast WebSocket objects and call their corresponding async methods. We'll need to decouple this, much in the way you've described.
You are correct that the use of asio::prepend, consign, and append is the main contributor to the longer-than-desired compilation times. These constructs are fundamental to our code since we don't use async_compose, and we're unsure how to avoid them in these scenarios.
I'd advise to experiment with a callback wrapper that propagates associated properties with asio::associator. For example:
    // The type of handler to be passed to async operations.
    // This replaces asio::append and asio::prepend.
    template <class Op, class... Args>
    struct intermediate_handler {
        // The operation object, whose associated properties will be propagated
        Op op;

        // The bound arguments. This works like std::bind_front,
        // so bound arguments come first.
        std::tuple<Args...> args;

        // Helper to invoke the handler
        template <class... SuppliedArgs, std::size_t... I>
        void invoke_impl(std::index_sequence<I...>, SuppliedArgs&&... supplied_args)
        {
            std::move(op)(
                std::get<I>(std::move(args))...,
                std::forward<SuppliedArgs>(supplied_args)...
            );
        }

        // Invoke the handler
        template <class... SuppliedArgs>
        void operator()(SuppliedArgs&&... supplied_args)
        {
            invoke_impl(
                std::make_index_sequence<sizeof...(Args)>{},
                std::forward<SuppliedArgs>(supplied_args)...
            );
        }
    };

    // Factory function
    template <class Op, class... Args>
    auto make_intermediate_handler(Op&& op, Args&&... args)
    {
        return intermediate_handler<std::decay_t<Op>, std::decay_t<Args>...>{
            std::forward<Op>(op), {std::forward<Args>(args)...}
        };
    }

    // Ensure that our handler propagates associated properties (executor, allocator...)
    template <template <typename, typename> class Associator,
              class Op, class... Args, class DefaultCandidate>
    struct boost::asio::associator<Associator, intermediate_handler<Op, Args...>, DefaultCandidate>
        : Associator<Op, DefaultCandidate>
    {
        static typename Associator<Op, DefaultCandidate>::type
        get(const intermediate_handler<Op, Args...>& h) noexcept
        {
            return Associator<Op, DefaultCandidate>::get(h.op);
        }

        static auto get(const intermediate_handler<Op, Args...>& h, const DefaultCandidate& c) noexcept
        {
            return Associator<Op, DefaultCandidate>::get(h.op, c);
        }
    };

    // Taking connect_op::on_connect as an example, this:
    lowest_layer(_stream).async_connect(
        *std::begin(eps),
        asio::append(
            asio::prepend(std::move(*this), on_connect{}),
            *std::begin(eps), std::move(ap)
        )
    );

    // Can be replaced by:
    lowest_layer(_stream).async_connect(
        *std::begin(eps),
        make_intermediate_handler(
            std::move(*this), on_connect{},
            *std::begin(eps), std::move(ap)
        )
    );
It probably requires you to re-arrange callback args in some functions, though.
I don't know whether this provides substantial gains. But may be worth trying.
Wow, thanks for these code snippets! If I'm not mistaken, you're suggesting creating an equivalent of asio::prepend/consign/append that operates directly on the CompletionHandler rather than the CompletionToken, which is exactly our main use case. This approach would allow us to avoid unnecessary token-to-handler specializations. Did I understand that correctly?
Regards, Ruben.
Thanks, Ivica Siladic

I want to point out that I think you can probably forward-declare the Beast websocket stream object in rebind_executor.hpp by replacing the current rebind_executor implementation with:

    template <typename Stream, bool deflate_supported, typename Executor>
    struct rebind_executor<
        boost::beast::websocket::stream<asio::ssl::stream<Stream>, deflate_supported>,
        Executor
    > {
        using other = boost::beast::websocket::stream<
            asio::ssl::stream<typename rebind_executor<Stream, Executor>::other>,
            deflate_supported
        >;
    };

If you manage to do it, you can probably defer the introduction of such trait classes until really required. IMO it is the hard dependency that is the problem.
Wow, thanks for these code snippets! If I'm not mistaken, you're suggesting creating an equivalent of asio::prepend/consign/append that operates directly on the CompletionHandler rather than the CompletionToken, which is exactly our main use case. This approach would allow us to avoid unnecessary token-to-handler specializations. Did I understand that correctly?
Yes. asio::prepend and asio::append also include code to deal with arbitrary initiations, and my presumption is that a custom handler may be cheaper. But you need to measure. Note that what I wrote replaces only append and prepend, not consign. For consign, you need to destroy the passed variables before the handler is invoked. I can try to write something if you think it's useful. Note that you can use asio::consign on top of the custom handler that I wrote. Regards, Ruben.


Hi Vinnie,
On 20.10.2024., at 19:25, Vinnie Falco <vinnie.falco@gmail.com> wrote:
On Sat, Oct 19, 2024 at 4:44 PM Ivica Siladic via Boost <boost@lists.boost.org <mailto:boost@lists.boost.org>> wrote:
...
I use the Visual Studio IDE, which is great for debugging. I cloned async-mqtt5 into my Boost superproject at boost/libs/async-mqtt5 to try it out. I tried to generate a Visual Studio Solution for async-mqtt5 using this command line:
cmake -G "Visual Studio 17 2022" -A x64 -B bin64 -DCMAKE_TOOLCHAIN_FILE=C:/vcpkg/scripts/buildsystems/vcpkg.cmake -DVCPKG_CHAINLOAD_TOOLCHAIN_FILE="C:/Users/vinnie/src/boost/libs/http_proto/cmake/toolchains/msvc.cmake"
Unfortunately, I got much more than I wanted. CMake ended up installing a ton of Boost version 1.83.0 libraries, I'm not sure where they went but here is the log:
https://gist.github.com/vinniefalco/446c6b2caed6ff98bc7d522349bab185
Now, vcpkg is showing all these packages locally:
https://gist.github.com/vinniefalco/fd24ed4ac2239287da5fc293432536b1
Sadly, when I opened the async-mqtt5.sln (Visual Studio solution file) it had nothing in it:
1. How do I properly undo all the stuff that was installed?
2. How do I generate a useful Visual Studio Solution where I can see the source files, build the examples, run them and set breakpoints?
The CMakeLists.txt file in the root folder is used to configure include directories and dependencies for building projects based on async_mqtt5; it does not define any build targets. It does, however, define a BUILD_EXAMPLES option which will generate projects for the examples. Therefore, please do not run the following command:

    cmake -G "Visual Studio 17 2022" -A x64 -B bin64 -DCMAKE_TOOLCHAIN_FILE=C:/vcpkg/scripts/buildsystems/vcpkg.cmake -DVCPKG_CHAINLOAD_TOOLCHAIN_FILE="C:/Users/vinnie/src/boost/libs/http_proto/cmake/toolchains/msvc.cmake"

If you'd like to generate .vcxproj files for the included examples, you can do so by running:

    cmake -G "Visual Studio 17 2022" -A x64 -B bin64 -DBUILD_EXAMPLES=ON

This will generate the .vcxproj files you expected.
Thanks
Thanks, Ivica Siladic

On Oct 18, 2024, at 2:50 PM, Ivica Siladic via Boost <boost@lists.boost.org> wrote:
In the absence of a better solution, we recommend users add their own logging code directly into the async_mqtt5 source during development. It’s very far from ideal, but we couldn’t come up with a better approach.
Sorry for the drive-by comment, but it seems that a policy-based metaprogramming approach would work here. There must be specific places in the code where it makes sense to (potentially) emit log messages. At those places you could put something like the following:

    using log_policy = typename type_traits::log_policy<T>::type;
    log_policy::apply(x, y, z);

The following metafunctions would create a customization point for users who want to partially (or completely) specialize log_policy_customization<> for type-specific or universal logging:

    // default policy
    template <typename T>
    struct log_policy_default {
        using type = log_policy_default;
        template <typename... Ts>
        static void apply(Ts&&...) {} // no cost by default
    };

    // customization point
    // _ is always void, so partial specializations can apply to all T
    template <typename T, typename _>
    struct log_policy_customization : log_policy_default<T> {};

    // main metafunction
    template <typename T>
    struct log_policy : log_policy_customization<T, void> {};

Even if this is not a good solution for MQTT, I'm curious about general reactions to this strategy for customizing policies. It seems useful to me, but a broader perspective would help. Thanks a lot. Cheers, Brook


Hi Vinnie, and thanks for your questions :)
On 20.10.2024., at 19:06, Vinnie Falco via Boost <boost@lists.boost.org> wrote:
On Sat, Oct 19, 2024 at 4:43 PM Ivica Siladic via Boost < boost@lists.boost.org> wrote:
...
Ivica, I have some questions :) My first step was to audit the file mqtt_client.hpp by inspecting everything from top to bottom:
1. Where are the type requirements for TlsContext explained? https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...
TlsContext is indeed not a concept. For mqtt_client, it’s a fully opaque object typically required for TLS streams. For example, with boost::asio::ssl::stream, the TlsContext would be boost::asio::ssl::context. For Botan's TLS stream, it would be the Botan TLS context, and so on. The context is generally a kind of singleton that holds administrative data for streams that will be created based on that data. For instance, a collection of Certificate Authorities (CAs) is typically loaded once and then used to create TLS streams, which rely on those CAs to verify the server's certificate. There are no interface requirements for TlsContext other than that it must be move-constructible. The mqtt_client simply passes this object to the SSL stream specializations that require access to it.
2. Allocating in a constructor is OK, and the documentation should communicate this and the possibility of an exception: https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...
Correct, and thanks for pointing that out.
3. Why bother with this overload? Users can call ctx.get_executor(), why do it for them? https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...
Literally all Asio async objects have constructors that accept an executor or execution context. I believe this is a bit of a historical relic, but we included it to maintain consistency with Asio standards.
4. What if `other` is performing operations? The documentation does not explain it. https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...
`other` must not be running and you are correct, this should be documented.
5. The destructor ~mqtt_client calls impl_->cancel(), which is a non-trivial function which can throw. Submitting work to an executor from a destructor is surprising: https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...
This is precisely how, for example, the Asio basic_deadline_timer works. Specifically, the basic_deadline_timer object contains a member named impl_, whose type is io_object_impl<detail::deadline_timer_service>. In its destructor, impl_ calls service_->destroy(implementation_). The destroy function invokes an internal cancel, which may throw, and submits the completion handlers for execution via scheduler_.post_deferred_completions. The logic in mqtt_client follows this same pattern. The key point, as we understand it, is to explicitly cancel all outstanding asynchronous operations and notify any pending handlers that the object has gone out of scope. This can only be accomplished from the destructor. It is unusual, I agree. Most third-party Asio objects I've seen do not call cancel() from the destructor; if you have a pending async operation on such an object and you let it go out of scope, your program will most likely crash. Here, again, we wanted to be aligned with Asio.
6. This "will" look like an MQTT protocol thing. The documentation could explain it better: https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...
I found this from searching: https://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament/
The documentation for the "Will" type states, "A Will Message is an Application Message that the Broker should publish after the Network Connection is closed in cases where the Network Connection is not closed normally." The context in which a Will can be used in the MQTT protocol is indeed specific to MQTT. Therefore, the conceptual usage of the MQTT Will is likely beyond the scope of the Async.MQTT5 documentation.
7. The word "will" appearing three times in this declaration looks unhealthy. The function name doesn't make its operation clear: https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...
Correct, and thanks for pointing that out.
8. The ad-hoc format of putting a list into a string is a little weird, why isn't this some kind of range? https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c...
Ruben asked the same question so I’m pasting the answer here: In the vast majority of cases, there will be only one broker, so creating a std::vector<std::string> with a single element would introduce unnecessary overhead. If multiple brokers are involved, they are either configured in a .conf file or hard-coded. In the case of a configuration file, it's simpler to pass the entire broker string directly to the mqtt_client rather than parsing the string, splitting it into substrings, and filling a vector. If the brokers are hard-coded, then this overhead is irrelevant anyway.
9. I found the "TlsContext" concept documentation. It completely omits any requirements, which means that TlsContext is not a real concept. Using std::monostate is weird, why not void? How do you feel about using this declaration for mqtt_client instead?
template < typename StreamType, bool enableTLS = false > class mqtt_client;
We use std::monostate to avoid creating separate specializations of the mqtt_client and client_service constructors that omit the (defaulted) TlsContext parameter. These constructors would require additional enable_if disambiguation, which would complicate the code. By using a default-constructible special object, the code becomes more concise. While we could have chosen any default-constructible type, we decided to go with std::monostate because, as the specifications state, "std::monostate is a unit type intended for use as a well-behaved empty alternative in std::variant."
10. async_subscribe requires a reference to a vector. And the call to `async_initiate` makes a copy? Why not pass the vector by value and then move it? This would allow specifying an initializer list at call sites to async_publish without making an extra copy: https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c... (of course, my C++ is rusty so I could be wrong here)
Yes, async_initiate makes a copy of the vector supplied as an argument to async_subscribe. In fact, during async_initiate we encode the complete SUBSCRIBE MQTT message with the topic names embedded in it. The encoding function copies the elements of the supplied vector of topics directly into the buffer holding the encoded message.
11. Everything in this document should be in the official library documentation, and a permanent part of the library. Not just for reviewers. This information is very helpful for people who are evaluating whether or not to adopt the library for their needs instead of other offerings: https://www.mireo.com/cdn/Async.MQTT5.pdf
OK, thanks for pointing that out. We'll include it in the official docs.
12. The documentation alludes to some kind of queuing or buffering of outgoing messages, to handle the case for recurring disconnections (which are frequent in IoT deployments): "While offline, it automatically buffers all the packets to send when the connection is re-established."
Does this mean that mqtt_client objects can consume unbounded memory? What is the limit on the queue, or where are the APIs to control queue depth? How does the client eventually block callers when the queue is full, to prevent resource exhaustion?
Ruben also asked this question so here’s my answer: Resource consumption will not grow indefinitely. The client will attempt to send up to 65,535 packets, after which async_publish will immediately return an error code (pid_overrun), as it will have run out of Packet Identifiers. If you want to impose stricter limits, you can implement a more complex solution on the user's side. For example, you could track the number of bytes (or messages) sent by incrementing a counter each time you call async_publish and decrementing it in the completion handler of async_publish. Before attempting to publish, you should check whether the number of in-flight messages has reached your limit, and if so, simply skip publishing until some messages are acknowledged.
13. I have the same questions as above, but regarding buffering for incoming messages. Are they buffered without bounds? What if someone calls async_run() without collecting any messages, will the memory consumption grow unchecked?
Incoming messages are placed into an asio::basic_channel, with its limit set to std::numeric_limits<size_t>::max(), effectively making it unlimited. The program may crash if you subscribe to a topic and fail to consume the incoming messages. It’s important to note that the receiving side operates very differently from the publishing side, whose buffer depends solely on network bandwidth and availability. The receiving side only gets messages that have successfully traversed the network. We set the limit to std::numeric_limits<size_t>::max() because we couldn't come up with a reasonable lower value. The channel buffers received messages because there may be lengthy operations occurring between consecutive async_receive calls, during which several messages might arrive. However, it is the user's responsibility to pop these messages in a timely manner. I don't believe that exposing the buffer limit as a configurable parameter would make sense. It would represent something like the "maximum number of messages in the intermediate buffer stored between consecutive async_receive calls." I think it would be challenging to explain to users what this means and what value should be used.
I will have more soon.
Thanks
Thanks, Ivica Siladic

On Sun, Oct 20, 2024 at 1:31 PM Ivica Siladic <ivica.siladic@mireo.com> wrote:
TlsContext is indeed not a concept
I think that the treatment of TlsContext in the public API doesn't rise to the same level of quality as the rest of the library. I'm not sure what the best way to improve it is, though. One thing I am certain of, is that it should not be presented as if it is a concept. It should be designed as the least-powerful construct which permits the desired functionality. A concept is too broad. A "bool" seems right, either TLS is on or off. Ultimately it is up to you to decide how much to invest here (or not to).

Literally all Asio async objects have constructors that accept an executor or execution context. I believe this is a bit of a historical relic, but we included it to maintain consistency with Asio standards.

Yeah...one school of thought is to only include functions which are used and have a demonstrable use case. There is an argument to be made for removing this. Asio includes it for backward compatibility, yet this rationale doesn't apply to your library, which is new. Up to you to decide what to do.

This is precisely how, for example, the Asio basic_deadline_timer works.

Okay, then it's fine I suppose.

...the conceptual usage of the MQTT Will is likely beyond the scope of the Async.MQTT5 documentation.
I agree but you can at least have one or two sentences which explain that it is an MQTT thing, and you can include a hyperlink to an external site which explains it more deeply. For example:

/**
 * \brief Assign a \ref will Message.
 *
 * \details The \ref will Message that the Broker should publish
 * after the Network Connection is closed and it is not
 * closed normally.
 *
 * \attention This function takes action when the client is in a non-operational state,
 * meaning the \ref async_run function has not been invoked.
 * Furthermore, you can use this function after the \ref cancel function has been called,
 * before the \ref async_run function is invoked again.
 *
 * \par References
 *
 * \see https://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament/
 */

In the vast majority of cases, there will be only one broker

Fine.
Resource consumption will not grow indefinitely. The client will attempt to send up to 65,535 packets, after which async_publish will immediately return an error code (pid_overrun), as it will have run out of Packet Identifiers.
65,535 sounds like quite a lot. The reason that I ask about buffering, is because I am trying to determine if the mqtt_client interface interferes with the correct operation of application-level TCP flow control. This is best accomplished by designing a system with back-pressure, which automatically limits the application's ability to generate new work to match the peer's ability to consume it. For example, asio sockets can only have one pending read and one pending write operation. To write the next buffer you have to finish writing the current buffer. In other words, the application is blocked from writing as long as there is a write active. In this case, the delivery of the write completion is naturally delayed until the peer acknowledges the data. If the mqtt_client buffers up to 65,535 outgoing writes, then the calling application will perform much more work than it should. It will perform more work than the receiving end is able to process. The result is bursting of the network link and oversubscribing. When the peer is unable to keep up with incoming messages, it is better for the sending application to slow down so that it produces new work at the lower rate. Disclaimer: this is all based on theories and I have not performed any measurements.
If you want to impose stricter limits, you can implement a more complex solution on the user's side. For example, you could track the number of bytes (or messages) sent by incrementing a counter each time you call async_publish and decrementing it in the completion handler of async_publish. Before attempting to publish, you should check whether the number of in-flight messages has reached your limit, and if so, simply skip publishing until some messages are acknowledged.
Well, that's not a nice solution at all. A strength of the library is to take care of complicated things automatically. The library's design should naturally solve this problem without any action required by callers. To "skip publishing" may be difficult, or even impossible. For example, if the application has a condition that all messages must be received by the peer, then it cannot just throw out messages when a limit is reached. It would have to queue them up. Which defeats the purpose of adding an in-flight message limit feature :) I am not completely sure since I have not deployed the library in production, but I think it would be helpful if async_publish would not invoke the completion handler until the outgoing queue is decongested. I'm not sure how congestion would be defined but you as an expert of MQTT and with field experience could probably come up with the right definition. On the receiving side, the mqtt_client should not keep calling socket::async_read when there is a backlog of messages. Instead, it should wait until the caller has processed some or all of the backlog and only then resume calling async_read. This buffering of incoming and outgoing data was a huge problem in the websocketpp library which is one of the main motivations why I wrote beast::websocket.
Incoming messages are placed into an asio::basic_channel, with its limit set to std::numeric_limits<size_t>::max(), effectively making it unlimited. The program may crash if you subscribe to a topic and fail to consume the incoming messages. It’s important to note that the receiving side operates very differently from the publishing side, whose buffer depends solely on network bandwidth and availability. The receiving side only gets messages that have successfully traversed the network.
I already covered most of this above, and it is worth repeating here. The problem is not so much the number of elements placed into the basic_channel container, but rather that there is no backpressure. If mqtt_client just reads quickly in a loop without waiting for the caller to consume some data, then the peer believes that the TCP window should be as wide as possible. Thus defeating application-level flow control. What you might do is keep track of the number of received messages which are buffered, and that the caller has not yet received, and if that number goes over a configured threshold then do not call async_read on the socket again until the number goes back down. Another disclaimer, all of the above is theoretical and it could be completely wrong :)
We set the limit to std::numeric_limits<size_t>::max() because we couldn't come up with a reasonable lower value. The channel buffers received messages because there may be lengthy operations occurring between consecutive async_receive calls, during which several messages might arrive. However, it is the user's responsibility to pop these messages in a timely manner.
On the receiving side, you might actually want an API that gives the caller complete control over when the low-level async_read is resumed. Because the caller has more information than you do. The caller knows, for every message it receives, whether it will process it immediately or whether it will become a long-running task. This one issue of application-level flow control should not prevent the library from acceptance if it is to be accepted, as it is functional and useful the way it is now. However, I think that exploring whether or not the current design meshes with application-level flow control, and if not then how the API might be improved, would be useful. Thanks

On Sun, Oct 20, 2024, at 11:09 PM, Vinnie Falco via Boost wrote:
On Sun, Oct 20, 2024 at 1:31 PM Ivica Siladic <ivica.siladic@mireo.com> wrote:
TlsContext is indeed not a concept
I think that the treatment of TlsContext in the public API doesn't rise to the same level of quality as the rest of the library. I'm not sure what the best way to improve it is, though.
When I read that, I was already thinking that there could be a factory for (tls) streams. Many dynamic IO frameworks (in other languages) seem to have this approach, I think I remember it from my Java past and even the Curl API seemed to have multiple callbacks to customize e.g. socket creation and configuration. Of course, this type of customization can be modeled as a "strategy mixin" in the good old Modern C++ spirit. The principle remains the same.

Hi Seth,
On 21.10.2024., at 00:06, Seth via Boost <boost@lists.boost.org> wrote:
On Sun, Oct 20, 2024, at 11:09 PM, Vinnie Falco via Boost wrote:
On Sun, Oct 20, 2024 at 1:31 PM Ivica Siladic <ivica.siladic@mireo.com> wrote:
TlsContext is indeed not a concept
I think that the treatment of TlsContext in the public API doesn't rise to the same level of quality as the rest of the library. I'm not sure what the best way to improve it is, though.
When I read that, I was already thinking that there could be a factory for (tls) streams. Many dynamic IO frameworks (in other languages) seem to have this approach, I think I remember it from my Java past and even the Curl API seemed to have multiple callbacks to customize e.g. socket creation and configuration.
Of course, this type of customization can be modeled as a "strategy mixin" in the good old Modern C++ spirit. The principle remains the same.
There are several ways to implement a factory-like approach. One option is to use a callback to create the stream. However, since the stream type cannot be type-erased, this callback would need to be a template function. This means the user would have to provide a template specialization to create the stream. During the stream creation, the user would also need access to the corresponding SSL context, which could lead to the use of a global variable or a Meyers-like singleton—neither of which is ideal. Another option is to use a template class, such as TlsStreamFactory, and then templatize the mqtt_client class with this factory. This approach would allow the creation of a well-defined concept (i.e., prescribing the required interface). It is arguably cleaner and easier to understand, but it would require more effort from the user compared to the current TlsContext-template approach. Honestly, I’m not sure which approach is better.
Thanks, Ivica Siladic

On Mon, Oct 21, 2024, at 4:32 PM, Ivica Siladic wrote:
Hi Seth,
However, since the stream type cannot be type-erased, this callback would need to be a template function.
How so? The signature might have to depend on template arguments. That's not necessarily as intrusive
During the stream creation, the user would also need access to the corresponding SSL context, which could lead to the use of a global variable or a Meyers-like singleton—neither of which is ideal.
This was the whole point of mentioning factories: the factories can *own* their own ssl context (or whatever else is required). It's effectively an example of Inversion Of Control. Yes some people will cut corners and use bad practices. You could make sure the examples don't.
Another option is to use a template class, such as TlsStreamFactory, and then templatize the mqtt_client class with this factory. This approach would allow the creation of a well-defined concept (i.e., prescribing the required interface).
I guess so. I would probably expect a more generalized concept (StreamFactory) to create any AsyncStream, but I have no idea whether such a thing makes sense in the context of your library.
Honestly, I’m not sure which approach is better.
Me neither. Those are hard choices. I mostly intended to provide inspiration.

Hi Vinnie,
On 20.10.2024., at 23:09, Vinnie Falco <vinnie.falco@gmail.com> wrote:
On Sun, Oct 20, 2024 at 1:31 PM Ivica Siladic <ivica.siladic@mireo.com <mailto:ivica.siladic@mireo.com>> wrote:
TlsContext is indeed not a concept
I think that the treatment of TlsContext in the public API doesn't rise to the same level of quality as the rest of the library. I'm not sure what the best way to improve it is, though. One thing I am certain of, is that it should not be presented as if it is a concept. It should be designed as the least-powerful construct which permits the desired functionality. A concept is too broad. A "bool" seems right, either TLS is on or off. Ultimately it is up to you to decide how much to invest here (or not to).
You’re probably right that we could improve how we handle TlsContext. The current solution is the most straightforward approach we could devise. The core issue is that we create, configure, and manage the SSL socket entirely within the mqtt_client code. These operations require access to the appropriate SSL context, which depends on the specific stream implementation. This is true for both Asio SSL (OpenSSL) and Botan SSL streams, and it would likely apply to any other Asio-compatible SSL streams. Since the context needs to be created based on various user-defined policies, it must be supplied by the user. One alternative could have been to expose a function-based customization point that creates the context when needed. We could call this “factory” function from within the mqtt_client code, obtain the context, and proceed as we do now. However, this approach doesn't work well if the user wants to reuse the same context for other SSL streams within the same program that are unrelated to mqtt_client (which is something we’ve encountered in practice). Providing the SSL context directly through the constructor seems like a more straightforward and flexible solution in this regard. Since TlsContext cannot be type-erased—we don’t know or need to know its interface—we ended up using template parameterization for mqtt_client with TlsContext. As mentioned earlier, this is the best solution we could come up with so far.
Literally all Asio async objects have constructors that accept an executor or execution context. I believe this is a bit of a historical relic, but we included it to maintain consistency with Asio standards.
Yeah...one school of thought is to only include functions which are used and have a demonstrable use case. There is an argument to be made for removing this. Asio includes it for backward compatibility, yet this rationale doesn't apply to your library which is new. Up to you to decide what to do.
This is precisely how, for example, the Asio basic_deadline_timer works.
Okay, then it's fine I suppose.
...the conceptual usage of the MQTT Will is likely beyond the scope of the Async.MQTT5 documentation.
I agree but you can at least have one or two sentences which explain that it is an MQTT thing, and you can include a hyperlink to an external site which explains it more deeply. For example:
/**
 * \brief Assign a \ref will Message.
 *
 * \details The \ref will Message that the Broker should publish
 * after the Network Connection is closed and it is not
 * closed normally.
 *
 * \attention This function takes action when the client is in a non-operational state,
 * meaning the \ref async_run function has not been invoked.
 * Furthermore, you can use this function after the \ref cancel function has been called,
 * before the \ref async_run function is invoked again.
 *
 * \par References
 *
 * \see https://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament/
 */

Thanks, it's definitely better that way.
In the vast majority of cases, there will be only one broker
Fine.
Resource consumption will not grow indefinitely. The client will attempt to send up to 65,535 packets, after which async_publish will immediately return an error code (pid_overrun), as it will have run out of Packet Identifiers.
65,535 sounds like quite a lot. The reason that I ask about buffering, is because I am trying to determine if the mqtt_client interface interferes with the correct operation of application-level TCP flow control. This is best accomplished by designing a system with back-pressure, which automatically limits the application's ability to generate new work to match the peer's ability to consume it.
For example, asio sockets can only have one pending read and one pending write operation. To write the next buffer you have to finish writing the current buffer. In other words, the application is blocked from writing as long as there is a write active. In this case, the delivery of the write completion is naturally delayed until the peer acknowledges the data.
If the mqtt_client buffers up to 65,535 outgoing writes, then the calling application will perform much more work than it should. It will perform more work than the receiving end is able to process. The result is bursting of the network link and oversubscribing. When the peer is unable to keep up with incoming messages, it is better for the sending application to slow down so that it produces new work at the lower rate.
Disclaimer: this is all based on theories and I have not performed any measurements.
In many aspects, the MQTT protocol treats the client and broker (server) symmetrically. For example, the broker sends PUBLISH messages in much the same way the client does. The key difference lies in the scale: a broker typically needs to manage connections to tens of thousands of clients, all while running on a single machine. When we designed the behavior of the buffers in mqtt_client, we looked closely at how this is handled in the EMQX broker, one of the most popular brokers, also used by AWS. EMQX allows users to configure the size of the outgoing queue, which defaults to 1,000. While this is significantly lower than the 64k buffer size we use, it’s important to remember that brokers must handle a large number of simultaneous connections. We opted for a policy that’s roughly equivalent to a broker managing 64 concurrent connections, which seemed like a reasonable balance. Further reasoning can be found in the response to the next question:
If you want to impose stricter limits, you can implement a more complex solution on the user's side. For example, you could track the number of bytes (or messages) sent by incrementing a counter each time you call async_publish and decrementing it in the completion handler of async_publish. Before attempting to publish, you should check whether the number of in-flight messages has reached your limit, and if so, simply skip publishing until some messages are acknowledged.
Well, that's not a nice solution at all. A strength of the library is to take care of complicated things automatically. The library's design should naturally solve this problem without any action required by callers. To "skip publishing" may be difficult, or even impossible. For example, if the application has a condition that all messages must be received by the peer, then it cannot just throw out messages when a limit is reached. It would have to queue them up. Which defeats the purpose of adding an in-flight message limit feature :)
To clarify: when calling async_publish with QoS = 0, the operation completes (i.e., the completion handler is invoked) once the data is successfully written to the underlying stream. For QoS = 1, it completes when the broker acknowledges the packet. With QoS = 2, it completes after a two-way acknowledgment between the client and broker is finalized. In all three cases, the typical way to send messages is by calling async_publish in a manner similar to how Asio handles async_write on streams—publishing a new message only when the previous one has completed. However, in scenarios where faster message delivery is needed, you can use Asio's parallel_group to initiate multiple simultaneous async_publish calls and wait for all of them to finish. The mqtt_client will optimize multiple publish operations by internally batching them, combining several packets into a single async_write call using scatter/gather techniques.

There’s another critical use case to consider. When QoS > 0, it indicates that you really want the server to guarantee receipt of your messages. In IoT environments, devices may restart, and relying solely on in-memory message queues is insufficient. A common approach is to first store the message in persistent storage (such as a file or SQLite database), then send it using async_publish, and once the completion handler is triggered, you delete the message from local storage. If the device loses network connectivity, messages will accumulate in memory, while your application likely continues to generate new messages to be published later. Once connectivity is restored, it's common to send the queued messages as quickly as possible. In such cases, you can use parallel_group to accelerate delivery and take advantage of the MQTT client’s built-in optimizations (e.g., batching multiple messages together).

In all these scenarios, the user should be mindful of the potential absence of network connectivity and structure their code accordingly.
It’s straightforward to track the number of messages (or bytes) queued by async_publish and then decrement that count when each async_publish completes.
I am not completely sure since I have not deployed the library in production, but I think it would be helpful if async_publish would not invoke the completion handler until the outgoing queue is decongested. I'm not sure how congestion would be defined but you as an expert of MQTT and with field experience could probably come up with the right definition.
On the receiving side, the mqtt_client should not keep calling socket::async_read when there is a backlog of messages. Instead, it should wait until the caller has processed some or all of the backlog and only then resume calling async_read.
This buffering of incoming and outgoing data was a huge problem in the websocketpp library which is one of the main motivations why I wrote beast::websocket.
Incoming messages are placed into an asio::basic_channel, with its limit set to std::numeric_limits<size_t>::max(), effectively making it unlimited. The program may crash if you subscribe to a topic and fail to consume the incoming messages. It’s important to note that the receiving side operates very differently from the publishing side, whose buffer depends solely on network bandwidth and availability. The receiving side only gets messages that have successfully traversed the network.
I realized I made an error in my previous explanations. In addition to the 64k limit on the sending queue, there is also a 64k limit on the receiving queue. However, this limit isn’t set directly on the receiving channel. Instead, it’s managed through a custom, bounded internal queue that temporarily stores messages before they are delivered to the channel and eventually to the user. If the channel is full (i.e., no one is consuming messages), this bounded queue will discard the oldest message when a new one arrives, providing overflow protection on the receiving side as well.
I already covered most of this above, and it is worth repeating here. The problem is not so much the number of elements placed into the basic_channel container, but rather that there is no backpressure. If mqtt_client just reads quickly in a loop without waiting for the caller to consume some data, then the peer believes that the TCP window should be as wide as possible. Thus defeating application-level flow control.
What you might do is keep track of the number of received messages which are buffered, and that the caller has not yet received, and if that number goes over a configured threshold then do not call async_read on the socket again until the number goes back down.
Another disclaimer, all of the above is theoretical and it could be completely wrong :)
We set the limit to std::numeric_limits<size_t>::max() because we couldn't come up with a reasonable lower value. The channel buffers received messages because there may be lengthy operations occurring between consecutive async_receive calls, during which several messages might arrive. However, it is the user's responsibility to pop these messages in a timely manner.
On the receiving side, you might actually want an API that gives the caller complete control over when the low-level async_read is resumed. Because the caller has more information than you do. The caller knows, for every message it receives, whether it will process it immediately or whether it will become a long-running task.
This one issue of application-level flow control should not prevent the library from acceptance if it is to be accepted, as it is functional and useful the way it is now. However, I think that exploring whether or not the current design meshes with application-level flow control, and if not then how the API might be improved, would be useful.
Thanks
Thanks, Ivica Siladic
participants (8)
- Brook Milligan
- Dominique Devienne
- Ivica Siladic
- Janko Dedic
- Marcelo Zimbres Silva
- Ruben Perez
- Seth
- Vinnie Falco