<PackageReference Include="Relativity.Transfer.Client" Version="7.1.29" />

ObservableQuery<TSource>

class ObservableQuery<TSource> : ObservableQuery, IQbservable<TSource>, IQbservable, IObservable<TSource>
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive.Joins;
using System.Reactive.Linq;
using System.Reflection;

namespace System.Reactive
{
    /// <summary>
    /// Query object that either wraps an existing observable source or holds an
    /// expression tree which is rewritten and compiled into an observable on the
    /// first call to <see cref="Subscribe"/>.
    /// </summary>
    internal class ObservableQuery<TSource> : ObservableQuery, IQbservable<TSource>, IQbservable, IObservable<TSource>
    {
        /// <summary>
        /// Expression visitor that replaces query constants by their underlying source
        /// (or expression) and retargets operator calls whose first parameter is an
        /// IQbservableProvider or IQbservable to the matching static method on the
        /// implementation type (Observable for methods declared on Qbservable).
        /// </summary>
        private class ObservableRewriter : ExpressionVisitor
        {
            /// <summary>
            /// Minimal lazy holder: the factory is invoked under a lock the first time
            /// <see cref="Value"/> is read, and its result is reused afterwards.
            /// </summary>
            private class Lazy<T>
            {
                // Dedicated monitor object. The factory delegate is supplied by the caller,
                // so it must not double as the lock target.
                private readonly object _gate = new object();
                private readonly Func<T> _factory;
                private T _value;
                private bool _initialized;

                public Lazy(Func<T> factory)
                {
                    _factory = factory;
                }

                public T Value
                {
                    get
                    {
                        lock (_gate)
                        {
                            if (!_initialized)
                            {
                                _value = _factory();
                                _initialized = true;
                            }

                            return _value;
                        }
                    }
                }
            }

            // Name-indexed public static methods of Observable, built on first use.
            private static readonly Lazy<ILookup<string, MethodInfo>> _observableMethods =
                new Lazy<ILookup<string, MethodInfo>>(() => GetMethods(typeof(Observable)));

            /// <summary>
            /// Replaces a constant holding an <see cref="ObservableQuery"/> by a constant of
            /// its source when it has one, otherwise by the rewritten form of its expression.
            /// Any other constant is returned unchanged.
            /// </summary>
            protected override Expression VisitConstant(ConstantExpression node)
            {
                ObservableQuery query = node.Value as ObservableQuery;
                if (query == null)
                {
                    return node;
                }

                object source = query.Source;
                if (source != null)
                {
                    return Expression.Constant(source);
                }

                return Visit(query.Expression);
            }

            /// <summary>
            /// Rewrites join-pattern calls ("Then" / "And") and query operator calls;
            /// everything else is handled by the base visitor.
            /// </summary>
            protected override Expression VisitMethodCall(MethodCallExpression node)
            {
                MethodInfo method = node.Method;

                // DeclaringType is null for global and dynamic methods; such calls simply
                // are not join-pattern calls and continue down the operator path below.
                Type baseType = method.DeclaringType?.BaseType;

                if (baseType == typeof(QueryablePattern))
                {
                    if (method.Name == "Then")
                    {
                        // The selector is passed quoted on the queryable side; the local
                        // counterpart is called with the unquoted expression.
                        Expression pattern = Visit(node.Object);
                        Expression[] selectors = node.Arguments.Select(arg => Unquote(Visit(arg))).ToArray();
                        return Expression.Call(pattern, "Then", method.GetGenericArguments(), selectors);
                    }

                    if (method.Name == "And")
                    {
                        Expression pattern = Visit(node.Object);
                        Expression[] others = node.Arguments.Select(arg => Visit(arg)).ToArray();
                        return Expression.Call(pattern, "And", method.GetGenericArguments(), others);
                    }
                }
                else
                {
                    IEnumerable<Expression> operands = node.Arguments;
                    bool isOperator = false;

                    ParameterInfo first = method.GetParameters().FirstOrDefault();
                    if (first != null)
                    {
                        Type firstType = first.ParameterType;
                        if (firstType == typeof(IQbservableProvider))
                        {
                            isOperator = true;

                            // Evaluate the provider argument; calls bound to a provider other
                            // than ObservableQueryProvider are left exactly as they are.
                            Func<IQbservableProvider> getProvider =
                                Expression.Lambda<Func<IQbservableProvider>>(Visit(node.Arguments[0])).Compile();
                            if (!(getProvider() is ObservableQueryProvider))
                            {
                                return node;
                            }

                            // The target method has no provider parameter.
                            operands = operands.Skip(1);
                        }
                        else if (typeof(IQbservable).IsAssignableFrom(firstType))
                        {
                            isOperator = true;
                        }
                    }

                    if (isOperator)
                    {
                        IList<Expression> rewritten = VisitQbservableOperatorArguments(method, operands);
                        return FindObservableMethod(method, rewritten);
                    }
                }

                return base.VisitMethodCall(node);
            }

