QbservableEx
Provides a set of static methods for writing queries over observable sequences, allowing translation to a target query language.
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive.Concurrency;
using System.Reflection;
namespace System.Reactive.Linq
{
[LocalQueryMethodImplementationType(typeof(ObservableEx))]
public static class QbservableEx
{
internal static Expression GetSourceExpression<TSource>(IObservable<TSource> source)
{
IQbservable<TSource> qbservable;
if ((qbservable = (source as IQbservable<TSource>)) != null)
return qbservable.Expression;
return Expression.Constant(source, typeof(IObservable<TSource>));
}
internal static Expression GetSourceExpression<TSource>(IEnumerable<TSource> source)
{
IQueryable<TSource> queryable;
if ((queryable = (source as IQueryable<TSource>)) != null)
return queryable.Expression;
return Expression.Constant(source, typeof(IEnumerable<TSource>));
}
internal static Expression GetSourceExpression<TSource>(IObservable<TSource>[] sources)
{
return Expression.NewArrayInit(typeof(IObservable<TSource>), from source in sources
select QbservableEx.GetSourceExpression<TSource>(source));
}
internal static Expression GetSourceExpression<TSource>(IEnumerable<TSource>[] sources)
{
return Expression.NewArrayInit(typeof(IEnumerable<TSource>), from source in sources
select QbservableEx.GetSourceExpression<TSource>(source));
}
internal static MethodInfo InfoOf<R>(Expression<Func<R>> f)
{
return ((MethodCallExpression)f.Body).Method;
}
[Experimental]
public static IQbservable<Unit> Create(this IQbservableProvider provider, Expression<Func<IEnumerable<IObservable<object>>>> iteratorMethod)
{
if (provider == null)
throw new ArgumentNullException("provider");
if (iteratorMethod == null)
throw new ArgumentNullException("iteratorMethod");
return provider.CreateQuery<Unit>(Expression.Call(null, (MethodInfo)MethodBase.GetCurrentMethod(), Expression.Constant(provider, typeof(IQbservableProvider)), iteratorMethod));
}
[Experimental]
public static IQbservable<TResult> Create<TResult>(this IQbservableProvider provider, Expression<Func<IObserver<TResult>, IEnumerable<IObservable<object>>>> iteratorMethod)
{
if (provider == null)
throw new ArgumentNullException("provider");
if (iteratorMethod == null)
throw new ArgumentNullException("iteratorMethod");
return provider.CreateQuery<TResult>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TResult)), Expression.Constant(provider, typeof(IQbservableProvider)), iteratorMethod));
}
[Experimental]
public static IQbservable<TSource> Expand<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TSource>>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
return source.Provider.CreateQuery<TSource>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), source.Expression, selector));
}
[Experimental]
public static IQbservable<TSource> Expand<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TSource>>> selector, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return source.Provider.CreateQuery<TSource>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), source.Expression, selector, Expression.Constant(scheduler, typeof(IScheduler))));
}
[Experimental]
public static IQbservable<TSource[]> ForkJoin<TSource>(this IQbservableProvider provider, params IObservable<TSource>[] sources)
{
if (provider == null)
throw new ArgumentNullException("provider");
if (sources == null)
throw new ArgumentNullException("sources");
return provider.CreateQuery<TSource[]>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), Expression.Constant(provider, typeof(IQbservableProvider)), GetSourceExpression(sources)));
}
[Experimental]
public static IQbservable<TSource[]> ForkJoin<TSource>(this IQbservableProvider provider, IEnumerable<IObservable<TSource>> sources)
{
if (provider == null)
throw new ArgumentNullException("provider");
if (sources == null)
throw new ArgumentNullException("sources");
return provider.CreateQuery<TSource[]>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), Expression.Constant(provider, typeof(IQbservableProvider)), GetSourceExpression(sources)));
}
[Experimental]
public static IQbservable<TResult> ForkJoin<TSource1, TSource2, TResult>(this IQbservable<TSource1> first, IObservable<TSource2> second, Expression<Func<TSource1, TSource2, TResult>> resultSelector)
{
if (first == null)
throw new ArgumentNullException("first");
if (second == null)
throw new ArgumentNullException("second");
if (resultSelector == null)
throw new ArgumentNullException("resultSelector");
return first.Provider.CreateQuery<TResult>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource1), typeof(TSource2), typeof(TResult)), first.Expression, GetSourceExpression(second), resultSelector));
}
[Experimental]
public static IQbservable<TResult> Let<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<IObservable<TSource>, IObservable<TResult>>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
return source.Provider.CreateQuery<TResult>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), source.Expression, selector));
}
[Experimental]
public static IQbservable<TResult> ManySelect<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<IObservable<TSource>, TResult>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
return source.Provider.CreateQuery<TResult>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), source.Expression, selector));
}
[Experimental]
public static IQbservable<TResult> ManySelect<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<IObservable<TSource>, TResult>> selector, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return source.Provider.CreateQuery<TResult>(Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), source.Expression, selector, Expression.Constant(scheduler, typeof(IScheduler))));
}
}
}