如何在不阻止 kafka 消息消费的情况下异步运行 Polly.NET

How to run Polly.NET asynchronously without blocking kafka message consumption

提问人:ILoveProgramming123 提问时间:9/15/2023 最后编辑:Peter CsalaILoveProgramming123 更新时间:9/15/2023 访问量:111

问:

我有一个 Kafka 使用者,它在使用消息时调用外部 API。Polly.NET 用作重审机制,以防调用不成功。

当前解决方案的问题在于重审机制阻止了下一条消息的消费,因此下一条消息必须等待重审机制完成。

知道我如何异步运行重审机制以便我可以继续下一条消息吗?

下面的示例演示了所描述的问题:

using Confluent.Kafka;
using Polly;
using Polly.Extensions.Http;


var config = new ConsumerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    GroupId = "foo",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    // Awaiting retry policy here will block the consumption of next message
    var result = await GetRetryPolicy().ExecuteAsync(async () =>
    {
        // CALL AN API HERE...
        return new HttpResponseMessage(System.Net.HttpStatusCode.OK);
    });
}


IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.NotFound)
        .WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
}
c# 。网 Apache 卡夫卡 kafka-consumer-api 波莉

评论


答:

1赞 nimeom 9/15/2023 #1

为了完全实现不阻塞的异步处理,确实应该避免等待消息消耗。

下面是修改后的代码:

using Confluent.Kafka;
using Polly;
using Polly.Extensions.Http;

var config = new ConsumerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    GroupId = "foo",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    var retryPolicy = GetRetryPolicy();

    while (true)
    {
        var result = consumer.Consume(); // This is a synchronous call

        // Start the asynchronous execution of the retry policy
        _ = retryPolicy.ExecuteAsync(async () =>
        {
            // CALL AN API HERE asynchronously, without awaiting the response...
            await YourApiCallAsync(); // Your API call should be asynchronous

            // Continue processing or log the result as needed
        });
    }
}

IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.NotFound)
        .WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
}

在此代码中,不会等待调用,从而允许 Kafka 使用者继续异步处理消息。该调用还会在不等待响应的情况下启动 API 调用的异步执行,从而确保它不会阻塞消息消耗循环。result = consumer.Consume()retryPolicy.ExecuteAsync

评论

0赞 ILoveProgramming123 9/15/2023
谢谢你的回答。关于 .NET 中的异步的另一个问题。我们如何确保异步执行不会“丢失”。不使用关键字会创建新线程吗?如何确保在主程序继续使用消息时运行重审。最后一个问题。重审机制结束后,我如何获得 Polly.NET 结果。我想记录总失败次数。在制定策略时可以包含该信息吗?await.OnRetry
1赞 Peter Csala 9/15/2023 #2

每当你从同步执行世界转向异步执行世界时,就会出现一堆新问题。

让我们看看其中的几个。使处理异步的最基本方法是将主题消费与消息处理分开。通常,它以有一个专用线程从 Kafka 主题获取消息的方式完成,并在反序列化后将消息处理分派给工作线程。(通常有一个包含可重用线程的工作线程池。

您将在这里遇到的第一个问题是偏移量的提交。我应该什么时候这样做?在调度之前还是在工作线程完成处理之后?

如果我们采用调度前的方法(因此,自动提交偏移),那么我们面临的下一个问题是错误处理。如何处理在处理(包括重试)期间失败的消息?我应该把它们放回主题(比如重新排队)以供以后使用吗?或者我应该把它放在一个名为死信队列的特殊主题中,以以不同的方式处理这些消息?或者干脆丢弃它们?

如果我们使用工作线程完成后的方法,那么我们又会遇到一堆问题。消费者应用程序必须从处理的角度维护我们在哪里。我们确定处理偏移之前的所有消息的每个分区的最新偏移量是多少?我们什么时候应该提交这些偏移量?

另一个挑战是消息排序。假设您有两条消息(M1 和 M2)。假设 M2 的时间戳大于 M1 的时间戳。M2 消息处理完美而快速地运行。但是 M1 的处理有点颠簸。由于暂时性网络故障,外部服务呼叫必须重复多次。换句话说,在某个时间点,M2(这是一条较新的消息)已被处理,但 M1 尚未处理。在这种情况下,我应该提交哪个偏移量?


我的观点是,创建一个性能良好且正确的异步消费者并不像拥有一个同步消费者那么容易。因此,我会仔细检查承诺的可扩展性是否值得付出努力。