<PackageReference Include="Relativity.Transfer.Client" Version="7.2.7" />

LocalScheduler

using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.PlatformServices; using System.Threading; namespace System.Reactive.Concurrency { public abstract class LocalScheduler : IScheduler, IStopwatchProvider, IServiceProvider { private abstract class WorkItem : IComparable<WorkItem>, IDisposable { public readonly LocalScheduler Scheduler; public readonly DateTimeOffset DueTime; private readonly SingleAssignmentDisposable _disposable; private int _hasRun; public WorkItem(LocalScheduler scheduler, DateTimeOffset dueTime) { Scheduler = scheduler; DueTime = dueTime; _disposable = new SingleAssignmentDisposable(); _hasRun = 0; } public void Invoke(IScheduler scheduler) { if (Interlocked.Exchange(ref _hasRun, 1) == 0) try { if (!_disposable.IsDisposed) _disposable.Disposable = InvokeCore(scheduler); } finally { SystemClock.Release(); } } protected abstract IDisposable InvokeCore(IScheduler scheduler); public int CompareTo(WorkItem other) { return Comparer<DateTimeOffset>.Default.Compare(DueTime, other.DueTime); } public void Dispose() { _disposable.Dispose(); } } private sealed class WorkItem<TState> : WorkItem { private readonly TState _state; private readonly Func<IScheduler, TState, IDisposable> _action; public WorkItem(LocalScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) : base(scheduler, dueTime) { _state = state; _action = action; } protected override IDisposable InvokeCore(IScheduler scheduler) { return _action(scheduler, _state); } } private static readonly object _gate = new object(); private static readonly object s_gate = new object(); private static readonly PriorityQueue<WorkItem> s_longTerm = new PriorityQueue<WorkItem>(); private static readonly SerialDisposable s_nextLongTermTimer = new SerialDisposable(); private static WorkItem s_nextLongTermWorkItem = null; private readonly PriorityQueue<WorkItem> _shortTerm = new PriorityQueue<WorkItem>(); private readonly HashSet<IDisposable> _shortTermWork = new HashSet<IDisposable>(); private static readonly TimeSpan SHORTTERM = TimeSpan.FromSeconds(10); private const int MAXERRORRATIO = 1000; private static readonly TimeSpan LONGTOSHORT = TimeSpan.FromSeconds(5); private static readonly TimeSpan RETRYSHORT = TimeSpan.FromMilliseconds(50); private static readonly TimeSpan MAXSUPPORTEDTIMER = TimeSpan.FromMilliseconds(4294967294); public virtual DateTimeOffset Now => Scheduler.Now; public virtual IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) { if (action == null) throw new ArgumentNullException("action"); return Schedule(state, TimeSpan.Zero, action); } public abstract IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action); public virtual IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) { if (action == null) throw new ArgumentNullException("action"); return Enqueue(state, dueTime, action); } public virtual IStopwatch StartStopwatch() { return ConcurrencyAbstractionLayer.Current.StartStopwatch(); } object IServiceProvider.GetService(Type serviceType) { return GetService(serviceType); } protected virtual object GetService(Type serviceType) { if (serviceType == typeof(IStopwatchProvider)) return this; if (serviceType == typeof(ISchedulerLongRunning)) return this as ISchedulerLongRunning; if (serviceType == typeof(ISchedulerPeriodic)) return this as ISchedulerPeriodic; return null; } protected LocalScheduler() { SystemClock.Register(this); } private IDisposable Enqueue<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) { TimeSpan t = Scheduler.Normalize(dueTime - Now); if (t == TimeSpan.Zero) return Schedule(state, TimeSpan.Zero, action); SystemClock.AddRef(); WorkItem<TState> workItem = new WorkItem<TState>(this, state, dueTime, action); if (t <= SHORTTERM) ScheduleShortTermWork(workItem); else ScheduleLongTermWork(workItem); return workItem; } private void ScheduleShortTermWork(WorkItem item) { lock (_gate) { _shortTerm.Enqueue(item); SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); _shortTermWork.Add(singleAssignmentDisposable); TimeSpan dueTime = Scheduler.Normalize(item.DueTime - item.Scheduler.Now); singleAssignmentDisposable.Disposable = item.Scheduler.Schedule(singleAssignmentDisposable, dueTime, ExecuteNextShortTermWorkItem); } } private IDisposable ExecuteNextShortTermWorkItem(IScheduler scheduler, IDisposable cancel) { WorkItem workItem = null; lock (_gate) { if (_shortTermWork.Remove(cancel) && _shortTerm.Count > 0) workItem = _shortTerm.Dequeue(); } if (workItem != null) { if (workItem.DueTime - workItem.Scheduler.Now >= RETRYSHORT) ScheduleShortTermWork(workItem); else workItem.Invoke(scheduler); } return Disposable.Empty; } private static void ScheduleLongTermWork(WorkItem item) { lock (s_gate) { s_longTerm.Enqueue(item); UpdateLongTermProcessingTimer(); } } private static void UpdateLongTermProcessingTimer() { if (s_longTerm.Count != 0) { WorkItem workItem = s_longTerm.Peek(); if (workItem != s_nextLongTermWorkItem) { TimeSpan t = Scheduler.Normalize(workItem.DueTime - workItem.Scheduler.Now); long val = t.Ticks / 1000; TimeSpan timeSpan = LONGTOSHORT; TimeSpan t2 = TimeSpan.FromTicks(Math.Max(val, timeSpan.Ticks)); long ticks = (t - t2).Ticks; timeSpan = MAXSUPPORTEDTIMER; TimeSpan dueTime = TimeSpan.FromTicks(Math.Min(ticks, timeSpan.Ticks)); s_nextLongTermWorkItem = workItem; s_nextLongTermTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(EvaluateLongTermQueue, null, dueTime); } } } private static void EvaluateLongTermQueue(object state) { lock (s_gate) { WorkItem workItem = null; while (s_longTerm.Count > 0) { workItem = s_longTerm.Peek(); if (Scheduler.Normalize(workItem.DueTime - workItem.Scheduler.Now) >= SHORTTERM) break; WorkItem workItem2 = s_longTerm.Dequeue(); workItem2.Scheduler.ScheduleShortTermWork(workItem2); } s_nextLongTermWorkItem = null; UpdateLongTermProcessingTimer(); } } internal void SystemClockChanged(object sender, SystemClockChangedEventArgs args) { lock (s_gate) { lock (_gate) { foreach (IDisposable item2 in _shortTermWork) { item2.Dispose(); } _shortTermWork.Clear(); while (_shortTerm.Count > 0) { WorkItem item = _shortTerm.Dequeue(); s_longTerm.Enqueue(item); } s_nextLongTermWorkItem = null; EvaluateLongTermQueue(null); } } } } }