<PackageReference Include="System.Reactive" Version="6.0.0-preview.16" />

CurrentThreadScheduler

public sealed class CurrentThreadScheduler : LocalScheduler
Represents an object that schedules units of work on the current thread.
using System.ComponentModel; using System.Runtime.CompilerServices; namespace System.Reactive.Concurrency { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] public sealed class CurrentThreadScheduler : LocalScheduler { [System.Runtime.CompilerServices.NullableContext(0)] private static class Trampoline { [System.Runtime.CompilerServices.NullableContext(1)] public static void Run(SchedulerQueue<TimeSpan> queue) { while (queue.Count > 0) { ScheduledItem<TimeSpan> scheduledItem = queue.Dequeue(); if (!scheduledItem.IsCanceled) { TimeSpan timeout = scheduledItem.DueTime - Time; if (timeout.Ticks > 0) ConcurrencyAbstractionLayer.Current.Sleep(timeout); if (!scheduledItem.IsCanceled) scheduledItem.Invoke(); } } } } private static readonly Lazy<CurrentThreadScheduler> StaticInstance = new Lazy<CurrentThreadScheduler>(() => new CurrentThreadScheduler()); [System.Runtime.CompilerServices.Nullable(2)] [ThreadStatic] private static SchedulerQueue<TimeSpan> _threadLocalQueue; [System.Runtime.CompilerServices.Nullable(2)] [ThreadStatic] private static IStopwatch _clock; [ThreadStatic] private static bool _running; public static CurrentThreadScheduler Instance => StaticInstance.Value; private static TimeSpan Time { get { if (_clock == null) _clock = ConcurrencyAbstractionLayer.Current.StartStopwatch(); return _clock.Elapsed; } } [EditorBrowsable(EditorBrowsableState.Never)] [Obsolete("This instance property is no longer supported. Use CurrentThreadScheduler.IsScheduleRequired instead.")] public bool ScheduleRequired { get { return IsScheduleRequired; } } [EditorBrowsable(EditorBrowsableState.Advanced)] public static bool IsScheduleRequired { get { return !_running; } } private CurrentThreadScheduler() { } [System.Runtime.CompilerServices.NullableContext(2)] private static SchedulerQueue<TimeSpan> GetQueue() { return _threadLocalQueue; } [System.Runtime.CompilerServices.NullableContext(2)] private static void SetQueue(SchedulerQueue<TimeSpan> newQueue) { _threadLocalQueue = newQueue; } public override IDisposable Schedule<[System.Runtime.CompilerServices.Nullable(2)] TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { if (action == null) throw new ArgumentNullException("action"); SchedulerQueue<TimeSpan> queue; if (!_running) { _running = true; if (dueTime > TimeSpan.Zero) ConcurrencyAbstractionLayer.Current.Sleep(dueTime); IDisposable result; try { result = action(this, state); } catch { SetQueue(null); _running = false; throw; } queue = GetQueue(); if (queue != null) try { Trampoline.Run(queue); return result; } finally { SetQueue(null); _running = false; } _running = false; return result; } queue = GetQueue(); if (queue == null) { queue = new SchedulerQueue<TimeSpan>(4); SetQueue(queue); } TimeSpan dueTime2 = Time + Scheduler.Normalize(dueTime); ScheduledItem<TimeSpan, TState> scheduledItem = new ScheduledItem<TimeSpan, TState>((IScheduler)this, state, action, dueTime2); queue.Enqueue((ScheduledItem<TimeSpan>)scheduledItem); return scheduledItem; } } }