// SelectMany<TSource, TResult>
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class SelectMany<TSource, TResult>
{
/// <summary>
/// Producer for the SelectMany variant whose selector maps each source element to an
/// inner <see cref="IObservable{TResult}"/>. Its sink relays the elements of every
/// inner sequence to one downstream observer.
/// </summary>
internal class ObservableSelector : Producer<TResult, ObservableSelector._>
{
    /// <summary>
    /// Sink that observes the source, subscribes to the observable the selector returns
    /// for each element, and forwards inner notifications to <c>_observer</c> while
    /// holding <c>_gate</c>.
    /// </summary>
    internal class _ : Sink<TResult>, IObserver<TSource>
    {
        /// <summary>Observer attached to exactly one inner sequence.</summary>
        private sealed class InnerObserver : IObserver<TResult>
        {
            private readonly _ _parent;

            // The group entry that represents this inner subscription.
            private readonly IDisposable _self;

            public InnerObserver(_ parent, IDisposable self)
            {
                _parent = parent;
                _self = self;
            }

            /// <summary>Forwards an inner element downstream under the parent's gate.</summary>
            public void OnNext(TResult value)
            {
                lock (_parent._gate)
                {
                    _parent._observer.OnNext(value);
                }
            }

            /// <summary>Forwards an inner error downstream and disposes the parent sink.</summary>
            public void OnError(Exception error)
            {
                lock (_parent._gate)
                {
                    _parent._observer.OnError(error);
                    _parent.Dispose();
                }
            }

            /// <summary>
            /// Removes this inner subscription from the group. If the source has already
            /// stopped and only one group entry is left, completes the downstream observer.
            /// </summary>
            public void OnCompleted()
            {
                _parent._group.Remove(_self);

                // The source subscription is added to the group in the sink constructor,
                // so a count of 1 means no inner subscription entries remain.
                if (!_parent._isStopped || _parent._group.Count != 1)
                {
                    return;
                }

                lock (_parent._gate)
                {
                    _parent._observer.OnCompleted();
                    _parent.Dispose();
                }
            }
        }

        // Protected: the derived sink in ObservableSelectors locks on the same object.
        protected readonly object _gate = new object();

        private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();

        // Contains the source subscription plus one entry per inner subscription.
        private readonly CompositeDisposable _group = new CompositeDisposable();

        private readonly Func<TSource, IObservable<TResult>> _selector;

        // Set by Final(); read by inner observers when they complete.
        private bool _isStopped;

        public _(ObservableSelector parent, IObserver<TResult> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _selector = parent._selector;
            _group.Add(_sourceSubscription);
        }

        /// <summary>Subscribes this sink to <paramref name="source"/>.</summary>
        /// <returns>The composite that holds the source and inner subscriptions.</returns>
        public IDisposable Run(IObservable<TSource> source)
        {
            _isStopped = false;
            _sourceSubscription.Disposable = ObservableExtensions.SubscribeSafe<TSource>(source, this);
            return _group;
        }

        /// <summary>
        /// Runs the selector on <paramref name="value"/> and subscribes to the result.
        /// An exception thrown by the selector is sent downstream as an error.
        /// </summary>
        public void OnNext(TSource value)
        {
            IObservable<TResult> inner;
            try
            {
                inner = _selector(value);
            }
            catch (Exception selectorFailure)
            {
                // Handled inline rather than through OnError: OnError is virtual and a
                // derived sink may map errors to another sequence.
                lock (_gate)
                {
                    _observer.OnError(selectorFailure);
                    base.Dispose();
                }

                return;
            }

            SubscribeInner(inner);
        }

        /// <summary>Forwards a source error downstream and disposes the sink.</summary>
        public virtual void OnError(Exception error)
        {
            lock (_gate)
            {
                _observer.OnError(error);
                base.Dispose();
            }
        }

        /// <summary>Handles source completion by delegating to <see cref="Final"/>.</summary>
        public virtual void OnCompleted()
        {
            Final();
        }

        /// <summary>
        /// Marks the source as stopped. Completes downstream right away when the group
        /// holds a single entry; otherwise only disposes the source subscription and
        /// leaves completion to the inner observers.
        /// </summary>
        protected void Final()
        {
            _isStopped = true;

            if (_group.Count != 1)
            {
                _sourceSubscription.Dispose();
                return;
            }

            lock (_gate)
            {
                _observer.OnCompleted();
                base.Dispose();
            }
        }

        /// <summary>
        /// Registers a new group entry for <paramref name="inner"/> and then subscribes
        /// an <see cref="InnerObserver"/> to it.
        /// </summary>
        protected void SubscribeInner(IObservable<TResult> inner)
        {
            var innerSubscription = new SingleAssignmentDisposable();

            // Added before subscribing, so the entry exists if the inner sequence
            // completes during the subscribe call.
            _group.Add(innerSubscription);

            var innerObserver = new InnerObserver(this, innerSubscription);
            innerSubscription.Disposable = ObservableExtensions.SubscribeSafe<TResult>(inner, innerObserver);
        }
    }

    protected readonly IObservable<TSource> _source;
    protected readonly Func<TSource, IObservable<TResult>> _selector;

    public ObservableSelector(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
    {
        _source = source;
        _selector = selector;
    }

    /// <summary>Creates the sink for one subscription.</summary>
    protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);

    /// <summary>Starts the sink against the stored source.</summary>
    protected override IDisposable Run(_ sink) => sink.Run(_source);
}
/// <summary>
/// Extension of <see cref="ObservableSelector"/> that can also map the source's error
/// and completion notifications to additional inner sequences. Either terminal
/// selector may be null, in which case the base behavior applies for that notification.
/// </summary>
internal sealed class ObservableSelectors : ObservableSelector
{
    /// <summary>Sink that adds the optional error and completion selectors.</summary>
    internal new sealed class _ : ObservableSelector._
    {
        private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
        private readonly Func<IObservable<TResult>> _selectorOnCompleted;

        public _(ObservableSelectors parent, IObserver<TResult> observer, IDisposable cancel)
            : base(parent, observer, cancel)
        {
            _selectorOnError = parent._selectorOnError;
            _selectorOnCompleted = parent._selectorOnCompleted;
        }

        /// <summary>
        /// With an error selector: subscribes to the sequence it returns for
        /// <paramref name="error"/> and then finalizes the source. Without one: uses
        /// the base error handling.
        /// </summary>
        public override void OnError(Exception error)
        {
            if (_selectorOnError == null)
            {
                base.OnError(error);
                return;
            }

            IObservable<TResult> continuation;
            try
            {
                continuation = _selectorOnError(error);
            }
            catch (Exception selectorFailure)
            {
                // Non-virtual call: the base implementation locks _gate, forwards the
                // error downstream and disposes the sink.
                base.OnError(selectorFailure);
                return;
            }

            SubscribeInner(continuation);
            Final();
        }

        /// <summary>
        /// With a completion selector: subscribes to the sequence it returns before
        /// finalizing the source. Without one: finalizes directly.
        /// </summary>
        public override void OnCompleted()
        {
            if (_selectorOnCompleted != null)
            {
                IObservable<TResult> continuation;
                try
                {
                    continuation = _selectorOnCompleted();
                }
                catch (Exception selectorFailure)
                {
                    // Same lock / OnError / Dispose sequence as the base error path.
                    base.OnError(selectorFailure);
                    return;
                }

                SubscribeInner(continuation);
            }

            Final();
        }
    }

    private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
    private readonly Func<IObservable<TResult>> _selectorOnCompleted;

    public ObservableSelectors(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
        : base(source, selector)
    {
        _selectorOnError = selectorOnError;
        _selectorOnCompleted = selectorOnCompleted;
    }

    /// <summary>Creates the sink that carries the terminal selectors.</summary>
    protected override ObservableSelector._ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
}
/// <summary>
/// Producer for the SelectMany variant whose selector receives each source element
/// together with its zero-based position and returns an inner
/// <see cref="IObservable{TResult}"/>.
/// </summary>
internal class ObservableSelectorIndexed : Producer<TResult, ObservableSelectorIndexed._>
{
    /// <summary>
    /// Sink that observes the source, subscribes to the observable the selector returns
    /// for each (element, index) pair, and forwards inner notifications to
    /// <c>_observer</c> while holding <c>_gate</c>.
    /// </summary>
    internal class _ : Sink<TResult>, IObserver<TSource>
    {
        /// <summary>Observer attached to exactly one inner sequence.</summary>
        private sealed class InnerObserver : IObserver<TResult>
        {
            private readonly _ _parent;

            // The group entry that represents this inner subscription.
            private readonly IDisposable _self;

            public InnerObserver(_ parent, IDisposable self)
            {
                _parent = parent;
                _self = self;
            }

            /// <summary>Forwards an inner element downstream under the parent's gate.</summary>
            public void OnNext(TResult value)
            {
                lock (_parent._gate)
                {
                    _parent._observer.OnNext(value);
                }
            }

            /// <summary>Forwards an inner error downstream and disposes the parent sink.</summary>
            public void OnError(Exception error)
            {
                lock (_parent._gate)
                {
                    _parent._observer.OnError(error);
                    _parent.Dispose();
                }
            }

            /// <summary>
            /// Removes this inner subscription from the group. If the source has already
            /// stopped and only one group entry is left, completes the downstream observer.
            /// </summary>
            public void OnCompleted()
            {
                _parent._group.Remove(_self);

                // The source subscription is added to the group in the sink constructor,
                // so a count of 1 means no inner subscription entries remain.
                if (!_parent._isStopped || _parent._group.Count != 1)
                {
                    return;
                }

                lock (_parent._gate)
                {
                    _parent._observer.OnCompleted();
                    _parent.Dispose();
                }
            }
        }

        private readonly object _gate = new object();

        private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();

        // Contains the source subscription plus one entry per inner subscription.
        private readonly CompositeDisposable _group = new CompositeDisposable();

        protected readonly Func<TSource, int, IObservable<TResult>> _selector;

        // Set by Final(); read by inner observers when they complete.
        private bool _isStopped;

        // Position handed to the selector for the next source element.
        private int _index;

        public _(ObservableSelectorIndexed parent, IObserver<TResult> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _selector = parent._selector;
            _group.Add(_sourceSubscription);
        }

        /// <summary>Subscribes this sink to <paramref name="source"/>.</summary>
        /// <returns>The composite that holds the source and inner subscriptions.</returns>
        public IDisposable Run(IObservable<TSource> source)
        {
            _isStopped = false;
            _sourceSubscription.Disposable = ObservableExtensions.SubscribeSafe<TSource>(source, this);
            return _group;
        }

        /// <summary>
        /// Runs the selector on <paramref name="value"/> and the current index, then
        /// subscribes to the result. Any exception raised inside the try block —
        /// including an overflow of the checked index increment — is sent downstream
        /// as an error.
        /// </summary>
        public void OnNext(TSource value)
        {
            IObservable<TResult> inner;
            try
            {
                inner = _selector(value, checked(_index++));
            }
            catch (Exception selectorFailure)
            {
                // Handled inline rather than through OnError: OnError is virtual and a
                // derived sink may map errors to another sequence.
                lock (_gate)
                {
                    _observer.OnError(selectorFailure);
                    base.Dispose();
                }

                return;
            }

            SubscribeInner(inner);
        }

        /// <summary>Forwards a source error downstream and disposes the sink.</summary>
        public virtual void OnError(Exception error)
        {
            lock (_gate)
            {
                _observer.OnError(error);
                base.Dispose();
            }
        }

        /// <summary>Handles source completion by delegating to <see cref="Final"/>.</summary>
        public virtual void OnCompleted()
        {
            Final();
        }

        /// <summary>
        /// Marks the source as stopped. Completes downstream right away when the group
        /// holds a single entry; otherwise only disposes the source subscription and
        /// leaves completion to the inner observers.
        /// </summary>
        protected void Final()
        {
            _isStopped = true;

            if (_group.Count != 1)
            {
                _sourceSubscription.Dispose();
                return;
            }

            lock (_gate)
            {
                _observer.OnCompleted();
                base.Dispose();
            }
        }

        /// <summary>
        /// Registers a new group entry for <paramref name="inner"/> and then subscribes
        /// an <see cref="InnerObserver"/> to it.
        /// </summary>
        protected void SubscribeInner(IObservable<TResult> inner)
        {
            var innerSubscription = new SingleAssignmentDisposable();

            // Added before subscribing, so the entry exists if the inner sequence
            // completes during the subscribe call.
            _group.Add(innerSubscription);

            var innerObserver = new InnerObserver(this, innerSubscription);
            innerSubscription.Disposable = ObservableExtensions.SubscribeSafe<TResult>(inner, innerObserver);
        }
    }

    protected readonly IObservable<TSource> _source;
    protected readonly Func<TSource, int, IObservable<TResult>> _selector;

    public ObservableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
    {
        _source = source;
        _selector = selector;
    }

    /// <summary>Creates the sink for one subscription.</summary>
    protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);

    /// <summary>Starts the sink against the stored source.</summary>
    protected override IDisposable Run(_ sink) => sink.Run(_source);
}
/// <summary>
/// Extension of <see cref="ObservableSelectorIndexed"/> that can also map the source's
/// error and completion notifications to additional inner sequences. Either terminal
/// selector may be null, in which case the base behavior applies for that notification.
/// </summary>
internal sealed class ObservableSelectorsIndexed : ObservableSelectorIndexed
{
    /// <summary>Sink that adds the optional error and completion selectors.</summary>
    internal new sealed class _ : ObservableSelectorIndexed._
    {
        // This sink deliberately declares no gate, group or source-subscription fields
        // of its own. The base sink's gate is private, so a second gate here would be a
        // different lock object from the one that serializes inner-sequence
        // notifications, and downstream OnError could then run concurrently with an
        // inner OnNext. Selector failures are reported through base.OnError, which
        // takes the base sink's gate.
        private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
        private readonly Func<IObservable<TResult>> _selectorOnCompleted;

        public _(ObservableSelectorsIndexed parent, IObserver<TResult> observer, IDisposable cancel)
            : base(parent, observer, cancel)
        {
            _selectorOnError = parent._selectorOnError;
            _selectorOnCompleted = parent._selectorOnCompleted;
        }

        /// <summary>
        /// With an error selector: subscribes to the sequence it returns for
        /// <paramref name="error"/> and then finalizes the source. Without one: uses
        /// the base error handling. An exception thrown by the selector is forwarded
        /// downstream as the error.
        /// </summary>
        public override void OnError(Exception error)
        {
            if (_selectorOnError == null)
            {
                base.OnError(error);
                return;
            }

            IObservable<TResult> continuation;
            try
            {
                continuation = _selectorOnError(error);
            }
            catch (Exception selectorFailure)
            {
                // Non-virtual call into the base sink: it locks the base gate, forwards
                // the error downstream and disposes the sink.
                base.OnError(selectorFailure);
                return;
            }

            SubscribeInner(continuation);
            Final();
        }

        /// <summary>
        /// With a completion selector: subscribes to the sequence it returns before
        /// finalizing the source. Without one: finalizes directly. An exception thrown
        /// by the selector is forwarded downstream as an error.
        /// </summary>
        public override void OnCompleted()
        {
            if (_selectorOnCompleted != null)
            {
                IObservable<TResult> continuation;
                try
                {
                    continuation = _selectorOnCompleted();
                }
                catch (Exception selectorFailure)
                {
                    // Reported under the base sink's gate, like every other downstream
                    // notification of this sink.
                    base.OnError(selectorFailure);
                    return;
                }

                SubscribeInner(continuation);
            }

            Final();
        }
    }

    private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
    private readonly Func<IObservable<TResult>> _selectorOnCompleted;

    public ObservableSelectorsIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
        : base(source, selector)
    {
        _selectorOnError = selectorOnError;
        _selectorOnCompleted = selectorOnCompleted;
    }

    /// <summary>Creates the sink that carries the terminal selectors.</summary>
    protected override ObservableSelectorIndexed._ CreateSink(IObserver<TResult> observer, IDisposable cancel)
    {
        return new _(this, observer, cancel);
    }
}
/// <summary>
/// Producer for the SelectMany variant whose selector maps each source element to an
/// <see cref="IEnumerable{TResult}"/>; the sink enumerates it and emits every item.
/// </summary>
internal sealed class EnumerableSelector : Producer<TResult, EnumerableSelector._>
{
    /// <summary>Sink that flattens the enumerable produced for each source element.</summary>
    internal sealed class _ : Sink<TResult>, IObserver<TSource>
    {
        private readonly Func<TSource, IEnumerable<TResult>> _selector;

        public _(EnumerableSelector parent, IObserver<TResult> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _selector = parent._selector;
        }

        /// <summary>
        /// Projects <paramref name="value"/> to an enumerable and pushes each of its
        /// items downstream. Exceptions from the selector, from <c>GetEnumerator</c>,
        /// or from <c>MoveNext</c>/<c>Current</c> are reported via <see cref="OnError"/>.
        /// Exceptions thrown by the downstream <c>OnNext</c> are not caught here.
        /// </summary>
        public void OnNext(TSource value)
        {
            IEnumerable<TResult> items;
            try
            {
                items = _selector(value);
            }
            catch (Exception selectorFailure)
            {
                OnError(selectorFailure);
                return;
            }

            IEnumerator<TResult> cursor;
            try
            {
                cursor = items.GetEnumerator();
            }
            catch (Exception enumeratorFailure)
            {
                OnError(enumeratorFailure);
                return;
            }

            try
            {
                while (true)
                {
                    TResult current;
                    try
                    {
                        if (!cursor.MoveNext())
                        {
                            break;
                        }

                        current = cursor.Current;
                    }
                    catch (Exception iterationFailure)
                    {
                        OnError(iterationFailure);
                        return;
                    }

                    // Deliberately outside the inner try: only enumeration failures are
                    // converted into an error notification.
                    _observer.OnNext(current);
                }
            }
            finally
            {
                if (cursor != null)
                {
                    cursor.Dispose();
                }
            }
        }

        /// <summary>Forwards the error downstream and disposes the sink.</summary>
        public void OnError(Exception error)
        {
            _observer.OnError(error);
            base.Dispose();
        }

        /// <summary>Forwards completion downstream and disposes the sink.</summary>
        public void OnCompleted()
        {
            _observer.OnCompleted();
            base.Dispose();
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, IEnumerable<TResult>> _selector;

    public EnumerableSelector(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
    {
        _source = source;
        _selector = selector;
    }

    /// <summary>Creates the sink for one subscription.</summary>
    protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);

    /// <summary>Subscribes the sink directly to the stored source.</summary>
    protected override IDisposable Run(_ sink) => ObservableExtensions.SubscribeSafe<TSource>(_source, sink);
}
/// <summary>
/// Producer for the SelectMany variant whose selector receives each source element
/// with its zero-based position and returns an <see cref="IEnumerable{TResult}"/>;
/// the sink enumerates it and emits every item.
/// </summary>
internal sealed class EnumerableSelectorIndexed : Producer<TResult, EnumerableSelectorIndexed._>
{
    /// <summary>Sink that flattens the enumerable produced for each (element, index) pair.</summary>
    internal sealed class _ : Sink<TResult>, IObserver<TSource>
    {
        private readonly Func<TSource, int, IEnumerable<TResult>> _selector;

        // Position handed to the selector for the next source element.
        private int _index;

        public _(EnumerableSelectorIndexed parent, IObserver<TResult> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _selector = parent._selector;
        }

        /// <summary>
        /// Projects <paramref name="value"/> and the current index to an enumerable and
        /// pushes each of its items downstream. Exceptions from the selector call
        /// (including an overflow of the checked index increment), from
        /// <c>GetEnumerator</c>, or from <c>MoveNext</c>/<c>Current</c> are reported via
        /// <see cref="OnError"/>. Exceptions thrown by the downstream <c>OnNext</c> are
        /// not caught here.
        /// </summary>
        public void OnNext(TSource value)
        {
            IEnumerable<TResult> items;
            try
            {
                items = _selector(value, checked(_index++));
            }
            catch (Exception selectorFailure)
            {
                OnError(selectorFailure);
                return;
            }

            IEnumerator<TResult> cursor;
            try
            {
                cursor = items.GetEnumerator();
            }
            catch (Exception enumeratorFailure)
            {
                OnError(enumeratorFailure);
                return;
            }

            try
            {
                while (true)
                {
                    TResult current;
                    try
                    {
                        if (!cursor.MoveNext())
                        {
                            break;
                        }

                        current = cursor.Current;
                    }
                    catch (Exception iterationFailure)
                    {
                        OnError(iterationFailure);
                        return;
                    }

                    // Deliberately outside the inner try: only enumeration failures are
                    // converted into an error notification.
                    _observer.OnNext(current);
                }
            }
            finally
            {
                if (cursor != null)
                {
                    cursor.Dispose();
                }
            }
        }

        /// <summary>Forwards the error downstream and disposes the sink.</summary>
        public void OnError(Exception error)
        {
            _observer.OnError(error);
            base.Dispose();
        }

        /// <summary>Forwards completion downstream and disposes the sink.</summary>
        public void OnCompleted()
        {
            _observer.OnCompleted();
            base.Dispose();
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, int, IEnumerable<TResult>> _selector;

    public EnumerableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
    {
        _source = source;
        _selector = selector;
    }

    /// <summary>Creates the sink for one subscription.</summary>
    protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);

    /// <summary>Subscribes the sink directly to the stored source.</summary>
    protected override IDisposable Run(_ sink) => ObservableExtensions.SubscribeSafe<TSource>(_source, sink);
}
/// <summary>
/// Producer for the SelectMany variant whose selector starts a
/// <see cref="Task{TResult}"/> per source element; each task's result is emitted
/// downstream when the task finishes.
/// </summary>
internal sealed class TaskSelector : Producer<TResult, TaskSelector._>
{
    /// <summary>
    /// Sink that tracks outstanding work with a counter: it starts at 1 in
    /// <see cref="Run"/>, is incremented for every source element, and is decremented
    /// by every call to <see cref="OnCompleted"/>. Downstream completes when the
    /// counter reaches zero.
    /// </summary>
    internal sealed class _ : Sink<TResult>, IObserver<TSource>
    {
        private readonly object _gate = new object();

        // Supplies the token passed to the selector; also part of the disposable
        // returned from Run.
        private readonly CancellationDisposable _cancel = new CancellationDisposable();

        private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;

        private volatile int _count;

        public _(TaskSelector parent, IObserver<TResult> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _selector = parent._selector;
        }

        /// <summary>Subscribes this sink to <paramref name="source"/>.</summary>
        /// <returns>A disposable combining the source subscription and the cancellation source.</returns>
        public IDisposable Run(IObservable<TSource> source)
        {
            _count = 1;

            IDisposable subscription = ObservableExtensions.SubscribeSafe<TSource>(source, this);
            return StableCompositeDisposable.Create(subscription, _cancel);
        }

        /// <summary>
        /// Counts the element, starts its task, and handles the outcome either
        /// immediately (task already completed) or through a continuation.
        /// </summary>
        public void OnNext(TSource value)
        {
            Task<TResult> pending;
            try
            {
                // Incremented before the selector runs.
                Interlocked.Increment(ref _count);
                pending = _selector(value, _cancel.Token);
            }
            catch (Exception selectorFailure)
            {
                OnError(selectorFailure);
                return;
            }

            if (pending.IsCompleted)
            {
                OnCompletedTask(pending);
            }
            else
            {
                pending.ContinueWith(OnCompletedTask);
            }
        }

        /// <summary>
        /// Translates a finished task into downstream notifications: result then a
        /// counter decrement, fault as an error, cancellation as an error unless the
        /// sink's own cancellation source has been disposed.
        /// </summary>
        private void OnCompletedTask(Task<TResult> task)
        {
            TaskStatus outcome = task.Status;
            switch (outcome)
            {
                case TaskStatus.RanToCompletion:
                    lock (_gate)
                    {
                        _observer.OnNext(task.Result);
                    }

                    // Releases this element's share of the counter.
                    OnCompleted();
                    break;

                case TaskStatus.Faulted:
                    OnError(task.Exception.InnerException);
                    break;

                case TaskStatus.Canceled:
                    if (!_cancel.IsDisposed)
                    {
                        OnError(new TaskCanceledException(task));
                    }

                    break;
            }
        }

        /// <summary>Forwards the error downstream under the gate and disposes the sink.</summary>
        public void OnError(Exception error)
        {
            lock (_gate)
            {
                _observer.OnError(error);
                base.Dispose();
            }
        }

        /// <summary>
        /// Decrements the outstanding-work counter and, when it reaches zero, completes
        /// downstream and disposes the sink.
        /// </summary>
        public void OnCompleted()
        {
            if (Interlocked.Decrement(ref _count) != 0)
            {
                return;
            }

            lock (_gate)
            {
                _observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;

    public TaskSelector(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
    {
        _source = source;
        _selector = selector;
    }

    /// <summary>Creates the sink for one subscription.</summary>
    protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);

    /// <summary>Starts the sink against the stored source.</summary>
    protected override IDisposable Run(_ sink) => sink.Run(_source);
}
/// <summary>
/// Producer for the SelectMany variant whose selector receives each source element
/// with its zero-based position and starts a <see cref="Task{TResult}"/>; each task's
/// result is emitted downstream when the task finishes.
/// </summary>
internal sealed class TaskSelectorIndexed : Producer<TResult, TaskSelectorIndexed._>
{
    /// <summary>
    /// Sink that tracks outstanding work with a counter: it starts at 1 in
    /// <see cref="Run"/>, is incremented for every source element, and is decremented
    /// by every call to <see cref="OnCompleted"/>. Downstream completes when the
    /// counter reaches zero.
    /// </summary>
    internal sealed class _ : Sink<TResult>, IObserver<TSource>
    {
        private readonly object _gate = new object();

        // Supplies the token passed to the selector; also part of the disposable
        // returned from Run.
        private readonly CancellationDisposable _cancel = new CancellationDisposable();

        private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;

        private volatile int _count;

        // Position handed to the selector for the next source element.
        private int _index;

        public _(TaskSelectorIndexed parent, IObserver<TResult> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _selector = parent._selector;
        }

        /// <summary>Subscribes this sink to <paramref name="source"/>.</summary>
        /// <returns>A disposable combining the source subscription and the cancellation source.</returns>
        public IDisposable Run(IObservable<TSource> source)
        {
            _count = 1;

            IDisposable subscription = ObservableExtensions.SubscribeSafe<TSource>(source, this);
            return StableCompositeDisposable.Create(subscription, _cancel);
        }

        /// <summary>
        /// Counts the element, starts its task with the current index, and handles the
        /// outcome either immediately (task already completed) or through a continuation.
        /// </summary>
        public void OnNext(TSource value)
        {
            Task<TResult> pending;
            try
            {
                // Incremented before the selector runs.
                Interlocked.Increment(ref _count);
                pending = _selector(value, checked(_index++), _cancel.Token);
            }
            catch (Exception selectorFailure)
            {
                OnError(selectorFailure);
                return;
            }

            if (pending.IsCompleted)
            {
                OnCompletedTask(pending);
            }
            else
            {
                pending.ContinueWith(OnCompletedTask);
            }
        }

        /// <summary>
        /// Translates a finished task into downstream notifications: result then a
        /// counter decrement, fault as an error, cancellation as an error unless the
        /// sink's own cancellation source has been disposed.
        /// </summary>
        private void OnCompletedTask(Task<TResult> task)
        {
            TaskStatus outcome = task.Status;
            switch (outcome)
            {
                case TaskStatus.RanToCompletion:
                    lock (_gate)
                    {
                        _observer.OnNext(task.Result);
                    }

                    // Releases this element's share of the counter.
                    OnCompleted();
                    break;

                case TaskStatus.Faulted:
                    OnError(task.Exception.InnerException);
                    break;

                case TaskStatus.Canceled:
                    if (!_cancel.IsDisposed)
                    {
                        OnError(new TaskCanceledException(task));
                    }

                    break;
            }
        }

        /// <summary>Forwards the error downstream under the gate and disposes the sink.</summary>
        public void OnError(Exception error)
        {
            lock (_gate)
            {
                _observer.OnError(error);
                base.Dispose();
            }
        }

        /// <summary>
        /// Decrements the outstanding-work counter and, when it reaches zero, completes
        /// downstream and disposes the sink.
        /// </summary>
        public void OnCompleted()
        {
            if (Interlocked.Decrement(ref _count) != 0)
            {
                return;
            }

            lock (_gate)
            {
                _observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;

    public TaskSelectorIndexed(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
    {
        _source = source;
        _selector = selector;
    }

    /// <summary>Creates the sink for one subscription.</summary>
    protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);

    /// <summary>Starts the sink against the stored source.</summary>
    protected override IDisposable Run(_ sink) => sink.Run(_source);
}
}
}