提问人:joejoejoejoe4 提问时间:12/10/2022 更新时间:12/14/2022 访问量:213
Python 并行化 os.walk() 以提高本地 SSD 的性能
Python Parallelize os.walk() for performance on a local SSD
问:
我正在尝试并行化以下代码以使其运行得更快:
import os
fs = 0
for root, dirs, files in os.walk(os.getcwd()):
for file in files:
fp = os.path.join(root, file)
fs += os.path.getsize(fp)
print(fs)
我尝试了以下方法:
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
def main():
total_size = 0
with ProcessPoolExecutor() as executor:
for root, dirs, files in os.walk(os.getcwd()):
futures = []
for file in files:
file_path = os.path.join(root, file)
futures.append(executor.submit(os.path.getsize, file_path))
for future in as_completed(futures):
total_size += future.result()
print(total_size)
if __name__ == '__main__':
main()
但它比我的原始代码(遍历 SSD 上的文件)慢得多。我在 Windows 上使用 Python 3.8。关于如何正确加快速度的任何想法?
答:
您应该对文件进行批处理,否则期货的开销可能大于保存。对于 100 万个小文件,在 Linux 上,我使用以下代码获得了比没有 ProcessPoolExecutor 的 ~2 倍的加速。您可能需要调整为系统批量设置的文件数量(在我的示例中为 10k)。
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
def getsize_list(files):
fs = 0
for file in files:
fs += os.path.getsize(file)
return fs
def main():
total_size = 0
with ProcessPoolExecutor() as executor:
for root, dirs, files in os.walk(os.getcwd()):
futures = []
current_list = []
for file in files:
file_path = os.path.join(root, file)
current_list.append(file_path)
if len(current_list)==10_000:
futures.append(executor.submit(getsize_list, current_list))
current_list = []
futures.append(executor.submit(getsize_list, current_list))
for future in as_completed(futures):
total_size += future.result()
print(total_size)
if __name__ == '__main__':
main()
评论
由于进程间通信,第二个代码速度较慢。要了解原因,我们首先需要了解事物是如何运作的。
在第一次运行期间,操作系统真正使用 SSD 并将内容放入 RAM 缓存中。然后,此代码的后续执行速度会因此而明显加快(根本不需要与 SSD 设备交互)。在我的机器上,第一次连续运行大约需要 10 秒,而第二次运行大约需要 5 秒。对于操作系统、文件系统和实际的 SSD 驱动程序,可能会使用系统锁,因此并行执行此操作可能不会更快(这样做是为了防止任何损坏,也是为了简化系统堆栈某些部分的开发)。从历史上看,一些操作系统甚至使用了效率低得可怕的巨型锁。请注意,操作的可伸缩性取决于 RAM 缓存的使用情况(它可能会更好地使用缓存进行扩展)。我将重点介绍内容在缓存中的情况,因为它更容易重现。
在第一个代码中,大部分时间都花在了(大约80%的时间在我的机器上 - 4秒)。然后是(大约 15% -- 0.75 秒)。然后是循环开销和字符串处理(约 5% -- 0.25 秒)。在我的机器上,每个电话平均需要大约 50 个我们。os.path.getsize
os.walk
os.path.getsize
问题是系统调用的成本很高,每次调用都涉及通常应该创建一个内核线程,涉及 IO 调度程序和任务调度程序,执行上下文切换和同步。更不用说操作系统需要解析、检查和解析完整路径的每个部分,以便实际获取有关目标文件的统计信息。最后,对于这次操作来说,50 us实际上似乎还不错。在 Windows 上,实际上调用了几个系统调用:它用 、 获取大小 打开文件,然后用 关闭它。 占用超过70%的功能时间。os.path.getsize
os.path.getsize
CreateFileW
GetFileInformationByHandle
CloseHandle
CreateFileW
在第二个代码中,使数据被腌制,进程间通信 (IPC) 和数据被目标进程取消腌制。IPC 操作的成本很高,因为它们通常会导致上下文切换和低级别同步。获取结果也做同样的事情。最后,大部分时间都应该花在这样的开销上,特别是因为在进行此类操作时主要流程会减慢,因此工人实际上应该在大部分时间等待/挨饿。因此,并行代码可能比串行代码慢得多。对每个文件执行此操作非常昂贵。使用 @vladmihaisima 指出的批处理计算是减少上下文切换开销的第一步。尽管如此,使用此方法不会消除酸洗 + 取消拣选 + 传输数据的开销,尽管它们应该小于初始开销。主要问题来自使用多处理本身。一种解决方案是使用线程而不是进程,以便摆脱上述开销。但是,全局解释器锁定 (GIL) 会导致字符串处理被锁定,从而减慢了计算速度(实际上比在我的机器上拣选更多)。executor.submit
下面是带有块的结果代码:
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
def compute_chunk_size(fileChunk):
total_size = 0
for block in fileChunk:
root, files = block
for file in files:
file_path = os.path.join(root, file)
total_size += os.path.getsize(file_path)
return total_size
def main():
total_size = 0
with ProcessPoolExecutor() as executor:
fileChunk = []
fileChunkSize = 0
futures = []
for root, dirs, files in os.walk(os.getcwd()):
fileChunk.append((root, files))
fileChunkSize += len(files)
if fileChunkSize >= 1024:
futures.append(executor.submit(compute_chunk_size, fileChunk))
fileChunk = []
fileChunkSize = 0
completed = 0
if fileChunkSize > 0:
futures.append(executor.submit(compute_chunk_size, fileChunk))
while completed < len(futures):
for future in as_completed(futures):
total_size += future.result()
completed += 1
print(total_size)
if __name__ == '__main__':
main()
在我的 i5-9600KF(6 核)处理器上,它的速度大约快了 4 倍。
以下是并行执行的分析信息:
每一行都是一个过程。PID 26172 的进程是主进程(启动其他进程并等待它们)。棕色部分是进程的 CPU 时间(活动)。剖析被分成 1 毫秒的小块。浅灰色和浅绿色部分是存在上下文切换或同步的部分,而深绿色部分是进程仅进行计算的部分。
我们可以看到,工作人员相对活跃,但他们很多时候都在挨饿或等待某些东西(通常是访问与 IO 相关的资源)。主线程被工人减慢了速度,因为操作没有完全扩展。事实上,调用速度大约慢了 10-15%,主要是因为无法扩展(当然是由于系统锁定)。并行计算的速度不能快于 。在某些具有许多内核和可扩展的操作系统文件子系统的平台上,这可能是一个瓶颈。如果是这样,则可以使用递归任务来传输文件树,但代价是代码要复杂得多。os.path.getsize
CreateFileW
os.walk
os.walk
在实践中,在 Windows 上,事实证明实现显然效率不高。根据这篇文章,使用应该会快得多。坏消息是替代函数也使用相同的方法。因此,如果你想有效地执行此操作,为此编写一个 C 扩展当然是一个好主意。这样的 C 扩展也可以从使用线程中受益,同时对 GIL、酸洗和 IPC 没有任何问题。os.path.getsize
GetFileAttributesEx
os.stat
评论