BasicProducer<TSource>
Base class for implementation of query operators, providing performance benefits over the use of Observable.Create.
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
namespace System.Reactive
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
internal abstract class BasicProducer<[System.Runtime.CompilerServices.Nullable(2)] TSource> : IProducer<TSource>, IObservable<TSource>
{
public IDisposable Subscribe(IObserver<TSource> observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
return SubscribeRaw(observer, true);
}
public IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard)
{
ISafeObserver<TSource> safeObserver = null;
if (enableSafeguard)
observer = (safeObserver = SafeObserver<TSource>.Wrap(observer));
IDisposable disposable;
if (CurrentThreadScheduler.IsScheduleRequired) {
SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable();
Scheduler.ScheduleAction<(BasicProducer<TSource>, SingleAssignmentDisposable, IObserver<TSource>)>((IScheduler)CurrentThreadScheduler.Instance, (this, singleAssignmentDisposable, observer), (Func<(BasicProducer<TSource>, SingleAssignmentDisposable, IObserver<TSource>), IDisposable>)(((BasicProducer<TSource> this, SingleAssignmentDisposable runAssignable, IObserver<TSource> observer) tuple) => tuple.runAssignable.Disposable = tuple.this.Run(tuple.observer)));
disposable = singleAssignmentDisposable;
} else
disposable = Run(observer);
safeObserver?.SetResource(disposable);
return disposable;
}
protected abstract IDisposable Run(IObserver<TSource> observer);
}
}