提问人:0xSingularity 提问时间:6/3/2022 更新时间:6/3/2022 访问量:258
C# 加快从响应流到 ViewStream 的写入速度
C# Speed Up Writing from Response Stream to ViewStream
问:
我有这个代码,它正在将文件异步拆分为多个部分,并使用 HTTP 内容范围下载它们。然后,它将下载的数据写入内存映射文件上的 ViewStream。我目前正在从响应流读取到缓冲区中,然后将缓冲区中的所有数据写入 ViewStream。有没有更有效/更快的方法可以做到这一点?我并不真正关心内存使用,但我正在努力最大限度地提高速度。Pieces 是一个列表,其中包含指示文件片段的 (Start, End) 值元组,而 httpPool 是一个对象池,其中包含一堆预配置的 HTTP 客户端。非常感谢任何帮助,谢谢!
await Parallel.ForEachAsync(pieces,
new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },
async (piece, cancellationToken) =>
{
//Get a http client from the pool and request for the content range
var client = httpPool.Get();
var request = new HttpRequestMessage { RequestUri = new Uri(url) };
request.Headers.Range = new RangeHeaderValue(piece.Item1, piece.Item2);
//Request headers so we dont cache the file into memory
if (client != null)
{
var message = await client.SendAsync(request,HttpCompletionOption.ResponseHeadersRead,cancellationToken).ConfigureAwait(false);
if (message.IsSuccessStatusCode)
{
//Get the content stream from the message request
using (var streamToRead = await message.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false))
{
//Create a memory mapped stream to the mmf with the piece offset and size equal to the response size
using (var streams = mmf.CreateViewStream(piece.Item1,message.Content.Headers.ContentLength!.Value,MemoryMappedFileAccess.Write))
{
//Copy from the content stream to the mmf stream
var buffer = new byte[bufferSize];
int offset, bytesRead;
// Until we've read everything
do
{
offset = 0;
// Until the buffer is very nearly full or there's nothing left to read
do
{
bytesRead = await streamToRead.ReadAsync(buffer.AsMemory(offset, bufferSize - offset),cancellationToken);
offset += bytesRead;
} while (bytesRead != 0 && offset < bufferSize);
// Empty the buffer
if (offset != 0)
{
await streams.WriteAsync(buffer.AsMemory(0, offset),cancellationToken);
}
} while (bytesRead != 0);
streams.Flush();
streams.Close();
}
streamToRead.Close();
}
}
message.Content.Dispose();
message.Dispose();
}
request.Dispose();
httpPool.Return(client);
});
答:
1赞
Petrusion
6/3/2022
#1
我不知道它会有多大帮助,但我试着做点什么。它的效果如何?
我还做了一些重构,所以这里有一些注意事项:
- 如果您已经有块或语句,请不要调用或手动调用。它所做的只是给你的代码增加噪音,让任何阅读它的人都感到困惑。事实上,几乎从不打电话或手动。
.Close()
.Dispose()
using
using
.Close()
.Dispose()
- 您是否意识到如果方法中发生任何异常,将永远不会返回到池中?您需要在块中或使用在其实现中返回池的结构来执行这些操作。(此外,如果发生任何异常,则不会在方法中处理,请添加
client
finally
IDisposable
client
Dispose()
request
using
) - 如果可能,请首选提前返回的 if 语句,而不是包装方法其余部分的语句。后者难以阅读和维护。
- 您并没有真正从并行中受益,因为 99% 的方法都在异步等待 IO。改用即可。
Task.WhenAll()
- 我摆脱了自定义缓冲/复制,只是调用了接受 .它应该可能有助于性能,可能。我认为它必须比最简单的缓冲器更好地优化。
CopyToAsync()
message.Content
Stream
法典:
await Task.WhenAll(pieces.Select(p => DownloadToMemoryMappedFile(p)));
// change the piece type from dynamic to what you need
async Task DownloadToMemoryMappedFile(dynamic piece, CancellationToken cancellationToken = default)
{
//Get a http client from the pool and request for the content range
var client = httpPool.Get();
try
{
using var request = new HttpRequestMessage { RequestUri = new Uri(url) };
//Request headers so we dont cache the file into memory
request.Headers.Range = new RangeHeaderValue(piece.Item1, piece.Item2);
if (client is null)
return;
using var message = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
if (!message.IsSuccessStatusCode)
return;
//Create a memory mapped stream to the mmf with the piece offset and size equal to the response size
using var streams = mmf.CreateViewStream(piece.Item1, message.Content.Headers.ContentLength!.Value, MemoryMappedFileAccess.Write);
await message.Content.CopyToAsync(streams).ConfigureAwait(false);
}
finally
{
httpPool.Return(client);
}
}
评论