EventProducer<TDelegate, TArgs>
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
namespace System.Reactive.Linq.ObservableImpl
{
/// <summary>
/// Producer that lets every observer subscribed at the same time share one
/// underlying handler registration. Derived classes supply the conversion
/// from an <c>Action&lt;TArgs&gt;</c> to the delegate type
/// (<see cref="GetHandler"/>) and the actual registration
/// (<see cref="AddHandler"/>).
/// </summary>
/// <typeparam name="TDelegate">Delegate type passed to <see cref="AddHandler"/>.</typeparam>
/// <typeparam name="TArgs">Element type delivered to observers.</typeparam>
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
    0,
    1
})]
internal abstract class EventProducer<[System.Runtime.CompilerServices.Nullable(2)] TDelegate, [System.Runtime.CompilerServices.Nullable(2)] TArgs> : BasicProducer<TArgs>
{
    /// <summary>
    /// One shared registration: a subject that fans values out to observers,
    /// a count of connected observers, and the disposable that undoes the
    /// handler registration.
    /// </summary>
    [System.Runtime.CompilerServices.Nullable(0)]
    private sealed class Session
    {
        private readonly EventProducer<TDelegate, TArgs> _parent;

        // Every connected observer is subscribed to this subject; the handler
        // built in Initialize forwards into its OnNext.
        private readonly Subject<TArgs> _subject = new Subject<TArgs>();

        // Receives the disposable returned by the parent's AddHandler.
        private readonly SingleAssignmentDisposable _removeHandler = new SingleAssignmentDisposable();

        // Number of currently connected observers. Modified in Connect (which
        // Run calls while holding the parent's gate) and in the teardown
        // callback (which takes the gate itself).
        private int _count;

        /// <summary>Creates a session bound to <paramref name="parent"/>.</summary>
        public Session(EventProducer<TDelegate, TArgs> parent)
        {
            _parent = parent;
        }

        /// <summary>
        /// Attaches <paramref name="observer"/> to the shared subject and, when it
        /// is the first connection, starts the handler registration.
        /// </summary>
        /// <param name="observer">Observer to connect.</param>
        /// <returns>
        /// A disposable that detaches the observer and, once the count drops to
        /// zero, schedules removal of the handler and clears the parent's session.
        /// If <c>Initialize</c> throws, the observer receives the exception through
        /// <c>OnError</c> and <c>Disposable.Empty</c> is returned.
        /// </returns>
        public IDisposable Connect(IObserver<TArgs> observer)
        {
            // The observer is attached before any handler registration starts.
            // NOTE(review): presumably so nothing raised during registration is
            // missed when the scheduler runs the work synchronously - confirm.
            IDisposable subscription = _subject.Subscribe(observer);

            _count++;
            if (_count == 1) {
                try {
                    Initialize();
                } catch (Exception failure) {
                    // Undo the bookkeeping for this observer before reporting.
                    _count--;
                    subscription.Dispose();
                    observer.OnError(failure);
                    return Disposable.Empty;
                }
            }

            // State is passed as a tuple so the callback captures nothing.
            return Disposable.Create((this, _parent, subscription), state => {
                var (session, producer, connection) = state;

                // Detach the observer first, then update the count under the gate.
                connection.Dispose();

                lock (producer._gate) {
                    if (--session._count == 0) {
                        // Last observer gone: hand disposal of the handler
                        // registration to the scheduler and drop the session so
                        // that the next Run creates a new one.
                        Scheduler.ScheduleAction(producer._scheduler, session._removeHandler, handler => handler.Dispose());
                        producer._session = null;
                    }
                }
            });
        }

        /// <summary>
        /// Builds the delegate that forwards into the subject's <c>OnNext</c> and
        /// passes it, together with <see cref="AddHandler"/>, to the parent's scheduler.
        /// </summary>
        private void Initialize()
        {
            // GetHandler is called directly here; only the registration itself
            // is handed to the scheduler.
            TDelegate handler = _parent.GetHandler(_subject.OnNext);
            Action<TDelegate> register = AddHandler;
            Scheduler.ScheduleAction(_parent._scheduler, handler, register);
        }

        /// <summary>
        /// Registers <paramref name="onNext"/> through the parent and stores the
        /// resulting disposable. An exception from the parent's <c>AddHandler</c>
        /// is forwarded to the subject as <c>OnError</c>.
        /// </summary>
        private void AddHandler(TDelegate onNext)
        {
            IDisposable registration;
            try {
                registration = _parent.AddHandler(onNext);
            } catch (Exception failure) {
                _subject.OnError(failure);
                return;
            }

            // Kept outside the try block so that only failures of the parent's
            // AddHandler are routed to the subject.
            _removeHandler.Disposable = registration;
        }
    }

    // Scheduler that receives the add-handler and remove-handler work.
    private readonly IScheduler _scheduler;

    // Guards _session and the session's connection count.
    private readonly object _gate;

    // Current shared session; null until the first Run and again after the
    // last connection of a session has been disposed.
    [System.Runtime.CompilerServices.Nullable(new byte[] {
        2,
        0,
        0
    })]
    private Session _session;

    /// <summary>Initializes the producer with the scheduler used for handler registration and removal.</summary>
    /// <param name="scheduler">Scheduler handed to <c>Scheduler.ScheduleAction</c> by the session.</param>
    protected EventProducer(IScheduler scheduler)
    {
        _scheduler = scheduler;
        _gate = new object();
    }

    /// <summary>Converts <paramref name="onNext"/> into the delegate that will be registered.</summary>
    protected abstract TDelegate GetHandler(Action<TArgs> onNext);

    /// <summary>Registers <paramref name="handler"/> and returns the disposable stored for later removal.</summary>
    protected abstract IDisposable AddHandler(TDelegate handler);

    /// <summary>
    /// Connects <paramref name="observer"/> to the current session, creating the
    /// session first when there is none. The whole operation runs under the gate.
    /// </summary>
    /// <param name="observer">Observer to connect.</param>
    /// <returns>The disposable produced by the session's <c>Connect</c>.</returns>
    protected override IDisposable Run(IObserver<TArgs> observer)
    {
        lock (_gate) {
            _session ??= new Session(this);
            return _session.Connect(observer);
        }
    }
}
}