等待所有任务在线程池中完成

Waiting for all tasks to complete in Threadpool

提问人:SP222 提问时间:11/14/2023 最后编辑:Theodor ZouliasSP222 更新时间:11/14/2023 访问量:70

问:

我需要有关此代码的帮助,因此,如果有人愿意提供帮助,我将不胜感激。

我正在开发一个简单的线程池,它传递一个数字数组,线程并行计算每个数字的阶乘。我的代码中有什么问题,例如,如果我有两个线程并且我有六个任务怎么办。两个线程接受一个任务,完成它,程序就到此结束。我想要的是让他们接受任务,去做,如果有更多的任务,开始新的任务并做它们,只有当所有任务都完成时,程序才会结束。

这是我的代码:

MyThreadPool.cs:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;

namespace ThreadPool
{
    public class MyThreadPool : IDisposable
    {
        private bool _disposed = false;

        private object _lock = new object();

        private Queue<MyTask> tasks = new();

        public MyThreadPool(int numThreads = 0)
        {
            for (int i = 0; i < numThreads; i++)
            {
                new Thread(() =>
                {
                    while (_disposed == false)
                    {

                        MyTask? task;
                        lock (_lock)
                        {
                            while (!tasks.TryDequeue(out task) && _disposed == false)
                            {
                                Monitor.Wait(_lock);
                            }
                        }
                        if (task != null)
                        {
                            Console.WriteLine(@$"Thread {Thread.CurrentThread
                                .ManagedThreadId} calculated factorial for {task
                                .getNum()}. Result: {task.CalculateFactorial(task
                                .getNum())}");
                            lock (_lock)
                            {
                                if (tasks.Count > 0)
                                {
                                    Monitor.Pulse(_lock);
                                }
                            }
                        }
                    }
                }).Start();
            }
        }

        public MyTask Enqueue(int number)
        {
            MyTask task = new(number);
            lock (_lock)
            {
                tasks.Enqueue(task);
                Monitor.Pulse(_lock);
            }
            return task;
        }

        public void Dispose()
        {
            _disposed = true;
            lock (_lock)
            {
                Monitor.PulseAll(_lock);
            }
        }
    }
}

我的任务.cs:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ThreadPool
{
    public class MyTask
    {
        private int num;

        public void setNum(int num)
        {
            this.num = num;
        }

        public int getNum()
        {
            return this.num;
        }

        public MyTask(int number)
        {
            this.num = number;
        }

        public int CalculateFactorial(int number)
        {
            int result = 1;
            for (int i = number; i > 0; i--)
            { 
                result = result * i;
            }
            return result;
        }
    }
}

程序.cs:

int[] array = { 1, 2, 3, 4, 5, 6 };
int numThread = 3;
int x = 0;
MyThreadPool threadPool = new(numThread);;
for (int i = 0; i < array.Length; i++)
{
    threadPool.Enqueue(array[i]);
}

threadPool.Dispose();
C# 并行处理 任务 线程池

评论

3赞 Panagiotis Kanavos 11/14/2023
线程池并不简单,.NET 已经有一个构建良好且易于使用的线程池和任务。线程池也不使用锁和监视器来执行其工作。它们不使用线程不安全的集合进行缓冲。这段代码的意义何在?
0赞 SP222 11/14/2023
是的,但我在这里需要它进行教育,以了解线程的工作原理。
2赞 Panagiotis Kanavos 11/14/2023
你不会通过尝试实现喷气发动机来了解汽车。.NET 是开源的,这意味着您可以查找并检查线程池代码本身。
0赞 SP222 11/14/2023
我在这里最大的问题是,并非所有任务都已完成,但线程接受一个任务,完成它,并且很可能_disposed设置为 true,然后调用所有线程来完成其工作,因为 _disposed = true。
2赞 JonasH 11/14/2023
请不要将这样的实现用于您关心的任何内容。一个好的经验法则是使用你可以摆脱的最高级别的抽象。这意味着诸如 BlockingCollection、Task、Parallel.For 等。我不想阻止学习,但如果我尝试这样的事情,我会广泛研究文档和其他资源。在编写这样的低级多线程代码时,很容易犯错误,错误可能并不明显,或者告诉你哪里出了问题。

答:

0赞 shingo 11/14/2023 #1

有很多方法,因为你只使用 while+moniter 来同步线程,所以我也只使用这个方法。

我们需要一个计数:

private int _incompleteTaskCount = 0;

public MyTask Enqueue(int number)
{
    ....
    lock (_lock)
    {
        tasks.Enqueue(task);
        _incompleteTaskCount++;
        Monitor.Pulse(_lock);
    }
    ....
}

public MyThreadPool(int numThreads = 0)
{
    ....
    lock (_lock)
    {
        _incompleteTaskCount--;
        // if (tasks.Count > 0)
        // {
            Monitor.Pulse(_lock);
        // }
    }
    ....
}

然后你可以等到变成 0。_incompleteTaskCount

public void WaitForComplete()
{
    lock (_lock)
    {
        while (_incompleteTaskCount > 0)
        {
            Monitor.Wait(_lock);
        }
    }
}

评论

0赞 Theodor Zoulias 11/14/2023
出于好奇,和 the 有什么不一样?_incompleteTaskCounttasks.Count
0赞 shingo 11/14/2023
我猜标题中提到的“完成”是指所有通话的完成。但是在这一行之后将是 0:.Console.WriteLinetasks.Counttasks.TryDequeue(out task)