将任务排入队列以一个接一个地运行

Queueing tasks to run one after the other

提问人:creativergk 提问时间:11/11/2023 最后编辑:creativergk 更新时间:11/12/2023 访问量:59

问:

我有一个场景,我想在工作线程上执行一个长时间运行的操作作为任务,以生成将显示在 UI 线程上的结果。

但是,用于长时间操作的服务不是线程安全的,并且任何时候都应该只有一个任务使用它(我无法更改此服务,因此必须按原样使用它)。我还必须处理许多请求可以快速连续出现的情况。就我而言,我只关心最后提交的请求,任何以前的请求都应该被取消/忽略(如果可能的话)。

例如,如果 5 个请求一个接一个地传入,那么理想情况下应该取消前 4 个请求,只显示最后一个请求。如果其中一个任务已经开始,则必须等待它直到完成(因为无法取消服务),但结果只是被忽略且不显示。

我以前使用 BlockingCollection 来实现队列。这工作正常,但我对必须为 BlockingCollection 循环设置一个专用线程(一次会有很多这样的实例在运行中)不满意。所以我正在尝试另一种方法。

这个想法是,每当有新请求传入时,都会启动一个任务,但为了处理任何时候只有一个任务可以使用该服务的要求,我使用资源计数为 1 的信号量。为了处理“过期”请求被忽略的情况,我只需将取消令牌传递给 Semaphore WaitAsync,这样大多数任务就会在到达使用服务的点之前被取消。因此,我们必须等待已经开始的过时请求的情况将很少。

对我来说,这似乎是解决问题的一个非常干净的解决方案,但是剩下的问题是如何处置CancelationTokenSources?我想不出一种干净的方法来处理它们?所以我正在寻找建议。

internal partial class ViewModel : ObservableObject
{
    private CancellationTokenSource? _lastCancellation;
    private Service _service = new Service();
    private SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

    [ObservableProperty]
    public string _queryResult;

    [ObservableProperty]
    public bool _isLoading;

    public async Task Refresh(string query)
    {
        _lastCancellation?.Cancel();

        var cancel = new CancellationTokenSource();
        _lastCancellation = cancel;

        try
        {
            await Task.Run(async () =>
            {
                await _semaphore.WaitAsync(cancel.Token);

                try
                {
                    IsLoading = true;

                    _service.Query = query;
                    _service.Process();
                }
                finally
                {
                    _semaphore.Release();
                    IsLoading = false;
                }
            }, cancel.Token);
        }
        catch (Exception ex) when (ex is OperationCanceledException)
        {
            // ignore
            cancel.Dispose();
            Debug.WriteLine($"{query} was cancelled.");
            return;
        }
        catch (Exception ex) 
        {
            // an exception was thrown from the service
            Debug.WriteLine("Error = " + ex.Message);
            QueryResult = "Error = " + ex.Message;
            return;
        }

        // don't update the UI for requests that were cancelled
        if (cancel.IsCancellationRequested)
        {
            cancel.Dispose();
            Debug.WriteLine($"{query} was started before it could be cancelled (ignoring)");
            return;
        }

        QueryResult = _service.Result;
        Debug.WriteLine("*** Accepted = " + QueryResult);
    }
}

/// <summary>
/// Service is NOT thread safe and I cannot change its interface
/// </summary>
internal class Service
{
    public string Query { get; set; }
    public string Result { get; private set; }

    public void Process()
    {
        Debug.WriteLine("Start " + Query);

        string query = Query;

        // simulate long running task
        Thread.Sleep(2000);

        Result = query + "#";

        Debug.WriteLine("End " + Result);
    }
}

上面的代码显示了实现,下面的代码在某种程度上是一个压力测试,以确保它按预期工作。

int count = 0;
for (int i = 0; i < 10; ++i)
{
     ViewModel.Refresh("Query " + count);
     count++;
}

编辑:

更新以包含@HABO的建议

internal partial class ViewModel2 : ObservableObject
{
    private Service _service = new Service();
    private Task? _runningTask;

    private ConcurrentQueue<string> _queries = new();

    [ObservableProperty]
    public string? _queryResult;

    [ObservableProperty]
    public bool _isLoading;

    public async Task Refresh(string query)
    {
        _queries.Enqueue(query);

        if (_runningTask == null || _runningTask.IsCompleted)
        {
            try
            {
                _runningTask = Task.Run(() =>
                {
                    IsLoading = true;

                    while (_queries.Any())
                    {
                        // get the last query, ignoring anything else
                        string? lastQuery = null;
                        while (_queries.TryDequeue(out string? nextQuery))
                        {
                            lastQuery = nextQuery;
                        }

                        try
                        {
                            _service.Query = lastQuery;
                            _service.Process();
                        }
                        catch (Exception)
                        {
                            // we only want to throw if we're processing the last query
                            if (!_queries.Any())
                                throw;
                        }
                    }

                    // There is a race condition here.
                    // Can be seen by adding a Sleep() and then
                    // calling Refresh() during the Sleep() period
                });

                await _runningTask;
            }
            catch (Exception ex)
            {
                // an exception was thrown from the service
                Debug.WriteLine("Error = " + ex.Message);
                QueryResult = "Error = " + ex.Message;
                IsLoading = false;
                return;
            }

            QueryResult = _service.Result;
            Debug.WriteLine("*** Accepted = " + QueryResult);

            IsLoading = false;
        }
    }

    /// <summary>
    /// Service is NOT thread safe and I cannot change its interface
    /// </summary>
    private class Service
    {
        public string Query { get; set; }
        public string Result { get; private set; }

        public void Process()
        {
            Debug.WriteLine("Start " + Query);

            string query = Query;

            // simulate long running task
            Thread.Sleep(2000);

            //throw new Exception("");

            Result = query + "#";

            Debug.WriteLine("End " + Result);
        }
    }
}
C# 队列 任务 信号量

评论

1赞 HABO 11/11/2023
我正在思考更多关于单个任务和单个静态请求的事情。这解决了多个任务和多个排队请求的问题。当请求到达时,它被放入静态变量中,如果任务的状态为未运行,则启动任务。该任务会立即复制请求以供内部使用。任务做的最后一件事是检查新请求,然后开始处理或停止它。稍微锁定以保护请求,您应该可以开始了。
0赞 creativergk 11/11/2023
这听起来是一个很好的方法,我会试一试。
0赞 creativergk 11/12/2023
@HABO我已经更新了示例以包含您建议的想法。不确定它是否存在竞争条件问题,因此最好获得一些反馈?
0赞 HABO 11/12/2023
您仍在对所有请求进行排队,只是为了稍后刷新无关的请求。如果不出意外,您可以在新请求之前立即排队。当然,周围有合适的锁定,.Clear()Refresh.Enqueue()

答: 暂无答案