rdd 与 dataframe Spark 上的序列化

Serialization on rdd vs dataframe Spark

提问人:thebluephantom 提问时间:3/9/2019 最后编辑:10465355thebluephantom 更新时间:3/9/2019 访问量:690

问:

EX1 中。这和 RDD 给出了我们预期的序列化,无论有没有 Object,val num 都是罪魁祸首,很好:

object Example {
 val r = 1 to 1000000 toList
 val rdd = sc.parallelize(r,3)
 val num = 1
 val rdd2 = rdd.map(_ + num)
 rdd2.collect  
}
Example

EX2 中。但是,以类似的方式使用 DataFrame 则不然。为什么看起来有点一样?我在这里错过了什么?

object Example {
import spark.implicits._
import org.apache.spark.sql.functions._

val n = 1 
val df = sc.parallelize(Seq(
    ("r1", 1, 1),
    ("r2", 6, 4),
    ("r3", 4, 1),
    ("r4", 1, 2)
    )).toDF("ID", "a", "b")
df.repartition(3).withColumn("plus1", $"b" + n).show(false)
}
Example

我对 DF 不完全清楚的原因,会期待类似的行为。看起来 DS 规避了一些问题,但我很可能遗漏了一些东西。

在 Databricks 上运行会带来大量序列化问题,因此不要认为这会影响事情,方便测试。

Scala Apache Spark 序列化 闭包

评论

0赞 ZakukaZ 3/9/2019
更改为您的示例,它应该没问题。第一个参数是列,因此第二个参数应为列。如果你使用标量值,你应该告诉 spark 它,否则,它会尝试找到变量 () 并期望它是 type,这在你的情况下是不正确的nlit(n)nColumn
0赞 thebluephantom 3/9/2019
n 工作正常,这就是重点,但我明白了,只是想非常准确

答:

2赞 user11175058 3/9/2019 #1

原因很简单,比 和 之间的区别更根本:RDDDataset

  • 第一段代码计算函数

    _ + num
    

    因此,必须对其进行计算和评估。

  • 第二段代码没有。以后

    $"b" + n
    

    只是一个值,因此不需要闭包计算和后续序列化。

如果这还不清楚,你可以这样想:

  • 前一段代码告诉 Spark 如何做某事
  • 后一段代码告诉 Spark 该做什么。执行的实际代码是在不同的范围内生成的。

如果您的代码更接近它的对应代码,例如:DatasetRDD

object Example {
  import spark.implicits._

  val num = 1
  spark.range(1000).map(_ + num).collect
} 

Example {
  import spark.implicits._
  import org.apache.spark.sql.functions._ 

  val num = 1
  val f = udf((x: Int) => x + num) 
  spark.range(1000).select(f($"id")).collect
}

它将失败并出现序列化异常,与版本相同。RDD