<PackageReference Include="System.Reactive" Version="4.0.0" />

If<TResult>

internal sealed class If<TResult> : Producer<TResult, If<TResult>._>, IEvaluatableObservable<TResult>
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Observable that forwards one of two sources, selected by invoking a
    /// condition delegate each time <see cref="Eval"/> is called.
    /// </summary>
    /// <typeparam name="TResult">Element type of both candidate sources.</typeparam>
    internal sealed class If<TResult> : Producer<TResult, If<TResult>._>, IEvaluatableObservable<TResult>
    {
        private readonly Func<bool> _condition;
        private readonly IObservable<TResult> _thenSource;
        private readonly IObservable<TResult> _elseSource;

        /// <summary>
        /// Stores the condition and the two candidate sources; nothing is evaluated here.
        /// </summary>
        /// <param name="condition">Delegate deciding which source is used.</param>
        /// <param name="thenSource">Source returned when <paramref name="condition"/> yields <c>true</c>.</param>
        /// <param name="elseSource">Source returned when <paramref name="condition"/> yields <c>false</c>.</param>
        public If(Func<bool> condition, IObservable<TResult> thenSource, IObservable<TResult> elseSource)
        {
            _condition = condition;
            _thenSource = thenSource;
            _elseSource = elseSource;
        }

        /// <summary>
        /// Invokes the condition and returns the matching source.
        /// </summary>
        /// <returns>The "then" source when the condition is <c>true</c>; otherwise the "else" source.</returns>
        /// <remarks>Exceptions thrown by the condition delegate are not caught here.</remarks>
        public IObservable<TResult> Eval() => _condition() ? _thenSource : _elseSource;

        /// <summary>
        /// Creates the sink that relays notifications to <paramref name="observer"/>.
        /// </summary>
        protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);

        /// <summary>
        /// Starts the given sink and returns the disposable it produces.
        /// </summary>
        protected override IDisposable Run(_ sink) => sink.Run();

        /// <summary>
        /// Sink that subscribes to the source chosen by the parent and passes every
        /// notification through to the downstream observer.
        /// </summary>
        internal sealed class _ : Sink<TResult>, IObserver<TResult>
        {
            private readonly If<TResult> _parent;

            public _(If<TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            /// <summary>
            /// Asks the parent for the source to use and subscribes this sink to it.
            /// </summary>
            /// <returns>
            /// The subscription to the chosen source, or <see cref="Disposable.Empty"/>
            /// when choosing the source threw.
            /// </returns>
            public IDisposable Run()
            {
                var chosen = default(IObservable<TResult>);

                // Only the selection step is guarded: a failure while picking the
                // source is reported downstream as OnError and the sink is disposed.
                try
                {
                    chosen = _parent.Eval();
                }
                catch (Exception failure)
                {
                    _observer.OnError(failure);
                    base.Dispose();
                    return Disposable.Empty;
                }

                // The subscribe call stays outside the try block so that anything it
                // throws is not routed through the handler above.
                return ObservableExtensions.SubscribeSafe<TResult>(chosen, this);
            }

            /// <summary>Relays an element to the downstream observer.</summary>
            public void OnNext(TResult value) => _observer.OnNext(value);

            /// <summary>Relays the error downstream, then disposes this sink.</summary>
            public void OnError(Exception error)
            {
                _observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Relays completion downstream, then disposes this sink.</summary>
            public void OnCompleted()
            {
                _observer.OnCompleted();
                base.Dispose();
            }
        }
    }
}