<PackageReference Include="System.Reactive" Version="6.0.0-preview.1" />

FastImmediateObserver<T>

Specialized scheduled observer similar to a scheduled observer for the immediate scheduler.
using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; namespace System.Reactive.Subjects { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] internal sealed class FastImmediateObserver<[System.Runtime.CompilerServices.Nullable(2)] T> : IScheduledObserver<T>, IObserver<T>, IDisposable { private readonly object _gate = new object(); private volatile IObserver<T> _observer; private Queue<T> _queue = new Queue<T>(); [System.Runtime.CompilerServices.Nullable(new byte[] { 2, 1 })] private Queue<T> _queue2; [System.Runtime.CompilerServices.Nullable(2)] private Exception _error; private bool _done; private bool _busy; private bool _hasFaulted; public FastImmediateObserver(IObserver<T> observer) { _observer = observer; } public void Dispose() { Done(); } public void EnsureActive() { EnsureActive(1); } public void EnsureActive(int count) { bool flag = false; lock (_gate) { if (!_hasFaulted && !_busy) { flag = true; _busy = true; } } if (flag) { while (true) { Queue<T> queue = null; Exception ex = null; bool flag2 = false; lock (_gate) { if (_queue.Count > 0) { if (_queue2 == null) _queue2 = new Queue<T>(); queue = _queue; _queue = _queue2; _queue2 = null; } if (_error != null) ex = _error; else if (_done) { flag2 = true; } else if (queue == null) { _busy = false; return; } } try { if (queue != null) { while (queue.Count > 0) { _observer.OnNext(queue.Dequeue()); } lock (_gate) { _queue2 = queue; } } if (ex != null) { Done().OnError(ex); return; } if (flag2) { Done().OnCompleted(); return; } } catch { lock (_gate) { _hasFaulted = true; _queue.Clear(); } throw; } } } } public void OnCompleted() { lock (_gate) { if (!_hasFaulted) _done = true; } } public void OnError(Exception error) { lock (_gate) { if (!_hasFaulted) _error = error; } } public void OnNext(T value) { lock (_gate) { if (!_hasFaulted) _queue.Enqueue(value); } } private IObserver<T> Done() { return Interlocked.Exchange<IObserver<T>>(ref _observer, NopObserver<T>.Instance); } } }