一旦在 kafka 中消耗了所有消息,如何退出 while 循环?

How to come out of while loop once all messages consumed in kafka?

提问人:Niranjan 提问时间:4/25/2023 最后编辑:Peter CsalaNiranjan 更新时间:4/25/2023 访问量:151

问:

网络核心控制台应用程序。我有以下kafka消费者的实现

var config = new ConsumerConfig
{
    GroupId = groupId,
    BootstrapServers = brokerList,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = saslUsername,
    SaslPassword = saslPassword,
    SecurityProtocol = SecurityProtocol.SaslSsl,
    AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config)
  .SetErrorHandler((_, e) => logger.LogInformation($"Error: {e.Reason}"))
  .SetStatisticsHandler((_, json) => logger.LogInformation($"Statistic{json}"))
  .Build())
consumer.Subscribe(topic);

try
{
    while (true)
    {
        try
        {
            var consumeResult = consumer.Consume();

            if (consumeResult.IsPartitionEOF)
            {
                Console.WriteLine(
                    $"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");

                continue;
            }
            if (consumeResult?.Message == null) { return; }
            var mess = consumeResult.Message.Value;
            var vesselScoreFleetData = JsonConvert.DeserializeObject<VesselScoreFleet>(mess);
            vesselScoreFleets.Add(vesselScoreFleetData);
            logger.LogInformation($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
            try
            {
                consumer.StoreOffset(consumeResult);
            }
            catch (KafkaException e)
            {
                logger.LogError($"Store Offset error: {e.Error.Reason}");
            }
        }
        catch (ConsumeException e)
        {
            logger.LogError($"Consume error: {e.Error.Reason}");
        }
    }
}
catch (OperationCanceledException)
{
    logger.LogError("Closing consumer.");
    consumer.Close();
}

使用上面的代码,我能够成功地使用事件。但是一旦我的代码读取了所有消息,它就会卡在 var 行中。我的期望是,当没有数据可供使用时,它应该脱离循环并执行序列中的后续步骤。我可以知道如何解决它吗?任何帮助将不胜感激。谢谢consumeResult = consumer.Consume();

C# ASP.NET 核心 confluent-kafka-dotnet

评论

0赞 Roe 4/25/2023
你的 while 循环始终为 true,因此它不会停止。您需要创建一个条件,一旦所有消息都被使用,该条件就会更新为 false。您是否考虑过 foreach 循环?这基本上完全符合您的需求。
0赞 Niranjan 4/25/2023
谢谢罗伊。我怎么知道我必须做假的或读到最后的信息?
0赞 Roe 4/25/2023
我认为如果您遵循教程并仔细阅读它是明智的。看来你错过了一些核心点。

答: 暂无答案