<PackageReference Include="System.Reactive" Version="4.1.1" />

GetEnumerator<TSource>

sealed class GetEnumerator<TSource> : IEnumerator<TSource>, IEnumerator, IDisposable, IObserver<TSource>
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Adapts a push-based <see cref="IObservable{T}"/> to a pull-based, blocking
    /// <see cref="IEnumerator{T}"/>.
    /// </summary>
    /// <remarks>
    /// The instance subscribes itself to the source as an observer. Every observer
    /// notification (and <see cref="Dispose"/>) releases one permit on an internal
    /// semaphore; <see cref="MoveNext"/> blocks on that semaphore until a permit is
    /// available and then inspects the buffered state.
    /// </remarks>
    /// <typeparam name="TSource">Element type of the observed sequence.</typeparam>
    internal sealed class GetEnumerator<TSource> : IEnumerator<TSource>, IEnumerator, IDisposable, IObserver<TSource>
    {
        // Values pushed by the source that have not yet been pulled by MoveNext.
        private readonly ConcurrentQueue<TSource> _queue;

        // Counts pending signals: one permit per OnNext/OnError/OnCompleted/Dispose.
        private readonly SemaphoreSlim _gate;

        private TSource _current;
        private Exception _error;

        // Set by OnCompleted; not read anywhere in this class as shown.
        private bool _done;

        private bool _disposed;
        private IDisposable _subscription;

        /// <summary>Gets the element most recently dequeued by <see cref="MoveNext"/>.</summary>
        public TSource Current => _current;

        /// <summary>Gets the element most recently dequeued by <see cref="MoveNext"/>, boxed as <see cref="object"/>.</summary>
        object IEnumerator.Current => _current;

        /// <summary>Creates an enumerator with an empty buffer and no pending signals.</summary>
        public GetEnumerator()
        {
            _queue = new ConcurrentQueue<TSource>();
            _gate = new SemaphoreSlim(0);
        }

        /// <summary>
        /// Subscribes this instance to <paramref name="source"/> and returns it as the enumerator.
        /// </summary>
        /// <param name="source">Observable sequence to enumerate.</param>
        /// <returns>This instance.</returns>
        public IEnumerator<TSource> Run(IObservable<TSource> source)
        {
            Disposable.TrySetSingle(ref _subscription, source.Subscribe(this));
            return this;
        }

        /// <summary>Buffers <paramref name="value"/> and signals one waiting or future <see cref="MoveNext"/> call.</summary>
        /// <param name="value">Element produced by the source.</param>
        public void OnNext(TSource value)
        {
            // Enqueue before releasing so the woken consumer always finds the item.
            _queue.Enqueue(value);
            _gate.Release();
        }

        /// <summary>Records the failure, disposes the subscription and signals the consumer.</summary>
        /// <param name="error">Exception reported by the source.</param>
        public void OnError(Exception error)
        {
            _error = error;
            Disposable.TryDispose(ref _subscription);
            _gate.Release();
        }

        /// <summary>Records completion, disposes the subscription and signals the consumer.</summary>
        public void OnCompleted()
        {
            _done = true;
            Disposable.TryDispose(ref _subscription);
            _gate.Release();
        }

        /// <summary>
        /// Blocks until the source has produced a signal, then advances to the next buffered element.
        /// </summary>
        /// <returns>
        /// <c>true</c> if an element was dequeued into <see cref="Current"/>; <c>false</c> if the
        /// buffer is empty and no error was recorded.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The enumerator has been disposed.</exception>
        /// <remarks>
        /// If the source reported an error, that error is raised via <c>ThrowIfNotNull</c> once all
        /// buffered elements have been consumed. Terminal outcomes (disposed, error, end of sequence)
        /// are repeatable: the permit consumed by this call is handed back so that a further call
        /// observes the same outcome instead of blocking indefinitely.
        /// </remarks>
        public bool MoveNext()
        {
            _gate.Wait();

            if (_disposed)
            {
                // Restore the permit so a repeated call throws again rather than hanging.
                _gate.Release();
                throw new ObjectDisposedException("");
            }

            if (_queue.TryDequeue(out _current))
            {
                return true;
            }

            // Buffer drained: the signal was terminal. Restore the permit *before* a possible
            // throw so that both the error path and the end-of-sequence path stay non-blocking
            // on subsequent calls.
            _gate.Release();

            _error.ThrowIfNotNull();

            return false;
        }

        /// <summary>
        /// Disposes the subscription, marks the enumerator as disposed and wakes any
        /// <see cref="MoveNext"/> call currently blocked on the gate.
        /// </summary>
        public void Dispose()
        {
            Disposable.TryDispose(ref _subscription);
            _disposed = true;
            _gate.Release();
        }

        /// <summary>Not supported.</summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
}