<PackageReference Include="System.Reactive" Version="4.0.0" />

Switch<TSource>

sealed class Switch<TSource> : Producer<TSource, _<TSource>>
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer over a sequence of inner sequences. Its sink forwards elements only from the
    /// inner sequence that was received most recently; notifications from earlier inner
    /// sequences are ignored.
    /// </summary>
    /// <typeparam name="TSource">Element type of the inner sequences and of the output.</typeparam>
    internal sealed class Switch<TSource> : Producer<TSource, Switch<TSource>._>
    {
        /// <summary>The outer sequence whose elements are themselves sequences.</summary>
        private readonly IObservable<IObservable<TSource>> _sources;

        /// <summary>Stores the outer sequence; nothing is subscribed here.</summary>
        /// <param name="sources">Sequence of inner sequences to switch between.</param>
        public Switch(IObservable<IObservable<TSource>> sources)
        {
            _sources = sources;
        }

        /// <summary>Creates a new sink wrapping the given observer and cancellation handle.</summary>
        protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);

        /// <summary>Starts the given sink against this producer.</summary>
        protected override IDisposable Run(_ sink) => sink.Run(this);

        /// <summary>
        /// Sink that observes the outer sequence and spawns one <see cref="InnerObserver"/>
        /// per inner sequence it receives.
        /// </summary>
        internal sealed class _ : Sink<TSource>, IObserver<IObservable<TSource>>
        {
            // Taken around every read/write of the switching state and every downstream notification.
            private readonly object _gate = new object();

            // Handle of the subscription to the outer sequence (assigned in Run).
            private IDisposable _subscription;

            // Holds the handle of the current inner subscription (assigned in Run, swapped in OnNext).
            private SerialDisposable _innerSubscription;

            // Set once the outer sequence has completed.
            private bool _isStopped;

            // Id of the most recently received inner sequence; incremented for each one.
            private ulong _latest;

            // True from the arrival of an inner sequence until that latest inner sequence completes.
            private bool _hasLatest;

            /// <summary>Passes the downstream observer and cancellation handle to the base sink.</summary>
            public _(IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            /// <summary>
            /// Resets the switching state and subscribes this sink to the outer sequence.
            /// </summary>
            /// <param name="parent">Producer supplying the outer sequence.</param>
            /// <returns>A disposable combining the outer and the inner subscription handles.</returns>
            public IDisposable Run(Switch<TSource> parent)
            {
                _innerSubscription = new SerialDisposable();
                _isStopped = false;
                _latest = 0;
                _hasLatest = false;

                // Publish the handle in the field before subscribing, so that it is already
                // available to OnCompleted if the outer sequence completes during the subscribe call.
                var outerHandle = new SingleAssignmentDisposable();
                _subscription = outerHandle;
                outerHandle.Disposable = parent._sources.SubscribeSafe(this);

                return StableCompositeDisposable.Create(_subscription, _innerSubscription);
            }

            /// <summary>
            /// Handles a new inner sequence: gives it a fresh id, makes it the latest one, and
            /// subscribes an <see cref="InnerObserver"/> to it.
            /// </summary>
            /// <param name="value">The inner sequence that just arrived.</param>
            public void OnNext(IObservable<TSource> value)
            {
                ulong innerId;
                lock (_gate)
                {
                    innerId = ++_latest;
                    _hasLatest = true;
                }

                // Install the new handle before subscribing; the subscription itself happens
                // outside the gate. NOTE(review): SerialDisposable is expected to dispose the
                // handle it replaces - confirm against the System.Reactive documentation.
                var innerHandle = new SingleAssignmentDisposable();
                _innerSubscription.Disposable = innerHandle;
                innerHandle.Disposable = value.SubscribeSafe(new InnerObserver(this, innerId, innerHandle));
            }

            /// <summary>
            /// Forwards an error of the outer sequence downstream, then disposes the sink.
            /// </summary>
            /// <param name="error">The error raised by the outer sequence.</param>
            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    _observer.OnError(error);
                }

                // Disposal happens after the gate has been released.
                base.Dispose();
            }

            /// <summary>
            /// Marks the outer sequence as finished. Downstream is completed right away only
            /// when no inner sequence is currently considered active.
            /// </summary>
            public void OnCompleted()
            {
                lock (_gate)
                {
                    _subscription.Dispose();
                    _isStopped = true;

                    if (_hasLatest)
                    {
                        // The latest inner observer will complete downstream when it finishes.
                        return;
                    }

                    _observer.OnCompleted();
                    base.Dispose();
                }
            }

            /// <summary>
            /// Observer attached to a single inner sequence. It acts on the parent sink only
            /// while its id equals the parent's latest id.
            /// </summary>
            private sealed class InnerObserver : IObserver<TSource>
            {
                private readonly _ _parent;
                private readonly ulong _id;
                private readonly IDisposable _self;

                /// <param name="parent">The sink that created this observer.</param>
                /// <param name="id">Id assigned to the observed inner sequence.</param>
                /// <param name="self">Handle of this observer's own subscription.</param>
                public InnerObserver(_ parent, ulong id, IDisposable self)
                {
                    _parent = parent;
                    _id = id;
                    _self = self;
                }

                /// <summary>Forwards the element downstream if this is still the latest inner sequence.</summary>
                public void OnNext(TSource value)
                {
                    lock (_parent._gate)
                    {
                        if (_parent._latest != _id)
                        {
                            return;
                        }

                        _parent._observer.OnNext(value);
                    }
                }

                /// <summary>
                /// Disposes this observer's subscription; if it is still the latest inner
                /// sequence, forwards the error downstream and disposes the parent sink.
                /// </summary>
                public void OnError(Exception error)
                {
                    lock (_parent._gate)
                    {
                        _self.Dispose();

                        if (_parent._latest != _id)
                        {
                            return;
                        }

                        _parent._observer.OnError(error);
                        _parent.Dispose();
                    }
                }

                /// <summary>
                /// Disposes this observer's subscription; if it is still the latest inner
                /// sequence, clears the parent's active flag and, when the outer sequence has
                /// already stopped, completes downstream and disposes the parent sink.
                /// </summary>
                public void OnCompleted()
                {
                    lock (_parent._gate)
                    {
                        _self.Dispose();

                        if (_parent._latest != _id)
                        {
                            return;
                        }

                        _parent._hasLatest = false;

                        if (!_parent._isStopped)
                        {
                            // Outer sequence is still running; another inner sequence may arrive.
                            return;
                        }

                        _parent._observer.OnCompleted();
                        _parent.Dispose();
                    }
                }
            }
        }
    }
}