Windows 服务中的并发问题:多个线程访问同一 ProductID

Concurrency Issue in Windows Service: Multiple threads accessing same ProductID

提问人:Luke 提问时间:11/8/2023 最后编辑:Theodor ZouliasLuke 更新时间:11/8/2023 访问量:42

问:

我有一个用 C# 实现的 Windows 服务,它从数据库中获取 ProductID 并使用多个线程并发处理它们。我遇到了一个问题,即多个线程正在访问同一个 ProductID,即使我已经应用了锁来防止这种情况。

以下是服务结构的概述:

该服务使用记录器进行日志记录。 它具有指定数量的处理 ProductID 的工作线程 ()。 ProductID 从数据库中获取并放置在 . 工作线程将 ProductID 从 a 移动到 a 进行处理。 线程使用对象来确保对共享数据结构的独占访问。 该服务将 ProductID 标记为“正在处理”以避免重复。 产品处理涉及更新数据库记录,可能需要一些时间(由 模拟)。threadCountnewProductQueuenewProductQueueprocessingQueuelockerThread.Sleep

我面临的问题是两个或多个线程正在处理相同的 ProductID,即使我使用了锁和处理状态字典来避免这种情况。我怀疑问题可能与锁、状态字典和 ProductID 之间的交互有关。

下面是相关的代码片段(为清楚起见进行了简化):

public partial class Service1 : ServiceBase
{
     ThreadSafeLogger logger = new ThreadSafeLogger(AppDomain.CurrentDomain.BaseDirectory + "\\Logs\\ServiceLog_" + DateTime.Now.Date.ToShortDateString().Replace('/', '_') + ".txt");
     private readonly Dictionary<int, bool> processingStatus = new Dictionary<int, bool>();
     private int threadCount = 5; // Number of worker threads
     private object locker = new object();
     private readonly Queue<int> newProductQueue = new Queue<int>();
     private readonly Queue<int> processingQueue = new Queue<int>();
     private System.Timers.Timer refillQueueTimer;
     private System.Timers.Timer threadTimer;

     private CancellationTokenSource cancellationTokenSource;
     public Service1()
     {
         InitializeComponent();
     }
     protected override void OnStart(string[] args)
     {
         cancellationTokenSource = new CancellationTokenSource();

         logger.Log($"{DateTime.UtcNow}   START  Thrad loop  ");
         // Populate the product queue with unique ProductIDs

         // Start the worker tasks
         var tasks = new Task[threadCount];
         for (int i = 0; i < threadCount; i++)
         {
             tasks[i] = Task.Factory.StartNew(() => WorkerTask(i));
         }

         // Create and start the timer to refill the queue periodically
         refillQueueTimer = new System.Timers.Timer();
         refillQueueTimer.Elapsed += (sender, e) => PopulateProductQueue();
         refillQueueTimer.Interval = TimeSpan.FromSeconds(5).TotalMilliseconds; // Adjust as needed
         refillQueueTimer.Start();
     }

     protected override void OnStop()
     {
         logger.Log($"  {DateTime.UtcNow}  OnStop ");
         // Signal cancellation to all threads
         cancellationTokenSource.Cancel();

         // Wait for all threads to finish
         Task.WaitAll();
         // Stop the refillQueueTimer
         //  refillQueueTimer.Stop();
     }

     private void WorkerTask(int threadNumber)
     {
         while (!cancellationTokenSource.IsCancellationRequested)
         {
             int productId = 0;

             lock (locker)
             {
                 // Move products from the new product queue to the processing queue
                 while (newProductQueue.Count > 0)
                 {
                     processingQueue.Enqueue(newProductQueue.Dequeue());
                 }

                 // Check if there are products to process
                 if (processingQueue.Count == 0)
                 {
                     continue; // No products to process, wait for new products to be added
                 }

                 // Get the next product to process
                 productId = processingQueue.Dequeue();

                 // Mark the product ID as being processed
                 processingStatus[productId] = true;
             }
             try
             {
                     logger.Log($" Process Started {DateTime.Now}  cnt {processingQueue.Count} threadNumber {threadNumber} productId {productId} ");

                     ProcessProduct(productId, threadNumber);

                     logger.Log($" Process finished {DateTime.Now}  cnt {processingQueue.Count} threadNumber {threadNumber} productId {productId} ");
             }
             catch (Exception s)
             {
                 logger.Log($"{DateTime.UtcNow} ERROR {s.Message}");
                 // Process the product with the given ProductID
             } 
         }

         logger.Log($"{DateTime.UtcNow} WorkerTask {threadNumber} exiting (shutdown event signaled).");
     }

