如何通过筛选标题来处理 .csv 文件

How to process .csv files by filtering their titles

提问人:Folkas 提问时间:8/30/2023 更新时间:8/30/2023 访问量:95

问:

我有一个数据集中大量 .csv 文件的列表,但为了简单起见,我们假设数据集中有三个文件:

  • ASGK_2022.csv
  • ASGK_2023.csv
  • PRAS_2022.csv

我只需要处理标题中带有“ASGK”的文件。换句话说,我需要使用 transforms.api.FileSystem.files 方法按标题过滤文件。所有文件共享相同的列名。

我使用正则表达式代码过滤文件。这是我一直在使用的两段代码,但是没有成功。

from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
import pandas as pd
import json
import re


@transform(
    output_df=Output(
        ""),
    input_raw=Input(""),
)
def compute(input_raw, output_df, ctx):

    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path:

                pdf = pd.read_csv(xlsx_path, dtype=str, header=0)
                pdf.columns = pdf.columns.str.lower()

                for row in pdf.to_dict('records'):
                    yield json.dumps(row, default=str)

    rdd = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$').rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)
    output_df.write_dataframe(dfs)

我得到的错误:

回溯(最近一次调用最后一次): 计算中的文件“/myproject/datasets/ExcelFile.py”,第 27 行 dfs = spark.read.json(rdd) 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py”,第 241 行,JSON 返回 self._df(self._jreader.json(jrdd)) 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/java_gateway.py”,第 1322 行,调用return_value = get_return_value( 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/utils.py”,第 111 行,在 deco 中 返回 f(*a, **kw) 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/protocol.py”,第 326 行,get_return_value 引发 Py4JJavaError( py4j.protocol.Py4JJavaError:调用 o163.json 时出错。 org.apache.spark.SparkException:由于阶段失败,作业中止:阶段 0.0 中的任务 0 失败 1 次,最近一次失败:阶段 0.0 中的任务 0.0 丢失 (TID 0)(localhost 执行程序驱动程序):org.apache.spark.api.python.PythonException:回溯(最近一次调用最后一次): 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py”,第 619 行,主 进程() 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py”,第 611 行,正在处理中 serializer.dump_stream(out_iter,外文件) 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/serializers.py”,第 132 行,dump_stream 对于迭代器中的 obj: 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py”,第 232 行,在 func 中 对于迭代器中的 x: 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/src/myproject/datasets/ExcelFile.py”,第 17 行,process_file 带PD。ExcelFile(f.read(), engine='openpyxl') xlsx_path: 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_base.py”,第 1695 行,init self._reader = self._engines[engine](self._io, storage_options=storage_options) 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_openpyxl.py”,第 557 行,在 init super() 中。初始化(filepath_or_buffer, storage_options=storage_options) 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_base.py”,第 545 行,init self.book = self.load_workbook(self.handles.handle) 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_openpyxl.py”,第 568 行,load_workbook 返回load_workbook( 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py”,第 344 行,load_workbook reader = ExcelReader(文件名、read_only、keep_vba、 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py“,第 123 行,init self.archive = _validate_archive(fn) 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py”,第 95 行,_validate_archive 存档 = ZipFile(文件名,'r') 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py”,第 1269 行,init self._RealGetContents() 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py”,第 1336 行,_RealGetContents raise BadZipFile(“文件不是zip文件”) zip文件。BadZipFile:文件不是 zip 文件

另一种方法是使用 OleFile:

def compute(input_raw, output_df, ctx):

    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            ole = olefile.OleFileIO(f.read())
            if ole.exists('Workbook'):
                d = ole.openstream('Workbook')
                pdf = pd.read_excel(d, dtype=str, header=0, engine='openpyxl')

         

            for row in pdf.to_dict('records'):
                yield json.dumps(row)

    files_df = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$')
    rdd = files_df.rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)


    output_df.write_dataframe(dfs)

回溯(最近一次调用最后一次): 计算中的文件“/myproject/datasets/OleFile.py”,第 33 行 dfs = spark.read.json(rdd) 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py”,第 241 行,JSON 返回 self._df(self._jreader.json(jrdd)) 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/java_gateway.py”,第 1322 行,调用return_value = get_return_value( 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/utils.py”,第 111 行,在 deco 中 返回 f(*a, **kw) 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/protocol.py”,第 326 行,get_return_value 引发 Py4JJavaError( py4j.protocol.Py4JJavaError:调用 o320.json 时出错。 org.apache.spark.SparkException:由于阶段故障,作业中止:阶段 1.0 中的任务 0 失败 1 次,最近一次失败:阶段 1.0 中的任务 0.0 丢失 (TID 1)(localhost 执行程序驱动程序):org.apache.spark.api.python.PythonException:回溯(最近一次调用最后一次): 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py”,第 619 行,主 进程() 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py”,第 611 行,正在处理中 serializer.dump_stream(out_iter,外文件) 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/serializers.py”,第 132 行,dump_stream 对于迭代器中的 obj: 文件“/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py”,第 232 行,在 func 中 对于迭代器中的 x: 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/src/myproject/datasets/OleFile.py”,第 18 行,process_file ole = olefile。OleFileIO(f.read()) 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/olefile/olefile.py”,第 1075 行,init self.open(filename, write_mode=write_mode) 文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/olefile/olefile.py”,第 1169 行,打开 self.fp = open(文件名,模式) FileNotFoundError: [Errno 2] 没有这样的文件或目录: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n

请帮忙吗?

python csv 解析 palantir-foundry-api

评论

1赞 Gameplay 8/30/2023
为什么你不能只使用并进行处理,否则跳过它?if "ASGK" in filename:
0赞 Panagiotis Kanavos 8/30/2023
CSV 文件不是 Excel 格式。它们是纯文本文件,其值以逗号分隔。Excel 文件是 ZIP 文件包 XML 文档。不能将 Excel 文件加载为 CSV,反之亦然。
0赞 Panagiotis Kanavos 8/30/2023
无论如何,代码都会做一些非常奇怪的事情,看起来像是随机复制的片段。只阅读文档会容易得多。1) 尝试打开一个 Excel 文件并将其作为对象返回。2) 虽然试图像文件路径一样踩踏该对象,但 3) 正在尝试从 Excel 对象加载 CSV 文件。pd.ExcelFile(f.read(), engine='openpyxl')as xlsx_pathpd.read_csv(xlsx_path)
0赞 Panagiotis Kanavos 8/30/2023
这种方法试图做什么?将CSV文件转换为JSON?读取 CSV 文件的简单方法是 .您可以使用 to_json 生成 JSON。整个方法可以替换为df=pd.read_csv(file_status.path)return pd.read_csv(file_status.path).to_json(orient='records')
0赞 Panagiotis Kanavos 8/30/2023
using OleFile但 Excel 文件根本不是 OLE 文档。代码的其余部分显示了与上一个相同的错误 - 文件内容被加载和使用,就好像它们是文件的路径一样。

答:

3赞 user5233494 8/30/2023 #1

第一个错误看起来像是该库正在尝试将文件解释为 zip 文件。

文件“/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py”,第 1336 行,在_RealGetContents中引发 BadZipFile(“File is not a zip file”) zipfile。BadZipFile:文件不是 zip 文件

第二个,看起来它正在尝试使用您的 csv 文件中的一行作为路径?

FileNotFoundError: [Errno 2] 没有这样的文件或目录: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n

据我所知,您的正则表达式看起来是正确的。

您可以直接使用 Spark 读取 CSV,如以下答案所述: https://stackoverflow.com/a/72312808/5233494

    filesystem = raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls(regex=r'.*ASGK.*\.csv$')]
    df = (
        ctx
        .spark_session
        .read
        .option("encoding", "UTF-8")  # UTF-8 is the default
        .option("header", True)
        .option("inferSchema", True)
        .csv(files)
    )

评论

0赞 Panagiotis Kanavos 8/30/2023
文件包含 XML 文档的 ZIP 包。CSV 文件只是一个带逗号的文本文件。该问题的代码尝试以一种奇怪的方式使用 Excel 方法读取文本 (CSV) 文件。似乎目的是将CSV数据转换为JSON行xlsx
0赞 Folkas 8/31/2023
谢谢@user5233494。我还有很多.xlsx文件,我还需要按标题过滤并编写转换。您能否提供一个代码,按标题过滤.xlsx文件,然后解析它们?spark.read 方法似乎仅适用于 .csv 文件。