<PackageReference Include="System.Reactive" Version="4.3.2" />

Delay<TSource>

static class Delay<TSource>
using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Threading; namespace System.Reactive.Linq.ObservableImpl { internal static class Delay<TSource> { internal abstract class Base<TParent> : Producer<TSource, Base<TParent>._> where TParent : Base<TParent> { internal abstract class _ : IdentitySink<TSource> { protected IStopwatch _watch; protected IScheduler _scheduler; protected _(TParent parent, IObserver<TSource> observer) : base(observer) { _scheduler = parent._scheduler; } public void Run(TParent parent) { _watch = _scheduler.StartStopwatch(); RunCore(parent); base.Run(parent._source); } protected abstract void RunCore(TParent parent); } internal abstract class S : _ { protected readonly object _gate = new object(); protected IDisposable _cancelable; protected TimeSpan _delay; protected bool _ready; protected bool _active; protected bool _running; protected Queue<System.Reactive.TimeInterval<TSource>> _queue = new Queue<System.Reactive.TimeInterval<TSource>>(); private bool _hasCompleted; private TimeSpan _completeAt; private bool _hasFailed; private Exception _exception; protected S(TParent parent, IObserver<TSource> observer) : base(parent, observer) { } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (disposing) Disposable.TryDispose(ref _cancelable); } public override void OnNext(TSource value) { bool flag = false; lock (_gate) { TimeSpan interval = _watch.Elapsed.Add(_delay); _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, interval)); flag = (_ready && !_active); _active = true; } if (flag) Disposable.TrySetSerial(ref _cancelable, Scheduler.Schedule<S>(_scheduler, this, _delay, (Action<S, Action<S, TimeSpan>>)delegate(S this, Action<S, TimeSpan> a) { this.DrainQueue(a); })); } public override void OnError(Exception error) { DisposeUpstream(); bool flag = false; lock (_gate) { _queue.Clear(); _exception = error; _hasFailed = true; flag = !_running; } if (flag) ForwardOnError(error); } public override void OnCompleted() { DisposeUpstream(); bool flag = false; lock (_gate) { TimeSpan timeSpan = _completeAt = _watch.Elapsed.Add(_delay); _hasCompleted = true; flag = (_ready && !_active); _active = true; } if (flag) Disposable.TrySetSerial(ref _cancelable, Scheduler.Schedule<S>(_scheduler, this, _delay, (Action<S, Action<S, TimeSpan>>)delegate(S this, Action<S, TimeSpan> a) { this.DrainQueue(a); })); } protected void DrainQueue(Action<S, TimeSpan> recurse) { lock (_gate) { if (_hasFailed) return; _running = true; } bool flag = false; bool flag2; Exception error; bool flag4; bool flag5; TimeSpan arg; while (true) { flag2 = false; error = null; bool flag3 = false; TSource value = default(TSource); flag4 = false; flag5 = false; arg = default(TimeSpan); lock (_gate) { if (_hasFailed) { error = _exception; flag2 = true; _running = false; } else { TimeSpan elapsed = _watch.Elapsed; if (_queue.Count > 0) { System.Reactive.TimeInterval<TSource> timeInterval = _queue.Peek(); TimeSpan interval = timeInterval.Interval; if (interval.CompareTo(elapsed) <= 0 && !flag) { timeInterval = _queue.Dequeue(); value = timeInterval.Value; flag3 = true; } else { flag5 = true; arg = Scheduler.Normalize(interval.Subtract(elapsed)); _running = false; } } else if (_hasCompleted) { if (_completeAt.CompareTo(elapsed) <= 0 && !flag) flag4 = true; else { flag5 = true; arg = Scheduler.Normalize(_completeAt.Subtract(elapsed)); _running = false; } } else { _running = false; _active = false; } } } if (!flag3) break; ForwardOnNext(value); flag = true; } if (flag4) ForwardOnCompleted(); else if (flag2) { ForwardOnError(error); } else if (flag5) { recurse(this, arg); } } } protected abstract class L : _ { protected readonly object _gate = new object(); protected IDisposable _cancelable; private readonly SemaphoreSlim _evt = new SemaphoreSlim(0); protected TimeSpan _delay; protected Queue<System.Reactive.TimeInterval<TSource>> _queue = new Queue<System.Reactive.TimeInterval<TSource>>(); private CancellationTokenSource _stop; private bool _hasCompleted; private TimeSpan _completeAt; private bool _hasFailed; private Exception _exception; protected L(TParent parent, IObserver<TSource> observer) : base(parent, observer) { } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (disposing) Disposable.TryDispose(ref _cancelable); } protected void ScheduleDrain() { _stop = new CancellationTokenSource(); Disposable.TrySetSerial(ref _cancelable, new CancellationDisposable(_stop)); _scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue); } public override void OnNext(TSource value) { lock (_gate) { TimeSpan interval = _watch.Elapsed.Add(_delay); _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, interval)); _evt.Release(); } } public override void OnError(Exception error) { DisposeUpstream(); lock (_gate) { _queue.Clear(); _exception = error; _hasFailed = true; _evt.Release(); } } public override void OnCompleted() { DisposeUpstream(); lock (_gate) { TimeSpan timeSpan = _completeAt = _watch.Elapsed.Add(_delay); _hasCompleted = true; _evt.Release(); } } private void DrainQueue(ICancelable cancel) { bool flag; Exception error; bool flag3; while (true) { try { _evt.Wait(_stop.Token); } catch (OperationCanceledException) { return; } flag = false; error = null; bool flag2 = false; TSource value = default(TSource); flag3 = false; bool flag4 = false; TimeSpan dueTime = default(TimeSpan); lock (_gate) { if (_hasFailed) { error = _exception; flag = true; } else { TimeSpan elapsed = _watch.Elapsed; if (_queue.Count > 0) { System.Reactive.TimeInterval<TSource> timeInterval = _queue.Dequeue(); flag2 = true; value = timeInterval.Value; TimeSpan interval = timeInterval.Interval; if (interval.CompareTo(elapsed) > 0) { flag4 = true; dueTime = Scheduler.Normalize(interval.Subtract(elapsed)); } } else if (_hasCompleted) { flag3 = true; if (_completeAt.CompareTo(elapsed) > 0) { flag4 = true; dueTime = Scheduler.Normalize(_completeAt.Subtract(elapsed)); } } } } if (flag4) { ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim(); Scheduler.ScheduleAction<ManualResetEventSlim>(_scheduler, manualResetEventSlim, dueTime, (Action<ManualResetEventSlim>)delegate(ManualResetEventSlim slimTimer) { slimTimer.Set(); }); try { manualResetEventSlim.Wait(_stop.Token); } catch (OperationCanceledException) { return; } } if (!flag2) break; ForwardOnNext(value); } if (flag3) ForwardOnCompleted(); else if (flag) { ForwardOnError(error); } } } protected readonly IObservable<TSource> _source; protected readonly IScheduler _scheduler; protected Base(IObservable<TSource> source, IScheduler scheduler) { _source = source; _scheduler = scheduler; } } internal sealed class Absolute : Base<Absolute> { private new sealed class S : Base<Absolute>.S { public S(Absolute parent, IObserver<TSource> observer) : base(parent, observer) { } protected override void RunCore(Absolute parent) { _ready = false; Disposable.TrySetSingle(ref _cancelable, Scheduler.ScheduleAction<S>(parent._scheduler, this, parent._dueTime, (Action<S>)delegate(S this) { this.Start(); })); } private void Start() { TimeSpan dueTime = default(TimeSpan); bool flag = false; lock (_gate) { _delay = _watch.Elapsed; Queue<System.Reactive.TimeInterval<TSource>> queue = _queue; _queue = new Queue<System.Reactive.TimeInterval<TSource>>(); if (queue.Count > 0) { dueTime = queue.Peek().Interval; while (queue.Count > 0) { System.Reactive.TimeInterval<TSource> timeInterval = queue.Dequeue(); _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(timeInterval.Value, timeInterval.Interval.Add(_delay))); } flag = true; _active = true; } _ready = true; } if (flag) Disposable.TrySetSerial(ref _cancelable, Scheduler.Schedule<Base<Absolute>.S>(_scheduler, (Base<Absolute>.S)this, dueTime, (Action<Base<Absolute>.S, Action<Base<Absolute>.S, TimeSpan>>)delegate(Base<Absolute>.S this, Action<Base<Absolute>.S, TimeSpan> a) { DrainQueue(a); })); } } private new sealed class L : Base<Absolute>.L { public L(Absolute parent, IObserver<TSource> observer) : base(parent, observer) { } protected override void RunCore(Absolute parent) { Disposable.TrySetSingle(ref _cancelable, Scheduler.ScheduleAction<L>(parent._scheduler, this, parent._dueTime, (Action<L>)delegate(L this) { this.Start(); })); } private void Start() { lock (_gate) { _delay = _watch.Elapsed; Queue<System.Reactive.TimeInterval<TSource>> queue = _queue; _queue = new Queue<System.Reactive.TimeInterval<TSource>>(); while (queue.Count > 0) { System.Reactive.TimeInterval<TSource> timeInterval = queue.Dequeue(); _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(timeInterval.Value, timeInterval.Interval.Add(_delay))); } } ScheduleDrain(); } } private readonly DateTimeOffset _dueTime; public Absolute(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler) : base(source, scheduler) { _dueTime = dueTime; } protected override _ CreateSink(IObserver<TSource> observer) { if (_scheduler.AsLongRunning() == null) return new S(this, observer); return new L(this, observer); } protected override void Run(_ sink) { sink.Run(this); } } internal sealed class Relative : Base<Relative> { private new sealed class S : Base<Relative>.S { public S(Relative parent, IObserver<TSource> observer) : base(parent, observer) { } protected override void RunCore(Relative parent) { _ready = true; _delay = Scheduler.Normalize(parent._dueTime); } } private new sealed class L : Base<Relative>.L { public L(Relative parent, IObserver<TSource> observer) : base(parent, observer) { } protected override void RunCore(Relative parent) { _delay = Scheduler.Normalize(parent._dueTime); ScheduleDrain(); } } private readonly TimeSpan _dueTime; public Relative(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler) : base(source, scheduler) { _dueTime = dueTime; } protected override _ CreateSink(IObserver<TSource> observer) { if (_scheduler.AsLongRunning() == null) return new S(this, observer); return new L(this, observer); } protected override void Run(_ sink) { sink.Run(this); } } } }