<PackageReference Include="Relativity.Transfer.Client" Version="7.0.8" />

EventSource<T>

sealed class EventSource<T> : IEventSource<T>
using System.Collections.Generic; namespace System.Reactive { internal sealed class EventSource<T> : IEventSource<T> { private readonly IObservable<T> _source; private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions; private readonly Action<Action<T>, T> _invokeHandler; public event Action<T> OnNext { add { object gate = new object(); bool isAdded = false; bool isDone = false; Action remove = delegate { lock (gate) { if (isAdded) Remove(value); else isDone = true; } }; IDisposable disposable = ObservableExtensions.Subscribe<T>(_source, (Action<T>)delegate(T x) { _invokeHandler(value, x); }, (Action<Exception>)delegate(Exception ex) { remove(); ex.Throw(); }, remove); lock (gate) { if (!isDone) { Add(value, disposable); isAdded = true; } } } remove { Remove(value); } } public EventSource(IObservable<T> source, Action<Action<T>, T> invokeHandler) { _source = source; _invokeHandler = invokeHandler; _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>(); } private void Add(Delegate handler, IDisposable disposable) { lock (_subscriptions) { Stack<IDisposable> value = new Stack<IDisposable>(); if (!_subscriptions.TryGetValue(handler, out value)) value = (_subscriptions[handler] = new Stack<IDisposable>()); value.Push(disposable); } } private void Remove(Delegate handler) { IDisposable disposable = null; lock (_subscriptions) { Stack<IDisposable> value = new Stack<IDisposable>(); if (_subscriptions.TryGetValue(handler, out value)) { disposable = value.Pop(); if (value.Count == 0) _subscriptions.Remove(handler); } } disposable?.Dispose(); } } }