<PackageReference Include="System.Reactive" Version="6.0.0" />

Switch<TSource>

sealed class Switch<TSource> : Producer<TSource, _<TSource>>
using System.Reactive.Disposables; using System.Runtime.CompilerServices; namespace System.Reactive.Linq.ObservableImpl { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 1 })] internal sealed class Switch<[System.Runtime.CompilerServices.Nullable(2)] TSource> : Producer<TSource, Switch<TSource>._> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 1 })] internal sealed class _ : Sink<IObservable<TSource>, TSource> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] private sealed class InnerObserver : SafeObserver<TSource> { [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] private readonly _ _parent; private readonly ulong _id; public InnerObserver([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] _ parent, ulong id) { _parent = parent; _id = id; } public override void OnNext(TSource value) { lock (_parent._gate) { if (_parent._latest == _id) _parent.ForwardOnNext(value); } } public override void OnError(Exception error) { lock (_parent._gate) { Dispose(); if (_parent._latest == _id) _parent.ForwardOnError(error); } } public override void OnCompleted() { lock (_parent._gate) { Dispose(); if (_parent._latest == _id) { _parent._hasLatest = false; if (_parent._isStopped) _parent.ForwardOnCompleted(); } } } } private readonly object _gate = new object(); private SerialDisposableValue _innerSerialDisposable; private bool _isStopped; private ulong _latest; private bool _hasLatest; public _(IObserver<TSource> observer) : base(observer) { } protected override void Dispose(bool disposing) { if (disposing) _innerSerialDisposable.Dispose(); base.Dispose(disposing); } public override void OnNext(IObservable<TSource> value) { ulong id = default(ulong); lock (_gate) { id = ++_latest; _hasLatest = true; } InnerObserver innerObserver = new InnerObserver(this, id); _innerSerialDisposable.Disposable = innerObserver; innerObserver.SetResource(ObservableExtensions.SubscribeSafe<TSource>(value, (IObserver<TSource>)innerObserver)); } public override void OnError(Exception error) { lock (_gate) { ForwardOnError(error); } } public override void OnCompleted() { lock (_gate) { DisposeUpstream(); _isStopped = true; if (!_hasLatest) ForwardOnCompleted(); } } } private readonly IObservable<IObservable<TSource>> _sources; public Switch(IObservable<IObservable<TSource>> sources) { _sources = sources; } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] protected override _ CreateSink(IObserver<TSource> observer) { return new _(observer); } protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] _ sink) { sink.Run(_sources); } } }