阿西奥。错误:“由于线程退出或应用程序请求,I/O 操作已中止”

Asio. Error: "The I/o operation has been aborted because of either a thread exit or an application requests"

提问人:Jésus Christophe 提问时间:8/4/2023 更新时间:8/4/2023 访问量:179

问:

我正在尝试使用 asio 创建异步服务器,但是当接受器调用 async_accept 函数时,我收到此错误 I/o 操作已中止,因为线程退出或应用程序请求,这不允许进程继续。我尝试更改端口,但它不起作用。当我尝试制作一个同步服务器使其正常工作时。 此外,我使用 asio 作为独立于 boost 的库。

这是我收到错误的函数:

void startAsyncAccept()
    {
        acceptor.async_accept(socket, [&](const asio::error_code& error)
            {
                if(!error)
                {
                    std::cout << "Client is connected" << "\n";
                    startAsyncRead();
                }
                else
                {
                    
                    std::cerr << "It is: " << error.message() << '\n';
                    return 0;
                    
                }
                startAsyncAccept();


            }
        );
        
    }

答:这是我的异步服务器:

class Server
{
public:
    Server(asio::io_context& io_context) : acceptor(io_context, asio::ip::tcp::endpoint(asio::ip::address::from_string("127.0.0.1"), 2000)), socket(io_context)
    {
        startAsyncAccept();
    }
    ~Server() {}
private:
    // Members
    asio::ip::tcp::acceptor acceptor;
    asio::ip::tcp::socket socket;
    asio::streambuf buffer;
    std::vector<asio::ip::tcp::socket> sockets;

    // Functions
    void startAsyncAccept()
    {
        acceptor.async_accept(socket, [&](const asio::error_code& error)
            {
                if(!error)
                {
                    std::cout << "Client is connected" << "\n";
                    startAsyncRead();
                }
                else
                {
                    
                    std::cerr << "It is: " << error.message() << '\n';
                    return 0;
                    
                }
                startAsyncAccept();


            }
        );
        
    }

    void startAsyncRead()
    {
        asio::async_read_until(socket, buffer, '\n', [&](const asio::error_code& error, size_t length)
            {
                if(!error)
                {
                    std::string message(asio::buffers_begin(buffer.data()), asio::buffers_begin(buffer.data()) + length);
                    std::cout << "Received from client: " << message;

                    for(auto& clients : sockets)
                    {
                        asio::async_write(clients, asio::buffer(message), [&](const asio::error_code& error, size_t length) 
                            {
                                if(error)
                                {
                                    std::cerr << "Failed to write to client: " << error.message() << "\n";
                                }
                            }
                        );
                    }
                    buffer.consume(length);
                    startAsyncRead();

                }
                else
                {
                    std::cout << "Client disconnected." << std::endl;
                    removeClient();
                }
            }
        );
    }

    void startAsyncWrite() 
    {
        asio::async_write(socket, asio::buffer("Connected to chat Server. \n"), [&](const asio::error_code& error, size_t length) 
            {
                if(error)
                {
                    std::cerr << "Failed to write to the client: " << error.message() << "\n";
                    removeClient();
                }
            }
        );
    }
    void removeClient()
    {
        auto it = std::find_if(sockets.begin(), sockets.end(),
            [&](const auto& client_socket)
            {
                return &client_socket == &socket;
            });

        if (it != sockets.end())
        {
            sockets.erase(it);
        }
    }


};
C++ 异步 boost-asio asio

评论


答:

2赞 sehe 8/4/2023 #1

首先,有 Undefined Behavior,因为 lambda 在某些路径上错过了返回。这似乎是某种复制粘贴错误,所以我将删除毫无意义的错误。return 0;

真正的问题

真正的问题是,您在每次接受时都使用相同的方法。这意味着,一旦再次调用,您就会重置启动第一个套接字的套接字。这会导致错误代码。socketasyncReadstartAsyncAcceptasio::error::operation_aborted

