Delay<TSource>
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class Delay<TSource>
{
internal abstract class Base<TParent> : Producer<TSource, Base<TParent>._> where TParent : Base<TParent>
{
internal abstract class _ : IdentitySink<TSource>
{
protected IStopwatch _watch;
protected IScheduler _scheduler;
protected _(TParent parent, IObserver<TSource> observer)
: base(observer)
{
_scheduler = parent._scheduler;
}
public void Run(TParent parent)
{
_watch = _scheduler.StartStopwatch();
RunCore(parent);
base.Run(parent._source);
}
protected abstract void RunCore(TParent parent);
}
internal abstract class S : _
{
protected readonly object _gate = new object();
protected IDisposable _cancelable;
protected TimeSpan _delay;
protected bool _ready;
protected bool _active;
protected bool _running;
protected Queue<System.Reactive.TimeInterval<TSource>> _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
private bool _hasCompleted;
private TimeSpan _completeAt;
private bool _hasFailed;
private Exception _exception;
protected S(TParent parent, IObserver<TSource> observer)
: base(parent, observer)
{
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
Disposable.TryDispose(ref _cancelable);
}
public override void OnNext(TSource value)
{
bool flag = false;
lock (_gate) {
TimeSpan interval = _watch.Elapsed.Add(_delay);
_queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, interval));
flag = (_ready && !_active);
_active = true;
}
if (flag)
Disposable.TrySetSerial(ref _cancelable, Scheduler.Schedule<S>(_scheduler, this, _delay, (Action<S, Action<S, TimeSpan>>)delegate(S this, Action<S, TimeSpan> a) {
this.DrainQueue(a);
}));
}
public override void OnError(Exception error)
{
DisposeUpstream();
bool flag = false;
lock (_gate) {
_queue.Clear();
_exception = error;
_hasFailed = true;
flag = !_running;
}
if (flag)
ForwardOnError(error);
}
public override void OnCompleted()
{
DisposeUpstream();
bool flag = false;
lock (_gate) {
TimeSpan timeSpan = _completeAt = _watch.Elapsed.Add(_delay);
_hasCompleted = true;
flag = (_ready && !_active);
_active = true;
}
if (flag)
Disposable.TrySetSerial(ref _cancelable, Scheduler.Schedule<S>(_scheduler, this, _delay, (Action<S, Action<S, TimeSpan>>)delegate(S this, Action<S, TimeSpan> a) {
this.DrainQueue(a);
}));
}
protected void DrainQueue(Action<S, TimeSpan> recurse)
{
lock (_gate) {
if (_hasFailed)
return;
_running = true;
}
bool flag = false;
bool flag2;
Exception error;
bool flag4;
bool flag5;
TimeSpan arg;
while (true) {
flag2 = false;
error = null;
bool flag3 = false;
TSource value = default(TSource);
flag4 = false;
flag5 = false;
arg = default(TimeSpan);
lock (_gate) {
if (_hasFailed) {
error = _exception;
flag2 = true;
_running = false;
} else {
TimeSpan elapsed = _watch.Elapsed;
if (_queue.Count > 0) {
System.Reactive.TimeInterval<TSource> timeInterval = _queue.Peek();
TimeSpan interval = timeInterval.Interval;
if (interval.CompareTo(elapsed) <= 0 && !flag) {
timeInterval = _queue.Dequeue();
value = timeInterval.Value;
flag3 = true;
} else {
flag5 = true;
arg = Scheduler.Normalize(interval.Subtract(elapsed));
_running = false;
}
} else if (_hasCompleted) {
if (_completeAt.CompareTo(elapsed) <= 0 && !flag)
flag4 = true;
else {
flag5 = true;
arg = Scheduler.Normalize(_completeAt.Subtract(elapsed));
_running = false;
}
} else {
_running = false;
_active = false;
}
}
}
if (!flag3)
break;
ForwardOnNext(value);
flag = true;
}
if (flag4)
ForwardOnCompleted();
else if (flag2) {
ForwardOnError(error);
} else if (flag5) {
recurse(this, arg);
}
}
}
protected abstract class L : _
{
protected readonly object _gate = new object();
protected IDisposable _cancelable;
private readonly SemaphoreSlim _evt = new SemaphoreSlim(0);
protected TimeSpan _delay;
protected Queue<System.Reactive.TimeInterval<TSource>> _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
private CancellationTokenSource _stop;
private bool _hasCompleted;
private TimeSpan _completeAt;
private bool _hasFailed;
private Exception _exception;
protected L(TParent parent, IObserver<TSource> observer)
: base(parent, observer)
{
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
Disposable.TryDispose(ref _cancelable);
}
protected void ScheduleDrain()
{
_stop = new CancellationTokenSource();
Disposable.TrySetSerial(ref _cancelable, new CancellationDisposable(_stop));
_scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
}
public override void OnNext(TSource value)
{
lock (_gate) {
TimeSpan interval = _watch.Elapsed.Add(_delay);
_queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, interval));
_evt.Release();
}
}
public override void OnError(Exception error)
{
DisposeUpstream();
lock (_gate) {
_queue.Clear();
_exception = error;
_hasFailed = true;
_evt.Release();
}
}
public override void OnCompleted()
{
DisposeUpstream();
lock (_gate) {
TimeSpan timeSpan = _completeAt = _watch.Elapsed.Add(_delay);
_hasCompleted = true;
_evt.Release();
}
}
private void DrainQueue(ICancelable cancel)
{
bool flag;
Exception error;
bool flag3;
while (true) {
try {
_evt.Wait(_stop.Token);
} catch (OperationCanceledException) {
return;
}
flag = false;
error = null;
bool flag2 = false;
TSource value = default(TSource);
flag3 = false;
bool flag4 = false;
TimeSpan dueTime = default(TimeSpan);
lock (_gate) {
if (_hasFailed) {
error = _exception;
flag = true;
} else {
TimeSpan elapsed = _watch.Elapsed;
if (_queue.Count > 0) {
System.Reactive.TimeInterval<TSource> timeInterval = _queue.Dequeue();
flag2 = true;
value = timeInterval.Value;
TimeSpan interval = timeInterval.Interval;
if (interval.CompareTo(elapsed) > 0) {
flag4 = true;
dueTime = Scheduler.Normalize(interval.Subtract(elapsed));
}
} else if (_hasCompleted) {
flag3 = true;
if (_completeAt.CompareTo(elapsed) > 0) {
flag4 = true;
dueTime = Scheduler.Normalize(_completeAt.Subtract(elapsed));
}
}
}
}
if (flag4) {
ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim();
Scheduler.ScheduleAction<ManualResetEventSlim>(_scheduler, manualResetEventSlim, dueTime, (Action<ManualResetEventSlim>)delegate(ManualResetEventSlim slimTimer) {
slimTimer.Set();
});
try {
manualResetEventSlim.Wait(_stop.Token);
} catch (OperationCanceledException) {
return;
}
}
if (!flag2)
break;
ForwardOnNext(value);
}
if (flag3)
ForwardOnCompleted();
else if (flag) {
ForwardOnError(error);
}
}
}
protected readonly IObservable<TSource> _source;
protected readonly IScheduler _scheduler;
protected Base(IObservable<TSource> source, IScheduler scheduler)
{
_source = source;
_scheduler = scheduler;
}
}
internal sealed class Absolute : Base<Absolute>
{
private new sealed class S : Base<Absolute>.S
{
public S(Absolute parent, IObserver<TSource> observer)
: base(parent, observer)
{
}
protected override void RunCore(Absolute parent)
{
_ready = false;
Disposable.TrySetSingle(ref _cancelable, Scheduler.ScheduleAction<S>(parent._scheduler, this, parent._dueTime, (Action<S>)delegate(S this) {
this.Start();
}));
}
private void Start()
{
TimeSpan dueTime = default(TimeSpan);
bool flag = false;
lock (_gate) {
_delay = _watch.Elapsed;
Queue<System.Reactive.TimeInterval<TSource>> queue = _queue;
_queue = new Queue<System.Reactive.TimeInterval<TSource>>();
if (queue.Count > 0) {
dueTime = queue.Peek().Interval;
while (queue.Count > 0) {
System.Reactive.TimeInterval<TSource> timeInterval = queue.Dequeue();
_queue.Enqueue(new System.Reactive.TimeInterval<TSource>(timeInterval.Value, timeInterval.Interval.Add(_delay)));
}
flag = true;
_active = true;
}
_ready = true;
}
if (flag)
Disposable.TrySetSerial(ref _cancelable, Scheduler.Schedule<Base<Absolute>.S>(_scheduler, (Base<Absolute>.S)this, dueTime, (Action<Base<Absolute>.S, Action<Base<Absolute>.S, TimeSpan>>)delegate(Base<Absolute>.S this, Action<Base<Absolute>.S, TimeSpan> a) {
DrainQueue(a);
}));
}
}
private new sealed class L : Base<Absolute>.L
{
public L(Absolute parent, IObserver<TSource> observer)
: base(parent, observer)
{
}
protected override void RunCore(Absolute parent)
{
Disposable.TrySetSingle(ref _cancelable, Scheduler.ScheduleAction<L>(parent._scheduler, this, parent._dueTime, (Action<L>)delegate(L this) {
this.Start();
}));
}
private void Start()
{
lock (_gate) {
_delay = _watch.Elapsed;
Queue<System.Reactive.TimeInterval<TSource>> queue = _queue;
_queue = new Queue<System.Reactive.TimeInterval<TSource>>();
while (queue.Count > 0) {
System.Reactive.TimeInterval<TSource> timeInterval = queue.Dequeue();
_queue.Enqueue(new System.Reactive.TimeInterval<TSource>(timeInterval.Value, timeInterval.Interval.Add(_delay)));
}
}
ScheduleDrain();
}
}
private readonly DateTimeOffset _dueTime;
public Absolute(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
: base(source, scheduler)
{
_dueTime = dueTime;
}
protected override _ CreateSink(IObserver<TSource> observer)
{
if (_scheduler.AsLongRunning() == null)
return new S(this, observer);
return new L(this, observer);
}
protected override void Run(_ sink)
{
sink.Run(this);
}
}
internal sealed class Relative : Base<Relative>
{
private new sealed class S : Base<Relative>.S
{
public S(Relative parent, IObserver<TSource> observer)
: base(parent, observer)
{
}
protected override void RunCore(Relative parent)
{
_ready = true;
_delay = Scheduler.Normalize(parent._dueTime);
}
}
private new sealed class L : Base<Relative>.L
{
public L(Relative parent, IObserver<TSource> observer)
: base(parent, observer)
{
}
protected override void RunCore(Relative parent)
{
_delay = Scheduler.Normalize(parent._dueTime);
ScheduleDrain();
}
}
private readonly TimeSpan _dueTime;
public Relative(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
: base(source, scheduler)
{
_dueTime = dueTime;
}
protected override _ CreateSink(IObserver<TSource> observer)
{
if (_scheduler.AsLongRunning() == null)
return new S(this, observer);
return new L(this, observer);
}
protected override void Run(_ sink)
{
sink.Run(this);
}
}
}
}