<PackageReference Include="System.Reactive" Version="6.0.0" />

SkipLast<TSource>

static class SkipLast<TSource>
using System.Collections.Generic; using System.Reactive.Concurrency; using System.Runtime.CompilerServices; namespace System.Reactive.Linq.ObservableImpl { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] internal static class SkipLast<[System.Runtime.CompilerServices.Nullable(2)] TSource> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 0 })] internal sealed class Count : Producer<TSource, Count._> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class _ : IdentitySink<TSource> { private readonly int _count; private readonly Queue<TSource> _queue; public _(int count, IObserver<TSource> observer) : base(observer) { _count = count; _queue = new Queue<TSource>(); } public override void OnNext(TSource value) { _queue.Enqueue(value); if (_queue.Count > _count) ForwardOnNext(_queue.Dequeue()); } } private readonly IObservable<TSource> _source; private readonly int _count; public Count(IObservable<TSource> source, int count) { _source = source; _count = count; } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] protected override _ CreateSink(IObserver<TSource> observer) { return new _(_count, observer); } protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] _ sink) { sink.Run(_source); } } [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 0 })] internal sealed class Time : Producer<TSource, Time._> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class _ : IdentitySink<TSource> { private readonly TimeSpan _duration; [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 1 })] private readonly Queue<System.Reactive.TimeInterval<TSource>> _queue; [System.Runtime.CompilerServices.Nullable(2)] private IStopwatch _watch; public _(TimeSpan duration, IObserver<TSource> observer) : base(observer) { _duration = duration; _queue = new Queue<System.Reactive.TimeInterval<TSource>>(); } public void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] Time parent) { _watch = parent._scheduler.StartStopwatch(); SetUpstream(ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)this)); } public override void OnNext(TSource value) { TimeSpan elapsed = _watch.Elapsed; _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, elapsed)); while (_queue.Count > 0) { TimeSpan t = elapsed; System.Reactive.TimeInterval<TSource> timeInterval = _queue.Peek(); if (!(t - timeInterval.Interval >= _duration)) break; timeInterval = _queue.Dequeue(); ForwardOnNext(timeInterval.Value); } } public override void OnCompleted() { TimeSpan elapsed = _watch.Elapsed; while (_queue.Count > 0) { TimeSpan t = elapsed; System.Reactive.TimeInterval<TSource> timeInterval = _queue.Peek(); if (!(t - timeInterval.Interval >= _duration)) break; timeInterval = _queue.Dequeue(); ForwardOnNext(timeInterval.Value); } ForwardOnCompleted(); } } private readonly IObservable<TSource> _source; private readonly TimeSpan _duration; private readonly IScheduler _scheduler; public Time(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler) { _source = source; _duration = duration; _scheduler = scheduler; } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] protected override _ CreateSink(IObserver<TSource> observer) { return new _(_duration, observer); } protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] _ sink) { sink.Run(this); } } } }