如何使用抽象流从网络流中连续异步读取?

How to read continuously and asynchronously from network stream using abstract stream?

提问人:Péter Szilvási 提问时间:8/15/2022 最后编辑:Péter Szilvási 更新时间:8/20/2022 访问量:1735

问:

描述

我想从或使用他们的抽象父类异步读取。有多种方法可以异步读取流:NetworkStreamSSLStreamStream

  • 异步编程模型 (APM):它使用和操作。BeginReadEndRead
  • 任务并行库 (TPL):它使用并创建任务延续。Task
  • 基于任务的异步模式 (TAP):操作后缀为 Async,可以使用关键字。asyncawait

我最感兴趣的是使用 TAP 模式来实现异步读取操作。下面的代码,异步读取到流的末尾,并以字节数组的形式返回数据:

    internal async Task<byte[]> ReadToEndAsync(Stream stream)
    {
        byte[] buffer = new byte[4096];
        using (MemoryStream memoryStream = new MemoryStream())
        {
            int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
            while (bytesRead != 0)
            {
                // Received datas were aggregated to a memory stream.
                await memoryStream.WriteAsync(buffer, 0, bytesRead);
                bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
            }

            return memoryStream.ToArray();
        }
    }

缓冲区大小为 4096kB。如果传输的数据大于缓冲区大小,则它将继续读取,直到 0(零)即流的末尾。它适用于 ,但它在使用 或 的操作中无限期挂起。这两个流的行为与其他流不同。FileStreamReadAsyncNetworkStreamSslStream

问题在于,当通信关闭时,网络流只会以 0(零)返回。但是,我不想在每次通过网络传输数据时都关闭通信。ReadAsyncSocket

问题

如何在不关闭通信的情况下避免 的阻塞呼叫?ReadAsyncSocket

C# 套接字 异步 NET-Standard-2.0 NetworkStream

评论

0赞 Jodrell 8/15/2022
如果套接字处于打开状态,但没有可用的数据,则表示与该套接字的连接的流将无限期挂起,直到数据到达、套接字关闭或触发取消。使用 仅阻止等待的任务,允许其他任务继续处理。ReadAsync... await ReadAsync(...)
0赞 Péter Szilvási 8/15/2022
没错。但是我需要当前接收到的数据的结果。因此,我也在等待导致阻塞的 ReadToEndAsync 方法。
1赞 Jodrell 8/15/2022
考虑使用 TPL Dataflow 创建网格,learn.microsoft.com/en-us/dotnet/standard/parallel-programming/...。但是,您不能既等到结束又在结束之前开始处理。您将需要具有中间缓冲的并发任务。
0赞 Péter Szilvási 8/15/2022
@Jodrell 感谢您指出 DataFlow 编程模型。事实证明,它可以与 async 和 await 运算符一起使用 learn.microsoft.com/en-us/dotnet/standard/parallel-programming/...
1赞 Jodrell 8/15/2022
TPL 数据流类型专为处理 TAP async/await 而设计。它们可以使您免于编写一些有风险的管道。我的观点不是关于数据流的适用性,而是需要稍微调整一下你的观点。

答:

3赞 Stephen Cleary 8/15/2022 #1

我想使用其抽象的 Stream 父类从 NetworkStream 或 SSLStream 异步读取。

你真的需要吗?TCP/IP 套接字通信非常复杂,要正确执行。我强烈建议自托管 HTTP API 或类似的东西。

问题在于,当 Socket 通信关闭时,网络流 ReadAsync 只会以 0(零)返回。

是的,您发布的代码仅在读取了整个流时才返回结果。

因此,您需要的是不同的退货条件。具体而言,您需要知道何时从流中读取了一条消息。你需要的是消息框架,正如我的博客中所描述的那样。

我的博客上也有一些示例代码,显示了最简单的消息框架解决方案之一(长度前缀)。请仔细注意最简单解决方案的复杂性和长度,并考虑是否真的要编写套接字级代码。

如果您确实想继续编写套接字应用程序,我建议您观看我关于异步 TCP/IP 套接字编程的 YouTube 系列

评论

0赞 Péter Szilvási 8/16/2022
是的,我们从头开始实现了 TCP/IP 套接字,因此我们不能轻易切换到 HTTP。感谢您的 youtube 视频链接,我希望我早点看到这些视频。我一定会去看看的。
0赞 Péter Szilvási 8/16/2022 #2

正如 Stephen Cleary 博客文章所指出的,有两种方法用于消息框架:长度前缀和分隔符。

  • 长度前缀:消息长度是已知的,因为它是通过网络发送的。
  • 分隔符:消息长度未知。转义由分隔符决定。

下面的代码使用长度前缀,以便将消息长度预置到消息前面。

internal async Task SendAsync(Stream stream, byte[] message)
{
    byte[] messageSize = BitConverter.GetBytes(message.Length);
    await stream.WriteAsync(messageSize, 0, messageSize.Length).ConfigureAwait(false);
    await stream.WriteAsync(message, 0, message.Length).ConfigureAwait(false);
}

发送端首先将消息长度转换为字节数组,然后将其发送给接收方。之所以是 4,是因为转换了有符号的 32 位整数。之后,客户端发送实际消息。接收器端稍微复杂一些:messageSize.Length

internal async Task<byte[]> ReceiveAsync(Stream stream)
{
    byte[] messageBuffer = null;
    byte[] lengthBuffer = new byte[4];

    try
    {
        await stream.ReadAsync(lengthBuffer, 0, lengthBuffer.Length).ConfigureAwait(false);
        int messageLength = BitConverter.ToInt32(lengthBuffer, 0);

        messageBuffer = new byte[messageLength];
        await ReadToEndAsync(stream, messageBuffer, messageLength).ConfigureAwait(false);
    }
    catch (Exception)
    {
        // Error occured during receiving.
        throw;
    }

    return dataBuffer;
}

首先,接收方将消息长度读取到长度为 4 的字节数组中。然后,它将接收到的字节数组转换为有符号值。一旦知道消息长度,就可以读取消息直到最后。Int32

private async Task ReadToEndAsync(Stream stream, byte[] messageBuffer, int messageLength)
{
    int offset = 0;
    while (offset < messageLength)
    {
        int bytesRead = await stream.ReadAsync(messageBuffer, offset, messageLength).ConfigureAwait(false);
        if (bytesRead == 0)
        {
            // Socket is closed.
            break;
        }

        offset += bytesRead;
    }
}

它从流中读取消息,直到到达。如果单次读取操作不够,则它会使用从流中读取的字节数来推进缓冲区的偏移量。messageLength

注意:要防止DOS攻击:

无论是使用长度前缀还是分隔符,都必须包含代码以防止拒绝服务攻击。可以为以长度为前缀的读取器提供巨大的消息大小;在没有分隔符的情况下,可以为分隔器读取器提供大量数据。

评论

0赞 Stephen Cleary 8/16/2022
上面的代码没有正确处理同时读取两条消息的情况。
0赞 Péter Szilvási 8/20/2022
@StephenCleary 没错。答案从分隔符改为长度前缀方法。
0赞 Stephen Cleary 8/21/2022
代码仍然错误。用于读取长度前缀可以返回 0 到 4(含)之间的任何数字,并且该返回值当前被忽略,而不是被正确处理。ReadAsync
0赞 Péter Szilvási 8/22/2022
这个问题只能使用PipeLines处理吗?
1赞 Stephen Cleary 8/22/2022
不,但管道肯定会让它变得更容易。