// AsyncLockObserver<T>
using System.Reactive.Concurrency;
using System.Runtime.CompilerServices;
namespace System.Reactive
{
/// <summary>
/// Observer that relays each notification it receives to a wrapped observer by
/// handing the work to a caller-supplied <see cref="AsyncLock"/>.
/// </summary>
/// <typeparam name="T">Type of the elements delivered to the observer.</typeparam>
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
internal sealed class AsyncLockObserver<[System.Runtime.CompilerServices.Nullable(2)] T> : ObserverBase<T>
{
    private readonly AsyncLock _gate;
    private readonly IObserver<T> _observer;

    /// <summary>
    /// Creates an observer that forwards to <paramref name="observer"/> through <paramref name="gate"/>.
    /// </summary>
    /// <param name="observer">Observer that ultimately receives the notifications.</param>
    /// <param name="gate">Lock whose <c>Wait</c> method is given every forwarding callback.</param>
    public AsyncLockObserver(IObserver<T> observer, AsyncLock gate)
    {
        _observer = observer;
        _gate = gate;
    }

    /// <summary>
    /// Passes a completion callback for the wrapped observer to the gate.
    /// </summary>
    // The observer travels as the state argument, so the lambda captures nothing.
    protected override void OnCompletedCore()
        => _gate.Wait(_observer, target => target.OnCompleted());

    /// <summary>
    /// Passes an error callback for the wrapped observer to the gate.
    /// </summary>
    /// <param name="exception">Error to forward to the wrapped observer.</param>
    // Observer and exception are bundled into a tuple so the lambda captures nothing.
    protected override void OnErrorCore(Exception exception)
        => _gate.Wait(
            (target: _observer, error: exception),
            state => state.target.OnError(state.error));

    /// <summary>
    /// Passes an element callback for the wrapped observer to the gate.
    /// </summary>
    /// <param name="value">Element to forward to the wrapped observer.</param>
    // Observer and element are bundled into a tuple so the lambda captures nothing.
    protected override void OnNextCore(T value)
        => _gate.Wait(
            (target: _observer, item: value),
            state => state.target.OnNext(state.item));
}
}