Unable to write to Kafka in appender [Kafka] java.util.concurrent.TimeoutException in Log4j2

Asked by: Siva Ram · Asked: 7/3/2018 · Last edited by: Matthias J. Sax · Updated: 2/2/2019 · Views: 3,720

Q:

I am trying to stream logs from log4j2 to a Kafka topic. My ZooKeeper and Kafka servers are running, and I have created a topic for this.

 Unable to write to Kafka in appender [Kafka] java.util.concurrent.TimeoutException: Timeout after waiting for 30000 ms.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
    at org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.send(KafkaManager.java:116)
    at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.tryAppend(KafkaAppender.java:169)
    at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.append(KafkaAppender.java:150)
    at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:156)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:129)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:120)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:84)
    at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:448)
    at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:433)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:417)
    at org.apache.logging.log4j.core.config.LoggerConfig.logParent(LoggerConfig.java:439)
    at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:434)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:417)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:403)
    at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:63)
    at org.apache.logging.log4j.core.Logger.logMessage(Logger.java:146)
    at org.apache.logging.slf4j.Log4jLogger.log(Log4jLogger.java:376)

My ZooKeeper and Kafka logs are as follows.

ZooKeeper:

[2018-07-03 18:38:05,866] INFO Client attempting to establish new session at 
/127.0.0.1:57207 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-07-03 18:38:05,867] INFO Creating new log file: log.cb 
(org.apache.zookeeper.server.persistence.FileTxnLog)
[2018-07-03 18:38:05,899] INFO Established session 0x1646041bc4d0000 with 
negotiated timeout 6000 for client /127.0.0.1:57207 
(org.apache.zookeeper.server.ZooKeeperServer)
Kafka:
[2018-07-03 18:38:05,807] INFO [ZooKeeperClient] Waiting until connected. 
(kafka.zookeeper.ZooKeeperClient)
[2018-07-03 18:38:05,807] INFO Opening socket connection to server 
127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL 
(unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-07-03 18:38:05,810] INFO Socket connection established to 
127.0.0.1/127.0.0.1:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-07-03 18:38:05,901] INFO Session establishment complete on server 
127.0.0.1/127.0.0.1:2181, sessionid = 0x1646041bc4d0000, negotiated timeout = 
6000 (org.apache.zookeeper.ClientCnxn)
[2018-07-03 18:38:05,905] INFO [ZooKeeperClient] Connected. 
(kafka.zookeeper.ZooKeeperClient)

And the consumer code is as follows:

  // Requires kafka-clients on the classpath:
  // import java.util.Collections;
  // import java.util.Properties;
  // import org.apache.kafka.clients.consumer.*;

  Properties consumerConfig = new Properties();
  consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.19.102.93:9092");
  consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
  consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringDeserializer");
  consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringDeserializer");

  // Key/value types match the String deserializers configured above.
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
  TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener();
  consumer.subscribe(Collections.singletonList("TestKafkaTopic"), rebalanceListener);

  while (true) {
      ConsumerRecords<String, String> records = consumer.poll(1);
      for (ConsumerRecord<String, String> record : records) {
          System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s%n",
                  record.topic(), record.partition(), record.offset(), record.key(), record.value());
      }
      consumer.commitSync();
  }

Log4j2 configuration:

 <Kafka name="Kafka" topic="TestKafkaTopic">
     <PatternLayout pattern="|%-5p|%d{yyyy-MM-dd|HH:mm:ss,SSS}|%X{InterfaceId}|%X{SeqNo}|%X{Ouid} %X{srch1} %X{BussRef}|${sys:hostname}|${sys:ApplicationComponent}|%X{ExternalRefSend}|%m||%C{6}:%L|%t%n"/>
     <Property name="metadata.broker.list">****:9092</Property>
     <Property name="serializer.class">kafka.serializer.StringEncoder</Property>
     <Property name="bootstrap.servers">****:9092</Property>
 </Kafka>

Any idea what I am doing wrong? The topic works fine with my standalone producer code, but when I test sending messages through the log4j2 Kafka appender, it fails with the error above and nothing reaches the consumer.

apache-kafka kafka-consumer-api kafka-producer-api

A:

3 votes · GoPal · answered 2/2/2019 · #1

I ran into the same problem with the log4j2 Kafka appender. It failed because the root log level was set to TRACE, which (per the linked issue) routes the Kafka client's own log events back into the Kafka appender. I raised the logger level for org.apache.kafka to INFO or above, while everything else keeps logging at the desired level (e.g. TRACE, DEBUG). After that, it started working.

Reference: https://github.com/danielwegener/logback-kafka-appender/issues/44
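
The fix described above can be sketched as a log4j2 XML configuration. This is a minimal illustration, not the answerer's exact file: the appender and topic names are taken from the question, the broker address is a placeholder, and the dedicated `<Logger>` for `org.apache.kafka` is the key part.

```xml
<Configuration>
  <Appenders>
    <Kafka name="Kafka" topic="TestKafkaTopic">
      <PatternLayout pattern="%m%n"/>
      <Property name="bootstrap.servers">localhost:9092</Property>
    </Kafka>
  </Appenders>
  <Loggers>
    <!-- Keep kafka-clients' own log events at INFO or above so that,
         even with a TRACE root logger, they are not fed back into the
         Kafka appender (which causes the recursion/timeout). -->
    <Logger name="org.apache.kafka" level="INFO"/>
    <Root level="TRACE">
      <AppenderRef ref="Kafka"/>
    </Root>
  </Loggers>
</Configuration>
```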