<PackageReference Include="System.Reactive" Version="4.0.0" />

Synchronization

public static class Synchronization
Provides basic synchronization and scheduling services for observable sequences.
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Provides basic synchronization and scheduling services for observable sequences.
    /// </summary>
    /// <remarks>
    /// Every method validates its arguments eagerly (at call time, not at subscription time)
    /// and throws <see cref="ArgumentNullException"/> for a null argument.
    /// </remarks>
    [EditorBrowsable(EditorBrowsableState.Advanced)]
    public static class Synchronization
    {
        /// <summary>
        /// Returns a sequence whose subscription to <paramref name="source"/> is performed
        /// from inside an action scheduled on <paramref name="scheduler"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Sequence to subscribe to.</param>
        /// <param name="scheduler">Scheduler on which the subscribe call is scheduled.</param>
        /// <returns>A sequence that defers the actual subscription to the scheduled action.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return new AnonymousObservable<TSource>(observer =>
            {
                var scheduledWork = new SingleAssignmentDisposable();
                var handle = new SerialDisposable();

                // Until the scheduled action runs, the returned handle refers to the pending
                // scheduled work itself.
                handle.Disposable = scheduledWork;

                scheduledWork.Disposable = scheduler.Schedule(() =>
                {
                    // Once running, swap the handle over to the real subscription, wrapped in a
                    // ScheduledDisposable bound to the same scheduler.
                    handle.Disposable = new ScheduledDisposable(
                        scheduler,
                        ObservableExtensions.SubscribeSafe<TSource>(source, observer));
                });

                return handle;
            });
        }

        /// <summary>
        /// Returns a sequence whose subscription to <paramref name="source"/> is performed
        /// from inside a callback handed to <paramref name="context"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Sequence to subscribe to.</param>
        /// <param name="context">Synchronization context that receives the subscribe callback.</param>
        /// <returns>A sequence that defers the actual subscription to the posted callback.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="context"/> is null.
        /// </exception>
        public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (context == null)
            {
                throw new ArgumentNullException(nameof(context));
            }

            return new AnonymousObservable<TSource>(observer =>
            {
                var handle = new SingleAssignmentDisposable();

                context.PostWithStartComplete(() =>
                {
                    // The caller may have disposed the handle before the callback got to run;
                    // in that case the source is never subscribed to.
                    if (handle.IsDisposed)
                    {
                        return;
                    }

                    handle.Disposable = new ContextDisposable(
                        context,
                        ObservableExtensions.SubscribeSafe<TSource>(source, observer));
                });

                return handle;
            });
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in an <c>ObserveOn&lt;TSource&gt;.Scheduler</c> built
        /// from the given scheduler. The notification handling itself lives in that type.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Sequence to wrap.</param>
        /// <param name="scheduler">Scheduler passed to the wrapper.</param>
        /// <returns>The wrapping sequence.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return new ObserveOn<TSource>.Scheduler(source, scheduler);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in an <c>ObserveOn&lt;TSource&gt;.Context</c> built
        /// from the given synchronization context. The notification handling itself lives in that type.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Sequence to wrap.</param>
        /// <param name="context">Synchronization context passed to the wrapper.</param>
        /// <returns>The wrapping sequence.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="context"/> is null.
        /// </exception>
        public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (context == null)
            {
                throw new ArgumentNullException(nameof(context));
            }

            return new ObserveOn<TSource>.Context(source, context);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>Synchronize&lt;TSource&gt;</c> without
        /// supplying an explicit gate object.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Sequence to wrap.</param>
        /// <returns>The wrapping sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            return new Synchronize<TSource>(source);
        }

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>Synchronize&lt;TSource&gt;</c> that is
        /// given <paramref name="gate"/> as its gate object.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Sequence to wrap.</param>
        /// <param name="gate">Gate object handed to the wrapper.</param>
        /// <returns>The wrapping sequence.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="gate"/> is null.
        /// </exception>
        public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (gate == null)
            {
                throw new ArgumentNullException(nameof(gate));
            }

            return new Synchronize<TSource>(source, gate);
        }
    }
}