在不同任务中同时使用 ClientWebSocket、SendAsync 和 ReceiveAsync 方法的意外行为 (.Net Framework)

Unexpected behavior using ClientWebSocket SendAsync and ReceiveAsync methods simultaniously in different tasks (.Net Framework)

提问人:Sergey Skrynnik 提问时间:9/13/2023 最后编辑:Sergey Skrynnik 更新时间:9/13/2023 访问量:62

问:

我有我的自定义 WebSocket 客户端(使用 System.Net.WebSockets 库)运行良好。在内部,库有 2 个独立的任务:第一个任务是向 Web 套接字服务器发送命令,第二个任务是接收来自服务器的响应。在每个任务中,我都使用“无限循环”方法(因为它们在后台工作),例如:

发送任务:

// Task that sends command through the web socket and write them to response queue.
// Implementation of responseQueue & commandQueue are below. 
// CommandQueue has some items at the beggining.
while (webSocket.State != WebSocketState.Closed)
{
  var command = await commandQueue.ReadAsync(cts);
  await webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, CancellationToken.None);
  //in order to log command sent, push it into response queue.
  responseQueue.Push(command);
}

接收任务:

//Task that receive responses from the same web socket and push them into response queue as well.

//these variables needs for a multipacket responses
int dataPerPacket = 8 * 1024; //8192
StringBuilder strBuilder = new StringBuilder();
var buffer = new ArraySegment<byte>(new byte[dataPerPacket]);

while (webSocket.State != WebSocketState.Closed)
{
  var receiveResult = await webSocket.ReceiveAsync(buffer, recCts);

  // receive message
  strBuilder.Append(Encoding.UTF8.GetString(buffer.Array, buffer.Offset, receiveResult.Count));
  //keep doing until end of message
  while (!receiveResult.EndOfMessage)
  {
      receiveResult = await webSocket.ReceiveAsync(buffer, recCts);
      strBuilder.Append(Encoding.UTF8.GetString(buffer.Array, buffer.Offset, receiveResult.Count));
  }

  responseQueue.Push(strBuilder.ToString());
  strBuilder.Clear();
}

我简化了我的代码,但没有丢失任何有价值的东西。websocket 只是在互联网上打开的回显服务器。

因此,对于命令和响应队列,我使用的是通道方法,并实现了自己的自定义类(因为我使用的是.NET Framework 4.5.2)。此类的意图是能够生成(或写入)数据到 SendTask 中的 responseQueue,并有效地使用(或读取)ReceiveTaskresponseQueue 中的数据,即不阻塞(并避免由于无限循环而消耗 CPU)。

通道的实现:

public class Channel
{
        private ConcurrentQueue<string> _queue;
        private SemaphoreSlim _flag;

        public Channel()
        {
            _queue = new ConcurrentQueue<string>();
            _flag = new SemaphoreSlim(0);
        }
        
        public async Task<string> ReadAsync(CancellationToken readCancelToken)
        {
            await _flag.WaitAsync(readCancelToken);
            if (readCancelToken.IsCancellationRequested)
            {
                return default;
            }

            if (_queue.TryDequeue(out item))
            {
                return item;
            }

            return default;
        }

        public void Push(string item)
        {
            _queue.Enqueue(item);
            _flag.Release();
        }
    }

但是,由于某种不清楚的原因,我在工作时有一个意想不到的行为。我希望推送到 responseQueue 的顺序如下:

command 1, command 2, response 1, command 3, response 2,...

即异步行为。当后续的 SendAsync 执行时,我们将接收响应并将响应推送到队列中。但实际上我得到以下订单:

command 1, command 2,.....command 100, response 1, response 2, .... 

那么我的问题为什么?出于什么原因?我怎样才能做到这一点?

我敢肯定原因不是由于 Channel 的实现。

我做了什么来弄清楚:

  1. 如果我在 SendTask 中添加一些时间延迟(至少),我会得到预期的顺序,即异步行为。await Task.Delay(1)
  2. 我使用 Stopwatch 类来测量调用 SendAsync 的经过时间,它显示我 0 毫秒。这很奇怪的输出,但我不知道如何解释这种行为。
  3. 查看源 WebSocketClient 代码,以确保 SendAsync 应该需要一些时间(在我的理解中)。
C# WebSocket Net-4.5 通道

评论

0赞 Stephen Cleary 9/16/2023
我怀疑问题根本不在于 Web 套接字,而是由于 await 隐式使用 TaskContinuationOptions.ExecuteSynchronously,正如我在博客中所描述的那样。我建议将所有 websocket 代码替换为 ,如果仍然重现该问题,请发布一个完整的最小示例。await Task.Delay(15)

答:

0赞 Gustavo F 9/13/2023 #1

根据您告知的内容,问题是:

在 ReceiveTask 中,信号量中没有命令 to 只有 .为了使用信号量,所有任务都必须请求输入并通知发布。EnterRelease

