Boost::Asio 异步客户端和服务器代理实现 - 跨实例访问客户端和服务器套接字

Boost::Asio Asynchronous Client and Server Proxy implentation - Accessing client and server socket across the instances

提问人:saravanan_1987 提问时间:10/17/2023 更新时间:10/17/2023 访问量:69

问:

我正在努力实现一个客户端和服务器代理系统,其中代理负责接收来自客户端的命令并将其转发到服务器,反之亦然。但是,我在从服务器实例访问客户端套接字以将消息转发到客户端时遇到了挑战[反之亦然]。我目前正在学习 Boost.Asio 行为。您能建议在服务器和客户端实例之间共享套接字的最有效方法吗?以下是当前的实现

在 cmd_handler::read_cmd_done API 中 - client_server_.is_open() 为 0 但不是 1。

#pragma once
#ifndef __OAMIP_PROXY_H__
#define __OAMIP_PROXY_H__

#include <iostream>
#include <boost/asio.hpp>
#include <thread>
#include <vector>
#include <functional>
#include <deque>
#include "config.h"

namespace asio = boost::asio;
class cmd_handler: public std::enable_shared_from_this<cmd_handler>
{
public:
    cmd_handler(asio::io_context &io_context, AppConfig* appConfig);
    ~cmd_handler();
    asio::ip::tcp::socket &socket();
    asio::ip::tcp::socket &c_socket();
    asio::ip::tcp::endpoint remote_endpoint();
    asio::ip::tcp::endpoint c_remote_endpoint();
    asio::io_context &m_io_context();
    void start();
    void read_cmd();
    void read_cmd_done(boost::system::error_code const &ec, std::size_t bytes_transferred);
    void read_cmd_client();
    void read_cmd_client_done(boost::system::error_code const &ec, std::size_t bytes_transferred);
private:
    asio::io_context& io_context_;
    asio::ip::tcp::socket server_socket_;
    asio::ip::tcp::socket client_socket_;
    asio::io_context::strand write_strand_;
    asio::streambuf in_packet_;
    std::deque<std::string> send_cmd_queue;
    std::mutex queue_mutex_;  // Added for thread safety
    AppConfig *app_config;
};


class ProxyServer
{
    using shared_handler_t = std::shared_ptr<cmd_handler>;
public:
    ProxyServer(int thread_count, AppConfig* appConfig);;
    ~ProxyServer();
    void start_server(std::string ip_addr, int port);
    void start_client(std::string ip_addr, int port);
    void handle_new_connection(shared_handler_t handler, boost::system::error_code const &ec);
private:
    asio::io_context io_context_;
    int thread_count_;
    asio::ip::tcp::acceptor acceptor_;
    asio::ip::tcp::resolver resolver_;
    std::vector<std::thread> thread_pool_;
    asio::streambuf buffer_;
    AppConfig *app_config;
};

#endif //  __OAMIP_PROXY_H__

nagarajans1@PA168951:~/Projects/OpenAMIPProxy$ cat proxy.cpp
#include "proxy.h"
#include "parser.cpp"
#include "send.cpp"

ProxyServer::ProxyServer(int thread_count,
                         AppConfig *appconfig)
    : thread_count_(thread_count),
      acceptor_(io_context_),
      resolver_(io_context_),
      thread_pool_(),
      app_config(appconfig)
{
}
/*******************************************************************************************/
ProxyServer::~ProxyServer()
{
    // Stop and join the io_context to prevent memory leaks
    io_context_.stop();
    for (auto &thread : thread_pool_)
    {
        thread.join();
    }
}
/*******************************************************************************************/
void ProxyServer::start_server(std::string ip_addr, int port)
{
    std::cout << "Starting Server, IP:" << ip_addr << ", Port:" << port << std::endl;
    auto handler = std::make_shared<cmd_handler>(io_context_, app_config);
    asio::ip::address_v4 ipv4_address = asio::ip::address_v4::from_string(ip_addr);
    asio::ip::tcp::endpoint endpoint(ipv4_address, port);
    acceptor_.open(endpoint.protocol());
    acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
    acceptor_.bind(endpoint);
    acceptor_.listen();
    std::cout << "Start listening" << std::endl;
    try
    {
        acceptor_.async_accept(handler->socket(), [=](auto ec)
                               {
                                    if(ec)
                                        std::cerr << "Error accepting connection from : " << ec.message() << std::endl;
                                    else
                                        handle_new_connection(handler, ec); });
    }
    catch (const std::exception &e)
    {
        std::cerr << "Exception caught: " << e.what() << std::endl;
    }
    io_context_.run();
        // start pool of threads to process the asio events
 /*        for (int i = 0; i < thread_count_; ++i)
    {
        thread_pool_.emplace_back([=]
                                  { io_context_.run(); });
    }
    for (auto &thread : thread_pool_)
    {
        thread.join();
    } */
}
void ProxyServer::start_client(std::string ip_addr, int port)
{
    std::cout << "Starting client" << std::endl;
    auto handler = std::make_shared<cmd_handler>(io_context_, app_config);
    asio::ip::tcp::resolver::query query(ip_addr, std::to_string(port));
    asio::ip::tcp::resolver::iterator endpoint_iterator = resolver_.resolve(query);
    asio::connect(handler->c_socket(), endpoint_iterator);

    std::cout << "Connected to server on port " << port << std::endl;

    handler->read_cmd_client();
    io_context_.run();
}

