<PackageReference Include="System.Reactive" Version="4.1.0-preview.84" />

Timer

static class Timer
using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Threading; namespace System.Reactive.Linq.ObservableImpl { internal static class Timer { internal abstract class Single : Producer<long, Single._> { internal sealed class Relative : Single { private readonly TimeSpan _dueTime; public Relative(TimeSpan dueTime, IScheduler scheduler) : base(scheduler) { _dueTime = dueTime; } protected override _ CreateSink(IObserver<long> observer) { return new _(observer); } protected override void Run(_ sink) { sink.Run(this, _dueTime); } } internal sealed class Absolute : Single { private readonly DateTimeOffset _dueTime; public Absolute(DateTimeOffset dueTime, IScheduler scheduler) : base(scheduler) { _dueTime = dueTime; } protected override _ CreateSink(IObserver<long> observer) { return new _(observer); } protected override void Run(_ sink) { sink.Run(this, _dueTime); } } internal sealed class _ : IdentitySink<long> { public _(IObserver<long> observer) : base(observer) { } public void Run(Single parent, DateTimeOffset dueTime) { SetUpstream(parent._scheduler.Schedule(this, dueTime, (IScheduler _, _ state) => state.Invoke())); } public void Run(Single parent, TimeSpan dueTime) { SetUpstream(parent._scheduler.Schedule(this, dueTime, (IScheduler _, _ state) => state.Invoke())); } private IDisposable Invoke() { ForwardOnNext(0); ForwardOnCompleted(); return Disposable.Empty; } } private readonly IScheduler _scheduler; public Single(IScheduler scheduler) { _scheduler = scheduler; } } internal abstract class Periodic : Producer<long, Periodic._> { internal sealed class Relative : Periodic { private readonly TimeSpan _dueTime; public Relative(TimeSpan dueTime, TimeSpan period, IScheduler scheduler) : base(period, scheduler) { _dueTime = dueTime; } protected override _ CreateSink(IObserver<long> observer) { return new _(_period, observer); } protected override void Run(_ sink) { sink.Run(this, _dueTime); } } internal sealed class Absolute : Periodic { private readonly DateTimeOffset _dueTime; public Absolute(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler) : base(period, scheduler) { _dueTime = dueTime; } protected override _ CreateSink(IObserver<long> observer) { return new _(_period, observer); } protected override void Run(_ sink) { sink.Run(this, _dueTime); } } internal sealed class _ : IdentitySink<long> { private readonly TimeSpan _period; private int _pendingTickCount; private IDisposable _periodic; public _(TimeSpan period, IObserver<long> observer) : base(observer) { _period = period; } public void Run(Periodic parent, DateTimeOffset dueTime) { SetUpstream(parent._scheduler.Schedule<object>(null, dueTime, InvokeStart)); } public void Run(Periodic parent, TimeSpan dueTime) { if (dueTime == _period) SetUpstream(parent._scheduler.SchedulePeriodic(0, _period, (Func<long, long>)Tick)); else SetUpstream(parent._scheduler.Schedule<object>(null, dueTime, InvokeStart)); } private long Tick(long count) { ForwardOnNext(count); return count + 1; } private IDisposable InvokeStart(IScheduler self, object state) { _pendingTickCount = 1; SingleAssignmentDisposable singleAssignmentDisposable = (SingleAssignmentDisposable)(_periodic = new SingleAssignmentDisposable()); singleAssignmentDisposable.Disposable = self.SchedulePeriodic(1, _period, (Func<long, long>)Tock); try { ForwardOnNext(0); } catch (Exception exception) { singleAssignmentDisposable.Dispose(); exception.Throw(); } if (Interlocked.Decrement(ref _pendingTickCount) > 0) { SingleAssignmentDisposable singleAssignmentDisposable2 = new SingleAssignmentDisposable(); singleAssignmentDisposable2.Disposable = self.Schedule(1, CatchUp); return StableCompositeDisposable.Create(singleAssignmentDisposable, singleAssignmentDisposable2); } return singleAssignmentDisposable; } private long Tock(long count) { if (Interlocked.Increment(ref _pendingTickCount) == 1) { ForwardOnNext(count); Interlocked.Decrement(ref _pendingTickCount); } return count + 1; } private void CatchUp(long count, Action<long> recurse) { try { ForwardOnNext(count); } catch (Exception exception) { _periodic.Dispose(); exception.Throw(); } if (Interlocked.Decrement(ref _pendingTickCount) > 0) recurse(count + 1); } } private readonly TimeSpan _period; private readonly IScheduler _scheduler; public Periodic(TimeSpan period, IScheduler scheduler) { _period = period; _scheduler = scheduler; } } } }