Defer<TValue>
using System.
Reactive.
Disposables;
namespace System.
Reactive.
Linq.
ObservableImpl
{
/// <summary>
/// Observable source that postpones obtaining the actual sequence: the factory delegate
/// is not called at construction time, only when a sink is run or when
/// <see cref="Eval"/> is invoked.
/// </summary>
/// <typeparam name="TValue">Element type of the sequence returned by the factory.</typeparam>
internal sealed class Defer<TValue> : Producer<TValue, Defer<TValue>._>, IEvaluatableObservable<TValue>
{
    private readonly Func<IObservable<TValue>> _observableFactory;

    /// <summary>
    /// Stores the factory; it is not invoked here.
    /// </summary>
    /// <param name="observableFactory">Delegate that yields the sequence to subscribe to.</param>
    public Defer(Func<IObservable<TValue>> observableFactory)
    {
        _observableFactory = observableFactory;
    }

    /// <summary>
    /// Invokes the factory and hands back whatever it returns. Unlike the sink's
    /// <c>Run</c>, exceptions thrown by the factory are not caught here.
    /// </summary>
    /// <returns>The sequence produced by the factory.</returns>
    public IObservable<TValue> Eval()
    {
        return _observableFactory();
    }

    /// <summary>
    /// Builds the sink for one observer, passing along the stored factory.
    /// </summary>
    /// <param name="observer">Observer the sink forwards notifications to.</param>
    /// <param name="cancel">Disposable handed to the sink's base class.</param>
    /// <returns>A new sink instance.</returns>
    protected override _ CreateSink(IObserver<TValue> observer, IDisposable cancel)
    {
        return new _(_observableFactory, observer, cancel);
    }

    /// <summary>
    /// Starts the given sink by delegating to its own <c>Run</c> method.
    /// </summary>
    /// <param name="sink">Sink previously created by <c>CreateSink</c> (presumably; the caller lives in the base class).</param>
    /// <returns>The disposable returned by the sink.</returns>
    protected override IDisposable Run(_ sink)
    {
        return sink.Run();
    }

    /// <summary>
    /// Sink that calls the factory, subscribes itself to the resulting sequence and
    /// relays every notification to the downstream observer.
    /// </summary>
    internal sealed class _ : Sink<TValue>, IObserver<TValue>
    {
        private readonly Func<IObservable<TValue>> _observableFactory;

        /// <summary>
        /// Captures the factory and forwards the observer and cancel handle to the base sink.
        /// </summary>
        /// <param name="observableFactory">Delegate that yields the sequence to subscribe to.</param>
        /// <param name="observer">Downstream observer.</param>
        /// <param name="cancel">Disposable passed through to the base constructor.</param>
        public _(Func<IObservable<TValue>> observableFactory, IObserver<TValue> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _observableFactory = observableFactory;
        }

        /// <summary>
        /// Invokes the factory and subscribes this sink to the sequence it returns.
        /// </summary>
        /// <returns>
        /// The subscription to the produced sequence, or <c>Disposable.Empty</c> when the
        /// factory threw.
        /// </returns>
        public IDisposable Run()
        {
            IObservable<TValue> source;

            try
            {
                source = _observableFactory();
            }
            catch (Exception exception)
            {
                // A failing factory is surfaced to the observer as OnError rather than
                // being allowed to propagate to the caller; the sink is then torn down.
                _observer.OnError(exception);
                base.Dispose();
                return Disposable.Empty;
            }

            return ObservableExtensions.SubscribeSafe<TValue>(source, this);
        }

        /// <summary>
        /// Relays an element to the downstream observer.
        /// </summary>
        /// <param name="value">Element received from the produced sequence.</param>
        public void OnNext(TValue value)
        {
            _observer.OnNext(value);
        }

        /// <summary>
        /// Relays the error to the downstream observer, then disposes the sink.
        /// </summary>
        /// <param name="error">Exception received from the produced sequence.</param>
        public void OnError(Exception error)
        {
            _observer.OnError(error);
            base.Dispose();
        }

        /// <summary>
        /// Relays completion to the downstream observer, then disposes the sink.
        /// </summary>
        public void OnCompleted()
        {
            _observer.OnCompleted();
            base.Dispose();
        }
    }
}
}