将 Map 从 KafkaProducer 发送到 KafkaConsumer

Send Map from KafkaProducer to KafkaConsumer

提问人:Al3x4ndru1 提问时间:11/3/2023 最后编辑:Al3x4ndru1 更新时间:11/7/2023 访问量:127

问:

我正在使用 Java 21 和 Spring Boot 3.2.0 (SNAPSHOT)。我想把一个 Map<Short, List<Customer>> 从 KafkaProducer 发送到 KafkaConsumer。

KafkaConsumerConfig 类:

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    public Map<String, Object> consumerConfig(){
        Map<String,Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        return properties;
    }

    @Bean
    public ConsumerFactory<String,Map<Short, List<Customer>>> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }


    public KafkaListenerContainerFactory<
            ConcurrentMessageListenerContainer<String,Map<Short, List<Customer>>>> kafkaListenerContainerFactory(ConsumerFactory<String,Map<Short, List<Customer>>> consumerFactory){
        ConcurrentKafkaListenerContainerFactory<String,Map<Short, List<Customer>>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}

这是 KafkaConsumer 的 Customer 类:

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;
import jakarta.persistence.*;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Size;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;

import java.util.List;
import java.util.Objects;

/**
 * Customer entity as seen by the consuming service.
 *
 * <p>Jackson is told to detect every field regardless of visibility, and each field
 * carries an explicit JSON property name.
 */
@Data
@NoArgsConstructor
@Entity
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class Customer {

    /** Identifier generated from the {@code customer_id_sequence} sequence; nullable. */
    @JsonProperty("id")
    @Nullable
    @Id
    @SequenceGenerator(name = "customer_id_sequence", sequenceName = "customer_id_sequence")
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "customer_id_sequence")
    private Long ID;

    /** Display name; validated to be 5 to 50 characters long. */
    @JsonProperty("userName")
    @Column(unique = false)
    @Size(min = 5, max = 50, message = "User must be between 5 and 50 characters")
    private String userName;

    /** E-mail address; must not be blank and is validated to be 12 to 50 characters long. */
    @JsonProperty("email")
    @NonNull
    @NotBlank(message = "The email can not be empty")
    @Size(min = 12, max = 50, message = "Email must be between 12 and 50 characters")
    private String email;

    /** Messages attached to this customer. */
    @JsonProperty("message")
    private List<String> message;

    /**
     * Creates a customer without an identifier.
     *
     * @param userName the display name
     * @param email    the e-mail address
     * @param message  the messages attached to the customer
     */
    public Customer(String userName, String email, List<String> message) {
        this(null, userName, email, message);
    }

    /**
     * Creates a customer with every field supplied.
     *
     * @param ID       the identifier
     * @param userName the display name
     * @param email    the e-mail address
     * @param message  the messages attached to the customer
     */
    public Customer(Long ID, String userName, String email, List<String> message) {
        this.ID = ID;
        this.userName = userName;
        this.email = email;
        this.message = message;
    }

    /** @return the identifier, possibly {@code null} */
    public Long getID() {
        return this.ID;
    }

    /** @param ID the identifier to store */
    public void setID(Long ID) {
        this.ID = ID;
    }

    /** @return the display name */
    public String getUserName() {
        return this.userName;
    }

    /** @param userName the display name to store */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the e-mail address */
    public String getEmail() {
        return this.email;
    }

    /** @param email the e-mail address to store */
    public void setEmail(String email) {
        this.email = email;
    }

    /** @return the messages attached to the customer */
    public List<String> getMessage() {
        return this.message;
    }

    /** @param message the messages to store */
    public void setMessage(List<String> message) {
        this.message = message;
    }

    /**
     * Two customers are equal when they are of exactly the same class and all four
     * fields are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (o.getClass() != getClass()) {
            return false;
        }
        Customer other = (Customer) o;
        boolean sameIdentity = Objects.equals(ID, other.ID)
                && Objects.equals(userName, other.userName);
        boolean sameContact = Objects.equals(email, other.email)
                && Objects.equals(message, other.message);
        return sameIdentity && sameContact;
    }

    /** Hash over the same four fields compared by {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return Objects.hash(ID, userName, email, message);
    }
}

这是 KafkaProducerConfig 类:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return properties;
    }

    @Bean
    public ProducerFactory<String, Map<Short, List<Customer>>> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, Map<Short, List<Customer>>> kafkaTemplate(
            ProducerFactory<String, Map<Short, List<Customer>>> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

这是 KafkaProducer 的 Customer 类:

import jakarta.annotation.Nullable;
import jakarta.persistence.*;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Size;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;

import java.util.List;
import java.util.Objects;

/**
 * Customer entity as seen by the producing service.
 */
