Asked by: mabramov · Asked: 8/6/2023 · Updated: 8/6/2023 · Views: 41
Filter rows based on multiple advanced criteria in PySpark
Q:
Currently, I am doing some calculations on a database that contains information about how borrowers repay their loans.
My goal is to create a new dataframe that includes only the loans meeting the following criteria:
- the borrower (ID) has at least 2 loans;
- each subsequent loan is smaller than the previous one and was issued within 15 days after the previous loan was repaid;
- the last loan (the outstanding loan with no ClosingDate) is larger than the previous one.
Here is my dataframe:
Name ID ContractDate LoanSum ClosingDate
A ID1 2022-10-10 10 2022-10-15
A ID1 2022-10-16 8 2022-10-25
A ID1 2022-10-27 25
B ID2 2022-12-12 10 2022-10-15
B ID2 2022-12-16 22 2022-11-18
B ID2 2022-12-20 9 2022-11-25
B ID2 2023-11-29 13
C ID3 2022-11-11 30 2022-11-18
My expected result is:
Name ID ContractDate LoanSum ClosingDate
A ID1 2022-10-10 10 2022-10-15
A ID1 2022-10-16 8 2022-10-25
A ID1 2022-10-27 25
B ID2 2022-12-12 10 2022-10-15
B ID2 2022-12-16 22 2022-11-18
B ID2 2022-12-20 9 2022-11-25
B ID2 2023-11-29 13
What I have done so far (the code below helps capture such loan chains, but it does not include the outstanding loans without a ClosingDate that are larger than the previous loan):
from pyspark.sql import functions as f
from pyspark.sql.window import Window

cols = df.columns
# Order each borrower's loans chronologically within their ID
w = Window.partitionBy('ID').orderBy('ContractDate')
# Flag loans that follow the previous one within 15 days and are smaller than it,
# then also keep the row immediately preceding a flagged loan
newdf = df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w)) \
    .withColumn('PreviousLoanSum', f.lag('LoanSum').over(w)) \
    .withColumn('Target', f.expr('datediff(ContractDate, PreviousContractDate) >= 1 and datediff(ContractDate, PreviousContractDate) < 16 and LoanSum - PreviousLoanSum < 0')) \
    .withColumn('Target', f.col('Target') | f.lead('Target').over(w)) \
    .filter('Target == True')
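For reference, criterion 2 measures the 15-day window from when the previous loan was repaid (its ClosingDate), while the code above measures it from the previous ContractDate. Below is a minimal sketch of the ClosingDate-based variant, assuming the same window w; PreviousClosingDate and GapOK are hypothetical names, not from the original post.
# Illustrative sketch (assumption, not part of the original question): measure the
# 15-day gap from the previous loan's ClosingDate instead of its ContractDate.
checked = (df.withColumn('PreviousClosingDate', f.lag('ClosingDate').over(w))
    .withColumn('GapOK', f.expr(
        'datediff(ContractDate, PreviousClosingDate) between 1 and 15')))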
Any help is greatly appreciated! Thank you very much for your ideas/solutions!
A:
#1 · Chetna Chaudhari · 8/6/2023 · 1 vote
To get the expected output, try the following logic:
newdf = (df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w))
    .withColumn('PreviousLoanSum', f.lag('LoanSum').over(w))
    .withColumn('Target', f.expr('(datediff(ContractDate, PreviousContractDate) >= 1 and datediff(ContractDate, PreviousContractDate) < 16) or ((LoanSum > PreviousLoanSum) and (ClosingDate is null))'))
    .withColumn('Target', f.col('Target') | f.lead('Target').over(w))
    .filter('Target == True'))
Here is the complete code snippet:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize SparkSession
spark = SparkSession.builder.appName("LoanFiltering").getOrCreate()

# Sample data
data = [
    ("A", "ID1", "2022-10-10", 10, "2022-10-15"),
    ("A", "ID1", "2022-10-16", 8, "2022-10-25"),
    ("A", "ID1", "2022-10-27", 25, None),
    ("B", "ID2", "2022-12-12", 10, "2022-10-15"),
    ("B", "ID2", "2022-12-16", 22, "2022-11-18"),
    ("B", "ID2", "2022-12-20", 9, "2022-11-25"),
    ("B", "ID2", "2023-11-29", 13, None),
    ("C", "ID3", "2022-11-11", 30, "2022-11-18")
]

# Define the schema for the DataFrame
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("ID", StringType(), True),
    StructField("ContractDate", StringType(), True),
    StructField("LoanSum", IntegerType(), True),
    StructField("ClosingDate", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(data, schema)

# Convert date columns to DateType
df = df.withColumn("ContractDate", f.to_date("ContractDate", "yyyy-MM-dd"))
df = df.withColumn("ClosingDate", f.to_date("ClosingDate", "yyyy-MM-dd"))

# Create a window specification to order rows by ContractDate within each ID partition
w = Window.partitionBy("ID").orderBy("ContractDate")

newdf = (df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w))
    .withColumn('PreviousLoanSum', f.lag('LoanSum').over(w))
    .withColumn('Target', f.expr('(datediff(ContractDate, PreviousContractDate) >= 1 and datediff(ContractDate, PreviousContractDate) < 16) or ((LoanSum > PreviousLoanSum) and (ClosingDate is null))'))
    .withColumn('Target', f.col('Target') | f.lead('Target').over(w))
    .filter('Target == True'))

newdf.show()
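One detail the snippet leaves implicit is the first criterion (at least 2 loans per ID). A minimal follow-up sketch, assuming the newdf produced above; the helper column LoansPerID is an illustrative name, not part of the original answer:
# Illustrative follow-up (assumption): keep only IDs with at least 2 loans
# and drop the helper columns used for the window logic.
result = (newdf
    .withColumn('LoansPerID', f.count('*').over(Window.partitionBy('ID')))
    .filter('LoansPerID >= 2')
    .drop('PreviousContractDate', 'PreviousLoanSum', 'Target', 'LoansPerID'))
result.show()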