            /// <summary>
            /// Lambdas are returned as-is; their bodies are not rewritten.
            /// </summary>
            protected override Expression VisitLambda<T>(Expression<T> node)
            {
                return node;
            }

            /// <summary>
            /// Visits the arguments of an operator call. For "When" with an inline array as
            /// last argument, the array is rebuilt with element type Plan of the method's
            /// first generic argument and becomes the only resulting argument.
            /// </summary>
            private IList<Expression> VisitQbservableOperatorArguments(MethodInfo method, IEnumerable<Expression> arguments)
            {
                if (method.Name == "When")
                {
                    Expression last = arguments.Last();
                    if (last.NodeType == ExpressionType.NewArrayInit)
                    {
                        NewArrayExpression plans = (NewArrayExpression)last;
                        Type planType = typeof(Plan<>).MakeGenericType(method.GetGenericArguments()[0]);

                        return new List<Expression>
                        {
                            Expression.NewArrayInit(planType, plans.Expressions.Select(plan => Visit(plan)))
                        };
                    }
                }

                return arguments.Select(arg => Visit(arg)).ToList();
            }

            /// <summary>
            /// Finds the static method that implements <paramref name="method"/> for the given
            /// (already visited) arguments and returns a call to it.
            /// </summary>
            /// <remarks>
            /// Entries of <paramref name="arguments"/> are replaced in place by their unquoted form.
            /// </remarks>
            /// <exception cref="InvalidOperationException">No candidate method matches.</exception>
            private static MethodCallExpression FindObservableMethod(MethodInfo method, IList<Expression> arguments)
            {
                Type targetType;
                ILookup<string, MethodInfo> candidates;

                if (method.DeclaringType == typeof(Qbservable))
                {
                    targetType = typeof(Observable);
                    candidates = _observableMethods.Value;
                }
                else
                {
                    // Other declaring types may redirect to an implementation type via an
                    // attribute; without it the declaring type itself is searched.
                    targetType = method.DeclaringType;
                    if (targetType.IsDefined(typeof(LocalQueryMethodImplementationTypeAttribute), false))
                    {
                        object[] mappings = targetType.GetCustomAttributes(typeof(LocalQueryMethodImplementationTypeAttribute), false);
                        targetType = ((LocalQueryMethodImplementationTypeAttribute)mappings[0]).TargetType;
                    }

                    candidates = GetMethods(targetType);
                }

                Type[] typeArgs = method.IsGenericMethod ? method.GetGenericArguments() : null;

                MethodInfo target = candidates[method.Name]
                    .FirstOrDefault(candidate => ArgsMatch(candidate, arguments, typeArgs));

                if (target == null)
                {
                    throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Providers.NO_MATCHING_METHOD_FOUND, method.Name, targetType.Name));
                }

                if (typeArgs != null)
                {
                    target = target.MakeGenericMethod(typeArgs);
                }

