提问人:Will 提问时间:11/16/2023 更新时间:11/16/2023 访问量:26
如何同步返回未知序列的异步进程的结果?
How can I synchronously return the result of an asynchronous process of unknown sequence?
问:
考虑一个昂贵的计算过程,无论它处理多少数据,它都可以很好地扩展,它需要在交互式 MATLAB 环境中提供,该环境可以随时生成新数据。如果新数据在进程的实例已经在进行中时到达,我想放弃该过程并一起重新启动所有现有数据,以便尽早获得迄今为止所有已知数据的结果。
下面的最小示例类重现了这种行为:它有一个向存储数组添加元素的方法和一个主要工作方法,该方法只是在第一次执行缓慢操作后将所有存储的数据相乘。工作方法是异步调用的,因此可以在不影响最终输出结果的情况下引导任何未完成的调用。2
DoWork
parfeval
cancel
classdef ExampleClass < handle
properties
InputData
OutputData
end
properties (Hidden, Transient)
WorkFuture parallel.Future
end
methods
function obj = ExampleClass()
uibutton(uifigure, ButtonPushedFcn = @(~,evt) obj.AddData(evt.Source.Parent.CurrentPoint(1)));
end
function output = DoWork(obj)
pause(2);
drawnow
output = obj.InputData * 2;
end
function val = StoreOutput(obj,val)
obj.OutputData = val;
end
function F = AddData(obj,val)
obj.WorkFuture.cancel;
obj.InputData(end+1) = val;
F = parfeval(parallel.Pool.empty,@obj.DoWork,1).afterAll(@obj.StoreOutput,1);
obj.WorkFuture = F;
end
end
end
实例化该类会创建一个带有按钮的按钮,用于将新数据(鼠标 X 坐标)馈送到对象中。连续单击几次将始终导致属性中存储了如此多的元素,并在最后一次单击后的 2 秒内存储了相同数量的输出。h = ExampleClass;
uifigure
InputData
OutputData
现在假设有一个同步函数需要此过程的结果,其中包括输入值的输出。如果它是进程的唯一客户端,则可以通过调用以下命令来获取此结果:n
result = h.AddData(n).fetchOutputs;
这将根据需要返回。但是,如果在工作仍在完成时单击该按钮以添加更多输入数据,则返回的 FROM 将被取消,因此结果为错误:Future
AddData
Error using parallel.Future/fetchOutputs
One or more futures resulted in an error.
Caused by:
Execution of the future was cancelled.
错误发生后不久,包含输入输出的所需数据最终仍存储在 中,但我缺少一种方法来获取有关要等待的内容或在哪里找到串行进程信息的信息。任何允许将责任“接力棒”传递给另一个人而不是简单地停止有效的解决方案,或者允许串行代码等待事件的通知而不是完成,似乎都可以使这变得可行。n
h
Future
Future
这样的事情可能吗?
答:
有没有办法定义一个具有相同预期效果的同步调用(触发异步进程并最终在该进程完成时返回一个值),该调用可以容忍取消它触发的初始异步进程?
这将通过确保同步调用等待最新异步进程的完成来实现,即使它启动的原始进程已被取消。
这可以通过维护对最新 Future
对象的引用并使用循环等待计算完成来实现:ExampleClass
classdef ExampleClass < handle
properties
InputData
OutputData
LatestFuture parallel.Future
end
methods
function obj = ExampleClass()
uibutton(uifigure, 'ButtonPushedFcn', @(~,evt) obj.AddData(evt.Source.Parent.CurrentPoint(1)));
obj.LatestFuture = parallel.Future;
end
function output = DoWork(obj)
pause(2); % Simulate a slow process
output = obj.InputData * 2;
end
function StoreOutput(obj, val)
obj.OutputData = val;
end
function F = AddData(obj, val)
if ~isempty(obj.WorkFuture) && isvalid(obj.WorkFuture)
cancel(obj.WorkFuture);
end
obj.InputData(end+1) = val;
F = parfeval(@obj.DoWork, 1);
obj.WorkFuture = F;
obj.LatestFuture = F;
end
function output = SyncGetData(obj, val)
F = obj.AddData(val);
while ~strcmp(F.State, 'finished')
pause(0.1); % Wait for the latest future to finish
if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture)
F = obj.LatestFuture;
end
end
output = fetchOutputs(F);
end
end
end
该属性跟踪最近的异步计算。该方法被修改为在开始新计算时更新。新方法会等待最新计算的结果,即使原始方法被取消也是如此。LatestFuture
AddData
LatestFuture
SyncGetData
+----------------------+
| SyncGetData |
| +------------------+ |
| | Call AddData |-----> Starts new async process
| +------------------+ |
| | Wait Loop |-----> Waits for the latest Future
| +------------------+ | to complete
| | Fetch Output |-----> Retrieves result from the latest
| +------------------+ | completed process
+----------------------+
您现在应该使用 .h.AddData(n).fetchOutputs;
result = h.SyncGetData(n);
这看起来应该有效,但在这样做时,会阻止主线程执行暂停调用,这取决于调用与取消完成的比率,这可能会对整体性能产生相当大的影响。
AddData
有没有办法让该行将控制权返回给其他函数,等待在该线程上执行,而不是暂停任意时间长度?
诚然,该函数确实会停止执行,如果有其他任务需要并发运行,这可能并不理想。pause
为了缓解这种情况,您可以使用非阻塞方法,允许 MATLAB 在等待异步进程完成的同时继续处理其他事件和回调。
与其重复检查对象的状态,不如设置一个回调,一旦对象的状态更改为 ,MATLAB 将调用该回调。Future
Future
'finished'
并使用 afterAll
,这是类的一个方法,该方法计划在一组期货(在本例中,只是最新的期货)完成后执行回调。Future
function output = SyncGetData(obj, val)
F = obj.AddData(val);
waitFuture = F.afterAll(@obj.RetrieveOutput);
output = fetchOutputs(waitFuture);
end
function RetrieveOutput(obj)
% Callback function to handle the output
if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture)
obj.OutputData = fetchOutputs(obj.LatestFuture);
end
end
RetrieveOutput
是处理输出检索的新方法。当 future () 完成时,它会自动调用。LatestFuture
看起来这个更新的解决方案只处理一次返回的取消和替换未来,但不适用于后续取消。
AddData
第一次取消完成分配给 within 的 future 等触发,但随后将获得替换的 future 。
F
SyncGetData
RetrieveOutput
RetrieveOutput
obj.LatestFuture
如果这个未来还没有完成,那么将等待它完成,如果该替换也被取消,则将失败。
fetchOutputs
fetchOutputs
True:问题在于只处理一个级别的取消和替换。如果在提取其输出之前取消并再次替换引用的对象,则可能会导致失败。SyncGetData
Future
LatestFuture
RetrieveOutput
因此,您可能必须持续监视更改的状态并适应更改,而不是依赖单个回调来处理输出检索。实现一种机制,如果它检测到它试图从中获取的未来已被取消和替换,则重新安排自身。并修改以检查尝试获取输出之前的状态:如果被取消,则安排新的回调。LatestFuture
RetrieveOutput
RetrieveOutput
LatestFuture
LatestFuture
function RetrieveOutput(obj)
if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture)
if strcmp(obj.LatestFuture.State, 'finished')
obj.OutputData = fetchOutputs(obj.LatestFuture);
elseif strcmp(obj.LatestFuture.State, 'failed')
% Reschedule the callback for the new LatestFuture
waitFuture = obj.LatestFuture.afterAll(@obj.RetrieveOutput);
end
end
end
现在,方法检查是否已完成。如果是这样,它将获取输出。
如果被取消,则会为新的 .
这应该确保,如果发生多次取消,系统会适应并等待最新流程的最终完成。
它应该处理对象的多次取消和替换。RetrieveOutput
LatestFuture
LatestFuture
afterAll
LatestFuture
Future
你会有:
+---------------------+
| ExampleClass |
| +-----------------+ |
| | InputData | | <---- New data input
| +-----------------+ |
| | OutputData | | <---- Stored result
| +-----------------+ |
| | WorkFuture |----> Asynchronous process (DoWork)
| +-----------------+ | Cancelled and restarted on new data
| | LatestFuture |----> Tracks latest Future, updated on restart
| +-----------------+ |
| | SyncGetData | | <---- Synchronous call initiates process
| +-----------------+ |
| | RetrieveOutput | | <---- Callback function
| +-----------------+ | - Checks Future status
| | - Fetches output or reschedules
+---------------------+
关键问题是确保正确等待最近的数据处理完成,即使有取消。由于取消存储在属性中,因此我们应该检查这一点,而不是依赖属性。您需要确保不会过早返回,并等待异步任务的正确完成。SyncGetData
Error
Status
SyncGetData
classdef ExampleClass < handle
properties
InputData
OutputData
LatestFuture parallel.Future
end
methods
% (Other methods remain the same)
function output = SyncGetData(obj, val)
obj.AddData(val);
output = obj.WaitForCompletion();
end
function output = WaitForCompletion(obj)
while true
try
% Fetch output if the latest future is finished
if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture)
output = fetchOutputs(obj.LatestFuture);
break; % Break the loop on successful completion
end
catch e
% Handle the cancellation error
if strcmp(e.identifier, 'parallel:future:CancelledExecution')
continue; % Continue the loop for the new LatestFuture
else
rethrow(e); % Rethrow other errors
end
end
end
end
function RetrieveOutput(obj)
% Adjusted to handle cancellation error
if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture)
if ~isempty(obj.LatestFuture.Error) && ...
strcmp(obj.LatestFuture.Error.identifier, 'parallel:future:CancelledExecution')
% Reschedule the callback for the new LatestFuture
obj.LatestFuture.afterAll(@obj.RetrieveOutput);
else
% Fetch the output normally
obj.OutputData = fetchOutputs(obj.LatestFuture);
end
end
end
end
end
RetrieveOutput
经过修改,以检查酒店是否取消,并在必要时重新安排。Error
评论
pause
AddData
AddData
F
SyncGetData
RetrieveOutput
RetrieveOutput
obj.LatestFuture
fetchOutputs
fetchOutputs
cancelled
Status
Error
SyncGetData
OutputData
想象一下,您正在处理一项独立工作的任务,但如果出现新数据,则可能会停止。当您使用名为“”的工具启动此任务时,当您尝试获得结果时,可能会遇到打嗝 - 取消错误。因此,为了使事情变得顺利,您可以调整“”方法。现在,它检查之前的“未来”是否被取消。如果是这样,它就会为正在进行的任务创造一个新的“未来”。如果它没有被取消,它就会取消之前的“未来”并鞭打一个新的“未来”。这种调整可确保即使初始任务被取消,同步请求仍然可以顺利获取结果。parfeval
AddData
更新了 AddData
方法
function F = AddData(obj, val)
% Future not empty canceled , if any
if ~isempty(obj.WorkFuture)
obj.WorkFuture.cancel;
end
% Store the new data
obj.InputData(end+1) = val;
% Creating the new Future to continue ongoing process
ongoingFuture = parfeval(parallel.Pool.empty, @obj.DoWork, 1).afterAll(@obj.StoreOutput, 1);
% WorkFuture updated with ongoingFuture Future
obj.WorkFuture = ongoingFuture;
% Return new Future to fetch outputs synchronously
F = parfeval(parallel.Pool.empty, @() fetchOutputs(ongoingFuture), 1);
end
现在,我们所做的只是不使用相同的方法,因为我们只是不创建一个新的并将其返回为输出获取,此同步调用将不会与正在进行的进程的取消状态相关联。换句话说,无论初始异步进程被取消,调用都会继续进行Future
obj.WorkFuture
Future
此更新会检查是否在尝试创建新更新之前取消上一个更新。如果它被取消,它将为正在进行的过程创造一个新的未来。如果未取消,则取消上一个 Future 并创建一个新 Future。现在,当您调用 时,即使取消了初始异步进程,它也会继续获取结果。同步调用通过为正在进行的工作创建新的 Future 来适应取消。WorkFuture
h.AddData(n).fetchOutputs
评论
obj.WorkFuture = ...
if
cancel
cancel
Future
AddData
Future
fetchOutputs
obj
obj.WorkFuture =
if
obj.WorkFuture
afterAll
AddData(...).fetchOutputs
Future
AddData
obj.WorkFuture
评论