// SelectMany<TSource, TResult>
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class SelectMany<TSource, TResult>
{
/// <summary>
/// Producer that maps every source element to an inner observable via a selector and relays the
/// notifications of all inner sequences to one downstream observer.
/// </summary>
internal class ObservableSelector : Producer<TResult, ObservableSelector._>
{
    /// <summary>
    /// Sink that observes the source, subscribes to each projected inner sequence and forwards
    /// notifications while holding <c>_gate</c>.
    /// </summary>
    internal class _ : Sink<TSource, TResult>
    {
        // Lock taken around every forwarded notification; protected so derived sinks can share it.
        protected readonly object _gate = new object();
        private readonly Func<TSource, IObservable<TResult>> _selector;
        // Holds the observers of the inner sequences that have not completed yet.
        private readonly CompositeDisposable _group = new CompositeDisposable();
        // Set by Final(); read by inner observers to decide whether they are the last one standing.
        private volatile bool _isStopped;

        public _(ObservableSelector parent, IObserver<TResult> observer)
            : base(observer)
        {
            _selector = parent._selector;
        }

        /// <summary>Projects <paramref name="value"/> and subscribes to the resulting sequence.</summary>
        public override void OnNext(TSource value)
        {
            IObservable<TResult> inner;
            try
            {
                inner = _selector(value);
            }
            catch (Exception selectorFailure)
            {
                // A throwing selector terminates the whole sequence with that exception.
                lock (_gate)
                {
                    ForwardOnError(selectorFailure);
                }
                return;
            }

            SubscribeInner(inner);
        }

        /// <summary>Forwards a source error under the gate.</summary>
        public override void OnError(Exception error)
        {
            lock (_gate)
            {
                ForwardOnError(error);
            }
        }

        /// <summary>Marks the source as finished; see <c>Final</c>.</summary>
        public override void OnCompleted()
        {
            Final();
        }

        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
            if (disposing)
            {
                _group.Dispose();
            }
        }

        /// <summary>
        /// Flags the sink as stopped. Completes downstream right away when no inner observer is
        /// registered; otherwise only the upstream subscription is disposed and completion is left
        /// to the last inner observer.
        /// </summary>
        protected void Final()
        {
            _isStopped = true;

            if (_group.Count != 0)
            {
                DisposeUpstream();
                return;
            }

            lock (_gate)
            {
                ForwardOnCompleted();
            }
        }

        /// <summary>Registers a new inner observer in the group and subscribes it to <paramref name="inner"/>.</summary>
        protected void SubscribeInner(IObservable<TResult> inner)
        {
            InnerObserver innerObserver = new InnerObserver(this);

            // Register before subscribing so the observer is already counted when it receives notifications.
            _group.Add(innerObserver);

            IDisposable subscription = ObservableExtensions.SubscribeSafe<TResult>(inner, (IObserver<TResult>)innerObserver);
            innerObserver.SetResource(subscription);
        }

        /// <summary>Observer attached to a single inner sequence; relays everything to the owning sink.</summary>
        private sealed class InnerObserver : SafeObserver<TResult>
        {
            private readonly _ _parent;

            public InnerObserver(_ parent)
            {
                _parent = parent;
            }

            public override void OnNext(TResult value)
            {
                lock (_parent._gate)
                {
                    _parent.ForwardOnNext(value);
                }
            }

            public override void OnError(Exception error)
            {
                lock (_parent._gate)
                {
                    _parent.ForwardOnError(error);
                }
            }

            public override void OnCompleted()
            {
                // Unregister first, then complete downstream only if the source already stopped
                // and no other inner observer remains in the group.
                _parent._group.Remove(this);

                if (!_parent._isStopped || _parent._group.Count != 0)
                {
                    return;
                }

                lock (_parent._gate)
                {
                    _parent.ForwardOnCompleted();
                }
            }
        }
    }

    protected readonly IObservable<TSource> _source;
    protected readonly Func<TSource, IObservable<TResult>> _selector;

    public ObservableSelector(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
    {
        _source = source;
        _selector = selector;
    }

    protected override _ CreateSink(IObserver<TResult> observer)
    {
        return new _(this, observer);
    }

    protected override void Run(_ sink)
    {
        sink.Run(_source);
    }
}
/// <summary>
/// Variant of <c>ObservableSelector</c> that can additionally continue with an inner sequence
/// when the source fails or completes.
/// </summary>
internal sealed class ObservableSelectors : ObservableSelector
{
    /// <summary>Sink adding the optional error and completion selectors on top of the base sink.</summary>
    internal new sealed class _ : ObservableSelector._
    {
        private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
        private readonly Func<IObservable<TResult>> _selectorOnCompleted;

        public _(ObservableSelectors parent, IObserver<TResult> observer)
            : base((ObservableSelector)parent, observer)
        {
            _selectorOnError = parent._selectorOnError;
            _selectorOnCompleted = parent._selectorOnCompleted;
        }

        /// <summary>
        /// Without an error selector the error is handled by the base sink. Otherwise the error is
        /// mapped to a continuation sequence which is subscribed before the sink is finalized.
        /// </summary>
        public override void OnError(Exception error)
        {
            if (_selectorOnError == null)
            {
                base.OnError(error);
                return;
            }

            IObservable<TResult> continuation;
            try
            {
                continuation = _selectorOnError(error);
            }
            catch (Exception selectorFailure)
            {
                lock (_gate)
                {
                    ForwardOnError(selectorFailure);
                }
                return;
            }

            SubscribeInner(continuation);
            Final();
        }

        /// <summary>
        /// Subscribes to the completion continuation when a completion selector is present, then
        /// finalizes the sink. A throwing selector is reported as an error instead.
        /// </summary>
        public override void OnCompleted()
        {
            if (_selectorOnCompleted != null)
            {
                IObservable<TResult> continuation;
                try
                {
                    continuation = _selectorOnCompleted();
                }
                catch (Exception selectorFailure)
                {
                    lock (_gate)
                    {
                        ForwardOnError(selectorFailure);
                    }
                    return;
                }

                SubscribeInner(continuation);
            }

            Final();
        }
    }

    private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
    private readonly Func<IObservable<TResult>> _selectorOnCompleted;

    public ObservableSelectors(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
        : base(source, selector)
    {
        _selectorOnError = selectorOnError;
        _selectorOnCompleted = selectorOnCompleted;
    }

    protected override ObservableSelector._ CreateSink(IObserver<TResult> observer)
    {
        return new _(this, observer);
    }
}
/// <summary>
/// Producer that maps every source element, together with its zero-based position, to an inner
/// observable and relays the notifications of all inner sequences downstream.
/// </summary>
internal class ObservableSelectorIndexed : Producer<TResult, ObservableSelectorIndexed._>
{
    /// <summary>
    /// Sink that observes the source, subscribes to each projected inner sequence and forwards
    /// notifications while holding <c>_gate</c>.
    /// </summary>
    internal class _ : Sink<TSource, TResult>
    {
        // Lock taken around every forwarded notification.
        private readonly object _gate = new object();
        // Holds the observers of the inner sequences that have not completed yet.
        private readonly CompositeDisposable _group = new CompositeDisposable();
        protected readonly Func<TSource, int, IObservable<TResult>> _selector;
        // Position handed to the selector for the next source element.
        private int _index;
        // Set by Final(); read by inner observers to decide whether they are the last one standing.
        private volatile bool _isStopped;

        public _(ObservableSelectorIndexed parent, IObserver<TResult> observer)
            : base(observer)
        {
            _selector = parent._selector;
        }

        /// <summary>Projects <paramref name="value"/> with its index and subscribes to the result.</summary>
        public override void OnNext(TSource value)
        {
            IObservable<TResult> inner;
            try
            {
                // The checked increment stays inside the try so an index overflow is reported
                // downstream like any other selector failure.
                int position = checked(_index++);
                inner = _selector(value, position);
            }
            catch (Exception selectorFailure)
            {
                lock (_gate)
                {
                    ForwardOnError(selectorFailure);
                }
                return;
            }

            SubscribeInner(inner);
        }

        /// <summary>Forwards a source error under the gate.</summary>
        public override void OnError(Exception error)
        {
            lock (_gate)
            {
                ForwardOnError(error);
            }
        }

        /// <summary>Marks the source as finished; see <c>Final</c>.</summary>
        public override void OnCompleted()
        {
            Final();
        }

        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
            if (disposing)
            {
                _group.Dispose();
            }
        }

        /// <summary>
        /// Flags the sink as stopped. Completes downstream right away when no inner observer is
        /// registered; otherwise only the upstream subscription is disposed and completion is left
        /// to the last inner observer.
        /// </summary>
        protected void Final()
        {
            _isStopped = true;

            if (_group.Count != 0)
            {
                DisposeUpstream();
                return;
            }

            lock (_gate)
            {
                ForwardOnCompleted();
            }
        }

        /// <summary>Registers a new inner observer in the group and subscribes it to <paramref name="inner"/>.</summary>
        protected void SubscribeInner(IObservable<TResult> inner)
        {
            InnerObserver innerObserver = new InnerObserver(this);

            // Register before subscribing so the observer is already counted when it receives notifications.
            _group.Add(innerObserver);

            IDisposable subscription = ObservableExtensions.SubscribeSafe<TResult>(inner, (IObserver<TResult>)innerObserver);
            innerObserver.SetResource(subscription);
        }

        /// <summary>Observer attached to a single inner sequence; relays everything to the owning sink.</summary>
        private sealed class InnerObserver : SafeObserver<TResult>
        {
            private readonly _ _parent;

            public InnerObserver(_ parent)
            {
                _parent = parent;
            }

            public override void OnNext(TResult value)
            {
                lock (_parent._gate)
                {
                    _parent.ForwardOnNext(value);
                }
            }

            public override void OnError(Exception error)
            {
                lock (_parent._gate)
                {
                    _parent.ForwardOnError(error);
                }
            }

            public override void OnCompleted()
            {
                // Unregister first, then complete downstream only if the source already stopped
                // and no other inner observer remains in the group.
                _parent._group.Remove(this);

                if (!_parent._isStopped || _parent._group.Count != 0)
                {
                    return;
                }

                lock (_parent._gate)
                {
                    _parent.ForwardOnCompleted();
                }
            }
        }
    }

    protected readonly IObservable<TSource> _source;
    protected readonly Func<TSource, int, IObservable<TResult>> _selector;

    public ObservableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
    {
        _source = source;
        _selector = selector;
    }

    protected override _ CreateSink(IObserver<TResult> observer)
    {
        return new _(this, observer);
    }

    protected override void Run(_ sink)
    {
        sink.Run(_source);
    }
}
/// <summary>
/// Variant of <c>ObservableSelectorIndexed</c> that can additionally continue with an inner
/// sequence when the source fails or completes.
/// </summary>
internal sealed class ObservableSelectorsIndexed : ObservableSelectorIndexed
{
    /// <summary>Sink adding the optional error and completion selectors on top of the base sink.</summary>
    internal new sealed class _ : ObservableSelectorIndexed._
    {
        private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
        private readonly Func<IObservable<TResult>> _selectorOnCompleted;

        public _(ObservableSelectorsIndexed parent, IObserver<TResult> observer)
            : base((ObservableSelectorIndexed)parent, observer)
        {
            _selectorOnError = parent._selectorOnError;
            _selectorOnCompleted = parent._selectorOnCompleted;
        }

        /// <summary>
        /// Without an error selector the error is handled by the base sink. Otherwise the error is
        /// mapped to a continuation sequence which is subscribed before the sink is finalized.
        /// </summary>
        public override void OnError(Exception error)
        {
            if (_selectorOnError != null)
            {
                IObservable<TResult> observable = null;
                try
                {
                    observable = _selectorOnError(error);
                }
                catch (Exception error2)
                {
                    // Report through the base sink so the error is forwarded under the base
                    // sink's gate - the lock the inner observers hold while forwarding. A lock
                    // object private to this class would not serialize against them.
                    base.OnError(error2);
                    return;
                }

                SubscribeInner(observable);
                Final();
            }
            else
            {
                base.OnError(error);
            }
        }

        /// <summary>
        /// Subscribes to the completion continuation when a completion selector is present, then
        /// finalizes the sink. A throwing selector is reported as an error instead.
        /// </summary>
        public override void OnCompleted()
        {
            if (_selectorOnCompleted != null)
            {
                IObservable<TResult> observable = null;
                try
                {
                    observable = _selectorOnCompleted();
                }
                catch (Exception error)
                {
                    // Same reasoning as in OnError: forward under the base sink's gate.
                    base.OnError(error);
                    return;
                }

                SubscribeInner(observable);
            }

            Final();
        }
    }

    private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
    private readonly Func<IObservable<TResult>> _selectorOnCompleted;

    public ObservableSelectorsIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
        : base(source, selector)
    {
        _selectorOnError = selectorOnError;
        _selectorOnCompleted = selectorOnCompleted;
    }

    protected override ObservableSelectorIndexed._ CreateSink(IObserver<TResult> observer)
    {
        return new _(this, observer);
    }
}
/// <summary>
/// Producer that maps every source element to an enumerable and emits the elements of that
/// enumerable downstream.
/// </summary>
internal sealed class EnumerableSelector : Producer<TResult, EnumerableSelector._>
{
    /// <summary>Sink that enumerates each projected collection inside <c>OnNext</c>.</summary>
    internal sealed class _ : Sink<TSource, TResult>
    {
        private readonly Func<TSource, IEnumerable<TResult>> _selector;

        public _(EnumerableSelector parent, IObserver<TResult> observer)
            : base(observer)
        {
            _selector = parent._selector;
        }

        /// <summary>
        /// Projects <paramref name="value"/> to a collection and forwards each of its elements.
        /// Any exception thrown by the selector, by <c>GetEnumerator</c>, or while advancing the
        /// enumerator is forwarded as an error and stops the enumeration.
        /// </summary>
        public override void OnNext(TSource value)
        {
            IEnumerator<TResult> items;
            try
            {
                // Selector and GetEnumerator share one handler: both failures are reported the same way.
                items = _selector(value).GetEnumerator();
            }
            catch (Exception failure)
            {
                ForwardOnError(failure);
                return;
            }

            using (items)
            {
                while (true)
                {
                    TResult current;
                    try
                    {
                        if (!items.MoveNext())
                        {
                            break;
                        }
                        current = items.Current;
                    }
                    catch (Exception failure)
                    {
                        ForwardOnError(failure);
                        return;
                    }

                    // Forwarded outside the try so downstream exceptions are not caught here.
                    ForwardOnNext(current);
                }
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, IEnumerable<TResult>> _selector;

    public EnumerableSelector(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
    {
        _source = source;
        _selector = selector;
    }

    protected override _ CreateSink(IObserver<TResult> observer)
    {
        return new _(this, observer);
    }

    protected override void Run(_ sink)
    {
        sink.Run(_source);
    }
}
/// <summary>
/// Producer that maps every source element, together with its zero-based position, to an
/// enumerable and emits the elements of that enumerable downstream.
/// </summary>
internal sealed class EnumerableSelectorIndexed : Producer<TResult, EnumerableSelectorIndexed._>
{
    /// <summary>Sink that enumerates each projected collection inside <c>OnNext</c>.</summary>
    internal sealed class _ : Sink<TSource, TResult>
    {
        private readonly Func<TSource, int, IEnumerable<TResult>> _selector;
        // Position handed to the selector for the next source element.
        private int _index;

        public _(EnumerableSelectorIndexed parent, IObserver<TResult> observer)
            : base(observer)
        {
            _selector = parent._selector;
        }

        /// <summary>
        /// Projects <paramref name="value"/> and its index to a collection and forwards each of
        /// its elements. Any exception thrown by the index increment, the selector,
        /// <c>GetEnumerator</c>, or while advancing the enumerator is forwarded as an error and
        /// stops the enumeration.
        /// </summary>
        public override void OnNext(TSource value)
        {
            IEnumerator<TResult> items;
            try
            {
                // The checked increment stays inside the try so an index overflow is reported downstream.
                items = _selector(value, checked(_index++)).GetEnumerator();
            }
            catch (Exception failure)
            {
                ForwardOnError(failure);
                return;
            }

            using (items)
            {
                while (true)
                {
                    TResult current;
                    try
                    {
                        if (!items.MoveNext())
                        {
                            break;
                        }
                        current = items.Current;
                    }
                    catch (Exception failure)
                    {
                        ForwardOnError(failure);
                        return;
                    }

                    // Forwarded outside the try so downstream exceptions are not caught here.
                    ForwardOnNext(current);
                }
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, int, IEnumerable<TResult>> _selector;

    public EnumerableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
    {
        _source = source;
        _selector = selector;
    }

    protected override _ CreateSink(IObserver<TResult> observer)
    {
        return new _(this, observer);
    }

    protected override void Run(_ sink)
    {
        sink.Run(_source);
    }
}
/// <summary>
/// Producer that maps every source element to a task and emits each task's result downstream.
/// </summary>
internal sealed class TaskSelector : Producer<TResult, TaskSelector._>
{
    /// <summary>
    /// Sink that starts one task per source element and forwards task outcomes under <c>_gate</c>.
    /// </summary>
    internal sealed class _ : Sink<TSource, TResult>
    {
        private readonly object _gate = new object();
        // Token source handed to the selector and to the continuations; cancelled on dispose.
        private readonly CancellationTokenSource _cts = new CancellationTokenSource();
        private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;
        // Outstanding work counter: set to 1 in Run, incremented per element, decremented in OnCompleted.
        private volatile int _count;

        public _(TaskSelector parent, IObserver<TResult> observer)
            : base(observer)
        {
            _selector = parent._selector;
        }

        public override void Run(IObservable<TSource> source)
        {
            // Start at one so completion is only forwarded after an OnCompleted call that is not
            // paired with a task (presumably the source's own completion).
            _count = 1;
            base.Run(source);
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _cts.Cancel();
            }
            base.Dispose(disposing);
        }

        /// <summary>Starts the task for <paramref name="value"/> and arranges for its outcome to be forwarded.</summary>
        public override void OnNext(TSource value)
        {
            Task<TResult> pending;
            try
            {
                Interlocked.Increment(ref _count);
                pending = _selector(value, _cts.Token);
            }
            catch (Exception selectorFailure)
            {
                lock (_gate)
                {
                    ForwardOnError(selectorFailure);
                }
                return;
            }

            if (pending.IsCompleted)
            {
                // Already finished: handle the outcome inline instead of scheduling a continuation.
                OnCompletedTask(pending);
                return;
            }

            // The sink travels as the state argument, so the lambda itself captures nothing.
            pending.ContinueWith((finished, state) => ((_)state).OnCompletedTask(finished), this, _cts.Token);
        }

        /// <summary>Translates the final state of <paramref name="task"/> into observer notifications.</summary>
        private void OnCompletedTask(Task<TResult> task)
        {
            TaskStatus outcome = task.Status;

            if (outcome == TaskStatus.RanToCompletion)
            {
                lock (_gate)
                {
                    ForwardOnNext(task.Result);
                }
                // Release this task's share of the outstanding count.
                OnCompleted();
            }
            else if (outcome == TaskStatus.Faulted)
            {
                lock (_gate)
                {
                    ForwardOnError(task.Exception.InnerException);
                }
            }
            else if (outcome == TaskStatus.Canceled)
            {
                // Cancellation caused by our own token source is not reported.
                if (!_cts.IsCancellationRequested)
                {
                    lock (_gate)
                    {
                        ForwardOnError(new TaskCanceledException(task));
                    }
                }
            }
        }

        /// <summary>Forwards an error under the gate.</summary>
        public override void OnError(Exception error)
        {
            lock (_gate)
            {
                ForwardOnError(error);
            }
        }

        /// <summary>Decrements the outstanding count and completes downstream when it reaches zero.</summary>
        public override void OnCompleted()
        {
            if (Interlocked.Decrement(ref _count) != 0)
            {
                return;
            }

            lock (_gate)
            {
                ForwardOnCompleted();
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;

    public TaskSelector(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
    {
        _source = source;
        _selector = selector;
    }

    protected override _ CreateSink(IObserver<TResult> observer)
    {
        return new _(this, observer);
    }

    protected override void Run(_ sink)
    {
        sink.Run(_source);
    }
}
/// <summary>
/// Producer that maps every source element, together with its zero-based position, to a task and
/// emits each task's result downstream.
/// </summary>
internal sealed class TaskSelectorIndexed : Producer<TResult, TaskSelectorIndexed._>
{
    /// <summary>
    /// Sink that starts one task per source element and forwards task outcomes under <c>_gate</c>.
    /// </summary>
    internal sealed class _ : Sink<TSource, TResult>
    {
        private readonly object _gate = new object();
        // Token source handed to the selector and to the continuations; cancelled on dispose.
        private readonly CancellationTokenSource _cts = new CancellationTokenSource();
        private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;
        // Outstanding work counter: set to 1 in Run, incremented per element, decremented in OnCompleted.
        private volatile int _count;
        // Position handed to the selector for the next source element.
        private int _index;

        public _(TaskSelectorIndexed parent, IObserver<TResult> observer)
            : base(observer)
        {
            _selector = parent._selector;
        }

        public override void Run(IObservable<TSource> source)
        {
            // Start at one so completion is only forwarded after an OnCompleted call that is not
            // paired with a task (presumably the source's own completion).
            _count = 1;
            base.Run(source);
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _cts.Cancel();
            }
            base.Dispose(disposing);
        }

        /// <summary>Starts the task for <paramref name="value"/> and arranges for its outcome to be forwarded.</summary>
        public override void OnNext(TSource value)
        {
            Task<TResult> pending;
            try
            {
                Interlocked.Increment(ref _count);
                // The checked increment stays inside the try so an index overflow is reported downstream.
                pending = _selector(value, checked(_index++), _cts.Token);
            }
            catch (Exception selectorFailure)
            {
                lock (_gate)
                {
                    ForwardOnError(selectorFailure);
                }
                return;
            }

            if (pending.IsCompleted)
            {
                // Already finished: handle the outcome inline instead of scheduling a continuation.
                OnCompletedTask(pending);
                return;
            }

            // The sink travels as the state argument, so the lambda itself captures nothing.
            pending.ContinueWith((finished, state) => ((_)state).OnCompletedTask(finished), this, _cts.Token);
        }

        /// <summary>Translates the final state of <paramref name="task"/> into observer notifications.</summary>
        private void OnCompletedTask(Task<TResult> task)
        {
            TaskStatus outcome = task.Status;

            if (outcome == TaskStatus.RanToCompletion)
            {
                lock (_gate)
                {
                    ForwardOnNext(task.Result);
                }
                // Release this task's share of the outstanding count.
                OnCompleted();
            }
            else if (outcome == TaskStatus.Faulted)
            {
                lock (_gate)
                {
                    ForwardOnError(task.Exception.InnerException);
                }
            }
            else if (outcome == TaskStatus.Canceled)
            {
                // Cancellation caused by our own token source is not reported.
                if (!_cts.IsCancellationRequested)
                {
                    lock (_gate)
                    {
                        ForwardOnError(new TaskCanceledException(task));
                    }
                }
            }
        }

        /// <summary>Forwards an error under the gate.</summary>
        public override void OnError(Exception error)
        {
            lock (_gate)
            {
                ForwardOnError(error);
            }
        }

        /// <summary>Decrements the outstanding count and completes downstream when it reaches zero.</summary>
        public override void OnCompleted()
        {
            if (Interlocked.Decrement(ref _count) != 0)
            {
                return;
            }

            lock (_gate)
            {
                ForwardOnCompleted();
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;

    public TaskSelectorIndexed(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
    {
        _source = source;
        _selector = selector;
    }

    protected override _ CreateSink(IObserver<TResult> observer)
    {
        return new _(this, observer);
    }

    protected override void Run(_ sink)
    {
        sink.Run(_source);
    }
}
}
}