架构必须包含“type”字段 org.apache.kafka.connect.errors.DataException:架构必须包含“type”字段 [closed]

Schema must contain 'type' field org.apache.kafka.connect.errors.DataException: Schema must contain 'type' field [closed]

提问人:chinna mahesh 提问时间:11/17/2023 更新时间:11/17/2023 访问量:17

问:


想改进这个问题吗?通过编辑这篇文章添加详细信息并澄清问题。

2天前关闭。

{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"account_num"},{"type":"int32","optional":true,"field":"event_seq"},{"type":"string","optional":true,"field":"event_source"},{"type":"int32","optional":true,"field":"event_type_id"},{"type":"string","optional":true,"field":"event_ref"},{"type":"int32","optional":true,"field":"billed_cost_mny"},{"type":"int32","optional":true,"field":"charge_number"},{"type":"int32","optional":true,"field":"cluster_sub_id"},{"type":"int32","optional":true,"field":"competitor_cost_mny"},{"type":"int32","optional":true,"field":"cost_centre_id"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_dtm"},{"type":"string","optional":true,"field":"discount_data"},{"type":"int32","optional":true,"field":"event_cost_mny"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"event_dtm"},{"type":"int32","optional":true,"field":"external_cost_mny"},{"type":{"type":"array","items":"string"},"optional":true,"field":"flexibleattributes"},{"type":"int32","optional":true,"field":"fragment_number"},{"type":"int32","optional":true,"field":"highest_priority_discount_id"},{"type":"int32","optional":true,"field":"highest_priority_prod_seq"},{"type":"int32","optional":true,"field":"host_event_type_id"},{"type":"int32","optional":true,"field":"imported_cost_mny"},{"type":"string","optional":true,"field":"imported_currency_code"},{"type":"string","optional":true,"field":"inhost_costed_boo"},{"type":"int32","optional":true,"field":"internal_cost_mny"},{"type":"int32","optional":true,"field":"loyalty_points"},{"type":"int32","optional":true,"field":"managed_file_id"},{"type":"string","optional":true,"field":"modified_boo"},{"type":"string","optional":true,"field":"original_account_num"},{"type":"int32","optional":true,"field":"pre_discounted_cost_mny"},{"type":"string","optional":true,"field":"primary_event_ref"},{"type":"int32","optional":true,"field":"rating_discounted_usage_total"},{"type":"int32","optional":true,"field":"receivable_class_id"},{"type":"int32","optional":true,"field":"revenue_code_id"},{"type":"int32","optional":true,"field":"row_number"},{"type":"int32","optional":true,"field":"rule_number"},{"type":"int32","optional":true,"field":"tariff_id"},{"type":"int32","optional":true,"field":"tax_override_id"},{"type":"int32","optional":true,"field":"ust_category_id"},{"type":"int32","optional":true,"field":"ust_code_id"}],"name":"costedevent","schemaName":"orctocassandra","optional":false},"payload":{"highest_priority_discount_id":null,"event_attr_29":null,"host_event_type_id":null,"event_source":"10492","event_ref":"00289E000000022","rule_number":null,"billed_cost_mny":null,"event_type_id":7,"cluster_sub_id":2,"ust_category_id":null,"event_attr_20":null,"inhost_costed_boo":null,"competitor_cost_mny":null,"event_attr_26":null,"event_attr_25":null,"event_attr_28":null,"row_number":1,"event_attr_27":null,"event_attr_22":null,"event_attr_21":null,"event_attr_24":null,"event_attr_23":null,"tax_override_id":null,"event_attr_19":null,"event_attr_18":null,"highest_priority_prod_seq":null,"event_cost_mny":3280,"imported_cost_mny":null,"receivable_class_id":-1,"original_account_num":null,"revenue_code_id":117,"loyalty_points":null,"event_attr_15":"0","event_attr_14":"1","event_attr_17":null,"pre_discounted_cost_mny":null,"event_attr_16":"0","event_attr_11":"01:01:01","ust_code_id":null,"event_attr_10":"Fast Voice SCR 2010","event_attr_13":"0","event_attr_12":null,"tariff_id":359,"primary_event_ref":"00289E000000011","rating_discounted_usage_total":null,"external_cost_mny":null,"event_attr_9":"Locali","event_attr_2":"9123456780","account_num":"LA00234380","event_attr_1":"2","event_attr_4":"00:04:36","event_attr_3":"0522610111","event_attr_6":"1","event_attr_5":"0","event_attr_8":null,"created_dtm":1617210000000,"event_attr_7":"0035526724053","event_dtm":1618074000000,"imported_currency_code":null,"event_seq":1,"discount_data":null,"cost_centre_id":null,"charge_number":null,"modified_boo":"F","event_attr_31":null,"event_attr_30":null,"internal_cost_mny":null,"fragment_number":null,"event_attr_36":"00:04:36","managed_file_id":16367524,"event_attr_33":null,"event_attr_32":null,"event_attr_35":null,"event_attr_34":null,"flexibleattributes":["2","9123456780","0522610111","00:04:36","0","1","0035526724053","","Locali","Fast Voice SCR 2010","01:01:01","","0","1","0","0","","","","","","","","","","","","","","","","","","","",""]}}

this is schema but getting following issue ERROR WorkerSinkTask{id=costedeventSink-0} Error converting message value in topic 'costedevent' partition 0 at offset 0 and timestamp 1700128563351: Schema must contain 'type' field (org.apache.kafka.connect.runtime.WorkerSinkTask:542) org.apache.kafka.connect.errors.DataException: Schema must contain 'type' field at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:438) at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:493) at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:340) at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540) at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266)

ERROR WorkerSinkTask{id=costedeventSink-0} Error converting message value in topic 'costedevent' partition 0 at offset 0 and timestamp 1700128563351: Schema must contain 'type' field (org.apache.kafka.connect.runtime.WorkerSinkTask:542) org.apache.kafka.connect.errors.DataException: Schema must contain 'type' field at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:438) at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:493) at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:340) at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540) at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266)

apache-kafka 类型 架构 连接 jsonconverter

评论

0赞 jmoerdyk 11/18/2023
Please review How to Ask

答: 暂无答案