<PackageReference Include="Relativity.Transfer.Client" Version="7.2.7" />

Range

sealed class Range : Producer<int, _>
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that pushes <c>count</c> consecutive integers, beginning at <c>start</c>,
    /// to an observer, with the work scheduled on the supplied scheduler.
    /// </summary>
    internal sealed class Range : Producer<int, Range._>
    {
        private readonly int _start;
        private readonly int _count;
        private readonly IScheduler _scheduler;

        /// <summary>
        /// Captures the parameters of the sequence.
        /// </summary>
        /// <param name="start">First value to emit.</param>
        /// <param name="count">Number of values to emit.</param>
        /// <param name="scheduler">Scheduler handed to the sink when it is run.</param>
        public Range(int start, int count, IScheduler scheduler)
        {
            _start = start;
            _count = count;
            _scheduler = scheduler;
        }

        /// <summary>
        /// Builds the sink that will emit the values to <paramref name="observer"/>.
        /// </summary>
        protected override _ CreateSink(IObserver<int> observer, IDisposable cancel)
        {
            return new _(this, observer, cancel);
        }

        /// <summary>
        /// Starts <paramref name="sink"/> on the scheduler captured at construction time.
        /// </summary>
        protected override IDisposable Run(_ sink)
        {
            return sink.Run(_scheduler);
        }

        /// <summary>
        /// Sink that performs the actual emission for a single subscription.
        /// </summary>
        internal sealed class _ : Sink<int>
        {
            private readonly int _start;
            private readonly int _count;

            /// <summary>
            /// Copies the start value and count from <paramref name="parent"/> so the
            /// sink does not need to reach back into the producer while running.
            /// </summary>
            public _(Range parent, IObserver<int> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _start = parent._start;
                _count = parent._count;
            }

            /// <summary>
            /// Schedules the emission, starting from index 0.
            /// </summary>
            /// <param name="scheduler">Scheduler to run the emission on.</param>
            /// <returns>The disposable returned by the scheduling call.</returns>
            public IDisposable Run(IScheduler scheduler)
            {
                ISchedulerLongRunning longRunning = scheduler.AsLongRunning();

                // No long-running facility available: fall back to recursive scheduling,
                // one value per scheduled step.
                if (longRunning == null)
                {
                    return scheduler.Schedule(0, LoopRec);
                }

                // Otherwise emit everything from a single long-running loop.
                return longRunning.ScheduleLongRunning(0, Loop);
            }

            /// <summary>
            /// Long-running variant: emits values in a loop until either all
            /// <c>_count</c> values were sent or <paramref name="cancel"/> is disposed.
            /// </summary>
            private void Loop(int i, ICancelable cancel)
            {
                // Cancellation is re-checked before every value.
                for (; !cancel.IsDisposed && i < _count; i++)
                {
                    _observer.OnNext(_start + i);
                }

                // Completion is only signalled when the loop was not cancelled.
                if (!cancel.IsDisposed)
                {
                    _observer.OnCompleted();
                }

                base.Dispose();
            }

            /// <summary>
            /// Recursive variant: emits the value for index <paramref name="i"/> and asks
            /// <paramref name="recurse"/> to continue with the next index; once the index
            /// reaches <c>_count</c> the sequence is completed and the sink disposed.
            /// </summary>
            private void LoopRec(int i, Action<int> recurse)
            {
                if (i >= _count)
                {
                    _observer.OnCompleted();
                    base.Dispose();
                    return;
                }

                _observer.OnNext(_start + i);
                recurse(i + 1);
            }
        }
    }
}