对对象的 JSON 数组进行分块,直到每个 Array 项的字节长度<静态阈值

Chunk a JSON Array of Objects until each Array item is of byte length < a Static Threshold

提问人:Coldchain9 提问时间:9/22/2023 最后编辑:Coldchain9 更新时间:9/23/2023 访问量:84





import json

# Sample input: a JSON-style array of single-key objects whose values are lists.
payload: list[dict] = [
    {"data1": [1, 2, 3, 4]},
    {"data2": [8, 9, 10]},
    {"data3": [1, 2, 3, 4, 5, 6, 7]},
]

# Max size in bytes we can allow. This is static and a hard limit that is not variable.
MAX_SIZE: int = 25

def check_and_chunk(arr: list, max_size=None):
    """Split oversized entries of `arr` so that every entry fits the byte limit.

    The size of an entry is the length of ``json.dumps(entry)`` encoded as
    UTF-8. Entries that already fit are kept untouched; an entry that is too
    big is replaced, at the same position, by several ``{key: sub_list}``
    dicts built by packing the list's elements greedily in order.

    Args:
        arr: The payload, a list of dicts (typically one key mapping to a list).
        max_size: Maximum encoded size in bytes of a single entry. When
            omitted, the module-level ``MAX_SIZE`` is used.

    Returns:
        `arr` itself when the whole payload already fits in the limit,
        otherwise a new list in which every entry fits in the limit.

    Raises:
        ValueError: If a key/value pair cannot be made small enough, i.e. the
            value is not a non-empty list, or a single list element together
            with its key already exceeds the limit.
    """
    limit = MAX_SIZE if max_size is None else max_size

    def size_bytes(item) -> int:
        return len(json.dumps(item).encode("utf-8"))

    def split_entry(key, values):
        """Yield ``{key: chunk}`` dicts, each within the limit."""
        if not isinstance(values, list) or not values:
            # Nothing to split: the pair either fits as it is or never will.
            if size_bytes({key: values}) > limit:
                raise ValueError(
                    f"cannot fit key {key!r} within {limit} bytes"
                )
            yield {key: values}
            return

        current = []
        for value in values:
            # Close the running chunk as soon as one more element overflows.
            if current and size_bytes({key: current + [value]}) > limit:
                yield {key: current}
                current = []
            current.append(value)
            # A chunk holding a single element cannot shrink any further.
            if len(current) == 1 and size_bytes({key: current}) > limit:
                raise ValueError(
                    f"element {value!r} of key {key!r} does not fit "
                    f"within {limit} bytes"
                )
        if current:
            yield {key: current}

    # First check if the entire payload is smaller than the limit.
    if size_bytes(arr) <= limit:
        return arr

    result = []
    for item in arr:
        if size_bytes(item) <= limit:
            result.append(item)
            continue
        # Too big: split each key's list, keeping the original ordering.
        for key, values in item.items():
            result.extend(split_entry(key, values))
    return result


# Desired result: the oversized "data3" entry is split into two smaller entries.
payload: list[dict] = [
    {"data1": [1, 2, 3, 4]},
    {"data2": [8, 9, 10]},
    {"data3": [1, 2, 3, 4]},
    {"data3": [5, 6, 7]},
]
JSON的 列表 字典


0赞 Nick 9/22/2023


1赞 Andrej Kesely 9/22/2023 #1


import json

# Sample input: single-key dicts whose list values may need to be chunked.
payload = [
    {"data1": [1, 2, 3, 4]},
    {"data2": [8, 9, 10]},
    {"data3": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]},
    {"data4": [100, 200, -1, -10, 200, 300, 12, 13]},
]


def get_chunks(lst, max_size=None):
    """Yield consecutive sub-lists of `lst` whose JSON text fits in the limit.

    Only the list itself is measured (``len(json.dumps(chunk))``); the key and
    braces of an enclosing dict are not counted. Elements are packed greedily
    in their original order.

    Args:
        lst: The list to chunk.
        max_size: Maximum JSON length of one chunk. When omitted, the
            module-level ``MAX_SIZE`` is used.

    Yields:
        Non-empty lists that, concatenated, reproduce `lst`.

    Raises:
        ValueError: If a single element is too long to fit in a chunk alone.
    """
    limit = MAX_SIZE if max_size is None else max_size

    curr, curr_len = [], 0
    for v in lst:
        item_len = len(json.dumps(v))
        # A chunk holding just this element is "[" + element + "]".
        if item_len + 2 > limit:
            raise ValueError(f"element {v!r} does not fit within {limit}")
        # Length of `curr + [v]` as JSON: all element texts, one ", " per
        # element already in `curr`, plus the enclosing "[]".
        if curr and curr_len + item_len + 2 * len(curr) + 2 > limit:
            yield curr
            curr, curr_len = [], 0
        curr.append(v)
        curr_len += item_len

    if curr:
        yield curr