只是不要使用相同的套接字。将其移动到“会话”对象中,例如,或者,正如您似乎想做的那样,将它们放在您的集合中。sockets

定影?

具有讽刺意味的是,很难知道你想如何使用它,因为它在你当前的代码中实际上是未使用的(除了永远不会迭代的循环,因为总是空的)。socketssockets

更糟糕的是,没有工作是件好事,否则你会得到更多的 UB,因为会重新分配以增加容量,这意味着对现有套接字的任何引用都是无效的。std::vector

处理此问题的通常方法不是包含套接字,而是集合指向会话对象的(弱)共享指针。为简单起见,让我通过更改和删除有问题的变量来修复此处的引用稳定性。std::vectorstd::listsocket

tcp::socket              socket;
asio::streambuf          buffer;
std::vector<tcp::socket> sockets;

成为

asio::streambuf        buffer;
std::list<tcp::socket> sockets;

也有很大的问题async_write

  • 使用局部变量作为缓冲区。这又是 UB,因为它的生存期在写入操作完成之前就结束了message
  • 您必须避免重叠通话。由于您无法控制完成时间(并且可能具有任意数量的连接客户端),因此无法确定没有写入重叠。唯一的解决方案是有一个“发件箱”队列 - 通常是每个客户端,用于每个连接的独立操作。async_read_until

顺便说一句,目前尚不清楚应该做什么(它未使用,并且无助于解决刚才描述的问题)。同样,即使你已经填充了,它也不会做任何事情,它只会由于套接字引用的无效而导致 UB。我现在放弃了这两个。startAsyncWriteremoveClientsockets

最后,你使用一个 streambuf,看起来你可以直接使用一个动态字符串缓冲区。无论如何,请将更接近“消耗”,以免在某些错误条件下错过它。consume

简化版 V1

这个简化版本删除了您提出的问题以及 :message

在 Coliru 上直播

class Server {
  public:
    Server(asio::io_context& io_context) //
        : acceptor(io_context, {{}, 2000}) {
        startAsyncAccept();
    }

  private:
    // Members
    tcp::acceptor          acceptor;
    asio::streambuf        buffer;
    std::list<tcp::socket> sockets;

    void startAsyncAccept() {
        acceptor.async_accept([&](error_code error, tcp::socket accepted) {
            if (!error) {
                auto& client = sockets.emplace_back(std::move(accepted));
                std::cout << "Client is connected\n";
                startAsyncRead(client);
            } else {
                std::cerr << "It is: " << error.message() << "\n";
            }
            startAsyncAccept();
        });
    }

    void startAsyncRead(tcp::socket& client) {
        asio::async_read_until(client, buffer, '\n', [&](error_code error, size_t n) {
            if (!error) {
                auto f = asio::buffers_begin(buffer.data()), l = f + static_cast<ptrdiff_t>(n);
                auto message = std::make_shared<std::string>(f, l);
                buffer.consume(n);
                std::cout << "Received from client: " << message;

                for (auto& peer : sockets) {
                    // SEHE: TODO FIXME serializing writes per peer!
                    asio::async_write(
                        peer, asio::buffer(*message), [&, message](error_code error, size_t /*length*/) {
                            if (error) {
                                std::cerr << "Failed to write to client: " << error.message() << "\n";
                            }
                        });
                }
                startAsyncRead(client);
            } else {
                std::cout << "Client disconnected." << std::endl;
                // SEHE: TODO ERASE
            }
        });
    }
};

已修复:客户端会话

还有一些工作来解决其他问题:

[早餐后]

更新你绝对需要一个单独的,因为我之前(早餐前)错过了你也错误地将同一个实例用于所有连接......类型是对传入缓冲区、传出缓冲区队列和套接字进行分组的逻辑位置。SessionstreambufSession

我们一起使用,以便在连接损坏/关闭时获得自动生命周期管理。enable_shared_from_thisshared_from_this()

