如何同步返回未知序列的异步进程的结果?

How can I synchronously return the result of an asynchronous process of unknown sequence?

提问人:Will 提问时间:11/16/2023 更新时间:11/16/2023 访问量:26

问:

考虑一个昂贵的计算过程,无论它处理多少数据,它都可以很好地扩展,它需要在交互式 MATLAB 环境中提供,该环境可以随时生成新数据。如果新数据在进程的实例已经在进行中时到达,我想放弃该过程并一起重新启动所有现有数据,以便尽早获得迄今为止所有已知数据的结果。

下面的最小示例类重现了这种行为:它有一个向存储数组添加元素的方法和一个主要工作方法,该方法只是在第一次执行缓慢操作后将所有存储的数据相乘。工作方法是异步调用的,因此可以在不影响最终输出结果的情况下引导任何未完成的调用。2DoWorkparfevalcancel

classdef ExampleClass < handle
    properties
        InputData
        OutputData
    end

    properties (Hidden, Transient)
        WorkFuture parallel.Future
    end
    
    methods
        function obj = ExampleClass()
            uibutton(uifigure, ButtonPushedFcn = @(~,evt) obj.AddData(evt.Source.Parent.CurrentPoint(1)));
        end

        function output = DoWork(obj)
            pause(2);
            drawnow
            output = obj.InputData * 2;
        end

        function val = StoreOutput(obj,val)
            obj.OutputData = val;
        end

        function F = AddData(obj,val)
            obj.WorkFuture.cancel;
            obj.InputData(end+1) = val;
            F = parfeval(parallel.Pool.empty,@obj.DoWork,1).afterAll(@obj.StoreOutput,1);
            obj.WorkFuture = F;
        end
    end
end

实例化该类会创建一个带有按钮的按钮,用于将新数据(鼠标 X 坐标)馈送到对象中。连续单击几次将始终导致属性中存储了如此多的元素,并在最后一次单击后的 2 秒内存储了相同数量的输出。h = ExampleClass;uifigureInputDataOutputData

现在假设有一个同步函数需要此过程的结果,其中包括输入值的输出。如果它是进程的唯一客户端,则可以通过调用以下命令来获取此结果:n

result = h.AddData(n).fetchOutputs;

这将根据需要返回。但是,如果在工作仍在完成时单击该按钮以添加更多输入数据,则返回的 FROM 将被取消,因此结果为错误:FutureAddData

Error using parallel.Future/fetchOutputs
One or more futures resulted in an error.

Caused by:
    Execution of the future was cancelled.

错误发生后不久,包含输入输出的所需数据最终仍存储在 中,但我缺少一种方法来获取有关要等待的内容或在哪里找到串行进程信息的信息。任何允许将责任“接力棒”传递给另一个人而不是简单地停止有效的解决方案,或者允许串行代码等待事件的通知而不是完成,似乎都可以使这变得可行。nhFutureFuture

这样的事情可能吗?

MATLAB 异步

评论

1赞 rahnema1 11/20/2023
您的问题包含 6 个段落。6 段中有 5 段专门介绍您迄今为止尝试过的内容,但第一段是最重要的,是唯一解释您尝试做的事情的段落,我认为这是不完整的。请解释一下你要做什么。
0赞 Will 11/20/2023
@rahnema1这并不准确,但仍然反映了所写的内容很难阅读。以前是一段上下文,然后是说明该上下文的示例,然后是该上下文中实际问题的陈述。我把“我想做什么”的陈述放在问题的第二段前面,并更仔细地解释了问题在例子中的应用;希望这能更清楚吗?

答:

2赞 VonC 11/20/2023 #1

有没有办法定义一个具有相同预期效果的同步调用(触发异步进程并最终在该进程完成时返回一个值),该调用可以容忍取消它触发的初始异步进程?

这将通过确保同步调用等待最新异步进程的完成来实现,即使它启动的原始进程已被取消。
这可以通过维护对最新 Future 对象的引用并使用循环等待计算完成来实现:
ExampleClass

