TaskPoolScheduler
Represents an object that schedules units of work on the Task Parallel Library (TPL) task pool.
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;
namespace System.Reactive.Concurrency
{
public sealed class TaskPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
{
/// <summary>
/// Work item that starts <c>action</c> on the scheduler's task factory from its constructor.
/// </summary>
/// <typeparam name="TState">Type of the state handed to the action.</typeparam>
private sealed class ScheduledWorkItem<TState> : IDisposable
{
    private readonly TState _state;
    private readonly TaskPoolScheduler _scheduler;
    private readonly Func<IScheduler, TState, IDisposable> _action;

    // First holds the CancellationDisposable whose token is given to the task;
    // after the action has run, the disposable returned by the action is stored here instead.
    private IDisposable _cancel;

    /// <summary>
    /// Captures the arguments and immediately queues the action on <c>scheduler._taskFactory</c>.
    /// </summary>
    public ScheduledWorkItem(TaskPoolScheduler scheduler, TState state, Func<IScheduler, TState, IDisposable> action)
    {
        _scheduler = scheduler;
        _action = action;
        _state = state;

        var tokenSource = new CancellationDisposable();
        Disposable.SetSingle(ref _cancel, tokenSource);

        // The work item is passed as the task's state object, so the lambda captures nothing.
        scheduler._taskFactory.StartNew(
            boxedItem =>
            {
                var item = (ScheduledWorkItem<TState>)boxedItem;
                var resource = item._action(item._scheduler, item._state);
                Disposable.TrySetSerial(ref item._cancel, resource);
            },
            this,
            tokenSource.Token);
    }

    /// <summary>
    /// Disposes whatever <c>_cancel</c> currently holds.
    /// </summary>
    public void Dispose() => Disposable.TryDispose(ref _cancel);
}
/// <summary>
/// Work item that runs <c>action</c> in a continuation of <c>TaskHelpers.Delay(dueTime, ...)</c>.
/// </summary>
/// <typeparam name="TState">Type of the state handed to the action.</typeparam>
private sealed class SlowlyScheduledWorkItem<TState> : IDisposable
{
    private readonly TState _state;
    private readonly TaskPoolScheduler _scheduler;
    private readonly Func<IScheduler, TState, IDisposable> _action;

    // Initially the CancellationDisposable whose token is given to the delay task;
    // the action's returned disposable is added through Disposable.TrySetMultiple.
    private IDisposable _cancel;

    /// <summary>
    /// Captures the arguments and starts the delay whose continuation invokes the action.
    /// </summary>
    public SlowlyScheduledWorkItem(TaskPoolScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
    {
        _scheduler = scheduler;
        _action = action;
        _state = state;

        var tokenSource = new CancellationDisposable();
        Disposable.SetSingle(ref _cancel, tokenSource);

        var delay = TaskHelpers.Delay(dueTime, tokenSource.Token);

        // The continuation itself is not cancelable (CancellationToken.None); it runs only when
        // the delay ran to completion, and re-checks the disposed flag before invoking the action.
        delay.ContinueWith(
            (Task antecedent, object boxedItem) =>
            {
                var item = (SlowlyScheduledWorkItem<TState>)boxedItem;
                if (Disposable.GetIsDisposed(ref item._cancel))
                {
                    return;
                }

                Disposable.TrySetMultiple(ref item._cancel, item._action(item._scheduler, item._state));
            },
            this,
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion,
            scheduler._taskFactory.Scheduler);
    }

    /// <summary>
    /// Disposes whatever <c>_cancel</c> currently holds.
    /// </summary>
    public void Dispose() => Disposable.TryDispose(ref _cancel);
}
/// <summary>
/// Work item that runs <c>action</c> on a task created with <see cref="TaskCreationOptions.LongRunning"/>.
/// The item is passed to the action as its <c>ICancelable</c>, so the action can poll <see cref="IsDisposed"/>.
/// </summary>
/// <typeparam name="TState">Type of the state handed to the action.</typeparam>
private sealed class LongScheduledWorkItem<TState> : ICancelable, IDisposable
{
    private readonly TState _state;
    private readonly Action<TState, ICancelable> _action;

    // Never assigned a resource in this class; only used via the Disposable helpers to track disposal.
    private IDisposable _cancel;

    /// <summary>
    /// Gets whether <see cref="Dispose"/> has been called, as reported by <c>Disposable.GetIsDisposed</c>.
    /// </summary>
    public bool IsDisposed
    {
        get { return Disposable.GetIsDisposed(ref _cancel); }
    }

