CoreDispatcherScheduler
Represents an object that schedules units of work on a CoreDispatcher.
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using Windows.Foundation;
using Windows.System;
using Windows.UI.Core;
namespace System.Reactive.Concurrency
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
[CLSCompliant(false)]
public sealed class CoreDispatcherScheduler : LocalScheduler, ISchedulerPeriodic
{
[System.Runtime.CompilerServices.Nullable(2)]
private DispatcherQueue _dispatcherQueue;
public static CoreDispatcherScheduler Current {
get {
CoreWindow forCurrentThread = CoreWindow.GetForCurrentThread();
if ((object)forCurrentThread == null)
throw new InvalidOperationException(Strings_WindowsThreading.NO_WINDOW_CURRENT);
return new CoreDispatcherScheduler(forCurrentThread.Dispatcher);
}
}
public CoreDispatcher Dispatcher { get; }
public CoreDispatcherPriority Priority { get; }
public CoreDispatcherScheduler(CoreDispatcher dispatcher)
{
if ((object)dispatcher == null)
throw new ArgumentNullException("dispatcher");
Dispatcher = dispatcher;
Priority = CoreDispatcherPriority.Normal;
}
public CoreDispatcherScheduler(CoreDispatcher dispatcher, CoreDispatcherPriority priority)
{
if ((object)dispatcher == null)
throw new ArgumentNullException("dispatcher");
Dispatcher = dispatcher;
Priority = priority;
}
public override IDisposable Schedule<[System.Runtime.CompilerServices.Nullable(2)] TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
throw new ArgumentNullException("action");
SingleAssignmentDisposable d = new SingleAssignmentDisposable();
Exception ex;
DispatcherQueueTimer timer;
IAsyncAction asyncInfo = Dispatcher.RunAsync(Priority, delegate {
if (!d.IsDisposed)
try {
d.Disposable = action(this, state);
} catch (Exception ex2) {
Exception ex3 = ex = ex2;
timer = CreateDispatcherQueue().CreateTimer();
timer.Interval = TimeSpan.Zero;
timer.Tick += delegate {
timer.Stop();
ExceptionDispatchInfo.Capture(ex).Throw();
};
timer.Start();
}
});
return StableCompositeDisposable.Create(d, asyncInfo.AsDisposable());
}
private DispatcherQueue CreateDispatcherQueue()
{
if (_dispatcherQueue != (DispatcherQueue)null)
return _dispatcherQueue;
if (Dispatcher.HasThreadAccess) {
_dispatcherQueue = DispatcherQueue.GetForCurrentThread();
return _dispatcherQueue;
}
Dispatcher.RunAsync(CoreDispatcherPriority.High, delegate {
_dispatcherQueue = DispatcherQueue.GetForCurrentThread();
}).GetAwaiter().GetResult();
return _dispatcherQueue;
}
public override IDisposable Schedule<[System.Runtime.CompilerServices.Nullable(2)] TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
throw new ArgumentNullException("action");
TimeSpan dueTime2 = Scheduler.Normalize(dueTime);
if (dueTime2.Ticks == 0)
return Schedule(state, action);
return ScheduleSlow(state, dueTime2, action);
}
private IDisposable ScheduleSlow<[System.Runtime.CompilerServices.Nullable(2)] TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
MultipleAssignmentDisposable d = new MultipleAssignmentDisposable();
DispatcherQueueTimer timer = CreateDispatcherQueue().CreateTimer();
timer.Tick += delegate {
DispatcherQueueTimer dispatcherQueueTimer2 = Interlocked.Exchange<DispatcherQueueTimer>(ref timer, (DispatcherQueueTimer)null);
if (dispatcherQueueTimer2 != (DispatcherQueueTimer)null)
try {
d.Disposable = action(this, state);
} finally {
dispatcherQueueTimer2.Stop();
action = ((IScheduler s, TState t) => Disposable.Empty);
}
};
timer.Interval = dueTime;
timer.Start();
d.Disposable = Disposable.Create(delegate {
DispatcherQueueTimer dispatcherQueueTimer = Interlocked.Exchange<DispatcherQueueTimer>(ref timer, (DispatcherQueueTimer)null);
if (dispatcherQueueTimer != (DispatcherQueueTimer)null) {
dispatcherQueueTimer.Stop();
action = ((IScheduler s, TState t) => Disposable.Empty);
}
});
return d;
}
public IDisposable SchedulePeriodic<[System.Runtime.CompilerServices.Nullable(2)] TState>(TState state, TimeSpan period, Func<TState, TState> action)
{
if (period < TimeSpan.Zero)
throw new ArgumentOutOfRangeException("period");
if (action == null)
throw new ArgumentNullException("action");
DispatcherQueueTimer timer = CreateDispatcherQueue().CreateTimer();
timer.Tick += delegate {
state = action(state);
};
timer.Interval = period;
timer.Start();
return Disposable.Create(delegate {
DispatcherQueueTimer dispatcherQueueTimer = Interlocked.Exchange<DispatcherQueueTimer>(ref timer, (DispatcherQueueTimer)null);
if (dispatcherQueueTimer != (DispatcherQueueTimer)null) {
dispatcherQueueTimer.Stop();
action = ((TState _) => _);
}
});
}
}
}