Asked by: Niranjan  Asked: 6/8/2023  Updated: 6/8/2023  Views: 195
Error producing message: "Local: Value serialization error" in Kafka
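Q:
Hi, I am working with Kafka and .NET. I am trying to produce a message as shown below. I have created the configuration for the Schema Registry and the producer as follows. I am using a certificate to connect to the Schema Registry.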
var schemaRegistryConfig = new SchemaRegistryConfig
{
    Url = "",
    SslKeystoreLocation = Path.Combine(Directory.GetCurrentDirectory(), @"Certs/decodedca.p12"),
    SslKeystorePassword = "",
    EnableSslCertificateVerification = false
};

var producerConfig = new ProducerConfig
{
    BootstrapServers = "3",
    SaslMechanism = SaslMechanism.ScramSha512,
    SaslUsername = "",
    SaslPassword = "",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SslCaLocation = Path.Combine(Directory.GetCurrentDirectory(), @"Certs/srdecodedca.crt"),
    EnableSslCertificateVerification = false
};

var avroSerializerConfig = new AvroSerializerConfig
{
    // optional Avro serializer properties:
    BufferBytes = 100
};

using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
using (var producer =
    new ProducerBuilder<string, MLFixtureEmp>(producerConfig)
        .SetValueSerializer(new AvroSerializer<MLFixtureEmp>(schemaRegistry, avroSerializerConfig))
        .Build())
{
    MLFixtureEmp mLFixtureEmp = new MLFixtureEmp()
    {
        deliveryDate = DateTime.Now,
        ISOCurrencyCode = "INR",
        chartererName = "ss",
        estimateRedeliveryDate = DateTime.Today,
        fixtureId = 4,
        imoNumber = "123",
        managingOwnerName = "dd",
        maximumDate = DateTime.Now,
        minimumDate = DateTime.Now,
        purchaseObligation = "stri",
        rate = 123,
        redeliveryPortName = "dd",
        RedeliveryRanges = "dsd",
        vesselOwnershipType = "dsds"
    };

    producer.ProduceAsync("mytopic", new Message<string, MLFixtureEmp>
    {
        Key = "test",
        Value = mLFixtureEmp
    }).ContinueWith(
        task =>
        {
            if (!task.IsFaulted)
            {
                Console.WriteLine($"produced to: {task.Result.TopicPartitionOffset}");
                return;
            }
            Console.WriteLine($"error producing message: {task.Exception.InnerException}");
        });
}
Whenever I try to push data to the topic, I get the following error message.
error producing message: Confluent.Kafka.ProduceException`2[System.String,MaerskLine.CHAMPS.Dto.MLFixtureEmp]: Local: Value serialization error
---> System.InvalidOperationException: AvroSerializer only accepts type parameters of int, bool, double, string, float, long, byte[], instances of ISpecificRecord and subclasses of SpecificFixed.
at Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl`1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize)
at Confluent.SchemaRegistry.Serdes.AvroSerializer`1.SerializeAsync(T value, SerializationContext context)
at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)
--- End of inner exception stack trace ---
at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)
I can't figure out what the problem is. Can someone help me identify it? Any help would be greatly appreciated. Thanks.
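The inner exception in the trace names the likely cause: AvroSerializer<T> only accepts a handful of primitive types, instances of ISpecificRecord, and subclasses of SpecificFixed, and MLFixtureEmp appears to be a plain DTO. The trace also shows the check running inside SerializeAsync, which would explain why the error surfaces at produce time rather than when the producer is built. Below is a minimal sketch of what an ISpecificRecord version of the class could look like. The schema is hypothetical and covers only two of the fields (their Avro types are guessed from the values assigned in the question); the real schema registered for the topic is not shown in the question and may differ.

```csharp
using Avro;
using Avro.Specific;

namespace MaerskLine.CHAMPS.Dto
{
    // Sketch only: a hand-written ISpecificRecord with two of the fields.
    // The schema below is an assumption, not the schema from the registry.
    public class MLFixtureEmp : ISpecificRecord
    {
        public static readonly Schema _SCHEMA = Schema.Parse(@"{
            ""type"": ""record"",
            ""name"": ""MLFixtureEmp"",
            ""namespace"": ""MaerskLine.CHAMPS.Dto"",
            ""fields"": [
                { ""name"": ""imoNumber"", ""type"": ""string"" },
                { ""name"": ""fixtureId"", ""type"": ""int"" }
            ]
        }");

        // The serializer reads the writer schema from this property.
        public virtual Schema Schema => _SCHEMA;

        public string imoNumber { get; set; }
        public int fixtureId { get; set; }

        // Field positions must follow the order of "fields" in the schema.
        public virtual object Get(int fieldPos)
        {
            switch (fieldPos)
            {
                case 0: return imoNumber;
                case 1: return fixtureId;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            }
        }

        public virtual void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
                case 0: imoNumber = (string)fieldValue; break;
                case 1: fixtureId = (int)fieldValue; break;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            }
        }
    }
}
```

In practice such classes are usually generated from the .avsc schema file rather than written by hand, for example with the avrogen tool from the Apache.Avro.Tools package (avrogen -s <schema file> <output directory>), so that every field in the schema, including the date fields, gets a matching Get/Put case.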
A: No answers yet