<PackageReference Include="Relativity.Transfer.Client" Version="7.2.26" />
JoinObserver<T>
using System.
Collections.
Generic;
using System.
Reactive.
Disposables;
using System.
Reactive.
Linq;
namespace System.
Reactive.
Joins
{
internal sealed class JoinObserver<
T> :
ObserverBase<
Notification<
T>>,
IJoinObserver,
IDisposable
{
private object gate;
private readonly IObservable<
T>
source;
private readonly Action<
Exception>
onError;
private List<
ActivePlan>
activePlans;
private readonly SingleAssignmentDisposable subscription;
private bool isDisposed;
public Queue<
Notification<
T>>
Queue { get; set; }
public JoinObserver(
IObservable<
T>
source,
Action<
Exception>
onError)
{
this.
source =
source;
this.
onError =
onError;
Queue =
new Queue<
Notification<
T>>();
subscription =
new SingleAssignmentDisposable();
activePlans =
new List<
ActivePlan>();
}
public void AddActivePlan(
ActivePlan activePlan)
{
activePlans.
Add(
activePlan);
}
public void Subscribe(
object gate)
{
this.
gate =
gate;
subscription.
Disposable =
ObservableExtensions.
SubscribeSafe<
Notification<
T>>(
Observable.
Materialize<
T>(
source), (
IObserver<
Notification<
T>>)
this);
}
public void Dequeue()
{
Queue.
Dequeue();
}
protected override void OnNextCore(
Notification<
T>
notification)
{
lock (
gate) {
if (!
isDisposed) {
if (
notification.
Kind ==
NotificationKind.
OnError)
onError(
notification.
Exception);
else {
Queue.
Enqueue(
notification);
ActivePlan[]
array =
activePlans.
ToArray();
for (
int i =
0;
i <
array.
Length;
i++) {
array[
i].
Match();
}
}
}
}
}
protected override void OnErrorCore(
Exception exception)
{
}
protected override void OnCompletedCore()
{
}
internal void RemoveActivePlan(
ActivePlan activePlan)
{
activePlans.
Remove(
activePlan);
if (
activePlans.
Count ==
0)
Dispose();
}
protected override void Dispose(
bool disposing)
{
base.
Dispose(
disposing);
if (!
isDisposed) {
if (
disposing)
subscription.
Dispose();
isDisposed =
true;
}
}
}
}