EventPatternSourceBase<TSender, TEventArgs>
Base class for classes that expose an observable sequence as a well-known event pattern (sender, event arguments).
Contains functionality to maintain a map of event handler delegates to observable sequence subscriptions. Subclasses
should only add an event with custom add and remove methods calling into the base class's operations.
using System.Collections.Generic;
using System.Runtime.CompilerServices;
namespace System.Reactive
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
public abstract class EventPatternSourceBase<[System.Runtime.CompilerServices.Nullable(2)] TSender, [System.Runtime.CompilerServices.Nullable(2)] TEventArgs>
{
/// <summary>
/// Observer that relays every element of the source sequence to a single event handler and
/// removes that handler's subscription from the owning source once the sequence terminates.
/// </summary>
[System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 1 })]
private sealed class Observer : ObserverBase<EventPattern<TSender, TEventArgs>>, ISafeObserver<EventPattern<TSender, TEventArgs>>, IObserver<EventPattern<TSender, TEventArgs>>, IDisposable
{
    // Guards _isAdded and _isDone, which are read and written by both Remove and SetResource.
    private readonly object _gate = new object();

    // Owning event source; provides the handler bookkeeping and the invocation callback.
    private readonly EventPatternSourceBase<TSender, TEventArgs> _sourceBase;

    // Event handler delegate used as the key for the subscription bookkeeping.
    private readonly Delegate _handler;