/*******************************************************************************************/
void ProxyServer::handle_new_connection(shared_handler_t handler, boost::system::error_code const &ec)
{
    std::cout << "Handle connection" << std::endl;
    if (ec)
    {
        std::cerr << "Error accepting connection from client: " << ec.message() << std::endl;
        return;
    }
    handler->read_cmd();
    auto new_handler = std::make_shared<cmd_handler>(io_context_, app_config);
    acceptor_.async_accept(new_handler->socket(), [=](auto ec)
                           { handle_new_connection(new_handler, ec); });
}
/*******************************************************************************************/
cmd_handler::cmd_handler(asio::io_context &io_context, AppConfig *appConfig)
    : io_context_(io_context), server_socket_(io_context), client_socket_(io_context),write_strand_(io_context), app_config(appConfig)
{

}
/*******************************************************************************************/
cmd_handler::~cmd_handler()
{
    // Explicitly clear the buffer to release the allocated memory
    in_packet_.consume(in_packet_.size());
}
/*******************************************************************************************/
asio::ip::tcp::socket &cmd_handler::socket()
{
    return server_socket_;
}
asio::ip::tcp::socket &cmd_handler::c_socket()
{
    return client_socket_;
}
asio::ip::tcp::endpoint cmd_handler::remote_endpoint()
{
    return server_socket_.remote_endpoint();
}
asio::ip::tcp::endpoint cmd_handler::c_remote_endpoint()
{
    return client_socket_.remote_endpoint();
}
asio::io_context &cmd_handler::m_io_context()
{
    return io_context_;
}
/*******************************************************************************************/
void cmd_handler::start()
{
    read_cmd();
}
/*******************************************************************************************/
void cmd_handler::read_cmd()
{
    auto remote_ip = remote_endpoint().address().to_string();
    std::cout << "Read command from " << remote_ip << std::endl;
    asio::async_read_until(server_socket_,
                           in_packet_,
                           '\n',
                           [me = shared_from_this()](boost::system::error_code const &ec, std::size_t bytes_xfer)
                           {
                               if (ec == asio::error::eof)
                               {
                                   std::cout << "Connection closed by client:" << std::endl;
                                   return; // No need to read further; the connection is closed.
                               }
                               else if (ec)
                               {
                                   std::cerr << "Error in async_read_until: " << ec.message() << std::endl;
                                   return;
                               }
                               else
                               {
                                   me->read_cmd_done(ec, bytes_xfer);
                               }
                           });
}
void cmd_handler::read_cmd_client()
{
    std::cout << "Read cmd client" << std::endl;
    auto remote_ip = c_remote_endpoint().address().to_string();
    std::cout << client_socket_.is_open() << std::endl;
    std::cout << "Read command from " << remote_ip << std::endl;
    asio::async_read_until(client_socket_,
                           in_packet_,
                           '\n',
                           [me = shared_from_this()](boost::system::error_code const &ec, std::size_t bytes_xfer)
                           {
                               if (ec == asio::error::eof)
                               {
                                   std::cout << "Connection closed by client:" << std::endl;
                                   return; // No need to read further; the connection is closed.
                               }
                               else if (ec)
                               {
                                   std::cerr << "Error in async_read_until: " << ec.message() << std::endl;
                                   return;
                               }
                               else
                               {
                                   me->read_cmd_client_done(ec, bytes_xfer);
                               }
                           });
}
void cmd_handler::read_cmd_client_done(boost::system::error_code const &ec, std::size_t bytes_transferred)
{
    auto remote_ip = remote_endpoint().address().to_string();
    if (ec == asio::error::eof)
    {
        std::cout << "Connection closed by client:" << remote_ip << std::endl;
        return; // No need to read further; the connection is closed.
    }
    else if (ec)
    {
        std::cerr << "Error accepting packet from the client: " << remote_ip << "," << ec.message() << std::endl;
        return;
    }

    std::string command(buffers_begin(in_packet_.data()), buffers_begin(in_packet_.data()) + bytes_transferred);
    in_packet_.consume(bytes_transferred);
    std::cout << "Connected server IP: " << remote_ip << std::endl;
    std::cout << "command:" << command << std::endl;
    Parser parser(app_config);
    std::string recv_cmd = parser.process_cmd(command);
    //CmdSender sender(server_socket_);
    //sender.send_cmd(recv_cmd);
    read_cmd();
};
/*******************************************************************************************/
void cmd_handler::read_cmd_done(boost::system::error_code const &ec, std::size_t bytes_transferred)
{
    auto remote_ip = remote_endpoint().address().to_string();
    if (ec == asio::error::eof)
    {
        std::cout << "Connection closed by client:" << remote_ip << std::endl;
        return; // No need to read further; the connection is closed.
    }
    else if (ec)
    {
        std::cerr << "Error accepting packet from the client: " << remote_ip << "," << ec.message() << std::endl;
        return;
    }

    std::string command(buffers_begin(in_packet_.data()), buffers_begin(in_packet_.data()) + bytes_transferred);
    in_packet_.consume(bytes_transferred);
    std::cout << "Connected client IP: " << remote_ip << std::endl;
    std::cout << "command:" << command << std::endl;
    Parser parser(app_config);
    std::string recv_cmd = parser.process_cmd(command);
    std::cout << client_socket_.is_open() <<c_socket().is_open() << socket().is_open()<< std::endl;
    CmdSender sender(client_socket_);
    sender.send_cmd(recv_cmd);
    read_cmd();
};
/*******************************************************************************************/
int main() {
  try
    {
    
        int thread_count = 1;
        int port = appConfig.serverConfig.port;
        int s_port = appConfig.clientConfig.port;
        std::vector<std::thread> thread_pool;
        std::string ip_addr = appConfig.serverConfig.ip;
   

        // start pool of threads to process the asio events
        for (int i = 0; i < thread_count; ++i)
        {
            thread_pool.emplace_back([&]()
                                     {  proxy_server.start_server(ip_addr, port); });
        }
        for (int i = 0; i < thread_count; ++i)
        {
                thread_pool.emplace_back([&]()
                                         { proxy_server.start_client(ip_addr, s_port);});
        }
        for (auto &thread : thread_pool)
        {
            thread.join();
        }
    }
    catch (const std::exception &e)
    {
        std::cerr << "Exception caught: " << e.what() << std::endl;
    }

    return 0;
}
C++ 套接字 boost-asio shared-ptr private-members

