// EventLoopScheduler
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Concurrency
{
public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable
{
// Counter shared by all instances; used to give default-created loop threads distinct names.
private static int s_counter;

// Factory invoked to create the thread that executes the event loop.
private readonly Func<ThreadStart, Thread> _threadFactory;

// Clock of the scheduler: absolute due times are offsets on this stopwatch's Elapsed value.
// Assigned once in the constructor, hence readonly like the other construction-time fields.
private readonly IStopwatch _stopwatch;

// Thread currently running the loop; null until the first item is scheduled, and reset to
// null when the loop exits because ExitIfEmpty is set and no work remains.
private Thread _thread;

// Lock taken around every access to the queues, the thread reference and the disposed flag.
private readonly object _gate;

// Semaphore the loop thread blocks on; released by Schedule, Tick and Dispose.
private readonly SemaphoreSlim _evt;

// Items with a due time in the future, waiting to be moved to the ready list.
private readonly SchedulerQueue<TimeSpan> _queue;

// Items that are ready to be invoked on the next pass of the loop.
private readonly Queue<ScheduledItem<TimeSpan>> _readyList;

// The delayed item the current timer was armed for; used to avoid re-arming for the same item.
private ScheduledItem<TimeSpan> _nextItem;

// Holds the timer armed for _nextItem; assigning a new timer replaces the previous one.
private readonly SerialDisposable _nextTimer;

// Set by Dispose; once true, Schedule throws and the loop thread shuts down.
private bool _disposed;

// When true, the loop thread exits as soon as both the ready list and the delayed queue are empty.
internal bool ExitIfEmpty { get; set; }
/// <summary>
/// Creates an event loop scheduler whose loop runs on a background thread named
/// "Event Loop N", where N is taken from a static counter shared by all instances.
/// </summary>
public EventLoopScheduler()
    : this(start => {
        Thread loopThread = new Thread(start);
        loopThread.Name = "Event Loop " + Interlocked.Increment(ref s_counter);
        loopThread.IsBackground = true;
        return loopThread;
    })
{
}
/// <summary>
/// Creates an event loop scheduler that obtains its loop thread from the given factory.
/// The thread is not created here; it is requested when the first item is scheduled.
/// </summary>
/// <param name="threadFactory">Factory that builds the thread which will run the loop.</param>
/// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is null.</exception>
public EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
{
    if (threadFactory == null) {
        throw new ArgumentNullException("threadFactory");
    }

    _threadFactory = threadFactory;
    _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();

    // Synchronization primitives.
    _gate = new object();
    _evt = new SemaphoreSlim(0);

    // Work containers and the timer slot for the next delayed item.
    _queue = new SchedulerQueue<TimeSpan>();
    _readyList = new Queue<ScheduledItem<TimeSpan>>();
    _nextTimer = new SerialDisposable();

    ExitIfEmpty = false;
}
/// <summary>
/// Schedules an action to be run by the event loop after the given relative delay.
/// </summary>
/// <typeparam name="TState">Type of the state passed to the action.</typeparam>
/// <param name="state">State handed to <paramref name="action"/> when it runs.</param>
/// <param name="dueTime">
/// Delay relative to now. A zero or negative value places the item directly on the ready list;
/// a positive value places it in the delayed queue.
/// </param>
/// <param name="action">Action to run.</param>
/// <returns>A disposable that cancels the scheduled item.</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
/// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
    if (action == null) {
        throw new ArgumentNullException("action");
    }

    // Absolute due time on the scheduler's own clock.
    TimeSpan due = _stopwatch.Elapsed + dueTime;
    ScheduledItem<TimeSpan> item = new ScheduledItem<TimeSpan, TState>(this, state, action, due);

    lock (_gate) {
        if (_disposed) {
            // Name the disposed object so the exception message identifies its origin.
            throw new ObjectDisposedException("EventLoopScheduler");
        }

        if (dueTime <= TimeSpan.Zero) {
            _readyList.Enqueue(item);
        } else {
            _queue.Enqueue(item);
        }

        // Wake the loop in both cases: either to run the ready item or to re-evaluate
        // which delayed item the timer should be armed for.
        _evt.Release();
        EnsureThread();
    }

    return Disposable.Create(item.Cancel);
}
/// <summary>
/// Schedules an action to run repeatedly with the given period. Each tick first schedules
/// the following tick against a running target time (so delays do not accumulate), then
/// runs the action through an <c>AsyncLock</c>, feeding each result back in as the next state.
/// </summary>
/// <typeparam name="TState">Type of the state threaded through the action.</typeparam>
/// <param name="state">Initial state.</param>
/// <param name="period">Interval between ticks; must not be negative.</param>
/// <param name="action">Function that receives the current state and returns the next one.</param>
/// <returns>A disposable combining the pending tick and the lock used to run the action.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is negative.</exception>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
{
    if (period < TimeSpan.Zero) {
        throw new ArgumentOutOfRangeException("period");
    }
    if (action == null) {
        throw new ArgumentNullException("action");
    }

    // Target time of the upcoming tick on the scheduler's clock.
    TimeSpan nextDue = _stopwatch.Elapsed + period;

    MultipleAssignmentDisposable pendingTick = new MultipleAssignmentDisposable();
    AsyncLock workGate = new AsyncLock();

    Func<IScheduler, object, IDisposable> onTick = null;
    onTick = (scheduler, unused) => {
        // Advance the target and queue the following tick before doing the work.
        nextDue += period;
        TimeSpan delay = nextDue - _stopwatch.Elapsed;
        pendingTick.Disposable = scheduler.Schedule<object>(null, delay, onTick);

        workGate.Wait(() => {
            state = action(state);
        });

        return Disposable.Empty;
    };

    pendingTick.Disposable = Schedule<object>(null, nextDue - _stopwatch.Elapsed, onTick);

    return StableCompositeDisposable.Create(pendingTick, workGate);
}
/// <summary>
/// Creates a new stopwatch for callers of this scheduler.
/// </summary>
/// <returns>A freshly constructed <c>StopwatchImpl</c>.</returns>
public override IStopwatch StartStopwatch()
{
    IStopwatch stopwatch = new StopwatchImpl();
    return stopwatch;
}
/// <summary>
/// Marks the scheduler as disposed, disposes the pending timer and signals the loop.
/// Calls after the first one do nothing.
/// </summary>
public void Dispose()
{
    lock (_gate) {
        if (_disposed) {
            return;
        }

        _disposed = true;
        _nextTimer.Dispose();

        // Signal the semaphore so a waiting loop thread can observe the disposed flag.
        _evt.Release();
    }
}
/// <summary>
/// Creates and starts the loop thread via the thread factory if no thread is currently recorded.
/// Schedule calls this while holding the gate lock.
/// </summary>
private void EnsureThread()
{
    if (_thread != null) {
        return;
    }

    Thread loopThread = _threadFactory(Run);
    _thread = loopThread;
    loopThread.Start();
}
/// <summary>
/// Body of the loop thread. Each pass waits for a signal, promotes due items from the
/// delayed queue to the ready list, arms a timer for the next delayed item, and then
/// invokes the ready items outside the lock. The method returns when the scheduler has
/// been disposed, or when ExitIfEmpty is set and no work remains.
/// </summary>
private void Run()
{
while (true) {
// Block until Schedule, Tick or Dispose releases the semaphore.
_evt.Wait();
ScheduledItem<TimeSpan>[] array = null;
lock (_gate) {
// Consume any further releases so this single pass accounts for every signal so far.
while (_evt.CurrentCount > 0) {
_evt.Wait();
}
if (_disposed) {
// Dispose only signals the semaphore; the semaphore itself is disposed here on the loop thread.
((IDisposable)_evt).Dispose();
return;
}
// Promote every delayed item whose due time has been reached.
while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed) {
ScheduledItem<TimeSpan> item = _queue.Dequeue();
_readyList.Enqueue(item);
}
if (_queue.Count > 0) {
ScheduledItem<TimeSpan> scheduledItem = _queue.Peek();
// Only arm a new timer when the head of the delayed queue differs from the item the current timer was armed for.
if (scheduledItem != _nextItem) {
_nextItem = scheduledItem;
TimeSpan dueTime = scheduledItem.DueTime - _stopwatch.Elapsed;
_nextTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(Tick, scheduledItem, dueTime);
}
}
// Snapshot and clear the ready list so the items can be invoked after the lock is released.
if (_readyList.Count > 0) {
array = _readyList.ToArray();
_readyList.Clear();
}
}
if (array != null) {
ScheduledItem<TimeSpan>[] array2 = array;
foreach (ScheduledItem<TimeSpan> scheduledItem2 in array2) {
// Items canceled after being queued are skipped.
if (!scheduledItem2.IsCanceled)
scheduledItem2.Invoke();
}
}
if (ExitIfEmpty) {
lock (_gate) {
// Re-check under the lock: items may have been scheduled while the batch above was running.
if (_readyList.Count == 0 && _queue.Count == 0) {
// Clearing _thread lets EnsureThread start a fresh thread on the next Schedule call.
_thread = null;
return;
}
}
}
}
}
/// <summary>
/// Timer callback for a delayed item. If the scheduler is not disposed, moves the item
/// from the delayed queue to the ready list (when it is still queued) and signals the loop.
/// </summary>
/// <param name="state">The <c>ScheduledItem&lt;TimeSpan&gt;</c> the timer was armed for.</param>
private void Tick(object state)
{
    lock (_gate) {
        if (_disposed) {
            return;
        }

        ScheduledItem<TimeSpan> dueItem = (ScheduledItem<TimeSpan>)state;

        // Remove returns false when the item is no longer in the delayed queue,
        // in which case it must not be added to the ready list again.
        bool wasQueued = _queue.Remove(dueItem);
        if (wasQueued) {
            _readyList.Enqueue(dueItem);
        }

        // The loop is signalled whether or not the item was still queued.
        _evt.Release();
    }
}
}
}