<PackageReference Include="System.Reactive" Version="6.1.0" />

Observer

public static class Observer
Provides a set of static methods for creating observers.
// Nullable annotations are enabled through the directive below rather than by applying
// System.Runtime.CompilerServices.NullableAttribute / NullableContextAttribute by hand:
// those attributes are reserved for the compiler and are rejected when written in source.
#nullable enable

using System.Reactive.Concurrency;
using System.Runtime.CompilerServices;
using System.Threading;

namespace System.Reactive
{
    /// <summary>
    /// Provides a set of static methods for creating observers.
    /// </summary>
    public static class Observer
    {
        /// <summary>
        /// <see cref="IProgress{T}"/> implementation that forwards every reported value to a delegate.
        /// </summary>
        /// <typeparam name="T">Type of the reported progress values.</typeparam>
        private class AnonymousProgress<T> : IProgress<T>
        {
            private readonly Action<T> _progress;

            /// <summary>
            /// Stores the delegate that <see cref="Report"/> will invoke.
            /// </summary>
            /// <param name="progress">Delegate invoked for each reported value.</param>
            public AnonymousProgress(Action<T> progress)
            {
                _progress = progress;
            }

            /// <summary>
            /// Invokes the stored delegate with <paramref name="value"/>.
            /// </summary>
            /// <param name="value">Value to pass to the delegate.</param>
            public void Report(T value)
            {
                _progress(value);
            }
        }

        /// <summary>
        /// Creates an observer that turns each OnNext, OnError and OnCompleted call into a
        /// <see cref="Notification{T}"/> and passes it to <paramref name="handler"/>.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="handler">Delegate that receives the created notifications.</param>
        /// <returns>An observer that forwards all calls to <paramref name="handler"/> as notifications.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
        public static IObserver<T> ToObserver<T>(this Action<Notification<T>> handler)
        {
            if (handler == null)
            {
                throw new ArgumentNullException(nameof(handler));
            }

            return new AnonymousObserver<T>(
                x => handler(Notification.CreateOnNext<T>(x)),
                exception => handler(Notification.CreateOnError<T>(exception)),
                () => handler(Notification.CreateOnCompleted<T>()));
        }

        /// <summary>
        /// Creates a delegate that delivers each notification it is given to
        /// <paramref name="observer"/> by calling <c>Accept</c> on the notification.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="observer">Observer that the notifications are delivered to.</param>
        /// <returns>A delegate that applies a notification to <paramref name="observer"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public static Action<Notification<T>> ToNotifier<T>(this IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            return n => n.Accept(observer);
        }

        /// <summary>
        /// Creates an observer from an OnNext action.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="onNext">Action passed to the observer as its OnNext handler.</param>
        /// <returns>An <c>AnonymousObserver</c> constructed from <paramref name="onNext"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="onNext"/> is null.</exception>
        public static IObserver<T> Create<T>(Action<T> onNext)
        {
            if (onNext == null)
            {
                throw new ArgumentNullException(nameof(onNext));
            }

            return new AnonymousObserver<T>(onNext);
        }

