HistoricalScheduler
using System.Collections.Generic;
using System.Reactive.Disposables;
namespace System.Reactive.Concurrency
{
/// <summary>
/// Scheduler derived from <see cref="HistoricalSchedulerBase"/> that stores its pending work
/// items in a private <see cref="SchedulerQueue{TAbsolute}"/> of <see cref="DateTimeOffset"/>.
/// </summary>
public class HistoricalScheduler : HistoricalSchedulerBase
{
    // Pending work items; entries are added by ScheduleAbsolute and removed either when an
    // item runs (it removes itself) or when GetNext finds a canceled item at the head.
    private readonly SchedulerQueue<DateTimeOffset> queue = new SchedulerQueue<DateTimeOffset>();

    /// <summary>
    /// Creates a scheduler using the base class's default construction.
    /// </summary>
    public HistoricalScheduler()
    {
    }

    /// <summary>
    /// Creates a scheduler, forwarding the initial clock value to the base class.
    /// </summary>
    /// <param name="initialClock">Initial clock value passed to the base constructor.</param>
    public HistoricalScheduler(DateTimeOffset initialClock)
        : base(initialClock)
    {
    }

    /// <summary>
    /// Creates a scheduler, forwarding the initial clock value and comparer to the base class.
    /// </summary>
    /// <param name="initialClock">Initial clock value passed to the base constructor.</param>
    /// <param name="comparer">Comparer passed to the base constructor.</param>
    public HistoricalScheduler(DateTimeOffset initialClock, IComparer<DateTimeOffset> comparer)
        : base(initialClock, comparer)
    {
    }

    /// <summary>
    /// Returns the item at the head of the queue, skipping over canceled items.
    /// </summary>
    /// <returns>
    /// The first non-canceled item found at the head of the queue (it stays in the queue),
    /// or <c>null</c> once the queue is empty.
    /// </returns>
    protected override IScheduledItem<DateTimeOffset> GetNext()
    {
        while (queue.Count > 0)
        {
            ScheduledItem<DateTimeOffset> head = queue.Peek();
            if (head.IsCanceled)
            {
                // Canceled work is discarded here rather than at cancellation time.
                queue.Dequeue();
                continue;
            }

            return head;
        }

        return null;
    }

    /// <summary>
    /// Enqueues <paramref name="action"/> as a work item due at <paramref name="dueTime"/>.
    /// </summary>
    /// <typeparam name="TState">Type of the state handed to the action.</typeparam>
    /// <param name="state">State passed to the scheduled item.</param>
    /// <param name="dueTime">Due time passed to the scheduled item.</param>
    /// <param name="action">Work to run; must not be <c>null</c>.</param>
    /// <returns>A disposable whose disposal calls <c>Cancel</c> on the scheduled item.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
    public override IDisposable ScheduleAbsolute<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        // Declared before the wrapper so the lambda can capture it; it is assigned below,
        // before the wrapper can be invoked through the item.
        ScheduledItem<DateTimeOffset, TState> item = null;

        // The wrapper takes the item out of the queue first, then runs the caller's action.
        Func<IScheduler, TState, IDisposable> removeThenRun = (scheduler, itemState) =>
        {
            queue.Remove(item);
            return action(scheduler, itemState);
        };

        item = new ScheduledItem<DateTimeOffset, TState>(this, state, removeThenRun, dueTime, Comparer);
        queue.Enqueue(item);

        return Disposable.Create(item.Cancel);
    }
}
}