<PackageReference Include="System.Reactive" Version="4.1.0" />

Timeout<TSource>

static class Timeout<TSource>
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Container for the two timeout producers: <c>Relative</c> (a <c>TimeSpan</c> window that is
    /// re-armed after every forwarded element) and <c>Absolute</c> (a single <c>DateTimeOffset</c>
    /// deadline). In both cases, when the timer wins, the sink subscribes the downstream forwarder
    /// to the fallback sequence <c>other</c>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source and fallback sequences.</typeparam>
    internal static class Timeout<TSource>
    {
        /// <summary>
        /// Producer that switches to <c>other</c> when the timer scheduled for the current
        /// element index fires before the next source signal claims that index.
        /// </summary>
        internal sealed class Relative : Producer<TSource, Relative._>
        {
            private readonly IObservable<TSource> _source;
            private readonly TimeSpan _dueTime;
            private readonly IObservable<TSource> _other;
            private readonly IScheduler _scheduler;

            /// <param name="source">Sequence being watched.</param>
            /// <param name="dueTime">Delay passed to the scheduler for every timer.</param>
            /// <param name="other">Fallback sequence subscribed to when a timer wins.</param>
            /// <param name="scheduler">Scheduler the timers are scheduled on.</param>
            public Relative(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
            {
                _source = source;
                _dueTime = dueTime;
                _other = other;
                _scheduler = scheduler;
            }

            protected override _ CreateSink(IObserver<TSource> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }

            /// <summary>
            /// Sink for <c>Relative</c>. <c>_index</c> is the arbiter between source signals and
            /// timers: each party must move it away from the value it observed (via an interlocked
            /// operation) before it is allowed to act, so exactly one of them wins per index.
            /// </summary>
            internal sealed class _ : IdentitySink<TSource>
            {
                /// <summary>
                /// Value stored in <c>_index</c> once a timeout, error or completion has been
                /// accepted. Every later <c>OnNext</c>/<c>OnError</c>/<c>OnCompleted</c>/timer
                /// callback sees it and does nothing.
                /// </summary>
                private const long Terminated = long.MaxValue;

                private readonly TimeSpan _dueTime;
                private readonly IObservable<TSource> _other;
                private readonly IScheduler _scheduler;

                // Number of elements forwarded so far, or Terminated.
                private long _index;

                private IDisposable _mainDisposable;
                private IDisposable _otherDisposable;
                private IDisposable _timerDisposable;

                public _(Relative parent, IObserver<TSource> observer)
                    : base(observer)
                {
                    _dueTime = parent._dueTime;
                    _other = parent._other;
                    _scheduler = parent._scheduler;
                }

                /// <summary>
                /// Arms the timer for index 0 and then subscribes to <paramref name="source"/>.
                /// </summary>
                public override void Run(IObservable<TSource> source)
                {
                    // The timer is created before subscribing, so it is already in place
                    // when the first source signal arrives.
                    CreateTimer(0L);

                    Disposable.SetSingle(ref _mainDisposable, source.SubscribeSafe(this));
                }

                /// <summary>
                /// When <paramref name="disposing"/> is true, disposes the source subscription,
                /// the fallback subscription and the timer, then defers to the base class.
                /// </summary>
                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        Disposable.TryDispose(ref _mainDisposable);
                        Disposable.TryDispose(ref _otherDisposable);
                        Disposable.TryDispose(ref _timerDisposable);
                    }

                    base.Dispose(disposing);
                }

                /// <summary>
                /// Schedules a call to <c>Timeout(idx)</c> after <c>_dueTime</c> and stores the
                /// resulting disposable in <c>_timerDisposable</c>.
                /// </summary>
                /// <param name="idx">Index the new timer is guarding.</param>
                private void CreateTimer(long idx)
                {
                    // NOTE(review): TrySetMultiple is presumably false once the field has been
                    // disposed, which would skip scheduling after disposal - confirm against the
                    // Disposable helper.
                    if (Disposable.TrySetMultiple(ref _timerDisposable, null))
                    {
                        // State is passed explicitly so the callback captures nothing.
                        var timer = _scheduler.ScheduleAction(
                            (idx, instance: this),
                            _dueTime,
                            state => state.instance.Timeout(state.idx));

                        Disposable.TrySetMultiple(ref _timerDisposable, timer);
                    }
                }

                /// <summary>
                /// Timer callback. If <c>_index</c> still equals <paramref name="idx"/>, marks the
                /// sink as terminated, disposes the source subscription and subscribes the
                /// forwarder to <c>_other</c>.
                /// </summary>
                private void Timeout(long idx)
                {
                    // Plain read first, then the CAS that actually claims the index.
                    if (Volatile.Read(ref _index) == idx
                        && Interlocked.CompareExchange(ref _index, Terminated, idx) == idx)
                    {
                        Disposable.TryDispose(ref _mainDisposable);

                        // NOTE(review): plain Subscribe here, whereas Absolute uses SubscribeSafe
                        // for the fallback; left unchanged to preserve behavior.
                        var fallback = _other.Subscribe(GetForwarder());

                        Disposable.SetSingle(ref _otherDisposable, fallback);
                    }
                }

                /// <summary>
                /// Claims the current index, cancels its timer, forwards
                /// <paramref name="value"/> and arms a timer for the next index. Ignored if the
                /// sink is terminated or the index was claimed concurrently.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    var idx = Volatile.Read(ref _index);

                    if (idx != Terminated && Interlocked.CompareExchange(ref _index, idx + 1, idx) == idx)
                    {
                        // Dispose the current timer instance only; the field itself is left
                        // usable because CreateTimer stores the next timer in it.
                        // NOTE(review): TryDispose presumably marks the field as disposed,
                        // which would block that - confirm.
                        Volatile.Read(ref _timerDisposable)?.Dispose();

                        ForwardOnNext(value);

                        CreateTimer(idx + 1);
                    }
                }

                /// <summary>
                /// Terminates the sink and forwards <paramref name="error"/>, unless the sink was
                /// already terminated.
                /// </summary>
                public override void OnError(Exception error)
                {
                    if (Interlocked.Exchange(ref _index, Terminated) != Terminated)
                    {
                        Disposable.TryDispose(ref _timerDisposable);

                        ForwardOnError(error);
                    }
                }

                /// <summary>
                /// Terminates the sink and forwards completion, unless the sink was already
                /// terminated.
                /// </summary>
                public override void OnCompleted()
                {
                    if (Interlocked.Exchange(ref _index, Terminated) != Terminated)
                    {
                        Disposable.TryDispose(ref _timerDisposable);

                        ForwardOnCompleted();
                    }
                }
            }
        }

        /// <summary>
        /// Producer that schedules a single timer for a fixed due time and switches to
        /// <c>other</c> when it fires, unless the source has already terminated.
        /// </summary>
        internal sealed class Absolute : Producer<TSource, Absolute._>
        {
            private readonly IObservable<TSource> _source;
            private readonly DateTimeOffset _dueTime;
            private readonly IObservable<TSource> _other;
            private readonly IScheduler _scheduler;

            /// <param name="source">Sequence being watched.</param>
            /// <param name="dueTime">Due time passed to the scheduler for the single timer.</param>
            /// <param name="other">Fallback sequence subscribed to when the timer wins.</param>
            /// <param name="scheduler">Scheduler the timer is scheduled on.</param>
            public Absolute(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
            {
                _source = source;
                _dueTime = dueTime;
                _other = other;
                _scheduler = scheduler;
            }

            protected override _ CreateSink(IObserver<TSource> observer)
            {
                return new _(_other, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(this);
            }

            /// <summary>
            /// Sink for <c>Absolute</c>. <c>_wip</c> is 0 while the source is live and idle.
            /// A source signal enters by moving it 0 -> 1; the timer increments it. Once the timer
            /// has fired, or a terminal signal was accepted, it never returns to 0, so all
            /// further source signals are dropped.
            /// </summary>
            internal sealed class _ : IdentitySink<TSource>
            {
                private readonly IObservable<TSource> _other;

                // Holds the source subscription first, then the fallback subscription.
                private IDisposable _serialDisposable;

                private int _wip;

                public _(IObservable<TSource> other, IObserver<TSource> observer)
                    : base(observer)
                {
                    _other = other;
                }

                /// <summary>
                /// Schedules the timeout on the parent's scheduler for the parent's due time,
                /// then subscribes to the parent's source.
                /// </summary>
                public void Run(Absolute parent)
                {
                    // The parameter is spelled @this because `this` is a reserved keyword
                    // and cannot be a parameter name. State is passed explicitly so the
                    // callback captures nothing.
                    SetUpstream(parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Timeout()));

                    Disposable.TrySetSingle(ref _serialDisposable, parent._source.SubscribeSafe(this));
                }

                /// <summary>
                /// When <paramref name="disposing"/> is true, disposes the current source or
                /// fallback subscription, then defers to the base class.
                /// </summary>
                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        Disposable.TryDispose(ref _serialDisposable);
                    }

                    base.Dispose(disposing);
                }

                /// <summary>
                /// Timer callback. Switches to <c>_other</c> right away when no <c>OnNext</c> is
                /// in flight; otherwise the in-flight <c>OnNext</c> performs the switch.
                /// </summary>
                private void Timeout()
                {
                    // A result of 1 means _wip was 0: nobody is emitting and nothing has
                    // terminated.
                    if (Interlocked.Increment(ref _wip) == 1)
                    {
                        Disposable.TrySetSerial(ref _serialDisposable, _other.SubscribeSafe(GetForwarder()));
                    }
                }

                /// <summary>
                /// Forwards <paramref name="value"/> if the sink is still live, and performs the
                /// switch to <c>_other</c> if the timer fired during the forward.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    if (Interlocked.CompareExchange(ref _wip, 1, 0) == 0)
                    {
                        ForwardOnNext(value);

                        // Non-zero after the decrement means Timeout() incremented _wip while
                        // the value was being forwarded; it skipped the switch, so do it here.
                        if (Interlocked.Decrement(ref _wip) != 0)
                        {
                            Disposable.TrySetSerial(ref _serialDisposable, _other.SubscribeSafe(GetForwarder()));
                        }
                    }
                }

                /// <summary>
                /// Forwards <paramref name="error"/> if the sink is still live. <c>_wip</c> is
                /// left at 1 so that later signals and the timer are ignored.
                /// </summary>
                public override void OnError(Exception error)
                {
                    if (Interlocked.CompareExchange(ref _wip, 1, 0) == 0)
                    {
                        ForwardOnError(error);
                    }
                }

                /// <summary>
                /// Forwards completion if the sink is still live. <c>_wip</c> is left at 1 so
                /// that later signals and the timer are ignored.
                /// </summary>
                public override void OnCompleted()
                {
                    if (Interlocked.CompareExchange(ref _wip, 1, 0) == 0)
                    {
                        ForwardOnCompleted();
                    }
                }
            }
        }
    }
}