提问人:Sergey Skrynnik 提问时间:9/13/2023 最后编辑:Sergey Skrynnik 更新时间:9/13/2023 访问量:62
在不同任务中同时使用 ClientWebSocket、SendAsync 和 ReceiveAsync 方法的意外行为 (.Net Framework)
Unexpected behavior using ClientWebSocket SendAsync and ReceiveAsync methods simultaniously in different tasks (.Net Framework)
问:
我有我的自定义 WebSocket 客户端(使用 System.Net.WebSockets 库)运行良好。在内部,库有 2 个独立的任务:第一个任务是向 Web 套接字服务器发送命令,第二个任务是接收来自服务器的响应。在每个任务中,我都使用“无限循环”方法(因为它们在后台工作),例如:
发送任务:
// Task that sends command through the web socket and write them to response queue.
// Implementation of responseQueue & commandQueue are below.
// CommandQueue has some items at the beggining.
while (webSocket.State != WebSocketState.Closed)
{
var command = await commandQueue.ReadAsync(cts);
await webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, CancellationToken.None);
//in order to log command sent, push it into response queue.
responseQueue.Push(command);
}
接收任务:
//Task that receive responses from the same web socket and push them into response queue as well.
//these variables needs for a multipacket responses
int dataPerPacket = 8 * 1024; //8192
StringBuilder strBuilder = new StringBuilder();
var buffer = new ArraySegment<byte>(new byte[dataPerPacket]);
while (webSocket.State != WebSocketState.Closed)
{
var receiveResult = await webSocket.ReceiveAsync(buffer, recCts);
// receive message
strBuilder.Append(Encoding.UTF8.GetString(buffer.Array, buffer.Offset, receiveResult.Count));
//keep doing until end of message
while (!receiveResult.EndOfMessage)
{
receiveResult = await webSocket.ReceiveAsync(buffer, recCts);
strBuilder.Append(Encoding.UTF8.GetString(buffer.Array, buffer.Offset, receiveResult.Count));
}
responseQueue.Push(strBuilder.ToString());
strBuilder.Clear();
}
我简化了我的代码,但没有丢失任何有价值的东西。websocket 只是在互联网上打开的回显服务器。
因此,对于命令和响应队列,我使用的是通道方法,并实现了自己的自定义类(因为我使用的是.NET Framework 4.5.2)。此类的意图是能够生成(或写入)数据到 SendTask 中的 responseQueue,并有效地使用(或读取)ReceiveTask 中 responseQueue 中的数据,即不阻塞(并避免由于无限循环而消耗 CPU)。
通道的实现:
public class Channel
{
private ConcurrentQueue<string> _queue;
private SemaphoreSlim _flag;
public Channel()
{
_queue = new ConcurrentQueue<string>();
_flag = new SemaphoreSlim(0);
}
public async Task<string> ReadAsync(CancellationToken readCancelToken)
{
await _flag.WaitAsync(readCancelToken);
if (readCancelToken.IsCancellationRequested)
{
return default;
}
if (_queue.TryDequeue(out item))
{
return item;
}
return default;
}
public void Push(string item)
{
_queue.Enqueue(item);
_flag.Release();
}
}
但是,由于某种不清楚的原因,我在工作时有一个意想不到的行为。我希望推送到 responseQueue 的顺序如下:
command 1, command 2, response 1, command 3, response 2,...
即异步行为。当后续的 SendAsync 执行时,我们将接收响应并将响应推送到队列中。但实际上我得到以下订单:
command 1, command 2,.....command 100, response 1, response 2, ....
那么我的问题为什么?出于什么原因?我怎样才能做到这一点?
我敢肯定原因不是由于 Channel 的实现。
我做了什么来弄清楚:
- 如果我在 SendTask 中添加一些时间延迟(至少),我会得到预期的顺序,即异步行为。
await Task.Delay(1)
- 我使用 Stopwatch 类来测量调用 SendAsync 的经过时间,它显示我 0 毫秒。这很奇怪的输出,但我不知道如何解释这种行为。
- 查看源 WebSocketClient 代码,以确保 SendAsync 应该需要一些时间(在我的理解中)。
答:
根据您告知的内容,问题是:
在 ReceiveTask 中,信号量中没有命令 to 只有 .为了使用信号量,所有任务都必须请求输入并通知发布。Enter
Release
当你这样做时,它会起作用,因为它给了时间。await Task.Delay(1)
ReceiveTask
Release
下面是一个模拟网络和操作的示例:SendAsync
ReceiveAsync
using System.Text;
class q77091244
{
static void Main(string[] args)
{
//Channel commandQueue = new Channel("command");
Channel responseQueue = new Channel();
// SendTask
Task SendTask = Task.Run(async () =>
{
Console.WriteLine("SendTask started...");
for (int i = 0; i < 10; i++)
{
CancellationToken cts = new CancellationToken();
Console.WriteLine("SendTask Waiting");
var command = await responseQueue.ReadAsync(cts);
Console.WriteLine("SendTask Sending");
await Task.Delay(100); //simulates await webSocket.SendAsync...
//in order to log command sent, push it into response queue.
Console.WriteLine("SendTask Push");
responseQueue.Push(command);
}
});
Task ReceiveTask = Task.Run(async () =>
{
Console.WriteLine("ReceiveTask started...");
StringBuilder strBuilder = new StringBuilder();
var buffer = Encoding.UTF8.GetBytes("item x");
for (int i = 0; i < 10; i++)
{
CancellationToken cts = new CancellationToken();
Console.WriteLine("ReceiveTask Waiting");
await responseQueue.WaitAsync(cts);
// receive message
strBuilder.Append(Encoding.UTF8.GetString(buffer, 0, buffer.Length));
Console.WriteLine("ReceiveTask Receiving");
await Task.Delay(150); //simultates await webSocket.ReceiveAsync(buffer, recCts);
Console.WriteLine("ReceiveTask Push");
responseQueue.Push(strBuilder.ToString());
strBuilder.Clear();
}
});
Thread.Sleep(10000);
}
}
通道是相同的,但初始化为 1 以绿色状态启动(更多信息)且不操作:SemaphoreSlim
WaitAsync
Dequeue
using System.Collections.Concurrent;
public class Channel
{
private ConcurrentQueue<string> _queue;
private SemaphoreSlim _flag;
public Channel()
{
_queue = new ConcurrentQueue<string>();
_flag = new SemaphoreSlim(1);
}
public async Task WaitAsync(CancellationToken readCancelToken)
{
await _flag.WaitAsync(readCancelToken);
}
public async Task<string> ReadAsync(CancellationToken readCancelToken)
{
await _flag.WaitAsync(readCancelToken);
if (readCancelToken.IsCancellationRequested)
{
return default;
}
string item;
if (_queue.TryDequeue(out item))
{
Console.WriteLine("dequed " + item);
return item;
}
return default;
}
public void Push(string item)
{
Console.WriteLine("enqueue " + item);
_queue.Enqueue(item);
_flag.Release();
}
}
输出:
SendTask started...
SendTask Waiting
ReceiveTask started...
SendTask Sending
ReceiveTask Waiting
SendTask Push
enqueue
SendTask Waiting
ReceiveTask Receiving
ReceiveTask Push
enqueue item x
ReceiveTask Waiting
dequed
SendTask Sending
SendTask Push
enqueue
SendTask Waiting
ReceiveTask Receiving
ReceiveTask Push
enqueue item x
ReceiveTask Waiting
dequed item x
SendTask Sending
SendTask Push
enqueue item x
SendTask Waiting
ReceiveTask Receiving
ReceiveTask Push
enqueue item x
ReceiveTask Waiting
dequed
SendTask Sending
SendTask Push
enqueue
评论
await responseQueue.WaitAsync(cts);
SemaphoreSlim
评论
await
隐式使用TaskContinuationOptions.ExecuteSynchronously
,正如我在博客中所描述的那样。我建议将所有 websocket 代码替换为 ,如果仍然重现该问题,请发布一个完整的最小示例。await Task.Delay(15)