ObservableQuery<TSource>
class ObservableQuery<TSource> : ObservableQuery, IQbservable<TSource>, IQbservable, IObservable<TSource>
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive.Joins;
using System.Reactive.Linq;
using System.Reflection;
namespace System.Reactive
{
internal class ObservableQuery<TSource> : ObservableQuery, IQbservable<TSource>, IQbservable, IObservable<TSource>
{
private class ObservableRewriter : ExpressionVisitor
{
/// <summary>
/// Minimal lazy holder: runs the factory on the first read of <see cref="Value"/> and caches the result.
/// Initialization is serialized with a lock, so the factory runs at most once when it completes successfully.
/// </summary>
/// <typeparam name="T">Type of the lazily created value.</typeparam>
private class Lazy<T>
{
    // Dedicated lock object. The factory delegate is handed in by the caller and may be
    // referenced elsewhere, so it must not double as the monitor.
    private readonly object _gate = new object();
    private readonly Func<T> _factory;
    private T _value;
    private bool _initialized;

    /// <summary>
    /// Creates a holder that will call <paramref name="factory"/> on first access to <see cref="Value"/>.
    /// </summary>
    /// <param name="factory">Delegate that produces the value.</param>
    /// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
    public Lazy(Func<T> factory)
    {
        // Fail at construction rather than on the first read of Value.
        if (factory == null)
            throw new ArgumentNullException(nameof(factory));
        _factory = factory;
    }