# Print every chunk as a {key: chunk} dict next to the JSON byte length of
# the chunk's list (the list only, not the enclosing dict).
for entry in payload:
    for key, values in entry.items():
        for chunk in get_chunks(values):
            piece = {key: chunk}
            print(f"{str(piece):<40} {len(json.dumps(chunk).encode())=:<30}")


{'data1': [1, 2, 3, 4]}                  len(json.dumps(chunk).encode())=12                            
{'data2': [8, 9, 10]}                    len(json.dumps(chunk).encode())=10                            
{'data3': [1, 2, 3, 4, 5, 6, 7, 8]}      len(json.dumps(chunk).encode())=24                            
{'data3': [9, 10, 11, 12]}               len(json.dumps(chunk).encode())=15                            
{'data4': [100, 200, -1, -10, 200]}      len(json.dumps(chunk).encode())=24                            
{'data4': [300, 12, 13]}                 len(json.dumps(chunk).encode())=13                            
3赞 Hai Vu 9/22/2023 #2


import json
import logging
import pprint
from collections import deque


# Size limit in bytes; passed as `limit` to check_and_chunk further down.
MAX_SIZE: int = 25

def split(key, input_sequence, limit, out):
    """Split the `input_sequence` into several smaller ones.

    Elements are packed greedily, in order, so that each
    ``json.dumps({key: chunk})`` encoded as UTF-8 is at most `limit` bytes.
    The result will be appended to the `out` list as ``{key: chunk}`` dicts.
    An empty `input_sequence` appends nothing.

    Args:
        key: Dictionary key shared by every produced chunk.
        input_sequence: Iterable of JSON-serializable elements.
        limit: Maximum encoded size in bytes of one ``{key: chunk}`` dict.
        out: List that receives the produced dicts (mutated in place).

    Raises:
        ValueError: If one element alone, together with `key`, exceeds `limit`.
    """
    input_sequence = deque(input_sequence)
    output_sequence = []
    while input_sequence:
        # Move an element from input_sequence to output_sequence
        element = input_sequence.popleft()
        output_sequence.append(element)

        # Build the dictionary in bytes
        dict_str = json.dumps({key: output_sequence})
        dict_binary = dict_str.encode("utf-8")
        actual_length = len(dict_binary)
        logging.debug("dict_binary=%r, len=%r", dict_binary, actual_length)

        # If the length is over the limit, then back off one element
        # And produce the result
        if actual_length > limit:
            logging.debug("Over the limit")
            if len(output_sequence) == 1:
                # Backing off would leave an empty chunk and retry the same
                # element forever, so fail loudly instead.
                raise ValueError(
                    f"element {element!r} of key {key!r} does not fit "
                    f"within {limit} bytes"
                )
            input_sequence.appendleft(output_sequence.pop())
            out.append({key: output_sequence})
            output_sequence = []

    # Left over
    if output_sequence:
        out.append({key: output_sequence})

def check_and_chunk(arr: list, limit):
    """Run `split` on every key/value pair of every dict in `arr`.

    Returns the list that the `split` calls appended their results to.
    """
    chunks = []
    for entry in arr:
        for name in entry:
            split(name, entry[name], limit, chunks)
    return chunks

# Sample input; "data3" and "data4" are too large for MAX_SIZE and get split.
payload: list[dict] = [
    {"data1": [1, 2, 3, 4]},
    {"data2": [8, 9, 10]},
    {"data3": [1, 2, 3, 4, 5, 6, 7]},
    {"data4": list(range(20))},
]

pprint.pprint(check_and_chunk(payload, MAX_SIZE))


