Filter rows based on multiple advanced criteria in PySpark

Asked by mabramov on 8/6/2023 · Modified 8/6/2023 · Viewed 41 times

Q:

I'm currently doing some calculations on a database that contains information on how borrowers repay their loans.

My goal is to create a new dataframe that includes only the loans meeting the following criteria:

  • the borrower (ID) has at least 2 loans;
  • each subsequent loan is smaller than the previous one and is issued within 15 days after the previous loan was repaid (see the sketch after this list);
  • the last loan (the outstanding loan with no closing date) is larger than the previous one.
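For reference, the second criterion on its own can be written with a window lag. The following is an illustrative sketch only, assuming the column names of the sample dataframe shown below (note that the attempt further down compares consecutive ContractDates rather than the previous ClosingDate):

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Per-borrower window, ordered by issue date
w = Window.partitionBy("ID").orderBy("ContractDate")

# Window functions cannot appear directly in filter(), so materialize
# the flag as a column first and filter on it afterwards.
flagged = df.withColumn(
    "MeetsCriterion2",
    (f.col("LoanSum") < f.lag("LoanSum").over(w))
    & (f.datediff(f.col("ContractDate"), f.lag("ClosingDate").over(w)).between(1, 15)),
)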

Here is my dataframe:

Name    ID     ContractDate LoanSum ClosingDate
A       ID1    2022-10-10   10      2022-10-15 
A       ID1    2022-10-16   8       2022-10-25
A       ID1    2022-10-27   25      
B       ID2    2022-12-12   10      2022-10-15
B       ID2    2022-12-16   22      2022-11-18
B       ID2    2022-12-20   9       2022-11-25
B       ID2    2023-11-29   13      
C       ID3    2022-11-11   30      2022-11-18

My expected result is:

Name    ID     ContractDate LoanSum ClosingDate
A       ID1    2022-10-10   10      2022-10-15 
A       ID1    2022-10-16   8       2022-10-25
A       ID1    2022-10-27   25      
B       ID2    2022-12-12   10      2022-10-15
B       ID2    2022-12-16   22      2022-11-18
B       ID2    2022-12-20   9       2022-11-25
B       ID2    2023-11-29   13      

What I have done so far (the code below helps capture the loan chains, but it does not include the outstanding loans without a ClosingDate that are larger than the previous loan):

from pyspark.sql import functions as f
from pyspark.sql.window import Window

cols = df.columns
# Per-borrower window, ordered by issue date
w = Window.partitionBy('ID').orderBy('ContractDate')

# Compare each loan with the previous one for the same ID
newdf = df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w)) \
  .withColumn('PreviousLoanSum', f.lag('LoanSum').over(w)) \
  .withColumn('Target', f.expr('datediff(ContractDate, PreviousContractDate) >= 1 and datediff(ContractDate, PreviousContractDate) < 16 and LoanSum - PreviousLoanSum < 0')) \
  .withColumn('Target', f.col('Target') | f.lead('Target').over(w)) \
  .filter('Target == True')

Any help is much appreciated! Thank you for your ideas/solutions!

python pyspark filter data-manipulation

A:

1 vote · Chetna Chaudhari · 8/6/2023 · #1

To get the expected output, try the following logic:

newdf = (df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w)) 
  .withColumn('PreviousLoanSum', f.lag('LoanSum').over(w)) 
  .withColumn('Target', f.expr('(datediff(ContractDate, PreviousContractDate) >= 1 and datediff(ContractDate, PreviousContractDate) < 16) or ((LoanSum > PreviousLoanSum)  and (ClosingDate is null))' )) 
  .withColumn('Target', f.col('Target') | f.lead('Target').over(w)) 
  .filter('Target == True'))

Here is the complete code snippet. Compared with your version, the key change is the added or-branch, which also keeps an open loan (ClosingDate is null) that is larger than the previous one:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize SparkSession
spark = SparkSession.builder.appName("LoanFiltering").getOrCreate()

# Sample data
data = [
    ("A", "ID1", "2022-10-10", 10, "2022-10-15"),
    ("A", "ID1", "2022-10-16", 8, "2022-10-25"),
    ("A", "ID1", "2022-10-27", 25, None),
    ("B", "ID2", "2022-12-12", 10, "2022-10-15"),
    ("B", "ID2", "2022-12-16", 22, "2022-11-18"),
    ("B", "ID2", "2022-12-20", 9, "2022-11-25"),
    ("B", "ID2", "2023-11-29", 13, None),
    ("C", "ID3", "2022-11-11", 30, "2022-11-18")
]

# Define the schema for the DataFrame
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("ID", StringType(), True),
    StructField("ContractDate", StringType(), True),
    StructField("LoanSum", IntegerType(), True),
    StructField("ClosingDate", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(data, schema)

# Convert date columns to DateType
df = df.withColumn("ContractDate", f.to_date("ContractDate", "yyyy-MM-dd"))
df = df.withColumn("ClosingDate", f.to_date("ClosingDate", "yyyy-MM-dd"))

# Create a window specification to order rows by ContractDate within each ID partition
w = Window.partitionBy("ID").orderBy("ContractDate")

newdf = (df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w)) 
  .withColumn('PreviousLoanSum', f.lag('LoanSum').over(w)) 
  .withColumn('Target', f.expr('(datediff(ContractDate, PreviousContractDate) >= 1 and datediff(ContractDate, PreviousContractDate) < 16) or ((LoanSum > PreviousLoanSum)  and (ClosingDate is null))' )) 
  .withColumn('Target', f.col('Target') | f.lead('Target').over(w))
  .filter('Target == True'))

newdf.show()
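
Note that newdf still carries the helper columns (PreviousContractDate, PreviousLoanSum, Target). If you want the output to match the expected table exactly, you can select just the original columns, for example:

# Keep only the original columns for the final result
newdf.select(df.columns).show()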