    /// <summary>
    /// Gets the value, invoking the factory first if it has not produced a value yet.
    /// If the factory throws, nothing is cached and the next read invokes it again.
    /// </summary>
    public T Value
    {
        get
        {
            lock (_gate) {
                if (!_initialized) {
                    _value = _factory();
                    _initialized = true;
                }
                return _value;
            }
        }
    }
}
// Name -> overloads lookup of the public static methods of Observable. Built on first use through
// the lazy holder and shared by every rewriter instance; readonly because it is never reassigned.
private static readonly Lazy<ILookup<string, MethodInfo>> _observableMethods = new Lazy<ILookup<string, MethodInfo>>(() => GetMethods(typeof(Observable)));
/// <summary>
/// Rewrites constants that hold an <see cref="ObservableQuery"/>: when the query has a source object the
/// constant is replaced by a constant of that source, otherwise the query's own expression is visited
/// and its rewritten form is returned. Any other constant is returned unchanged.
/// </summary>
/// <param name="node">The constant expression being visited.</param>
/// <returns>The replacement expression, or <paramref name="node"/> itself.</returns>
protected override Expression VisitConstant(ConstantExpression node)
{
    var query = node.Value as ObservableQuery;
    if (query == null)
        return node;

    object source = query.Source;
    return source == null
        ? Visit(query.Expression)
        : Expression.Constant(source);
}
/// <summary>
/// Rewrites method calls in three cases and defers to the base visitor otherwise:
/// "Then" / "And" calls on types deriving directly from <c>QueryablePattern</c> are re-bound by name on
/// the visited target; calls whose first parameter is an <c>IQbservableProvider</c> or an
/// <c>IQbservable</c> are mapped to a matching method via <c>FindObservableMethod</c>.
/// </summary>
/// <param name="node">The method call being visited.</param>
/// <returns>
/// The rewritten call; <paramref name="node"/> unchanged when the provider argument does not evaluate to an
/// <c>ObservableQueryProvider</c>; or the result of the base visitor when no rewrite applies.
/// </returns>
protected override Expression VisitMethodCall(MethodCallExpression node)
{
    MethodInfo method = node.Method;

    if (method.DeclaringType.BaseType == typeof(QueryablePattern)) {
        switch (method.Name) {
            case "Then": {
                // Visit the target first, then the arguments, mirroring the call's evaluation order.
                Expression target = Visit(node.Object);
                // "Then" selectors are unquoted so the rebound call receives the lambda itself.
                Expression[] selectorArgs = node.Arguments.Select(arg => Unquote(Visit(arg))).ToArray();
                return Expression.Call(target, "Then", method.GetGenericArguments(), selectorArgs);
            }
            case "And": {
                Expression target = Visit(node.Object);
                Expression[] joinedArgs = node.Arguments.Select(arg => Visit(arg)).ToArray();
                return Expression.Call(target, "And", method.GetGenericArguments(), joinedArgs);
            }
        }
        return base.VisitMethodCall(node);
    }

    ParameterInfo[] declaredParameters = method.GetParameters();
    if (declaredParameters.Length == 0)
        return base.VisitMethodCall(node);

    Type firstParameterType = declaredParameters[0].ParameterType;
    IEnumerable<Expression> operatorArguments;

    if (firstParameterType == typeof(IQbservableProvider)) {
        // Evaluate the provider argument eagerly to find out whether this rewriter should handle the call.
        Expression<Func<IQbservableProvider>> providerAccessor = Expression.Lambda<Func<IQbservableProvider>>(Visit(node.Arguments[0]), Array.Empty<ParameterExpression>());
        IQbservableProvider provider = providerAccessor.Compile()();
        if (!(provider is ObservableQueryProvider))
            return node;

        // The provider argument is dropped before the remaining arguments are matched.
        operatorArguments = node.Arguments.Skip(1);
    } else if (typeof(IQbservable).IsAssignableFrom(firstParameterType)) {
        operatorArguments = node.Arguments;
    } else {
        return base.VisitMethodCall(node);
    }

    IList<Expression> visitedArguments = VisitQbservableOperatorArguments(method, operatorArguments);
    return FindObservableMethod(method, visitedArguments);
}
/// <summary>
/// Returns the lambda as-is. The base implementation is deliberately not called, so this visitor
/// does not descend into lambda bodies.
/// </summary>
/// <typeparam name="T">Delegate type of the lambda.</typeparam>
/// <param name="node">The lambda expression being visited.</param>
/// <returns><paramref name="node"/>, unchanged.</returns>
protected override Expression VisitLambda<T>(Expression<T> node) => node;
/// <summary>
/// Visits the arguments of an operator call and returns the rewritten argument list.
/// For a method named "When" whose last argument is an array initializer, the result is a single
/// argument: a new <c>Plan&lt;T&gt;[]</c> initializer (T being the method's first generic argument)
/// built from the visited elements. In every other case each argument is visited in order.
/// </summary>
/// <param name="method">The method being called; only its name and generic arguments are read.</param>
/// <param name="arguments">The arguments to rewrite (may be empty).</param>
/// <returns>A new list holding the rewritten arguments.</returns>
private IList<Expression> VisitQbservableOperatorArguments(MethodInfo method, IEnumerable<Expression> arguments)
{
    if (method.Name == "When") {
        // LastOrDefault instead of Last: a "When" method whose only parameter is the provider arrives
        // here with no arguments, and Last would throw on the empty sequence.
        Expression last = arguments.LastOrDefault();
        if (last != null && last.NodeType == ExpressionType.NewArrayInit) {
            NewArrayExpression plans = (NewArrayExpression)last;
            Type planType = typeof(Plan<>).MakeGenericType(method.GetGenericArguments()[0]);
            return new List<Expression> {
                Expression.NewArrayInit(planType, plans.Expressions.Select(plan => Visit(plan)))
            };
        }
    }

    return arguments.Select(arg => Visit(arg)).ToList();
}
/// <summary>
/// Finds the static method that corresponds to <paramref name="method"/> and builds a call to it.
/// Methods declared on <c>Qbservable</c> are looked up on <c>Observable</c>; methods declared elsewhere are
/// looked up on their declaring type, or on the type named by its
/// <c>LocalQueryMethodImplementationTypeAttribute</c> when that attribute is present.
/// </summary>
/// <param name="method">The method to find a counterpart for.</param>
/// <param name="arguments">
/// The already visited arguments. The list is modified in place: each entry is unquoted before the call is built.
/// </param>
/// <returns>A static call expression to the matching method.</returns>
/// <exception cref="InvalidOperationException">No method with a matching name and signature exists on the target type.</exception>
private static MethodCallExpression FindObservableMethod(MethodInfo method, IList<Expression> arguments)
{
    Type targetType;
    ILookup<string, MethodInfo> candidates;

    if (method.DeclaringType != typeof(Qbservable)) {
        targetType = method.DeclaringType;

        Type redirectAttribute = typeof(LocalQueryMethodImplementationTypeAttribute);
        if (targetType.IsDefined(redirectAttribute, false)) {
            var redirect = (LocalQueryMethodImplementationTypeAttribute)targetType.GetCustomAttributes(redirectAttribute, false)[0];
            targetType = redirect.TargetType;
        }

        candidates = GetMethods(targetType);
    } else {
        targetType = typeof(Observable);
        candidates = _observableMethods.Value;
    }

    Type[] typeArgs = null;
    if (method.IsGenericMethod)
        typeArgs = method.GetGenericArguments();

    // First same-named candidate whose parameters accept the arguments wins.
    MethodInfo match = null;
    foreach (MethodInfo candidate in candidates[method.Name]) {
        if (ArgsMatch(candidate, arguments, typeArgs)) {
            match = candidate;
            break;
        }
    }

    if (match == null)
        throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Providers.NO_MATCHING_METHOD_FOUND, method.Name, targetType.Name));

