C# 加快从响应流到 ViewStream 的写入速度

C# Speed Up Writing from Response Stream to ViewStream

提问人:0xSingularity 提问时间:6/3/2022 更新时间:6/3/2022 访问量:258

问:

我有这个代码,它正在将文件异步拆分为多个部分,并使用 HTTP 内容范围下载它们。然后,它将下载的数据写入内存映射文件上的 ViewStream。我目前正在从响应流读取到缓冲区中,然后将缓冲区中的所有数据写入 ViewStream。有没有更有效/更快的方法可以做到这一点?我并不真正关心内存使用,但我正在努力最大限度地提高速度。Pieces 是一个列表,其中包含指示文件片段的 (Start, End) 值元组,而 httpPool 是一个对象池,其中包含一堆预配置的 HTTP 客户端。非常感谢任何帮助,谢谢!

await Parallel.ForEachAsync(pieces,
                        new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },
                        async (piece, cancellationToken) =>
                        {
                            //Get a http client from the pool and request for the content range
                            var client = httpPool.Get();
                            var request = new HttpRequestMessage { RequestUri = new Uri(url) };
                            request.Headers.Range = new RangeHeaderValue(piece.Item1, piece.Item2);
                            
                            //Request headers so we dont cache the file into memory
                            if (client != null)
                            {
                                var message = await client.SendAsync(request,HttpCompletionOption.ResponseHeadersRead,cancellationToken).ConfigureAwait(false);

                                if (message.IsSuccessStatusCode)
                                {
                                    //Get the content stream from the message request
                                    using (var streamToRead = await message.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false))
                                    {
                                        //Create a memory mapped stream to the mmf with the piece offset and size equal to the response size
                                        using (var streams = mmf.CreateViewStream(piece.Item1,message.Content.Headers.ContentLength!.Value,MemoryMappedFileAccess.Write))
                                        {
                                            //Copy from the content stream to the mmf stream
                                            var buffer = new byte[bufferSize];
                                            int offset, bytesRead;
                                            // Until we've read everything
                                            do
                                            {
                                                offset = 0;
                                                // Until the buffer is very nearly full or there's nothing left to read
                                                do
                                                {
                                                    bytesRead = await streamToRead.ReadAsync(buffer.AsMemory(offset, bufferSize - offset),cancellationToken);
                                                    offset += bytesRead;
                                                } while (bytesRead != 0 && offset < bufferSize);

                                                // Empty the buffer
                                                if (offset != 0)
                                                {
                                                    await streams.WriteAsync(buffer.AsMemory(0, offset),cancellationToken);
                                                }
                                            } while (bytesRead != 0);

                                            streams.Flush();
                                            streams.Close();
                                        }

                                        streamToRead.Close();
                                    }
                                }

                                message.Content.Dispose();
                                message.Dispose();
                            }

                            request.Dispose();
                            httpPool.Return(client);
                        });
C# 异步 网络 async-await io

评论

0赞 musium 6/3/2022
您可以尝试使用 learn.microsoft.com/en-us/dotnet/api/...,而不是创建新的缓冲区
0赞 0xSingularity 6/3/2022
@musium 谢谢!没想到这个。我试图弄清楚如何在 annon 函数中使用 StackAlloc 哈哈
0赞 Petrusion 6/3/2022
@gregyjames分配缓冲区一次不会对方法的性能产生影响。我相信这不是瓶颈。我想使用池缓冲区很好,但在这种情况下,它几乎不会做任何事情。此外,使用堆栈内存,这甚至没有任何意义,因为在每次等待后,该方法可能很容易由不同的线程处理,这对您没有帮助。堆栈内存并不比堆内存快。

答:

1赞 Petrusion 6/3/2022 #1

我不知道它会有多大帮助,但我试着做点什么。它的效果如何?

我还做了一些重构,所以这里有一些注意事项:

  • 如果您已经有块或语句,请不要调用或手动调用。它所做的只是给你的代码增加噪音,让任何阅读它的人都感到困惑。事实上,几乎从不打电话或手动。.Close().Dispose()usingusing.Close().Dispose()
  • 您是否意识到如果方法中发生任何异常,将永远不会返回到池中?您需要在块中或使用在其实现中返回池的结构来执行这些操作。(此外,如果发生任何异常,则不会在方法中处理,请添加clientfinallyIDisposableclientDispose()requestusing)
  • 如果可能,请首选提前返回的 if 语句,而不是包装方法其余部分的语句。后者难以阅读和维护。
  • 您并没有真正从并行中受益,因为 99% 的方法都在异步等待 IO。改用即可。Task.WhenAll()
  • 我摆脱了自定义缓冲/复制,只是调用了接受 .它应该可能有助于性能,可能。我认为它必须比最简单的缓冲器更好地优化。CopyToAsync()message.ContentStream

法典:

await Task.WhenAll(pieces.Select(p => DownloadToMemoryMappedFile(p)));

// change the piece type from dynamic to what you need
async Task DownloadToMemoryMappedFile(dynamic piece, CancellationToken cancellationToken = default)
{
    //Get a http client from the pool and request for the content range
    var client = httpPool.Get();

    try
    {
        using var request = new HttpRequestMessage { RequestUri = new Uri(url) };

        //Request headers so we dont cache the file into memory
        request.Headers.Range = new RangeHeaderValue(piece.Item1, piece.Item2);

        if (client is null)
        return;

        using var message = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);

        if (!message.IsSuccessStatusCode)
            return;

        //Create a memory mapped stream to the mmf with the piece offset and size equal to the response size
        using var streams = mmf.CreateViewStream(piece.Item1, message.Content.Headers.ContentLength!.Value, MemoryMappedFileAccess.Write);

        await message.Content.CopyToAsync(streams).ConfigureAwait(false);
    }
    finally
    {
        httpPool.Return(client);
    }
}