Chunk a JSON Array of Objects until each Array item is of byte length < a Static Threshold

Asked by Coldchain9 on 9/22/2023 · Last edited by Coldchain9 · Updated 9/23/2023 · Views: 84

Q:

I have a list of dicts that follows a consistent structure, where each dict contains a list of integers. However, I need to ensure that the byte size of each dict (when converted to a JSON string) is smaller than a specified threshold.

If a dict exceeds that byte-size threshold, I need to chunk that dict's list of integers.

Attempt:


import json

payload: list[dict] = [
    {"data1": [1,2,3,4]},
    {"data2": [8,9,10]},
    {"data3": [1,2,3,4,5,6,7]}
]

# Max size in bytes we can allow. This is static and a hard limit that is not variable.
MAX_SIZE: int = 25

def check_and_chunk(arr: list):

    def check_size_bytes(item):
        # True when the JSON-serialized item exceeds the byte limit
        return len(json.dumps(item).encode("utf-8")) > MAX_SIZE

    def chunk(item, num_chunks: int=2):
        for i in range(0, len(item), num_chunks):
            yield item[i:i+num_chunks]

    # First check if the entire payload is smaller than the MAX_SIZE
    if not check_size_bytes(arr):
        return arr

    # Lets find the items that are small and items that are too big, respectively
    small, big = [], []

    # Find the indices in the payload that are too big
    big_idx: list = [i for i, j in enumerate(list(map(check_size_bytes, arr))) if j]

    # Append these items respectively to their proper lists
    item_append = (small.append, big.append)
    for i, item in enumerate(arr):
        item_append[i in set(big_idx)](item)
    
    # Modify the big items until they are small enough to be moved to the small_items list
    for i in big:
        print(i)
    # This is where I am unsure how best to proceed. I'd like to split each dictionary in the
    # 'big' list until every piece is small enough to go into the 'small' result.

Example of a possible desired result:

payload: list[dict] = [
    {"data1": [1,2,3,4]},
    {"data2": [8,9,10]},
    {"data3": [1,2,3,4]},
    {"data3": [5,6,7]}
]
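
For reference, each dict in this desired result serializes to fewer than 25 bytes; a quick check using the same json.dumps measurement as in the attempt above confirms this:

import json

MAX_SIZE: int = 25

desired: list[dict] = [
    {"data1": [1, 2, 3, 4]},
    {"data2": [8, 9, 10]},
    {"data3": [1, 2, 3, 4]},
    {"data3": [5, 6, 7]},
]

# Every serialized dict stays within the 25-byte budget
for d in desired:
    size = len(json.dumps(d).encode("utf-8"))
    print(d, size, size < MAX_SIZE)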
Tags: json, list, dictionary

Comments

0 votes Nick 9/22/2023
When you split, does it matter how big the individual chunks are?

A:

1 vote Andrej Kesely 9/22/2023 #1

IIUC, you can use a generator to produce appropriately sized chunks:

import json

payload = [
    {"data1": [1, 2, 3, 4]},
    {"data2": [8, 9, 10]},
    {"data3": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]},
    {"data4": [100, 200, -1, -10, 200, 300, 12, 13]},
]

MAX_SIZE = 25


def get_chunks(lst):
    # Yield lists with fewer than 2 items unchanged (they cannot be split further)
    if len(lst) < 2:
        yield lst
        return

    curr, curr_len = [], 0
    for v in lst:
        s = str(v)
        # current length of all numbers + length of current number + number of `, ` + `[]`
        if curr_len + len(s) + 2 * len(curr) + 2 > MAX_SIZE:
            yield curr
            curr = [v]
            curr_len = len(s)
        else:
            curr.append(v)
            curr_len += len(s)

    if curr:
        yield curr


for d in payload:
    for k, v in d.items():
        for chunk in get_chunks(v):
            d = {k: chunk}
            print(f"{str(d):<40} {len(json.dumps(chunk).encode())=:<30}")

Prints:

{'data1': [1, 2, 3, 4]}                  len(json.dumps(chunk).encode())=12                            
{'data2': [8, 9, 10]}                    len(json.dumps(chunk).encode())=10                            
{'data3': [1, 2, 3, 4, 5, 6, 7, 8]}      len(json.dumps(chunk).encode())=24                            
{'data3': [9, 10, 11, 12]}               len(json.dumps(chunk).encode())=15                            
{'data4': [100, 200, -1, -10, 200]}      len(json.dumps(chunk).encode())=24                            
{'data4': [300, 12, 13]}                 len(json.dumps(chunk).encode())=13                            
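
The loop above only prints the chunked dicts; to collect them into a new payload (matching the shape of the question's desired result), a small follow-up sketch reusing the same get_chunks generator could look like this:

new_payload: list[dict] = []
for d in payload:
    for k, v in d.items():
        # Re-wrap each chunk of values under its original key
        for chunk in get_chunks(v):
            new_payload.append({k: chunk})

print(new_payload)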
3 votes Hai Vu 9/22/2023 #2

My approach starts with the list of integers. I take one number at a time from the existing list (which I call input_sequence) and put it into a new list (output_sequence) until I exceed the length limit. At that point, I back off one number and build the "chunk".

import json
import logging
import pprint
from collections import deque

logging.basicConfig(level=logging.DEBUG)

MAX_SIZE: int = 25


def split(key, input_sequence, limit, out):
    """Split the `input_sequence` into several smaller ones.

    The result will be appended to the `out` list.    
    """
    input_sequence = deque(input_sequence)
    output_sequence = []
    
    while input_sequence:
        # Move an element from input_sequence to output_sequence
        element = input_sequence.popleft()
        output_sequence.append(element)

        # Build the dictionary in bytes
        dict_str = json.dumps({key: output_sequence})
        dict_binary = dict_str.encode("utf-8")
        actual_length = len(dict_binary)
        logging.debug("dict_binary=%r, len=%r", dict_binary, actual_length)

        # If the length is over the limit, then back off one element
        # And produce the result
        if actual_length > limit:
            logging.debug("Over the limit")
            output_sequence.pop()
            input_sequence.appendleft(element)
            out.append({key: output_sequence})
            output_sequence = []

    # Left over
    if output_sequence:
        out.append({key: output_sequence})


def check_and_chunk(arr: list, limit):
    out = []
    for dict_object in arr:
        for key, seq in dict_object.items():
            split(key, seq, limit, out)
    return out


payload: list[dict] = [
    {"data1": [1, 2, 3, 4]},
    {"data2": [8, 9, 10]},
    {"data3": [1, 2, 3, 4, 5, 6, 7]},
    {"data4": list(range(20))},
]

pprint.pprint(check_and_chunk(payload, MAX_SIZE))

Here is the output.

DEBUG:root:dict_binary=b'{"data1": [1]}', len=14
DEBUG:root:dict_binary=b'{"data1": [1, 2]}', len=17
DEBUG:root:dict_binary=b'{"data1": [1, 2, 3]}', len=20
DEBUG:root:dict_binary=b'{"data1": [1, 2, 3, 4]}', len=23
DEBUG:root:dict_binary=b'{"data2": [8]}', len=14
DEBUG:root:dict_binary=b'{"data2": [8, 9]}', len=17
DEBUG:root:dict_binary=b'{"data2": [8, 9, 10]}', len=21
DEBUG:root:dict_binary=b'{"data3": [1]}', len=14
DEBUG:root:dict_binary=b'{"data3": [1, 2]}', len=17
DEBUG:root:dict_binary=b'{"data3": [1, 2, 3]}', len=20
DEBUG:root:dict_binary=b'{"data3": [1, 2, 3, 4]}', len=23
DEBUG:root:dict_binary=b'{"data3": [1, 2, 3, 4, 5]}', len=26
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data3": [5]}', len=14
DEBUG:root:dict_binary=b'{"data3": [5, 6]}', len=17
DEBUG:root:dict_binary=b'{"data3": [5, 6, 7]}', len=20
DEBUG:root:dict_binary=b'{"data4": [0]}', len=14
DEBUG:root:dict_binary=b'{"data4": [0, 1]}', len=17
DEBUG:root:dict_binary=b'{"data4": [0, 1, 2]}', len=20
DEBUG:root:dict_binary=b'{"data4": [0, 1, 2, 3]}', len=23
DEBUG:root:dict_binary=b'{"data4": [0, 1, 2, 3, 4]}', len=26
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [4]}', len=14
DEBUG:root:dict_binary=b'{"data4": [4, 5]}', len=17
DEBUG:root:dict_binary=b'{"data4": [4, 5, 6]}', len=20
DEBUG:root:dict_binary=b'{"data4": [4, 5, 6, 7]}', len=23
DEBUG:root:dict_binary=b'{"data4": [4, 5, 6, 7, 8]}', len=26
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [8]}', len=14
DEBUG:root:dict_binary=b'{"data4": [8, 9]}', len=17
DEBUG:root:dict_binary=b'{"data4": [8, 9, 10]}', len=21
DEBUG:root:dict_binary=b'{"data4": [8, 9, 10, 11]}', len=25
DEBUG:root:dict_binary=b'{"data4": [8, 9, 10, 11, 12]}', len=29
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [12]}', len=15
DEBUG:root:dict_binary=b'{"data4": [12, 13]}', len=19
DEBUG:root:dict_binary=b'{"data4": [12, 13, 14]}', len=23
DEBUG:root:dict_binary=b'{"data4": [12, 13, 14, 15]}', len=27
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [15]}', len=15
DEBUG:root:dict_binary=b'{"data4": [15, 16]}', len=19
DEBUG:root:dict_binary=b'{"data4": [15, 16, 17]}', len=23
DEBUG:root:dict_binary=b'{"data4": [15, 16, 17, 18]}', len=27
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [18]}', len=15
DEBUG:root:dict_binary=b'{"data4": [18, 19]}', len=19
[{'data1': [1, 2, 3, 4]},
 {'data2': [8, 9, 10]},
 {'data3': [1, 2, 3, 4]},
 {'data3': [5, 6, 7]},
 {'data4': [0, 1, 2, 3]},
 {'data4': [4, 5, 6, 7]},
 {'data4': [8, 9, 10, 11]},
 {'data4': [12, 13, 14]},
 {'data4': [15, 16, 17]},
 {'data4': [18, 19]}]

Notes

  • I use the logging library for debug output. To turn off debugging, replace logging.DEBUG with logging.WARN.
  • I modified the signature of check_and_chunk to take the size limit as a parameter rather than relying on a global variable.
  • I use the deque data structure, which behaves like a list but is faster for inserting/removing from the left side (see the short sketch below).
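
To illustrate that last note (a minimal standalone sketch, separate from the answer code): deque gives O(1) appends and pops on the left end, which is what split relies on when it backs off an element.

from collections import deque

numbers = deque([1, 2, 3, 4, 5])

# O(1) removal from the left; list.pop(0) would be O(n)
first = numbers.popleft()

# O(1) re-insertion at the left, as split() does when it backs off an element
numbers.appendleft(first)

print(numbers)  # deque([1, 2, 3, 4, 5])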
2 votes Nick 9/22/2023 #3

Here is another solution based on computing the lengths of the elements and splitting the list on that basis. Basically, the code works out the length of the key part of the JSON dict (keylen), then the length of each individual component of the list (lens), adding 2 to each length to allow for the [] around the first element and the ", " before each subsequent element. bisect_right is used to rapidly determine where the split points need to be in order to fit within the maximum length (= MAX_SIZE - keylen). The function is written as a generator to minimize memory usage for large datasets.

import json
from bisect import bisect_right

def chunk_list_dict(dl, limit):
    def chunk_dict_list(dd, limit):
        ll = next(iter(dd.values()))
        key = next(iter(dd.keys()))
        keylen = len(json.dumps(dd)) - len(json.dumps(ll))
        llen = 0
        lens = [(llen := llen + len(str(i)) + 2) for i in ll]
        max_len = limit - keylen
        start = 0
        end = len(lens)
        out = []
        while start < end:
            last = bisect_right(lens, max_len)
            yield { key : ll[start:last] }
            max_len = lens[last-1] + limit - keylen
            start = last
        return out
    for d in dl:
        yield from chunk_dict_list(d, limit)

MAX_SIZE: int = 25

payload: list[dict] = [
    {"data1": [1, 2, 3, 4]},
    {"long_data_name": [1, 2, 3, 4]},
    {"data3": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]},
    {"data4": [100, 200, -1, -10, 200, 300, 12, 13]},
]

print(list(chunk_list_dict(payload, MAX_SIZE)))

Output:

[
  {'data1': [1, 2, 3, 4]},
  {'long_data_name': [1]},
  {'long_data_name': [2]},
  {'long_data_name': [3]},
  {'long_data_name': [4]},
  {'data3': [1, 2, 3, 4]},
  {'data3': [5, 6, 7, 8]},
  {'data3': [9, 10, 11]},
  {'data3': [12]},
  {'data4': [100, 200, -1]},
  {'data4': [-10, 200]},
  {'data4': [300, 12, 13]}
]
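
For this payload, every dict produced by chunk_list_dict stays under the 25-byte limit; a quick check (reusing the imports and definitions above) is:

for d in chunk_list_dict(payload, MAX_SIZE):
    size = len(json.dumps(d).encode("utf-8"))
    print(d, size, size <= MAX_SIZE)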
1 vote Nick 9/23/2023 #4

As discussed in the comments, a simple approach to this task is to recursively split the input lists until the output dicts meet the size requirement. This gives more evenly sized lists in the output, but may produce more dicts than are strictly necessary (and more than would be produced by one of the length-accumulation approaches).

import json

def split_list_dict(dl, limit):
    def split_dict_list(dd, limit):
        def json_len(ll):
            return sum(map(len, map(str, ll))) + 2 * len(ll)    # 2 * len(ll) allows for [] and , 
        
        ll = next(iter(dd.values()))
        key = next(iter(dd.keys()))
        dict_jsonlen = len(json.dumps(dd))
        if dict_jsonlen <= limit:
            yield dd
            return
        list_jsonlen = json_len(ll)
        keylen = dict_jsonlen - list_jsonlen
        split_point = len(ll) // 2
        yield from split_dict_list({ key : ll[:split_point] }, limit)
        yield from split_dict_list({ key : ll[split_point:] }, limit)
    
    for dd in dl:
        yield from split_dict_list(dd, limit)

MAX_SIZE: int = 25

payload: list[dict] = [
    {"data1": [1, 2, 3, 4]},
    {"long_data_name": [1, 2, 3, 4]},
    {"data3": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]},
    {"data4": [100, 200, -1, -10, 200, 300, 12, 13]},
]

print(list(split_list_dict(payload, MAX_SIZE)))

Output:

[
  {'data1': [1, 2, 3, 4]},
  {'long_data_name': [1]},
  {'long_data_name': [2]},
  {'long_data_name': [3]},
  {'long_data_name': [4]},
  {'data3': [1, 2, 3]},
  {'data3': [4, 5, 6]},
  {'data3': [7, 8, 9]},
  {'data3': [10, 11, 12]},
  {'data4': [100, 200]},
  {'data4': [-1, -10]},
  {'data4': [200, 300]},
  {'data4': [12, 13]}
]