<PackageReference Include="System.Reactive" Version="4.3.1" />

ObservableExtensions

public static class ObservableExtensions
Provides a set of static methods for subscribing delegates to observables.
using System.ComponentModel; using System.Reactive; using System.Reactive.Disposables; using System.Threading; namespace System { public static class ObservableExtensions { public static IDisposable Subscribe<T>(this IObservable<T> source) { if (source == null) throw new ArgumentNullException("source"); return source.Subscribe((IObserver<T>)new AnonymousObserver<T>(Stubs<T>.Ignore, Stubs.Throw, Stubs.Nop)); } public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); return source.Subscribe((IObserver<T>)new AnonymousObserver<T>(onNext, Stubs.Throw, Stubs.Nop)); } public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); return source.Subscribe((IObserver<T>)new AnonymousObserver<T>(onNext, onError, Stubs.Nop)); } public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); return source.Subscribe((IObserver<T>)new AnonymousObserver<T>(onNext, Stubs.Throw, onCompleted)); } public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); return source.Subscribe((IObserver<T>)new AnonymousObserver<T>(onNext, onError, onCompleted)); } public static void Subscribe<T>(this IObservable<T> source, IObserver<T> observer, CancellationToken token) { if (source == null) throw new ArgumentNullException("source"); if (observer == null) throw new ArgumentNullException("observer"); source.Subscribe_(observer, token); } public static void Subscribe<T>(this IObservable<T> source, CancellationToken token) { if (source == null) throw new ArgumentNullException("source"); source.Subscribe_(new AnonymousObserver<T>(Stubs<T>.Ignore, Stubs.Throw, Stubs.Nop), token); } public static void Subscribe<T>(this IObservable<T> source, Action<T> onNext, CancellationToken token) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); source.Subscribe_(new AnonymousObserver<T>(onNext, Stubs.Throw, Stubs.Nop), token); } public static void Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, CancellationToken token) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); source.Subscribe_(new AnonymousObserver<T>(onNext, onError, Stubs.Nop), token); } public static void Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted, CancellationToken token) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); source.Subscribe_(new AnonymousObserver<T>(onNext, Stubs.Throw, onCompleted), token); } public static void Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted, CancellationToken token) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); source.Subscribe_(new AnonymousObserver<T>(onNext, onError, onCompleted), token); } private static void Subscribe_<T>(this IObservable<T> source, IObserver<T> observer, CancellationToken token) { if (token.CanBeCanceled) { if (!token.IsCancellationRequested) { ISafeObserver<T> safeObserver = SafeObserver<T>.Wrap(observer); IDisposable state2 = source.Subscribe((IObserver<T>)safeObserver); safeObserver.SetResource((IDisposable)token.Register(delegate(object state) { ((IDisposable)state).Dispose(); }, state2)); } } else source.Subscribe(observer); } [EditorBrowsable(EditorBrowsableState.Advanced)] public static IDisposable SubscribeSafe<T>(this IObservable<T> source, IObserver<T> observer) { if (source == null) throw new ArgumentNullException("source"); if (observer == null) throw new ArgumentNullException("observer"); if (source is ObservableBase<T>) return source.Subscribe(observer); IProducer<T> producer = source as IProducer<T>; if (producer == null) { IDisposable result = Disposable.Empty; try { result = source.Subscribe(observer); return result; } catch (Exception error) { observer.OnError(error); return result; } } return producer.SubscribeRaw(observer, false); } } }