EventLoopScheduler
Represents an object that schedules units of work on a designated thread.
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Concurrency
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable
{
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class PeriodicallyScheduledWorkItem<[System.Runtime.CompilerServices.Nullable(2)] TState> : IDisposable
{
private readonly TimeSpan _period;
private readonly Func<TState, TState> _action;
private readonly EventLoopScheduler _scheduler;
private readonly AsyncLock _gate = new AsyncLock();
private TState _state;
private TimeSpan _next;
private MultipleAssignmentDisposableValue _task;
public PeriodicallyScheduledWorkItem(EventLoopScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action)
{
_state = state;
_period = period;
_action = action;
_scheduler = scheduler;
_next = scheduler._stopwatch.Elapsed + period;
_task.TrySetFirst(((LocalScheduler)scheduler).Schedule<PeriodicallyScheduledWorkItem<TState>>(this, _next - scheduler._stopwatch.Elapsed, (Func<IScheduler, PeriodicallyScheduledWorkItem<TState>, IDisposable>)((IScheduler _, PeriodicallyScheduledWorkItem<TState> s) => s.Tick(_))));
}
private IDisposable Tick(IScheduler self)
{
_next += _period;
_task.Disposable = self.Schedule<PeriodicallyScheduledWorkItem<TState>>(this, _next - _scheduler._stopwatch.Elapsed, (Func<IScheduler, PeriodicallyScheduledWorkItem<TState>, IDisposable>)((IScheduler _, PeriodicallyScheduledWorkItem<TState> s) => s.Tick(_)));
_gate.Wait<PeriodicallyScheduledWorkItem<TState>>(this, (Action<PeriodicallyScheduledWorkItem<TState>>)delegate(PeriodicallyScheduledWorkItem<TState> closureWorkItem) {
closureWorkItem._state = closureWorkItem._action(closureWorkItem._state);
});
return Disposable.Empty;
}
public void Dispose()
{
_task.Dispose();
_gate.Dispose();
}
}
private static int _counter;
private readonly Func<ThreadStart, Thread> _threadFactory;
private readonly IStopwatch _stopwatch;
[System.Runtime.CompilerServices.Nullable(2)]
private Thread _thread;
private readonly object _gate;
private readonly SemaphoreSlim _evt;
private readonly SchedulerQueue<TimeSpan> _queue;
private readonly Queue<ScheduledItem<TimeSpan>> _readyList;
[System.Runtime.CompilerServices.Nullable(2)]
private ScheduledItem<TimeSpan> _nextItem;
private SerialDisposableValue _nextTimer;
private bool _disposed;
internal bool ExitIfEmpty { get; set; }
public EventLoopScheduler()
: this((ThreadStart a) => new Thread(a) {
Name = "Event Loop " + Interlocked.Increment(ref _counter).ToString(),
IsBackground = true
})
{
}
public EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
{
if (threadFactory == null)
throw new ArgumentNullException("threadFactory");
_threadFactory = threadFactory;
_stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();
_gate = new object();
_evt = new SemaphoreSlim(0);
_queue = new SchedulerQueue<TimeSpan>();
_readyList = new Queue<ScheduledItem<TimeSpan>>();
ExitIfEmpty = false;
}
public override IDisposable Schedule<[System.Runtime.CompilerServices.Nullable(2)] TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
throw new ArgumentNullException("action");
TimeSpan dueTime2 = _stopwatch.Elapsed + dueTime;
ScheduledItem<TimeSpan, TState> scheduledItem = new ScheduledItem<TimeSpan, TState>((IScheduler)this, state, action, dueTime2);
lock (_gate) {
if (_disposed)
throw new ObjectDisposedException("EventLoopScheduler");
if (dueTime <= TimeSpan.Zero) {
_readyList.Enqueue((ScheduledItem<TimeSpan>)scheduledItem);
_evt.Release();
} else {
_queue.Enqueue((ScheduledItem<TimeSpan>)scheduledItem);
_evt.Release();
}
EnsureThread();
return scheduledItem;
}
}
public IDisposable SchedulePeriodic<[System.Runtime.CompilerServices.Nullable(2)] TState>(TState state, TimeSpan period, Func<TState, TState> action)
{
if (period < TimeSpan.Zero)
throw new ArgumentOutOfRangeException("period");
if (action == null)
throw new ArgumentNullException("action");
return new PeriodicallyScheduledWorkItem<TState>(this, state, period, action);
}
public override IStopwatch StartStopwatch()
{
return new StopwatchImpl();
}
public void Dispose()
{
lock (_gate) {
if (!_disposed) {
_disposed = true;
_nextTimer.Dispose();
_evt.Release();
}
}
}
private void EnsureThread()
{
if (_thread == null) {
_thread = _threadFactory(Run);
_thread.Start();
}
}
private void Run()
{
while (true) {
_evt.Wait();
ScheduledItem<TimeSpan>[] array = null;
lock (_gate) {
while (_evt.CurrentCount > 0) {
_evt.Wait();
}
if (_disposed) {
_evt.Dispose();
return;
}
while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed) {
ScheduledItem<TimeSpan> item = _queue.Dequeue();
_readyList.Enqueue(item);
}
if (_queue.Count > 0) {
ScheduledItem<TimeSpan> scheduledItem = _queue.Peek();
if (scheduledItem != _nextItem) {
_nextItem = scheduledItem;
TimeSpan dueTime = scheduledItem.DueTime - _stopwatch.Elapsed;
_nextTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(Tick, scheduledItem, dueTime);
}
}
if (_readyList.Count > 0) {
array = _readyList.ToArray();
_readyList.Clear();
}
}
if (array != null) {
ScheduledItem<TimeSpan>[] array2 = array;
foreach (ScheduledItem<TimeSpan> scheduledItem2 in array2) {
if (!scheduledItem2.IsCanceled)
try {
scheduledItem2.Invoke();
} catch (ObjectDisposedException ex) when (ex.ObjectName == "EventLoopScheduler") {
}
}
}
if (ExitIfEmpty) {
lock (_gate) {
if (_readyList.Count == 0 && _queue.Count == 0) {
_thread = null;
return;
}
}
}
}
}
[System.Runtime.CompilerServices.NullableContext(2)]
private void Tick(object state)
{
lock (_gate) {
if (!_disposed) {
ScheduledItem<TimeSpan> scheduledItem = (ScheduledItem<TimeSpan>)state;
if (scheduledItem == _nextItem)
_nextItem = null;
if (_queue.Remove(scheduledItem))
_readyList.Enqueue(scheduledItem);
_evt.Release();
}
}
}
}
}