提问人:mnj 提问时间:11/15/2023 更新时间:11/15/2023 访问量:15
在托管服务中使用 RX 可观察对象处理批处理事件
Processing batched events with RX Observables in Hosted Service
问:
我有以下情况:
public class MyHostedService : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly IEventSource _eventSource;
private readonly ILogger _logger;
private IDisposable? _subscription;
public MyHostedService(
IServiceProvider serviceProvider,
IEventSource eventSource,
ILogger<MyHostedService> logger)
{
_serviceProvider = serviceProvider;
_eventSource = eventSource;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_subscription = _eventSource.CreateEventsObservable()
.Buffer(TimeSpan.FromSeconds(1), 100)
.Where(batch => batch.Count is not 0)
.Select((pes, batchNumber) => Observable.FromAsync(() => ProcessBatch(batchNumber, pes, stoppingToken)))
.Concat()
.Subscribe(
onNext: (u) => { },
onError: (e) => _logger.LogSubscriptionFailure(e),
onCompleted: () => _logger.LogCompletedSubscription()
);
}
private async Task ProcessBatch(
int batchNumber,
IList<MyEvent> events,
CancellationToken cancellationToken)
{
try
{
using var scope = _serviceProvider.CreateScope();
var processor = scope.ServiceProvider.GetRequiredService<IProcessor>();
await processor.Process(events, cancellationToken);
_logger.LogProcessedBatch(batchNumber, events.Count);
}
catch (Exception e) when (e is not OperationCanceledException)
{
_logger.LogBatchError(e, batchNumber, events.Count);
}
}
public override async Task StopAsync(CancellationToken stoppingToken) => await base.StopAsync(stoppingToken);
public override void Dispose()
{
_subscription?.Dispose();
base.Dispose();
}
}
有几件事困扰着我:
- 我注意到处理有点慢 - 即使源 observable () 产生大量事件,批处理也不会太快。我想知道我是否可以提高这里的吞吐量。也许应该在内部使用来运行处理器?
_eventSource
ProcessBatch
Task.Run
- 有很多活动。由于有一些暂时性依赖关系(如 http 客户端类型),我认为我不应该在应用程序的整个生命周期中注入和使用该实例。相反,每次处理批次时,我都会创建一个范围。性能好吗?会更好吗?
IProcessor
IProcessor
答:
1赞
Enigmativity
11/21/2023
#1
最好有一个完整的 Rx 解决方案。打电话给你不是最好的方法。Subscribe
ExecuteAsync
你知道,你可以等待一个可观察的对象。
protected override async Task ExecuteAsync(CancellationToken stoppingToken) =>
await
Observable
.Using(
() => _serviceProvider.CreateScope(),
scope =>
from es in _eventSource.CreateEventsObservable().Buffer(TimeSpan.FromSeconds(1), 100)
where es.Count > 0
let processor = scope.ServiceProvider.GetRequiredService<IProcessor>()
from r in Observable.FromAsync(ct => processor.Process(es, ct))
select r)
.Do(u => { }, (e) => _logger.LogSubscriptionFailure(e), () => _logger.LogCompletedSubscription())
.LastAsync()
.TakeUntil(stoppingToken.ToObservable());
差不多了。不完全是您的原始查询,但它应该执行相同的处理,并且只创建一个范围。
您需要此扩展方法:
public static class ObservableEx2
{
public static IObservable<Unit> ToObservable(this CancellationToken ct) =>
Observable.Create<Unit>(observer => ct.Register(() =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
}));
}
评论
0赞
mnj
11/22/2023
你能解释一下为什么它更好吗?它是为每个批次创建,还是在应用程序的整个生命周期中只创建一个?IProcessor
IProcessor
0赞
Enigmativity
11/22/2023
@mnj - 它调用每个批次,但这只是因为我希望它位于带有关键字的可观察对象中。我本可以放入一个代码块并在可观察对象之外创建它(但仍然在内部,以便真正只制作一次。.GetRequiredService<IProcessor>()
let
Using
评论