ObservableBase<T>
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace System.Reactive
{
public abstract class ObservableBase<T> : IObservable<T>
{
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
AutoDetachObserver<T> autoDetachObserver = new AutoDetachObserver<T>(observer);
if (!CurrentThreadScheduler.IsScheduleRequired)
try {
autoDetachObserver.Disposable = SubscribeCore(autoDetachObserver);
return autoDetachObserver;
} catch (Exception error) {
if (!autoDetachObserver.Fail(error))
throw;
return autoDetachObserver;
}
((LocalScheduler)CurrentThreadScheduler.Instance).Schedule<AutoDetachObserver<T>>(autoDetachObserver, (Func<IScheduler, AutoDetachObserver<T>, IDisposable>)ScheduledSubscribe);
return autoDetachObserver;
}
private IDisposable ScheduledSubscribe(IScheduler _, AutoDetachObserver<T> autoDetachObserver)
{
try {
autoDetachObserver.Disposable = SubscribeCore(autoDetachObserver);
} catch (Exception error) {
if (!autoDetachObserver.Fail(error))
throw;
}
return Disposable.Empty;
}
protected abstract IDisposable SubscribeCore(IObserver<T> observer);
}
}