// ScheduledObserver<T>
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
internal class ScheduledObserver<[System.Runtime.CompilerServices.Nullable(2)] T> : ObserverBase<T>, IScheduledObserver<T>, IObserver<T>, IDisposable
{
/// <summary>
/// Disposable wrapper around a semaphore: the first call to <see cref="Dispose"/>
/// releases the semaphore once; later calls find nothing left to release.
/// </summary>
[System.Runtime.CompilerServices.NullableContext(0)]
private sealed class SemaphoreSlimRelease : IDisposable
{
    // Swapped to null by Dispose, which is what makes the release happen only once.
    [System.Runtime.CompilerServices.Nullable(2)]
    private volatile SemaphoreSlim _dispatcherEvent;

    /// <summary>Remembers the semaphore that <see cref="Dispose"/> will release.</summary>
    /// <param name="dispatcherEvent">Semaphore to release on disposal.</param>
    [System.Runtime.CompilerServices.NullableContext(1)]
    public SemaphoreSlimRelease(SemaphoreSlim dispatcherEvent)
    {
        _dispatcherEvent = dispatcherEvent;
    }

    /// <summary>
    /// Atomically takes the stored semaphore, leaving null behind, and releases it
    /// if one was still stored.
    /// </summary>
    public void Dispose()
    {
        SemaphoreSlim taken = Interlocked.Exchange<SemaphoreSlim>(ref _dispatcherEvent, null);
        if (taken != null) {
            taken.Release();
        }
    }
}
// State of the Run-based delivery loop (the path used when _longRunning is null).
// It takes the values of the four constants below; EnsureActiveSlow and Run
// currently spell those values as the literals 0, 1, 2 and 9.
private int _state;
// No Run loop is active. EnsureActiveSlow moves Stopped -> Running and schedules Run.
private const int Stopped = 0;
// A Run loop has been scheduled or is executing.
private const int Running = 1;
// Set by EnsureActiveSlow while Running; Run then goes back to Running and
// re-checks the queue instead of stopping.
private const int Pending = 2;
// Set by Run when _observer.OnNext throws; EnsureActiveSlow returns without scheduling in this state.
private const int Faulted = 9;
// Values buffered by OnNextCore until Dispatch or Run hands them to _observer.
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
// Set by OnErrorCore; checked by Dispatch and Run once the queue yields no item.
private bool _failed;
// Exception stored by OnErrorCore and later passed to _observer.OnError.
[System.Runtime.CompilerServices.Nullable(2)]
private Exception _error;
// Set by OnCompletedCore; checked by Dispatch and Run once the queue yields no item.
private bool _completed;
// Observer that receives the queued notifications.
private readonly IObserver<T> _observer;
// Scheduler supplied to the constructor; Run is scheduled on it by EnsureActiveSlow.
private readonly IScheduler _scheduler;
// Result of _scheduler.AsLongRunning(); when non-null, EnsureActive uses the
// semaphore-driven Dispatch job instead of EnsureActiveSlow.
[System.Runtime.CompilerServices.Nullable(2)]
private readonly ISchedulerLongRunning _longRunning;
// Holds the currently scheduled work (dispatcher job plus semaphore release, or
// the scheduled Run); disposed from Dispose(bool).
private SerialDisposableValue _disposable;
// Lock taken by EnsureDispatcher while it creates the dispatcher job.
private readonly object _dispatcherInitGate = new object();
// Semaphore released by EnsureActive(n) and waited on by Dispatch; only created
// when _longRunning is non-null.
[System.Runtime.CompilerServices.Nullable(2)]
private readonly SemaphoreSlim _dispatcherEvent;
// SemaphoreSlimRelease wrapping _dispatcherEvent; only created when _longRunning is non-null.
[System.Runtime.CompilerServices.Nullable(2)]
private readonly IDisposable _dispatcherEventRelease;
// Handle returned by ScheduleLongRunning(Dispatch); stays null until EnsureDispatcher has run.
[System.Runtime.CompilerServices.Nullable(2)]
private IDisposable _dispatcherJob;
/// <summary>
/// Creates an observer that queues notifications and forwards them to
/// <paramref name="observer"/> through <paramref name="scheduler"/>.
/// </summary>
/// <param name="scheduler">Scheduler used to run the delivery work.</param>
/// <param name="observer">Observer that receives the forwarded notifications.</param>
public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
{
    _scheduler = scheduler;
    _observer = observer;
    _longRunning = _scheduler.AsLongRunning();

    // Without long-running support the semaphore-based dispatcher is never used,
    // so its fields are left null.
    if (_longRunning == null) {
        return;
    }

    SemaphoreSlim wakeUp = new SemaphoreSlim(0);
    _dispatcherEvent = wakeUp;
    _dispatcherEventRelease = new SemaphoreSlimRelease(wakeUp);
}
/// <summary>
/// Starts the long-running <see cref="Dispatch"/> job if it has not been started yet.
/// The job handle is checked once without the lock and again while holding
/// <c>_dispatcherInitGate</c>, so only one caller creates the job.
/// </summary>
private void EnsureDispatcher()
{
    if (_dispatcherJob != null) {
        return;
    }

    lock (_dispatcherInitGate) {
        if (_dispatcherJob != null) {
            return;
        }

        IDisposable job = _longRunning.ScheduleLongRunning(Dispatch);
        _dispatcherJob = job;

        // Disposing this observer disposes the job and releases the semaphore.
        _disposable.Disposable = StableCompositeDisposable.Create(job, _dispatcherEventRelease);
    }
}
/// <summary>
/// Body of the long-running dispatcher job. It waits on the semaphore, drains the
/// queue into the observer (waiting on the semaphore again after every delivered
/// item), and then forwards a stored error or completion, if any.
/// </summary>
/// <param name="cancel">
/// Checked after every semaphore wait; once it reports disposed, the method returns
/// without sending anything further.
/// </param>
private void Dispatch(ICancelable cancel)
{
    while (true) {
        _dispatcherEvent.Wait();
        if (cancel.IsDisposed) {
            return;
        }

        T item;
        while (_queue.TryDequeue(out item)) {
            try {
                _observer.OnNext(item);
            } catch {
                // The observer threw: empty the queue, then let the exception propagate.
                T dropped;
                while (_queue.TryDequeue(out dropped)) {
                }
                throw;
            }

            _dispatcherEvent.Wait();
            if (cancel.IsDisposed) {
                return;
            }
        }

        // Queue yielded nothing: check for a terminal notification, error first.
        if (_failed) {
            _observer.OnError(_error);
            Dispose();
            return;
        }

        if (_completed) {
            _observer.OnCompleted();
            Dispose();
            return;
        }
    }
}
/// <summary>Shorthand for <c>EnsureActive(1)</c>.</summary>
public void EnsureActive() => EnsureActive(1);
/// <summary>
/// Makes sure delivery work is active. With a long-running scheduler the semaphore
/// is released <paramref name="n"/> times (when positive) and the dispatcher job is
/// started if needed; otherwise <see cref="EnsureActiveSlow"/> is used.
/// </summary>
/// <param name="n">Release count passed to the semaphore; values of zero or less skip the release.</param>
public void EnsureActive(int n)
{
    if (_longRunning == null) {
        EnsureActiveSlow();
        return;
    }

    if (n > 0) {
        _dispatcherEvent.Release(n);
    }

    EnsureDispatcher();
}
/// <summary>
/// Activation path used when no long-running scheduler is available. It tries to move
/// <c>_state</c> from Stopped to Running; the caller that succeeds schedules
/// <see cref="Run"/>. If the state is already Running it is moved to Pending, and if
/// it is Faulted nothing is done.
/// </summary>
private void EnsureActiveSlow()
{
    bool startedLoop = false;

    while (true) {
        int observed = Interlocked.CompareExchange(ref _state, Running, Stopped);

        if (observed == Faulted) {
            return;
        }

        if (observed == Stopped) {
            // This call performed the Stopped -> Running transition, so it schedules Run.
            startedLoop = true;
            break;
        }

        if (observed == Pending) {
            break;
        }

        // Running: flag it as Pending. Run reacts to Pending by going back to Running
        // and re-checking the queue. If the state changed in between, start over.
        if (observed == Running && Interlocked.CompareExchange(ref _state, Pending, Running) == Running) {
            break;
        }
    }

    if (startedLoop) {
        _disposable.Disposable = Scheduler.Schedule<object>(_scheduler, (object)null, (Action<object, Action<object>>)Run);
    }
}
/// <summary>
/// One step of the recursively scheduled delivery loop: takes one queued value and
/// passes it to the observer, then asks to be invoked again. When the queue is empty
/// it forwards a stored error or completion, or stops the loop.
/// </summary>
/// <param name="state">Opaque value passed through unchanged to <paramref name="recurse"/>.</param>
/// <param name="recurse">Callback invoked after a value has been delivered to request the next step.</param>
[System.Runtime.CompilerServices.NullableContext(2)]
private void Run(object state, [System.Runtime.CompilerServices.Nullable(new byte[] {
1,
2
})] Action<object> recurse)
{
    T next;
    while (!_queue.TryDequeue(out next)) {
        if (_failed) {
            // A value may have been enqueued after the failed dequeue above; retry if so.
            if (!_queue.IsEmpty) {
                continue;
            }

            Interlocked.Exchange(ref _state, Stopped);
            _observer.OnError(_error);
            Dispose();
            return;
        }

        if (_completed) {
            // Same re-check as for the error case.
            if (!_queue.IsEmpty) {
                continue;
            }

            Interlocked.Exchange(ref _state, Stopped);
            _observer.OnCompleted();
            Dispose();
            return;
        }

        // Nothing to deliver and no terminal notification: try Running -> Stopped.
        int previous = Interlocked.CompareExchange(ref _state, Stopped, Running);
        if (previous == Running || previous == Faulted) {
            return;
        }

        // The state was something else (Pending is what EnsureActiveSlow sets):
        // go back to Running and look at the queue again.
        _state = Running;
    }

    Interlocked.Exchange(ref _state, Running);

    try {
        _observer.OnNext(next);
    } catch {
        // The observer threw: mark the loop Faulted, empty the queue, and rethrow.
        Interlocked.Exchange(ref _state, Faulted);
        T dropped;
        while (_queue.TryDequeue(out dropped)) {
        }
        throw;
    }

    recurse(state);
}
/// <summary>Appends <paramref name="value"/> to the queue; delivery happens later in <c>Dispatch</c> or <c>Run</c>.</summary>
/// <param name="value">Value to enqueue.</param>
protected override void OnNextCore(T value) => _queue.Enqueue(value);
/// <summary>
/// Records the error; it is forwarded to the observer later by <c>Dispatch</c> or <c>Run</c>.
/// </summary>
/// <param name="exception">Exception to store and later pass to the observer's OnError.</param>
protected override void OnErrorCore(Exception exception)
{
// The exception is stored before the flag is raised. Dispatch and Run read _failed
// first and _error afterwards — presumably this order is intentional; keep it.
_error = exception;
_failed = true;
}
/// <summary>Records completion; it is forwarded to the observer later by <c>Dispatch</c> or <c>Run</c>.</summary>
protected override void OnCompletedCore() => _completed = true;
/// <summary>
/// Runs the base class disposal and, when <paramref name="disposing"/> is true,
/// also disposes the currently held scheduled work.
/// </summary>
/// <param name="disposing">True to also dispose <c>_disposable</c>.</param>
protected override void Dispose(bool disposing)
{
    base.Dispose(disposing);

    if (!disposing) {
        return;
    }

    _disposable.Dispose();
}
}
}