提问人:JITHIN_PATHROSE 提问时间:11/17/2023 最后编辑:JITHIN_PATHROSE 更新时间:11/17/2023 访问量:21
将 org.springframework.messaging.Message 与 Protobuf 对象一起使用作为 kafka 消息时出现 ClassCast 错误
ClassCast error when using org.springframework.messaging.Message with Protobuf object as kafka message
问:
我正在使用生产者属性,如下所示spring-kafka
key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
我的代码看起来像这样——
Message<Event> message = MessageBuilder
.withPayload(protobufStruct)
.setHeader(MESSAGE_TIMESTAMP, Instant.now().toString())
.build();
kafkaTemplate.send(topic, new MessageKey(key).toString(), message);
生成消息时,我收到类似
原因:java.lang.ClassCastException:类 org.springframework.messaging.support.GenericMessage 无法转换为 类 com.google.protobuf.Message (org.springframework.messaging.support.GenericMessage 和 com.google.protobuf.Message 位于加载器“app”的未命名模块中,位于 io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer.serialize(KafkaProtobufSerializer.java:34) 在 org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) 在 org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1015)
我在这里犯了什么错误?
答:
没有重载方法。send(topic, key, Message<?>)
send(topic, K, V)
正在使用,因此您正在发送 作为有效负载 ()。GenericMessage
record.value()
请改用。kafkaTemplate.send(topic, new MessageKey(key).toString(), protobufStruct);
如果要自行设置时间戳,请使用变体。send(ProducerRecord<K, V>)
评论
ProducerRecord
send(Message<?>)
>When you use the methods with a Message<?> parameter, the topic, partition, and key information is provided in a message header that includes the following items: KafkaHeaders.TOPIC KafkaHeaders.PARTITION KafkaHeaders.KEY KafkaHeaders.TIMESTAMP
评论