提问人:user22229019 提问时间:7/20/2023 最后编辑:user22229019 更新时间:7/20/2023 访问量:89
协作暂停异步方法
Cooperatively pausing async methods
问:
我正在将 Task 用于长时间运行的异步处理操作,我希望能够在任意时刻暂停和恢复该操作。幸运的是,Microsoft 的 TPL 作者之一已经想出了解决这个问题的方法。唯一的麻烦是他的解决方案无法正常工作。
当您在下面的代码中取出 时,代码将在第一个请求之后停止接受暂停请求。如果要相信 的值,则代码似乎会在与其他任务相同的线程上恢复执行。此外,的输出表明它正在多个线程上运行。await Task.Delay(100)
SomeMethodAsync
Thread.CurrentThread.ManagedThreadId
SomeMethodAsync
我一直觉得 TPL 相当混乱,难以使用,异步/等待更是如此,所以我很难理解这里发生了什么。如果有人能解释,我将不胜感激。
极简示例代码:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace PauseTokenTest {
class Program {
static void Main() {
var pts = new PauseTokenSource();
Task.Run(() =>
{
while (true) {
Console.ReadLine();
Console.WriteLine(
$"{Thread.CurrentThread.ManagedThreadId}: Pausing task");
pts.IsPaused = !pts.IsPaused;
}
});
SomeMethodAsync(pts.Token).Wait();
}
public static async Task SomeMethodAsync(PauseToken pause) {
for (int i = 0; ; i++) {
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: {i}");
// Comment this out and repeatedly pausing and resuming will no longer work.
await Task.Delay(100);
await pause.WaitWhilePausedAsync();
}
}
}
public class PauseTokenSource {
internal static readonly Task s_completedTask =
Task.FromResult(true);
volatile TaskCompletionSource<bool> m_paused;
public bool IsPaused {
get { return m_paused != null; }
set {
if (value) {
Interlocked.CompareExchange(
ref m_paused, new TaskCompletionSource<bool>(), null);
} else {
while (true) {
var tcs = m_paused;
if (tcs == null) return;
if (Interlocked.CompareExchange(ref m_paused, null, tcs) == tcs) {
tcs.SetResult(true);
break;
}
}
}
}
}
public PauseToken Token { get { return new PauseToken(this); } }
internal Task WaitWhilePausedAsync() {
var cur = m_paused;
return cur != null ? cur.Task : s_completedTask;
}
}
public struct PauseToken {
readonly PauseTokenSource m_source;
internal PauseToken(PauseTokenSource source) {
m_source = source;
}
public bool IsPaused {
get { return m_source != null && m_source.IsPaused; }
}
public Task WaitWhilePausedAsync() {
return IsPaused ? m_source.WaitWhilePausedAsync() :
PauseTokenSource.s_completedTask;
}
}
}
答:
发生这种情况的原因是任务延续的执行方式。在框架中的许多地方,任务延续尽可能同步执行。这并不总是可能的,但它确实变得普遍的一个地方是线程池线程,这些线程通常被视为可交换的。
await
是使用同步延续的地方之一;这在任何地方都没有正式记录 AFAIK,但我在我的博客上描述了这种行为。
因此,本质上发生的事情是这样的:暂停(将返回的任务从 - 没有上下文),它将其余部分附加为该任务的延续。然后,当(线程池)线程切换时,它将完成从 返回的任务。然后,运行时会查看与该延续关联的上下文(无),以便将延续视为与任何线程池线程兼容。嘿,当前线程是线程池线程!所以它只是直接运行延续。SomeMethodAsync
await
WaitWhilePausedAsync
SomeMethodAsync
Task.Run
IsPaused
WaitWhilePausedAsync
有趣的是,如果没有另一个方法,该方法将变得完全同步:它总是从 中检索已经完成的任务,因此它只是永远继续执行其无限循环。提醒一下,await
的行为与已经完成的任务同步(正如我在博客中描述的那样)。这个无限循环在 setter 中运行,因此代码永远不会继续其循环再次调用。await
SomeMethodAsync
WaitWhilePausedAsync
IsPaused
Task.Run
ReadLine
如果确实有 ,则它可以异步运行,返回给其调用者(完成延续),并允许循环继续执行。SomeMethodAsync
await
Task.Run
正如西奥多·祖利亚斯(Theodor Zoulias)所建议的那样,将旗帜传递给遗嘱也可以。在这种情况下,任务延续是异步运行(在单独的线程池线程上),而不是直接在调用线程上执行。TaskCreationOptions.RunContinuationsAsynchronously
TaskCompletionSource<bool>
IIRC,您引用的博客文章早于该标志。它也早于(非通用).RunContinuationsAsynchronously
TaskCompletionSource
此外,正如 Theodor Zoulias 所建议的那样,我的 Nito.AsyncEx 库中有一个可以避免此问题的库。它还使用 .PauseTokenSource
RunContinuationsAsynchronously
评论
static async Task Main()
.Wait()
await
PauseTokenSource
TaskCompletionSource<bool>
TaskCreationOptions.RunContinuationsAsynchronously