        /// <summary>
        /// Creates an observer from an OnNext action and an OnError action.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="onNext">Action passed to the observer as its OnNext handler.</param>
        /// <param name="onError">Action passed to the observer as its OnError handler.</param>
        /// <returns>An <c>AnonymousObserver</c> constructed from the given actions.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="onNext"/> or <paramref name="onError"/> is null.
        /// </exception>
        public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError)
        {
            if (onNext == null)
            {
                throw new ArgumentNullException(nameof(onNext));
            }

            if (onError == null)
            {
                throw new ArgumentNullException(nameof(onError));
            }

            return new AnonymousObserver<T>(onNext, onError);
        }

        /// <summary>
        /// Creates an observer from an OnNext action and an OnCompleted action.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="onNext">Action passed to the observer as its OnNext handler.</param>
        /// <param name="onCompleted">Action passed to the observer as its OnCompleted handler.</param>
        /// <returns>An <c>AnonymousObserver</c> constructed from the given actions.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.
        /// </exception>
        public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted)
        {
            if (onNext == null)
            {
                throw new ArgumentNullException(nameof(onNext));
            }

            if (onCompleted == null)
            {
                throw new ArgumentNullException(nameof(onCompleted));
            }

            return new AnonymousObserver<T>(onNext, onCompleted);
        }

        /// <summary>
        /// Creates an observer from OnNext, OnError and OnCompleted actions.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="onNext">Action passed to the observer as its OnNext handler.</param>
        /// <param name="onError">Action passed to the observer as its OnError handler.</param>
        /// <param name="onCompleted">Action passed to the observer as its OnCompleted handler.</param>
        /// <returns>An <c>AnonymousObserver</c> constructed from the given actions.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="onNext"/>, <paramref name="onError"/> or <paramref name="onCompleted"/> is null.
        /// </exception>
        public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            if (onNext == null)
            {
                throw new ArgumentNullException(nameof(onNext));
            }

            if (onError == null)
            {
                throw new ArgumentNullException(nameof(onError));
            }

            if (onCompleted == null)
            {
                throw new ArgumentNullException(nameof(onCompleted));
            }

            return new AnonymousObserver<T>(onNext, onError, onCompleted);
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> in a new <c>AnonymousObserver</c> built from its
        /// OnNext, OnError and OnCompleted methods, so the returned object is a different
        /// instance than the one passed in.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="observer">Observer whose methods back the returned observer.</param>
        /// <returns>A new observer that delegates to <paramref name="observer"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public static IObserver<T> AsObserver<T>(this IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            return new AnonymousObserver<T>(observer.OnNext, observer.OnError, observer.OnCompleted);
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> in a <c>CheckedObserver</c>.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the checks performed by <c>CheckedObserver</c> are defined outside this
        /// file; consult that type for the exact guarantees.
        /// </remarks>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="observer">Observer to wrap.</param>
        /// <returns>A <c>CheckedObserver</c> around <paramref name="observer"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public static IObserver<T> Checked<T>(this IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            return new CheckedObserver<T>(observer);
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> in a <c>SynchronizedObserver</c> that uses a newly
        /// created gate object.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="observer">Observer to wrap.</param>
        /// <returns>A <c>SynchronizedObserver</c> around <paramref name="observer"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public static IObserver<T> Synchronize<T>(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            return new SynchronizedObserver<T>(observer, new object());
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> in either an <c>AsyncLockObserver</c> with a new
        /// <see cref="AsyncLock"/> or a <c>SynchronizedObserver</c> with a new gate object,
        /// depending on <paramref name="preventReentrancy"/>.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="observer">Observer to wrap.</param>
        /// <param name="preventReentrancy">
        /// <c>true</c> selects the <c>AsyncLockObserver</c> wrapper; <c>false</c> selects the
        /// <c>SynchronizedObserver</c> wrapper.
        /// </param>
        /// <returns>The selected wrapper around <paramref name="observer"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public static IObserver<T> Synchronize<T>(IObserver<T> observer, bool preventReentrancy)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (preventReentrancy)
            {
                return new AsyncLockObserver<T>(observer, new AsyncLock());
            }

            return new SynchronizedObserver<T>(observer, new object());
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> in a <c>SynchronizedObserver</c> that uses the
        /// caller-supplied <paramref name="gate"/> object.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="observer">Observer to wrap.</param>
        /// <param name="gate">Gate object handed to the <c>SynchronizedObserver</c>.</param>
        /// <returns>A <c>SynchronizedObserver</c> around <paramref name="observer"/>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="observer"/> or <paramref name="gate"/> is null.
        /// </exception>
        public static IObserver<T> Synchronize<T>(IObserver<T> observer, object gate)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (gate == null)
            {
                throw new ArgumentNullException(nameof(gate));
            }

            return new SynchronizedObserver<T>(observer, gate);
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> in an <c>AsyncLockObserver</c> that uses the
        /// caller-supplied <paramref name="asyncLock"/>.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="observer">Observer to wrap.</param>
        /// <param name="asyncLock">Lock handed to the <c>AsyncLockObserver</c>.</param>
        /// <returns>An <c>AsyncLockObserver</c> around <paramref name="observer"/>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="observer"/> or <paramref name="asyncLock"/> is null.
        /// </exception>
        public static IObserver<T> Synchronize<T>(IObserver<T> observer, AsyncLock asyncLock)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (asyncLock == null)
            {
                throw new ArgumentNullException(nameof(asyncLock));
            }

            return new AsyncLockObserver<T>(observer, asyncLock);
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> in an <c>ObserveOnObserver</c> bound to
        /// <paramref name="scheduler"/>.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="observer">Observer to wrap.</param>
        /// <param name="scheduler">Scheduler handed to the <c>ObserveOnObserver</c>.</param>
        /// <returns>An <c>ObserveOnObserver</c> around <paramref name="observer"/>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="observer"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, IScheduler scheduler)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return new ObserveOnObserver<T>(scheduler, observer);
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> in an <c>ObserveOnObserver</c> bound to a
        /// <see cref="SynchronizationContextScheduler"/> created from <paramref name="context"/>.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="observer">Observer to wrap.</param>
        /// <param name="context">Synchronization context used to build the scheduler.</param>
        /// <returns>An <c>ObserveOnObserver</c> around <paramref name="observer"/>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="observer"/> or <paramref name="context"/> is null.
        /// </exception>
        public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, SynchronizationContext context)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (context == null)
            {
                throw new ArgumentNullException(nameof(context));
            }

            // Typed as IScheduler so the same ObserveOnObserver constructor is chosen as before.
            IScheduler contextScheduler = new SynchronizationContextScheduler(context);
            return new ObserveOnObserver<T>(contextScheduler, observer);
        }

        /// <summary>
        /// Creates an <see cref="IProgress{T}"/> whose <c>Report</c> calls the OnNext method of
        /// <paramref name="observer"/>.
        /// </summary>
        /// <typeparam name="T">Type of the reported progress values.</typeparam>
        /// <param name="observer">Observer that receives the reported values.</param>
        /// <returns>A progress object that forwards to <paramref name="observer"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public static IProgress<T> ToProgress<T>(this IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            return new AnonymousProgress<T>(observer.OnNext);
        }

        /// <summary>
        /// Creates an <see cref="IProgress{T}"/> whose <c>Report</c> calls OnNext on an
        /// <c>ObserveOnObserver</c> built from <paramref name="scheduler"/> and
        /// <paramref name="observer"/>.
        /// </summary>
        /// <typeparam name="T">Type of the reported progress values.</typeparam>
        /// <param name="observer">Observer wrapped by the <c>ObserveOnObserver</c>.</param>
        /// <param name="scheduler">Scheduler handed to the <c>ObserveOnObserver</c>.</param>
        /// <returns>A progress object that forwards to the wrapping observer.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="observer"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        public static IProgress<T> ToProgress<T>(this IObserver<T> observer, IScheduler scheduler)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            // Held as ObserverBase<T> so OnNext binds to the same member as in the original code.
            ObserverBase<T> scheduledObserver = new ObserveOnObserver<T>(scheduler, observer);
            return new AnonymousProgress<T>(scheduledObserver.OnNext);
        }

        /// <summary>
        /// Creates an observer whose OnNext handler is the <c>Report</c> method of
        /// <paramref name="progress"/>.
        /// </summary>
        /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
        /// <param name="progress">Progress object that receives each element.</param>
        /// <returns>An <c>AnonymousObserver</c> that forwards elements to <paramref name="progress"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="progress"/> is null.</exception>
        public static IObserver<T> ToObserver<T>(this IProgress<T> progress)
        {
            if (progress == null)
            {
                throw new ArgumentNullException(nameof(progress));
            }

            return new AnonymousObserver<T>(progress.Report);
        }
    }
}