提问人:sandor-88 提问时间:5/17/2022 最后编辑:ShadowRangersandor-88 更新时间:5/17/2022 访问量:118
对多个文件使用多处理的最佳方式
Optimal way to use multiprocessing for many files
问:
因此,我有大量需要处理成 CSV 的文件。每个文件本身都很大,每一行都是一个字符串。文件的每一行可以表示三种类型的数据之一,每种类型的处理方式略有不同。我当前的解决方案如下所示:
type1_columns = [...]
type2_columns = [...]
type3_columns = [...]
file_list = os.listdir(filelist)
def process_type1_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type1_series = pd.Series(to_append, index=type1_columns)
return type1_series
def process_type2_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type2_series = pd.Series(to_append, index=type2_columns)
return type2_series
def process_type3_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type3_series = pd.Series(to_append, index=type3_columns)
return type3_series
def process_file(file):
type1_df = pd.DataFrame(columns=type1_columns)
type2_df = pd.DataFrame(columns=type2_columns)
type3_df = pd.DataFrame(columns=type3_columns)
with open(filepath/file) as f:
data=f.readlines()
for line in data:
#some logic to get the record_type and convert line to json
record_type = ...
json_line = ...
if record_type == "type1":
type1_series = process_type1_line(json_line)
type1_df = type1_df.append(type1_series, ignore_index=True)
if record_type == "type2":
type2_series = process_type2_line(json_line)
type2_df = type2_df.append(type2_series, ignore_index=True)
if record_type == "type3":
type3_series = process_type3_line(json_line)
type3_df = type3_df.append(type3_series, ignore_index=True)
type1_df.to_csv(type1_csv_path.csv)
type2_df.to_csv(type2_csv_path.csv)
type3_df.to_csv(type3_csv_path.csv)
for file in file_list:
process_file(file)
我遍历文件,并为三种不同类型的记录中的每一种创建数据帧。我解析这些行并为每个行调用相应的处理函数。返回的序列将追加到该文件的该record_type的最终数据帧中。处理文件后,三个数据帧将另存为 CSV,然后从下一个文件开始。
问题是这种方法花费的时间太长,我需要数周时间才能处理所有文件。
我试图通过使用多处理(我没有很多经验)来修改我的方法,如下所示:
with ThreadPoolExecutor(max_workers=30) as executor:
futures = [executor.submit(process_file, file) for file in file_list]
在一些日志记录打印语句中,我可以看到这开始了对 30 个文件的处理,但没有一个文件完成,所以我至少知道我的方法有缺陷。谁能解释一下解决这个问题的最佳方法是什么?也许是多处理和异步的某种组合?
答:
你有两个大问题:
您将整个输入文件加载到内存中,在内存中生成整个结果,然后一次写入整个输出文件。这意味着,如果您有 30 个工作线程并行运行,则需要与 30 个(自我描述的)大文件成比例的内存。您将所有数据存储两倍,一次是 返回的行,另一次是 返回的行,另一次是存储在三个 s 之一中;如果您使用了没有执行程序的代码,并且只是更改了:
list
str
f.readlines()
DataFrame
data=f.readlines() for line in data:
自:
for line in f:
您可以立即将内存使用量减少大约一半,这(可能)足以阻止页面抖动。也就是说,您仍然会使用与文件大小成比例的内存来存储 s,因此,如果您并行化代码,您将恢复抖动,如果文件足够大,即使没有并行性,也可能会继续抖动。
DataFrame
你对每一行都使用,IIRC,for s 是画家 Schlemiel 算法的一种形式:每行都制作一个全新的,将旧内容的全部内容加上少量的新数据复制到新数据中,随着现有数据变大,工作时间越来越长;本应摊销的工作变成了工作。
.append
DataFrame
append
DataFrame
DataFrame
DataFrame
O(n)
O(n**2)
在这两者之间,您使用的内存比需要的要多得多,并且对重复的附加执行大量不必要的繁忙工作。并行性可能有助于更快地完成繁忙的工作,但作为交换,它会将您的内存需求增加 30 倍;很有可能,你没有那么多的 RAM(如果这些文件真的很大,你可能没有足够的 RAM 来容纳其中一个文件),并且你最终会进行页面抖动(将内存写入 pagefile/swap 文件以为其他东西腾出空间,按需读取它,并经常丢弃在你完成之前分页的内存, 使内存访问与磁盘性能挂钩,这比 RAM 访问慢几个数量级)。
我不太了解 Pandas,无法说它是否为你正在做的事情提供了一些更好的、渐进式的解决方案,但你真的不需要一个;只需逐行使用输入,然后使用模块随时编写行即可。您的内存要求将从“与每个输入文件的大小成比例”下降到“与输入文件每一行的数据成比例”。csv
您的函数最终将如下所示:process_file
def process_file(file):
# Open input file and all output files (newline='' needed to play nice with csv module
# which takes control of newline format to ensure dialect rules followed precisely,
# regardless of OS line separator rules)
with open(filepath/file) as f,\
open(type1_csv_path, 'w', newline='') as type1f,\
open(type2_csv_path, 'w', newline='') as type2f,\
open(type3_csv_path, 'w', newline='') as type3f:
csv1 = csv.writer(type1f)
csv1.writerow(type1_columns) # Omit if no useful column header
csv2 = csv.writer(type2f)
csv2.writerow(type2_columns) # Omit if no useful column header
csv3 = csv.writer(type3f)
csv3.writerow(type3_columns) # Omit if no useful column header
for line in f: # Directly iterating file object lazily fetches line at a time
# where .readlines() eagerly fetches whole file, requiring
# a ton of memory for no reason
#some logic to get the record_type and convert line to json
record_type = ...
json_line = ...
if record_type == "type1":
type1_series = process_type1_line(json_line)
csv1.writerow(type1_series) # Might need to convert to plain list if Series
# doesn't iterate the values you need directly
elif record_type == "type2":
type2_series = process_type2_line(json_line)
csv2.writerow(type2_series)
elif record_type == "type3":
type3_series = process_type3_line(json_line)
csv3.writerow(type3_series)
如果按原样工作(没有执行程序),请以这种方式使用它。如果你在没有执行程序的情况下也要翻页,或者文件足够大,重复的 s 会严重伤害你,这可能足以让它自己工作。如果它太慢,如果你做了大量的工作来将每一行处理成输出格式,执行器可能会提供一点好处(因为当大多数工作线程都在处理时,一个或两个工作线程可以充分共享磁盘访问以进行读取和写入),但如果每行的处理工作量很低,那么任何超过一小部分工作线程(我从两三个开始)只会增加磁盘争用(特别是如果你是使用旋转磁盘硬盘驱动器,而不是 SSD),并行性要么无济于事,要么会造成积极损害。append
您可能需要调整使用的确切 CSV 方言(作为参数传递给 ),并可能为输出文件显式设置特定方言,而不是区域设置默认值(例如,传递或传递给 s 表示写入,因此它始终以文件消费者期望的编码写入),但这是一般形式。csv.writer
encoding
encoding='utf-8'
encoding='utf-16'
open
.csv
评论