gRPC async_client中的内存泄漏

Memory leak in gRPC async_client

提问人:lior.i 提问时间:12/23/2020 最后编辑:lior.i 更新时间:7/19/2021 访问量:2617

问:

我以与示例类似的方式使用。gRPCasync client

在这个例子中(在官方发布)中,客户端为要发送的消息分配内存,使用地址作为 ,当消息在侦听器线程中被应答时,内存(已知为 - 地址)是空闲的。gRPCgithubtagcompletion queuetag

我担心服务器不响应消息并且内存永远不会空闲的情况。

  • 能保护我免受这种情况的影响吗?gRPC
  • 我应该以不同的方式实现它吗?(使用智能指针/将指针保存在数据结构中/等)

异步客户端发送函数

void SayHello(const std::string& user) {
    // Data we are sending to the server.
    HelloRequest request;
    request.set_name(user);

    // Call object to store rpc data
    AsyncClientCall* call = new AsyncClientCall;

    // Because we are using the asynchronous API, we need to hold on to
    // the "call" instance in order to get updates on the ongoing RPC.
    call->response_reader =
        stub_->PrepareAsyncSayHello(&call->context, request, &cq_);

    // StartCall initiates the RPC call
    call->response_reader->StartCall();

    call->response_reader->Finish(&call->reply, &call->status, (void*)call);

}

线程的异步客户端接收函数

void AsyncCompleteRpc() {
    void* got_tag;
    bool ok = false;

    // Block until the next result is available in the completion queue "cq".
    while (cq_.Next(&got_tag, &ok)) {
        // The tag in this example is the memory location of the call object
        AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);

        // Verify that the request was completed successfully. Note that "ok"
        // corresponds solely to the request for updates introduced by Finish().
        GPR_ASSERT(ok);

        if (call->status.ok())
            std::cout << "Greeter received: " << call->reply.message() << std::endl;
        else
            std::cout << "RPC failed" << std::endl;

        // Once we're complete, deallocate the call object.
        delete call;
    }
}

主要

int main(int argc, char** argv) {


    GreeterClient greeter(grpc::CreateChannel(
            "localhost:50051", grpc::InsecureChannelCredentials()));

    // Spawn reader thread that loops indefinitely
    std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);

    for (int i = 0; i < 100; i++) {
        std::string user("world " + std::to_string(i));
        greeter.SayHello(user);  // The actual RPC call!
    }

    std::cout << "Press control-c to quit" << std::endl << std::endl;
    thread_.join();  //blocks forever

    return 0;
}
c 内存泄漏 grpc-c++

评论


答:

2赞 user4442671 7/19/2021 #1

gRPC 是否能保护我免受这种情况的影响?

金达。gRPC 保证所有排队的操作迟早都会进入其匹配的完成队列。因此,只要满足以下条件,您的代码就可以了:

  • 在不幸的时刻不会抛出任何例外。
  • 不会对创建不包括排队操作或删除调用的代码路径的代码进行更改。

换句话说:没关系,但很脆弱。

选项 A:

如果你想真正强大,要走的路是.但是,它们可能会以意想不到的方式破坏多线程性能。因此,它是否值得取决于您的应用程序在性能与健壮性方面所处的位置。std::shared_ptr<>

这样的重构如下所示:

  1. 继承自AsyncClientCallstd::enable_shared_from_this
  2. 将构造更改为callstd::make_shared<AsyncClientCall>()
  3. 在完成队列处理程序中,增加 ref-count:
while (cq_.Next(&got_tag, &ok)) {
    auto call = static_cast<AsyncClientCall*>(got_tag)->shared_from_this();

并摆脱,显然。delete

选项B:

您还可以通过以下方式获得体面的折衷措施:unique_ptr<>

    auto call = std::make_unique<AsyncClientCall>();
    ...
    call->response_reader->Finish(&call->reply, &call->status, (void*)call.release());

    std::unique_ptr<AsyncClientCall> call{static_cast<AsyncClientCall*>(got_tag)};

这样可以防止重构和异常,同时保留其他所有内容。但是,这仅适用于生成单个完成事件的一元 rpc。流式处理 rpcs 或交换元数据的 rpc 将需要完全不同的处理方式。

评论

0赞 lior.i 7/19/2021
感谢您的回答,在服务器不响应某些消息的情况下会发生什么?消息会停留在队列中等待吗?队列会溢出吗?在这种情况下,我们可以从旧消息中清除队列吗?
1赞 7/19/2021
@lior.i 如果服务器没有响应,RPC 迟早会失败,要么是超时,要么是套接字关闭,要么是以某种方式取消。此时,该标记将以 non-ok 状态放置在队列中。
0赞 7/19/2021
“在这种情况下,我们可以从旧消息中清除队列吗?”您可以随时取消通话。但一般来说,增加一个截止日期就足够了。
0赞 lior.i 7/19/2021
取消通话是什么意思?
0赞 7/19/2021
TryCancel(),但正如我所说,set_deadline() 通常是首选,因为它是即发即弃的