在 2 列上获取非重复元素,并在两列之间有 1:1 的记录,按顺序重复数据删除 pyspark

Get Non repeating elements on 2 column and have 1:1 records between both columns, dedup in sequential order pyspark

提问人:mouli lee 提问时间:11/13/2023 最后编辑:mouli lee 更新时间:11/21/2023 访问量:120

问:

我有一个包含 2 列的 pyspark 数据帧。我需要获取一个数据帧,该列的前几行中没有任何重复元素。如果一个元素是重复的,那么整行都应该被忽略。

输入数据帧:

列 1 专栏 2
一个 1
B 2
一个 3
C 3
C 4
C 4
D 4
E 5
F 4
G 7
D 8
H 9
9
H 10
10

预期结果数据帧:

列 1 专栏 2
一个 1
B 2
C 3
D 4
E 5
G 7
H 9
10

我尝试使用窗口函数来删除重复项。但它并没有给出预期的结果。

SQL Pyspark apache-spark-sql 复制 Databricks

评论

2赞 Steven 11/20/2023
问题在于,您的情况取决于表格的排序方式。因此,它不适用于 Spark。如果 B 线在 A 线之前,则输出会有所不同。但是,Spark 不是这样工作的;无论顺序如何,输出将始终相同。这意味着你的逻辑是有缺陷的。
0赞 mouli lee 11/21/2023
感谢您的回复@Steven 我们是否有任何解决方法可以在不使用 UDF 的情况下跨 Column1 和 Column2 记录实现此 1:1 映射!
0赞 Steven 11/22/2023
您至少需要一列额外的列来维持订单。
0赞 mouli lee 11/23/2023
假设我们使用 monotonically_increasing_id() 或 row_number() 为订单创建另一列。是否可以使用我们创建的列消除重复元素的行?!

答:

0赞 Steven 11/13/2023 #1

假设是你的数据帧:df

from pyspark.sql import functions as F, Window as W

df.withColumn(
    "r", F.row_number().over(W.partitionBy("column1").orderBy("column2"))
).where("r=1").drop("r")

我为每一行创建一个行号。column1 的每个组(分区)的第一行是 1,因此,我只需要过滤该值。


编辑

做同样的事情两次以删除所有重复项:

df.withColumn(
    "r", F.row_number().over(W.partitionBy("column1").orderBy("column2"))
).where("r=1").drop("r").withColumn(
    "r", F.row_number().over(W.partitionBy("column2").orderBy("column1"))
).where("r=1").drop("r")

评论

0赞 mouli lee 11/13/2023
通过这段代码,我们也将在结果中获得 F 行。这是意料之外的。我们不应该认为该行已经包含在 Column2 中的 4 中。
0赞 Steven 11/13/2023
@moulilee,您只需要执行两次重复数据删除过程。首先,使用代码根据 column1 进行重复数据删除。然后,通过交换 column1 和 column2 来重复该过程,以根据 column2 进行重复数据删除。
0赞 mouli lee 11/19/2023
我们应该如何处理我在问题中提到的案例。PS:我会针对这种情况编辑问题
0赞 leftjoin 11/13/2023 #2

在 Spark-SQL 中使用 row_number 函数

val dF2 = Seq(
("A",1),
("B",2),
("A",3),
("C",3),
("C",4),
("C",4),
("D",4),
("E",5),
("F",4),
("G",7),
("D",8)
).toDF("Column1","Column2")

dF2.createOrReplaceTempView("dF2")

spark.sql(""" 
select column1, column2
from 
(
select column1, column2, 
       row_number() over (partition by column2 order by column1) rn
from
(
select column1, column2, 
       row_number() over (partition by column1 order by column2) rn
  from dF2
) s where rn=1
) s where rn=1

""").show(100, false)

结果:

+-------+-------+
|column1|column2|
+-------+-------+
|A      |1      |
|B      |2      |
|C      |3      |
|D      |4      |
|E      |5      |
|G      |7      |
+-------+-------+

评论

0赞 mouli lee 11/13/2023
通过这段代码,我们也将在结果中获得 F 行。这是意料之外的。我们不应该认为该行已经包含在 Column2 中的 4 中。
0赞 leftjoin 11/13/2023
@moulilee固定
0赞 mouli lee 11/19/2023
对于我在编辑问题中提到的案例。仍然没有得到预期的结果
0赞 leftjoin 11/20/2023
@moulilee 请仔细检查您在 row_number() 计算中使用了正确的分区依据和排序依据。我已经测试了代码并得到了预期的结果。查看更新代码,您可以运行它并检查。它在上层是 Scala 语言,但您可以轻松地转换为 Python。请注意,分区依据和排序依据在第一row_number和第二中是不同的
0赞 mouli lee 11/20/2023
尽管如此,对于我添加的情况,并没有给我预期的结果。