<PackageReference Include="Relativity.Transfer.Client" Version="7.2.7" />

LongCount<TSource>

static class LongCount<TSource>
namespace System.Reactive.Linq.ObservableImpl { internal static class LongCount<TSource> { internal sealed class All : Producer<long, All._> { internal sealed class _ : Sink<long>, IObserver<TSource> { private long _count; public _(IObserver<long> observer, IDisposable cancel) : base(observer, cancel) { _count = 0; } public void OnNext(TSource value) { checked { try { _count++; } catch (Exception error) { _observer.OnError(error); base.Dispose(); } } } public void OnError(Exception error) { _observer.OnError(error); base.Dispose(); } public void OnCompleted() { _observer.OnNext(_count); _observer.OnCompleted(); base.Dispose(); } } private readonly IObservable<TSource> _source; public All(IObservable<TSource> source) { _source = source; } protected override _ CreateSink(IObserver<long> observer, IDisposable cancel) { return new _(observer, cancel); } protected override IDisposable Run(_ sink) { return ObservableExtensions.SubscribeSafe<TSource>(_source, (IObserver<TSource>)sink); } } internal sealed class Predicate : Producer<long, Predicate._> { internal sealed class _ : Sink<long>, IObserver<TSource> { private readonly Func<TSource, bool> _predicate; private long _count; public _(Func<TSource, bool> predicate, IObserver<long> observer, IDisposable cancel) : base(observer, cancel) { _predicate = predicate; _count = 0; } public void OnNext(TSource value) { checked { try { if (_predicate(value)) _count++; } catch (Exception error) { _observer.OnError(error); base.Dispose(); } } } public void OnError(Exception error) { _observer.OnError(error); base.Dispose(); } public void OnCompleted() { _observer.OnNext(_count); _observer.OnCompleted(); base.Dispose(); } } private readonly IObservable<TSource> _source; private readonly Func<TSource, bool> _predicate; public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate) { _source = source; _predicate = predicate; } protected override _ CreateSink(IObserver<long> observer, IDisposable cancel) { return new _(_predicate, observer, cancel); } protected override IDisposable Run(_ sink) { return ObservableExtensions.SubscribeSafe<TSource>(_source, (IObserver<TSource>)sink); } } } }