// QueryLanguageEx
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
namespace System.Reactive.Linq
{
internal class QueryLanguageEx : IQueryLanguageEx
{
/// <summary>
/// Observable whose subscription logic is supplied by an iterator method that receives the
/// subscribing observer and yields the observable sequences to run one after another.
/// </summary>
/// <typeparam name="TResult">Element type exposed to the subscribing observer.</typeparam>
private sealed class CreateWithEnumerableObservable<TResult> : ObservableBase<TResult>
{
    private readonly Func<IObserver<TResult>, IEnumerable<IObservable<object>>> _iteratorMethod;

    /// <summary>Stores the iterator method that is invoked on every subscription.</summary>
    public CreateWithEnumerableObservable(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
        => _iteratorMethod = iteratorMethod;

    /// <summary>
    /// Invokes the iterator with <paramref name="observer"/>, concatenates the sequences it yields and
    /// subscribes an adapter that forwards only the terminal notification of that concatenation.
    /// </summary>
    protected override IDisposable SubscribeCore(IObserver<TResult> observer)
    {
        IEnumerable<IObservable<object>> steps = _iteratorMethod(observer);
        IObservable<object> sequenced = Observable.Concat<object>(steps);
        var terminalRelay = new TerminalOnlyObserver<TResult>(observer);
        return sequenced.Subscribe(terminalRelay);
    }
}
/// <summary>
/// Observer adapter that relays only terminal notifications (completion or error) to the wrapped
/// observer and discards every element it receives.
/// </summary>
/// <typeparam name="TResult">Element type of the wrapped observer.</typeparam>
private sealed class TerminalOnlyObserver<TResult> : IObserver<object>
{
    private readonly IObserver<TResult> _observer;

    /// <summary>Wraps the observer that should receive the terminal notification.</summary>
    public TerminalOnlyObserver(IObserver<TResult> observer) => _observer = observer;

    /// <summary>Elements are intentionally ignored; nothing is forwarded.</summary>
    public void OnNext(object value)
    {
    }

    /// <summary>Forwards the error to the wrapped observer.</summary>
    public void OnError(Exception error) => _observer.OnError(error);

    /// <summary>Forwards completion to the wrapped observer.</summary>
    public void OnCompleted() => _observer.OnCompleted();
}
/// <summary>
/// Observable whose subscription logic is supplied by a parameterless iterator method that yields
/// the observable sequences to run one after another.
/// </summary>
/// <typeparam name="TResult">Element type exposed to the subscribing observer.</typeparam>
private sealed class CreateWithOnlyEnumerableObservable<TResult> : ObservableBase<TResult>
{
    private readonly Func<IEnumerable<IObservable<object>>> _iteratorMethod;

    /// <summary>Stores the iterator method that is invoked on every subscription.</summary>
    public CreateWithOnlyEnumerableObservable(Func<IEnumerable<IObservable<object>>> iteratorMethod)
        => _iteratorMethod = iteratorMethod;

    /// <summary>
    /// Invokes the iterator, concatenates the sequences it yields and subscribes an adapter that
    /// forwards only the terminal notification of that concatenation to <paramref name="observer"/>.
    /// </summary>
    protected override IDisposable SubscribeCore(IObserver<TResult> observer)
    {
        IEnumerable<IObservable<object>> steps = _iteratorMethod();
        IObservable<object> sequenced = Observable.Concat<object>(steps);
        var terminalRelay = new TerminalOnlyObserver<TResult>(observer);
        return sequenced.Subscribe(terminalRelay);
    }
}
/// <summary>
/// Observable that emits the elements of a source sequence and, for every element emitted, also
/// subscribes to the sequence produced by a selector for that element.
/// </summary>
/// <typeparam name="TSource">Element type of the source and of the expanded sequences.</typeparam>
private sealed class ExpandObservable<TSource> : ObservableBase<TSource>
{
    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, IObservable<TSource>> _selector;
    private readonly IScheduler _scheduler;

    /// <summary>Captures the root sequence, the expansion selector and the scheduler used to drain the work queue.</summary>
    public ExpandObservable(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector, IScheduler scheduler)
    {
        _source = source;
        _selector = selector;
        _scheduler = scheduler;
    }

