Asked by: Naveen Balachandran  Asked: 9/17/2023  Updated: 9/18/2023  Views: 52
Apply logical operation on a dataframe in pyspark
Q:
I have a dataframe with many columns, and one of those columns contains the logical operation that needs to be performed on the dataframe. For example, see the dataframe below.
I need to perform the logical operation defined in the logical-operation column on the corresponding row.
Normally I could use expr(). But in this case, when I try to read the expression from the column and then apply it, I get an error saying the Column is not iterable.
Any suggestions?
A:
1 vote
user238607
9/17/2023
#1
Here is a solution that uses a Scala UDF from pyspark, since Scala UDFs are faster than Python UDFs. You can find the code for the UDF used in the pyspark script, as well as released jars, in the repository below.
If you want to modify the UDF to suit future needs, just run `sbt assembly` to compile the jar.
Then call the `com.help.stackoverflow.CheckUDFs` class from the jar to verify the implementation is correct.
https://github.com/dineshdharme/pyspark-native-udfs
Source code of the `EvaluateBooleanExpression` class:
package com.help.udf

import org.apache.spark.sql.api.java.UDF4
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

class EvaluateBooleanExpression extends UDF4[Int, Int, Int, String, Boolean] {

  override def call(a_value: Int, b_value: Int, c_value: Int, given_expression: String): Boolean = {
    // Substitute the row's values for the variable names A, B, C.
    var new_expression = given_expression.replaceAll("A", a_value.toString)
    new_expression = new_expression.replaceAll("B", b_value.toString)
    new_expression = new_expression.replaceAll("C", c_value.toString)
    // Turn the 0/1 literals into Scala Boolean literals.
    new_expression = new_expression.replaceAll("0", false.toString)
    new_expression = new_expression.replaceAll("1", true.toString)
    //println("Here's the new expression ", new_expression)

    // Compile and evaluate the resulting Boolean expression at runtime.
    val toolbox = currentMirror.mkToolBox()
    val calc = toolbox.eval(toolbox.parse(new_expression))
    val convertedCalc = calc.toString.toBoolean
    convertedCalc
  }
}
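The substitution logic of this Scala UDF can be sketched in plain Python to see what happens per row (a minimal illustration of the idea, not the UDF itself; the `evaluate` helper name is my own):

```python
def evaluate(a, b, c, expression):
    """Mimic EvaluateBooleanExpression.call: substitute the row's
    values into the expression string, then evaluate it."""
    # Replace the variable names with the row's 0/1 values.
    expr = expression.replace("A", str(a)).replace("B", str(b)).replace("C", str(c))
    # Python's eval understands & and | as bitwise operators on ints,
    # so "(0&1)" evaluates to 0 and "(0|1)" to 1.
    return bool(eval(expr))

print(evaluate(0, 1, 1, "(A&B)"))  # False
print(evaluate(1, 1, 1, "(A)"))    # True
print(evaluate(0, 0, 1, "(A|C)"))  # True
```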
PySpark Python script:
import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", "file:/path/to/pyspark-native-udfs/releases/pyspark-native-udfs-assembly-0.1.3.jar") \
    .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

data1 = [
    [0, 1, 1, "(A&B)"],
    [1, 1, 1, "(A)"],
    [0, 0, 1, "(A|C)"],
]

df1Columns = ["A", "B", "C", "exp"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)
df1 = df1.withColumn("A", F.col("A").cast("int"))
df1 = df1.withColumn("B", F.col("B").cast("int"))
df1 = df1.withColumn("C", F.col("C").cast("int"))

print("Schema of the dataframe")
df1.printSchema()

print("Given dataframe")
df1.show(n=100, truncate=False)

# Register the Scala UDF under a SQL-callable name.
spark.udf.registerJavaFunction("evaluate_boolean_exp_udf", "com.help.udf.EvaluateBooleanExpression", BooleanType())

df1.createOrReplaceTempView("given_table")
df1_array = sqlContext.sql("select *, evaluate_boolean_exp_udf(A, B, C, exp) as bool_exp_evaluated from given_table")

print("Dataframe after applying SCALA NATIVE UDF")
df1_array.show(n=100, truncate=False)
Output:
Schema of the dataframe
root
|-- A: integer (nullable = true)
|-- B: integer (nullable = true)
|-- C: integer (nullable = true)
|-- exp: string (nullable = true)
Given dataframe
+---+---+---+-----+
|A |B |C |exp |
+---+---+---+-----+
|0 |1 |1 |(A&B)|
|1 |1 |1 |(A) |
|0 |0 |1 |(A|C)|
+---+---+---+-----+
Dataframe after applying SCALA NATIVE UDF
+---+---+---+-----+------------------+
|A |B |C |exp |bool_exp_evaluated|
+---+---+---+-----+------------------+
|0 |1 |1 |(A&B)|false |
|1 |1 |1 |(A) |true |
|0 |0 |1 |(A|C)|true |
+---+---+---+-----+------------------+
3 votes
werner
9/17/2023
#2
You can use the standard Python eval function inside a UDF.
`eval` expects the data in a dictionary, so we first convert the data columns into a struct:
from pyspark.sql import functions as F

eval_udf = F.udf(lambda op, data: eval(op, {}, data.asDict()))

df.withColumn('data', F.struct([df[x] for x in df.columns if x != 'logical_operation'])) \
    .withColumn('result', eval_udf(F.col('logical_operation'), F.col('data'))) \
    .show()
Output:
+---+---+---+-----------------+---------+------+
| A| B| C|logical_operation| data|result|
+---+---+---+-----------------+---------+------+
| 0| 1| 1| (A&B)|{0, 1, 1}| 0|
| 1| 1| 1| (A)|{1, 1, 1}| 1|
| 0| 0| 1| (A|C)|{0, 0, 1}| 1|
+---+---+---+-----------------+---------+------+
`eval` comes with some security concerns, so check whether that could be an issue in your case!
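The core trick is easy to verify outside of Spark: `eval` with an explicit globals/locals pair evaluates the expression against the row's values (a standalone sketch; the plain dict here plays the role of `data.asDict()`):

```python
row = {"A": 0, "B": 1, "C": 1}

# Empty globals, the row dict as locals: the names A, B, C in the
# expression resolve to the row's values.
result = eval("(A&B)", {}, row)
print(result)           # 0, since 0 & 1 == 0
print(eval("(A|C)", {}, row))  # 1, since 0 | 1 == 1
```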
Comments
0 votes
Naveen Balachandran
9/22/2023
Thanks werner - the solution fits my requirement perfectly. I have some special characters in the column names that initially tripped me up, but I used a regex to clean them up first -
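The commenter doesn't show the regex, but one way to sanitize column names for this approach could look like the following (a hypothetical `clean_name` helper; `eval` can only resolve names that are valid Python identifiers):

```python
import re

def clean_name(name):
    """Replace anything that is not a letter, digit or underscore,
    so the column name becomes a valid Python identifier for eval()."""
    cleaned = re.sub(r"\W", "_", name)
    # Identifiers must not start with a digit.
    if cleaned and cleaned[0].isdigit():
        cleaned = "_" + cleaned
    return cleaned

print(clean_name("col-1 (raw)"))  # col_1__raw_
print(clean_name("2nd col"))      # _2nd_col
```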