    if (typeArgs != null)
        match = match.MakeGenericMethod(typeArgs);

    // ArgsMatch guarantees the parameter count equals the argument count.
    int parameterCount = match.GetParameters().Length;
    for (int index = 0; index < parameterCount; index++)
        arguments[index] = Unquote(arguments[index]);

    return Expression.Call(null, match, arguments);
}
/// <summary>
/// Builds a lookup from method name to the public static methods of <paramref name="type"/> with that name.
/// </summary>
/// <param name="type">The type whose public static methods are indexed.</param>
/// <returns>A lookup keyed by method name; each key maps to all overloads sharing that name.</returns>
private static ILookup<string, MethodInfo> GetMethods(Type type)
{
    MethodInfo[] publicStatics = type.GetMethods(BindingFlags.Public | BindingFlags.Static);
    return publicStatics.ToLookup(candidate => candidate.Name);
}
/// <summary>
/// Decides whether <paramref name="method"/> can be called with <paramref name="arguments"/>, after closing it
/// over <paramref name="typeArgs"/> when it is a generic method definition.
/// An argument fits a parameter when its type is assignable to the parameter type, either directly or
/// after stripping quote nodes from the argument.
/// </summary>
/// <param name="method">Candidate method.</param>
/// <param name="arguments">Argument expressions to match against the candidate's parameters.</param>
/// <param name="typeArgs">Generic type arguments of the original call, or null for a non-generic call.</param>
/// <returns>true when the arity, generic arity and every argument type match; otherwise false.</returns>
private static bool ArgsMatch(MethodInfo method, IList<Expression> arguments, Type[] typeArgs)
{
    ParameterInfo[] parameters = method.GetParameters();
    if (parameters.Length != arguments.Count)
        return false;

    // Type arguments were supplied but the candidate is not generic.
    bool hasTypeArgs = typeArgs != null && typeArgs.Length != 0;
    if (hasTypeArgs && !method.IsGenericMethod)
        return false;

    if (method.IsGenericMethodDefinition) {
        if (typeArgs == null || method.GetGenericArguments().Length != typeArgs.Length)
            return false;

        // Compare against the closed signature, not the open generic one.
        parameters = method.MakeGenericMethod(typeArgs).GetParameters();
    }

    for (int index = 0; index < arguments.Count; index++) {
        Type expected = parameters[index].ParameterType;
        Expression argument = arguments[index];

        if (expected.IsAssignableFrom(argument.Type))
            continue;

        // Retry with the quotes removed before giving up on this candidate.
        if (!expected.IsAssignableFrom(Unquote(argument).Type))
            return false;
    }

    return true;
}
/// <summary>
/// Strips every enclosing <see cref="ExpressionType.Quote"/> node and returns the innermost operand.
/// </summary>
/// <param name="expression">The expression to unwrap.</param>
/// <returns>The first expression in the operand chain that is not a quote.</returns>
private static Expression Unquote(Expression expression)
{
    Expression current = expression;
    while (current.NodeType == ExpressionType.Quote) {
        var quote = (UnaryExpression)current;
        current = quote.Operand;
    }
    return current;
}
}
/// <summary>
/// Gets the element type of the query, which is <typeparamref name="TSource"/>.
/// </summary>
public Type ElementType
{
    get { return typeof(TSource); }
}

