<PackageReference Include="System.Reactive" Version="4.0.0" />

LastOrDefaultAsync<TSource>

static class LastOrDefaultAsync<TSource>
namespace System.Reactive.Linq.ObservableImpl { internal static class LastOrDefaultAsync<TSource> { internal sealed class Sequence : Producer<TSource, Sequence._> { internal sealed class _ : Sink<TSource>, IObserver<TSource> { private TSource _value; public _(IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _value = default(TSource); } public void OnNext(TSource value) { _value = value; } public void OnError(Exception error) { _observer.OnError(error); base.Dispose(); } public void OnCompleted() { _observer.OnNext(_value); _observer.OnCompleted(); base.Dispose(); } } private readonly IObservable<TSource> _source; public Sequence(IObservable<TSource> source) { _source = source; } protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel) { return new _(observer, cancel); } protected override IDisposable Run(_ sink) { return ObservableExtensions.SubscribeSafe<TSource>(_source, (IObserver<TSource>)sink); } } internal sealed class Predicate : Producer<TSource, Predicate._> { internal sealed class _ : Sink<TSource>, IObserver<TSource> { private readonly Func<TSource, bool> _predicate; private TSource _value; public _(Func<TSource, bool> predicate, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _predicate = predicate; _value = default(TSource); } public void OnNext(TSource value) { bool flag = false; try { flag = _predicate(value); } catch (Exception error) { _observer.OnError(error); base.Dispose(); return; } if (flag) _value = value; } public void OnError(Exception error) { _observer.OnError(error); base.Dispose(); } public void OnCompleted() { _observer.OnNext(_value); _observer.OnCompleted(); base.Dispose(); } } private readonly IObservable<TSource> _source; private readonly Func<TSource, bool> _predicate; public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate) { _source = source; _predicate = predicate; } protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel) { return new _(_predicate, observer, cancel); } protected override IDisposable Run(_ sink) { return ObservableExtensions.SubscribeSafe<TSource>(_source, (IObserver<TSource>)sink); } } } }