<PackageReference Include="System.Reactive" Version="4.3.2" />

SelectMany<TSource, TResult>

static class SelectMany<TSource, TResult>
using System.Collections.Generic; using System.Reactive.Disposables; using System.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq.ObservableImpl { internal static class SelectMany<TSource, TResult> { internal class ObservableSelector : Producer<TResult, ObservableSelector._> { internal class _ : Sink<TSource, TResult> { private sealed class InnerObserver : SafeObserver<TResult> { private readonly _ _parent; public InnerObserver(_ parent) { _parent = parent; } public override void OnNext(TResult value) { lock (_parent._gate) { _parent.ForwardOnNext(value); } } public override void OnError(Exception error) { lock (_parent._gate) { _parent.ForwardOnError(error); } } public override void OnCompleted() { _parent._group.Remove(this); if (_parent._isStopped && _parent._group.Count == 0) { lock (_parent._gate) { _parent.ForwardOnCompleted(); } } } } protected readonly object _gate = new object(); private readonly Func<TSource, IObservable<TResult>> _selector; private readonly CompositeDisposable _group = new CompositeDisposable(); private volatile bool _isStopped; public _(ObservableSelector parent, IObserver<TResult> observer) : base(observer) { _selector = parent._selector; } public override void OnNext(TSource value) { IObservable<TResult> observable = null; try { observable = _selector(value); } catch (Exception error) { lock (_gate) { ForwardOnError(error); } return; } SubscribeInner(observable); } public override void OnError(Exception error) { lock (_gate) { ForwardOnError(error); } } public override void OnCompleted() { Final(); } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (disposing) _group.Dispose(); } protected void Final() { _isStopped = true; if (_group.Count == 0) { lock (_gate) { ForwardOnCompleted(); } } else DisposeUpstream(); } protected void SubscribeInner(IObservable<TResult> inner) { InnerObserver innerObserver = new InnerObserver(this); _group.Add(innerObserver); innerObserver.SetResource(ObservableExtensions.SubscribeSafe<TResult>(inner, (IObserver<TResult>)innerObserver)); } } protected readonly IObservable<TSource> _source; protected readonly Func<TSource, IObservable<TResult>> _selector; public ObservableSelector(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector) { _source = source; _selector = selector; } protected override _ CreateSink(IObserver<TResult> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(_source); } } internal sealed class ObservableSelectors : ObservableSelector { internal new sealed class _ : ObservableSelector._ { private readonly Func<Exception, IObservable<TResult>> _selectorOnError; private readonly Func<IObservable<TResult>> _selectorOnCompleted; public _(ObservableSelectors parent, IObserver<TResult> observer) : base((ObservableSelector)parent, observer) { _selectorOnError = parent._selectorOnError; _selectorOnCompleted = parent._selectorOnCompleted; } public override void OnError(Exception error) { if (_selectorOnError != null) { IObservable<TResult> observable = null; try { observable = _selectorOnError(error); } catch (Exception error2) { lock (_gate) { ForwardOnError(error2); } return; } SubscribeInner(observable); Final(); } else base.OnError(error); } public override void OnCompleted() { if (_selectorOnCompleted != null) { IObservable<TResult> observable = null; try { observable = _selectorOnCompleted(); } catch (Exception error) { lock (_gate) { ForwardOnError(error); } return; } SubscribeInner(observable); } Final(); } } private readonly Func<Exception, IObservable<TResult>> _selectorOnError; private readonly Func<IObservable<TResult>> _selectorOnCompleted; public ObservableSelectors(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted) : base(source, selector) { _selectorOnError = selectorOnError; _selectorOnCompleted = selectorOnCompleted; } protected override ObservableSelector._ CreateSink(IObserver<TResult> observer) { return new _(this, observer); } } internal class ObservableSelectorIndexed : Producer<TResult, ObservableSelectorIndexed._> { internal class _ : Sink<TSource, TResult> { private sealed class InnerObserver : SafeObserver<TResult> { private readonly _ _parent; public InnerObserver(_ parent) { _parent = parent; } public override void OnNext(TResult value) { lock (_parent._gate) { _parent.ForwardOnNext(value); } } public override void OnError(Exception error) { lock (_parent._gate) { _parent.ForwardOnError(error); } } public override void OnCompleted() { _parent._group.Remove(this); if (_parent._isStopped && _parent._group.Count == 0) { lock (_parent._gate) { _parent.ForwardOnCompleted(); } } } } private readonly object _gate = new object(); private readonly CompositeDisposable _group = new CompositeDisposable(); protected readonly Func<TSource, int, IObservable<TResult>> _selector; private int _index; private volatile bool _isStopped; public _(ObservableSelectorIndexed parent, IObserver<TResult> observer) : base(observer) { _selector = parent._selector; } public override void OnNext(TSource value) { IObservable<TResult> observable = null; try { observable = _selector(value, checked(_index++)); } catch (Exception error) { lock (_gate) { ForwardOnError(error); } return; } SubscribeInner(observable); } public override void OnError(Exception error) { lock (_gate) { ForwardOnError(error); } } public override void OnCompleted() { Final(); } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (disposing) _group.Dispose(); } protected void Final() { _isStopped = true; if (_group.Count == 0) { lock (_gate) { ForwardOnCompleted(); } } else DisposeUpstream(); } protected void SubscribeInner(IObservable<TResult> inner) { InnerObserver innerObserver = new InnerObserver(this); _group.Add(innerObserver); innerObserver.SetResource(ObservableExtensions.SubscribeSafe<TResult>(inner, (IObserver<TResult>)innerObserver)); } } protected readonly IObservable<TSource> _source; protected readonly Func<TSource, int, IObservable<TResult>> _selector; public ObservableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector) { _source = source; _selector = selector; } protected override _ CreateSink(IObserver<TResult> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(_source); } } internal sealed class ObservableSelectorsIndexed : ObservableSelectorIndexed { internal new sealed class _ : ObservableSelectorIndexed._ { private readonly object _gate = new object(); private readonly Func<Exception, IObservable<TResult>> _selectorOnError; private readonly Func<IObservable<TResult>> _selectorOnCompleted; public _(ObservableSelectorsIndexed parent, IObserver<TResult> observer) : base((ObservableSelectorIndexed)parent, observer) { _selectorOnError = parent._selectorOnError; _selectorOnCompleted = parent._selectorOnCompleted; } public override void OnError(Exception error) { if (_selectorOnError != null) { IObservable<TResult> observable = null; try { observable = _selectorOnError(error); } catch (Exception error2) { lock (_gate) { ForwardOnError(error2); } return; } SubscribeInner(observable); Final(); } else base.OnError(error); } public override void OnCompleted() { if (_selectorOnCompleted != null) { IObservable<TResult> observable = null; try { observable = _selectorOnCompleted(); } catch (Exception error) { lock (_gate) { ForwardOnError(error); } return; } SubscribeInner(observable); } Final(); } } private readonly Func<Exception, IObservable<TResult>> _selectorOnError; private readonly Func<IObservable<TResult>> _selectorOnCompleted; public ObservableSelectorsIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted) : base(source, selector) { _selectorOnError = selectorOnError; _selectorOnCompleted = selectorOnCompleted; } protected override ObservableSelectorIndexed._ CreateSink(IObserver<TResult> observer) { return new _(this, observer); } } internal sealed class EnumerableSelector : Producer<TResult, EnumerableSelector._> { internal sealed class _ : Sink<TSource, TResult> { private readonly Func<TSource, IEnumerable<TResult>> _selector; public _(EnumerableSelector parent, IObserver<TResult> observer) : base(observer) { _selector = parent._selector; } public override void OnNext(TSource value) { IEnumerable<TResult> enumerable = null; try { enumerable = _selector(value); } catch (Exception error) { ForwardOnError(error); return; } IEnumerator<TResult> enumerator; try { enumerator = enumerable.GetEnumerator(); } catch (Exception error2) { ForwardOnError(error2); return; } using (enumerator) { bool flag = true; while (flag) { TResult value2 = default(TResult); try { flag = enumerator.MoveNext(); if (flag) value2 = enumerator.Current; } catch (Exception error3) { ForwardOnError(error3); return; } if (flag) ForwardOnNext(value2); } } } } private readonly IObservable<TSource> _source; private readonly Func<TSource, IEnumerable<TResult>> _selector; public EnumerableSelector(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector) { _source = source; _selector = selector; } protected override _ CreateSink(IObserver<TResult> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(_source); } } internal sealed class EnumerableSelectorIndexed : Producer<TResult, EnumerableSelectorIndexed._> { internal sealed class _ : Sink<TSource, TResult> { private readonly Func<TSource, int, IEnumerable<TResult>> _selector; private int _index; public _(EnumerableSelectorIndexed parent, IObserver<TResult> observer) : base(observer) { _selector = parent._selector; } public override void OnNext(TSource value) { IEnumerable<TResult> enumerable = null; try { enumerable = _selector(value, checked(_index++)); } catch (Exception error) { ForwardOnError(error); return; } IEnumerator<TResult> enumerator; try { enumerator = enumerable.GetEnumerator(); } catch (Exception error2) { ForwardOnError(error2); return; } using (enumerator) { bool flag = true; while (flag) { TResult value2 = default(TResult); try { flag = enumerator.MoveNext(); if (flag) value2 = enumerator.Current; } catch (Exception error3) { ForwardOnError(error3); return; } if (flag) ForwardOnNext(value2); } } } } private readonly IObservable<TSource> _source; private readonly Func<TSource, int, IEnumerable<TResult>> _selector; public EnumerableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector) { _source = source; _selector = selector; } protected override _ CreateSink(IObserver<TResult> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(_source); } } internal sealed class TaskSelector : Producer<TResult, TaskSelector._> { internal sealed class _ : Sink<TSource, TResult> { private readonly object _gate = new object(); private readonly CancellationTokenSource _cts = new CancellationTokenSource(); private readonly Func<TSource, CancellationToken, Task<TResult>> _selector; private volatile int _count; public _(TaskSelector parent, IObserver<TResult> observer) : base(observer) { _selector = parent._selector; } public override void Run(IObservable<TSource> source) { _count = 1; base.Run(source); } protected override void Dispose(bool disposing) { if (disposing) _cts.Cancel(); base.Dispose(disposing); } public override void OnNext(TSource value) { Task<TResult> task = null; try { Interlocked.Increment(ref _count); task = _selector(value, _cts.Token); } catch (Exception error) { lock (_gate) { ForwardOnError(error); } return; } if (task.IsCompleted) OnCompletedTask(task); else task.ContinueWith(delegate(Task<TResult> closureTask, object thisObject) { ((_)thisObject).OnCompletedTask(closureTask); }, this, _cts.Token); } private void OnCompletedTask(Task<TResult> task) { switch (task.Status) { case TaskStatus.RanToCompletion: lock (_gate) { ForwardOnNext(task.Result); } OnCompleted(); break; case TaskStatus.Faulted: lock (_gate) { ForwardOnError(task.Exception.InnerException); } break; case TaskStatus.Canceled: if (!_cts.IsCancellationRequested) { lock (_gate) { ForwardOnError(new TaskCanceledException(task)); } } break; } } public override void OnError(Exception error) { lock (_gate) { ForwardOnError(error); } } public override void OnCompleted() { if (Interlocked.Decrement(ref _count) == 0) { lock (_gate) { ForwardOnCompleted(); } } } } private readonly IObservable<TSource> _source; private readonly Func<TSource, CancellationToken, Task<TResult>> _selector; public TaskSelector(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector) { _source = source; _selector = selector; } protected override _ CreateSink(IObserver<TResult> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(_source); } } internal sealed class TaskSelectorIndexed : Producer<TResult, TaskSelectorIndexed._> { internal sealed class _ : Sink<TSource, TResult> { private readonly object _gate = new object(); private readonly CancellationTokenSource _cts = new CancellationTokenSource(); private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector; private volatile int _count; private int _index; public _(TaskSelectorIndexed parent, IObserver<TResult> observer) : base(observer) { _selector = parent._selector; } public override void Run(IObservable<TSource> source) { _count = 1; base.Run(source); } protected override void Dispose(bool disposing) { if (disposing) _cts.Cancel(); base.Dispose(disposing); } public override void OnNext(TSource value) { Task<TResult> task = null; try { Interlocked.Increment(ref _count); task = _selector(value, checked(_index++), _cts.Token); } catch (Exception error) { lock (_gate) { ForwardOnError(error); } return; } if (task.IsCompleted) OnCompletedTask(task); else task.ContinueWith(delegate(Task<TResult> closureTask, object thisObject) { ((_)thisObject).OnCompletedTask(closureTask); }, this, _cts.Token); } private void OnCompletedTask(Task<TResult> task) { switch (task.Status) { case TaskStatus.RanToCompletion: lock (_gate) { ForwardOnNext(task.Result); } OnCompleted(); break; case TaskStatus.Faulted: lock (_gate) { ForwardOnError(task.Exception.InnerException); } break; case TaskStatus.Canceled: if (!_cts.IsCancellationRequested) { lock (_gate) { ForwardOnError(new TaskCanceledException(task)); } } break; } } public override void OnError(Exception error) { lock (_gate) { ForwardOnError(error); } } public override void OnCompleted() { if (Interlocked.Decrement(ref _count) == 0) { lock (_gate) { ForwardOnCompleted(); } } } } private readonly IObservable<TSource> _source; private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector; public TaskSelectorIndexed(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector) { _source = source; _selector = selector; } protected override _ CreateSink(IObserver<TResult> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(_source); } } } }