Debezium Kafka Connect for Postgresql收到意外更新消息

Debezium Kafka Connect for Postgresql receiving unexpected update message received

提问人:prashant 提问时间:11/17/2023 更新时间:11/17/2023 访问量:16

问:

我正在使用 debezium kafka connect 将 postgresql 数据库 CDC 事件流式传输到 kafka 集群。目前,我使用的是 1.9.0 版,但想升级到 2.4.0 版

我们使用 Kafka 发件箱模式将数据库记录流式传输到 Kafka 队列。

我能够升级到 2.4.0,但收到奇怪的警告

 Unexpected update message received Struct{id=1} and ignored   [io.debezium.transforms.outbox.EventRouterDelegate]

以下是我的配置

{
    "value.converter.delegate.converter.type.schemas.enable": "false",
    "database.password": "*****",
    "database.user": "postgres",
    "publication.name": "dbz_outbox_pub",
    "slot.name": "debezium2",
    "topic.creation.default.cleanup.policy": "compact",
    "heartbeat.topic.prefix": "__debezium-heartbeat",
    "tasks.max": "1",
    "transforms": "outbox",
    "topic.creation.default.partitions": "1",
    "heartbeat.interval.ms": "60000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "transforms.outbox.table.field.event.key": "aggregateid",
    "publication.autocreate.mode": "filtered",
    "topic.creation.default.replication.factor": "1",
    "heartbeat.action.query": "CREATE TABLE IF NOT EXISTS debezium_heartbeat (id SERIAL PRIMARY KEY, ts TIMESTAMP WITH TIME ZONE);\nINSERT INTO debezium_heartbeat (id, ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET ts=EXCLUDED.ts;",
    "plugin.name": "pgoutput",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "table.include.list": "public.kafkaoutbox,public.debezium_heartbeat",
    "topic.creation.default.compression.type": "uncompressed",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "database.dbname": "superipdev_cellb",
    "value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter",
    "database.port": "5432",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.topic.replacement": "${routedByValue}",
    "tombstones.on.delete": "false",
    "key.converter.delegate.converter.type.schemas.enable": "false",
    "event.processing.failure.handling.mode": "warn",
    "schema.include.list": "",
    "snapshot.mode": "never",
    "database.hostname": "host.docker.internal",
    "topic.prefix": "cell_b",
    "database.server.name": "cell_b",
    "name": "debezium2"
}

以前有人遇到过这种情况吗?任何帮助将不胜感激。

postgresql apache-kafka debezium-connect

评论


答:

0赞 Tudor Plugaru 11/23/2023 #1

似乎您正在对发件箱表进行更新。Debezium 上的 SMT 将忽略所有操作,除非您不应该更新发件箱表中的记录。 代码在这里EventRouterDelegateINSERT

评论

0赞 prashant 11/23/2023
我没有对发件箱本身进行任何更新。检测信号消息似乎已发送到事件路由器。Hearbeat 表经常更新。
0赞 Tudor Plugaru 11/24/2023
啊,是的,确实,所有记录都将通过该 SMT。