<PackageReference Include="System.Reactive" Version="4.0.0" />

SingleAsync<TSource>

static class SingleAsync<TSource>
namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Groups the two producers behind the "single element" operator: one that
    /// considers every source element, and one that considers only elements
    /// accepted by a predicate.
    /// </summary>
    internal static class SingleAsync<TSource>
    {
        /// <summary>
        /// Producer that yields the only element of the source, and signals an
        /// error when the source is empty or has more than one element.
        /// </summary>
        internal sealed class Sequence : Producer<TSource, Sequence._>
        {
            /// <summary>
            /// Sink that remembers the first element it sees and decides on
            /// completion whether to forward it or to report an error.
            /// </summary>
            internal sealed class _ : Sink<TSource>, IObserver<TSource>
            {
                // Both fields start at their default values (default(TSource) / false).
                private TSource _value;
                private bool _seenValue;

                public _(IObserver<TSource> observer, IDisposable cancel)
                    : base(observer, cancel)
                {
                }

                /// <summary>
                /// Stores the first element; a second element is reported
                /// downstream as an <see cref="InvalidOperationException"/>.
                /// </summary>
                public void OnNext(TSource value)
                {
                    if (_seenValue)
                    {
                        SignalErrorAndDispose(new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_ELEMENT));
                        return;
                    }

                    _value = value;
                    _seenValue = true;
                }

                /// <summary>
                /// Forwards a source error downstream and disposes the sink.
                /// </summary>
                public void OnError(Exception error)
                {
                    SignalErrorAndDispose(error);
                }

                /// <summary>
                /// Emits the stored element followed by completion, or reports an
                /// <see cref="InvalidOperationException"/> when no element was
                /// seen. The sink is disposed afterwards in both cases.
                /// </summary>
                public void OnCompleted()
                {
                    if (_seenValue)
                    {
                        _observer.OnNext(_value);
                        _observer.OnCompleted();
                        base.Dispose();
                    }
                    else
                    {
                        SignalErrorAndDispose(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                    }
                }

                // Shared tail of every failure path: notify downstream first, then dispose.
                private void SignalErrorAndDispose(Exception error)
                {
                    _observer.OnError(error);
                    base.Dispose();
                }
            }

            private readonly IObservable<TSource> _source;

            public Sequence(IObservable<TSource> source)
            {
                _source = source;
            }

            /// <summary>
            /// Creates the sink that wraps the given downstream observer.
            /// </summary>
            protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel)
            {
                return new _(observer, cancel);
            }

            /// <summary>
            /// Subscribes the sink to the source through <c>SubscribeSafe</c>.
            /// </summary>
            protected override IDisposable Run(_ sink)
            {
                return _source.SubscribeSafe(sink);
            }
        }

        /// <summary>
        /// Producer that yields the only element of the source accepted by a
        /// predicate, and signals an error when no element or more than one
        /// element is accepted.
        /// </summary>
        internal sealed class Predicate : Producer<TSource, Predicate._>
        {
            /// <summary>
            /// Sink that remembers the first accepted element and decides on
            /// completion whether to forward it or to report an error.
            /// </summary>
            internal sealed class _ : Sink<TSource>, IObserver<TSource>
            {
                private readonly Func<TSource, bool> _predicate;

                // Both fields start at their default values (default(TSource) / false).
                private TSource _value;
                private bool _seenValue;

                public _(Func<TSource, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
                    : base(observer, cancel)
                {
                    _predicate = predicate;
                }

                /// <summary>
                /// Evaluates the predicate for the element. An exception thrown
                /// by the predicate is forwarded downstream as an error. The
                /// first accepted element is stored; a second accepted element
                /// is reported as an <see cref="InvalidOperationException"/>.
                /// Rejected elements are ignored.
                /// </summary>
                public void OnNext(TSource value)
                {
                    bool isMatch;
                    try
                    {
                        isMatch = _predicate(value);
                    }
                    catch (Exception ex)
                    {
                        SignalErrorAndDispose(ex);
                        return;
                    }

                    if (!isMatch)
                    {
                        return;
                    }

                    if (_seenValue)
                    {
                        SignalErrorAndDispose(new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_MATCHING_ELEMENT));
                        return;
                    }

                    _value = value;
                    _seenValue = true;
                }

                /// <summary>
                /// Forwards a source error downstream and disposes the sink.
                /// </summary>
                public void OnError(Exception error)
                {
                    SignalErrorAndDispose(error);
                }

                /// <summary>
                /// Emits the stored element followed by completion, or reports an
                /// <see cref="InvalidOperationException"/> when no element was
                /// accepted. The sink is disposed afterwards in both cases.
                /// </summary>
                public void OnCompleted()
                {
                    if (_seenValue)
                    {
                        _observer.OnNext(_value);
                        _observer.OnCompleted();
                        base.Dispose();
                    }
                    else
                    {
                        SignalErrorAndDispose(new InvalidOperationException(Strings_Linq.NO_MATCHING_ELEMENTS));
                    }
                }

                // Shared tail of every failure path: notify downstream first, then dispose.
                private void SignalErrorAndDispose(Exception error)
                {
                    _observer.OnError(error);
                    base.Dispose();
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, bool> _predicate;

            public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate)
            {
                _source = source;
                _predicate = predicate;
            }

            /// <summary>
            /// Creates the sink that wraps the given downstream observer and
            /// carries this producer's predicate.
            /// </summary>
            protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel)
            {
                return new _(_predicate, observer, cancel);
            }

            /// <summary>
            /// Subscribes the sink to the source through <c>SubscribeSafe</c>.
            /// </summary>
            protected override IDisposable Run(_ sink)
            {
                return _source.SubscribeSafe(sink);
            }
        }
    }
}