提问人:Zenith 提问时间:10/18/2023 最后编辑:halferZenith 更新时间:10/19/2023 访问量:154
使用 Task.WhenAll 并行化 .NET 任务的最佳方式
Best way to parallelize .NET tasks with Task.WhenAll
问:
我有一个程序,需要执行以下指令:
- 通过 Http 请求检索项目列表
- 对于每个检索到的项目,以该项目为参数执行另一个 Http 请求
目前,我的解决方案如下:
var items = await _repository.GetItems(someInt, someGuid);
var itemsUpdated = new ConcurrentDictionary<Item, ConcurrentDictionary<Dto,TransactionType>>();
foreach (var item in items)
{
_ = itemsUpdated.TryAdd(item,
(await _updateRepository.AsyncGetChangedEntities(someInt, someGuid, item))!);
}
在我的应用程序中,我在许多地方使用 Task.WhenAll() 来对任务进行平行处理以提高性能。我只是不明白我怎么能在这里使用它。
我尝试了以下方法,但卡住了:
var items = await _repository.GetItems(someInt, someGuid);
var itemsUpdated = new ConcurrentDictionary<Item, ConcurrentDictionary<Dto,TransactionType>>();
var tasks = new List<Task>();
foreach (var item in items)
{
tasks.Add(_updateRepository.AsyncGetChangedEntities(someInt, someGuid, item));
//_ = itemsUpdated.TryAdd(item,
// (await _updateRepository.AsyncGetChangedEntities(someInt, someGuid, item))!);
}
await Task.WhenAll(tasks);
// how to get the updatedItems from the tasks and insert
// by item in the itemsUpdated ConcurrentDictionary
此外,由于我们谈论的是提高性能;在初始化 ConcurrentDictionary 时,我希望传递预期的长度以避免重新初始化后备数组。但是,构造函数还要求许多线程预期在 ConcurrentDictionary 上执行。我不知道在这里用什么。当然,这取决于用于并行化任务的解决方案。
答:
var items = await _repository.GetItems();
var itemsUpdated = new ConcurrentDictionary<Item, ConcurrentDictionary<Dto, TransactionType>>(concurrencyLevel: Environment.ProcessorCount, capacity: items.Length);
async Task<KeyValue> InvokeTask(Item item)
{
var value = await _updateRepository.AsyncGetChangedEntities(someint, someGuid, item);
itemsUpdated.TryAdd(item, value);
return new() {Key = item, Value = value};
}
var tasks = items.Select(InvokeTask)
.ToArray(); //Tasks will be created here
var results = await Task.WhenAll(tasks); //Wait for them to complete and collect the results
您可以创建一些函数来包装您的异步调用,这样您就可以获得并且不必显式地将结果转换为。Task<T>
T
KeyValue
类型可能是这样的:
public struct KeyValue
{
public Item Key;
public ConcurrentDictionary<Dto, TransactionType> Value;
}
您可以将 as 和(默认情况下使用此值)作为传递到构造函数中。items.Length
capacity
Environment.ProccessorCount
concurrencyLevel
ConcurrentDictionary
评论
我喜欢你的原始代码。
你能试试下面的代码(使用)吗?我进性能调优不如别人,希望大家能获得良好的性能:Task.Run
var tasks = new List<Task>();
foreach (var item in items)
{
tasks.Add(Task.Run( () => {
_ = itemsUpdated.TryAdd(item,
(await _updateRepository.AsyncGetChangedEntities(someInt, someGuid, item))
);
});
}
await Task.WhenAll(tasks);
评论
在我的应用程序中,我在许多地方使用 Task.WhenAll() 来并行化任务以提高性能。我只是不明白我怎么能在这里使用它。
一种常见的解决方案是使用 LINQ(特别是使用异步委托)将某些项映射到某个任务序列,然后:Select
Task.WhenAll
var items = await _repository.GetItems(someInt, someGuid);
var tasks = items
.Select(item => _updateRepository.AsyncGetChangedEntities(someInt, someGuid, item))
.ToList();
var results = await Task.WhenAll(tasks);
results
将包含一个包含所有任务结果(无论返回什么)的数组。AsyncGetChangedEntities
我认为不需要与提供的代码并发数据结构。但是,如果出于某种原因需要并发字典,则以下操作应该有效:
var items = await _repository.GetItems(someInt, someGuid);
var tasks = items
.Select(async item => new KeyValuePair<Item, ConcurrentDictionary<Dto,TransactionType>>(
item,
await _updateRepository.AsyncGetChangedEntities(someInt, someGuid, item)))
.ToList();
var results = await Task.WhenAll(tasks);
var itemsUpdated = new ConcurrentDictionary<Item, ConcurrentDictionary<Dto,TransactionType>>(results);
评论
Select
Select(async item => new KeyValuePair<Item, ...>(item, await _updateRepository.AsyncGetChangedEntities(someInt, someGuid, item)))
评论
Task.WhenAll
Parallel.ForEachAsync
API?Parallel.ForEachAsync