Asked by: SHIVAM YADAV Asked: 9/14/2023 Updated: 9/16/2023 Views: 69
Dynamically cast the datatype in Pyspark
Q:
I am creating a parquet file from PostgreSQL, and all of its columns are marked as varchar. Once the file lands in ADLS, we want to cast the datatypes (date, integer, varchar, and so on) based on the actual data, using Python/PySpark in Azure Databricks.
The same code has to serve multiple files with different schemas, so we need a generic process with no hard-coded values.
I have used inferSchema=True, but it still reads int columns as String. So I want a way to automatically cast the dataframe's columns (string, int, float, datetime).
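For context, here is a minimal sketch of the kind of read being described, with a hypothetical ADLS path (the real container and account names are not in the question). Note that inferSchema is a CSV-reader option; a parquet file carries its own embedded schema, so columns written as varchar come back as string regardless:

df = spark.read.parquet("abfss://container@account.dfs.core.windows.net/data/file.parquet")  # hypothetical path
df.printSchema()  # every column shows as string, because that is what the file stores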
A:
The strategy I used to arrive at a generalized solution is as follows.
1. First, read the parquet file as a dataframe. At this stage everything is a string.
2. Convert a small sample (say 2 to 10 rows) to a pandas dataframe.
3. Pandas can infer the datatypes of string columns, but only when it simulates reading them from disk. To do that, we write the pandas dataframe to an in-memory file and read from that file into a new dataframe (see the small sketch after this list).
4. The read-back pandas dataframe now has the required types, with one catch: it cannot handle datetime strings, which it leaves as object. So, where possible, convert those columns to datetime columns in pandas. Add any other custom datatype-handling code at this point as well.
5. Convert this pandas dataframe with the inferred types to a PySpark dataframe.
6. Take the schema of that PySpark dataframe.
7. Apply each column's inferred datatype to the corresponding column of the original dataframe from step 1.
8. One thing to keep in mind is that Spark is as generous as possible: it uses LongType instead of IntegerType and DoubleType instead of FloatType, so that all possible future values can be handled.
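A minimal standalone sketch of the step-3 trick, using a tiny throwaway dataframe (the column names a and b are illustrative): pandas keeps string columns as object until it actually parses them from a file-like source, so a round trip through an in-memory CSV is what triggers the inference.

import io
import pandas as pd

pdf = pd.DataFrame({"a": ["1", "2"], "b": ["1.5", "2.5"]})
print(pdf.dtypes)                   # both columns: object
stream = io.StringIO()
pdf.to_csv(stream, index=False)
stream.seek(0)
print(pd.read_csv(stream).dtypes)   # a: int64, b: float64

The full implementation of all eight steps follows.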
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F
import pandas as pd
import io

sc = SparkContext('local')
sqlContext = SQLContext(sc)
def print_pandas(dataframe_given):
    with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'expand_frame_repr', False):
        print("Given pandas dataframe name")
        print(dataframe_given)
### Very long dataframe which we are reading from a file or database, with everything as varchar
data1 = [
["a1", "1", "101.0", "2023-09-06 17:51:23"],
["a2", "2", "223.0", "2025-09-16 17:51:23"],
["a3", "4", "10.0", "2023-09-06 17:51:23"],
["b1", "6", "256.0", "2025-09-16 17:51:23"],
["b2", "11", "1023.0", "2023-09-06 17:51:23"],
["b3", "22", "243.0", "2025-09-16 17:51:23"],
["c1", "17", "10.0", "2023-09-06 17:51:23"],
["c2", "72", "211.0", "2025-09-16 12:00:23"],
["d1", "81", "1023.0", "2023-09-06 17:51:23"],
["d3", "92", "1232.0", "2025-09-16 17:51:23"],
]
df1Columns = ["col1", "col2", "col3", "col4"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)
columns_list = list(df1.columns)
print(columns_list)
print("Showing the dataframe")
df1.show(n=10, truncate=False)
print("Printing the schema of dataframe")
df1.printSchema()
print("Converting a small sample of rows to Pandas dataframe")
pandas_df = df1.limit(2).toPandas()
print_pandas(pandas_df)
print("Datatypes of pandas dataframe before")
print(pandas_df.dtypes)
stream = io.StringIO()
pandas_df.to_csv(stream, index=False)
stream.seek(0)
df_readback = pd.read_csv(stream)
stream.close()
print("Datatypes of pandas dataframe after reading back")
print(df_readback.dtypes)
### If certain column types are still not handled, please handle them in this for loop.
for col in df_readback.columns:
    if df_readback[col].dtype == 'object':
        try:
            df_readback[col] = pd.to_datetime(df_readback[col])
        except ValueError:
            pass
print("Datatypes of pandas dataframe after datatime specific conversion")
print(df_readback.dtypes)
print_pandas(df_readback)
print("Converting pandas dataframe with appropriate datatypes to pyspark dataframe")
small_df_with_dtypes = sqlContext.createDataFrame(df_readback)
small_df_with_dtypes.show(n=10, truncate=False)
print("small_df_with_dtypes with schema")
pandas_inferred_schema = small_df_with_dtypes.schema
print(pandas_inferred_schema)
small_df_with_dtypes.printSchema()
required_df = df1
for struct_field in pandas_inferred_schema:
    column_name = struct_field.name
    column_type = struct_field.dataType
    required_df = required_df.withColumn(column_name, F.col(column_name).cast(column_type))
print("Required dataframe and schema")
required_df.show(n=100, truncate=False)
print("Schema applied is as follows")
print(required_df.schema)
required_df.printSchema()
Output:
['col1', 'col2', 'col3', 'col4']
Showing the dataframe
+----+----+------+-------------------+
|col1|col2|col3 |col4 |
+----+----+------+-------------------+
|a1 |1 |101.0 |2023-09-06 17:51:23|
|a2 |2 |223.0 |2025-09-16 17:51:23|
|a3 |4 |10.0 |2023-09-06 17:51:23|
|b1 |6 |256.0 |2025-09-16 17:51:23|
|b2 |11 |1023.0|2023-09-06 17:51:23|
|b3 |22 |243.0 |2025-09-16 17:51:23|
|c1 |17 |10.0 |2023-09-06 17:51:23|
|c2 |72 |211.0 |2025-09-16 12:00:23|
|d1 |81 |1023.0|2023-09-06 17:51:23|
|d3 |92 |1232.0|2025-09-16 17:51:23|
+----+----+------+-------------------+
Printing the schema of dataframe
root
|-- col1: string (nullable = true)
|-- col2: string (nullable = true)
|-- col3: string (nullable = true)
|-- col4: string (nullable = true)
Converting a small sample of rows to Pandas dataframe
Given pandas dataframe name
col1 col2 col3 col4
0 a1 1 101.0 2023-09-06 17:51:23
1 a2 2 223.0 2025-09-16 17:51:23
Datatypes of pandas dataframe before
col1 object
col2 object
col3 object
col4 object
dtype: object
Datatypes of pandas dataframe after reading back
col1 object
col2 int64
col3 float64
col4 object
dtype: object
Datatypes of pandas dataframe after datetime specific conversion
col1 object
col2 int64
col3 float64
col4 datetime64[ns]
dtype: object
Given pandas dataframe name
col1 col2 col3 col4
0 a1 1 101.0 2023-09-06 17:51:23
1 a2 2 223.0 2025-09-16 17:51:23
Converting pandas dataframe with appropriate datatypes to pyspark dataframe
+----+----+-----+-------------------+
|col1|col2|col3 |col4 |
+----+----+-----+-------------------+
|a1 |1 |101.0|2023-09-06 17:51:23|
|a2 |2 |223.0|2025-09-16 17:51:23|
+----+----+-----+-------------------+
small_df_with_dtypes with schema
StructType([StructField('col1', StringType(), True), StructField('col2', LongType(), True), StructField('col3', DoubleType(), True), StructField('col4', TimestampType(), True)])
root
|-- col1: string (nullable = true)
|-- col2: long (nullable = true)
|-- col3: double (nullable = true)
|-- col4: timestamp (nullable = true)
Required dataframe and schema
+----+----+------+-------------------+
|col1|col2|col3 |col4 |
+----+----+------+-------------------+
|a1 |1 |101.0 |2023-09-06 17:51:23|
|a2 |2 |223.0 |2025-09-16 17:51:23|
|a3 |4 |10.0 |2023-09-06 17:51:23|
|b1 |6 |256.0 |2025-09-16 17:51:23|
|b2 |11 |1023.0|2023-09-06 17:51:23|
|b3 |22 |243.0 |2025-09-16 17:51:23|
|c1 |17 |10.0 |2023-09-06 17:51:23|
|c2 |72 |211.0 |2025-09-16 12:00:23|
|d1 |81 |1023.0|2023-09-06 17:51:23|
|d3 |92 |1232.0|2025-09-16 17:51:23|
+----+----+------+-------------------+
Schema applied is as follows
StructType([StructField('col1', StringType(), True), StructField('col2', LongType(), True), StructField('col3', DoubleType(), True), StructField('col4', TimestampType(), True)])
root
|-- col1: string (nullable = true)
|-- col2: long (nullable = true)
|-- col3: double (nullable = true)
|-- col4: timestamp (nullable = true)
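To satisfy the question's requirement of one generic process for many files, the same steps can be wrapped into a reusable helper. A minimal sketch assuming a SparkSession named spark; the function name infer_and_cast and the sample_rows parameter are illustrative, not part of the original answer:

import io
import pandas as pd
from pyspark.sql import functions as F

def infer_and_cast(spark, df, sample_rows=10):
    # Steps 2-3: sample a few rows and round-trip them through an in-memory
    # CSV so that pandas re-infers the datatypes on read.
    sample = df.limit(sample_rows).toPandas()
    stream = io.StringIO()
    sample.to_csv(stream, index=False)
    stream.seek(0)
    readback = pd.read_csv(stream)
    # Step 4: attempt datetime conversion for columns pandas left as object.
    for col in readback.columns:
        if readback[col].dtype == 'object':
            try:
                readback[col] = pd.to_datetime(readback[col])
            except ValueError:
                pass
    # Steps 5-7: lift the inferred schema back into Spark and cast every
    # column of the full dataframe accordingly.
    inferred_schema = spark.createDataFrame(readback).schema
    for field in inferred_schema:
        df = df.withColumn(field.name, F.col(field.name).cast(field.dataType))
    return df

# Usage: typed_df = infer_and_cast(spark, raw_string_df)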