提问人:Zohar81 提问时间:9/21/2023 最后编辑:Zohar81 更新时间:9/21/2023 访问量:69
在 boost.asio 中实现 readers-writer 锁定,并具有 writer 优先级
Implementing readers-writer lock in boost.asio with writer prioritization
问:
我的应用程序处理 3 种类型的任务,这些任务由异步外部源(用户按下按钮)或内部(超时过期、正在进行的任务触发另一个任务、网络事件等)发起。
上述每个任务的特点是等待时间长(即发送网络请求或等待网络响应),在此期间它可以合作暂停并移动到另一个任务。
此外,每个任务(无论类型如何)都使用一个新连接进行单个发送和接收操作。
http::response<http::dynamic_body> resp;
boost::beast::http::request<boost::beast::http::string_body> req;
boost::beast::flat_buffer buf;
http::async_write(stream, req, yield);
http::async_read(stream, buf, resp, yield);
我希望从运行相同基本io_context的同一组线程执行所有这些任务。相关代码如下:
boost::thread_group threads;
num_threads = 0;
for(; num_threads < kMaxThreads ; num_threads++) {
threads.create_thread([] {
try {
io_context_.run();
} catch (const std::exception& e) {
print("exception found");
}
});
}
任务描述:
Type1:发送带有 Authorization 标头的常规 https 请求,其中包含一些预先获取的 OAuth2 访问令牌 - 这些任务可以并行运行,因此我从 spawn/io_context 调用它们:
boost::asio::io_context io_context_; boost::asio::spawn(io_context_, [](const boost::asio::yield_context &yield) { ... }
Type2:是基于 Oauth2 的任务,用于刷新访问令牌并获取新令牌,以换取旧令牌失效。
此任务必须串行执行,因为在执行过程中,我们无法知道它是否已经到达服务器并且旧令牌是否失效。
另一个原因是,我可以决定重新连接到另一台服务器,因此我想推迟其他任务,直到我获得该服务器的令牌。
Type3:发送不带令牌的 http 请求,以便它们可以与前两种类型并行执行。
实现 Type2 的理想方法是使用不带异步操作的 post/strand (yield)
boost::asio::strand<boost::asio::io_context::executor_type> strand_
// initialization in class c'tor
strand_(io_context_.get_executor())
boost::asio::post(strand_, []() {...}
但是,由于我的网络 API 使用协程,并且为了允许类型 3 任务运行,我需要使用 spawn/strand,但以某种方式阻止类型 2 任务进入队列。所以我的第一个问题是如何正确实现它?
在当前的实现中,我保证第一种类型的任务将并行执行,而第二种类型的任务将单独执行,所有待处理的任务都将等待其终止。
现在我想确定第二个任务的优先级,因此一旦该任务进入队列,它将在所有当前正在执行的任务终止后首先进入。有什么办法可以使用boost.asio吗?
答:
你的故事似乎缺少一个聚会。
例如,“我有 3 种类型的任务”:即使改写为不那么以人类为中心的“有 3 种类型的任务”或“应用程序导致 3 种类型的任务”,我脑海中浮现的问题是:“谁在造成任务”。
另一个角度是当你描述Type3任务时(不是第一次):“Type3:发送没有令牌的http请求,以便它们可以与前两种类型并行执行” - 无论如何,安静的省略是将这些任务与应用程序联系在一起的原因。
我有一种感觉,答案不仅仅是“什么都没有”(尽管 Type3 似乎很接近?谁知道呢)。具体来说,它 Type2 禁止 Type1 继续,这肯定是基于身份验证“会话”(或您命名的令牌生命周期)。
现在,一切都可能很简单,因为此类会话只有一个实例,因此身份验证令牌/连接终结点是应用程序域的单一实例。但是,通常情况是应用程序处理多个会话。
无论会话是否是单例会话,您的领域模型都将要求您确认它,以免您遇到大量意外的复杂性(您似乎已经发现了)。
然后,您可以为会话提供对消息进行排队的功能,区分需要身份验证的消息和不需要身份验证的消息。然后,当应用程序逻辑确定需要刷新/替换身份验证/终结点元组时,它可以
- 设置标志以暂停启动经过身份验证的 (Type1) 请求
- 异步等待所有待处理的请求完成,
- 执行重新身份验证请求 (Type2)
- 重置标志以恢复启动经过身份验证的请求
实现此逻辑的方法有很多种,包括(但不限于)一个
- Type[12]的调度链,使用共享状态变量和计数器的集合来跟踪会话生命周期中的位置
- 单独的队列和微服务(作为 COROS)为它们提供服务
- 一个成熟的FSM模型(如果你能做对的话,这可能是最容易测试和维护的方法)
事件?
无论您选择如何实现它,您都需要一个原语,该原语允许您发布延续以响应特定于应用程序的事件。
这通常可以使用 Asio 计时器对象进行建模。例如,“无限”计时器可以用作手动复位事件(例如,在 Type1 使用者链中,它表示最后一个待处理的 Type1 请求刚刚完成)。cancel()
计时器接口具有双接口,允许您手动完成等待或“恢复服务员”:
方法 描述 cancel
取消正在等待计时器的任何异步操作。 cancel_one
取消一个正在等待计时器的异步操作。
请注意,为了提高鲁棒性,您可以使用类似 instead 之类的东西来更轻松地可靠地检测边缘情况(请参阅安全地取消提升 asio 截止时间计时器)。expires_at(clock_type::timepoint::min())
cancel()
摘要/杂项
我希望这能给你一些想法来推进你的设计。关闭门处的一些零钱:
问。现在我想确定第二个任务的优先级,因此一旦该任务进入队列,它将在所有当前正在执行的任务终止后首先进入。有什么办法可以使用boost.asio吗?
有一些示例演示如何创建具有不同级别优雅的优先级的优先级处理程序:
- https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/example/cpp03/invocation/prioritised_handlers.cpp
- https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/example/cpp14/executors/priority_scheduler.cpp
我敦促您首先专注于对应用程序域进行建模。然后,如果现有工具适合,则可以使用它。否则,总是喜欢根据自己的需求对核心业务进行编码。
奖金领取
我突然想到,一个自然的建模方式是让调度器自由地承担任何任务。但是,作为 Type1 coro 中的必要步骤,您将异步请求最新的身份验证令牌:
auto token = session->get_current_auth_token(yield);
现在,您可以像这样对 Type2 进行建模:
invalidate_current_auth_token();
session_idle.async_wait(yield);
auto new_token = async_request_new_token(yield);
set_current_auth_token(new_token);
其中可以是前面描述的计时器对象。可以使用其他计时器对象来表示令牌准备就绪,或者使用实验性::p romise
?session_idle
set_current_auth_token
免责声明:我没有使用这些的经验。
评论
However, usually the picture is that an application handles multiple sessions
session
评论