// ThreadPoolScheduler
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Concurrency
{
public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
{
/// <summary>
/// Periodic timer without any delay between runs: every invocation of the action
/// re-queues the next one on the thread pool until the timer is disposed.
/// </summary>
/// <typeparam name="TState">Type of the state value threaded from one invocation to the next.</typeparam>
private sealed class FastPeriodicTimer<TState> : IDisposable
{
    private TState _state;
    private Func<TState, TState> _action;

    // Volatile because Dispose may be called from a different thread than the one running Tick.
    private volatile bool _disposed;

    /// <summary>Stores the initial state and action, then queues the first run.</summary>
    /// <param name="state">State passed to the first invocation of <paramref name="action"/>.</param>
    /// <param name="action">Function run on each tick; its return value becomes the state for the next tick.</param>
    public FastPeriodicTimer(TState state, Func<TState, TState> action)
    {
        _action = action;
        _state = state;

        ThreadPool.QueueUserWorkItem(Tick, null);
    }

    /// <summary>Stops re-queuing further runs and replaces the stored action with a stub.</summary>
    public void Dispose()
    {
        _disposed = true;

        // NOTE(review): Stubs<TState>.I is declared elsewhere; presumably an identity function - confirm.
        _action = Stubs<TState>.I;
    }

    /// <summary>Thread pool callback: runs the action once and queues the next run.</summary>
    /// <param name="unused">Always null; required by the callback signature.</param>
    private void Tick(object unused)
    {
        if (_disposed)
        {
            return;
        }

        _state = _action(_state);
        ThreadPool.QueueUserWorkItem(Tick, null);
    }
}
/// <summary>
/// One-shot timer: after the due time elapses it invokes the action once with the
/// parent scheduler and the state, then stops itself. Disposing it beforehand stops
/// the underlying timer.
/// </summary>
/// <typeparam name="TState">Type of the state passed to the action.</typeparam>
private sealed class Timer<TState> : IDisposable
{
// What Dispose() currently cancels: first the Stop callback, later the disposable returned by the action.
private readonly MultipleAssignmentDisposable _disposable;
private readonly IScheduler _parent;
private readonly TState _state;
// Replaced by Nop once the timer has been stopped.
private Func<IScheduler, TState, IDisposable> _action;
// null until the constructor has stored the timer; TimerStubs.Never once Stop has run.
private volatile Timer _timer;
/// <summary>Captures the arguments and starts the underlying timer.</summary>
/// <param name="parent">Scheduler handed to <paramref name="action"/> when it runs.</param>
/// <param name="state">State handed to <paramref name="action"/> when it runs.</param>
/// <param name="dueTime">Delay before the timer callback fires.</param>
/// <param name="action">Work to run when the timer fires.</param>
public Timer(IScheduler parent, TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
_parent = parent;
_state = state;
_action = action;
_disposable = new MultipleAssignmentDisposable();
// Until the action has run, disposing this instance just stops the timer.
_disposable.Disposable = Disposable.Create(Stop);
// NOTE(review): the empty try with all work in finally looks like a guard so that creating
// the timer and storing it in _timer cannot be split by a thread abort (Tick spins until
// _timer is non-null) - confirm.
try {
} finally {
// A period of -1 ms disables periodic signaling, so the callback fires at most once.
_timer = new Timer(Tick, null, dueTime, TimeSpan.FromMilliseconds(-1));
}
}
/// <summary>Timer callback: runs the action, keeps its result as the current disposable, then stops the timer.</summary>
/// <param name="state">Always null; required by the callback signature.</param>
private void Tick(object state)
{
try {
_disposable.Disposable = _action(_parent, _state);
} finally {
// The callback can start before the constructor has assigned _timer; wait for the
// assignment so that Stop sees the real timer instance.
SpinWait.SpinUntil(IsTimerAssigned);
Stop();
}
}
/// <summary>Spin-wait condition used by Tick: true once the constructor has stored the timer.</summary>
private bool IsTimerAssigned()
{
return _timer != null;
}
/// <summary>Disposes whatever <c>_disposable</c> currently holds (the Stop callback or the action's result).</summary>
public void Dispose()
{
_disposable.Dispose();
}
/// <summary>
/// Disposes the underlying timer unless that already happened. Reached both from Tick and
/// from disposal of the initial <c>Disposable.Create(Stop)</c> disposable.
/// </summary>
private void Stop()
{
Timer timer = _timer;
// TimerStubs.Never serves as the "already stopped" marker here.
// NOTE(review): TimerStubs is declared elsewhere; presumably a timer that never fires - confirm.
if (timer != TimerStubs.Never) {
// Drop the reference to the user's delegate; a Tick that still runs afterwards invokes Nop instead.
_action = Nop;
_timer = TimerStubs.Never;
timer.Dispose();
}
}
/// <summary>Stand-in for the action after Stop: does nothing and returns <c>Disposable.Empty</c>.</summary>
private IDisposable Nop(IScheduler scheduler, TState state)
{
return Disposable.Empty;
}
}
/// <summary>
/// Periodic timer with a non-zero period: a <see cref="Timer"/> fires every period and
/// each tick runs the action through an <c>AsyncLock</c>.
/// </summary>
/// <typeparam name="TState">Type of the state value threaded from one invocation to the next.</typeparam>
private sealed class PeriodicTimer<TState> : IDisposable
{
    private readonly AsyncLock _gate;
    private TState _state;
    private Func<TState, TState> _action;

    // Set to null by Dispose; doubles as the "already disposed" marker.
    private volatile Timer _timer;

    /// <summary>Stores the initial state and action and starts the underlying timer.</summary>
    /// <param name="state">State passed to the first invocation of <paramref name="action"/>.</param>
    /// <param name="period">Used as both the initial delay and the interval between ticks.</param>
    /// <param name="action">Function run on each tick; its return value becomes the state for the next tick.</param>
    public PeriodicTimer(TState state, TimeSpan period, Func<TState, TState> action)
    {
        _state = state;
        _action = action;
        _gate = new AsyncLock();

        _timer = new Timer(Tick, null, period, period);
    }

    /// <summary>Stops the timer and disposes the gate; does nothing once the timer field has been cleared.</summary>
    public void Dispose()
    {
        Timer current = _timer;
        if (current == null)
        {
            return;
        }

        // NOTE(review): Stubs<TState>.I is declared elsewhere; presumably an identity function - confirm.
        _action = Stubs<TState>.I;
        _timer = null;

        current.Dispose();
        _gate.Dispose();
    }

    /// <summary>Timer callback: hands one run of the action to the gate.</summary>
    /// <param name="unused">Always null; required by the callback signature.</param>
    private void Tick(object unused)
    {
        // NOTE(review): AsyncLock is declared elsewhere; presumably it serializes overlapping ticks - confirm.
        _gate.Wait(() =>
        {
            _state = _action(_state);
        });
    }
}
/// <summary>Lazily constructed singleton behind <see cref="Instance"/>.</summary>
private static readonly Lazy<ThreadPoolScheduler> s_instance =
    new Lazy<ThreadPoolScheduler>(() => new ThreadPoolScheduler());

/// <summary>
/// Lazily constructed <c>NewThreadScheduler</c> whose thread factory marks every thread
/// as a background thread; used by <c>ScheduleLongRunning</c>.
/// </summary>
private static readonly Lazy<NewThreadScheduler> s_newBackgroundThread =
    new Lazy<NewThreadScheduler>(() =>
        new NewThreadScheduler((ThreadStart start) =>
        {
            return new Thread(start) { IsBackground = true };
        }));

/// <summary>Gets the singleton scheduler instance, creating it on first access.</summary>
public static ThreadPoolScheduler Instance
{
    get { return s_instance.Value; }
}
/// <summary>
/// Private so that instances can only be created inside this class; the one instance
/// is exposed through <see cref="Instance"/>.
/// </summary>
private ThreadPoolScheduler() { }
/// <summary>Queues <paramref name="action"/> on the thread pool to run as soon as a worker picks it up.</summary>
/// <typeparam name="TState">Type of the state passed to the action.</typeparam>
/// <param name="state">State handed to <paramref name="action"/>.</param>
/// <param name="action">Work to run; receives this scheduler and <paramref name="state"/>.</param>
/// <returns>
/// A disposable that, if disposed before the work item starts, prevents the action from
/// running; afterwards it holds the disposable returned by the action.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    var cancelable = new SingleAssignmentDisposable();

    ThreadPool.QueueUserWorkItem(_ =>
    {
        // Skip the work entirely when the caller cancelled before the pool got to it.
        if (cancelable.IsDisposed)
        {
            return;
        }

        cancelable.Disposable = action(this, state);
    }, null);

    return cancelable;
}
/// <summary>Schedules <paramref name="action"/> to run after <paramref name="dueTime"/> has elapsed.</summary>
/// <typeparam name="TState">Type of the state passed to the action.</typeparam>
/// <param name="state">State handed to <paramref name="action"/>.</param>
/// <param name="dueTime">Relative delay; passed through <c>Scheduler.Normalize</c> before use.</param>
/// <param name="action">Work to run; receives this scheduler and <paramref name="state"/>.</param>
/// <returns>A disposable that cancels the scheduled work.</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    // NOTE(review): Scheduler.Normalize is declared elsewhere; presumably clamps negative delays to zero - confirm.
    TimeSpan delay = Scheduler.Normalize(dueTime);

    // A zero delay needs no timer at all; use the immediate overload.
    return delay.Ticks == 0
        ? Schedule(state, action)
        : new Timer<TState>(this, state, delay, action);
}
/// <summary>
/// Schedules long-running work by delegating to the shared background-thread
/// <c>NewThreadScheduler</c> rather than running it on the thread pool.
/// </summary>
/// <typeparam name="TState">Type of the state passed to the action.</typeparam>
/// <param name="state">State handed to <paramref name="action"/>.</param>
/// <param name="action">Work to run; receives <paramref name="state"/> and an <c>ICancelable</c>.</param>
/// <returns>The disposable returned by the background-thread scheduler.</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
{
    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    NewThreadScheduler backgroundThreads = s_newBackgroundThread.Value;
    return backgroundThreads.ScheduleLongRunning(state, action);
}
/// <summary>Creates a stopwatch for this scheduler.</summary>
/// <returns>A new <c>StopwatchImpl</c> instance.</returns>
public override IStopwatch StartStopwatch() => new StopwatchImpl();
/// <summary>Schedules <paramref name="action"/> to run repeatedly, once per <paramref name="period"/>.</summary>
/// <typeparam name="TState">Type of the state threaded from one invocation to the next.</typeparam>
/// <param name="state">State passed to the first invocation.</param>
/// <param name="period">Interval between runs; zero selects the back-to-back thread pool timer.</param>
/// <param name="action">Function run on each tick; its return value becomes the state for the next tick.</param>
/// <returns>A disposable that stops the periodic work.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is negative.</exception>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
{
    // The period is validated before the action, so a negative period wins when both are invalid.
    if (period < TimeSpan.Zero)
    {
        throw new ArgumentOutOfRangeException("period");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    bool runBackToBack = period == TimeSpan.Zero;
    if (runBackToBack)
    {
        return new FastPeriodicTimer<TState>(state, action);
    }

    return new PeriodicTimer<TState>(state, period, action);
}
}
}