提问人:Péter Szilvási 提问时间:8/15/2022 最后编辑:Péter Szilvási 更新时间:8/20/2022 访问量:1735
如何使用抽象流从网络流中连续异步读取?
How to read continuously and asynchronously from network stream using abstract stream?
问:
描述
我想从或使用他们的抽象父类异步读取。有多种方法可以异步读取流:NetworkStream
SSLStream
Stream
- 异步编程模型 (APM):它使用和操作。
BeginRead
EndRead
- 任务并行库 (TPL):它使用并创建任务延续。
Task
- 基于任务的异步模式 (TAP):操作后缀为 Async,可以使用关键字。
async
await
我最感兴趣的是使用 TAP 模式来实现异步读取操作。下面的代码,异步读取到流的末尾,并以字节数组的形式返回数据:
internal async Task<byte[]> ReadToEndAsync(Stream stream)
{
byte[] buffer = new byte[4096];
using (MemoryStream memoryStream = new MemoryStream())
{
int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
while (bytesRead != 0)
{
// Received datas were aggregated to a memory stream.
await memoryStream.WriteAsync(buffer, 0, bytesRead);
bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
}
return memoryStream.ToArray();
}
}
缓冲区大小为 4096kB。如果传输的数据大于缓冲区大小,则它将继续读取,直到 0(零)即流的末尾。它适用于 ,但它在使用 或 的操作中无限期挂起。这两个流的行为与其他流不同。FileStream
ReadAsync
NetworkStream
SslStream
问题在于,当通信关闭时,网络流只会以 0(零)返回。但是,我不想在每次通过网络传输数据时都关闭通信。ReadAsync
Socket
问题
如何在不关闭通信的情况下避免 的阻塞呼叫?ReadAsync
Socket
答:
我想使用其抽象的 Stream 父类从 NetworkStream 或 SSLStream 异步读取。
你真的需要吗?TCP/IP 套接字通信非常复杂,要正确执行。我强烈建议自托管 HTTP API 或类似的东西。
问题在于,当 Socket 通信关闭时,网络流 ReadAsync 只会以 0(零)返回。
是的,您发布的代码仅在读取了整个流时才返回结果。
因此,您需要的是不同的退货条件。具体而言,您需要知道何时从流中读取了一条消息。你需要的是消息框架,正如我的博客中所描述的那样。
我的博客上也有一些示例代码,显示了最简单的消息框架解决方案之一(长度前缀)。请仔细注意最简单解决方案的复杂性和长度,并考虑是否真的要编写套接字级代码。
如果您确实想继续编写套接字应用程序,我建议您观看我关于异步 TCP/IP 套接字编程的 YouTube 系列。
评论
正如 Stephen Cleary 博客文章所指出的,有两种方法用于消息框架:长度前缀和分隔符。
- 长度前缀:消息长度是已知的,因为它是通过网络发送的。
- 分隔符:消息长度未知。转义由分隔符决定。
下面的代码使用长度前缀,以便将消息长度预置到消息前面。
internal async Task SendAsync(Stream stream, byte[] message)
{
byte[] messageSize = BitConverter.GetBytes(message.Length);
await stream.WriteAsync(messageSize, 0, messageSize.Length).ConfigureAwait(false);
await stream.WriteAsync(message, 0, message.Length).ConfigureAwait(false);
}
发送端首先将消息长度转换为字节数组,然后将其发送给接收方。之所以是 4,是因为转换了有符号的 32 位整数。之后,客户端发送实际消息。接收器端稍微复杂一些:messageSize.Length
internal async Task<byte[]> ReceiveAsync(Stream stream)
{
byte[] messageBuffer = null;
byte[] lengthBuffer = new byte[4];
try
{
await stream.ReadAsync(lengthBuffer, 0, lengthBuffer.Length).ConfigureAwait(false);
int messageLength = BitConverter.ToInt32(lengthBuffer, 0);
messageBuffer = new byte[messageLength];
await ReadToEndAsync(stream, messageBuffer, messageLength).ConfigureAwait(false);
}
catch (Exception)
{
// Error occured during receiving.
throw;
}
return dataBuffer;
}
首先,接收方将消息长度读取到长度为 4 的字节数组中。然后,它将接收到的字节数组转换为有符号值。一旦知道消息长度,就可以读取消息直到最后。Int32
private async Task ReadToEndAsync(Stream stream, byte[] messageBuffer, int messageLength)
{
int offset = 0;
while (offset < messageLength)
{
int bytesRead = await stream.ReadAsync(messageBuffer, offset, messageLength).ConfigureAwait(false);
if (bytesRead == 0)
{
// Socket is closed.
break;
}
offset += bytesRead;
}
}
它从流中读取消息,直到到达。如果单次读取操作不够,则它会使用从流中读取的字节数来推进缓冲区的偏移量。messageLength
注意:要防止DOS攻击:
无论是使用长度前缀还是分隔符,都必须包含代码以防止拒绝服务攻击。可以为以长度为前缀的读取器提供巨大的消息大小;在没有分隔符的情况下,可以为分隔器读取器提供大量数据。
评论
ReadAsync
评论
ReadAsync
... await ReadAsync(...)