// GetEnumerator<TSource>
using System.
Collections;
using System.
Collections.
Concurrent;
using System.
Collections.
Generic;
using System.
Reactive.
Disposables;
using System.
Threading;
namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Adapts an <see cref="IObservable{T}"/> to a blocking <see cref="IEnumerator{T}"/>.
    /// The instance subscribes itself to the source as an observer, buffers every
    /// received element in a queue, and hands the elements out one at a time from
    /// <see cref="MoveNext"/>.
    /// </summary>
    /// <remarks>
    /// <para>
    /// A counting semaphore tracks "something is available to look at": one permit is
    /// released per queued element, one for a terminal notification (error or
    /// completion), and one on <see cref="Dispose"/>. <see cref="MoveNext"/> takes one
    /// permit per call.
    /// </para>
    /// <para>
    /// Terminal states are sticky: once the enumerator has been disposed, has failed,
    /// or has completed, <see cref="MoveNext"/> puts the permit it consumed back so
    /// that further calls observe the same outcome instead of blocking forever.
    /// </para>
    /// </remarks>
    /// <typeparam name="TSource">Type of the elements produced by the source sequence.</typeparam>
    internal sealed class GetEnumerator<TSource> : IEnumerator<TSource>, IEnumerator, IDisposable, IObserver<TSource>
    {
        // Elements received through OnNext that have not been handed out yet.
        private readonly ConcurrentQueue<TSource> _queue;

        // Holds one permit per queued element plus one per terminal/dispose signal.
        private readonly SemaphoreSlim _gate;

        // Element most recently dequeued by MoveNext.
        private TSource _current;

        // Error reported by the source through OnError, if any.
        private Exception _error;

        // Set when the source reports completion through OnCompleted.
        private bool _done;

        // Set by Dispose; makes MoveNext throw ObjectDisposedException.
        private bool _disposed;

        // Handle to the subscription on the source sequence.
        private IDisposable _subscription;

        /// <summary>
        /// Creates an enumerator with an empty buffer and no available permits, so the
        /// first <see cref="MoveNext"/> call waits until a notification arrives.
        /// </summary>
        public GetEnumerator()
        {
            _queue = new ConcurrentQueue<TSource>();
            _gate = new SemaphoreSlim(0);
        }

        /// <summary>
        /// Gets the element most recently dequeued by <see cref="MoveNext"/>.
        /// </summary>
        public TSource Current => _current;

        /// <summary>
        /// Gets the element most recently dequeued by <see cref="MoveNext"/>, as an object.
        /// </summary>
        object IEnumerator.Current => _current;

        /// <summary>
        /// Subscribes this instance to <paramref name="source"/> and returns it as the
        /// enumerator over the received elements.
        /// </summary>
        /// <param name="source">Sequence to enumerate.</param>
        /// <returns>This instance.</returns>
        public IEnumerator<TSource> Run(IObservable<TSource> source)
        {
            // NOTE(review): TrySetSingle is a project helper not visible here; presumably it
            // stores the subscription unless the field was already disposed - confirm.
            Disposable.TrySetSingle(ref _subscription, source.Subscribe(this));
            return this;
        }

        /// <summary>
        /// Buffers <paramref name="value"/> and releases one permit for it.
        /// </summary>
        /// <param name="value">Element produced by the source.</param>
        public void OnNext(TSource value)
        {
            _queue.Enqueue(value);
            _gate.Release();
        }

        /// <summary>
        /// Records the source's failure, disposes the subscription, and releases one
        /// permit so that <see cref="MoveNext"/> can surface the error once the buffered
        /// elements have been drained.
        /// </summary>
        /// <param name="error">Error reported by the source.</param>
        public void OnError(Exception error)
        {
            _error = error;
            Disposable.TryDispose(ref _subscription);
            _gate.Release();
        }

        /// <summary>
        /// Records completion of the source, disposes the subscription, and releases
        /// one permit so that <see cref="MoveNext"/> can report the end of the sequence.
        /// </summary>
        public void OnCompleted()
        {
            _done = true;
            Disposable.TryDispose(ref _subscription);
            _gate.Release();
        }

        /// <summary>
        /// Waits until an element or a terminal signal is available and advances to
        /// the next buffered element.
        /// </summary>
        /// <returns>
        /// <c>true</c> if an element was dequeued into <see cref="Current"/>;
        /// <c>false</c> if the queue is empty and no error was recorded.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The enumerator has been disposed.</exception>
        /// <remarks>
        /// When the queue is empty and an error was recorded by <see cref="OnError"/>,
        /// that error is raised through the <c>ThrowIfNotNull</c> helper.
        /// </remarks>
        public bool MoveNext()
        {
            _gate.Wait();

            if (_disposed)
            {
                // Return the permit before throwing: otherwise the single permit released
                // by Dispose is used up here and the next call would wait forever.
                _gate.Release();
                throw new ObjectDisposedException("");
            }

            if (_queue.TryDequeue(out _current))
            {
                return true;
            }

            // Nothing is buffered, so the permit we took belongs to a terminal signal.
            // Return it *before* surfacing the outcome so that repeated calls keep
            // observing the same terminal state (rethrow or false) rather than blocking.
            _gate.Release();

            _error.ThrowIfNotNull();

            return false;
        }

        /// <summary>
        /// Disposes the subscription on the source, marks the enumerator as disposed,
        /// and releases one permit so that a pending or later <see cref="MoveNext"/>
        /// call does not keep waiting.
        /// </summary>
        public void Dispose()
        {
            Disposable.TryDispose(ref _subscription);
            _disposed = true;
            _gate.Release();
        }

        /// <summary>
        /// Not supported.
        /// </summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
}