Asked by: Siva Ram | Asked: 7/3/2018 | Last edited by: Matthias J. Sax, Siva Ram | Updated: 2/2/2019 | Views: 3720
Unable to write to Kafka in appender [Kafka] java.util.concurrent.TimeoutException in Log4j2
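Q:
I am trying to stream logs from Log4j2 to a Kafka topic. My ZooKeeper and Kafka servers are running, and I have created a topic for this. However, logging fails with the following error: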
Unable to write to Kafka in appender [Kafka] java.util.concurrent.TimeoutException: Timeout after waiting for 30000 ms.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.send(KafkaManager.java:116)
at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.tryAppend(KafkaAppender.java:169)
at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.append(KafkaAppender.java:150)
at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:156)
at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:129)
at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:120)
at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:84)
at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:448)
at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:433)
at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:417)
at org.apache.logging.log4j.core.config.LoggerConfig.logParent(LoggerConfig.java:439)
at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:434)
at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:417)
at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:403)
at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:63)
at org.apache.logging.log4j.core.Logger.logMessage(Logger.java:146)
at org.apache.logging.slf4j.Log4jLogger.log(Log4jLogger.java:376)
My ZooKeeper and Kafka logs are as follows.
ZooKeeper:
[2018-07-03 18:38:05,866] INFO Client attempting to establish new session at /127.0.0.1:57207 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-07-03 18:38:05,867] INFO Creating new log file: log.cb (org.apache.zookeeper.server.persistence.FileTxnLog)
[2018-07-03 18:38:05,899] INFO Established session 0x1646041bc4d0000 with negotiated timeout 6000 for client /127.0.0.1:57207 (org.apache.zookeeper.server.ZooKeeperServer)
kafka:
[2018-07-03 18:38:05,807] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2018-07-03 18:38:05,807] INFO Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-07-03 18:38:05,810] INFO Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-07-03 18:38:05,901] INFO Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x1646041bc4d0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-07-03 18:38:05,905] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
And the consumer code is as follows:
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.19.102.93:9092");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// Key and value types match the configured StringDeserializer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener();
consumer.subscribe(Collections.singletonList("TestKafkaTopic"), rebalanceListener);
while (true) {
    // Poll timeout is given in milliseconds
    ConsumerRecords<String, String> records = consumer.poll(1);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    // Commit the offsets of the records returned by the last poll
    consumer.commitSync();
}
Log4j2 configuration:
<Kafka name="Kafka" topic="TestKafkaTopic">
<PatternLayout pattern="|%-5p|%d{yyyy-MM-dd|HH:mm:ss,SSS}|%X{InterfaceId}|%X{SeqNo}|%X{Ouid} %X{srch1} %X{BussRef}|${sys:hostname}|${sys:ApplicationComponent}|%X{ExternalRefSend}|%m||%C{6}:%L|%t%n"/>
<Property name="metadata.broker.list">****:9092</Property>
<Property name="serializer.class">kafka.serializer.StringEncoder</Property>
<Property name="bootstrap.servers">****:9092</Property>
</Kafka>
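Any idea what I am doing wrong? My topic works when I use standalone code, but when I test sending messages through the Log4j2 producer, it fails with the error above and nothing reaches the consumer.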
A:
3 votes
GoPal
2/2/2019
#1
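I ran into the same problem with the Log4j2 Kafka appender. It was failing because the root log level was set to TRACE. I raised the logger level for org.apache.kafka to INFO or higher, while everything else still logs at the desired level (e.g. TRACE, DEBUG). After that, it started working.
Reference: https://github.com/danielwegener/logback-kafka-appender/issues/44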
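A minimal sketch of what that change might look like in a Log4j2 XML configuration. The root logger stays at TRACE and feeds the Kafka appender, while a dedicated logger keeps the Kafka client's own logging at INFO so its low-level messages are not fed back into the appender. The broker address `localhost:9092`, the simplified pattern, and `status="warn"` are placeholders assumed for illustration; they are not taken from the question's actual setup.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn">
  <Appenders>
    <Kafka name="Kafka" topic="TestKafkaTopic">
      <!-- Simplified pattern for illustration; use your own layout -->
      <PatternLayout pattern="%d %-5p [%t] %c: %m%n"/>
      <!-- Placeholder broker address (assumption); replace with your broker -->
      <Property name="bootstrap.servers">localhost:9092</Property>
    </Kafka>
  </Appenders>
  <Loggers>
    <!-- Keep the Kafka client's own logging at INFO or higher to avoid recursive logging -->
    <Logger name="org.apache.kafka" level="INFO"/>
    <!-- Everything else can still log at TRACE -->
    <Root level="trace">
      <AppenderRef ref="Kafka"/>
    </Root>
  </Loggers>
</Configuration>
```

If the problem persists with this in place, it may also be worth checking that the application can actually reach the address given in `bootstrap.servers`.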