Asked by: Quasi  Asked: 11/17/2023  Last edited by: Quasi  Updated: 11/19/2023  Views: 67
Python retrieving data from its memory address
Q:
I currently have a very large dataset (~15 GB) that should be used by several Python scripts at the same time.
Loading individual elements of the dataset takes too long, so the whole dataset has to sit in memory.
My best approach so far is to have a single Python process with all elements loaded, and to let the other Python scripts request elements through a socket connection to localhost.
However, I then run into the problem of having to encode and decode the data sent over the socket. So my next best idea is the following, although I don't know whether it is even possible:
- have the scripts send the process holding the data a request, over the socket, containing the indices of the data points they want to use
- have the data process return the memory addresses of those data points
- have the scripts load the elements from there
So the question is: is this feasible at all?
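For reference, the encode/decode round trip I want to avoid looks roughly like this (a minimal sketch only, assuming pickle over a localhost socket; the names and port are illustrative):

import pickle
import socket

def request_item(index, host='localhost', port=50007):
    # One request per connection: send a pickled index, read back the
    # pickled element; this serialize/deserialize step is the overhead
    # I would like to get rid of.
    with socket.create_connection((host, port)) as conn:
        conn.sendall(pickle.dumps(index))
        conn.shutdown(socket.SHUT_WR)  # tell the server we are done sending
        chunks = []
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
    return pickle.loads(b''.join(chunks))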
Best regards
A:
0 votes
BitsAreNumbersToo
11/19/2023
#1
The details here are a bit sparse, but here is a solution that matches what you asked for. I'm assuming your data is roughly tabular (like a CSV or a pandas.DataFrame) and that the data type within each field is consistent.
To use it:
- update the DATA_FIELDS, STRUCT_FMT_STR, ENCODING, and STR_WIDTH values to match your data
- update the provider to load your data instead of the toy data I created
- run the provider and wait for it to report 'ready'
- start as many consumers as you like by re-executing the script with the consumer argument (example invocations below)
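For example, assuming the script below is saved as shared_data.py (the file name is just an example), the three modes are invoked like this:

python shared_data.py makedata    (only needed to generate the toy CSV)
python shared_data.py provider    (loads the data and waits after printing 'ready')
python shared_data.py consumer    (run as many of these as you like, in parallel)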
If you are doing a lot of testing (e.g. edit the script, run it, check the results, edit, re-run, check, etc.), you can start the provider once and then run your tests against it repeatedly without reloading the data, since consumers can be started and stopped over and over for as long as the provider stays alive.
If you have a lot of strings and/or the strings can get long, this can waste a lot of space, and you may want to consider storing the strings separately to save space (a rough sketch of that idea follows the code listing).
Let me know if you have any questions. I tried to leave plenty of comments to help explain the logic; hopefully it helps.
# Default libraries
import multiprocessing.shared_memory as sm
import sys
import uuid
import struct
# PyPi libraries
import numpy as np
import pandas as pd # Used only for convenience
# How wide can your strings get?
STR_WIDTH = 1024
# Data set size used only for generating fake data
DATA_SET_SIZE = int(1e6)
# A unique name that other processes can find the memory by
SHARED_MEMORY_NAME = 'SPECIAL_NAME_FOR_QUASI_77500330'
# Some example data, update yours to match the data you want to process
DATA_FIELDS = [
    ('id', str),
    ('num1', int),
    ('num2', float),
    ('num3', float),
    ('num4', float)
]
# A format string which matches the data above
STRUCT_FMT_STR = f'<{STR_WIDTH}sqddd'
# Calculate the stride
STRUCT_SIZE = struct.calcsize(STRUCT_FMT_STR)
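# As a worked example: with STR_WIDTH = 1024 the format '<1024sqddd' packs
# 1024 + 8 + 3 * 8 = 1056 bytes per record, so (after the 8-byte sample
# count stored at the start of the buffer) record i begins at byte offset
# 8 + i * STRUCT_SIZE.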
# Will store the total number of samples for convenience
MEM_SIZE_FMT_STR = '<q'
MEM_SIZE_SIZE = struct.calcsize(MEM_SIZE_FMT_STR)
# What encoding to store strings in
# (if you change this, update struct format string too)
ENCODING = 'UTF-8'
# The path to your dataset or whatever, I used CSV for convenience
FPATH = '77500330_tmp.csv'
# This function puts data into the shared memory buffer
def put(sm, pos, *argc):
    # Strings have to be stored as bytes strings, so convert those
    outargs = [
        bytes(x, encoding=ENCODING) if type(x) is str else x for x in argc
    ]
    # Slot the data into the buffer
    posa = MEM_SIZE_SIZE + pos * STRUCT_SIZE
    posb = MEM_SIZE_SIZE + (pos + 1) * STRUCT_SIZE
    sm.buf[posa: posb] = struct.pack(STRUCT_FMT_STR, *outargs)

# This function gets data from the shared memory buffer
def get(sm, pos):
    # Retrieve the data from the buffer
    posa = MEM_SIZE_SIZE + pos * STRUCT_SIZE
    posb = MEM_SIZE_SIZE + (pos + 1) * STRUCT_SIZE
    res = struct.unpack(STRUCT_FMT_STR, sm.buf[posa: posb])
    # Strings from bytes (see put)
    outval = [
        str(x.rstrip(b'\x00'), encoding=ENCODING) if type(x) is bytes else x
        for x in res
    ]
    return outval

# This function stores the number of samples in the buffer
# makes it easier for consumers to know how much there is
def putsize(sm, total_samples):
    size = struct.pack(MEM_SIZE_FMT_STR, total_samples)
    sm.buf[0: MEM_SIZE_SIZE] = size

# This function retrieves the number of samples in the buffer
def getsize(sm):
    size = struct.unpack(MEM_SIZE_FMT_STR, sm.buf[0: MEM_SIZE_SIZE])
    return size[0]

# This is a function that makes data for my testing
# you don't need it after you learn to get the provider/consumer working
def makedata():
    print('makedata')
    rng = np.random.default_rng(seed=42)
    id = [str(uuid.uuid4()) for i in range(DATA_SET_SIZE)]
    num1 = rng.integers(0, 1e3, size=DATA_SET_SIZE)
    num2 = rng.random(size=DATA_SET_SIZE)
    num3 = rng.random(size=DATA_SET_SIZE)
    num4 = rng.random(size=DATA_SET_SIZE)
    data = pd.DataFrame(
        {'id': id, 'num1': num1, 'num2': num2, 'num3': num3, 'num4': num4},
        columns=[x[0] for x in DATA_FIELDS])
    data = data.astype({x[0]: x[1] for x in DATA_FIELDS})
    data.to_csv(FPATH)

# This function acts as the provider for the data
# It must live as long as any consumer still needs data
# Consumers can be started at any point after this reports 'ready'
def provider():
    print('provider started')
    # Replace this line with however you read your data
    data = pd.read_csv(FPATH, index_col=0)
    # This calculates the total size of the memory buffer to allocate
    total_size = MEM_SIZE_SIZE + len(data) * STRUCT_SIZE
    # Create the shared memory buffer
    shared_memory_block = sm.SharedMemory(name=SHARED_MEMORY_NAME,
                                          create=True, size=total_size)
    # Store the number of samples in the buffer
    putsize(shared_memory_block, len(data))
    # Store the data in the buffer
    for i in range(len(data)):
        put(shared_memory_block, i, *data.loc[i].values)
    # A testing block to demonstrate that data gets in and out
    # All three lines should be exactly the same
    if True:
        smb2 = sm.SharedMemory(name=SHARED_MEMORY_NAME)
        print(list(data.loc[8].values))
        print(get(shared_memory_block, 8))
        print(get(smb2, 8))
    # Wait
    print('ready - press ctrl-c to stop')
    try:
        input()
    except KeyboardInterrupt:
        pass
    except:
        raise
    # When all processes are done, unlink the memory
    print('exiting ...')
    shared_memory_block.unlink()
    print('done!')

# This function acts as the consumer of the data
# Many of these consumers can exist at the same time
# Consumers can also be started and stopped many times for a single provider
def consumer():
    print('consumer started')
    # Link to the shared memory block created by the producer
    # If this line fails, check that the producer is in the 'ready' state
    shared_memory_block = sm.SharedMemory(name=SHARED_MEMORY_NAME)
    # Find out how many samples are available in the buffer
    total_samples = getsize(shared_memory_block)
    # Iterate through them, doing whatever it is that you need to do
    for i in range(total_samples):
        # Simply call 'get' on each index to retrieve the data
        mydata = get(shared_memory_block, i)
        print(mydata)

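# Note: a consumer does not have to scan the whole dataset; because every
# record has a fixed stride, it can just as well fetch single elements on
# demand by index, e.g. sample = get(shared_memory_block, 12345)
# (the index 12345 is only an example).
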
if __name__ == '__main__':
    if sys.argv[1] == 'provider':
        provider()
    if sys.argv[1] == 'consumer':
        consumer()
    if sys.argv[1] == 'makedata':
        makedata()
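As a rough sketch of the separate string storage mentioned above (illustrative only, not wired into the script; STRINGS_NAME, pack_strings and read_string are names made up for this example): keep the fixed-width numeric fields in the packed records, put all strings end-to-end in a second shared memory block, and store only each string's offset and length (e.g. two 'q' fields in the struct format) in the record.

# Sketch only: variable-length strings in their own shared memory blob
import multiprocessing.shared_memory as sm

STRINGS_NAME = 'SPECIAL_NAME_FOR_QUASI_77500330_STRINGS'

def pack_strings(strings):
    # Concatenate all strings into one bytes blob, remembering where each
    # string starts and how many bytes it occupies
    offsets, lengths, parts, pos = [], [], [], 0
    for s in strings:
        b = bytes(s, encoding='UTF-8')
        offsets.append(pos)
        lengths.append(len(b))
        parts.append(b)
        pos += len(b)
    return b''.join(parts), offsets, lengths

def store_strings(strings):
    # Provider side: put the blob into its own shared memory block and keep
    # the (offset, length) pairs to pack into the fixed-size records instead
    # of a full 1024-byte string field
    blob, offsets, lengths = pack_strings(strings)
    block = sm.SharedMemory(name=STRINGS_NAME, create=True,
                            size=max(len(blob), 1))
    block.buf[:len(blob)] = blob
    return block, offsets, lengths

def read_string(block, offset, length):
    # Consumer side: block = sm.SharedMemory(name=STRINGS_NAME)
    return str(bytes(block.buf[offset: offset + length]), encoding='UTF-8')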