Python retrieving data from its memory address

Asked by Quasi on 11/17/2023 · Last edited by Quasi · Updated 11/19/2023 · Views: 67

Q:

I currently have a very large dataset (~15 GB) that is supposed to be used by several Python scripts simultaneously.

Loading individual elements of the dataset takes too long, so the entire dataset has to be held in memory.

My best approach so far is to have a single Python process that loads all the elements, and to let the other Python scripts request elements over a socket connection to localhost.

However, I then run into the problem of having to encode and decode the data sent over the socket. So my next best idea is the following, though I don't know whether it is even possible:

  1. Have the scripts send a request over the socket to the process holding the data, containing the indices of the data points they want
  2. Have the data process return the memory addresses of those data points
  3. Have the scripts load the elements from those addresses

So the question is: is this possible? (For reference, a minimal sketch of my current socket-based setup follows.)
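This is roughly what that setup looks like; it is only an illustrative sketch, with made-up host/port values and pickle assumed for the serialization:

import pickle
import socket
import struct

HOST, PORT = 'localhost', 50007

def serve(dataset):
    # One process holds the dataset in memory and answers index requests
    with socket.create_server((HOST, PORT)) as srv:
        while True:
            conn, _ = srv.accept()
            with conn, conn.makefile('rb') as f:
                idx = struct.unpack('<q', f.read(8))[0]
                payload = pickle.dumps(dataset[idx])  # encoding cost paid here
                conn.sendall(struct.pack('<q', len(payload)) + payload)

def fetch(idx):
    # Each training script requests one element by index
    with socket.create_connection((HOST, PORT)) as conn:
        conn.sendall(struct.pack('<q', idx))
        with conn.makefile('rb') as f:
            size = struct.unpack('<q', f.read(8))[0]
            return pickle.loads(f.read(size))  # decoding cost paid here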

Best regards

python-3.x memory

Comments

0 votes · Abdul Aziz Barkat · 11/17/2023
So what is your question, how to get the memory address of the data? Note that while this may be possible, it is very likely to be buggy, unsafe, etc.; rather than dealing with all of that, it would be better to just send the data over whatever channel you are communicating on. You should probably also look into optimizing your data access instead of building a workaround like this.
1 vote · juanpa.arrivillaga · 11/17/2023
Processes generally only have access to virtual addresses, which are not shared between processes. You would have to use explicit shared memory, but you need to provide more details about what you are trying to do.
0 votes · Quasi · 11/17/2023
Well, I don't think I can get much more optimized than loading a tensorflow dataset. The specific use case is having different training scripts access the same dataset. However, the training runs should be able to start independently of one another, so I can't put everything into one script.

A:

0 votes · BitsAreNumbersToo · 11/19/2023 · #1

The details here are a bit sparse, but here is a solution that matches your requirements. I am assuming that your data is roughly tabular (like a CSV or a pandas.DataFrame) and that the data type within each field is consistent.

To use it,

  • Update the DATA_FIELDS, STRUCT_FMT_STR, ENCODING, and STR_WIDTH information to match your data
  • Update the provider to load your data instead of the toy data I created
  • Run the provider and wait for it to report 'ready'
  • Start any number of consumers by re-running the script with the consumer argument (example invocations below).
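For example, assuming the script is saved as shared_data.py (the filename is arbitrary):

python shared_data.py makedata    # one-time: generate the toy CSV
python shared_data.py provider    # leave running; wait until it prints 'ready'
python shared_data.py consumer    # in another terminal; repeat as needed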

If you are going to be testing a lot (e.g. edit the script, run it, check the results, edit, re-run, check, and so on), you can start the provider once and then run your tests with the consumers over and over without reloading the data, since consumers can start and die repeatedly as long as the provider stays alive.

If you have many strings and/or your strings can get long, this can waste a lot of space, since every string field is padded out to STR_WIDTH bytes; you may want to consider storing the strings separately to save space.
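As a rough sketch of that idea (the helper names here are hypothetical and not part of the script below): pack all the variable-length strings into a second shared memory block, and store only an (offset, length) pair in each fixed-size record:

import multiprocessing.shared_memory as sm

def pack_strings(strings, name='QUASI_STRINGS'):
    # Concatenate all the encoded strings into one blob, remembering
    # where each one starts and how many bytes it occupies
    encoded = [s.encode('UTF-8') for s in strings]
    blob = b''.join(encoded)
    block = sm.SharedMemory(name=name, create=True, size=max(len(blob), 1))
    block.buf[:len(blob)] = blob
    offsets, pos = [], 0
    for e in encoded:
        offsets.append((pos, len(e)))  # store these two ints per record
        pos += len(e)
    return block, offsets

def read_string(block, offset, length):
    # Consumers slice the blob instead of reading a padded fixed-width field
    return bytes(block.buf[offset:offset + length]).decode('UTF-8')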

Let me know if you have any questions. I tried to comment heavily to help explain the logic; hopefully it helps.

# Default libraries
import multiprocessing.shared_memory as sm
import sys
import uuid
import struct

# PyPi libraries
import numpy as np
import pandas as pd # Used only for convenience

# How wide can your strings get?
STR_WIDTH = 1024
# Data set size used only for generating fake data
DATA_SET_SIZE = int(1e6)

# A unique name that other processes can find the memory by
SHARED_MEMORY_NAME = 'SPECIAL_NAME_FOR_QUASI_77500330'
# Some example data, update yours to match the data you want to process
DATA_FIELDS = [
    ('id', str),
    ('num1', int),
    ('num2', float),
    ('num3', float),
    ('num4', float)
]
# A format string which matches the data above
STRUCT_FMT_STR = f'<{STR_WIDTH}sqddd'
# Calculate the stride
STRUCT_SIZE = struct.calcsize(STRUCT_FMT_STR)
# Will store the total number of samples for convenience
MEM_SIZE_FMT_STR = '<q'
MEM_SIZE_SIZE = struct.calcsize(MEM_SIZE_FMT_STR)
# What encoding to store strings in
#   (if you change this, update struct format string too)
ENCODING = 'UTF-8'
# The path to your dataset or whatever, I used CSV for convenience
FPATH = '77500330_tmp.csv'

# This function puts data into the shared memory buffer
def put(shm, pos, *args):
    # Strings have to be stored as byte strings, so convert those
    outargs = [
        bytes(x, encoding=ENCODING) if type(x) is str else x for x in args
    ]
    
    # Slot the data into the buffer
    posa = MEM_SIZE_SIZE + pos * STRUCT_SIZE
    posb = MEM_SIZE_SIZE + (pos + 1) * STRUCT_SIZE
    shm.buf[posa: posb] = struct.pack(STRUCT_FMT_STR, *outargs)

# This function gets data from the shared memory buffer
def get(shm, pos):
    # Retrieve the raw bytes from the buffer and unpack them
    posa = MEM_SIZE_SIZE + pos * STRUCT_SIZE
    posb = MEM_SIZE_SIZE + (pos + 1) * STRUCT_SIZE
    res = struct.unpack(STRUCT_FMT_STR, shm.buf[posa: posb])
    # Convert byte strings back to str, stripping the null padding (see put)
    outval = [
        str(x.rstrip(b'\x00'), encoding=ENCODING) if type(x) is bytes else x
        for x in res
    ]
    return outval

# This function stores the number of samples in the buffer
#   makes it easier for consumers to know how much there is
def putsize(shm, total_samples):
    size = struct.pack(MEM_SIZE_FMT_STR, total_samples)
    shm.buf[0: MEM_SIZE_SIZE] = size

# This function retrieves the number of samples in the buffer
def getsize(shm):
    size = struct.unpack(MEM_SIZE_FMT_STR, shm.buf[0: MEM_SIZE_SIZE])
    return size[0]

# This is a function that makes data for my testing
#   you don't need it after you learn to get the provider/consumer working
def makedata():
    print('makedata')
    
    rng = np.random.default_rng(seed=42)
    
    ids = [str(uuid.uuid4()) for _ in range(DATA_SET_SIZE)]
    num1 = rng.integers(0, 1000, size=DATA_SET_SIZE)
    num2 = rng.random(size=DATA_SET_SIZE)
    num3 = rng.random(size=DATA_SET_SIZE)
    num4 = rng.random(size=DATA_SET_SIZE)
    
    data = pd.DataFrame(
        {'id': ids, 'num1': num1, 'num2': num2, 'num3': num3, 'num4': num4},
        columns=[x[0] for x in DATA_FIELDS])
    data = data.astype({x[0]: x[1] for x in DATA_FIELDS})
    
    data.to_csv(FPATH)

# This function acts as the provider for the data
#   It must live as long as any consumer still needs data
#   Consumers can be started at any point after this reports 'ready'
def provider():
    print('provider started')
    # Replace this line with however you read your data
    data = pd.read_csv(FPATH, index_col=0)
    # This calculates the total size of the memory buffer to allocate
    total_size = MEM_SIZE_SIZE + len(data) * STRUCT_SIZE
    # Create the shared memory buffer
    shared_memory_block = sm.SharedMemory(name=SHARED_MEMORY_NAME,
        create=True, size=total_size)
    # Store the number of samples in the buffer
    putsize(shared_memory_block, len(data))
    # Store the data in the buffer
    for i in range(len(data)):
        put(shared_memory_block, i, *data.loc[i].values)
    # A testing block to demonstrate that data gets in and out
    #   This must run before unlink; all three lines should print the same
    if True:
        smb2 = sm.SharedMemory(name=SHARED_MEMORY_NAME)
        print(list(data.loc[8].values))
        print(get(shared_memory_block, 8))
        print(get(smb2, 8))
        smb2.close()
    # Wait until the user shuts the provider down
    print('ready - press ctrl-c to stop')
    try:
        input()
    except KeyboardInterrupt:
        pass
    # When all consumers are done, release and unlink the memory
    print('exiting ...')
    shared_memory_block.close()
    shared_memory_block.unlink()
    print('done!')

# This function acts as the consumer of the data
#   Many of these consumers can exist at the same time
#   Consumers can also be started and stopped many times for a single provider
def consumer():
    print('consumer started')
    # Link to the shared memory block created by the provider
    #   If this line fails, check that the provider is in the 'ready' state
    shared_memory_block = sm.SharedMemory(name=SHARED_MEMORY_NAME)
    # Find out how many samples are available in the buffer
    total_samples = getsize(shared_memory_block)
    # Iterate through them, doing whatever it is that you need to do
    for i in range(total_samples):
        # Simply call 'get' on each index to retrieve the data
        mydata = get(shared_memory_block, i)
        print(mydata)
    # Detach from the block; only the provider should unlink it
    shared_memory_block.close()

if __name__ == '__main__':
    mode = sys.argv[1] if len(sys.argv) > 1 else ''
    if mode == 'provider':
        provider()
    elif mode == 'consumer':
        consumer()
    elif mode == 'makedata':
        makedata()
    else:
        print('usage: provider | consumer | makedata')
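One last note: get() is random access, so a consumer does not have to walk the buffer in order. A training script could, for example, sample a batch instead (a sketch; it assumes it runs inside consumer, after total_samples is known):

    rng = np.random.default_rng()
    batch = [get(shared_memory_block, int(i))
             for i in rng.integers(0, total_samples, size=32)]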