跨 awaits 使用的自定义 TaskScheduler

Custom TaskScheduler that gets used across awaits

提问人:James 提问时间:3/1/2023 最后编辑:Theodor ZouliasJames 更新时间:3/2/2023 访问量:242

问:

除非您在 、async/await 和多处理方面有丰富的经验,否则无需进一步阅读。SynchronizationContextTaskSchedulerTaskFactory

在过去的几年里,我一直在接受一个大型的、大部分是单体的 C# 项目,该项目从 .NET 2.0 开始,并通过我们过去 15 年来一直在努力的 .NET 4.72 向上移动,并将其移动到 .NET 5/6,以便摆脱现已被有效弃用的 4.72,并最终将其托管在 Linux 上, 同时保持其 24/7 全天候生产运行,拥有超过 100 万用户。
在这个过程中,我开始欣赏 async/await 概念的优雅,但也对它表现得不合理的地方感到非常沮丧。
我已经接受了大多数怪癖,但通过这个过程,.NET Core(此时的 .NET 6)似乎是一个主要用于用户交互软件的框架,而不是用于队列处理等其他目的。 似乎 .NET Core 中的 async/await 主要是为了处理围绕旧的 Windows 消息泵 UI 构建的应用程序,并且 ASP.NET 每个请求执行一个流,而排除了其他一些体系结构。

让我详细说明一下。 在这些更新之前,我们的大部分代码都使用自定义线程池进行后端处理。
考虑到项目的性质(一半是前端 Web API 服务,一半是无头后端队列处理)和优化成本的需要,我们努力将后端处理服务器上的 CPU 利用率保持在接近 97% 的水平,并且我们还将并行处理用于一些前端操作。 我们发现,对于任何系统,如果 CPU 利用率远高于 97%,您将失去监控服务器上发生的事情的能力,也无法在不可避免地发生无限循环时检测它们。 我们的算法可以很好地达到这个利用率水平。 我们创建自定义线程池的原因有以下几个:

  1. 当许多操作排队时(通常需要将 CPU 保持在最佳范围内),默认值 ,因为它不是 FIFO,不可避免地会使一些长时间排队的操作匮乏,以支持最近排队的操作,从而导致超时和其他困难问题。 当 FIFO 处理引擎可以通过以更合理的 FIFO 顺序处理事物来避免超时,补偿这一点是浪费和乏味的。ThreadPool
  2. 确定何时以及将多少工作排队到默认线程池是极其困难的(如果不是不可能的话)。
    无论是过载还是过载,整体系统 CPU 利用率都很低,并且似乎没有任何属性组合来指示它处于哪种状态(和的组合几乎做到了,但在实践中,我无法提出一种对任意工作负载可靠的算法)。
    PendingWorkItemCountThreadCount

因此,使用默认线程池(除玩具代码外的任何内容)实现持续的 95%+ CPU 利用率多年来一直未能实现多次长期努力。 使用自定义线程池,我们可以很容易地查看我们是否有更多的容量,将其与当前的 CPU 利用率相结合,并使用它来决定是否开始更多工作。

输入 async。
这个概念很棒,能够保持逻辑代码流与以往基本相同的承诺,并停止浪费线程等待 IO,从而大大减少内存使用并获得一些性能改进,这推动了每个人朝着这个方向发展,以至于我们使用的许多库不再有非异步版本可供使用。 我没有发现任何专家建议在任何情况下都建议从非异步代码调用异步代码。 于是,异步化的过程开始了,异步僵尸病毒自下而上地传播代码,用一些临时包装器来欺骗异步代码同步运行,直到我们一路到达顶部。 不可避免地,我们到达了使用自定义线程池的代码,因此我们也尝试将其转换为 async/await。 但是,使用我们自己的自定义并不能很好地与 async/await 配合使用(有许多关于此的 SO 和博客文章)。 因此,我决定使用我们的自定义线程池编写我自己的 /。 关于如何正确地做到这一点的文档很少(很多事情根本没有),我花了几个月的时间研究和实施, 阅读 Stephen Toub 和 Stephen Cleary 的博客和 SO 帖子,以及梳理参考来源。 我最终得到的实现太大了,无法包含在这里。 它大部分工作,但处理似乎仍然被发布到默认线程池,导致上述欠载/过载状态中的各种过载和歧义。 我梳理了整个项目,寻找在默认 TaskScheduler 上运行代码的模式,而无法覆盖该行为。在消除所有这些之后,在默认代码上运行的问题仍然存在。 我最终将这些问题归结为这样一个事实,即在任何重要代码上使用“await”总是会将处理切换回默认任务计划程序。 由于上述问题,这是不可接受的。
最终,我看到了这篇 SO 帖子:了解 TaskScheduler.Current 的行为。 在阅读并查看 / 等的参考源之后。我明白为什么代码会以这种方式运行,但无法理解这种行为在扭曲的 ASP.NET 和 UI 代码世界之外的任何上下文中如何有意义。对于异步工作来说,完全忽略工作可以排队到的“当前同步上下文”并将工作排在其他地方是否真的有意义?
ThreadPoolSynchronizationContextTaskSchedulerTask t = MyAsyncCode();new Task(() => func())TaskSchedulerTaskTaskScheduler

