跳过 Python Spark Pyspark Databricks 未知字段异常中的错误记录

Skipping a Bad Record in Python Spark Pyspark Databricks Unknown Field Exception

提问人:TadeG 提问时间:11/9/2023 最后编辑:Alex OttTadeG 更新时间:11/9/2023 访问量:28

问:

我想知道是否有人可能知道如何跳过我们从 json 文件获取的记录

这是错误

[UNKNOWN_FIELD_EXCEPTION。NEW_FIELDS_IN_RECORD_WITH_FILE_PATH] 解析过程中遇到未知字段: 这是失败的代码

sent = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.format', 'json') \
  .option('multiline', 'true') \
  .option('cloudFiles.inferColumnTypes', 'true') \
  .option('cloudFiles.schemaLocation', checkpoint_path) \
  .load(raw_files) \
  .withColumn('load_ts', F.current_timestamp()) \
  .writeStream \
  .format('delta') \
  .option('checkpointLocation', checkpoint_path) \
  .trigger(availableNow=True) \
  .option('mergeSchema', 'true') \
  .toTable(b_write_path)

谢谢!

我没有看到任何关于如何修复此错误的文档。

Python Apache Spark 错误处理 Databricks Databricks Databricks-Autoloader

评论


答:

0赞 Alex Ott 11/9/2023 #1

这取决于您要对该数据执行的操作。默认情况下,Databricks 自动加载程序使用该模式,该模式在遇到新列时流失败,但在重启后,它将正确处理它们。addNewColumns

您可以使用其中一种模式或作为模式演化模式,如下所示。rescuenone

.option("cloudFiles.schemaEvolutionMode", "rescue")

在模式下,新列的数据将被放入所谓的“救援列”中,如有必要,您可以对其进行分析,并且该过程不会失败。rescue

在模式下,新列将被忽略,并且该过程不会失败。none

有关更多详细信息,请参阅文档

评论

0赞 TadeG 11/9/2023
非常感谢!!!!!我把头撞在墙上