// ObserveOnObserver<T>
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
namespace System.Reactive
{
/// <summary>
/// Observer that subscribes itself to a source sequence and hands every
/// notification to the <see cref="ScheduledObserver{T}"/> base implementation,
/// calling <c>EnsureActive()</c> after each one.
/// </summary>
/// <typeparam name="T">Type of the elements received from the source.</typeparam>
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })]
internal sealed class ObserveOnObserver<[System.Runtime.CompilerServices.Nullable(2)] T> : ScheduledObserver<T>
{
    // Holds the subscription created by Run; released in Dispose(true).
    private SingleAssignmentDisposableValue _run;

    /// <summary>
    /// Creates the observer; both arguments are passed straight to the base constructor.
    /// </summary>
    /// <param name="scheduler">Scheduler forwarded to the base class.</param>
    /// <param name="observer">Downstream observer forwarded to the base class.</param>
    public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer)
        : base(scheduler, observer)
    {
    }

    /// <summary>
    /// Subscribes this instance to <paramref name="source"/> and stores the
    /// resulting subscription so it can be disposed later.
    /// </summary>
    /// <param name="source">Sequence to subscribe to.</param>
    public void Run(IObservable<T> source)
    {
        IObserver<T> self = this;
        var subscription = ObservableExtensions.SubscribeSafe<T>(source, self);
        _run.Disposable = subscription;
    }

    /// <summary>
    /// Passes the element to the base implementation, then calls <c>EnsureActive()</c>.
    /// </summary>
    /// <param name="value">Element received from the source.</param>
    protected override void OnNextCore(T value)
    {
        base.OnNextCore(value);

        // NOTE(review): presumably kicks off delivery on the scheduler — confirm in ScheduledObserver<T>.
        EnsureActive();
    }

    /// <summary>
    /// Passes the error to the base implementation, then calls <c>EnsureActive()</c>.
    /// </summary>
    /// <param name="exception">Error received from the source.</param>
    protected override void OnErrorCore(Exception exception)
    {
        base.OnErrorCore(exception);
        EnsureActive();
    }

    /// <summary>
    /// Passes the completion signal to the base implementation, then calls <c>EnsureActive()</c>.
    /// </summary>
    protected override void OnCompletedCore()
    {
        base.OnCompletedCore();
        EnsureActive();
    }

    /// <summary>
    /// Runs the base disposal logic and, when called with
    /// <paramref name="disposing"/> set to <c>true</c>, also disposes the
    /// subscription stored by <see cref="Run"/>.
    /// </summary>
    /// <param name="disposing">Value forwarded to the base class; gates disposal of the subscription.</param>
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);

        if (!disposing)
        {
            return;
        }

        _run.Dispose();
    }
}
}