将 org.springframework.messaging.Message 与 Protobuf 对象一起使用作为 kafka 消息时出现 ClassCast 错误

ClassCast error when using org.springframework.messaging.Message with Protobuf object as kafka message

提问人:JITHIN_PATHROSE 提问时间:11/17/2023 最后编辑:JITHIN_PATHROSE 更新时间:11/17/2023 访问量:21

问:

我正在使用生产者属性,如下所示spring-kafka

key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer

我的代码看起来像这样——

                         Message<Event> message = MessageBuilder
                        .withPayload(protobufStruct)
                        .setHeader(MESSAGE_TIMESTAMP, Instant.now().toString())
                        .build();

                kafkaTemplate.send(topic, new MessageKey(key).toString(), message);

生成消息时,我收到类似

原因:java.lang.ClassCastException:类 org.springframework.messaging.support.GenericMessage 无法转换为 类 com.google.protobuf.Message (org.springframework.messaging.support.GenericMessage 和 com.google.protobuf.Message 位于加载器“app”的未命名模块中,位于 io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer.serialize(KafkaProtobufSerializer.java:34) 在 org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) 在 org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1015)

我在这里犯了什么错误?

apache-kafka 协议缓冲区 spring-kafka

评论


答:

1赞 Gary Russell 11/17/2023 #1

没有重载方法。send(topic, key, Message<?>)

send(topic, K, V)正在使用,因此您正在发送 作为有效负载 ()。GenericMessagerecord.value()

请改用。kafkaTemplate.send(topic, new MessageKey(key).toString(), protobufStruct);

如果要自行设置时间戳,请使用变体。send(ProducerRecord<K, V>)

评论

0赞 JITHIN_PATHROSE 11/17/2023
我还有一些标题信息,我想与我的消息一起发送。在这种情况下,我该怎么办?
0赞 Gary Russell 11/17/2023
您可以将它们添加到变体或使用变体 - 请参阅 docs.spring.io/spring-kafka/docs/current/reference/html/...ProducerRecordsend(Message<?>) >When you use the methods with a Message<?> parameter, the topic, partition, and key information is provided in a message header that includes the following items: KafkaHeaders.TOPIC KafkaHeaders.PARTITION KafkaHeaders.KEY KafkaHeaders.TIMESTAMP