Observer
Provides a set of static methods for creating observers.
using System.Reactive.Concurrency;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
public static class Observer
{
/// <summary>
/// <see cref="IProgress{T}"/> implementation that hands every reported value to a delegate.
/// </summary>
/// <typeparam name="T">Type of the reported progress values.</typeparam>
[System.Runtime.CompilerServices.Nullable(0)]
private class AnonymousProgress<[System.Runtime.CompilerServices.Nullable(2)] T> : IProgress<T>
{
    // Callback invoked once for every Report call.
    private readonly Action<T> _onReport;

    /// <summary>Creates a progress object backed by <paramref name="progress"/>.</summary>
    /// <param name="progress">Delegate to invoke for each reported value; stored as-is, without a null check.</param>
    public AnonymousProgress(Action<T> progress) => _onReport = progress;

    /// <summary>Passes <paramref name="value"/> to the wrapped delegate.</summary>
    /// <param name="value">The value being reported.</param>
    public void Report(T value) => _onReport(value);
}
/// <summary>
/// Creates an observer that funnels all three observer callbacks through one notification handler.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="handler">Delegate that receives a <see cref="Notification{T}"/> describing each callback.</param>
/// <returns>
/// An <c>AnonymousObserver&lt;T&gt;</c> built from three delegates that wrap the value, the error,
/// or the completion signal in a notification and pass it to <paramref name="handler"/>.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="handler"/> is <c>null</c>.</exception>
public static IObserver<T> ToObserver<[System.Runtime.CompilerServices.Nullable(2)] T>(this Action<Notification<T>> handler)
{
    if (handler is null)
    {
        throw new ArgumentNullException(nameof(handler));
    }

    return new AnonymousObserver<T>(
        value => handler(Notification.CreateOnNext<T>(value)),
        error => handler(Notification.CreateOnError<T>(error)),
        () => handler(Notification.CreateOnCompleted<T>()));
}
/// <summary>
/// Creates a notification callback that replays each notification onto the given observer.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="observer">Observer passed to <c>Accept</c> for every notification.</param>
/// <returns>A delegate that calls <c>Accept(observer)</c> on each notification it is given.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
public static Action<Notification<T>> ToNotifier<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObserver<T> observer)
{
    if (observer is null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    return notification => notification.Accept(observer);
}
/// <summary>
/// Creates an observer from an OnNext action only.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="onNext">Action supplied as the observer's OnNext handler.</param>
/// <returns>
/// An <c>AnonymousObserver&lt;T&gt;</c> constructed from <paramref name="onNext"/>; its OnError and
/// OnCompleted behavior is whatever the single-argument constructor provides.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="onNext"/> is <c>null</c>.</exception>
public static IObserver<T> Create<[System.Runtime.CompilerServices.Nullable(2)] T>(Action<T> onNext)
{
    if (onNext is null)
    {
        throw new ArgumentNullException(nameof(onNext));
    }

    return new AnonymousObserver<T>(onNext);
}
/// <summary>
/// Creates an observer from an OnNext action and an OnError action.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="onNext">Action supplied as the observer's OnNext handler.</param>
/// <param name="onError">Action supplied as the observer's OnError handler.</param>
/// <returns>An <c>AnonymousObserver&lt;T&gt;</c> constructed from the two actions.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="onNext"/> or <paramref name="onError"/> is <c>null</c> (checked in that order).
/// </exception>
public static IObserver<T> Create<[System.Runtime.CompilerServices.Nullable(2)] T>(Action<T> onNext, Action<Exception> onError)
{
    if (onNext is null)
    {
        throw new ArgumentNullException(nameof(onNext));
    }

    if (onError is null)
    {
        throw new ArgumentNullException(nameof(onError));
    }

    return new AnonymousObserver<T>(onNext, onError);
}
/// <summary>
/// Creates an observer from an OnNext action and an OnCompleted action.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="onNext">Action supplied as the observer's OnNext handler.</param>
/// <param name="onCompleted">Action supplied as the observer's OnCompleted handler.</param>
/// <returns>An <c>AnonymousObserver&lt;T&gt;</c> constructed from the two actions.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="onNext"/> or <paramref name="onCompleted"/> is <c>null</c> (checked in that order).
/// </exception>
public static IObserver<T> Create<[System.Runtime.CompilerServices.Nullable(2)] T>(Action<T> onNext, Action onCompleted)
{
    if (onNext is null)
    {
        throw new ArgumentNullException(nameof(onNext));
    }

    if (onCompleted is null)
    {
        throw new ArgumentNullException(nameof(onCompleted));
    }

    return new AnonymousObserver<T>(onNext, onCompleted);
}
/// <summary>
/// Creates an observer from OnNext, OnError and OnCompleted actions.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="onNext">Action supplied as the observer's OnNext handler.</param>
/// <param name="onError">Action supplied as the observer's OnError handler.</param>
/// <param name="onCompleted">Action supplied as the observer's OnCompleted handler.</param>
/// <returns>An <c>AnonymousObserver&lt;T&gt;</c> constructed from the three actions.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="onNext"/>, <paramref name="onError"/> or <paramref name="onCompleted"/> is
/// <c>null</c> (checked in that order).
/// </exception>
public static IObserver<T> Create<[System.Runtime.CompilerServices.Nullable(2)] T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
    if (onNext is null)
    {
        throw new ArgumentNullException(nameof(onNext));
    }

    if (onError is null)
    {
        throw new ArgumentNullException(nameof(onError));
    }

    if (onCompleted is null)
    {
        throw new ArgumentNullException(nameof(onCompleted));
    }

    return new AnonymousObserver<T>(onNext, onError, onCompleted);
}
/// <summary>
/// Wraps an observer in a new <c>AnonymousObserver&lt;T&gt;</c> instance.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="observer">Observer whose OnNext, OnError and OnCompleted methods back the wrapper.</param>
/// <returns>
/// A new <c>AnonymousObserver&lt;T&gt;</c> constructed from delegates bound to the three methods of
/// <paramref name="observer"/>; the returned object is therefore a different instance from the argument.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
public static IObserver<T> AsObserver<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObserver<T> observer)
{
    if (observer is null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    // Bind the delegates in the same order the constructor receives them.
    Action<T> onNext = observer.OnNext;
    Action<Exception> onError = observer.OnError;
    Action onCompleted = observer.OnCompleted;

    return new AnonymousObserver<T>(onNext, onError, onCompleted);
}
/// <summary>
/// Wraps an observer in a <c>CheckedObserver&lt;T&gt;</c>.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="observer">Observer handed to the <c>CheckedObserver&lt;T&gt;</c> constructor.</param>
/// <returns>The newly created <c>CheckedObserver&lt;T&gt;</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
public static IObserver<T> Checked<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObserver<T> observer)
{
    if (observer is null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    return new CheckedObserver<T>(observer);
}
/// <summary>
/// Wraps an observer in a <c>SynchronizedObserver&lt;T&gt;</c> that uses a freshly allocated gate object.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="observer">Observer to wrap.</param>
/// <returns>A new <c>SynchronizedObserver&lt;T&gt;</c> built from <paramref name="observer"/> and a new <see cref="object"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
public static IObserver<T> Synchronize<[System.Runtime.CompilerServices.Nullable(2)] T>(IObserver<T> observer)
{
    if (observer is null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    // A private gate: no caller can obtain a reference to it.
    object gate = new object();
    return new SynchronizedObserver<T>(observer, gate);
}
/// <summary>
/// Wraps an observer in either an <c>AsyncLockObserver&lt;T&gt;</c> or a <c>SynchronizedObserver&lt;T&gt;</c>,
/// depending on <paramref name="preventReentrancy"/>.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="observer">Observer to wrap.</param>
/// <param name="preventReentrancy">
/// <c>true</c> to wrap with an <c>AsyncLockObserver&lt;T&gt;</c> over a new <c>AsyncLock</c>;
/// <c>false</c> to wrap with a <c>SynchronizedObserver&lt;T&gt;</c> over a new gate object.
/// </param>
/// <returns>The wrapper selected by <paramref name="preventReentrancy"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
public static IObserver<T> Synchronize<[System.Runtime.CompilerServices.Nullable(2)] T>(IObserver<T> observer, bool preventReentrancy)
{
    if (observer is null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    if (!preventReentrancy)
    {
        return new SynchronizedObserver<T>(observer, new object());
    }

    return new AsyncLockObserver<T>(observer, new AsyncLock());
}
/// <summary>
/// Wraps an observer in a <c>SynchronizedObserver&lt;T&gt;</c> that uses a caller-supplied gate object.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="observer">Observer to wrap.</param>
/// <param name="gate">Gate object handed to the <c>SynchronizedObserver&lt;T&gt;</c> constructor.</param>
/// <returns>A new <c>SynchronizedObserver&lt;T&gt;</c> built from <paramref name="observer"/> and <paramref name="gate"/>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="observer"/> or <paramref name="gate"/> is <c>null</c> (checked in that order).
/// </exception>
public static IObserver<T> Synchronize<[System.Runtime.CompilerServices.Nullable(2)] T>(IObserver<T> observer, object gate)
{
    if (observer is null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    if (gate is null)
    {
        throw new ArgumentNullException(nameof(gate));
    }

    return new SynchronizedObserver<T>(observer, gate);
}
/// <summary>
/// Wraps an observer in an <c>AsyncLockObserver&lt;T&gt;</c> that uses a caller-supplied <c>AsyncLock</c>.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="observer">Observer to wrap.</param>
/// <param name="asyncLock">Lock handed to the <c>AsyncLockObserver&lt;T&gt;</c> constructor.</param>
/// <returns>A new <c>AsyncLockObserver&lt;T&gt;</c> built from <paramref name="observer"/> and <paramref name="asyncLock"/>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="observer"/> or <paramref name="asyncLock"/> is <c>null</c> (checked in that order).
/// </exception>
public static IObserver<T> Synchronize<[System.Runtime.CompilerServices.Nullable(2)] T>(IObserver<T> observer, AsyncLock asyncLock)
{
    if (observer is null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    if (asyncLock is null)
    {
        throw new ArgumentNullException(nameof(asyncLock));
    }

    return new AsyncLockObserver<T>(observer, asyncLock);
}
/// <summary>
/// Wraps an observer in an <c>ObserveOnObserver&lt;T&gt;</c> bound to the given scheduler.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="observer">Observer to wrap.</param>
/// <param name="scheduler">Scheduler handed to the <c>ObserveOnObserver&lt;T&gt;</c> constructor.</param>
/// <returns>A new <c>ObserveOnObserver&lt;T&gt;</c> built from <paramref name="scheduler"/> and <paramref name="observer"/>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="observer"/> or <paramref name="scheduler"/> is <c>null</c> (checked in that order).
/// </exception>
public static IObserver<T> NotifyOn<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObserver<T> observer, IScheduler scheduler)
{
    if (observer is null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    if (scheduler is null)
    {
        throw new ArgumentNullException(nameof(scheduler));
    }

    return new ObserveOnObserver<T>(scheduler, observer);
}
/// <summary>
/// Wraps an observer in an <c>ObserveOnObserver&lt;T&gt;</c> whose scheduler is created from a
/// <see cref="SynchronizationContext"/>.
/// </summary>
/// <typeparam name="T">Element type of the observer.</typeparam>
/// <param name="observer">Observer to wrap.</param>
/// <param name="context">Context used to construct a <c>SynchronizationContextScheduler</c>.</param>
/// <returns>
/// A new <c>ObserveOnObserver&lt;T&gt;</c> built from a <c>SynchronizationContextScheduler</c> over
/// <paramref name="context"/> and from <paramref name="observer"/>.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="observer"/> or <paramref name="context"/> is <c>null</c> (checked in that order).
/// </exception>
public static IObserver<T> NotifyOn<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObserver<T> observer, SynchronizationContext context)
{
    if (observer is null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    if (context is null)
    {
        throw new ArgumentNullException(nameof(context));
    }

    // Typed as IScheduler so the same constructor overload is selected as with the explicit cast.
    IScheduler contextScheduler = new SynchronizationContextScheduler(context);
    return new ObserveOnObserver<T>(contextScheduler, observer);
}
/// <summary>
/// Exposes an observer as an <see cref="IProgress{T}"/> whose reports go to the observer's OnNext method.
/// </summary>
/// <typeparam name="T">Element type of the observer and of the reported values.</typeparam>
/// <param name="observer">Observer whose <c>OnNext</c> method receives each reported value.</param>
/// <returns>A progress object whose <c>Report</c> calls <c>observer.OnNext</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
public static IProgress<T> ToProgress<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObserver<T> observer)
{
    if (observer is null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    Action<T> report = observer.OnNext;
    return new AnonymousProgress<T>(report);
}
/// <summary>
/// Exposes an observer as an <see cref="IProgress{T}"/> whose reports are passed to an
/// <c>ObserveOnObserver&lt;T&gt;</c> created from the given scheduler and observer.
/// </summary>
/// <typeparam name="T">Element type of the observer and of the reported values.</typeparam>
/// <param name="observer">Observer wrapped by the intermediate <c>ObserveOnObserver&lt;T&gt;</c>.</param>
/// <param name="scheduler">Scheduler handed to the <c>ObserveOnObserver&lt;T&gt;</c> constructor.</param>
/// <returns>A progress object whose <c>Report</c> calls <c>OnNext</c> on the intermediate observer.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="observer"/> or <paramref name="scheduler"/> is <c>null</c> (checked in that order).
/// </exception>
public static IProgress<T> ToProgress<[System.Runtime.CompilerServices.Nullable(2)] T>(this IObserver<T> observer, IScheduler scheduler)
{
    if (observer is null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    if (scheduler is null)
    {
        throw new ArgumentNullException(nameof(scheduler));
    }

    // Held as ObserverBase<T> so the method group binds to ObserverBase<T>.OnNext.
    ObserverBase<T> relay = new ObserveOnObserver<T>(scheduler, observer);
    return new AnonymousProgress<T>(relay.OnNext);
}
/// <summary>
/// Exposes an <see cref="IProgress{T}"/> as an observer whose OnNext handler is the progress object's
/// <c>Report</c> method.
/// </summary>
/// <typeparam name="T">Type of the reported values and element type of the observer.</typeparam>
/// <param name="progress">Progress object whose <c>Report</c> method becomes the OnNext handler.</param>
/// <returns>
/// An <c>AnonymousObserver&lt;T&gt;</c> constructed from <c>progress.Report</c>; its OnError and
/// OnCompleted behavior is whatever the single-argument constructor provides.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="progress"/> is <c>null</c>.</exception>
public static IObserver<T> ToObserver<[System.Runtime.CompilerServices.Nullable(2)] T>(this IProgress<T> progress)
{
    if (progress is null)
    {
        throw new ArgumentNullException(nameof(progress));
    }

    Action<T> onNext = progress.Report;
    return new AnonymousObserver<T>(onNext);
}
}
}