当你这样做时,它会起作用,因为它给了时间。await Task.Delay(1)ReceiveTaskRelease

下面是一个模拟网络和操作的示例:SendAsyncReceiveAsync

using System.Text;

class q77091244
{
    static void Main(string[] args)
    {
        //Channel commandQueue = new Channel("command");       
        Channel responseQueue = new Channel();  

        // SendTask
        Task SendTask = Task.Run(async () => 
        {
            Console.WriteLine("SendTask started...");
            for (int i = 0; i < 10; i++)
            {
                CancellationToken cts = new CancellationToken();
                Console.WriteLine("SendTask Waiting");
                var command = await responseQueue.ReadAsync(cts);
                Console.WriteLine("SendTask Sending");
                await Task.Delay(100); //simulates await webSocket.SendAsync...
                //in order to log command sent, push it into response queue.
                Console.WriteLine("SendTask Push");
                responseQueue.Push(command);
            }
        });

        Task ReceiveTask = Task.Run(async () => 
        {
            Console.WriteLine("ReceiveTask started...");
            StringBuilder strBuilder = new StringBuilder();
            var buffer = Encoding.UTF8.GetBytes("item x");

            for (int i = 0; i < 10; i++)
            {
                CancellationToken cts = new CancellationToken();
                Console.WriteLine("ReceiveTask Waiting");
                await responseQueue.WaitAsync(cts);                
                // receive message
                strBuilder.Append(Encoding.UTF8.GetString(buffer, 0, buffer.Length));
                Console.WriteLine("ReceiveTask Receiving");
                await Task.Delay(150); //simultates await webSocket.ReceiveAsync(buffer, recCts);
                Console.WriteLine("ReceiveTask Push");
                responseQueue.Push(strBuilder.ToString());
                strBuilder.Clear();
            }
        });

        Thread.Sleep(10000);

    }
}

通道是相同的,但初始化为 1 以绿色状态启动(更多信息)且不操作:SemaphoreSlimWaitAsyncDequeue

using System.Collections.Concurrent;

public class Channel
{
        private ConcurrentQueue<string> _queue;
        private SemaphoreSlim _flag;

        public Channel()
        {
            _queue = new ConcurrentQueue<string>();
            _flag = new SemaphoreSlim(1);
        }
        
        public async Task WaitAsync(CancellationToken readCancelToken)
        {
            await _flag.WaitAsync(readCancelToken);
        }

        public async Task<string> ReadAsync(CancellationToken readCancelToken)
        {          
            await _flag.WaitAsync(readCancelToken);
            if (readCancelToken.IsCancellationRequested)
            {
                return default;
            }

            string item;
            if (_queue.TryDequeue(out item))
            {
                Console.WriteLine("dequed " + item);
                return item;
            }

            return default;
        }

        public void Push(string item)
        {
            Console.WriteLine("enqueue " + item);
            _queue.Enqueue(item);
            _flag.Release();
        }
    }

输出:

SendTask started...
SendTask Waiting
ReceiveTask started...
SendTask Sending
ReceiveTask Waiting
SendTask Push
enqueue 
SendTask Waiting
ReceiveTask Receiving
ReceiveTask Push
enqueue item x
ReceiveTask Waiting
dequed
SendTask Sending
SendTask Push
enqueue 
SendTask Waiting
ReceiveTask Receiving
ReceiveTask Push
enqueue item x
ReceiveTask Waiting
dequed item x
SendTask Sending
SendTask Push
enqueue item x
SendTask Waiting
ReceiveTask Receiving
ReceiveTask Push
enqueue item x
ReceiveTask Waiting
dequed
SendTask Sending
SendTask Push
enqueue 

评论

0赞 Sergey Skrynnik 9/14/2023
感谢您深入研究此内容。根据您的回答,我有一些疑问: 1) “所有任务都必须请求输入并通知发布。” - 我不确定这是否正确,每个使用一个共享同步资源的任务都不必请求然后在其体内释放。一个任务可以请求(即减少内在计数器),一个任务可以释放(即增加计数器),但也许我没有得到你,我们是一样的
0赞 Sergey Skrynnik 9/14/2023
2)我已经检查了你的代码,添加没有改变任何,在注释掉这个块后,行为保持不变。此外,我的代码适用于 2 个不同的队列,它们具有不同的信号量。设置为 0 的意图是,在队列为空之前,我们无法从队列中读取,一旦它包含任何(计数器将增加),我们就可以读取它(计数器减少)。await responseQueue.WaitAsync(cts);SemaphoreSlim
0赞 Sergey Skrynnik 9/15/2023
我想问题可能是由于.NET web-Socket无法并行执行SendAsync和ReceiveAsync。正因为如此,我观察到所有命令首先被发送(套接字被 SendTask 阻止,只发送所有命令),然后接收所有响应。