Scheduler
Provides a set of static properties to access commonly used schedulers.
using System.Globalization;
using System.Reactive.Disposables;
using System.Reactive.PlatformServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.Reactive.Concurrency
{
public static class Scheduler
{
private sealed class AsyncInvocation<TState> : IDisposable
{
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private IDisposable _run;
public IDisposable Run(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
if (_cts.IsCancellationRequested)
return Disposable.Empty;
action(new CancelableScheduler(self, _cts.Token), s, _cts.Token).ContinueWith(delegate(Task<IDisposable> t, object thisObject) {
AsyncInvocation<TState> obj = (AsyncInvocation<TState>)thisObject;
t.Exception?.Handle((Exception e) => e is OperationCanceledException);
Disposable.SetSingle(ref obj._run, t.Result);
}, this, TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously);
return this;
}
public void Dispose()
{
_cts.Cancel();
Disposable.TryDispose(ref _run);
}
}
private sealed class CancelableScheduler : IScheduler
{
private readonly IScheduler _scheduler;
private readonly CancellationToken _cancellationToken;
public CancellationToken Token => _cancellationToken;
public DateTimeOffset Now => _scheduler.Now;
public CancelableScheduler(IScheduler scheduler, CancellationToken cancellationToken)
{
_scheduler = scheduler;
_cancellationToken = cancellationToken;
}
public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
return _scheduler.Schedule(state, action);
}
public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
return _scheduler.Schedule(state, dueTime, action);
}
public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
{
return _scheduler.Schedule(state, dueTime, action);
}
}
private abstract class InvokeRecBaseState : IDisposable
{
protected readonly IScheduler Scheduler;
protected readonly CompositeDisposable Group;
protected InvokeRecBaseState(IScheduler scheduler)
{
Scheduler = scheduler;
Group = new CompositeDisposable();
}
public void Dispose()
{
Group.Dispose();
}
}
private sealed class InvokeRec1State<TState> : InvokeRecBaseState
{
private readonly Action<TState, Action<TState>> _action;
private readonly Action<TState> _recurseCallback;
public InvokeRec1State(IScheduler scheduler, Action<TState, Action<TState>> action)
: base(scheduler)
{
_action = action;
_recurseCallback = delegate(TState state) {
InvokeNext(state);
};
}
private void InvokeNext(TState state)
{
SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable();
Group.Add(singleAssignmentDisposable);
singleAssignmentDisposable.Disposable = System.Reactive.Concurrency.Scheduler.ScheduleAction<(TState, SingleAssignmentDisposable, InvokeRec1State<TState>)>(Scheduler, (state, singleAssignmentDisposable, this), (Action<(TState, SingleAssignmentDisposable, InvokeRec1State<TState>)>)delegate((TState state, SingleAssignmentDisposable sad, InvokeRec1State<TState> this) nextState) {
nextState.this.Group.Remove(nextState.sad);
nextState.this.InvokeFirst(nextState.state);
});
}
internal void InvokeFirst(TState state)
{
_action(state, _recurseCallback);
}
}
private sealed class InvokeRec2State<TState> : InvokeRecBaseState
{
private readonly Action<TState, Action<TState, TimeSpan>> _action;
private readonly Action<TState, TimeSpan> _recurseCallback;
public InvokeRec2State(IScheduler scheduler, Action<TState, Action<TState, TimeSpan>> action)
: base(scheduler)
{
_action = action;
_recurseCallback = delegate(TState state, TimeSpan time) {
InvokeNext(state, time);
};
}
private void InvokeNext(TState state, TimeSpan time)
{
SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable();
Group.Add(singleAssignmentDisposable);
singleAssignmentDisposable.Disposable = System.Reactive.Concurrency.Scheduler.ScheduleAction<(TState, SingleAssignmentDisposable, InvokeRec2State<TState>)>(Scheduler, (state, singleAssignmentDisposable, this), time, (Action<(TState, SingleAssignmentDisposable, InvokeRec2State<TState>)>)delegate((TState state, SingleAssignmentDisposable sad, InvokeRec2State<TState> this) nextState) {
nextState.this.Group.Remove(nextState.sad);
nextState.this.InvokeFirst(nextState.state);
});
}
internal void InvokeFirst(TState state)
{
_action(state, _recurseCallback);
}
}
private sealed class InvokeRec3State<TState> : InvokeRecBaseState
{
private readonly Action<TState, Action<TState, DateTimeOffset>> _action;
private readonly Action<TState, DateTimeOffset> _recurseCallback;
public InvokeRec3State(IScheduler scheduler, Action<TState, Action<TState, DateTimeOffset>> action)
: base(scheduler)
{
_action = action;
_recurseCallback = delegate(TState state, DateTimeOffset dtOffset) {
InvokeNext(state, dtOffset);
};
}
private void InvokeNext(TState state, DateTimeOffset dtOffset)
{
SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable();
Group.Add(singleAssignmentDisposable);
singleAssignmentDisposable.Disposable = System.Reactive.Concurrency.Scheduler.ScheduleAction<(TState, SingleAssignmentDisposable, InvokeRec3State<TState>)>(Scheduler, (state, singleAssignmentDisposable, this), dtOffset, (Action<(TState, SingleAssignmentDisposable, InvokeRec3State<TState>)>)delegate((TState state, SingleAssignmentDisposable sad, InvokeRec3State<TState> this) nextState) {
nextState.this.Group.Remove(nextState.sad);
nextState.this.InvokeFirst(nextState.state);
});
}
internal void InvokeFirst(TState state)
{
_action(state, _recurseCallback);
}
}
private sealed class SchedulePeriodicStopwatch<TState> : IDisposable
{
private readonly IScheduler _scheduler;
private readonly TimeSpan _period;
private readonly Func<TState, TState> _action;
private readonly IStopwatchProvider _stopwatchProvider;
private TState _state;
private readonly object _gate = new object();
private readonly AutoResetEvent _resumeEvent = new AutoResetEvent(false);
private volatile int _runState;
private IStopwatch _stopwatch;
private TimeSpan _nextDue;
private TimeSpan _suspendedAt;
private TimeSpan _inactiveTime;
private const int Stopped = 0;
private const int Running = 1;
private const int Suspended = 2;
private const int Disposed = 3;
private IDisposable _task;
public SchedulePeriodicStopwatch(IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action, IStopwatchProvider stopwatchProvider)
{
_scheduler = scheduler;
_period = period;
_action = action;
_stopwatchProvider = stopwatchProvider;
_state = state;
_runState = 0;
}
public IDisposable Start()
{
RegisterHostLifecycleEventHandlers();
_stopwatch = _stopwatchProvider.StartStopwatch();
_nextDue = _period;
_runState = 1;
Disposable.TrySetSingle(ref _task, Scheduler.Schedule<SchedulePeriodicStopwatch<TState>>(_scheduler, this, _nextDue, (Action<SchedulePeriodicStopwatch<TState>, Action<SchedulePeriodicStopwatch<TState>, TimeSpan>>)delegate(SchedulePeriodicStopwatch<TState> this, Action<SchedulePeriodicStopwatch<TState>, TimeSpan> a) {
this.Tick(a);
}));
return this;
}
void IDisposable.Dispose()
{
Disposable.TryDispose(ref _task);
Cancel();
}
private void Tick(Action<SchedulePeriodicStopwatch<TState>, TimeSpan> recurse)
{
_nextDue += _period;
_state = _action(_state);
TimeSpan arg = default(TimeSpan);
while (true) {
lock (_gate) {
if (_runState != 1) {
if (_runState == 3)
return;
goto IL_0094;
}
arg = Normalize(_nextDue - (_stopwatch.Elapsed - _inactiveTime));
}
break;
IL_0094:
_resumeEvent.WaitOne();
}
recurse(this, arg);
}
private void Cancel()
{
UnregisterHostLifecycleEventHandlers();
lock (_gate) {
_runState = 3;
if (!Environment.HasShutdownStarted)
_resumeEvent.Set();
}
}
private void Suspending(object sender, HostSuspendingEventArgs args)
{
lock (_gate) {
if (_runState == 1) {
_suspendedAt = _stopwatch.Elapsed;
_runState = 2;
if (!Environment.HasShutdownStarted)
_resumeEvent.Reset();
}
}
}
private void Resuming(object sender, HostResumingEventArgs args)
{
lock (_gate) {
if (_runState == 2) {
_inactiveTime += _stopwatch.Elapsed - _suspendedAt;
_runState = 1;
if (!Environment.HasShutdownStarted)
_resumeEvent.Set();
}
}
}
private void RegisterHostLifecycleEventHandlers()
{
HostLifecycleService.Suspending += Suspending;
HostLifecycleService.Resuming += Resuming;
HostLifecycleService.AddRef();
}
private void UnregisterHostLifecycleEventHandlers()
{
HostLifecycleService.Suspending -= Suspending;
HostLifecycleService.Resuming -= Resuming;
HostLifecycleService.Release();
}
}
private sealed class SchedulePeriodicRecursive<TState>
{
private readonly IScheduler _scheduler;
private readonly TimeSpan _period;
private readonly Func<TState, TState> _action;
private TState _state;
private int _pendingTickCount;
private IDisposable _cancel;
private const int TICK = 0;
private const int DispatchStart = 1;
private const int DispatchEnd = 2;
public SchedulePeriodicRecursive(IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action)
{
_scheduler = scheduler;
_period = period;
_action = action;
_state = state;
}
public IDisposable Start()
{
_pendingTickCount = 0;
SingleAssignmentDisposable singleAssignmentDisposable = (SingleAssignmentDisposable)(_cancel = new SingleAssignmentDisposable());
singleAssignmentDisposable.Disposable = Scheduler.Schedule<int>(_scheduler, 0, _period, (Action<int, Action<int, TimeSpan>>)Tick);
return singleAssignmentDisposable;
}
private void Tick(int command, Action<int, TimeSpan> recurse)
{
switch (command) {
case 0:
recurse(0, _period);
if (Interlocked.Increment(ref _pendingTickCount) != 1)
break;
goto case 1;
case 1:
try {
_state = _action(_state);
} catch (Exception exception) {
_cancel.Dispose();
exception.Throw();
}
recurse(2, TimeSpan.Zero);
break;
case 2:
if (Interlocked.Decrement(ref _pendingTickCount) > 0)
recurse(1, TimeSpan.Zero);
break;
}
}
}
private sealed class EmulatedStopwatch : IStopwatch
{
private readonly IScheduler _scheduler;
private readonly DateTimeOffset _start;
public TimeSpan Elapsed => Normalize(_scheduler.Now - _start);
public EmulatedStopwatch(IScheduler scheduler)
{
_scheduler = scheduler;
_start = _scheduler.Now;
}
}
private static readonly Lazy<IScheduler> _threadPool = new Lazy<IScheduler>(() => Initialize("ThreadPool"));
private static readonly Lazy<IScheduler> _newThread = new Lazy<IScheduler>(() => Initialize("NewThread"));
private static readonly Lazy<IScheduler> _taskPool = new Lazy<IScheduler>(() => Initialize("TaskPool"));
internal static Type[] Optimizations = new Type[3] {
typeof(ISchedulerLongRunning),
typeof(IStopwatchProvider),
typeof(ISchedulerPeriodic)
};
public static DateTimeOffset Now => SystemClock.UtcNow;
public static ImmediateScheduler Immediate => ImmediateScheduler.Instance;
public static CurrentThreadScheduler CurrentThread => CurrentThreadScheduler.Instance;
public static DefaultScheduler Default => DefaultScheduler.Instance;
[Obsolete("This property is no longer supported due to refactoring of the API surface and elimination of platform-specific dependencies. Consider using Scheduler.Default to obtain the platform's most appropriate pool-based scheduler. In order to access a specific pool-based scheduler, please add a reference to the System.Reactive.PlatformServices assembly for your target platform and use the appropriate scheduler in the System.Reactive.Concurrency namespace.")]
public static IScheduler ThreadPool {
get {
return _threadPool.Value;
}
}
[Obsolete("This property is no longer supported due to refactoring of the API surface and elimination of platform-specific dependencies. Please use NewThreadScheduler.Default to obtain an instance of this scheduler type.")]
public static IScheduler NewThread {
get {
return _newThread.Value;
}
}
[Obsolete("This property is no longer supported due to refactoring of the API surface and elimination of platform-specific dependencies. Please use TaskPoolScheduler.Default to obtain an instance of this scheduler type.")]
public static IScheduler TaskPool {
get {
return _taskPool.Value;
}
}
public static SchedulerOperation Yield(this IScheduler scheduler)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return new SchedulerOperation((Action a) => scheduler.Schedule(a), scheduler.GetCancellationToken());
}
public static SchedulerOperation Yield(this IScheduler scheduler, CancellationToken cancellationToken)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return new SchedulerOperation((Action a) => scheduler.Schedule(a), cancellationToken);
}
public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return new SchedulerOperation((Action a) => scheduler.Schedule(dueTime, a), scheduler.GetCancellationToken());
}
public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime, CancellationToken cancellationToken)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return new SchedulerOperation((Action a) => scheduler.Schedule(dueTime, a), cancellationToken);
}
public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return new SchedulerOperation((Action a) => scheduler.Schedule(dueTime, a), scheduler.GetCancellationToken());
}
public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime, CancellationToken cancellationToken)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return new SchedulerOperation((Action a) => scheduler.Schedule(dueTime, a), cancellationToken);
}
public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return ScheduleAsync_(scheduler, state, action);
}
public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return ScheduleAsync_(scheduler, state, action);
}
public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, Task> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return ScheduleAsync_(scheduler, action, (IScheduler self, Func<IScheduler, CancellationToken, Task> closureAction, CancellationToken ct) => closureAction(self, ct));
}
public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return ScheduleAsync_(scheduler, action, (IScheduler self, Func<IScheduler, CancellationToken, Task<IDisposable>> closureAction, CancellationToken ct) => closureAction(self, ct));
}
public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return ScheduleAsync_(scheduler, state, dueTime, action);
}
public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return ScheduleAsync_(scheduler, state, dueTime, action);
}
public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return ScheduleAsync_(scheduler, action, dueTime, (IScheduler self, Func<IScheduler, CancellationToken, Task> closureAction, CancellationToken ct) => closureAction(self, ct));
}
public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return ScheduleAsync_(scheduler, action, dueTime, (IScheduler self, Func<IScheduler, CancellationToken, Task<IDisposable>> closureAction, CancellationToken ct) => closureAction(self, ct));
}
public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return ScheduleAsync_(scheduler, state, dueTime, action);
}
public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return ScheduleAsync_(scheduler, state, dueTime, action);
}
public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func<IScheduler, CancellationToken, Task> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return ScheduleAsync_(scheduler, action, dueTime, (IScheduler self, Func<IScheduler, CancellationToken, Task> closureAction, CancellationToken ct) => closureAction(self, ct));
}
public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return ScheduleAsync_(scheduler, action, dueTime, (IScheduler self, Func<IScheduler, CancellationToken, Task<IDisposable>> closureAction, CancellationToken ct) => closureAction(self, ct));
}
private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action)
{
return scheduler.Schedule((state, action), (IScheduler self, (TState state, Func<IScheduler, TState, CancellationToken, Task> action) t) => Scheduler.InvokeAsync<TState>(self, t.state, t.action));
}
private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
return scheduler.Schedule((state, action), (IScheduler self, (TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action) t) => Scheduler.InvokeAsync<TState>(self, t.state, t.action));
}
private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
{
return scheduler.Schedule((state, action), dueTime, (IScheduler self, (TState state, Func<IScheduler, TState, CancellationToken, Task> action) t) => Scheduler.InvokeAsync<TState>(self, t.state, t.action));
}
private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
return scheduler.Schedule((state, action), dueTime, (IScheduler self, (TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action) t) => Scheduler.InvokeAsync<TState>(self, t.state, t.action));
}
private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
{
return scheduler.Schedule((state, action), dueTime, (IScheduler self, (TState state, Func<IScheduler, TState, CancellationToken, Task> action) t) => Scheduler.InvokeAsync<TState>(self, t.state, t.action));
}
private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
return scheduler.Schedule((state, action), dueTime, (IScheduler self, (TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action) t) => Scheduler.InvokeAsync<TState>(self, t.state, t.action));
}
private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
{
return new AsyncInvocation<TState>().Run(self, s, action);
}
private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task> action)
{
return InvokeAsync(self, (action, s), (IScheduler self_, (Func<IScheduler, TState, CancellationToken, Task> action, TState state) t, CancellationToken ct) => t.action(self_, t.state, ct).ContinueWith<IDisposable>((Func<Task, IDisposable>)((Task _) => Disposable.Empty)));
}
private static CancellationToken GetCancellationToken(this IScheduler scheduler)
{
CancelableScheduler cancelableScheduler;
if ((cancelableScheduler = (scheduler as CancelableScheduler)) == null)
return CancellationToken.None;
return cancelableScheduler.Token;
}
public static TimeSpan Normalize(TimeSpan timeSpan)
{
if (timeSpan.Ticks >= 0)
return timeSpan;
return TimeSpan.Zero;
}
private static IScheduler Initialize(string name)
{
IScheduler service = PlatformEnlightenmentProvider.Current.GetService<IScheduler>(new object[1] {
name
});
if (service == null)
throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, Strings_Core.CANT_OBTAIN_SCHEDULER, name));
return service;
}
public static IDisposable Schedule(this IScheduler scheduler, Action<Action> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule(action, delegate(Action<Action> _action, Action<Action<Action>> self) {
_action(delegate {
self(_action);
});
});
}
public static IDisposable Schedule<TState>(this IScheduler scheduler, TState state, Action<TState, Action<TState>> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule((state, action), (IScheduler s, (TState state, Action<TState, Action<TState>> action) p) => Scheduler.InvokeRec1<TState>(s, p));
}
private static IDisposable InvokeRec1<TState>(IScheduler scheduler, (TState state, Action<TState, Action<TState>> action) tuple)
{
InvokeRec1State<TState> invokeRec1State = new InvokeRec1State<TState>(scheduler, tuple.action);
invokeRec1State.InvokeFirst(tuple.state);
return invokeRec1State;
}
public static IDisposable Schedule(this IScheduler scheduler, TimeSpan dueTime, Action<Action<TimeSpan>> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule(action, dueTime, delegate(Action<Action<TimeSpan>> _action, Action<Action<Action<TimeSpan>>, TimeSpan> self) {
_action(delegate(TimeSpan dt) {
self(_action, dt);
});
});
}
public static IDisposable Schedule<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Action<TState, Action<TState, TimeSpan>> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule((state, action), dueTime, (IScheduler s, (TState state, Action<TState, Action<TState, TimeSpan>> action) p) => Scheduler.InvokeRec2<TState>(s, p));
}
private static IDisposable InvokeRec2<TState>(IScheduler scheduler, (TState state, Action<TState, Action<TState, TimeSpan>> action) tuple)
{
InvokeRec2State<TState> invokeRec2State = new InvokeRec2State<TState>(scheduler, tuple.action);
invokeRec2State.InvokeFirst(tuple.state);
return invokeRec2State;
}
public static IDisposable Schedule(this IScheduler scheduler, DateTimeOffset dueTime, Action<Action<DateTimeOffset>> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule(action, dueTime, delegate(Action<Action<DateTimeOffset>> _action, Action<Action<Action<DateTimeOffset>>, DateTimeOffset> self) {
_action(delegate(DateTimeOffset dt) {
self(_action, dt);
});
});
}
public static IDisposable Schedule<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Action<TState, Action<TState, DateTimeOffset>> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule((state, action), dueTime, (IScheduler s, (TState state, Action<TState, Action<TState, DateTimeOffset>> action) p) => Scheduler.InvokeRec3<TState>(s, p));
}
private static IDisposable InvokeRec3<TState>(IScheduler scheduler, (TState state, Action<TState, Action<TState, DateTimeOffset>> action) tuple)
{
InvokeRec3State<TState> invokeRec3State = new InvokeRec3State<TState>(scheduler, tuple.action);
invokeRec3State.InvokeFirst(tuple.state);
return invokeRec3State;
}
public static ISchedulerLongRunning AsLongRunning(this IScheduler scheduler)
{
return As<ISchedulerLongRunning>(scheduler);
}
public static IStopwatchProvider AsStopwatchProvider(this IScheduler scheduler)
{
return As<IStopwatchProvider>(scheduler);
}
public static ISchedulerPeriodic AsPeriodic(this IScheduler scheduler)
{
return As<ISchedulerPeriodic>(scheduler);
}
private static T As<T>(IScheduler scheduler) where T : class
{
IServiceProvider serviceProvider;
if ((serviceProvider = (scheduler as IServiceProvider)) != null)
return (T)serviceProvider.GetService(typeof(T));
return null;
}
public static IDisposable SchedulePeriodic<TState>(this IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (period < TimeSpan.Zero)
throw new ArgumentOutOfRangeException("period");
if (action == null)
throw new ArgumentNullException("action");
return SchedulePeriodic_(scheduler, state, period, action);
}
public static IDisposable SchedulePeriodic<TState>(this IScheduler scheduler, TState state, TimeSpan period, Action<TState> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (period < TimeSpan.Zero)
throw new ArgumentOutOfRangeException("period");
if (action == null)
throw new ArgumentNullException("action");
return SchedulePeriodic_(scheduler, (state, action), period, delegate((TState state, Action<TState> action) t) {
t.action(t.state);
return t;
});
}
public static IDisposable SchedulePeriodic(this IScheduler scheduler, TimeSpan period, Action action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (period < TimeSpan.Zero)
throw new ArgumentOutOfRangeException("period");
if (action == null)
throw new ArgumentNullException("action");
return SchedulePeriodic_(scheduler, action, period, delegate(Action a) {
a();
return a;
});
}
public static IStopwatch StartStopwatch(this IScheduler scheduler)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
IStopwatchProvider stopwatchProvider = scheduler.AsStopwatchProvider();
if (stopwatchProvider != null)
return stopwatchProvider.StartStopwatch();
return new EmulatedStopwatch(scheduler);
}
private static IDisposable SchedulePeriodic_<TState>(IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action)
{
ISchedulerPeriodic schedulerPeriodic = scheduler.AsPeriodic();
if (period < TimeSpan.FromMilliseconds(1))
schedulerPeriodic = null;
if (schedulerPeriodic != null)
return schedulerPeriodic.SchedulePeriodic(state, period, action);
IStopwatchProvider stopwatchProvider = scheduler.AsStopwatchProvider();
if (stopwatchProvider != null)
return new SchedulePeriodicStopwatch<TState>(scheduler, state, period, action, stopwatchProvider).Start();
return new SchedulePeriodicRecursive<TState>(scheduler, state, period, action).Start();
}
public static IDisposable Schedule(this IScheduler scheduler, Action action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule(action, (IScheduler s, Action a) => Invoke(s, a));
}
internal static IDisposable ScheduleAction<TState>(this IScheduler scheduler, TState state, Action<TState> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule((action, state), delegate(IScheduler _, (Action<TState> action, TState state) tuple) {
tuple.action(tuple.state);
return Disposable.Empty;
});
}
internal static IDisposable ScheduleAction<TState>(this IScheduler scheduler, TState state, Func<TState, IDisposable> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule((action, state), (IScheduler _, (Func<TState, IDisposable> action, TState state) tuple) => tuple.action(tuple.state));
}
public static IDisposable Schedule(this IScheduler scheduler, TimeSpan dueTime, Action action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule(action, dueTime, (IScheduler s, Action a) => Invoke(s, a));
}
internal static IDisposable ScheduleAction<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Action<TState> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule((state, action), dueTime, (IScheduler s, (TState state, Action<TState> action) tuple) => Scheduler.Invoke<TState>(s, tuple));
}
internal static IDisposable ScheduleAction<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<TState, IDisposable> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule((state, action), dueTime, (IScheduler s, (TState state, Func<TState, IDisposable> action) tuple) => Scheduler.Invoke<TState>(s, tuple));
}
public static IDisposable Schedule(this IScheduler scheduler, DateTimeOffset dueTime, Action action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule(action, dueTime, (IScheduler s, Action a) => Invoke(s, a));
}
internal static IDisposable ScheduleAction<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Action<TState> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule((state, action), dueTime, (IScheduler s, (TState state, Action<TState> action) tuple) => Scheduler.Invoke<TState>(s, tuple));
}
internal static IDisposable ScheduleAction<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<TState, IDisposable> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.Schedule((state, action), dueTime, (IScheduler s, (TState state, Func<TState, IDisposable> action) tuple) => Scheduler.Invoke<TState>(s, tuple));
}
public static IDisposable ScheduleLongRunning(this ISchedulerLongRunning scheduler, Action<ICancelable> action)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (action == null)
throw new ArgumentNullException("action");
return scheduler.ScheduleLongRunning(action, delegate(Action<ICancelable> a, ICancelable c) {
a(c);
});
}
private static IDisposable Invoke(IScheduler scheduler, Action action)
{
action();
return Disposable.Empty;
}
private static IDisposable Invoke<TState>(IScheduler scheduler, (TState state, Action<TState> action) tuple)
{
tuple.action(tuple.state);
return Disposable.Empty;
}
private static IDisposable Invoke<TState>(IScheduler scheduler, (TState state, Func<TState, IDisposable> action) tuple)
{
return tuple.action(tuple.state);
}
public static IScheduler DisableOptimizations(this IScheduler scheduler)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return new DisableOptimizationsScheduler(scheduler);
}
public static IScheduler DisableOptimizations(this IScheduler scheduler, params Type[] optimizationInterfaces)
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (optimizationInterfaces == null)
throw new ArgumentNullException("optimizationInterfaces");
return new DisableOptimizationsScheduler(scheduler, optimizationInterfaces);
}
public static IScheduler Catch<TException>(this IScheduler scheduler, Func<TException, bool> handler) where TException : Exception
{
if (scheduler == null)
throw new ArgumentNullException("scheduler");
if (handler == null)
throw new ArgumentNullException("handler");
return new CatchScheduler<TException>(scheduler, handler);
}
}
}