<PackageReference Include="System.Reactive" Version="7.0.0-preview.1" />
AutoConnect<T>
using System.
Reactive.
Subjects;
using System.
Runtime.
CompilerServices;
using System.
Threading;
#nullable enable
namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Observable wrapper around an <see cref="IConnectableObservable{T}"/> that calls
    /// <c>Connect()</c> on the source once a configured number of <see cref="Subscribe"/>
    /// calls has been counted.
    /// </summary>
    /// <remarks>
    /// The internal counter is only ever incremented; disposing a subscription does not
    /// lower it, and this class never disconnects the source on its own.
    /// </remarks>
    /// <typeparam name="T">Element type of the wrapped sequence.</typeparam>
    internal sealed class AutoConnect<T> : IObservable<T>
    {
        private readonly IConnectableObservable<T> _source;
        private readonly int _minObservers;

        // Optional callback that receives the connection handle; may be null.
        private readonly Action<IDisposable>? _onConnect;

        // Number of Subscribe calls counted so far (stops growing once the threshold is reached).
        private int _count;

        /// <summary>
        /// Creates the wrapper.
        /// </summary>
        /// <param name="source">Connectable sequence to subscribe to and eventually connect.</param>
        /// <param name="minObservers">
        /// Counter value at which the source is connected. With a value of zero or less the
        /// check in <see cref="Subscribe"/> never passes, so this class never connects.
        /// </param>
        /// <param name="onConnect">
        /// Optional callback invoked with the <see cref="IDisposable"/> returned by
        /// <c>Connect()</c>. When null, that handle is discarded.
        /// </param>
        internal AutoConnect(IConnectableObservable<T> source, int minObservers, Action<IDisposable>? onConnect)
        {
            _source = source;
            _minObservers = minObservers;
            _onConnect = onConnect;
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the source and, if this call brings the
        /// counter to the configured threshold, connects the source.
        /// </summary>
        /// <param name="observer">Observer forwarded unchanged to the source.</param>
        /// <returns>The subscription returned by the source's <c>Subscribe</c>.</returns>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            // Subscribe before any connect so this observer is attached when the source starts.
            IDisposable subscription = _source.Subscribe(observer);

            // Cheap read first: once the threshold has been reached the counter is left alone.
            if (Volatile.Read(ref _count) < _minObservers)
            {
                // Interlocked.Increment yields a distinct value per call, so at most one
                // caller sees the threshold value and performs the connect.
                if (Interlocked.Increment(ref _count) == _minObservers)
                {
                    IDisposable connection = _source.Connect();
                    _onConnect?.Invoke(connection);
                }
            }

            return subscription;
        }
    }
}
#nullable restore