<PackageReference Include="System.Reactive" Version="6.0.0-preview.16" />

EventProducer<TDelegate, TArgs>

abstract class EventProducer<TDelegate, TArgs> : BasicProducer<TArgs>
using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; using System.Runtime.CompilerServices; namespace System.Reactive.Linq.ObservableImpl { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] internal abstract class EventProducer<[System.Runtime.CompilerServices.Nullable(2)] TDelegate, [System.Runtime.CompilerServices.Nullable(2)] TArgs> : BasicProducer<TArgs> { [System.Runtime.CompilerServices.Nullable(0)] private sealed class Session { private readonly EventProducer<TDelegate, TArgs> _parent; private readonly Subject<TArgs> _subject; private readonly SingleAssignmentDisposable _removeHandler = new SingleAssignmentDisposable(); private int _count; public Session(EventProducer<TDelegate, TArgs> parent) { _parent = parent; _subject = new Subject<TArgs>(); } public IDisposable Connect(IObserver<TArgs> observer) { IDisposable disposable = _subject.Subscribe(observer); if (++_count == 1) try { Initialize(); } catch (Exception error) { _count--; disposable.Dispose(); observer.OnError(error); return Disposable.Empty; } return Disposable.Create<(Session, EventProducer<TDelegate, TArgs>, IDisposable)>((this, _parent, disposable), (Action<(Session, EventProducer<TDelegate, TArgs>, IDisposable)>)delegate((Session, EventProducer<TDelegate, TArgs> _parent, IDisposable connection) tuple) { Session item = tuple.Item1; EventProducer<TDelegate, TArgs> item2 = tuple._parent; tuple.connection.Dispose(); lock (item2._gate) { if (--item._count == 0) { Scheduler.ScheduleAction<SingleAssignmentDisposable>(item2._scheduler, item._removeHandler, (Action<SingleAssignmentDisposable>)delegate(SingleAssignmentDisposable handler) { handler.Dispose(); }); item2._session = null; } } }); } private void Initialize() { EventProducer<TDelegate, TArgs> parent = _parent; Subject<TArgs> subject = _subject; TDelegate handler = parent.GetHandler(((SubjectBase<TArgs>)subject).OnNext); Scheduler.ScheduleAction<TDelegate>(_parent._scheduler, handler, (Action<TDelegate>)AddHandler); } private void AddHandler(TDelegate onNext) { IDisposable disposable; try { disposable = _parent.AddHandler(onNext); } catch (Exception error) { _subject.OnError(error); return; } _removeHandler.Disposable = disposable; } } private readonly IScheduler _scheduler; private readonly object _gate; [System.Runtime.CompilerServices.Nullable(new byte[] { 2, 0, 0 })] private Session _session; protected EventProducer(IScheduler scheduler) { _scheduler = scheduler; _gate = new object(); } protected abstract TDelegate GetHandler(Action<TArgs> onNext); protected abstract IDisposable AddHandler(TDelegate handler); protected override IDisposable Run(IObserver<TArgs> observer) { lock (_gate) { if (_session == null) _session = new Session(this); return _session.Connect(observer); } } } }