在不移动行的情况下对具有特定条件的行进行计数 (PySpark)

Count Rows with a specific condition without moving Rows (PySpark)

提问人:G_Rivera 提问时间:11/3/2023 最后编辑:G_Rivera 更新时间:11/8/2023 访问量:38

问:

从字面上看,我是编程新手,或者至少我是新基础知识,我面临着一个问题,我不知道如何计算我在 PySpark datafrme 中的“周期”。

假设这是 DataFrame:

一个 0
一个 0
一个 1
一个 1
一个 1
一个 0
一个 0
一个 1
一个 1
一个 0
一个 0

我想看到的结果是有多少个“周期(两个或 X #1 是连续的)”。在这种情况下,将是:

  • 2 个周期(下表显示了分离的“周期”)。

我怎样才能达到这个结果?

一个 0
一个 0
------ -----
一个 1
一个 1
一个 1
------ -----
一个 0
一个 0
------ -----
一个 1
一个 1
------ -----
一个 0
一个 0

如果您能帮助我提供示例或链接,如果已经提到过,将不胜感激。

我已经尝试了几个过滤器,但没有成功。

提前致谢。

if-statement pyspark 过滤器 条件语句

评论

0赞 Adrian Maxwell 11/3/2023
有 5 个周期不是吗?哪里。对于同一个字母,2 个或多个等于 = 1 个周期的“组”值)但是,如果“组”在每一行上发生变化,例如 1、0、1、0、1 等,会发生什么,这些行只是被忽略吗?
0赞 G_Rivera 11/7/2023
@PaulMaxwell,没错,有 5 个周期,但出于本练习的目的,不计算 0 并且只关注 1,在有 1,0,1,0,1 的情况下,这些行需要被忽略,因为唯一要计算的行是在 2 个或更多连续 1 的情况下。

答:

0赞 Adrian Maxwell 11/3/2023 #1

您需要一种方法来保留当前行顺序,以便计算周期。为此,有一个函数可用于为每行分配一个唯一的“递增值”,因此我们按该列排序,保留原始行序列。一旦到位,我们就可以使用基于窗口的计算(需要对行进行排序)来提供周期计数。monotonically_increasing_id()

注意:不确定连续 2 行的要求,下面不讨论

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Create SparkSession
spark = SparkSession.builder \
          .appName('CycleCount') \
          .getOrCreate()

# Assume "df" as the DataFrame
df.show()

# Create a new column "index" so that we can retain the current row order
df = df.withColumn("index", F.monotonically_increasing_id())

# Define the window ordered by the "index" which retains current row order (per letter)
window = Window.partitionBy("letter").orderBy("index")

# Add new column "change" that flags when a change in 'group' occurs
df = df.withColumn("change", F.when(F.lag("group").over(window) != F.col("group"), 1).otherwise(0))

# Use cumsum() for cumulative sum of these changes, which will serve as an identifier for each cycle
df = df.withColumn("cycle", F.sum("change").over(window))

# Filter out groups with only 1 row
df = df.filter(F.col("count") > 1)

# Count the number of cycles
cycle_count = df.groupBy("letter", "cycle").count().count()

print("Number of cycles: ", cycle_count)

评论

0赞 G_Rivera 11/8/2023
我尝试了这个解决方案,它运行良好,但是当我遇到只有一个“1”的情况时,程序会计算它,在这种情况下,我如何定义一个特定的“条件”,只有两个以上的连续“1”?多谢。
0赞 Adrian Maxwell 11/8/2023
为计数大于 1 的行添加筛选器 请参阅 - 在计算cycle_count之前添加到答案中df = df.filter(F.col("count") > 1)