<PackageReference Include="System.Reactive" Version="4.1.5" />

EventLoopScheduler

Represents an object that schedules units of work on a designated thread.
using System.Collections.Generic; using System.Reactive.Disposables; using System.Threading; namespace System.Reactive.Concurrency { public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable { private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable { private readonly TimeSpan _period; private readonly Func<TState, TState> _action; private readonly EventLoopScheduler _scheduler; private readonly AsyncLock _gate = new AsyncLock(); private TState _state; private TimeSpan _next; private IDisposable _task; public PeriodicallyScheduledWorkItem(EventLoopScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action) { _state = state; _period = period; _action = action; _scheduler = scheduler; _next = scheduler._stopwatch.Elapsed + period; Disposable.TrySetSingle(ref _task, ((LocalScheduler)scheduler).Schedule<PeriodicallyScheduledWorkItem<TState>>(this, _next - scheduler._stopwatch.Elapsed, (Func<IScheduler, PeriodicallyScheduledWorkItem<TState>, IDisposable>)((IScheduler _, PeriodicallyScheduledWorkItem<TState> s) => s.Tick(_)))); } private IDisposable Tick(IScheduler self) { _next += _period; Disposable.TrySetMultiple(ref _task, self.Schedule<PeriodicallyScheduledWorkItem<TState>>(this, _next - _scheduler._stopwatch.Elapsed, (Func<IScheduler, PeriodicallyScheduledWorkItem<TState>, IDisposable>)((IScheduler _, PeriodicallyScheduledWorkItem<TState> s) => s.Tick(_)))); _gate.Wait<PeriodicallyScheduledWorkItem<TState>>(this, (Action<PeriodicallyScheduledWorkItem<TState>>)delegate(PeriodicallyScheduledWorkItem<TState> closureWorkItem) { closureWorkItem._state = closureWorkItem._action(closureWorkItem._state); }); return Disposable.Empty; } public void Dispose() { Disposable.TryDispose(ref _task); _gate.Dispose(); } } private static int _counter; private readonly Func<ThreadStart, Thread> _threadFactory; private readonly IStopwatch _stopwatch; private Thread _thread; private readonly object _gate; private readonly SemaphoreSlim _evt; private readonly SchedulerQueue<TimeSpan> _queue; private readonly Queue<ScheduledItem<TimeSpan>> _readyList; private ScheduledItem<TimeSpan> _nextItem; private IDisposable _nextTimer; private bool _disposed; internal bool ExitIfEmpty { get; set; } public EventLoopScheduler() : this((ThreadStart a) => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref _counter), IsBackground = true }) { } public EventLoopScheduler(Func<ThreadStart, Thread> threadFactory) { if (threadFactory == null) throw new ArgumentNullException("threadFactory"); _threadFactory = threadFactory; _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch(); _gate = new object(); _evt = new SemaphoreSlim(0); _queue = new SchedulerQueue<TimeSpan>(); _readyList = new Queue<ScheduledItem<TimeSpan>>(); ExitIfEmpty = false; } public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { if (action == null) throw new ArgumentNullException("action"); TimeSpan dueTime2 = _stopwatch.Elapsed + dueTime; ScheduledItem<TimeSpan, TState> scheduledItem = new ScheduledItem<TimeSpan, TState>((IScheduler)this, state, action, dueTime2); lock (_gate) { if (_disposed) throw new ObjectDisposedException(""); if (dueTime <= TimeSpan.Zero) { _readyList.Enqueue((ScheduledItem<TimeSpan>)scheduledItem); _evt.Release(); } else { _queue.Enqueue((ScheduledItem<TimeSpan>)scheduledItem); _evt.Release(); } EnsureThread(); return scheduledItem; } } public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) { if (period < TimeSpan.Zero) throw new ArgumentOutOfRangeException("period"); if (action == null) throw new ArgumentNullException("action"); return new PeriodicallyScheduledWorkItem<TState>(this, state, period, action); } public override IStopwatch StartStopwatch() { return new StopwatchImpl(); } public void Dispose() { lock (_gate) { if (!_disposed) { _disposed = true; Disposable.TryDispose(ref _nextTimer); _evt.Release(); } } } private void EnsureThread() { if (_thread == null) { _thread = _threadFactory(Run); _thread.Start(); } } private void Run() { while (true) { _evt.Wait(); ScheduledItem<TimeSpan>[] array = null; lock (_gate) { while (_evt.CurrentCount > 0) { _evt.Wait(); } if (_disposed) { _evt.Dispose(); return; } while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed) { ScheduledItem<TimeSpan> item = _queue.Dequeue(); _readyList.Enqueue(item); } if (_queue.Count > 0) { ScheduledItem<TimeSpan> scheduledItem = _queue.Peek(); if (scheduledItem != _nextItem) { _nextItem = scheduledItem; TimeSpan dueTime = scheduledItem.DueTime - _stopwatch.Elapsed; Disposable.TrySetSerial(ref _nextTimer, ConcurrencyAbstractionLayer.Current.StartTimer(Tick, scheduledItem, dueTime)); } } if (_readyList.Count > 0) { array = _readyList.ToArray(); _readyList.Clear(); } } if (array != null) { ScheduledItem<TimeSpan>[] array2 = array; foreach (ScheduledItem<TimeSpan> scheduledItem2 in array2) { if (!scheduledItem2.IsCanceled) scheduledItem2.Invoke(); } } if (ExitIfEmpty) { lock (_gate) { if (_readyList.Count == 0 && _queue.Count == 0) { _thread = null; return; } } } } } private void Tick(object state) { lock (_gate) { if (!_disposed) { ScheduledItem<TimeSpan> scheduledItem = (ScheduledItem<TimeSpan>)state; if (scheduledItem == _nextItem) _nextItem = null; if (_queue.Remove(scheduledItem)) _readyList.Enqueue(scheduledItem); _evt.Release(); } } } } }