这让我想到了我的问题:有没有办法创建一个实际用于异步函数所有处理的自定义?如果不是,这似乎是系统中的一个大洞。TaskScheduler

以下是我想运行的示例:

public async Task SOSample()
{
    using MyTaskScheduler scheduler = MyTaskScheduler.Start();
    MySynchronizationContext context = new(scheduler);
    System.Threading.SynchronizationContext? oldContext = SynchronizationContext.Current;
    try
    {
        SynchronizationContext.SetSynchronizationContext(context);
        TaskCompletionSource<bool> completion = new();
        Task unwrapped = await Task.Factory.StartNew(
            () => VerifyTaskSchedulerRemainsCustom(), CancellationToken.None,
            TaskCreationOptions.None, scheduler);
        await unwrapped;
    }
    finally
    {
        SynchronizationContext.SetSynchronizationContext(oldContext);
    }
}

private async Task VerifyTaskSchedulerRemainsCustom()
{
    Assert.IsFalse(ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default));
    await Task.Yield();
    Assert.IsFalse(ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default));
    await Task.Delay(100).ConfigureAwait(true);
    Assert.IsFalse(ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default));

    // ... more arbitrary async processing

    Assert.IsFalse(ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default));
}

或者,有没有办法确定默认线程池是过载还是过载,以及在没有调试器的情况下确定是什么线程池过载(对于调试生产中的问题至关重要)?这将是大量的工作,但在某些方面是可取的。

C# 多线程异 async-await task-parallel-library

评论

0赞 Josh Heaps 3/1/2023
我不确定你的第一个问题是否有答案。可能有,我只是现在不知道。不过,有一种方法可以检查您拥有的线程。这是Microsoft关于此事的官方文件 learn.microsoft.com/en-us/dotnet/api/......
0赞 Vlad DX 3/1/2023
我很好奇,通过这些优化,您节省了多少基础设施成本?
0赞 James 3/1/2023
@JoshHeaps是的,很清楚这一点。这在确定或控制由默认线程池导致的系统锁定方面没有用处。
0赞 James 3/1/2023
@TheodorZoulias实际上,Assert.AreEqual(TaskScheduler.Current, scheduler) 要模糊得多,因为它可能取决于如何为 TaskScheduler 定义 Equals。因此,对第一个的评论。
1赞 James 3/1/2023
@VladDX,这取决于您运行的基础结构。如果您在裸机上运行,则可能是 2-3 倍。如果您在 AWS 上的 T* 实例上运行,则实际上不会节省任何成本。如果您在 C*/M* 实例上运行,它也可以是 2-3 倍。这完全取决于您可以获得多少默认线程池来实际用于工作负载。如果我能让它完全工作,看起来 async/await 会将内存使用量减少 4 倍。

答:

5赞 Stephen Cleary 3/1/2023 #1

我明白为什么代码会以这种方式运行,但无法理解这种行为在扭曲的 ASP.NET 和 UI 代码世界之外的任何上下文中如何有意义。

TaskScheduler.Current是 TPL 的遗留物,早于 .捕获的上下文是回退到当前 的当前。大多数情况下,SyncCtx 被捕获并且根本不起作用。asyncawaitSynchronizationContextTaskSchedulerTaskScheduler.Current

这让我想到了我的问题:有没有办法创建一个自定义 TaskScheduler,它实际上用于异步函数的所有处理?

