<PackageReference Include="System.Reactive" Version="6.0.0-preview.9" />

ObserveOnObserverNew<T>

sealed class ObserveOnObserverNew<T> : IdentitySink<T>
An ObserveOn operator implementation that uses lock-free techniques to signal events to the downstream.
using System.Collections.Concurrent; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Runtime.CompilerServices; using System.Threading; namespace System.Reactive { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal sealed class ObserveOnObserverNew<[System.Runtime.CompilerServices.Nullable(2)] T> : IdentitySink<T> { private readonly IScheduler _scheduler; private readonly ConcurrentQueue<T> _queue; [System.Runtime.CompilerServices.Nullable(2)] private IDisposable _task; private int _wip; private bool _done; [System.Runtime.CompilerServices.Nullable(2)] private Exception _error; private bool _disposed; private static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DrainShortRunningFunc = (IScheduler scheduler, ObserveOnObserverNew<T> self) => self.DrainShortRunning(scheduler); public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream) : base(downstream) { _scheduler = scheduler; _queue = new ConcurrentQueue<T>(); } protected override void Dispose(bool disposing) { Volatile.Write(ref _disposed, true); base.Dispose(disposing); if (disposing) { Disposable.Dispose(ref _task); Clear(_queue); } } private static void Clear(ConcurrentQueue<T> q) { T result; while (q.TryDequeue(out result)) { } } public override void OnCompleted() { Volatile.Write(ref _done, true); Schedule(); } public override void OnError(Exception error) { _error = error; Volatile.Write(ref _done, true); Schedule(); } public override void OnNext(T value) { _queue.Enqueue(value); Schedule(); } private void Schedule() { if (Interlocked.Increment(ref _wip) == 1) { SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); if (Disposable.TrySetMultiple(ref _task, singleAssignmentDisposable)) singleAssignmentDisposable.Disposable = _scheduler.Schedule<ObserveOnObserverNew<T>>(this, DrainShortRunningFunc); if (Volatile.Read(ref _disposed)) Clear(_queue); } } private IDisposable DrainShortRunning(IScheduler recursiveScheduler) { DrainStep(_queue); if (Interlocked.Decrement(ref _wip) != 0) { IDisposable value = recursiveScheduler.Schedule<ObserveOnObserverNew<T>>(this, DrainShortRunningFunc); Disposable.TrySetMultiple(ref _task, value); } return Disposable.Empty; } private void DrainStep(ConcurrentQueue<T> q) { if (Volatile.Read(ref _disposed)) Clear(q); else { bool flag = Volatile.Read(ref _done); if (flag) { Exception error = _error; if (error != null) { Volatile.Write(ref _disposed, true); ForwardOnError(error); return; } } if (q.TryDequeue(out T result)) ForwardOnNext(result); else if (flag) { Volatile.Write(ref _disposed, true); ForwardOnCompleted(); } } } } }