提问人:kra 提问时间:5/6/2019 最后编辑:kra 更新时间:5/9/2019 访问量:1625
RX:如何等待订阅者完成?
RX: How to wait for subscribers to finish?
问:
我有一个生产者,它从 Rest API 下载页面中的数据,以及几个处理页面的消费者(例如,将它们加载到数据库)。
我希望生产者和消费者并行工作,这意味着生产者不应该等待一个页面被消费后再下载下一个页面。每个使用者都需要按顺序处理页面。
下载所有页面后,主线程应等待所有使用者完成他们的工作(因为消费可能需要比生产更长的时间)。
我目前的做法如下:
我创建了一个下载页面的可观察对象,该可观察对象在附加消费者订阅者后立即启动。我将订阅者配置为观察 他们自己的线程用于并行执行。
C# 中的代码:
IEnumerable<Page> getPages = produce();
var observable = getPages.ToObservable().Publish();
observable
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(page => consume1(page));
observable
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(page => consume2(page));
observable.Connect();
这种实现的问题在于,主线程可能会在处理完所有页面和应用程序停止之前完成。
如何使用RX实现这一点?
谢谢!
编辑:
还尝试了以下方法(来自答案):
static void Main(string[] args)
{
var getPages = Enumerable.Range(0, 10);
var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();
var observable =
getPages
.ToObservable()
.Publish(ps =>
Observable
.Merge(
ps.Select(p => Observable.Start(() => consume1(p), els1)),
ps.Select(p => Observable.Start(() => consume2(p), els2))));
observable.Wait();
}
public static void consume1(int p)
{
Console.WriteLine($"1:{p}");
Thread.Sleep(200);
}
public static void consume2(int p)
{
Console.WriteLine($"2:{p}");
Thread.Sleep(100);
}
观察。Wait() 在基础可枚举完成生成值后立即返回。输出为:
1:0
2:0
只是为了证明,如果我们将 getPages 替换为:
var getPages = Enumerable.Range(0, 10)
.Select(i =>
{
Console.WriteLine($"Produced {i}");
Thread.Sleep(30);
return i;
});
则输出为:
Produced 0
Produced 1
1:0
2:0
Produced 2
Produced 3
Produced 4
2:1
Produced 5
Produced 6
Produced 7
1:1
2:2
Produced 8
Produced 9
答:
2赞
Enigmativity
5/6/2019
#1
我认为这可以满足您的需求:
var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();
var observable =
getPages
.ToObservable()
.Publish(ps =>
Observable
.Merge(
ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));
我写了这个测试代码:
var getPages = Enumerable.Range(0, 10);
var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();
var observable =
getPages
.ToObservable()
.Publish(ps =>
Observable
.Merge(
ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));
observable.Wait();
public void consume1(int p)
{
Console.WriteLine($"1:{p}");
Thread.Sleep(200);
}
public void consume2(int p)
{
Console.WriteLine($"2:{p}");
Thread.Sleep(100);
}
我得到了这个输出:
1:0 2:0 2:1 1:1 2:2 2:3 1:2 2:4 2:5 1:3 2:6 2:7 1:4 2:8 2:9 1:5 1:6 1:7 1:8 1:9
完成实例后,应调用它们来关闭线程。EventLoopScheduler
.Dispose()
评论
0赞
kra
5/7/2019
谢谢。不幸的是,Wait() 不会等待消费者完成,并且页面不会按顺序处理。
0赞
kra
5/8/2019
还有其他想法吗?这似乎是一个非常基本的用例,尽管我无法提出解决方案。
0赞
kra
5/8/2019
不幸的是,在我的情况下不是 - 我的输出只显示前两行。如果你增加睡眠时间,也许这对你来说也是一样的。使用 System.Reactive 版本 3.1.1 和 4.1.5 进行测试。
0赞
Enigmativity
5/8/2019
@kra - 你能解释一下你的意思吗,它对你的情况不起作用?你试过我的示例代码吗?它是否像我的样本一样有效?你的代码有何不同?
1赞
Enigmativity
5/9/2019
@kra - 我弄错了,应该是.哎 呦。这并不是由于可枚举的完成 - 而是外部可观察的产生的第一个值。不是说它工作正常。Select
SelectMany
IObservable<IObservable<Unit>>
IObservable<Unit>
评论