<PackageReference Include="System.Reactive" Version="6.0.0-preview.9" />

Synchronization

public static class Synchronization
Provides basic synchronization and scheduling services for observable sequences.
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;

// Source-level equivalent of the compiler-emitted NullableContext/Nullable metadata.
#nullable enable

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Provides basic synchronization and scheduling services for observable sequences.
    /// </summary>
    [EditorBrowsable(EditorBrowsableState.Advanced)]
    public static class Synchronization
    {
        /// <summary>
        /// Observable whose subscription to the wrapped source is performed from an
        /// action scheduled on an <see cref="IScheduler"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        private sealed class SubscribeOnObservable<TSource> : ObservableBase<TSource>
        {
            /// <summary>
            /// Handle returned to the subscriber; disposing it disposes <c>_cancel</c>.
            /// </summary>
            private sealed class Subscription : IDisposable
            {
                // Mutable struct: must stay a non-readonly field so that mutations are not
                // applied to a defensive copy.
                private SerialDisposableValue _cancel;

                /// <summary>
                /// Schedules the subscription to <paramref name="source"/> on
                /// <paramref name="scheduler"/>.
                /// </summary>
                /// <param name="source">Sequence to subscribe to from the scheduled action.</param>
                /// <param name="scheduler">Scheduler that runs the subscribe action.</param>
                /// <param name="observer">Observer handed to the source.</param>
                public Subscription(IObservable<TSource> source, IScheduler scheduler, IObserver<TSource> observer)
                {
                    // State is passed as a tuple so the lambda captures nothing. The tuple
                    // element is named @this because `this` is a reserved keyword.
                    // NOTE(review): TrySetFirst presumably only stores the scheduler's handle
                    // when the scheduled action has not already assigned _cancel.Disposable
                    // (e.g. a scheduler that runs the action inline) - confirm against
                    // SerialDisposableValue.
                    _cancel.TrySetFirst(scheduler.Schedule(
                        (@this: this, source, observer),
                        (closureScheduler, state) =>
                        {
                            // Replace the pending-work handle with the real subscription,
                            // wrapped in a ScheduledDisposable bound to the scheduler that
                            // actually ran this action.
                            state.@this._cancel.Disposable = new ScheduledDisposable(
                                closureScheduler,
                                ObservableExtensions.SubscribeSafe<TSource>(state.source, state.observer));

                            return Disposable.Empty;
                        }));
                }

                /// <summary>Disposes the current content of <c>_cancel</c>.</summary>
                public void Dispose()
                {
                    _cancel.Dispose();
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly IScheduler _scheduler;

            /// <summary>Stores the source sequence and the scheduler to subscribe on.</summary>
            public SubscribeOnObservable(IObservable<TSource> source, IScheduler scheduler)
            {
                _source = source;
                _scheduler = scheduler;
            }

            /// <summary>Creates a new <see cref="Subscription"/> for <paramref name="observer"/>.</summary>
            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
            {
                return new Subscription(_source, _scheduler, observer);
            }
        }

        /// <summary>
        /// Observable whose subscription to the wrapped source is performed from a
        /// callback posted to a <see cref="SynchronizationContext"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        private sealed class SubscribeOnCtxObservable<TSource> : ObservableBase<TSource>
        {
            /// <summary>
            /// Handle returned to the subscriber; disposing it disposes <c>_cancel</c>.
            /// </summary>
            private sealed class Subscription : IDisposable
            {
                private readonly IObservable<TSource> _source;
                private readonly IObserver<TSource> _observer;
                private readonly SynchronizationContext _context;

                // Mutable struct: must stay a non-readonly field.
                private SingleAssignmentDisposableValue _cancel;

                /// <summary>
                /// Captures the arguments and posts the subscribe callback to
                /// <paramref name="context"/>.
                /// </summary>
                /// <param name="source">Sequence to subscribe to from the posted callback.</param>
                /// <param name="context">Synchronization context the callback is posted to.</param>
                /// <param name="observer">Observer handed to the source.</param>
                public Subscription(IObservable<TSource> source, SynchronizationContext context, IObserver<TSource> observer)
                {
                    _source = source;
                    _context = context;
                    _observer = observer;

                    // The instance itself is the callback state, so the lambda captures
                    // nothing. The parameter is named @this because `this` is a keyword.
                    SynchronizationContextExtensions.PostWithStartComplete<Subscription>(
                        context,
                        @this =>
                        {
                            // Skip subscribing when the handle was disposed before the
                            // callback got to run.
                            if (!@this._cancel.IsDisposed)
                            {
                                @this._cancel.Disposable = new ContextDisposable(
                                    @this._context,
                                    ObservableExtensions.SubscribeSafe<TSource>(@this._source, @this._observer));
                            }
                        },
                        this);
                }

                /// <summary>Disposes the current content of <c>_cancel</c>.</summary>
                public void Dispose()
                {
                    _cancel.Dispose();
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly SynchronizationContext _context;

            /// <summary>Stores the source sequence and the context to subscribe on.</summary>
            public SubscribeOnCtxObservable(IObservable<TSource> source, SynchronizationContext context)
            {
                _source = source;
                _context = context;
            }

            /// <summary>Creates a new <see cref="Subscription"/> for <paramref name="observer"/>.</summary>
            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
            {
                return new Subscription(_source, _context, observer);
            }
        }

        /// <summary>
        /// Wraps <paramref name="source"/> so that subscribing to it is performed from an
        /// action scheduled on <paramref name="scheduler"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="scheduler">Scheduler on which the subscribe action is scheduled.</param>
        /// <returns>The wrapping observable sequence.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return new SubscribeOnObservable<TSource>(source, scheduler);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> so that subscribing to it is performed from a
        /// callback posted to <paramref name="context"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="context">Synchronization context the subscribe callback is posted to.</param>
        /// <returns>The wrapping observable sequence.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="context"/> is null.
        /// </exception>
        public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (context == null)
            {
                throw new ArgumentNullException(nameof(context));
            }

            return new SubscribeOnCtxObservable<TSource>(source, context);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in an <c>ObserveOn</c> operator bound to
        /// <paramref name="scheduler"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="scheduler">Scheduler handed to the operator.</param>
        /// <returns>
        /// The long-running variant of the operator when <c>scheduler.AsLongRunning()</c>
        /// yields a non-null result; otherwise the regular scheduler-based variant.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            // Prefer the long-running implementation whenever the scheduler offers one.
            ISchedulerLongRunning? longRunning = scheduler.AsLongRunning();
            if (longRunning != null)
            {
                return new ObserveOn<TSource>.SchedulerLongRunning(source, longRunning);
            }

            return new ObserveOn<TSource>.Scheduler(source, scheduler);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in an <c>ObserveOn</c> operator bound to
        /// <paramref name="context"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="context">Synchronization context handed to the operator.</param>
        /// <returns>The wrapping observable sequence.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="context"/> is null.
        /// </exception>
        public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (context == null)
            {
                throw new ArgumentNullException(nameof(context));
            }

            return new ObserveOn<TSource>.Context(source, context);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>Synchronize</c> operator constructed
        /// without an explicit gate object.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <returns>The wrapping observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            return new Synchronize<TSource>(source);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>Synchronize</c> operator constructed
        /// with the caller-supplied <paramref name="gate"/> object.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="gate">
        /// Object passed to the operator; presumably used as its lock - confirm against
        /// the <c>Synchronize&lt;TSource&gt;</c> implementation.
        /// </param>
        /// <returns>The wrapping observable sequence.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="gate"/> is null.
        /// </exception>
        public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (gate == null)
            {
                throw new ArgumentNullException(nameof(gate));
            }

            return new Synchronize<TSource>(source, gate);
        }
    }
}