    /// <summary>
    /// Queues the root sequence and starts draining the queue on the scheduler.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the decompiled listing only contained compiler-generated closure residue
    /// (<c>&lt;&gt;c__DisplayClass4_0</c> / <c>g__ensureActive|0</c>) where the draining logic belongs, which is
    /// not valid C#. The local function below is a reconstruction of that logic following the
    /// upstream Rx.NET Expand algorithm - confirm against the shipped assembly.
    /// </remarks>
    protected override IDisposable SubscribeCore(IObserver<TSource> observer)
    {
        var outGate = new object();                      // serializes calls into the downstream observer
        var q = new Queue<IObservable<TSource>>();       // pending sequences; also used as the lock for queue state
        var m = new SerialDisposable();                  // holds the currently scheduled drain loop
        var d = new CompositeDisposable { m };           // everything that must be torn down on unsubscribe
        var activeCount = 0;                             // sequences queued or running that have not completed yet
        var isAcquired = false;                          // true while a drain loop owns the queue

        // Starts a drain loop unless one is already running.
        void EnsureActive()
        {
            var isOwner = false;

            lock (q)
            {
                if (q.Count > 0)
                {
                    isOwner = !isAcquired;
                    isAcquired = true;
                }
            }

            if (!isOwner)
            {
                return;
            }

            m.Disposable = _scheduler.Schedule(self =>
            {
                IObservable<TSource> work;

                lock (q)
                {
                    if (q.Count > 0)
                    {
                        work = q.Dequeue();
                    }
                    else
                    {
                        // Queue drained: release ownership so a later enqueue can restart the loop.
                        isAcquired = false;
                        return;
                    }
                }

                var inner = new SingleAssignmentDisposable();
                d.Add(inner);
                inner.Disposable = work.Subscribe(
                    x =>
                    {
                        lock (outGate)
                        {
                            observer.OnNext(x);
                        }

                        IObservable<TSource> expansion;
                        try
                        {
                            expansion = _selector(x);
                        }
                        catch (Exception exception)
                        {
                            lock (outGate)
                            {
                                observer.OnError(exception);
                            }
                            return;
                        }

                        lock (q)
                        {
                            q.Enqueue(expansion);
                            activeCount++;
                        }

                        EnsureActive();
                    },
                    exception =>
                    {
                        lock (outGate)
                        {
                            observer.OnError(exception);
                        }
                    },
                    () =>
                    {
                        d.Remove(inner);

                        bool done;
                        lock (q)
                        {
                            activeCount--;
                            done = activeCount == 0;
                        }

                        // The whole expansion completes once no sequence is queued or running.
                        if (done)
                        {
                            lock (outGate)
                            {
                                observer.OnCompleted();
                            }
                        }
                    });

                // Reschedule to pick up the next queued sequence.
                self();
            });
        }

        lock (q)
        {
            q.Enqueue(_source);
            activeCount++;
        }

        EnsureActive();
        return d;
    }
}
/// <summary>
/// Observable that subscribes to all given sources and, once every source has completed, emits a
/// single array holding the last value of each source (in source order) followed by completion.
/// If any source completes without having produced a value, the result completes empty; if any
/// source fails, the error is propagated.
/// </summary>
/// <typeparam name="TSource">Element type of the sources.</typeparam>
private sealed class ForkJoinObservable<TSource> : ObservableBase<TSource[]>
{
    private readonly IEnumerable<IObservable<TSource>> _sources;

    /// <summary>Captures the sources; they are enumerated on each subscription.</summary>
    public ForkJoinObservable(IEnumerable<IObservable<TSource>> sources)
    {
        _sources = sources;
    }

