<PackageReference Include="Relativity.Transfer.Client" Version="7.2.7" />
ActivePlan<T1, T2>
namespace System.
Reactive.
Joins
{
/// <summary>
/// An <see cref="ActivePlan"/> over two join observers. When both observers
/// have a pending notification, <see cref="Match"/> either signals completion
/// or hands the two pending values to the pairing callback.
/// </summary>
/// <typeparam name="T1">Element type carried by the first join observer.</typeparam>
/// <typeparam name="T2">Element type carried by the second join observer.</typeparam>
internal class ActivePlan<T1, T2> : ActivePlan
{
    private readonly Action<T1, T2> _onNext;
    private readonly Action _onCompleted;
    private readonly JoinObserver<T1> _first;
    private readonly JoinObserver<T2> _second;

    /// <summary>
    /// Stores the two observers and the callbacks, then passes each observer
    /// to the inherited <c>AddJoinObserver</c> (first, then second).
    /// </summary>
    /// <param name="first">Join observer supplying <typeparamref name="T1"/> notifications.</param>
    /// <param name="second">Join observer supplying <typeparamref name="T2"/> notifications.</param>
    /// <param name="onNext">Invoked with one value from each observer when a pair is matched.</param>
    /// <param name="onCompleted">Invoked when either head notification is a completion.</param>
    internal ActivePlan(
        JoinObserver<T1> first,
        JoinObserver<T2> second,
        Action<T1, T2> onNext,
        Action onCompleted)
    {
        _onNext = onNext;
        _onCompleted = onCompleted;
        _first = first;
        _second = second;

        AddJoinObserver(first);
        AddJoinObserver(second);
    }

    /// <summary>
    /// Attempts to match the head notifications of both observer queues.
    /// Does nothing unless both queues are non-empty.
    /// </summary>
    internal override void Match()
    {
        // Nothing to pair until each queue holds at least one notification.
        if (_first.Queue.Count <= 0 || _second.Queue.Count <= 0)
        {
            return;
        }

        Notification<T1> head1 = _first.Queue.Peek();
        Notification<T2> head2 = _second.Queue.Peek();

        bool eitherCompleted =
            head1.Kind == NotificationKind.OnCompleted ||
            head2.Kind == NotificationKind.OnCompleted;

        if (eitherCompleted)
        {
            // A completion at either head ends the plan; nothing is dequeued here.
            _onCompleted();
            return;
        }

        // Inherited Dequeue() runs before the values are read and forwarded
        // (presumably it pops the peeked heads from the registered observers — confirm in ActivePlan).
        Dequeue();
        _onNext(head1.Value, head2.Value);
    }
}
}