<PackageReference Include="System.Reactive" Version="7.0.0-preview.1" />
Collect<TSource, TResult>
using System.
Diagnostics.
CodeAnalysis;
using System.
Runtime.
CompilerServices;
namespace System.Reactive.Linq.ObservableImpl
{
// The decompiled form carried compiler-generated [NullableContext]/[Nullable]
// attributes, which may not be applied explicitly in source. They are expressed
// here as ordinary nullable annotations inside a scoped nullable context.
#nullable enable
    /// <summary>
    /// Pull-side adapter that folds the values pushed by an observable source into a
    /// "collector" value and hands the current collector out on each pull.
    /// </summary>
    /// <typeparam name="TSource">Type of the elements pushed by the source.</typeparam>
    /// <typeparam name="TResult">Type of the collector value handed to the consumer.</typeparam>
    /// <remarks>
    /// NOTE(review): subscription and enumeration plumbing lives in the base
    /// PushToPullAdapter type, which is not visible here; this class only supplies the sink.
    /// </remarks>
    internal sealed class Collect<TSource, TResult> : PushToPullAdapter<TSource, TResult>
    {
        private readonly Func<TResult> _getInitialCollector;
        private readonly Func<TResult, TSource, TResult> _merge;
        private readonly Func<TResult, TResult> _getNewCollector;

        /// <summary>
        /// Stores the collector callbacks and forwards <paramref name="source"/> to the base adapter.
        /// </summary>
        /// <param name="source">Observable sequence forwarded to the base adapter.</param>
        /// <param name="getInitialCollector">Factory invoked once per <see cref="Run"/> to create the first collector.</param>
        /// <param name="merge">Combines the current collector with a pushed element and returns the collector to keep.</param>
        /// <param name="getNewCollector">Given the collector just handed out, returns the collector to use next.</param>
        public Collect(
            IObservable<TSource> source,
            Func<TResult> getInitialCollector,
            Func<TResult, TSource, TResult> merge,
            Func<TResult, TResult> getNewCollector)
            : base(source)
        {
            _getInitialCollector = getInitialCollector;
            _merge = merge;
            _getNewCollector = getNewCollector;
        }

        /// <summary>
        /// Creates a fresh sink seeded with a newly created initial collector.
        /// </summary>
        /// <returns>A new sink instance bound to this adapter's callbacks.</returns>
        protected override PushToPullSink<TSource, TResult> Run()
        {
            return new _(_merge, _getNewCollector, _getInitialCollector());
        }

        /// <summary>
        /// Sink that receives pushed notifications and exposes the collector through
        /// <see cref="TryMoveNext"/>. All mutable state is accessed under <c>_gate</c>.
        /// </summary>
        private sealed class _ : PushToPullSink<TSource, TResult>
        {
            private readonly object _gate;
            private readonly Func<TResult, TSource, TResult> _merge;
            private readonly Func<TResult, TResult> _getNewCollector;

            private TResult _collector;
            private Exception? _error;
            private bool _hasCompleted;
            private bool _done;

            /// <summary>
            /// Initializes the sink with its callbacks and the starting collector.
            /// </summary>
            /// <param name="merge">Combines the current collector with a pushed element.</param>
            /// <param name="getNewCollector">Produces the next collector from the one just handed out.</param>
            /// <param name="collector">The initial collector value.</param>
            public _(
                Func<TResult, TSource, TResult> merge,
                Func<TResult, TResult> getNewCollector,
                TResult collector)
            {
                _gate = new object();
                _merge = merge;
                _getNewCollector = getNewCollector;
                _collector = collector;
            }

            /// <summary>
            /// Merges <paramref name="value"/> into the current collector. If the merge
            /// callback throws, the exception is recorded for the next pull and the sink is disposed.
            /// </summary>
            /// <param name="value">Element pushed by the source.</param>
            public override void OnNext(TSource value)
            {
                lock (_gate)
                {
                    try
                    {
                        _collector = _merge(_collector, value);
                    }
                    catch (Exception ex)
                    {
                        // Surface the failure on the pull side instead of throwing into the producer.
                        _error = ex;
                        Dispose();
                    }
                }
            }

            /// <summary>
            /// Disposes the sink and records <paramref name="error"/> so the next pull rethrows it.
            /// </summary>
            /// <param name="error">Error reported by the source.</param>
            public override void OnError(Exception error)
            {
                Dispose();

                lock (_gate)
                {
                    _error = error;
                }
            }

            /// <summary>
            /// Disposes the sink and marks the source as completed.
            /// </summary>
            public override void OnCompleted()
            {
                Dispose();

                lock (_gate)
                {
                    _hasCompleted = true;
                }
            }

            /// <summary>
            /// Hands out the current collector.
            /// </summary>
            /// <param name="current">
            /// The collector when the method returns <c>true</c>; the default value otherwise.
            /// </param>
            /// <returns>
            /// <c>true</c> while a collector is available; <c>false</c> once the source has
            /// completed and the final collector has already been handed out.
            /// </returns>
            /// <remarks>
            /// A recorded error is rethrown through <c>ExceptionHelpers.Throw</c>. An exception
            /// thrown by the new-collector callback disposes the sink and propagates to the caller.
            /// </remarks>
            public override bool TryMoveNext([MaybeNullWhen(false)] out TResult current)
            {
                lock (_gate)
                {
                    var error = _error;

                    if (error != null)
                    {
                        // Drop the collector reference before propagating the failure.
                        current = default;
                        _collector = default!;

                        // NOTE(review): Throw presumably never returns; if it did, control
                        // would fall through to `return true` exactly as in the original.
                        System.Reactive.ExceptionHelpers.Throw(error);
                    }
                    else if (_hasCompleted)
                    {
                        if (_done)
                        {
                            current = default;
                            _collector = default!;
                            return false;
                        }

                        // First pull after completion: yield the final collector once.
                        current = _collector;
                        _done = true;
                    }
                    else
                    {
                        current = _collector;

                        try
                        {
                            // Swap in the collector for the next window while still holding the gate.
                            _collector = _getNewCollector(current);
                        }
                        catch
                        {
                            Dispose();
                            throw;
                        }
                    }

                    return true;
                }
            }
        }
    }
#nullable restore
}