SequenceEqual<TSource>
using System.Collections.Generic;
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
internal static class SequenceEqual<TSource>
{
internal sealed class Observable : Producer<bool, Observable._>
{
internal sealed class _ : Sink<bool>
{
private sealed class FirstObserver : IObserver<TSource>
{
private readonly _ _parent;
public FirstObserver(_ parent)
{
_parent = parent;
}
public void OnNext(TSource value)
{
lock (_parent._gate) {
if (_parent._qr.Count > 0) {
bool flag = false;
TSource y = _parent._qr.Dequeue();
try {
flag = _parent._comparer.Equals(value, y);
} catch (Exception error) {
_parent._observer.OnError(error);
_parent.Dispose();
return;
}
if (!flag) {
_parent._observer.OnNext(false);
_parent._observer.OnCompleted();
_parent.Dispose();
}
} else if (_parent._doner) {
_parent._observer.OnNext(false);
_parent._observer.OnCompleted();
_parent.Dispose();
} else {
_parent._ql.Enqueue(value);
}
}
}
public void OnError(Exception error)
{
lock (_parent._gate) {
_parent._observer.OnError(error);
_parent.Dispose();
}
}
public void OnCompleted()
{
lock (_parent._gate) {
_parent._donel = true;
if (_parent._ql.Count == 0) {
if (_parent._qr.Count > 0) {
_parent._observer.OnNext(false);
_parent._observer.OnCompleted();
_parent.Dispose();
} else if (_parent._doner) {
_parent._observer.OnNext(true);
_parent._observer.OnCompleted();
_parent.Dispose();
}
}
}
}
}
private sealed class SecondObserver : IObserver<TSource>
{
private readonly _ _parent;
public SecondObserver(_ parent)
{
_parent = parent;
}
public void OnNext(TSource value)
{
lock (_parent._gate) {
if (_parent._ql.Count > 0) {
bool flag = false;
TSource x = _parent._ql.Dequeue();
try {
flag = _parent._comparer.Equals(x, value);
} catch (Exception error) {
_parent._observer.OnError(error);
_parent.Dispose();
return;
}
if (!flag) {
_parent._observer.OnNext(false);
_parent._observer.OnCompleted();
_parent.Dispose();
}
} else if (_parent._donel) {
_parent._observer.OnNext(false);
_parent._observer.OnCompleted();
_parent.Dispose();
} else {
_parent._qr.Enqueue(value);
}
}
}
public void OnError(Exception error)
{
lock (_parent._gate) {
_parent._observer.OnError(error);
_parent.Dispose();
}
}
public void OnCompleted()
{
lock (_parent._gate) {
_parent._doner = true;
if (_parent._qr.Count == 0) {
if (_parent._ql.Count > 0) {
_parent._observer.OnNext(false);
_parent._observer.OnCompleted();
_parent.Dispose();
} else if (_parent._donel) {
_parent._observer.OnNext(true);
_parent._observer.OnCompleted();
_parent.Dispose();
}
}
}
}
}
private readonly IEqualityComparer<TSource> _comparer;
private object _gate;
private bool _donel;
private bool _doner;
private Queue<TSource> _ql;
private Queue<TSource> _qr;
public _(IEqualityComparer<TSource> comparer, IObserver<bool> observer, IDisposable cancel)
: base(observer, cancel)
{
_comparer = comparer;
}
public IDisposable Run(Observable parent)
{
_gate = new object();
_donel = false;
_doner = false;
_ql = new Queue<TSource>();
_qr = new Queue<TSource>();
return StableCompositeDisposable.Create(ObservableExtensions.SubscribeSafe<TSource>(parent._first, (IObserver<TSource>)new FirstObserver(this)), ObservableExtensions.SubscribeSafe<TSource>(parent._second, (IObserver<TSource>)new SecondObserver(this)));
}
}
private readonly IObservable<TSource> _first;
private readonly IObservable<TSource> _second;
private readonly IEqualityComparer<TSource> _comparer;
public Observable(IObservable<TSource> first, IObservable<TSource> second, IEqualityComparer<TSource> comparer)
{
_first = first;
_second = second;
_comparer = comparer;
}
protected override _ CreateSink(IObserver<bool> observer, IDisposable cancel)
{
return new _(_comparer, observer, cancel);
}
protected override IDisposable Run(_ sink)
{
return sink.Run(this);
}
}
internal sealed class Enumerable : Producer<bool, Enumerable._>
{
internal sealed class _ : Sink<bool>, IObserver<TSource>
{
private readonly IEqualityComparer<TSource> _comparer;
private IEnumerator<TSource> _enumerator;
public _(IEqualityComparer<TSource> comparer, IObserver<bool> observer, IDisposable cancel)
: base(observer, cancel)
{
_comparer = comparer;
}
public IDisposable Run(Enumerable parent)
{
try {
_enumerator = parent._second.GetEnumerator();
} catch (Exception error) {
_observer.OnError(error);
base.Dispose();
return Disposable.Empty;
}
return StableCompositeDisposable.Create(ObservableExtensions.SubscribeSafe<TSource>(parent._first, (IObserver<TSource>)this), _enumerator);
}
public void OnNext(TSource value)
{
bool flag = false;
try {
if (_enumerator.MoveNext()) {
TSource current = _enumerator.Current;
flag = _comparer.Equals(value, current);
}
} catch (Exception error) {
_observer.OnError(error);
base.Dispose();
return;
}
if (!flag) {
_observer.OnNext(false);
_observer.OnCompleted();
base.Dispose();
}
}
public void OnError(Exception error)
{
_observer.OnError(error);
base.Dispose();
}
public void OnCompleted()
{
bool flag = false;
try {
flag = _enumerator.MoveNext();
} catch (Exception error) {
_observer.OnError(error);
base.Dispose();
return;
}
_observer.OnNext(!flag);
_observer.OnCompleted();
base.Dispose();
}
}
private readonly IObservable<TSource> _first;
private readonly IEnumerable<TSource> _second;
private readonly IEqualityComparer<TSource> _comparer;
public Enumerable(IObservable<TSource> first, IEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
{
_first = first;
_second = second;
_comparer = comparer;
}
protected override _ CreateSink(IObserver<bool> observer, IDisposable cancel)
{
return new _(_comparer, observer, cancel);
}
protected override IDisposable Run(_ sink)
{
return sink.Run(this);
}
}
}
}