<PackageReference Include="Relativity.Transfer.Client" Version="7.1.40" />

ThreadPoolScheduler

using System.Reactive.Disposables; using System.Threading; namespace System.Reactive.Concurrency { public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic { private sealed class FastPeriodicTimer<TState> : IDisposable { private TState _state; private Func<TState, TState> _action; private volatile bool _disposed; public FastPeriodicTimer(TState state, Func<TState, TState> action) { _state = state; _action = action; ThreadPool.QueueUserWorkItem(Tick, null); } private void Tick(object state) { if (!_disposed) { _state = _action(_state); ThreadPool.QueueUserWorkItem(Tick, null); } } public void Dispose() { _disposed = true; _action = Stubs<TState>.I; } } private sealed class Timer<TState> : IDisposable { private readonly MultipleAssignmentDisposable _disposable; private readonly IScheduler _parent; private readonly TState _state; private Func<IScheduler, TState, IDisposable> _action; private volatile Timer _timer; public Timer(IScheduler parent, TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { _parent = parent; _state = state; _action = action; _disposable = new MultipleAssignmentDisposable(); _disposable.Disposable = Disposable.Create(Stop); try { } finally { _timer = new Timer(Tick, null, dueTime, TimeSpan.FromMilliseconds(-1)); } } private void Tick(object state) { try { _disposable.Disposable = _action(_parent, _state); } finally { SpinWait.SpinUntil(IsTimerAssigned); Stop(); } } private bool IsTimerAssigned() { return _timer != null; } public void Dispose() { _disposable.Dispose(); } private void Stop() { Timer timer = _timer; if (timer != TimerStubs.Never) { _action = Nop; _timer = TimerStubs.Never; timer.Dispose(); } } private IDisposable Nop(IScheduler scheduler, TState state) { return Disposable.Empty; } } private sealed class PeriodicTimer<TState> : IDisposable { private TState _state; private Func<TState, TState> _action; private readonly AsyncLock _gate; private volatile Timer _timer; public PeriodicTimer(TState state, TimeSpan period, Func<TState, TState> action) { _state = state; _action = action; _gate = new AsyncLock(); _timer = new Timer(Tick, null, period, period); } private void Tick(object state) { _gate.Wait(delegate { _state = _action(_state); }); } public void Dispose() { Timer timer = _timer; if (timer != null) { _action = Stubs<TState>.I; _timer = null; timer.Dispose(); _gate.Dispose(); } } } private static readonly Lazy<ThreadPoolScheduler> s_instance = new Lazy<ThreadPoolScheduler>(() => new ThreadPoolScheduler()); private static readonly Lazy<NewThreadScheduler> s_newBackgroundThread = new Lazy<NewThreadScheduler>(() => new NewThreadScheduler((ThreadStart action) => new Thread(action) { IsBackground = true })); public static ThreadPoolScheduler Instance => s_instance.Value; private ThreadPoolScheduler() { } public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) { if (action == null) throw new ArgumentNullException("action"); SingleAssignmentDisposable d = new SingleAssignmentDisposable(); ThreadPool.QueueUserWorkItem(delegate { if (!d.IsDisposed) d.Disposable = action(this, state); }, null); return d; } public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { if (action == null) throw new ArgumentNullException("action"); TimeSpan dueTime2 = Scheduler.Normalize(dueTime); if (dueTime2.Ticks == 0) return Schedule(state, action); return new Timer<TState>((IScheduler)this, state, dueTime2, action); } public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action) { if (action == null) throw new ArgumentNullException("action"); return s_newBackgroundThread.Value.ScheduleLongRunning(state, action); } public override IStopwatch StartStopwatch() { return new StopwatchImpl(); } public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) { if (period < TimeSpan.Zero) throw new ArgumentOutOfRangeException("period"); if (action == null) throw new ArgumentNullException("action"); if (period == TimeSpan.Zero) return new FastPeriodicTimer<TState>(state, action); return new PeriodicTimer<TState>(state, period, action); } } }