提问人:SHIVAM YADAV 提问时间:9/12/2023 更新时间:9/12/2023 访问量:87
Pyspark 中的 AutoCast 数据类型
AutoCast datatype in Pyspark
问:
我正在从 PostgresSQL 创建一个 parquet 文件,它的所有内容都标记为 varchar 列。在 ADLS 中拥有文件后,我们希望使用 Python/Pyspark 根据 Azure Databricks 中的日期、整数、varchar 字段等数据强制转换数据类型。 所以想要一种方法来自动转换文件d
答:
1赞
DileeprajnarayanThumula
9/12/2023
#1
我尝试使用 pyspark 重现和自动投射方法,根据日期、整数、Varchar 字段等数据投射所有 Varchar 数据类型。
例如,我使用字符串数据类型创建了数据和列,并将它们以 Parquet 格式在 ADLS 中保存。
schema = StructType([
StructField("name", StringType(), True),
StructField("birth_date", StringType(), True),
StructField("age", StringType(), True),
StructField("score", StringType(), True)
])
data = [("John", "1990-01-01", "123", "12.34"),
("Alice", "1995-05-15", "456", "56.78")]
以下代码将对数据类型 Date、Integer、Varchar 执行 AUTO CAST
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, to_timestamp, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
spark = SparkSession.builder.appName("AutoCastParquet").getOrCreate()
Parquet_file_path = "abfss://[email protected]/parquet_sample.parquet"
parquet_df = spark.read.parquet(Parquet_file_path)
casting_functions = {
"string": lambda col_name: col(col_name),
"date": to_date,
"timestamp": to_timestamp,
"integer": lambda col_name: col(col_name).cast("integer"),
"double": lambda col_name: col(col_name).cast("double"),
}
for column_name, data_type in parquet_df.dtypes:
if "string" in data_type.lower():
cast_func = casting_functions.get(data_type.lower())
if cast_func:
parquet_df = parquet_df.withColumn(column_name, cast_func(column_name))
parquet_df = parquet_df.withColumn(
"birth_date",
when(
to_date(col("birth_date"), "yyyy-MM-dd").isNotNull(),
to_date(col("birth_date"), "yyyy-MM-dd")
).otherwise(None)
)
parquet_df = parquet_df.withColumn(
"age",
when(
col("age").cast("integer").isNotNull(),
col("age").cast("integer")
).otherwise(None)
)
parquet_df = parquet_df.withColumn(
"score",
when(
col("score").cast("double").isNotNull(),
col("score").cast("double")
).otherwise(None)
)
new_schema = StructType([
StructField("name", StringType(), True),
StructField("birth_date", DateType(), True),
StructField("age", IntegerType(), True),
StructField("score", DoubleType(), True)
])
parquet_df = spark.createDataFrame(parquet_df.rdd, new_schema)
parquet_df.printSchema()
display(parquet_df)
上面的代码将 Parquet 文件从 Azure Data Lake Storage 读取到 Spark DataFrame 中,自动识别 StringType 列,并使用预定义的强制转换函数将其强制转换为推断的数据类型。生成的 DataFrame,具有更正的数据类型。
评论
0赞
SHIVAM YADAV
9/13/2023
在这里,您已经修复了架构。我有 n 个文件,每个文件都有不同的架构。我想要一个通用代码,它将从值中检测数据类型,然后创建一个模式。或其他方法
0赞
DileeprajnarayanThumula
9/13/2023
@SHIVAMYADAV 好的,让我帮你试试
0赞
DileeprajnarayanThumula
9/13/2023
@SHIVAMYADAV 尝试使用 parquet_df = spark.read.option(“inferSchema”, “true”).parquet(Parquet_file_path) 此选项允许 Spark 从每个 Parquet 文件中的数据自动推断架构。
0赞
SHIVAM YADAV
9/14/2023
是的,但它读取的整数值为 String
评论