Asked by: Folkas · Asked: 8/30/2023 · Modified: 8/30/2023 · Viewed: 95 times
How to process .csv files by filtering their titles
Q:
I have a long list of .csv files in a dataset, but for simplicity let's assume the dataset contains three files:
- ASGK_2022.csv
- ASGK_2023.csv
- PRAS_2022.csv
I only need to process the files with "ASGK" in their titles. In other words, I need to filter the files by title using the transforms.api.FileSystem.files method. All files share the same column names.
I filter the files using a regex. Here are the two pieces of code I have been using, without success.
from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
import pandas as pd
import json
import re
@transform(
    output_df=Output(""),
    input_raw=Input(""),
)
def compute(input_raw, output_df, ctx):
    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path:
                pdf = pd.read_csv(xlsx_path, dtype=str, header=0)
                pdf.columns = pdf.columns.str.lower()
                for row in pdf.to_dict('records'):
                    yield json.dumps(row, default=str)

    rdd = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$').rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)
    output_df.write_dataframe(dfs)
The error I get:
Traceback (most recent call last):
  File "/myproject/datasets/ExcelFile.py", line 27, in compute
    dfs = spark.read.json(rdd)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 241, in json
    return self._df(self._jreader.json(jrdd))
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o163.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (localhost executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 232, in func
    for x in iterator:
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/src/myproject/datasets/ExcelFile.py", line 17, in process_file
    with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path:
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_base.py", line 1695, in __init__
    self._reader = self._engines[engine](self._io, storage_options=storage_options)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_openpyxl.py", line 557, in __init__
    super().__init__(filepath_or_buffer, storage_options=storage_options)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_base.py", line 545, in __init__
    self.book = self.load_workbook(self.handles.handle)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_openpyxl.py", line 568, in load_workbook
    return load_workbook(
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py", line 344, in load_workbook
    reader = ExcelReader(filename, read_only, keep_vba,
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py", line 123, in __init__
    self.archive = _validate_archive(fn)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py", line 95, in _validate_archive
    archive = ZipFile(filename, 'r')
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1269, in __init__
    self._RealGetContents()
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1336, in _RealGetContents
    raise BadZipFile("File is not a zip file")
zipfile.BadZipFile: File is not a zip file
The other approach uses OleFile:
def compute(input_raw, output_df, ctx):
    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            ole = olefile.OleFileIO(f.read())
            if ole.exists('Workbook'):
                d = ole.openstream('Workbook')
                pdf = pd.read_excel(d, dtype=str, header=0, engine='openpyxl')
                for row in pdf.to_dict('records'):
                    yield json.dumps(row)

    files_df = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$')
    rdd = files_df.rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)
    output_df.write_dataframe(dfs)
Traceback (most recent call last):
  File "/myproject/datasets/OleFile.py", line 33, in compute
    dfs = spark.read.json(rdd)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 241, in json
    return self._df(self._jreader.json(jrdd))
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o320.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (localhost executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 232, in func
    for x in iterator:
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/src/myproject/datasets/OleFile.py", line 18, in process_file
    ole = olefile.OleFileIO(f.read())
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/olefile/olefile.py", line 1075, in __init__
    self.open(filename, write_mode=write_mode)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/olefile/olefile.py", line 1169, in open
    self.fp = open(filename, mode)
FileNotFoundError: [Errno 2] No such file or directory: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n
Could you please help?
A:
The first error looks like the library is trying to interpret your file as a zip file.
File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1336, in _RealGetContents
    raise BadZipFile("File is not a zip file")
zipfile.BadZipFile: File is not a zip file
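For illustration, here is a minimal sketch of that mismatch outside Foundry (the sample row is borrowed from your second traceback; it is not your real data): an .xlsx file is a zip archive, so openpyxl opens it with zipfile.ZipFile, and raw CSV bytes therefore fail with BadZipFile, while pd.read_csv handles them fine.

import io
import pandas as pd

csv_bytes = b"name,surname,country\nVil,Gru,Fr\n"

# openpyxl expects a zip container (.xlsx), not CSV text
try:
    pd.ExcelFile(io.BytesIO(csv_bytes), engine="openpyxl")
except Exception as exc:
    print(type(exc).__name__, exc)  # BadZipFile: File is not a zip file

# CSV text belongs to pd.read_csv instead
print(pd.read_csv(io.BytesIO(csv_bytes)))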
For the second one, it looks like it is trying to use a line from your csv file as a path?
FileNotFoundError: [Errno 2] No such file or directory: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n
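A small sketch of what seems to happen there, based on the traceback (which shows open() being called on the CSV content itself): olefile does not recognise those bytes as OLE content, so it falls back to treating them as a filename.

import olefile

data = b"name,surname,country\r\nVil,Gru,Fr\r\n"
try:
    # the short CSV bytes are not parsed as an in-memory OLE file, so
    # olefile passes them to open() as if they were a path
    olefile.OleFileIO(data)
except FileNotFoundError as exc:
    print(exc)  # No such file or directory: b'name,surname,country...'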
As far as I can tell, your regex looks correct.
You could read the CSVs directly with Spark instead, as described in this answer: https://stackoverflow.com/a/72312808/5233494
filesystem = raw.filesystem()
hadoop_path = filesystem.hadoop_path
files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls(regex=r'.*ASGK.*\.csv$')]
df = (
    ctx
    .spark_session
    .read
    .option("encoding", "UTF-8")  # UTF-8 is the default
    .option("header", True)
    .option("inferSchema", True)
    .csv(files)
)
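Put together, the snippet might sit inside a full transform roughly like this (a sketch only: the dataset paths are left blank as in your question, and input_raw is used in place of raw to match your naming):

from transforms.api import transform, Input, Output


@transform(
    output_df=Output(""),
    input_raw=Input(""),
)
def compute(input_raw, output_df, ctx):
    filesystem = input_raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    # keep only the files whose names contain "ASGK"
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls(regex=r'.*ASGK.*\.csv$')]
    df = (
        ctx.spark_session
        .read
        .option("header", True)
        .option("inferSchema", True)
        .csv(files)
    )
    output_df.write_dataframe(df)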
Comments
But an Excel file is not an OLE document at all. The rest of the code shows the same error as before - the file contents are loaded and then used as if they were a path to a file.
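If you would rather keep the row-by-row flatMap approach from the question, a hedged, untested sketch of process_file that reads each CSV with pandas straight from the open handle (instead of pd.ExcelFile or olefile) could look like this:

import json

import pandas as pd
from transforms.api import transform, Input, Output


@transform(
    output_df=Output(""),
    input_raw=Input(""),
)
def compute(input_raw, output_df, ctx):
    def process_file(file_status):
        # CSVs are plain text, so hand the open file object to pd.read_csv
        with input_raw.filesystem().open(file_status.path, 'r') as f:
            pdf = pd.read_csv(f, dtype=str, header=0)
            pdf.columns = pdf.columns.str.lower()
            for row in pdf.to_dict('records'):
                yield json.dumps(row, default=str)

    files_df = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$')
    rdd = files_df.rdd.flatMap(process_file)
    dfs = ctx.spark_session.read.json(rdd)
    output_df.write_dataframe(dfs)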