Asked by: Coldchain9 · Asked: 9/22/2023 · Last edited by: Coldchain9 · Updated: 9/23/2023 · Views: 84
Chunk a JSON Array of Objects until each Array item is of byte length < a Static Threshold
Q:
I have a `list` of `dict`s that follows a consistent structure, where each `dict` holds a `list` of integers. However, I need to ensure that each `dict` is of byte size (when converted to a JSON string) less than a specified threshold.

If a `dict` exceeds that byte-size threshold, I need to chunk its `list` of integers.

Attempt:
import json

payload: list[dict] = [
    {"data1": [1,2,3,4]},
    {"data2": [8,9,10]},
    {"data3": [1,2,3,4,5,6,7]}
]

# Max size in bytes we can allow. This is static and a hard limit that is not variable.
MAX_SIZE: int = 25

def check_and_chunk(arr: list):
    def check_size_bytes(item):
        return True if len(json.dumps(item).encode("utf-8")) > MAX_SIZE else False

    def chunk(item, num_chunks: int=2):
        for i in range(0, len(item), num_chunks):
            yield item[i:i+num_chunks]

    # First check if the entire payload is smaller than the MAX_SIZE
    if not check_size_bytes(arr):
        return arr

    # Lets find the items that are small and items that are too big, respectively
    small, big = [], []

    # Find the indices in the payload that are too big
    big_idx: list = [i for i, j in enumerate(list(map(check_size_bytes, arr))) if j]

    # Append these items respectively to their proper lists
    item_append = (small.append, big.append)
    for i, item in enumerate(arr):
        item_append[i in set(big_idx)](item)

    # Modify the big items until they are small enough to be moved to the small_items list
    for i in big:
        print(i)
        # This is where I am unsure of how best to proceed. I'd like to essentially
        # split the big dictionaries in the 'big' list such that each piece is small
        # enough to belong in the 'small' result.
An example of a possible desired result:
payload: list[dict] = [
    {"data1": [1,2,3,4]},
    {"data2": [8,9,10]},
    {"data3": [1,2,3,4]},
    {"data3": [5,6,7]}
]
Answers:
1 upvote
Andrej Kesely
9/22/2023
#1
IIUC, you can use a generator to produce appropriately sized chunks:
import json

payload = [
    {"data1": [1, 2, 3, 4]},
    {"data2": [8, 9, 10]},
    {"data3": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]},
    {"data4": [100, 200, -1, -10, 200, 300, 12, 13]},
]

MAX_SIZE = 25

def get_chunks(lst):
    if len(lst) < 2:
        # a generator's `return <value>` is discarded, so yield instead
        yield lst
        return
    curr, curr_len = [], 0
    for v in lst:
        s = str(v)
        # current length of all numbers + length of current number + number of `, ` + `[]`
        if curr_len + len(s) + 2 * len(curr) + 2 > MAX_SIZE:
            yield curr
            curr = [v]
            curr_len = len(s)
        else:
            curr.append(v)
            curr_len += len(s)
    if curr:
        yield curr

for d in payload:
    for k, v in d.items():
        for chunk in get_chunks(v):
            d = {k: chunk}
            print(f"{str(d):<40} {len(json.dumps(chunk).encode())=:<30}")
Prints:
{'data1': [1, 2, 3, 4]} len(json.dumps(chunk).encode())=12
{'data2': [8, 9, 10]} len(json.dumps(chunk).encode())=10
{'data3': [1, 2, 3, 4, 5, 6, 7, 8]} len(json.dumps(chunk).encode())=24
{'data3': [9, 10, 11, 12]} len(json.dumps(chunk).encode())=15
{'data4': [100, 200, -1, -10, 200]} len(json.dumps(chunk).encode())=24
{'data4': [300, 12, 13]} len(json.dumps(chunk).encode())=13
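As a side note (my illustration, not part of the answer): the arithmetic in the guard mirrors exactly how `json.dumps` serializes a list of integers — two bracket characters plus a two-character `", "` separator between consecutive numbers.

```python
import json

nums = [1, 22, 333]
# "[" and "]" cost 2, digits cost sum(len(str(v))), and the
# (n - 1) separators ", " cost 2 each
expected = 2 + sum(len(str(v)) for v in nums) + 2 * (len(nums) - 1)
assert len(json.dumps(nums)) == expected  # both are 12
```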
3 upvotes
Hai Vu
9/22/2023
#2
My approach starts with the list of integers. I take one number at a time from the existing list (which I call `input_sequence`) and place it into a new list (`output_sequence`) until I exceed the length limit. At that point, I back off by one number and build the "chunk".
import json
import logging
import pprint
from collections import deque

logging.basicConfig(level=logging.DEBUG)
MAX_SIZE: int = 25

def split(key, input_sequence, limit, out):
    """Split the `input_sequence` into several smaller ones.

    The result will be appended to the `out` list.
    """
    input_sequence = deque(input_sequence)
    output_sequence = []
    while input_sequence:
        # Move an element from input_sequence to output_sequence
        element = input_sequence.popleft()
        output_sequence.append(element)

        # Build the dictionary in bytes
        dict_str = json.dumps({key: output_sequence})
        dict_binary = dict_str.encode("utf-8")
        actual_length = len(dict_binary)
        logging.debug("dict_binary=%r, len=%r", dict_binary, actual_length)

        # If the length is over the limit, then back off one element
        # and produce the result
        if actual_length > limit:
            logging.debug("Over the limit")
            output_sequence.pop()
            input_sequence.appendleft(element)
            out.append({key: output_sequence})
            output_sequence = []

    # Left over
    if output_sequence:
        out.append({key: output_sequence})

def check_and_chunk(arr: list, limit):
    out = []
    for dict_object in arr:
        for key, seq in dict_object.items():
            split(key, seq, limit, out)
    return out

payload: list[dict] = [
    {"data1": [1, 2, 3, 4]},
    {"data2": [8, 9, 10]},
    {"data3": [1, 2, 3, 4, 5, 6, 7]},
    {"data4": list(range(20))},
]

pprint.pprint(check_and_chunk(payload, MAX_SIZE))
Here is the output.
DEBUG:root:dict_binary=b'{"data1": [1]}', len=14
DEBUG:root:dict_binary=b'{"data1": [1, 2]}', len=17
DEBUG:root:dict_binary=b'{"data1": [1, 2, 3]}', len=20
DEBUG:root:dict_binary=b'{"data1": [1, 2, 3, 4]}', len=23
DEBUG:root:dict_binary=b'{"data2": [8]}', len=14
DEBUG:root:dict_binary=b'{"data2": [8, 9]}', len=17
DEBUG:root:dict_binary=b'{"data2": [8, 9, 10]}', len=21
DEBUG:root:dict_binary=b'{"data3": [1]}', len=14
DEBUG:root:dict_binary=b'{"data3": [1, 2]}', len=17
DEBUG:root:dict_binary=b'{"data3": [1, 2, 3]}', len=20
DEBUG:root:dict_binary=b'{"data3": [1, 2, 3, 4]}', len=23
DEBUG:root:dict_binary=b'{"data3": [1, 2, 3, 4, 5]}', len=26
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data3": [5]}', len=14
DEBUG:root:dict_binary=b'{"data3": [5, 6]}', len=17
DEBUG:root:dict_binary=b'{"data3": [5, 6, 7]}', len=20
DEBUG:root:dict_binary=b'{"data4": [0]}', len=14
DEBUG:root:dict_binary=b'{"data4": [0, 1]}', len=17
DEBUG:root:dict_binary=b'{"data4": [0, 1, 2]}', len=20
DEBUG:root:dict_binary=b'{"data4": [0, 1, 2, 3]}', len=23
DEBUG:root:dict_binary=b'{"data4": [0, 1, 2, 3, 4]}', len=26
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [4]}', len=14
DEBUG:root:dict_binary=b'{"data4": [4, 5]}', len=17
DEBUG:root:dict_binary=b'{"data4": [4, 5, 6]}', len=20
DEBUG:root:dict_binary=b'{"data4": [4, 5, 6, 7]}', len=23
DEBUG:root:dict_binary=b'{"data4": [4, 5, 6, 7, 8]}', len=26
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [8]}', len=14
DEBUG:root:dict_binary=b'{"data4": [8, 9]}', len=17
DEBUG:root:dict_binary=b'{"data4": [8, 9, 10]}', len=21
DEBUG:root:dict_binary=b'{"data4": [8, 9, 10, 11]}', len=25
DEBUG:root:dict_binary=b'{"data4": [8, 9, 10, 11, 12]}', len=29
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [12]}', len=15
DEBUG:root:dict_binary=b'{"data4": [12, 13]}', len=19
DEBUG:root:dict_binary=b'{"data4": [12, 13, 14]}', len=23
DEBUG:root:dict_binary=b'{"data4": [12, 13, 14, 15]}', len=27
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [15]}', len=15
DEBUG:root:dict_binary=b'{"data4": [15, 16]}', len=19
DEBUG:root:dict_binary=b'{"data4": [15, 16, 17]}', len=23
DEBUG:root:dict_binary=b'{"data4": [15, 16, 17, 18]}', len=27
DEBUG:root:Over the limit
DEBUG:root:dict_binary=b'{"data4": [18]}', len=15
DEBUG:root:dict_binary=b'{"data4": [18, 19]}', len=19
[{'data1': [1, 2, 3, 4]},
{'data2': [8, 9, 10]},
{'data3': [1, 2, 3, 4]},
{'data3': [5, 6, 7]},
{'data4': [0, 1, 2, 3]},
{'data4': [4, 5, 6, 7]},
{'data4': [8, 9, 10, 11]},
{'data4': [12, 13, 14]},
{'data4': [15, 16, 17]},
{'data4': [18, 19]}]
Notes

- I use the logging library for the debug output. To turn debugging off, replace `logging.DEBUG` with `logging.WARN`.
- I modified the `check_and_chunk` signature to take the size limit as a parameter instead of relying on a global variable.
- I use the `deque` data structure, which behaves like a list but is faster for inserting/removing at the left end.
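A minimal illustration (mine, not part of the answer) of the two deque operations that `split` relies on — taking the next number from the left, and backing off by pushing it back:

```python
from collections import deque

d = deque([1, 2, 3])
element = d.popleft()   # take the next number from the input; O(1) at the left end
d.appendleft(element)   # back off: return it to the front when the chunk is full
assert list(d) == [1, 2, 3]
```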
2 upvotes
Nick
9/22/2023
#3
Here is another solution, based on calculating the lengths of the elements and splitting the list accordingly. Essentially, the code works out the length of the `key` portion of the JSON dict (`keylen`), then the length of each individual component of the list (`lens`), adding 2 to each length to account for the `[]` around the first element and the `, ` before each subsequent one. `bisect_right` is used to quickly determine where the split points need to fall to fit within the maximum length (`max_len = MAX_SIZE - keylen`). The function is written as a generator to minimize memory usage on large datasets.
import json
from bisect import bisect_right

def chunk_list_dict(dl, limit):
    def chunk_dict_list(dd, limit):
        ll = next(iter(dd.values()))
        key = next(iter(dd.keys()))
        keylen = len(json.dumps(dd)) - len(json.dumps(ll))
        llen = 0
        lens = [(llen := llen + len(str(i)) + 2) for i in ll]
        max_len = limit - keylen
        start = 0
        end = len(lens)
        while start < end:
            last = bisect_right(lens, max_len)
            yield { key : ll[start:last] }
            max_len = lens[last-1] + limit - keylen
            start = last

    for d in dl:
        yield from chunk_dict_list(d, limit)

MAX_SIZE: int = 25

payload: list[dict] = [
    {"data1": [1, 2, 3, 4]},
    {"long_data_name": [1, 2, 3, 4]},
    {"data3": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]},
    {"data4": [100, 200, -1, -10, 200, 300, 12, 13]},
]

print(list(chunk_list_dict(payload, MAX_SIZE)))
Output:
[
{'data1': [1, 2, 3, 4]},
{'long_data_name': [1]},
{'long_data_name': [2]},
{'long_data_name': [3]},
{'long_data_name': [4]},
{'data3': [1, 2, 3, 4]},
{'data3': [5, 6, 7, 8]},
{'data3': [9, 10, 11]},
{'data3': [12]},
{'data4': [100, 200, -1]},
{'data4': [-10, 200]},
{'data4': [300, 12, 13]}
]
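To make the `bisect_right` step concrete (my sketch, not part of the answer): `lens` is a running total of per-item JSON costs, so bisecting it with the remaining byte budget directly returns the number of items that fit.

```python
from bisect import bisect_right

# Cumulative costs for four items, where each item's cost is
# len(str(item)) + 2, e.g. four single-digit numbers
lens = [3, 6, 9, 12]
budget = 10           # hypothetical max_len = limit - keylen
fit = bisect_right(lens, budget)
assert fit == 3       # the first three items fit; the fourth starts a new chunk
```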
1 upvote
Nick
9/23/2023
#4
As discussed in the comments, a simple approach to this task is to recursively split the input list until the output dictionaries meet the size requirement. This gives more evenly sized lists in the output, but may produce more dictionaries than strictly necessary (and more than would be produced by one of the length-accumulation approaches).
import json

def split_list_dict(dl, limit):
    def split_dict_list(dd, limit):
        ll = next(iter(dd.values()))
        key = next(iter(dd.keys()))
        dict_jsonlen = len(json.dumps(dd))
        if dict_jsonlen <= limit:
            yield dd
            return
        if len(ll) < 2:
            # a single element that still exceeds the limit cannot be
            # split further; yield it as-is to avoid infinite recursion
            yield dd
            return
        split_point = len(ll) // 2
        yield from split_dict_list({ key : ll[:split_point] }, limit)
        yield from split_dict_list({ key : ll[split_point:] }, limit)

    for dd in dl:
        yield from split_dict_list(dd, limit)

MAX_SIZE: int = 25

payload: list[dict] = [
    {"data1": [1, 2, 3, 4]},
    {"long_data_name": [1, 2, 3, 4]},
    {"data3": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]},
    {"data4": [100, 200, -1, -10, 200, 300, 12, 13]},
]

print(list(split_list_dict(payload, MAX_SIZE)))
Output:
[
{'data1': [1, 2, 3, 4]},
{'long_data_name': [1]},
{'long_data_name': [2]},
{'long_data_name': [3]},
{'long_data_name': [4]},
{'data3': [1, 2, 3]},
{'data3': [4, 5, 6]},
{'data3': [7, 8, 9]},
{'data3': [10, 11, 12]},
{'data4': [100, 200]},
{'data4': [-1, -10]},
{'data4': [200, 300]},
{'data4': [12, 13]}
]