<PackageReference Include="System.Reactive" Version="6.0.2" />

GroupByUntil<TSource, TKey, TElement, TDuration>

sealed class GroupByUntil<TSource, TKey, TElement, TDuration> : Producer<IGroupedObservable<TKey, TElement>, _<TSource, TKey, TElement, TDuration>>
using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; using System.Runtime.CompilerServices; namespace System.Reactive.Linq.ObservableImpl { [System.Runtime.CompilerServices.NullableContext(1)] [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 1, 1, 1, 1, 1, 1 })] internal sealed class GroupByUntil<[System.Runtime.CompilerServices.Nullable(2)] TSource, [System.Runtime.CompilerServices.Nullable(2)] TKey, [System.Runtime.CompilerServices.Nullable(2)] TElement, [System.Runtime.CompilerServices.Nullable(2)] TDuration> : Producer<IGroupedObservable<TKey, TElement>, GroupByUntil<TSource, TKey, TElement, TDuration>._> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1, 1, 1, 1 })] internal sealed class _ : Sink<TSource, IGroupedObservable<TKey, TElement>> { [System.Runtime.CompilerServices.Nullable(new byte[] { 0, 1 })] private sealed class DurationObserver : SafeObserver<TDuration> { [System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 0, 0, 0 })] private readonly _ _parent; private readonly TKey _key; private readonly ISubject<TElement> _writer; public DurationObserver([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 0, 0, 0 })] _ parent, TKey key, ISubject<TElement> writer) { _parent = parent; _key = key; _writer = writer; } public override void OnNext(TDuration value) { OnCompleted(); } public override void OnError(Exception error) { _parent.Error(error); Dispose(); } public override void OnCompleted() { if (_key == null) { ISubject<TElement> null = default(ISubject<TElement>); lock (_parent._nullGate) { null = _parent._null; _parent._null = null; } null?.OnCompleted(); } else if (_parent._map.Remove(_key)) { _writer.OnCompleted(); } _parent._groupDisposable.Remove(this); } } private readonly object _gate = new object(); private readonly object _nullGate = new object(); private readonly CompositeDisposable _groupDisposable = new CompositeDisposable(); private readonly RefCountDisposable _refCountDisposable; private readonly Map<TKey, ISubject<TElement>> _map; private readonly Func<TSource, TKey> _keySelector; private readonly Func<TSource, TElement> _elementSelector; private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector; [System.Runtime.CompilerServices.Nullable(new byte[] { 2, 1 })] private ISubject<TElement> _null; public _(GroupByUntil<TSource, TKey, TElement, TDuration> parent, IObserver<IGroupedObservable<TKey, TElement>> observer) : base(observer) { _refCountDisposable = new RefCountDisposable(_groupDisposable); _map = new Map<TKey, ISubject<TElement>>(parent._capacity, parent._comparer); _keySelector = parent._keySelector; _elementSelector = parent._elementSelector; _durationSelector = parent._durationSelector; } public override void Run(IObservable<TSource> source) { _groupDisposable.Add(ObservableExtensions.SubscribeSafe<TSource>(source, (IObserver<TSource>)this)); SetUpstream(_refCountDisposable); } private ISubject<TElement> NewSubject() { Subject<TElement> subject = new Subject<TElement>(); return Subject.Create<TElement>((IObserver<TElement>)new AsyncLockObserver<TElement>(subject, new AsyncLock()), (IObservable<TElement>)subject); } public override void OnNext(TSource value) { TKey val; try { val = _keySelector(value); } catch (Exception exception) { Error(exception); return; } bool added = false; ISubject<TElement> subject = default(ISubject<TElement>); try { if (val == null) { lock (_nullGate) { if (_null == null) { _null = NewSubject(); added = true; } subject = _null; } } else subject = _map.GetOrAdd(val, NewSubject, out added); } catch (Exception exception2) { Error(exception2); return; } if (added) { GroupedObservable<TKey, TElement> value2 = new GroupedObservable<TKey, TElement>(val, subject, _refCountDisposable); GroupedObservable<TKey, TElement> arg = new GroupedObservable<TKey, TElement>(val, subject); IObservable<TDuration> source; try { source = _durationSelector(arg); } catch (Exception exception3) { Error(exception3); return; } lock (_gate) { ForwardOnNext(value2); } DurationObserver durationObserver = new DurationObserver(this, val, subject); _groupDisposable.Add(durationObserver); durationObserver.SetResource(ObservableExtensions.SubscribeSafe<TDuration>(source, (IObserver<TDuration>)durationObserver)); } TElement value3; try { value3 = _elementSelector(value); } catch (Exception exception4) { Error(exception4); return; } subject.OnNext(value3); } public override void OnError(Exception error) { Error(error); } public override void OnCompleted() { ISubject<TElement> null = default(ISubject<TElement>); lock (_nullGate) { null = _null; } null?.OnCompleted(); foreach (ISubject<TElement> value in _map.Values) { value.OnCompleted(); } lock (_gate) { ForwardOnCompleted(); } } private void Error(Exception exception) { ISubject<TElement> null = default(ISubject<TElement>); lock (_nullGate) { null = _null; } null?.OnError(exception); foreach (ISubject<TElement> value in _map.Values) { value.OnError(exception); } lock (_gate) { ForwardOnError(exception); } } } private readonly IObservable<TSource> _source; private readonly Func<TSource, TKey> _keySelector; private readonly Func<TSource, TElement> _elementSelector; private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector; private readonly int? _capacity; private readonly IEqualityComparer<TKey> _comparer; public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer) { _source = source; _keySelector = keySelector; _elementSelector = elementSelector; _durationSelector = durationSelector; _capacity = capacity; _comparer = comparer; } [return: System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 0, 0, 0 })] protected override _ CreateSink(IObserver<IGroupedObservable<TKey, TElement>> observer) { return new _(this, observer); } protected override void Run([System.Runtime.CompilerServices.Nullable(new byte[] { 1, 0, 0, 0, 0 })] _ sink) { sink.Run(_source); } } }