加快从 Telegram 下载数据并将其保存到 CSV 文件的代码

Speeding up the code for downloading data from Telegram and saving it to a CSV file

提问人:I.S.M. 提问时间:10/19/2023 更新时间:10/19/2023 访问量:69

问:

我实现了以下功能,我想尽可能加快速度:

async def async_update_database(self, source_id: str, source_hash: str, message_id_1: str, message_id_n: str, file: str):

        entity = types.InputPeerChannel(int(source_id), int(source_hash))

        n_messages = 0
        n_comments = 0

        with open(file, 'a', encoding = "utf-8") as fout:
            for message_id in range(int(message_id_1), int(message_id_n) + 1):
                message = await self.client.get_messages(entity, ids = message_id)
                if message == None or message.message == "" or message.replies == None or message.replies.replies == 0:
                    continue

                fout.write("M," + str(int(message.date.timestamp())) + ',' + str(message_id) + ',' + repr(message.message) + '\n')
                n_messages += 1

                async for comment in self.client.iter_messages(entity, reply_to = message_id):
                    if isinstance(comment.sender, types.User) and not comment.sender.bot:
                        fout.write("C," + str(int(comment.date.timestamp())) + ',' + str(comment.sender.id) + ',' + repr(comment.message) + '\n')
                        n_comments += 1

        return (str(n_messages) + " " + str(n_comments))

该函数是从外部代码调用的,从 Telegram 频道请求帖子和评论的数据,下载它们并以 CSV 格式保存到文件中。有很多调用,因此写入文件设置为追加到结束模式。下载卷:调用此函数可以下载 10'000 个帖子,每个帖子有 1000 条评论,即 10'000'000 行被写入文件。

你可以用这个代码做什么?我不是 Python 专家,我可能在这里犯了一些愚蠢的错误。在我看来不是很理想:1)在这种形式下写到文件末尾是否有效?2) 异步调用有问题吗?3) 我是否正确使用了 Telethon 库中的函数?

python 字符串 csv 文件 telethon

评论

0赞 ShadowRanger 10/19/2023
基本错误(与性能无关):未使用模块编写文件。CSV 几乎不可能正确,您应该始终使用它来确保您打算使用的任何方言的正确性(您还需要打开文件以避免行尾翻译,以便 CSV 遵循所选方言的行尾格式,而不是操作系统默认)。csvnewline=''
0赞 ShadowRanger 10/19/2023
就性能而言,这几乎肯定会受到网络 I/O 的限制,除了获得更大的管道之外,没有什么会对性能产生有意义的影响。追加模式功能完善,如果代码正常工作,我不知道你对答案的期望是什么。
0赞 I.S.M. 10/19/2023
@ShadowRanger 我正在考虑使用多个帐户来使用它们的 api id/hash 以并行模式加载数据,但可能需要不同的服务器和 IP 地址
0赞 ShadowRanger 10/19/2023
对 Telegram 的服务条款一无所知,我几乎可以保证这违反了它们。在您的司法管辖区,这可能是非法的,也可能不是非法的,但如果您不想因违规而被关闭您的帐户,我强烈建议您不要这样做。
0赞 Ouss 10/19/2023
并发是一个问题。如果第二个呼叫在第一个呼叫结束之前开始,会发生什么情况?=> 您不能使用相同的 .我会用将内容存储在 nosql 数据库或图形数据库中来替换 csv。file

答:

2赞 Sam Hollenbach 10/19/2023 #1

您的代码目前实际上没有使用任何优点(当只考虑此终结点时)。通过在 for 循环中调用或稍后调用,您可以有效地同步处理这些调用,并且每个调用只会在另一个调用完成后执行。虽然这仍然有助于允许在其他 FastAPI 端点中进行调用,但您没有在此处进行任何优化。asyncawaitasync forasyncasyncasync

您可以通过使用 asyncio.gather() 并传递函数列表以“并行”异步执行来提高性能(这并不是真正的并行,但这是一个足够好的类比)。围绕 的代码的第一部分将变成如下内容:await

import functools


message_ids = list(range(int(message_id_1), int(message_id_n) + 1))
# create a partial function with "entity" already passed
get_messages_func = functools.partial(self.client.get_messages, entity)
messages = await asyncio.gather(get_messages_func, message_ids)

消息应以消息列表的形式结束。这里的区别在于,调用不会等待上一个调用返回后再开始,而是会在上一个调用开始后立即开始,并在收到响应后立即返回。将此方法应用于第二个异步调用也可以提高性能。client.get_messages

我在这里能想到的主要问题是您的程序内存不足,因为它不会“同时”处理所有请求及其返回值,这意味着在解析新响应之前没有任何时间对旧响应进行垃圾回收。您可以通过设置一次处理的请求数 () 的限制来解决此问题。gather

评论

1赞 Lonami 10/20/2023
更大的问题是,每条消息都有一个网络调用。 能够一次获取多条消息。或者更好的是,不要按 ID 获取,而是按偏移量获取。get_messages