<PackageReference Include="System.Reactive" Version="4.2.2" />

JoinObserver<T>

using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.Linq; namespace System.Reactive.Joins { internal sealed class JoinObserver<T> : ObserverBase<Notification<T>>, IJoinObserver, IDisposable { private object _gate; private readonly IObservable<T> _source; private readonly Action<Exception> _onError; private readonly List<ActivePlan> _activePlans; private IDisposable _subscription; private bool _isDisposed; public Queue<Notification<T>> Queue { get; } public JoinObserver(IObservable<T> source, Action<Exception> onError) { _source = source; _onError = onError; Queue = new Queue<Notification<T>>(); _activePlans = new List<ActivePlan>(); } public void AddActivePlan(ActivePlan activePlan) { _activePlans.Add(activePlan); } public void Subscribe(object gate) { _gate = gate; Disposable.SetSingle(ref _subscription, ObservableExtensions.SubscribeSafe<Notification<T>>(Observable.Materialize<T>(_source), (IObserver<Notification<T>>)this)); } public void Dequeue() { Queue.Dequeue(); } protected override void OnNextCore(Notification<T> notification) { lock (_gate) { if (!_isDisposed) { if (notification.Kind == NotificationKind.OnError) _onError(notification.Exception); else { Queue.Enqueue(notification); ActivePlan[] array = _activePlans.ToArray(); for (int i = 0; i < array.Length; i++) { array[i].Match(); } } } } } protected override void OnErrorCore(Exception exception) { } protected override void OnCompletedCore() { } internal void RemoveActivePlan(ActivePlan activePlan) { _activePlans.Remove(activePlan); if (_activePlans.Count == 0) Dispose(); } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (!_isDisposed) { if (disposing) Disposable.TryDispose(ref _subscription); _isDisposed = true; } } } }