<PackageReference Include="System.Reactive" Version="6.0.2" />

BasicProducer<TSource>

abstract class BasicProducer<TSource> : IProducer<TSource>, IObservable<TSource>
Base class for implementation of query operators, providing performance benefits over the use of Observable.Create.
using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Runtime.CompilerServices; namespace System.Reactive { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(0)] internal abstract class BasicProducer<[System.Runtime.CompilerServices.Nullable(2)] TSource> : IProducer<TSource>, IObservable<TSource> { public IDisposable Subscribe(IObserver<TSource> observer) { if (observer == null) throw new ArgumentNullException("observer"); return SubscribeRaw(observer, true); } public IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard) { ISafeObserver<TSource> safeObserver = null; if (enableSafeguard) observer = (safeObserver = SafeObserver<TSource>.Wrap(observer)); IDisposable disposable; if (CurrentThreadScheduler.IsScheduleRequired) { SingleAssignmentDisposable singleAssignmentDisposable = new SingleAssignmentDisposable(); Scheduler.ScheduleAction<(BasicProducer<TSource>, SingleAssignmentDisposable, IObserver<TSource>)>((IScheduler)CurrentThreadScheduler.Instance, (this, singleAssignmentDisposable, observer), (Func<(BasicProducer<TSource>, SingleAssignmentDisposable, IObserver<TSource>), IDisposable>)(((BasicProducer<TSource> this, SingleAssignmentDisposable runAssignable, IObserver<TSource> observer) tuple) => tuple.runAssignable.Disposable = tuple.this.Run(tuple.observer))); disposable = singleAssignmentDisposable; } else disposable = Run(observer); safeObserver?.SetResource(disposable); return disposable; } protected abstract IDisposable Run(IObserver<TSource> observer); } }