DEBUG:root:dict_binary=b'{"data1": [1]}', len=14
DEBUG:root:dict_binary=b'{"data1": [1, 2]}', len=17
DEBUG:root:dict_binary=b'{"data1": [1, 2, 3]}', len=20
DEBUG:root:dict_binary=b'{"data1": [1, 2, 3, 4]}', len=23
DEBUG:root:dict_binary=b'{"data2": [8]}', len=14
DEBUG:root:dict_binary=b'{"data2": [8, 9]}', len=17
DEBUG:root:dict_binary=b'{"data2": [8, 9, 10]}', len=21
DEBUG:root:dict_binary=b'{"data3": [1]}', len=14
DEBUG:root:dict_binary=b'{"data3": [1, 2]}', len=17
DEBUG:root:dict_binary=b'{"data3": [1, 2, 3]}', len=20
DEBUG:root:dict_binary=b'{"data3": [1, 2, 3, 4]}', len=23
DEBUG:root:dict_binary=b'{"data3": [1, 2, 3, 4, 5]}', len=26
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data3": [5]}', len=14
DEBUG:root:dict_binary=b'{"data3": [5, 6]}', len=17
DEBUG:root:dict_binary=b'{"data3": [5, 6, 7]}', len=20
DEBUG:root:dict_binary=b'{"data4": [0]}', len=14
DEBUG:root:dict_binary=b'{"data4": [0, 1]}', len=17
DEBUG:root:dict_binary=b'{"data4": [0, 1, 2]}', len=20
DEBUG:root:dict_binary=b'{"data4": [0, 1, 2, 3]}', len=23
DEBUG:root:dict_binary=b'{"data4": [0, 1, 2, 3, 4]}', len=26
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [4]}', len=14
DEBUG:root:dict_binary=b'{"data4": [4, 5]}', len=17
DEBUG:root:dict_binary=b'{"data4": [4, 5, 6]}', len=20
DEBUG:root:dict_binary=b'{"data4": [4, 5, 6, 7]}', len=23
DEBUG:root:dict_binary=b'{"data4": [4, 5, 6, 7, 8]}', len=26
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [8]}', len=14
DEBUG:root:dict_binary=b'{"data4": [8, 9]}', len=17
DEBUG:root:dict_binary=b'{"data4": [8, 9, 10]}', len=21
DEBUG:root:dict_binary=b'{"data4": [8, 9, 10, 11]}', len=25
DEBUG:root:dict_binary=b'{"data4": [8, 9, 10, 11, 12]}', len=29
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [12]}', len=15
DEBUG:root:dict_binary=b'{"data4": [12, 13]}', len=19
DEBUG:root:dict_binary=b'{"data4": [12, 13, 14]}', len=23
DEBUG:root:dict_binary=b'{"data4": [12, 13, 14, 15]}', len=27
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [15]}', len=15
DEBUG:root:dict_binary=b'{"data4": [15, 16]}', len=19
DEBUG:root:dict_binary=b'{"data4": [15, 16, 17]}', len=23
DEBUG:root:dict_binary=b'{"data4": [15, 16, 17, 18]}', len=27
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [18]}', len=15
DEBUG:root:dict_binary=b'{"data4": [18, 19]}', len=19
[{'data1': [1, 2, 3, 4]},
 {'data2': [8, 9, 10]},
 {'data3': [1, 2, 3, 4]},
 {'data3': [5, 6, 7]},
 {'data4': [0, 1, 2, 3]},
 {'data4': [4, 5, 6, 7]},
 {'data4': [8, 9, 10, 11]},
 {'data4': [12, 13, 14]},
 {'data4': [15, 16, 17]},
 {'data4': [18, 19]}]


  • 我使用 logging 库进行调试输出。如果要关闭调试输出,请将 logging.DEBUG 替换为 logging.WARN
  • 我修改了 check_and_chunk 的签名以添加大小限制参数,而不依赖于全局变量
  • 我使用 deque 数据结构,它的行为类似于列表,但从左侧插入/删除速度更快。
2赞 Nick 9/22/2023 #3

这是另一种基于计算元素长度并在此基础上拆分列表的解决方案。基本上,代码先计算出 JSON dict 中 key 部分的长度(keylen),然后计算列表中每个元素的累计长度(lens),并在每个元素的长度上加 2,以计入第一个元素的 [] 和每个后续元素的 ", "。然后使用 bisect_right 快速确定分割点的位置,使每个分块都不超过最大长度(= MAX_SIZE - keylen)。该函数被编写为生成器,以最大程度地减少大型数据集的内存使用量。

import json
from bisect import bisect_right