classdef ExampleClass < handle
    properties
        InputData
        OutputData
        LatestFuture parallel.Future
    end
    
    methods
        function obj = ExampleClass()
            uibutton(uifigure, 'ButtonPushedFcn', @(~,evt) obj.AddData(evt.Source.Parent.CurrentPoint(1)));
            obj.LatestFuture = parallel.Future;
        end

        function output = DoWork(obj)
            pause(2); % Simulate a slow process
            output = obj.InputData * 2;
        end

        function StoreOutput(obj, val)
            obj.OutputData = val;
        end

        function F = AddData(obj, val)
            if ~isempty(obj.WorkFuture) && isvalid(obj.WorkFuture)
                cancel(obj.WorkFuture);
            end
            obj.InputData(end+1) = val;
            F = parfeval(@obj.DoWork, 1);
            obj.WorkFuture = F;
            obj.LatestFuture = F;
        end

        function output = SyncGetData(obj, val)
            F = obj.AddData(val);
            while ~strcmp(F.State, 'finished')
                pause(0.1); % Wait for the latest future to finish
                if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture)
                    F = obj.LatestFuture;
                end
            end
            output = fetchOutputs(F);
        end
    end
end

该属性跟踪最近的异步计算。该方法被修改为在开始新计算时更新。新方法会等待最新计算的结果,即使原始方法被取消也是如此。LatestFutureAddDataLatestFutureSyncGetData

+----------------------+
| SyncGetData          |
| +------------------+ |
| |  Call AddData    |-----> Starts new async process
| +------------------+ |
| |  Wait Loop       |-----> Waits for the latest Future
| +------------------+ |       to complete
| |  Fetch Output    |-----> Retrieves result from the latest
| +------------------+ |       completed process
+----------------------+

您现在应该使用 .h.AddData(n).fetchOutputs;result = h.SyncGetData(n);


这看起来应该有效,但在这样做时,会阻止主线程执行暂停调用,这取决于调用与取消完成的比率,这可能会对整体性能产生相当大的影响。AddData

有没有办法让该行将控制权返回给其他函数,等待在该线程上执行,而不是暂停任意时间长度?

诚然,该函数确实会停止执行,如果有其他任务需要并发运行,这可能并不理想。pause

为了缓解这种情况,您可以使用非阻塞方法,允许 MATLAB 在等待异步进程完成的同时继续处理其他事件和回调。

与其重复检查对象的状态,不如设置一个回调,一旦对象的状态更改为 ,MATLAB 将调用该回调。FutureFuture'finished'

并使用 afterAll,这是类的一个方法,该方法计划在一组期货(在本例中,只是最新的期货)完成后执行回调。Future

function output = SyncGetData(obj, val)
    F = obj.AddData(val);
    waitFuture = F.afterAll(@obj.RetrieveOutput);
    output = fetchOutputs(waitFuture);
end

function RetrieveOutput(obj)
    % Callback function to handle the output
    if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture)
        obj.OutputData = fetchOutputs(obj.LatestFuture);
    end
end

RetrieveOutput是处理输出检索的新方法。当 future () 完成时,它会自动调用。LatestFuture


看起来这个更新的解决方案只处理一次返回的取消和替换未来,但不适用于后续取消。AddData

第一次取消完成分配给 within 的 future 等触发,但随后将获得替换的 future 。FSyncGetDataRetrieveOutputRetrieveOutputobj.LatestFuture

如果这个未来还没有完成,那么将等待它完成,如果该替换也被取消,则将失败。fetchOutputsfetchOutputs

True:问题在于只处理一个级别的取消和替换。如果在提取其输出之前取消并再次替换引用的对象,则可能会导致失败。SyncGetDataFutureLatestFutureRetrieveOutput

因此,您可能必须持续监视更改的状态并适应更改,而不是依赖单个回调来处理输出检索。实现一种机制,如果它检测到它试图从中获取的未来已被取消和替换,则重新安排自身。并修改以检查尝试获取输出之前的状态:如果被取消,则安排新的回调。LatestFutureRetrieveOutputRetrieveOutputLatestFutureLatestFuture

