<PackageReference Include="System.Reactive" Version="4.1.1" />

CurrentThreadScheduler

public sealed class CurrentThreadScheduler : LocalScheduler
Represents an object that schedules units of work on the current thread.
using System.ComponentModel;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Represents an object that schedules units of work on the current thread.
    /// </summary>
    /// <remarks>
    /// The pending-work queue, the stopwatch and the "running" flag are all
    /// <see cref="ThreadStaticAttribute"/> fields, so each thread has its own copy of that state.
    /// </remarks>
    public sealed class CurrentThreadScheduler : LocalScheduler
    {
        /// <summary>
        /// Drains a queue of scheduled items on the calling thread.
        /// </summary>
        private static class Trampoline
        {
            /// <summary>
            /// Dequeues and invokes items until <paramref name="queue"/> is empty, sleeping
            /// first whenever an item's due time lies ahead of the current <see cref="Time"/>.
            /// </summary>
            /// <param name="queue">Queue of items to run; items may be added to it while it is being drained.</param>
            public static void Run(SchedulerQueue<TimeSpan> queue)
            {
                while (queue.Count > 0)
                {
                    ScheduledItem<TimeSpan> item = queue.Dequeue();
                    if (item.IsCanceled)
                    {
                        continue;
                    }

                    TimeSpan wait = item.DueTime - Time;
                    if (wait.Ticks > 0)
                    {
                        ConcurrencyAbstractionLayer.Current.Sleep(wait);
                    }

                    // Cancellation is re-checked after the (possible) sleep, right before invoking.
                    if (!item.IsCanceled)
                    {
                        item.Invoke();
                    }
                }
            }
        }

        private static readonly Lazy<CurrentThreadScheduler> StaticInstance =
            new Lazy<CurrentThreadScheduler>(() => new CurrentThreadScheduler());

        // Work queued by nested Schedule calls made while this thread is already running scheduled work.
        [ThreadStatic]
        private static SchedulerQueue<TimeSpan> _threadLocalQueue;

        // Stopwatch backing Time; created on first use.
        [ThreadStatic]
        private static IStopwatch _clock;

        // True while a Schedule call on this thread is executing its action or draining the queue.
        [ThreadStatic]
        private static bool _running;

        /// <summary>
        /// Gets the singleton instance of the current thread scheduler.
        /// </summary>
        public static CurrentThreadScheduler Instance => StaticInstance.Value;

        /// <summary>
        /// Gets the time elapsed on this thread's stopwatch, starting the stopwatch on first access.
        /// </summary>
        private static TimeSpan Time
        {
            get
            {
                if (_clock == null)
                {
                    _clock = ConcurrencyAbstractionLayer.Current.StartStopwatch();
                }

                return _clock.Elapsed;
            }
        }

        /// <summary>
        /// Obsolete instance-level alias for <see cref="IsScheduleRequired"/>.
        /// </summary>
        [EditorBrowsable(EditorBrowsableState.Never)]
        [Obsolete("This instance property is no longer supported. Use CurrentThreadScheduler.IsScheduleRequired instead.")]
        public bool ScheduleRequired
        {
            get { return IsScheduleRequired; }
        }

        /// <summary>
        /// Gets a value that is <c>true</c> when no <see cref="Schedule{TState}"/> call is
        /// currently running on the calling thread, and <c>false</c> otherwise.
        /// </summary>
        [EditorBrowsable(EditorBrowsableState.Advanced)]
        public static bool IsScheduleRequired
        {
            get { return !_running; }
        }

        // Instances are only created through the lazily initialized Instance singleton.
        private CurrentThreadScheduler()
        {
        }

        /// <summary>Returns the calling thread's pending-work queue, or <c>null</c> if none has been created.</summary>
        private static SchedulerQueue<TimeSpan> GetQueue()
        {
            return _threadLocalQueue;
        }

        /// <summary>Replaces the calling thread's pending-work queue (pass <c>null</c> to clear it).</summary>
        private static void SetQueue(SchedulerQueue<TimeSpan> newQueue)
        {
            _threadLocalQueue = newQueue;
        }

        /// <summary>
        /// Schedules an action to be executed after <paramref name="dueTime"/>.
        /// </summary>
        /// <remarks>
        /// When nothing is running on this thread, the call sleeps for a positive
        /// <paramref name="dueTime"/>, runs <paramref name="action"/> directly, and then drains any
        /// work that the action queued. When called from within running work, the action is
        /// queued instead and executed later by the outermost call.
        /// </remarks>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>
        /// The disposable returned by <paramref name="action"/> when it was run directly, or the
        /// queued scheduled item when the action was enqueued.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            SchedulerQueue<TimeSpan> queue;

            if (!_running)
            {
                _running = true;

                IDisposable result;
                try
                {
                    // The initial wait lives inside the try so that an exception thrown while
                    // sleeping also resets the per-thread state; otherwise _running would stay
                    // true and every later Schedule call on this thread would only enqueue.
                    if (dueTime > TimeSpan.Zero)
                    {
                        ConcurrencyAbstractionLayer.Current.Sleep(dueTime);
                    }

                    // Execute directly without queueing.
                    result = action(this, state);
                }
                catch
                {
                    SetQueue(null);
                    _running = false;
                    throw;
                }

                // The action may have scheduled nested work, which created a queue.
                queue = GetQueue();
                if (queue != null)
                {
                    try
                    {
                        Trampoline.Run(queue);
                        return result;
                    }
                    finally
                    {
                        SetQueue(null);
                        _running = false;
                    }
                }

                _running = false;
                return result;
            }

            // Already running on this thread: enqueue for the outermost call to execute.
            queue = GetQueue();
            if (queue == null)
            {
                queue = new SchedulerQueue<TimeSpan>(4);
                SetQueue(queue);
            }

            TimeSpan absoluteDueTime = Time + Scheduler.Normalize(dueTime);
            ScheduledItem<TimeSpan, TState> scheduledItem =
                new ScheduledItem<TimeSpan, TState>((IScheduler)this, state, action, absoluteDueTime);
            queue.Enqueue((ScheduledItem<TimeSpan>)scheduledItem);
            return scheduledItem;
        }
    }
}