<PackageReference Include="Relativity.Transfer.Client" Version="7.2.7" />

Case<TValue, TResult>

sealed class Case<TValue, TResult> : Producer<TResult, Case<TValue, TResult>._>, IEvaluatableObservable<TResult>
using System.Collections.Generic;
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Observable whose actual source is chosen by evaluating a selector function
    /// and looking the result up in a dictionary of candidate sequences; a default
    /// sequence is used when the dictionary has no entry for the selected value.
    /// </summary>
    /// <typeparam name="TValue">Type of the value produced by the selector and used as dictionary key.</typeparam>
    /// <typeparam name="TResult">Element type of the candidate sequences.</typeparam>
    internal sealed class Case<TValue, TResult> : Producer<TResult, Case<TValue, TResult>._>, IEvaluatableObservable<TResult>
    {
        private readonly Func<TValue> _selector;
        private readonly IDictionary<TValue, IObservable<TResult>> _sources;
        private readonly IObservable<TResult> _defaultSource;

        /// <summary>
        /// Stores the selector, the lookup table and the fallback sequence. No validation
        /// is performed here.
        /// </summary>
        /// <param name="selector">Function invoked by <see cref="Eval"/> to obtain the lookup key.</param>
        /// <param name="sources">Maps selector values to the sequence to use for that value.</param>
        /// <param name="defaultSource">Sequence returned when <paramref name="sources"/> has no matching key.</param>
        public Case(
            Func<TValue> selector,
            IDictionary<TValue, IObservable<TResult>> sources,
            IObservable<TResult> defaultSource)
        {
            _selector = selector;
            _sources = sources;
            _defaultSource = defaultSource;
        }

        /// <summary>
        /// Invokes the selector and resolves the sequence registered for its result.
        /// </summary>
        /// <returns>
        /// The dictionary entry for the selected value if one exists; otherwise the default sequence.
        /// </returns>
        public IObservable<TResult> Eval()
        {
            var key = _selector();

            return _sources.TryGetValue(key, out var match)
                ? match
                : _defaultSource;
        }

        /// <summary>Creates the nested sink for the given downstream observer.</summary>
        protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(observer, cancel);

        /// <summary>Starts the sink against this instance.</summary>
        protected override IDisposable Run(_ sink) => sink.Run(this);

        /// <summary>
        /// Sink that subscribes to the sequence chosen by <see cref="Case{TValue, TResult}.Eval"/>
        /// and relays its notifications to the downstream observer.
        /// </summary>
        internal sealed class _ : Sink<TResult>, IObserver<TResult>
        {
            public _(IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            /// <summary>
            /// Evaluates the parent to pick a sequence and subscribes this sink to it.
            /// </summary>
            /// <param name="parent">The operator instance whose selection is evaluated.</param>
            /// <returns>
            /// The subscription to the chosen sequence, or <see cref="Disposable.Empty"/> when
            /// evaluation threw and the error was forwarded downstream instead.
            /// </returns>
            public IDisposable Run(Case<TValue, TResult> parent)
            {
                IObservable<TResult> chosen;

                try
                {
                    chosen = parent.Eval();
                }
                catch (Exception failure)
                {
                    // Any failure while selecting the sequence is reported through the
                    // observer rather than thrown to the subscriber.
                    _observer.OnError(failure);
                    base.Dispose();
                    return Disposable.Empty;
                }

                return chosen.SubscribeSafe(this);
            }

            /// <summary>Relays an element to the downstream observer.</summary>
            public void OnNext(TResult value)
            {
                _observer.OnNext(value);
            }

            /// <summary>Relays the error downstream, then disposes this sink.</summary>
            public void OnError(Exception error)
            {
                _observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Relays completion downstream, then disposes this sink.</summary>
            public void OnCompleted()
            {
                _observer.OnCompleted();
                base.Dispose();
            }
        }
    }
}