<PackageReference Include="System.Reactive" Version="4.0.0" />

TakeUntil<TSource, TOther>

sealed class TakeUntil<TSource, TOther> : Producer<TSource, _<TSource, TOther>>
using System.Reactive.Disposables; namespace System.Reactive.Linq.ObservableImpl { internal sealed class TakeUntil<TSource, TOther> : Producer<TSource, TakeUntil<TSource, TOther>._> { internal sealed class _ : Sink<TSource> { private sealed class SourceObserver : IObserver<TSource> { private readonly _ _parent; public volatile bool _open; public SourceObserver(_ parent) { _parent = parent; _open = false; } public void OnNext(TSource value) { if (_open) _parent._observer.OnNext(value); else { lock (_parent) { _parent._observer.OnNext(value); } } } public void OnError(Exception error) { lock (_parent) { _parent._observer.OnError(error); _parent.Dispose(); } } public void OnCompleted() { lock (_parent) { _parent._observer.OnCompleted(); _parent.Dispose(); } } } private sealed class OtherObserver : IObserver<TOther> { private readonly _ _parent; private readonly SourceObserver _sourceObserver; private readonly SingleAssignmentDisposable _subscription; public IDisposable Disposable { set { _subscription.Disposable = value; } } public OtherObserver(_ parent, SourceObserver sourceObserver) { _parent = parent; _sourceObserver = sourceObserver; _subscription = new SingleAssignmentDisposable(); } public void OnNext(TOther value) { lock (_parent) { _parent._observer.OnCompleted(); _parent.Dispose(); } } public void OnError(Exception error) { lock (_parent) { _parent._observer.OnError(error); _parent.Dispose(); } } public void OnCompleted() { lock (_parent) { _sourceObserver._open = true; _subscription.Dispose(); } } } public _(IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { } public IDisposable Run(TakeUntil<TSource, TOther> parent) { SourceObserver sourceObserver = new SourceObserver(this); OtherObserver otherObserver = new OtherObserver(this, sourceObserver); IDisposable disposable2 = otherObserver.Disposable = ObservableExtensions.SubscribeSafe<TOther>(parent._other, (IObserver<TOther>)otherObserver); IDisposable disposable3 = ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)sourceObserver); return StableCompositeDisposable.Create(disposable2, disposable3); } } private readonly IObservable<TSource> _source; private readonly IObservable<TOther> _other; public TakeUntil(IObservable<TSource> source, IObservable<TOther> other) { _source = source; _other = other; } protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel) { return new _(observer, cancel); } protected override IDisposable Run(_ sink) { return sink.Run(this); } } }