如何在自定义 ReadOnlySequence 实现结束时修剪零?

How do I trim zeros at the end of a custom ReadOnlySequence implementation?

提问人:nop 提问时间:4/20/2022 更新时间:4/20/2022 访问量:113

问:

Web 套接字服务器需要 JSON 字符串。 发送。问题是尾随零。由于尾随零,它会导致无效的 JSON 消息。问题是我该如何修剪它们?_webSocket.SendAsyncReadOnlyMemory<byte>sequence.First

用法

var request = new JsonRpcRequest<object>
{
    JsonRpc = "2.0",
    Id = 1,
    Method = "public/get_instruments",
    Params = @params
};

var message = JsonSerializer.Serialize(request);

using var buffer = MemoryPool<byte>.Shared.Rent(Encoding.UTF8.GetByteCount(message));
Encoding.UTF8.GetEncoder().Convert(message, buffer.Memory.Span, true, out _, out _, out _);

var seq = new OwnedMemorySequence<byte>();
seq.Append(buffer);

var msg = new ChannelWebSocket.Message
{
    MessageType = WebSocketMessageType.Text,
    Payload = seq
};

await client.Output.WriteAsync(msg).ConfigureAwait(false);

法典

private async Task OutputLoopAsync(CancellationToken cancellationToken)
{
    await foreach (var message in _output.Reader.ReadAllAsync())
    {
        var sequence = message.Payload.ReadOnlySequence;
        if (sequence.IsEmpty)
            continue;

        while (!sequence.IsSingleSegment)
        {
            await _webSocket.SendAsync(sequence.First, message.MessageType, false, cancellationToken);
            sequence = sequence.Slice(sequence.First.Length);
        }

        await _webSocket.SendAsync(sequence.First, message.MessageType, true, cancellationToken);
        message.Payload.Dispose();
    }

    await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
}

public sealed class Message
{
    public WebSocketMessageType MessageType { get; set; }
    public OwnedMemorySequence<byte> Payload { get; set; } = null!;
}

public sealed class OwnedMemorySequence<T> : IDisposable
{
    private readonly CollectionDisposable _disposable = new();
    private readonly MemorySequence<T> _sequence = new();

    public ReadOnlySequence<T> ReadOnlySequence => _sequence.ReadOnlySequence;

    public OwnedMemorySequence<T> Append(IMemoryOwner<T> memoryOwner)
    {
        _disposable.Add(memoryOwner);
        _sequence.Append(memoryOwner.Memory);
        return this;
    }

    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
    {
        return _sequence.CreateReadOnlySequence(firstBufferStartIndex, lastBufferEndIndex);
    }

    public void Dispose()
    {
        _disposable.Dispose();
    }
}

public static class MemoryOwnerSliceExtensions
{
    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start, int length)
    {
        if (start == 0 && length == owner.Memory.Length)
            return owner;
        return new SliceOwner<T>(owner, start, length);
    }

    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start)
    {
        if (start == 0)
            return owner;
        return new SliceOwner<T>(owner, start);
    }

    private sealed class SliceOwner<T> : IMemoryOwner<T>
    {
        private readonly IMemoryOwner<T> _owner;

        public SliceOwner(IMemoryOwner<T> owner, int start, int length)
        {
            _owner = owner;
            Memory = _owner.Memory.Slice(start, length);
        }

        public SliceOwner(IMemoryOwner<T> owner, int start)
        {
            _owner = owner;
            Memory = _owner.Memory[start..];
        }

        public Memory<T> Memory { get; }

        public void Dispose()
        {
            _owner.Dispose();
        }
    }
}

public sealed class MemorySequence<T>
{
    private MemorySegment? _head;
    private MemorySegment? _tail;

    public ReadOnlySequence<T> ReadOnlySequence => CreateReadOnlySequence(0, _tail?.Memory.Length ?? 0);

    public MemorySequence<T> Append(ReadOnlyMemory<T> buffer)
    {
        if (_tail == null)
            _head = _tail = new MemorySegment(buffer, 0);
        else
            _tail = _tail.Append(buffer);
        return this;
    }

    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
    {
        return _tail == null ? new ReadOnlySequence<T>(Array.Empty<T>()) : new ReadOnlySequence<T>(_head!, firstBufferStartIndex, _tail, lastBufferEndIndex);
    }

    private sealed class MemorySegment : ReadOnlySequenceSegment<T>
    {
        public MemorySegment(ReadOnlyMemory<T> memory, long runningIndex)
        {
            Memory = memory;
            RunningIndex = runningIndex;
        }

        public MemorySegment Append(ReadOnlyMemory<T> nextMemory)
        {
            var next = new MemorySegment(nextMemory, RunningIndex + Memory.Length);
            Next = next;
            return next;
        }
    }
}
C# .NET 序列

评论


答:

2赞 TakingTheKnifeOut 4/20/2022 #1

你不需要修剪。你只需要相应地切片你的记忆。

注意(引用文档)返回“[...]一个能够至少容纳 T 的 minBufferSize 元素的内存块。这里重要的位是“至少”,这意味着允许返回的内存大于请求的大小。MemoryPool<T>.Rent

您需要做的就是在将请求的大小添加到 OwnedMemorySequence<T>_sequence 成员之前,如果它恰好大于请求的大小,则从内存中创建一个切片。

评论

0赞 nop 4/20/2022
那么我该怎么做呢?
0赞 nop 4/21/2022
明白了。.seq.Append(buffer.Slice(0, bytesCount));