<PackageReference Include="System.Reactive" Version="4.2.0" />

SelectMany<TSource, TCollection, TResult>

static class SelectMany<TSource, TCollection, TResult>
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Groups the producers behind the SelectMany overloads that take a collection selector
    /// and a result selector. Each nested producer handles one kind of inner collection
    /// (observable, enumerable or task), with and without element indices.
    /// </summary>
    /// <typeparam name="TSource">Element type of the outer source sequence.</typeparam>
    /// <typeparam name="TCollection">Element type produced by the inner collection.</typeparam>
    /// <typeparam name="TResult">Element type forwarded to the downstream observer.</typeparam>
    internal static class SelectMany<TSource, TCollection, TResult>
    {
        /// <summary>
        /// Projects every source element to an inner observable and forwards
        /// <c>resultSelector(sourceElement, innerElement)</c> for every inner element.
        /// </summary>
        internal sealed class ObservableSelector : Producer<TResult, ObservableSelector._>
        {
            /// <summary>Sink that subscribes to one inner observable per source element.</summary>
            internal sealed class _ : Sink<TSource, TResult>
            {
                /// <summary>Observer attached to a single inner observable.</summary>
                private sealed class InnerObserver : SafeObserver<TCollection>
                {
                    private readonly _ _parent;
                    private readonly TSource _value;

                    public InnerObserver(_ parent, TSource value)
                    {
                        _parent = parent;
                        _value = value;
                    }

                    public override void OnNext(TCollection value)
                    {
                        TResult result;
                        try
                        {
                            // The user-supplied selector runs outside the gate.
                            result = _parent._resultSelector(_value, value);
                        }
                        catch (Exception error)
                        {
                            lock (_parent._gate)
                            {
                                _parent.ForwardOnError(error);
                            }

                            return;
                        }

                        lock (_parent._gate)
                        {
                            _parent.ForwardOnNext(result);
                        }
                    }

                    public override void OnError(Exception error)
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnError(error);
                        }
                    }

                    public override void OnCompleted()
                    {
                        _parent._group.Remove(this);

                        // Complete downstream only once the outer source has completed
                        // and no inner observer remains in the group.
                        if (_parent._isStopped && _parent._group.Count == 0)
                        {
                            lock (_parent._gate)
                            {
                                _parent.ForwardOnCompleted();
                            }
                        }
                    }
                }

                // Guards every Forward* call made by this sink and its inner observers.
                private readonly object _gate = new object();

                // Holds the inner observers that have been added and not yet removed.
                private readonly CompositeDisposable _group = new CompositeDisposable();

                private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
                private readonly Func<TSource, TCollection, TResult> _resultSelector;

                // Set when the outer source signals OnCompleted.
                private volatile bool _isStopped;

                public _(ObservableSelector parent, IObserver<TResult> observer)
                    : base(observer)
                {
                    _collectionSelector = parent._collectionSelector;
                    _resultSelector = parent._resultSelector;
                }

                public override void OnNext(TSource value)
                {
                    IObservable<TCollection> collection = null;
                    try
                    {
                        collection = _collectionSelector(value);
                    }
                    catch (Exception error)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(error);
                        }

                        return;
                    }

                    // Register the inner observer before subscribing, so it is already
                    // in the group if the inner observable completes during subscription.
                    var innerObserver = new InnerObserver(this, value);
                    _group.Add(innerObserver);
                    innerObserver.SetResource(ObservableExtensions.SubscribeSafe<TCollection>(collection, innerObserver));
                }

                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                public override void OnCompleted()
                {
                    _isStopped = true;

                    if (_group.Count == 0)
                    {
                        lock (_gate)
                        {
                            ForwardOnCompleted();
                        }
                    }
                    else
                    {
                        // Inner observers are still registered; the last one to complete
                        // forwards OnCompleted (see InnerObserver.OnCompleted).
                        DisposeUpstream();
                    }
                }

                protected override void Dispose(bool disposing)
                {
                    base.Dispose(disposing);

                    if (disposing)
                    {
                        _group.Dispose();
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
            private readonly Func<TSource, TCollection, TResult> _resultSelector;

            public ObservableSelector(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
            {
                _source = source;
                _collectionSelector = collectionSelector;
                _resultSelector = resultSelector;
            }

            protected override _ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }

        /// <summary>
        /// Same as <see cref="ObservableSelector"/>, but both selectors also receive the
        /// zero-based index of the source element, and the result selector additionally
        /// receives the zero-based index of the inner element.
        /// </summary>
        internal sealed class ObservableSelectorIndexed : Producer<TResult, ObservableSelectorIndexed._>
        {
            /// <summary>Sink that subscribes to one inner observable per source element.</summary>
            internal sealed class _ : Sink<TSource, TResult>
            {
                /// <summary>Observer attached to a single inner observable.</summary>
                private sealed class InnerObserver : SafeObserver<TCollection>
                {
                    private readonly _ _parent;
                    private readonly TSource _value;
                    private readonly int _valueIndex;

                    // Index of the next element of this inner sequence.
                    private int _index;

                    public InnerObserver(_ parent, TSource value, int index)
                    {
                        _parent = parent;
                        _value = value;
                        _valueIndex = index;
                    }

                    public override void OnNext(TCollection value)
                    {
                        TResult result;
                        try
                        {
                            // An overflowing index throws inside the try block and is
                            // therefore reported through OnError.
                            result = _parent._resultSelector(_value, _valueIndex, value, checked(_index++));
                        }
                        catch (Exception error)
                        {
                            lock (_parent._gate)
                            {
                                _parent.ForwardOnError(error);
                            }

                            return;
                        }

                        lock (_parent._gate)
                        {
                            _parent.ForwardOnNext(result);
                        }
                    }

                    public override void OnError(Exception error)
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnError(error);
                        }
                    }

                    public override void OnCompleted()
                    {
                        _parent._group.Remove(this);

                        // Complete downstream only once the outer source has completed
                        // and no inner observer remains in the group.
                        if (_parent._isStopped && _parent._group.Count == 0)
                        {
                            lock (_parent._gate)
                            {
                                _parent.ForwardOnCompleted();
                            }
                        }
                    }
                }

                // Guards every Forward* call made by this sink and its inner observers.
                private readonly object _gate = new object();

                // Holds the inner observers that have been added and not yet removed.
                private readonly CompositeDisposable _group = new CompositeDisposable();

                private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelector;
                private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

                // Set when the outer source signals OnCompleted.
                private volatile bool _isStopped;

                // Index of the next source element.
                private int _index;

                public _(ObservableSelectorIndexed parent, IObserver<TResult> observer)
                    : base(observer)
                {
                    _collectionSelector = parent._collectionSelector;
                    _resultSelector = parent._resultSelector;
                }

                public override void OnNext(TSource value)
                {
                    int index = checked(_index++);

                    IObservable<TCollection> collection = null;
                    try
                    {
                        collection = _collectionSelector(value, index);
                    }
                    catch (Exception error)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(error);
                        }

                        return;
                    }

                    // Register the inner observer before subscribing, so it is already
                    // in the group if the inner observable completes during subscription.
                    var innerObserver = new InnerObserver(this, value, index);
                    _group.Add(innerObserver);
                    innerObserver.SetResource(ObservableExtensions.SubscribeSafe<TCollection>(collection, innerObserver));
                }

                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                public override void OnCompleted()
                {
                    _isStopped = true;

                    if (_group.Count == 0)
                    {
                        lock (_gate)
                        {
                            ForwardOnCompleted();
                        }
                    }
                    else
                    {
                        // Inner observers are still registered; the last one to complete
                        // forwards OnCompleted (see InnerObserver.OnCompleted).
                        DisposeUpstream();
                    }
                }

                protected override void Dispose(bool disposing)
                {
                    base.Dispose(disposing);

                    if (disposing)
                    {
                        _group.Dispose();
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelector;
            private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

            public ObservableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
            {
                _source = source;
                _collectionSelector = collectionSelector;
                _resultSelector = resultSelector;
            }

            protected override _ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }

        /// <summary>
        /// Projects every source element to an enumerable, enumerates it synchronously inside
        /// <c>OnNext</c>, and forwards <c>resultSelector(sourceElement, innerElement)</c> for
        /// every enumerated element.
        /// </summary>
        internal sealed class EnumerableSelector : Producer<TResult, EnumerableSelector._>
        {
            /// <summary>Sink that enumerates one inner enumerable per source element.</summary>
            internal sealed class _ : Sink<TSource, TResult>
            {
                private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelector;
                private readonly Func<TSource, TCollection, TResult> _resultSelector;

                public _(EnumerableSelector parent, IObserver<TResult> observer)
                    : base(observer)
                {
                    _collectionSelector = parent._collectionSelector;
                    _resultSelector = parent._resultSelector;
                }

                public override void OnNext(TSource value)
                {
                    IEnumerable<TCollection> collection = null;
                    try
                    {
                        collection = _collectionSelector(value);
                    }
                    catch (Exception error)
                    {
                        ForwardOnError(error);
                        return;
                    }

                    IEnumerator<TCollection> enumerator;
                    try
                    {
                        enumerator = collection.GetEnumerator();
                    }
                    catch (Exception error)
                    {
                        ForwardOnError(error);
                        return;
                    }

                    using (enumerator)
                    {
                        bool hasNext = true;
                        while (hasNext)
                        {
                            TResult result = default(TResult);
                            try
                            {
                                // MoveNext, Current and the selector are all user code,
                                // so all three are covered by the same try block.
                                hasNext = enumerator.MoveNext();
                                if (hasNext)
                                {
                                    result = _resultSelector(value, enumerator.Current);
                                }
                            }
                            catch (Exception error)
                            {
                                ForwardOnError(error);
                                return;
                            }

                            // Forwarding happens outside the try block, so an exception
                            // thrown downstream is not reported back as an OnError.
                            if (hasNext)
                            {
                                ForwardOnNext(result);
                            }
                        }
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelector;
            private readonly Func<TSource, TCollection, TResult> _resultSelector;

            public EnumerableSelector(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
            {
                _source = source;
                _collectionSelector = collectionSelector;
                _resultSelector = resultSelector;
            }

            protected override _ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }

        /// <summary>
        /// Same as <see cref="EnumerableSelector"/>, but both selectors also receive the
        /// zero-based index of the source element, and the result selector additionally
        /// receives the zero-based index of the enumerated element.
        /// </summary>
        internal sealed class EnumerableSelectorIndexed : Producer<TResult, EnumerableSelectorIndexed._>
        {
            /// <summary>Sink that enumerates one inner enumerable per source element.</summary>
            internal sealed class _ : Sink<TSource, TResult>
            {
                private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelector;
                private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

                // Index of the next source element.
                private int _index;

                public _(EnumerableSelectorIndexed parent, IObserver<TResult> observer)
                    : base(observer)
                {
                    _collectionSelector = parent._collectionSelector;
                    _resultSelector = parent._resultSelector;
                }

                public override void OnNext(TSource value)
                {
                    int index = checked(_index++);

                    IEnumerable<TCollection> collection = null;
                    try
                    {
                        collection = _collectionSelector(value, index);
                    }
                    catch (Exception error)
                    {
                        ForwardOnError(error);
                        return;
                    }

                    IEnumerator<TCollection> enumerator;
                    try
                    {
                        enumerator = collection.GetEnumerator();
                    }
                    catch (Exception error)
                    {
                        ForwardOnError(error);
                        return;
                    }

                    using (enumerator)
                    {
                        int innerIndex = 0;
                        bool hasNext = true;
                        while (hasNext)
                        {
                            TResult result = default(TResult);
                            try
                            {
                                hasNext = enumerator.MoveNext();
                                if (hasNext)
                                {
                                    // Checked like the other index counters in this file:
                                    // an overflow becomes an OverflowException, which the
                                    // catch below forwards as OnError, instead of silently
                                    // wrapping to a negative index.
                                    result = _resultSelector(value, index, enumerator.Current, checked(innerIndex++));
                                }
                            }
                            catch (Exception error)
                            {
                                ForwardOnError(error);
                                return;
                            }

                            // Forwarding happens outside the try block, so an exception
                            // thrown downstream is not reported back as an OnError.
                            if (hasNext)
                            {
                                ForwardOnNext(result);
                            }
                        }
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelector;
            private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

            public EnumerableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
            {
                _source = source;
                _collectionSelector = collectionSelector;
                _resultSelector = resultSelector;
            }

            protected override _ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }

        /// <summary>
        /// Projects every source element to a task and forwards
        /// <c>resultSelector(sourceElement, taskResult)</c> when that task runs to completion.
        /// </summary>
        internal sealed class TaskSelector : Producer<TResult, TaskSelector._>
        {
            /// <summary>Sink that starts one task per source element.</summary>
            internal sealed class _ : Sink<TSource, TResult>
            {
                // Guards every Forward* call; task continuations may run concurrently.
                private readonly object _gate = new object();

                // Its token is handed to the collection selector and to the continuations;
                // it is cancelled when the sink is disposed.
                private readonly CancellationTokenSource _cancel = new CancellationTokenSource();

                private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelector;
                private readonly Func<TSource, TCollection, TResult> _resultSelector;

                // Outstanding work: 1 for the source (set in Run) plus 1 per source element
                // (added in OnNext). Each call to OnCompleted takes one off.
                private volatile int _count;

                public _(TaskSelector parent, IObserver<TResult> observer)
                    : base(observer)
                {
                    _collectionSelector = parent._collectionSelector;
                    _resultSelector = parent._resultSelector;
                }

                public override void Run(IObservable<TSource> source)
                {
                    _count = 1;
                    base.Run(source);
                }

                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        _cancel.Cancel();
                    }

                    base.Dispose(disposing);
                }

                public override void OnNext(TSource value)
                {
                    Task<TCollection> task = null;
                    try
                    {
                        Interlocked.Increment(ref _count);
                        task = _collectionSelector(value, _cancel.Token);
                    }
                    catch (Exception error)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(error);
                        }

                        return;
                    }

                    if (task.IsCompleted)
                    {
                        // Already finished: handle the outcome inline, no continuation needed.
                        OnCompletedTask(value, task);
                    }
                    else
                    {
                        // The sink and the value travel as continuation state rather than
                        // being captured. 'this' is a reserved word, so the tuple element
                        // has to be spelled '@this'.
                        System.Threading.Tasks.TaskExtensions.ContinueWithState<TCollection, (_, TSource)>(
                            task,
                            (Task<TCollection> t, (_ @this, TSource value) tuple) => tuple.@this.OnCompletedTask(tuple.value, t),
                            (this, value),
                            _cancel.Token);
                    }
                }

                /// <summary>Translates the final state of <paramref name="task"/> into observer calls.</summary>
                private void OnCompletedTask(TSource value, Task<TCollection> task)
                {
                    switch (task.Status)
                    {
                        case TaskStatus.RanToCompletion:
                        {
                            TResult result;
                            try
                            {
                                result = _resultSelector(value, task.Result);
                            }
                            catch (Exception error)
                            {
                                lock (_gate)
                                {
                                    ForwardOnError(error);
                                }

                                return;
                            }

                            lock (_gate)
                            {
                                ForwardOnNext(result);
                            }

                            // Takes off the count added for this task in OnNext.
                            OnCompleted();
                            break;
                        }

                        case TaskStatus.Faulted:
                            lock (_gate)
                            {
                                ForwardOnError(task.Exception.InnerException);
                            }

                            break;

                        case TaskStatus.Canceled:
                            // A cancellation requested through our own token (sink disposal)
                            // is not reported; any other cancellation is forwarded as an error.
                            if (!_cancel.IsCancellationRequested)
                            {
                                lock (_gate)
                                {
                                    ForwardOnError(new TaskCanceledException(task));
                                }
                            }

                            break;
                    }
                }

                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                public override void OnCompleted()
                {
                    // Called for the source's completion and after each successful task;
                    // whichever call brings the count to zero completes downstream.
                    if (Interlocked.Decrement(ref _count) == 0)
                    {
                        lock (_gate)
                        {
                            ForwardOnCompleted();
                        }
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelector;
            private readonly Func<TSource, TCollection, TResult> _resultSelector;

            public TaskSelector(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
            {
                _source = source;
                _collectionSelector = collectionSelector;
                _resultSelector = resultSelector;
            }

            protected override _ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }

        /// <summary>
        /// Same as <see cref="TaskSelector"/>, but both selectors also receive the zero-based
        /// index of the source element.
        /// </summary>
        internal sealed class TaskSelectorIndexed : Producer<TResult, TaskSelectorIndexed._>
        {
            /// <summary>Sink that starts one task per source element.</summary>
            internal sealed class _ : Sink<TSource, TResult>
            {
                // Guards every Forward* call; task continuations may run concurrently.
                private readonly object _gate = new object();

                // Its token is handed to the collection selector and to the continuations;
                // it is cancelled when the sink is disposed.
                private readonly CancellationTokenSource _cancel = new CancellationTokenSource();

                private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelector;
                private readonly Func<TSource, int, TCollection, TResult> _resultSelector;

                // Outstanding work: 1 for the source (set in Run) plus 1 per source element
                // (added in OnNext). Each call to OnCompleted takes one off.
                private volatile int _count;

                // Index of the next source element.
                private int _index;

                public _(TaskSelectorIndexed parent, IObserver<TResult> observer)
                    : base(observer)
                {
                    _collectionSelector = parent._collectionSelector;
                    _resultSelector = parent._resultSelector;
                }

                public override void Run(IObservable<TSource> source)
                {
                    _count = 1;
                    base.Run(source);
                }

                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        _cancel.Cancel();
                    }

                    base.Dispose(disposing);
                }

                public override void OnNext(TSource value)
                {
                    int index = checked(_index++);

                    Task<TCollection> task = null;
                    try
                    {
                        Interlocked.Increment(ref _count);
                        task = _collectionSelector(value, index, _cancel.Token);
                    }
                    catch (Exception error)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(error);
                        }

                        return;
                    }

                    if (task.IsCompleted)
                    {
                        // Already finished: handle the outcome inline, no continuation needed.
                        OnCompletedTask(value, index, task);
                    }
                    else
                    {
                        // The sink, value and index travel as continuation state rather than
                        // being captured. 'this' is a reserved word, so the tuple element
                        // has to be spelled '@this'.
                        System.Threading.Tasks.TaskExtensions.ContinueWithState<TCollection, (_, TSource, int)>(
                            task,
                            (Task<TCollection> t, (_ @this, TSource value, int index) tuple) => tuple.@this.OnCompletedTask(tuple.value, tuple.index, t),
                            (this, value, index),
                            _cancel.Token);
                    }
                }

                /// <summary>Translates the final state of <paramref name="task"/> into observer calls.</summary>
                private void OnCompletedTask(TSource value, int index, Task<TCollection> task)
                {
                    switch (task.Status)
                    {
                        case TaskStatus.RanToCompletion:
                        {
                            TResult result;
                            try
                            {
                                result = _resultSelector(value, index, task.Result);
                            }
                            catch (Exception error)
                            {
                                lock (_gate)
                                {
                                    ForwardOnError(error);
                                }

                                return;
                            }

                            lock (_gate)
                            {
                                ForwardOnNext(result);
                            }

                            // Takes off the count added for this task in OnNext.
                            OnCompleted();
                            break;
                        }

                        case TaskStatus.Faulted:
                            lock (_gate)
                            {
                                ForwardOnError(task.Exception.InnerException);
                            }

                            break;

                        case TaskStatus.Canceled:
                            // A cancellation requested through our own token (sink disposal)
                            // is not reported; any other cancellation is forwarded as an error.
                            if (!_cancel.IsCancellationRequested)
                            {
                                lock (_gate)
                                {
                                    ForwardOnError(new TaskCanceledException(task));
                                }
                            }

                            break;
                    }
                }

                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                public override void OnCompleted()
                {
                    // Called for the source's completion and after each successful task;
                    // whichever call brings the count to zero completes downstream.
                    if (Interlocked.Decrement(ref _count) == 0)
                    {
                        lock (_gate)
                        {
                            ForwardOnCompleted();
                        }
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelector;
            private readonly Func<TSource, int, TCollection, TResult> _resultSelector;

            public TaskSelectorIndexed(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector)
            {
                _source = source;
                _collectionSelector = collectionSelector;
                _resultSelector = resultSelector;
            }

            protected override _ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }
    }
}