<PackageReference Include="System.Reactive" Version="4.1.2" />

QbservableEx

public static class QbservableEx
Provides a set of static methods for writing queries over observable sequences, allowing translation to a target query language.
using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; using System.Reactive.Concurrency; using System.Reflection; namespace System.Reactive.Linq { [LocalQueryMethodImplementationType(typeof(ObservableEx))] public static class QbservableEx { internal static Expression GetSourceExpression<TSource>(IObservable<TSource> source) { IQbservable<TSource> qbservable; if ((qbservable = (source as IQbservable<TSource>)) != null) return qbservable.Expression; return Expression.Constant(source, typeof(IObservable<TSource>)); } internal static Expression GetSourceExpression<TSource>(IEnumerable<TSource> source) { IQueryable<TSource> queryable; if ((queryable = (source as IQueryable<TSource>)) != null) return queryable.Expression; return Expression.Constant(source, typeof(IEnumerable<TSource>)); } internal static Expression GetSourceExpression<TSource>(IObservable<TSource>[] sources) { return Expression.NewArrayInit(typeof(IObservable<TSource>), from source in sources select QbservableEx.GetSourceExpression<TSource>(source)); } internal static Expression GetSourceExpression<TSource>(IEnumerable<TSource>[] sources) { return Expression.NewArrayInit(typeof(IEnumerable<TSource>), from source in sources select QbservableEx.GetSourceExpression<TSource>(source)); } internal static MethodInfo InfoOf<R>(Expression<Func<R>> f) { return ((MethodCallExpression)f.Body).Method; } [Experimental] public static IQbservable<Unit> Create(this IQbservableProvider provider, Expression<Func<IEnumerable<IObservable<object>>>> iteratorMethod) { if (provider == null) throw new ArgumentNullException("provider"); if (iteratorMethod == null) throw new ArgumentNullException("iteratorMethod"); return provider.CreateQuery<Unit>(Expression.Call(null, (MethodInfo)MethodBase.GetCurrentMethod(), Expression.Constant(provider, typeof(IQbservableProvider)), iteratorMethod)); } [Experimental] public static IQbservable<TResult> Create<TResult>(this IQbservableProvider provider, Expression<Func<IObserver<TResult>, IEnumerable<IObservable<object>>>> iteratorMethod) { if (provider == null) throw new ArgumentNullException("provider"); if (iteratorMethod == null) throw new ArgumentNullException("iteratorMethod"); return provider.CreateQuery<TResult>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TResult)), Expression.Constant(provider, typeof(IQbservableProvider)), iteratorMethod)); } [Experimental] public static IQbservable<TSource> Expand<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TSource>>> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Provider.CreateQuery<TSource>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), source.Expression, selector)); } [Experimental] public static IQbservable<TSource> Expand<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TSource>>> selector, IScheduler scheduler) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); if (scheduler == null) throw new ArgumentNullException("scheduler"); return source.Provider.CreateQuery<TSource>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), source.Expression, selector, Expression.Constant(scheduler, typeof(IScheduler)))); } [Experimental] public static IQbservable<TSource[]> ForkJoin<TSource>(this IQbservableProvider provider, params IObservable<TSource>[] sources) { if (provider == null) throw new ArgumentNullException("provider"); if (sources == null) throw new ArgumentNullException("sources"); return provider.CreateQuery<TSource[]>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), Expression.Constant(provider, typeof(IQbservableProvider)), GetSourceExpression(sources))); } [Experimental] public static IQbservable<TSource[]> ForkJoin<TSource>(this IQbservableProvider provider, IEnumerable<IObservable<TSource>> sources) { if (provider == null) throw new ArgumentNullException("provider"); if (sources == null) throw new ArgumentNullException("sources"); return provider.CreateQuery<TSource[]>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), Expression.Constant(provider, typeof(IQbservableProvider)), GetSourceExpression(sources))); } [Experimental] public static IQbservable<TResult> ForkJoin<TSource1, TSource2, TResult>(this IQbservable<TSource1> first, IObservable<TSource2> second, Expression<Func<TSource1, TSource2, TResult>> resultSelector) { if (first == null) throw new ArgumentNullException("first"); if (second == null) throw new ArgumentNullException("second"); if (resultSelector == null) throw new ArgumentNullException("resultSelector"); return first.Provider.CreateQuery<TResult>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource1), typeof(TSource2), typeof(TResult)), first.Expression, GetSourceExpression(second), resultSelector)); } [Experimental] public static IQbservable<TResult> Let<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<IObservable<TSource>, IObservable<TResult>>> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Provider.CreateQuery<TResult>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), source.Expression, selector)); } [Experimental] public static IQbservable<TResult> ManySelect<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<IObservable<TSource>, TResult>> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Provider.CreateQuery<TResult>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), source.Expression, selector)); } [Experimental] public static IQbservable<TResult> ManySelect<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<IObservable<TSource>, TResult>> selector, IScheduler scheduler) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); if (scheduler == null) throw new ArgumentNullException("scheduler"); return source.Provider.CreateQuery<TResult>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), source.Expression, selector, Expression.Constant(scheduler, typeof(IScheduler)))); } } }