    // Action passed to the owner's invocation callback together with each element.
    [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 2, 1 })]
    private readonly Action<TSender, TEventArgs> _invoke;

    // True once SetResource has registered the subscription with the owning source.
    private bool _isAdded;

    // True when the sequence terminated before SetResource registered the subscription.
    private bool _isDone;

    /// <summary>
    /// Creates an observer bound to the given source, handler and invocation action.
    /// </summary>
    /// <param name="sourceBase">Event source that owns this observer.</param>
    /// <param name="handler">Event handler delegate the subscription is tracked under.</param>
    /// <param name="invoke">Action handed to the owner's invocation callback for each element.</param>
    public Observer(EventPatternSourceBase<TSender, TEventArgs> sourceBase, Delegate handler, [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 2, 1 })] Action<TSender, TEventArgs> invoke)
    {
        _sourceBase = sourceBase;
        _handler = handler;
        _invoke = invoke;
    }

    /// <summary>
    /// Passes the element, along with the stored invoke action, to the owner's invocation callback.
    /// </summary>
    protected override void OnNextCore(EventPattern<TSender, TEventArgs> value) => _sourceBase._invokeHandler(_invoke, value);

    /// <summary>
    /// Drops the handler's subscription, then hands the error to the Throw() helper
    /// (defined outside this file; presumably rethrows it - confirm).
    /// </summary>
    protected override void OnErrorCore(Exception error)
    {
        Remove();
        error.Throw();
    }

    /// <summary>
    /// Drops the handler's subscription when the sequence completes.
    /// </summary>
    protected override void OnCompletedCore()
    {
        Remove();
    }

    /// <summary>
    /// Removes the handler from the owning source if its subscription was already registered;
    /// otherwise records that the sequence is done so a later SetResource call skips registration.
    /// </summary>
    private void Remove()
    {
        lock (_gate)
        {
            if (!_isAdded)
            {
                _isDone = true;
                return;
            }

            _sourceBase.Remove(_handler);
        }
    }

    /// <summary>
    /// Registers the subscription with the owning source under the handler, unless the
    /// sequence was already marked as done.
    /// </summary>
    /// <param name="resource">Subscription obtained from subscribing this observer to the source.</param>
    public void SetResource(IDisposable resource)
    {
        lock (_gate)
        {
            if (_isDone)
            {
                return;
            }

            _sourceBase.Add(_handler, resource);
            _isAdded = true;
        }
    }
}
// Sequence of event patterns exposed through the event; the protected Add subscribes a new
// observer to it for every handler that is added.
private readonly IObservable<EventPattern<TSender, TEventArgs>> _source;
// Maps each event handler delegate to the stack of subscriptions created for it. The dictionary
// instance is also the lock object used by the private Add and by Remove when they access it.
private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>();
// Callback supplied to the constructor; the nested Observer calls it for every element with the
// handler's invoke action and the event pattern value.
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
1,
2,
1,
1,
1,
1
})]
private readonly Action<Action<TSender, TEventArgs>, EventPattern<TSender, TEventArgs>> _invokeHandler;
/// <summary>
/// Initializes the event source with the sequence to expose and the callback used to raise the event.
/// </summary>
/// <param name="source">Sequence of event patterns that handlers are subscribed to.</param>
/// <param name="invokeHandler">
/// Callback invoked for every element; it receives the handler's invoke action and the event pattern value.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="invokeHandler"/> is null.
/// </exception>
protected EventPatternSourceBase(IObservable<EventPattern<TSender, TEventArgs>> source, [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 1, 2, 1, 1, 1, 1 })] Action<Action<TSender, TEventArgs>, EventPattern<TSender, TEventArgs>> invokeHandler)
{
    // nameof keeps the reported parameter names in sync with the signature if it is ever renamed.
    // Validation order (source first, then invokeHandler) is unchanged.
    _source = source ?? throw new ArgumentNullException(nameof(source));
    _invokeHandler = invokeHandler ?? throw new ArgumentNullException(nameof(invokeHandler));
}
/// <summary>
/// Adds an event handler by subscribing a new observer to the source sequence and tracking the
/// resulting subscription under the handler delegate.
/// </summary>
/// <param name="handler">Event handler delegate; used as the key for the subscription.</param>
/// <param name="invoke">Action passed to the invocation callback for every element of the sequence.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="handler"/> or <paramref name="invoke"/> is null.
/// </exception>
protected void Add(Delegate handler, [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 2, 1 })] Action<TSender, TEventArgs> invoke)
{
    // "is null" is a plain reference check, so the overloaded Delegate equality operator is not involved.
    if (handler is null)
    {
        throw new ArgumentNullException(nameof(handler));
    }

    if (invoke is null)
    {
        throw new ArgumentNullException(nameof(invoke));
    }

    var observer = new Observer(this, handler, invoke);

    // Subscribe first, then hand the subscription to the observer; the observer registers it
    // under the handler unless the sequence already terminated while subscribing.
    observer.SetResource(_source.Subscribe(observer));
}
/// <summary>
/// Records a subscription for the given handler, creating the handler's stack on first use.
/// </summary>
/// <param name="handler">Event handler delegate the subscription belongs to.</param>
/// <param name="disposable">Subscription to push onto the handler's stack.</param>
private void Add(Delegate handler, IDisposable disposable)
{
    lock (_subscriptions)
    {
        Stack<IDisposable> pending;
        if (!_subscriptions.TryGetValue(handler, out pending))
        {
            // No entry for this handler yet: start a fresh stack and register it.
            pending = new Stack<IDisposable>();
            _subscriptions.Add(handler, pending);
        }

        pending.Push(disposable);
    }
}
/// <summary>
/// Removes the most recently added subscription for the given handler and disposes it.
/// Does nothing when no subscription is tracked for the handler.
/// </summary>
/// <param name="handler">Event handler delegate whose subscription should be removed.</param>
/// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
protected void Remove(Delegate handler)
{
    // "is null" is a plain reference check, so the overloaded Delegate equality operator is not involved.
    if (handler is null)
    {
        throw new ArgumentNullException(nameof(handler));
    }

    IDisposable subscription = null;
    lock (_subscriptions)
    {
        if (_subscriptions.TryGetValue(handler, out Stack<IDisposable> stack))
        {
            subscription = stack.Pop();

            // Drop the entry once its last subscription is gone so empty stacks never linger.
            if (stack.Count == 0)
            {
                _subscriptions.Remove(handler);
            }
        }
    }

    // Disposed after the lock is released, so the subscription's Dispose never runs under it.
    subscription?.Dispose();
}
}
}