Switch<TSource>
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
namespace System.Reactive.Linq.ObservableImpl
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1,
1,
1
})]
internal sealed class Switch<[System.Runtime.CompilerServices.Nullable(2)] TSource> : Producer<TSource, Switch<TSource>._>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1,
1,
1
})]
internal sealed class _ : Sink<IObservable<TSource>, TSource>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
private sealed class InnerObserver : SafeObserver<TSource>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})]
private readonly _ _parent;
private readonly ulong _id;
public InnerObserver([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})] _ parent, ulong id)
{
_parent = parent;
_id = id;
}
public override void OnNext(TSource value)
{
lock (_parent._gate) {
if (_parent._latest == _id)
_parent.ForwardOnNext(value);
}
}
public override void OnError(Exception error)
{
lock (_parent._gate) {
Dispose();
if (_parent._latest == _id)
_parent.ForwardOnError(error);
}
}
public override void OnCompleted()
{
lock (_parent._gate) {
Dispose();
if (_parent._latest == _id) {
_parent._hasLatest = false;
if (_parent._isStopped)
_parent.ForwardOnCompleted();
}
}
}
}
private readonly object _gate = new object();
private SerialDisposableValue _innerSerialDisposable;
private bool _isStopped;
private ulong _latest;
private bool _hasLatest;
public _(IObserver<TSource> observer)
: base(observer)
{
}
protected override void Dispose(bool disposing)
{
if (disposing)
_innerSerialDisposable.Dispose();
base.Dispose(disposing);
}
public override void OnNext(IObservable<TSource> value)
{
ulong id = default(ulong);
lock (_gate) {
id = ++_latest;
_hasLatest = true;
}
InnerObserver innerObserver = new InnerObserver(this, id);
_innerSerialDisposable.Disposable = innerObserver;
innerObserver.SetResource(ObservableExtensions.SubscribeSafe<TSource>(value, (IObserver<TSource>)innerObserver));
}
public override void OnError(Exception error)
{
lock (_gate) {
ForwardOnError(error);
}
}
public override void OnCompleted()
{
lock (_gate) {
DisposeUpstream();
_isStopped = true;
if (!_hasLatest)
ForwardOnCompleted();
}
}
}
private readonly IObservable<IObservable<TSource>> _sources;
public Switch(IObservable<IObservable<TSource>> sources)
{
_sources = sources;
}
[return: System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})]
protected override _ CreateSink(IObserver<TSource> observer)
{
return new _(observer);
}
protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})] _ sink)
{
sink.Run(_sources);
}
}
}