TaskPoolScheduler
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;
namespace System.Reactive.Concurrency
{
public sealed class TaskPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
{
private static readonly Lazy<TaskPoolScheduler> s_instance = new Lazy<TaskPoolScheduler>(() => new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default)));
private readonly TaskFactory taskFactory;
public static TaskPoolScheduler Default => s_instance.Value;
public TaskPoolScheduler(TaskFactory taskFactory)
{
if (taskFactory == null)
throw new ArgumentNullException("taskFactory");
this.taskFactory = taskFactory;
}
public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
throw new ArgumentNullException("action");
SerialDisposable d = new SerialDisposable();
CancellationDisposable cancellationDisposable = new CancellationDisposable();
d.Disposable = cancellationDisposable;
taskFactory.StartNew(delegate {
d.Disposable = action(this, state);
}, cancellationDisposable.Token);
return d;
}
public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
throw new ArgumentNullException("action");
TimeSpan dueTime2 = Scheduler.Normalize(dueTime);
if (dueTime2.Ticks == 0)
return Schedule(state, action);
return ScheduleSlow(state, dueTime2, action);
}
private IDisposable ScheduleSlow<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
MultipleAssignmentDisposable d = new MultipleAssignmentDisposable();
CancellationDisposable cancellationDisposable = new CancellationDisposable();
d.Disposable = cancellationDisposable;
TaskHelpers.Delay(dueTime, cancellationDisposable.Token).ContinueWith(delegate {
if (!d.IsDisposed)
d.Disposable = action(this, state);
}, CancellationToken.None, TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously, taskFactory.Scheduler);
return d;
}
public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
{
BooleanDisposable d = new BooleanDisposable();
taskFactory.StartNew(delegate {
action(state, d);
}, TaskCreationOptions.LongRunning);
return d;
}
public override IStopwatch StartStopwatch()
{
return new StopwatchImpl();
}
public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
{
if (period < TimeSpan.Zero)
throw new ArgumentOutOfRangeException("period");
if (action == null)
throw new ArgumentNullException("action");
CancellationDisposable cancel = new CancellationDisposable();
AsyncLock gate = new AsyncLock();
Action moveNext = null;
moveNext = delegate {
TaskHelpers.Delay(period, cancel.Token).ContinueWith(delegate {
moveNext();
gate.Wait(delegate {
state = action(state);
});
}, CancellationToken.None, TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously, taskFactory.Scheduler);
};
moveNext();
return StableCompositeDisposable.Create(cancel, gate);
}
}
}