您可以创建一个默认使用的。请注意,、 和 朋友将继续使用线程池,因为这正是他们应该做的。Task.RunConfigureAwait(false)

您尚未发布代码,因此我只想猜测问题在于您在执行其工作项时没有设置当前代码。 将捕获上下文并使用它来安排其延续,但它不会恢复上下文;这是上下文本身的责任。TaskSchedulerSynchronizationContextawait

随意查看我的 AsyncContext;它是一个单线程上下文,但它同时提供了 a 和 .SynchronizationContextTaskScheduler

评论

1赞 Theodor Zoulias 3/1/2023
并且不保证所有工作都将在 上运行。我已经发布了一个实验演示是这个答案。Task.RunConfigureAwait(false)ThreadPool
1赞 Stephen Cleary 3/1/2023
好点子。延续也可以在另一个线程上运行。
2赞 Theodor Zoulias 3/1/2023 #2

这是一个相对简单的自定义,它的行为类似于具有预定义线程数的自定义。它基于 BlockingCollection<Task>TaskSchedulerThreadPool

public class CustomThreadPool : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _queue;
    private readonly Thread[] _threads;

    public CustomThreadPool(int threadsCount = 1)
    {
        _queue = new BlockingCollection<Task>();
        _threads = Enumerable.Range(0, threadsCount).Select(_ => new Thread(() =>
        {
            foreach (var task in _queue.GetConsumingEnumerable())
                TryExecuteTask(task);
        }) { IsBackground = true }).ToArray();
        Array.ForEach(_threads, t => t.Start());
    }

    protected override void QueueTask(Task task)
    {
        try { _queue.Add(task); }
        catch (ObjectDisposedException)
        {
            ThreadPool.QueueUserWorkItem(_ => throw new TaskSchedulerException());
        }
    }

    protected override bool TryExecuteTaskInline(Task task,
        bool taskWasPreviouslyQueued)
    {
        if (Array.IndexOf(_threads, Thread.CurrentThread) < 0) return false;
        return TryExecuteTask(task);
    }

    public override int MaximumConcurrencyLevel => _threads.Length;
    protected override IEnumerable<Task> GetScheduledTasks() => _queue;

    public void Dispose()
    {
        _queue.CompleteAdding();
        Array.ForEach(_threads, t => t.Join());
        _queue.Dispose();
    }
}

让我们使用这个调度程序并检查断言:

public static async Task Main()
{
    using (CustomThreadPool scheduler = new())
    {
        await Task.Factory.StartNew(() => VerifyTaskSchedulerRemainsCustom(),
            CancellationToken.None, TaskCreationOptions.DenyChildAttach,
            scheduler).Unwrap();

        Console.WriteLine($"Out: {TaskScheduler.Current}");
        await Task.Delay(500); // Give some time for the pending async void.
    }
    Console.WriteLine($"Finished");
}

private static async Task VerifyTaskSchedulerRemainsCustom()
{
    Console.WriteLine($"1: {TaskScheduler.Current}");
    await Task.Yield();
    Console.WriteLine($"2: {TaskScheduler.Current}");
    await Task.Delay(100).ConfigureAwait(true);
    Console.WriteLine($"3: {TaskScheduler.Current}");
    MyAsyncVoid();

    async void MyAsyncVoid()
    {
        Console.WriteLine($"4: {TaskScheduler.Current}");
        await Task.Yield();
        Console.WriteLine($"5: {TaskScheduler.Current}");
        await Task.Delay(100).ConfigureAwait(true);
        Console.WriteLine($"6: {TaskScheduler.Current}");
    }
}

输出:

1: CustomThreadPool
2: CustomThreadPool
3: CustomThreadPool
4: CustomThreadPool
5: CustomThreadPool
Out: System.Threading.Tasks.ThreadPoolTaskScheduler
6: CustomThreadPool
Finished

在线演示

保留在所有检查点中,甚至在异步 void 方法中也是如此。请注意,不涉及任何内容。对于此测试,A 就足够了。缺点是自定义无法知道该方法何时完成。在上面的演示中,完成时仍在运行中。如果您的应用程序随意使用方法,则上述仅使用方法可能不合适。TaskScheduler.CurrentSynchronizationContextTaskSchedulerTaskSchedulerasync voidawait Task.Factory.StartNewMyAsyncVoidasync voidTaskScheduler