在 Kafka 中生成消息“本地值序列化错误”时出错

Error producing message Local Value serialization error in Kafka

提问人:Niranjan 提问时间:6/8/2023 更新时间:6/8/2023 访问量:195

问:

嗨,我在 Kafka 和 .Net 中工作。我正在尝试生成如下消息。我已经为架构注册表和生产者创建了配置,如下所示。我正在使用证书连接到架构注册表

var schemaRegistryConfig = new SchemaRegistryConfig
    {
        Url = "",
        SslKeystoreLocation = Path.Combine(Directory.GetCurrentDirectory(), @"Certs/decodedca.p12"),
        SslKeystorePassword = "",
        EnableSslCertificateVerification = false
    };
 var producerConfig = new ProducerConfig
    {
        BootstrapServers = "3",
        SaslMechanism = SaslMechanism.ScramSha512,
        SaslUsername = "",
        SaslPassword = "",
        SecurityProtocol = SecurityProtocol.SaslSsl,
        SslCaLocation = Path.Combine(Directory.GetCurrentDirectory(), @"Certs/srdecodedca.crt"),
        EnableSslCertificateVerification = false
    };
 var avroSerializerConfig = new AvroSerializerConfig
    {
        // optional Avro serializer properties:
        BufferBytes = 100
    };
 using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
    using (var producer =
        new ProducerBuilder<string, MLFixtureEmp>(producerConfig)
            .SetValueSerializer(new AvroSerializer<MLFixtureEmp>(schemaRegistry, avroSerializerConfig))
            .Build())
    {
        MLFixtureEmp mLFixtureEmp = new MLFixtureEmp()
        {
            deliveryDate = DateTime.Now,
            ISOCurrencyCode = "INR",
            chartererName = "ss",
            estimateRedeliveryDate = DateTime.Today,
            fixtureId = 4,
            imoNumber = "123",
            managingOwnerName = "dd",
            maximumDate = DateTime.Now,
            minimumDate = DateTime.Now,
            purchaseObligation = "stri",
            rate = 123,
            redeliveryPortName = "dd",
            RedeliveryRanges = "dsd",
            vesselOwnershipType = "dsds"
        };
        producer.ProduceAsync("mytopic", new Message<string, MLFixtureEmp>
        {
            Key = "test",
            Value = mLFixtureEmp
        }).ContinueWith(
            task =>
            {
                if (!task.IsFaulted)
                {
                    Console.WriteLine($"produced to: {task.Result.TopicPartitionOffset}");
                    return;
                }
                Console.WriteLine($"error producing message: {task.Exception.InnerException}");
            });
    }

每当我尝试将数据推送到主题时,我都会收到以下错误消息。

error producing message: Confluent.Kafka.ProduceException`2[System.String,MaerskLine.CHAMPS.Dto.MLFixtureEmp]: Local: Value serialization error
 ---> System.InvalidOperationException: AvroSerializer only accepts type parameters of int, bool, double, string, float, long, byte[], instances of ISpecificRecord and subclasses of SpecificFixed.
   at Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl`1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize)
   at Confluent.SchemaRegistry.Serdes.AvroSerializer`1.SerializeAsync(T value, SerializationContext context)
   at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)
   --- End of inner exception stack trace ---
   at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)

我无法弄清楚问题所在。有人可以帮我确定问题吗?任何帮助将不胜感激。谢谢

C# avro confluent-kafka-dotnet

评论

0赞 jdweng 6/8/2023
BufferBytes 应该是一个整数:docs.confluent.io/platform/current/clients/...

答: 暂无答案