使用 Task.WhenAll 并行化 .NET 任务的最佳方式

Best way to parallelize .NET tasks with Task.WhenAll

提问人:Zenith 提问时间:10/18/2023 最后编辑:halferZenith 更新时间:10/19/2023 访问量:154

问:

我有一个程序,需要执行以下指令:

  1. 通过 Http 请求检索项目列表
  2. 对于每个检索到的项目,以该项目为参数执行另一个 Http 请求

目前,我的解决方案如下:

var items = await _repository.GetItems(someInt, someGuid);
var itemsUpdated = new ConcurrentDictionary<Item, ConcurrentDictionary<Dto,TransactionType>>();

foreach (var item in items)
{
    _ = itemsUpdated.TryAdd(item, 
        (await _updateRepository.AsyncGetChangedEntities(someInt, someGuid, item))!);
}

在我的应用程序中,我在许多地方使用 Task.WhenAll() 来对任务进行平行处理以提高性能。我只是不明白我怎么能在这里使用它。

我尝试了以下方法,但卡住了:

var items = await _repository.GetItems(someInt, someGuid);
var itemsUpdated = new ConcurrentDictionary<Item, ConcurrentDictionary<Dto,TransactionType>>();

var tasks = new List<Task>();

foreach (var item in items)
{
    tasks.Add(_updateRepository.AsyncGetChangedEntities(someInt, someGuid, item));

    //_ = itemsUpdated.TryAdd(item, 
    //    (await _updateRepository.AsyncGetChangedEntities(someInt, someGuid, item))!);
}

await Task.WhenAll(tasks);
// how to get the updatedItems from the tasks and insert
// by item in the itemsUpdated ConcurrentDictionary

此外,由于我们谈论的是提高性能;在初始化 ConcurrentDictionary 时,我希望传递预期的长度以避免重新初始化后备数组。但是,构造函数还要求许多线程预期在 ConcurrentDictionary 上执行。我不知道在这里用什么。当然,这取决于用于并行化任务的解决方案。

C# 异步 并发 并行处理 任务

评论

1赞 Fildor 10/18/2023
一定是吗Task.WhenAll
4赞 GSerg 10/18/2023
您确定您的存储库同时支持多个正在进行的任务吗?例如,Entity Framework 没有
0赞 Fildor 10/18/2023
也许是这样的:dotnetfiddle.net/rJU1xB?但请注意,在提供异步 API 时,并非所有内容都支持并发访问。
1赞 Theodor Zoulias 10/18/2023
是否考虑过使用 .NET 6 Parallel.ForEachAsync API?
1赞 Theodor Zoulias 10/19/2023
Stijn,你可以在这里找到一个例子。如果您还想收集结果,请参阅此答案Parallel.ForEachAsync

答:

0赞 Cregennan 10/18/2023 #1
var items = await _repository.GetItems();

var itemsUpdated = new ConcurrentDictionary<Item, ConcurrentDictionary<Dto, TransactionType>>(concurrencyLevel: Environment.ProcessorCount,  capacity: items.Length);

async Task<KeyValue> InvokeTask(Item item)
   {
      var value = await _updateRepository.AsyncGetChangedEntities(someint, someGuid, item);
      itemsUpdated.TryAdd(item, value);
      return  new() {Key = item, Value = value};
   }
        
var tasks = items.Select(InvokeTask)
                 .ToArray(); //Tasks will be created here

var results = await Task.WhenAll(tasks); //Wait for them to complete and collect the results

您可以创建一些函数来包装您的异步调用,这样您就可以获得并且不必显式地将结果转换为。Task<T>T

KeyValue类型可能是这样的:

public struct KeyValue
    {
        public Item Key;
        public ConcurrentDictionary<Dto, TransactionType> Value;
    }

您可以将 as 和(默认情况下使用此值)作为传递到构造函数中。items.LengthcapacityEnvironment.ProccessorCountconcurrencyLevelConcurrentDictionary

评论

0赞 Fildor 10/18/2023
When are ,则 Task.WhenAll 已返回tasksTask<TResult>Task<TResult[]>
1赞 Cregennan 10/18/2023
@Fildor,谢谢,我已经纠正了我的答案
0赞 Zenith 10/19/2023
谢谢你的回答!也感谢 Environment.ProcessorCount 信息!
0赞 Puygrenier Solann 10/18/2023 #2

我喜欢你的原始代码。

你能试试下面的代码(使用)吗?我进性能调优不如别人,希望大家能获得良好的性能:Task.Run

var tasks = new List<Task>();

foreach (var item in items)
{
    tasks.Add(Task.Run( () => {
      _ = itemsUpdated.TryAdd(item, 
          (await _updateRepository.AsyncGetChangedEntities(someInt, someGuid, item))
          );
    });
}

await Task.WhenAll(tasks);

评论

0赞 Zenith 10/19/2023
Task.Run 不是 await 的可取方法。此外,这不会并行化任务。它们仍然按顺序运行
4赞 Stephen Cleary 10/18/2023 #3

在我的应用程序中,我在许多地方使用 Task.WhenAll() 来并行化任务以提高性能。我只是不明白我怎么能在这里使用它。

一种常见的解决方案是使用 LINQ(特别是使用异步委托)将某些项映射到某个任务序列,然后:SelectTask.WhenAll

var items = await _repository.GetItems(someInt, someGuid);

var tasks = items
    .Select(item => _updateRepository.AsyncGetChangedEntities(someInt, someGuid, item))
    .ToList();

var results = await Task.WhenAll(tasks);

results将包含一个包含所有任务结果(无论返回什么)的数组。AsyncGetChangedEntities

我认为不需要与提供的代码并发数据结构。但是,如果出于某种原因需要并发字典,则以下操作应该有效:

var items = await _repository.GetItems(someInt, someGuid);

var tasks = items
    .Select(async item => new KeyValuePair<Item, ConcurrentDictionary<Dto,TransactionType>>(
        item,
        await _updateRepository.AsyncGetChangedEntities(someInt, someGuid, item)))
    .ToList();

var results = await Task.WhenAll(tasks);
var itemsUpdated = new ConcurrentDictionary<Item, ConcurrentDictionary<Dto,TransactionType>>(results);

评论

0赞 Zenith 10/19/2023
你说得对,在此示例中不需要并发数据结构,但其他代码需要它。
0赞 Zenith 10/19/2023
我怎样才能将项目插入到您的答案的 updatedItems 字典中。我现在得到一个项目列表,而不是 updatedItems 的字典。
1赞 Stephen Cleary 10/19/2023
@StijnWingens:如果结果必须是并发字典,则可以将结果列表更改为字典构造函数并将其传递给字典构造函数。SelectSelect(async item => new KeyValuePair<Item, ...>(item, await _updateRepository.AsyncGetChangedEntities(someInt, someGuid, item)))
0赞 Zenith 10/19/2023
您能否编辑答案,使其返回并发字典?
0赞 Stephen Cleary 10/20/2023
@StijnWingens:完成。