发送字符串列表时无法构造 kafka producer

Failed to construct kafka producer when sending list of strings

提问人:Al3x4ndru1 提问时间:10/28/2023 最后编辑:Al3x4ndru1 更新时间:10/30/2023 访问量:102

问:

我是 Kafka 的新手,我正在尝试读取一个文本文件并创建一个我想发送给消费者的字符串列表。我正在使用 Java 21 和 Spring Boot 3.2.0 (SNAPSHOT)。

这是 Kafka 生产者配置:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig(){
        Map<String,Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());

        return properties;
    }

    @Bean
    public ProducerFactory<String, List<String>> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, List<String>> kafkaTemplate(ProducerFactory<String,List<String>> producerFactory){
        return new KafkaTemplate<>(producerFactory);
    }
}

这是 Topic 配置:

@Configuration
public class KafkaTopicConfig {
    
    @Bean
    public NewTopic alexTopic(){
        return TopicBuilder.name("securityTopic")
                .build();
    }
}

这是“application.properties”:

spring.servlet.multipart.enabled=true
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
spring.kafka.bootstrap-servers=192.168.241.133:9092

这是我接收文件的方式:

public Optional uploadFile(MultipartFile file){
    if(file.isEmpty()){
        return Optional.of(new EmptyFileException("File is empty please double check"));
    }
    
    return Optional.of(this.upload.uploadFile(file));
}

这就是我向 kafka 发送“List<String>”的方式:

public UploadFileService(KafkaTemplate<String, List<String>> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

@Override
public String uploadFile(MultipartFile file) {
    try(BufferedReader reader = new BufferedReader(
                                    new InputStreamReader(file.getInputStream()))){

        String line;
        int i=0;
        List<String> list = new ArrayList<>();
        while ((line=reader.readLine())!=null) {
            if(i==10){
                this.kafkaTemplate.send("securityTopic",list);

如果我发送纯字符串并且我正在使用:

StringSerializer.class

而不是:

ListSerializer.class.getName()

在 Kafka 生产者配置中,并将其他泛型更改为 String 而不是 List<String> 正在工作。但是我该如何发送List<String>呢?同样,我使用 Kafka 2 或 3 天,我不知道我是否必须创建自己的序列化程序或使用内置的东西?

给我的错误如标题所示:“无法构建 kafka 生产者”。

我也尝试使用“ListSerializer.class”,但不起作用。

我也尝试使用“JsonSerialize.class”,但仍然是相同的错误。

感谢您抽出宝贵时间阅读本文!

spring-boot spring-kafka kafka-producer-api java-21

评论


答:

0赞 Al3x4ndru1 10/30/2023 #1

这是错误的导入。正确的导入方式为:

“导入org.springframework.kafka.support.serializer.JsonSerializer;”

评论

1赞 Artem Bilan 10/30/2023
您还需要更仔细地查看 .它需要一个属性。就你而言,那只是一个普通的.消费者方面也是如此。ListSerializerCommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASSStringSerializerListDeserializer
0赞 Al3x4ndru1 10/31/2023
@ArtemBilan谢,在我弄清楚问题所在后,我已经做到了。