<PackageReference Include="System.Reactive" Version="6.0.0-preview.9" />

GetEnumerator<TSource>

sealed class GetEnumerator<TSource> : IEnumerator<TSource>, IEnumerator, IDisposable, IObserver<TSource>
#nullable enable

using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Observer that exposes the notifications it receives through a blocking
    /// <see cref="IEnumerator{T}"/>.
    /// </summary>
    /// <remarks>
    /// Every notification (<see cref="OnNext"/>, <see cref="OnError"/>, <see cref="OnCompleted"/>)
    /// and every <see cref="Dispose"/> call releases one permit on an internal semaphore;
    /// <see cref="MoveNext"/> waits for one permit per call. Values are buffered in a
    /// <see cref="ConcurrentQueue{T}"/> until they are consumed.
    /// Nullability is expressed with language annotations rather than by applying the
    /// compiler-reserved nullable attributes by hand.
    /// </remarks>
    /// <typeparam name="TSource">Element type of the observed sequence.</typeparam>
    internal sealed class GetEnumerator<TSource> : IEnumerator<TSource>, IEnumerator, IDisposable, IObserver<TSource>
    {
        // Values received through OnNext that MoveNext has not consumed yet.
        private readonly ConcurrentQueue<TSource> _queue;

        // Last value dequeued by MoveNext; default until the first successful MoveNext.
        private TSource? _current;

        // Error received through OnError, if any.
        private Exception? _error;

        // Set by OnCompleted. Not read anywhere in this class.
        private bool _done;

        // Set by Dispose; makes MoveNext throw ObjectDisposedException.
        private bool _disposed;

        // Holds the subscription created in Run. Deliberately not readonly: it is assigned after construction.
        private SingleAssignmentDisposableValue _subscription;

        // Starts with zero permits, so MoveNext blocks until something is signalled.
        private readonly SemaphoreSlim _gate;

        /// <summary>Creates an enumerator with an empty buffer and no pending signals.</summary>
        public GetEnumerator()
        {
            _queue = new ConcurrentQueue<TSource>();
            _gate = new SemaphoreSlim(0);
        }

        /// <summary>Gets the value dequeued by the most recent successful <see cref="MoveNext"/>.</summary>
        public TSource Current => _current!;

        /// <inheritdoc />
        object? IEnumerator.Current => _current;

        /// <summary>
        /// Subscribes this instance to <paramref name="source"/> and keeps the resulting subscription.
        /// </summary>
        /// <param name="source">Sequence to observe.</param>
        /// <returns>This instance, typed as an enumerator.</returns>
        public IEnumerator<TSource> Run(IObservable<TSource> source)
        {
            _subscription.Disposable = source.Subscribe(this);
            return this;
        }

        /// <summary>Buffers <paramref name="value"/> and signals one waiting or future <see cref="MoveNext"/>.</summary>
        /// <param name="value">Element to buffer.</param>
        public void OnNext(TSource value)
        {
            _queue.Enqueue(value);
            _gate.Release();
        }

        /// <summary>Records <paramref name="error"/>, disposes the subscription and signals the consumer.</summary>
        /// <param name="error">Error to surface from <see cref="MoveNext"/> once the buffer is empty.</param>
        public void OnError(Exception error)
        {
            _error = error;
            _subscription.Dispose();
            _gate.Release();
        }

        /// <summary>Marks the sequence as completed, disposes the subscription and signals the consumer.</summary>
        public void OnCompleted()
        {
            _done = true;
            _subscription.Dispose();
            _gate.Release();
        }

        /// <summary>
        /// Blocks until a signal is available, then advances to the next buffered value.
        /// </summary>
        /// <returns>
        /// <c>true</c> when a value was dequeued into <see cref="Current"/>; <c>false</c> when the
        /// buffer is empty and no error was recorded.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The enumerator has been disposed.</exception>
        /// <remarks>
        /// When the buffer is empty and an error was recorded, that error is thrown via the
        /// <c>Throw</c> extension. Every terminal outcome (disposed, error, end of sequence)
        /// returns the consumed permit to the semaphore first, so calling this method again
        /// repeats the same outcome instead of blocking forever.
        /// </remarks>
        public bool MoveNext()
        {
            _gate.Wait();

            if (_disposed)
            {
                // Hand the permit back so a further call throws again rather than hanging.
                _gate.Release();
                throw new ObjectDisposedException("");
            }

            if (_queue.TryDequeue(out _current))
            {
                return true;
            }

            // Nothing buffered: the permit came from OnError or OnCompleted (or from an earlier
            // terminal call to this method). Give it back before reporting the outcome.
            _gate.Release();

            _error?.Throw();

            return false;
        }

        /// <summary>
        /// Disposes the subscription, marks the enumerator as disposed and signals the
        /// semaphore so a blocked <see cref="MoveNext"/> can observe the disposal.
        /// </summary>
        public void Dispose()
        {
            _subscription.Dispose();
            _disposed = true;
            _gate.Release();
        }

        /// <summary>Not supported.</summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
}