def chunk_list_dict(dl, limit):
    """Yield dicts from `dl`, splitting those whose JSON text exceeds `limit`.

    Each dict is expected to hold a single key mapping to a list. A dict that
    already fits is yielded unchanged. Otherwise its list is cut into
    consecutive slices, found with a binary search over cumulative element
    lengths, and one ``{key: slice}`` dict is yielded per slice. For a dict
    with several keys that does not fit, only the first pair is considered.

    Args:
        dl: Iterable of dicts.
        limit: Maximum length of ``json.dumps(d)`` for every yielded dict.

    Yields:
        Dicts whose JSON text is at most `limit` characters long.

    Raises:
        ValueError: If the key with an empty list, or the key with a single
            element, already exceeds `limit`.
    """
    def chunk_dict_list(dd, limit):
        if len(json.dumps(dd)) <= limit:
            yield dd
            return

        key, ll = next(iter(dd.items()))
        if not ll:
            raise ValueError(f"key {key!r} does not fit within {limit}")

        # Characters taken by everything except the list: '{"key": ' and '}'.
        keylen = len(json.dumps({key: []})) - 2
        budget = limit - keylen

        # lens[i] is the JSON length of ll[:i + 1]: each element costs its own
        # text plus 2 (the "[]" for the first one, ", " for every later one).
        lens = []
        total = 0
        for item in ll:
            total += len(json.dumps(item)) + 2
            lens.append(total)

        start, consumed = 0, 0
        while start < len(ll):
            last = bisect_right(lens, consumed + budget, lo=start)
            if last == start:
                # Not even one element fits; without this guard the search
                # would make no progress.
                raise ValueError(
                    f"element {ll[start]!r} of key {key!r} does not fit "
                    f"within {limit}"
                )
            yield {key: ll[start:last]}
            consumed = lens[last - 1]
            start = last

    for d in dl:
        yield from chunk_dict_list(d, limit)

# Size limit passed as `limit` to chunk_list_dict.
MAX_SIZE: int = 25

# Sample input, including a key long enough to force one element per chunk.
payload: list[dict] = [
    {"data1": [1, 2, 3, 4]},
    {"long_data_name": [1, 2, 3, 4]},
    {"data3": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]},
    {"data4": [100, 200, -1, -10, 200, 300, 12, 13]},
]

print(list(chunk_list_dict(payload, MAX_SIZE)))


  {'data1': [1, 2, 3, 4]},
  {'long_data_name': [1]},
  {'long_data_name': [2]},
  {'long_data_name': [3]},
  {'long_data_name': [4]},
  {'data3': [1, 2, 3, 4]},
  {'data3': [5, 6, 7, 8]},
  {'data3': [9, 10, 11]},
  {'data3': [12]},
  {'data4': [100, 200, -1]},
  {'data4': [-10, 200]},
  {'data4': [300, 12, 13]}
1赞 Nick 9/23/2023 #4


import json

def split_list_dict(dl, limit):
    """Yield dicts from `dl`, halving those whose JSON text exceeds `limit`.

    Each dict is expected to hold a single key mapping to a list. A dict that
    fits is yielded unchanged; otherwise its list is cut in the middle and
    both halves are processed recursively, so order is preserved. For a dict
    with several keys that does not fit, only the first pair is considered.

    Args:
        dl: Iterable of dicts.
        limit: Maximum length of ``json.dumps(d)`` for every yielded dict.

    Yields:
        Dicts whose JSON text is at most `limit` characters long.

    Raises:
        ValueError: If a dict is too long but its list has fewer than two
            elements and therefore cannot be halved.
    """
    def split_dict_list(dd, limit):
        if len(json.dumps(dd)) <= limit:
            yield dd
            # Stop here: falling through would keep halving a dict that
            # already fits and recurse without end.
            return

        key, ll = next(iter(dd.items()))
        if len(ll) < 2:
            raise ValueError(
                f"cannot split key {key!r} any further to fit within {limit}"
            )

        split_point = len(ll) // 2
        yield from split_dict_list({key: ll[:split_point]}, limit)
        yield from split_dict_list({key: ll[split_point:]}, limit)

    for dd in dl:
        yield from split_dict_list(dd, limit)

# Size limit passed as `limit` to split_list_dict.
MAX_SIZE: int = 25

# Sample input, including a key long enough to force one element per chunk.
payload: list[dict] = [
    {"data1": [1, 2, 3, 4]},
    {"long_data_name": [1, 2, 3, 4]},
    {"data3": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]},
    {"data4": [100, 200, -1, -10, 200, 300, 12, 13]},
]

print(list(split_list_dict(payload, MAX_SIZE)))


  {'data1': [1, 2, 3, 4]},
  {'long_data_name': [1]},
  {'long_data_name': [2]},
  {'long_data_name': [3]},
  {'long_data_name': [4]},
  {'data3': [1, 2, 3]},
  {'data3': [4, 5, 6]},
  {'data3': [7, 8, 9]},
  {'data3': [10, 11, 12]},
  {'data4': [100, 200]},
  {'data4': [-1, -10]},
  {'data4': [200, 300]},
  {'data4': [12, 13]}