提问人:G_Rivera 提问时间:11/3/2023 最后编辑:G_Rivera 更新时间:11/8/2023 访问量:38
在不移动行的情况下对具有特定条件的行进行计数 (PySpark)
Count Rows with a specific condition without moving Rows (PySpark)
问:
从字面上看,我是编程新手,或者至少我是新基础知识,我面临着一个问题,我不知道如何计算我在 PySpark datafrme 中的“周期”。
假设这是 DataFrame:
信 | 群 |
---|---|
一个 | 0 |
一个 | 0 |
一个 | 1 |
一个 | 1 |
一个 | 1 |
一个 | 0 |
一个 | 0 |
一个 | 1 |
一个 | 1 |
一个 | 0 |
一个 | 0 |
我想看到的结果是有多少个“周期(两个或 X #1 是连续的)”。在这种情况下,将是:
- 2 个周期(下表显示了分离的“周期”)。
我怎样才能达到这个结果?
信 | 群 |
---|---|
一个 | 0 |
一个 | 0 |
------ | ----- |
一个 | 1 |
一个 | 1 |
一个 | 1 |
------ | ----- |
一个 | 0 |
一个 | 0 |
------ | ----- |
一个 | 1 |
一个 | 1 |
------ | ----- |
一个 | 0 |
一个 | 0 |
如果您能帮助我提供示例或链接,如果已经提到过,将不胜感激。
我已经尝试了几个过滤器,但没有成功。
提前致谢。
答:
0赞
Adrian Maxwell
11/3/2023
#1
您需要一种方法来保留当前行顺序,以便计算周期。为此,有一个函数可用于为每行分配一个唯一的“递增值”,因此我们按该列排序,保留原始行序列。一旦到位,我们就可以使用基于窗口的计算(需要对行进行排序)来提供周期计数。monotonically_increasing_id()
注意:不确定连续 2 行的要求,下面不讨论
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Create SparkSession
spark = SparkSession.builder \
.appName('CycleCount') \
.getOrCreate()
# Assume "df" as the DataFrame
df.show()
# Create a new column "index" so that we can retain the current row order
df = df.withColumn("index", F.monotonically_increasing_id())
# Define the window ordered by the "index" which retains current row order (per letter)
window = Window.partitionBy("letter").orderBy("index")
# Add new column "change" that flags when a change in 'group' occurs
df = df.withColumn("change", F.when(F.lag("group").over(window) != F.col("group"), 1).otherwise(0))
# Use cumsum() for cumulative sum of these changes, which will serve as an identifier for each cycle
df = df.withColumn("cycle", F.sum("change").over(window))
# Filter out groups with only 1 row
df = df.filter(F.col("count") > 1)
# Count the number of cycles
cycle_count = df.groupBy("letter", "cycle").count().count()
print("Number of cycles: ", cycle_count)
评论
0赞
G_Rivera
11/8/2023
我尝试了这个解决方案,它运行良好,但是当我遇到只有一个“1”的情况时,程序会计算它,在这种情况下,我如何定义一个特定的“条件”,只有两个以上的连续“1”?多谢。
0赞
Adrian Maxwell
11/8/2023
为计数大于 1 的行添加筛选器 请参阅 - 在计算cycle_count之前添加到答案中df = df.filter(F.col("count") > 1)
评论