Spark 映射闭包内部没有自由变量

Spark map closure not getting free variables inside

提问人:merukii6912 提问时间:6/17/2021 最后编辑:merukii6912 更新时间:6/18/2021 访问量:98

问:

我正在尝试在 RDD 上使用 map -> combineByKey -> map 进行转换。

我有这个驱动程序方法,它初始化了sparkSession:

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(getClass.getName).getOrCreate()

    val originalRdd = ...

    new A.transform(spark, originalRdd) 
  }

然后在转换函数中,我初始化了另一个变量。

第一个映射:将其转换为键值对,第二个映射:转换列表值并调用 f 和 g

class A {
 
  def transform(spark: sparkSession, originalRdd: RDD) = {

    val b = new B()

    val rdd = originalRdd
        .map {
          s => ((s.a, s.b, s.c), (s.d, s.e)) 
        }.combineByKey(
          // define combiner to combine the tuples to a list
        ).map {
          case (k, v) => Seq((k, b.f(spark.sparkContext.parallelize(v)), b.g(spark.sparkContext.parallelize(v))) // "b" and "sparkContext" will always be null here and throw NullPointerException on this line
        }

        rdd
   }

}

我需要使用 b 来调用 f 和 g,两者都采用带有值的 RDD 并根据值计算一个数字,无论如何都不会更新输入 RDD 本身。和 spark 将 v 转换为 RDD,并在此闭包中将其传递给 f 和 g。

我尝试将这两个变量包装在广播中,以在 transform() 中的集群执行器上保留这两个变量的只读缓存,并在闭包中使用它们:

      val broadcastBVar = spark.sparkContext.broadcast(b)
      val broadcastSpark = spark.sparkContext.broadcast(spark)

      broadcastBVar.value //usage

这也没用,我仍然得到了 NPE。有什么方法可以解决这个问题吗?

Scala Apache Spark 闭包分布式 计算

评论

0赞 Gaël J 6/17/2021
您能澄清一下NPE发生的位置吗?当然,您不需要广播 nor .对你来说,这取决于它到底是什么以及它做什么。SparkSessionSparkContextB
0赞 merukii6912 6/18/2021
@GaëlJ我更新了 .我还评论了抛出 NPE 的行B

答: 暂无答案