Apply logical operation on a dataframe in pyspark

Asked by Naveen Balachandran on 9/17/2023 · Updated 9/18/2023 · Views: 52

Q:

I have a dataframe with many columns, and one of those columns holds the logical operation that needs to be performed on the dataframe. For example, see the sample dataframe below (originally posted as a screenshot; the data is reproduced in the answers).

I need to apply the logical operation defined in the logical operation column to the corresponding row.

Normally I would use expr(). But in this case, when I try to read the expression from the column and then apply it, I get an error saying the column is not iterable.
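
For concreteness, here is a minimal sketch of the setup the question describes (the column names and data values are assumptions, since the original example was only posted as a screenshot): expr() is happy with a literal string, but it cannot take the expression from a Column.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("expr-sketch").getOrCreate()

df = spark.createDataFrame(
    [(0, 1, 1, "(A&B)"), (1, 1, 1, "(A)"), (0, 0, 1, "(A|C)")],
    ["A", "B", "C", "logical_operation"],
)

# This works: expr() receives a plain Python string.
df.withColumn("result", F.expr("(A&B)")).show()

# This is the kind of call that fails with "Column is not iterable",
# because expr() expects a string, not a Column:
# df.withColumn("result", F.expr(F.col("logical_operation")))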

Any suggestions?

pyspark logical-operators expr

Comments

0 votes werner 9/17/2023
Please do not post screenshots of data. Thank you!

Answers:

1 vote user238607 9/17/2023 #1

Here is a solution using a Scala UDF in PySpark, since Scala UDFs are faster than Python UDFs. You can find the code for the UDF used in the PySpark script below, as well as the released jar, in the following repository.

If you want to modify the UDF to suit future needs, just run sbt assembly to compile the jar.

Then invoke the com.help.stackoverflow.CheckUDFs class from the jar to verify that the implementation is correct.

https://github.com/dineshdharme/pyspark-native-udfs

Source code of the EvaluateBooleanExpression class:

package com.help.udf

import org.apache.spark.sql.api.java.UDF4

import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox


class EvaluateBooleanExpression extends UDF4[Int, Int, Int, String, Boolean] {

  override def call(a_value: Int, b_value: Int, c_value: Int, given_expression: String): Boolean = {

    // Substitute the row's column values into the expression string,
    // e.g. "(A&B)" with A=0, B=1 becomes "(0&1)".
    var new_expression = given_expression.replaceAll("A", a_value.toString)
    new_expression = new_expression.replaceAll("B", b_value.toString)
    new_expression = new_expression.replaceAll("C", c_value.toString)
    // Turn the 0/1 literals into Scala booleans, e.g. "(0&1)" becomes "(false&true)".
    new_expression = new_expression.replaceAll("0", false.toString)
    new_expression = new_expression.replaceAll("1", true.toString)

    // Compile and evaluate the resulting boolean expression at runtime.
    val toolbox = currentMirror.mkToolBox()
    val calc = toolbox.eval(toolbox.parse(new_expression))

    calc.toString.toBoolean
  }
}
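
To see what the UDF computes for a single row, here is a small plain-Python sketch (not part of the linked repository) that mirrors the same steps: substitute the values into the expression, map 0/1 to booleans, and evaluate the result.

def evaluate_boolean_expression(a_value, b_value, c_value, given_expression):
    # Substitute the column values into the expression string.
    expression = (given_expression
                  .replace("A", str(a_value))
                  .replace("B", str(b_value))
                  .replace("C", str(c_value)))
    # Map the 0/1 literals to booleans, e.g. "(0&1)" becomes "(False&True)".
    expression = expression.replace("0", "False").replace("1", "True")
    # Evaluate the resulting boolean expression.
    return bool(eval(expression))

print(evaluate_boolean_expression(0, 1, 1, "(A&B)"))  # False
print(evaluate_boolean_expression(0, 0, 1, "(A|C)"))  # True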

PySpark Python script:

import pyspark.sql.functions as F
from pyspark import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", "file:/path/to/pyspark-native-udfs/releases/pyspark-native-udfs-assembly-0.1.3.jar") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)


data1 = [
    [0, 1, 1, "(A&B)"],
    [1, 1, 1, "(A)"],
    [0, 0, 1, "(A|C)"],
]

df1Columns = ["A", "B", "C", "exp"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)
df1 = df1.withColumn("A", F.col("A").cast("int"))
df1 = df1.withColumn("B", F.col("B").cast("int"))
df1 = df1.withColumn("C", F.col("C").cast("int"))

print("Schema of the dataframe")
df1.printSchema()

print("Given dataframe")
df1.show(n=100, truncate=False)



spark.udf.registerJavaFunction("evaluate_boolean_exp_udf", "com.help.udf.EvaluateBooleanExpression", BooleanType())

df1.createOrReplaceTempView("given_table")

df1_array = sqlContext.sql("select *, evaluate_boolean_exp_udf(A, B, C, exp) as bool_exp_evaluated from given_table")
print("Dataframe after applying SCALA NATIVE UDF")
df1_array.show(n=100, truncate=False)

Output:

Schema of the dataframe
root
 |-- A: integer (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: integer (nullable = true)
 |-- exp: string (nullable = true)

Given dataframe
+---+---+---+-----+
|A  |B  |C  |exp  |
+---+---+---+-----+
|0  |1  |1  |(A&B)|
|1  |1  |1  |(A)  |
|0  |0  |1  |(A|C)|
+---+---+---+-----+

Dataframe after applying SCALA NATIVE UDF
+---+---+---+-----+------------------+
|A  |B  |C  |exp  |bool_exp_evaluated|
+---+---+---+-----+------------------+
|0  |1  |1  |(A&B)|false             |
|1  |1  |1  |(A)  |true              |
|0  |0  |1  |(A|C)|true              |
+---+---+---+-----+------------------+

3 votes werner 9/17/2023 #2

You can use the standard Python eval function inside a UDF.

The eval function expects the data to be in a dictionary, so we first convert the data columns into a struct:

from pyspark.sql import functions as F

# eval receives the expression string as code and the row's values as local variables.
eval_udf = F.udf(lambda op, data: eval(op, {}, data.asDict()))

# Pack all value columns (everything except the expression column) into a struct,
# then evaluate each row's expression against that struct.
df.withColumn('data', F.struct([df[x] for x in df.columns if x != 'logical_operation'])) \
    .withColumn('result', eval_udf(F.col('logical_operation'), F.col('data'))) \
    .show()

Output:

+---+---+---+-----------------+---------+------+
|  A|  B|  C|logical_operation|     data|result|
+---+---+---+-----------------+---------+------+
|  0|  1|  1|            (A&B)|{0, 1, 1}|     0|
|  1|  1|  1|              (A)|{1, 1, 1}|     1|
|  0|  0|  1|            (A|C)|{0, 0, 1}|     1|
+---+---+---+-----------------+---------+------+

eval comes with some security concerns, so check whether that could be an issue for you!
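
If the expressions come from an untrusted source, one possible mitigation (a sketch, not part of the original answer) is to whitelist the characters an expression may contain and strip the builtins from eval's environment:

import re

from pyspark.sql import functions as F

# Column names and boolean operators only.
ALLOWED_EXPRESSION = re.compile(r'^[A-Za-z_&|()~ ]+$')

def safe_eval(op, data):
    # Reject anything that does not look like a simple boolean expression.
    if not ALLOWED_EXPRESSION.match(op):
        raise ValueError(f"Unexpected expression: {op}")
    # An empty __builtins__ prevents the expression from reaching functions like __import__.
    return eval(op, {"__builtins__": {}}, data.asDict())

safe_eval_udf = F.udf(safe_eval)

safe_eval_udf can then be used in place of eval_udf above.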

Comments

0 votes Naveen Balachandran 9/22/2023
Thanks werner - the solution fits my requirement perfectly. I had some special characters in the column names that tripped me up at first, but cleaning them up with a regex first solved it -
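
A sketch of that kind of cleanup (the exact regex is an assumption; the comment does not show it): strip anything that is not a valid identifier character from the column names before building the struct, so that eval can resolve them as variables.

import re

def sanitize_columns(df, expression_col='logical_operation'):
    # Replace anything that is not a letter, digit, or underscore so the
    # column names stay valid identifiers inside the evaluated expression.
    for name in df.columns:
        if name != expression_col:
            df = df.withColumnRenamed(name, re.sub(r'\W', '_', name))
    return df

After renaming, the struct/eval approach from the answer applies unchanged.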