Python:每次迭代时重新打开文件还是截断以覆盖?

Python: re-open file on each iteration or truncate to overwrite?

提问人:o c 提问时间:7/14/2023 更新时间:7/16/2023 访问量:159

问:

在 Python 中,如果你有一个循环,在每次迭代中你都想写入一个文件(在我的情况下是酸洗),覆盖已经存在的任何数据,一种选择是在循环之前打开文件,保持打开状态,并在每次迭代时截断它以在写入新数据之前擦除以前的数据:

import pickle
with open(filename, 'wb') as file:
    for blah in blahs:
        file.truncate(0)
        file.seek(0)
        pickle.dump(blah, file)

另一种方法是在每次迭代时重新打开文件,因为打开它会自动截断它:wb

import pickle
for blah in blahs:
    with open(filename, 'wb') as file:
        pickle.dump(blah, file)

哪个最好(在性能/速度和处理系统资源等方面)?有没有比使用和更好的方法来覆盖已经打开的文件中的数据?file.truncate()file.seek()

我知道有人问过一个类似的问题(每次打开/关闭文件是否比在过程完成之前保持打开状态更好?)但是它似乎是关于您何时想在每次迭代上追加而不是覆盖,所以我想知道后者中的截断等过程是否会导致任何重要的性能下降,从而使天平倾斜?

Python 性能 资源 覆盖

评论

2赞 FlyingTeller 7/14/2023
Which is best (in terms of performance- 听起来您可以轻松地在特定环境中为这两种方法计时。
0赞 o c 7/15/2023
@FlyingTeller 问题更多的是要知道,除了性能(资源处理等)之外,还有任何其他与此相关的一般智慧和技巧,是否存在与系统无关的一般概念可以使一个比另一个更好(以及这些概念是什么)
0赞 Matthias 7/15/2023
我真的不明白,如果你不断覆盖数据,为什么要使用循环。您可以简单地将最后一个条目写在 中。blahs
0赞 o c 7/15/2023
@Matthias 给出的代码只是基本结构的说明性示例 - 它显然不适合在实践中使用。该结构的实际用例的一个示例是动态生成数据的 while 循环,您希望在每次迭代中将数据作为备份写入文件,然后在循环中进一步处理它,然后在处理它之后不再需要备份,因此可以随后覆盖它。
1赞 ken 7/15/2023
在许多平台上,进行零填充。您可以通过执行然后写入 1 个字节来确认这一点。因此,按该顺序执行 、 ,然后(如有必要)将减少冗余零填充量。但是,差异远远小于所需的时间,所以我认为不值得这样做。另一种可能性是,如果缓冲区大小足够大于 ,它可能能够减少对磁盘的实际写入次数。此漏洞无法重新打开,因为它总是在关闭时写入。truncate(0)truncate(0)seek(0)seekdumptruncatedumpblah

答:

0赞 Sebastian Wozny 7/16/2023 #1

我不喜欢猜测,所以我分析了两种方法:

import pickle
import tempfile
from random import choices
from string import ascii_lowercase, ascii_uppercase, digits
from pathlib import Path

from performance_measurement import run_performance_comparison


class Bla:
    def __init__(self):
        population = ascii_uppercase + digits + ascii_lowercase
        self._content = str.join("", choices(population, k=50))


def truncate_approach(blahs: list[Bla], filename: str):
    with open(filename, "wb") as file:
        for blah in blahs:
            file.truncate(0)
            file.seek(0)
            pickle.dump(blah, file)


def reopen_approach(blahs: list[Bla], filename: str):
    for blah in blahs:
        with open(filename, "wb") as file:
            pickle.dump(blah, file)


def setup(N):
    return [[Bla() for i in range(N)], Path(tempfile.NamedTemporaryFile().name)]


run_performance_comparison(
    approaches=[truncate_approach, reopen_approach],
    data_size=[10, 20, 30, 100, 200, 300, 1000, 2000, 3000],
    setup=setup,
    number_of_repetitions=10,
)

truncate_approach稍微快一点。我认为这是因为我们与磁盘的交互较少,有时在我们必须与硬盘交互之前截断内容并重新设置写入缓冲区。

enter image description here

分析代码:

import timeit
from functools import partial

import matplotlib.pyplot as plt
from typing import List, Dict, Callable

from contextlib import contextmanager
import matplotlib.pyplot as plt
import matplotlib.transforms as mtransforms
import matplotlib.ticker as ticker
import numpy as np


@contextmanager
def data_provider(data_size, setup=lambda N: N, teardown=lambda: None):
    data = setup(data_size)
    yield data
    teardown(*data)


def run_performance_comparison(approaches: List[Callable],
                               data_size: List[int],
                               *,
                               setup=lambda N: [N],
                               teardown=lambda *N: None,
                               number_of_repetitions=5,
                               title='Performance Comparison',
                               data_name='N',
                               yscale='log',
                               xscale='log'):
    approach_times: Dict[Callable, List[float]] = {approach: [] for approach in approaches}
    for N in data_size:
        with data_provider(N, setup, teardown) as data:
            print(f'Running performance comparison for {data_name}={N}')
            for approach in approaches:
                function = partial(approach, *data)
                approach_time = min(timeit.Timer(function).repeat(repeat=number_of_repetitions, number=1))
                approach_times[approach].append(approach_time)

    for approach in approaches:
        plt.plot(data_size, approach_times[approach], label=approach.__name__)
    plt.yscale(yscale)
    plt.xscale(xscale)

    plt.xlabel(data_name)
    plt.ylabel('Execution Time (seconds)')
    plt.title(title)
    plt.legend()
    plt.show()