Asked by: nop Asked: 4/21/2022 Updated: 3/7/2023 Views: 589
Creating a ReadOnlySequence with multiple segments
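Q:
How do I create a ReadOnlySequence with multiple segments? In the code below I use the Sequence class from MessagePack, but it doesn't matter which type I use as long as I get what I need.
I need to simulate multiple segments in order to test the SendLoopAsync method. For some reason, the Sequence built in SendAsync ends up with a single segment, even though I write to it twice.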
public ValueTask SendAsync(string message)
{
    // TODO: Simulate multiple segments
    var bytes = Encoding.UTF8.GetBytes(message);
    var first = bytes.Take(128).ToArray();
    var second = bytes.Skip(128).ToArray();

    var payload = new Sequence<byte>(); // should be disposed later on
    payload.Write(first);
    payload.Write(second);

    return _output.Writer.WriteAsync(new Message
    {
        MessageType = WebSocketMessageType.Text,
        Payload = payload
    }, CancellationToken.None);
}

private async Task SendLoopAsync(CancellationToken cancellationToken)
{
    while (await _output.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
    {
        while (_output.Reader.TryRead(out var message))
        {
            var sequence = message.Sequence.AsReadOnlySequence;
            if (sequence.IsEmpty)
            {
                continue;
            }

            // Send every segment except the last as a partial frame.
            while (!sequence.IsSingleSegment)
            {
                await _clientWebSocket.SendAsync(sequence.First, message.MessageType, false, cancellationToken).ConfigureAwait(false);
                sequence = sequence.Slice(sequence.First.Length);
            }

            // The last segment ends the message.
            await _clientWebSocket.SendAsync(sequence.First, message.MessageType, true, cancellationToken).ConfigureAwait(false);
            message.Sequence.Dispose();
        }
    }
}
A:
2 votes
Prince Owen
1/31/2023
#1
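I wrote a class that lets you chain multiple ReadOnlySequence instances together. It converts implicitly to ReadOnlySequence, so it should be fairly easy to integrate into any project.
Each time you append a ReadOnlySequence or a Memory, it creates a new segment (ReadOnlyChunk<T>).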
using System;
using System.Buffers;

public class ChunkedSequence<T>
{
    private ReadOnlyChunk<T> _first;    // head of the linked list of segments
    private ReadOnlyChunk<T> _current;  // tail of the linked list of segments
    private bool _changed;              // true when the cached sequence is stale
    private ReadOnlySequence<T>? _sequence;

    public ChunkedSequence()
    {
        _first = _current = null;
        _sequence = null;
        _changed = false;
    }

    public ChunkedSequence(ReadOnlySequence<T> sequence) : this()
    {
        Append(sequence);
    }

    // Appends every segment of the given sequence as its own chunk.
    public void Append(ReadOnlySequence<T> sequence)
    {
        SequencePosition pos = sequence.Start;
        while (sequence.TryGet(ref pos, out ReadOnlyMemory<T> mem, true))
        {
            Append(mem);
        }
    }

    // Appends the given memory as a new chunk at the end of the chain.
    public void Append(ReadOnlyMemory<T> memory)
    {
        if (_current == null)
        {
            _first = _current = new ReadOnlyChunk<T>(memory);
        }
        else
        {
            _current = _current.Append(memory);
        }
        _changed = true;
    }

    internal ReadOnlySequence<T> GetSequence()
    {
        if (_changed)
        {
            // Rebuild the cached sequence only when chunks were appended since the last call.
            _sequence = new ReadOnlySequence<T>(_first, 0, _current, _current.Memory.Length);
            _changed = false;
        }
        else
        {
            // Nothing was ever appended: fall back to an empty sequence.
            _sequence ??= new ReadOnlySequence<T>();
        }
        return _sequence.Value;
    }

    public static implicit operator ReadOnlySequence<T>(ChunkedSequence<T> sequence)
    {
        return sequence.GetSequence();
    }

    private sealed class ReadOnlyChunk<_T> : ReadOnlySequenceSegment<_T>
    {
        public ReadOnlyChunk(ReadOnlyMemory<_T> memory)
        {
            Memory = memory;
        }

        // Links a new chunk after this one and returns it.
        public ReadOnlyChunk<_T> Append(ReadOnlyMemory<_T> memory)
        {
            ReadOnlyChunk<_T> nextChunk = new ReadOnlyChunk<_T>(memory)
            {
                // Offset of the new chunk from the start of the whole sequence.
                RunningIndex = RunningIndex + Memory.Length
            };
            Next = nextChunk;
            return nextChunk;
        }
    }
}
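If a test only needs a fixed set of segments, a smaller helper may be enough. The following is a minimal, self-contained sketch of the same linked-segment technique, not part of the class above: `TestSegment` and `Create` are hypothetical names chosen for illustration, and the 200-byte payload split at 128 bytes simply mirrors the split in the question. It also walks the result with the same `First` / `Slice` pattern that SendLoopAsync uses.

```csharp
using System;
using System.Buffers;
using System.Text;

// Hypothetical helper segment; the name TestSegment is illustrative only.
internal sealed class TestSegment : ReadOnlySequenceSegment<byte>
{
    public TestSegment(ReadOnlyMemory<byte> memory, long runningIndex)
    {
        Memory = memory;
        RunningIndex = runningIndex;
    }

    // Links a new segment after this one and returns it.
    public TestSegment Append(ReadOnlyMemory<byte> memory)
    {
        var next = new TestSegment(memory, RunningIndex + Memory.Length);
        Next = next;
        return next;
    }
}

internal static class Program
{
    // Builds a sequence with one segment per chunk (assumes at least one chunk).
    private static ReadOnlySequence<byte> Create(params byte[][] chunks)
    {
        var first = new TestSegment(chunks[0], 0);
        var last = first;
        for (var i = 1; i < chunks.Length; i++)
        {
            last = last.Append(chunks[i]);
        }
        return new ReadOnlySequence<byte>(first, 0, last, last.Memory.Length);
    }

    private static void Main()
    {
        var bytes = Encoding.UTF8.GetBytes(new string('a', 200));
        var sequence = Create(bytes[..128], bytes[128..]);

        Console.WriteLine(sequence.IsSingleSegment); // False
        Console.WriteLine(sequence.Length);          // 200

        // Same walking pattern as SendLoopAsync: every segment but the last
        // would be sent as a partial frame, the last one ends the message.
        while (!sequence.IsSingleSegment)
        {
            Console.WriteLine($"partial frame: {sequence.First.Length} bytes");
            sequence = sequence.Slice(sequence.First.Length);
        }
        Console.WriteLine($"final frame: {sequence.First.Length} bytes");
    }
}
```

The key detail is that each segment's `RunningIndex` must equal the total length of all segments before it; `ReadOnlySequence<T>` relies on that value to compute `Length` and to slice across segment boundaries.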