<PackageReference Include="System.Reactive" Version="4.1.6" />

GroupByUntil<TSource, TKey, TElement, TDuration>

sealed class GroupByUntil<TSource, TKey, TElement, TDuration> : Producer<IGroupedObservable<TKey, TElement>, _<TSource, TKey, TElement, TDuration>>
using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; namespace System.Reactive.Linq.ObservableImpl { internal sealed class GroupByUntil<TSource, TKey, TElement, TDuration> : Producer<IGroupedObservable<TKey, TElement>, GroupByUntil<TSource, TKey, TElement, TDuration>._> { internal sealed class _ : Sink<TSource, IGroupedObservable<TKey, TElement>> { private sealed class DurationObserver : SafeObserver<TDuration> { private readonly _ _parent; private readonly TKey _key; private readonly ISubject<TElement> _writer; public DurationObserver(_ parent, TKey key, ISubject<TElement> writer) { _parent = parent; _key = key; _writer = writer; } public override void OnNext(TDuration value) { OnCompleted(); } public override void OnError(Exception error) { _parent.Error(error); Dispose(); } public override void OnCompleted() { if (_key == null) { ISubject<TElement> subject = null; lock (_parent._nullGate) { subject = _parent._null; _parent._null = null; } subject.OnCompleted(); } else if (_parent._map.Remove(_key)) { _writer.OnCompleted(); } _parent._groupDisposable.Remove(this); } } private readonly object _gate = new object(); private readonly object _nullGate = new object(); private readonly CompositeDisposable _groupDisposable = new CompositeDisposable(); private readonly RefCountDisposable _refCountDisposable; private readonly Map<TKey, ISubject<TElement>> _map; private readonly Func<TSource, TKey> _keySelector; private readonly Func<TSource, TElement> _elementSelector; private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector; private ISubject<TElement> _null; public _(GroupByUntil<TSource, TKey, TElement, TDuration> parent, IObserver<IGroupedObservable<TKey, TElement>> observer) : base(observer) { _refCountDisposable = new RefCountDisposable(_groupDisposable); _map = new Map<TKey, ISubject<TElement>>(parent._capacity, parent._comparer); _keySelector = parent._keySelector; _elementSelector = parent._elementSelector; _durationSelector = parent._durationSelector; } public override void Run(IObservable<TSource> source) { _groupDisposable.Add(ObservableExtensions.SubscribeSafe<TSource>(source, (IObserver<TSource>)this)); SetUpstream(_refCountDisposable); } private ISubject<TElement> NewSubject() { Subject<TElement> subject = new Subject<TElement>(); return Subject.Create<TElement>((IObserver<TElement>)new AsyncLockObserver<TElement>(subject, new AsyncLock()), (IObservable<TElement>)subject); } public override void OnNext(TSource value) { TKey val = default(TKey); try { val = _keySelector(value); } catch (Exception exception) { Error(exception); return; } bool added = false; ISubject<TElement> subject = null; try { if (val == null) { lock (_nullGate) { if (_null == null) { _null = NewSubject(); added = true; } subject = _null; } } else subject = _map.GetOrAdd(val, NewSubject, out added); } catch (Exception exception2) { Error(exception2); return; } if (added) { GroupedObservable<TKey, TElement> value2 = new GroupedObservable<TKey, TElement>(val, subject, _refCountDisposable); IObservable<TDuration> observable = null; GroupedObservable<TKey, TElement> arg = new GroupedObservable<TKey, TElement>(val, subject); try { observable = _durationSelector(arg); } catch (Exception exception3) { Error(exception3); return; } lock (_gate) { ForwardOnNext(value2); } DurationObserver durationObserver = new DurationObserver(this, val, subject); _groupDisposable.Add(durationObserver); durationObserver.SetResource(ObservableExtensions.SubscribeSafe<TDuration>(observable, (IObserver<TDuration>)durationObserver)); } TElement val2 = default(TElement); try { val2 = _elementSelector(value); } catch (Exception exception4) { Error(exception4); return; } subject.OnNext(val2); } public override void OnError(Exception error) { Error(error); } public override void OnCompleted() { ISubject<TElement> subject = null; lock (_nullGate) { subject = _null; } subject?.OnCompleted(); foreach (ISubject<TElement> value in _map.Values) { value.OnCompleted(); } lock (_gate) { ForwardOnCompleted(); } } private void Error(Exception exception) { ISubject<TElement> subject = null; lock (_nullGate) { subject = _null; } subject?.OnError(exception); foreach (ISubject<TElement> value in _map.Values) { value.OnError(exception); } lock (_gate) { ForwardOnError(exception); } } } private readonly IObservable<TSource> _source; private readonly Func<TSource, TKey> _keySelector; private readonly Func<TSource, TElement> _elementSelector; private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector; private readonly int? _capacity; private readonly IEqualityComparer<TKey> _comparer; public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer) { _source = source; _keySelector = keySelector; _elementSelector = elementSelector; _durationSelector = durationSelector; _capacity = capacity; _comparer = comparer; } protected override _ CreateSink(IObserver<IGroupedObservable<TKey, TElement>> observer) { return new _(this, observer); } protected override void Run(_ sink) { sink.Run(_source); } } }