@Data
@NoArgsConstructor
@Entity
public class Customer {

    /** Identifier generated from the {@code customer_id_sequence} sequence; nullable. */
    @Nullable
    @Id
    @SequenceGenerator(name = "customer_id_sequence", sequenceName = "customer_id_sequence")
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "customer_id_sequence")
    private Long ID;

    /** Display name; validated to be 5 to 50 characters long. */
    @Column(unique = false)
    @Size(min = 5, max = 50, message = "User must be between 5 and 50 characters")
    private String userName;

    /** E-mail address; must not be blank and is validated to be 12 to 50 characters long. */
    @NonNull
    @NotBlank(message = "The email can not be empty")
    @Size(min = 12, max = 50, message = "Email must be between 12 and 50 characters")
    private String email;

    /** Messages attached to this customer. */
    private List<String> message;

    /**
     * Creates a customer without an identifier.
     *
     * @param userName the display name
     * @param email    the e-mail address
     * @param message  the messages attached to the customer
     */
    public Customer(String userName, String email, List<String> message) {
        this(null, userName, email, message);
    }

    /**
     * Creates a customer with every field supplied.
     *
     * @param ID       the identifier
     * @param userName the display name
     * @param email    the e-mail address
     * @param message  the messages attached to the customer
     */
    public Customer(Long ID, String userName, String email, List<String> message) {
        this.ID = ID;
        this.userName = userName;
        this.email = email;
        this.message = message;
    }

    /** @return the identifier, possibly {@code null} */
    public Long getID() {
        return this.ID;
    }

    /** @param ID the identifier to store */
    public void setID(Long ID) {
        this.ID = ID;
    }

    /** @return the display name */
    public String getUserName() {
        return this.userName;
    }

    /** @param userName the display name to store */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the e-mail address */
    public String getEmail() {
        return this.email;
    }

    /** @param email the e-mail address to store */
    public void setEmail(String email) {
        this.email = email;
    }

    /** @return the messages attached to the customer */
    public List<String> getMessage() {
        return this.message;
    }

    /** @param message the messages to store */
    public void setMessage(List<String> message) {
        this.message = message;
    }

    /**
     * Two customers are equal when they are of exactly the same class and all four
     * fields are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (o.getClass() != getClass()) {
            return false;
        }
        Customer other = (Customer) o;
        boolean sameIdentity = Objects.equals(ID, other.ID)
                && Objects.equals(userName, other.userName);
        boolean sameContact = Objects.equals(email, other.email)
                && Objects.equals(message, other.message);
        return sameIdentity && sameContact;
    }

    /** Hash over the same four fields compared by {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return Objects.hash(ID, userName, email, message);
    }
}

这就是我从生产者发送到消费者的方式:

/**
 * Publishes customer maps to the {@code databaseTopic} Kafka topic.
 */
@Component
public class SendToDatabse {

    private final KafkaTemplate<String, Map<Short, List<Customer>>> kafkaTemplate;

