为什么从流异步读取后我的线程 ID 会发生变化?[复制]

Why does my thread id changes after reading asynchronously from a stream? [duplicate]

提问人:MrBott_a 提问时间:1/24/2023 最后编辑:Theodor ZouliasMrBott_a 更新时间:1/24/2023 访问量:171

问:

我有以下几种方法:

private static bool loaded = false;
private static bool replaying = false;
private static string wIndex = String.Empty;
private static WorldData wData;
private static ConcurrentDictionary<int, List<long>> streamPosition
    = new ConcurrentDictionary<int, List<long>>();
private static ConcurrentDictionary<int, List<string>> collectionNames
    = new ConcurrentDictionary<int, List<string>>();
private static async void StartReplay()
{
    try
    {
        Stopwatch st = new Stopwatch();
        while (loaded)
        {
            while (replaying)
            {
                st.Start();
                for (int i = 0; i < collectionNames.Count; i++)
                {
                    XLogger.Log(toConsole.Debug, Thread.CurrentThread.ManagedThreadId
                        .ToString());
                    wData.CopyCollection(await DeserializeListFromStreamAsync(
                        wData.GetCollectionByName(collectionNames[Thread.CurrentThread
                        .ManagedThreadId][i]), i, new CancellationToken()));
                }
                st.Stop();
                int sleepTime = DebriefingManager.replayRate
                    - (int)st.ElapsedMilliseconds;
                if (sleepTime > 0)
                {
                    Thread.Sleep(sleepTime);
                }
                else
                {
                    XLogger.Log(toConsole.Bad, "Debriefing is slow, video may lag.");
                    XLogger.Log(toFile.System, "Debriefing is slow, video may lag.");
                }
                st.Reset();
            }
        }
    }
    catch (Exception e)
    {
        XLogger.Log(toConsole.Bad, e.ToString());
        XLogger.Log(toFile.Error, e.ToString());
    }
}

private static async Task<ConcurrentDictionary<string, T>>
    DeserializeListFromStreamAsync<T>(
    ConcurrentDictionary<string, T> coll, int i, CancellationToken cancellationToken)
{
    var dataStructures = new ConcurrentDictionary<string, T>();
    using (FileStream stream = File.OpenRead(DebriefingManager
        .GetReadingStreamByCollection(coll)))
    {
        stream.Position = streamPosition[Thread.CurrentThread.ManagedThreadId][i];
        using (var streamReader = new MessagePackStreamReader(stream))
        {
            XLogger.Log(toConsole.Debug,
                $"{Thread.CurrentThread.ManagedThreadId} --- test 1");
            ReadOnlySequence<byte>? msgpack = await streamReader
                .ReadAsync(cancellationToken);
            XLogger.Log(toConsole.Debug,
                $"{Thread.CurrentThread.ManagedThreadId} --- test 2");
            if (msgpack is null) return null;
            dataStructures = MessagePackSerializer
                .Deserialize<ConcurrentDictionary<string, T>>(
                (ReadOnlySequence<byte>)msgpack, cancellationToken: cancellationToken);
        }
        streamPosition[Thread.CurrentThread.ManagedThreadId][i] = stream.Position;
    }

    return dataStructures;
}

StartReplay由三个不同的线程运行。 我需要为每个线程设置一个唯一的 id,因为我需要 并且对于每个线程都是唯一的。所以我考虑使用和 作为钥匙。List<long>List<string>ConcurrentDictionariesThread.CurrentThread.ManagedThreadId

我尝试的第一件事是使用,但我发现在这一行之后:Id 发生了变化。不知道它应该是不可变的,我什么也没想,并试图使用该属性,但是在同一行之后,标记的变量的值被重置为 0。 使用线程调试窗口后,我发现运行我的代码的线程在那行之后被“杀死”了,并使用新的线程来继续代码。Thread.CurrentThread.ManagedThreadIdReadOnlySequence<byte>? msgpack = await streamReader.ReadAsync(cancellationToken);[ThreadStatic]

我的问题是:为什么会这样?我该如何预防?这可能会影响性能吗?

编辑:我还应该补充一点,该方法是“单个 Stream 上的多个 MessagePack 结构”中的 MessagePack 文档中该方法的修改版本 “部分。

C# IO 线程安全 msgpack

评论

0赞 Rand Random 1/24/2023
您是否在 asp.net 中运行此代码?如果是,则没有同步上下文。你可以读到这个: blog.stephencleary.com/2017/03/...
3赞 Ralf 1/24/2023
为什么要防止这种情况发生?这是任务的性质。任务不是线程,任务中的延续通常意味着更改线程。这是正常的,不是问题。
1赞 Charlieface 1/24/2023
这并不能回答为什么你需要维护它们在哪个线程上运行。它们将在线程池线程(其中有很多线程)上运行,并且没有任何理由说明这是一个问题。它通常会提高性能,因为这意味着任务不必等待原始线程可用。
1赞 Charlieface 1/24/2023
只需为每个异步流分配一个不同的 ID(顺序 int 或 GUID),并使用它来访问字典。如果这是相关的,也有。如果不进一步了解代码的实际操作以及为什么需要将任务状态存储在全局字典中,就很难说出您需要什么。AsyncStatic
2赞 Matthew Watson 1/24/2023
If you have a suggestion on how I can access different informations from each thread running the same method您可以将参数传递给任务,并将不同的对象传递给每个调用。

答:

2赞 Theodor Zoulias 1/24/2023 #1

为什么会这样?

因为这是野兽的本性(异步)。异步操作的完成发生在通常与启动异步操作的线程不同的线程上。对于控制台应用程序尤其如此,因为这些应用程序没有配备任何专用机制来恢复 .例如,如果您有一个 Windows 窗体应用,情况会有所不同。这些应用程序首先在 UI 线程上安装一个名为 WindowsFormsSynchronizationContext 的专用计划程序,该程序在 之后进行干预,并在 UI 线程上计划继续。控制台应用程序中没有这样的东西,因此您正在体验最纯粹的异步效果。awaitawait

我该如何预防?

通过没有异步执行流。只需同步等待所有异步操作,您将从头到尾处于同一线程中:

ReadOnlySequence<byte>? msgpack = streamReader
    .ReadAsync(cancellationToken).GetAwaiter().GetResult();

如果你觉得到处写很烦人,你可以用这两种扩展方法将其缩短为:.GetAwaiter().GetResult().Wait2()

public static void Wait2(this Task task) => task.GetAwaiter().GetResult();
public static TResult Wait2<TResult>(this Task<TResult> task) => task.GetAwaiter().GetResult();

这可能会影响性能吗?

它可能会影响内存效率。在异步操作期间,您的线程将被阻塞,因此您的程序可能需要比平时更多的线程。这可能会对性能产生一些暂时的影响,以防性能饱和,并且需要生成更多线程。线程注入启发式方法有点保守,每秒最多注入一个新线程。如果这是一个问题,您可以使用 ThreadPool.SetMinThreads 方法提前配置。ThreadPoolThreadPool

注意:一般来说,阻塞当前线程并不是编写代码的好方法。它违反了不阻塞异步代码的常识。在这里,我只是直接回答您关于如何防止线程更改的问题。我不建议你真的去做。如果你问我的建议,我会说重新考虑你到目前为止所做的一切,也许从头开始你的项目。.GetAwaiter().GetResult()