<PackageReference Include="System.Reactive" Version="4.1.0" />

Timeout<TSource, TTimeout>

internal sealed class Timeout<TSource, TTimeout> : Producer<TSource, Timeout<TSource, TTimeout>._>
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer for a timeout operator whose time limits are themselves observable
    /// sequences. One limit sequence guards the wait for the first element, and a
    /// selector builds a fresh limit sequence from every element that arrives.
    /// When a limit sequence produces a value or completes while its element is
    /// still awaited, the sink subscribes to the fallback sequence instead.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source and fallback sequences.</typeparam>
    /// <typeparam name="TTimeout">Element type of the limit sequences; the values themselves are ignored.</typeparam>
    internal sealed class Timeout<TSource, TTimeout> : Producer<TSource, Timeout<TSource, TTimeout>._>
    {
        private readonly IObservable<TSource> _source;
        private readonly IObservable<TTimeout> _firstTimeout;
        private readonly Func<TSource, IObservable<TTimeout>> _timeoutSelector;
        private readonly IObservable<TSource> _other;

        /// <summary>Captures the operator arguments; nothing is subscribed here.</summary>
        /// <param name="source">Sequence whose elements are forwarded downstream.</param>
        /// <param name="firstTimeout">Limit sequence armed before the first element.</param>
        /// <param name="timeoutSelector">Builds the limit sequence armed after each element.</param>
        /// <param name="other">Sequence subscribed to when a limit sequence fires.</param>
        public Timeout(
            IObservable<TSource> source,
            IObservable<TTimeout> firstTimeout,
            Func<TSource, IObservable<TTimeout>> timeoutSelector,
            IObservable<TSource> other)
        {
            _source = source;
            _firstTimeout = firstTimeout;
            _timeoutSelector = timeoutSelector;
            _other = other;
        }

        /// <summary>Creates the sink that serves a single downstream observer.</summary>
        protected override _ CreateSink(IObserver<TSource> observer)
        {
            return new _(this, observer);
        }

        /// <summary>Starts the given sink against this producer.</summary>
        protected override void Run(_ sink)
        {
            sink.Run(this);
        }

        /// <summary>
        /// Sink that arbitrates between source signals and limit-sequence signals
        /// through a single counter, <c>_index</c>. Each forwarded element advances
        /// the counter by one; any terminal decision swaps it to
        /// <see cref="long.MaxValue"/>. A limit observer remembers the counter value
        /// it was armed for and can only act while the counter still holds it.
        /// </summary>
        internal sealed class _ : IdentitySink<TSource>
        {
            private readonly Func<TSource, IObservable<TTimeout>> _timeoutSelector;
            private readonly IObservable<TSource> _other;

            // Holds the source subscription, later the fallback subscription.
            private IDisposable _sourceDisposable;

            // Holds the currently armed TimeoutObserver.
            private IDisposable _timerDisposable;

            // Number of elements accepted so far, or long.MaxValue once terminated.
            private long _index;

            /// <summary>Copies the selector and fallback sequence from the producer.</summary>
            public _(Timeout<TSource, TTimeout> parent, IObserver<TSource> observer)
                : base(observer)
            {
                _timeoutSelector = parent._timeoutSelector;
                _other = parent._other;
            }

            /// <summary>
            /// Arms the first limit sequence for counter value 0 and only then
            /// subscribes to the source, storing that subscription.
            /// </summary>
            public void Run(Timeout<TSource, TTimeout> parent)
            {
                SetTimer(parent._firstTimeout, 0);

                IDisposable upstream = ObservableExtensions.SubscribeSafe<TSource>(parent._source, this);
                Disposable.TrySetSingle(ref _sourceDisposable, upstream);
            }

            /// <summary>Releases the source/fallback subscription, then the armed timer.</summary>
            protected override void Dispose(bool disposing)
            {
                if (disposing)
                {
                    Disposable.TryDispose(ref _sourceDisposable);
                    Disposable.TryDispose(ref _timerDisposable);
                }

                base.Dispose(disposing);
            }

            /// <summary>
            /// Accepts an element if the sink has not terminated and the counter can
            /// be advanced atomically; otherwise the element is dropped. An accepted
            /// element cancels the armed timer, is forwarded, and arms the limit
            /// sequence produced by the selector. A throwing selector is reported
            /// downstream as an error.
            /// </summary>
            public override void OnNext(TSource value)
            {
                long current = Volatile.Read(ref _index);
                if (current == long.MaxValue)
                {
                    // Already terminated by an error, completion or timeout.
                    return;
                }

                long next = current + 1;
                if (Interlocked.CompareExchange(ref _index, next, current) != current)
                {
                    // Somebody else changed the counter first; this element lost the race.
                    return;
                }

                // Only the current timer is disposed; the field itself stays usable
                // because SetTimer below stores the next timer in it.
                Volatile.Read(ref _timerDisposable)?.Dispose();

                ForwardOnNext(value);

                IObservable<TTimeout> nextTimeout;
                try
                {
                    nextTimeout = _timeoutSelector(value);
                }
                catch (Exception error)
                {
                    ForwardOnError(error);
                    return;
                }

                SetTimer(nextTimeout, next);
            }

            /// <summary>Forwards the error unless the sink had already terminated.</summary>
            public override void OnError(Exception error)
            {
                long previous = Interlocked.Exchange(ref _index, long.MaxValue);
                if (previous != long.MaxValue)
                {
                    ForwardOnError(error);
                }
            }

            /// <summary>Forwards completion unless the sink had already terminated.</summary>
            public override void OnCompleted()
            {
                long previous = Interlocked.Exchange(ref _index, long.MaxValue);
                if (previous != long.MaxValue)
                {
                    ForwardOnCompleted();
                }
            }

            /// <summary>
            /// Atomically moves the counter from <paramref name="idx"/> to
            /// <see cref="long.MaxValue"/>. Returns <c>true</c> only for the caller
            /// that performed the transition.
            /// </summary>
            private bool TryTerminateAt(long idx)
            {
                return Volatile.Read(ref _index) == idx
                    && Interlocked.CompareExchange(ref _index, long.MaxValue, idx) == idx;
            }

            /// <summary>
            /// Called by the limit observer armed for <paramref name="idx"/> when its
            /// sequence fires. If that counter value is still current, the sink
            /// terminates the source phase and subscribes the fallback sequence.
            /// </summary>
            private void Timeout(long idx)
            {
                if (!TryTerminateAt(idx))
                {
                    return;
                }

                // NOTE(review): TrySetSerial presumably disposes the previous source
                // subscription when it stores the new one; confirm in Disposable.
                IDisposable fallback = ObservableExtensions.SubscribeSafe<TSource>(_other, GetForwarder());
                Disposable.TrySetSerial(ref _sourceDisposable, fallback);
            }

            /// <summary>
            /// Called by the limit observer armed for <paramref name="idx"/> when its
            /// sequence fails. Forwards the error and returns <c>true</c> if that
            /// counter value is still current; returns <c>false</c> otherwise.
            /// </summary>
            private bool TimeoutError(long idx, Exception error)
            {
                if (!TryTerminateAt(idx))
                {
                    return false;
                }

                ForwardOnError(error);
                return true;
            }

            /// <summary>
            /// Creates a limit observer for <paramref name="idx"/>, stores it in the
            /// timer field and, only if storing succeeded, subscribes it to
            /// <paramref name="timeout"/>.
            /// </summary>
            private void SetTimer(IObservable<TTimeout> timeout, long idx)
            {
                var watcher = new TimeoutObserver(this, idx);
                if (!Disposable.TrySetSerial(ref _timerDisposable, watcher))
                {
                    return;
                }

                IDisposable subscription = timeout.Subscribe(watcher);
                watcher.SetResource(subscription);
            }

            /// <summary>
            /// Observer of one limit sequence. It carries the counter value it was
            /// armed for and reports back to the owning sink with that value.
            /// </summary>
            private sealed class TimeoutObserver : SafeObserver<TTimeout>
            {
                private readonly _ _parent;
                private readonly long _generation;

                public TimeoutObserver(_ parent, long id)
                {
                    _parent = parent;
                    _generation = id;
                }

                /// <summary>A value from the limit sequence is handled exactly like its completion.</summary>
                public override void OnNext(TTimeout value)
                {
                    OnCompleted();
                }

                /// <summary>Offers the error to the sink; disposes itself if the sink did not take it.</summary>
                public override void OnError(Exception error)
                {
                    bool forwarded = _parent.TimeoutError(_generation, error);
                    if (forwarded)
                    {
                        return;
                    }

                    Dispose();
                }

                /// <summary>Signals the timeout to the sink, then disposes itself.</summary>
                public override void OnCompleted()
                {
                    _parent.Timeout(_generation);
                    Dispose();
                }
            }
        }
    }
}