Python 并行化 os.walk() 以提高本地 SSD 的性能

Python Parallelize os.walk() for performance on a local SSD

提问人:joejoejoejoe4 提问时间:12/10/2022 更新时间:12/14/2022 访问量:213

问:

我正在尝试并行化以下代码以使其运行得更快:

import os

fs = 0
for root, dirs, files in os.walk(os.getcwd()):
  for file in files:
    fp = os.path.join(root, file)
    fs += os.path.getsize(fp)
      
print(fs)

我尝试了以下方法:

import os
from concurrent.futures import ProcessPoolExecutor, as_completed

def main():

  total_size = 0
  with ProcessPoolExecutor() as executor:
    for root, dirs, files in os.walk(os.getcwd()):
      futures = []
      for file in files:
        file_path = os.path.join(root, file)
        futures.append(executor.submit(os.path.getsize, file_path))

      for future in as_completed(futures):
        total_size += future.result()

  print(total_size)
  
if __name__ == '__main__':
  main()

但它比我的原始代码(遍历 SSD 上的文件)慢得多。我在 Windows 上使用 Python 3.8。关于如何正确加快速度的任何想法?

python-3.x windows 并行处理 io

评论


答:

0赞 vladmihaisima 12/10/2022 #1

您应该对文件进行批处理,否则期货的开销可能大于保存。对于 100 万个小文件,在 Linux 上,我使用以下代码获得了比没有 ProcessPoolExecutor 的 ~2 倍的加速。您可能需要调整为系统批量设置的文件数量(在我的示例中为 10k)。

import os
from concurrent.futures import ProcessPoolExecutor, as_completed

def getsize_list(files):
  fs = 0
  for file in files:
    fs += os.path.getsize(file)
  return fs

def main():
  total_size = 0
  with ProcessPoolExecutor() as executor:
    for root, dirs, files in os.walk(os.getcwd()):
      futures = []

      current_list = []
      for file in files:
        file_path = os.path.join(root, file)
        current_list.append(file_path)
        if len(current_list)==10_000:
          futures.append(executor.submit(getsize_list, current_list))
          current_list = []
      futures.append(executor.submit(getsize_list, current_list))

      for future in as_completed(futures):
        total_size += future.result()

  print(total_size)

if __name__ == '__main__':
  main()

评论

1赞 Jérôme Richard 12/10/2022
此代码在我的机器上打印 0,而不是 OP 序列号。因此,它似乎无法正常工作。
0赞 vladmihaisima 12/10/2022
谢谢,有一个我没有注意到的错误(没有附加最后一个文件块),如果您的文件少于 10k,则会导致 0。
0赞 Jérôme Richard 12/10/2022 #2

由于进程间通信,第二个代码速度较慢。要了解原因,我们首先需要了解事物是如何运作的。

在第一次运行期间,操作系统真正使用 SSD 并将内容放入 RAM 缓存中。然后,此代码的后续执行速度会因此而明显加快(根本不需要与 SSD 设备交互)。在我的机器上,第一次连续运行大约需要 10 秒,而第二次运行大约需要 5 秒。对于操作系统、文件系统和实际的 SSD 驱动程序,可能会使用系统,因此并行执行此操作可能不会更快(这样做是为了防止任何损坏,也是为了简化系统堆栈某些部分的开发)。从历史上看,一些操作系统甚至使用了效率低得可怕的巨型锁。请注意,操作的可伸缩性取决于 RAM 缓存的使用情况(它可能会更好地使用缓存进行扩展)。我将重点介绍内容在缓存中的情况,因为它更容易重现

在第一个代码中,大部分时间都花在了(大约80%的时间在我的机器上 - 4秒)。然后是(大约 15% -- 0.75 秒)。然后是循环开销和字符串处理(约 5% -- 0.25 秒)。在我的机器上,每个电话平均需要大约 50 个我们。os.path.getsizeos.walkos.path.getsize

