RxJS 如何用 catch 忽略错误并继续前进

RxJS how to ignore an error with catch and keep going

提问人:devguy 提问时间:4/26/2017 最后编辑:Royi Namirdevguy 更新时间:2/20/2018 访问量:23821

问:

嗨,我有以下代码,我想知道如何防止主(上游)Observable 在抛出错误时被删除。

如何更改以下代码,以便显示所有数字都应为“4”?

我正在寻找一种通用模式解决方案,该解决方案在其他情况下适用于不同的运算符。这是我能想到的最简单的情况。

const Rx = require('rxjs/Rx');

function checkValue(n) {
  if(n === 4) {
    throw new Error("Bad value");
  }
  return true;
}
const source = Rx.Observable.interval(100).take(10);

source.filter(x => checkValue(x))
  .catch(err => Rx.Observable.empty())
  .subscribe(v => console.log(v));
JavaScript 异常 错误处理 rxjs5

评论


答:

0赞 zinyando 4/26/2017 #1

您需要在其中使用 flatMap 运算符进行过滤。在此示例的 flatMap 中,我使用 Observable.if() 进行过滤,因为它保证我一直在返回可观察对象。我相信你可以用其他方式做到这一点,但这对我来说是一个干净的实现。

const source = Rx.Observable.interval(100).take(10).flatMap((x)=>
    Rx.Observable.if(() => x !== 4, 
    Rx.Observable.of(x),
    Rx.Observable.throw("Bad value"))
    .catch((err) => {
        return Rx.Observable.empty()
    })
);

source.subscribe(v => console.log(v));

评论

0赞 nshew13 2/9/2018
Observable.if()似乎已在 v5 中删除。
0赞 pgreen2 6/28/2018
我相信它只是重命名为iif()
20赞 Berkeley Martinez 4/27/2017 #2

您将希望保持源可观察对象运行,但如果您让错误发生在主事件流上,它将折叠整个可观察对象,并且您将不再接收项目。

该解决方案涉及创建一个单独的流,您可以在其中进行过滤和捕获,而不会让上游管道塌陷。

const Rx = require('rxjs/Rx');
function checkValue(n) {
  if(n === 4) {
    throw new Error("Bad value");
  }
  return true;
}
const source = Rx.Observable.interval(100).take(10);

source
  // pass the item into the projection function of the switchMap operator
  .switchMap(x => {
     // we create a new stream of just one item
     // this stream is created for every item emitted by the source observable
     return Observable.of(x)
       // now we run the filter
       .filter(checkValue)
       // we catch the error here within the projection function
       // on error this upstream pipe will collapse, but that is ok because it starts within this function and will not effect the source
       // the downstream operators will never see the error so they will also not be effect
       .catch(err => Rx.Observable.empty());
     })
     .subscribe(v => console.log(v));

您还可以使用传递给 catch 选择器的第二个参数来重新启动可观察源,但这会启动它,就好像它以前没有运行过一样。

const Rx = require('rxjs/Rx');

function checkValue(n) {
  if(n === 4) {
    throw new Error("Bad value");
  }
  return true;
}
const source = Rx.Observable.interval(100).take(10);

source.filter(x => checkValue(x))
  .catch((err, source) => source)
  .subscribe(v => console.log(v));

但这并没有达到预期的效果。您将看到一个重复发出 1..3 的流,直到时间结束......或者关闭脚本。以先到者为准。(这是必不可少的).retry()

评论

0赞 devguy 4/27/2017
谢谢!我几乎想出了相同的解决方案,但是我最终使用flatMap只是因为它读起来更好,但与switchMap做同样的事情。关于传递参数以捕获的第二个解决方案对我来说是新的,感谢您的分享。
1赞 Berkeley Martinez 4/27/2017
flatMap 是 mergeMap 的别名。我建议切换到 mergeMap 或 switchMap,因为它在 v5 中更习惯 Rx。flatMap 是遗留的。
0赞 Ewald Benes 12/4/2018
mergeMap而不是作品。 似乎在完成事件时将完成传播到父级可观察对象,内部发出这实际上意味着外部事件不会继续按预期进行。我使用 RxJs 6。switchMapswitchMapObservable.empty()
1赞 vitaly-t 5/24/2021
内部流的较新语法为:.of(x).pipe(filter(checkValue), onErrorResumeNext)