     private void ProcessProduct(int productId, int threadNumber)
     {
         lock (locker)
         {
             // Check if the processing status is still true
             if (processingStatus[productId] == false)
             {
                 // The product has already been processed by another thread, so skip processing
                 return;
             }
         }
        
         // Use your LINQ to SQL data context to load and update the product.
         using (var dataContext = new CSEntities())
         {
             var product = dataContext.Products.SingleOrDefault(p => p.ProductID == productId);

             if (product != null)
             {
                 lock (locker)
                 { // Mark the product ID as no longer being processed
                     processingStatus[productId] = false;
                 }
                 Thread.Sleep(10 * 1000);
                 Random rnd = new Random();
                     product.STATUS = "Processed by  " + threadNumber + Convert.ToInt32(product.Counter);
                     product.Counter = Convert.ToInt32(product.Counter) + 1;
                     dataContext.SaveChanges();
             }
         }
     }

     private void PopulateProductQueue()
     {
         using (var dataContext = new CSEntities())
         {
             var productIds = dataContext.Products.Where(v => v.Counter == null).Select(p => p.ProductID).ToList();
             lock (locker)
             {
                 foreach (var productId in productIds)
                 {
                     if (!processingStatus.ContainsKey(productId))
                     {
                         processingStatus[productId] = false;
                         newProductQueue.Enqueue(productId);
                     }
                 }

                 if (newProductQueue.Count == 0 && processingQueue.Count == 0)
                 {
                     processingStatus.Clear();
                 }
             }
         }
     }
 }

我正在寻求有关如何解决此问题并确保每个 ProductID 仅由一个线程处理的建议。此外,如果需要,我将不胜感激改进代码结构的建议。

有关解决此并发问题的任何见解或指导将不胜感激。谢谢!

C# 多线程 发并行处理 任务

评论

2赞 Charlieface 11/8/2023
您有什么理由不只是使用而不是尝试自己滚动?ConcurrentQueue
0赞 Luke 11/8/2023
我会尝试的
0赞 Panagiotis Kanavos 11/8/2023
一个更好的选择是具有特定并行度的选项。ActionBlock 将并发输入缓冲区与一个或多个处理消息的工作线程任务组合在一起。这可以消除大部分问题的代码,并将其替换为 另一方面,逐个检索和保存项目非常慢。数据库是为处理数据集而构建的,而不是处理单个行。在批处理中检索尽可能多的产品,处理它们并将它们保存为一个产品要快得多ActionBlock<int>var block=new ActionBlock<int>(async id=>ProcessSingleProductAsync(id));
0赞 Theodor Zoulias 11/8/2023
你已经用几句话讲述了你试图解决的更广泛的问题,以及一整本书关于你为解决它而设计的机制。您能告诉我们更多关于更广泛的问题吗?有什么要求,有什么限制?例如,是否要对具有相同 ID 的产品强制实施非并发处理策略?
0赞 Panagiotis Kanavos 11/8/2023
一个更好的选择是根本不加载数据,而是执行语句,一次修改所有符合条件的行。 或者是原子的,比将所有行加载到客户端并将它们写回来更快。EF Core 7 可以使用以下命令执行此类命令UPDATEUPDATE Products SET Counter=1 Where Counter is NULL;UPDATE Products Set Counter=Counter+1 where Counter is not null;ExecuteUpdate

答: 暂无答案