// DefaultScheduler
using System.Reactive.Disposables;
namespace System.Reactive.Concurrency
{
/// <summary>
/// Scheduler that hands all of its work to the concurrency abstraction layer captured
/// in <c>s_cal</c>: immediate work goes through <c>QueueUserWorkItem</c>, delayed work
/// through <c>StartTimer</c>, periodic work through <c>StartPeriodicTimer</c>, and
/// long-running work through <c>StartThread</c>.
/// </summary>
/// <remarks>
/// The constructor is private; the only instance is exposed through <see cref="Instance"/>.
/// </remarks>
public sealed class DefaultScheduler : LocalScheduler, ISchedulerPeriodic
{
    /// <summary>
    /// <see cref="ISchedulerLongRunning"/> implementation handed out by
    /// <see cref="GetService"/>; it starts the work via <c>s_cal.StartThread</c>.
    /// </summary>
    private sealed class LongRunning : ISchedulerLongRunning
    {
        /// <summary>Shared instance; the type holds no per-instance state.</summary>
        public static readonly ISchedulerLongRunning Instance = new LongRunning();

        /// <summary>
        /// Starts <paramref name="action"/> through <c>s_cal.StartThread</c>, passing it
        /// <paramref name="state"/> and a cancelable handle.
        /// </summary>
        /// <typeparam name="TState">Type of the state passed to the action.</typeparam>
        /// <param name="state">State forwarded to <paramref name="action"/>.</param>
        /// <param name="action">Work to run; receives the state and the cancelable handle.</param>
        /// <returns>
        /// The same <see cref="BooleanDisposable"/> that is handed to the action.
        /// NOTE(review): presumably the action is expected to poll its <c>IsDisposed</c>
        /// flag to observe cancellation - nothing in this class forces the work to stop.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            var cancel = new BooleanDisposable();

            // The handle travels as the untyped state argument and is cast back inside the callback.
            s_cal.StartThread(arg => action(state, (ICancelable)arg), cancel);

            return cancel;
        }
    }

    private static readonly Lazy<DefaultScheduler> s_instance = new Lazy<DefaultScheduler>(() => new DefaultScheduler());

    // Captured once at type initialization and never reassigned.
    private static readonly IConcurrencyAbstractionLayer s_cal = ConcurrencyAbstractionLayer.Current;

    /// <summary>Gets the singleton scheduler instance, created lazily on first access.</summary>
    public static DefaultScheduler Instance => s_instance.Value;

    private DefaultScheduler()
    {
    }

    /// <summary>
    /// Queues <paramref name="action"/> for execution via <c>s_cal.QueueUserWorkItem</c>.
    /// </summary>
    /// <typeparam name="TState">Type of the state passed to the action.</typeparam>
    /// <param name="state">State forwarded to <paramref name="action"/>.</param>
    /// <param name="action">Work to run; receives this scheduler and the state.</param>
    /// <returns>
    /// A disposable combining the queued work item's handle with a slot that receives the
    /// disposable returned by <paramref name="action"/>.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        var d = new SingleAssignmentDisposable();

        IDisposable workItem = s_cal.QueueUserWorkItem(_ =>
        {
            // Skip the action entirely if the caller disposed before the work item ran.
            if (!d.IsDisposed)
            {
                d.Disposable = action(this, state);
            }
        }, null);

        return StableCompositeDisposable.Create(d, workItem);
    }

    /// <summary>
    /// Schedules <paramref name="action"/> to run after <paramref name="dueTime"/> via
    /// <c>s_cal.StartTimer</c>.
    /// </summary>
    /// <typeparam name="TState">Type of the state passed to the action.</typeparam>
    /// <param name="state">State forwarded to <paramref name="action"/>.</param>
    /// <param name="dueTime">
    /// Requested delay. It is passed through <c>Scheduler.Normalize</c> first; when the
    /// normalized value is zero ticks the call is forwarded to the immediate overload.
    /// </param>
    /// <param name="action">Work to run; receives this scheduler and the state.</param>
    /// <returns>
    /// A disposable combining the timer's handle with a slot that receives the disposable
    /// returned by <paramref name="action"/>.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        TimeSpan delay = Scheduler.Normalize(dueTime);
        if (delay.Ticks == 0)
        {
            return Schedule(state, action);
        }

        var d = new SingleAssignmentDisposable();

        IDisposable timer = s_cal.StartTimer(_ =>
        {
            // Skip the action entirely if the caller disposed before the timer fired.
            if (!d.IsDisposed)
            {
                d.Disposable = action(this, state);
            }
        }, null, delay);

        return StableCompositeDisposable.Create(d, timer);
    }

    /// <summary>
    /// Runs <paramref name="action"/> on every tick of a periodic timer started via
    /// <c>s_cal.StartPeriodicTimer</c>, feeding each call the value returned by the previous one.
    /// </summary>
    /// <typeparam name="TState">Type of the state threaded through the calls.</typeparam>
    /// <param name="state">Initial state passed to the first call.</param>
    /// <param name="period">Timer period; must not be negative.</param>
    /// <param name="action">Function invoked per tick; its result becomes the next state.</param>
    /// <returns>A disposable that disposes the timer and the gate.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is negative.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
    {
        if (period < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(period));
        }

        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        // NOTE(review): AsyncLock presumably serializes ticks so overlapping timer callbacks
        // never run the action concurrently - confirm against AsyncLock.
        var gate = new AsyncLock();

        IDisposable cancel = s_cal.StartPeriodicTimer(() =>
        {
            gate.Wait(() =>
            {
                state = action(state);
            });
        }, period);

        return Disposable.Create(() =>
        {
            cancel.Dispose();
            gate.Dispose();

            // Swap in the stub so the closure no longer references the caller's delegate.
            // NOTE(review): Stubs<TState>.I is presumably the identity function - confirm.
            action = Stubs<TState>.I;
        });
    }

    /// <summary>
    /// Resolves optional scheduler services. Returns the long-running implementation when
    /// <see cref="ISchedulerLongRunning"/> is requested and <c>s_cal.SupportsLongRunning</c>
    /// is true; every other request is deferred to the base class.
    /// </summary>
    /// <param name="serviceType">Type of the requested service.</param>
    /// <returns>The service object chosen here, or whatever the base implementation returns.</returns>
    protected override object GetService(Type serviceType)
    {
        if (serviceType == typeof(ISchedulerLongRunning) && s_cal.SupportsLongRunning)
        {
            return LongRunning.Instance;
        }

        return base.GetService(serviceType);
    }
}
}