// Generate<TState, TResult>
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class Generate<TState, TResult>
{
/// <summary>
/// Producer for the untimed generator: starting from an initial state, it repeatedly
/// applies the iterate function, emits the projected result while the condition holds,
/// and completes as soon as the condition returns false.
/// </summary>
internal sealed class NoTime : Producer<TResult, NoTime._>
{
    private readonly TState _initialState;
    private readonly Func<TState, bool> _condition;
    private readonly Func<TState, TState> _iterate;
    private readonly Func<TState, TResult> _resultSelector;
    private readonly IScheduler _scheduler;

    /// <summary>Captures the generator functions and the scheduler the loop is run on.</summary>
    /// <param name="initialState">State used for the first condition check (iterate is not applied to it first).</param>
    /// <param name="condition">Returns false to end the sequence.</param>
    /// <param name="iterate">Produces the next state from the current one.</param>
    /// <param name="resultSelector">Projects a state into the element handed to the observer.</param>
    /// <param name="scheduler">Scheduler on which the generator loop is scheduled.</param>
    public NoTime(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
    {
        _scheduler = scheduler;
        _resultSelector = resultSelector;
        _iterate = iterate;
        _condition = condition;
        _initialState = initialState;
    }

    protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
    {
        return new _(this, observer, cancel);
    }

    protected override IDisposable Run(_ sink)
    {
        return sink.Run();
    }

    /// <summary>
    /// Sink that owns the mutable generator state and drives the loop, either as a
    /// single long-running work item or as a chain of recursively scheduled steps.
    /// </summary>
    internal sealed class _ : Sink<TResult>
    {
        private readonly NoTime _parent;
        private TState _state;
        private bool _first;

        public _(NoTime parent, IObserver<TResult> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _parent = parent;
        }

        /// <summary>
        /// Resets the state to the parent's initial state and schedules the loop.
        /// When <c>AsLongRunning</c> yields a non-null scheduler the blocking
        /// <see cref="Loop"/> is used; otherwise the recursive <see cref="LoopRec"/> is scheduled.
        /// </summary>
        /// <returns>The disposable returned by the scheduling call.</returns>
        public IDisposable Run()
        {
            _first = true;
            _state = _parent._initialState;

            ISchedulerLongRunning longRunning = _parent._scheduler.AsLongRunning();
            return longRunning != null
                ? longRunning.ScheduleLongRunning(Loop)
                : _parent._scheduler.Schedule(LoopRec);
        }

        /// <summary>
        /// Performs one generator step: advances the state (except on the very first
        /// step), evaluates the condition and, if it holds, projects the result.
        /// </summary>
        /// <param name="hasResult">Set to the outcome of the condition; false if a user function threw.</param>
        /// <param name="result">The projected element when <paramref name="hasResult"/> is true; otherwise the default value.</param>
        /// <returns>
        /// False when a user-supplied function threw; in that case the error has already
        /// been forwarded to the observer and the sink disposed. True otherwise.
        /// </returns>
        private bool TryStep(out bool hasResult, out TResult result)
        {
            hasResult = false;
            result = default(TResult);

            try
            {
                if (!_first)
                {
                    _state = _parent._iterate(_state);
                }
                else
                {
                    _first = false;
                }

                hasResult = _parent._condition(_state);
                if (hasResult)
                {
                    result = _parent._resultSelector(_state);
                }
            }
            catch (Exception exception)
            {
                _observer.OnError(exception);
                base.Dispose();
                return false;
            }

            return true;
        }

        /// <summary>
        /// Long-running variant: keeps stepping in a loop until the condition fails,
        /// a user function throws, or <paramref name="cancel"/> is observed as disposed.
        /// </summary>
        private void Loop(ICancelable cancel)
        {
            while (!cancel.IsDisposed)
            {
                bool hasResult;
                TResult result;
                if (!TryStep(out hasResult, out result))
                {
                    // Error already reported and the sink disposed.
                    return;
                }

                if (!hasResult)
                {
                    break;
                }

                _observer.OnNext(result);
            }

            // Completion is only signalled when the loop was not cancelled.
            if (!cancel.IsDisposed)
            {
                _observer.OnCompleted();
            }

            base.Dispose();
        }

        /// <summary>
        /// Recursive variant: performs a single step and, if an element was produced,
        /// emits it and asks the scheduler for another invocation via <paramref name="recurse"/>.
        /// </summary>
        private void LoopRec(Action recurse)
        {
            bool hasResult;
            TResult result;
            if (!TryStep(out hasResult, out result))
            {
                // Error already reported and the sink disposed.
                return;
            }

            if (!hasResult)
            {
                _observer.OnCompleted();
                base.Dispose();
                return;
            }

            _observer.OnNext(result);
            recurse();
        }
    }
}
/// <summary>
/// Producer for the generator whose elements are timed with absolute due times:
/// each state that satisfies the condition yields a result and a
/// <see cref="DateTimeOffset"/> at which the next scheduled step runs.
/// </summary>
internal sealed class Absolute : Producer<TResult, Absolute._>
{
    private readonly TState _initialState;
    private readonly Func<TState, bool> _condition;
    private readonly Func<TState, TState> _iterate;
    private readonly Func<TState, TResult> _resultSelector;
    private readonly Func<TState, DateTimeOffset> _timeSelector;
    private readonly IScheduler _scheduler;

    /// <summary>Captures the generator functions and the scheduler the steps are run on.</summary>
    /// <param name="initialState">State used for the first condition check (iterate is not applied to it first).</param>
    /// <param name="condition">Returns false to end the sequence.</param>
    /// <param name="iterate">Produces the next state from the current one.</param>
    /// <param name="resultSelector">Projects a state into the element handed to the observer.</param>
    /// <param name="timeSelector">Gives the absolute due time passed to the scheduler for the step that emits the state's element.</param>
    /// <param name="scheduler">Scheduler on which every step is scheduled.</param>
    public Absolute(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
    {
        _scheduler = scheduler;
        _timeSelector = timeSelector;
        _resultSelector = resultSelector;
        _iterate = iterate;
        _condition = condition;
        _initialState = initialState;
    }

    protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
    {
        return new _(this, observer, cancel);
    }

    protected override IDisposable Run(_ sink)
    {
        return sink.Run();
    }

    /// <summary>
    /// Sink that carries the pending result between scheduled steps; the generator
    /// state itself travels as the scheduler's state argument.
    /// </summary>
    internal sealed class _ : Sink<TResult>
    {
        private readonly Absolute _parent;
        private bool _first;
        private bool _hasResult;
        private TResult _result;

        public _(Absolute parent, IObserver<TResult> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _parent = parent;
        }

        /// <summary>
        /// Clears the per-run bookkeeping and schedules the first step with the
        /// parent's initial state (no due time is passed for this first step).
        /// </summary>
        /// <returns>The disposable returned by the scheduling call.</returns>
        public IDisposable Run()
        {
            _result = default(TResult);
            _hasResult = false;
            _first = true;

            return _parent._scheduler.Schedule<TState>(_parent._initialState, new Func<IScheduler, TState, IDisposable>(InvokeRec));
        }

        /// <summary>
        /// One scheduled step. First emits the result computed by the previous step
        /// (if any), then advances the state, evaluates the condition and either
        /// schedules the next step at the selected due time or completes.
        /// </summary>
        /// <param name="self">Scheduler used to schedule the follow-up step.</param>
        /// <param name="state">Generator state handed over from the previous step.</param>
        /// <returns>
        /// The disposable of the follow-up scheduling call, or
        /// <c>Disposable.Empty</c> once the sequence has terminated.
        /// </returns>
        private IDisposable InvokeRec(IScheduler self, TState state)
        {
            // The element selected during the previous step is delivered now,
            // i.e. when the work item scheduled for its due time actually runs.
            if (_hasResult)
            {
                _observer.OnNext(_result);
            }

            DateTimeOffset nextDue = default(DateTimeOffset);
            try
            {
                if (!_first)
                {
                    state = _parent._iterate(state);
                }
                else
                {
                    // The initial state is tested as-is, without iterating first.
                    _first = false;
                }

                _hasResult = _parent._condition(state);
                if (_hasResult)
                {
                    _result = _parent._resultSelector(state);
                    nextDue = _parent._timeSelector(state);
                }
            }
            catch (Exception exception)
            {
                _observer.OnError(exception);
                base.Dispose();
                return Disposable.Empty;
            }

            if (_hasResult)
            {
                return self.Schedule<TState>(state, nextDue, new Func<IScheduler, TState, IDisposable>(InvokeRec));
            }

            _observer.OnCompleted();
            base.Dispose();
            return Disposable.Empty;
        }
    }
}
/// <summary>
/// Producer for the generator whose elements are timed with relative due times:
/// each state that satisfies the condition yields a result and a
/// <see cref="TimeSpan"/> delay after which the next scheduled step runs.
/// </summary>
internal sealed class Relative : Producer<TResult, Relative._>
{
    private readonly TState _initialState;
    private readonly Func<TState, bool> _condition;
    private readonly Func<TState, TState> _iterate;
    private readonly Func<TState, TResult> _resultSelector;
    private readonly Func<TState, TimeSpan> _timeSelector;
    private readonly IScheduler _scheduler;

    /// <summary>Captures the generator functions and the scheduler the steps are run on.</summary>
    /// <param name="initialState">State used for the first condition check (iterate is not applied to it first).</param>
    /// <param name="condition">Returns false to end the sequence.</param>
    /// <param name="iterate">Produces the next state from the current one.</param>
    /// <param name="resultSelector">Projects a state into the element handed to the observer.</param>
    /// <param name="timeSelector">Gives the relative due time passed to the scheduler for the step that emits the state's element.</param>
    /// <param name="scheduler">Scheduler on which every step is scheduled.</param>
    public Relative(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
    {
        _scheduler = scheduler;
        _timeSelector = timeSelector;
        _resultSelector = resultSelector;
        _iterate = iterate;
        _condition = condition;
        _initialState = initialState;
    }

    protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
    {
        return new _(this, observer, cancel);
    }

    protected override IDisposable Run(_ sink)
    {
        return sink.Run();
    }

    /// <summary>
    /// Sink that carries the pending result between scheduled steps; the generator
    /// state itself travels as the scheduler's state argument.
    /// </summary>
    internal sealed class _ : Sink<TResult>
    {
        private readonly Relative _parent;
        private bool _first;
        private bool _hasResult;
        private TResult _result;

        public _(Relative parent, IObserver<TResult> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _parent = parent;
        }

        /// <summary>
        /// Clears the per-run bookkeeping and schedules the first step with the
        /// parent's initial state (no due time is passed for this first step).
        /// </summary>
        /// <returns>The disposable returned by the scheduling call.</returns>
        public IDisposable Run()
        {
            _result = default(TResult);
            _hasResult = false;
            _first = true;

            return _parent._scheduler.Schedule<TState>(_parent._initialState, new Func<IScheduler, TState, IDisposable>(InvokeRec));
        }

        /// <summary>
        /// One scheduled step. First emits the result computed by the previous step
        /// (if any), then advances the state, evaluates the condition and either
        /// schedules the next step after the selected delay or completes.
        /// </summary>
        /// <param name="self">Scheduler used to schedule the follow-up step.</param>
        /// <param name="state">Generator state handed over from the previous step.</param>
        /// <returns>
        /// The disposable of the follow-up scheduling call, or
        /// <c>Disposable.Empty</c> once the sequence has terminated.
        /// </returns>
        private IDisposable InvokeRec(IScheduler self, TState state)
        {
            // The element selected during the previous step is delivered now,
            // i.e. when the work item scheduled with its delay actually runs.
            if (_hasResult)
            {
                _observer.OnNext(_result);
            }

            TimeSpan nextDelay = default(TimeSpan);
            try
            {
                if (!_first)
                {
                    state = _parent._iterate(state);
                }
                else
                {
                    // The initial state is tested as-is, without iterating first.
                    _first = false;
                }

                _hasResult = _parent._condition(state);
                if (_hasResult)
                {
                    _result = _parent._resultSelector(state);
                    nextDelay = _parent._timeSelector(state);
                }
            }
            catch (Exception exception)
            {
                _observer.OnError(exception);
                base.Dispose();
                return Disposable.Empty;
            }

            if (_hasResult)
            {
                return self.Schedule<TState>(state, nextDelay, new Func<IScheduler, TState, IDisposable>(InvokeRec));
            }

            _observer.OnCompleted();
            base.Dispose();
            return Disposable.Empty;
        }
    }
}
}
}