按顺序从数组列中获取非重复行 pyspark

Get Non repeating rows from an array column in sequential order pyspark

提问人:mouli lee 提问时间:11/16/2023 最后编辑:Aswinmouli lee 更新时间:11/16/2023 访问量:104

问:

我有一个包含 2 列的 pyspark 数据帧。Column1 是整数列,Column2 是 ArrayType。我需要获取一个数据帧,该 Column2 的前几行中没有任何重复元素。在 Column2 中,如果行中的某个元素在前面的任何行中重复,则应忽略整行。

请考虑下面的 DataFrame,在第 3 行中,第 39 行是一个重复元素,因此应忽略整行。

enter image description here

预期结果数据帧

enter image description here

我尝试使用分解数组列和窗口函数来删除重复项。但它并没有给出预期的结果。

Azure Pyspark Apache Spark-SQL Databricks

评论

0赞 DileeprajnarayanThumula 11/16/2023
是否使用 Azure Databricks?
0赞 mouli lee 11/16/2023
是的@DileeprajnarayanThumula
0赞 DileeprajnarayanThumula 11/16/2023
dilip_df = spark.createDataFrame([Row(Column1=row[0], Column2=row[1]) for row in data], schema=schema) dilip_windowSpec = Window.orderBy(“Column1”) dilip_df = dilip_df.withColumn(“lag_Column2”, lag(“Column2”).over(dilip_windowSpec)) def has_intersection(arr1, arr2): 如果 arr1 不是 None 并且 arr2 不是 None: return bool(set(arr1) & set(arr2)) else: return False dilip_intersection_udf = udf(has_intersection, BooleanType()) dilip_df_filtered = dilip_df.filter(~dilip_intersection_udf(“Column2”, “lag_Column2”) |(col(“lag_Column2”).isNull()))。drop(“lag_Column2”)
0赞 DileeprajnarayanThumula 11/16/2023
你能试试这是否有帮助吗?
0赞 mouli lee 11/16/2023
在这里,我们正在检查上一行和当前行,但我们需要检查当前行以及所有先前捕获的行
0赞 DileeprajnarayanThumula 11/17/2023
df_exploded = df.select(“Column1”, F.explode(“Column2”).alias(“Element”)) window_spec = Window.orderBy(“Column1”) df_exploded = df_exploded.withColumn(“prev_elements”, F.collect_set(“Element”).over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow - 1))) df_result = df_exploded.filter(~F.array_contains(F.col(“prev_elements”), F.col(“Element”))) df_final = (df_result .groupBy(“Column1”) .agg(F.collect_list(“Element”).alias(“Column2”)) .orderBy(”列 1“))

答:

0赞 partlov 11/16/2023 #1

尝试自行联接此数据帧并列出应删除的所有 Column1 值。像这样的东西:

df_exploded = df.select(df.Column1, explode(df.Column2).alias("Column2"))
df_join1 = df_exploded.alias("df1")
df_join2 = df_exploded.alias("df2")
df_to_remove = (
  df_join1
  .join(
    df_join2, 
    [df_join1.Column2 == df_join2.Column2, df_join1.Column1 > df_join2.Column2]
  )
  .select(df1.Column1)
  .dropDuplicates()
)
df_final = df.join(df_to_remove, ["Column1"], "left_anti")

我没有运行此代码,因此可能存在一些问题,但这是想法。

评论

0赞 mouli lee 11/21/2023
但它并没有给出预期的输出