ThreadPool 争用条件、关闭、锁定还是其他原因?

ThreadPool race-condition, closure, locking or something else?

提问人:gambit 提问时间:10/29/2022 最后编辑:Theodor Zouliasgambit 更新时间:10/31/2022 访问量:74

问:

对线程来说仍然很陌生,所以我确定它是那些小陷阱和重复问题之一,但我无法找到浏览线程的答案。

我有一个 C# 端口扫描器应用程序。 我正在使用线程池为每个端口启动一个新的 TcpClient,并探测它是否打开。 在经历了闭包和线程同步的概念之后,我遇到了一个问题,即当多个线程尝试将其结果保存到(列表)中的不同索引时。Orchestrator.hosts

我有多个线程试图更新单个列表结果对象。我的理解是,只要我在写入时锁定对象,这就可以了,但是我发现在某些更新中,多个条目都获得了相同的更新。 IE,线程 #1 应该更新为“打开”, 会发生什么情况: 线程 #1 使用端口结果更新多个主机,尽管传递了主机的特定索引。 到“打开”,到“打开”,到“打开”,Hosts[0].Ports[0].StatusHosts[0].Ports[0].StatusHosts[1].Ports[0].StatusHosts[2].Ports[0].Status

不知道我的问题出在哪里。我调用的 Static 方法用于执行给定端口的探测

    public static void ScanTCPPorts()
    {
        // Create a list of portsToScan objects to send to thread workers
        //List<ScanPortRequest> portsToScan = new List<ScanPortRequest>();
    
        using (ManualResetEvent resetEvent = new ManualResetEvent(false))
        {
            int toProcess = 0;
            for (var i = 0; i < hostCount; i++) // Starting at Begining
            {
                int currentHostId = i;
                // To hold our current hosts ID (Assign outside of threaded function to avoid race-condition)
                if (hosts[i].IsAlive || scanDefinition.isForced())
                {
                    int portCount = hosts[i].Ports.Count;
                    for (int p = 0; p < portCount; p++)
                    {
                        // Thread-safe Increment our workQueue counter
                        Interlocked.Increment(ref toProcess);
    
                        int currentPortPosition = p;
    
                        // We need to send the arrayIndex in to the thread function
                        PortScanRequestResponse portRequestResponse = new PortScanRequestResponse(hosts[currentHostId], currentHostId, hosts[currentHostId].Ports[currentPortPosition], currentPortPosition);
    
                        ThreadPool.QueueUserWorkItem(
                            new WaitCallback(threadedRequestResponseInstance => {
                                PortScanRequestResponse portToScan = threadedRequestResponseInstance as PortScanRequestResponse;
                                PortScanRequestResponse threadResult = PortScanner.scanTCPPort(portToScan);
                                // Lock so Thread-safe update to result
                                lock (Orchestrator.hosts[portToScan.hostResultIndex])
                                {
                                    if (threadResult.port.status == PortStatus.Open)
                                    {
                                        // Update result
Orchestrator.hosts[portToScan.hostResultIndex].Ports[portToScan.portResultIndex].status = PortStatus.Open;
                                        //Logger.Log(hosts[currentHostId].IPAddress + " " + hosts[currentHostId].Ports[currentPortPosition].type + " " + hosts[currentHostId].Ports[currentPortPosition].portNumber + " is open");
                                    }
                                    else
                                    {
                                        Orchestrator.hosts[portToScan.hostResultIndex].Ports[portToScan.portResultIndex].status = PortStatus.Closed;
                                    }
                                    // Check if this was the last scan for the given host
                                    if (Orchestrator.hosts[portToScan.hostResultIndex].PortScanComplete != true)
                                    {
                                        if (Orchestrator.hosts[portToScan.hostResultIndex].isCompleted())
                                        {
                                            Orchestrator.hosts[portToScan.hostResultIndex].PortScanComplete = true;
                                            // Logger.Log(hosts[currentHostId].IPAddress + " has completed a port scan");
                                            Orchestrator.hosts[portToScan.hostResultIndex].PrintPortSummery();
                                        }
                                    }
                                }
                                // Safely decrement the counter
                                if (Interlocked.Decrement(ref toProcess) == 0)
                                    resetEvent.Set();
                            }), portRequestResponse);   // Pass in our Port to scan
                    }
                }
            }
            resetEvent.WaitOne();
        }
    }

下面是一个单独的公共静态类中的工作进程。

    public static PortScanRequestResponse scanTCPPort(object portScanRequest) {
        PortScanRequestResponse portScanResponse = portScanRequest as PortScanRequestResponse;
        HostDefinition host = portScanResponse.host;
        ScanPort port = portScanResponse.port;
        try
        {
            using (TcpClient threadedClient = new TcpClient())
            {
                try
                {
                    IAsyncResult result = threadedClient.BeginConnect(host.IPAddress, port.portNumber, null, null);
                    Boolean success = result.AsyncWaitHandle.WaitOne(Orchestrator.scanDefinition.GetPortTimeout(), false);
    
                    if (threadedClient.Client != null)
                    {
                        if (success)
                        {
                            threadedClient.EndConnect(result);
                            threadedClient.Close();
                            portScanResponse.port.status = PortStatus.Open;
                            return portScanResponse;
                        }
                    }
                } catch { }
            }
        }
        catch
        { }
        portScanResponse.port.status = PortStatus.Closed;
        return portScanResponse;
    }

最初,我从自由变量中提取主机索引,认为这是问题所在,因此将其移至委托内部。 我尝试在写入的任何地方锁定 Hosts 对象。 我尝试了不同的线程同步技术(和)。CountdownEventManualResetEvent

我认为我只是一些基本的线程原理我还没有被介绍,或者我犯了一个非常简单的逻辑错误。

C# 多线程 闭包 Threadpool ManualResetEvent

评论

0赞 Charlieface 10/30/2022
您可能需要考虑使用任务,而不是使用线程。这将提高性能,并且更易于编写和理解。asyncawait

答:

0赞 Theodor Zoulias 10/30/2022 #1

我有多个线程试图更新单个列表结果对象。我的理解是,只要我在写入时锁定对象,这就可以了。

我没有研究过你的代码,但上面的陈述是不正确的。在多线程环境中使用 或 或任何其他非线程安全对象时,必须同步与该对象的所有交互。一次只允许一个线程与对象交互。写入和读取都必须包含在语句中,使用相同的 locker 对象。甚至读取也必须同步。否则,用法是错误的,并且程序的行为是未定义的。List<T>lockCount

评论

0赞 gambit 10/31/2022
存储对象的更新中存在锁定。至于 Count 上的同步,我通过委托将计数作为 local-var 传递给线程,并避免了自由变量模型。
0赞 gambit 10/31/2022 #2

我非常关注它是一个线程问题,因为这是我的第一个线程项目。事实证明,我没有意识到 List <> 对象的副本是对其原始对象(引用类型)的引用。我以为我的线程正在以一种不可预测的方式访问我的保存结构,但我的端口数组都引用了同一个对象。

这是我的端口列表中的“引用类型”与“值类型”<>问题。

评论

0赞 Community 11/3/2022
正如目前所写的那样,你的答案尚不清楚。请编辑以添加其他详细信息,以帮助其他人了解这如何解决所提出的问题。您可以在帮助中心找到有关如何写出好答案的更多信息。