<PackageReference Include="System.Reactive" Version="4.0.0" />

SelectMany<TSource, TResult>

static class SelectMany<TSource, TResult>
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Groups the producer/sink pairs that project every source element to an inner
    /// observable, enumerable or task and forward the inner results to a single
    /// downstream observer.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
    /// <typeparam name="TResult">Element type forwarded to the downstream observer.</typeparam>
    internal static class SelectMany<TSource, TResult>
    {
        /// <summary>
        /// Projects each source element to an inner observable via a selector and
        /// forwards the elements of all inner observables downstream.
        /// </summary>
        internal class ObservableSelector : Producer<TResult, ObservableSelector._>
        {
            /// <summary>Sink that observes the source and subscribes to one inner observable per element.</summary>
            internal class _ : Sink<TResult>, IObserver<TSource>
            {
                /// <summary>Observer attached to a single inner observable.</summary>
                private sealed class InnerObserver : IObserver<TResult>
                {
                    private readonly _ _parent;
                    private readonly IDisposable _self;

                    /// <param name="parent">The sink that owns this inner subscription.</param>
                    /// <param name="self">The group entry that represents this inner subscription.</param>
                    public InnerObserver(_ parent, IDisposable self)
                    {
                        _parent = parent;
                        _self = self;
                    }

                    /// <summary>Forwards an inner element downstream while holding the parent's gate.</summary>
                    public void OnNext(TResult value)
                    {
                        lock (_parent._gate)
                        {
                            _parent._observer.OnNext(value);
                        }
                    }

                    /// <summary>Forwards an inner error downstream and disposes the parent sink.</summary>
                    public void OnError(Exception error)
                    {
                        lock (_parent._gate)
                        {
                            _parent._observer.OnError(error);
                            _parent.Dispose();
                        }
                    }

                    /// <summary>
                    /// Removes this inner subscription from the group and completes downstream
                    /// when the source has stopped and no other inner subscription remains.
                    /// </summary>
                    public void OnCompleted()
                    {
                        _parent._group.Remove(_self);

                        // The group always holds the source subscription (added in the sink's
                        // constructor), so a count of one means no inner subscription is left.
                        if (_parent._isStopped && _parent._group.Count == 1)
                        {
                            lock (_parent._gate)
                            {
                                _parent._observer.OnCompleted();
                                _parent.Dispose();
                            }
                        }
                    }
                }

                /// <summary>Lock taken around every call made to the downstream observer.</summary>
                protected readonly object _gate = new object();

                private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();
                private readonly CompositeDisposable _group = new CompositeDisposable();
                private readonly Func<TSource, IObservable<TResult>> _selector;
                private bool _isStopped;

                public _(ObservableSelector parent, IObserver<TResult> observer, IDisposable cancel)
                    : base(observer, cancel)
                {
                    _selector = parent._selector;
                    _group.Add(_sourceSubscription);
                }

                /// <summary>Subscribes this sink to <paramref name="source"/>.</summary>
                /// <returns>The group holding the source subscription and all inner subscriptions.</returns>
                public IDisposable Run(IObservable<TSource> source)
                {
                    _isStopped = false;
                    _sourceSubscription.Disposable = source.SubscribeSafe(this);
                    return _group;
                }

                /// <summary>
                /// Invokes the selector for <paramref name="value"/> and subscribes to the resulting
                /// inner observable. An exception thrown by the selector is sent downstream as an error.
                /// </summary>
                public void OnNext(TSource value)
                {
                    IObservable<TResult> inner;
                    try
                    {
                        inner = _selector(value);
                    }
                    catch (Exception ex)
                    {
                        lock (_gate)
                        {
                            _observer.OnError(ex);
                            base.Dispose();
                        }

                        return;
                    }

                    SubscribeInner(inner);
                }

                /// <summary>Forwards a source error downstream and disposes the sink.</summary>
                public virtual void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        _observer.OnError(error);
                        base.Dispose();
                    }
                }

                /// <summary>Handles source completion by running <see cref="Final"/>.</summary>
                public virtual void OnCompleted()
                {
                    Final();
                }

                /// <summary>
                /// Marks the source as stopped. Completes downstream when no inner subscription
                /// remains; otherwise only the source subscription is disposed.
                /// </summary>
                protected void Final()
                {
                    _isStopped = true;

                    if (_group.Count == 1)
                    {
                        lock (_gate)
                        {
                            _observer.OnCompleted();
                            base.Dispose();
                        }
                    }
                    else
                    {
                        _sourceSubscription.Dispose();
                    }
                }

                /// <summary>Adds a new entry to the group and subscribes an inner observer to <paramref name="inner"/>.</summary>
                protected void SubscribeInner(IObservable<TResult> inner)
                {
                    var innerSubscription = new SingleAssignmentDisposable();

                    // Register the entry before subscribing so that an inner sequence which
                    // completes during subscription still finds itself in the group.
                    _group.Add(innerSubscription);
                    innerSubscription.Disposable = inner.SubscribeSafe(new InnerObserver(this, innerSubscription));
                }
            }

            protected readonly IObservable<TSource> _source;
            protected readonly Func<TSource, IObservable<TResult>> _selector;

            public ObservableSelector(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
            {
                return new _(this, observer, cancel);
            }

            protected override IDisposable Run(_ sink)
            {
                return sink.Run(_source);
            }
        }

        /// <summary>
        /// Variant of <see cref="ObservableSelector"/> that can additionally map the source's
        /// error and completion notifications to inner observables.
        /// </summary>
        internal sealed class ObservableSelectors : ObservableSelector
        {
            internal new sealed class _ : ObservableSelector._
            {
                private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
                private readonly Func<IObservable<TResult>> _selectorOnCompleted;

                public _(ObservableSelectors parent, IObserver<TResult> observer, IDisposable cancel)
                    : base(parent, observer, cancel)
                {
                    _selectorOnError = parent._selectorOnError;
                    _selectorOnCompleted = parent._selectorOnCompleted;
                }

                /// <summary>
                /// When an error selector is present, subscribes to the observable it returns and
                /// then finalizes the source; otherwise behaves like the base implementation.
                /// </summary>
                public override void OnError(Exception error)
                {
                    if (_selectorOnError == null)
                    {
                        base.OnError(error);
                        return;
                    }

                    IObservable<TResult> inner;
                    try
                    {
                        inner = _selectorOnError(error);
                    }
                    catch (Exception ex)
                    {
                        lock (_gate)
                        {
                            _observer.OnError(ex);
                            base.Dispose();
                        }

                        return;
                    }

                    SubscribeInner(inner);
                    Final();
                }

                /// <summary>
                /// When a completion selector is present, subscribes to the observable it returns;
                /// in either case the source is then finalized.
                /// </summary>
                public override void OnCompleted()
                {
                    if (_selectorOnCompleted != null)
                    {
                        IObservable<TResult> inner;
                        try
                        {
                            inner = _selectorOnCompleted();
                        }
                        catch (Exception ex)
                        {
                            lock (_gate)
                            {
                                _observer.OnError(ex);
                                base.Dispose();
                            }

                            return;
                        }

                        SubscribeInner(inner);
                    }

                    Final();
                }
            }

            private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
            private readonly Func<IObservable<TResult>> _selectorOnCompleted;

            public ObservableSelectors(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
                : base(source, selector)
            {
                _selectorOnError = selectorOnError;
                _selectorOnCompleted = selectorOnCompleted;
            }

            protected override ObservableSelector._ CreateSink(IObserver<TResult> observer, IDisposable cancel)
            {
                return new _(this, observer, cancel);
            }
        }

        /// <summary>
        /// Same as <see cref="ObservableSelector"/>, but the selector also receives the
        /// zero-based index of the source element.
        /// </summary>
        internal class ObservableSelectorIndexed : Producer<TResult, ObservableSelectorIndexed._>
        {
            /// <summary>Sink that observes the source and subscribes to one inner observable per element.</summary>
            internal class _ : Sink<TResult>, IObserver<TSource>
            {
                /// <summary>Observer attached to a single inner observable.</summary>
                private sealed class InnerObserver : IObserver<TResult>
                {
                    private readonly _ _parent;
                    private readonly IDisposable _self;

                    /// <param name="parent">The sink that owns this inner subscription.</param>
                    /// <param name="self">The group entry that represents this inner subscription.</param>
                    public InnerObserver(_ parent, IDisposable self)
                    {
                        _parent = parent;
                        _self = self;
                    }

                    /// <summary>Forwards an inner element downstream while holding the parent's gate.</summary>
                    public void OnNext(TResult value)
                    {
                        lock (_parent._gate)
                        {
                            _parent._observer.OnNext(value);
                        }
                    }

                    /// <summary>Forwards an inner error downstream and disposes the parent sink.</summary>
                    public void OnError(Exception error)
                    {
                        lock (_parent._gate)
                        {
                            _parent._observer.OnError(error);
                            _parent.Dispose();
                        }
                    }

                    /// <summary>
                    /// Removes this inner subscription from the group and completes downstream
                    /// when the source has stopped and no other inner subscription remains.
                    /// </summary>
                    public void OnCompleted()
                    {
                        _parent._group.Remove(_self);

                        // The group always holds the source subscription (added in the sink's
                        // constructor), so a count of one means no inner subscription is left.
                        if (_parent._isStopped && _parent._group.Count == 1)
                        {
                            lock (_parent._gate)
                            {
                                _parent._observer.OnCompleted();
                                _parent.Dispose();
                            }
                        }
                    }
                }

                /// <summary>
                /// Lock taken around every call made to the downstream observer. It is protected
                /// so that derived sinks serialize on this same object instead of declaring a
                /// separate gate of their own.
                /// </summary>
                protected readonly object _gate = new object();

                private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();
                private readonly CompositeDisposable _group = new CompositeDisposable();
                protected readonly Func<TSource, int, IObservable<TResult>> _selector;
                private bool _isStopped;
                private int _index;

                public _(ObservableSelectorIndexed parent, IObserver<TResult> observer, IDisposable cancel)
                    : base(observer, cancel)
                {
                    _selector = parent._selector;
                    _group.Add(_sourceSubscription);
                }

                /// <summary>Subscribes this sink to <paramref name="source"/>.</summary>
                /// <returns>The group holding the source subscription and all inner subscriptions.</returns>
                public IDisposable Run(IObservable<TSource> source)
                {
                    _isStopped = false;
                    _sourceSubscription.Disposable = source.SubscribeSafe(this);
                    return _group;
                }

                /// <summary>
                /// Invokes the selector with <paramref name="value"/> and its index, then subscribes to
                /// the resulting inner observable. An exception thrown by the selector, or by the
                /// checked index increment on overflow, is sent downstream as an error.
                /// </summary>
                public void OnNext(TSource value)
                {
                    IObservable<TResult> inner;
                    try
                    {
                        inner = _selector(value, checked(_index++));
                    }
                    catch (Exception ex)
                    {
                        lock (_gate)
                        {
                            _observer.OnError(ex);
                            base.Dispose();
                        }

                        return;
                    }

                    SubscribeInner(inner);
                }

                /// <summary>Forwards a source error downstream and disposes the sink.</summary>
                public virtual void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        _observer.OnError(error);
                        base.Dispose();
                    }
                }

                /// <summary>Handles source completion by running <see cref="Final"/>.</summary>
                public virtual void OnCompleted()
                {
                    Final();
                }

                /// <summary>
                /// Marks the source as stopped. Completes downstream when no inner subscription
                /// remains; otherwise only the source subscription is disposed.
                /// </summary>
                protected void Final()
                {
                    _isStopped = true;

                    if (_group.Count == 1)
                    {
                        lock (_gate)
                        {
                            _observer.OnCompleted();
                            base.Dispose();
                        }
                    }
                    else
                    {
                        _sourceSubscription.Dispose();
                    }
                }

                /// <summary>Adds a new entry to the group and subscribes an inner observer to <paramref name="inner"/>.</summary>
                protected void SubscribeInner(IObservable<TResult> inner)
                {
                    var innerSubscription = new SingleAssignmentDisposable();

                    // Register the entry before subscribing so that an inner sequence which
                    // completes during subscription still finds itself in the group.
                    _group.Add(innerSubscription);
                    innerSubscription.Disposable = inner.SubscribeSafe(new InnerObserver(this, innerSubscription));
                }
            }

            protected readonly IObservable<TSource> _source;
            protected readonly Func<TSource, int, IObservable<TResult>> _selector;

            public ObservableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
            {
                return new _(this, observer, cancel);
            }

            protected override IDisposable Run(_ sink)
            {
                return sink.Run(_source);
            }
        }

        /// <summary>
        /// Variant of <see cref="ObservableSelectorIndexed"/> that can additionally map the
        /// source's error and completion notifications to inner observables.
        /// </summary>
        internal sealed class ObservableSelectorsIndexed : ObservableSelectorIndexed
        {
            internal new sealed class _ : ObservableSelectorIndexed._
            {
                // This sink deliberately declares no gate, source subscription or group of its
                // own: the overrides below must lock on the gate inherited from the base sink,
                // which is the one the base OnNext, Final and the inner observers lock on.
                private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
                private readonly Func<IObservable<TResult>> _selectorOnCompleted;

                public _(ObservableSelectorsIndexed parent, IObserver<TResult> observer, IDisposable cancel)
                    : base(parent, observer, cancel)
                {
                    _selectorOnError = parent._selectorOnError;
                    _selectorOnCompleted = parent._selectorOnCompleted;
                }

                /// <summary>
                /// When an error selector is present, subscribes to the observable it returns and
                /// then finalizes the source; otherwise behaves like the base implementation.
                /// </summary>
                public override void OnError(Exception error)
                {
                    if (_selectorOnError == null)
                    {
                        base.OnError(error);
                        return;
                    }

                    IObservable<TResult> inner;
                    try
                    {
                        inner = _selectorOnError(error);
                    }
                    catch (Exception ex)
                    {
                        lock (_gate)
                        {
                            _observer.OnError(ex);
                            base.Dispose();
                        }

                        return;
                    }

                    SubscribeInner(inner);
                    Final();
                }

                /// <summary>
                /// When a completion selector is present, subscribes to the observable it returns;
                /// in either case the source is then finalized.
                /// </summary>
                public override void OnCompleted()
                {
                    if (_selectorOnCompleted != null)
                    {
                        IObservable<TResult> inner;
                        try
                        {
                            inner = _selectorOnCompleted();
                        }
                        catch (Exception ex)
                        {
                            lock (_gate)
                            {
                                _observer.OnError(ex);
                                base.Dispose();
                            }

                            return;
                        }

                        SubscribeInner(inner);
                    }

                    Final();
                }
            }

            private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
            private readonly Func<IObservable<TResult>> _selectorOnCompleted;

            public ObservableSelectorsIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
                : base(source, selector)
            {
                _selectorOnError = selectorOnError;
                _selectorOnCompleted = selectorOnCompleted;
            }

            protected override ObservableSelectorIndexed._ CreateSink(IObserver<TResult> observer, IDisposable cancel)
            {
                return new _(this, observer, cancel);
            }
        }

        /// <summary>
        /// Projects each source element to an enumerable and forwards the enumerable's
        /// elements downstream, inside the source's OnNext call.
        /// </summary>
        internal sealed class EnumerableSelector : Producer<TResult, EnumerableSelector._>
        {
            internal sealed class _ : Sink<TResult>, IObserver<TSource>
            {
                private readonly Func<TSource, IEnumerable<TResult>> _selector;

                public _(EnumerableSelector parent, IObserver<TResult> observer, IDisposable cancel)
                    : base(observer, cancel)
                {
                    _selector = parent._selector;
                }

                /// <summary>
                /// Runs the selector, enumerates its result and forwards every element. An exception
                /// from the selector, GetEnumerator, MoveNext or Current is sent downstream as an
                /// error; an exception thrown by the downstream OnNext is not caught here.
                /// </summary>
                public void OnNext(TSource value)
                {
                    IEnumerable<TResult> items;
                    try
                    {
                        items = _selector(value);
                    }
                    catch (Exception ex)
                    {
                        _observer.OnError(ex);
                        base.Dispose();
                        return;
                    }

                    IEnumerator<TResult> enumerator;
                    try
                    {
                        enumerator = items.GetEnumerator();
                    }
                    catch (Exception ex)
                    {
                        _observer.OnError(ex);
                        base.Dispose();
                        return;
                    }

                    try
                    {
                        var hasNext = true;
                        while (hasNext)
                        {
                            hasNext = false;
                            var current = default(TResult);

                            // Only the enumerator calls are guarded; the downstream OnNext below
                            // is intentionally outside this try block.
                            try
                            {
                                hasNext = enumerator.MoveNext();
                                if (hasNext)
                                {
                                    current = enumerator.Current;
                                }
                            }
                            catch (Exception ex)
                            {
                                _observer.OnError(ex);
                                base.Dispose();
                                return;
                            }

                            if (hasNext)
                            {
                                _observer.OnNext(current);
                            }
                        }
                    }
                    finally
                    {
                        enumerator?.Dispose();
                    }
                }

                /// <summary>Forwards a source error downstream and disposes the sink.</summary>
                public void OnError(Exception error)
                {
                    _observer.OnError(error);
                    base.Dispose();
                }

                /// <summary>Forwards source completion downstream and disposes the sink.</summary>
                public void OnCompleted()
                {
                    _observer.OnCompleted();
                    base.Dispose();
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, IEnumerable<TResult>> _selector;

            public EnumerableSelector(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
            {
                return new _(this, observer, cancel);
            }

            protected override IDisposable Run(_ sink)
            {
                return _source.SubscribeSafe(sink);
            }
        }

        /// <summary>
        /// Same as <see cref="EnumerableSelector"/>, but the selector also receives the
        /// zero-based index of the source element.
        /// </summary>
        internal sealed class EnumerableSelectorIndexed : Producer<TResult, EnumerableSelectorIndexed._>
        {
            internal sealed class _ : Sink<TResult>, IObserver<TSource>
            {
                private readonly Func<TSource, int, IEnumerable<TResult>> _selector;
                private int _index;

                public _(EnumerableSelectorIndexed parent, IObserver<TResult> observer, IDisposable cancel)
                    : base(observer, cancel)
                {
                    _selector = parent._selector;
                }

                /// <summary>
                /// Runs the selector with the element and its index, enumerates the result and
                /// forwards every element. An exception from the selector (or the checked index
                /// increment), GetEnumerator, MoveNext or Current is sent downstream as an error;
                /// an exception thrown by the downstream OnNext is not caught here.
                /// </summary>
                public void OnNext(TSource value)
                {
                    IEnumerable<TResult> items;
                    try
                    {
                        items = _selector(value, checked(_index++));
                    }
                    catch (Exception ex)
                    {
                        _observer.OnError(ex);
                        base.Dispose();
                        return;
                    }

                    IEnumerator<TResult> enumerator;
                    try
                    {
                        enumerator = items.GetEnumerator();
                    }
                    catch (Exception ex)
                    {
                        _observer.OnError(ex);
                        base.Dispose();
                        return;
                    }

                    try
                    {
                        var hasNext = true;
                        while (hasNext)
                        {
                            hasNext = false;
                            var current = default(TResult);

                            // Only the enumerator calls are guarded; the downstream OnNext below
                            // is intentionally outside this try block.
                            try
                            {
                                hasNext = enumerator.MoveNext();
                                if (hasNext)
                                {
                                    current = enumerator.Current;
                                }
                            }
                            catch (Exception ex)
                            {
                                _observer.OnError(ex);
                                base.Dispose();
                                return;
                            }

                            if (hasNext)
                            {
                                _observer.OnNext(current);
                            }
                        }
                    }
                    finally
                    {
                        enumerator?.Dispose();
                    }
                }

                /// <summary>Forwards a source error downstream and disposes the sink.</summary>
                public void OnError(Exception error)
                {
                    _observer.OnError(error);
                    base.Dispose();
                }

                /// <summary>Forwards source completion downstream and disposes the sink.</summary>
                public void OnCompleted()
                {
                    _observer.OnCompleted();
                    base.Dispose();
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, int, IEnumerable<TResult>> _selector;

            public EnumerableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
            {
                return new _(this, observer, cancel);
            }

            protected override IDisposable Run(_ sink)
            {
                return _source.SubscribeSafe(sink);
            }
        }

        /// <summary>
        /// Projects each source element to a task and forwards each task's result downstream.
        /// </summary>
        internal sealed class TaskSelector : Producer<TResult, TaskSelector._>
        {
            internal sealed class _ : Sink<TResult>, IObserver<TSource>
            {
                private readonly object _gate = new object();
                private readonly CancellationDisposable _cancel = new CancellationDisposable();
                private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;

                // Outstanding work: set to 1 in Run, incremented for every source element and
                // decremented by OnCompleted (called for source completion and for each task
                // that ran to completion). Downstream completion fires when it reaches zero.
                private volatile int _count;

                public _(TaskSelector parent, IObserver<TResult> observer, IDisposable cancel)
                    : base(observer, cancel)
                {
                    _selector = parent._selector;
                }

                /// <summary>Subscribes this sink to <paramref name="source"/>.</summary>
                /// <returns>A disposable combining the source subscription and the cancellation handle passed to the tasks.</returns>
                public IDisposable Run(IObservable<TSource> source)
                {
                    _count = 1;
                    return StableCompositeDisposable.Create(source.SubscribeSafe(this), _cancel);
                }

                /// <summary>
                /// Starts a task for <paramref name="value"/>. An already completed task is handled
                /// inline; otherwise a continuation is attached.
                /// </summary>
                public void OnNext(TSource value)
                {
                    Task<TResult> task;
                    try
                    {
                        Interlocked.Increment(ref _count);
                        task = _selector(value, _cancel.Token);
                    }
                    catch (Exception ex)
                    {
                        lock (_gate)
                        {
                            _observer.OnError(ex);
                            base.Dispose();
                        }

                        return;
                    }

                    if (task.IsCompleted)
                    {
                        OnCompletedTask(task);
                    }
                    else
                    {
                        task.ContinueWith(OnCompletedTask);
                    }
                }

                /// <summary>Translates the final status of <paramref name="task"/> into a downstream notification.</summary>
                private void OnCompletedTask(Task<TResult> task)
                {
                    switch (task.Status)
                    {
                        case TaskStatus.RanToCompletion:
                            lock (_gate)
                            {
                                _observer.OnNext(task.Result);
                            }

                            OnCompleted();
                            break;

                        case TaskStatus.Faulted:
                            lock (_gate)
                            {
                                _observer.OnError(task.Exception.InnerException);
                                base.Dispose();
                            }

                            break;

                        case TaskStatus.Canceled:
                            // A cancellation is reported only while this sink's own cancellation
                            // handle has not been disposed.
                            if (!_cancel.IsDisposed)
                            {
                                lock (_gate)
                                {
                                    _observer.OnError(new TaskCanceledException(task));
                                    base.Dispose();
                                }
                            }

                            break;
                    }
                }

                /// <summary>Forwards a source error downstream and disposes the sink.</summary>
                public void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        _observer.OnError(error);
                        base.Dispose();
                    }
                }

                /// <summary>Decrements the outstanding-work counter and completes downstream when it reaches zero.</summary>
                public void OnCompleted()
                {
                    if (Interlocked.Decrement(ref _count) == 0)
                    {
                        lock (_gate)
                        {
                            _observer.OnCompleted();
                            base.Dispose();
                        }
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;

            public TaskSelector(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
            {
                return new _(this, observer, cancel);
            }

            protected override IDisposable Run(_ sink)
            {
                return sink.Run(_source);
            }
        }

        /// <summary>
        /// Same as <see cref="TaskSelector"/>, but the selector also receives the
        /// zero-based index of the source element.
        /// </summary>
        internal sealed class TaskSelectorIndexed : Producer<TResult, TaskSelectorIndexed._>
        {
            internal sealed class _ : Sink<TResult>, IObserver<TSource>
            {
                private readonly object _gate = new object();
                private readonly CancellationDisposable _cancel = new CancellationDisposable();
                private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;

                // Outstanding work: set to 1 in Run, incremented for every source element and
                // decremented by OnCompleted (called for source completion and for each task
                // that ran to completion). Downstream completion fires when it reaches zero.
                private volatile int _count;
                private int _index;

                public _(TaskSelectorIndexed parent, IObserver<TResult> observer, IDisposable cancel)
                    : base(observer, cancel)
                {
                    _selector = parent._selector;
                }

                /// <summary>Subscribes this sink to <paramref name="source"/>.</summary>
                /// <returns>A disposable combining the source subscription and the cancellation handle passed to the tasks.</returns>
                public IDisposable Run(IObservable<TSource> source)
                {
                    _count = 1;
                    return StableCompositeDisposable.Create(source.SubscribeSafe(this), _cancel);
                }

                /// <summary>
                /// Starts a task for <paramref name="value"/> and its index. An already completed
                /// task is handled inline; otherwise a continuation is attached.
                /// </summary>
                public void OnNext(TSource value)
                {
                    Task<TResult> task;
                    try
                    {
                        Interlocked.Increment(ref _count);
                        task = _selector(value, checked(_index++), _cancel.Token);
                    }
                    catch (Exception ex)
                    {
                        lock (_gate)
                        {
                            _observer.OnError(ex);
                            base.Dispose();
                        }

                        return;
                    }

                    if (task.IsCompleted)
                    {
                        OnCompletedTask(task);
                    }
                    else
                    {
                        task.ContinueWith(OnCompletedTask);
                    }
                }

                /// <summary>Translates the final status of <paramref name="task"/> into a downstream notification.</summary>
                private void OnCompletedTask(Task<TResult> task)
                {
                    switch (task.Status)
                    {
                        case TaskStatus.RanToCompletion:
                            lock (_gate)
                            {
                                _observer.OnNext(task.Result);
                            }

                            OnCompleted();
                            break;

                        case TaskStatus.Faulted:
                            lock (_gate)
                            {
                                _observer.OnError(task.Exception.InnerException);
                                base.Dispose();
                            }

                            break;

                        case TaskStatus.Canceled:
                            // A cancellation is reported only while this sink's own cancellation
                            // handle has not been disposed.
                            if (!_cancel.IsDisposed)
                            {
                                lock (_gate)
                                {
                                    _observer.OnError(new TaskCanceledException(task));
                                    base.Dispose();
                                }
                            }

                            break;
                    }
                }

                /// <summary>Forwards a source error downstream and disposes the sink.</summary>
                public void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        _observer.OnError(error);
                        base.Dispose();
                    }
                }

                /// <summary>Decrements the outstanding-work counter and completes downstream when it reaches zero.</summary>
                public void OnCompleted()
                {
                    if (Interlocked.Decrement(ref _count) == 0)
                    {
                        lock (_gate)
                        {
                            _observer.OnCompleted();
                            base.Dispose();
                        }
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;

            public TaskSelectorIndexed(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
            {
                return new _(this, observer, cancel);
            }

            protected override IDisposable Run(_ sink)
            {
                return sink.Run(_source);
            }
        }
    }
}