在托管服务中使用 RX 可观察对象处理批处理事件

Processing batched events with RX Observables in Hosted Service

提问人:mnj 提问时间:11/15/2023 更新时间:11/15/2023 访问量:15

问:

我有以下情况:

public class MyHostedService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly IEventSource _eventSource;
    private readonly ILogger _logger;

    private IDisposable? _subscription;

    public MyHostedService(
        IServiceProvider serviceProvider,
        IEventSource eventSource,
        ILogger<MyHostedService> logger)
    {
        _serviceProvider = serviceProvider;
        _eventSource = eventSource;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _subscription = _eventSource.CreateEventsObservable()
            .Buffer(TimeSpan.FromSeconds(1), 100)
            .Where(batch => batch.Count is not 0)
            .Select((pes, batchNumber) => Observable.FromAsync(() => ProcessBatch(batchNumber, pes, stoppingToken)))
            .Concat()
            .Subscribe(
                onNext: (u) => { },
                onError: (e) => _logger.LogSubscriptionFailure(e),
                onCompleted: () => _logger.LogCompletedSubscription()
            );
    }

    private async Task ProcessBatch(
        int batchNumber, 
        IList<MyEvent> events,
         CancellationToken cancellationToken)
    {
        try
        {
            using var scope = _serviceProvider.CreateScope();
            var processor = scope.ServiceProvider.GetRequiredService<IProcessor>();
            await processor.Process(events, cancellationToken);

            _logger.LogProcessedBatch(batchNumber, events.Count);
        }
        catch (Exception e) when (e is not OperationCanceledException)
        {
            _logger.LogBatchError(e, batchNumber, events.Count);
        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken) => await base.StopAsync(stoppingToken);

    public override void Dispose()
    {
        _subscription?.Dispose();
        base.Dispose();
    }
}

有几件事困扰着我:

  1. 我注意到处理有点慢 - 即使源 observable () 产生大量事件,批处理也不会太快。我想知道我是否可以提高这里的吞吐量。也许应该在内部使用来运行处理器?_eventSourceProcessBatchTask.Run
  2. 有很多活动。由于有一些暂时性依赖关系(如 http 客户端类型),我认为我不应该在应用程序的整个生命周期中注入和使用该实例。相反,每次处理批次时,我都会创建一个范围。性能好吗?会更好吗?IProcessorIProcessor
C# 依赖注入 可观察 System.Reactive

评论

0赞 Prolog 11/20/2023
批处理的创建频率如何?或者您每分钟/每小时处理多少批次?如果问题是性能,那么我需要您定义当前速度以及所需的水平。您能大致描述一下处理器的作用吗?单个处理器执行平均需要多长时间?
0赞 mnj 11/22/2023
每秒有数千个事件,肯定比这个应用程序可以处理的要多(因此我正在运行它的一些副本)。对于每个批次,都需要从外部 API 获取一堆数据。这些数据被粘合到文档批处理中,然后插入到MongoDB中。处理单个批处理需要 ~ 100 毫秒-500 毫秒,因为涉及一些网络请求。

答:

1赞 Enigmativity 11/21/2023 #1

最好有一个完整的 Rx 解决方案。打电话给你不是最好的方法。SubscribeExecuteAsync

你知道,你可以等待一个可观察的对象。

protected override async Task ExecuteAsync(CancellationToken stoppingToken) =>
    await
        Observable
            .Using(
                () => _serviceProvider.CreateScope(),
                scope =>
                    from es in _eventSource.CreateEventsObservable().Buffer(TimeSpan.FromSeconds(1), 100)
                    where es.Count > 0
                    let processor = scope.ServiceProvider.GetRequiredService<IProcessor>()
                    from r in Observable.FromAsync(ct => processor.Process(es, ct))
                    select r)
            .Do(u => { }, (e) => _logger.LogSubscriptionFailure(e), () => _logger.LogCompletedSubscription())
            .LastAsync()
            .TakeUntil(stoppingToken.ToObservable());

差不多了。不完全是您的原始查询,但它应该执行相同的处理,并且只创建一个范围。

您需要此扩展方法:

public static class ObservableEx2
{
    public static IObservable<Unit> ToObservable(this CancellationToken ct) =>
        Observable.Create<Unit>(observer => ct.Register(() =>
        {
            observer.OnNext(Unit.Default);
            observer.OnCompleted();
        }));
}

评论

0赞 mnj 11/22/2023
你能解释一下为什么它更好吗?它是为每个批次创建,还是在应用程序的整个生命周期中只创建一个?IProcessorIProcessor
0赞 Enigmativity 11/22/2023
@mnj - 它调用每个批次,但这只是因为我希望它位于带有关键字的可观察对象中。我本可以放入一个代码块并在可观察对象之外创建它(但仍然在内部,以便真正只制作一次。.GetRequiredService<IProcessor>()letUsing