创建具有多个段的 ReadOnlySequence

Creating a ReadOnlySequence with multiple segments

提问人:nop 提问时间:4/21/2022 更新时间:3/7/2023 访问量:589

问:

如何创建具有多个区段的?在下面的代码中,我使用了 MessagePack 中的 Sequence 类,但一旦我实现了我需要的内容,我就使用哪一个并不重要。ReadOnlySequence

我需要模拟多个段才能测试 SendLoopAsync 方法。由于某种原因,SendAsync 的 Sequence 输出单个段,即使我向序列写入了两次。

public ValueTask SendAsync(string message)
{
    // TODO: Simulate multiple segments
    var bytes = Encoding.UTF8.GetBytes(message);

    var first = bytes.Take(128).ToArray();
    var second = bytes.Skip(128).ToArray();

    var payload = new Sequence<byte>(); // should be disposed later on
    payload.Write(first);
    payload.Write(second);

    return _output.Writer.WriteAsync(new Message
    {
        MessageType = WebSocketMessageType.Text,
        Payload = payload
    }, CancellationToken.None);
}

private async Task SendLoopAsync(CancellationToken cancellationToken)
{
    while (await _output.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
    {
        while (_output.Reader.TryRead(out var message))
        {
            var sequence = message.Sequence.AsReadOnlySequence;

            if (sequence.IsEmpty)
            {
                continue;
            }

            while (!sequence.IsSingleSegment)
            {
                await _clientWebSocket.SendAsync(sequence.First, message.MessageType, false, cancellationToken).ConfigureAwait(false);
                sequence = sequence.Slice(sequence.First.Length);
            }

            await _clientWebSocket.SendAsync(sequence.First, message.MessageType, true, cancellationToken).ConfigureAwait(false);
            message.Sequence.Dispose();
        }
    }
}
C# .NET .net-core 序列

评论

2赞 Ralph Tee 5/12/2022
此链接简明扼要地解释了所有内容:stevejgordon.co.uk/...
0赞 nop 5/13/2022
@RalphTee,谢谢!我不知道我是怎么错过那篇文章的,但它是一篇好文章!

答:

2赞 Prince Owen 1/31/2023 #1

我写了一个允许您将多个链接在一起。它隐式转换为,因此它应该很容易集成到任何项目中。classReadOnlySequenceReadOnlySequence

每次附加 或 时,它都会创建一个新段 ()。ReadonlySequenceMemoryReadOnlyChunk<T>

public class ChunkedSequence<T>
{
    private ReadOnlyChunk<T> _first;
    private ReadOnlyChunk<T> _current;

    private bool _changed;
    private ReadOnlySequence<T>? _sequence;

    public ChunkedSequence()
    {
        _first = _current = null;
        _sequence = null;
        _changed = false;
    }

    public ChunkedSequence(ReadOnlySequence<T> sequence) : this()
    {
        Append(sequence);
    }

    public void Append(ReadOnlySequence<T> sequence)
    {
        SequencePosition pos = sequence.Start;
        while (sequence.TryGet(ref pos, out ReadOnlyMemory<T> mem, true))
        {
            Append(mem);
        }
    }

    public void Append(ReadOnlyMemory<T> memory)
    {
        if (_current == null)
        {
            _first = _current = new ReadOnlyChunk<T>(memory);
        }
        else
        {
            _current = _current.Append(memory);
        }

        _changed = true;
    }

    internal ReadOnlySequence<T> GetSequence()
    {
        if (_changed)
        {
            _sequence = new ReadOnlySequence<T>(_first, 0, _current, _current.Memory.Length);
        }
        else _sequence ??= new ReadOnlySequence<T>();

        return _sequence.Value;
    }

    public static implicit operator ReadOnlySequence<T>(ChunkedSequence<T> sequence)
    {
        return sequence.GetSequence();
    }

    private sealed class ReadOnlyChunk<_T> : ReadOnlySequenceSegment<_T>
    {
        public ReadOnlyChunk(ReadOnlyMemory<_T> memory)
        {
            Memory = memory;
        }

        public ReadOnlyChunk<_T> Append(ReadOnlyMemory<_T> memory)
        {
            ReadOnlyChunk<_T> nextChunk = new ReadOnlyChunk<_T>(memory)
            {
                RunningIndex = RunningIndex + Memory.Length
            };

            Next = nextChunk;
            return nextChunk;
        }
    }
}