<PackageReference Include="System.Reactive" Version="4.1.5" />

RetryWhen<T, U>

sealed class RetryWhen<T, U> : IObservable<T>
using System.Reactive.Disposables; using System.Reactive.Subjects; using System.Threading; namespace System.Reactive.Linq.ObservableImpl { internal sealed class RetryWhen<T, U> : IObservable<T> { private sealed class MainObserver : Sink<T>, IObserver<T> { internal sealed class HandlerObserver : IObserver<U> { private readonly MainObserver _main; internal HandlerObserver(MainObserver main) { _main = main; } public void OnCompleted() { _main.HandlerComplete(); } public void OnError(Exception error) { _main.HandlerError(error); } public void OnNext(U value) { _main.HandlerNext(); } } private readonly IObserver<Exception> _errorSignal; internal readonly HandlerObserver HandlerConsumer; private readonly IObservable<T> _source; private IDisposable _upstream; internal IDisposable HandlerUpstream; private int _trampoline; private int _halfSerializer; private Exception _error; internal MainObserver(IObserver<T> downstream, IObservable<T> source, IObserver<Exception> errorSignal) : base(downstream) { _source = source; _errorSignal = errorSignal; HandlerConsumer = new HandlerObserver(this); } protected override void Dispose(bool disposing) { if (disposing) { Disposable.TryDispose(ref _upstream); Disposable.TryDispose(ref HandlerUpstream); } base.Dispose(disposing); } public void OnCompleted() { HalfSerializer.ForwardOnCompleted<T>((ISink<T>)this, ref _halfSerializer, ref _error); } public void OnError(Exception error) { if (Disposable.TrySetSerial(ref _upstream, null)) _errorSignal.OnNext(error); } public void OnNext(T value) { HalfSerializer.ForwardOnNext<T>((ISink<T>)this, value, ref _halfSerializer, ref _error); } private void HandlerError(Exception error) { HalfSerializer.ForwardOnError<T>((ISink<T>)this, error, ref _halfSerializer, ref _error); } private void HandlerComplete() { HalfSerializer.ForwardOnCompleted<T>((ISink<T>)this, ref _halfSerializer, ref _error); } internal void HandlerNext() { if (Interlocked.Increment(ref _trampoline) == 1) { do { SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); if (Disposable.TrySetSingle(ref _upstream, singleAssignmentDisposable) != 0) break; singleAssignmentDisposable.Disposable = ObservableExtensions.SubscribeSafe<T>(_source, (IObserver<T>)this); } while (Interlocked.Decrement(ref _trampoline) != 0); } } } private readonly IObservable<T> _source; private readonly Func<IObservable<Exception>, IObservable<U>> _handler; internal RetryWhen(IObservable<T> source, Func<IObservable<Exception>, IObservable<U>> handler) { _source = source; _handler = handler; } public IDisposable Subscribe(IObserver<T> observer) { if (observer == null) throw new ArgumentNullException("observer"); Subject<Exception> subject = new Subject<Exception>(); IObservable<U> observable = null; try { observable = _handler(subject); if (observable == null) throw new NullReferenceException("The handler returned a null IObservable"); } catch (Exception error) { observer.OnError(error); return Disposable.Empty; } MainObserver mainObserver = new MainObserver(observer, _source, new RedoSerializedObserver<Exception>(subject)); IDisposable value = ObservableExtensions.SubscribeSafe<U>(observable, (IObserver<U>)mainObserver.HandlerConsumer); Disposable.SetSingle(ref mainObserver.HandlerUpstream, value); mainObserver.HandlerNext(); return mainObserver; } } }