Asked by: saravanan_1987 · Asked: 10/17/2023 · Updated: 10/17/2023 · Views: 69
Boost::Asio Asynchronous Client and Server Proxy implementation - Accessing the client and server sockets across instances
Q:
I am trying to implement a client/server proxy system in which the proxy receives commands from the client and forwards them to the server, and vice versa. However, I am having trouble accessing the client socket from the server instance in order to forward messages to the client [and vice versa]. I am still learning how Boost.Asio behaves. Could you suggest the most effective way to share the sockets between the server and client instances? The current implementation is below.
In the cmd_handler::read_cmd_done API, client_socket_.is_open() is 0 rather than 1.
#pragma once
#ifndef __OAMIP_PROXY_H__
#define __OAMIP_PROXY_H__
#include <iostream>
#include <boost/asio.hpp>
#include <thread>
#include <vector>
#include <functional>
#include <deque>
#include <mutex>   // std::mutex (queue_mutex_)
#include <memory>  // std::shared_ptr, std::enable_shared_from_this
#include <string>
#include "config.h"
namespace asio = boost::asio;
class cmd_handler: public std::enable_shared_from_this<cmd_handler>
{
public:
cmd_handler(asio::io_context &io_context, AppConfig* appConfig);
~cmd_handler();
asio::ip::tcp::socket &socket();
asio::ip::tcp::socket &c_socket();
asio::ip::tcp::endpoint remote_endpoint();
asio::ip::tcp::endpoint c_remote_endpoint();
asio::io_context &m_io_context();
void start();
void read_cmd();
void read_cmd_done(boost::system::error_code const &ec, std::size_t bytes_transferred);
void read_cmd_client();
void read_cmd_client_done(boost::system::error_code const &ec, std::size_t bytes_transferred);
private:
asio::io_context& io_context_;
asio::ip::tcp::socket server_socket_;
asio::ip::tcp::socket client_socket_;
asio::io_context::strand write_strand_;
asio::streambuf in_packet_;
std::deque<std::string> send_cmd_queue;
std::mutex queue_mutex_; // Added for thread safety
AppConfig *app_config;
};
class ProxyServer
{
using shared_handler_t = std::shared_ptr<cmd_handler>;
public:
ProxyServer(int thread_count, AppConfig* appConfig);
~ProxyServer();
void start_server(std::string ip_addr, int port);
void start_client(std::string ip_addr, int port);
void handle_new_connection(shared_handler_t handler, boost::system::error_code const &ec);
private:
asio::io_context io_context_;
int thread_count_;
asio::ip::tcp::acceptor acceptor_;
asio::ip::tcp::resolver resolver_;
std::vector<std::thread> thread_pool_;
asio::streambuf buffer_;
AppConfig *app_config;
};
#endif // __OAMIP_PROXY_H__
proxy.cpp:
#include "proxy.h"
#include "parser.cpp"
#include "send.cpp"
ProxyServer::ProxyServer(int thread_count,
AppConfig *appconfig)
: thread_count_(thread_count),
acceptor_(io_context_),
resolver_(io_context_),
thread_pool_(),
app_config(appconfig)
{
}
/*******************************************************************************************/
ProxyServer::~ProxyServer()
{
// Stop and join the io_context to prevent memory leaks
io_context_.stop();
for (auto &thread : thread_pool_)
{
thread.join();
}
}
/*******************************************************************************************/
void ProxyServer::start_server(std::string ip_addr, int port)
{
std::cout << "Starting Server, IP:" << ip_addr << ", Port:" << port << std::endl;
auto handler = std::make_shared<cmd_handler>(io_context_, app_config);
asio::ip::address_v4 ipv4_address = asio::ip::address_v4::from_string(ip_addr);
asio::ip::tcp::endpoint endpoint(ipv4_address, port);
acceptor_.open(endpoint.protocol());
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
std::cout << "Start listening" << std::endl;
try
{
acceptor_.async_accept(handler->socket(), [=](auto ec)
{
if(ec)
std::cerr << "Error accepting connection from : " << ec.message() << std::endl;
else
handle_new_connection(handler, ec); });
}
catch (const std::exception &e)
{
std::cerr << "Exception caught: " << e.what() << std::endl;
}
io_context_.run();
// start pool of threads to process the asio events
/* for (int i = 0; i < thread_count_; ++i)
{
thread_pool_.emplace_back([=]
{ io_context_.run(); });
}
for (auto &thread : thread_pool_)
{
thread.join();
} */
}
void ProxyServer::start_client(std::string ip_addr, int port)
{
std::cout << "Starting client" << std::endl;
auto handler = std::make_shared<cmd_handler>(io_context_, app_config);
asio::ip::tcp::resolver::query query(ip_addr, std::to_string(port));
asio::ip::tcp::resolver::iterator endpoint_iterator = resolver_.resolve(query);
asio::connect(handler->c_socket(), endpoint_iterator);
std::cout << "Connected to server on port " << port << std::endl;
handler->read_cmd_client();
io_context_.run();
}
/*******************************************************************************************/
void ProxyServer::handle_new_connection(shared_handler_t handler, boost::system::error_code const &ec)
{
std::cout << "Handle connection" << std::endl;
if (ec)
{
std::cerr << "Error accepting connection from client: " << ec.message() << std::endl;
return;
}
handler->read_cmd();
auto new_handler = std::make_shared<cmd_handler>(io_context_, app_config);
acceptor_.async_accept(new_handler->socket(), [=](auto ec)
{ handle_new_connection(new_handler, ec); });
}
/*******************************************************************************************/
cmd_handler::cmd_handler(asio::io_context &io_context, AppConfig *appConfig)
: io_context_(io_context), server_socket_(io_context), client_socket_(io_context),write_strand_(io_context), app_config(appConfig)
{
}
/*******************************************************************************************/
cmd_handler::~cmd_handler()
{
// Explicitly clear the buffer to release the allocated memory
in_packet_.consume(in_packet_.size());
}
/*******************************************************************************************/
asio::ip::tcp::socket &cmd_handler::socket()
{
return server_socket_;
}
asio::ip::tcp::socket &cmd_handler::c_socket()
{
return client_socket_;
}
asio::ip::tcp::endpoint cmd_handler::remote_endpoint()
{
return server_socket_.remote_endpoint();
}
asio::ip::tcp::endpoint cmd_handler::c_remote_endpoint()
{
return client_socket_.remote_endpoint();
}
asio::io_context &cmd_handler::m_io_context()
{
return io_context_;
}
/*******************************************************************************************/
void cmd_handler::start()
{
read_cmd();
}
/*******************************************************************************************/
void cmd_handler::read_cmd()
{
auto remote_ip = remote_endpoint().address().to_string();
std::cout << "Read command from " << remote_ip << std::endl;
asio::async_read_until(server_socket_,
in_packet_,
'\n',
[me = shared_from_this()](boost::system::error_code const &ec, std::size_t bytes_xfer)
{
if (ec == asio::error::eof)
{
std::cout << "Connection closed by client:" << std::endl;
return; // No need to read further; the connection is closed.
}
else if (ec)
{
std::cerr << "Error in async_read_until: " << ec.message() << std::endl;
return;
}
else
{
me->read_cmd_done(ec, bytes_xfer);
}
});
}
void cmd_handler::read_cmd_client()
{
std::cout << "Read cmd client" << std::endl;
auto remote_ip = c_remote_endpoint().address().to_string();
std::cout << client_socket_.is_open() << std::endl;
std::cout << "Read command from " << remote_ip << std::endl;
asio::async_read_until(client_socket_,
in_packet_,
'\n',
[me = shared_from_this()](boost::system::error_code const &ec, std::size_t bytes_xfer)
{
if (ec == asio::error::eof)
{
std::cout << "Connection closed by client:" << std::endl;
return; // No need to read further; the connection is closed.
}
else if (ec)
{
std::cerr << "Error in async_read_until: " << ec.message() << std::endl;
return;
}
else
{
me->read_cmd_client_done(ec, bytes_xfer);
}
});
}
void cmd_handler::read_cmd_client_done(boost::system::error_code const &ec, std::size_t bytes_transferred)
{
auto remote_ip = remote_endpoint().address().to_string();
if (ec == asio::error::eof)
{
std::cout << "Connection closed by client:" << remote_ip << std::endl;
return; // No need to read further; the connection is closed.
}
else if (ec)
{
std::cerr << "Error accepting packet from the client: " << remote_ip << "," << ec.message() << std::endl;
return;
}
std::string command(buffers_begin(in_packet_.data()), buffers_begin(in_packet_.data()) + bytes_transferred);
in_packet_.consume(bytes_transferred);
std::cout << "Connected server IP: " << remote_ip << std::endl;
std::cout << "command:" << command << std::endl;
Parser parser(app_config);
std::string recv_cmd = parser.process_cmd(command);
//CmdSender sender(server_socket_);
//sender.send_cmd(recv_cmd);
read_cmd();
};
/*******************************************************************************************/
void cmd_handler::read_cmd_done(boost::system::error_code const &ec, std::size_t bytes_transferred)
{
auto remote_ip = remote_endpoint().address().to_string();
if (ec == asio::error::eof)
{
std::cout << "Connection closed by client:" << remote_ip << std::endl;
return; // No need to read further; the connection is closed.
}
else if (ec)
{
std::cerr << "Error accepting packet from the client: " << remote_ip << "," << ec.message() << std::endl;
return;
}
std::string command(buffers_begin(in_packet_.data()), buffers_begin(in_packet_.data()) + bytes_transferred);
in_packet_.consume(bytes_transferred);
std::cout << "Connected client IP: " << remote_ip << std::endl;
std::cout << "command:" << command << std::endl;
Parser parser(app_config);
std::string recv_cmd = parser.process_cmd(command);
std::cout << client_socket_.is_open() <<c_socket().is_open() << socket().is_open()<< std::endl;
CmdSender sender(client_socket_);
sender.send_cmd(recv_cmd);
read_cmd();
};
/*******************************************************************************************/
int main() {
try
{
int thread_count = 1;
AppConfig appConfig;                                // assumed: populated from config.h elsewhere
ProxyServer proxy_server(thread_count, &appConfig); // proxy_server is used below but was never declared
int port = appConfig.serverConfig.port;
int s_port = appConfig.clientConfig.port;
std::vector<std::thread> thread_pool;
std::string ip_addr = appConfig.serverConfig.ip;
// start pool of threads to process the asio events
for (int i = 0; i < thread_count; ++i)
{
thread_pool.emplace_back([&]()
{ proxy_server.start_server(ip_addr, port); });
}
for (int i = 0; i < thread_count; ++i)
{
thread_pool.emplace_back([&]()
{ proxy_server.start_client(ip_addr, s_port);});
}
for (auto &thread : thread_pool)
{
thread.join();
}
}
catch (const std::exception &e)
{
std::cerr << "Exception caught: " << e.what() << std::endl;
}
return 0;
}
A:
std::mutex queue_mutex_; // Added for thread safety
A mutex does nothing unless it is actually used.
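For example, a minimal sketch (a hypothetical helper, not part of the posted code) that takes the lock around every access to send_cmd_queue:

// Hypothetical helper: any thread touching send_cmd_queue takes the lock first.
void cmd_handler::queue_cmd(std::string cmd)
{
    std::lock_guard<std::mutex> lock(queue_mutex_);
    send_cmd_queue.push_back(std::move(cmd));
}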
asio::io_context::strand write_strand_;
A strand serializes access to shared resources. You probably do want the strand, but not just for "write" - rather for... the resources (the IO objects and buffers used in the async operations).
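A rough sketch of that, assuming you keep the io_context::strand member, is to bind the session's completion handlers to it so that handlers touching the same cmd_handler never run concurrently:

// Sketch: dispatch handlers that touch in_packet_ and the sockets through the strand.
asio::async_read_until(server_socket_, in_packet_, '\n',
    asio::bind_executor(write_strand_,
        [me = shared_from_this()](boost::system::error_code const &ec, std::size_t n)
        {
            if (!ec)
                me->read_cmd_done(ec, n);
        }));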
You are probably running many "servers" and "clients" on the same endpoints from multiple threads. I don't think that's what you want. You probably just want the "server" to accept multiple connections, which you do by accepting a further connection after the previous one. You already do that at the end of handle_new_connection:
acceptor_.async_accept(new_handler->socket(),
[=](auto ec) { handle_new_connection(new_handler, ec); });
Some other notes:
// Explicitly clear the buffer to release the allocated memory
in_packet_.consume(in_packet_.size());
This is not necessary. The destructor already releases the owned resources; there is nothing you need to do there.
Session management
Now, logically, each accepted client connection gets its own fresh connection to the proxied server. There is no need to coordinate anything. Just move the upstream connection into the CmdHandler (rename it ProxySession or similar) and the problem solves itself.
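A rough outline of that shape (the names are hypothetical, and the upstream address is assumed to come from the same AppConfig fields that main() already reads):

// Sketch only: one session owns both sides, so nothing has to be shared across instances.
class ProxySession : public std::enable_shared_from_this<ProxySession>
{
public:
    ProxySession(asio::io_context &io, AppConfig *cfg)
        : downstream_(io), upstream_(io), resolver_(io), app_config(cfg) {}

    asio::ip::tcp::socket &socket() { return downstream_; } // the accepted (client) side

    void start()
    {
        auto self = shared_from_this();
        resolver_.async_resolve(app_config->clientConfig.ip,
                                std::to_string(app_config->clientConfig.port),
            [self](boost::system::error_code ec, asio::ip::tcp::resolver::results_type eps)
            {
                if (ec) return;
                asio::async_connect(self->upstream_, eps,
                    [self](boost::system::error_code ec, asio::ip::tcp::endpoint)
                    {
                        if (!ec) self->relay(); // both sockets live in this one object
                    });
            });
    }

private:
    void relay()
    {
        auto self = shared_from_this();
        asio::async_read_until(downstream_, in_packet_, '\n',
            [self](boost::system::error_code ec, std::size_t n)
            {
                if (ec) return;
                auto line = std::make_shared<std::string>(
                    asio::buffers_begin(self->in_packet_.data()),
                    asio::buffers_begin(self->in_packet_.data()) + n);
                self->in_packet_.consume(n);
                asio::async_write(self->upstream_, asio::buffer(*line),
                    [self, line](boost::system::error_code ec, std::size_t)
                    {
                        if (!ec) self->relay(); // keep forwarding; mirror this for upstream->downstream
                    });
            });
    }

    asio::ip::tcp::socket downstream_, upstream_;
    asio::ip::tcp::resolver resolver_;
    asio::streambuf in_packet_;
    AppConfig *app_config;
};

With that, handle_new_connection only has to accept into new_handler->socket() and call start(); no socket ever needs to be reached from another instance.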
I was going to edit things in, but that would take... more than a reasonable amount of time. Instead, consider looking at a proxy implementation I reviewed recently: "Tcp proxy mysql. Data received from mysql-client is output with strange symbols", which shows essentially the exact same idea.