<PackageReference Include="System.Reactive" Version="4.1.4" />

QueryLanguageEx

using System.Collections.Generic; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; namespace System.Reactive.Linq { internal class QueryLanguageEx : IQueryLanguageEx { private sealed class CreateWithEnumerableObservable<TResult> : ObservableBase<TResult> { private readonly Func<IObserver<TResult>, IEnumerable<IObservable<object>>> _iteratorMethod; public CreateWithEnumerableObservable(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod) { _iteratorMethod = iteratorMethod; } protected override IDisposable SubscribeCore(IObserver<TResult> observer) { return Observable.Concat<object>(_iteratorMethod(observer)).Subscribe(new TerminalOnlyObserver<TResult>(observer)); } } private sealed class TerminalOnlyObserver<TResult> : IObserver<object> { private readonly IObserver<TResult> _observer; public TerminalOnlyObserver(IObserver<TResult> observer) { _observer = observer; } public void OnCompleted() { _observer.OnCompleted(); } public void OnError(Exception error) { _observer.OnError(error); } public void OnNext(object value) { } } private sealed class CreateWithOnlyEnumerableObservable<TResult> : ObservableBase<TResult> { private readonly Func<IEnumerable<IObservable<object>>> _iteratorMethod; public CreateWithOnlyEnumerableObservable(Func<IEnumerable<IObservable<object>>> iteratorMethod) { _iteratorMethod = iteratorMethod; } protected override IDisposable SubscribeCore(IObserver<TResult> observer) { return Observable.Concat<object>(_iteratorMethod()).Subscribe(new TerminalOnlyObserver<TResult>(observer)); } } private sealed class ExpandObservable<TSource> : ObservableBase<TSource> { private readonly IObservable<TSource> _source; private readonly Func<TSource, IObservable<TSource>> _selector; private readonly IScheduler _scheduler; public ExpandObservable(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector, IScheduler scheduler) { _source = source; _selector = selector; _scheduler = scheduler; } protected override IDisposable SubscribeCore(IObserver<TSource> observer) { object outGate = new object(); Queue<IObservable<TSource>> q = new Queue<IObservable<TSource>>(); SerialDisposable i = new SerialDisposable(); CompositeDisposable d = new CompositeDisposable { i }; int activeCount = 0; bool isAcquired = false; lock (q) { q.Enqueue(_source); activeCount++; } <>c__DisplayClass4_0 <>c__DisplayClass4_; <>c__DisplayClass4_.<SubscribeCore>g__ensureActive|0(); return d; } } private sealed class ForkJoinObservable<TSource> : ObservableBase<TSource[]> { private readonly IEnumerable<IObservable<TSource>> _sources; public ForkJoinObservable(IEnumerable<IObservable<TSource>> sources) { _sources = sources; } protected override IDisposable SubscribeCore(IObserver<TSource[]> observer) { IObservable<TSource>[] array = Enumerable.ToArray<IObservable<TSource>>(_sources); int num = array.Length; if (num == 0) { observer.OnCompleted(); return Disposable.Empty; } CompositeDisposable group = new CompositeDisposable(array.Length); object gate = new object(); bool finished = false; bool[] hasResults = new bool[num]; bool[] hasCompleted = new bool[num]; List<TSource> results = new List<TSource>(num); lock (gate) { for (int i = 0; i < num; i++) { int currentIndex = i; IObservable<TSource> source = array[i]; results.Add(default(TSource)); group.Add(ObservableExtensions.Subscribe<TSource>(source, (Action<TSource>)delegate(TSource value) { lock (gate) { if (!finished) { hasResults[currentIndex] = true; results[currentIndex] = value; } } }, (Action<Exception>)delegate(Exception error) { lock (gate) { finished = true; observer.OnError(error); group.Dispose(); } }, (Action)delegate { lock (gate) { if (!finished) { if (!hasResults[currentIndex]) observer.OnCompleted(); else { hasCompleted[currentIndex] = true; bool[] array2 = hasCompleted; for (int j = 0; j < array2.Length; j++) { if (!array2[j]) return; } finished = true; observer.OnNext(results.ToArray()); observer.OnCompleted(); } } } })); } } return group; } } private class ChainObservable<T> : ISubject<IObservable<T>, T>, IObserver<IObservable<T>>, IObservable<T> { private readonly T _head; private readonly AsyncSubject<IObservable<T>> _tail = new AsyncSubject<IObservable<T>>(); public ChainObservable(T head) { _head = head; } public IDisposable Subscribe(IObserver<T> observer) { CompositeDisposable compositeDisposable = new CompositeDisposable(); compositeDisposable.Add(Scheduler.ScheduleAction<(IObserver<T>, CompositeDisposable, ChainObservable<T>)>((IScheduler)CurrentThreadScheduler.Instance, (observer, compositeDisposable, this), (Action<(IObserver<T>, CompositeDisposable, ChainObservable<T>)>)delegate((IObserver<T> observer, CompositeDisposable g, ChainObservable<T> this) state) { state.observer.OnNext(state.this._head); state.g.Add(Observable.Merge<T>((IObservable<IObservable<T>>)state.this._tail).Subscribe(state.observer)); })); return compositeDisposable; } public void OnCompleted() { OnNext(Observable.Empty<T>()); } public void OnError(Exception error) { OnNext(Observable.Throw<T>(error)); } public void OnNext(IObservable<T> value) { _tail.OnNext(value); _tail.OnCompleted(); } } private sealed class CombineObservable<TLeft, TRight, TResult> : ObservableBase<TResult> { private readonly IObservable<TLeft> _leftSource; private readonly IObservable<TRight> _rightSource; private readonly Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> _combinerSelector; public CombineObservable(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector) { _leftSource = leftSource; _rightSource = rightSource; _combinerSelector = combinerSelector; } protected override IDisposable SubscribeCore(IObserver<TResult> observer) { SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); SingleAssignmentDisposable singleAssignmentDisposable2 = new SingleAssignmentDisposable(); IObserver<Either<Notification<TLeft>, Notification<TRight>>> observer2 = _combinerSelector(observer, singleAssignmentDisposable, singleAssignmentDisposable2); object gate = new object(); singleAssignmentDisposable.Disposable = Observable.Synchronize<Either<Notification<TLeft>, Notification<TRight>>>(Observable.Select<Notification<TLeft>, Either<Notification<TLeft>, Notification<TRight>>>(Observable.Materialize<TLeft>(_leftSource), (Func<Notification<TLeft>, Either<Notification<TLeft>, Notification<TRight>>>)((Notification<TLeft> x) => Either<Notification<TLeft>, Notification<TRight>>.CreateLeft(x))), gate).Subscribe(observer2); singleAssignmentDisposable2.Disposable = Observable.Synchronize<Either<Notification<TLeft>, Notification<TRight>>>(Observable.Select<Notification<TRight>, Either<Notification<TLeft>, Notification<TRight>>>(Observable.Materialize<TRight>(_rightSource), (Func<Notification<TRight>, Either<Notification<TLeft>, Notification<TRight>>>)((Notification<TRight> x) => Either<Notification<TLeft>, Notification<TRight>>.CreateRight(x))), gate).Subscribe(observer2); return StableCompositeDisposable.Create(singleAssignmentDisposable, singleAssignmentDisposable2); } } public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod) { return new CreateWithEnumerableObservable<TResult>(iteratorMethod); } public virtual IObservable<Unit> Create(Func<IEnumerable<IObservable<object>>> iteratorMethod) { return new CreateWithOnlyEnumerableObservable<Unit>(iteratorMethod); } public virtual IObservable<TSource> Expand<TSource>(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector, IScheduler scheduler) { return new ExpandObservable<TSource>(source, selector, scheduler); } public virtual IObservable<TSource> Expand<TSource>(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector) { return source.Expand(selector, SchedulerDefaults.Iteration); } public virtual IObservable<TResult> ForkJoin<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector) { return Combine(first, second, delegate(IObserver<TResult> observer, IDisposable leftSubscription, IDisposable rightSubscription) { bool leftStopped = false; bool rightStopped = false; bool hasLeft = false; bool hasRight = false; TFirst lastLeft = (TFirst)default(TFirst); TSecond lastRight = (TSecond)default(TSecond); return new BinaryObserver<TFirst, TSecond>((Action<Notification<TFirst>>)delegate(Notification<TFirst> left) { switch (left.Kind) { case NotificationKind.OnNext: hasLeft = true; lastLeft = (TFirst)left.Value; break; case NotificationKind.OnError: rightSubscription.Dispose(); observer.OnError(left.Exception); break; case NotificationKind.OnCompleted: leftStopped = true; if (rightStopped) { if (!hasLeft) observer.OnCompleted(); else if (!hasRight) { observer.OnCompleted(); } else { TResult value2; try { value2 = resultSelector((TFirst)lastLeft, (TSecond)lastRight); } catch (Exception error2) { observer.OnError(error2); return; } observer.OnNext(value2); observer.OnCompleted(); } } break; } }, (Action<Notification<TSecond>>)delegate(Notification<TSecond> right) { switch (right.Kind) { case NotificationKind.OnNext: hasRight = true; lastRight = (TSecond)right.Value; break; case NotificationKind.OnError: leftSubscription.Dispose(); observer.OnError(right.Exception); break; case NotificationKind.OnCompleted: rightStopped = true; if (leftStopped) { if (!hasLeft) observer.OnCompleted(); else if (!hasRight) { observer.OnCompleted(); } else { TResult value; try { value = resultSelector((TFirst)lastLeft, (TSecond)lastRight); } catch (Exception error) { observer.OnError(error); return; } observer.OnNext(value); observer.OnCompleted(); } } break; } }); }); } public virtual IObservable<TSource[]> ForkJoin<TSource>(params IObservable<TSource>[] sources) { return ((IEnumerable<IObservable<TSource>>)sources).ForkJoin(); } public virtual IObservable<TSource[]> ForkJoin<TSource>(IEnumerable<IObservable<TSource>> sources) { return new ForkJoinObservable<TSource>(sources); } public virtual IObservable<TResult> Let<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> function) { return function(source); } public virtual IObservable<TResult> ManySelect<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, TResult> selector) { return ManySelect(source, selector, DefaultScheduler.Instance); } public virtual IObservable<TResult> ManySelect<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, TResult> selector, IScheduler scheduler) { return Observable.Defer(delegate { ChainObservable<TSource> chain = null; return Observable.Select<IObservable<TSource>, TResult>(Observable.ObserveOn<IObservable<TSource>>(Observable.Do<IObservable<TSource>>(Observable.Select<TSource, IObservable<TSource>>(source, (Func<TSource, IObservable<TSource>>)delegate(TSource x) { ChainObservable<TSource> chainObservable = new ChainObservable<TSource>(x); ((ChainObservable<TSource>)chain)?.OnNext((IObservable<TSource>)chainObservable); chain = (ChainObservable<TSource>)chainObservable; return chainObservable; }), (Action<IObservable<TSource>>)delegate { }, (Action<Exception>)delegate(Exception exception) { ((ChainObservable<TSource>)chain)?.OnError(exception); }, (Action)delegate { ((ChainObservable<TSource>)chain)?.OnCompleted(); }), scheduler), selector); }); } public virtual ListObservable<TSource> ToListObservable<TSource>(IObservable<TSource> source) { return new ListObservable<TSource>(source); } private static IObservable<TResult> Combine<TLeft, TRight, TResult>(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector) { return new CombineObservable<TLeft, TRight, TResult>(leftSource, rightSource, combinerSelector); } } }