// EnumerationWithBatches
using Relativity.DataTransfer.Nodes;
using Relativity.Transfer.Enumeration.Interfaces;
using Relativity.Transfer.Enumeration.Operators;
using System;
using System.Collections.Generic;
using System.Threading;
namespace Relativity.Transfer.Enumeration
{
    /// <summary>
    /// An <see cref="IEnumeratorProvider"/> that wraps another provider and attaches
    /// batch-building callbacks to the enumerator that provider returns.
    /// </summary>
    internal class EnumerationWithBatches : IEnumeratorProvider
    {
        private readonly IEnumeratorProvider _enumeratorProvider;
        private readonly IBatchAggregator _batchCreationStrategy;
        private readonly ITransferLog _log;

        /// <summary>
        /// Stores the collaborators used by <see cref="GetEnumerator"/>. No validation is performed here.
        /// </summary>
        /// <param name="enumeratorProvider">The wrapped provider that supplies the underlying enumerator.</param>
        /// <param name="batchCreationStrategy">Receives nodes through <c>AddToBatch</c> and is asked to <c>CreateBatch</c> in the finish callback.</param>
        /// <param name="log">Log that receives an informational message from the finish callback.</param>
        public EnumerationWithBatches(IEnumeratorProvider enumeratorProvider, IBatchAggregator batchCreationStrategy, ITransferLog log)
        {
            _log = log;
            _batchCreationStrategy = batchCreationStrategy;
            _enumeratorProvider = enumeratorProvider;
        }

        /// <summary>
        /// Obtains an enumerator from the wrapped provider and decorates it with two callbacks:
        /// one that hands a node to the batch aggregator, and one that creates the batch and logs.
        /// </summary>
        /// <param name="nodes">The nodes passed through unchanged to the wrapped provider.</param>
        /// <param name="token">Cancellation token forwarded to the wrapped provider and to both operators.</param>
        /// <returns>The enumerator produced by the <c>OnNext</c>/<c>OnFinish</c> operator chain.</returns>
        public IEnumerator<INode> GetEnumerator(IEnumerable<INode> nodes, CancellationToken token)
        {
            IEnumerator<INode> innerEnumerator = _enumeratorProvider.GetEnumerator(nodes, token);

            // NOTE(review): OnNext is presumably invoked once per enumerated node - confirm against the operator.
            var withBatching = innerEnumerator.OnNext((INode node) =>
            {
                _batchCreationStrategy.AddToBatch(node);
            }, token);

            // NOTE(review): OnFinish presumably fires once enumeration completes - confirm against the operator.
            // The parameterless anonymous-method form is kept deliberately: it does not pin the callback's signature.
            return withBatching.OnFinish(delegate
            {
                _batchCreationStrategy.CreateBatch();
                _log.LogInformation("Batches were created and serialized.", Array.Empty<object>());
            }, token);
        }
    }
}