提问人:Niranjan 提问时间:4/25/2023 最后编辑:Peter CsalaNiranjan 更新时间:4/25/2023 访问量:151
一旦在 kafka 中消耗了所有消息,如何退出 while 循环?
How to come out of while loop once all messages consumed in kafka?
问:
网络核心控制台应用程序。我有以下kafka消费者的实现
var config = new ConsumerConfig
{
GroupId = groupId,
BootstrapServers = brokerList,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = saslUsername,
SaslPassword = saslPassword,
SecurityProtocol = SecurityProtocol.SaslSsl,
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config)
.SetErrorHandler((_, e) => logger.LogInformation($"Error: {e.Reason}"))
.SetStatisticsHandler((_, json) => logger.LogInformation($"Statistic{json}"))
.Build())
consumer.Subscribe(topic);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume();
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine(
$"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
if (consumeResult?.Message == null) { return; }
var mess = consumeResult.Message.Value;
var vesselScoreFleetData = JsonConvert.DeserializeObject<VesselScoreFleet>(mess);
vesselScoreFleets.Add(vesselScoreFleetData);
logger.LogInformation($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
try
{
consumer.StoreOffset(consumeResult);
}
catch (KafkaException e)
{
logger.LogError($"Store Offset error: {e.Error.Reason}");
}
}
catch (ConsumeException e)
{
logger.LogError($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
logger.LogError("Closing consumer.");
consumer.Close();
}
使用上面的代码,我能够成功地使用事件。但是一旦我的代码读取了所有消息,它就会卡在 var 行中。我的期望是,当没有数据可供使用时,它应该脱离循环并执行序列中的后续步骤。我可以知道如何解决它吗?任何帮助将不胜感激。谢谢consumeResult = consumer.Consume();
答: 暂无答案
评论