How to clear objects from the object store in ray?

Asked by: Stefan · Asked: 4/1/2021 · Last edited by: Stefan · Updated: 4/3/2021 · Views: 5,285

Question:

I am trying out the promising multiprocessing package ray. I have a problem which I can't seem to solve. My program runs fine the first time, but on a second run this exception is raised on the ray.put() line:

ObjectStoreFullError: Failed to put object ffffffffffffffffffffffffffffffffffffffff010000000c000000 in object store because it is full. Object size is 2151680255 bytes.
The local object store is full of objects that are still in scope and cannot be evicted. Tip: Use the `ray memory` command to list active objects in the cluster.

What I want to do: In my actual code (which I intend to write) I need to process many big_data_objects sequentially. I want to hold one big_data_object in memory at a time and perform several heavy (independent) computations on it. I want to execute these computations in parallel. When they are done, I have to replace the big_data_object in the object store with a new one and start the computations again (in parallel).

Using my test script, I simulate this by running the script twice without calling ray.shutdown() in between. If I shut ray down using ray.shutdown(), the object store is cleared, but re-initializing takes a long time and I cannot process multiple big_data_objects sequentially the way I want to.

Which sources of information I studied: I studied the document Ray Design Patterns, in particular the section "Antipattern: Closure capture of large / unserializable object" and what the proper pattern should look like. I also studied the getting started guide, which led to the following test script.

A minimal example to reproduce the problem: I created a test script to test this. It goes like this:

#%% Imports
import ray
import time
import psutil
import numpy as np


#%% Testing ray
# Start Ray
num_cpus = psutil.cpu_count(logical=False)
if not ray.is_initialized():
    ray.init(num_cpus=num_cpus, include_dashboard=False)

# Define function to do work in parallel
@ray.remote
def my_function(x):  # Later I will have multiple (different) my_functions to extract different features from my big_data_object
    time.sleep(1)
    data_item = ray.get(big_data_object_ref)
    return data_item[0,0]+x

# Define large data
big_data_object = np.random.rand(16400,16400)  # Define an object of approx 2 GB. Works on my machine (16 GB RAM)
# big_data_object = np.random.rand(11600,11600)  # Define an object of approx 1 GB.
# big_data_object = np.random.rand(8100,8100)  # Define an object of approx 500 MB.
# big_data_object = np.random.rand(5000,5000)  # Define an object of approx 190 MB.
big_data_object_ref = ray.put(big_data_object)

# Start 4 tasks in parallel.
result_refs = []
# for item in data:
for item in range(4):
    result_refs.append(my_function.remote(item))
    
# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_refs)
print("Results: {}".format(results))


#%% Clean-up object store data - Still there is a (huge) memory leak in the object store.
for index in range(4):
    del result_refs[0]

del big_data_object_ref

Where I think it goes wrong: I believe I delete all references to the object store at the end of the script. As a result, the big_data_object should be cleared from the object store (as described here). Apparently something is wrong, because the big_data_object still resides in the object store. The results, however, are deleted from the object store just fine.
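
To make explicit what I expect based on that documentation, here is a minimal sketch (simplified, not my actual script) of the eviction behaviour I am assuming:

import numpy as np
import ray

ray.init()

ref = ray.put(np.zeros((1000, 1000)))  # object stays pinned while `ref` is in scope
# ... use `ref` in remote tasks ...
del ref  # last reference gone -> the object should become eligible for eviction

ray.shutdown()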

Some debugging information: I inspected the object store using the ray memory command; this is what I get:

(c:\python\cenv38rl) PS C:\WINDOWS\system32> ray memory
---------------------------------------------------------------------------------------------------------------------
 Object ID                                                Reference Type       Object Size   Reference Creation Site
=====================================================================================================================
; worker pid=20952
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=29368
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=17388
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=24208
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=27684
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; worker pid=6860
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\serialization.py:object_ref_deserializer:45 | c:\python\cenv38rl\lib\site-packages\ray\function_manager.py:fetch_and_register_remote_function:180 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_process_key:140 | c:\python\cenv38rl\lib\site-packages\ray\import_thread.py:_run:87
; driver pid=28684
ffffffffffffffffffffffffffffffffffffffff010000000b000000  LOCAL_REFERENCE       2151680261   c:\python\cenv38rl\lib\site-packages\ray\worker.py:put_object:277 | c:\python\cenv38rl\lib\site-packages\ray\worker.py:put:1489 | c:\python\cenv38rl\lib\site-packages\ray\_private\client_mode_hook.py:wrapper:47 | C:\Users\Stefan\Documents\Python examples\Multiprocess_Ray3_SO.py:<module>:42
---------------------------------------------------------------------------------------------------------------------

--- Aggregate object store stats across all nodes ---
Plasma memory usage 2052 MiB, 1 objects, 77.41% full

Some things I tried: If I replace my_function with:

@ray.remote
def my_function(x):  # Later I will have multiple different my_functions to extract separate features from my big_data_objects
    time.sleep(1)
    # data_item = ray.get(big_data_object_ref)
    # return data_item[0,0]+x
    return 5

then the script successfully clears the object store, but my_function can no longer use the big_data_object, which I need it to.

My question is: How do I fix my code so that the big_data_object is removed from the object store at the end of my script without shutting down ray?

Note: I installed ray using pip install ray, which gave me version ray==1.2.0, the one I am using now. I use ray on Windows and develop in Spyder v4.2.5 in a conda (actually miniconda) environment, in case that is relevant.

EDIT: I also tested on an Ubuntu machine with 8 GB RAM. For this I used a 1 GB big_data_object. I can confirm that the issue also occurs on that machine.

The ray memory output:

(SO_ray) stefan@stefan-HP-ZBook-15:~/Documents/Ray_test_scripts$ ray memory 
---------------------------------------------------------------------------------------------------------------------
 Object ID                                                Reference Type       Object Size   Reference Creation Site
=====================================================================================================================
; worker pid=18593
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
; worker pid=18591
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
; worker pid=18590
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
; driver pid=17712
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   (put object)  | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/_private/client_mode_hook.py:wrapper:47 | /home/stefan/Documents/Ray_test_scripts/Multiprocess_Ray3_SO.py:<module>:43 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/spyder_kernels/customize/spydercustomize.py:exec_code:453
; worker pid=18592
ffffffffffffffffffffffffffffffffffffffff0100000001000000  LOCAL_REFERENCE       1076480259   /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/function_manager.py:fetch_and_register_remote_function:180 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_process_key:140 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/site-packages/ray/import_thread.py:_run:87 | /home/stefan/miniconda3/envs/SO_ray/lib/python3.8/threading.py:run:870
---------------------------------------------------------------------------------------------------------------------

--- Aggregate object store stats across all nodes ---
Plasma memory usage 1026 MiB, 1 objects, 99.69% full

I have to run the program from Spyder so that I can use ray memory after the program has executed. If I run the program from PyCharm, for example, ray terminates automatically when the script has finished, so I cannot inspect whether my script clears the object store as intended.

python-3.x windows ray

Comments

1 upvote Sang 4/1/2021
Hmm, this is interesting. Would you mind doing two things? 1. Could you try a Mac or Linux machine? This is just to figure out whether the issue is caused by some subtle Windows bug. 2. Also, would you mind re-posting it to our GitHub page? I'd love to try to reproduce it and look into the issue there: github.com/ray-project/ray/issues
0 upvotes Stefan 4/1/2021
@Sang, I posted the same on your GitHub page: github.com/ray-project/ray/issues/15058

Answers:

4 upvotes Sang 4/2/2021 #1

The problem is that your remote function captures big_data_object_ref, and the reference coming from there is never deleted. Note that when you do something like this:

# Define function to do work in parallel
@ray.remote
def my_function(x):  # Later I will have multiple (different) my_functions to extract different features from my big_data_object
    time.sleep(1)
    data_item = ray.get(big_data_object_ref)
    return data_item[0,0]+x

# Define large data
big_data_object = np.random.rand(16400,16400)
big_data_object_ref = ray.put(big_data_object)

big_data_object_ref is serialized into the remote function definition. So there is a permanent pointer to the object until this serialized function definition (which is stored internally by ray) is deleted.

Instead, use the following type of pattern:

#%% Imports
import ray
import time
import psutil
import numpy as np


#%% Testing ray
# Start Ray
num_cpus = psutil.cpu_count(logical=False)
if not ray.is_initialized():
    ray.init(num_cpus=num_cpus, include_dashboard=False)

# Define function to do work in parallel
@ray.remote
def my_function(big_data_object, x):
    time.sleep(1)
    return big_data_object[0,0]+x

# Define large data
#big_data_object = np.random.rand(16400,16400)  # Define an object of approx 2 GB. Works on my machine (16 GB RAM)
# big_data_object = np.random.rand(11600,11600)  # Define an object of approx 1 GB.
big_data_object = np.random.rand(8100,8100)  # Define an object of approx 500 MB.
# big_data_object = np.random.rand(5000,5000)  # Define an object of approx 190 MB.
big_data_object_ref = ray.put(big_data_object)
print("ref in a driver ", big_data_object_ref)

# Start 4 tasks in parallel.
result_refs = []
# for item in data:
for item in range(4):
    result_refs.append(my_function.remote(big_data_object_ref, item))
    
# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_refs)
print("Results: {}".format(results))
print(result_refs)

#%% Clean-up object store data - Still there is a (huge) memory leak in the object store.
#for index in range(4):
#    del result_refs[0]
del result_refs

del big_data_object_ref
import time
time.sleep(1000)

The difference is that now we pass big_data_object_ref as an argument to the remote function instead of capturing it inside the remote function.
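
With this pattern, the sequential workflow from the question (one big_data_object in memory at a time) could be sketched roughly as below; load_next_big_data_object() and number_of_objects are hypothetical placeholders for however you obtain your data:

for _ in range(number_of_objects):  # hypothetical outer loop over your data sets
    big_data_object = load_next_big_data_object()  # hypothetical helper
    big_data_object_ref = ray.put(big_data_object)
    del big_data_object  # keep only the copy in the object store

    # Run the independent computations in parallel on the current object.
    result_refs = [my_function.remote(big_data_object_ref, item) for item in range(4)]
    results = ray.get(result_refs)

    # Drop all references so the object can be evicted before the next iteration.
    del result_refs
    del big_data_object_ref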

Note: When object refs are passed to remote functions, they are automatically dereferenced, so there is no need to use ray.get() inside the remote function. If you want to call ray.get() explicitly inside the remote function, pass the object ref inside a list or dictionary as an argument to the remote function. In that case you get something like:

# Remote function
@ray.remote
def my_function(big_data_object_ref_list, x):
    time.sleep(1)
    big_data_object = ray.get(big_data_object_ref_list[0])
    return big_data_object[0,0]+x

# Calling the remote function
my_function.remote([big_data_object_ref], item)
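
For comparison, here is a small self-contained sketch (using a tiny array instead of the big one) of both calling styles; the values in the comments are what I would expect:

import numpy as np
import ray

ray.init(num_cpus=2, include_dashboard=False)

@ray.remote
def top_level(arr, x):  # `arr` arrives as a numpy array (automatically dereferenced)
    return arr[0, 0] + x

@ray.remote
def wrapped(ref_list, x):  # `ref_list[0]` is still an ObjectRef
    arr = ray.get(ref_list[0])
    return arr[0, 0] + x

ref = ray.put(np.ones((2, 2)))
print(ray.get(top_level.remote(ref, 1)))   # expected: 2.0
print(ray.get(wrapped.remote([ref], 1)))   # expected: 2.0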

Note 2: You are using Spyder, which runs an IPython console. There are currently some known issues between ray and the IPython console. Just make sure you delete the references within the script, not with commands entered directly into the IPython console (otherwise the reference is deleted, but the item is not removed from the object store). To inspect the object store with the ray memory command while your script is running, you can add some code at the end of the script, for example:

#%% Testing ray
# ... my ray testing code

#%% Clean-up object store data
print("Wait 10 sec BEFORE deletion")
time.sleep(10)  # Now quickly use the 'ray memory' command to inspect the contents of the object store.

del result_refs
del big_data_object_ref

print("Wait 10 sec AFTER deletion")
time.sleep(10)  # Now again use the 'ray memory' command to inspect the contents of the object store.

评论

0 upvotes Sang 4/2/2021
Sure @MarcoBonelli! I didn't know about this. Thanks for letting me know :). I revised my answer.
0 upvotes Stefan 4/2/2021
I revised the answer (it still needs approval) to include all the relevant details we discussed on GitHub. These details could help other users learn ray faster.