<PackageReference Include="System.Reactive" Version="4.2.0" />

ConcatMany<T>

sealed class ConcatMany<T> : IObservable<T>
using System.Collections.Concurrent;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Flattens an observable of observables by subscribing to the inner
    /// sequences one at a time, in the order they were received, and relaying
    /// their items to a single downstream observer.
    /// </summary>
    /// <typeparam name="T">Element type of the inner sequences.</typeparam>
    internal sealed class ConcatMany<T> : IObservable<T>
    {
        private readonly IObservable<IObservable<T>> _sources;

        /// <summary>Creates the operator over the given sequence of inner sequences.</summary>
        /// <param name="sources">The outer sequence producing the inner sequences.</param>
        internal ConcatMany(IObservable<IObservable<T>> sources)
        {
            _sources = sources;
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the concatenated output.
        /// </summary>
        /// <param name="observer">Receiver of the relayed items and the terminal signal.</param>
        /// <returns>The coordinator object; disposing it disposes the inner and the outer subscription.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            var parent = new ConcatManyOuterObserver(observer);
            var d = _sources.SubscribeSafe(parent);
            parent.OnSubscribe(d);
            return parent;
        }

        /// <summary>
        /// Observes the outer sequence, queues the inner sequences it produces and
        /// runs them one after another through a single reusable <see cref="InnerObserver"/>.
        /// </summary>
        internal sealed class ConcatManyOuterObserver : IObserver<IObservable<T>>, IDisposable
        {
            private readonly IObserver<T> _downstream;
            private readonly ConcurrentQueue<IObservable<T>> _queue;
            private readonly InnerObserver _innerObserver;

            // Subscription to the outer sequence.
            private IDisposable _upstream;

            // Work-in-progress counter guarding the drain loop.
            private int _trampoline;

            // First error recorded by either the outer or an inner sequence.
            private Exception _error;

            // Set once no further inner sequences will be accepted for completion purposes.
            private bool _done;

            // 1 while an inner sequence is subscribed, 0 otherwise.
            private int _active;

            internal ConcatManyOuterObserver(IObserver<T> downstream)
            {
                _downstream = downstream;
                _queue = new ConcurrentQueue<IObservable<T>>();
                _innerObserver = new InnerObserver(this);
            }

            /// <summary>Stores the subscription to the outer sequence.</summary>
            internal void OnSubscribe(IDisposable d)
            {
                Disposable.SetSingle(ref _upstream, d);
            }

            /// <summary>Disposes the current inner subscription and the outer subscription.</summary>
            public void Dispose()
            {
                _innerObserver.Dispose();
                DisposeMain();
            }

            private void DisposeMain()
            {
                Disposable.TryDispose(ref _upstream);
            }

            private bool IsDisposed()
            {
                return Disposable.GetIsDisposed(ref _upstream);
            }

            /// <summary>Outer sequence finished: mark done and let the drain loop decide when to complete.</summary>
            public void OnCompleted()
            {
                Volatile.Write(ref _done, true);
                Drain();
            }

            /// <summary>
            /// Outer sequence failed: record the error if none is recorded yet. The drain
            /// loop only acts on it while no inner sequence is active.
            /// </summary>
            public void OnError(Exception error)
            {
                if (Interlocked.CompareExchange(ref _error, error, null) == null)
                {
                    Volatile.Write(ref _done, true);
                    Drain();
                }
            }

            /// <summary>Outer sequence produced another inner sequence: queue it and try to start it.</summary>
            public void OnNext(IObservable<T> value)
            {
                _queue.Enqueue(value);
                Drain();
            }

            private void InnerNext(T item)
            {
                _downstream.OnNext(item);
            }

            /// <summary>
            /// Handles a failure of the currently running inner sequence.
            /// </summary>
            private void InnerError(Exception error)
            {
                if (_innerObserver.Finish())
                {
                    // Keep whichever error was recorded first. If the outer sequence already
                    // stored one while this inner sequence was running, the exchange is a no-op,
                    // but the inner sequence has still ended: clearing _active and draining
                    // unconditionally is what lets the recorded error reach the downstream.
                    // Doing this only when the exchange succeeded left _active at 1 forever
                    // and no terminal notification was ever emitted.
                    Interlocked.CompareExchange(ref _error, error, null);
                    Volatile.Write(ref _done, true);
                    Volatile.Write(ref _active, 0);
                    Drain();
                }
            }

            /// <summary>Current inner sequence completed: free the slot and move on to the next one.</summary>
            private void InnerComplete()
            {
                if (_innerObserver.Finish())
                {
                    Volatile.Write(ref _active, 0);
                    Drain();
                }
            }

            /// <summary>
            /// Serialized state machine. Only the caller that moves <c>_trampoline</c> from 0 to 1
            /// runs the loop; every other caller just increments the counter, which makes the
            /// running loop go around once more.
            /// </summary>
            private void Drain()
            {
                if (Interlocked.Increment(ref _trampoline) != 1)
                {
                    return;
                }

                do
                {
                    if (IsDisposed())
                    {
                        // Nothing will be subscribed anymore; drop whatever is still queued.
                        while (_queue.TryDequeue(out _))
                        {
                        }
                    }
                    else if (Volatile.Read(ref _active) == 0)
                    {
                        // Read _done before polling the queue so that an empty queue seen
                        // afterwards can be treated as final.
                        var isDone = Volatile.Read(ref _done);

                        if (isDone)
                        {
                            var ex = Volatile.Read(ref _error);
                            if (ex != null)
                            {
                                _downstream.OnError(ex);
                                DisposeMain();
                                continue;
                            }
                        }

                        if (_queue.TryDequeue(out var source))
                        {
                            var sad = new SingleAssignmentDisposable();
                            if (_innerObserver.SetDisposable(sad))
                            {
                                // Mark the slot as taken before subscribing: the inner
                                // sequence may terminate during the Subscribe call itself.
                                Interlocked.Exchange(ref _active, 1);
                                sad.Disposable = source.SubscribeSafe(_innerObserver);
                            }
                        }
                        else if (isDone)
                        {
                            _downstream.OnCompleted();
                            DisposeMain();
                        }
                    }
                }
                while (Interlocked.Decrement(ref _trampoline) != 0);
            }

            /// <summary>
            /// Reusable observer for whichever inner sequence is currently running;
            /// forwards every signal to the owning <see cref="ConcatManyOuterObserver"/>.
            /// </summary>
            internal sealed class InnerObserver : IObserver<T>, IDisposable
            {
                private readonly ConcatManyOuterObserver _parent;

                // Subscription to the currently running inner sequence.
                internal IDisposable Upstream;

                internal InnerObserver(ConcatManyOuterObserver parent)
                {
                    _parent = parent;
                }

                /// <summary>Tries to install the holder for the next inner subscription.</summary>
                /// <returns>true only if <c>Disposable.TrySetSingle</c> reports success.</returns>
                internal bool SetDisposable(SingleAssignmentDisposable sad)
                {
                    return Disposable.TrySetSingle(ref Upstream, sad) == TrySetSingleResult.Success;
                }

                /// <summary>
                /// Releases the current inner subscription and resets the slot to null so it can be reused.
                /// </summary>
                /// <returns>
                /// true if this call swapped the slot to null; false if the slot held
                /// <c>BooleanDisposable.True</c> or changed concurrently.
                /// </returns>
                internal bool Finish()
                {
                    var current = Volatile.Read(ref Upstream);

                    // NOTE(review): BooleanDisposable.True is presumably the marker left behind by
                    // Disposable.TryDispose once this observer has been disposed - confirm.
                    if (current != BooleanDisposable.True
                        && Interlocked.CompareExchange(ref Upstream, null, current) == current)
                    {
                        current.Dispose();
                        return true;
                    }

                    return false;
                }

                public void Dispose()
                {
                    Disposable.TryDispose(ref Upstream);
                }

                public void OnCompleted()
                {
                    _parent.InnerComplete();
                }

                public void OnError(Exception error)
                {
                    _parent.InnerError(error);
                }

                public void OnNext(T value)
                {
                    _parent.InnerNext(value);
                }
            }
        }
    }
}