<PackageReference Include="System.Reactive" Version="6.0.0-preview.9" />

SelectMany<TSource, TResult>

static class SelectMany<TSource, TResult>
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Groups the producer/sink pairs that implement the SelectMany overloads: projections to inner
    /// observables, to enumerables and to tasks, each with an indexed variant.
    /// </summary>
    /// <remarks>
    /// Compiler-generated nullable metadata attributes are intentionally not spelled out here: C# rejects
    /// explicit applications of them in source. Members that may be null are called out in comments.
    /// </remarks>
    internal static class SelectMany<TSource, TResult>
    {
        /// <summary>
        /// Producer that maps every source element to an inner observable and merges the inner sequences.
        /// </summary>
        internal class ObservableSelector : Producer<TResult, ObservableSelector._>
        {
            /// <summary>
            /// Sink that subscribes to one inner observable per source element and tracks the live
            /// inner subscriptions in <see cref="_group"/>.
            /// </summary>
            internal class _ : Sink<TSource, TResult>
            {
                /// <summary>
                /// Observer attached to a single inner sequence; it relays notifications to the parent sink
                /// while holding the parent's gate.
                /// </summary>
                private sealed class InnerObserver : SafeObserver<TResult>
                {
                    private readonly _ _parent;

                    public InnerObserver(_ parent)
                    {
                        _parent = parent;
                    }

                    public override void OnNext(TResult value)
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnNext(value);
                        }
                    }

                    public override void OnError(Exception error)
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnError(error);
                        }
                    }

                    public override void OnCompleted()
                    {
                        // Leave the group first; completion is only forwarded once the outer source has
                        // stopped and no inner observer remains registered.
                        _parent._group.Remove(this);

                        if (_parent._isStopped && _parent._group.Count == 0)
                        {
                            lock (_parent._gate)
                            {
                                _parent.ForwardOnCompleted();
                            }
                        }
                    }
                }

                /// <summary>Lock taken around every Forward* call; shared with derived sinks.</summary>
                protected readonly object _gate = new object();

                private readonly Func<TSource, IObservable<TResult>> _selector;

                /// <summary>Holds the observers of the inner sequences that are still registered.</summary>
                private readonly CompositeDisposable _group = new CompositeDisposable();

                /// <summary>Set by <see cref="Final"/>.</summary>
                private volatile bool _isStopped;

                public _(ObservableSelector parent, IObserver<TResult> observer)
                    : base(observer)
                {
                    _selector = parent._selector;
                }

                /// <summary>
                /// Runs the selector on <paramref name="value"/> and subscribes to the resulting inner
                /// sequence. An exception thrown by the selector is forwarded as an error instead.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    IObservable<TResult> inner;

                    try
                    {
                        inner = _selector(value);
                    }
                    catch (Exception exception)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(exception);
                        }

                        return;
                    }

                    SubscribeInner(inner);
                }

                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                public override void OnCompleted()
                {
                    Final();
                }

                protected override void Dispose(bool disposing)
                {
                    base.Dispose(disposing);

                    if (disposing)
                    {
                        _group.Dispose();
                    }
                }

                /// <summary>
                /// Marks the outer source as stopped. Forwards completion right away when no inner observer
                /// is registered; otherwise calls <c>DisposeUpstream</c> and leaves completion to the last
                /// inner observer.
                /// </summary>
                protected void Final()
                {
                    _isStopped = true;

                    if (_group.Count == 0)
                    {
                        lock (_gate)
                        {
                            ForwardOnCompleted();
                        }
                    }
                    else
                    {
                        DisposeUpstream();
                    }
                }

                /// <summary>
                /// Registers a new inner observer in the group and subscribes it to <paramref name="inner"/>.
                /// </summary>
                protected void SubscribeInner(IObservable<TResult> inner)
                {
                    var innerObserver = new InnerObserver(this);

                    // The observer is added to the group before subscribing.
                    _group.Add(innerObserver);
                    innerObserver.SetResource(ObservableExtensions.SubscribeSafe<TResult>(inner, innerObserver));
                }
            }

            protected readonly IObservable<TSource> _source;
            protected readonly Func<TSource, IObservable<TResult>> _selector;

            public ObservableSelector(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override _ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }

        /// <summary>
        /// Variant of <see cref="ObservableSelector"/> that can also map the source's error and completion
        /// notifications to a final inner observable.
        /// </summary>
        internal sealed class ObservableSelectors : ObservableSelector
        {
            internal new sealed class _ : ObservableSelector._
            {
                // Either selector may be null; a null selector falls back to the base behaviour.
                private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
                private readonly Func<IObservable<TResult>> _selectorOnCompleted;

                public _(ObservableSelectors parent, IObserver<TResult> observer)
                    : base(parent, observer)
                {
                    _selectorOnError = parent._selectorOnError;
                    _selectorOnCompleted = parent._selectorOnCompleted;
                }

                /// <summary>
                /// With an error selector: subscribes to the sequence it returns and then finalizes.
                /// Without one: defers to the base error handling.
                /// </summary>
                public override void OnError(Exception error)
                {
                    if (_selectorOnError == null)
                    {
                        base.OnError(error);
                        return;
                    }

                    IObservable<TResult> inner;

                    try
                    {
                        inner = _selectorOnError(error);
                    }
                    catch (Exception exception)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(exception);
                        }

                        return;
                    }

                    SubscribeInner(inner);
                    Final();
                }

                /// <summary>
                /// With a completion selector: subscribes to the sequence it returns before finalizing.
                /// </summary>
                public override void OnCompleted()
                {
                    if (_selectorOnCompleted != null)
                    {
                        IObservable<TResult> inner;

                        try
                        {
                            inner = _selectorOnCompleted();
                        }
                        catch (Exception exception)
                        {
                            lock (_gate)
                            {
                                ForwardOnError(exception);
                            }

                            return;
                        }

                        SubscribeInner(inner);
                    }

                    Final();
                }
            }

            // Either selector may be null (the sink checks before invoking).
            private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
            private readonly Func<IObservable<TResult>> _selectorOnCompleted;

            public ObservableSelectors(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
                : base(source, selector)
            {
                _selectorOnError = selectorOnError;
                _selectorOnCompleted = selectorOnCompleted;
            }

            protected override ObservableSelector._ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }
        }

        /// <summary>
        /// Producer that maps every source element, together with its zero-based position, to an inner
        /// observable and merges the inner sequences.
        /// </summary>
        internal class ObservableSelectorIndexed : Producer<TResult, ObservableSelectorIndexed._>
        {
            /// <summary>
            /// Sink that subscribes to one inner observable per source element and tracks the live
            /// inner subscriptions in <see cref="_group"/>.
            /// </summary>
            internal class _ : Sink<TSource, TResult>
            {
                /// <summary>
                /// Observer attached to a single inner sequence; it relays notifications to the parent sink
                /// while holding the parent's gate.
                /// </summary>
                private sealed class InnerObserver : SafeObserver<TResult>
                {
                    private readonly _ _parent;

                    public InnerObserver(_ parent)
                    {
                        _parent = parent;
                    }

                    public override void OnNext(TResult value)
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnNext(value);
                        }
                    }

                    public override void OnError(Exception error)
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnError(error);
                        }
                    }

                    public override void OnCompleted()
                    {
                        // Leave the group first; completion is only forwarded once the outer source has
                        // stopped and no inner observer remains registered.
                        _parent._group.Remove(this);

                        if (_parent._isStopped && _parent._group.Count == 0)
                        {
                            lock (_parent._gate)
                            {
                                _parent.ForwardOnCompleted();
                            }
                        }
                    }
                }

                /// <summary>
                /// Lock taken around every Forward* call. It is protected so that derived sinks lock on the
                /// very same object as this sink and its inner observers; a separate gate in a derived sink
                /// would not exclude concurrent inner notifications.
                /// </summary>
                protected readonly object _gate = new object();

                /// <summary>Holds the observers of the inner sequences that are still registered.</summary>
                private readonly CompositeDisposable _group = new CompositeDisposable();

                protected readonly Func<TSource, int, IObservable<TResult>> _selector;

                /// <summary>Position handed to the selector for the next source element.</summary>
                private int _index;

                /// <summary>Set by <see cref="Final"/>.</summary>
                private volatile bool _isStopped;

                public _(ObservableSelectorIndexed parent, IObserver<TResult> observer)
                    : base(observer)
                {
                    _selector = parent._selector;
                }

                /// <summary>
                /// Runs the selector on <paramref name="value"/> and its index, then subscribes to the
                /// resulting inner sequence. An exception from the selector, or the overflow raised by the
                /// checked index increment, is forwarded as an error instead.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    IObservable<TResult> inner;

                    try
                    {
                        inner = _selector(value, checked(_index++));
                    }
                    catch (Exception exception)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(exception);
                        }

                        return;
                    }

                    SubscribeInner(inner);
                }

                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                public override void OnCompleted()
                {
                    Final();
                }

                protected override void Dispose(bool disposing)
                {
                    base.Dispose(disposing);

                    if (disposing)
                    {
                        _group.Dispose();
                    }
                }

                /// <summary>
                /// Marks the outer source as stopped. Forwards completion right away when no inner observer
                /// is registered; otherwise calls <c>DisposeUpstream</c> and leaves completion to the last
                /// inner observer.
                /// </summary>
                protected void Final()
                {
                    _isStopped = true;

                    if (_group.Count == 0)
                    {
                        lock (_gate)
                        {
                            ForwardOnCompleted();
                        }
                    }
                    else
                    {
                        DisposeUpstream();
                    }
                }

                /// <summary>
                /// Registers a new inner observer in the group and subscribes it to <paramref name="inner"/>.
                /// </summary>
                protected void SubscribeInner(IObservable<TResult> inner)
                {
                    var innerObserver = new InnerObserver(this);

                    // The observer is added to the group before subscribing.
                    _group.Add(innerObserver);
                    innerObserver.SetResource(ObservableExtensions.SubscribeSafe<TResult>(inner, innerObserver));
                }
            }

            protected readonly IObservable<TSource> _source;
            protected readonly Func<TSource, int, IObservable<TResult>> _selector;

            public ObservableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override _ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }

        /// <summary>
        /// Variant of <see cref="ObservableSelectorIndexed"/> that can also map the source's error and
        /// completion notifications to a final inner observable.
        /// </summary>
        internal sealed class ObservableSelectorsIndexed : ObservableSelectorIndexed
        {
            internal new sealed class _ : ObservableSelectorIndexed._
            {
                // No gate is declared here: the inherited _gate is used so that errors raised by the
                // selectors below are forwarded under the same lock as inner-sequence notifications.

                // Either selector may be null; a null selector falls back to the base behaviour.
                private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
                private readonly Func<IObservable<TResult>> _selectorOnCompleted;

                public _(ObservableSelectorsIndexed parent, IObserver<TResult> observer)
                    : base(parent, observer)
                {
                    _selectorOnError = parent._selectorOnError;
                    _selectorOnCompleted = parent._selectorOnCompleted;
                }

                /// <summary>
                /// With an error selector: subscribes to the sequence it returns and then finalizes.
                /// Without one: defers to the base error handling.
                /// </summary>
                public override void OnError(Exception error)
                {
                    if (_selectorOnError == null)
                    {
                        base.OnError(error);
                        return;
                    }

                    IObservable<TResult> inner;

                    try
                    {
                        inner = _selectorOnError(error);
                    }
                    catch (Exception exception)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(exception);
                        }

                        return;
                    }

                    SubscribeInner(inner);
                    Final();
                }

                /// <summary>
                /// With a completion selector: subscribes to the sequence it returns before finalizing.
                /// </summary>
                public override void OnCompleted()
                {
                    if (_selectorOnCompleted != null)
                    {
                        IObservable<TResult> inner;

                        try
                        {
                            inner = _selectorOnCompleted();
                        }
                        catch (Exception exception)
                        {
                            lock (_gate)
                            {
                                ForwardOnError(exception);
                            }

                            return;
                        }

                        SubscribeInner(inner);
                    }

                    Final();
                }
            }

            // Either selector may be null (the sink checks before invoking).
            private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
            private readonly Func<IObservable<TResult>> _selectorOnCompleted;

            public ObservableSelectorsIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
                : base(source, selector)
            {
                _selectorOnError = selectorOnError;
                _selectorOnCompleted = selectorOnCompleted;
            }

            protected override ObservableSelectorIndexed._ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }
        }

        /// <summary>
        /// Producer that maps every source element to an enumerable and emits its items in order.
        /// </summary>
        internal sealed class EnumerableSelector : Producer<TResult, EnumerableSelector._>
        {
            internal sealed class _ : Sink<TSource, TResult>
            {
                private readonly Func<TSource, IEnumerable<TResult>> _selector;

                public _(EnumerableSelector parent, IObserver<TResult> observer)
                    : base(observer)
                {
                    _selector = parent._selector;
                }

                /// <summary>
                /// Projects <paramref name="value"/> to an enumerable and forwards each of its items.
                /// An exception from the selector, from <c>GetEnumerator</c>, or from advancing/reading the
                /// enumerator is forwarded as an error and ends the processing of this element.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    IEnumerable<TResult> items;

                    try
                    {
                        items = _selector(value);
                    }
                    catch (Exception exception)
                    {
                        ForwardOnError(exception);
                        return;
                    }

                    IEnumerator<TResult> enumerator;

                    try
                    {
                        enumerator = items.GetEnumerator();
                    }
                    catch (Exception exception)
                    {
                        ForwardOnError(exception);
                        return;
                    }

                    using (enumerator)
                    {
                        while (true)
                        {
                            TResult current;

                            // Only MoveNext/Current are guarded; the downstream call stays outside the try.
                            try
                            {
                                if (!enumerator.MoveNext())
                                {
                                    break;
                                }

                                current = enumerator.Current;
                            }
                            catch (Exception exception)
                            {
                                ForwardOnError(exception);
                                return;
                            }

                            ForwardOnNext(current);
                        }
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, IEnumerable<TResult>> _selector;

            public EnumerableSelector(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override _ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }

        /// <summary>
        /// Producer that maps every source element, together with its zero-based position, to an
        /// enumerable and emits its items in order.
        /// </summary>
        internal sealed class EnumerableSelectorIndexed : Producer<TResult, EnumerableSelectorIndexed._>
        {
            internal sealed class _ : Sink<TSource, TResult>
            {
                private readonly Func<TSource, int, IEnumerable<TResult>> _selector;

                /// <summary>Position handed to the selector for the next source element.</summary>
                private int _index;

                public _(EnumerableSelectorIndexed parent, IObserver<TResult> observer)
                    : base(observer)
                {
                    _selector = parent._selector;
                }

                /// <summary>
                /// Projects <paramref name="value"/> and its index to an enumerable and forwards each of its
                /// items. An exception from the selector (or the checked index increment), from
                /// <c>GetEnumerator</c>, or from advancing/reading the enumerator is forwarded as an error
                /// and ends the processing of this element.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    IEnumerable<TResult> items;

                    try
                    {
                        items = _selector(value, checked(_index++));
                    }
                    catch (Exception exception)
                    {
                        ForwardOnError(exception);
                        return;
                    }

                    IEnumerator<TResult> enumerator;

                    try
                    {
                        enumerator = items.GetEnumerator();
                    }
                    catch (Exception exception)
                    {
                        ForwardOnError(exception);
                        return;
                    }

                    using (enumerator)
                    {
                        while (true)
                        {
                            TResult current;

                            // Only MoveNext/Current are guarded; the downstream call stays outside the try.
                            try
                            {
                                if (!enumerator.MoveNext())
                                {
                                    break;
                                }

                                current = enumerator.Current;
                            }
                            catch (Exception exception)
                            {
                                ForwardOnError(exception);
                                return;
                            }

                            ForwardOnNext(current);
                        }
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, int, IEnumerable<TResult>> _selector;

            public EnumerableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override _ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }

        /// <summary>
        /// Producer that maps every source element to a task and emits each task's result.
        /// </summary>
        internal sealed class TaskSelector : Producer<TResult, TaskSelector._>
        {
            internal sealed class _ : Sink<TSource, TResult>
            {
                /// <summary>Lock taken around every Forward* call.</summary>
                private readonly object _gate = new object();

                /// <summary>Source of the token passed to the selector; cancelled on dispose.</summary>
                private readonly CancellationTokenSource _cts = new CancellationTokenSource();

                private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;

                /// <summary>
                /// Outstanding work counter: set to 1 in <see cref="Run"/>, incremented per source element
                /// and decremented by <see cref="OnCompleted"/>. Completion is forwarded when it hits zero.
                /// </summary>
                private volatile int _count;

                public _(TaskSelector parent, IObserver<TResult> observer)
                    : base(observer)
                {
                    _selector = parent._selector;
                }

                public override void Run(IObservable<TSource> source)
                {
                    _count = 1;
                    base.Run(source);
                }

                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        _cts.Cancel();
                    }

                    base.Dispose(disposing);
                }

                /// <summary>
                /// Starts a task for <paramref name="value"/>. An already-completed task is handled inline;
                /// otherwise a continuation (bound to the sink's token) handles it later.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    Task<TResult> task;

                    try
                    {
                        Interlocked.Increment(ref _count);
                        task = _selector(value, _cts.Token);
                    }
                    catch (Exception exception)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(exception);
                        }

                        return;
                    }

                    if (task.IsCompleted)
                    {
                        OnCompletedTask(task);
                    }
                    else
                    {
                        // The sink is passed as state so the delegate does not need to capture it.
                        task.ContinueWith(
                            (closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask),
                            this,
                            _cts.Token);
                    }
                }

                /// <summary>
                /// Translates the final state of <paramref name="task"/> into a notification.
                /// </summary>
                private void OnCompletedTask(Task<TResult> task)
                {
                    switch (task.Status)
                    {
                        case TaskStatus.RanToCompletion:
                            lock (_gate)
                            {
                                ForwardOnNext(task.Result);
                            }

                            // Releases the count taken for this task in OnNext.
                            OnCompleted();
                            break;

                        case TaskStatus.Faulted:
                            lock (_gate)
                            {
                                ForwardOnError(task.GetSingleException());
                            }

                            break;

                        case TaskStatus.Canceled:
                            // A cancellation is only reported when this sink's own token was not cancelled.
                            if (!_cts.IsCancellationRequested)
                            {
                                lock (_gate)
                                {
                                    ForwardOnError(new TaskCanceledException(task));
                                }
                            }

                            break;
                    }
                }

                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                public override void OnCompleted()
                {
                    if (Interlocked.Decrement(ref _count) == 0)
                    {
                        lock (_gate)
                        {
                            ForwardOnCompleted();
                        }
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;

            public TaskSelector(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override _ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }

        /// <summary>
        /// Producer that maps every source element, together with its zero-based position, to a task and
        /// emits each task's result.
        /// </summary>
        internal sealed class TaskSelectorIndexed : Producer<TResult, TaskSelectorIndexed._>
        {
            internal sealed class _ : Sink<TSource, TResult>
            {
                /// <summary>Lock taken around every Forward* call.</summary>
                private readonly object _gate = new object();

                /// <summary>Source of the token passed to the selector; cancelled on dispose.</summary>
                private readonly CancellationTokenSource _cts = new CancellationTokenSource();

                private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;

                /// <summary>
                /// Outstanding work counter: set to 1 in <see cref="Run"/>, incremented per source element
                /// and decremented by <see cref="OnCompleted"/>. Completion is forwarded when it hits zero.
                /// </summary>
                private volatile int _count;

                /// <summary>Position handed to the selector for the next source element.</summary>
                private int _index;

                public _(TaskSelectorIndexed parent, IObserver<TResult> observer)
                    : base(observer)
                {
                    _selector = parent._selector;
                }

                public override void Run(IObservable<TSource> source)
                {
                    _count = 1;
                    base.Run(source);
                }

                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        _cts.Cancel();
                    }

                    base.Dispose(disposing);
                }

                /// <summary>
                /// Starts a task for <paramref name="value"/> and its index. An already-completed task is
                /// handled inline; otherwise a continuation (bound to the sink's token) handles it later.
                /// </summary>
                public override void OnNext(TSource value)
                {
                    Task<TResult> task;

                    try
                    {
                        Interlocked.Increment(ref _count);
                        task = _selector(value, checked(_index++), _cts.Token);
                    }
                    catch (Exception exception)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(exception);
                        }

                        return;
                    }

                    if (task.IsCompleted)
                    {
                        OnCompletedTask(task);
                    }
                    else
                    {
                        // The sink is passed as state so the delegate does not need to capture it.
                        task.ContinueWith(
                            (closureTask, thisObject) => ((_)thisObject).OnCompletedTask(closureTask),
                            this,
                            _cts.Token);
                    }
                }

                /// <summary>
                /// Translates the final state of <paramref name="task"/> into a notification.
                /// </summary>
                private void OnCompletedTask(Task<TResult> task)
                {
                    switch (task.Status)
                    {
                        case TaskStatus.RanToCompletion:
                            lock (_gate)
                            {
                                ForwardOnNext(task.Result);
                            }

                            // Releases the count taken for this task in OnNext.
                            OnCompleted();
                            break;

                        case TaskStatus.Faulted:
                            lock (_gate)
                            {
                                ForwardOnError(task.GetSingleException());
                            }

                            break;

                        case TaskStatus.Canceled:
                            // A cancellation is only reported when this sink's own token was not cancelled.
                            if (!_cts.IsCancellationRequested)
                            {
                                lock (_gate)
                                {
                                    ForwardOnError(new TaskCanceledException(task));
                                }
                            }

                            break;
                    }
                }

                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                public override void OnCompleted()
                {
                    if (Interlocked.Decrement(ref _count) == 0)
                    {
                        lock (_gate)
                        {
                            ForwardOnCompleted();
                        }
                    }
                }
            }

            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;

            public TaskSelectorIndexed(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override _ CreateSink(IObserver<TResult> observer)
            {
                return new _(this, observer);
            }

            protected override void Run(_ sink)
            {
                sink.Run(_source);
            }
        }
    }
}