<PackageReference Include="Relativity.Transfer.Client" Version="7.2.7" />
Zip<T1, T2, T3, T4, TResult>
sealed class Zip<
T1,
T2,
T3,
T4,
TResult> :
Producer<
TResult,
_<
T1,
T2,
T3,
T4,
TResult>>
using System.
Reactive.
Disposables;
namespace System.
Reactive.
Linq.
ObservableImpl
{
/// <summary>
/// Producer that combines four observable sources through a result selector.
/// It stores the sources and the selector, builds a sink (<see cref="_"/>) on demand,
/// and asks that sink to subscribe to all four sources.
/// </summary>
/// <typeparam name="T1">Element type of the first source.</typeparam>
/// <typeparam name="T2">Element type of the second source.</typeparam>
/// <typeparam name="T3">Element type of the third source.</typeparam>
/// <typeparam name="T4">Element type of the fourth source.</typeparam>
/// <typeparam name="TResult">Type produced by the result selector.</typeparam>
internal sealed class Zip<T1, T2, T3, T4, TResult> : Producer<TResult, Zip<T1, T2, T3, T4, TResult>._>
{
    private readonly IObservable<T1> _source1;
    private readonly IObservable<T2> _source2;
    private readonly IObservable<T3> _source3;
    private readonly IObservable<T4> _source4;
    private readonly Func<T1, T2, T3, T4, TResult> _resultSelector;

    /// <summary>
    /// Captures the four sources and the selector; nothing is subscribed here.
    /// </summary>
    /// <param name="source1">First source sequence.</param>
    /// <param name="source2">Second source sequence.</param>
    /// <param name="source3">Third source sequence.</param>
    /// <param name="source4">Fourth source sequence.</param>
    /// <param name="resultSelector">Function applied to one value taken from each source.</param>
    public Zip(
        IObservable<T1> source1,
        IObservable<T2> source2,
        IObservable<T3> source3,
        IObservable<T4> source4,
        Func<T1, T2, T3, T4, TResult> resultSelector)
    {
        _source1 = source1;
        _source2 = source2;
        _source3 = source3;
        _source4 = source4;
        _resultSelector = resultSelector;
    }

    /// <summary>
    /// Creates the sink that forwards combined results to <paramref name="observer"/>.
    /// </summary>
    /// <param name="observer">Observer that receives the combined results.</param>
    /// <param name="cancel">Disposable handed through to the sink's base constructor.</param>
    /// <returns>A new sink bound to this producer's result selector.</returns>
    protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
    {
        return new _(_resultSelector, observer, cancel);
    }

    /// <summary>
    /// Starts the given sink against the four stored sources.
    /// </summary>
    /// <param name="sink">Sink previously produced by <see cref="CreateSink"/>.</param>
    /// <returns>The disposable returned by the sink's own <c>Run</c>.</returns>
    protected override IDisposable Run(_ sink)
    {
        return sink.Run(_source1, _source2, _source3, _source4);
    }

    /// <summary>
    /// Sink for the four-source zip. It owns one <c>ZipObserver</c> per source and
    /// builds each result by dequeuing one value from every observer.
    /// </summary>
    internal sealed class _ : ZipSink<TResult>
    {
        private readonly Func<T1, T2, T3, T4, TResult> _resultSelector;

        // Assigned in Run; one observer per source, at indices 0..3.
        private ZipObserver<T1> _observer1;
        private ZipObserver<T2> _observer2;
        private ZipObserver<T3> _observer3;
        private ZipObserver<T4> _observer4;

        /// <summary>
        /// Initializes the sink; the literal 4 passed to the base class is
        /// presumably the number of sources being zipped (confirm against ZipSink).
        /// </summary>
        /// <param name="resultSelector">Function applied to one value from each source.</param>
        /// <param name="observer">Downstream observer, forwarded to the base class.</param>
        /// <param name="cancel">Disposable forwarded to the base class.</param>
        public _(Func<T1, T2, T3, T4, TResult> resultSelector, IObserver<TResult> observer, IDisposable cancel)
            : base(4, observer, cancel)
        {
            _resultSelector = resultSelector;
        }

        /// <summary>
        /// Creates the four per-source observers, registers their value queues,
        /// subscribes to every source, and returns a single disposable covering
        /// the four subscriptions plus a queue-clearing action.
        /// </summary>
        /// <param name="source1">First source sequence.</param>
        /// <param name="source2">Second source sequence.</param>
        /// <param name="source3">Third source sequence.</param>
        /// <param name="source4">Fourth source sequence.</param>
        /// <returns>A composite of the four subscription holders and the clean-up action.</returns>
        public IDisposable Run(
            IObservable<T1> source1,
            IObservable<T2> source2,
            IObservable<T3> source3,
            IObservable<T4> source4)
        {
            // Each observer gets its own subscription holder and publishes its
            // value queue into the Queues slot that matches its index.
            var holder1 = new SingleAssignmentDisposable();
            _observer1 = new ZipObserver<T1>(_gate, this, 0, holder1);
            Queues[0] = _observer1.Values;

            var holder2 = new SingleAssignmentDisposable();
            _observer2 = new ZipObserver<T2>(_gate, this, 1, holder2);
            Queues[1] = _observer2.Values;

            var holder3 = new SingleAssignmentDisposable();
            _observer3 = new ZipObserver<T3>(_gate, this, 2, holder3);
            Queues[2] = _observer3.Values;

            var holder4 = new SingleAssignmentDisposable();
            _observer4 = new ZipObserver<T4>(_gate, this, 3, holder4);
            Queues[3] = _observer4.Values;

            // Subscriptions start only after all four observers and queues exist;
            // presumably so that a source emitting during subscription finds the
            // sink fully wired.
            holder1.Disposable = ObservableExtensions.SubscribeSafe<T1>(source1, _observer1);
            holder2.Disposable = ObservableExtensions.SubscribeSafe<T2>(source2, _observer2);
            holder3.Disposable = ObservableExtensions.SubscribeSafe<T3>(source3, _observer3);
            holder4.Disposable = ObservableExtensions.SubscribeSafe<T4>(source4, _observer4);

            // Fifth entry: on disposal, empty every observer's pending values.
            var parts = new IDisposable[]
            {
                holder1,
                holder2,
                holder3,
                holder4,
                Disposable.Create(() =>
                {
                    _observer1.Values.Clear();
                    _observer2.Values.Clear();
                    _observer3.Values.Clear();
                    _observer4.Values.Clear();
                })
            };

            return StableCompositeDisposable.Create(parts);
        }

        /// <summary>
        /// Removes the oldest value from each observer's queue, in source order,
        /// and feeds the four values to the result selector.
        /// </summary>
        /// <returns>The selector's result for the dequeued values.</returns>
        protected override TResult GetResult()
        {
            var first = _observer1.Values.Dequeue();
            var second = _observer2.Values.Dequeue();
            var third = _observer3.Values.Dequeue();
            var fourth = _observer4.Values.Dequeue();

            return _resultSelector(first, second, third, fourth);
        }
    }
}
}