    /// <summary>
    /// Subscribes to every source and wires up the join logic described on the class.
    /// </summary>
    /// <returns>A disposable group holding all inner subscriptions.</returns>
    protected override IDisposable SubscribeCore(IObserver<TSource[]> observer)
    {
        IObservable<TSource>[] allSources = _sources.ToArray();
        int count = allSources.Length;

        // Nothing to join: complete immediately.
        if (count == 0)
        {
            observer.OnCompleted();
            return Disposable.Empty;
        }

        var group = new CompositeDisposable(count);
        var gate = new object();
        bool finished = false;                    // set once a terminal notification has been sent
        var hasResults = new bool[count];         // per source: produced at least one value
        var hasCompleted = new bool[count];       // per source: completed after producing a value
        var results = new List<TSource>(count);   // per source: last value seen

        // The gate is held while subscribing so callbacks from other threads wait until the
        // bookkeeping for all sources is in place.
        lock (gate)
        {
            for (int index = 0; index < count; index++)
            {
                int currentIndex = index;
                IObservable<TSource> source = allSources[index];
                results.Add(default(TSource));

                group.Add(source.Subscribe(
                    value =>
                    {
                        lock (gate)
                        {
                            if (!finished)
                            {
                                hasResults[currentIndex] = true;
                                results[currentIndex] = value;
                            }
                        }
                    },
                    error =>
                    {
                        lock (gate)
                        {
                            // Never send a second terminal notification.
                            if (finished)
                            {
                                return;
                            }

                            finished = true;
                            observer.OnError(error);
                            group.Dispose();
                        }
                    },
                    () =>
                    {
                        lock (gate)
                        {
                            if (finished)
                            {
                                return;
                            }

                            if (!hasResults[currentIndex])
                            {
                                // A source ended without a value, so no joined result can exist.
                                // Mark the join as finished and stop the remaining sources so no
                                // further notification can follow this completion.
                                finished = true;
                                observer.OnCompleted();
                                group.Dispose();
                                return;
                            }

                            hasCompleted[currentIndex] = true;
                            foreach (bool completed in hasCompleted)
                            {
                                if (!completed)
                                {
                                    return;
                                }
                            }

                            finished = true;
                            observer.OnNext(results.ToArray());
                            observer.OnCompleted();
                        }
                    }));
            }
        }

        return group;
    }
}
/// <summary>
/// Link in a chain of observables: exposes a head element followed by whatever sequence is later
/// supplied as its tail. The tail is supplied at most once, through the observer side of the subject.
/// </summary>
/// <typeparam name="T">Element type of the chain.</typeparam>
private class ChainObservable<T> : ISubject<IObservable<T>, T>, IObserver<IObservable<T>>, IObservable<T>
{
    private readonly T _head;
    private readonly AsyncSubject<IObservable<T>> _tail = new AsyncSubject<IObservable<T>>();

    /// <summary>Creates a link whose first element is <paramref name="head"/>.</summary>
    public ChainObservable(T head)
    {
        _head = head;
    }

    /// <summary>
    /// Schedules, on the current-thread scheduler, the emission of the head element followed by a
    /// subscription to the merged tail.
    /// </summary>
    /// <returns>A disposable covering both the scheduled work and the tail subscription.</returns>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = new CompositeDisposable();

        // The tuple element holding this instance must be the verbatim identifier "@this":
        // "this" is a keyword and cannot be used as a tuple element name.
        subscription.Add(Scheduler.ScheduleAction<(IObserver<T> observer, CompositeDisposable g, ChainObservable<T> @this)>(
            (IScheduler)CurrentThreadScheduler.Instance,
            (observer, subscription, this),
            state =>
            {
                state.observer.OnNext(state.@this._head);
                state.g.Add(Observable.Merge<T>((IObservable<IObservable<T>>)state.@this._tail).Subscribe(state.observer));
            }));

        return subscription;
    }

    /// <summary>Ends the chain by supplying an empty tail.</summary>
    public void OnCompleted()
    {
        OnNext(Observable.Empty<T>());
    }

    /// <summary>Ends the chain by supplying a tail that fails with <paramref name="error"/>.</summary>
    public void OnError(Exception error)
    {
        OnNext(Observable.Throw<T>(error));
    }

    /// <summary>Supplies the tail sequence and completes the tail subject so it is published to subscribers.</summary>
    public void OnNext(IObservable<T> value)
    {
        _tail.OnNext(value);
        _tail.OnCompleted();
    }
}
/// <summary>
/// Observable that merges the materialized notifications of a left and a right source into a
/// single observer created by a combiner selector, serializing both streams on a shared gate.
/// </summary>
private sealed class CombineObservable<TLeft, TRight, TResult> : ObservableBase<TResult>
{
    private readonly IObservable<TLeft> _leftSource;
    private readonly IObservable<TRight> _rightSource;
    private readonly Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> _combinerSelector;

    /// <summary>Captures both sources and the factory that builds the combining observer.</summary>
    public CombineObservable(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector)
    {
        _leftSource = leftSource;
        _rightSource = rightSource;
        _combinerSelector = combinerSelector;
    }