function RetrieveOutput(obj)
    if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture)
        if strcmp(obj.LatestFuture.State, 'finished')
            obj.OutputData = fetchOutputs(obj.LatestFuture);
        elseif strcmp(obj.LatestFuture.State, 'failed')
            % Reschedule the callback for the new LatestFuture
            waitFuture = obj.LatestFuture.afterAll(@obj.RetrieveOutput);
        end
    end
end

现在,方法检查是否已完成。如果是这样,它将获取输出。
如果被取消,则会为新的 . 这应该确保,如果发生多次取消,系统会适应并等待最新流程的最终完成。
它应该处理对象的多次取消和替换。
RetrieveOutputLatestFutureLatestFutureafterAllLatestFutureFuture

你会有:

+---------------------+
|    ExampleClass     |
| +-----------------+ |
| |  InputData      | |   <---- New data input
| +-----------------+ |
| |  OutputData     | |   <---- Stored result
| +-----------------+ |
| |  WorkFuture     |----> Asynchronous process (DoWork)
| +-----------------+ |     Cancelled and restarted on new data
| |  LatestFuture   |----> Tracks latest Future, updated on restart
| +-----------------+ |
| |  SyncGetData    | |   <---- Synchronous call initiates process
| +-----------------+ |
| |  RetrieveOutput | |   <---- Callback function
| +-----------------+ |       - Checks Future status
|                     |       - Fetches output or reschedules
+---------------------+

关键问题是确保正确等待最近的数据处理完成,即使有取消。由于取消存储在属性中,因此我们应该检查这一点,而不是依赖属性。您需要确保不会过早返回,并等待异步任务的正确完成。SyncGetDataErrorStatusSyncGetData

classdef ExampleClass < handle
    properties
        InputData
        OutputData
        LatestFuture parallel.Future
    end
    
    methods
        % (Other methods remain the same)

        function output = SyncGetData(obj, val)
            obj.AddData(val);
            output = obj.WaitForCompletion();
        end

        function output = WaitForCompletion(obj)
            while true
                try
                    % Fetch output if the latest future is finished
                    if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture)
                        output = fetchOutputs(obj.LatestFuture);
                        break; % Break the loop on successful completion
                    end
                catch e
                    % Handle the cancellation error
                    if strcmp(e.identifier, 'parallel:future:CancelledExecution')
                        continue; % Continue the loop for the new LatestFuture
                    else
                        rethrow(e); % Rethrow other errors
                    end
                end
            end
        end

        function RetrieveOutput(obj)
            % Adjusted to handle cancellation error
            if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture)
                if ~isempty(obj.LatestFuture.Error) && ...
                   strcmp(obj.LatestFuture.Error.identifier, 'parallel:future:CancelledExecution')
                    % Reschedule the callback for the new LatestFuture
                    obj.LatestFuture.afterAll(@obj.RetrieveOutput);
                else
                    % Fetch the output normally
                    obj.OutputData = fetchOutputs(obj.LatestFuture);
                end
            end
        end
    end
end

RetrieveOutput经过修改,以检查酒店是否取消,并在必要时重新安排。Error

评论

0赞 Will 11/28/2023
这看起来应该有效,但在这样做时,会阻止调用在主线程上执行,这取决于调用与取消完成的比率,这可能会对整体性能产生相当大的影响。有没有办法让该行将控制权返回给其他函数,等待在该线程上执行,而不是暂停任意时间长度?pauseAddData
0赞 VonC 11/28/2023
@Will我已经编辑了答案以解决您的评论。
0赞 Will 12/1/2023
看起来这个更新的解决方案只处理一次返回的取消和替换未来,但不适用于后续取消。第一次取消完成分配给 within 的 future 等触发,但随后将获得替换的 future 。如果这个未来还没有完成,那么将等待它完成,如果该替换也被取消,则将失败。AddDataFSyncGetDataRetrieveOutputRetrieveOutputobj.LatestFuturefetchOutputsfetchOutputs
0赞 VonC 12/1/2023
@Will 好点子。我已经编辑了答案以解决您的评论。
0赞 Will 12/1/2023
像这样使用递归看起来非常有前途,但是代码在测试时似乎不起作用。在修复了一些简单的错误(例如,不是有效的,取消存储在属性中)后,我发现取消会导致在处理给定的新数据之前提前返回最新值。稍后我会尝试找出需要更改的内容,但如果对您来说更明显地如何解决这个问题,那么肯定会欢迎一个工作示例cancelledStatusErrorSyncGetDataOutputData
0赞 Mohamd Imran 11/27/2023 #2

