// Switch<TSource>
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
/// <summary>
/// Producer over a sequence of inner sequences. Its sink forwards elements only
/// from the inner sequence that was received most recently; elements from
/// earlier inner sequences are dropped.
/// </summary>
/// <typeparam name="TSource">Element type of the inner sequences and of the result.</typeparam>
internal sealed class Switch<TSource> : Producer<TSource, Switch<TSource>._>
{
    // The outer sequence whose elements are themselves sequences.
    private readonly IObservable<IObservable<TSource>> _sources;

    /// <summary>Stores the outer sequence to subscribe to when the producer is run.</summary>
    /// <param name="sources">Sequence of inner sequences.</param>
    public Switch(IObservable<IObservable<TSource>> sources)
    {
        _sources = sources;
    }

    /// <summary>Creates the sink that wraps the downstream observer.</summary>
    protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel)
    {
        var sink = new _(observer, cancel);
        return sink;
    }

    /// <summary>Starts the given sink against this producer and returns its subscription handle.</summary>
    protected override IDisposable Run(_ sink)
    {
        IDisposable handle = sink.Run(this);
        return handle;
    }

    /// <summary>
    /// Sink that observes the outer sequence. Every inner sequence it receives is
    /// tagged with a monotonically increasing id; only the observer whose id equals
    /// <c>_latest</c> is allowed to talk to the downstream observer.
    /// </summary>
    internal sealed class _ : Sink<TSource>, IObserver<IObservable<TSource>>
    {
        // Guards _latest, _hasLatest, _isStopped and every call to the downstream observer.
        private readonly object _gate = new object();

        // Handle of the subscription to the outer sequence.
        private IDisposable _subscription;

        // Holds the handle of the current inner subscription; reassigned on every outer OnNext.
        private SerialDisposable _innerSubscription;

        // True once the outer sequence has completed.
        private bool _isStopped;

        // Id of the most recently received inner sequence.
        private ulong _latest;

        // True while the most recent inner sequence has not completed yet.
        private bool _hasLatest;

        /// <summary>Passes the downstream observer and cancellation handle to the base sink.</summary>
        public _(IObserver<TSource> observer, IDisposable cancel)
            : base(observer, cancel)
        {
        }

        /// <summary>
        /// Resets the sink state, subscribes to the outer sequence and returns a handle
        /// combining the outer subscription with the current inner subscription.
        /// </summary>
        /// <param name="parent">Producer that supplies the outer sequence.</param>
        public IDisposable Run(Switch<TSource> parent)
        {
            _latest = 0;
            _hasLatest = false;
            _isStopped = false;
            _innerSubscription = new SerialDisposable();

            // The field is assigned before subscribing because OnCompleted reads
            // _subscription and may be invoked while SubscribeSafe is still running.
            var sourceHandle = new SingleAssignmentDisposable();
            _subscription = sourceHandle;
            sourceHandle.Disposable = ObservableExtensions.SubscribeSafe<IObservable<TSource>>(parent._sources, this);

            return StableCompositeDisposable.Create(_subscription, _innerSubscription);
        }

        /// <summary>
        /// Receives a new inner sequence: allocates the next id, makes it the latest,
        /// then swaps the inner subscription handle and subscribes to the new sequence.
        /// </summary>
        public void OnNext(IObservable<TSource> value)
        {
            ulong innerId;
            lock (_gate)
            {
                _hasLatest = true;
                innerId = ++_latest;
            }

            // Subscription happens outside the gate. The handle is installed in the
            // SerialDisposable first (which is expected to release the previous inner
            // subscription) and only then bound to the actual subscription.
            var innerHandle = new SingleAssignmentDisposable();
            _innerSubscription.Disposable = innerHandle;

            var innerObserver = new InnerObserver(this, innerId, innerHandle);
            innerHandle.Disposable = ObservableExtensions.SubscribeSafe<TSource>(value, innerObserver);
        }

        /// <summary>Forwards an outer-sequence error downstream under the gate, then disposes the sink.</summary>
        public void OnError(Exception error)
        {
            lock (_gate)
            {
                _observer.OnError(error);
            }

            base.Dispose();
        }

        /// <summary>
        /// Handles completion of the outer sequence: releases the outer subscription and
        /// marks the sink as stopped. Downstream is completed right away only when no
        /// inner sequence is still active; otherwise the active inner observer does it.
        /// </summary>
        public void OnCompleted()
        {
            lock (_gate)
            {
                _subscription.Dispose();
                _isStopped = true;

                if (_hasLatest)
                {
                    return;
                }

                _observer.OnCompleted();
                base.Dispose();
            }
        }

        /// <summary>
        /// Observer attached to a single inner sequence. It carries the id assigned to
        /// that sequence and ignores notifications once a newer id has become the latest.
        /// </summary>
        private sealed class InnerObserver : IObserver<TSource>
        {
            private readonly _ _parent;
            private readonly ulong _id;
            private readonly IDisposable _self;

            /// <param name="parent">Owning sink.</param>
            /// <param name="id">Id assigned to the inner sequence this observer watches.</param>
            /// <param name="self">Handle of this observer's own subscription.</param>
            public InnerObserver(_ parent, ulong id, IDisposable self)
            {
                _parent = parent;
                _id = id;
                _self = self;
            }

            /// <summary>Forwards the element downstream if this is still the latest inner sequence.</summary>
            public void OnNext(TSource value)
            {
                lock (_parent._gate)
                {
                    if (_parent._latest != _id)
                    {
                        return;
                    }

                    _parent._observer.OnNext(value);
                }
            }

            /// <summary>
            /// Releases this observer's subscription; if it is still the latest inner
            /// sequence, the error is forwarded downstream and the sink is disposed.
            /// </summary>
            public void OnError(Exception error)
            {
                lock (_parent._gate)
                {
                    _self.Dispose();

                    if (_parent._latest != _id)
                    {
                        return;
                    }

                    _parent._observer.OnError(error);
                    _parent.Dispose();
                }
            }

            /// <summary>
            /// Releases this observer's subscription; if it is still the latest inner
            /// sequence it clears the "has latest" flag, and when the outer sequence has
            /// already stopped it completes downstream and disposes the sink.
            /// </summary>
            public void OnCompleted()
            {
                lock (_parent._gate)
                {
                    _self.Dispose();

                    if (_parent._latest != _id)
                    {
                        return;
                    }

                    _parent._hasLatest = false;

                    if (!_parent._isStopped)
                    {
                        return;
                    }

                    _parent._observer.OnCompleted();
                    _parent.Dispose();
                }
            }
        }
    }
}
}