LocalScheduler
Abstract base class for machine-local schedulers, using the local system clock for time-based operations.
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.PlatformServices;
using System.Threading;
namespace System.Reactive.Concurrency
{
public abstract class LocalScheduler : IScheduler, IStopwatchProvider, IServiceProvider
{
private abstract class WorkItem : IComparable<WorkItem>, IDisposable
{
public readonly LocalScheduler Scheduler;
public readonly DateTimeOffset DueTime;
private IDisposable _disposable;
private int _hasRun;
protected WorkItem(LocalScheduler scheduler, DateTimeOffset dueTime)
{
Scheduler = scheduler;
DueTime = dueTime;
_hasRun = 0;
}
public void Invoke(IScheduler scheduler)
{
if (Interlocked.Exchange(ref _hasRun, 1) == 0)
try {
if (!Disposable.GetIsDisposed(ref _disposable))
Disposable.SetSingle(ref _disposable, InvokeCore(scheduler));
} finally {
SystemClock.Release();
}
}
protected abstract IDisposable InvokeCore(IScheduler scheduler);
public int CompareTo(WorkItem other)
{
return Comparer<DateTimeOffset>.Default.Compare(DueTime, other.DueTime);
}
public void Dispose()
{
Disposable.TryDispose(ref _disposable);
}
}
private sealed class WorkItem<TState> : WorkItem
{
private readonly TState _state;
private readonly Func<IScheduler, TState, IDisposable> _action;
public WorkItem(LocalScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
: base(scheduler, dueTime)
{
_state = state;
_action = action;
}
protected override IDisposable InvokeCore(IScheduler scheduler)
{
return _action(scheduler, _state);
}
}
private static readonly object Gate = new object();
private static readonly object StaticGate = new object();
private static readonly PriorityQueue<WorkItem> LongTerm = new PriorityQueue<WorkItem>();
private static readonly SerialDisposable NextLongTermTimer = new SerialDisposable();
private static WorkItem _nextLongTermWorkItem;
private readonly PriorityQueue<WorkItem> _shortTerm = new PriorityQueue<WorkItem>();
private readonly HashSet<IDisposable> _shortTermWork = new HashSet<IDisposable>();
private static readonly TimeSpan ShortTerm = TimeSpan.FromSeconds(10);
private const int MaxErrorRatio = 1000;
private static readonly TimeSpan LongToShort = TimeSpan.FromSeconds(5);
private static readonly TimeSpan RetryShort = TimeSpan.FromMilliseconds(50);
private static readonly TimeSpan MaxSupportedTimer = TimeSpan.FromMilliseconds(4294967294);
public virtual DateTimeOffset Now => Scheduler.Now;
public virtual IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
throw new ArgumentNullException("action");
return Schedule(state, TimeSpan.Zero, action);
}
public abstract IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action);
public virtual IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
throw new ArgumentNullException("action");
return Enqueue(state, dueTime, action);
}
public virtual IStopwatch StartStopwatch()
{
return ConcurrencyAbstractionLayer.Current.StartStopwatch();
}
object IServiceProvider.GetService(Type serviceType)
{
return GetService(serviceType);
}
protected virtual object GetService(Type serviceType)
{
if (serviceType == typeof(IStopwatchProvider))
return this;
if (serviceType == typeof(ISchedulerLongRunning))
return this as ISchedulerLongRunning;
if (serviceType == typeof(ISchedulerPeriodic))
return this as ISchedulerPeriodic;
return null;
}
protected LocalScheduler()
{
SystemClock.Register(this);
}
private IDisposable Enqueue<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
{
TimeSpan t = Scheduler.Normalize(dueTime - Now);
if (t == TimeSpan.Zero)
return Schedule(state, TimeSpan.Zero, action);
SystemClock.AddRef();
WorkItem<TState> workItem = new WorkItem<TState>(this, state, dueTime, action);
if (t <= ShortTerm)
ScheduleShortTermWork(workItem);
else
ScheduleLongTermWork(workItem);
return workItem;
}
private void ScheduleShortTermWork(WorkItem item)
{
lock (Gate) {
_shortTerm.Enqueue(item);
SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable();
_shortTermWork.Add(singleAssignmentDisposable);
TimeSpan dueTime = Scheduler.Normalize(item.DueTime - item.Scheduler.Now);
singleAssignmentDisposable.Disposable = item.Scheduler.Schedule((this, singleAssignmentDisposable), dueTime, (IScheduler self, (LocalScheduler this, SingleAssignmentDisposable d) tuple) => tuple.this.ExecuteNextShortTermWorkItem(self, tuple.d));
}
}
private IDisposable ExecuteNextShortTermWorkItem(IScheduler scheduler, IDisposable cancel)
{
WorkItem workItem = null;
lock (Gate) {
if (_shortTermWork.Remove(cancel) && _shortTerm.Count > 0)
workItem = _shortTerm.Dequeue();
}
if (workItem != null) {
if (workItem.DueTime - workItem.Scheduler.Now >= RetryShort)
ScheduleShortTermWork(workItem);
else
workItem.Invoke(scheduler);
}
return Disposable.Empty;
}
private static void ScheduleLongTermWork(WorkItem item)
{
lock (StaticGate) {
LongTerm.Enqueue(item);
UpdateLongTermProcessingTimer();
}
}
private static void UpdateLongTermProcessingTimer()
{
if (LongTerm.Count != 0) {
WorkItem workItem = LongTerm.Peek();
if (workItem != _nextLongTermWorkItem) {
TimeSpan t = Scheduler.Normalize(workItem.DueTime - workItem.Scheduler.Now);
long val = t.Ticks / 1000;
TimeSpan timeSpan = LongToShort;
TimeSpan t2 = TimeSpan.FromTicks(Math.Max(val, timeSpan.Ticks));
long ticks = (t - t2).Ticks;
timeSpan = MaxSupportedTimer;
TimeSpan dueTime = TimeSpan.FromTicks(Math.Min(ticks, timeSpan.Ticks));
_nextLongTermWorkItem = workItem;
NextLongTermTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(delegate {
EvaluateLongTermQueue();
}, null, dueTime);
}
}
}
private static void EvaluateLongTermQueue()
{
lock (StaticGate) {
while (LongTerm.Count > 0) {
WorkItem workItem = LongTerm.Peek();
if (Scheduler.Normalize(workItem.DueTime - workItem.Scheduler.Now) >= ShortTerm)
break;
WorkItem workItem2 = LongTerm.Dequeue();
workItem2.Scheduler.ScheduleShortTermWork(workItem2);
}
_nextLongTermWorkItem = null;
UpdateLongTermProcessingTimer();
}
}
internal virtual void SystemClockChanged(object sender, SystemClockChangedEventArgs args)
{
lock (StaticGate) {
lock (Gate) {
foreach (IDisposable item2 in _shortTermWork) {
item2.Dispose();
}
_shortTermWork.Clear();
while (_shortTerm.Count > 0) {
WorkItem item = _shortTerm.Dequeue();
LongTerm.Enqueue(item);
}
_nextLongTermWorkItem = null;
EvaluateLongTermQueue();
}
}
}
}
}