Timeout<TSource, TTimeout>
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class Timeout<TSource, TTimeout> : Producer<TSource, Timeout<TSource, TTimeout>._>
{
internal sealed class _ : IdentitySink<TSource>
{
private sealed class TimeoutObserver : SafeObserver<TTimeout>
{
private readonly _ _parent;
private readonly long _id;
public TimeoutObserver(_ parent, long id)
{
_parent = parent;
_id = id;
}
public override void OnNext(TTimeout value)
{
OnCompleted();
}
public override void OnError(Exception error)
{
if (!_parent.TimeoutError(_id, error))
Dispose();
}
public override void OnCompleted()
{
_parent.Timeout(_id);
Dispose();
}
}
private readonly Func<TSource, IObservable<TTimeout>> _timeoutSelector;
private readonly IObservable<TSource> _other;
private IDisposable _sourceDisposable;
private IDisposable _timerDisposable;
private long _index;
public _(Timeout<TSource, TTimeout> parent, IObserver<TSource> observer)
: base(observer)
{
_timeoutSelector = parent._timeoutSelector;
_other = parent._other;
}
public void Run(Timeout<TSource, TTimeout> parent)
{
SetTimer(parent._firstTimeout, 0);
Disposable.TrySetSingle(ref _sourceDisposable, ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)this));
}
protected override void Dispose(bool disposing)
{
if (disposing) {
Disposable.TryDispose(ref _sourceDisposable);
Disposable.TryDispose(ref _timerDisposable);
}
base.Dispose(disposing);
}
public override void OnNext(TSource value)
{
long num = Volatile.Read(ref _index);
if (num != 9223372036854775807 && Interlocked.CompareExchange(ref _index, num + 1, num) == num) {
Volatile.Read<IDisposable>(ref _timerDisposable)?.Dispose();
ForwardOnNext(value);
IObservable<TTimeout> observable = null;
try {
observable = _timeoutSelector(value);
} catch (Exception error) {
ForwardOnError(error);
return;
}
SetTimer(observable, num + 1);
}
}
public override void OnError(Exception error)
{
if (Interlocked.Exchange(ref _index, 9223372036854775807) != 9223372036854775807)
ForwardOnError(error);
}
public override void OnCompleted()
{
if (Interlocked.Exchange(ref _index, 9223372036854775807) != 9223372036854775807)
ForwardOnCompleted();
}
private void Timeout(long idx)
{
if (Volatile.Read(ref _index) == idx && Interlocked.CompareExchange(ref _index, 9223372036854775807, idx) == idx)
Disposable.TrySetSerial(ref _sourceDisposable, ObservableExtensions.SubscribeSafe<TSource>(_other, GetForwarder()));
}
private bool TimeoutError(long idx, Exception error)
{
if (Volatile.Read(ref _index) == idx && Interlocked.CompareExchange(ref _index, 9223372036854775807, idx) == idx) {
ForwardOnError(error);
return true;
}
return false;
}
private void SetTimer(IObservable<TTimeout> timeout, long idx)
{
TimeoutObserver timeoutObserver = new TimeoutObserver(this, idx);
if (Disposable.TrySetSerial(ref _timerDisposable, timeoutObserver)) {
IDisposable resource = timeout.Subscribe(timeoutObserver);
timeoutObserver.SetResource(resource);
}
}
}
private readonly IObservable<TSource> _source;
private readonly IObservable<TTimeout> _firstTimeout;
private readonly Func<TSource, IObservable<TTimeout>> _timeoutSelector;
private readonly IObservable<TSource> _other;
public Timeout(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutSelector, IObservable<TSource> other)
{
_source = source;
_firstTimeout = firstTimeout;
_timeoutSelector = timeoutSelector;
_other = other;
}
protected override _ CreateSink(IObserver<TSource> observer)
{
return new _(this, observer);
}
protected override void Run(_ sink)
{
sink.Run(this);
}
}
}