<PackageReference Include="System.Reactive" Version="6.1.0-preview.9" />

Timeout<TSource, TTimeout>

sealed class Timeout<TSource, TTimeout> : Producer<TSource, _<TSource, TTimeout>>
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that relays a source sequence while a separate "timeout" sequence is
    /// armed for the subscription and re-armed after every element. When the armed
    /// timeout sequence signals (value or completion) before the next source event,
    /// the sink switches over to the fallback sequence.
    /// </summary>
    [System.Runtime.CompilerServices.NullableContext(1)]
    [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 1, 1 })]
    internal sealed class Timeout<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TTimeout> : Producer<TSource, Timeout<TSource, TTimeout>._>
    {
        private readonly IObservable<TSource> _source;

        private readonly IObservable<TTimeout> _firstTimeout;

        private readonly Func<TSource, IObservable<TTimeout>> _timeoutSelector;

        private readonly IObservable<TSource> _other;

        /// <summary>
        /// Captures the operator's inputs; nothing is subscribed here.
        /// </summary>
        /// <param name="source">Sequence whose elements are relayed.</param>
        /// <param name="firstTimeout">Timeout sequence armed before the first source element.</param>
        /// <param name="timeoutSelector">Produces the timeout sequence to arm after each relayed element.</param>
        /// <param name="other">Sequence subscribed to in place of the source once a timeout wins.</param>
        public Timeout(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutSelector, IObservable<TSource> other)
        {
            _source = source;
            _firstTimeout = firstTimeout;
            _timeoutSelector = timeoutSelector;
            _other = other;
        }

        /// <summary>
        /// Builds the sink that will serve the given downstream observer.
        /// </summary>
        [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 0 })]
        protected override _ CreateSink(IObserver<TSource> observer)
        {
            return new _(this, observer);
        }

        /// <summary>
        /// Starts the sink against this producer's sequences.
        /// </summary>
        protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 0 })] _ sink)
        {
            sink.Run(this);
        }

        /// <summary>
        /// Sink holding the per-subscription state. <c>_index</c> is the arbitration
        /// point: each relayed element bumps it by one, each timer is tagged with the
        /// value it was armed for, and <c>long.MaxValue</c> is the terminal marker that
        /// makes every later source event and timer signal a no-op.
        /// </summary>
        [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })]
        internal sealed class _ : IdentitySink<TSource>
        {
            private readonly Func<TSource, IObservable<TTimeout>> _timeoutSelector;

            private readonly IObservable<TSource> _other;

            // Holds the source subscription first and, after a timeout, the fallback subscription.
            private SerialDisposableValue _sourceDisposable;

            // The currently armed timeout observer, if any.
            [System.Runtime.CompilerServices.Nullable(2)]
            private IDisposable _timerDisposable;

            // Number of elements relayed so far, or long.MaxValue once terminated / timed out.
            private long _index;

            /// <summary>
            /// Copies the selector and fallback sequence from the producer and wires the downstream observer.
            /// </summary>
            public _(Timeout<TSource, TTimeout> parent, IObserver<TSource> observer)
                : base(observer)
            {
                _timeoutSelector = parent._timeoutSelector;
                _other = parent._other;
            }

            /// <summary>
            /// Arms the initial timeout (tagged with index 0) and then subscribes to the source.
            /// </summary>
            public void Run(Timeout<TSource, TTimeout> parent)
            {
                // The timer is armed before the source subscription is made.
                SetTimer(parent._firstTimeout, 0L);

                _sourceDisposable.TrySetFirst(ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)this));
            }

            /// <summary>
            /// On an explicit dispose, releases the source/fallback subscription and the armed timer
            /// before delegating to the base implementation.
            /// </summary>
            protected override void Dispose(bool disposing)
            {
                if (disposing)
                {
                    _sourceDisposable.Dispose();
                    Disposable.Dispose(ref _timerDisposable);
                }

                base.Dispose(disposing);
            }

            /// <summary>
            /// Relays a source element if it wins the race against the armed timer,
            /// then arms the timer chosen by the selector for the next element.
            /// </summary>
            public override void OnNext(TSource value)
            {
                long current = Volatile.Read(ref _index);

                // Either the sink is already terminal, or a timer claimed this slot first:
                // in both cases the element is dropped. The short-circuit keeps the
                // compare-and-swap from running once the terminal marker is set.
                if (current == long.MaxValue || Interlocked.CompareExchange(ref _index, current + 1, current) != current)
                {
                    return;
                }

                // Dispose the timer that was armed for the slot just consumed; the field itself
                // is left in place so SetTimer below can swap the next timer into it.
                IDisposable pendingTimer = Volatile.Read<IDisposable>(ref _timerDisposable);
                if (pendingTimer != null)
                {
                    pendingTimer.Dispose();
                }

                ForwardOnNext(value);

                IObservable<TTimeout> nextTimeout;
                try
                {
                    nextTimeout = _timeoutSelector(value);
                }
                catch (Exception ex)
                {
                    // A throwing selector is reported downstream as an error.
                    ForwardOnError(ex);
                    return;
                }

                SetTimer(nextTimeout, current + 1);
            }

            /// <summary>
            /// Marks the sink terminal and forwards the error unless it was already terminal.
            /// </summary>
            public override void OnError(Exception error)
            {
                long previous = Interlocked.Exchange(ref _index, long.MaxValue);
                if (previous != long.MaxValue)
                {
                    ForwardOnError(error);
                }
            }

            /// <summary>
            /// Marks the sink terminal and forwards completion unless it was already terminal.
            /// </summary>
            public override void OnCompleted()
            {
                long previous = Interlocked.Exchange(ref _index, long.MaxValue);
                if (previous != long.MaxValue)
                {
                    ForwardOnCompleted();
                }
            }

            /// <summary>
            /// Called when the timer armed for <paramref name="idx"/> fires. If that slot is still
            /// current, the sink becomes terminal for the source and subscribes the downstream
            /// observer to the fallback sequence.
            /// </summary>
            private void Timeout(long idx)
            {
                // Cheap read first; the compare-and-swap decides the race for real.
                if (Volatile.Read(ref _index) != idx)
                {
                    return;
                }

                if (Interlocked.CompareExchange(ref _index, long.MaxValue, idx) != idx)
                {
                    return;
                }

                _sourceDisposable.Disposable = ObservableExtensions.SubscribeSafe<TSource>(_other, GetForwarder());
            }

            /// <summary>
            /// Called when the timer armed for <paramref name="idx"/> fails. If that slot is still
            /// current, the sink becomes terminal and the error is forwarded downstream.
            /// </summary>
            /// <returns><c>true</c> when the error was forwarded; <c>false</c> when the timer was stale.</returns>
            private bool TimeoutError(long idx, Exception error)
            {
                if (Volatile.Read(ref _index) != idx)
                {
                    return false;
                }

                if (Interlocked.CompareExchange(ref _index, long.MaxValue, idx) != idx)
                {
                    return false;
                }

                ForwardOnError(error);
                return true;
            }

            /// <summary>
            /// Installs a new timeout observer tagged with <paramref name="idx"/> and, only if the
            /// installation is reported as successful, subscribes it to <paramref name="timeout"/>.
            /// </summary>
            private void SetTimer(IObservable<TTimeout> timeout, long idx)
            {
                TimeoutObserver watcher = new TimeoutObserver(this, idx);

                // NOTE(review): TrySetSerial presumably returns false once the slot has been
                // disposed — confirm against the Disposable helper.
                if (!Disposable.TrySetSerial(ref _timerDisposable, watcher))
                {
                    return;
                }

                watcher.SetResource(timeout.Subscribe(watcher));
            }

            /// <summary>
            /// Observer attached to one timeout sequence. It remembers the index it was armed for
            /// so the parent can tell a live timer from a stale one.
            /// </summary>
            [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })]
            private sealed class TimeoutObserver : SafeObserver<TTimeout>
            {
                [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 0 })]
                private readonly _ _parent;

                private readonly long _id;

                public TimeoutObserver([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 0 })] _ parent, long id)
                {
                    _parent = parent;
                    _id = id;
                }

                /// <summary>
                /// The first timeout value is handled exactly like completion of the timeout sequence.
                /// </summary>
                public override void OnNext(TTimeout value) => OnCompleted();

                /// <summary>
                /// Hands the error to the parent; calls Dispose only when the parent did not forward it.
                /// </summary>
                public override void OnError(Exception error)
                {
                    bool forwarded = _parent.TimeoutError(_id, error);
                    if (!forwarded)
                    {
                        Dispose();
                    }
                }

                /// <summary>
                /// Signals the timeout for this observer's index to the parent, then calls Dispose.
                /// </summary>
                public override void OnCompleted()
                {
                    _parent.Timeout(_id);
                    Dispose();
                }
            }
        }
    }
}