在笔记本中将 CSV 转换为 Parquet

Convert a CSV to a Parquet in a Notebook

提问人:organza 提问时间:11/16/2023 最后编辑:Kashyaporganza 更新时间:11/16/2023 访问量:45

问:

我刚刚进入数据世界,并被要求创建一个自定义项目,我需要使用 Notebook (PySpark) 将 CSV 转换为 parquet。到目前为止,我已经把它放在一起,它似乎运行没有错误,但我在 ADLS 的 Parquet 文件夹中没有任何内容。

def convert_csv_to_parquet(input_file_path, output_file_path):
    # Read CSV file into a Pandas DataFrame
    df = pd.read_csv(input_file_path)

    # Convert Pandas DataFrame to PyArrow Table
    table = pa.Table.from_pandas(df)

    # Write PyArrow Table to Parquet file
    pq.write_table(table, output_file_path)

    # Open the Parquet file
    table = pq.read_table(output_file_path)

    # Convert the table to a Pandas DataFrame
    df = table.to_pandas()

    # Print the DataFrame
    print(df.head(100))


input_file_path = 'abfss://[email protected]/MySQL_Project-Table_Courses.csv'
output_file_path = 'abfss://[email protected]/Parquet'

convert_csv_to_parquet(input_file_path, output_file_path)
pyspark 管道 apache-synapse

评论

0赞 Kashyap 11/16/2023
你是故意使用熊猫的吗?或者你认为 Pandas 是 PySpark?它们非常不同。尽管 PySpark 提供了与 Pandas 的互操作性,但 pandas DataFrame 与 PySpark/Spark DataFrame 有很大不同。
0赞 organza 11/17/2023
啊,我不太清楚tbh,谢谢你的喊叫!

答:

0赞 Daniel Perez Efremova 11/16/2023 #1

您可以使用 pyspark 读取器/写入器内置方法吗?

这似乎很简单(我假设 spark 会话在代码中的某个位置声明,或者你使用 Databricks noteboks):

def convert_csv_to_parquet(
    input_file_path: str,
    output_file_path: str
    ):

    df = spark.read.format('csv').load(input_file_path)
    df.write.format('parquet').save(input_file_path)
    return 1 

评论

0赞 organza 11/16/2023
谢谢。它再次运行正常,但我的输出文件夹中仍然没有任何东西。执行几乎不需要时间,但什么也没发生
1赞 Kashyap 11/16/2023 #2

Pandas 和 PySpark 非常不同。尽管 PySpark 提供了与 Pandas 的互操作性,但 pandas DataFrame 与 PySpark/Spark DataFrame 有很大不同。

在编写任何代码之前,请了解 Pandas 和 PySpark 之间的区别。


您的问题分为两部分。首先是了解如何读取/写入 csv 和 parquet 文件,这些文件位于笔记本电脑的硬盘上。其次是如何使用ADSL代替本地硬盘。

对于第一部分:

请参阅 PySpark 文档附带的示例。例如

另请参阅 PySpark SQL API 文档(PySpark SQL API 是 python API,而不是 SQL)。例如

对于第二部分:

使用云存储作为底层存储(ADLS、S3 等)时,您需要:

  1. 用适当的方案作为所有路径的前缀,例如 , , ...s3aabfss
  2. 在 pyspark 环境中安装适当的 hadoop 扩展/库(对应/支持该方案)。PySpark 将使用它来读/写云存储。
  3. 在 Spark 配置中设置适当的配置参数,或用于身份验证的任何方式。

根据您的用例,有许多可用的指南,这里有一个这里是另一个