提问人:creativergk 提问时间:11/11/2023 最后编辑:creativergk 更新时间:11/12/2023 访问量:59
将任务排入队列以一个接一个地运行
Queueing tasks to run one after the other
问:
我有一个场景,我想在工作线程上执行一个长时间运行的操作作为任务,以生成将显示在 UI 线程上的结果。
但是,用于长时间操作的服务不是线程安全的,并且任何时候都应该只有一个任务使用它(我无法更改此服务,因此必须按原样使用它)。我还必须处理许多请求可以快速连续出现的情况。就我而言,我只关心最后提交的请求,任何以前的请求都应该被取消/忽略(如果可能的话)。
例如,如果 5 个请求一个接一个地传入,那么理想情况下应该取消前 4 个请求,只显示最后一个请求。如果其中一个任务已经开始,则必须等待它直到完成(因为无法取消服务),但结果只是被忽略且不显示。
我以前使用 BlockingCollection 来实现队列。这工作正常,但我对必须为 BlockingCollection 循环设置一个专用线程(一次会有很多这样的实例在运行中)不满意。所以我正在尝试另一种方法。
这个想法是,每当有新请求传入时,都会启动一个任务,但为了处理任何时候只有一个任务可以使用该服务的要求,我使用资源计数为 1 的信号量。为了处理“过期”请求被忽略的情况,我只需将取消令牌传递给 Semaphore WaitAsync,这样大多数任务就会在到达使用服务的点之前被取消。因此,我们必须等待已经开始的过时请求的情况将很少。
对我来说,这似乎是解决问题的一个非常干净的解决方案,但是剩下的问题是如何处置CancelationTokenSources?我想不出一种干净的方法来处理它们?所以我正在寻找建议。
internal partial class ViewModel : ObservableObject
{
private CancellationTokenSource? _lastCancellation;
private Service _service = new Service();
private SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
[ObservableProperty]
public string _queryResult;
[ObservableProperty]
public bool _isLoading;
public async Task Refresh(string query)
{
_lastCancellation?.Cancel();
var cancel = new CancellationTokenSource();
_lastCancellation = cancel;
try
{
await Task.Run(async () =>
{
await _semaphore.WaitAsync(cancel.Token);
try
{
IsLoading = true;
_service.Query = query;
_service.Process();
}
finally
{
_semaphore.Release();
IsLoading = false;
}
}, cancel.Token);
}
catch (Exception ex) when (ex is OperationCanceledException)
{
// ignore
cancel.Dispose();
Debug.WriteLine($"{query} was cancelled.");
return;
}
catch (Exception ex)
{
// an exception was thrown from the service
Debug.WriteLine("Error = " + ex.Message);
QueryResult = "Error = " + ex.Message;
return;
}
// don't update the UI for requests that were cancelled
if (cancel.IsCancellationRequested)
{
cancel.Dispose();
Debug.WriteLine($"{query} was started before it could be cancelled (ignoring)");
return;
}
QueryResult = _service.Result;
Debug.WriteLine("*** Accepted = " + QueryResult);
}
}
/// <summary>
/// Service is NOT thread safe and I cannot change its interface
/// </summary>
internal class Service
{
public string Query { get; set; }
public string Result { get; private set; }
public void Process()
{
Debug.WriteLine("Start " + Query);
string query = Query;
// simulate long running task
Thread.Sleep(2000);
Result = query + "#";
Debug.WriteLine("End " + Result);
}
}
上面的代码显示了实现,下面的代码在某种程度上是一个压力测试,以确保它按预期工作。
int count = 0;
for (int i = 0; i < 10; ++i)
{
ViewModel.Refresh("Query " + count);
count++;
}
编辑:
更新以包含@HABO的建议
internal partial class ViewModel2 : ObservableObject
{
private Service _service = new Service();
private Task? _runningTask;
private ConcurrentQueue<string> _queries = new();
[ObservableProperty]
public string? _queryResult;
[ObservableProperty]
public bool _isLoading;
public async Task Refresh(string query)
{
_queries.Enqueue(query);
if (_runningTask == null || _runningTask.IsCompleted)
{
try
{
_runningTask = Task.Run(() =>
{
IsLoading = true;
while (_queries.Any())
{
// get the last query, ignoring anything else
string? lastQuery = null;
while (_queries.TryDequeue(out string? nextQuery))
{
lastQuery = nextQuery;
}
try
{
_service.Query = lastQuery;
_service.Process();
}
catch (Exception)
{
// we only want to throw if we're processing the last query
if (!_queries.Any())
throw;
}
}
// There is a race condition here.
// Can be seen by adding a Sleep() and then
// calling Refresh() during the Sleep() period
});
await _runningTask;
}
catch (Exception ex)
{
// an exception was thrown from the service
Debug.WriteLine("Error = " + ex.Message);
QueryResult = "Error = " + ex.Message;
IsLoading = false;
return;
}
QueryResult = _service.Result;
Debug.WriteLine("*** Accepted = " + QueryResult);
IsLoading = false;
}
}
/// <summary>
/// Service is NOT thread safe and I cannot change its interface
/// </summary>
private class Service
{
public string Query { get; set; }
public string Result { get; private set; }
public void Process()
{
Debug.WriteLine("Start " + Query);
string query = Query;
// simulate long running task
Thread.Sleep(2000);
//throw new Exception("");
Result = query + "#";
Debug.WriteLine("End " + Result);
}
}
}
答: 暂无答案
评论
.Clear()
Refresh
.Enqueue()