<PackageReference Include="System.Reactive" Version="4.1.6" />

RedoSerializedObserver<X>

sealed class RedoSerializedObserver<X> : IObserver<X>
using System.Collections.Concurrent; using System.Threading; namespace System.Reactive.Linq.ObservableImpl { internal sealed class RedoSerializedObserver<X> : IObserver<X> { private readonly IObserver<X> _downstream; private int _wip; private Exception _terminalException; private static readonly Exception SignaledIndicator = new Exception(); private readonly ConcurrentQueue<X> _queue; internal RedoSerializedObserver(IObserver<X> downstream) { _downstream = downstream; _queue = new ConcurrentQueue<X>(); } public void OnCompleted() { if (Interlocked.CompareExchange<Exception>(ref _terminalException, ExceptionHelper.Terminated, (Exception)null) == null) Drain(); } public void OnError(Exception error) { if (Interlocked.CompareExchange<Exception>(ref _terminalException, error, (Exception)null) == null) Drain(); } public void OnNext(X value) { _queue.Enqueue(value); Drain(); } private void Clear() { X result; while (_queue.TryDequeue(out result)) { } } private void Drain() { if (Interlocked.Increment(ref _wip) == 1) { int num = 1; do { Exception ex = Volatile.Read<Exception>(ref _terminalException); if (ex == null) { X result; while (_queue.TryDequeue(out result)) { _downstream.OnNext(result); } } else { if (ex != SignaledIndicator) { Interlocked.Exchange<Exception>(ref _terminalException, SignaledIndicator); if (ex != ExceptionHelper.Terminated) _downstream.OnError(ex); else _downstream.OnCompleted(); } Clear(); } num = Interlocked.Add(ref _wip, -num); } while (num != 0); } } } }