FastImmediateObserver<T>
Specialized scheduled observer similar to a scheduled observer for the immediate scheduler.
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Subjects
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
internal sealed class FastImmediateObserver<[System.Runtime.CompilerServices.Nullable(2)] T> : IScheduledObserver<T>, IObserver<T>, IDisposable
{
private readonly object _gate = new object();
private volatile IObserver<T> _observer;
private Queue<T> _queue = new Queue<T>();
[System.Runtime.CompilerServices.Nullable(new byte[] {
2,
1
})]
private Queue<T> _queue2;
[System.Runtime.CompilerServices.Nullable(2)]
private Exception _error;
private bool _done;
private bool _busy;
private bool _hasFaulted;
public FastImmediateObserver(IObserver<T> observer)
{
_observer = observer;
}
public void Dispose()
{
Done();
}
public void EnsureActive()
{
EnsureActive(1);
}
public void EnsureActive(int count)
{
bool flag = false;
lock (_gate) {
if (!_hasFaulted && !_busy) {
flag = true;
_busy = true;
}
}
if (flag) {
while (true) {
Queue<T> queue = null;
Exception ex = null;
bool flag2 = false;
lock (_gate) {
if (_queue.Count > 0) {
if (_queue2 == null)
_queue2 = new Queue<T>();
queue = _queue;
_queue = _queue2;
_queue2 = null;
}
if (_error != null)
ex = _error;
else if (_done) {
flag2 = true;
} else if (queue == null) {
_busy = false;
return;
}
}
try {
if (queue != null) {
while (queue.Count > 0) {
_observer.OnNext(queue.Dequeue());
}
lock (_gate) {
_queue2 = queue;
}
}
if (ex != null) {
Done().OnError(ex);
return;
}
if (flag2) {
Done().OnCompleted();
return;
}
} catch {
lock (_gate) {
_hasFaulted = true;
_queue.Clear();
}
throw;
}
}
}
}
public void OnCompleted()
{
lock (_gate) {
if (!_hasFaulted)
_done = true;
}
}
public void OnError(Exception error)
{
lock (_gate) {
if (!_hasFaulted)
_error = error;
}
}
public void OnNext(T value)
{
lock (_gate) {
if (!_hasFaulted)
_queue.Enqueue(value);
}
}
private IObserver<T> Done()
{
return Interlocked.Exchange<IObserver<T>>(ref _observer, NopObserver<T>.Instance);
}
}
}