Catch<TSource, TException>
sealed class Catch<TSource, TException> : Producer<TSource, _<TSource, TException>> where TException : Exception
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class Catch<TSource, TException> : Producer<TSource, Catch<TSource, TException>._> where TException : Exception
{
internal sealed class _ : Sink<TSource>, IObserver<TSource>
{
private sealed class HandlerObserver : IObserver<TSource>
{
private readonly _ _parent;
public HandlerObserver(_ parent)
{
_parent = parent;
}
public void OnNext(TSource value)
{
_parent._observer.OnNext(value);
}
public void OnError(Exception error)
{
_parent._observer.OnError(error);
_parent.Dispose();
}
public void OnCompleted()
{
_parent._observer.OnCompleted();
_parent.Dispose();
}
}
private readonly Func<TException, IObservable<TSource>> _handler;
private SerialDisposable _subscription;
public _(Func<TException, IObservable<TSource>> handler, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_handler = handler;
}
public IDisposable Run(IObservable<TSource> source)
{
_subscription = new SerialDisposable();
SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable();
_subscription.Disposable = singleAssignmentDisposable;
singleAssignmentDisposable.Disposable = ObservableExtensions.SubscribeSafe<TSource>(source, (IObserver<TSource>)this);
return _subscription;
}
public void OnNext(TSource value)
{
_observer.OnNext(value);
}
public void OnError(Exception error)
{
TException arg;
if ((arg = (error as TException)) != null) {
IObservable<TSource> observable = null;
try {
observable = _handler(arg);
} catch (Exception error2) {
_observer.OnError(error2);
base.Dispose();
return;
}
SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable();
_subscription.Disposable = singleAssignmentDisposable;
singleAssignmentDisposable.Disposable = ObservableExtensions.SubscribeSafe<TSource>(observable, (IObserver<TSource>)new HandlerObserver(this));
} else {
_observer.OnError(error);
base.Dispose();
}
}
public void OnCompleted()
{
_observer.OnCompleted();
base.Dispose();
}
}
private readonly IObservable<TSource> _source;
private readonly Func<TException, IObservable<TSource>> _handler;
public Catch(IObservable<TSource> source, Func<TException, IObservable<TSource>> handler)
{
_source = source;
_handler = handler;
}
protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel)
{
return new _(_handler, observer, cancel);
}
protected override IDisposable Run(_ sink)
{
return sink.Run(_source);
}
}
}