Generate<TState, TResult>
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class Generate<TState, TResult>
{
internal sealed class NoTime : Producer<TResult, NoTime._>
{
internal sealed class _ : IdentitySink<TResult>
{
private readonly Func<TState, bool> _condition;
private readonly Func<TState, TState> _iterate;
private readonly Func<TState, TResult> _resultSelector;
private TState _state;
private bool _first;
public _(NoTime parent, IObserver<TResult> observer)
: base(observer)
{
_condition = parent._condition;
_iterate = parent._iterate;
_resultSelector = parent._resultSelector;
_state = parent._initialState;
_first = true;
}
public void Run(IScheduler _scheduler)
{
ISchedulerLongRunning schedulerLongRunning = _scheduler.AsLongRunning();
if (schedulerLongRunning != null)
SetUpstream(schedulerLongRunning.ScheduleLongRunning<_>(this, (Action<_, ICancelable>)delegate(_ this, ICancelable c) {
this.Loop(c);
}));
else
SetUpstream(Scheduler.Schedule<_>(_scheduler, this, (Action<_, Action<_>>)delegate(_ this, Action<_> a) {
this.LoopRec(a);
}));
}
private void Loop(ICancelable cancel)
{
while (!cancel.IsDisposed) {
bool flag = false;
TResult value = default(TResult);
try {
if (_first)
_first = false;
else
_state = _iterate(_state);
flag = _condition(_state);
if (flag)
value = _resultSelector(_state);
} catch (Exception error) {
ForwardOnError(error);
return;
}
if (!flag)
break;
ForwardOnNext(value);
}
if (!cancel.IsDisposed)
ForwardOnCompleted();
}
private void LoopRec(Action<_> recurse)
{
bool flag = false;
TResult value = default(TResult);
try {
if (_first)
_first = false;
else
_state = _iterate(_state);
flag = _condition(_state);
if (flag)
value = _resultSelector(_state);
} catch (Exception error) {
ForwardOnError(error);
return;
}
if (flag) {
ForwardOnNext(value);
recurse(this);
} else
ForwardOnCompleted();
}
}
private readonly TState _initialState;
private readonly Func<TState, bool> _condition;
private readonly Func<TState, TState> _iterate;
private readonly Func<TState, TResult> _resultSelector;
private readonly IScheduler _scheduler;
public NoTime(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
{
_initialState = initialState;
_condition = condition;
_iterate = iterate;
_resultSelector = resultSelector;
_scheduler = scheduler;
}
protected override _ CreateSink(IObserver<TResult> observer)
{
return new _(this, observer);
}
protected override void Run(_ sink)
{
sink.Run(_scheduler);
}
}
internal sealed class Absolute : Producer<TResult, Absolute._>
{
internal sealed class _ : IdentitySink<TResult>
{
private readonly Func<TState, bool> _condition;
private readonly Func<TState, TState> _iterate;
private readonly Func<TState, TResult> _resultSelector;
private readonly Func<TState, DateTimeOffset> _timeSelector;
private bool _first;
private bool _hasResult;
private TResult _result;
public _(Absolute parent, IObserver<TResult> observer)
: base(observer)
{
_condition = parent._condition;
_iterate = parent._iterate;
_resultSelector = parent._resultSelector;
_timeSelector = parent._timeSelector;
_first = true;
}
public void Run(IScheduler outerScheduler, TState initialState)
{
SetUpstream(outerScheduler.Schedule<(_, TState)>((this, initialState), (Func<IScheduler, (_, TState), IDisposable>)((IScheduler scheduler, (_ this, TState initialState) tuple) => tuple.this.InvokeRec(scheduler, tuple.initialState))));
}
private IDisposable InvokeRec(IScheduler self, TState state)
{
DateTimeOffset dueTime = default(DateTimeOffset);
if (_hasResult)
ForwardOnNext(_result);
try {
if (_first)
_first = false;
else
state = _iterate(state);
_hasResult = _condition(state);
if (_hasResult) {
_result = _resultSelector(state);
dueTime = _timeSelector(state);
}
} catch (Exception error) {
ForwardOnError(error);
return Disposable.Empty;
}
if (!_hasResult) {
ForwardOnCompleted();
return Disposable.Empty;
}
return self.Schedule<(_, TState)>((this, state), dueTime, (Func<IScheduler, (_, TState), IDisposable>)((IScheduler scheduler, (_ this, TState state) tuple) => tuple.this.InvokeRec(scheduler, tuple.state)));
}
}
private readonly TState _initialState;
private readonly Func<TState, bool> _condition;
private readonly Func<TState, TState> _iterate;
private readonly Func<TState, TResult> _resultSelector;
private readonly Func<TState, DateTimeOffset> _timeSelector;
private readonly IScheduler _scheduler;
public Absolute(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
{
_initialState = initialState;
_condition = condition;
_iterate = iterate;
_resultSelector = resultSelector;
_timeSelector = timeSelector;
_scheduler = scheduler;
}
protected override _ CreateSink(IObserver<TResult> observer)
{
return new _(this, observer);
}
protected override void Run(_ sink)
{
sink.Run(_scheduler, _initialState);
}
}
internal sealed class Relative : Producer<TResult, Relative._>
{
internal sealed class _ : IdentitySink<TResult>
{
private readonly Func<TState, bool> _condition;
private readonly Func<TState, TState> _iterate;
private readonly Func<TState, TResult> _resultSelector;
private readonly Func<TState, TimeSpan> _timeSelector;
private bool _first;
private bool _hasResult;
private TResult _result;
public _(Relative parent, IObserver<TResult> observer)
: base(observer)
{
_condition = parent._condition;
_iterate = parent._iterate;
_resultSelector = parent._resultSelector;
_timeSelector = parent._timeSelector;
_first = true;
}
public void Run(IScheduler outerScheduler, TState initialState)
{
SetUpstream(outerScheduler.Schedule<(_, TState)>((this, initialState), (Func<IScheduler, (_, TState), IDisposable>)((IScheduler scheduler, (_ this, TState initialState) tuple) => tuple.this.InvokeRec(scheduler, tuple.initialState))));
}
private IDisposable InvokeRec(IScheduler self, TState state)
{
TimeSpan dueTime = default(TimeSpan);
if (_hasResult)
ForwardOnNext(_result);
try {
if (_first)
_first = false;
else
state = _iterate(state);
_hasResult = _condition(state);
if (_hasResult) {
_result = _resultSelector(state);
dueTime = _timeSelector(state);
}
} catch (Exception error) {
ForwardOnError(error);
return Disposable.Empty;
}
if (!_hasResult) {
ForwardOnCompleted();
return Disposable.Empty;
}
return self.Schedule<(_, TState)>((this, state), dueTime, (Func<IScheduler, (_, TState), IDisposable>)((IScheduler scheduler, (_ this, TState state) tuple) => tuple.this.InvokeRec(scheduler, tuple.state)));
}
}
private readonly TState _initialState;
private readonly Func<TState, bool> _condition;
private readonly Func<TState, TState> _iterate;
private readonly Func<TState, TResult> _resultSelector;
private readonly Func<TState, TimeSpan> _timeSelector;
private readonly IScheduler _scheduler;
public Relative(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
{
_initialState = initialState;
_condition = condition;
_iterate = iterate;
_resultSelector = resultSelector;
_timeSelector = timeSelector;
_scheduler = scheduler;
}
protected override _ CreateSink(IObserver<TResult> observer)
{
return new _(this, observer);
}
protected override void Run(_ sink)
{
sink.Run(_scheduler, _initialState);
}
}
}
}