                // ArgsMatch guarantees the parameter count equals the argument count.
                ParameterInfo[] parameters = target.GetParameters();
                for (int i = 0; i < parameters.Length; i++)
                {
                    arguments[i] = Unquote(arguments[i]);
                }

                return Expression.Call(null, target, arguments);
            }

            /// <summary>
            /// Groups the public static methods of <paramref name="type"/> by name.
            /// </summary>
            private static ILookup<string, MethodInfo> GetMethods(Type type)
            {
                return type.GetMethods(BindingFlags.Static | BindingFlags.Public).ToLookup(m => m.Name);
            }

            /// <summary>
            /// Checks whether <paramref name="method"/>, closed over <paramref name="typeArgs"/>
            /// when it is a generic definition, accepts the given arguments either directly
            /// or after unquoting them.
            /// </summary>
            private static bool ArgsMatch(MethodInfo method, IList<Expression> arguments, Type[] typeArgs)
            {
                ParameterInfo[] parameters = method.GetParameters();
                if (parameters.Length != arguments.Count)
                {
                    return false;
                }

                if (!method.IsGenericMethod && typeArgs != null && typeArgs.Length != 0)
                {
                    return false;
                }

                if (method.IsGenericMethodDefinition)
                {
                    if (typeArgs == null)
                    {
                        return false;
                    }

                    if (method.GetGenericArguments().Length != typeArgs.Length)
                    {
                        return false;
                    }

                    // Compare against the closed parameter types.
                    parameters = method.MakeGenericMethod(typeArgs).GetParameters();
                }

                for (int i = 0; i < arguments.Count; i++)
                {
                    Type parameterType = parameters[i].ParameterType;
                    Expression argument = arguments[i];

                    if (parameterType.IsAssignableFrom(argument.Type))
                    {
                        continue;
                    }

                    // A quoted argument may still fit once the quote is stripped.
                    if (!parameterType.IsAssignableFrom(Unquote(argument).Type))
                    {
                        return false;
                    }
                }

                return true;
            }

            /// <summary>
            /// Strips every enclosing Quote node from <paramref name="expression"/>.
            /// </summary>
            private static Expression Unquote(Expression expression)
            {
                while (expression.NodeType == ExpressionType.Quote)
                {
                    expression = ((UnaryExpression)expression).Operand;
                }

                return expression;
            }
        }

        /// <summary>Element type of the sequence, i.e. <typeparamref name="TSource"/>.</summary>
        public Type ElementType => typeof(TSource);

        /// <summary>The provider exposed by Qbservable.Provider.</summary>
        public IQbservableProvider Provider => Qbservable.Provider;

        /// <summary>
        /// Wraps an existing observable; the query expression is a constant referring to this instance.
        /// </summary>
        internal ObservableQuery(IObservable<TSource> source)
        {
            _source = source;
            _expression = Expression.Constant(this);
        }

        /// <summary>
        /// Creates a query described only by an expression; the source is built on first subscription.
        /// </summary>
        internal ObservableQuery(Expression expression)
        {
            _expression = expression;
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the source. When no source exists yet, the
        /// expression is rewritten, compiled and evaluated once, and the result is kept for
        /// later subscriptions.
        /// </summary>
        public IDisposable Subscribe(IObserver<TSource> observer)
        {
            if (_source == null)
            {
                Expression rewritten = new ObservableRewriter().Visit(_expression);
                Expression<Func<IObservable<TSource>>> factory = Expression.Lambda<Func<IObservable<TSource>>>(rewritten);
                _source = factory.Compile()();
            }

            return ((IObservable<TSource>)_source).Subscribe(observer);
        }

        /// <summary>
        /// Returns the source's string form (or "null") when the expression is the
        /// self-referencing constant; otherwise returns the expression's string form.
        /// </summary>
        public override string ToString()
        {
            ConstantExpression constant = _expression as ConstantExpression;
            if (constant != null && constant.Value == this)
            {
                if (_source != null)
                {
                    return _source.ToString();
                }

                return "null";
            }

            return _expression.ToString();
        }
    }
}