使用 asyncio/aiohttp 未完成响应有效负载

Response payload is not completed using asyncio/aiohttp

提问人:gbeaven 提问时间:5/29/2019 最后编辑:gbeaven 更新时间:9/11/2023 访问量:14567

问:

我编写了一个脚本,该脚本使用多个对象异步创建批量 API 作业/批处理,每个对象由单个语句查询,等待批处理完成,完成后将结果下载(流式传输)到服务器,进行一些数据转换,最后将结果同步上传到 .我已经对此进行了大量成功的试运行,并认为它运行良好,但是,我最近开始间歇性地收到以下错误,并且对如何解决有点不知所措,因为网络上很少有这样的报告/解决方案:Python 3.7(asyncio 3.4.3 and aiohttp 3.5.4)Salesforce(v45.0)SOQLSQL Server 2016 SP1 (13.0.4560.0)

aiohttp.client_exceptions。ClientPayloadError:响应有效负载不是 完成

示例代码片段:

import asyncio,aiohttp,aiofiles
from simple_salesforce import Salesforce
from xml.etree import ElementTree

#Establish a session using the simple_salesforce module
sf = Salesforce(username=username,
                password=password,
                security_token=securityToken,
                organizationId=organizationId)
sfAPIURL = 'https://myinstance.salesforce.com/services/async/45.0/job/'
sfDataPath = 'C:/Salesforce/Data/'

#Dictionary to store information for the object/job/batch while the script is executing
objectDictionary = 
{'Account': {'job':
                {'batch': {'id': '8596P00000ihwpJulI','results': ['8596V00000Bo9iU'],'state': 'Completed'},
             'id': '8752R00000iUjtReqS'},
             'soql': 'select Id,Name from Account'},

 'Contact': {'job':
                {'batch': {'id': '9874G00000iJnBbVgg','results': ['7410t00000Ao9vp'],'state': 'Completed'},
             'id': '8800o00000POIkLlLa'},
             'soql': 'select Id,Name from Contact'}}

async def retrieveResults(jobId, batchId, sfObject):
    headers = {"X-SFDC-Session": sf.session_id, 'Content-Encoding': 'gzip'}
    async with aiohttp.ClientSession() as session:
        async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result', headers=headers) as r:
            data = await r.text()
            batchResults = ElementTree.fromstring(data) #list of batch results
            for resultID in batchResults:
                async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}', headers=headers, timeout=None) as r:
                    async with aiofiles.open(f'{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv', 'wb') as outfile: #save in temporary file for manipulation later
                        while True:
                            chunk = await r.content.read(81920)
                            if not chunk:
                                break
                            await outfile.write(chunk)

async def asyncDownload():
    await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) for sfObject in objectDictionary])

if __name__ == "__main__":
    asyncio.run(asyncDownload())

回溯(错误行与上面的代码片段不匹配):

回溯(最近一次调用最后一次):

文件“C:\Code\salesforce.py”,第 252 行,在 asyncio.run(asyncDownload())

文件“C:\Program Files\Python37\lib\asyncio\runners.py”,第 43 行,在 跑 回程loop.run_until_complete(主)

文件“C:\Program Files\Python37\lib\asyncio\base_events.py”,行 584, 在 run_until_complete 返回 future.result()

文件“C:\Code\salesforce.py”,第 241 行,在 asyncDownload 中 await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) for objectDictionary 中的 sfObject])

文件“C:\Code\salesforce.py”,第 183 行,在 检索结果 块 = 等待 r.content.read(81920)

文件“C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py“,第 369 行,在 读 等待self._wait('读取')

文件“C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py“,第 297 行,在 _等 等待服务员

aiohttp.client_exceptions。ClientPayloadError:响应有效负载不是 完成

问题的根源似乎始于 81920 字节块中的流式传输数据,但据我所知,这已经是我能做到的了。r.content.read(81920)

我不认为这是我这边的网络问题,因为还有其他小作业连接到此服务器上的外部源,这些作业在此作业运行时没有问题。有谁知道这是怎么回事?

谢谢!

-编辑:

我已经尝试过,但仍然遇到同样的错误......iter_any()read()

async for data in r.content.iter_any():
    await outfile.write(data)

我试过了,但仍然遇到同样的错误......readline()

async for line in r.content.readline():
    await outfile.write(line)

从那以后,我在代码的错误处理部分(未包含在原始问题中)中使用了一些重试功能,这最终允许作业完成。有效负载错误仍在发生,这仍然是主要问题,但重试下载是一种成功的解决方法。如果有人能够提供进一步的信息,问题仍然存在。

