Plan<T1, T2, TResult>
sealed class Plan<
T1,
T2,
TResult> :
Plan<
TResult>
using System.
Collections.
Generic;
using System.
Runtime.
CompilerServices;
namespace System.
Reactive.
Joins
{
[
System.
Runtime.
CompilerServices.
NullableContext(
1)]
[
System.
Runtime.
CompilerServices.
Nullable(
new byte[] {
0,
1
})]
internal sealed class Plan<[
System.
Runtime.
CompilerServices.
Nullable(
2)]
T1, [
System.
Runtime.
CompilerServices.
Nullable(
2)]
T2, [
System.
Runtime.
CompilerServices.
Nullable(
2)]
TResult> :
Plan<
TResult>
{
internal Pattern<
T1,
T2>
Expression { get; }
internal Func<
T1,
T2,
TResult>
Selector { get; }
internal Plan(
Pattern<
T1,
T2>
expression,
Func<
T1,
T2,
TResult>
selector)
{
Expression =
expression;
Selector =
selector;
}
internal override ActivePlan Activate(
Dictionary<
object,
IJoinObserver>
externalSubscriptions,
IObserver<
TResult>
observer,
Action<
ActivePlan>
deactivate)
{
IObserver<
TResult>
observer2 =
observer;
Action<
Exception>
onError =
observer2.
OnError;
JoinObserver<
T1>
firstJoinObserver =
Plan<
TResult>.
CreateObserver(
externalSubscriptions,
Expression.
First,
onError);
JoinObserver<
T2>
secondJoinObserver =
Plan<
TResult>.
CreateObserver(
externalSubscriptions,
Expression.
Second,
onError);
ActivePlan<
T1,
T2>
activePlan =
null;
activePlan =
new ActivePlan<
T1,
T2>(
firstJoinObserver,
secondJoinObserver,
delegate(
T1 first,
T2 second) {
TResult value;
try {
value =
Selector(
first,
second);
}
catch (
Exception error) {
observer.
OnError(
error);
return;
}
observer.
OnNext(
value);
},
delegate {
firstJoinObserver.
RemoveActivePlan(
activePlan);
secondJoinObserver.
RemoveActivePlan(
activePlan);
deactivate(
activePlan);
});
firstJoinObserver.
AddActivePlan(
activePlan);
secondJoinObserver.
AddActivePlan(
activePlan);
return activePlan;
}
}
}