// ImmediateScheduler
using System.Reactive.Disposables;
namespace System.Reactive.Concurrency
{
/// <summary>
/// Scheduler that invokes scheduled actions synchronously inside the <c>Schedule</c> call itself,
/// blocking the caller with a sleep first when a positive due time is supplied.
/// </summary>
public sealed class ImmediateScheduler : LocalScheduler
{
    /// <summary>
    /// Scheduler instance passed to every action run by <see cref="ImmediateScheduler"/>.
    /// Work scheduled on it is handed to a lazily created <see cref="AsyncLock"/>.
    /// </summary>
    private sealed class AsyncLockScheduler : LocalScheduler
    {
        // Created on first use; see GetOrCreateLock.
        private AsyncLock _asyncLock;

        /// <summary>
        /// Hands <paramref name="action"/> to the async lock and returns a handle that can cancel
        /// it before it starts.
        /// </summary>
        /// <typeparam name="TState">Type of the state passed to the action.</typeparam>
        /// <param name="state">State forwarded to <paramref name="action"/>.</param>
        /// <param name="action">Work to run; receives this scheduler and <paramref name="state"/>.</param>
        /// <returns>
        /// A disposable that, once the action has run, wraps the disposable returned by the action.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            var cancel = new SingleAssignmentDisposable();

            // NOTE(review): AsyncLock is defined elsewhere; presumably Wait serializes the queued
            // delegates — confirm against its implementation.
            GetOrCreateLock().Wait(() =>
            {
                // Skip the work entirely if the caller disposed the handle before it got to run.
                if (!cancel.IsDisposed)
                {
                    cancel.Disposable = action(this, state);
                }
            });

            return cancel;
        }

        /// <summary>
        /// Schedules <paramref name="action"/> to run after <paramref name="dueTime"/>. A due time
        /// that is zero or negative is treated as "run now".
        /// </summary>
        /// <typeparam name="TState">Type of the state passed to the action.</typeparam>
        /// <param name="state">State forwarded to <paramref name="action"/>.</param>
        /// <param name="dueTime">Relative time to wait before running the action.</param>
        /// <param name="action">Work to run; receives this scheduler and <paramref name="state"/>.</param>
        /// <returns>A disposable that can cancel the work before it starts.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            if (dueTime.Ticks <= 0)
            {
                return Schedule(state, action);
            }

            return ScheduleSlow(state, dueTime, action);
        }

        /// <summary>
        /// Delayed path: hands the async lock a delegate that sleeps for whatever is left of
        /// <paramref name="dueTime"/> and then runs <paramref name="action"/>.
        /// </summary>
        /// <remarks>The caller has already validated <paramref name="action"/>.</remarks>
        private IDisposable ScheduleSlow<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            // Started before handing off, so any time spent before the delegate runs
            // counts towards the due time instead of being added on top of it.
            IStopwatch timer = ConcurrencyAbstractionLayer.Current.StartStopwatch();
            var cancel = new SingleAssignmentDisposable();

            GetOrCreateLock().Wait(() =>
            {
                if (cancel.IsDisposed)
                {
                    return;
                }

                TimeSpan remaining = dueTime - timer.Elapsed;
                if (remaining.Ticks > 0)
                {
                    ConcurrencyAbstractionLayer.Current.Sleep(remaining);
                }

                // Re-check after sleeping: the handle may have been disposed in the meantime.
                if (!cancel.IsDisposed)
                {
                    cancel.Disposable = action(this, state);
                }
            });

            return cancel;
        }

        /// <summary>Returns the async lock, creating it on the first call.</summary>
        private AsyncLock GetOrCreateLock()
        {
            if (_asyncLock == null)
            {
                _asyncLock = new AsyncLock();
            }

            return _asyncLock;
        }
    }

    private static readonly Lazy<ImmediateScheduler> s_instance = new Lazy<ImmediateScheduler>(() => new ImmediateScheduler());

    /// <summary>Gets the shared, lazily constructed scheduler instance.</summary>
    public static ImmediateScheduler Instance => s_instance.Value;

    // Private: the only way to obtain an instance is through Instance.
    private ImmediateScheduler()
    {
    }

    /// <summary>
    /// Runs <paramref name="action"/> synchronously, before this method returns.
    /// </summary>
    /// <typeparam name="TState">Type of the state passed to the action.</typeparam>
    /// <param name="state">State forwarded to <paramref name="action"/>.</param>
    /// <param name="action">
    /// Work to run; receives a fresh <c>AsyncLockScheduler</c> and <paramref name="state"/>.
    /// </param>
    /// <returns>The disposable returned by <paramref name="action"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
    public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        return action(new AsyncLockScheduler(), state);
    }

    /// <summary>
    /// Sleeps for the normalized <paramref name="dueTime"/> when it is positive, then runs
    /// <paramref name="action"/> synchronously.
    /// </summary>
    /// <typeparam name="TState">Type of the state passed to the action.</typeparam>
    /// <param name="state">State forwarded to <paramref name="action"/>.</param>
    /// <param name="dueTime">Relative time to wait before running the action.</param>
    /// <param name="action">
    /// Work to run; receives a fresh <c>AsyncLockScheduler</c> and <paramref name="state"/>.
    /// </param>
    /// <returns>The disposable returned by <paramref name="action"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
    public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        // Scheduler.Normalize is defined elsewhere; only a strictly positive result causes a sleep.
        TimeSpan delay = Scheduler.Normalize(dueTime);
        if (delay.Ticks > 0)
        {
            ConcurrencyAbstractionLayer.Current.Sleep(delay);
        }

        return action(new AsyncLockScheduler(), state);
    }
}
}