如何使用带有 0(1) 内存的 lxml?

How to use lxml with 0(1) memory?

提问人:Istvan 提问时间:11/9/2023 最后编辑:Istvan 更新时间:11/9/2023 访问量:64

问:

我正在尝试使用~72G XML文件。我想将其转换为CSV。

这是我假设在后台使用迭代器的代码,因为我可能在关于 lxml 的某个地方读到过它。

from lxml import etree
import csv

file_name = "data/discogs_20231001_releases.xml"
# file_name = "data/sample.xml"


def handle_artists(artists):
    ret = []
    for artist in artists:
        artist_dict = {}
        for artist_tag in artist:
            if artist_tag.tag == "id":
                artist_dict["id"] = artist_tag.text
            elif artist_tag.tag == "name":
                artist_dict["name"] = artist_tag.text
        ret.append(artist_dict)
    return ret


events = ("start", "end")
context = etree.iterparse(file_name, events=events)

with open("data/discogs_20231001_releases.csv", "w") as f:
    w = csv.DictWriter(f=f, fieldnames=["release_id", "release_title"], delimiter="\t")
    w.writeheader()

    for action, elem in context:
        if action == "start" and elem.tag == "release":
            release_id = elem.get("id")
            release = {"release_id": release_id}
            for child in elem:
                tag = child.tag
                if tag == "title":
                    release["release_title"] = child.text
                elif tag == "artists":
                    # release["artists"] = handle_artists(child)
                    pass
            if release.get("release_title") != None:
                w.writerow(release)

当我运行它时,我得到:

Job 1, 'python release.py' terminated by signal SIGKILL (Forced quit)

我不确定我是否在XML解析器或CSV编写器上犯了错误。

检查内存分析器是否可以确切地告诉我哪一个是罪魁祸首。

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    49     25.8 MiB     25.8 MiB           1   @profile
    50                                         def process_event(elem):
    51     25.8 MiB      0.0 MiB           1       release = {}
    52     25.8 MiB      0.0 MiB           1       if elem.tag == "release":
    53                                                 release_id = elem.get("id")
    54                                                 release = {"release_id": release_id}
    55                                                 for child in elem:
    56                                                     tag = child.tag
    57                                                     if tag == "title":
    58                                                         release["release_title"] = child.text
    59                                                     elif tag == "artists":
    60                                                         # release["artists"] = handle_artists(child)
    61                                                         pass
    62     25.8 MiB      0.0 MiB           1       return release
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    49    176.0 MiB    176.0 MiB           1   @profile
    50                                         def process_event(elem):
    51    176.0 MiB      0.0 MiB           1       release = {}
    52    176.0 MiB      0.0 MiB           1       if elem.tag == "release":
    53                                                 release_id = elem.get("id")
    54                                                 release = {"release_id": release_id}
    55                                                 for child in elem:
    56                                                     tag = child.tag
    57                                                     if tag == "title":
    58                                                         release["release_title"] = child.text
    59                                                     elif tag == "artists":
    60                                                         # release["artists"] = handle_artists(child)
    61                                                         pass
    62    176.0 MiB      0.0 MiB           1       return release

建议的fast_iter解决方案执行以下操作:

Python(99102,0x202e0e080) malloc: *** error for object 0x1338b7fc0: pointer being freed was not allocated
Python(99102,0x202e0e080) malloc: *** set a breakpoint in malloc_error_break to debug
fish: Job 1, 'python release.py' terminated by signal SIGABRT (Abort)
蟒蛇 LXML

评论

0赞 slothrop 11/9/2023
这回答了你的问题吗?为什么 lxml.etree.iterparse() 会占用我所有的内存?
0赞 Istvan 11/9/2023
这很奇怪。即使使用建议的解决方案,它仍然使用 O(n) 内存。
0赞 AKX 11/9/2023
由于您实际上似乎并不需要完整的文档结构,因此我建议您使用 SAX 样式的解析器和状态机。
0赞 Istvan 11/9/2023
你能给我举个例子如何在 Python 中使用 SAX 样式解析器吗?
0赞 Sam Mason 11/9/2023
当搜索“SAX style parser with Python”(即你的确切单词!)时,我得到的第一个结果是 stackoverflow.com/q/42325244/1358308 这似乎是相关的

答:

1赞 AKX 11/9/2023 #1