评论


答:

0赞 sehe 10/17/2023 #1
std::mutex queue_mutex_; // Added for thread safety

除非使用,否则互斥锁不会做任何事情。

asio::io_context::strand write_strand_;

链同步对共享资源的访问。可能你只需要股线,但不是为了“写”,而是为了......资源(异步操作中使用的 IO 对象和缓冲区)。

您可能在多个线程的相同端点上运行许多“服务器”和“客户端”。我不认为你想要那样。您可能只是想要让“服务器”接受多个连接。您可以通过在前一个连接之后接受更多连接来做到这一点。您已经在 的末尾这样做了:handle_new_connection

acceptor_.async_accept(new_handler->socket(),
                       [=](auto ec) { handle_new_connection(new_handler, ec); });

其他一些注意事项:

    // Explicitly clear the buffer to release the allocated memory
    in_packet_.consume(in_packet_.size());

这不是必需的。析构函数已经释放了所拥有的资源。那里不需要做任何事情。


会话管理

现在,从逻辑上讲,每个接受的客户端连接都会获得与代理服务器的新连接。没有必要协调。只需将上游连接移动到“CmdHandler”中(将其重命名为 ProxySession 或其他名称),问题就会自行解决。

我本来打算编辑东西,但这需要......超过合理的时间。相反,请考虑查看我最近审查的代理实现:Tcp proxy mysql。从 mysql-client 接收的数据以奇怪的符号输出,它显示了基本完全相同的想法。

评论

0赞 saravanan_1987 10/18/2023
感谢您的评论。我无权访问 tcp 代理的 gitlab 链接。您能否建议一种方法来访问 read_cmd() 函数上的“client_socket_”变量和 read_cmd_client() 函数上的“server_socket_”变量而不会丢失其值?使用当前代码,它不起作用。是否可以使用 client_fd 和 server_fd 等全局变量来存储套接字值以跨实例访问?
0赞 sehe 10/20/2023
不要那样做(全局变量“永远不会”没问题)。此外,我的回答描述了为什么您不需要访问外部状态:您的代理会话将同时拥有两个连接(上游和下游),并且会话不共享连接(否则您将获得损坏的流)。这是我在 BitBucket 上查看的代码 stackoverflow-sehe.s3.amazonaws.com/... (出于某种原因,他们称之为代理会话:))Bridge
0赞 saravanan_1987 11/8/2023
谢谢你的建议。在代理代码中,当它收到来自客户端的断开连接信号时[proxy as server],它会启动与服务器[proxy as client]的新连接。但是,如果客户端由于软件错误而重新启动,则存在耗尽服务器的最大文件描述符的风险,因为我们作为客户端的行为。你对这个问题有什么建议吗?
0赞 sehe 11/8/2023
“当它收到断开连接时 [...]它启动了与服务器的新连接“——为什么?客户走了,对吧?“耗尽服务器最大文件描述符的风险” - 我看不出这里是如何改变的,因为我假设至少与重新连接一样多的连接被丢弃。我认为,如何管理服务器负载是一个与此无关的不同问题。
0赞 saravanan_1987 11/8/2023
对不起,我上次的评论是不对的。在代理代码中,当它收到来自客户端的断开连接信号(代理充当服务器)时,它不会关闭与服务器端的相应会话[代理充当客户端]。当客户端重新连接时,它会通过充当客户端的代理与服务器建立新的连接。但是,如果客户端由于软件错误而反复重新启动,则存在耗尽服务器的最大文件描述符的潜在问题,因为代理会不断作为客户端启动新连接。