<PackageReference Include="System.Reactive" Version="6.0.0-preview.9" />

Timeout<TSource>

static class Timeout<TSource>
using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Runtime.CompilerServices; using System.Threading; namespace System.Reactive.Linq.ObservableImpl { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] internal static class Timeout<[System.Runtime.CompilerServices.Nullable(2)] TSource> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 0 })] internal sealed class Relative : Producer<TSource, Relative._> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class _ : IdentitySink<TSource> { private readonly TimeSpan _dueTime; private readonly IObservable<TSource> _other; private readonly IScheduler _scheduler; private long _index; private SingleAssignmentDisposableValue _mainDisposable; private SingleAssignmentDisposableValue _otherDisposable; [System.Runtime.CompilerServices.Nullable(2)] private IDisposable _timerDisposable; public _([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] Relative parent, IObserver<TSource> observer) : base(observer) { _dueTime = parent._dueTime; _other = parent._other; _scheduler = parent._scheduler; } public override void Run(IObservable<TSource> source) { CreateTimer(0); _mainDisposable.Disposable = ObservableExtensions.SubscribeSafe<TSource>(source, (IObserver<TSource>)this); } protected override void Dispose(bool disposing) { if (disposing) { _mainDisposable.Dispose(); _otherDisposable.Dispose(); Disposable.Dispose(ref _timerDisposable); } base.Dispose(disposing); } private void CreateTimer(long idx) { if (Disposable.TrySetMultiple(ref _timerDisposable, null)) { IDisposable value = Scheduler.ScheduleAction<(long, _)>(_scheduler, (idx, this), _dueTime, (Action<(long, _)>)delegate((long idx, _ instance) state) { state.instance.Timeout(state.idx); }); Disposable.TrySetMultiple(ref _timerDisposable, value); } } private void Timeout(long idx) { if (Volatile.Read(ref _index) == idx && Interlocked.CompareExchange(ref _index, 9223372036854775807, idx) == idx) { _mainDisposable.Dispose(); IDisposable disposable = _other.Subscribe(GetForwarder()); _otherDisposable.Disposable = disposable; } } public override void OnNext(TSource value) { long num = Volatile.Read(ref _index); if (num != 9223372036854775807 && Interlocked.CompareExchange(ref _index, num + 1, num) == num) { Volatile.Read<IDisposable>(ref _timerDisposable)?.Dispose(); ForwardOnNext(value); CreateTimer(num + 1); } } public override void OnError(Exception error) { if (Interlocked.Exchange(ref _index, 9223372036854775807) != 9223372036854775807) { Disposable.Dispose(ref _timerDisposable); ForwardOnError(error); } } public override void OnCompleted() { if (Interlocked.Exchange(ref _index, 9223372036854775807) != 9223372036854775807) { Disposable.Dispose(ref _timerDisposable); ForwardOnCompleted(); } } } private readonly IObservable<TSource> _source; private readonly TimeSpan _dueTime; private readonly IObservable<TSource> _other; private readonly IScheduler _scheduler; public Relative(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler) { _source = source; _dueTime = dueTime; _other = other; _scheduler = scheduler; } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] protected override _ CreateSink(IObserver<TSource> observer) { return new _(this, observer); } protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] _ sink) { sink.Run(_source); } } [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 0 })] internal sealed class Absolute : Producer<TSource, Absolute._> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class _ : IdentitySink<TSource> { private readonly IObservable<TSource> _other; private SerialDisposableValue _serialDisposable; private int _wip; public _(IObservable<TSource> other, IObserver<TSource> observer) : base(observer) { _other = other; } public void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] Absolute parent) { SetUpstream(Scheduler.ScheduleAction<_>(parent._scheduler, this, parent._dueTime, (Action<_>)delegate(_ this) { this.Timeout(); })); _serialDisposable.Disposable = ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)this); } protected override void Dispose(bool disposing) { if (disposing) _serialDisposable.Dispose(); base.Dispose(disposing); } private void Timeout() { if (Interlocked.Increment(ref _wip) == 1) _serialDisposable.Disposable = ObservableExtensions.SubscribeSafe<TSource>(_other, GetForwarder()); } public override void OnNext(TSource value) { if (Interlocked.CompareExchange(ref _wip, 1, 0) == 0) { ForwardOnNext(value); if (Interlocked.Decrement(ref _wip) != 0) _serialDisposable.Disposable = ObservableExtensions.SubscribeSafe<TSource>(_other, GetForwarder()); } } public override void OnError(Exception error) { if (Interlocked.CompareExchange(ref _wip, 1, 0) == 0) ForwardOnError(error); } public override void OnCompleted() { if (Interlocked.CompareExchange(ref _wip, 1, 0) == 0) ForwardOnCompleted(); } } private readonly IObservable<TSource> _source; private readonly DateTimeOffset _dueTime; private readonly IObservable<TSource> _other; private readonly IScheduler _scheduler; public Absolute(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler) { _source = source; _dueTime = dueTime; _other = other; _scheduler = scheduler; } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] protected override _ CreateSink(IObserver<TSource> observer) { return new _(_other, observer); } protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] _ sink) { sink.Run(this); } } } }