Subject
Provides a set of static methods for creating subjects.
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Runtime.CompilerServices;
namespace System.Reactive.Subjects
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
public static class Subject
{
[System.Runtime.CompilerServices.Nullable(0)]
private class AnonymousSubject<[System.Runtime.CompilerServices.Nullable(2)] T, [System.Runtime.CompilerServices.Nullable(2)] U> : ISubject<T, U>, IObserver<T>, IObservable<U>
{
private readonly IObserver<T> _observer;
private readonly IObservable<U> _observable;
public AnonymousSubject(IObserver<T> observer, IObservable<U> observable)
{
_observer = observer;
_observable = observable;
}
public void OnCompleted()
{
_observer.OnCompleted();
}
public void OnError(Exception error)
{
if (error == null)
throw new ArgumentNullException("error");
_observer.OnError(error);
}
public void OnNext(T value)
{
_observer.OnNext(value);
}
public IDisposable Subscribe(IObserver<U> observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
return _observable.Subscribe(observer);
}
}
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1,
1
})]
private sealed class AnonymousSubject<[System.Runtime.CompilerServices.Nullable(2)] T> : AnonymousSubject<T, T>, ISubject<T>, ISubject<T, T>, IObserver<T>, IObservable<T>
{
public AnonymousSubject(IObserver<T> observer, IObservable<T> observable)
: base(observer, observable)
{
}
}
public static ISubject<TSource, TResult> Create<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TResult>(IObserver<TSource> observer, IObservable<TResult> observable)
{
if (observer == null)
throw new ArgumentNullException("observer");
if (observable == null)
throw new ArgumentNullException("observable");
return new AnonymousSubject<TSource, TResult>(observer, observable);
}
public static ISubject<T> Create<[System.Runtime.CompilerServices.Nullable(2)] T>(IObserver<T> observer, IObservable<T> observable)
{
if (observer == null)
throw new ArgumentNullException("observer");
if (observable == null)
throw new ArgumentNullException("observable");
return new AnonymousSubject<T>(observer, observable);
}
public static ISubject<TSource, TResult> Synchronize<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TResult>(ISubject<TSource, TResult> subject)
{
if (subject == null)
throw new ArgumentNullException("subject");
return new AnonymousSubject<TSource, TResult>(Observer.Synchronize(subject), (IObservable<TResult>)subject);
}
public static ISubject<TSource> Synchronize<[System.Runtime.CompilerServices.Nullable(2)] TSource>(ISubject<TSource> subject)
{
if (subject == null)
throw new ArgumentNullException("subject");
return new AnonymousSubject<TSource>(Observer.Synchronize(subject), (IObservable<TSource>)subject);
}
public static ISubject<TSource, TResult> Synchronize<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TResult>(ISubject<TSource, TResult> subject, IScheduler scheduler)
{
if (subject == null)
throw new ArgumentNullException("subject");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return new AnonymousSubject<TSource, TResult>(Observer.Synchronize(subject), subject.ObserveOn(scheduler));
}
public static ISubject<TSource> Synchronize<[System.Runtime.CompilerServices.Nullable(2)] TSource>(ISubject<TSource> subject, IScheduler scheduler)
{
if (subject == null)
throw new ArgumentNullException("subject");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return new AnonymousSubject<TSource>(Observer.Synchronize(subject), subject.ObserveOn(scheduler));
}
}
}