// ParallelEnumerator
using Relativity.DataTransfer.Nodes;
using Relativity.Transfer.Enumeration.Interfaces;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Relativity.Transfer.Enumeration
{
/// <summary>
/// Enumerates a set of root nodes in parallel on a background producer task and exposes
/// the results to a single consumer as a streaming <see cref="IEnumerator{T}"/>.
/// </summary>
/// <remarks>
/// Files and empty directories are handed to the consumer; non-empty directories are queued
/// for further enumeration. A failure on the producer side is captured and rethrown on the
/// consumer side while it iterates. All state is kept per <see cref="GetEnumerator"/> call,
/// so a failed enumeration does not affect later or concurrent enumerations on the same instance.
/// </remarks>
internal class ParallelEnumerator : IEnumeratorProvider
{
    private readonly INodeEnumerableFactory _nodeEnumerableFactory;
    private readonly ITransferLog _logger;

    /// <summary>
    /// Creates the enumerator provider.
    /// </summary>
    /// <param name="nodeEnumerableFactory">Factory used to enumerate the children of a single node.</param>
    /// <param name="logger">Log that receives progress, warning and error messages.</param>
    public ParallelEnumerator(INodeEnumerableFactory nodeEnumerableFactory, ITransferLog logger)
    {
        _nodeEnumerableFactory = nodeEnumerableFactory;
        _logger = logger;
    }

    /// <summary>
    /// Starts the background enumeration of <paramref name="nodes"/> and returns an enumerator
    /// over the produced nodes.
    /// </summary>
    /// <param name="nodes">Root nodes to enumerate; must not be null. Enumerated exactly once.</param>
    /// <param name="token">Token passed to the parallel loop, the node factory and the result collection.</param>
    /// <returns>
    /// A lazily evaluated enumerator. The producer task is started immediately, before the first
    /// <c>MoveNext</c> call.
    /// </returns>
    public IEnumerator<INode> GetEnumerator(IEnumerable<INode> nodes, CancellationToken token)
    {
        List<INode> rootNodes = AssertAndLog(nodes);
        EnumerationRun run = CreateProducers(rootNodes, token);
        return GetEnumeratorForConsumer(run);
    }

    /// <summary>
    /// Seeds the work queue with <paramref name="nodes"/> and starts the producer task that
    /// drains it in parallel.
    /// </summary>
    /// <returns>The per-enumeration state shared between the producer task and the consumer.</returns>
    private EnumerationRun CreateProducers(IEnumerable<INode> nodes, CancellationToken token)
    {
        ParallelOptions parallelOptions = ParallelHelper.CreateDefaultOptions(token);
        _logger.LogInformation("Enumeration is going to be run on max degree of parallelism of {MaxDegreeOfParallelism}.", parallelOptions.MaxDegreeOfParallelism);

        var run = new EnumerationRun();
        var nodesToEnumerate = new ConcurrentQueue<INode>();
        Enqueue(nodes, nodesToEnumerate);

        Task.Run(() =>
        {
            _logger.LogInformation("Producer task is running - enumeration is started.");
            try
            {
                // Each pass runs one loop body per node currently visible in the queue. The body
                // ignores the item it is handed and dequeues its own node instead; directories
                // discovered during a pass are enqueued and picked up by the next pass.
                while (!nodesToEnumerate.IsEmpty)
                {
                    Parallel.ForEach<INode>(
                        nodesToEnumerate,
                        parallelOptions,
                        _ => EnumerateNode(run.ProducedNodes, nodesToEnumerate, token));
                }
            }
            catch (OperationCanceledException cancellation)
            {
                // Cancellation is logged but not handed to the consumer: the consumer simply
                // observes the end of the sequence.
                _logger.LogError(cancellation, "Enumeration was cancelled by the client via cancellation token.");
            }
            catch (Exception failure)
            {
                // Stored before CompleteAdding() runs in the finally block, so the consumer sees
                // it at the latest when the consuming enumeration ends.
                run.Failure = failure;
                _logger.LogError(failure, "Exception occured during enumeration.");
                throw;
            }
            finally
            {
                run.ProducedNodes.CompleteAdding();
                _logger.LogInformation("Producer task finished - enumeration is completed. Nodes left to enumerate = {NodesToEnumerate}.", nodesToEnumerate.Count);
            }
        });

        return run;
    }

    /// <summary>
    /// Streams produced nodes to the consumer, rethrowing a captured producer failure before
    /// each item and once more after the sequence has ended.
    /// </summary>
    private static IEnumerator<INode> GetEnumeratorForConsumer(EnumerationRun run)
    {
        foreach (INode node in run.ProducedNodes.GetConsumingEnumerable())
        {
            ThrowCaughtExceptionToConsumer(run.Failure);
            yield return node;
        }

        ThrowCaughtExceptionToConsumer(run.Failure);
    }

    /// <summary>
    /// Rethrows <paramref name="failure"/> (if any) with its original stack trace preserved.
    /// An <see cref="AggregateException"/> is unwrapped first.
    /// </summary>
    private static void ThrowCaughtExceptionToConsumer(Exception failure)
    {
        if (failure == null)
        {
            return;
        }

        Exception toThrow = failure is AggregateException aggregate
            ? UnwrapAggregateException(aggregate)
            : failure;
        ExceptionDispatchInfo.Capture(toThrow).Throw();
    }

    /// <summary>
    /// Reduces an <see cref="AggregateException"/> to a single exception.
    /// </summary>
    /// <returns>
    /// The aggregate itself when it has no inner exceptions, the sole inner exception when there
    /// is exactly one, otherwise an <c>EnumerationException</c> whose message lists the distinct
    /// inner messages (one per line) and whose inner exception is the aggregate.
    /// </returns>
    private static Exception UnwrapAggregateException(AggregateException aggregateException)
    {
        ReadOnlyCollection<Exception> innerExceptions = aggregateException.Flatten().InnerExceptions;
        switch (innerExceptions.Count)
        {
            case 0:
                return aggregateException;
            case 1:
                return innerExceptions[0];
            default:
            {
                var combinedMessage = new StringBuilder();
                foreach (string message in innerExceptions.Select(x => x.Message).Distinct())
                {
                    combinedMessage.AppendLine(message);
                }

                return new EnumerationException(combinedMessage.ToString(), aggregateException);
            }
        }
    }

    /// <summary>
    /// Dequeues one node and enumerates it: files and empty directories go to
    /// <paramref name="producedNodes"/>, non-empty directories go back onto
    /// <paramref name="nodesToEnumerate"/>.
    /// </summary>
    private void EnumerateNode(BlockingCollection<INode> producedNodes, ConcurrentQueue<INode> nodesToEnumerate, CancellationToken token)
    {
        if (!nodesToEnumerate.TryDequeue(out INode nodeToEnumerate))
        {
            _logger.LogWarning("Failed to acquire node to enumerate. Nodes left to enumerate = {NodesToEnumerate}", nodesToEnumerate.Count);
            return;
        }

        foreach (INode item in _nodeEnumerableFactory.Create(nodeToEnumerate, token))
        {
            // Anything that is not a file is required to be an IExtendedDirectory; the cast
            // throws InvalidCastException otherwise.
            if (item is IFile || ((IExtendedDirectory)item).IsEmpty)
            {
                PushResult(producedNodes, token, item);
            }
            else
            {
                PushToEnumerate(nodesToEnumerate, item);
            }
        }
    }

    /// <summary>
    /// Queues a directory node for enumeration in a later pass.
    /// </summary>
    private static void PushToEnumerate(ConcurrentQueue<INode> nodesToEnumerate, INode enumeratedNode)
    {
        // Direct cast instead of 'as': a node that is not an IDirectory fails here instead of
        // putting a null entry on the queue.
        nodesToEnumerate.Enqueue((IDirectory)enumeratedNode);
    }

    /// <summary>
    /// Hands a node to the consumer; the add is cancellable through <paramref name="token"/>.
    /// </summary>
    private static void PushResult(BlockingCollection<INode> producedNodes, CancellationToken token, INode enumeratedNode)
    {
        producedNodes.Add(enumeratedNode, token);
    }

    /// <summary>
    /// Adds every node in <paramref name="nodes"/> to the work queue.
    /// </summary>
    private static void Enqueue(IEnumerable<INode> nodes, ConcurrentQueue<INode> nodesToEnumerate)
    {
        foreach (INode node in nodes)
        {
            nodesToEnumerate.Enqueue(node);
        }
    }

    /// <summary>
    /// Validates <paramref name="nodes"/>, logs the requested paths and returns the nodes
    /// materialised into a list so the caller's sequence is enumerated only once.
    /// </summary>
    private List<INode> AssertAndLog(IEnumerable<INode> nodes)
    {
        Assertions.RequireNonNull(nodes, nameof(nodes));
        List<INode> requestedNodes = nodes.ToList();
        string paths = string.Join(", ", requestedNodes.Select(x => x.AbsolutePath));
        // The second placeholder was an unclosed '{' in the template; it is named {Nodes} here so
        // the paths argument (redacted via position 1) has a matching placeholder.
        _logger.LogInformation("Enumeration of {NodesCount} elements requested: {Nodes}", requestedNodes.Count, paths, LogRedaction.OnPositions(1));
        return requestedNodes;
    }

    /// <summary>
    /// State of a single enumeration, shared between its producer task and its consumer.
    /// </summary>
    private sealed class EnumerationRun
    {
        /// <summary>Nodes produced so far and not yet consumed.</summary>
        public readonly BlockingCollection<INode> ProducedNodes = new BlockingCollection<INode>();

        /// <summary>
        /// Failure captured on the producer task, or null. Written by the producer thread and
        /// read by the consumer thread, hence volatile.
        /// </summary>
        public volatile Exception Failure;
    }
}
}