<PackageReference Include="System.Reactive" Version="6.0.0-preview.13" />

Skip<TSource>

static class Skip<TSource>
using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Runtime.CompilerServices; namespace System.Reactive.Linq.ObservableImpl { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] internal static class Skip<[System.Runtime.CompilerServices.Nullable(2)] TSource> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 0 })] internal sealed class Count : Producer<TSource, Count._> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class _ : IdentitySink<TSource> { private int _remaining; public _(int count, IObserver<TSource> observer) : base(observer) { _remaining = count; } public override void OnNext(TSource value) { if (_remaining <= 0) ForwardOnNext(value); else _remaining--; } } private readonly IObservable<TSource> _source; private readonly int _count; public Count(IObservable<TSource> source, int count) { _source = source; _count = count; } public IObservable<TSource> Combine(int count) { return new Count(_source, _count + count); } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] protected override _ CreateSink(IObserver<TSource> observer) { return new _(_count, observer); } protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] _ sink) { sink.Run(_source); } } [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 0 })] internal sealed class Time : Producer<TSource, Time._> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class _ : IdentitySink<TSource> { private volatile bool _open; private SingleAssignmentDisposableValue _sourceDisposable; public _(IObserver<TSource> observer) : base(observer) { } public void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] Time parent) { SetUpstream(Scheduler.ScheduleAction<_>(parent._scheduler, this, parent._duration, (Action<_>)delegate(_ state) { state.Tick(); })); _sourceDisposable.Disposable = ObservableExtensions.SubscribeSafe<TSource>(parent._source, (IObserver<TSource>)this); } protected override void Dispose(bool disposing) { if (disposing) _sourceDisposable.Dispose(); base.Dispose(disposing); } private void Tick() { _open = true; } public override void OnNext(TSource value) { if (_open) ForwardOnNext(value); } } private readonly IObservable<TSource> _source; private readonly TimeSpan _duration; internal readonly IScheduler _scheduler; public Time(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler) { _source = source; _duration = duration; _scheduler = scheduler; } public IObservable<TSource> Combine(TimeSpan duration) { if (duration <= _duration) return this; return new Time(_source, duration, _scheduler); } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] protected override _ CreateSink(IObserver<TSource> observer) { return new _(observer); } protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0 })] _ sink) { sink.Run(this); } } } }