NATS Core + Nats C 客户端消息丢失

NATS Core + Nats C Client message loss

提问人:Walter 提问时间:7/7/2023 最后编辑:Walter 更新时间:7/20/2023 访问量:195

问:

我在 NATS 发布(核心,还不是 Jetstream)时丢失了消息。

在 Windows 上使用 NATS CLI 订阅为子“>”

在本地LAN上的Linux Ubuntu上使用NATS服务器。

使用 NATS C 客户端(最新 GitHub 版本)的 Windows 上的应用程序。

以下代码重现了该问题(可能在客户端的 FAST CPU 上。 我使用了 AMD Threadripper 16、32 和 64 核以及 Intel i7-10810U,它们都有)。

专用于此测试的 SINGLE 消息、空闲网络和 NATS 服务器已经出现问题,因此 NATS 服务器上没有其他流量或重负载。

您需要提供与此方法的登录连接(代码未显示,包含我的密钥)。选择“使用”案例“1、2 或 3 进行测试,以查看不同的方案和解决方法。

   #include "nats.h"

natsStatus PublishIt(natsConnection* nc) {

// Create subject
std::string subject = "test";

// Fill a buffer with data to send
char* buf = new char[1024];
int len = sprintf(buf, "This is a reliability test to see if NATS looses messages on fast systems and if possibly the provided buffer is cloned after the natsConnection_Publish() function allready returned. If that is the case it would explain NATS high performance but while being unreliable depending on the underlying CPU speed and thread-lottery.");

// Publish
natsStatus nstat = natsConnection_Publish(nc, subject.c_str(), (const void*) buf, len);
if (nstat != NATS_OK) { printf("natsConnection_Publish() Failed"); return nstat; }  // <<< Never failed

// Select the test according remarks next to the 'case' statements.
int selectTest = 3;
switch (selectTest)
{
    case 1: // This looses messages. NATS CLI doesn't display

        delete[] buf;
        break;

    case 2: // This is a memory leak BUT NEVER looses any message and above text appears on NATS CLI
            // Will eventually run out of memory of course and isn't an acceptable solution.
            
        // do nothing, just don't delete buf[]
        break;

    case 3: // This is a workaround that doesn't loose messages and NATS CLI shows text BUT it looses performance.

        nstat = natsConnection_Flush(nc);
        if (nstat != NATS_OK) printf("NATS Flush Failed: %i", nstat); // <<< Flush never failed.
        delete[] buf;
        break;
}
return nstat;}

有没有人有比上面的 flush() 更好的解决方案。有人告诉我,在更快的 CPU 中,或者如果核心专用成为可能,这种解决方法将不成立。我的理由是 flush() 只是为一些底层异步创造了足够的时间。操作,以在删除缓冲区之前使用缓冲区。

在断开连接之前,我尝试使用具有 2 秒超时的单个 flush(),但这不起作用。刷新必须介于发布调用和删除缓冲区之间。这意味着必须在每次发布时调用它,这是一个性能问题。

http://nats-io.github.io/nats.c/group__conn_pub_group.html#gac0b9f7759ecc39b8d77807b94254f9b4 的文档没有说明调用方是否需要放弃缓冲区,因此我将其删除。也许还有其他文件,但上面的文件声称是官方的。

感谢您提供任何其他信息。

消息代理 nats.io nats-streaming-server

评论


答:

0赞 Walter 7/20/2023 #1

经过更多的测试和 Nats C 团队的一些良好信息,以下回答了这个问题。

  1. 在断开连接之前,最后的 flush() 可以工作,但在快速 CPU 上,最好在刷新和断开连接之间构建一点 std::thread_sleep。刷新是一个异步函数,在连接保持打开状态时需要一点时间才能执行。

  2. 每次发布后都不需要刷新,只要 publish 语句继续填充缓冲区即可。因此,集合中的第一条消息、单条消息或最后一条消息可能需要刷新。在实践中,在某些情况下,了解“最后”消息可能很困难。

  3. 使用计时器进行刷新,在发布后重置,也可以解决一些明显的长时间徘徊的问题。

通过上述手段,NATS Core 仍然非常高性能,并且不再丢失任何消息。

可能要记住的一句话是:Flush is Async。

感谢 NATS 团队在 GitHub 上的帮助。