想象一下,您正在处理一项独立工作的任务,但如果出现新数据,则可能会停止。当您使用名为“”的工具启动此任务时,当您尝试获得结果时,可能会遇到打嗝 - 取消错误。因此,为了使事情变得顺利,您可以调整“”方法。现在,它检查之前的“未来”是否被取消。如果是这样,它就会为正在进行的任务创造一个新的“未来”。如果它没有被取消,它就会取消之前的“未来”并鞭打一个新的“未来”。这种调整可确保即使初始任务被取消,同步请求仍然可以顺利获取结果。parfevalAddData

更新了 AddData 方法

function F = AddData(obj, val)
    % Future not empty canceled , if any
    if ~isempty(obj.WorkFuture)
        obj.WorkFuture.cancel;
    end

    % Store the new data
    obj.InputData(end+1) = val;

    % Creating the new  Future to continue ongoing process
    ongoingFuture = parfeval(parallel.Pool.empty, @obj.DoWork, 1).afterAll(@obj.StoreOutput, 1);

    % WorkFuture updated with ongoingFuture  Future 
    obj.WorkFuture = ongoingFuture;

    % Return  new Future to fetch  outputs synchronously
    F = parfeval(parallel.Pool.empty, @() fetchOutputs(ongoingFuture), 1);
end

现在,我们所做的只是不使用相同的方法,因为我们只是不创建一个新的并将其返回为输出获取,此同步调用将不会与正在进行的进程的取消状态相关联。换句话说,无论初始异步进程被取消,调用都会继续进行Futureobj.WorkFutureFuture

此更新会检查是否在尝试创建新更新之前取消上一个更新。如果它被取消,它将为正在进行的过程创造一个新的未来。如果未取消,则取消上一个 Future 并创建一个新 Future。现在,当您调用 时,即使取消了初始异步进程,它也会继续获取结果。同步调用通过为正在进行的工作创建新的 Future 来适应取消。WorkFutureh.AddData(n).fetchOutputs

评论

0赞 Will 11/27/2023
这里的代码似乎没有按照你说的那样做;在语句的两个分支的末尾运行完全相同的行,所以不妨把它带回外面。分支之间的唯一区别似乎是是否有调用(并且只避免调用空数组或已取消的数组,这已经不起作用)以及传递给的数据是被存储还是丢弃,而我总是想存储它。传递到的对象仍然存储在其中,因此在完成之前可能会被取消。obj.WorkFuture = ...ifcancelcancelFutureAddDataFuturefetchOutputsobj
0赞 Mohamd Imran 11/28/2023
@Will我不确定两者是否翻倍,我想也许它发生在我今天晚些时候修改我的答案时,现在关于代码似乎没有按照你说的那样做,我已经重新阅读了你的问题,看看我是否错过了什么,但我注意到从我对你的问题错误的理解是“从 AddData 返回的 Future 被取消,所以结果是一个错误”我的意思是是的它应该导致错误导致 fetchoutputs 尝试从取消的 Future 中获取结果,您在存储数据之前取消,创建到期的 Futureobj.WorkFuture =ifobj.WorkFutureafterAll
0赞 Will 11/28/2023
这当然是问题中设置的示例的预期行为,只要同步调用是同步调用,并且返回的调用与存储的相同,因此可以取消,该行为就会一直存在。其中至少有一件事似乎必须改变;因此,问题是如何改变它以实现预期的行为。AddData(...).fetchOutputsFutureAddDataobj.WorkFuture
0赞 Mohamd Imran 11/28/2023
@Will现在我明白了你想要什么,我们可以通过创建一个新的 Future 并返回它而不是返回相同的 Future 来解决这个问题,请参阅我更新的答案代码部分及其下方