<PackageReference Include="Relativity.Transfer.Client" Version="7.2.26" />

JoinObserver<T>

using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.Linq; namespace System.Reactive.Joins { internal sealed class JoinObserver<T> : ObserverBase<Notification<T>>, IJoinObserver, IDisposable { private object gate; private readonly IObservable<T> source; private readonly Action<Exception> onError; private List<ActivePlan> activePlans; private readonly SingleAssignmentDisposable subscription; private bool isDisposed; public Queue<Notification<T>> Queue { get; set; } public JoinObserver(IObservable<T> source, Action<Exception> onError) { this.source = source; this.onError = onError; Queue = new Queue<Notification<T>>(); subscription = new SingleAssignmentDisposable(); activePlans = new List<ActivePlan>(); } public void AddActivePlan(ActivePlan activePlan) { activePlans.Add(activePlan); } public void Subscribe(object gate) { this.gate = gate; subscription.Disposable = ObservableExtensions.SubscribeSafe<Notification<T>>(Observable.Materialize<T>(source), (IObserver<Notification<T>>)this); } public void Dequeue() { Queue.Dequeue(); } protected override void OnNextCore(Notification<T> notification) { lock (gate) { if (!isDisposed) { if (notification.Kind == NotificationKind.OnError) onError(notification.Exception); else { Queue.Enqueue(notification); ActivePlan[] array = activePlans.ToArray(); for (int i = 0; i < array.Length; i++) { array[i].Match(); } } } } } protected override void OnErrorCore(Exception exception) { } protected override void OnCompletedCore() { } internal void RemoveActivePlan(ActivePlan activePlan) { activePlans.Remove(activePlan); if (activePlans.Count == 0) Dispose(); } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (!isDisposed) { if (disposing) subscription.Dispose(); isDisposed = true; } } } }