提问人:MrBott_a 提问时间:1/24/2023 最后编辑:Theodor ZouliasMrBott_a 更新时间:1/24/2023 访问量:171
为什么从流异步读取后我的线程 ID 会发生变化?[复制]
Why does my thread id changes after reading asynchronously from a stream? [duplicate]
问:
我有以下几种方法:
private static bool loaded = false;
private static bool replaying = false;
private static string wIndex = String.Empty;
private static WorldData wData;
private static ConcurrentDictionary<int, List<long>> streamPosition
= new ConcurrentDictionary<int, List<long>>();
private static ConcurrentDictionary<int, List<string>> collectionNames
= new ConcurrentDictionary<int, List<string>>();
private static async void StartReplay()
{
try
{
Stopwatch st = new Stopwatch();
while (loaded)
{
while (replaying)
{
st.Start();
for (int i = 0; i < collectionNames.Count; i++)
{
XLogger.Log(toConsole.Debug, Thread.CurrentThread.ManagedThreadId
.ToString());
wData.CopyCollection(await DeserializeListFromStreamAsync(
wData.GetCollectionByName(collectionNames[Thread.CurrentThread
.ManagedThreadId][i]), i, new CancellationToken()));
}
st.Stop();
int sleepTime = DebriefingManager.replayRate
- (int)st.ElapsedMilliseconds;
if (sleepTime > 0)
{
Thread.Sleep(sleepTime);
}
else
{
XLogger.Log(toConsole.Bad, "Debriefing is slow, video may lag.");
XLogger.Log(toFile.System, "Debriefing is slow, video may lag.");
}
st.Reset();
}
}
}
catch (Exception e)
{
XLogger.Log(toConsole.Bad, e.ToString());
XLogger.Log(toFile.Error, e.ToString());
}
}
private static async Task<ConcurrentDictionary<string, T>>
DeserializeListFromStreamAsync<T>(
ConcurrentDictionary<string, T> coll, int i, CancellationToken cancellationToken)
{
var dataStructures = new ConcurrentDictionary<string, T>();
using (FileStream stream = File.OpenRead(DebriefingManager
.GetReadingStreamByCollection(coll)))
{
stream.Position = streamPosition[Thread.CurrentThread.ManagedThreadId][i];
using (var streamReader = new MessagePackStreamReader(stream))
{
XLogger.Log(toConsole.Debug,
$"{Thread.CurrentThread.ManagedThreadId} --- test 1");
ReadOnlySequence<byte>? msgpack = await streamReader
.ReadAsync(cancellationToken);
XLogger.Log(toConsole.Debug,
$"{Thread.CurrentThread.ManagedThreadId} --- test 2");
if (msgpack is null) return null;
dataStructures = MessagePackSerializer
.Deserialize<ConcurrentDictionary<string, T>>(
(ReadOnlySequence<byte>)msgpack, cancellationToken: cancellationToken);
}
streamPosition[Thread.CurrentThread.ManagedThreadId][i] = stream.Position;
}
return dataStructures;
}
StartReplay
由三个不同的线程运行。
我需要为每个线程设置一个唯一的 id,因为我需要 并且对于每个线程都是唯一的。所以我考虑使用和 作为钥匙。List<long>
List<string>
ConcurrentDictionaries
Thread.CurrentThread.ManagedThreadId
我尝试的第一件事是使用,但我发现在这一行之后:Id 发生了变化。不知道它应该是不可变的,我什么也没想,并试图使用该属性,但是在同一行之后,标记的变量的值被重置为 0。
使用线程调试窗口后,我发现运行我的代码的线程在那行之后被“杀死”了,并使用新的线程来继续代码。Thread.CurrentThread.ManagedThreadId
ReadOnlySequence<byte>? msgpack = await streamReader.ReadAsync(cancellationToken);
[ThreadStatic]
我的问题是:为什么会这样?我该如何预防?这可能会影响性能吗?
编辑:我还应该补充一点,该方法是“单个 Stream 上的多个 MessagePack 结构”中的 MessagePack 文档中该方法的修改版本 “部分。
答:
为什么会这样?
因为这是野兽的本性(异步)。异步操作的完成发生在通常与启动异步操作的线程不同的线程上。对于控制台应用程序尤其如此,因为这些应用程序没有配备任何专用机制来恢复 .例如,如果您有一个 Windows 窗体应用,情况会有所不同。这些应用程序首先在 UI 线程上安装一个名为 WindowsFormsSynchronizationContext
的专用计划程序,该程序在 之后进行干预,并在 UI 线程上计划继续。控制台应用程序中没有这样的东西,因此您正在体验最纯粹的异步效果。await
await
我该如何预防?
通过没有异步执行流。只需同步等待所有异步操作,您将从头到尾处于同一线程中:
ReadOnlySequence<byte>? msgpack = streamReader
.ReadAsync(cancellationToken).GetAwaiter().GetResult();
如果你觉得到处写很烦人,你可以用这两种扩展方法将其缩短为:.GetAwaiter().GetResult()
.Wait2()
public static void Wait2(this Task task) => task.GetAwaiter().GetResult();
public static TResult Wait2<TResult>(this Task<TResult> task) => task.GetAwaiter().GetResult();
这可能会影响性能吗?
它可能会影响内存效率。在异步操作期间,您的线程将被阻塞,因此您的程序可能需要比平时更多的线程。这可能会对性能产生一些暂时的影响,以防性能饱和,并且需要生成更多线程。线程注入启发式方法有点保守,每秒最多注入一个新线程。如果这是一个问题,您可以使用 ThreadPool.SetMinThreads
方法提前配置。ThreadPool
ThreadPool
注意:一般来说,阻塞当前线程并不是编写代码的好方法。它违反了不阻塞异步代码的常识。在这里,我只是直接回答您关于如何防止线程更改的问题。我不建议你真的去做。如果你问我的建议,我会说重新考虑你到目前为止所做的一切,也许从头开始你的项目。.GetAwaiter().GetResult()
评论
AsyncStatic
If you have a suggestion on how I can access different informations from each thread running the same method
您可以将参数传递给任务,并将不同的对象传递给每个调用。