<PackageReference Include="System.Reactive" Version="6.1.0-preview.9" />
    
	
	
		
		
		
	 
	
	
        
                
                EventProducer<TDelegate, TArgs>
                
                
                
                
using System.
Reactive.
Concurrency;
using System.
Reactive.
Disposables;
using System.
Reactive.
Subjects;
using System.
Runtime.
CompilerServices;
namespace System.Reactive.Linq.ObservableImpl
{
#nullable enable

    /// <summary>
    /// Base class for producers that expose a handler-based source as a sequence of
    /// <typeparamref name="TArgs"/> values. Observers that are subscribed at the same
    /// time share one <see cref="Session"/>, and therefore one handler registration.
    /// </summary>
    /// <typeparam name="TDelegate">
    /// Type of the handler created by <see cref="GetHandler"/> and registered through
    /// <see cref="AddHandler"/>.
    /// </typeparam>
    /// <typeparam name="TArgs">Type of the values forwarded to observers.</typeparam>
    internal abstract class EventProducer<TDelegate, TArgs> : BasicProducer<TArgs>
    {
        /// <summary>
        /// Reference-counted connection shared by all observers currently attached to
        /// the enclosing producer. The first connection registers the handler; the last
        /// disconnection schedules its removal and detaches the session from the producer.
        /// </summary>
        private sealed class Session
        {
            private readonly EventProducer<TDelegate, TArgs> _parent;

            // Fan-out point: the registered handler pushes into this subject and every
            // connected observer is subscribed to it.
            private readonly Subject<TArgs> _subject;

            // Receives the disposable returned by the parent's AddHandler once the
            // registration has run.
            private readonly SingleAssignmentDisposable _removeHandler = new SingleAssignmentDisposable();

            // Number of live connections. Changed from Connect (which Run invokes while
            // holding the parent's _gate) and from the disconnect action (which takes
            // that same lock itself).
            private int _count;

            /// <summary>Creates a session bound to <paramref name="parent"/>.</summary>
            /// <param name="parent">Producer whose handler and scheduler this session uses.</param>
            public Session(EventProducer<TDelegate, TArgs> parent)
            {
                _parent = parent;
                _subject = new Subject<TArgs>();
            }

            /// <summary>
            /// Subscribes <paramref name="observer"/> to the shared subject and, when it
            /// is the first connection, starts the handler registration.
            /// </summary>
            /// <param name="observer">Observer to attach.</param>
            /// <returns>
            /// A disposable that detaches the observer and releases one reference on the
            /// session, or <see cref="Disposable.Empty"/> when initialization threw (in
            /// which case the exception has been passed to <paramref name="observer"/>).
            /// </returns>
            public IDisposable Connect(IObserver<TArgs> observer)
            {
                // The observer is subscribed before initialization runs, so it is already
                // attached to the subject by the time the handler gets registered.
                IDisposable subscription = _subject.Subscribe(observer);

                if (++_count == 1)
                {
                    try
                    {
                        Initialize();
                    }
                    catch (Exception error)
                    {
                        // Roll back the reference taken above and report the failure to
                        // this observer only.
                        _count--;
                        subscription.Dispose();
                        observer.OnError(error);
                        return Disposable.Empty;
                    }
                }

                // Everything the disconnect action needs travels in the state tuple, so
                // the lambda below captures nothing.
                return Disposable.Create(
                    (session: this, owner: _parent, link: subscription),
                    tuple =>
                    {
                        Session session = tuple.session;
                        EventProducer<TDelegate, TArgs> owner = tuple.owner;

                        // Detach the observer first, outside the lock.
                        tuple.link.Dispose();

                        lock (owner._gate)
                        {
                            if (--session._count == 0)
                            {
                                // Last observer left: hand the handler removal to the
                                // producer's scheduler and clear the session so the
                                // next Run creates a new one.
                                Scheduler.ScheduleAction(
                                    owner._scheduler,
                                    session._removeHandler,
                                    handler => handler.Dispose());

                                owner._session = null;
                            }
                        }
                    });
            }

            /// <summary>
            /// Builds the handler that forwards values into the subject and hands its
            /// registration to the parent's scheduler.
            /// </summary>
            private void Initialize()
            {
                // GetHandler is called directly here; only the AddHandler call is passed
                // to the scheduler.
                TDelegate handler = _parent.GetHandler(_subject.OnNext);

                Scheduler.ScheduleAction(_parent._scheduler, handler, AddHandler);
            }

            /// <summary>
            /// Registers <paramref name="onNext"/> with the parent and stores the resulting
            /// disposable. If registration throws, the exception is sent to the subject
            /// via <c>OnError</c> and nothing is stored.
            /// </summary>
            /// <param name="onNext">Handler produced by <see cref="Initialize"/>.</param>
            private void AddHandler(TDelegate onNext)
            {
                IDisposable registration;

                try
                {
                    registration = _parent.AddHandler(onNext);
                }
                catch (Exception error)
                {
                    _subject.OnError(error);
                    return;
                }

                _removeHandler.Disposable = registration;
            }
        }

        private readonly IScheduler _scheduler;

        // Guards _session and the session's connection count.
        private readonly object _gate;

        // Current shared session; null when no session exists (initially, and after the
        // last connection of a session has been disposed).
        private Session? _session;

        /// <summary>Creates the producer.</summary>
        /// <param name="scheduler">
        /// Scheduler passed to <c>Scheduler.ScheduleAction</c> for the handler
        /// registration and removal.
        /// </param>
        protected EventProducer(IScheduler scheduler)
        {
            _scheduler = scheduler;
            _gate = new object();
        }

        /// <summary>
        /// Creates the handler that forwards incoming data to <paramref name="onNext"/>.
        /// </summary>
        /// <param name="onNext">Callback that pushes a value to all connected observers.</param>
        /// <returns>The handler to pass to <see cref="AddHandler"/>.</returns>
        protected abstract TDelegate GetHandler(Action<TArgs> onNext);

        /// <summary>Registers <paramref name="handler"/> with the underlying source.</summary>
        /// <param name="handler">Handler obtained from <see cref="GetHandler"/>.</param>
        /// <returns>A disposable that is disposed when the last observer disconnects.</returns>
        protected abstract IDisposable AddHandler(TDelegate handler);

        /// <summary>
        /// Attaches <paramref name="observer"/> to the current session, creating the
        /// session first when none exists. The whole operation runs under the gate.
        /// </summary>
        /// <param name="observer">Observer to attach.</param>
        /// <returns>The disposable returned by the session for this observer.</returns>
        protected override IDisposable Run(IObserver<TArgs> observer)
        {
            lock (_gate)
            {
                _session ??= new Session(this);

                return _session.Connect(observer);
            }
        }
    }

#nullable restore
}