提问人:Jésus Christophe 提问时间:8/4/2023 更新时间:8/4/2023 访问量:179
阿西奥。错误:“由于线程退出或应用程序请求,I/O 操作已中止”
Asio. Error: "The I/o operation has been aborted because of either a thread exit or an application requests"
问:
我正在尝试使用 asio 创建异步服务器,但是当接受器调用 async_accept 函数时,我收到此错误 I/o 操作已中止,因为线程退出或应用程序请求,这不允许进程继续。我尝试更改端口,但它不起作用。当我尝试制作一个同步服务器使其正常工作时。 此外,我使用 asio 作为独立于 boost 的库。
这是我收到错误的函数:
void startAsyncAccept()
{
acceptor.async_accept(socket, [&](const asio::error_code& error)
{
if(!error)
{
std::cout << "Client is connected" << "\n";
startAsyncRead();
}
else
{
std::cerr << "It is: " << error.message() << '\n';
return 0;
}
startAsyncAccept();
}
);
}
答:这是我的异步服务器:
class Server
{
public:
Server(asio::io_context& io_context) : acceptor(io_context, asio::ip::tcp::endpoint(asio::ip::address::from_string("127.0.0.1"), 2000)), socket(io_context)
{
startAsyncAccept();
}
~Server() {}
private:
// Members
asio::ip::tcp::acceptor acceptor;
asio::ip::tcp::socket socket;
asio::streambuf buffer;
std::vector<asio::ip::tcp::socket> sockets;
// Functions
void startAsyncAccept()
{
acceptor.async_accept(socket, [&](const asio::error_code& error)
{
if(!error)
{
std::cout << "Client is connected" << "\n";
startAsyncRead();
}
else
{
std::cerr << "It is: " << error.message() << '\n';
return 0;
}
startAsyncAccept();
}
);
}
void startAsyncRead()
{
asio::async_read_until(socket, buffer, '\n', [&](const asio::error_code& error, size_t length)
{
if(!error)
{
std::string message(asio::buffers_begin(buffer.data()), asio::buffers_begin(buffer.data()) + length);
std::cout << "Received from client: " << message;
for(auto& clients : sockets)
{
asio::async_write(clients, asio::buffer(message), [&](const asio::error_code& error, size_t length)
{
if(error)
{
std::cerr << "Failed to write to client: " << error.message() << "\n";
}
}
);
}
buffer.consume(length);
startAsyncRead();
}
else
{
std::cout << "Client disconnected." << std::endl;
removeClient();
}
}
);
}
void startAsyncWrite()
{
asio::async_write(socket, asio::buffer("Connected to chat Server. \n"), [&](const asio::error_code& error, size_t length)
{
if(error)
{
std::cerr << "Failed to write to the client: " << error.message() << "\n";
removeClient();
}
}
);
}
void removeClient()
{
auto it = std::find_if(sockets.begin(), sockets.end(),
[&](const auto& client_socket)
{
return &client_socket == &socket;
});
if (it != sockets.end())
{
sockets.erase(it);
}
}
};
答:
首先,有 Undefined Behavior,因为 lambda 在某些路径上错过了返回。这似乎是某种复制粘贴错误,所以我将删除毫无意义的错误。return 0;
真正的问题
真正的问题是,您在每次接受时都使用相同的方法。这意味着,一旦再次调用,您就会重置启动第一个套接字的套接字。这会导致错误代码。socket
asyncRead
startAsyncAccept
asio::error::operation_aborted
只是不要使用相同的套接字。将其移动到“会话”对象中,例如,或者,正如您似乎想做的那样,将它们放在您的集合中。sockets
定影?
具有讽刺意味的是,很难知道你想如何使用它,因为它在你当前的代码中实际上是未使用的(除了永远不会迭代的循环,因为总是空的)。sockets
sockets
更糟糕的是,没有工作是件好事,否则你会得到更多的 UB,因为会重新分配以增加容量,这意味着对现有套接字的任何引用都是无效的。std::vector
处理此问题的通常方法不是包含套接字,而是集合指向会话对象的(弱)共享指针。为简单起见,让我通过更改和删除有问题的变量来修复此处的引用稳定性。std::vector
std::list
socket
tcp::socket socket;
asio::streambuf buffer;
std::vector<tcp::socket> sockets;
成为
asio::streambuf buffer;
std::list<tcp::socket> sockets;
也有很大的问题async_write
- 使用局部变量作为缓冲区。这又是 UB,因为它的生存期在写入操作完成之前就结束了
message
- 您必须避免重叠通话。由于您无法控制完成时间(并且可能具有任意数量的连接客户端),因此无法确定没有写入重叠。唯一的解决方案是有一个“发件箱”队列 - 通常是每个客户端,用于每个连接的独立操作。
async_read_until
顺便说一句,目前尚不清楚应该做什么(它未使用,并且无助于解决刚才描述的问题)。同样,即使你已经填充了,它也不会做任何事情,它只会由于套接字引用的无效而导致 UB。我现在放弃了这两个。startAsyncWrite
removeClient
sockets
最后,你使用一个 streambuf,看起来你可以直接使用一个动态字符串缓冲区。无论如何,请将更接近“消耗”,以免在某些错误条件下错过它。consume
简化版 V1
这个简化版本删除了您提出的问题以及 :message
class Server {
public:
Server(asio::io_context& io_context) //
: acceptor(io_context, {{}, 2000}) {
startAsyncAccept();
}
private:
// Members
tcp::acceptor acceptor;
asio::streambuf buffer;
std::list<tcp::socket> sockets;
void startAsyncAccept() {
acceptor.async_accept([&](error_code error, tcp::socket accepted) {
if (!error) {
auto& client = sockets.emplace_back(std::move(accepted));
std::cout << "Client is connected\n";
startAsyncRead(client);
} else {
std::cerr << "It is: " << error.message() << "\n";
}
startAsyncAccept();
});
}
void startAsyncRead(tcp::socket& client) {
asio::async_read_until(client, buffer, '\n', [&](error_code error, size_t n) {
if (!error) {
auto f = asio::buffers_begin(buffer.data()), l = f + static_cast<ptrdiff_t>(n);
auto message = std::make_shared<std::string>(f, l);
buffer.consume(n);
std::cout << "Received from client: " << message;
for (auto& peer : sockets) {
// SEHE: TODO FIXME serializing writes per peer!
asio::async_write(
peer, asio::buffer(*message), [&, message](error_code error, size_t /*length*/) {
if (error) {
std::cerr << "Failed to write to client: " << error.message() << "\n";
}
});
}
startAsyncRead(client);
} else {
std::cout << "Client disconnected." << std::endl;
// SEHE: TODO ERASE
}
});
}
};
已修复:客户端会话
还有一些工作来解决其他问题:
[早餐后]
更新你绝对需要一个单独的,因为我之前(早餐前)错过了你也错误地将同一个实例用于所有连接......类型是对传入缓冲区、传出缓冲区队列和套接字进行分组的逻辑位置。Session
streambuf
Session
我们一起使用,以便在连接损坏/关闭时获得自动生命周期管理。enable_shared_from_this
shared_from_this()
请注意,这一切都归结为正确的关注点分离:服务器管理侦听,聊天室管理连接的客户端,会话管理单个客户端。
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <list>
#ifndef STANDALONE_ASIO
namespace asio = boost::asio;
using boost::system::error_code;
#else
using asio::error_code;
#endif
using asio::ip::tcp;
struct Session;
using SessionPtr = std::shared_ptr<Session>;
using Handle = std::weak_ptr<Session>;
struct ChatRoom {
std::list<Handle> clients_;
void garbage_collect();
void broadcast(std::string_view message);
// just for example:
void direct_message(std::string recipient, std::string_view message);
};
struct Session : std::enable_shared_from_this<Session> {
Session(tcp::socket s, ChatRoom& room) : socket_(std::move(s)), room_(room) {}
void start() { readLoop(); }
void send(std::string_view message){
outbox_.emplace_back(message);
if (outbox_.size() == 1)
writeLoop();
}
private:
tcp::socket socket_;
ChatRoom& room_;
asio::streambuf incoming_;
std::deque<std::string> outbox_;
void readLoop() {
asio::async_read_until(
socket_, incoming_, '\n', [&, self = shared_from_this()](error_code ec, size_t n) {
if (ec) {
std::cout << "Client disconnect (" << ec.message() << ")" << std::endl;
return;
}
auto f = asio::buffers_begin(incoming_.data()), l = f + static_cast<ptrdiff_t>(n);
room_.broadcast(std::string(f, l));
incoming_.consume(n);
readLoop();
});
}
void writeLoop() {
if (outbox_.empty())
return;
asio::async_write( //
socket_, asio::buffer(outbox_.front()),
[this, self = shared_from_this()](error_code ec, size_t /*length*/) {
outbox_.pop_front();
if (ec)
std::cerr << "Failed to write to client: " << ec.message() << "\n";
else
writeLoop();
});
}
};
class Server {
public:
Server(asio::any_io_executor ex) : acceptor_(ex, {{}, 2000}) { acceptLoop(); }
private:
tcp::acceptor acceptor_;
ChatRoom room_;
void acceptLoop() {
room_.garbage_collect(); // optionally prevent dead connections piling up
acceptor_.async_accept([&](error_code ec, tcp::socket accepted) {
if (!ec) {
auto sess = std::make_shared<Session>(std::move(accepted), room_);
room_.clients_.push_back(sess);
std::cout << "Client is connected\n";
sess->start();
} else {
std::cerr << "Accept error: " << ec.message() << "\n";
}
acceptLoop();
});
}
};
void ChatRoom::garbage_collect() {
clients_.remove_if(std::mem_fn(&Handle::expired));
}
void ChatRoom::broadcast(std::string_view message) {
for (auto& handle : clients_) {
if (auto peer = handle.lock()) {
peer->send(message);
}
}
}
using namespace std::chrono_literals;
int main() {
asio::io_context io;
Server chat(io.get_executor());
io.run_for(30s);
}
与多个客户端一起测试:
for a in {1..10}; do (
sleep 1.$RANDOM
echo "hello from $a"
sleep 1.$RANDOM
echo "bye from $a"
) | nc 127.0.0.1 2000 -w3 | (while read line; do echo "Client $a received '$line'"; done) & done
打印输出,如下所示:
也住在 Coliru :) 上
评论
下一个:缩短 ASIO 协程调度时间
评论