// Select<TSource, TResult>
namespace System.Reactive.Linq.ObservableImpl
{
internal static class Select<TSource, TResult>
{
/// <summary>
/// Producer that projects each element of a source sequence through a
/// selector function that receives only the element.
/// </summary>
internal sealed class Selector : Producer<TResult, Selector._>
{
    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, TResult> _selector;

    /// <summary>
    /// Captures the source sequence and the projection to apply to it.
    /// </summary>
    /// <param name="source">Sequence whose elements are projected.</param>
    /// <param name="selector">Function applied to every source element.</param>
    public Selector(IObservable<TSource> source, Func<TSource, TResult> selector)
    {
        _source = source;
        _selector = selector;
    }

    /// <summary>
    /// Builds the sink that performs the projection for one subscription.
    /// </summary>
    /// <param name="observer">Observer that receives the projected elements.</param>
    /// <param name="cancel">Disposable passed through to the sink's base constructor.</param>
    /// <returns>A new sink bound to this producer's selector.</returns>
    protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
    {
        var created = new _(_selector, observer, cancel);
        return created;
    }

    /// <summary>
    /// Subscribes the sink to the source sequence.
    /// </summary>
    /// <param name="sink">Sink created by <c>CreateSink</c>.</param>
    /// <returns>The disposable returned by <c>SubscribeSafe</c>.</returns>
    protected override IDisposable Run(_ sink)
    {
        // The sink is handed to the source through its IObserver<TSource> face.
        IObserver<TSource> upstreamObserver = sink;
        return ObservableExtensions.SubscribeSafe<TSource>(_source, upstreamObserver);
    }

    /// <summary>
    /// Observer of the source sequence: applies the selector to every element
    /// and passes the result to the downstream observer.
    /// </summary>
    internal sealed class _ : Sink<TResult>, IObserver<TSource>
    {
        private readonly Func<TSource, TResult> _selector;

        /// <summary>
        /// Creates the sink.
        /// </summary>
        /// <param name="selector">Projection applied to each incoming element.</param>
        /// <param name="observer">Downstream observer, forwarded to the base class.</param>
        /// <param name="cancel">Disposable forwarded to the base class.</param>
        public _(Func<TSource, TResult> selector, IObserver<TResult> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _selector = selector;
        }

        /// <summary>
        /// Projects <paramref name="value"/> and forwards the result downstream.
        /// If the selector throws, the exception is sent to the downstream
        /// observer via <c>OnError</c>, the sink is disposed, and nothing is
        /// forwarded for this element.
        /// </summary>
        /// <param name="value">Element received from the source.</param>
        public void OnNext(TSource value)
        {
            TResult projected;
            try
            {
                projected = _selector(value);
            }
            catch (Exception failure)
            {
                // Only the selector call is guarded; an exception thrown by the
                // downstream OnNext below is deliberately not caught here.
                _observer.OnError(failure);
                Dispose();
                return;
            }

            _observer.OnNext(projected);
        }

        /// <summary>
        /// Forwards the source's error downstream and then disposes the sink.
        /// </summary>
        /// <param name="error">Error reported by the source.</param>
        public void OnError(Exception error)
        {
            _observer.OnError(error);
            Dispose();
        }

        /// <summary>
        /// Forwards completion downstream and then disposes the sink.
        /// </summary>
        public void OnCompleted()
        {
            _observer.OnCompleted();
            Dispose();
        }
    }
}
/// <summary>
/// Producer that projects each element of a source sequence through a
/// selector function that receives the element and its zero-based index.
/// </summary>
internal sealed class SelectorIndexed : Producer<TResult, SelectorIndexed._>
{
    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, int, TResult> _selector;

    /// <summary>
    /// Captures the source sequence and the indexed projection to apply to it.
    /// </summary>
    /// <param name="source">Sequence whose elements are projected.</param>
    /// <param name="selector">Function applied to every source element and its index.</param>
    public SelectorIndexed(IObservable<TSource> source, Func<TSource, int, TResult> selector)
    {
        _source = source;
        _selector = selector;
    }

    /// <summary>
    /// Builds the sink that performs the projection for one subscription.
    /// </summary>
    /// <param name="observer">Observer that receives the projected elements.</param>
    /// <param name="cancel">Disposable passed through to the sink's base constructor.</param>
    /// <returns>A new sink bound to this producer's selector, with its own index counter.</returns>
    protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
    {
        var created = new _(_selector, observer, cancel);
        return created;
    }

    /// <summary>
    /// Subscribes the sink to the source sequence.
    /// </summary>
    /// <param name="sink">Sink created by <c>CreateSink</c>.</param>
    /// <returns>The disposable returned by <c>SubscribeSafe</c>.</returns>
    protected override IDisposable Run(_ sink)
    {
        // The sink is handed to the source through its IObserver<TSource> face.
        IObserver<TSource> upstreamObserver = sink;
        return ObservableExtensions.SubscribeSafe<TSource>(_source, upstreamObserver);
    }

    /// <summary>
    /// Observer of the source sequence: applies the selector to every element
    /// together with a running index and passes the result to the downstream
    /// observer.
    /// </summary>
    internal sealed class _ : Sink<TResult>, IObserver<TSource>
    {
        private readonly Func<TSource, int, TResult> _selector;

        // Index handed to the selector for the next element; the field default
        // (0) is the index of the first element.
        private int _index;

        /// <summary>
        /// Creates the sink with its index counter at zero.
        /// </summary>
        /// <param name="selector">Projection applied to each incoming element and its index.</param>
        /// <param name="observer">Downstream observer, forwarded to the base class.</param>
        /// <param name="cancel">Disposable forwarded to the base class.</param>
        public _(Func<TSource, int, TResult> selector, IObserver<TResult> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _selector = selector;
        }

        /// <summary>
        /// Projects <paramref name="value"/> with the current index, advances
        /// the index, and forwards the result downstream. If the index
        /// increment overflows or the selector throws, the exception is sent to
        /// the downstream observer via <c>OnError</c>, the sink is disposed, and
        /// nothing is forwarded for this element.
        /// </summary>
        /// <param name="value">Element received from the source.</param>
        public void OnNext(TSource value)
        {
            TResult projected;
            try
            {
                // Checked post-increment: an overflow throws inside the try and
                // is reported through OnError rather than wrapping silently.
                int position = checked(_index++);
                projected = _selector(value, position);
            }
            catch (Exception failure)
            {
                // Only the index/selector step is guarded; an exception thrown
                // by the downstream OnNext below is deliberately not caught here.
                _observer.OnError(failure);
                Dispose();
                return;
            }

            _observer.OnNext(projected);
        }

        /// <summary>
        /// Forwards the source's error downstream and then disposes the sink.
        /// </summary>
        /// <param name="error">Error reported by the source.</param>
        public void OnError(Exception error)
        {
            _observer.OnError(error);
            Dispose();
        }

        /// <summary>
        /// Forwards completion downstream and then disposes the sink.
        /// </summary>
        public void OnCompleted()
        {
            _observer.OnCompleted();
            Dispose();
        }
    }
}
}
}