ObservableBase<T>
Abstract base class for implementations of the IObservable<T> interface.
using System.Reactive.Concurrency;
using System.Runtime.CompilerServices;
namespace System.Reactive
{
/// <summary>
/// Abstract base class for implementations of the <see cref="IObservable{T}"/> interface.
/// </summary>
/// <remarks>
/// Derived classes supply the subscription logic by implementing <see cref="SubscribeCore"/>.
/// <see cref="Subscribe"/> wraps the caller's observer in an <c>AutoDetachObserver&lt;T&gt;</c>
/// and either invokes <see cref="SubscribeCore"/> immediately or schedules the invocation on
/// <c>CurrentThreadScheduler.Instance</c>, depending on <c>CurrentThreadScheduler.IsScheduleRequired</c>.
/// </remarks>
/// <typeparam name="T">The type of the elements delivered to observers.</typeparam>
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
public abstract class ObservableBase<[System.Runtime.CompilerServices.Nullable(2)] T> : IObservable<T>
{
    /// <summary>
    /// Subscribes the given observer to this observable.
    /// </summary>
    /// <param name="observer">The observer to subscribe; must not be <c>null</c>.</param>
    /// <returns>
    /// The <c>AutoDetachObserver&lt;T&gt;</c> wrapping <paramref name="observer"/>, returned as the
    /// subscription handle.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer is null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        var autoDetachObserver = new AutoDetachObserver<T>(observer);

        if (CurrentThreadScheduler.IsScheduleRequired)
        {
            // Defer the SubscribeCore call to the current-thread scheduler.
            // NOTE(review): presumably this sets up the trampoline so nested subscriptions are
            // queued rather than run recursively - confirm against CurrentThreadScheduler.
            Scheduler.ScheduleAction<AutoDetachObserver<T>>(CurrentThreadScheduler.Instance, autoDetachObserver, ScheduledSubscribe);
        }
        else
        {
            // No scheduling required: run the subscription logic right away, using the same
            // failure handling as the scheduled path.
            ScheduledSubscribe(autoDetachObserver);
        }

        return autoDetachObserver;
    }

    /// <summary>
    /// Runs <see cref="SubscribeCore"/> for the wrapped observer and hands the resulting
    /// disposable to it via <c>SetResource</c>.
    /// </summary>
    /// <param name="autoDetachObserver">The wrapper created by <see cref="Subscribe"/>.</param>
    private void ScheduledSubscribe(AutoDetachObserver<T> autoDetachObserver)
    {
        try
        {
            autoDetachObserver.SetResource(SubscribeCore(autoDetachObserver));
        }
        catch (Exception error)
        {
            // Offer the exception to the wrapper first; when Fail reports false the exception is
            // rethrown unchanged (`throw;` keeps the original stack trace).
            // NOTE(review): Fail presumably forwards the error to the observer's OnError and
            // returns false when it could not be delivered - confirm in AutoDetachObserver<T>.
            if (!autoDetachObserver.Fail(error))
            {
                throw;
            }
        }
    }

    /// <summary>
    /// Implemented by derived classes to provide the core subscription logic.
    /// </summary>
    /// <param name="observer">
    /// The observer to send notifications to. When called through <see cref="Subscribe"/> this is
    /// the <c>AutoDetachObserver&lt;T&gt;</c> wrapper, not the caller's original observer.
    /// </param>
    /// <returns>The disposable that <see cref="Subscribe"/> passes to the wrapper's <c>SetResource</c>.</returns>
    protected abstract IDisposable SubscribeCore(IObserver<T> observer);
}
}