提问人:Yoonsik 提问时间:1/12/2023 最后编辑:vimuthYoonsik 更新时间:8/7/2023 访问量:39
使用 python 将 json 消息引入 even hub
Ingesting json message into even hub by using python
问:
我正在尝试将 json 消息引入 AZURE 事件中心 我的问题是 json 消息的大小,因为事件中心有 1MB 的限制 我有一个大的json消息,它由多个json消息组成
DATA = [{"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}]
此数据是一个示例。 DATA 已经是 json 格式,但 DATA 包含 10000+ 个相同格式的 json 事件 我想将此 json 消息引入事件中心I would would like to inceptive this json message into event hub
谁能帮我 如何将这一条大消息引入事件中心?通过切片或其他方式
多谢!
允锡
我尝试对它进行切片,但一条 json 消息中的事件数量总是不同且非常大......
答:
0赞
Anupam Chand
8/7/2023
#1
最有效的方法是将庞大的消息列表拆分为多个批次,并将这些批次逐个发送到 Eventhub。 批处理的大小由每条消息的大小决定,请记住,单个批处理只能发送 1MB。假设单个消息的平均大小为 100 字节,则每批大约有 10K 条消息。为了安全起见,您可以将其减少到 5000-8000。
下面是一段代码,它将原始消息 JSON 数组 (DATA) 分解为各个批次的 JSON 数组,并将它们一次一个发送到事件中心。您可以将batch_limit调整到 5000-8000 之间的任何值。由于您说数组中的消息数可以是 10K+,因此您将分 2-3 批发送。
import time
import asyncio
import os
import json
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub.exceptions import EventHubError
from azure.eventhub import EventData
CONNECTION_STR = 'Endpoint=sb://xxxxxxxxxxxx='
EVENTHUB_NAME = 'xxxxxxxxxxxxxx'
DATA = [{"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}]
batch_limit = 2
async def run():
print('started')
producer = EventHubProducerClient.from_connection_string(
conn_str=CONNECTION_STR,
eventhub_name=EVENTHUB_NAME
)
batch_cnt = 0
msg_array = []
for DATA_msg in DATA:
msg_array.append(DATA_msg)
batch_cnt += 1
if batch_cnt > batch_limit:
async with producer:
event_data_batch = await producer.create_batch()
for msg in msg_array:
event_data_batch.add(EventData(json.dumps(msg)))
await producer.send_batch(event_data_batch)
print('sent a batch of messages')
batch_cnt = 0
msg_array = []
if batch_cnt > 0:
async with producer:
event_data_batch = await producer.create_batch()
for msg in msg_array:
event_data_batch.add(EventData(json.dumps(msg)))
await producer.send_batch(event_data_batch)
print('sent remaining messages as last batch')
start_time = time.time()
asyncio.run(run())
print("Send messages in {} seconds.".format(time.time() - start_time))
评论