问题是系统调用的成本很高,每次调用都涉及通常应该创建一个内核线程,涉及 IO 调度程序和任务调度程序,执行上下文切换和同步。更不用说操作系统需要解析、检查和解析完整路径的每个部分,以便实际获取有关目标文件的统计信息。最后,对于这次操作来说,50 us实际上似乎还不错。在 Windows 上,实际上调用了几个系统调用:它用 、 获取大小 打开文件,然后用 关闭它。 占用超过70%的功能时间。os.path.getsizeos.path.getsizeCreateFileWGetFileInformationByHandleCloseHandleCreateFileW

在第二个代码中,使数据被腌,进程间通信 (IPC) 和数据被目标进程取消腌制。IPC 操作的成本很高,因为它们通常会导致上下文切换和低级别同步。获取结果也做同样的事情。最后,大部分时间都应该花在这样的开销上,特别是因为在进行此类操作时主要流程会减慢,因此工人实际上应该在大部分时间等待/挨饿。因此,并行代码可能比串行代码慢得多。对每个文件执行此操作非常昂贵。使用 @vladmihaisima 指出的批处理计算是减少上下文切换开销的第一步。尽管如此,使用此方法不会消除酸洗 + 取消拣选 + 传输数据的开销,尽管它们应该小于初始开销。主要问题来自使用多处理本身。一种解决方案是使用线程而不是进程,以便摆脱上述开销。但是,全局解释器锁定 (GIL) 会导致字符串处理被锁定,从而减慢了计算速度(实际上比在我的机器上拣选更多)。executor.submit

下面是带有块的结果代码:

import os
from concurrent.futures import ProcessPoolExecutor, as_completed

def compute_chunk_size(fileChunk):
  total_size = 0
  for block in fileChunk:
    root, files = block
    for file in files:
      file_path = os.path.join(root, file)
      total_size += os.path.getsize(file_path)
  return total_size

def main():
  total_size = 0
  with ProcessPoolExecutor() as executor:
    fileChunk = []
    fileChunkSize = 0
    futures = []
    for root, dirs, files in os.walk(os.getcwd()):
      fileChunk.append((root, files))
      fileChunkSize += len(files)
      if fileChunkSize >= 1024:
        futures.append(executor.submit(compute_chunk_size, fileChunk))
        fileChunk = []
        fileChunkSize = 0
    completed = 0
    if fileChunkSize > 0:
      futures.append(executor.submit(compute_chunk_size, fileChunk))
    while completed < len(futures):
      for future in as_completed(futures):
        total_size += future.result()
        completed += 1

  print(total_size)

if __name__ == '__main__':
  main()

在我的 i5-9600KF(6 核)处理器上,它的速度大约快了 4 倍。

以下是并行执行的分析信息:

enter image description here

每一行都是一个过程。PID 26172 的进程是主进程(启动其他进程并等待它们)。棕色部分是进程的 CPU 时间(活动)。剖析被分成 1 毫秒的小块。浅灰色和浅绿色部分是存在上下文切换或同步的部分,而深绿色部分是进程仅进行计算的部分。

我们可以看到,工作人员相对活跃,但他们很多时候都在挨饿或等待某些东西(通常是访问与 IO 相关的资源)。主线程被工人减慢了速度,因为操作没有完全扩展。事实上,调用速度大约慢了 10-15%,主要是因为无法扩展(当然是由于系统锁定)。并行计算的速度不能快于 。在某些具有许多内核和可扩展的操作系统文件子系统的平台上,这可能是一个瓶颈。如果是这样,则可以使用递归任务来传输文件树,但代价是代码要复杂得多。os.path.getsizeCreateFileWos.walkos.walk

在实践中,在 Windows 上,事实证明实现显然效率不高。根据这篇文章,使用应该会快得多。坏消息是替代函数也使用相同的方法。因此,如果你想有效地执行此操作,为此编写一个 C 扩展当然是一个好主意。这样的 C 扩展也可以从使用线程中受益,同时对 GIL、酸洗和 IPC 没有任何问题。os.path.getsizeGetFileAttributesExos.stat

评论

1赞 vladmihaisima 12/10/2022
我建议在示例中保留主要调用,否则如果有人复制粘贴您的代码,它将不打印任何内容。
0赞 Jérôme Richard 12/14/2022
谢谢,这是固定的。