<PackageReference Include="System.Reactive" Version="4.2.0-preview.566" />

Merge<TSource>

static class Merge<TSource>
using System.Collections.Generic; using System.Reactive.Disposables; using System.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq.ObservableImpl { internal static class Merge<TSource> { internal sealed class ObservablesMaxConcurrency : Producer<TSource, ObservablesMaxConcurrency._> { internal sealed class _ : Sink<IObservable<TSource>, TSource> { private sealed class InnerObserver : SafeObserver<TSource> { private readonly _ _parent; public InnerObserver(_ parent) { _parent = parent; } public override void OnNext(TSource value) { lock (_parent._gate) { _parent.ForwardOnNext(value); } } public override void OnError(Exception error) { lock (_parent._gate) { _parent.ForwardOnError(error); } } public override void OnCompleted() { _parent._group.Remove(this); lock (_parent._gate) { if (_parent._q.Count > 0) { IObservable<TSource> innerSource = _parent._q.Dequeue(); _parent.Subscribe(innerSource); } else { _parent._activeCount--; if (_parent._isStopped && _parent._activeCount == 0) _parent.ForwardOnCompleted(); } } } } private readonly int _maxConcurrent; private readonly object _gate = new object(); private readonly Queue<IObservable<TSource>> _q = new Queue<IObservable<TSource>>(); private volatile bool _isStopped; private readonly CompositeDisposable _group = new CompositeDisposable(); private int _activeCount; public _(int maxConcurrent, IObserver<TSource> observer) : base(observer) { _maxConcurrent = maxConcurrent; } public override void OnNext(IObservable<TSource> value) { lock (_gate) { if (_activeCount < _maxConcurrent) { _activeCount++; Subscribe(value); } else _q.Enqueue(value); } } public override void OnError(Exception error) { lock (_gate) { ForwardOnError(error); } } public override void OnCompleted() { lock (_gate) { _isStopped = true; if (_activeCount == 0) ForwardOnCompleted(); else DisposeUpstream(); } } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (disposing) _group.Dispose(); } private void Subscribe(IObservable<TSource> innerSource) { InnerObserver innerObserver = new InnerObserver(this); _group.Add(innerObserver); innerObserver.SetResource(ObservableExtensions.SubscribeSafe<TSource>(innerSource, (IObserver<TSource>)innerObserver)); } } private readonly IObservable<IObservable<TSource>> _sources; private readonly int _maxConcurrent; public ObservablesMaxConcurrency(IObservable<IObservable<TSource>> sources, int maxConcurrent) { _sources = sources; _maxConcurrent = maxConcurrent; } protected override _ CreateSink(IObserver<TSource> observer) { return new _(_maxConcurrent, observer); } protected override void Run(_ sink) { sink.Run(_sources); } } internal sealed class Observables : Producer<TSource, Observables._> { internal sealed class _ : Sink<IObservable<TSource>, TSource> { private sealed class InnerObserver : SafeObserver<TSource> { private readonly _ _parent; public InnerObserver(_ parent) { _parent = parent; } public override void OnNext(TSource value) { lock (_parent._gate) { _parent.ForwardOnNext(value); } } public override void OnError(Exception error) { lock (_parent._gate) { _parent.ForwardOnError(error); } } public override void OnCompleted() { _parent._group.Remove(this); if (_parent._isStopped && _parent._group.Count == 0) { lock (_parent._gate) { _parent.ForwardOnCompleted(); } } } } private readonly object _gate = new object(); private volatile bool _isStopped; private readonly CompositeDisposable _group = new CompositeDisposable(); public _(IObserver<TSource> observer) : base(observer) { } public override void OnNext(IObservable<TSource> value) { InnerObserver innerObserver = new InnerObserver(this); _group.Add(innerObserver); innerObserver.SetResource(ObservableExtensions.SubscribeSafe<TSource>(value, (IObserver<TSource>)innerObserver)); } public override void OnError(Exception error) { lock (_gate) { ForwardOnError(error); } } public override void OnCompleted() { _isStopped = true; if (_group.Count == 0) { lock (_gate) { ForwardOnCompleted(); } } else DisposeUpstream(); } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (disposing) _group.Dispose(); } } private readonly IObservable<IObservable<TSource>> _sources; public Observables(IObservable<IObservable<TSource>> sources) { _sources = sources; } protected override _ CreateSink(IObserver<TSource> observer) { return new _(observer); } protected override void Run(_ sink) { sink.Run(_sources); } } internal sealed class Tasks : Producer<TSource, Tasks._> { internal sealed class _ : Sink<Task<TSource>, TSource> { private readonly object _gate = new object(); private readonly CancellationTokenSource _cts = new CancellationTokenSource(); private volatile int _count = 1; public _(IObserver<TSource> observer) : base(observer) { } public override void OnNext(Task<TSource> value) { Interlocked.Increment(ref _count); if (value.IsCompleted) OnCompletedTask(value); else value.ContinueWith(delegate(Task<TSource> t, object thisObject) { ((_)thisObject).OnCompletedTask(t); }, this, _cts.Token); } private void OnCompletedTask(Task<TSource> task) { switch (task.Status) { case TaskStatus.RanToCompletion: lock (_gate) { ForwardOnNext(task.Result); } OnCompleted(); break; case TaskStatus.Faulted: lock (_gate) { ForwardOnError(task.Exception.InnerException); } break; case TaskStatus.Canceled: lock (_gate) { ForwardOnError(new TaskCanceledException(task)); } break; } } public override void OnError(Exception error) { lock (_gate) { ForwardOnError(error); } } public override void OnCompleted() { if (Interlocked.Decrement(ref _count) == 0) { lock (_gate) { ForwardOnCompleted(); } } } protected override void Dispose(bool disposing) { if (disposing) _cts.Cancel(); base.Dispose(disposing); } } private readonly IObservable<Task<TSource>> _sources; public Tasks(IObservable<Task<TSource>> sources) { _sources = sources; } protected override _ CreateSink(IObserver<TSource> observer) { return new _(observer); } protected override void Run(_ sink) { sink.Run(_sources); } } } }