<PackageReference Include="Relativity.Transfer.Client" Version="7.1.40" />

NewThreadScheduler

using System.Reactive.Disposables; using System.Threading; namespace System.Reactive.Concurrency { public sealed class NewThreadScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic { private sealed class Periodic<TState> : IDisposable { private readonly IStopwatch _stopwatch; private readonly TimeSpan _period; private readonly Func<TState, TState> _action; private readonly object _cancel = new object(); private volatile bool _done; private TState _state; private TimeSpan _next; public Periodic(TState state, TimeSpan period, Func<TState, TState> action) { _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch(); _period = period; _action = action; _state = state; _next = period; } public void Run() { while (!_done) { TimeSpan timeout = Scheduler.Normalize(_next - _stopwatch.Elapsed); lock (_cancel) { if (Monitor.Wait(_cancel, timeout)) return; } _state = _action(_state); _next += _period; } } public void Dispose() { _done = true; lock (_cancel) { Monitor.Pulse(_cancel); } } } internal static readonly Lazy<NewThreadScheduler> s_instance = new Lazy<NewThreadScheduler>(() => new NewThreadScheduler()); private readonly Func<ThreadStart, Thread> _threadFactory; public static NewThreadScheduler Default => s_instance.Value; public NewThreadScheduler() : this((ThreadStart action) => new Thread(action)) { } public NewThreadScheduler(Func<ThreadStart, Thread> threadFactory) { if (threadFactory == null) throw new ArgumentNullException("threadFactory"); _threadFactory = threadFactory; } public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { if (action == null) throw new ArgumentNullException("action"); return new EventLoopScheduler(_threadFactory) { ExitIfEmpty = true }.Schedule(state, dueTime, action); } public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action) { if (action == null) throw new ArgumentNullException("action"); BooleanDisposable d = new BooleanDisposable(); _threadFactory(delegate { action(state, d); }).Start(); return d; } public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) { if (period < TimeSpan.Zero) throw new ArgumentOutOfRangeException("period"); if (action == null) throw new ArgumentNullException("action"); Periodic<TState> periodic = new Periodic<TState>(state, period, action); _threadFactory(periodic.Run).Start(); return periodic; } public override IStopwatch StartStopwatch() { return new StopwatchImpl(); } } }