Timer
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class Timer
{
internal abstract class Single : Producer<long, Single._>
{
internal sealed class Relative : Single
{
private readonly TimeSpan _dueTime;
public Relative(TimeSpan dueTime, IScheduler scheduler)
: base(scheduler)
{
_dueTime = dueTime;
}
protected override _ CreateSink(IObserver<long> observer)
{
return new _(observer);
}
protected override void Run(_ sink)
{
sink.Run(this, _dueTime);
}
}
internal sealed class Absolute : Single
{
private readonly DateTimeOffset _dueTime;
public Absolute(DateTimeOffset dueTime, IScheduler scheduler)
: base(scheduler)
{
_dueTime = dueTime;
}
protected override _ CreateSink(IObserver<long> observer)
{
return new _(observer);
}
protected override void Run(_ sink)
{
sink.Run(this, _dueTime);
}
}
internal sealed class _ : IdentitySink<long>
{
public _(IObserver<long> observer)
: base(observer)
{
}
public void Run(Single parent, DateTimeOffset dueTime)
{
SetUpstream(parent._scheduler.Schedule(this, dueTime, (IScheduler _, _ state) => state.Invoke()));
}
public void Run(Single parent, TimeSpan dueTime)
{
SetUpstream(parent._scheduler.Schedule(this, dueTime, (IScheduler _, _ state) => state.Invoke()));
}
private IDisposable Invoke()
{
ForwardOnNext(0);
ForwardOnCompleted();
return Disposable.Empty;
}
}
private readonly IScheduler _scheduler;
public Single(IScheduler scheduler)
{
_scheduler = scheduler;
}
}
internal abstract class Periodic : Producer<long, Periodic._>
{
internal sealed class Relative : Periodic
{
private readonly TimeSpan _dueTime;
public Relative(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
: base(period, scheduler)
{
_dueTime = dueTime;
}
protected override _ CreateSink(IObserver<long> observer)
{
return new _(_period, observer);
}
protected override void Run(_ sink)
{
sink.Run(this, _dueTime);
}
}
internal sealed class Absolute : Periodic
{
private readonly DateTimeOffset _dueTime;
public Absolute(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
: base(period, scheduler)
{
_dueTime = dueTime;
}
protected override _ CreateSink(IObserver<long> observer)
{
return new _(_period, observer);
}
protected override void Run(_ sink)
{
sink.Run(this, _dueTime);
}
}
internal sealed class _ : IdentitySink<long>
{
private readonly TimeSpan _period;
private int _pendingTickCount;
private IDisposable _periodic;
public _(TimeSpan period, IObserver<long> observer)
: base(observer)
{
_period = period;
}
public void Run(Periodic parent, DateTimeOffset dueTime)
{
SetUpstream(parent._scheduler.Schedule<object>(null, dueTime, InvokeStart));
}
public void Run(Periodic parent, TimeSpan dueTime)
{
if (dueTime == _period)
SetUpstream(parent._scheduler.SchedulePeriodic(0, _period, (Func<long, long>)Tick));
else
SetUpstream(parent._scheduler.Schedule<object>(null, dueTime, InvokeStart));
}
private long Tick(long count)
{
ForwardOnNext(count);
return count + 1;
}
private IDisposable InvokeStart(IScheduler self, object state)
{
_pendingTickCount = 1;
SingleAssignmentDisposable singleAssignmentDisposable = (SingleAssignmentDisposable)(_periodic = new SingleAssignmentDisposable());
singleAssignmentDisposable.Disposable = self.SchedulePeriodic(1, _period, (Func<long, long>)Tock);
try {
ForwardOnNext(0);
} catch (Exception exception) {
singleAssignmentDisposable.Dispose();
exception.Throw();
}
if (Interlocked.Decrement(ref _pendingTickCount) > 0) {
SingleAssignmentDisposable singleAssignmentDisposable2 = new SingleAssignmentDisposable();
singleAssignmentDisposable2.Disposable = self.Schedule(1, CatchUp);
return StableCompositeDisposable.Create(singleAssignmentDisposable, singleAssignmentDisposable2);
}
return singleAssignmentDisposable;
}
private long Tock(long count)
{
if (Interlocked.Increment(ref _pendingTickCount) == 1) {
ForwardOnNext(count);
Interlocked.Decrement(ref _pendingTickCount);
}
return count + 1;
}
private void CatchUp(long count, Action<long> recurse)
{
try {
ForwardOnNext(count);
} catch (Exception exception) {
_periodic.Dispose();
exception.Throw();
}
if (Interlocked.Decrement(ref _pendingTickCount) > 0)
recurse(count + 1);
}
}
private readonly TimeSpan _period;
private readonly IScheduler _scheduler;
public Periodic(TimeSpan period, IScheduler scheduler)
{
_period = period;
_scheduler = scheduler;
}
}
}
}