<PackageReference Include="Relativity.Transfer.Client" Version="7.2.7" />

TaskPoolScheduler

using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Scheduler that runs work as tasks created through a caller-supplied <see cref="TaskFactory"/>.
    /// </summary>
    public sealed class TaskPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
    {
        // Lazily created shared instance, bound to TaskScheduler.Default.
        private static readonly Lazy<TaskPoolScheduler> s_instance =
            new Lazy<TaskPoolScheduler>(() => new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default)));

        private readonly TaskFactory taskFactory;

        /// <summary>
        /// Gets the shared scheduler instance that uses a <see cref="TaskFactory"/> built on
        /// <see cref="TaskScheduler.Default"/>.
        /// </summary>
        public static TaskPoolScheduler Default => s_instance.Value;

        /// <summary>
        /// Creates a scheduler that starts its work through the given task factory.
        /// </summary>
        /// <param name="taskFactory">Factory used to start tasks and to pick the continuation scheduler.</param>
        /// <exception cref="ArgumentNullException"><paramref name="taskFactory"/> is null.</exception>
        public TaskPoolScheduler(TaskFactory taskFactory)
        {
            if (taskFactory == null)
            {
                throw new ArgumentNullException(nameof(taskFactory));
            }

            this.taskFactory = taskFactory;
        }

        /// <summary>
        /// Schedules <paramref name="action"/> to run as a task started by the task factory.
        /// </summary>
        /// <param name="state">State handed to <paramref name="action"/>.</param>
        /// <param name="action">Work to run; receives this scheduler and <paramref name="state"/>.</param>
        /// <returns>
        /// A disposable that initially holds the cancellation handle for the started task and,
        /// once the action has run, the disposable returned by the action.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            SerialDisposable d = new SerialDisposable();
            CancellationDisposable cancelable = new CancellationDisposable();

            // Until the task body runs, disposing the result cancels the token passed to StartNew.
            d.Disposable = cancelable;

            taskFactory.StartNew(() =>
            {
                // Swap the cancellation handle for whatever the action returns.
                d.Disposable = action(this, state);
            }, cancelable.Token);

            return d;
        }

        /// <summary>
        /// Schedules <paramref name="action"/> to run after <paramref name="dueTime"/>.
        /// </summary>
        /// <param name="state">State handed to <paramref name="action"/>.</param>
        /// <param name="dueTime">Relative delay; passed through <c>Scheduler.Normalize</c> before use.</param>
        /// <param name="action">Work to run; receives this scheduler and <paramref name="state"/>.</param>
        /// <returns>A disposable representing the scheduled work.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            TimeSpan normalized = Scheduler.Normalize(dueTime);

            // A zero delay takes the immediate path and skips the timer machinery.
            if (normalized.Ticks == 0)
            {
                return Schedule(state, action);
            }

            return ScheduleSlow(state, normalized, action);
        }

        // Delayed path: waits via TaskHelpers.Delay, then runs the action in a continuation
        // on the task factory's scheduler.
        private IDisposable ScheduleSlow<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            MultipleAssignmentDisposable d = new MultipleAssignmentDisposable();
            CancellationDisposable cancelable = new CancellationDisposable();
            d.Disposable = cancelable;

            TaskHelpers.Delay(dueTime, cancelable.Token).ContinueWith(
                _ =>
                {
                    // Skip the work if the caller disposed the result in the meantime.
                    if (!d.IsDisposed)
                    {
                        d.Disposable = action(this, state);
                    }
                },
                CancellationToken.None,
                // The continuation is skipped when the delay task faulted or was canceled.
                TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously,
                taskFactory.Scheduler);

            return d;
        }

        /// <summary>
        /// Starts <paramref name="action"/> as a task created with <see cref="TaskCreationOptions.LongRunning"/>.
        /// </summary>
        /// <param name="state">State handed to <paramref name="action"/>.</param>
        /// <param name="action">
        /// Work to run; receives <paramref name="state"/> and the same cancelable object that is returned
        /// to the caller.
        /// </param>
        /// <returns>The cancelable object that is also passed to <paramref name="action"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
        {
            // Validate up front, like the other Schedule* methods: otherwise a null action is only
            // dereferenced inside the started task and the caller gets no synchronous exception.
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            BooleanDisposable d = new BooleanDisposable();

            taskFactory.StartNew(() =>
            {
                action(state, d);
            }, TaskCreationOptions.LongRunning);

            return d;
        }

        /// <summary>
        /// Creates a new stopwatch object.
        /// </summary>
        /// <returns>A new <c>StopwatchImpl</c> instance.</returns>
        public override IStopwatch StartStopwatch()
        {
            return new StopwatchImpl();
        }

        /// <summary>
        /// Schedules <paramref name="action"/> to run repeatedly, feeding each returned state into the next call.
        /// </summary>
        /// <param name="state">Initial state passed to the first invocation.</param>
        /// <param name="period">Delay used before each invocation; must not be negative.</param>
        /// <param name="action">Work to run; its return value replaces the current state.</param>
        /// <returns>A composite disposable made of the cancellation handle and the lock guarding the action.</returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
        {
            if (period < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException(nameof(period));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            CancellationDisposable cancel = new CancellationDisposable();

            // NOTE(review): AsyncLock is defined elsewhere; presumably it serializes the queued
            // actions so invocations do not overlap - confirm against its implementation.
            AsyncLock gate = new AsyncLock();

            Action moveNext = null;
            moveNext = () =>
            {
                TaskHelpers.Delay(period, cancel.Token).ContinueWith(
                    _ =>
                    {
                        // Arm the next delay before running this tick's work.
                        moveNext();

                        gate.Wait(() =>
                        {
                            state = action(state);
                        });
                    },
                    CancellationToken.None,
                    // Once the delay task is canceled or faulted, no continuation runs and the chain ends.
                    TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously,
                    taskFactory.Scheduler);
            };

            moveNext();

            return StableCompositeDisposable.Create(cancel, gate);
        }
    }
}