
Importing Manually Declared Nested Schema from Package Causes NullPointerException

Asked by user371816 · Asked 8/22/2023 · Last edited by user371816 · Modified 8/22/2023 · Viewed 24 times

Q:

I am trying to parse XML files into DataFrames using Databricks' spark-xml, with the following code:

val xmlDF = spark
    .read
    .option("rowTag", "MeterReadingDocument")
    .option("valueTag", "foo") // meaningless, used to parse tags with no child elements
    .option("inferSchema", "false")
    .schema(schema)
    .xml(connectionString)

As you can see, I provide a schema to avoid the costly operation of schema inference. This schema is defined as

 val schema = MyProjectUtils.Schemas.meterReadingDocumentSchema

where MyProjectUtils is the package containing the Schemas object that holds the schema definitions:

object Schemas {
...
// nested schemas 
...

val meterReadingDocumentSchema = StructType(
    Array(
      StructField("ReadingStatusRefTable", readingStatusRefTableSchema, nullable = true),
      StructField("Header", headerSchema, nullable = true),
      StructField("ImportExportParameters", importExportParametersSchema, nullable = true),
      StructField("Channels", channelsSchema, nullable = true),
      StructField("_xmlns:xsd", StringType, nullable = true),
      StructField("_xmlns:xsi", StringType, nullable = true)
    )
  )
}

You will notice readingStatusRefTableSchema, headerSchema and other custom schemas corresponding to the nested elements of the XML. These are in turn nested StructTypes themselves, for example:

val headerSchema = StructType(
    Array(
      StructField("Creation_Datetime", creationDatetimeSchema, nullable = true),
      StructField("Export_Template", exportTemplateSchema, nullable = true),
      StructField("System", SystemSchema, nullable = true),
      StructField("Path", pathSchema, nullable = true),
      StructField("Timezone", timezoneSchema, nullable = true)
    )
  )

val creationDatetimeSchema = StructType(
    Array(
      StructField("_Datetime", TimestampType, nullable = true),
      StructField("foo", StringType, nullable = true)
    )
  )

(I can provide more details about the nested nature of the schemas if relevant.)

If I declare these nested schemas in the notebook, or in an object declared in the notebook used to read the data, this works and the data loads. But when I build a jar from this project and execute it, I get the following stack trace:


INFO ApplicationMaster [shutdown-hook-0]: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.NullPointerException
    at org.apache.spark.sql.types.ArrayType.existsRecursively(ArrayType.scala:102)
    at org.apache.spark.sql.types.StructType.$anonfun$existsRecursively$1(StructType.scala:508)
    at org.apache.spark.sql.types.StructType.$anonfun$existsRecursively$1$adapted(StructType.scala:508)
    at scala.collection.IndexedSeqOptimized.prefixLengthImpl(IndexedSeqOptimized.scala:41)
    at scala.collection.IndexedSeqOptimized.exists(IndexedSeqOptimized.scala:49)
    at scala.collection.IndexedSeqOptimized.exists$(IndexedSeqOptimized.scala:49)
    at scala.collection.mutable.ArrayOps$ofRef.exists(ArrayOps.scala:198)
    at org.apache.spark.sql.types.StructType.existsRecursively(StructType.scala:508)
    at org.apache.spark.sql.types.StructType.$anonfun$existsRecursively$1(StructType.scala:508)
    at org.apache.spark.sql.types.StructType.$anonfun$existsRecursively$1$adapted(StructType.scala:508)
    at scala.collection.IndexedSeqOptimized.prefixLengthImpl(IndexedSeqOptimized.scala:41)
    at scala.collection.IndexedSeqOptimized.exists(IndexedSeqOptimized.scala:49)
    at scala.collection.IndexedSeqOptimized.exists$(IndexedSeqOptimized.scala:49)
    at scala.collection.mutable.ArrayOps$ofRef.exists(ArrayOps.scala:198)
    at org.apache.spark.sql.types.StructType.existsRecursively(StructType.scala:508)
    at org.apache.spark.sql.catalyst.util.CharVarcharUtils$.hasCharVarchar(CharVarcharUtils.scala:56)
    at org.apache.spark.sql.catalyst.util.CharVarcharUtils$.failIfHasCharVarchar(CharVarcharUtils.scala:63)
    at org.apache.spark.sql.DataFrameReader.schema(DataFrameReader.scala:76)
    at com.mycompany.DataIngestion$.delayedEndpoint$com$mycompany$DataIngestion$1(DataIngestion.scala:44)
    at com.mycompany.DataIngestion$delayedInit$body.apply(DataIngestion.scala:10)
    at scala.Function0.apply$mcV$sp(Function0.scala:39)
    at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    at scala.App.$anonfun$main$1$adapted(App.scala:80)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at scala.App.main(App.scala:80)
    at scala.App.main$(App.scala:78)
    at com.mycompany.DataIngestion$.main(DataIngestion.scala:10)
    at com.mycompany.DataIngestion.main(DataIngestion.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:739)
)

I added another, simpler csv file and created a schema for it in the same Schemas object. This schema has no nested structs, and it is written as:

 val simplerDocSchema = MyProjectUtils.Schemas.anotherDocSchema

spark
      .read
      .format("csv")
      .schema(simplerDocSchema)
      .load(connectionString)

object Schemas {
 ...
val anotherDocSchema: StructType = StructType(
    Array(
      StructField("ID", StringType, nullable = true),
      StructField("DATE", StringType, nullable = true),
      StructField("CODE", StringType, nullable = true),
      StructField("AD", StringType, nullable = true),
      StructField("ACCOUNT", StringType, nullable = true)
    )
  )
}

I expected this one to fail as well, but it runs fine both in the compiled project and in the notebook.

Scala apache-spark-xml

Comments


A:

0 votes · Chris · 8/22/2023 · #1

Although you don't say which Spark version you are using, this code appears not to have changed in 8 years:

override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = {
    f(this) || elementType.existsRecursively(f)
  }

Most likely elementType is null. Since you haven't provided the whole code, my guess is that you have an ArrayType(someVal, ...) where someVal is not yet defined at the point it is used.

Swap your vals for defs and try again.
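The trap behind this advice can be sketched in plain Scala, no Spark required. The Field/Schema case classes below are illustrative stand-ins for StructField/StructType, not Spark's own types: vals in an object initialize in declaration order, so a val that forward-references a later val silently captures null, while a def defers evaluation until the object is fully constructed.

```scala
// Stand-ins for StructField / StructType, just to show the initialization order.
case class Field(name: String, child: Schema)
case class Schema(fields: List[Field] = Nil)

object BrokenSchemas {
  // `parent` is initialized before `child`, so the forward reference
  // to `child` captures null. This compiles without any warning.
  val parent: Schema = Schema(List(Field("Header", child)))
  val child: Schema = Schema()
}

object FixedSchemas {
  // With `def`, `child` is evaluated on demand when `parent` is
  // initialized, so declaration order no longer matters.
  val parent: Schema = Schema(List(Field("Header", child)))
  def child: Schema = Schema()
}

object Demo extends App {
  println(BrokenSchemas.parent.fields.head.child) // null
  println(FixedSchemas.parent.fields.head.child)  // Schema(List())
}
```

In a notebook, each cell's vals are evaluated as the cells run, so declaration order rarely bites; packaged into an object in a jar, the JVM's field-initialization order applies, which is why the same schemas fail only after compilation.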

Comments

0 votes · user371816 · 8/22/2023
But wouldn't the compiler show an error if it weren't defined? It doesn't. What difference would using def instead of val make?
0 votes · Chris · 8/22/2023
Evaluation order. Have you tried it?
0 votes · user371816 · 8/22/2023
It did work. I had a schema defined below the place where it was used; it compiled fine, but apparently that trips Spark up at execution time. Changing the vals to defs solved the problem. Thanks!