// SelectMany<TSource, TCollection, TResult>
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class SelectMany<TSource, TCollection, TResult>
{
/// <summary>
/// Producer for the SelectMany variant whose collection selector returns an
/// <see cref="IObservable{T}"/> for every source element (no index arguments).
/// </summary>
internal sealed class ObservableSelector : Producer<TResult, ObservableSelector._>
{
    /// <summary>
    /// Sink that observes the source, subscribes to one inner sequence per source
    /// element and forwards the projected results while holding <c>_gate</c>.
    /// </summary>
    internal sealed class _ : Sink<TSource, TResult>
    {
        /// <summary>
        /// Observer for a single inner sequence. It keeps the source element the
        /// inner sequence was created from so it can be passed to the result selector.
        /// </summary>
        private sealed class InnerObserver : SafeObserver<TCollection>
        {
            private readonly _ _parent;
            private readonly TSource _value;

            /// <param name="parent">The sink this inner observer reports to.</param>
            /// <param name="value">The source element that produced the inner sequence.</param>
            public InnerObserver(_ parent, TSource value)
            {
                _parent = parent;
                _value = value;
            }

            /// <summary>
            /// Projects the inner element together with the stored source element and
            /// forwards the result; a throwing result selector is forwarded as an error.
            /// </summary>
            public override void OnNext(TCollection value)
            {
                TResult projected;
                try
                {
                    projected = _parent._resultSelector(_value, value);
                }
                catch (Exception ex)
                {
                    lock (_parent._gate)
                    {
                        _parent.ForwardOnError(ex);
                    }

                    return;
                }

                lock (_parent._gate)
                {
                    _parent.ForwardOnNext(projected);
                }
            }

            /// <summary>Forwards an inner-sequence error through the parent, under the gate.</summary>
            public override void OnError(Exception error)
            {
                lock (_parent._gate)
                {
                    _parent.ForwardOnError(error);
                }
            }

            /// <summary>
            /// Removes this observer from the parent's group and forwards completion
            /// when the source has already stopped and the group is empty.
            /// </summary>
            public override void OnCompleted()
            {
                _parent._group.Remove(this);

                // NOTE(review): the stopped flag and the group count are read outside the gate,
                // so this path and the sink's own OnCompleted may both see an empty group.
                // Presumably ForwardOnCompleted tolerates that - confirm against Sink.
                if (!_parent._isStopped || _parent._group.Count != 0)
                {
                    return;
                }

                lock (_parent._gate)
                {
                    _parent.ForwardOnCompleted();
                }
            }
        }

        // Serializes every ForwardOn* call made by the sink and its inner observers.
        private readonly object _gate = new object();

        // Holds the inner observers that have been added and not yet removed on completion.
        private readonly CompositeDisposable _group = new CompositeDisposable();

        private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
        private readonly Func<TSource, TCollection, TResult> _resultSelector;

        // Set once the source has signalled OnCompleted.
        private volatile bool _isStopped;

        /// <summary>Copies the selectors from the producer and wraps the downstream observer.</summary>
        public _(ObservableSelector parent, IObserver<TResult> observer)
            : base(observer)
        {
            _collectionSelector = parent._collectionSelector;
            _resultSelector = parent._resultSelector;
        }

        /// <summary>
        /// Maps the source element to an inner sequence, registers a new inner observer
        /// in the group and subscribes it. A throwing collection selector is forwarded
        /// as an error.
        /// </summary>
        public override void OnNext(TSource value)
        {
            IObservable<TCollection> inner;
            try
            {
                inner = _collectionSelector(value);
            }
            catch (Exception ex)
            {
                lock (_gate)
                {
                    ForwardOnError(ex);
                }

                return;
            }

            var innerObserver = new InnerObserver(this, value);

            // The observer joins the group before subscribing, matching the original order.
            _group.Add(innerObserver);

            IDisposable subscription = ObservableExtensions.SubscribeSafe<TCollection>(inner, (IObserver<TCollection>)innerObserver);
            innerObserver.SetResource(subscription);
        }

        /// <summary>Forwards a source error downstream, under the gate.</summary>
        public override void OnError(Exception error)
        {
            lock (_gate)
            {
                ForwardOnError(error);
            }
        }

        /// <summary>
        /// Marks the source as stopped. Completion is forwarded immediately when no
        /// inner observers remain; otherwise only <c>DisposeUpstream</c> is called and
        /// the last inner observer to complete forwards completion.
        /// </summary>
        public override void OnCompleted()
        {
            _isStopped = true;

            if (_group.Count != 0)
            {
                DisposeUpstream();
                return;
            }

            lock (_gate)
            {
                ForwardOnCompleted();
            }
        }

        /// <summary>Runs the base disposal first, then disposes the inner-observer group.</summary>
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);

            if (disposing)
            {
                _group.Dispose();
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
    private readonly Func<TSource, TCollection, TResult> _resultSelector;

    /// <param name="source">The sequence whose elements are projected.</param>
    /// <param name="collectionSelector">Maps a source element to an inner observable sequence.</param>
    /// <param name="resultSelector">Combines a source element with an inner element.</param>
    public ObservableSelector(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
    {
        _source = source;
        _collectionSelector = collectionSelector;
        _resultSelector = resultSelector;
    }

    /// <summary>Creates the sink for one downstream observer.</summary>
    protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

    /// <summary>Starts the sink on the stored source sequence.</summary>
    protected override void Run(_ sink) => sink.Run(_source);
}
/// <summary>
/// Producer for the SelectMany variant whose selectors also receive indices: the
/// collection selector gets the source element index, and the result selector gets
/// both the source element index and the inner element index.
/// </summary>
internal sealed class ObservableSelectorIndexed : Producer<TResult, ObservableSelectorIndexed._>
{
    /// <summary>
    /// Sink that numbers the source elements, subscribes to one inner sequence per
    /// element and forwards the projected results while holding <c>_gate</c>.
    /// </summary>
    internal sealed class _ : Sink<TSource, TResult>
    {
        /// <summary>
        /// Observer for a single inner sequence. It stores the originating source
        /// element with its index and counts the inner elements it receives.
        /// </summary>
        private sealed class InnerObserver : SafeObserver<TCollection>
        {
            private readonly _ _parent;
            private readonly TSource _value;
            private readonly int _valueIndex;

            // Index handed to the result selector for the next inner element.
            private int _index;

            /// <param name="parent">The sink this inner observer reports to.</param>
            /// <param name="value">The source element that produced the inner sequence.</param>
            /// <param name="index">The index of <paramref name="value"/> in the source sequence.</param>
            public InnerObserver(_ parent, TSource value, int index)
            {
                _parent = parent;
                _value = value;
                _valueIndex = index;
            }

            /// <summary>
            /// Projects the inner element and forwards the result. Both a throwing result
            /// selector and an overflow of the inner index are forwarded as errors, because
            /// the checked increment is evaluated inside the try block.
            /// </summary>
            public override void OnNext(TCollection value)
            {
                TResult projected;
                try
                {
                    projected = _parent._resultSelector(_value, _valueIndex, value, checked(_index++));
                }
                catch (Exception ex)
                {
                    lock (_parent._gate)
                    {
                        _parent.ForwardOnError(ex);
                    }

                    return;
                }

                lock (_parent._gate)
                {
                    _parent.ForwardOnNext(projected);
                }
            }

            /// <summary>Forwards an inner-sequence error through the parent, under the gate.</summary>
            public override void OnError(Exception error)
            {
                lock (_parent._gate)
                {
                    _parent.ForwardOnError(error);
                }
            }

            /// <summary>
            /// Removes this observer from the parent's group and forwards completion
            /// when the source has already stopped and the group is empty.
            /// </summary>
            public override void OnCompleted()
            {
                _parent._group.Remove(this);

                // NOTE(review): the stopped flag and the group count are read outside the gate,
                // so this path and the sink's own OnCompleted may both see an empty group.
                // Presumably ForwardOnCompleted tolerates that - confirm against Sink.
                if (!_parent._isStopped || _parent._group.Count != 0)
                {
                    return;
                }

                lock (_parent._gate)
                {
                    _parent.ForwardOnCompleted();
                }
            }
        }

        // Serializes every ForwardOn* call made by the sink and its inner observers.
        private readonly object _gate = new object();

        // Holds the inner observers that have been added and not yet removed on completion.
        private readonly CompositeDisposable _group = new CompositeDisposable();

        private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelector;
        private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

        // Set once the source has signalled OnCompleted.
        private volatile bool _isStopped;

        // Index assigned to the next source element.
        private int _index;

        /// <summary>Copies the selectors from the producer and wraps the downstream observer.</summary>
        public _(ObservableSelectorIndexed parent, IObserver<TResult> observer)
            : base(observer)
        {
            _collectionSelector = parent._collectionSelector;
            _resultSelector = parent._resultSelector;
        }

        /// <summary>
        /// Assigns the next source index, maps the element to an inner sequence,
        /// registers a new inner observer in the group and subscribes it. A throwing
        /// collection selector is forwarded as an error.
        /// </summary>
        public override void OnNext(TSource value)
        {
            // The checked increment sits outside the try block, so an overflow
            // propagates to the caller instead of being forwarded downstream.
            int sourceIndex = checked(_index++);

            IObservable<TCollection> inner;
            try
            {
                inner = _collectionSelector(value, sourceIndex);
            }
            catch (Exception ex)
            {
                lock (_gate)
                {
                    ForwardOnError(ex);
                }

                return;
            }

            var innerObserver = new InnerObserver(this, value, sourceIndex);

            // The observer joins the group before subscribing, matching the original order.
            _group.Add(innerObserver);

            IDisposable subscription = ObservableExtensions.SubscribeSafe<TCollection>(inner, (IObserver<TCollection>)innerObserver);
            innerObserver.SetResource(subscription);
        }

        /// <summary>Forwards a source error downstream, under the gate.</summary>
        public override void OnError(Exception error)
        {
            lock (_gate)
            {
                ForwardOnError(error);
            }
        }

        /// <summary>
        /// Marks the source as stopped. Completion is forwarded immediately when no
        /// inner observers remain; otherwise only <c>DisposeUpstream</c> is called and
        /// the last inner observer to complete forwards completion.
        /// </summary>
        public override void OnCompleted()
        {
            _isStopped = true;

            if (_group.Count != 0)
            {
                DisposeUpstream();
                return;
            }

            lock (_gate)
            {
                ForwardOnCompleted();
            }
        }

        /// <summary>Runs the base disposal first, then disposes the inner-observer group.</summary>
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);

            if (disposing)
            {
                _group.Dispose();
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelector;
    private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

    /// <param name="source">The sequence whose elements are projected.</param>
    /// <param name="collectionSelector">Maps a source element and its index to an inner observable sequence.</param>
    /// <param name="resultSelector">Combines a source element, its index, an inner element and the inner index.</param>
    public ObservableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
    {
        _source = source;
        _collectionSelector = collectionSelector;
        _resultSelector = resultSelector;
    }

    /// <summary>Creates the sink for one downstream observer.</summary>
    protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

    /// <summary>Starts the sink on the stored source sequence.</summary>
    protected override void Run(_ sink) => sink.Run(_source);
}
/// <summary>
/// Producer for the SelectMany variant whose collection selector returns an
/// <see cref="IEnumerable{T}"/> for every source element (no index arguments).
/// </summary>
internal sealed class EnumerableSelector : Producer<TResult, EnumerableSelector._>
{
    /// <summary>
    /// Sink that enumerates the collection produced for each source element inside
    /// <c>OnNext</c> and forwards one projected result per collection item.
    /// </summary>
    internal sealed class _ : Sink<TSource, TResult>
    {
        private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelector;
        private readonly Func<TSource, TCollection, TResult> _resultSelector;

        /// <summary>Copies the selectors from the producer and wraps the downstream observer.</summary>
        public _(EnumerableSelector parent, IObserver<TResult> observer)
            : base(observer)
        {
            _collectionSelector = parent._collectionSelector;
            _resultSelector = parent._resultSelector;
        }

        /// <summary>
        /// Obtains the collection for <paramref name="value"/>, walks it and forwards the
        /// projection of each item. Any exception thrown by the collection selector,
        /// by <c>GetEnumerator</c>, by <c>MoveNext</c>/<c>Current</c> or by the result
        /// selector is forwarded as an error and ends processing of this element.
        /// </summary>
        public override void OnNext(TSource value)
        {
            IEnumerator<TCollection> cursor;
            try
            {
                // Selector call and enumerator acquisition share the same failure handling.
                cursor = _collectionSelector(value).GetEnumerator();
            }
            catch (Exception ex)
            {
                ForwardOnError(ex);
                return;
            }

            using (cursor)
            {
                while (true)
                {
                    TResult projected;
                    try
                    {
                        if (!cursor.MoveNext())
                        {
                            break;
                        }

                        projected = _resultSelector(value, cursor.Current);
                    }
                    catch (Exception ex)
                    {
                        ForwardOnError(ex);
                        return;
                    }

                    // Forwarded outside the try block so that exceptions thrown while
                    // forwarding are not caught and turned into an error notification.
                    ForwardOnNext(projected);
                }
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelector;
    private readonly Func<TSource, TCollection, TResult> _resultSelector;

    /// <param name="source">The sequence whose elements are projected.</param>
    /// <param name="collectionSelector">Maps a source element to an enumerable collection.</param>
    /// <param name="resultSelector">Combines a source element with a collection item.</param>
    public EnumerableSelector(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
    {
        _source = source;
        _collectionSelector = collectionSelector;
        _resultSelector = resultSelector;
    }

    /// <summary>Creates the sink for one downstream observer.</summary>
    protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

    /// <summary>Starts the sink on the stored source sequence.</summary>
    protected override void Run(_ sink) => sink.Run(_source);
}
/// <summary>
/// Producer for the SelectMany variant whose collection selector returns an
/// <see cref="IEnumerable{T}"/> and whose selectors also receive indices: the
/// collection selector gets the source element index, and the result selector gets
/// both the source element index and the index of the item within its collection.
/// </summary>
internal sealed class EnumerableSelectorIndexed : Producer<TResult, EnumerableSelectorIndexed._>
{
    /// <summary>
    /// Sink that numbers the source elements, enumerates the collection produced for
    /// each of them inside <c>OnNext</c> and forwards one projected result per item.
    /// </summary>
    internal sealed class _ : Sink<TSource, TResult>
    {
        private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelector;
        private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

        // Index assigned to the next source element.
        private int _index;

        /// <summary>Copies the selectors from the producer and wraps the downstream observer.</summary>
        public _(EnumerableSelectorIndexed parent, IObserver<TResult> observer)
            : base(observer)
        {
            _collectionSelector = parent._collectionSelector;
            _resultSelector = parent._resultSelector;
        }

        /// <summary>
        /// Obtains the collection for <paramref name="value"/>, walks it and forwards the
        /// projection of each item. Any exception thrown by the collection selector,
        /// by <c>GetEnumerator</c>, by <c>MoveNext</c>/<c>Current</c>, by the result
        /// selector or by an overflow of the per-collection item index is forwarded as
        /// an error and ends processing of this element.
        /// </summary>
        public override void OnNext(TSource value)
        {
            // The checked increment sits outside the try block, so an overflow of the
            // source index propagates to the caller instead of being forwarded downstream.
            int sourceIndex = checked(_index++);

            IEnumerable<TCollection> collection;
            try
            {
                collection = _collectionSelector(value, sourceIndex);
            }
            catch (Exception ex)
            {
                ForwardOnError(ex);
                return;
            }

            IEnumerator<TCollection> cursor;
            try
            {
                cursor = collection.GetEnumerator();
            }
            catch (Exception ex)
            {
                ForwardOnError(ex);
                return;
            }

            using (cursor)
            {
                int itemIndex = 0;
                bool hasNext = true;

                while (hasNext)
                {
                    TResult projected = default(TResult);
                    try
                    {
                        hasNext = cursor.MoveNext();
                        if (hasNext)
                        {
                            // Checked like every other index in this file: an overflow is
                            // reported as an error instead of silently wrapping to a
                            // negative index.
                            projected = _resultSelector(value, sourceIndex, cursor.Current, checked(itemIndex++));
                        }
                    }
                    catch (Exception ex)
                    {
                        ForwardOnError(ex);
                        return;
                    }

                    // Forwarded outside the try block so that exceptions thrown while
                    // forwarding are not caught and turned into an error notification.
                    if (hasNext)
                    {
                        ForwardOnNext(projected);
                    }
                }
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelector;
    private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

    /// <param name="source">The sequence whose elements are projected.</param>
    /// <param name="collectionSelector">Maps a source element and its index to an enumerable collection.</param>
    /// <param name="resultSelector">Combines a source element, its index, a collection item and the item index.</param>
    public EnumerableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
    {
        _source = source;
        _collectionSelector = collectionSelector;
        _resultSelector = resultSelector;
    }

    /// <summary>Creates the sink for one downstream observer.</summary>
    protected override _ CreateSink(IObserver<TResult> observer)
    {
        return new _(this, observer);
    }

    /// <summary>Starts the sink on the stored source sequence.</summary>
    protected override void Run(_ sink)
    {
        sink.Run(_source);
    }
}
/// <summary>
/// Producer for the SelectMany variant whose collection selector starts a
/// <see cref="Task{TResult}"/> for every source element (no index arguments).
/// </summary>
internal sealed class TaskSelector : Producer<TResult, TaskSelector._>
{
    /// <summary>
    /// Sink that starts one task per source element and forwards the projected task
    /// results while holding <c>_gate</c>. Completion is forwarded when the pending
    /// counter reaches zero.
    /// </summary>
    internal sealed class _ : Sink<TSource, TResult>
    {
        // Serializes every ForwardOn* call made from the source and from task continuations.
        private readonly object _gate = new object();

        // Its token is passed to the collection selector and to the continuations;
        // it is cancelled when the sink is disposed.
        private readonly CancellationTokenSource _cancel = new CancellationTokenSource();

        private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelector;
        private readonly Func<TSource, TCollection, TResult> _resultSelector;

        // Starts at 1 in Run, is incremented per source element and decremented by
        // every OnCompleted call (from the source and after each successful task).
        private volatile int _count;

        /// <summary>Copies the selectors from the producer and wraps the downstream observer.</summary>
        public _(TaskSelector parent, IObserver<TResult> observer)
            : base(observer)
        {
            _collectionSelector = parent._collectionSelector;
            _resultSelector = parent._resultSelector;
        }

        /// <summary>Initializes the pending counter to 1 and then runs the base sink on the source.</summary>
        public override void Run(IObservable<TSource> source)
        {
            _count = 1;
            base.Run(source);
        }

        /// <summary>Requests cancellation of the tasks' token before running the base disposal.</summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _cancel.Cancel();
            }

            base.Dispose(disposing);
        }

        /// <summary>
        /// Increments the pending counter and starts the task for <paramref name="value"/>.
        /// A task that is already complete is handled inline; otherwise a continuation
        /// is attached. A throwing collection selector is forwarded as an error.
        /// </summary>
        public override void OnNext(TSource value)
        {
            Task<TCollection> task;
            try
            {
                Interlocked.Increment(ref _count);
                task = _collectionSelector(value, _cancel.Token);
            }
            catch (Exception ex)
            {
                lock (_gate)
                {
                    ForwardOnError(ex);
                }

                return;
            }

            if (task.IsCompleted)
            {
                OnCompletedTask(value, task);
                return;
            }

            // The sink and the source element travel in the state tuple so the lambda
            // captures nothing. The element is named @this because `this` is a reserved
            // keyword and cannot be used as a tuple element name without the verbatim prefix.
            System.Threading.Tasks.TaskExtensions.ContinueWithState<TCollection, (_ @this, TSource value)>(
                task,
                (t, state) => state.@this.OnCompletedTask(state.value, t),
                (this, value),
                _cancel.Token);
        }

        /// <summary>
        /// Handles a finished task: forwards the projected result and then calls
        /// <c>OnCompleted</c> on success, forwards the inner exception on fault, and
        /// forwards a <see cref="TaskCanceledException"/> on cancellation unless this
        /// sink's own token was cancelled.
        /// </summary>
        private void OnCompletedTask(TSource value, Task<TCollection> task)
        {
            switch (task.Status)
            {
                case TaskStatus.RanToCompletion:
                {
                    TResult projected;
                    try
                    {
                        projected = _resultSelector(value, task.Result);
                    }
                    catch (Exception ex)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(ex);
                        }

                        return;
                    }

                    lock (_gate)
                    {
                        ForwardOnNext(projected);
                    }

                    // Releases the count taken in OnNext for this task.
                    OnCompleted();
                    break;
                }

                case TaskStatus.Faulted:
                    lock (_gate)
                    {
                        ForwardOnError(task.Exception.InnerException);
                    }

                    break;

                case TaskStatus.Canceled:
                    // A cancellation requested through this sink's own token is not reported.
                    if (!_cancel.IsCancellationRequested)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(new TaskCanceledException(task));
                        }
                    }

                    break;
            }
        }

        /// <summary>Forwards a source error downstream, under the gate.</summary>
        public override void OnError(Exception error)
        {
            lock (_gate)
            {
                ForwardOnError(error);
            }
        }

        /// <summary>
        /// Decrements the pending counter and forwards completion when it reaches zero.
        /// Also called after each task that ran to completion.
        /// </summary>
        public override void OnCompleted()
        {
            if (Interlocked.Decrement(ref _count) == 0)
            {
                lock (_gate)
                {
                    ForwardOnCompleted();
                }
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelector;
    private readonly Func<TSource, TCollection, TResult> _resultSelector;

    /// <param name="source">The sequence whose elements are projected.</param>
    /// <param name="collectionSelector">Starts a task for a source element, given a cancellation token.</param>
    /// <param name="resultSelector">Combines a source element with the task result.</param>
    public TaskSelector(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
    {
        _source = source;
        _collectionSelector = collectionSelector;
        _resultSelector = resultSelector;
    }

    /// <summary>Creates the sink for one downstream observer.</summary>
    protected override _ CreateSink(IObserver<TResult> observer)
    {
        return new _(this, observer);
    }

    /// <summary>Starts the sink on the stored source sequence.</summary>
    protected override void Run(_ sink)
    {
        sink.Run(_source);
    }
}
/// <summary>
/// Producer for the SelectMany variant whose collection selector starts a
/// <see cref="Task{TResult}"/> for every source element and whose selectors also
/// receive the source element index.
/// </summary>
internal sealed class TaskSelectorIndexed : Producer<TResult, TaskSelectorIndexed._>
{
    /// <summary>
    /// Sink that numbers the source elements, starts one task per element and
    /// forwards the projected task results while holding <c>_gate</c>. Completion is
    /// forwarded when the pending counter reaches zero.
    /// </summary>
    internal sealed class _ : Sink<TSource, TResult>
    {
        // Serializes every ForwardOn* call made from the source and from task continuations.
        private readonly object _gate = new object();

        // Its token is passed to the collection selector and to the continuations;
        // it is cancelled when the sink is disposed.
        private readonly CancellationTokenSource _cancel = new CancellationTokenSource();

        private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelector;
        private readonly Func<TSource, int, TCollection, TResult> _resultSelector;

        // Starts at 1 in Run, is incremented per source element and decremented by
        // every OnCompleted call (from the source and after each successful task).
        private volatile int _count;

        // Index assigned to the next source element.
        private int _index;

        /// <summary>Copies the selectors from the producer and wraps the downstream observer.</summary>
        public _(TaskSelectorIndexed parent, IObserver<TResult> observer)
            : base(observer)
        {
            _collectionSelector = parent._collectionSelector;
            _resultSelector = parent._resultSelector;
        }

        /// <summary>Initializes the pending counter to 1 and then runs the base sink on the source.</summary>
        public override void Run(IObservable<TSource> source)
        {
            _count = 1;
            base.Run(source);
        }

        /// <summary>Requests cancellation of the tasks' token before running the base disposal.</summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _cancel.Cancel();
            }

            base.Dispose(disposing);
        }

        /// <summary>
        /// Assigns the next source index, increments the pending counter and starts the
        /// task for <paramref name="value"/>. A task that is already complete is handled
        /// inline; otherwise a continuation is attached. A throwing collection selector
        /// is forwarded as an error.
        /// </summary>
        public override void OnNext(TSource value)
        {
            // The checked increment sits outside the try block, so an overflow
            // propagates to the caller instead of being forwarded downstream.
            int sourceIndex = checked(_index++);

            Task<TCollection> task;
            try
            {
                Interlocked.Increment(ref _count);
                task = _collectionSelector(value, sourceIndex, _cancel.Token);
            }
            catch (Exception ex)
            {
                lock (_gate)
                {
                    ForwardOnError(ex);
                }

                return;
            }

            if (task.IsCompleted)
            {
                OnCompletedTask(value, sourceIndex, task);
                return;
            }

            // The sink, the source element and its index travel in the state tuple so the
            // lambda captures nothing. The element is named @this because `this` is a
            // reserved keyword and cannot be used as a tuple element name without the
            // verbatim prefix.
            System.Threading.Tasks.TaskExtensions.ContinueWithState<TCollection, (_ @this, TSource value, int index)>(
                task,
                (t, state) => state.@this.OnCompletedTask(state.value, state.index, t),
                (this, value, sourceIndex),
                _cancel.Token);
        }

        /// <summary>
        /// Handles a finished task: forwards the projected result and then calls
        /// <c>OnCompleted</c> on success, forwards the inner exception on fault, and
        /// forwards a <see cref="TaskCanceledException"/> on cancellation unless this
        /// sink's own token was cancelled.
        /// </summary>
        private void OnCompletedTask(TSource value, int index, Task<TCollection> task)
        {
            switch (task.Status)
            {
                case TaskStatus.RanToCompletion:
                {
                    TResult projected;
                    try
                    {
                        projected = _resultSelector(value, index, task.Result);
                    }
                    catch (Exception ex)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(ex);
                        }

                        return;
                    }

                    lock (_gate)
                    {
                        ForwardOnNext(projected);
                    }

                    // Releases the count taken in OnNext for this task.
                    OnCompleted();
                    break;
                }

                case TaskStatus.Faulted:
                    lock (_gate)
                    {
                        ForwardOnError(task.Exception.InnerException);
                    }

                    break;

                case TaskStatus.Canceled:
                    // A cancellation requested through this sink's own token is not reported.
                    if (!_cancel.IsCancellationRequested)
                    {
                        lock (_gate)
                        {
                            ForwardOnError(new TaskCanceledException(task));
                        }
                    }

                    break;
            }
        }

        /// <summary>Forwards a source error downstream, under the gate.</summary>
        public override void OnError(Exception error)
        {
            lock (_gate)
            {
                ForwardOnError(error);
            }
        }

        /// <summary>
        /// Decrements the pending counter and forwards completion when it reaches zero.
        /// Also called after each task that ran to completion.
        /// </summary>
        public override void OnCompleted()
        {
            if (Interlocked.Decrement(ref _count) == 0)
            {
                lock (_gate)
                {
                    ForwardOnCompleted();
                }
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelector;
    private readonly Func<TSource, int, TCollection, TResult> _resultSelector;

    /// <param name="source">The sequence whose elements are projected.</param>
    /// <param name="collectionSelector">Starts a task for a source element and its index, given a cancellation token.</param>
    /// <param name="resultSelector">Combines a source element, its index and the task result.</param>
    public TaskSelectorIndexed(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector)
    {
        _source = source;
        _collectionSelector = collectionSelector;
        _resultSelector = resultSelector;
    }

    /// <summary>Creates the sink for one downstream observer.</summary>
    protected override _ CreateSink(IObserver<TResult> observer)
    {
        return new _(this, observer);
    }

    /// <summary>Starts the sink on the stored source sequence.</summary>
    protected override void Run(_ sink)
    {
        sink.Run(_source);
    }
}
}
}