ObservableExtensions
Provides a set of static methods for subscribing delegates to observables.
            
                using System.ComponentModel;
using System.Reactive;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System
{
    [System.Runtime.CompilerServices.NullableContext(1)]
    [System.Runtime.CompilerServices.Nullable(0)]
    public static class ObservableExtensions
    {
        public static IDisposable Subscribe<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            return source.Subscribe((IObserver<T>)new AnonymousObserver<T>(Stubs<T>.Ignore, Stubs.Throw, Stubs.Nop));
        }
        public static IDisposable Subscribe<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source, Action<T> onNext)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");
            return source.Subscribe((IObserver<T>)new AnonymousObserver<T>(onNext, Stubs.Throw, Stubs.Nop));
        }
        public static IDisposable Subscribe<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");
            if (onError == null)
                throw new ArgumentNullException("onError");
            return source.Subscribe((IObserver<T>)new AnonymousObserver<T>(onNext, onError, Stubs.Nop));
        }
        public static IDisposable Subscribe<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");
            if (onCompleted == null)
                throw new ArgumentNullException("onCompleted");
            return source.Subscribe((IObserver<T>)new AnonymousObserver<T>(onNext, Stubs.Throw, onCompleted));
        }
        public static IDisposable Subscribe<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");
            if (onError == null)
                throw new ArgumentNullException("onError");
            if (onCompleted == null)
                throw new ArgumentNullException("onCompleted");
            return source.Subscribe((IObserver<T>)new AnonymousObserver<T>(onNext, onError, onCompleted));
        }
        public static void Subscribe<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source, IObserver<T> observer, CancellationToken token)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (observer == null)
                throw new ArgumentNullException("observer");
            source.Subscribe_(observer, token);
        }
        public static void Subscribe<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source, CancellationToken token)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            source.Subscribe_(new AnonymousObserver<T>(Stubs<T>.Ignore, Stubs.Throw, Stubs.Nop), token);
        }
        public static void Subscribe<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source, Action<T> onNext, CancellationToken token)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");
            source.Subscribe_(new AnonymousObserver<T>(onNext, Stubs.Throw, Stubs.Nop), token);
        }
        public static void Subscribe<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, CancellationToken token)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");
            if (onError == null)
                throw new ArgumentNullException("onError");
            source.Subscribe_(new AnonymousObserver<T>(onNext, onError, Stubs.Nop), token);
        }
        public static void Subscribe<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source, Action<T> onNext, Action onCompleted, CancellationToken token)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");
            if (onCompleted == null)
                throw new ArgumentNullException("onCompleted");
            source.Subscribe_(new AnonymousObserver<T>(onNext, Stubs.Throw, onCompleted), token);
        }
        public static void Subscribe<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted, CancellationToken token)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");
            if (onError == null)
                throw new ArgumentNullException("onError");
            if (onCompleted == null)
                throw new ArgumentNullException("onCompleted");
            source.Subscribe_(new AnonymousObserver<T>(onNext, onError, onCompleted), token);
        }
        private static void Subscribe_<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source, IObserver<T> observer, CancellationToken token)
        {
            if (token.CanBeCanceled) {
                if (!token.IsCancellationRequested) {
                    ISafeObserver<T> safeObserver = SafeObserver<T>.Wrap(observer);
                    IDisposable state2 = source.Subscribe((IObserver<T>)safeObserver);
                    safeObserver.SetResource((IDisposable)token.Register(delegate(object state) {
                        ((IDisposable)state).Dispose();
                    }, state2));
                }
            } else
                source.Subscribe(observer);
        }
        [EditorBrowsable(EditorBrowsableState.Advanced)]
        public static IDisposable SubscribeSafe<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObservable<T> source, IObserver<T> observer)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (observer == null)
                throw new ArgumentNullException("observer");
            if (source is ObservableBase<T>)
                return source.Subscribe(observer);
            IProducer<T> producer = source as IProducer<T>;
            if (producer == null) {
                IDisposable result = Disposable.Empty;
                try {
                    result = source.Subscribe(observer);
                    return result;
                } catch (Exception error) {
                    observer.OnError(error);
                    return result;
                }
            }
            return producer.SubscribeRaw(observer, false);
        }
    }
}