提问人:MichaelO 提问时间:11/3/2023 最后编辑:MichaelO 更新时间:11/6/2023 访问量:83
使用虚拟回调围绕旧版 API 创建 asio 包装器
Creating an asio wrapper around legacy API with a virtual callback
问:
我有一个与硬件接口的旧库,它已经是异步的(通过中断),但想在它上面创建一个基于 asio 的接口。问题在于,基于绑定回调的 API 的 asio 示例都可以通过函数参数直接传递处理程序。但是这个 API 的工作方式是这样的:
class WriterBase
{
public:
...
bool writeMessage(Packet* pkt)
{
...
}
virtual wroteMessage(Packet* pkt) = 0;
};
因此,人们应该从这个类派生,在被覆盖的函数中调用并获取回调,一些处理程序的传递实际上只能通过类实例本身来完成。writeMessage()
wroteMessage()
使用此 API 的应用程序使用具有 C++20 个协程的 asio,并且此函数将使用 进行co_await。use_awaitable
我现在有点迷茫于如何通过正确的 asio 处理创建send_async函数。最简单的解决方案(来自 ChatGPT 建议)是创建一个 ,获取它的未来并在函数中设置 promise 的值。但这使其有效地阻塞了线程。std::promise
wroteMessage()
尝试使用示例 asio 回调示例,但我无法想出可以将处理程序正确传递给函数的解决方案。我陷入了很多模板混乱,我不完全清楚如何正确地做到这一点。wroteMessage()
如何解决这个问题?
更新:
下面是一个更详细的示例(包含更多注释):
// This class is from a legacy library and cannot be changed
#include <array>
#include <asio.hpp>
#include <atomic>
#include <memory>
#include <thread>
#include <chrono>
// Normally, this is a class
using Packet = std::array<uint8_t, 1024>;
class WriterBase
{
public:
virtual ~WriterBase() {}
void writeMessage(Packet *pkt)
{
using namespace std::chrono_literals;
// Simulation. In reality, the data of the packet
// is passed down to a driver which uses DMA to
// transfer the data. When finished, the device
// notifies via interrupt that the message has been
// written and calls wroteMessage.
std::thread(
[this, pkt]
{
std::this_thread::sleep_for(1s);
bool ret = wroteMessage(pkt);
if (ret)
{
delete pkt;
}
})
.detach();
}
protected:
virtual bool wroteMessage(Packet *pkt) = 0;
};
// Derived class, under my control
class Writer : public WriterBase
{
protected:
virtual bool wroteMessage(Packet *pkt) override
{
// Called by interrupt service routine in its own
// thread
// Here we should notify asio, that he operation is
// finished, aka the completion handler should be
// invoked
// if we return true, the packet will be
// deleted by the legacy library.
// If we return false, we need to take care of this
return true;
}
};
int main_without_asio()
{
auto writer = std::make_unique<Writer>();
std::atomic<bool> terminate{false};
while (!terminate.load())
{
// Some example. In reality, gets data from a network, converts
// it into the packet and sends it to the device
Packet *pkt = new Packet();
writer->writeMessage(pkt);
// other code continues here, wroteMessage will be called
// and a synchronized action is done, either via atomics
// or via a promise/future pair, where wroteMessage sets
// the result of the promise.
}
return 0;
}
// This is what I am aiming for
int main_with_asio()
{
asio::io_context context;
std::atomic<bool> terminate{false};
asio::co_spawn(
context,
[&terminate]
{
auto writer = std::make_unique<Writer>();
while (!terminate.load())
{
// Some example. In reality, gets data from a network, converts
// it into the packet and sends it to the device
Packet *pkt = new Packet();
co_await writer->write_async(pkt, asio::use_awaitable);
}
},
asio::detached);
context.run();
return 0;
}
因此,基本上 API 已经是异步的,并且是调用的“完成处理程序”。我没有回调参数,例如带有用户数据的函数指针或.在 和 之间传递数据(据我所知,这对于 CompletionToken 处理程序是必要的)基本上只能通过 Writer 类的成员变量来实现。还是有其他可能性,我只是没有看到?wroteMessage()
writeMessage
std::function
writeMessage
wroteMessage
我希望,这让它更清楚一些。
答:
让我们想象一下 API 的简单仿真:
using Packet = std::string;
void writeMessage(Packet msg, std::function<void(Packet)> wroteMessage) {
std::thread([=] {
std::this_thread::sleep_for(1s);
wroteMessage(msg);
}).detach();
}
一个简单的演示原样(Live On Coliru):
auto trace() { return std::osyncstream(std::cout); }
void demoClassic() {
writeMessage("oldfashioned",
[](Packet const& msg) { trace() << "completed " << quoted(msg) << std::endl; });
std::this_thread::sleep_for(1500ms);
}
打印预期的:
completed "oldfashioned"
异步启动
要与 Asio 无缝协作,“黄金路径”是编写一个启动函数,其中完成语义由完成令牌选择:
template <typename Token> auto asyncWrite(Packet msg, Token&& token) {
auto init = [](auto completion_handler, Packet msg) {
writeMessage(std::move(msg), std::move(completion_handler));
};
return asio::async_initiate<Token, void(Packet)>(init, token, std::move(msg));
}
可以以多种方式使用: Live On Coliru
void demoAsio() {
asio::io_context ioc;
auto work = make_work_guard(ioc);
asyncWrite("asio style", [](Packet const& msg) { trace() << "callback " << quoted(msg) << std::endl; });
std::future<Packet> fut = asyncWrite("asio style", asio::use_future);
ioc.run_for(1500ms);
trace() << "future " << quoted(fut.get()) << std::endl;
}
印刷
completed "oldfashioned"
callback "asio style"
future "asio style"
添加 c++23 coros:Live On Compiler Explorer
asio::awaitable<void> cpp20_coro() {
Packet msg = co_await asyncWrite("cpp20 rocks", asio::deferred);
trace() << "cpp20 style " << quoted(msg) << std::endl;
}
或者改用 Boost Context 进行协程:Live On Compiler Explorer
void stackful_coro(asio::yield_context yield) {
Packet msg = asyncWrite("cpp11 rocks too", yield);
trace() << "stackful_coro " << quoted(msg) << std::endl;
}
笔记
在实践中,您将希望将工作防护作为操作的一部分,以便用户不必担心。
上市供参考
#include <functional>
#include <string>
#include <thread>
using namespace std::literals;
using Packet = std::string;
void writeMessage(Packet msg, std::move_only_function<void(Packet)> wroteMessage) {
std::thread([=, f = std::move(wroteMessage)]() mutable {
std::this_thread::sleep_for(1s);
std::move(f)(msg);
}).detach();
}
void demoClassic();
void demoAsio();
int main() {
demoClassic();
demoAsio();
}
#include <iomanip>
#include <iostream>
#include <syncstream>
auto trace() { return std::osyncstream(std::cout); }
void demoClassic() {
writeMessage("oldfashioned",
[](Packet const& msg) { trace() << "completed " << quoted(msg) << std::endl; });
std::this_thread::sleep_for(1500ms);
}
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
namespace asio = boost::asio;
template <typename Token> auto asyncWrite(Packet msg, Token&& token) {
auto init = [](auto completion_handler, Packet msg) {
writeMessage(std::move(msg), std::move(completion_handler));
};
return asio::async_initiate<Token, void(Packet)>(init, token, std::move(msg));
}
asio::awaitable<void> cpp20_coro() {
Packet msg = co_await asyncWrite("cpp20 rocks", asio::deferred);
trace() << "cpp20 style " << quoted(msg) << std::endl;
}
void stackful_coro(asio::yield_context yield) {
Packet msg = asyncWrite("cpp11 rocks too", yield);
trace() << "stackful_coro " << quoted(msg) << std::endl;
}
void demoAsio() {
asio::io_context ioc;
auto work = make_work_guard(ioc);
asyncWrite("asio style", [](Packet const& msg) { trace() << "callback " << quoted(msg) << std::endl; });
std::future<Packet> fut = asyncWrite("asio style", asio::use_future);
co_spawn(ioc, cpp20_coro, asio::detached);
spawn(ioc, stackful_coro);
ioc.run_for(1500ms);
trace() << "future " << quoted(fut.get()) << std::endl;
}
本地输出:
$ time ./build/sotest
completed "oldfashioned"
callback "asio style"
cpp20 style "cpp20 rocks"
stackful_coro "cpp11 rocks too"
future "asio style"
real 0m3,006s
user 0m0,006s
sys 0m0,000s
评论
writeMessage
wroteMessage
Packet*
std::thread
wroteMessage
Packet*
wirteMessage
评论
async_compose