<PackageReference Include="System.Reactive" Version="6.0.0-preview.9" />
    
	
	
		
		
		
	 
	
	
        
                
                JoinObserver<T>
                
                
                
                
using System.
Collections.
Generic;
using System.
Reactive.
Disposables;
using System.
Reactive.
Linq;
using System.
Runtime.
CompilerServices;
namespace System.
Reactive.
Joins
{
    [
System.
Runtime.
CompilerServices.
NullableContext(
1)]
    [
System.
Runtime.
CompilerServices.
Nullable(
new byte[] {
        
0,
        
1,
        
1
    })]
    
internal sealed class JoinObserver<[
System.
Runtime.
CompilerServices.
Nullable(
2)] 
T> : 
ObserverBase<
Notification<
T>>, 
IJoinObserver, 
IDisposable
    {
        [
System.
Runtime.
CompilerServices.
Nullable(
2)]
        
private object _gate;
        
private readonly IObservable<
T> 
_source;
        
private readonly Action<
Exception> 
_onError;
        
private readonly List<
ActivePlan> 
_activePlans;
        
private SingleAssignmentDisposableValue _subscription;
        
private bool _isDisposed;
        
public Queue<
Notification<
T>> 
Queue { get; }
        
public JoinObserver(
IObservable<
T> 
source, 
Action<
Exception> 
onError)
        {
            
_source = 
source;
            
_onError = 
onError;
            
Queue = 
new Queue<
Notification<
T>>();
            
_activePlans = 
new List<
ActivePlan>();
        }
        
public void AddActivePlan(
ActivePlan activePlan)
        {
            
_activePlans.
Add(
activePlan);
        }
        
public void Subscribe(
object gate)
        {
            
_gate = 
gate;
            
_subscription.
Disposable = 
ObservableExtensions.
SubscribeSafe<
Notification<
T>>(
Observable.
Materialize<
T>(
_source), (
IObserver<
Notification<
T>>)
this);
        }
        
public void Dequeue()
        {
            
Queue.
Dequeue();
        }
        
protected override void OnNextCore(
Notification<
T> 
notification)
        {
            
lock (
_gate) {
                
if (!
_isDisposed) {
                    
if (
notification.
Kind == 
NotificationKind.
OnError)
                        
_onError(
notification.
Exception);
                    
else {
                        
Queue.
Enqueue(
notification);
                        
ActivePlan[] 
array = 
_activePlans.
ToArray();
                        
for (
int i = 
0; 
i < 
array.
Length; 
i++) {
                            
array[
i].
Match();
                        }
                    }
                }
            }
        }
        
protected override void OnErrorCore(
Exception exception)
        {
        }
        
protected override void OnCompletedCore()
        {
        }
        
internal void RemoveActivePlan(
ActivePlan activePlan)
        {
            
_activePlans.
Remove(
activePlan);
            
if (
_activePlans.
Count == 
0)
                
Dispose();
        }
        
protected override void Dispose(
bool disposing)
        {
            
base.
Dispose(
disposing);
            
if (!
_isDisposed) {
                
if (
disposing)
                    
_subscription.
Dispose();
                
_isDisposed = 
true;
            }
        }
    }
}