    /// <summary>
    /// Builds the combining observer, then subscribes it first to the left and then to the right
    /// source, each wrapped as an <c>Either</c> of materialized notifications.
    /// </summary>
    /// <returns>A disposable that tears down both subscriptions.</returns>
    protected override IDisposable SubscribeCore(IObserver<TResult> observer)
    {
        var leftSubscription = new SingleAssignmentDisposable();
        var rightSubscription = new SingleAssignmentDisposable();

        // The combiner receives both subscription handles so it can cancel the opposite side.
        IObserver<Either<Notification<TLeft>, Notification<TRight>>> combiner =
            _combinerSelector(observer, leftSubscription, rightSubscription);

        // Shared gate: notifications from both sides reach the combiner one at a time.
        var gate = new object();

        leftSubscription.Disposable = _leftSource
            .Materialize()
            .Select<Notification<TLeft>, Either<Notification<TLeft>, Notification<TRight>>>(
                notification => Either<Notification<TLeft>, Notification<TRight>>.CreateLeft(notification))
            .Synchronize(gate)
            .Subscribe(combiner);

        rightSubscription.Disposable = _rightSource
            .Materialize()
            .Select<Notification<TRight>, Either<Notification<TLeft>, Notification<TRight>>>(
                notification => Either<Notification<TLeft>, Notification<TRight>>.CreateRight(notification))
            .Synchronize(gate)
            .Subscribe(combiner);

        return StableCompositeDisposable.Create(leftSubscription, rightSubscription);
    }
}
/// <summary>
/// Creates an observable whose subscription is driven by an iterator method that receives the
/// subscribing observer.
/// </summary>
/// <param name="iteratorMethod">Iterator invoked per subscription; yields the sequences to run in order.</param>
/// <returns>An observable wrapping <paramref name="iteratorMethod"/>.</returns>
public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
    => new CreateWithEnumerableObservable<TResult>(iteratorMethod);
/// <summary>
/// Creates an observable of <c>Unit</c> whose subscription is driven by a parameterless iterator method.
/// </summary>
/// <param name="iteratorMethod">Iterator invoked per subscription; yields the sequences to run in order.</param>
/// <returns>An observable wrapping <paramref name="iteratorMethod"/>.</returns>
public virtual IObservable<Unit> Create(Func<IEnumerable<IObservable<object>>> iteratorMethod)
    => new CreateWithOnlyEnumerableObservable<Unit>(iteratorMethod);
/// <summary>
/// Wraps <paramref name="source"/> in an expanding observable that uses <paramref name="selector"/>
/// and <paramref name="scheduler"/>.
/// </summary>
/// <returns>A new <c>ExpandObservable</c> over the given arguments.</returns>
public virtual IObservable<TSource> Expand<TSource>(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector, IScheduler scheduler)
    => new ExpandObservable<TSource>(source, selector, scheduler);
/// <summary>
/// Expands <paramref name="source"/> with <paramref name="selector"/> using the default iteration scheduler.
/// </summary>
/// <returns>The result of the scheduler-taking <c>Expand</c> extension called with <c>SchedulerDefaults.Iteration</c>.</returns>
public virtual IObservable<TSource> Expand<TSource>(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector)
{
    var iterationScheduler = SchedulerDefaults.Iteration;
    return source.Expand(selector, iterationScheduler);
}
/// <summary>
/// Runs two sequences side by side and, once both have completed, emits one value computed by
/// <paramref name="resultSelector"/> from the last element of each. If either sequence completed
/// without an element the result completes empty; an error on either side cancels the other
/// side's subscription and is forwarded.
/// </summary>
/// <param name="first">Left sequence.</param>
/// <param name="second">Right sequence.</param>
/// <param name="resultSelector">Combines the last left and last right element.</param>
/// <returns>An observable producing at most one combined value.</returns>
public virtual IObservable<TResult> ForkJoin<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
    return Combine<TFirst, TSecond, TResult>(first, second, (observer, leftSubscription, rightSubscription) =>
    {
        bool leftDone = false;
        bool rightDone = false;
        bool sawLeft = false;
        bool sawRight = false;
        TFirst latestLeft = default(TFirst);
        TSecond latestRight = default(TSecond);

        // Runs once both sides have completed: emits the combined value, or just completes when
        // one side never produced an element.
        void PublishOutcome()
        {
            if (!sawLeft || !sawRight)
            {
                observer.OnCompleted();
                return;
            }

            TResult combined;
            try
            {
                combined = resultSelector(latestLeft, latestRight);
            }
            catch (Exception selectorFailure)
            {
                observer.OnError(selectorFailure);
                return;
            }

            observer.OnNext(combined);
            observer.OnCompleted();
        }

        void HandleLeft(Notification<TFirst> left)
        {
            switch (left.Kind)
            {
                case NotificationKind.OnNext:
                    sawLeft = true;
                    latestLeft = left.Value;
                    break;

                case NotificationKind.OnError:
                    // Stop the opposite side before reporting the failure.
                    rightSubscription.Dispose();
                    observer.OnError(left.Exception);
                    break;

                case NotificationKind.OnCompleted:
                    leftDone = true;
                    if (rightDone)
                    {
                        PublishOutcome();
                    }
                    break;
            }
        }

        void HandleRight(Notification<TSecond> right)
        {
            switch (right.Kind)
            {
                case NotificationKind.OnNext:
                    sawRight = true;
                    latestRight = right.Value;
                    break;

                case NotificationKind.OnError:
                    // Stop the opposite side before reporting the failure.
                    leftSubscription.Dispose();
                    observer.OnError(right.Exception);
                    break;

                case NotificationKind.OnCompleted:
                    rightDone = true;
                    if (leftDone)
                    {
                        PublishOutcome();
                    }
                    break;
            }
        }

        return new BinaryObserver<TFirst, TSecond>(HandleLeft, HandleRight);
    });
}
/// <summary>
/// Array overload of ForkJoin: treats <paramref name="sources"/> as an enumerable and calls the
/// enumerable-based <c>ForkJoin</c> extension on it.
/// </summary>
/// <returns>Whatever the enumerable-based <c>ForkJoin</c> call returns.</returns>
public virtual IObservable<TSource[]> ForkJoin<TSource>(params IObservable<TSource>[] sources)
{
    // Typed as IEnumerable so the call binds to the enumerable overload rather than the params one.
    IEnumerable<IObservable<TSource>> sequence = sources;
    return sequence.ForkJoin();
}
/// <summary>
/// Wraps <paramref name="sources"/> in a fork-join observable.
/// </summary>
/// <returns>A new <c>ForkJoinObservable</c> over <paramref name="sources"/>.</returns>
public virtual IObservable<TSource[]> ForkJoin<TSource>(IEnumerable<IObservable<TSource>> sources)
    => new ForkJoinObservable<TSource>(sources);
/// <summary>
/// Applies <paramref name="function"/> to <paramref name="source"/> and returns its result.
/// </summary>
/// <param name="source">Sequence handed to the function.</param>
/// <param name="function">Projection from the source sequence to the resulting sequence.</param>
/// <returns>Whatever <paramref name="function"/> returns for <paramref name="source"/>.</returns>
public virtual IObservable<TResult> Let<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> function)
    => function(source);
