Asked by: Luke  Asked: 11/8/2023  Last edited by: Theodor Zoulias, Luke  Updated: 11/8/2023  Views: 42
Concurrency Issue in Windows Service: Multiple threads accessing same ProductID
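Q:
I have a Windows service implemented in C# that fetches ProductIDs from a database and processes them concurrently using multiple threads. I'm running into a problem where multiple threads access the same ProductID, even though I've applied locks to prevent this.
Here's an overview of the service's structure: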
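- The service uses a logger for logging.
- It has a specified number of worker threads (threadCount) that process ProductIDs.
- ProductIDs are fetched from the database and placed in newProductQueue.
- Worker threads move ProductIDs from newProductQueue to processingQueue for processing.
- Threads use a locker object to ensure exclusive access to the shared data structures.
- The service marks a ProductID as "being processed" to avoid duplicates.
- Processing a product involves updating a database record and may take some time (simulated by Thread.Sleep).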
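The problem I'm facing is that two or more threads process the same ProductID, even though I use locks and a processing-status dictionary to avoid this. I suspect the issue is related to the interaction between the locks, the status dictionary, and the ProductIDs.
Here's the relevant code snippet (simplified for clarity):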
public partial class Service1 : ServiceBase
{
    ThreadSafeLogger logger = new ThreadSafeLogger(AppDomain.CurrentDomain.BaseDirectory + "\\Logs\\ServiceLog_" + DateTime.Now.Date.ToShortDateString().Replace('/', '_') + ".txt");
    private readonly Dictionary<int, bool> processingStatus = new Dictionary<int, bool>();
    private int threadCount = 5; // Number of worker threads
    private object locker = new object();
    private readonly Queue<int> newProductQueue = new Queue<int>();
    private readonly Queue<int> processingQueue = new Queue<int>();
    private System.Timers.Timer refillQueueTimer;
    private System.Timers.Timer threadTimer;
    private CancellationTokenSource cancellationTokenSource;

    public Service1()
    {
        InitializeComponent();
    }

    protected override void OnStart(string[] args)
    {
        cancellationTokenSource = new CancellationTokenSource();
        logger.Log($"{DateTime.UtcNow} START Thrad loop ");
        // Populate the product queue with unique ProductIDs
        // Start the worker tasks
        var tasks = new Task[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            tasks[i] = Task.Factory.StartNew(() => WorkerTask(i));
        }
        // Create and start the timer to refill the queue periodically
        refillQueueTimer = new System.Timers.Timer();
        refillQueueTimer.Elapsed += (sender, e) => PopulateProductQueue();
        refillQueueTimer.Interval = TimeSpan.FromSeconds(5).TotalMilliseconds; // Adjust as needed
        refillQueueTimer.Start();
    }

    protected override void OnStop()
    {
        logger.Log($" {DateTime.UtcNow} OnStop ");
        // Signal cancellation to all threads
        cancellationTokenSource.Cancel();
        // Wait for all threads to finish
        Task.WaitAll();
        // Stop the refillQueueTimer
        // refillQueueTimer.Stop();
    }

    private void WorkerTask(int threadNumber)
    {
        while (!cancellationTokenSource.IsCancellationRequested)
        {
            int productId = 0;
            lock (locker)
            {
                // Move products from the new product queue to the processing queue
                while (newProductQueue.Count > 0)
                {
                    processingQueue.Enqueue(newProductQueue.Dequeue());
                }
                // Check if there are products to process
                if (processingQueue.Count == 0)
                {
                    continue; // No products to process, wait for new products to be added
                }
                // Get the next product to process
                productId = processingQueue.Dequeue();
                // Mark the product ID as being processed
                processingStatus[productId] = true;
            }
            try
            {
                logger.Log($" Process Started {DateTime.Now} cnt {processingQueue.Count} threadNumber {threadNumber} productId {productId} ");
                ProcessProduct(productId, threadNumber);
                logger.Log($" Process finished {DateTime.Now} cnt {processingQueue.Count} threadNumber {threadNumber} productId {productId} ");
            }
            catch (Exception s)
            {
                logger.Log($"{DateTime.UtcNow} ERROR {s.Message}");
                // Process the product with the given ProductID
            }
        }
        logger.Log($"{DateTime.UtcNow} WorkerTask {threadNumber} exiting (shutdown event signaled).");
    }

    private void ProcessProduct(int productId, int threadNumber)
    {
        lock (locker)
        {
            // Check if the processing status is still true
            if (processingStatus[productId] == false)
            {
                // The product has already been processed by another thread, so skip processing
                return;
            }
        }
        // Use your LINQ to SQL data context to load and update the product.
        using (var dataContext = new CSEntities())
        {
            var product = dataContext.Products.SingleOrDefault(p => p.ProductID == productId);
            if (product != null)
            {
                lock (locker)
                { // Mark the product ID as no longer being processed
                    processingStatus[productId] = false;
                }
                Thread.Sleep(10 * 1000);
                Random rnd = new Random();
                product.STATUS = "Processed by " + threadNumber + Convert.ToInt32(product.Counter);
                product.Counter = Convert.ToInt32(product.Counter) + 1;
                dataContext.SaveChanges();
            }
        }
    }

    private void PopulateProductQueue()
    {
        using (var dataContext = new CSEntities())
        {
            var productIds = dataContext.Products.Where(v => v.Counter == null).Select(p => p.ProductID).ToList();
            lock (locker)
            {
                foreach (var productId in productIds)
                {
                    if (!processingStatus.ContainsKey(productId))
                    {
                        processingStatus[productId] = false;
                        newProductQueue.Enqueue(productId);
                    }
                }
                if (newProductQueue.Count == 0 && processingQueue.Count == 0)
                {
                    processingStatus.Clear();
                }
            }
        }
    }
}
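I'm looking for suggestions on how to resolve this and ensure that each ProductID is processed by only one thread. I'd also appreciate suggestions for improving the code structure, if needed.
Any insights or guidance on resolving this concurrency issue would be greatly appreciated. Thank you!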
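One plausible cause, judging only from the posted code: ProcessProduct sets processingStatus[productId] to false before the 10-second sleep and SaveChanges, and PopulateProductQueue clears processingStatus whenever both queues are empty, which can be the case while dequeued IDs are still being worked on. Since the timer fires every 5 seconds and Counter stays null until the save, the same IDs can then be enqueued again. The following is a minimal sketch of one way to avoid that, not a drop-in fix: an ID stays "claimed" from the moment it is queued until its result is saved, and the refill query runs under the same lock that releases the claim. The names ClaimSketch, inFlight, fakeTable, Refill, TryTake and Process are hypothetical, and fakeTable is an in-memory stand-in for the Products table.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ClaimSketch
{
    private static readonly object locker = new object();
    // IDs that are queued or currently being processed (hypothetical name).
    private static readonly HashSet<int> inFlight = new HashSet<int>();
    private static readonly Queue<int> queue = new Queue<int>();
    // Stand-in for the Products table: ProductID -> Counter (null = not processed yet).
    private static readonly Dictionary<int, int?> fakeTable = new Dictionary<int, int?>();

    // Plays the role of PopulateProductQueue. The "query" runs inside the lock,
    // so its result cannot be stale relative to the inFlight set.
    private static void Refill()
    {
        lock (locker)
        {
            var pending = fakeTable.Where(kv => kv.Value == null).Select(kv => kv.Key).ToList();
            foreach (var id in pending)
            {
                if (inFlight.Add(id)) queue.Enqueue(id); // Add returns false if already claimed
            }
        }
    }

    private static bool TryTake(out int id)
    {
        lock (locker)
        {
            if (queue.Count == 0) { id = 0; return false; }
            id = queue.Dequeue();
            return true;
        }
    }

    private static void Process(int id)
    {
        bool succeeded = false;
        try
        {
            Thread.Sleep(50); // simulated slow work, done outside the lock
            succeeded = true;
        }
        finally
        {
            lock (locker)
            {
                if (succeeded) fakeTable[id] = (fakeTable[id] ?? 0) + 1; // the "save"
                inFlight.Remove(id); // release the claim only after the save
            }
        }
    }

    public static Dictionary<int, int?> Run()
    {
        lock (locker)
        {
            fakeTable.Clear(); inFlight.Clear(); queue.Clear();
            for (int id = 1; id <= 6; id++) fakeTable[id] = null;
        }
        using (var cts = new CancellationTokenSource())
        {
            var token = cts.Token;
            var workers = Enumerable.Range(0, 3).Select(n => Task.Run(() =>
            {
                while (!token.IsCancellationRequested)
                {
                    if (TryTake(out int id)) Process(id);
                    else Thread.Sleep(5); // idle wait instead of spinning
                }
            })).ToArray();

            // Refill much more often than one item takes to process.
            for (int i = 0; i < 60; i++) { Refill(); Thread.Sleep(10); }
            cts.Cancel();
            Task.WaitAll(workers);
        }
        lock (locker) { return new Dictionary<int, int?>(fakeTable); }
    }

    public static void Main()
    {
        foreach (var row in Run()) Console.WriteLine(row.Key + " -> Counter " + row.Value);
    }
}
```

Holding a lock around a real database query is a trade-off. It keeps the sketch simple, but with a real data context it may be preferable to record the claim in the database itself.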
A: No answers yet
Comments
ConcurrentQueue
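Only the identifier of this comment survives, so the following is a guess at what it points to: a minimal sketch in which workers drain a ConcurrentQueue<int> with TryDequeue instead of locking around a Queue<int>. The class name ConcurrentQueueSketch and the method Drain are hypothetical. The sketch also copies the loop index into a local before handing it to the lambda. In the posted OnStart, the lambda captures the for-loop variable i directly, and that variable is shared across iterations, so the logged threadNumber may not match the worker that actually ran. Note that a concurrent queue by itself only guarantees that each enqueued item is dequeued once; it does not stop the same ID from being enqueued twice.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ConcurrentQueueSketch
{
    // Returns productId -> number of the worker that handled it.
    public static ConcurrentDictionary<int, int> Drain(int workerCount, int itemCount)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, itemCount));
        var handledBy = new ConcurrentDictionary<int, int>();

        var tasks = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            int workerNumber = i; // local copy: do not capture the loop variable itself
            tasks[i] = Task.Run(() =>
            {
                // TryDequeue is atomic, so no two workers receive the same entry.
                while (queue.TryDequeue(out int productId))
                {
                    if (!handledBy.TryAdd(productId, workerNumber))
                        throw new InvalidOperationException("duplicate " + productId);
                }
            });
        }
        Task.WaitAll(tasks);
        return handledBy;
    }

    public static void Main()
    {
        var result = Drain(5, 1000);
        Console.WriteLine(result.Count);                              // prints 1000
        Console.WriteLine(result.Values.All(w => w >= 0 && w < 5));   // prints True
    }
}
```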
ActionBlock<int>
var block=new ActionBlock<int>(async id=>await ProcessSingleProductAsync(id));
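The comment's prose is missing, so here is a hedged sketch of how an ActionBlock<int> from TPL Dataflow could replace the hand-written worker loop: the block owns the queue and runs up to MaxDegreeOfParallelism handlers at once, and each posted ID is delivered to the handler once. ProcessSingleProductAsync is the commenter's name; its body here is a hypothetical stand-in, as are ActionBlockSketch, RunAsync, and the processed dictionary. On .NET Framework the System.Threading.Tasks.Dataflow NuGet package is assumed to be installed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockSketch
{
    // productId -> how many times the handler ran for it.
    private static readonly ConcurrentDictionary<int, int> processed =
        new ConcurrentDictionary<int, int>();

    // Hypothetical stand-in for the real database update.
    private static async Task ProcessSingleProductAsync(int id)
    {
        await Task.Delay(20); // simulated slow work
        processed.AddOrUpdate(id, 1, (_, n) => n + 1);
    }

    public static async Task<ConcurrentDictionary<int, int>> RunAsync()
    {
        processed.Clear();
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };
        // Awaiting inside the lambda keeps an item "in progress" until its work is done.
        var block = new ActionBlock<int>(async id => await ProcessSingleProductAsync(id), options);

        for (int id = 1; id <= 20; id++) block.Post(id);

        block.Complete();        // no more input
        await block.Completion;  // wait for queued items to finish
        return processed;
    }

    public static void Main()
    {
        var result = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine(result.Count);                  // prints 20
        Console.WriteLine(result.Values.All(n => n == 1)); // prints True
    }
}
```

As with a concurrent queue, the block does not deduplicate its input, so whatever posts IDs to it would still need to avoid posting an ID that is already in progress.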
UPDATE
UPDATE Products SET Counter=1 Where Counter is NULL;
UPDATE Products Set Counter=Counter+1 where Counter is not null;
ExecuteUpdate