<PackageReference Include="Relativity.Transfer.Client" Version="7.2.7" />

SkipUntil<TSource, TOther>

sealed class SkipUntil<TSource, TOther> : Producer<TSource, _<TSource, TOther>>
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer for the SkipUntil operator: elements of the source sequence are
    /// withheld from the downstream observer until the other sequence delivers
    /// its first element.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source (and result) sequence.</typeparam>
    /// <typeparam name="TOther">Element type of the sequence that opens the gate.</typeparam>
    internal sealed class SkipUntil<TSource, TOther> : Producer<TSource, SkipUntil<TSource, TOther>._>
    {
        private readonly IObservable<TSource> _source;
        private readonly IObservable<TOther> _other;

        /// <summary>
        /// Captures the two sequences; nothing is subscribed here.
        /// </summary>
        /// <param name="source">Sequence whose elements are forwarded once the gate is open.</param>
        /// <param name="other">Sequence whose first element opens the gate.</param>
        public SkipUntil(IObservable<TSource> source, IObservable<TOther> other)
        {
            _source = source;
            _other = other;
        }

        /// <summary>
        /// Creates the sink that wraps <paramref name="observer"/> for one subscription.
        /// </summary>
        protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel)
        {
            return new _(observer, cancel);
        }

        /// <summary>
        /// Starts the given sink against this producer's sequences.
        /// </summary>
        protected override IDisposable Run(_ sink)
        {
            return sink.Run(this);
        }

        /// <summary>
        /// Per-subscription sink. It owns one observer for each of the two
        /// sequences and wires them together in <see cref="Run"/>.
        /// </summary>
        internal sealed class _ : Sink<TSource>
        {
            public _(IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            /// <summary>
            /// Subscribes to both sequences and returns a disposable covering both subscriptions.
            /// </summary>
            /// <param name="parent">Producer supplying the source and other sequences.</param>
            /// <returns>A composite of the source subscription and the other subscription.</returns>
            public IDisposable Run(SkipUntil<TSource, TOther> parent)
            {
                var gatedSource = new SourceObserver(this);
                var gateOpener = new OtherObserver(this, gatedSource);

                // The other sequence is subscribed before the source, and each observer
                // receives its own subscription handle only after both subscriptions exist.
                IDisposable otherSubscription = ObservableExtensions.SubscribeSafe<TOther>(parent._other, gateOpener);
                IDisposable sourceSubscription = ObservableExtensions.SubscribeSafe<TSource>(parent._source, gatedSource);

                gatedSource.Disposable = sourceSubscription;
                gateOpener.Disposable = otherSubscription;

                return StableCompositeDisposable.Create(sourceSubscription, otherSubscription);
            }

            /// <summary>
            /// Observes the source sequence. Notifications go to <see cref="_observer"/>,
            /// which starts out as <c>NopObserver&lt;TSource&gt;.Instance</c> and is replaced
            /// by <see cref="OtherObserver"/> when the other sequence produces an element.
            /// </summary>
            private sealed class SourceObserver : IObserver<TSource>
            {
                private readonly _ _parent;
                private readonly SingleAssignmentDisposable _subscription;

                // Written by OtherObserver, possibly from another thread, hence volatile.
                public volatile IObserver<TSource> _observer;

                public SourceObserver(_ parent)
                {
                    _parent = parent;
                    _observer = NopObserver<TSource>.Instance;
                    _subscription = new SingleAssignmentDisposable();
                }

                /// <summary>
                /// Sets the handle of the subscription to the source sequence.
                /// </summary>
                public IDisposable Disposable
                {
                    set { _subscription.Disposable = value; }
                }

                /// <summary>
                /// Passes the element to whichever observer is currently installed.
                /// </summary>
                public void OnNext(TSource value)
                {
                    _observer.OnNext(value);
                }

                /// <summary>
                /// Reports the error to the parent's observer directly (bypassing the
                /// gate field) and then disposes the parent sink.
                /// </summary>
                public void OnError(Exception error)
                {
                    _parent._observer.OnError(error);
                    _parent.Dispose();
                }

                /// <summary>
                /// Signals completion to the currently installed observer, then
                /// disposes only this observer's own subscription.
                /// </summary>
                public void OnCompleted()
                {
                    _observer.OnCompleted();
                    _subscription.Dispose();
                }
            }

            /// <summary>
            /// Observes the other sequence and opens the gate on its first element.
            /// </summary>
            private sealed class OtherObserver : IObserver<TOther>
            {
                private readonly _ _parent;
                private readonly SourceObserver _sourceObserver;
                private readonly SingleAssignmentDisposable _subscription;

                public OtherObserver(_ parent, SourceObserver sourceObserver)
                {
                    _parent = parent;
                    _sourceObserver = sourceObserver;
                    _subscription = new SingleAssignmentDisposable();
                }

                /// <summary>
                /// Sets the handle of the subscription to the other sequence.
                /// </summary>
                public IDisposable Disposable
                {
                    set { _subscription.Disposable = value; }
                }

                /// <summary>
                /// Installs the parent's observer into the source observer, then
                /// disposes the subscription to the other sequence.
                /// </summary>
                public void OnNext(TOther value)
                {
                    _sourceObserver._observer = _parent._observer;
                    _subscription.Dispose();
                }

                /// <summary>
                /// Reports the error to the parent's observer and disposes the parent sink.
                /// </summary>
                public void OnError(Exception error)
                {
                    _parent._observer.OnError(error);
                    _parent.Dispose();
                }

                /// <summary>
                /// Disposes the subscription to the other sequence; the installed
                /// observer on the source side is left unchanged.
                /// </summary>
                public void OnCompleted()
                {
                    _subscription.Dispose();
                }
            }
        }
    }
}