    /**
     * @param kafkaTemplate the template used for publishing
     */
    public SendToDatabse(KafkaTemplate<String, Map<Short, List<Customer>>> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends the given map as a single record without a key.
     * The value returned by the template's {@code send} call is not inspected.
     *
     * @param map the payload to publish
     */
    public void sendToDatabse(Map<Short, List<Customer>> map) {
        final String topic = "databaseTopic";
        kafkaTemplate.send(topic, map);
    }
}

这是 KafkaListener:

/**
 * Receives a record from {@code databaseTopic} and prints it to standard output:
 * each key followed by a tab, then every customer of that key on its own line.
 *
 * @param map the payload of the received record
 */
@KafkaListener(topics = "databaseTopic", groupId = "groupID", containerFactory = "kafkaListenerContainerFactory")
void listener(Map<Short, List<Customer>> map){
    for (Map.Entry<Short, List<Customer>> entry : map.entrySet()) {
        System.out.print(entry.getKey() + "\t");
        for (Customer customer : entry.getValue()) {
            System.out.println(customer);
        }
    }
}

错误是:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [void com.example.DatabseMicroservice.listener.Listener.listener(java.util.Map<java.lang.Short, java.util.List<com.example.DatabseMicroservice.model.Customer>>)]
Bean [com.example.DatabseMicroservice.listener.Listener@62dfe152]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2902) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2847) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2732) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.12.0-RC1.jar:1.12.0-RC1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2730) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2582) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2468) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2110) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1465) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1429) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1304) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:402) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:380) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2833) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:380) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2833) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    ... 12 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.util.Map] for GenericMessage [payload={"200":[{"userName":"Bob Johnson","email":"[email protected]","message":["random messag"],....{"userName":"Peter Hall","email":"[email protected]","message":["messag"],"id":18},{"userName":"Noah Taylor","email":"[email protected]","message":["another random message","and another on"],"id":16}]}, headers={__ContentTypeId__=[B@3fddd565, kafka_offset=5, __KeyTypeId__=[B@1577f842, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@49c36e71, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=databaseTopic, kafka_receivedTimestamp=1699028747669, __TypeId__=[B@894c52a, kafka_groupId=groupID}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:151) ~[spring-messaging-6.1.0-RC2.jar:6.1.0-RC2]
    at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.1.0-RC2.jar:6.1.0-RC2]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.1.0-RC2.jar:6.1.0-RC2]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.1.0-RC2.jar:6.1.0-RC2]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:375) ~[spring-kafka-3.1.0-RC1.jar:3.1.0-RC1]
    ... 15 common frames omitted

我不知道在这种情况下我该怎么办,因为代码对我来说似乎是正确的。

spring-boot spring-kafka kafka-producer-api java-21

评论

0赞 Artem Bilan 11/3/2023
为什么你的 KafkaProducerConfig 部分谈论的是 KafkaConsumerConfig?你没有展示你的 @KafkaListener。也许你自定义的 KafkaListenerContainerFactory 并没有在那里生效,而是 Spring Boot 默认的那个在起作用。
0赞 Al3x4ndru1 11/3/2023
@ArtemBilan 对不起,我在很晚的时候发帖,我错过了。

答:

0赞 Artem Bilan 11/3/2023 #1

请参阅 @KafkaListener 的 Javadoc:

 * The {@link #containerFactory()} identifies the
 * {@link org.springframework.kafka.config.KafkaListenerContainerFactory
 * KafkaListenerContainerFactory} to use to build the Kafka listener container. If not
 * set, a <em>default</em> container factory is assumed to be available with a bean name
 * of {@code kafkaListenerContainerFactory} unless an explicit default has been provided
 * through configuration.

因此,您的配置实际上使用的是自动配置的 kafkaListenerContainerFactory,而它基于普通的字符串反序列化器。

由于您的 JSON 工厂 bean 的名称是 factory,请考虑为 @KafkaListener(containerFactory) 属性指定该确切名称。

评论

0赞 Al3x4ndru1 11/4/2023
谢谢你的建议。我试着按照你说的去做,我回答了这个问题,但仍然需要纠正。
0赞 Al3x4ndru1 11/4/2023
顺便说一句,我是 Kafka 的新手,很抱歉我所做的可能不正确。
0赞 Artem Bilan 11/4/2023
我不确定需要纠正什么。
0赞 Al3x4ndru1 11/4/2023
好吧,我也不确定。感谢您抽出宝贵时间!
0赞 Al3x4ndru1 11/7/2023 #2

我设法这样解决了问题:

 if (a instanceof ConsumerRecord<?, ?> kafkaRecord) {
        // The record value is assumed to carry the JSON text; the cast fails otherwise.
        String json = (String) kafkaRecord.value();

        try {
            // Bind the JSON text to Map<Short, List<Customer>> via a Jackson type token.
            Map<Short, List<Customer>> resultMap = new ObjectMapper()
                    .readValue(json, new TypeReference<Map<Short, List<Customer>>>() {});

            // Print every entry, followed by a blank separator line.
            resultMap.forEach((key, customers) -> {
                System.out.println("Key: " + key);
                System.out.println("Value: " + customers);
                System.out.println();
            });
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

但感觉有些不对劲,更像是过度设计。