<PackageReference Include="Relativity.Transfer.Client" Version="7.2.7" />
Range
using System.
Reactive.
Concurrency;
using System.
Reactive.
Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that, when run, schedules the emission of <c>count</c> consecutive
    /// integers beginning at <c>start</c> on the scheduler given at construction.
    /// </summary>
    internal sealed class Range : Producer<int, Range._>
    {
        private readonly int _start;
        private readonly int _count;
        private readonly IScheduler _scheduler;

        /// <summary>
        /// Captures the parameters of the sequence; nothing is scheduled here.
        /// </summary>
        /// <param name="start">Value added to each loop index to form the emitted element.</param>
        /// <param name="count">Upper bound (exclusive) of the loop index.</param>
        /// <param name="scheduler">Scheduler later handed to the sink's <c>Run</c> method.</param>
        public Range(int start, int count, IScheduler scheduler)
        {
            _scheduler = scheduler;
            _count = count;
            _start = start;
        }

        /// <summary>
        /// Builds the sink that will push values to <paramref name="observer"/>.
        /// </summary>
        protected override _ CreateSink(IObserver<int> observer, IDisposable cancel)
        {
            var sink = new _(this, observer, cancel);
            return sink;
        }

        /// <summary>
        /// Starts the given sink on the scheduler stored by the constructor.
        /// </summary>
        protected override IDisposable Run(_ sink)
        {
            return sink.Run(_scheduler);
        }

        /// <summary>
        /// Sink that performs the actual emission, either in one long-running loop
        /// or one element per recursive scheduler invocation.
        /// </summary>
        internal sealed class _ : Sink<int>
        {
            private readonly int _start;
            private readonly int _count;

            /// <summary>
            /// Copies <c>_start</c> and <c>_count</c> from <paramref name="parent"/> and
            /// forwards <paramref name="observer"/> and <paramref name="cancel"/> to the base sink.
            /// </summary>
            public _(Range parent, IObserver<int> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _start = parent._start;
                _count = parent._count;
            }

            /// <summary>
            /// Schedules the emission with an initial state of 0.
            /// </summary>
            /// <param name="scheduler">Scheduler on which the work is queued.</param>
            /// <returns>The disposable returned by the scheduling call.</returns>
            public IDisposable Run(IScheduler scheduler)
            {
                ISchedulerLongRunning longRunning = scheduler.AsLongRunning();

                // No long-running variant available: fall back to the recursive, one-step-per-call form.
                if (longRunning == null)
                {
                    return scheduler.Schedule(0, LoopRec);
                }

                return longRunning.ScheduleLongRunning(0, Loop);
            }

            /// <summary>
            /// Emits <c>_start + i</c> for each index from the incoming <paramref name="i"/>
            /// up to (but excluding) <c>_count</c>, stopping early once
            /// <paramref name="cancel"/> reports it is disposed.
            /// </summary>
            private void Loop(int i, ICancelable cancel)
            {
                // Disposal is checked before the bound on every iteration, before each OnNext.
                for (; !cancel.IsDisposed && i < _count; i++)
                {
                    _observer.OnNext(_start + i);
                }

                // Completion is signalled only when the loop was not cut short by disposal.
                if (!cancel.IsDisposed)
                {
                    _observer.OnCompleted();
                }

                base.Dispose();
            }

            /// <summary>
            /// Handles a single index: either emits <c>_start + i</c> and asks for the next
            /// index via <paramref name="recurse"/>, or completes and disposes once
            /// <paramref name="i"/> has reached <c>_count</c>.
            /// </summary>
            private void LoopRec(int i, Action<int> recurse)
            {
                if (i >= _count)
                {
                    _observer.OnCompleted();
                    base.Dispose();
                    return;
                }

                _observer.OnNext(_start + i);
                recurse(i + 1);
            }
        }
    }
}