<PackageReference Include="System.Reactive" Version="6.0.2" />

ConnectableObservable<TSource, TResult>

sealed class ConnectableObservable<TSource, TResult> : IConnectableObservable<TResult>, IObservable<TResult>
Represents an observable wrapper that can be connected and disconnected from its underlying observable sequence.
using System.Reactive.Linq; using System.Runtime.CompilerServices; namespace System.Reactive.Subjects { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] internal sealed class ConnectableObservable<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TResult> : IConnectableObservable<TResult>, IObservable<TResult> { [System.Runtime.CompilerServices.NullableContext(0)] private sealed class Connection : IDisposable { [System.Runtime.CompilerServices.Nullable(1)] private readonly ConnectableObservable<TSource, TResult> _parent; [System.Runtime.CompilerServices.Nullable(2)] private IDisposable _subscription; [System.Runtime.CompilerServices.NullableContext(1)] public Connection(ConnectableObservable<TSource, TResult> parent, IDisposable subscription) { _parent = parent; _subscription = subscription; } public void Dispose() { lock (_parent._gate) { if (_subscription != null) { _subscription.Dispose(); _subscription = null; _parent._connection = null; } } } } private readonly ISubject<TSource, TResult> _subject; private readonly IObservable<TSource> _source; private readonly object _gate; [System.Runtime.CompilerServices.Nullable(new byte[] { 2, 0, 0 })] private Connection _connection; public ConnectableObservable(IObservable<TSource> source, ISubject<TSource, TResult> subject) { _subject = subject; _source = Observable.AsObservable<TSource>(source); _gate = new object(); } public IDisposable Connect() { lock (_gate) { if (_connection == null) { IDisposable subscription = ObservableExtensions.SubscribeSafe<TSource>(_source, (IObserver<TSource>)_subject); _connection = new Connection(this, subscription); } return _connection; } } public IDisposable Subscribe(IObserver<TResult> observer) { if (observer == null) throw new ArgumentNullException("observer"); return ObservableExtensions.SubscribeSafe<TResult>((IObservable<TResult>)_subject, observer); } } }