提问人:Al3x4ndru1 提问时间:11/3/2023 最后编辑:Al3x4ndru1 更新时间:11/7/2023 访问量:127
将 Map 从 KafkaProducer 发送到 KafkaConsumer
Send Map from KafkaProducer to KafkaConsumer
问:
我正在使用 java 21 和 Spring Boot 3.2.0 (SNAPSHOT)。我想使用 send a Map<Short,List<Customer>>从 KafkaProducer 到 KafkaConsumer。
KafkaConsumerConfig 类:
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServer;
public Map<String, Object> consumerConfig(){
Map<String,Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return properties;
}
@Bean
public ConsumerFactory<String,Map<Short, List<Customer>>> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
public KafkaListenerContainerFactory<
ConcurrentMessageListenerContainer<String,Map<Short, List<Customer>>>> kafkaListenerContainerFactory(ConsumerFactory<String,Map<Short, List<Customer>>> consumerFactory){
ConcurrentKafkaListenerContainerFactory<String,Map<Short, List<Customer>>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
}
这是 KafkaConsumer 的 Customer 类:
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;
import jakarta.persistence.*;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Size;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import java.util.List;
import java.util.Objects;
@Data
@NoArgsConstructor
@Entity
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class Customer {
@JsonProperty("id")
@Nullable
@Id
@SequenceGenerator(
name = "customer_id_sequence",
sequenceName = "customer_id_sequence"
)
@GeneratedValue(
strategy = GenerationType.SEQUENCE,
generator = "customer_id_sequence"
)
private Long ID;
@JsonProperty("userName")
@Column(unique = false)
@Size(min = 5, max = 50, message = "User must be between 5 and 50 characters")
private String userName;
@JsonProperty("email")
@NonNull
@NotBlank(message = "The email can not be empty")
@Size(min = 12, max = 50, message = "Email must be between 12 and 50 characters")
private String email;
@JsonProperty("message")
private List<String> message;
public Customer(Long ID,
String userName,
String email,
List<String> message) {
this.ID = ID;
this.userName = userName;
this.email = email;
this.message = message;
}
public Customer(String userName,
String email,
List<String> message) {
this.userName = userName;
this.email = email;
this.message = message;
}
public Long getID() {
return ID;
}
public void setID(Long ID) {
this.ID = ID;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public List<String> getMessage() {
return message;
}
public void setMessage(List<String> message) {
this.message = message;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Customer customer = (Customer) o;
return Objects.equals(ID, customer.ID) && Objects.equals(userName, customer.userName) && Objects.equals(email, customer.email) && Objects.equals(message, customer.message);
}
@Override
public int hashCode() {
return Objects.hash(ID, userName, email, message);
}
}
这是 KafkaProducerConfig 类:
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServer;
@Bean
public Map<String, Object> producerConfig() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return properties;
}
@Bean
public ProducerFactory<String, Map<Short, List<Customer>>> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, Map<Short, List<Customer>>> kafkaTemplate(
ProducerFactory<String, Map<Short, List<Customer>>> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
这是 KafkaProducer 的 Customer 类:
import jakarta.annotation.Nullable;
import jakarta.persistence.*;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Size;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import java.util.List;
import java.util.Objects;
@Data
@NoArgsConstructor
@Entity
public class Customer {
@Nullable
@Id
@SequenceGenerator(
name = "customer_id_sequence",
sequenceName = "customer_id_sequence"
)
@GeneratedValue(
strategy = GenerationType.SEQUENCE,
generator = "customer_id_sequence"
)
private Long ID;
@Column(unique = false)
@Size(min = 5, max = 50, message = "User must be between 5 and 50 characters")
private String userName;
@NonNull
@NotBlank(message = "The email can not be empty")
@Size(min = 12, max = 50, message = "Email must be between 12 and 50 characters")
private String email;
private List<String> message;
public Customer(Long ID,
String userName,
String email,
List<String> message) {
this.ID = ID;
this.userName = userName;
this.email = email;
this.message = message;
}
public Customer(String userName,
String email,
List<String> message) {
this.userName = userName;
this.email = email;
this.message = message;
}
public Long getID() {
return ID;
}
public void setID(Long ID) {
this.ID = ID;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public List<String> getMessage() {
return message;
}
public void setMessage(List<String> message) {
this.message = message;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Customer customer = (Customer) o;
return Objects.equals(ID, customer.ID) && Objects.equals(userName, customer.userName) && Objects.equals(email, customer.email) && Objects.equals(message, customer.message);
}
@Override
public int hashCode() {
return Objects.hash(ID, userName, email, message);
}
}
这就是我从生产者发送到消费者的方式:
@Component
public class SendToDatabse {
private final KafkaTemplate<String, Map<Short, List<Customer>>> kafkaTemplate;
public SendToDatabse(KafkaTemplate<String, Map<Short, List<Customer>>> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendToDatabse(Map<Short, List<Customer>> map){
this.kafkaTemplate.send("databaseTopic",map);
}
}
这是 KafkaListener:
@KafkaListener(topics = "databaseTopic", groupId = "groupID", containerFactory = "kafkaListenerContainerFactory")
void listener(Map<Short, List<Customer>> map){
map.forEach((k,v)->{
System.out.print(k + "\t");
v.forEach(System.out::println);
});
}
错误是:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [void com.example.DatabseMicroservice.listener.Listener.listener(java.util.Map<java.lang.Short, java.util.List<com.example.DatabseMicroservice.model.Customer>>)]
Bean [com.example.DatabseMicroservice.listener.Listener@62dfe152]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2902) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2847) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2732) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.12.0-RC1.jar:1.12.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2730) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2582) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2468) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2110) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1465) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1429) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1304) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:402) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:380) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2833) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:380) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2833) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
... 12 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.util.Map] for GenericMessage [payload={"200":[{"userName":"Bob Johnson","email":"[email protected]","message":["random messag"],....{"userName":"Peter Hall","email":"[email protected]","message":["messag"],"id":18},{"userName":"Noah Taylor","email":"[email protected]","message":["another random message","and another on"],"id":16}]}, headers={__ContentTypeId__=[B@3fddd565, kafka_offset=5, __KeyTypeId__=[B@1577f842, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@49c36e71, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=databaseTopic, kafka_receivedTimestamp=1699028747669, __TypeId__=[B@894c52a, kafka_groupId=groupID}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:151) ~[spring-messaging-6.1.0-RC2.jar:6.1.0-RC2]
at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.1.0-RC2.jar:6.1.0-RC2]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.1.0-RC2.jar:6.1.0-RC2]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.1.0-RC2.jar:6.1.0-RC2]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:375) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
... 15 common frames omitted
我不知道在这种情况下我该怎么办,因为代码对我来说似乎是正确的。
答:
0赞
Artem Bilan
11/3/2023
#1
请参阅 Javadocs:@KafkaListener
* The {@link #containerFactory()} identifies the
* {@link org.springframework.kafka.config.KafkaListenerContainerFactory
* KafkaListenerContainerFactory} to use to build the Kafka listener container. If not
* set, a <em>default</em> container factory is assumed to be available with a bean name
* of {@code kafkaListenerContainerFactory} unless an explicit default has been provided
* through configuration.
因此,您的配置确实使用了自动配置,这实际上是基于普通字符串反序列化程序。kafkaListenerContainerFactory
由于您的 JSON 工厂 bean 有一个名称,请考虑为属性指定该确切名称。factory
@KafkaListener(containerFactory)
评论
0赞
Al3x4ndru1
11/4/2023
谢谢你的建议。我试着按照你说的去做,我回答了这个问题,但仍然需要纠正。
0赞
Al3x4ndru1
11/4/2023
顺便说一句,我是 Kafka 的新手,很抱歉我所做的可能不正确。
0赞
Artem Bilan
11/4/2023
我不确定需要纠正什么。
0赞
Al3x4ndru1
11/4/2023
好吧,我也不确定。感谢您抽出宝贵时间接受采访!
0赞
Al3x4ndru1
11/7/2023
#2
我设法解决这样的问题:
if (a instanceof ConsumerRecord) {
ConsumerRecord<?, ?> kafkaRecord = (ConsumerRecord<?, ?>) a;
// Assuming the value is a JSON string
String json = (String) kafkaRecord.value();
// Parse the JSON string into a Map<Short, List<Customer>>
ObjectMapper objectMapper = new ObjectMapper();
try {
Map<Short, List<Customer>> resultMap = objectMapper.readValue(json, new TypeReference<Map<Short, List<Customer>>>() {});
// Now you can use resultMap as needed
// For example, printing the values:
for (Map.Entry<Short, List<Customer>> entry : resultMap.entrySet()) {
System.out.println("Key: " + entry.getKey());
System.out.println("Value: " + entry.getValue());
System.out.println();
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
但感觉有些不对劲,更像是过度设计。
评论
KafkaProducerConfig
KafkaConsumerConfig
@KafkaListener
KafkaListenerContainerFactory