<PackageReference Include="System.Reactive" Version="4.0.0-preview.4.build.391" />
ActivePlan<T1>
namespace System.Reactive.Joins
{
    /// <summary>
    /// Active plan over a single <see cref="JoinObserver{T1}"/>: each call to
    /// <see cref="Match"/> inspects the notification at the head of that observer's
    /// queue and forwards it to one of the two callbacks supplied at construction.
    /// </summary>
    /// <typeparam name="T1">Element type carried by the observed notifications.</typeparam>
    internal class ActivePlan<T1> : ActivePlan
    {
        private readonly JoinObserver<T1> _first;
        private readonly Action<T1> _onNext;
        private readonly Action _onCompleted;

        /// <summary>
        /// Stores the observer and callbacks, then passes the observer to
        /// <c>AddJoinObserver</c> (declared outside this block, presumably on the base plan).
        /// </summary>
        /// <param name="first">Observer whose queue is examined by <see cref="Match"/>.</param>
        /// <param name="onNext">Invoked with the value of a non-completion notification.</param>
        /// <param name="onCompleted">Invoked when the head notification is a completion.</param>
        internal ActivePlan(JoinObserver<T1> first, Action<T1> onNext, Action onCompleted)
        {
            _first = first;
            _onNext = onNext;
            _onCompleted = onCompleted;

            AddJoinObserver(first);
        }

        /// <summary>
        /// Looks at the head of the observer's queue without removing it. A completion
        /// notification triggers the completion callback and is left in the queue; any
        /// other notification causes <c>Dequeue()</c> to be called and then its value to be
        /// passed to the value callback. Does nothing when the queue is empty.
        /// </summary>
        internal override void Match()
        {
            // Nothing buffered yet, so there is nothing to match.
            if (_first.Queue.Count <= 0)
            {
                return;
            }

            var head = _first.Queue.Peek();

            if (head.Kind != NotificationKind.OnCompleted)
            {
                // Dequeue() runs before the callback, mirroring the original ordering.
                // NOTE(review): Dequeue() is defined outside this block; presumably it
                // removes the head item from the registered observers - confirm.
                Dequeue();
                _onNext(head.Value);
                return;
            }

            // Completion is only peeked, never dequeued here.
            _onCompleted();
        }
    }
}