<PackageReference Include="System.Reactive" Version="4.3.2" />

ObservableEx

public static class ObservableEx
Provides a set of static methods for writing in-memory queries over observable sequences.
using System.Collections.Generic; using System.Reactive.Concurrency; namespace System.Reactive.Linq { public static class ObservableEx { private static IQueryLanguageEx s_impl = QueryServices.GetQueryImpl((IQueryLanguageEx)new QueryLanguageEx()); [Experimental] public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod) { if (iteratorMethod == null) throw new ArgumentNullException("iteratorMethod"); return s_impl.Create(iteratorMethod); } [Experimental] public static IObservable<Unit> Create(Func<IEnumerable<IObservable<object>>> iteratorMethod) { if (iteratorMethod == null) throw new ArgumentNullException("iteratorMethod"); return s_impl.Create(iteratorMethod); } [Experimental] public static IObservable<TSource> Expand<TSource>(this IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector, IScheduler scheduler) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); if (scheduler == null) throw new ArgumentNullException("scheduler"); return s_impl.Expand(source, selector, scheduler); } [Experimental] public static IObservable<TSource> Expand<TSource>(this IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return s_impl.Expand(source, selector); } [Experimental] public static IObservable<TResult> ForkJoin<TSource1, TSource2, TResult>(this IObservable<TSource1> first, IObservable<TSource2> second, Func<TSource1, TSource2, TResult> resultSelector) { if (first == null) throw new ArgumentNullException("first"); if (second == null) throw new ArgumentNullException("second"); if (resultSelector == null) throw new ArgumentNullException("resultSelector"); return s_impl.ForkJoin(first, second, resultSelector); } [Experimental] public static IObservable<TSource[]> ForkJoin<TSource>(params IObservable<TSource>[] sources) { if (sources == null) throw new ArgumentNullException("sources"); return s_impl.ForkJoin(sources); } [Experimental] public static IObservable<TSource[]> ForkJoin<TSource>(this IEnumerable<IObservable<TSource>> sources) { if (sources == null) throw new ArgumentNullException("sources"); return s_impl.ForkJoin(sources); } [Experimental] public static IObservable<TResult> Let<TSource, TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return s_impl.Let(source, selector); } [Experimental] public static IObservable<TResult> ManySelect<TSource, TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, TResult> selector, IScheduler scheduler) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); if (scheduler == null) throw new ArgumentNullException("scheduler"); return s_impl.ManySelect(source, selector, scheduler); } [Experimental] public static IObservable<TResult> ManySelect<TSource, TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, TResult> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return s_impl.ManySelect(source, selector); } [Experimental] public static ListObservable<TSource> ToListObservable<TSource>(this IObservable<TSource> source) { if (source == null) throw new ArgumentNullException("source"); return s_impl.ToListObservable(source); } } }