python-3.x 异步 salesforce aiohttp

评论

0赞 eyescream 5/29/2019
作业在SF端完成好吗?您可以在“设置”->“批量数据加载作业”中看到它吗?
0赞 gbeaven 5/29/2019
@eyescream 是的,作业顺利完成,“已完成”批处理状态是我启动该功能的触发器。retrieveResults()
0赞 eyescream 5/29/2019
不知道,对不起。如果您从 Postman、curl、SF Workbench 发出相同的请求怎么办?如果它在沙盒中似乎运行良好,但在生产/开发人员版本中死亡 - 也许您正在用尽滚动的 24 小时 API 请求限制?(理想情况下,您会看到一个关于它的 HTTP 标头)
0赞 gbeaven 5/30/2019
我应该补充一点,我相信这会在数据流式传输到磁盘时发生(该部分),因为将开始 200Mb 的下载,然后在下载过程中的某个时间随机出现错误,但并非总是如此。我知道我没有用尽 API 限制 - 我经常关注它,并且使用率始终低于 5%。我将定期尝试,看看我是否至少可以完成我的下载,只是不喜欢失去异步功能。while True:requests
0赞 JerodG 11/12/2019
我有一个脚本抛出相同的错误,并且正在检索已成功运行数月的 JSON 数据。对我来说,问题出在服务器上;磁盘已满。一旦我清理了一些空间,它又开始正常工作了。您可能需要联系服务器管理员进行检查。

答:

0赞 Louis LIETAER 9/7/2021 #1

嗨,您是否尝试在以下位置插入 await asyncio.sleep(0):

                    ...
                    while True:
                        chunk = await r.content.read(81920)
                        await asyncio.sleep(0)
                        if not chunk:
                            break
                        await outfile.write(chunk)
                    ...

评论

0赞 Louis LIETAER 9/11/2021
@gbeaven 就我而言,这解决了我遇到的相同问题,使循环有机会处理其他协程。我还注意到在慢速连接上经常出现ClientPayloadError,所以我以这种方式设置了超时:
0赞 Louis LIETAER 9/11/2021
timeout=aiohttp。ClientTimeout(总计=60*60, sock_read=240);与 AIOHTTP 异步。ClientSession(timeout=timeout) 作为会话:
0赞 Kim Sant 12/1/2021 #2

我在Amazon Lambda中遇到了这个错误(这是在请求中抛出的)

await asyncio.gather(*tasks) # 类似 asyncio.ensure_future() 的任务

解决方案,修复构建环境:

FROM amazonlinux:2 AS 

FROM lambci/lambda:build-python3.8 

我想问题是库内部用于管理协程的 .so 文件或较低级别的东西与 lambda 环境不兼容。因此,在正确的 docker 基础中构建可以解决问题。

评论

0赞 Kim Sant 12/1/2021
无论如何,我不明白为什么会发生这种不匹配 docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html 因为 aws lambda python3.8 基于 amazon linux 2
0赞 Pratham Patel 12/6/2023 #3

“事件循环已在运行”是在已在事件循环中运行的脚本中使用 asyncio.run 时的常见问题。 若要解决此问题,可以使用 创建和运行异步任务。asyncio.create_task

  • 修改 async def retrieveResults
    添加了会话作为参数。 与 session: 异步使用,而不是创建新的 ClientSession。
    async def retrieveResults(session, jobId, batchId, sfObject):
        headers = {"X-SFDC-Session": sf.session_id, 'Content-Encoding': 'gzip'}
        
        async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result', headers=headers) as r:
            data = await r.text()
            batchResults = ElementTree.fromstring(data)  # list of batch results
    
            for resultID in batchResults:
                async with session.get(
                    url=f'{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}',
                    headers=headers,
                    timeout=None
                ) as r:
                    async with aiofiles.open(
                        f'{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv',
                        'wb'
                    ) as outfile:
                        while True:
                            chunk = await r.content.read(81920)
                            if not chunk:
                                break
                            await outfile.write(chunk)
  • 修改 async def downloadResults:将 async 与 as session:
    一起使用,以在函数的上下文中创建会话。 将会话传递给 retrieveResults 函数。
    aiohttp.ClientSession()
    async def asyncDownload():
        async with aiohttp.ClientSession() as session:
            tasks = [
                retrieveResults(session, objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject)
                for sfObject in objectDictionary
            ]
            await asyncio.gather(*tasks)
    
    if __name__ == "__main__":
        asyncio.run(asyncDownload())