Min<TSource>
using System.Collections.Generic;
namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class Min<TSource> : Producer<TSource, Min<TSource>._>
{
internal abstract class _ : Sink<TSource>, IObserver<TSource>
{
protected readonly IComparer<TSource> _comparer;
public _(IComparer<TSource> comparer, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_comparer = comparer;
}
public abstract void OnCompleted();
public abstract void OnError(Exception error);
public abstract void OnNext(TSource value);
}
private sealed class NonNull : _
{
private bool _hasValue;
private TSource _lastValue;
public NonNull(IComparer<TSource> comparer, IObserver<TSource> observer, IDisposable cancel)
: base(comparer, observer, cancel)
{
_hasValue = false;
_lastValue = default(TSource);
}
public override void OnNext(TSource value)
{
if (_hasValue) {
int num = 0;
try {
num = _comparer.Compare(value, _lastValue);
} catch (Exception error) {
_observer.OnError(error);
base.Dispose();
return;
}
if (num < 0)
_lastValue = value;
} else {
_hasValue = true;
_lastValue = value;
}
}
public override void OnError(Exception error)
{
_observer.OnError(error);
base.Dispose();
}
public override void OnCompleted()
{
if (!_hasValue)
_observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
else {
_observer.OnNext(_lastValue);
_observer.OnCompleted();
}
base.Dispose();
}
}
private sealed class Null : _
{
private TSource _lastValue;
public Null(IComparer<TSource> comparer, IObserver<TSource> observer, IDisposable cancel)
: base(comparer, observer, cancel)
{
_lastValue = default(TSource);
}
public override void OnNext(TSource value)
{
if (value != null) {
if (_lastValue == null)
_lastValue = value;
else {
int num = 0;
try {
num = _comparer.Compare(value, _lastValue);
} catch (Exception error) {
_observer.OnError(error);
base.Dispose();
return;
}
if (num < 0)
_lastValue = value;
}
}
}
public override void OnError(Exception error)
{
_observer.OnError(error);
base.Dispose();
}
public override void OnCompleted()
{
_observer.OnNext(_lastValue);
_observer.OnCompleted();
base.Dispose();
}
}
private readonly IObservable<TSource> _source;
private readonly IComparer<TSource> _comparer;
public Min(IObservable<TSource> source, IComparer<TSource> comparer)
{
_source = source;
_comparer = comparer;
}
protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel)
{
if (default(TSource) != null)
return new NonNull(_comparer, observer, cancel);
return new Null(_comparer, observer, cancel);
}
protected override IDisposable Run(_ sink)
{
return ObservableExtensions.SubscribeSafe<TSource>(_source, (IObserver<TSource>)sink);
}
}
}