    /// <summary>
    /// Captures the arguments and immediately starts the long-running task.
    /// </summary>
    public LongScheduledWorkItem(TaskPoolScheduler scheduler, TState state, Action<TState, ICancelable> action)
    {
        _action = action;
        _state = state;

        // No disposed check before invoking: the action always runs and receives the item
        // itself as the cancellation handle.
        scheduler._taskFactory.StartNew(
            boxedItem =>
            {
                var item = (LongScheduledWorkItem<TState>)boxedItem;
                item._action(item._state, item);
            },
            this,
            TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Marks the item as disposed through <c>Disposable.TryDispose</c>.
    /// </summary>
    public void Dispose() => Disposable.TryDispose(ref _cancel);
}
/// <summary>
/// Work item that repeatedly waits <c>period</c> and then applies <c>action</c> to the current state,
/// storing the result as the new state.
/// </summary>
/// <typeparam name="TState">Type of the state threaded through successive invocations.</typeparam>
private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
{
    private TState _state;
    private readonly TimeSpan _period;
    private readonly TaskFactory _taskFactory;
    private readonly Func<TState, TState> _action;

    // The action is always invoked through this lock.
    // NOTE(review): presumably AsyncLock serializes overlapping ticks; confirm against AsyncLock.
    private readonly AsyncLock _gate = new AsyncLock();
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    /// <summary>
    /// Captures the arguments and arms the first delay.
    /// </summary>
    public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState, TState> action, TaskFactory taskFactory)
    {
        _taskFactory = taskFactory;
        _action = action;
        _period = period;
        _state = state;

        MoveNext();
    }

    /// <summary>
    /// Cancels the pending delay and disposes the lock.
    /// </summary>
    public void Dispose()
    {
        _cts.Cancel();
        _gate.Dispose();
    }

    /// <summary>
    /// Starts one delay of <c>_period</c>; when it runs to completion, the continuation arms the
    /// next delay and then runs the action under <c>_gate</c>.
    /// </summary>
    private void MoveNext()
    {
        var delay = TaskHelpers.Delay(_period, _cts.Token);

        delay.ContinueWith(
            (Task antecedent, object boxedItem) =>
            {
                var item = (PeriodicallyScheduledWorkItem<TState>)boxedItem;

                // Arm the next tick before running the action, so the action's run time
                // does not push back the start of the next delay.
                item.MoveNext();

                item._gate.Wait(
                    item,
                    self =>
                    {
                        self._state = self._action(self._state);
                    });
            },
            this,
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion,
            _taskFactory.Scheduler);
    }
}
// Backing store for Default: built on first access from a TaskFactory over TaskScheduler.Default.
private static readonly Lazy<TaskPoolScheduler> LazyInstance = new Lazy<TaskPoolScheduler>(() => new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default)));
// Task factory used by this scheduler instance to start tasks and to pick the continuation scheduler.
private readonly TaskFactory _taskFactory;
/// <summary>
/// Gets the shared scheduler instance that uses a task factory over <see cref="TaskScheduler.Default"/>.
/// </summary>
public static TaskPoolScheduler Default => LazyInstance.Value;
/// <summary>
/// Creates a scheduler that schedules work using the given task factory.
/// </summary>
/// <param name="taskFactory">Task factory used to start tasks for scheduled work.</param>
/// <exception cref="ArgumentNullException"><paramref name="taskFactory"/> is <c>null</c>.</exception>
public TaskPoolScheduler(TaskFactory taskFactory)
{
    if (taskFactory == null)
    {
        throw new ArgumentNullException(nameof(taskFactory));
    }

    _taskFactory = taskFactory;
}
/// <summary>
/// Schedules an action to be executed without a due time.
/// </summary>
/// <typeparam name="TState">Type of the state passed to the action.</typeparam>
/// <param name="state">State passed to the action.</param>
/// <param name="action">Action to execute.</param>
/// <returns>The work item; disposing it disposes the work item's cancellation resource.</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    return new ScheduledWorkItem<TState>(this, state, action);
}
/// <summary>
/// Schedules an action to be executed after <paramref name="dueTime"/>.
/// </summary>
/// <typeparam name="TState">Type of the state passed to the action.</typeparam>
/// <param name="state">State passed to the action.</param>
/// <param name="dueTime">Relative time after which to execute the action.</param>
/// <param name="action">Action to execute.</param>
/// <returns>The work item; disposing it disposes the work item's cancellation resource.</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    TimeSpan normalizedDueTime = Scheduler.Normalize(dueTime);

    // A normalized due time of zero ticks needs no delay task; use the immediate overload.
    if (normalizedDueTime.Ticks == 0)
    {
        return Schedule(state, action);
    }

    return ScheduleSlow(state, normalizedDueTime, action);
}
// Delayed path of Schedule: wraps the request in a work item that waits for dueTime before running the action.
private IDisposable ScheduleSlow<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
    => new SlowlyScheduledWorkItem<TState>(this, state, dueTime, action);
/// <summary>
/// Schedules a long-running action on a task created with <c>TaskCreationOptions.LongRunning</c>.
/// The action receives an <c>ICancelable</c> it can poll to observe disposal of the returned object.
/// </summary>
/// <typeparam name="TState">Type of the state passed to the action.</typeparam>
/// <param name="state">State passed to the action.</param>
/// <param name="action">Action to execute.</param>
/// <returns>The work item; disposing it flags the <c>ICancelable</c> passed to the action as disposed.</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
{
    // Validate eagerly, like the other Schedule* methods: a null action would otherwise only
    // fail inside the background task, where the caller never sees the exception.
    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    return new LongScheduledWorkItem<TState>(this, state, action);
}
/// <summary>
/// Creates a new stopwatch object.
/// </summary>
/// <returns>A new <c>StopwatchImpl</c> instance.</returns>
public override IStopwatch StartStopwatch() => new StopwatchImpl();
/// <summary>
/// Schedules a periodic piece of work: after each <paramref name="period"/>, <paramref name="action"/>
/// is applied to the current state and its result becomes the new state.
/// </summary>
/// <typeparam name="TState">Type of the state threaded through successive invocations.</typeparam>
/// <param name="state">Initial state passed to the first invocation.</param>
/// <param name="period">Delay between invocations; must not be negative.</param>
/// <param name="action">Function that receives the current state and returns the next one.</param>
/// <returns>The work item; disposing it cancels the pending delay.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
{
    if (period < TimeSpan.Zero)
    {
        throw new ArgumentOutOfRangeException(nameof(period));
    }

    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    return new PeriodicallyScheduledWorkItem<TState>(state, period, action, _taskFactory);
}
}
}