/// <summary>
/// Gets the query provider; this is always <c>Qbservable.Provider</c>.
/// </summary>
public IQbservableProvider Provider
{
    get { return Qbservable.Provider; }
}
/// <summary>
/// Creates a query that wraps an existing observable sequence. The query's expression is a constant
/// that refers to the query object itself.
/// </summary>
/// <param name="source">The observable sequence to wrap.</param>
internal ObservableQuery(IObservable<TSource> source)
{
    this._expression = Expression.Constant(this);
    this._source = source;
}
/// <summary>
/// Creates a query described only by an expression tree; no source is assigned here, and
/// <c>Subscribe</c> builds one from the expression when it is first needed.
/// </summary>
/// <param name="expression">The expression tree that represents the query.</param>
internal ObservableQuery(Expression expression)
{
    this._expression = expression;
}
/// <summary>
/// Subscribes <paramref name="observer"/> to the underlying sequence. When no source has been materialized
/// yet, the query expression is rewritten with <c>ObservableRewriter</c>, compiled and invoked, and the
/// resulting sequence is stored in <c>_source</c> for later subscriptions.
/// </summary>
/// <param name="observer">The observer to subscribe.</param>
/// <returns>The subscription handle returned by the underlying sequence.</returns>
/// <exception cref="InvalidOperationException">
/// The query has no source and its expression is only a constant referring to the query itself.
/// </exception>
public IDisposable Subscribe(IObserver<TSource> observer)
{
    if (_source == null) {
        // A source-less query whose expression is Constant(this) cannot be rewritten: the rewriter's
        // VisitConstant would find no source and visit the same constant again, recursing until the
        // stack overflows. Fail with a catchable exception instead.
        // NOTE(review): assumes ObservableQuery.Source / .Expression (declared outside this view)
        // expose _source / _expression - confirm against the base class.
        ConstantExpression selfReference = _expression as ConstantExpression;
        if (selfReference != null && ReferenceEquals(selfReference.Value, this))
            throw new InvalidOperationException("The query has no source and its expression refers only to the query itself, so there is nothing to subscribe to.");

        Expression rewritten = new ObservableRewriter().Visit(_expression);
        Expression<Func<IObservable<TSource>>> factory = Expression.Lambda<Func<IObservable<TSource>>>(rewritten, Array.Empty<ParameterExpression>());
        _source = factory.Compile()();
    }

    return ((IObservable<TSource>)_source).Subscribe(observer);
}
/// <summary>
/// Returns a textual form of the query. When the expression is just a constant referring to this query,
/// the text of the source object is returned, or the literal "null" when there is no source; otherwise
/// the text of the expression tree is returned.
/// </summary>
/// <returns>The string representation described above.</returns>
public override string ToString()
{
    var constant = _expression as ConstantExpression;
    bool refersToSelf = constant != null && constant.Value == this;
    if (!refersToSelf)
        return _expression.ToString();

    return _source == null ? "null" : _source.ToString();
}
}
}