提问人:bruce reed 提问时间:11/17/2023 最后编辑:bruce reed 更新时间:11/17/2023 访问量:30
Cloud Stream v4(反应式)—无法通过SpringBoot 3使用简单转换功能
cloud stream v4 (reactive) - unable to use simple transform function using springboot 3
问:
我是Spring Cloud Stream的新手。我无法使用转换功能(消费消息、转换、生成消息)。消费者功能运行良好。
@Bean
public Function<String, String> processorBinding() {
return s -> s + " :: " + System.currentTimeMillis();
}
application.yml 中的绑定配置
spring:
cloud:
function:
definition: processorBinding
stream:
bindings:
processorBinding-in-0:
destination: processor-topic
processorBinding-out-0:
destination: consumer-topic
kafka:
binder:
replicationFactor: 1
brokers:
- localhost:9092
通过 GUI kafka 工具向 processor-topic 发送简单字符串消息时出错。
输出错误在此处检查日志
2023-11-17T13:47:22.584+01:00 ERROR 21151 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@2574a232], failedMessage=GenericMessage [payload=byte[25], headers={deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=[B@778d6951, kafka_receivedTopic=processor-topic, target-protocol=kafka, kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5b098190, source-type=kafka, id=441f0410-7efb-1780-f210-d2e1be910c10, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1700225238940, contentType=application/json, kafka_groupId=anonymous.8d99852a-9b33-43d1-b269-da2f03a7ae4c, timestamp=1700225242574}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler.handleMessage(KafkaMessageChannelBinder.java:1594)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1185)
我实际上想在apache kafka中使用来自主题的消息,进行一些数据处理和检查(包括IO操作),并将处理后的数据发送到另一个kafka主题。确保消息传递平台将来可以更改,因此使用云流。让我知道如何使用springboot 3.x和生态系统中最好的依赖项来实现这一点。
预期是消息从 (processor-topic) 消费,并根据 config 中的绑定发送到另一个主题 (consumer-topic)。
答: 暂无答案
评论