ActivePlan<T1, T2, T3, T4>
namespace System.Reactive.Joins
{
    /// <summary>
    /// Active join plan over four sources. Holds one <see cref="JoinObserver{T}"/>
    /// per source and, when matched, either signals completion or forwards the
    /// four head values to a callback.
    /// </summary>
    /// <typeparam name="T1">Element type of the first source.</typeparam>
    /// <typeparam name="T2">Element type of the second source.</typeparam>
    /// <typeparam name="T3">Element type of the third source.</typeparam>
    /// <typeparam name="T4">Element type of the fourth source.</typeparam>
    internal class ActivePlan<T1, T2, T3, T4> : ActivePlan
    {
        private readonly Action<T1, T2, T3, T4> _onNext;
        private readonly JoinObserver<T1> _first;
        private readonly JoinObserver<T2> _second;
        private readonly JoinObserver<T3> _third;
        private readonly JoinObserver<T4> _fourth;

        /// <summary>
        /// Creates the plan, storing the four observers and the value callback,
        /// and passing each observer to the base class via <c>AddJoinObserver</c>.
        /// </summary>
        /// <param name="first">Observer for the first source.</param>
        /// <param name="second">Observer for the second source.</param>
        /// <param name="third">Observer for the third source.</param>
        /// <param name="fourth">Observer for the fourth source.</param>
        /// <param name="onNext">Callback invoked with one value from each source when a match fires.</param>
        /// <param name="onCompleted">Completion callback; handed to the base constructor.</param>
        internal ActivePlan(
            JoinObserver<T1> first,
            JoinObserver<T2> second,
            JoinObserver<T3> third,
            JoinObserver<T4> fourth,
            Action<T1, T2, T3, T4> onNext,
            Action onCompleted)
            : base(onCompleted)
        {
            _onNext = onNext;

            _first = first;
            _second = second;
            _third = third;
            _fourth = fourth;

            // Registration order mirrors the parameter order.
            AddJoinObserver(first);
            AddJoinObserver(second);
            AddJoinObserver(third);
            AddJoinObserver(fourth);
        }

        /// <summary>
        /// Attempts a match. Does nothing unless every observer's queue holds at
        /// least one notification. Otherwise inspects the head of each queue: if
        /// any head is an <c>OnCompleted</c> notification the completion callback
        /// is invoked; if none is, <c>Dequeue</c> is called and the four head
        /// values are passed to the value callback.
        /// </summary>
        internal override void Match()
        {
            // Guard: a match needs a pending notification on all four sources.
            if (_first.Queue.Count <= 0
                || _second.Queue.Count <= 0
                || _third.Queue.Count <= 0
                || _fourth.Queue.Count <= 0)
            {
                return;
            }

            // Peek only; the heads stay queued until Dequeue() below.
            var head1 = _first.Queue.Peek();
            var head2 = _second.Queue.Peek();
            var head3 = _third.Queue.Peek();
            var head4 = _fourth.Queue.Peek();

            var noneCompleted =
                head1.Kind != NotificationKind.OnCompleted
                && head2.Kind != NotificationKind.OnCompleted
                && head3.Kind != NotificationKind.OnCompleted
                && head4.Kind != NotificationKind.OnCompleted;

            if (noneCompleted)
            {
                // Dequeue() is inherited; presumably it removes the peeked heads
                // from the registered observers - confirm against the base class.
                Dequeue();
                _onNext(head1.Value, head2.Value, head3.Value, head4.Value);
            }
            else
            {
                // At least one source has completed: no value is emitted and
                // nothing is dequeued on this path.
                _onCompleted();
            }
        }
    }
}