<PackageReference Include="System.Reactive" Version="6.0.0" />

RangeLongRunning

sealed class RangeLongRunning : Producer<int, RangeSink>
using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Runtime.CompilerServices; namespace System.Reactive.Linq.ObservableImpl { } namespace System.Reactive.Linq.ObservableImpl { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class RangeLongRunning : Producer<int, RangeLongRunning.RangeSink> { [System.Runtime.CompilerServices.Nullable(0)] internal sealed class RangeSink : IdentitySink<int> { private readonly int _end; private readonly int _index; public RangeSink(int start, int count, IObserver<int> observer) : base(observer) { _index = start; _end = start + count; } public void Run(ISchedulerLongRunning scheduler) { SetUpstream(scheduler.ScheduleLongRunning(this, delegate(RangeSink this, ICancelable cancel) { this.Loop(cancel); })); } private void Loop(ICancelable cancel) { int num = _index; int end = _end; while (!cancel.IsDisposed && num != end) { ForwardOnNext(num++); } if (!cancel.IsDisposed) ForwardOnCompleted(); } } private readonly int _start; private readonly int _count; private readonly ISchedulerLongRunning _scheduler; public RangeLongRunning(int start, int count, ISchedulerLongRunning scheduler) { _start = start; _count = count; _scheduler = scheduler; } protected override RangeSink CreateSink(IObserver<int> observer) { return new RangeSink(_start, _count, observer); } protected override void Run(RangeSink sink) { sink.Run(_scheduler); } } }