<PackageReference Include="Relativity.Transfer.Client" Version="7.1.40" />

EventLoopScheduler

using System.Collections.Generic; using System.Reactive.Disposables; using System.Threading; namespace System.Reactive.Concurrency { public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable { private static int s_counter; private readonly Func<ThreadStart, Thread> _threadFactory; private IStopwatch _stopwatch; private Thread _thread; private readonly object _gate; private readonly SemaphoreSlim _evt; private readonly SchedulerQueue<TimeSpan> _queue; private readonly Queue<ScheduledItem<TimeSpan>> _readyList; private ScheduledItem<TimeSpan> _nextItem; private readonly SerialDisposable _nextTimer; private bool _disposed; internal bool ExitIfEmpty { get; set; } public EventLoopScheduler() : this((ThreadStart a) => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref s_counter), IsBackground = true }) { } public EventLoopScheduler(Func<ThreadStart, Thread> threadFactory) { if (threadFactory == null) throw new ArgumentNullException("threadFactory"); _threadFactory = threadFactory; _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch(); _gate = new object(); _evt = new SemaphoreSlim(0); _queue = new SchedulerQueue<TimeSpan>(); _readyList = new Queue<ScheduledItem<TimeSpan>>(); _nextTimer = new SerialDisposable(); ExitIfEmpty = false; } public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { if (action == null) throw new ArgumentNullException("action"); TimeSpan dueTime2 = _stopwatch.Elapsed + dueTime; ScheduledItem<TimeSpan, TState> scheduledItem = new ScheduledItem<TimeSpan, TState>((IScheduler)this, state, action, dueTime2); lock (_gate) { if (_disposed) throw new ObjectDisposedException(""); if (dueTime <= TimeSpan.Zero) { _readyList.Enqueue((ScheduledItem<TimeSpan>)scheduledItem); _evt.Release(); } else { _queue.Enqueue((ScheduledItem<TimeSpan>)scheduledItem); _evt.Release(); } EnsureThread(); } return Disposable.Create(((ScheduledItem<TimeSpan>)scheduledItem).Cancel); } public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) { if (period < TimeSpan.Zero) throw new ArgumentOutOfRangeException("period"); if (action == null) throw new ArgumentNullException("action"); TimeSpan elapsed = _stopwatch.Elapsed; TimeSpan next = elapsed + period; MultipleAssignmentDisposable d = new MultipleAssignmentDisposable(); AsyncLock gate = new AsyncLock(); Func<IScheduler, object, IDisposable> tick = null; tick = delegate(IScheduler self_, object _) { next += period; d.Disposable = self_.Schedule<object>((object)null, next - _stopwatch.Elapsed, tick); gate.Wait(delegate { state = action(state); }); return Disposable.Empty; }; d.Disposable = Schedule(null, next - _stopwatch.Elapsed, tick); return StableCompositeDisposable.Create(d, gate); } public override IStopwatch StartStopwatch() { return new StopwatchImpl(); } public void Dispose() { lock (_gate) { if (!_disposed) { _disposed = true; _nextTimer.Dispose(); _evt.Release(); } } } private void EnsureThread() { if (_thread == null) { _thread = _threadFactory(Run); _thread.Start(); } } private void Run() { while (true) { _evt.Wait(); ScheduledItem<TimeSpan>[] array = null; lock (_gate) { while (_evt.CurrentCount > 0) { _evt.Wait(); } if (_disposed) { ((IDisposable)_evt).Dispose(); return; } while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed) { ScheduledItem<TimeSpan> item = _queue.Dequeue(); _readyList.Enqueue(item); } if (_queue.Count > 0) { ScheduledItem<TimeSpan> scheduledItem = _queue.Peek(); if (scheduledItem != _nextItem) { _nextItem = scheduledItem; TimeSpan dueTime = scheduledItem.DueTime - _stopwatch.Elapsed; _nextTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(Tick, scheduledItem, dueTime); } } if (_readyList.Count > 0) { array = _readyList.ToArray(); _readyList.Clear(); } } if (array != null) { ScheduledItem<TimeSpan>[] array2 = array; foreach (ScheduledItem<TimeSpan> scheduledItem2 in array2) { if (!scheduledItem2.IsCanceled) scheduledItem2.Invoke(); } } if (ExitIfEmpty) { lock (_gate) { if (_readyList.Count == 0 && _queue.Count == 0) { _thread = null; return; } } } } } private void Tick(object state) { lock (_gate) { if (!_disposed) { ScheduledItem<TimeSpan> scheduledItem = (ScheduledItem<TimeSpan>)state; if (_queue.Remove(scheduledItem)) _readyList.Enqueue(scheduledItem); _evt.Release(); } } } } }