boost::beast 解析 http 流块时出现“流截断”错误

boost::beast "stream truncated" error when parsing http stream chunks

提问人:Ian Gaspariunas 提问时间:7/28/2023 最后编辑:Ian Gaspariunas 更新时间:7/28/2023 访问量:150

问:

我曾经通过HTTPS流获取数据。(特别是来自 EXANTE API )。 我的代码与 Beast 库官方文档中的示例几乎相同:分块编码boost::beast

一切正常,但定期读取块会因错误而中断。我无法理解这些错误的原因。是我的代码,还是互联网连接或服务器的问题,或者连接被代理服务器或防火墙 (idk) 中断。"Stream truncated""Partial message"

更新:通过 Wireshark 进行的流量分析表明,在错误发生之前,我收到了一条 TLS 消息。但我不明白它是什么意思以及如何解释它。"Encrypted alert (21)"

下面显示了我的代码的最大简化版本。 P.S. 数据来自测试帐户,因此不是秘密。

#include <string>
#include <iostream>

#include <boost/archive/iterators/base64_from_binary.hpp>
#include <boost/archive/iterators/transform_width.hpp>
#include <boost/algorithm/string.hpp>

#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>

#include <boost/beast.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/http.hpp>

using namespace std::string_literals;
namespace net   = boost::asio;
namespace ssl   = boost::asio::ssl;
namespace ip    = boost::asio::ip;
namespace beast = boost::beast;
namespace http  = boost::beast::http;
using tcp = boost::asio::ip::tcp;

std::string base64_encode(const std::string& val) {
    using namespace boost::archive::iterators;
    using It = base64_from_binary<transform_width<std::string::const_iterator, 6, 8>>;
    auto tmp = std::string(It(std::begin(val)), It(std::end(val)));
    return tmp.append((3 - val.size() % 3) % 3, '=');
}

int main() {

    const auto clientID = "6dc42e50-0b01-444e-bae0-3a431c88b525"s;
    const auto key = "TTQmWnbHlr5OWlNSWl1P"s;
    const auto host = "api-demo.exante.eu"s;
    const auto target = "/trade/3.0/stream/orders"s;
    const auto port = "443"s;
    const auto http_version = 11;
    const auto auth_token = "Basic "s + base64_encode(clientID + ':' + key);
    const auto accept_value = "application/x-json-stream"s;
    const auto timeout = std::chrono::seconds(12);

    http::request<http::string_body> request{http::verb::get, target, http_version};
    request.set(http::field::host, host);
    request.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
    request.set(http::field::authorization, auth_token);
    request.set(http::field::accept, accept_value);
    request.prepare_payload();

    net::io_context ioc;

    ssl::context ctx(ssl::context::tlsv12_client);
    tcp::resolver resolver(ioc);
    beast::ssl_stream<beast::tcp_stream> stream(ioc, ctx);
    if (!SSL_set_tlsext_host_name(stream.native_handle(), host.c_str())) {
        beast::error_code ec{static_cast<int>(::ERR_get_error()), net::error::get_ssl_category()};
        throw beast::system_error{ec};
    }
    auto const results = resolver.resolve(host, port);

    beast::get_lowest_layer(stream).connect(results);
    stream.handshake(ssl::stream_base::client);

    http::write(stream, request);

    beast::flat_buffer buffer;
    http::parser<false, http::empty_body> p;

    http::read_header(stream, buffer, p);

    http::chunk_extensions ce;
    std::string chunk;

    auto header_cb =
            [&](std::uint64_t size,
                beast::string_view extensions,
                beast::error_code &ev) {
                ce.parse(extensions, ev);
                if (ev)
                    return;

                if (size > (std::numeric_limits<std::size_t>::max)()) {
                    ev = http::error::body_limit;
                    return;
                }

                chunk.reserve(static_cast<std::size_t>(size));
                chunk.clear();
            };
    p.on_chunk_header(header_cb);

    auto body_cb =
            [&](std::uint64_t remain,
                beast::string_view body,
                beast::error_code &ec) {
                if (remain == body.size())
                    ec = http::error::end_of_chunk;

                chunk.append(body.data(), body.size());

                return body.size();
            };
    p.on_chunk_body(body_cb);

    std::function<void(const beast::error_code &, size_t)> onRead = [&](const auto &ec, auto bytes_transferred) {
        if (ec && ec != http::error::end_of_chunk)
            throw std::runtime_error(ec.what());

        for (auto const &extension: ce) {
            std::cout << "Extension: " << extension.first;
            if (!extension.second.empty())
                std::cout << " = " << extension.second << std::endl;
            else
                std::cout << std::endl;
        }

        static int counter = 0;
        std::cout << "Chunk Body: " << chunk << counter++ << " @@@!" << std::endl;
        chunk.clear();
        if (!p.is_done()) {
            stream.next_layer().expires_after(timeout);
            http::async_read(stream, buffer, p, onRead);
        }
    };

    stream.next_layer().expires_after(timeout);
    http::async_read(stream, buffer, p, onRead);

    ioc.run();
    
    return 0;
}

我已经尝试了函数的同步和异步版本,无论如何都会出现问题。http::read

该错误发生在不同网络上的三个不同设备上。

根据 API 开发人员的说法,对 HTTP 流侦听的持续时间没有限制。 每次在几分钟到几小时的随机时间段后都会出现问题。

c++ https boost-beast http-streaming

评论

0赞 sehe 7/28/2023
“根据 API 开发人员的说法,对 HTTP 流侦听的持续时间没有限制”——请附上指向该文档的链接。此外,“无限制”并不意味着“无限收听的保证”。事实上,从技术上讲,这种保证是不存在的。
0赞 Ian Gaspariunas 7/28/2023
@sehe 不幸的是,文档中没有说明这一点,我不得不联系技术支持才能找到答案。但是,这似乎不是故意的服务器端关闭,因为即使同时在 3 台设备上运行时,此错误也会在不同的时间发生,有时相差几个数量级。
0赞 sehe 7/28/2023
所以?您可能会有一些网络不稳定。事实上,这很常见。如果您经常断开连接,您可能会找出原因(例如,使用 wireshark 或消除交换机/路由器和代理等瓶颈)
0赞 Ian Gaspariunas 7/28/2023
@sehe 但是,例如,当通过websocket接收数据或通过rest api与服务器交互时,我不会遇到这样的错误。所以我不能确定问题出在互联网连接上,但我也不能排除它。我真的不明白有什么方法可以检查流中断的原因:由于程序错误、互联网问题或服务器行为的特殊性
0赞 sehe 7/28/2023
添加调试跟踪或使用数据包转储程序(pcap、tcpdump、ethereal、wireshark 等)。

答: 暂无答案