<PackageReference Include="System.Reactive" Version="4.3.2" />

TaskPoolScheduler

Represents an object that schedules units of work on the Task Parallel Library (TPL) task pool.
using System.Reactive.Disposables; using System.Threading; using System.Threading.Tasks; namespace System.Reactive.Concurrency { public sealed class TaskPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic { private sealed class ScheduledWorkItem<TState> : IDisposable { private readonly TState _state; private readonly TaskPoolScheduler _scheduler; private readonly Func<IScheduler, TState, IDisposable> _action; private IDisposable _cancel; public ScheduledWorkItem(TaskPoolScheduler scheduler, TState state, Func<IScheduler, TState, IDisposable> action) { _state = state; _action = action; _scheduler = scheduler; CancellationDisposable cancellationDisposable = new CancellationDisposable(); Disposable.SetSingle(ref _cancel, cancellationDisposable); scheduler._taskFactory.StartNew(delegate(object thisObject) { ScheduledWorkItem<TState> scheduledWorkItem = (ScheduledWorkItem<TState>)thisObject; Disposable.TrySetSerial(ref scheduledWorkItem._cancel, scheduledWorkItem._action(scheduledWorkItem._scheduler, scheduledWorkItem._state)); }, this, cancellationDisposable.Token); } public void Dispose() { Disposable.TryDispose(ref _cancel); } } private sealed class SlowlyScheduledWorkItem<TState> : IDisposable { private readonly TState _state; private readonly TaskPoolScheduler _scheduler; private readonly Func<IScheduler, TState, IDisposable> _action; private IDisposable _cancel; public SlowlyScheduledWorkItem(TaskPoolScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { _state = state; _action = action; _scheduler = scheduler; CancellationDisposable cancellationDisposable = new CancellationDisposable(); Disposable.SetSingle(ref _cancel, cancellationDisposable); TaskHelpers.Delay(dueTime, cancellationDisposable.Token).ContinueWith(delegate(Task _, object thisObject) { SlowlyScheduledWorkItem<TState> slowlyScheduledWorkItem = (SlowlyScheduledWorkItem<TState>)thisObject; if (!Disposable.GetIsDisposed(ref slowlyScheduledWorkItem._cancel)) Disposable.TrySetMultiple(ref slowlyScheduledWorkItem._cancel, slowlyScheduledWorkItem._action(slowlyScheduledWorkItem._scheduler, slowlyScheduledWorkItem._state)); }, this, CancellationToken.None, TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously, scheduler._taskFactory.Scheduler); } public void Dispose() { Disposable.TryDispose(ref _cancel); } } private sealed class LongScheduledWorkItem<TState> : ICancelable, IDisposable { private readonly TState _state; private readonly Action<TState, ICancelable> _action; private IDisposable _cancel; public bool IsDisposed => Disposable.GetIsDisposed(ref _cancel); public LongScheduledWorkItem(TaskPoolScheduler scheduler, TState state, Action<TState, ICancelable> action) { _state = state; _action = action; scheduler._taskFactory.StartNew(delegate(object thisObject) { LongScheduledWorkItem<TState> longScheduledWorkItem = (LongScheduledWorkItem<TState>)thisObject; longScheduledWorkItem._action(longScheduledWorkItem._state, longScheduledWorkItem); }, this, TaskCreationOptions.LongRunning); } public void Dispose() { Disposable.TryDispose(ref _cancel); } } private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable { private TState _state; private readonly TimeSpan _period; private readonly TaskFactory _taskFactory; private readonly Func<TState, TState> _action; private readonly AsyncLock _gate = new AsyncLock(); private readonly CancellationTokenSource _cts = new CancellationTokenSource(); public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState, TState> action, TaskFactory taskFactory) { _state = state; _period = period; _action = action; _taskFactory = taskFactory; MoveNext(); } public void Dispose() { _cts.Cancel(); _gate.Dispose(); } private void MoveNext() { TaskHelpers.Delay(_period, _cts.Token).ContinueWith(delegate(Task _, object thisObject) { PeriodicallyScheduledWorkItem<TState> periodicallyScheduledWorkItem = (PeriodicallyScheduledWorkItem<TState>)thisObject; periodicallyScheduledWorkItem.MoveNext(); periodicallyScheduledWorkItem._gate.Wait<PeriodicallyScheduledWorkItem<TState>>(periodicallyScheduledWorkItem, (Action<PeriodicallyScheduledWorkItem<TState>>)delegate(PeriodicallyScheduledWorkItem<TState> closureThis) { closureThis._state = closureThis._action(closureThis._state); }); }, this, CancellationToken.None, TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously, _taskFactory.Scheduler); } } private static readonly Lazy<TaskPoolScheduler> LazyInstance = new Lazy<TaskPoolScheduler>(() => new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default))); private readonly TaskFactory _taskFactory; public static TaskPoolScheduler Default => LazyInstance.Value; public TaskPoolScheduler(TaskFactory taskFactory) { if (taskFactory == null) throw new ArgumentNullException("taskFactory"); _taskFactory = taskFactory; } public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) { if (action == null) throw new ArgumentNullException("action"); return new ScheduledWorkItem<TState>(this, state, action); } public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { if (action == null) throw new ArgumentNullException("action"); TimeSpan dueTime2 = Scheduler.Normalize(dueTime); if (dueTime2.Ticks == 0) return Schedule(state, action); return ScheduleSlow(state, dueTime2, action); } private IDisposable ScheduleSlow<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { return new SlowlyScheduledWorkItem<TState>(this, state, dueTime, action); } public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action) { return new LongScheduledWorkItem<TState>(this, state, action); } public override IStopwatch StartStopwatch() { return new StopwatchImpl(); } public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) { if (period < TimeSpan.Zero) throw new ArgumentOutOfRangeException("period"); if (action == null) throw new ArgumentNullException("action"); return new PeriodicallyScheduledWorkItem<TState>(state, period, action, _taskFactory); } } }