这是 SAX-y 的重新实现;它只准确存储您在任何给定时刻解析的信息。

当我们知道我们不在标签内时,不完全比较节点路径可能会更快一些,但这留给读者作为练习。artist

from xml.sax import parse, ContentHandler


class DiscogsFileHandler(ContentHandler):
    def __init__(self, *, write_release):
        super().__init__()
        self.current_release = None
        self.current_artist = None
        self.write_release = write_release
        self.tag_stack = []
        self.char_buffer = ""

    @property
    def tag_path(self) -> tuple:
        return tuple(self.tag_stack)

    def startElement(self, name: str, attrs):
        self.tag_stack.append(name)
        path = self.tag_path
        if path == ("releases", "release"):
            assert not self.current_release
            self.current_release = {**dict(attrs), "artists": []}
        elif path == ("releases", "release", "artists", "artist"):
            assert not self.current_artist
            self.current_artist = dict(attrs)
            self.current_release["artists"].append(self.current_artist)

    def endElement(self, name: str):
        path = self.tag_path
        if path == ("releases", "release"):
            self.write_release(self.current_release)
            self.current_release = None
        if path == ("releases", "release", "title"):
            self.current_release["title"] = self.char_buffer
        if path == ("releases", "release", "artists", "artist", "name"):
            self.current_artist["name"] = self.char_buffer
        if path == ("releases", "release", "artists", "artist", "id"):
            self.current_artist["id"] = self.char_buffer
        if path == ("releases", "release", "artists", "artist"):
            self.current_artist = None
        assert self.tag_stack.pop() == name
        self.char_buffer = ""

    def characters(self, content: str):
        self.char_buffer += content


def write_release(rel: dict):
    print("Got release:", rel)
    # Your CSV writing code here...


parse("discogs.xml", DiscogsFileHandler(write_release=write_release))

输出是(例如,它不写 CSV)

Got release: {'id': '1', 'status': 'Accepted', 'artists': [{'id': '1', 'name': 'The Persuader'}], 'title': 'Stockholm'}
Got release: {'id': '2', 'status': 'Accepted', 'artists': [{'id': '2', 'name': 'Mr. James Barth & A.D.'}], 'title': "Knockin' Boots (Vol 2 Of 2)"}
Got release: {'id': '3', 'status': 'Accepted', 'artists': [{'id': '3', 'name': 'Josh Wink'}], 'title': 'Profound Sounds Vol. 1'}
Got release: {'id': '4', 'status': 'Accepted', 'artists': [{'id': '21', 'name': 'Faze Action'}], 'title': 'Moving Cities'}
Got release: {'id': '5', 'status': 'Accepted', 'artists': [{'id': '22', 'name': 'Datacide'}], 'title': 'Flowerhead'}
Got release: {'id': '6', 'status': 'Accepted', 'artists': [{'id': '2', 'name': 'Mr. James Barth & A.D.'}], 'title': "Knockin' Boots (Vol 1 Of 2)"}
Got release: {'id': '7', 'status': 'Accepted', 'artists': [{'id': '28', 'name': 'Moonchildren'}], 'title': 'Moonchildren EP'}
Got release: {'id': '8', 'status': 'Accepted', 'artists': [{'id': '29', 'name': 'Sweet Abraham'}], 'title': 'Spreading Outward EP'}
Got release: {'id': '9', 'status': 'Accepted', 'artists': [{'id': '33', 'name': 'Blue Six'}], 'title': 'Pure'}
Got release: {'id': '10', 'status': 'Accepted', 'artists': [{'id': '36', 'name': 'Lovetronic'}], 'title': 'You Are Love'}

我不想下载完整的文件,所以我这样做了

curl -fSL 'https://discogs-data-dumps.s3-us-west-2.amazonaws.com/data/2023/discogs_20231001_releases.xml.gz' | head -c 10000000 | gzcat | xmllint --recover - > discogs.xml

至少要抓住其中的一部分。

为了证明这确实执行了流式处理,您可以在代码中更改为,并在文件中实际将文件从 Internet 流式传输到程序:"discogs.xml"sys.stdin.buffer

curl -fSL 'https://discogs-data-dumps.s3-us-west-2.amazonaws.com/data/2023/discogs_20231001_releases.xml.gz' | gzcat | python so77452645.py

评论

0赞 Istvan 11/9/2023
这太棒了,谢谢!