TakeUntil<TSource, TOther>
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class TakeUntil<TSource, TOther> : Producer<TSource, TakeUntil<TSource, TOther>._>
{
internal sealed class _ : Sink<TSource>
{
private sealed class SourceObserver : IObserver<TSource>
{
private readonly _ _parent;
public volatile bool _open;
public SourceObserver(_ parent)
{
_parent = parent;
_open = false;
}
public void OnNext(TSource value)
{
if (_open)
_parent._observer.OnNext(value);
else {
lock (_parent) {
_parent._observer.OnNext(value);
}
}
}
public void OnError(Exception error)
{
lock (_parent) {
_parent._observer.OnError(error);
_parent.Dispose();
}
}
public void OnCompleted()
{
lock (_parent) {
_parent._observer.OnCompleted();
_parent.Dispose();
}
}
}
private sealed class OtherObserver : IObserver<TOther>
{
private readonly _ _parent;
private readonly SourceObserver _sourceObserver;
private readonly SingleAssignmentDisposable _subscription;
public IDisposable Disposable {
set {
_subscription.Disposable = value;
}
}
public OtherObserver(_ parent, SourceObserver sourceObserver)
{
_parent = parent;
_sourceObserver = sourceObserver;
_subscription = new SingleAssignmentDisposable();
}
public void OnNext(TOther value)
{
lock (_parent) {
_parent._observer.OnCompleted();
_parent.Dispose();
}
}
public void OnError(Exception error)
{
lock (_parent) {
_parent._observer.OnError(error);
_parent.Dispose();
}
}
public void OnCompleted()
{
lock (_parent) {
_sourceObserver._open = true;
_subscription.Dispose();
}
}
}
public _(IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
}
public IDisposable Run(TakeUntil<TSource, TOther> parent)
{
SourceObserver sourceObserver = new SourceObserver(this);
OtherObserver otherObserver = new OtherObserver(this, sourceObserver);
IDisposable disposable2 = otherObserver.Disposable = ObservableExtensions.SubscribeSafe<TOther>(parent._other, (IObserver<TOther>)otherObserver);
IDisposable disposable3 = ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)sourceObserver);
return StableCompositeDisposable.Create(disposable2, disposable3);
}
}
private readonly IObservable<TSource> _source;
private readonly IObservable<TOther> _other;
public TakeUntil(IObservable<TSource> source, IObservable<TOther> other)
{
_source = source;
_other = other;
}
protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel)
{
return new _(observer, cancel);
}
protected override IDisposable Run(_ sink)
{
return sink.Run(this);
}
}
}