Dynamically cast the datatype in Pyspark

Asked by: SHIVAM YADAV · Asked: 9/14/2023 · Updated: 9/16/2023 · Views: 69

Q:

I am creating a parquet file from PostgreSQL, and everything in it is tagged as varchar columns. Once the file lands in ADLS, we want to use Python/PySpark in Azure Databricks to cast the data types based on the data, e.g. date, integer and varchar fields.

Multiple files with different schemas need to go through the same code, so we need a generic process with no hard-coded values.

I have used inferSchema=True, but it still reads int columns as String, so I am looking for a way to cast the file automatically.
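For context, a minimal sketch of the situation described above (the ADLS path is hypothetical). Note that inferSchema is an option of the CSV reader; a Parquet file carries its own embedded schema, so columns that were exported as varchar come back as strings either way:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical ADLS path -- substitute your own container and storage account.
path = "abfss://container@account.dfs.core.windows.net/landing/export.parquet"

# Parquet embeds its schema, so every column the source exported as varchar
# is read back as string, regardless of inferSchema.
df = spark.read.parquet(path)
df.printSchema()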

python · dataframe · pyspark · casting · databricks

Comments

0 votes · user238607 · 9/15/2023
I have edited my answer to include a code example where the varchar columns of the dataframe are correctly cast to the required datatypes (string, int, float, datetime).

A:

0 votes · user238607 · 9/14/2023 · #1

The strategy I use to arrive at a generalized solution is as follows.

1. First, I read the parquet file as a dataframe. At this stage everything is a string.

2. Convert a small sample (i.e. 2 to 10 rows) to a pandas dataframe.

3. Pandas is able to infer the data types of the string columns, but it can only do so when it simulates reading them from disk. To that end, we write the pandas dataframe to an in-memory file and read it back from that file into a new dataframe.

4. The new read-back pandas dataframe now has the required types and we could proceed, except for one problem: the read-back does not handle datetime strings. So, where possible, convert the remaining object columns in pandas to datetime columns, and add any other custom datatype handling code at this point.

5. Convert this pandas dataframe with the inferred types to a pyspark dataframe.

6. Take the schema of the pyspark dataframe from the previous step.

7. Apply the data type inferred for each column to the original dataframe from step 1.

8. One thing to keep in mind is that Spark is as generous as possible when inferring: it will use LongType instead of IntegerType and DoubleType instead of FloatType so that all possible future values fit. (A short sketch for narrowing these types again appears after the output below.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
import pandas as pd
import io


sc = SparkContext('local')
sqlContext = SQLContext(sc)

def print_pandas(dataframe_given):
    with pd.option_context('display.max_rows', None,'display.max_columns', None, 'expand_frame_repr', False):
        print("Given pandas dataframe name")
        print(dataframe_given)

### Very long dataframe, read from a file or database, where everything is varchar
data1 = [
    ["a1", "1", "101.0", "2023-09-06 17:51:23"],
    ["a2", "2", "223.0", "2025-09-16 17:51:23"],
    ["a3", "4", "10.0", "2023-09-06 17:51:23"],
    ["b1", "6", "256.0", "2025-09-16 17:51:23"],
    ["b2", "11", "1023.0", "2023-09-06 17:51:23"],
    ["b3", "22", "243.0", "2025-09-16 17:51:23"],
    ["c1", "17", "10.0", "2023-09-06 17:51:23"],
    ["c2", "72",  "211.0", "2025-09-16 12:00:23"],
    ["d1", "81", "1023.0", "2023-09-06 17:51:23"],
    ["d3", "92",  "1232.0", "2025-09-16 17:51:23"],
]


df1Columns = ["col1", "col2", "col3", "col4"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)

columns_list = list(df1.columns)
print(columns_list)

print("Showing the dataframe")
df1.show(n=10, truncate=False)
print("Printing the schema of dataframe")
df1.printSchema()

print("Converting a small sample of rows to Pandas dataframe")
pandas_df = df1.limit(2).toPandas()
print_pandas(pandas_df)
print("Datatypes of pandas dataframe before")
print(pandas_df.dtypes)



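### Round-trip the small sample through an in-memory CSV so that pandas
### re-infers the dtypes from the string values (steps 3 and 4).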
stream = io.StringIO()
pandas_df.to_csv(stream, index=False)
stream.seek(0)
df_readback = pd.read_csv(stream)
stream.close()

print("Datatypes of pandas dataframe after reading back")
print(df_readback.dtypes)

### If certain column types are still not handled correctly, handle them in this for loop.
for col in df_readback.columns:
    if df_readback[col].dtype == 'object':
        try:
            df_readback[col] = pd.to_datetime(df_readback[col])
        except ValueError:
            pass


print("Datatypes of pandas dataframe after datatime specific conversion")
print(df_readback.dtypes)
print_pandas(df_readback)


print("Converting pandas dataframe with appropriate datatypes to pyspark dataframe")
small_df_with_dtypes = sqlContext.createDataFrame(df_readback)
small_df_with_dtypes.show(n=10, truncate=False)
print("small_df_with_dtypes with schema")
pandas_inferred_schema = small_df_with_dtypes.schema
print(pandas_inferred_schema)
small_df_with_dtypes.printSchema()


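### Apply the pandas-inferred type of each column back onto the original
### all-string dataframe (step 7).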
required_df = df1

for struct_field in pandas_inferred_schema:
    column_name = struct_field.name
    column_type = struct_field.dataType
    required_df = required_df.withColumn(column_name, F.col(column_name).cast(column_type))


print("Required dataframe and schema")
required_df.show(n=100, truncate=False)
print("Schema applied is as follows")
print(required_df.schema)
required_df.printSchema()

Output:

['col1', 'col2', 'col3', 'col4']
Showing the dataframe
+----+----+------+-------------------+
|col1|col2|col3  |col4               |
+----+----+------+-------------------+
|a1  |1   |101.0 |2023-09-06 17:51:23|
|a2  |2   |223.0 |2025-09-16 17:51:23|
|a3  |4   |10.0  |2023-09-06 17:51:23|
|b1  |6   |256.0 |2025-09-16 17:51:23|
|b2  |11  |1023.0|2023-09-06 17:51:23|
|b3  |22  |243.0 |2025-09-16 17:51:23|
|c1  |17  |10.0  |2023-09-06 17:51:23|
|c2  |72  |211.0 |2025-09-16 12:00:23|
|d1  |81  |1023.0|2023-09-06 17:51:23|
|d3  |92  |1232.0|2025-09-16 17:51:23|
+----+----+------+-------------------+

Printing the schema of dataframe
root
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)
 |-- col4: string (nullable = true)

Converting a small sample of rows to Pandas dataframe
Given pandas dataframe name
  col1 col2   col3                 col4
0   a1    1  101.0  2023-09-06 17:51:23
1   a2    2  223.0  2025-09-16 17:51:23
Datatypes of pandas dataframe before
col1    object
col2    object
col3    object
col4    object
dtype: object
Datatypes of pandas dataframe after reading back
col1     object
col2      int64
col3    float64
col4     object
dtype: object
Datatypes of pandas dataframe after datetime specific conversion
col1            object
col2             int64
col3           float64
col4    datetime64[ns]
dtype: object
Given pandas dataframe name
  col1  col2   col3                col4
0   a1     1  101.0 2023-09-06 17:51:23
1   a2     2  223.0 2025-09-16 17:51:23
Converting pandas dataframe with appropriate datatypes to pyspark dataframe
+----+----+-----+-------------------+
|col1|col2|col3 |col4               |
+----+----+-----+-------------------+
|a1  |1   |101.0|2023-09-06 17:51:23|
|a2  |2   |223.0|2025-09-16 17:51:23|
+----+----+-----+-------------------+

small_df_with_dtypes with schema
StructType([StructField('col1', StringType(), True), StructField('col2', LongType(), True), StructField('col3', DoubleType(), True), StructField('col4', TimestampType(), True)])
root
 |-- col1: string (nullable = true)
 |-- col2: long (nullable = true)
 |-- col3: double (nullable = true)
 |-- col4: timestamp (nullable = true)

Required dataframe and schema
+----+----+------+-------------------+
|col1|col2|col3  |col4               |
+----+----+------+-------------------+
|a1  |1   |101.0 |2023-09-06 17:51:23|
|a2  |2   |223.0 |2025-09-16 17:51:23|
|a3  |4   |10.0  |2023-09-06 17:51:23|
|b1  |6   |256.0 |2025-09-16 17:51:23|
|b2  |11  |1023.0|2023-09-06 17:51:23|
|b3  |22  |243.0 |2025-09-16 17:51:23|
|c1  |17  |10.0  |2023-09-06 17:51:23|
|c2  |72  |211.0 |2025-09-16 12:00:23|
|d1  |81  |1023.0|2023-09-06 17:51:23|
|d3  |92  |1232.0|2025-09-16 17:51:23|
+----+----+------+-------------------+

Schema applied is as follows
StructType([StructField('col1', StringType(), True), StructField('col2', LongType(), True), StructField('col3', DoubleType(), True), StructField('col4', TimestampType(), True)])
root
 |-- col1: string (nullable = true)
 |-- col2: long (nullable = true)
 |-- col3: double (nullable = true)
 |-- col4: timestamp (nullable = true)
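
As noted in step 8, the inferred types are deliberately wide (LongType, DoubleType). If your target schema needs narrower types, the affected columns can be downcast explicitly afterwards. A minimal sketch, reusing required_df and the column names from the example above:

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType

### Narrow the generously inferred types where the target schema demands it.
narrowed_df = required_df \
    .withColumn("col2", F.col("col2").cast(IntegerType())) \
    .withColumn("col3", F.col("col3").cast(FloatType()))

narrowed_df.printSchema()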