<PackageReference Include="System.Reactive" Version="4.1.6" />

Synchronization

public static class Synchronization
Provides basic synchronization and scheduling services for observable sequences.
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Provides basic synchronization and scheduling services for observable sequences.
    /// </summary>
    [EditorBrowsable(EditorBrowsableState.Advanced)]
    public static class Synchronization
    {
        /// <summary>
        /// Observable whose subscription to the wrapped source is performed from an action
        /// scheduled on an <see cref="IScheduler"/>.
        /// </summary>
        private sealed class SubscribeOnObservable<TSource> : ObservableBase<TSource>
        {
            /// <summary>
            /// Handle returned to the subscriber; disposing it disposes whatever is currently
            /// held in <see cref="_cancel"/>.
            /// </summary>
            private sealed class Subscription : IDisposable
            {
                private IDisposable _cancel;

                public Subscription(IObservable<TSource> source, IScheduler scheduler, IObserver<TSource> observer)
                {
                    // Everything the scheduled action needs travels in the state tuple, so the
                    // lambda captures nothing. `@this` is a verbatim identifier: the bare keyword
                    // `this` cannot be used as a tuple element or parameter name.
                    Disposable.TrySetSingle(
                        ref _cancel,
                        scheduler.Schedule(
                            (@this: this, source: source, observer: observer),
                            (closureScheduler, state) =>
                            {
                                // Subscribe to the source inside the scheduled action and store the
                                // resulting subscription, wrapped in a ScheduledDisposable built with
                                // the scheduler the action received, in the owner's _cancel field.
                                Disposable.TrySetSerial(
                                    ref state.@this._cancel,
                                    new ScheduledDisposable(
                                        closureScheduler,
                                        ObservableExtensions.SubscribeSafe<TSource>(state.source, state.observer)));

                                // The subscription is tracked through _cancel, so the scheduled
                                // action itself has nothing further to hand back.
                                return Disposable.Empty;
                            }));
                }

                public void Dispose()
                {
                    Disposable.TryDispose(ref _cancel);
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly IScheduler _scheduler;

            public SubscribeOnObservable(IObservable<TSource> source, IScheduler scheduler)
            {
                _source = source;
                _scheduler = scheduler;
            }

            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
            {
                return new Subscription(_source, _scheduler, observer);
            }
        }

        /// <summary>
        /// Observable whose subscription to the wrapped source is performed from a callback
        /// posted to a <see cref="SynchronizationContext"/>.
        /// </summary>
        private sealed class SubscribeOnCtxObservable<TSource> : ObservableBase<TSource>
        {
            /// <summary>
            /// Handle returned to the subscriber; disposing it disposes whatever is currently
            /// held in <see cref="_cancel"/>.
            /// </summary>
            private sealed class Subscription : IDisposable
            {
                private readonly IObservable<TSource> _source;
                private readonly IObserver<TSource> _observer;
                private readonly SynchronizationContext _context;
                private IDisposable _cancel;

                public Subscription(IObservable<TSource> source, SynchronizationContext context, IObserver<TSource> observer)
                {
                    _source = source;
                    _context = context;
                    _observer = observer;

                    // The instance is passed as explicit state (named `@this`, since the bare
                    // keyword is not a legal parameter name) so the callback captures nothing.
                    SynchronizationContextExtensions.PostWithStartComplete<Subscription>(
                        context,
                        @this =>
                        {
                            // Only subscribe if Disposable.GetIsDisposed does not already report
                            // _cancel as disposed by the time the posted callback runs.
                            if (!Disposable.GetIsDisposed(ref @this._cancel))
                            {
                                Disposable.SetSingle(
                                    ref @this._cancel,
                                    new ContextDisposable(
                                        @this._context,
                                        ObservableExtensions.SubscribeSafe<TSource>(@this._source, @this._observer)));
                            }
                        },
                        this);
                }

                public void Dispose()
                {
                    Disposable.TryDispose(ref _cancel);
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly SynchronizationContext _context;

            public SubscribeOnCtxObservable(IObservable<TSource> source, SynchronizationContext context)
            {
                _source = source;
                _context = context;
            }

            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
            {
                return new Subscription(_source, _context, observer);
            }
        }

        /// <summary>
        /// Wraps <paramref name="source"/> so that each subscription to it is made from an
        /// action scheduled on <paramref name="scheduler"/>.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="scheduler">Scheduler on which the subscription action is scheduled.</param>
        /// <returns>An observable wrapping <paramref name="source"/>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return new SubscribeOnObservable<TSource>(source, scheduler);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> so that each subscription to it is made from a
        /// callback posted to <paramref name="context"/>.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="context">Synchronization context to which the subscription callback is posted.</param>
        /// <returns>An observable wrapping <paramref name="source"/>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="context"/> is null.
        /// </exception>
        public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (context == null)
            {
                throw new ArgumentNullException(nameof(context));
            }

            return new SubscribeOnCtxObservable<TSource>(source, context);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in an <c>ObserveOn</c> operator bound to
        /// <paramref name="scheduler"/>.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="scheduler">Scheduler handed to the operator.</param>
        /// <returns>
        /// The long-running operator variant when <c>scheduler.AsLongRunning()</c> returns a
        /// non-null value; otherwise the regular scheduler-based variant.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            // Use the long-running implementation whenever the scheduler offers one.
            ISchedulerLongRunning schedulerLongRunning = scheduler.AsLongRunning();
            if (schedulerLongRunning != null)
            {
                return new ObserveOn<TSource>.SchedulerLongRunning(source, schedulerLongRunning);
            }

            return new ObserveOn<TSource>.Scheduler(source, scheduler);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in an <c>ObserveOn</c> operator bound to
        /// <paramref name="context"/>.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="context">Synchronization context handed to the operator.</param>
        /// <returns>An observable wrapping <paramref name="source"/>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="context"/> is null.
        /// </exception>
        public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (context == null)
            {
                throw new ArgumentNullException(nameof(context));
            }

            return new ObserveOn<TSource>.Context(source, context);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>Synchronize</c> operator constructed without
        /// an explicit gate object.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <returns>An observable wrapping <paramref name="source"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            return new Synchronize<TSource>(source);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>Synchronize</c> operator constructed with the
        /// supplied <paramref name="gate"/> object.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="gate">Gate object passed to the operator.</param>
        /// <returns>An observable wrapping <paramref name="source"/>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="gate"/> is null.
        /// </exception>
        public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (gate == null)
            {
                throw new ArgumentNullException(nameof(gate));
            }

            return new Synchronize<TSource>(source, gate);
        }
    }
}