Asked by: GuyQian | Asked: 11/17/2023 | Last edited by: GuyQian | Updated: 11/20/2023 | Views: 56
Achieving Microsecond-level Data Retrieval with Custom Data Types in Python Multiprocessing
Q:
Problem description:
I am currently working on a Python project that involves multiprocessing and requires efficient data communication. The data I am handling is stored in a list, where each element is a custom data type. The challenge is to achieve ultra-fast data retrieval, targeting microsecond-level performance.
Specific requirements:
- Data content: The data to be communicated is a list of a custom data type, such as QPointF from PyQt. For example, [QPointF(i, 100) for i in range(0, 10000)].
- Performance target: The data retrieval process needs to be as fast as possible, aiming for microsecond-level efficiency.
- Usability: After retrieval, the data should be directly usable without additional processing or organization.
Previous attempts:
I have explored various solutions within the multiprocessing module, including:
- multiprocessing.Queue
- multiprocessing.Pipe
- multiprocessing.managers.ShareableList
- multiprocessing.shared_memory.SharedMemory
Unfortunately, none of these approaches met the required speed and usability criteria. Queue and Pipe both have retrieval times in the millisecond range, while ShareableList and SharedMemory do not support lists of custom data types, so they require additional post-retrieval organization.
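For reference, a minimal sketch of the kind of Queue round trip described above; the echo setup, payload shape, and loop count are illustrative, not the original benchmark:

import time
import multiprocessing as mp

def echo(q_in, q_out):
    # Echo every message back until the sentinel arrives
    while True:
        item = q_in.get()
        if item is None:
            break
        q_out.put(item)

if __name__ == '__main__':
    q_in, q_out = mp.Queue(), mp.Queue()
    proc = mp.Process(target=echo, args=(q_in, q_out))
    proc.start()
    # Stand-in for a list of QPointF-like items (plain tuples used here)
    payload = [(i, 100.0) for i in range(10000)]
    n = 100
    t0 = time.perf_counter()
    for _ in range(n):
        q_in.put(payload)
        q_out.get()
    rtt = (time.perf_counter() - t0) / n * 1e6
    print(f'~{rtt:.0f} microseconds per round trip')  # typically milliseconds
    q_in.put(None)
    proc.join()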
Request for guidance:
I am seeking suggestions or insights on alternative approaches or best practices, within the Python multiprocessing module or any other relevant library, for achieving efficient microsecond-level data communication with lists of custom data types.
Question summary:
When working with lists of custom data types, what other methods or approaches within the multiprocessing module can achieve high-speed (microsecond-level) data communication? Previous attempts with Queue, Pipe, ShareableList, and SharedMemory have all fallen short. Any input or suggestions would be greatly appreciated. Thank you!
A:
TL;DR - Microsecond-level timing is nearly impossible to get out of the box in Python. Below is a possible home-grown solution in pure Python using SharedMemory that comes close and may inspire your own development.
Microsecond timing in Python is a brutal target. (Insert the obligatory rant about using Python for performance-sensitive tasks.) I think focusing on low latency, and allowing some missed messages, can get us close to what you want.
The approach I use here is a double circular buffer. The first buffer is fixed-stride for fast lookup, and tells you where to find the start/stop points in the other buffer. Since you did not mention a fixed data type, it is hard to accomplish what you want with a single buffer, and the double circular buffer helps with that: the first is fixed-stride (so we can jump straight to the lookup entry of interest), and the second accommodates data of any size (but we can still jump straight to the data of interest, because the first buffer has already told us where it is). Essentially, one is just a memory map into the other's pickled objects. In this implementation, data can be overwritten before it is read, and it supports any number of consumers. It could be tuned into a single-consumer, no-lost-messages configuration that waits for a read before overwriting. I am sure there are plenty of optimizations to be had here with better knowledge of the data being moved.
I have put in plenty of comments to explain the logic, but feel free to post follow-up questions in the comments.
I am assuming all of your objects are picklable, or can be made so, since you mentioned multiprocessing.Queue, for example, which already has that requirement.
Your requirements:
- Data content
  - Per the docs, the object you mentioned has a __reduce__ method, so it should work fine, as do most common objects. If not, you can always wrap it, though I am aware of your "no [extra] organization" requirement (see the __reduce__ sketch after this list).
- Performance target
  - For most reasonably sized data types this lands in the 5-20 microsecond range, and it can do better on very small objects; a single int was about 1.2 microseconds in my testing.
- Usability
  - This hands back objects just like the solutions you tested; in my testing I found no difference between what went in and what came out. It does have a class wrapped around it, so there is some associated overhead, but that is unavoidable in any case.
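As an aside on wrapping: a minimal sketch of what a __reduce__ method looks like, using a hypothetical Point class (plain Python classes pickle fine on their own; __reduce__ matters for types backed by C-level objects, and PyQt's QPointF already ships with pickle support):

import pickle

class Point:
    # A stand-in custom type; imagine it wraps a C-level object that
    # pickle cannot serialize through __dict__ by default
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __reduce__(self):
        # Tell pickle how to rebuild this object: a callable plus its args
        return (Point, (self.x, self.y))

restored = pickle.loads(pickle.dumps(Point(3.0, 100.0)))
assert (restored.x, restored.y) == (3.0, 100.0)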
Performance:
On my Windows 10 machine (i9-10900K, 2667 MHz RAM, Python 3.11.4), I can get reads of a 1000-int numpy.array down to around 4.8 microseconds each, and reads of a single int down to around 1.2 microseconds each.
import sys
import pickle
import multiprocessing.shared_memory as sm
import time
import uuid
import struct

# PyPi libraries
import numpy as np

# String encoding for storing in the buffer
ENCODING = 'UTF-8'


def getuuid():
    return bytes(str(uuid.uuid4()), encoding=ENCODING)


# GuyQianMessage
class GQM:
    '''
    A low read-latency message service between processes using multiprocessing

    Uses a circular buffer - older data is eventually lost, any process
    reading from this buffer which falls behind will miss entries

    DIR buffer contains UUID, index and start stop positions in the data buffer
    DAT buffer is the data buffer and contains UUID and pickle bytes

    Use `put` to put data into the buffer
    Use `get` to retrieve data from the buffer
    Use `get` with verbose=True to get back index and UUID for identifying
        skipped or repeated data

    Typical use case:
        Proc 1:
            gqm = GQM(True)
            while True:
                # Generate data
                gqm.put(mydatatosend)
            # Exit time
            gqm.close()
        Proc 2:
            gqm = GQM()
            while True:
                data, index, uuid = gqm.get(verbose=True)
                if uuid == lastuuid:
                    # repeat data
                    pass
                if index > lastindex + 1:
                    # missed data
                    pass
                # process data
    '''
    # These are unique names for the memory space
    SHARED_MEMORY_NAME_DIR = 'SPECIAL_NAME_FOR_GUYQIAN_77500126_DIR'
    SHARED_MEMORY_NAME_DAT = 'SPECIAL_NAME_FOR_GUYQIAN_77500126_DAT'
    # Capture the length of a UUID (it's stored in the buffer with the entry)
    UUID_LEN = len(getuuid())
    # How many positions to keep in the directory
    # By checking indexes, processes can determine if they need to also
    #   check an earlier position for data
    # Reaching backwards improperly is caught by UUID matching
    DIR_POS_COUNT = 1024
    # The data type of the position in the DIR buffer
    DIR_POS_TYPE = 'L'
    # Pre calculate how much space the position takes up
    DIR_POS_LEN = struct.calcsize(f'<{DIR_POS_TYPE}')
    # Pre generate the format string for each entry in the DIR buffer
    DIR_ROW_FMT = f'<{UUID_LEN}sLLL'
    # Pre calculate the stride in the DIR buffer
    DIR_STRIDE = struct.calcsize(DIR_ROW_FMT)
    # Pre calculate the total DIR buffer size
    BUFFER_SIZE_DIR = struct.calcsize(
        f'<{DIR_POS_TYPE}{DIR_STRIDE * DIR_POS_COUNT}c'
    )
    # Set the size of the DAT buffer
    # Larger holds more entries before overwriting
    # May need to be high for large datatypes
    BUFFER_SIZE_DAT = int(1e6)

    def __init__(self, provider=False):
        '''
        Initialize with provider set to True for the data creator
        Initialize with no arguments or provider set to False for data users
        Returns a GQM object
        '''
        # Keep this flag - prevent consumers from trying to put data in
        self.provider = provider
        # If this is the provider, create the buffers using the precalculated
        #   sizes and names
        if self.provider:
            self.smo_dir = sm.SharedMemory(
                name=GQM.SHARED_MEMORY_NAME_DIR,
                create=True,
                size=GQM.BUFFER_SIZE_DIR
            )
            self.smo_dat = sm.SharedMemory(
                name=GQM.SHARED_MEMORY_NAME_DAT,
                create=True,
                size=GQM.BUFFER_SIZE_DAT
            )
        # If this is the consumer, hook into the existing buffer
        else:
            self.smo_dir = sm.SharedMemory(name=GQM.SHARED_MEMORY_NAME_DIR)
            self.smo_dat = sm.SharedMemory(name=GQM.SHARED_MEMORY_NAME_DAT)
        # This is the current byte position in the DAT buffer to write the
        #   next item to
        self.dat_pos = 0
        # This is the current row number in the DIR buffer to write the
        #   next look up info to
        self.dir_pos = 0
        # This is the index of the item being put in so that consumers can
        #   determine if they have missed any data
        self.index = 0

    def close(self):
        '''
        Call this function from the provider to close out the buffers
        DO NOT call this function before all consumers are finished
        Consumers do not need to do anything when finished
        '''
        # Clean up the buffers by unlinking
        # https://docs.python.org/3/library/multiprocessing.shared_memory.html
        if not self.provider:
            # Only the provider should call the close function
            raise AssertionError('This is not a provider! '
                                 '- Unlink only from provider!')
        self.smo_dir.unlink()
        self.smo_dat.unlink()

    def put(self, obj):
        '''
        Add data to the buffer
        Returns
            index of the item added
            a UUID for the item added
        '''
        # Only providers should call put
        if not self.provider:
            raise AssertionError('Put called from consumer!')
        # Pickle the object
        # If it fails, it may not be picklable, see error message below
        try:
            objp = pickle.dumps(obj)
        except TypeError:
            # Capture this error to help developers understand how to fix it
            raise TypeError(
                'Most likely this error was thrown because an '
                'object which cannot be pickled was passed as a message. '
                '\n\tFor custom classes, add a __reduce__ method.'
                '\n\tSee https://stackoverflow.com/questions/19855156'
            )
        except:
            # If something else happened raise the error as is
            raise
        # Calculate the amount of space necessary in the DAT buffer
        space_req = GQM.UUID_LEN + len(objp)
        # Check if there is that much left
        # If not, reset to the beginning of the buffer
        # Relies on the UUIDs to stop improper reads
        # This makes it a circular buffer
        if self.dat_pos + space_req > GQM.BUFFER_SIZE_DAT:
            self.dat_pos = 0
        # The bytes of the block to write the data to
        start = self.dat_pos
        stop = self.dat_pos + space_req
        # Generate a new UUID to use for this entry so that it can be
        #   uniquely identified
        uuid = getuuid()
        # Put the data into the DAT buffer first
        self.smo_dat.buf[start:stop] = struct.pack(
            f'<{space_req}s',
            uuid + objp
        )
        # Next put the data into the DIR buffer
        self._put_dir_row(self.dir_pos, uuid, self.index, start, stop)
        # Lastly update the current entry marker for the DIR buffer
        self._put_dir_pos(self.dir_pos)
        # Remember the index of this entry, then increment the running index
        #   to track how many items have been put into the buffer
        index = self.index
        self.index += 1
        # Increment the dir_pos for the next call and wrap on the total length
        #   to fulfill circular buffer property
        self.dir_pos += 1
        self.dir_pos %= GQM.DIR_POS_COUNT
        # Record the next position to start writing to the DAT buffer
        self.dat_pos = stop
        # Return the index and UUID in case they are to be used for something
        return index, uuid

    def get(self, verbose=False, pos_override=None):
        '''
        Retrieves the latest data from the buffer
        To check if data was skipped or is repeated, check index and uuid
        Returns
            The data object
            index of this entry (if verbose)
            uuid of this entry (if verbose)
        '''
        # Allow a pos override for testing or advanced usage
        if pos_override is None:
            pos = self._get_dir_pos()
        else:
            # Could add some logic for negatives to allow retrieving previous
            #   entries blindly without checking positions and indexes, etc
            pos = pos_override
        # Retrieve the information from the DIR buffer
        uuid_dir, index, start, end = self._get_dir_row(pos)
        # Retrieve the data from the DAT buffer
        uuid_dat, pickledata = struct.unpack(
            f'<{GQM.UUID_LEN}s{end - start - GQM.UUID_LEN}s',
            self.smo_dat.buf[start: end]
        )
        # Verify that the UUIDs match
        # If they don't match the DAT could have been updated while we were
        #   reading the DIR buffer
        # It should be possible to resolve by simply recalling get
        # If this triggers often, the DAT buffer may be too small
        if not uuid_dat == uuid_dir:
            raise AssertionError('Failed to retrieve matching UUID from data')
        # Read the object back
        outobject = pickle.loads(pickledata)
        # Return the object (and index and UUID if requested)
        if verbose:
            return outobject, index, uuid_dat
        return outobject

    def _get_indices_dir(self, pos):
        # Get the indices of the DIR buffer entry
        # The position indicator is first, and then pos strides is the start
        #   of that entry in the buffer
        start = GQM.DIR_POS_LEN + GQM.DIR_STRIDE * pos
        # Because the DIR buffer is fixed strides, we can just add the stride
        #   to the start to get the end
        return start, start + GQM.DIR_STRIDE

    def _put_dir_row(self, pos, uuid, index, start, stop):
        # Put data into a row in the DIR buffer (reverse of _get_dir_row)
        # Get the start and stop positions
        startdir, stopdir = self._get_indices_dir(pos)
        # Put the data in using the pre-generated fmt string
        self.smo_dir.buf[startdir: stopdir] = struct.pack(
            GQM.DIR_ROW_FMT,
            uuid,
            index,
            start,
            stop
        )

    def _get_dir_row(self, pos=None):
        # Get a row from the DIR buffer (reverse of _put_dir_row)
        # Allow position overrides for testing or advanced usage
        if pos is None:
            pos = self._get_dir_pos()
        # Get the start and stop positions in the DIR buffer
        start, stop = self._get_indices_dir(pos)
        # Retrieve and unpack the data using the pre-generated fmt string
        uuid, index, startdat, stopdat = struct.unpack(
            GQM.DIR_ROW_FMT,
            self.smo_dir.buf[start: stop]
        )
        # Return all the data collected
        return uuid, index, startdat, stopdat

    def _put_dir_pos(self, pos):
        # Update the current DIR buffer position (reverse of _get_dir_pos)
        self.smo_dir.buf[0: GQM.DIR_POS_LEN] = struct.pack(
            f'<{GQM.DIR_POS_TYPE}',
            pos
        )

    def _get_dir_pos(self):
        # Retrieve the current DIR buffer position (reverse of _put_dir_pos)
        next_pos = struct.unpack(
            f'<{GQM.DIR_POS_TYPE}',
            self.smo_dir.buf[0: GQM.DIR_POS_LEN]
        )
        # Struct unpack returns a tuple even for a single item in the fmt str
        return next_pos[0]


def gt(s=0):
    # A convenience function for returning times for rough performance calc
    return time.perf_counter() * 1e6 - s


def _demo():
    gqmp = GQM(True)
    gqmp2 = GQM()
    # A variety of data types to demonstrate that it can recover the object
    test_data = [
        1,
        'abc',
        b'/x00/x01',
        {'abc': 10},
        list(range(10)),
        np.array(range(100, 1100), int),
        [{'a': [{'b': {'c': [1, 'a', ]}}]}]
    ]
    for test_point in test_data:
        gqmp.put(test_point)
        x = gqmp2.get()
        assert(pickle.dumps(x) == pickle.dumps(test_point))
    print('Example get: ', gqmp2.get(verbose=True))
    gqmp.put(np.array(range(1000), int))
    n = int(1e5)
    s = gt()
    for i in range(n):
        gqmp2.get()
    unit_time = gt(s) / n
    print(f'get time: {unit_time:.2f} u-sec')
    # Approx 4.8 u-sec per read
    gqmp.close()


def _test():
    # Check if u-sec write/read is even feasible
    n = int(1e5)
    mydata = np.array(range(100), int)
    s = gt()
    for i in range(n):
        mypick = pickle.dumps(mydata)
        mynewdata = pickle.loads(mypick)
    unit_time = gt(s) / n  # ~8.1
    print(f'Pickle/Unpickle time: {unit_time:.2f} u-sec')
    # Not really; approx 8.1 u-sec per write-read
    # Check if u-sec read only is feasible
    n = int(1e5)
    mydata = np.array(range(100), int)
    mypick = pickle.dumps(mydata)
    s = gt()
    for i in range(n):
        mynewdata = pickle.loads(mypick)
    unit_time = gt(s) / n  # ~2.5
    print(f'Unpickle time: {unit_time:.2f} u-sec')
    # Maybe; approx 2.5 u-sec per read


if __name__ == '__main__':
    if sys.argv[1] == 'test':
        _test()
    elif sys.argv[1] == 'demo':
        _demo()
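Not part of the original answer: a minimal sketch of wiring GQM into two real processes, assuming the code above is saved as gqm.py (the Event handshake, sleeps, and payload are illustrative):

import time
import multiprocessing as mp

from gqm import GQM  # hypothetical module name for the code above

def producer(ready, n):
    gqm = GQM(True)                 # provider creates the shared buffers
    gqm.put('hello')                # ensure at least one entry exists (index 0)
    ready.set()                     # signal consumers that they may attach
    for i in range(n):
        gqm.put([(i, 100.0)] * 10)  # any picklable payload (indexes 1..n)
        time.sleep(0.001)
    time.sleep(0.5)                 # give consumers time to finish reading
    gqm.close()                     # unlink the shared memory blocks

def consumer(ready, n):
    ready.wait()                    # do not attach before the buffers exist
    gqm = GQM()                     # consumer attaches to the existing buffers
    last = -1
    while last < n:
        try:
            data, index, uid = gqm.get(verbose=True)
        except AssertionError:
            continue                # torn read (overwritten mid-read); retry
        if index != last:           # new entry (a gap would mean missed data)
            last = index
    print('last payload:', data[:2], '...')

if __name__ == '__main__':
    n = 100
    ready = mp.Event()
    p = mp.Process(target=producer, args=(ready, n))
    c = mp.Process(target=consumer, args=(ready, n))
    p.start(); c.start()
    p.join(); c.join()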
Here is a small diagram briefly showing how the memory is organized (the original image, captioned "how the memory is organized", is not reproduced here).
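As a substitute for the image, a rough text sketch of the layout, reconstructed from the code above:

DIR buffer (fixed stride, wraps after DIR_POS_COUNT rows):
  [current row pos][row 0: uuid | index | start | stop][row 1: ...] ...
DAT buffer (variable stride, wraps at BUFFER_SIZE_DAT):
  [uuid + pickled object][uuid + pickled object] ...

A reader loads the current row position, reads that DIR row, jumps to
DAT[start:stop], and checks that both UUIDs match before unpickling.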