提问人:Minura Punchihewa 提问时间:4/24/2023 更新时间:6/13/2023 访问量:427
PySpark 中的模糊匹配优化
Fuzzy Matching Optimization in PySpark
问:
我正在尝试通过PySpark对某些数据进行一些模糊匹配。为此,我正在使用该包并在 Databricks 上运行它。fuzzywuzzy
我的数据集非常简单。它存储在 CSV 文件中,包含两列:Name1 和 Name2。但是,我不仅要比较同一行中的两个值,还要将每个 Name1 与所有可用的 Name2 值进行比较。
这是我的代码的样子,
from pyspark.sql import functions as f
from fuzzywuzzy import fuzz
from pyspark.sql.types import StringType
# create a simple function that performs fuzzy matching on two strings
def match_string(s1, s2):
return fuzz.token_sort_ratio(s1, s2)
# convert the function into a UDF
MatchUDF = f.udf(match_string, StringType())
# separate the two Name columns into individual DataFrames
df1 = raw_df.select('Name1')
df2 = raw_df.select('Name2')
# perform a CROSS JOIN on the two DataFrames
# CAN THIS BE AVOIDED?
df = df1.crossJoin(df2)
# use the UDF from before to calculate a similarity score for each combination
df = df.withColumn("similarity_score", MatchUDF(f.col("Name1"), f.col("Name2")))
一旦我有了相似度分数,我就可以计算每个名字的排名,从而获得最佳匹配。
我担心的是CROSS JOIN。这成倍地增加了我拥有的数据点的数量。无论如何,这是可以避免的吗?
我也对完全不同的方法持开放态度,这些方法将以更优化的方式完成我需要做的事情。
答:
fuzzywuzzy
包中有一个 extractBests
函数,它返回与选项集合(列)的最佳匹配列表。Name2
此函数可以应用于列和整列中的单个值,因此您可以将其转换为 UDF,而无需交叉连接列。Name1
Name2
评论
鉴于您需要从内部检查所有匹配项,并且您的数据帧很小,因此直接的解决方案是交叉连接。Name1
Name2
但是,如果将来需要纵向扩展数据管道或需要提高性能,则可以应用一些技巧。
尝试将工作负载减少到小批量中:您可以使用或拆分数据帧将此操作转换为小块(或小批量)。
.option("maxFilesPerTrigger", 100)
广播 df2:如果
df2
是一个小数据帧,你可以将其 () 广播给所有工作线程并加快并行度(例如from pyspark.sql.functions import broadcast
join(broadcast(df2))
)调整分区数:可能在交叉连接之后,分区数量会猛增(df1 分区 * df2 分区)。数据帧分区的数量会影响计算的运行时间。如果分区太少,则计算将无法利用群集中可用的所有并行性。同样,如果分区过多,则管理许多小任务的开销会过大,从而使计算运行速度非常慢。因此,交叉联接数据帧属于“分区过多”的情况。用于检查分区的当前状态并应用以减小大小。
df1.rdd.partitions.size
.repartition
将 UDF 转换为 Pandas UDF:目前,这是利用 Apache Arrow 传输数据的最有效方法之一。
计算相似度分数的另一个选项是使用本机函数,例如(如果适用)。pyspark.sql.functions.levenshtein
评论
Name1
Name2