使用 ASIO 协程自定义可等待和恢复

Custom awaitable and resumption with ASIO coroutines

提问人:Matthias Grün 提问时间:8/29/2023 更新时间:8/31/2023 访问量:96

问:

我有以下类表示ASIO中的连接(我省略了与问题无关的位):

template<Enumeration T>
class connection : public std::enable_shared_from_this<connection<T>> {

    ...
    // Thread-safe queue
    ts_queue<message<T>> m_qMessagesIn;
    std::coroutine_handle<> m_resumeCoro{};
    ...

  public:
    ...

    asio::awaitable<void> listen() {
        auto self = this->shared_from_this();
        for (;;) {
          auto ret = co_await (readMessage() || watchdog());
          if (ret.index() == 1)
             co_return;
 
          ...
          auto& msgOpt = std::get<std::optional<message<T>>>(ret);
          if (not msgOpt.has_value())
             co_return;
          auto& msg = msgOpt.value();

          m_qMessagesIn.push_back(std::move(msg));
          
          // ==================================================
          // THIS IS WHAT I WOULD LIKE TO DO
          if (m_resumeCoro)
             m_resumeCoro.resume();
          ...
      }

我想恢复一个协程,一旦消息发生,它就会到达。我试图通过在类中放置另一个函数来实现它:co_await


    auto message() {
      struct awaitable {
        connection* m_pConn;
        std::optional<message<T>> m_msg;

        awaitable(connection* pConn) : m_pConn{ pConn } {}

        bool await_ready() { return m_pConn->hasMessage(); }
        void await_suspend(std::coroutine_handle<> coro) {
          // Save handle for later resumption
          m_pConn->m_resumeCoro = coro;
        }
        auto await_resume() { 
          m_pConn->m_resumeCoro = nullptr;
          return std::move(m_pConn->m_qMessagesIn.front());
        }
      };
      return awaitable{ this };
   }
};

这应该暂停调用协程并保存句柄,直到收到消息,然后恢复协程。它的用法如下所示:

asio::awaitable<void> useConnection() {
   connection<T> conn = ...;
   ...
   auto msg = co_await conn.message(); // <-- DOESN'T COMPILE
   // error: no matching member function for call to 'await_transform'
}

唉,这没有编译。我假设这与ASIO错综复杂的内部运作有关,我不应该做这样的事情。我已经研究过了,但我不确定这是否会对我有所帮助。async_initiate

因此,我的问题是:完成我想做的事情的“正确”方式是什么?

C++ 异步 网络 协程 ASIO

评论

0赞 Guillaume Racicot 8/29/2023
异步启动就是你要找的,它只是真的不符合人体工程学,也没有很好的文档记录

答:

0赞 Matthias Grün 8/31/2023 #1

所以我已经能够弄清楚如何做到这一点。这是我的做法,以防有人遇到同样的问题。

async_initiate毕竟提供了解决方案。传递给传递给的函数对象的处理程序负责恢复并将值返回给协程的表达式。因此,在 lambda 内部,可以将处理程序添加到类内部的处理程序队列中以供以后调用,这相当于恢复协程并返回一个值:async_initiateco_await

template<Enumeration T>
class connection : public std::enable_shared_from_this<connection<T>> {

    ...
    // Thread-safe queue
    ts_queue<message<T>> m_qMessagesIn;
    // C++23's std::move_only_function used here, because the handler is not copyable,
    // which would be required by a plain old std::function
    ts_queue<std::move_only_function<void(ice::net::message<T>)>> m_qfnMessageReturners{};
    ...

  public:
    ...

    asio::awaitable<void> listen() {
        auto self = this->shared_from_this();
        for (;;) {
          auto ret = co_await (readMessage() || watchdog());
          if (ret.index() == 1)
             co_return;
 
          ...
          auto& msgOpt = std::get<std::optional<message<T>>>(ret);
          if (not msgOpt.has_value())
             co_return;
          auto& msg = msgOpt.value();

          m_qMessagesIn.push_back(std::move(msg));
          
          // ===========================================
          // CALL THE HANDLER BELOW, THEREBY RETURNING THE MESSAGE TO THE
          // co_await'ING EXPRESSION
          if (not m_qfnMessageReturners.empty()) {
            auto func = std::move(m_qfnMessageReturners.front());
            m_qfnMessageReturners.pop_front();
            
            auto msg = std::move(m_qMessagesIn.front());
            m_qMessagesIn.pop_front();
            
            std::move(func)(std::move(msg));
          }
          ...
      }

然后,我的问题中的函数变成这样:message

        template<typename CompletionToken>
        auto message(CompletionToken&& token) {
          return asio::async_initiate<CompletionToken, void(ice::net::message<T>)>(
              [self = this->shared_from_this()](auto&& handler) mutable {
                // Return message at the front of the queue if there is one
                // immediately...
                if (not self->m_qMessagesIn.empty()) {
                  auto msg = std::move(self->m_qMessagesIn.front());
                  self->m_qMessagesIn.pop_front();
                  // Calling the handler means returning the value to the
                  // coroutine that co_awaits message().
                  std::move(handler)(std::move(msg));
                } else 
                // ... otherwise save handler for when a message arrives
                {
                  self->m_qfnMessageReturners.emplace_back(std::move(handler));
                }
              },
              token);
        }

或者,硬编码用于(即用于协程),我们可以编写:asio::use_awaitable

        auto message() {
          return asio::async_initiate<decltype(asio::use_awaitable), void(ice::net::message<T>)>(
              [self = this->shared_from_this()](auto&& handler) mutable {
                 ...
              },
              asio::use_awaitable);
        }