/// <summary>
/// ManySelect overload that delegates to the scheduler-taking overload with <c>DefaultScheduler.Instance</c>.
/// </summary>
/// <returns>The result of the scheduler-taking overload.</returns>
public virtual IObservable<TResult> ManySelect<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, TResult> selector)
    => ManySelect(source, selector, DefaultScheduler.Instance);
/// <summary>
/// For every element of <paramref name="source"/>, builds a chain link starting at that element,
/// attaches it as the tail of the previous link, and projects each link through
/// <paramref name="selector"/> after moving notifications onto <paramref name="scheduler"/>.
/// </summary>
/// <param name="source">Sequence whose elements start the chain links.</param>
/// <param name="selector">Projection applied to each chain link.</param>
/// <param name="scheduler">Scheduler on which the links are observed before projection.</param>
/// <returns>A deferred observable of the projected results; chain state is created per subscription.</returns>
public virtual IObservable<TResult> ManySelect<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, TResult> selector, IScheduler scheduler)
{
    return Observable.Defer(() =>
    {
        // Most recent link; null until the first element arrives.
        ChainObservable<TSource> previous = null;

        return source
            .Select<TSource, IObservable<TSource>>(element =>
            {
                var link = new ChainObservable<TSource>(element);

                // The new link becomes the tail of the link before it.
                previous?.OnNext(link);
                previous = link;

                return link;
            })
            .Do(
                _ => { },
                // Terminal notifications of the source terminate the last link's tail.
                exception => previous?.OnError(exception),
                () => previous?.OnCompleted())
            .ObserveOn(scheduler)
            .Select<IObservable<TSource>, TResult>(selector);
    });
}
/// <summary>
/// Wraps <paramref name="source"/> in a <c>ListObservable</c>.
/// </summary>
/// <returns>A new <c>ListObservable</c> constructed over <paramref name="source"/>.</returns>
public virtual ListObservable<TSource> ToListObservable<TSource>(IObservable<TSource> source)
    => new ListObservable<TSource>(source);
/// <summary>
/// Builds a <c>CombineObservable</c> over two sources and the factory for the combining observer.
/// </summary>
/// <param name="leftSource">Left sequence.</param>
/// <param name="rightSource">Right sequence.</param>
/// <param name="combinerSelector">Creates the combining observer from the downstream observer and both subscription handles.</param>
/// <returns>A new <c>CombineObservable</c> over the given arguments.</returns>
private static IObservable<TResult> Combine<TLeft, TRight, TResult>(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector)
    => new CombineObservable<TLeft, TRight, TResult>(leftSource, rightSource, combinerSelector);
}
}