请注意,这一切都归结为正确的关注点分离:服务器管理侦听,聊天室管理连接的客户端,会话管理单个客户端。

在 Coliru 上直播

#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <list>
#ifndef STANDALONE_ASIO
    namespace asio = boost::asio;
    using boost::system::error_code;
#else
    using asio::error_code;
#endif
using asio::ip::tcp;

struct Session;
using SessionPtr = std::shared_ptr<Session>;
using Handle     = std::weak_ptr<Session>;

struct ChatRoom {
    std::list<Handle> clients_;

    void garbage_collect();
    void broadcast(std::string_view message);

    // just for example:
    void direct_message(std::string recipient, std::string_view message);
};

struct Session : std::enable_shared_from_this<Session> {
    Session(tcp::socket s, ChatRoom& room) : socket_(std::move(s)), room_(room) {}
    void start() { readLoop(); }

    void send(std::string_view message){
        outbox_.emplace_back(message);
        if (outbox_.size() == 1)
            writeLoop();
    }

  private:
    tcp::socket             socket_;
    ChatRoom&               room_;
    asio::streambuf         incoming_;
    std::deque<std::string> outbox_;

    void readLoop() {
        asio::async_read_until(
            socket_, incoming_, '\n', [&, self = shared_from_this()](error_code ec, size_t n) {
                if (ec) {
                    std::cout << "Client disconnect (" << ec.message() << ")" << std::endl;
                    return;
                }

                auto f = asio::buffers_begin(incoming_.data()), l = f + static_cast<ptrdiff_t>(n);
                room_.broadcast(std::string(f, l));
                incoming_.consume(n);

                readLoop();
            });
    }

    void writeLoop() {
        if (outbox_.empty())
            return;

        asio::async_write( //
            socket_, asio::buffer(outbox_.front()),
            [this, self = shared_from_this()](error_code ec, size_t /*length*/) {
                outbox_.pop_front();
                if (ec)
                    std::cerr << "Failed to write to client: " << ec.message() << "\n";
                else
                    writeLoop();
            });
    }
};

class Server {
  public:
    Server(asio::any_io_executor ex) : acceptor_(ex, {{}, 2000}) { acceptLoop(); }

  private:
    tcp::acceptor acceptor_;
    ChatRoom      room_;

    void acceptLoop() {
        room_.garbage_collect(); // optionally prevent dead connections piling up

        acceptor_.async_accept([&](error_code ec, tcp::socket accepted) {
            if (!ec) {
                auto sess = std::make_shared<Session>(std::move(accepted), room_);
                room_.clients_.push_back(sess);
                std::cout << "Client is connected\n";

                sess->start();
            } else {
                std::cerr << "Accept error: " << ec.message() << "\n";
            }
            acceptLoop();
        });
    }
};

void ChatRoom::garbage_collect() {
    clients_.remove_if(std::mem_fn(&Handle::expired));
}

void ChatRoom::broadcast(std::string_view message) {
    for (auto& handle : clients_) {
        if (auto peer = handle.lock()) {
            peer->send(message);
        }
    }
}

using namespace std::chrono_literals;
int main() {
    asio::io_context io;
    Server chat(io.get_executor());

    io.run_for(30s);
}

与多个客户端一起测试:

for a in {1..10}; do (
        sleep 1.$RANDOM
        echo "hello from $a"
        sleep 1.$RANDOM
        echo "bye from $a"
) | nc 127.0.0.1 2000 -w3 | (while read line; do echo "Client $a received '$line'"; done) & done

打印输出,如下所示:

enter image description here

也住在 Coliru :) 上

评论

1赞 sehe 8/4/2023
添加了组织得当的演示,结果我什至错过了另一个问题,所以你也绝对需要那个修复程序
0赞 Jésus Christophe 8/4/2023
谢谢你的帮助和解释,你真的帮助了我!
0赞 sehe 8/4/2023
我假设您在单个 IO 线程上运行服务器,就像我的示例一样。否则,您需要链进行同步。