Cloud Stream v4(反应式)—无法通过SpringBoot 3使用简单转换功能

cloud stream v4 (reactive) - unable to use simple transform function using springboot 3

提问人:bruce reed 提问时间:11/17/2023 最后编辑:bruce reed 更新时间:11/17/2023 访问量:30

问:

我是Spring Cloud Stream的新手。我无法使用转换功能(消费消息、转换、生成消息)。消费者功能运行良好。

    @Bean
    public Function<String, String> processorBinding() {
        return s -> s + " :: " + System.currentTimeMillis();
    }

application.yml 中的绑定配置

spring:
  cloud:
    function:
      definition: processorBinding
    stream:
      bindings:
        processorBinding-in-0:
          destination: processor-topic
        processorBinding-out-0:
          destination: consumer-topic
      kafka:
        binder:
          replicationFactor: 1
          brokers:
            - localhost:9092

通过 GUI kafka 工具向 processor-topic 发送简单字符串消息时出错。

输出错误在此处检查日志

2023-11-17T13:47:22.584+01:00 ERROR 21151 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@2574a232], failedMessage=GenericMessage [payload=byte[25], headers={deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=[B@778d6951, kafka_receivedTopic=processor-topic, target-protocol=kafka, kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5b098190, source-type=kafka, id=441f0410-7efb-1780-f210-d2e1be910c10, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1700225238940, contentType=application/json, kafka_groupId=anonymous.8d99852a-9b33-43d1-b269-da2f03a7ae4c, timestamp=1700225242574}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler.handleMessage(KafkaMessageChannelBinder.java:1594)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1185)

我实际上想在apache kafka中使用来自主题的消息,进行一些数据处理和检查(包括IO操作),并将处理后的数据发送到另一个kafka主题。确保消息传递平台将来可以更改,因此使用云流。让我知道如何使用springboot 3.x和生态系统中最好的依赖项来实现这一点。

预期是消息从 (processor-topic) 消费,并根据 config 中的绑定发送到另一个主题 (consumer-topic)。

apache-kafka spring-cloud-stream spring-boot-3

评论

0赞 Oleg Zhurakousky 11/17/2023
您提供的堆栈跟踪不完整,并且不显示任何相关内容。上面的函数非常微不足道,我们有许多测试来验证这种情况,所以你没有向我们展示一些东西。请考虑创建一个重现该问题的小应用程序并将其推送到 Github,以便我们查看。
0赞 bruce reed 11/17/2023
github.com/brucereed1807/spring-kafka 使用 java、maven、springboot 3.1.5 及其相关依赖项从 Spring Initializer 创建了此存储库
0赞 bruce reed 11/17/2023
另外,我们是否有用于春季相关讨论的松弛/不和谐频道?
0赞 Oleg Zhurakousky 11/17/2023
我们曾经这样做过,但社区很少参与,而是将其用作免费的支持渠道,因此我们关闭了它,因为我们的资源非常有限
0赞 bruce reed 11/17/2023
嗨,奥列格,这不是问题。就我而言,经纪人版本很旧。我正在寻找在本地设置 kafka 的更简单方法,并遇到了一个 docker 映像 spotify/kafka....没检查是7岁。适用于最新的 kafka 版本。

答: 暂无答案