使用 python 将 json 消息引入 even hub

Ingesting json message into even hub by using python

提问人:Yoonsik 提问时间:1/12/2023 最后编辑:vimuthYoonsik 更新时间:8/7/2023 访问量:39

问:

我正在尝试将 json 消息引入 AZURE 事件中心 我的问题是 json 消息的大小,因为事件中心有 1MB 的限制 我有一个大的json消息,它由多个json消息组成

DATA = [{"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}]

此数据是一个示例。 DATA 已经是 json 格式,但 DATA 包含 10000+ 个相同格式的 json 事件 我想将此 json 消息引入事件中心I would would like to inceptive this json message into event hub

谁能帮我 如何将这一条大消息引入事件中心?通过切片或其他方式

多谢!

允锡

我尝试对它进行切片,但一条 json 消息中的事件数量总是不同且非常大......

JSON Azure Slice 数据引入 EventHub

评论


答:

0赞 Anupam Chand 8/7/2023 #1

最有效的方法是将庞大的消息列表拆分为多个批次,并将这些批次逐个发送到 Eventhub。 批处理的大小由每条消息的大小决定,请记住,单个批处理只能发送 1MB。假设单个消息的平均大小为 100 字节,则每批大约有 10K 条消息。为了安全起见,您可以将其减少到 5000-8000。

下面是一段代码,它将原始消息 JSON 数组 (DATA) 分解为各个批次的 JSON 数组,并将它们一次一个发送到事件中心。您可以将batch_limit调整到 5000-8000 之间的任何值。由于您说数组中的消息数可以是 10K+,因此您将分 2-3 批发送。

import time
import asyncio
import os
import json

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub.exceptions import EventHubError
from azure.eventhub import EventData

CONNECTION_STR = 'Endpoint=sb://xxxxxxxxxxxx='
EVENTHUB_NAME = 'xxxxxxxxxxxxxx'

DATA = [{"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}]
batch_limit = 2
async def run():
    print('started')
    producer = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        eventhub_name=EVENTHUB_NAME
    )
    batch_cnt = 0
    msg_array = []
    for DATA_msg in DATA:
      msg_array.append(DATA_msg)
      batch_cnt += 1
      if batch_cnt > batch_limit:
        async with producer:
            event_data_batch = await producer.create_batch()
            for msg in msg_array:
                event_data_batch.add(EventData(json.dumps(msg)))
            await producer.send_batch(event_data_batch)
            print('sent a batch of messages')
        batch_cnt = 0
        msg_array = []
    if batch_cnt > 0:
       async with producer:
            event_data_batch = await producer.create_batch()
            for msg in msg_array:
                event_data_batch.add(EventData(json.dumps(msg)))
            await producer.send_batch(event_data_batch)
            print('sent remaining messages as last batch')
      
start_time = time.time()
asyncio.run(run())
print("Send messages in {} seconds.".format(time.time() - start_time))