Achieving Microsecond-level Data Retrieval with Custom Data Types in Python Multiprocessing

Asked by GuyQian on 11/17/2023 · Last edited by GuyQian · Updated 11/20/2023 · Views: 56

Q:

Problem description:

I am currently working on a Python project that involves multiprocessing and requires efficient data communication. The data I am handling is stored in a list whose elements are a custom data type. The challenge is to achieve ultra-fast data retrieval, targeting microsecond-level performance.

Specific requirements:

  1. Data content: the data to be communicated is a list of a custom data type, for example QPointF from PyQt. For instance, [QPointF(i, 100) for i in range(0, 10000)].

  2. Performance goal: the data retrieval process needs to be as fast as possible, targeting microsecond-level efficiency.

  3. Usability: once retrieved, the data should be directly usable without additional processing or reorganization.

Previous attempts:

I have explored various solutions in the multiprocessing module, including:

  • multiprocessing.Queue

  • multiprocessing.Pipe

  • multiprocessing.managers.ShareableList

  • multiprocessing.shared_memory.SharedMemory

Unfortunately, none of these methods met the required speed and usability criteria. Retrieval times for both Queue and Pipe were in the millisecond range, while ShareableList and SharedMemory do not support lists of custom data types and therefore require additional post-retrieval organization.
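For reference, here is a minimal sketch (not from the original post) of the kind of baseline measurement described above. It assumes PyQt5 provides QPointF and reuses the list size from the example; the iteration count and producer/consumer split are purely illustrative.

import time
import multiprocessing as mp

from PyQt5.QtCore import QPointF

def producer(q):
    # Send the example payload repeatedly; each put pickles the whole list
    data = [QPointF(i, 100) for i in range(0, 10000)]
    for _ in range(100):
        q.put(data)

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=producer, args=(q,))
    p.start()
    t0 = time.perf_counter()
    for _ in range(100):
        q.get()   # retrieval cost includes unpickling the 10000-element list
    t1 = time.perf_counter()
    p.join()
    print(f'avg get: {(t1 - t0) / 100 * 1e3:.2f} ms')  # lands in the millisecond range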

Request for guidance:

I am seeking suggestions or insights on alternative approaches or best practices within the Python multiprocessing module, or any other relevant library, to achieve efficient microsecond-level data communication for a list of a custom data type.

Question summary:

Can anyone suggest other methods or approaches within the multiprocessing module for achieving high-speed (microsecond-level) data communication when working with a list of a custom data type? Previous attempts with Queue, Pipe, ShareableList and SharedMemory all fell short. Any input or suggestions would be greatly appreciated. Thank you!

python-3.x performance multiprocessing python-multiprocessing



A:

1 upvote · BitsAreNumbersToo · 11/19/2023 · #1

TL;DR - Microsecond-level timing is nearly impossible to get in out-of-the-box Python. Below is a possible home-grown solution in pure Python using SharedMemory that comes close and may inspire your own development.

Microsecond timing in Python is a pretty brutal target. (Insert the obligatory rant about using Python for performance-sensitive tasks.) I think focusing on low latency and allowing some missed messages gets us close to what you want.

The approach I use here is a double circular buffer. The first buffer has a fixed stride for fast lookup and tells you where to find the start/stop points in the other buffer. Since you did not mention a fixed data type, it is hard to do what you want with a single buffer; the double circular buffer helps with that: the first is fixed stride (so we can jump straight to the entry of interest), and the second holds data of any size (which we can still jump straight to, because the first buffer has already told us where it is). Essentially, one is just a memory map into the other's pickled objects. In this implementation data can be overwritten before it has been read, and any number of consumers is supported. It could be adapted into a wait-until-read, single-consumer, no-missed-messages configuration. I am sure there are plenty of optimizations available here, given better knowledge of the data being moved.
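To make the fixed-stride idea concrete before the full class, here is a tiny standalone sketch of packing and unpacking one directory row with struct; the field layout mirrors the DIR_ROW_FMT used in the code below, and the values are made up for illustration.

import struct
import uuid

# One directory row: 36-byte UUID string, item index, start and stop offsets
# into the data buffer. Fixed size means row i always starts at i * row_size.
row_fmt = '<36sLLL'
row_size = struct.calcsize(row_fmt)

uid = bytes(str(uuid.uuid4()), 'UTF-8')
packed = struct.pack(row_fmt, uid, 7, 1024, 2048)
uid_out, index, start, stop = struct.unpack(row_fmt, packed)
print(row_size, index, start, stop)   # e.g. 48 7 1024 2048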

I have put in plenty of comments to explain the logic, but feel free to post follow-up questions in the comments.

I am assuming all of your objects are picklable, or can be made so, since you mention multiprocessing.Queue, for example, which already has this requirement.

Your requirements:

  • Data content
    • According to the docs, the object you mention has a __reduce__ method, so it should work fine, as do most common objects. If not, you can always wrap it, although I understand your requirement of no "[extra] organization". (A minimal __reduce__ sketch follows this list.)
  • Performance goal
    • For most reasonably sized data types it lands in the 5 - 20 microsecond range, and it can do better on very small objects; a single int was about 1.2 microseconds in my tests.
  • Usability
    • This gives back objects just like the solutions you tested; in my testing I found no difference between what went in and what came out, although there is a class wrapped around it, so there is some associated overhead, which is unavoidable in any case.
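As promised in the list above, here is a minimal sketch of what a __reduce__ method can look like. MyPoint is a hypothetical class used only for illustration; plain Python classes usually pickle fine by default, and __reduce__ mostly matters for wrappers around C/C++ objects.

import pickle

class MyPoint:
    # Hypothetical custom point type, for illustration only
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __reduce__(self):
        # Tell pickle how to rebuild the object: a callable plus its arguments
        return (MyPoint, (self.x, self.y))

p = MyPoint(3.0, 100.0)
q = pickle.loads(pickle.dumps(p))
print(q.x, q.y)   # 3.0 100.0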

Performance: On my Windows 10 machine (i9-10900K, 2667 MHz RAM, Python 3.11.4), I can get read times for a 1000-int numpy.array down to about 4.8 microseconds each, and about 1.2 microseconds each for a single int.

import sys
import pickle
import multiprocessing.shared_memory as sm
import time
import uuid
import struct

# PyPi libraries
import numpy as np

# String encoding for storing in the buffer
ENCODING = 'UTF-8'

def getuuid():
    return bytes(str(uuid.uuid4()), encoding=ENCODING)

# GuyQianMessage
class GQM:
    '''
    A low read-latency message service between processes using multiprocessing
    Uses a circular buffer - older data is eventually lost, any process
        reading from this buffer which falls behind will miss entries
    DIR buffer contains UUID, index and start stop positions in the data buffer
    DAT buffer is the data buffer and contains UUID and pickle bytes
    
    Use `put` to put data into the buffer
    Use `get` to retrieve data from the buffer
    Use `get` with verbose=True to get back index and UUID for identifying 
        skipped or repeated data
    
    Typical use case:
    Proc 1:
        gqm = GQM(True)
        while True:
            # Generate data
            gqm.put(mydatatosend)
        # Exit time
        gqm.close()
    
    Proc 2:
        gqm = GQM()
        while True:
            data, index, uuid = gqm.get(verbose=True)
            if uuid == lastuuid:
                # repeat data
                pass
            if index > lastindex + 1:
                # missed data
                pass
            # process data
    
    '''
    
    # These are unique names for the memory space
    SHARED_MEMORY_NAME_DIR = 'SPECIAL_NAME_FOR_GUYQIAN_77500126_DIR'
    SHARED_MEMORY_NAME_DAT = 'SPECIAL_NAME_FOR_GUYQIAN_77500126_DAT'
    
    # Capture the length of a UUID (it's stored in the buffer with the entry)
    UUID_LEN = len(getuuid())
    
    # How many positions to keep in the directory
    #   By checking indexes, processes can determine if they need to also 
    #   check an earlier position for data
    #   Reaching backwards improperly is caught by UUID matching
    DIR_POS_COUNT = 1024
    # The data type of the position in the DIR buffer
    DIR_POS_TYPE = 'L'
    # Pre calculate how much space the position takes up
    DIR_POS_LEN = struct.calcsize(f'<{DIR_POS_TYPE}')
    # Pre generate the format string for each entry in the DIR buffer
    DIR_ROW_FMT = f'<{UUID_LEN}sLLL'
    # Pre calculate the stride in the DIR buffer
    DIR_STRIDE = struct.calcsize(DIR_ROW_FMT)
    # Pre calculate the total DIR buffer size
    BUFFER_SIZE_DIR = struct.calcsize(
        f'<{DIR_POS_TYPE}{DIR_STRIDE * DIR_POS_COUNT}c'
    )
    
    # Set the size of the DAT buffer
    #   Larger holds more entries before overwriting
    #   May need to be high for large datatypes
    BUFFER_SIZE_DAT = int(1e6)
    
    def __init__(self, provider=False):
        '''
        Initialize with provider set to True for the data creator
        Initialize with no arguments or provider set to False for data users
        
        Returns a GQM object
        '''
        # Keep this flag - prevent consumers from trying to put data in
        self.provider = provider
        # If this is the provider, create the buffers using the precalculated
        #   sizes and names
        if self.provider:
            self.smo_dir = sm.SharedMemory(
                name=GQM.SHARED_MEMORY_NAME_DIR,
                create=True,
                size=GQM.BUFFER_SIZE_DIR
            )
            self.smo_dat = sm.SharedMemory(
                name=GQM.SHARED_MEMORY_NAME_DAT,
                create=True,
                size=GQM.BUFFER_SIZE_DAT
            )
        # If this is the consumer, hook into the existing buffer
        else:
            self.smo_dir = sm.SharedMemory(name=GQM.SHARED_MEMORY_NAME_DIR)
            self.smo_dat = sm.SharedMemory(name=GQM.SHARED_MEMORY_NAME_DAT)
        # This is the current byte position in the DAT buffer to write the 
        #   next item to
        self.dat_pos = 0
        # This is the current row number in the DIR buffer to write the 
        #   next look up info to
        self.dir_pos = 0
        # This is the index of the item being put in so that consumers can
        #   determine if they have missed any data
        self.index = 0
    
    def close(self):
        '''
        Call this function from the provider to close out the buffers
        DO NOT call this function before all consumers are finished
        
        Consumers do not need to do anything when finished
        '''
        # Clean up the buffers by unlinking
        # https://docs.python.org/3/library/multiprocessing.shared_memory.html
        if not self.provider:
            # Only the provider should call the close function
            raise AssertionError('This is not a provider! '\
                '- Unlink only from provider!')
        self.smo_dir.unlink()
        self.smo_dat.unlink()
    
    def put(self, obj):
        '''
        Add data to the buffer
        
        Returns
            index of the item added
            a UUID for the item added
        '''
        # Only providers should call put
        if not self.provider:
            raise AssertionError('Put called from consumer!')
        
        # Pickle the object
        #   If it fails, it may not be picklable, see error message below
        try:
            objp = pickle.dumps(obj)
        except TypeError:
            # Capture this error to help developers understand how to fix it
            raise TypeError('Most likely this error was thrown because an '\
                'object which cannot be pickled was passed as a message. '\
                '\n\tFor custom classes, add a __reduce__ method.' \
                '\n\tSee https://stackoverflow.com/questions/19855156'
            )
        except:
            # If something else happened raise the error as is
            raise
        
        # Calculate the amount of space necessary in the DAT buffer
        space_req = GQM.UUID_LEN + len(objp)
        # Check if there is that much left
        #   If not, reset to the beginning of the buffer
        #   Relies on the UUID's to stop improper reads
        #   This makes it a circular buffer
        if self.dat_pos + space_req > GQM.BUFFER_SIZE_DAT:
            self.dat_pos = 0
        # The bytes of the block to write the data to
        start = self.dat_pos
        stop = self.dat_pos + space_req
        # Generate a new UUID to use for this entry so that it can be 
        #   uniquely identified
        uuid = getuuid()
        
        # Put the data into the DAT buffer first
        self.smo_dat.buf[start:stop] = struct.pack(
            f'<{space_req}s',
            uuid + objp
        )
        
        # Next put the data into the DIR buffer
        self._put_dir_row(self.dir_pos, uuid, self.index, start, stop)
        # Lastly update the current entry marker for the DIR buffer
        self._put_dir_pos(self.dir_pos)
        
        # Capture the index of the item just written so it can be returned
        #   (self.index is incremented below for the next entry)
        written_index = self.index
        # Increment the index to track how many items have been put into the
        #   buffer
        self.index += 1
        # Increment the dir_pos for the next call and wrap on the total length
        #   to fulfill circular buffer property
        self.dir_pos += 1
        self.dir_pos %= GQM.DIR_POS_COUNT
        # Record the next position to start writing to the DAT buffer
        self.dat_pos = stop
        # Return the index and UUID in case they are to be used for something
        return written_index, uuid
    
    def get(self, verbose=False, pos_override=None):
        '''
        Retrieves the latest data from the buffer
        To check if data was skipped or is repeated, check index and uuid
        
        Returns
            The data object
            index of this entry (if verbose)
            uuid of this entry (if verbose)
        
        '''
        # Allow a pos override for testing or advanced usage
        if pos_override is None:
            pos = self._get_dir_pos()
        else:
            # Could add some logic for negatives to allow retrieving previous
            #   entries blindly without checking positions and indexes, etc
            pos = pos_override
        
        # Retrieve the information from the DIR buffer
        uuid_dir, index, start, end = self._get_dir_row(pos)
        # Retrieve the data from the DAT buffer
        uuid_dat, pickledata = struct.unpack(
            f'<{GQM.UUID_LEN}s{end - start - GQM.UUID_LEN}s',
            self.smo_dat.buf[start: end]
        )
        # Verify that the UUID's match
        #   If they don't match the DAT could have been updated while we were
        #   reading the DIR buffer 
        #   It should be possible to resolve by simply recalling get
        # If this triggers often, the DAT buffer may be too small
        if not uuid_dat == uuid_dir:
            raise AssertionError('Failed to retrieve matching UUID from data')
        # Read the object back
        outobject = pickle.loads(pickledata)
        # Return the object (and index and UUID if requested)
        if verbose:
            return outobject, index, uuid_dat
        return outobject
    
    def _get_indices_dir(self, pos):
        # Get the indices of the DIR buffer entry
        #   The position indicator is first, and then pos strides is the start
        #   of that entry in the buffer
        start = GQM.DIR_POS_LEN + GQM.DIR_STRIDE * pos
        # Because the DIR buffer is fixed strides, we can just add the stride
        #   to the start to get the end
        return start, start + GQM.DIR_STRIDE
    
    def _put_dir_row(self, pos, uuid, index, start, stop):
        # Put data into a row in the DIR buffer (reverse of _get_dir_row)
        # Get the start and stop positions
        startdir, stopdir = self._get_indices_dir(pos)
        # Put the data in using the pre-generated fmt string
        self.smo_dir.buf[startdir: stopdir] = struct.pack(
            GQM.DIR_ROW_FMT,
            uuid,
            index,
            start,
            stop
        )
    
    def _get_dir_row(self, pos=None):
        # Get a row from the DIR buffer (reverse of _put_dir_row)
        # Allow position overrides for testing or advanced usage
        if pos is None:
            pos = self._get_dir_pos()
        # Get the start and stop positions in the DIR buffer
        start, stop = self._get_indices_dir(pos)
        # Retrieve and unpack the data using the pre-generated fmt string
        uuid, index, startdat, stopdat = struct.unpack(
            GQM.DIR_ROW_FMT,
            self.smo_dir.buf[start: stop]
        )
        # Return all the data collected
        return uuid, index, startdat, stopdat
    
    def _put_dir_pos(self, pos):
        # Update the current DIR buffer position (reverse of _get_dir_pos)
        self.smo_dir.buf[0: GQM.DIR_POS_LEN] = struct.pack(
            f'<{GQM.DIR_POS_TYPE}',
            pos
        )
    
    def _get_dir_pos(self):
        # Retrieve the current DIR buffer position (reverse of _put_dir_pos)
        next_pos = struct.unpack(
            f'<{GQM.DIR_POS_TYPE}',
            self.smo_dir.buf[0: GQM.DIR_POS_LEN]
        )
        # Struct unpack returns a tuple even for a single item in the fmt str
        return next_pos[0]


def gt(s=0):
    # A convenience function for returning times for rough performance calc
    return time.perf_counter() * 1e6 - s

def _demo():
    gqmp = GQM(True)
    gqmp2 = GQM()
    
    # A variety of data types to demonstrate that it can recover the object
    test_data = [
        1,
        'abc',
        b'\x00\x01',
        {'abc': 10},
        list(range(10)),
        np.array(range(100, 1100), int),
        [{'a': [{'b': {'c': [1, 'a',]}}]}]
    ]
    
    for test_point in test_data:
        gqmp.put(test_point)
        x = gqmp2.get()
        assert(pickle.dumps(x) == pickle.dumps(test_point))
    
    print('Example get: ', gqmp2.get(verbose=True))
    
    gqmp.put(np.array(range(1000), int))
    n = int(1e5)
    s = gt()
    for i in range(n):
        gqmp2.get()
    unit_time = gt(s) / n
    print(f'get time: {unit_time:.2f} u-sec')
    # Approx 4.8 u-sec per read
    
    gqmp.close()

def _test():
    
    # Check if u-sec write/read is even feasible
    n = int(1e5)
    mydata = np.array(range(100), int)
    s = gt()
    for i in range(n):
        mypick = pickle.dumps(mydata)
        mynewdata = pickle.loads(mypick)
    unit_time = gt(s) / n # ~8.1
    print(f'Pickle/Unpickle time: {unit_time:.2f} u-sec')
    # Not really, approx 8.1 u-sec per write-read
    
    # Check if u-sec read only is feasible
    n = int(1e5)
    mydata = np.array(range(100), int)
    mypick = pickle.dumps(mydata)
    s = gt()
    for i in range(n):
        mynewdata = pickle.loads(mypick)
    unit_time = gt(s) / n # ~2.5
    print(f'Unpickle time:        {unit_time:.2f} u-sec')
    # Maybe, approx 2.5 u-sec per read

if __name__ == '__main__':
    if sys.argv[1] == 'test':
        _test()
    elif sys.argv[1] == 'demo':
        _demo()

Here is a small diagram briefly showing how the memory is organized: [image: how the memory is organized]
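For completeness, here is a rough two-process usage sketch along the lines of the docstring above. It assumes the class has been saved as gqm.py; the sleep values, iteration counts and the list payload are illustrative only.

import time
import multiprocessing as mp

from gqm import GQM   # assumes the GQM class above was saved to gqm.py

def provider_proc():
    gqm = GQM(True)
    for i in range(1000):
        gqm.put([i, i * 2.0])   # any picklable payload
        time.sleep(0.001)
    time.sleep(0.5)             # give consumers time to finish reading
    gqm.close()                 # unlink only after all consumers are done

def consumer_proc():
    time.sleep(0.1)             # let the provider create the buffers and put first
    gqm = GQM()
    last_uuid = None
    for _ in range(100):
        data, index, uid = gqm.get(verbose=True)
        if uid != last_uuid:    # ignore repeats of the same entry
            last_uuid = uid
            print(index, data)
        time.sleep(0.005)

if __name__ == '__main__':
    p = mp.Process(target=provider_proc)
    c = mp.Process(target=consumer_proc)
    p.start()
    c.start()
    c.join()
    p.join()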

Comments

0 upvotes · GuyQian · 11/20/2023
Thanks for the approach, but after testing it still does not meet my requirements. When my list is [QPointF(i, 100) for i in range(0, 1000)], the measured time is over 2 milliseconds; when the list is [QPointF(i, 100) for i in range(0, 10000)], it is over 20 milliseconds. The pickling step takes too much time.
0 upvotes · BitsAreNumbersToo · 11/30/2023
Is there a way to pass the values of the QPointF instead of requiring the whole object on both ends? If you really need the object itself, you either have to accept rebuilding it, or you may be in the territory of writing your own C extension, although I am not familiar enough with that myself to offer any code.
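To illustrate the suggestion above (purely a sketch, untested against the asker's setup): if only the coordinates need to cross the process boundary, they can be shared as a flat float64 block and wrapped back into QPointF objects only where actually needed, which avoids pickling 10000 Python objects per message. The shared-memory name is made up for the example.

import numpy as np
from multiprocessing import shared_memory
from PyQt5.QtCore import QPointF

N = 10000
# Producer side: a shared block holding N (x, y) float64 pairs
shm = shared_memory.SharedMemory(create=True, size=N * 2 * 8,
                                 name='guyqian_points')
coords = np.ndarray((N, 2), dtype=np.float64, buffer=shm.buf)
coords[:, 0] = np.arange(N)   # x values
coords[:, 1] = 100.0          # y values

# Consumer side (would normally run in another process): attach and read floats
shm2 = shared_memory.SharedMemory(name='guyqian_points')
view = np.ndarray((N, 2), dtype=np.float64, buffer=shm2.buf)
# Rebuild QPointF objects only for the points actually used
first_point = QPointF(float(view[0, 0]), float(view[0, 1]))
print(first_point)

# Release numpy views before closing, then clean up
del coords, view
shm2.close()
shm.close()
shm.unlink()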