ConnectableObservable<TSource, TResult>
sealed class ConnectableObservable<TSource, TResult> : IConnectableObservable<TResult>, IObservable<TResult>
Represents an observable wrapper that can be connected and disconnected from its underlying observable sequence.
using System.Reactive.Linq;
using System.Runtime.CompilerServices;
namespace System.Reactive.Subjects
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
internal sealed class ConnectableObservable<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TResult> : IConnectableObservable<TResult>, IObservable<TResult>
{
[System.Runtime.CompilerServices.NullableContext(0)]
private sealed class Connection : IDisposable
{
[System.Runtime.CompilerServices.Nullable(1)]
private readonly ConnectableObservable<TSource, TResult> _parent;
[System.Runtime.CompilerServices.Nullable(2)]
private IDisposable _subscription;
[System.Runtime.CompilerServices.NullableContext(1)]
public Connection(ConnectableObservable<TSource, TResult> parent, IDisposable subscription)
{
_parent = parent;
_subscription = subscription;
}
public void Dispose()
{
lock (_parent._gate) {
if (_subscription != null) {
_subscription.Dispose();
_subscription = null;
_parent._connection = null;
}
}
}
}
private readonly ISubject<TSource, TResult> _subject;
private readonly IObservable<TSource> _source;
private readonly object _gate;
[System.Runtime.CompilerServices.Nullable(new byte[] {
2,
0,
0
})]
private Connection _connection;
public ConnectableObservable(IObservable<TSource> source, ISubject<TSource, TResult> subject)
{
_subject = subject;
_source = Observable.AsObservable<TSource>(source);
_gate = new object();
}
public IDisposable Connect()
{
lock (_gate) {
if (_connection == null) {
IDisposable subscription = ObservableExtensions.SubscribeSafe<TSource>(_source, (IObserver<TSource>)_subject);
_connection = new Connection(this, subscription);
}
return _connection;
}
}
public IDisposable Subscribe(IObserver<TResult> observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
return ObservableExtensions.SubscribeSafe<TResult>((IObservable<TResult>)_subject, observer);
}
}
}