ObserveOnObserverNew<T>
An ObserveOn operator implementation that uses lock-free
techniques to signal events to the downstream.
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
internal sealed class ObserveOnObserverNew<[System.Runtime.CompilerServices.Nullable(2)] T> : IdentitySink<T>
{
private readonly IScheduler _scheduler;
private readonly ConcurrentQueue<T> _queue;
[System.Runtime.CompilerServices.Nullable(2)]
private IDisposable _task;
private int _wip;
private bool _done;
[System.Runtime.CompilerServices.Nullable(2)]
private Exception _error;
private bool _disposed;
private static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DrainShortRunningFunc = (IScheduler scheduler, ObserveOnObserverNew<T> self) => self.DrainShortRunning(scheduler);
public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream)
: base(downstream)
{
_scheduler = scheduler;
_queue = new ConcurrentQueue<T>();
}
protected override void Dispose(bool disposing)
{
Volatile.Write(ref _disposed, true);
base.Dispose(disposing);
if (disposing) {
Disposable.Dispose(ref _task);
Clear(_queue);
}
}
private static void Clear(ConcurrentQueue<T> q)
{
T result;
while (q.TryDequeue(out result)) {
}
}
public override void OnCompleted()
{
Volatile.Write(ref _done, true);
Schedule();
}
public override void OnError(Exception error)
{
_error = error;
Volatile.Write(ref _done, true);
Schedule();
}
public override void OnNext(T value)
{
_queue.Enqueue(value);
Schedule();
}
private void Schedule()
{
if (Interlocked.Increment(ref _wip) == 1) {
SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable();
if (Disposable.TrySetMultiple(ref _task, singleAssignmentDisposable))
singleAssignmentDisposable.Disposable = _scheduler.Schedule<ObserveOnObserverNew<T>>(this, DrainShortRunningFunc);
if (Volatile.Read(ref _disposed))
Clear(_queue);
}
}
private IDisposable DrainShortRunning(IScheduler recursiveScheduler)
{
DrainStep(_queue);
if (Interlocked.Decrement(ref _wip) != 0) {
IDisposable value = recursiveScheduler.Schedule<ObserveOnObserverNew<T>>(this, DrainShortRunningFunc);
Disposable.TrySetMultiple(ref _task, value);
}
return Disposable.Empty;
}
private void DrainStep(ConcurrentQueue<T> q)
{
if (Volatile.Read(ref _disposed))
Clear(q);
else {
bool flag = Volatile.Read(ref _done);
if (flag) {
Exception error = _error;
if (error != null) {
Volatile.Write(ref _disposed, true);
ForwardOnError(error);
return;
}
}
if (q.TryDequeue(out T result))
ForwardOnNext(result);
else if (flag) {
Volatile.Write(ref _disposed, true);
ForwardOnCompleted();
}
}
}
}
}