<PackageReference Include="System.Reactive" Version="4.0.0" />

Catch<TSource, TException>

internal sealed class Catch<TSource, TException> : Producer<TSource, Catch<TSource, TException>._> where TException : Exception
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that relays a source sequence and, when the source fails with an
    /// exception that is a <typeparamref name="TException"/>, asks a handler for a
    /// replacement sequence and continues with that sequence instead.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source and replacement sequences.</typeparam>
    /// <typeparam name="TException">Exception type that triggers the handler.</typeparam>
    internal sealed class Catch<TSource, TException> : Producer<TSource, Catch<TSource, TException>._>
        where TException : Exception
    {
        private readonly IObservable<TSource> _source;
        private readonly Func<TException, IObservable<TSource>> _handler;

        /// <summary>Stores the source sequence and the exception handler for later subscriptions.</summary>
        /// <param name="source">Sequence to relay.</param>
        /// <param name="handler">Maps a caught <typeparamref name="TException"/> to the sequence to continue with.</param>
        public Catch(IObservable<TSource> source, Func<TException, IObservable<TSource>> handler)
        {
            _source = source;
            _handler = handler;
        }

        /// <summary>Creates the sink that carries the per-subscription state.</summary>
        protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel)
        {
            return new _(_handler, observer, cancel);
        }

        /// <summary>Starts the given sink against the stored source sequence.</summary>
        protected override IDisposable Run(_ sink)
        {
            return sink.Run(_source);
        }

        /// <summary>
        /// Per-subscription sink. It observes the source directly; after a matching
        /// error it hands the rest of the subscription over to a <see cref="HandlerObserver"/>
        /// that observes the replacement sequence.
        /// </summary>
        internal sealed class _ : Sink<TSource>, IObserver<TSource>
        {
            private readonly Func<TException, IObservable<TSource>> _handler;

            // Holds the currently active inner subscription (source first, then the
            // replacement sequence). Created in Run.
            private SerialDisposable _subscription;

            /// <summary>Creates a sink that forwards to <paramref name="observer"/>.</summary>
            /// <param name="handler">Handler invoked for errors of type <typeparamref name="TException"/>.</param>
            /// <param name="observer">Downstream observer; passed to the base sink.</param>
            /// <param name="cancel">Disposable passed to the base sink.</param>
            public _(Func<TException, IObservable<TSource>> handler, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _handler = handler;
            }

            /// <summary>Subscribes this sink to <paramref name="source"/>.</summary>
            /// <param name="source">Sequence to observe.</param>
            /// <returns>The serial disposable that tracks the active inner subscription.</returns>
            public IDisposable Run(IObservable<TSource> source)
            {
                _subscription = new SerialDisposable();

                var slot = InstallPlaceholder();
                slot.Disposable = ObservableExtensions.SubscribeSafe(source, this);

                return _subscription;
            }

            /// <summary>Forwards the element to the downstream observer unchanged.</summary>
            public void OnNext(TSource value)
            {
                _observer.OnNext(value);
            }

            /// <summary>
            /// Handles a source failure. Errors that are not a <typeparamref name="TException"/>
            /// are forwarded downstream and the sink is disposed. Matching errors are passed to
            /// the handler; if the handler itself throws, that exception is forwarded downstream
            /// and the sink is disposed. Otherwise the sequence returned by the handler is
            /// subscribed to through a <see cref="HandlerObserver"/>.
            /// </summary>
            /// <param name="error">Exception reported by the source.</param>
            public void OnError(Exception error)
            {
                var caught = error as TException;
                if (caught == null)
                {
                    // Not the exception type this operator handles: pass it through.
                    _observer.OnError(error);
                    base.Dispose();
                    return;
                }

                IObservable<TSource> fallback;
                try
                {
                    fallback = _handler(caught);
                }
                catch (Exception handlerFailure)
                {
                    // The handler failed; report its exception instead of the original one.
                    _observer.OnError(handlerFailure);
                    base.Dispose();
                    return;
                }

                var slot = InstallPlaceholder();
                slot.Disposable = ObservableExtensions.SubscribeSafe(fallback, new HandlerObserver(this));
            }

            /// <summary>Forwards completion downstream and disposes the sink.</summary>
            public void OnCompleted()
            {
                _observer.OnCompleted();
                base.Dispose();
            }

            /// <summary>
            /// Creates an empty <see cref="SingleAssignmentDisposable"/> and stores it in
            /// <see cref="_subscription"/>, replacing whatever was stored there. The caller
            /// fills the returned placeholder with the actual subscription afterwards.
            /// </summary>
            /// <remarks>
            /// NOTE(review): the placeholder is installed before subscribing, presumably so the
            /// serial slot is already up to date if the sequence signals during subscription.
            /// </remarks>
            private SingleAssignmentDisposable InstallPlaceholder()
            {
                var placeholder = new SingleAssignmentDisposable();
                _subscription.Disposable = placeholder;
                return placeholder;
            }

            /// <summary>
            /// Observer attached to the replacement sequence. It forwards every notification
            /// to the parent's downstream observer and disposes the parent on termination.
            /// Errors are forwarded as they are; the handler is not invoked again.
            /// </summary>
            private sealed class HandlerObserver : IObserver<TSource>
            {
                private readonly _ _parent;

                /// <summary>Binds this observer to the sink it forwards through.</summary>
                public HandlerObserver(_ parent)
                {
                    _parent = parent;
                }

                /// <summary>Forwards the element to the parent's downstream observer.</summary>
                public void OnNext(TSource value)
                {
                    _parent._observer.OnNext(value);
                }

                /// <summary>Forwards the error downstream, then disposes the parent sink.</summary>
                public void OnError(Exception error)
                {
                    _parent._observer.OnError(error);
                    _parent.Dispose();
                }

                /// <summary>Forwards completion downstream, then disposes the parent sink.</summary>
                public void OnCompleted()
                {
                    _parent._observer.OnCompleted();
                    _parent.Dispose();
                }
            }
        }
    }
}