RX:如何等待订阅者完成?

RX: How to wait for subscribers to finish?

提问人:kra 提问时间:5/6/2019 最后编辑:kra 更新时间:5/9/2019 访问量:1625

问:

我有一个生产者,它从 Rest API 下载页面中的数据,以及几个处理页面的消费者(例如,将它们加载到数据库)。

我希望生产者和消费者并行工作,这意味着生产者不应该等待一个页面被消费后再下载下一个页面。每个使用者都需要按顺序处理页面。

下载所有页面后,主线程应等待所有使用者完成他们的工作(因为消费可能需要比生产更长的时间)。

我目前的做法如下:

我创建了一个下载页面的可观察对象,该可观察对象在附加消费者订阅者后立即启动。我将订阅者配置为观察 他们自己的线程用于并行执行。

C# 中的代码:

IEnumerable<Page> getPages = produce();
var observable = getPages.ToObservable().Publish();

observable
   .ObserveOn(NewThreadScheduler.Default)
   .Subscribe(page => consume1(page));

observable
   .ObserveOn(NewThreadScheduler.Default)
   .Subscribe(page => consume2(page));

observable.Connect();

这种实现的问题在于,主线程可能会在处理完所有页面和应用程序停止之前完成。

如何使用RX实现这一点?

谢谢!

编辑:

还尝试了以下方法(来自答案):

static void Main(string[] args)
{
    var getPages = Enumerable.Range(0, 10);

    var els1 = new EventLoopScheduler();
    var els2 = new EventLoopScheduler();

    var observable =
        getPages
            .ToObservable()
            .Publish(ps =>
                Observable
                    .Merge(
                        ps.Select(p => Observable.Start(() => consume1(p), els1)),
                        ps.Select(p => Observable.Start(() => consume2(p), els2))));

    observable.Wait();
}

public static void consume1(int p)
{
    Console.WriteLine($"1:{p}");
    Thread.Sleep(200);
}

public static void consume2(int p)
{
    Console.WriteLine($"2:{p}");
    Thread.Sleep(100);
}

观察。Wait() 在基础可枚举完成生成值后立即返回。输出为:

1:0
2:0

只是为了证明,如果我们将 getPages 替换为:

var getPages = Enumerable.Range(0, 10)
    .Select(i =>
    {
        Console.WriteLine($"Produced {i}");
        Thread.Sleep(30);
        return i;
    });

则输出为:

Produced 0
Produced 1
1:0
2:0
Produced 2
Produced 3
Produced 4
2:1
Produced 5
Produced 6
Produced 7
1:1
2:2
Produced 8
Produced 9
与 C# 语言无关 的可观察响应 式编程 系统。

评论


答:

2赞 Enigmativity 5/6/2019 #1

我认为这可以满足您的需求:

var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();

var observable =
    getPages
        .ToObservable()
        .Publish(ps =>
            Observable
                .Merge(
                    ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
                    ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));

我写了这个测试代码:

var getPages = Enumerable.Range(0, 10);

var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();

var observable =
    getPages
        .ToObservable()
        .Publish(ps =>
            Observable
                .Merge(
                    ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
                    ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));

observable.Wait();

public void consume1(int p)
{
    Console.WriteLine($"1:{p}");
    Thread.Sleep(200);
}

public void consume2(int p)
{
    Console.WriteLine($"2:{p}");
    Thread.Sleep(100);
}

我得到了这个输出:

1:0
2:0
2:1
1:1
2:2
2:3
1:2
2:4
2:5
1:3
2:6
2:7
1:4
2:8
2:9
1:5
1:6
1:7
1:8
1:9

完成实例后,应调用它们来关闭线程。EventLoopScheduler.Dispose()

评论

0赞 kra 5/7/2019
谢谢。不幸的是,Wait() 不会等待消费者完成,并且页面不会按顺序处理。
0赞 kra 5/8/2019
还有其他想法吗?这似乎是一个非常基本的用例,尽管我无法提出解决方案。
0赞 kra 5/8/2019
不幸的是,在我的情况下不是 - 我的输出只显示前两行。如果你增加睡眠时间,也许这对你来说也是一样的。使用 System.Reactive 版本 3.1.1 和 4.1.5 进行测试。
0赞 Enigmativity 5/8/2019
@kra - 你能解释一下你的意思吗,它对你的情况不起作用?你试过我的示例代码吗?它是否像我的样本一样有效?你的代码有何不同?
1赞 Enigmativity 5/9/2019
@kra - 我弄错了,应该是.哎 呦。这并不是由于可枚举的完成 - 而是外部可观察的产生的第一个值。不是说它工作正常。SelectSelectManyIObservable<IObservable<Unit>>IObservable<Unit>