提问人:Zohar81 提问时间:11/12/2023 更新时间:11/12/2023 访问量:55
Boost.asio 使用协程实现多个连接的 echo 服务器
Boost.asio implement echo server for multiple connections using coroutine
问:
我从boost.asio官方网页上修改了一个unix套接字回显服务器示例,并采用它来使用协程。
我想允许此服务器同时处理多个连接。 一种解决方案是从后台线程在类会话中运行方法 start,但我更喜欢协程,因此我尝试在单独的生成中运行每个侦听器。这是我的代码(编译版)
请注意,在方法中,当我只尝试一个侦听器时,它起作用了,但对于多个套接字,它不起作用。也许有人能告诉我我哪里做错了?谢谢!server::start
#include <cstdio>
#include <iostream>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
using namespace std::chrono_literals;
constexpr auto kTimeout = 1s;
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
using boost::asio::local::stream_protocol;
using boost::asio::io_context;
class session: public boost::enable_shared_from_this<session>
{
public:
session(boost::asio::io_context& io_context)
: socket_(io_context),
timer_(io_context)
{
}
stream_protocol::socket& socket()
{
return socket_;
}
void start(const boost::asio::yield_context &yield)
{
while (1) {
auto bytes_read = boost::asio::async_read(socket_, boost::asio::buffer(data_, sizeof(data_)),
boost::asio::transfer_all(), yield);
//timer wait to simulate data processing
timer_.expires_after(kTimeout);
timer_.async_wait(yield);
auto bytes_transferred = boost::asio::async_write(socket_,
boost::asio::buffer(data_, bytes_read),
yield);
}
}
private:
// The socket used to communicate with the client.
stream_protocol::socket socket_;
// Buffer used to store data received from the client.
boost::array<char, 256> data_;
boost::asio::steady_timer timer_;
};
typedef boost::shared_ptr<session> session_ptr;
class server
{
public:
server(boost::asio::io_context& ioc, const std::string& file)
: io_context_(ioc),
strand_(io_context_.get_executor()),
acceptor_(strand_, stream_protocol::endpoint(file))
{
for (int i=0 ; i < 3; i++) {
boost::asio::spawn(strand_,
[this](const boost::asio::yield_context &yield) {
session_ptr new_session(new session(this->io_context_));
try {
acceptor_.async_accept(new_session->socket(), yield);
new_session->start(yield);
} catch (std::exception &e) {
std::cout << "new connection needed: " << e.what() << std::endl;
}
});
}
}
private:
boost::asio::io_service& io_context_;
boost::asio::strand<io_context::executor_type> strand_;
stream_protocol::acceptor acceptor_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: stream_server <file>\n";
std::cerr << "*** WARNING: existing file is removed ***\n";
return 1;
}
boost::asio::io_context ioc;
std::remove(argv[1]);
server s(ioc, argv[1]);
ioc.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
#else // defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
# error Local sockets not available on this platform.
#endif // defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
答:
2赞
sehe
11/12/2023
#1
您只想运行 1 接受,并仅在此之后生成会话。
更好的是,您可以使用移动套接字的重载,并指定会话的链:您不需要所有会话共享保护服务器/接受器的同一链。async_accept
无需计数循环,只需链接async_accepts:
struct Server {
Server(asio::io_context& ioc, std::string const& file)
: io_context_(ioc)
, acceptor_(make_strand(ioc), Protocol::endpoint(file)) //
{
accept_loop();
}
private:
asio::io_service& io_context_;
//asio::strand<Executor> strand_;
Protocol::acceptor acceptor_;
void accept_loop() {
acceptor_.async_accept(make_strand(io_context_), [this](error_code ec, Protocol::socket s) {
if (ec != asio::error::operation_aborted)
accept_loop();
std::cout << "Accepted: " << ec.message();
if (!ec)
std::cout << " from " << s.remote_endpoint();
std::cout << std::endl;
});
}
};
你会注意到,为了简化,我改变了一些其他的东西:Live On Coliru
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
using namespace std::chrono_literals;
using namespace std::placeholders;
namespace asio = boost::asio;
constexpr auto kTimeout = 1s;
using Protocol = asio::local::stream_protocol;
using error_code = boost::system::error_code;
struct Session : public std::enable_shared_from_this<Session> {
Session(Protocol::socket s) : socket_(std::move(s)) {}
~Session() { std::cout << __FUNCTION__ << std::endl; }
void start(asio::yield_context yield) try {
for (;;) {
auto n = async_read(socket_, asio::buffer(data_), yield);
{ // simulate work
asio::steady_timer t(yield.get_executor(), kTimeout);
t.async_wait(yield);
}
n = async_write(socket_, asio::buffer(data_, n), yield);
}
} catch (boost::system::system_error const& se) {
std::cerr << "Session failed: " << se.code().message() << std::endl;
}
private:
Protocol::socket socket_;
std::array<char, 256> data_;
};
struct Server {
Server(asio::io_context& ioc, std::string const& file)
: io_context_(ioc)
, acceptor_(make_strand(ioc), Protocol::endpoint(file)) //
{
accept_loop();
}
private:
asio::io_service& io_context_;
Protocol::acceptor acceptor_;
void accept_loop() {
acceptor_.async_accept(make_strand(io_context_), [this](error_code ec, Protocol::socket s) {
if (ec != asio::error::operation_aborted)
accept_loop(); // continue accepting new connections
std::cout << "Accepted: " << ec.message() << std::endl;
auto strand = s.get_executor(); // copy before move
spawn(strand, bind(&Session::start, std::make_shared<Session>(std::move(s)), _1));
});
}
};
int main(int argc, char* argv[]) try {
if (argc != 2) {
std::cerr << "Usage: stream_server <file>\n"
"*** WARNING: existing file is removed ***\n";
return 1;
}
asio::io_context ioc;
std::remove(argv[1]);
Server s(ioc, argv[1]);
ioc.run();
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
但是,简化更多!
现在,整个业务只是不必要的复杂化。毕竟,你有一个完整的 coro 帧,可以保持对象的活力。事实上,对象只不过是套接字和缓冲区。只需制作一个 coro:直播shared_ptr
Session
void Session(Protocol::socket& socket, asio::yield_context yield) try {
std::array<char, 256> data;
for (;;) {
auto n = async_read(socket, asio::buffer(data), yield);
{ // simulate work
asio::steady_timer t(yield.get_executor(), kTimeout);
t.async_wait(yield);
}
n = async_write(socket, asio::buffer(data, n), yield);
}
} catch (boost::system::system_error const& se) {
std::cerr << "Session failed: " << se.code().message() << std::endl;
}
跟
spawn(strand, bind(Session, std::move(s), _1));
旁注
另外,请记住,已经默认为 .您的方法可能没问题,因为您需要 256 字节的“消息”。但是,对我来说,这似乎不像是“回显服务器”。async_read
transfer_all
在实践中,我会:read_some
for (;;) {
auto n = socket.async_read_some(asio::buffer(data), yield);
{ // simulate work
asio::steady_timer t(yield.get_executor(), kTimeout);
t.async_wait(yield);
}
n = async_write(socket, asio::buffer(data, n), yield);
}
最后,您要处理 EOF,因为它可能会部分成功:
for (boost::system::error_code ec; !ec;) {
auto n = socket.async_read_some(asio::buffer(data), yield[ec]);
if (n) { // simulate work
asio::steady_timer t(yield.get_executor(), kTimeout);
t.async_wait(yield);
}
if (!ec || ec == asio::error::eof)
n = async_write(socket, asio::buffer(data, n), yield);
}
评论
0赞
sehe
11/12/2023
请考虑改用 c++20 协程(不是没有更多的链接器依赖项、更快的编译速度和更少的运行时开销): coliru.stacked-crooked.com/a/61eac493d30a0878
评论