Apache Spark Null 值(强制转换不兼容的 DecimalType 与 ClassCastException)

Apache Spark Null Value when casting incompatible DecimalType vs ClassCastException

提问人:alexanoid 提问时间:4/15/2019 最后编辑:alexanoid 更新时间:1/10/2020 访问量:4717

问:

铸造 ) 例如 到 5,4) 在 Apache Spark 中以静默方式返回DecimalType(10,599999.99999DecimalType(null

在这种情况下,是否可以更改此行为并允许 Spark 抛出异常(例如某些 CastException)并失败作业而不是静默返回?null

Scala Apache Spark 铸造

评论

0赞 VB_ 4/28/2020
你找到一个干净的解决方案了吗?
0赞 wrschneider 2/28/2022
这让我措手不及,花了很长时间才找到。数据库中的 SQL 通常会在数字溢出时出现很大的错误,所以我根本没想到会有这种行为。

答:

0赞 Gladiator 4/15/2019 #1

根据 Git hub 文档,https://github.com/apache/spark/blob/3ab96d7acf870e53c9016b0b63d0b328eec23bed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L499

/** * 将给定小数点中的精度/小数位数更改为设置的精度/小数位数 in (如果有),* 如果它溢出或 就地修改,如果成功则返回。* * 注意:这是就地修改的,所以不要在外部调用它 数据。*/decimalTypevaluevalue

还有另一个线程,表明如果无法强制转换,可能没有直接的方法来使代码失败。Spark:在不更改列的可为 null 属性的情况下强制转换十进制。 因此,也许您可以尝试检查强制转换列中的值并创建一个失败的逻辑(如果有)?null

评论

0赞 RudyVerboven 1/9/2020
检查 null 值不是一个可靠的解决方案,因为该列中可能还有其他 null 值不是由强制转换引起的。除非在强制转换之前替换所有 null 值。我建议(就像上面给出的线程中所述的那样)编写您自己的UserDefinedFunction,然后可能会引发异常。
0赞 VB_ 4/28/2020
@RubyVerboven Java/Scala 玩得很好。由于 PySpark UDF 速度较慢,您必须使用 Scala UDF 管理额外的痛苦
0赞 RudyVerboven 1/9/2020 #2

正如我在上面的评论中提到的,您可以尝试使用 UserDefinedFunction 实现您想要的目标。我目前面临同样的问题,但设法使用 UDF 解决了我的问题。我面临的问题是我想尝试将列转换为 DoubleType,但我不知道预先知道类型,并且当解析失败时,我不会让我的应用程序失败,所以不是像您所说的那样无声的“null”。

在下面的代码中,您可以看到我编写了一个 which takes a as 参数。我将尝试将此结构中的唯一值解析为双精度值。如果失败,我将抛出异常,导致我的作业失败。udfstruct

import spark.implicits._

val cast_to_double = udf((number: Row) => {
  try {
    number.get(0) match {
      case s: String => s.toDouble
      case d: Double => d
      case l: Long => l.toDouble
      case i: Int => i.toDouble
      case _ => throw new NumberFormatException
    }
  } catch {
    case _: NumberFormatException => throw new IllegalArgumentException("Can't parse this so called number of yours.")
  }
})

try {
  val intDF = List(1).toDF("something")
  val secondIntDF = intDF.withColumn("something_else", cast_to_double(struct(col("something"))))
  secondIntDF.printSchema()
  secondIntDF.show()

  val stringIntDF = List("1").toDF("something")
  val secondStringIntDF = stringIntDF.withColumn("something_else", cast_to_double(struct(col("something"))))
  secondStringIntDF.printSchema()
  secondStringIntDF.show()

  val stringDF = List("string").toDF("something")
  val secondStringDF = stringDF.withColumn("something_else", cast_to_double(struct(col("something"))))
  secondStringDF.printSchema()
  secondStringDF.show()
} catch {
  case se: SparkException => println(se.getCause.getMessage)
}

输出:

root
 |-- something: integer (nullable = false)
 |-- something_else: double (nullable = false)

+---------+--------------+
|something|something_else|
+---------+--------------+
|        1|           1.0|
+---------+--------------+

root
 |-- something: string (nullable = true)
 |-- something_else: double (nullable = false)

+---------+--------------+
|something|something_else|
+---------+--------------+
|        1|           1.0|
+---------+--------------+

root
 |-- something: string (nullable = true)
 |-- something_else: double (nullable = false)

Can't parse this so called number of yours.