<PackageReference Include="System.Reactive" Version="6.0.0-preview.16" />

Delay<TSource>

static class Delay<TSource>
#nullable enable

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Container for the producers and sinks that time-shift the notifications of a source
    /// sequence, either by a relative <see cref="TimeSpan"/> or until an absolute
    /// <see cref="DateTimeOffset"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the delayed sequence.</typeparam>
    internal static class Delay<TSource>
    {
        /// <summary>
        /// Shared producer base holding the source and the scheduler, plus the two sink families:
        /// <see cref="S"/> (recursive scheduling) and <see cref="L"/> (long-running drain loop).
        /// </summary>
        /// <typeparam name="TParent">The concrete producer type deriving from this base.</typeparam>
        internal abstract class Base<TParent> : Producer<TSource, Base<TParent>._>
            where TParent : Base<TParent>
        {
            /// <summary>
            /// Common sink base: captures the parent's scheduler and starts a stopwatch on it
            /// when the sink is run.
            /// </summary>
            internal abstract class _ : IdentitySink<TSource>
            {
                protected readonly IScheduler _scheduler;

                // Assigned in Run(TParent); Elapsed must not be read before that.
                private IStopwatch? _watch;

                /// <summary>Time elapsed on the stopwatch started in <see cref="Run(TParent)"/>.</summary>
                protected TimeSpan Elapsed => _watch!.Elapsed;

                protected _(TParent parent, IObserver<TSource> observer)
                    : base(observer)
                {
                    _scheduler = parent._scheduler;
                }

                /// <summary>
                /// Starts the stopwatch, lets the derived sink initialise itself through
                /// <see cref="RunCore"/>, then runs the base sink against the parent's source.
                /// </summary>
                /// <param name="parent">The producer that created this sink.</param>
                public void Run(TParent parent)
                {
                    _watch = _scheduler.StartStopwatch();
                    RunCore(parent);
                    base.Run(parent._source);
                }

                /// <summary>Sink-specific initialisation, invoked before the source is run.</summary>
                /// <param name="parent">The producer that created this sink.</param>
                protected abstract void RunCore(TParent parent);
            }

            /// <summary>
            /// Sink that drains its queue through recursive scheduler actions. The concrete
            /// producers select it when <c>AsLongRunning()</c> returns null for the scheduler.
            /// </summary>
            internal abstract class S : _
            {
                protected readonly object _gate = new object();

                // Kept as a mutable (non-readonly) field, exactly as in the original layout.
                protected SerialDisposableValue _cancelable;
                protected TimeSpan _delay;

                // _ready: derived sink has finished establishing _delay.
                // _active: a drain has been requested and has not yet found the queue idle.
                // _running: the drain action is currently inside its emit loop.
                protected bool _ready;
                protected bool _active;
                protected bool _running;

                // Each entry pairs a value with the stopwatch time at which it becomes due.
                protected Queue<System.Reactive.TimeInterval<TSource>> _queue = new Queue<System.Reactive.TimeInterval<TSource>>();

                private bool _hasCompleted;
                private TimeSpan _completeAt;
                private bool _hasFailed;
                private Exception? _exception;

                protected S(TParent parent, IObserver<TSource> observer)
                    : base(parent, observer)
                {
                }

                /// <summary>Disposes the base sink and, when disposing, the pending scheduled work.</summary>
                protected override void Dispose(bool disposing)
                {
                    base.Dispose(disposing);

                    if (disposing)
                    {
                        _cancelable.Dispose();
                    }
                }

                /// <summary>
                /// Enqueues the value with its due time and starts a drain if the sink is ready
                /// and no drain is active.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    bool shouldRun;

                    lock (_gate)
                    {
                        TimeSpan dueAt = Elapsed.Add(_delay);
                        _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, dueAt));

                        shouldRun = _ready && !_active;
                        _active = true;
                    }

                    if (shouldRun)
                    {
                        DrainQueue(_delay);
                    }
                }

                /// <summary>
                /// Drops all queued values and records the failure. The error is forwarded right
                /// away unless the drain action is in its loop, in which case the drain forwards it.
                /// </summary>
                public override void OnError(Exception error)
                {
                    DisposeUpstream();

                    bool forwardNow;

                    lock (_gate)
                    {
                        _queue.Clear();

                        _exception = error;
                        _hasFailed = true;

                        forwardNow = !_running;
                    }

                    if (forwardNow)
                    {
                        ForwardOnError(error);
                    }
                }

                /// <summary>
                /// Records the due time of the completion and starts a drain if the sink is ready
                /// and no drain is active.
                /// </summary>
                public override void OnCompleted()
                {
                    DisposeUpstream();

                    bool shouldRun;

                    lock (_gate)
                    {
                        _completeAt = Elapsed.Add(_delay);
                        _hasCompleted = true;

                        shouldRun = _ready && !_active;
                        _active = true;
                    }

                    if (shouldRun)
                    {
                        DrainQueue(_delay);
                    }
                }

                /// <summary>
                /// Schedules the recursive drain action to run after <paramref name="next"/> and
                /// stores the resulting disposable in <c>_cancelable</c>.
                /// </summary>
                /// <param name="next">Relative time until the first drain pass.</param>
                protected void DrainQueue(TimeSpan next)
                {
                    _cancelable.Disposable = Scheduler.Schedule<S>(
                        _scheduler,
                        this,
                        next,
                        (Action<S, Action<S, TimeSpan>>)((sink, recurse) => sink.DrainQueue(recurse)));
                }

                /// <summary>
                /// One drain pass: forwards at most one due value, then either forwards the
                /// terminal notification, reschedules itself through <paramref name="recurse"/>,
                /// or goes idle.
                /// </summary>
                /// <param name="recurse">Callback that reschedules this pass after a given time.</param>
                private void DrainQueue(Action<S, TimeSpan> recurse)
                {
                    lock (_gate)
                    {
                        if (_hasFailed)
                        {
                            return;
                        }

                        _running = true;
                    }

                    // Set after a value has been forwarded; the following loop iteration then
                    // takes a reschedule branch instead of dequeuing or completing in this pass.
                    bool shouldYield = false;

                    bool hasFailed;
                    Exception? error;
                    bool hasCompleted;
                    bool shouldRecurse;
                    TimeSpan recurseDueTime;

                    while (true)
                    {
                        hasFailed = false;
                        error = null;

                        bool hasValue = false;
                        TSource value = default!;

                        hasCompleted = false;

                        shouldRecurse = false;
                        recurseDueTime = default(TimeSpan);

                        lock (_gate)
                        {
                            if (_hasFailed)
                            {
                                error = _exception;
                                hasFailed = true;
                                _running = false;
                            }
                            else
                            {
                                TimeSpan now = Elapsed;

                                if (_queue.Count > 0)
                                {
                                    TimeSpan dueAt = _queue.Peek().Interval;

                                    if (dueAt.CompareTo(now) <= 0 && !shouldYield)
                                    {
                                        value = _queue.Dequeue().Value;
                                        hasValue = true;
                                    }
                                    else
                                    {
                                        shouldRecurse = true;
                                        recurseDueTime = Scheduler.Normalize(dueAt.Subtract(now));
                                        _running = false;
                                    }
                                }
                                else if (_hasCompleted)
                                {
                                    if (_completeAt.CompareTo(now) <= 0 && !shouldYield)
                                    {
                                        hasCompleted = true;
                                    }
                                    else
                                    {
                                        shouldRecurse = true;
                                        recurseDueTime = Scheduler.Normalize(_completeAt.Subtract(now));
                                        _running = false;
                                    }
                                }
                                else
                                {
                                    // Nothing queued and not terminated: go idle so the next
                                    // OnNext/OnCompleted starts a fresh drain.
                                    _running = false;
                                    _active = false;
                                }
                            }
                        }

                        if (!hasValue)
                        {
                            break;
                        }

                        // Forwarded outside the lock.
                        ForwardOnNext(value);
                        shouldYield = true;
                    }

                    if (hasCompleted)
                    {
                        ForwardOnCompleted();
                    }
                    else if (hasFailed)
                    {
                        ForwardOnError(error!);
                    }
                    else if (shouldRecurse)
                    {
                        recurse(this, recurseDueTime);
                    }
                }
            }

            /// <summary>
            /// Sink that drains its queue on a long-running scheduler loop. The concrete
            /// producers select it when <c>AsLongRunning()</c> returns non-null for the scheduler.
            /// </summary>
            protected abstract class L : _
            {
                protected readonly object _gate = new object();

                // Released once per OnNext/OnError/OnCompleted; the drain loop waits on it.
                private readonly SemaphoreSlim _evt = new SemaphoreSlim(0);

                // Each entry pairs a value with the stopwatch time at which it becomes due.
                protected Queue<System.Reactive.TimeInterval<TSource>> _queue = new Queue<System.Reactive.TimeInterval<TSource>>();

                // Kept as a mutable (non-readonly) field, exactly as in the original layout.
                protected SerialDisposableValue _cancelable;
                protected TimeSpan _delay;

                private bool _hasCompleted;
                private TimeSpan _completeAt;
                private bool _hasFailed;
                private Exception? _exception;

                protected L(TParent parent, IObserver<TSource> observer)
                    : base(parent, observer)
                {
                }

                /// <summary>Disposes the base sink and, when disposing, the drain loop's cancellation.</summary>
                protected override void Dispose(bool disposing)
                {
                    base.Dispose(disposing);

                    if (disposing)
                    {
                        _cancelable.Dispose();
                    }
                }

                /// <summary>
                /// Starts the long-running drain loop, handing it the token of a new
                /// <see cref="CancellationDisposable"/> that is stored in <c>_cancelable</c>.
                /// </summary>
                protected void ScheduleDrain()
                {
                    var cancellation = new CancellationDisposable();
                    _cancelable.Disposable = cancellation;

                    // The concrete producers only create this sink when AsLongRunning() is non-null.
                    _scheduler.AsLongRunning()!.ScheduleLongRunning<CancellationToken>(
                        cancellation.Token,
                        (Action<CancellationToken, ICancelable>)DrainQueue);
                }

                /// <summary>Enqueues the value with its due time and signals the drain loop.</summary>
                public override void OnNext(TSource value)
                {
                    lock (_gate)
                    {
                        TimeSpan dueAt = Elapsed.Add(_delay);
                        _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, dueAt));

                        _evt.Release();
                    }
                }

                /// <summary>Drops all queued values, records the failure and signals the drain loop.</summary>
                public override void OnError(Exception error)
                {
                    DisposeUpstream();

                    lock (_gate)
                    {
                        _queue.Clear();

                        _exception = error;
                        _hasFailed = true;

                        _evt.Release();
                    }
                }

                /// <summary>Records the due time of the completion and signals the drain loop.</summary>
                public override void OnCompleted()
                {
                    DisposeUpstream();

                    lock (_gate)
                    {
                        _completeAt = Elapsed.Add(_delay);
                        _hasCompleted = true;

                        _evt.Release();
                    }
                }

                /// <summary>
                /// Long-running loop: waits for a signal, takes the next item (or terminal state)
                /// under the lock, blocks until it is due, and forwards it. Returns silently when
                /// <paramref name="token"/> is cancelled during either wait.
                /// </summary>
                /// <param name="token">Cancellation token observed by both blocking waits.</param>
                /// <param name="cancel">Not used by this method.</param>
                private void DrainQueue(CancellationToken token, ICancelable cancel)
                {
                    bool hasFailed;
                    Exception? error;
                    bool hasCompleted;

                    while (true)
                    {
                        try
                        {
                            _evt.Wait(token);
                        }
                        catch (OperationCanceledException)
                        {
                            return;
                        }

                        hasFailed = false;
                        error = null;

                        bool hasValue = false;
                        TSource value = default!;

                        hasCompleted = false;

                        bool shouldWait = false;
                        TimeSpan waitTime = default(TimeSpan);

                        lock (_gate)
                        {
                            if (_hasFailed)
                            {
                                error = _exception;
                                hasFailed = true;
                            }
                            else
                            {
                                TimeSpan now = Elapsed;

                                if (_queue.Count > 0)
                                {
                                    // The item is dequeued first and the wait happens afterwards.
                                    System.Reactive.TimeInterval<TSource> next = _queue.Dequeue();

                                    hasValue = true;
                                    value = next.Value;

                                    TimeSpan dueAt = next.Interval;
                                    if (dueAt.CompareTo(now) > 0)
                                    {
                                        shouldWait = true;
                                        waitTime = Scheduler.Normalize(dueAt.Subtract(now));
                                    }
                                }
                                else if (_hasCompleted)
                                {
                                    hasCompleted = true;

                                    if (_completeAt.CompareTo(now) > 0)
                                    {
                                        shouldWait = true;
                                        waitTime = Scheduler.Normalize(_completeAt.Subtract(now));
                                    }
                                }
                            }
                        }

                        if (shouldWait)
                        {
                            // Block this loop until the scheduler fires the timer action.
                            var timer = new ManualResetEventSlim();
                            Scheduler.ScheduleAction<ManualResetEventSlim>(
                                _scheduler,
                                timer,
                                waitTime,
                                (Action<ManualResetEventSlim>)(slimTimer => slimTimer.Set()));

                            try
                            {
                                timer.Wait(token);
                            }
                            catch (OperationCanceledException)
                            {
                                return;
                            }
                        }

                        if (!hasValue)
                        {
                            break;
                        }

                        ForwardOnNext(value);
                    }

                    if (hasCompleted)
                    {
                        ForwardOnCompleted();
                    }
                    else if (hasFailed)
                    {
                        ForwardOnError(error!);
                    }
                }
            }

            protected readonly IObservable<TSource> _source;
            protected readonly IScheduler _scheduler;

            protected Base(IObservable<TSource> source, IScheduler scheduler)
            {
                _source = source;
                _scheduler = scheduler;
            }
        }

        /// <summary>Producer that delays the source until an absolute due time.</summary>
        internal sealed class Absolute : Base<Absolute>
        {
            /// <summary>Recursive-scheduling sink for the absolute-time variant.</summary>
            private new sealed class S : Base<Absolute>.S
            {
                public S(Absolute parent, IObserver<TSource> observer)
                    : base(parent, observer)
                {
                }

                /// <summary>
                /// Marks the sink as not ready and schedules <see cref="Start"/> at the parent's
                /// due time.
                /// </summary>
                protected override void RunCore(Absolute parent)
                {
                    _ready = false;

                    _cancelable.TrySetFirst(
                        Scheduler.ScheduleAction<S>(
                            parent._scheduler,
                            this,
                            parent._dueTime,
                            (Action<S>)(sink => sink.Start())));
                }

                /// <summary>
                /// Runs at the due time: takes the elapsed time as the delay, shifts every value
                /// queued so far by that delay, marks the sink ready and starts draining if
                /// anything was queued.
                /// </summary>
                private void Start()
                {
                    TimeSpan next = default(TimeSpan);
                    bool shouldRun = false;

                    lock (_gate)
                    {
                        _delay = Elapsed;

                        Queue<System.Reactive.TimeInterval<TSource>> pending = _queue;
                        _queue = new Queue<System.Reactive.TimeInterval<TSource>>();

                        if (pending.Count > 0)
                        {
                            // The first drain is scheduled using the head item's original
                            // (unshifted) timestamp as the relative due time.
                            next = pending.Peek().Interval;

                            while (pending.Count > 0)
                            {
                                System.Reactive.TimeInterval<TSource> item = pending.Dequeue();
                                _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
                            }

                            shouldRun = true;
                            _active = true;
                        }

                        _ready = true;
                    }

                    if (shouldRun)
                    {
                        DrainQueue(next);
                    }
                }
            }

            /// <summary>Long-running sink for the absolute-time variant.</summary>
            private new sealed class L : Base<Absolute>.L
            {
                public L(Absolute parent, IObserver<TSource> observer)
                    : base(parent, observer)
                {
                }

                /// <summary>Schedules <see cref="Start"/> at the parent's due time.</summary>
                protected override void RunCore(Absolute parent)
                {
                    _cancelable.TrySetFirst(
                        Scheduler.ScheduleAction<L>(
                            parent._scheduler,
                            this,
                            parent._dueTime,
                            (Action<L>)(sink => sink.Start())));
                }

                /// <summary>
                /// Runs at the due time: takes the elapsed time as the delay, shifts every value
                /// queued so far by that delay, then starts the long-running drain loop.
                /// </summary>
                private void Start()
                {
                    lock (_gate)
                    {
                        _delay = Elapsed;

                        Queue<System.Reactive.TimeInterval<TSource>> pending = _queue;
                        _queue = new Queue<System.Reactive.TimeInterval<TSource>>();

                        while (pending.Count > 0)
                        {
                            System.Reactive.TimeInterval<TSource> item = pending.Dequeue();
                            _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(item.Value, item.Interval.Add(_delay)));
                        }
                    }

                    ScheduleDrain();
                }
            }

            private readonly DateTimeOffset _dueTime;

            public Absolute(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
                : base(source, scheduler)
            {
                _dueTime = dueTime;
            }

            /// <summary>
            /// Creates the recursive sink when the scheduler has no long-running support,
            /// otherwise the long-running sink.
            /// </summary>
            protected override _ CreateSink(IObserver<TSource> observer)
            {
                if (_scheduler.AsLongRunning() == null)
                {
                    return new S(this, observer);
                }

                return new L(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(this);
            }
        }

        /// <summary>Producer that delays every notification by a relative time span.</summary>
        internal sealed class Relative : Base<Relative>
        {
            /// <summary>Recursive-scheduling sink for the relative-time variant.</summary>
            private new sealed class S : Base<Relative>.S
            {
                public S(Relative parent, IObserver<TSource> observer)
                    : base(parent, observer)
                {
                }

                /// <summary>Marks the sink ready immediately and sets the normalized delay.</summary>
                protected override void RunCore(Relative parent)
                {
                    _ready = true;
                    _delay = Scheduler.Normalize(parent._dueTime);
                }
            }

            /// <summary>Long-running sink for the relative-time variant.</summary>
            private new sealed class L : Base<Relative>.L
            {
                public L(Relative parent, IObserver<TSource> observer)
                    : base(parent, observer)
                {
                }

                /// <summary>Sets the normalized delay and starts the long-running drain loop.</summary>
                protected override void RunCore(Relative parent)
                {
                    _delay = Scheduler.Normalize(parent._dueTime);
                    ScheduleDrain();
                }
            }

            private readonly TimeSpan _dueTime;

            public Relative(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
                : base(source, scheduler)
            {
                _dueTime = dueTime;
            }

            /// <summary>
            /// Creates the recursive sink when the scheduler has no long-running support,
            /// otherwise the long-running sink.
            /// </summary>
            protected override _ CreateSink(IObserver<TSource> observer)
            {
                if (_scheduler.AsLongRunning() == null)
                {
                    return new S(this, observer);
                }

                return new L(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(this);
            }
        }
    }
}