<PackageReference Include="System.Reactive" Version="6.1.0-preview.9" />

EventPatternSourceBase<TSender, TEventArgs>

public abstract class EventPatternSourceBase<TSender, TEventArgs>
Base class for classes that expose an observable sequence as a well-known event pattern (sender, event arguments). Contains functionality to maintain a map of event handler delegates to observable sequence subscriptions. Subclasses should only add an event with custom add and remove methods calling into the base class's operations.
#nullable enable

using System.Collections.Generic;
using System.Runtime.CompilerServices;

namespace System.Reactive
{
    /// <summary>
    /// Base class for classes that expose an observable sequence as a well-known event pattern
    /// (sender, event arguments). Maintains a map from event handler delegates to the
    /// subscriptions made on the underlying observable sequence.
    /// </summary>
    /// <typeparam name="TSender">The type of the sender supplied to event handlers.</typeparam>
    /// <typeparam name="TEventArgs">The type of the event arguments supplied to event handlers.</typeparam>
    public abstract class EventPatternSourceBase<TSender, TEventArgs>
    {
        /// <summary>
        /// Observer created for each added handler. Forwards every received event pattern to the
        /// owner's invocation callback and unregisters the handler once the sequence terminates.
        /// </summary>
        private sealed class Observer : ObserverBase<EventPattern<TSender, TEventArgs>>, ISafeObserver<EventPattern<TSender, TEventArgs>>, IObserver<EventPattern<TSender, TEventArgs>>, IDisposable
        {
            // Set when the sequence terminated before the subscription resource was registered.
            private bool _isDone;

            // Set once the subscription resource has been registered with the owning source.
            private bool _isAdded;

            private readonly Delegate _handler;
            private readonly object _gate = new object();
            private readonly Action<TSender?, TEventArgs> _invoke;
            private readonly EventPatternSourceBase<TSender, TEventArgs> _sourceBase;

            /// <summary>
            /// Creates an observer bound to the given owner, handler key and invocation delegate.
            /// </summary>
            /// <param name="sourceBase">The owning event pattern source.</param>
            /// <param name="handler">The handler delegate used as the key in the owner's subscription map.</param>
            /// <param name="invoke">The delegate handed to the owner's invocation callback for each element.</param>
            public Observer(EventPatternSourceBase<TSender, TEventArgs> sourceBase, Delegate handler, Action<TSender?, TEventArgs> invoke)
            {
                _handler = handler;
                _invoke = invoke;
                _sourceBase = sourceBase;
            }

            /// <summary>Passes the received event pattern to the owner's invocation callback.</summary>
            protected override void OnNextCore(EventPattern<TSender, TEventArgs> value)
            {
                _sourceBase._invokeHandler(_invoke, value);
            }

            /// <summary>Unregisters the handler and then rethrows the error.</summary>
            protected override void OnErrorCore(Exception error)
            {
                Remove();
                error.Throw();
            }

            /// <summary>Unregisters the handler.</summary>
            protected override void OnCompletedCore()
            {
                Remove();
            }

            /// <summary>
            /// Removes the handler from the owner if its resource was already registered;
            /// otherwise records termination so that a later <see cref="SetResource"/> call
            /// does not register anything.
            /// </summary>
            private void Remove()
            {
                lock (_gate)
                {
                    if (_isAdded)
                    {
                        _sourceBase.Remove(_handler);
                    }
                    else
                    {
                        _isDone = true;
                    }
                }
            }

            /// <summary>
            /// Registers the subscription resource with the owner under this observer's handler,
            /// unless the sequence has already terminated.
            /// </summary>
            /// <param name="resource">The subscription returned by subscribing this observer.</param>
            public void SetResource(IDisposable resource)
            {
                lock (_gate)
                {
                    // When termination was observed first, the resource is not registered here.
                    if (!_isDone)
                    {
                        _sourceBase.Add(_handler, resource);
                        _isAdded = true;
                    }
                }
            }
        }

        private readonly IObservable<EventPattern<TSender, TEventArgs>> _source;

        // Handler delegate -> stack of subscriptions created for it; guarded by locking on the dictionary itself.
        private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>();

        private readonly Action<Action<TSender?, TEventArgs>, EventPattern<TSender, TEventArgs>> _invokeHandler;

        /// <summary>
        /// Creates a new event pattern source.
        /// </summary>
        /// <param name="source">Source sequence to expose as an event.</param>
        /// <param name="invokeHandler">
        /// Callback invoked for each element of the sequence, receiving the invocation delegate
        /// supplied to <see cref="Add(Delegate, Action{TSender, TEventArgs})"/> and the element.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="invokeHandler"/> is null.
        /// </exception>
        protected EventPatternSourceBase(IObservable<EventPattern<TSender, TEventArgs>> source, Action<Action<TSender?, TEventArgs>, EventPattern<TSender, TEventArgs>> invokeHandler)
        {
            _source = source ?? throw new ArgumentNullException(nameof(source));
            _invokeHandler = invokeHandler ?? throw new ArgumentNullException(nameof(invokeHandler));
        }

        /// <summary>
        /// Adds the specified event handler, creating a subscription to the underlying source.
        /// </summary>
        /// <param name="handler">
        /// Event handler to add. It is used as the key for the subscription, so the same
        /// delegate must be passed to <see cref="Remove(Delegate)"/> to remove it.
        /// </param>
        /// <param name="invoke">
        /// Delegate passed to the invocation callback together with each element received
        /// from the source.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="handler"/> or <paramref name="invoke"/> is null.
        /// </exception>
        protected void Add(Delegate handler, Action<TSender?, TEventArgs> invoke)
        {
            if (handler is null)
            {
                throw new ArgumentNullException(nameof(handler));
            }

            if (invoke is null)
            {
                throw new ArgumentNullException(nameof(invoke));
            }

            var observer = new Observer(this, handler, invoke);

            // Subscribe first, then hand the subscription to the observer; the observer decides
            // whether it still needs to be registered.
            observer.SetResource(_source.Subscribe(observer));
        }

        /// <summary>
        /// Pushes a subscription onto the stack kept for the given handler, creating the stack
        /// on first use.
        /// </summary>
        private void Add(Delegate handler, IDisposable disposable)
        {
            lock (_subscriptions)
            {
                if (!_subscriptions.TryGetValue(handler, out var stack))
                {
                    stack = new Stack<IDisposable>();
                    _subscriptions[handler] = stack;
                }

                stack.Push(disposable);
            }
        }

        /// <summary>
        /// Removes the specified event handler, disposing the most recently added subscription
        /// registered for it. Does nothing when the handler has no registered subscription.
        /// </summary>
        /// <param name="handler">Event handler to remove.</param>
        /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
        protected void Remove(Delegate handler)
        {
            if (handler is null)
            {
                throw new ArgumentNullException(nameof(handler));
            }

            IDisposable? subscription = null;

            lock (_subscriptions)
            {
                if (_subscriptions.TryGetValue(handler, out var stack))
                {
                    subscription = stack.Pop();

                    // Drop the entry as soon as it is empty so a stored stack always has an element to pop.
                    if (stack.Count == 0)
                    {
                        _subscriptions.Remove(handler);
                    }
                }
            }

            // Dispose after releasing the lock on the subscription map.
            subscription?.Dispose();
        }
    }
}