<PackageReference Include="Relativity.Server.Transfer.SDK" Version="7.7.0" />

ParallelEnumerator

using Relativity.DataTransfer.Nodes; using Relativity.Transfer.Enumeration.Interfaces; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Linq; using System.Runtime.ExceptionServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Relativity.Transfer.Enumeration { internal class ParallelEnumerator : IEnumeratorProvider { private readonly INodeEnumerableFactory _nodeEnumerableFactory; private readonly ITransferLog _logger; private Exception _exception; public ParallelEnumerator(INodeEnumerableFactory nodeEnumerableFactory, ITransferLog logger) { _nodeEnumerableFactory = nodeEnumerableFactory; _logger = logger; } public IEnumerator<INode> GetEnumerator(IEnumerable<INode> nodes, CancellationToken token) { IEnumerable<INode> nodes2 = AssertAndLog(nodes); BlockingCollection<INode> producedNodes = CreateProducers(nodes2, token); return GetEnumeratorForConsumer(producedNodes); } private BlockingCollection<INode> CreateProducers(IEnumerable<INode> nodes, CancellationToken token) { ParallelOptions parallelOptions = ParallelHelper.CreateDefaultOptions(token); _logger.LogInformation("Enumeration is going to be run on max degree of parallelism of {MaxDegreeOfParallelism}.", parallelOptions.MaxDegreeOfParallelism); BlockingCollection<INode> producedNodes = new BlockingCollection<INode>(); ConcurrentQueue<INode> nodesToEnumerate = new ConcurrentQueue<INode>(); Enqueue(nodes, nodesToEnumerate); Task.Run(delegate { _logger.LogInformation("Producer task is running - enumeration is started.", Array.Empty<object>()); try { while (nodesToEnumerate.Any()) { Parallel.ForEach((IEnumerable<INode>)nodesToEnumerate, parallelOptions, (Action<INode, ParallelLoopState, long>)delegate { EnumerateNode(producedNodes, nodesToEnumerate, token); }); } } catch (OperationCanceledException exception) { _logger.LogError(exception, "Enumeration was cancelled by the client via cancellation token.", Array.Empty<object>()); } catch (Exception exception2) { _exception = exception2; _logger.LogError(exception2, "Exception occured during enumeration.", Array.Empty<object>()); throw; } finally { producedNodes.CompleteAdding(); _logger.LogInformation("Producer task finished - enumeration is completed. Nodes left to enumerate = {NodesToEnumerate}.", nodesToEnumerate.Count); } }); return producedNodes; } private IEnumerator<INode> GetEnumeratorForConsumer(BlockingCollection<INode> producedNodes) { foreach (INode item in producedNodes.GetConsumingEnumerable()) { ThrowCaughtExceptionToConsumer(); yield return item; } ThrowCaughtExceptionToConsumer(); } private void ThrowCaughtExceptionToConsumer() { if (_exception != null) { AggregateException ex = _exception as AggregateException; if (ex != null) ExceptionDispatchInfo.Capture(UnwrapAggregateException(ex)).Throw(); else ExceptionDispatchInfo.Capture(_exception).Throw(); } } private static Exception UnwrapAggregateException(AggregateException aggregateException) { ReadOnlyCollection<Exception> innerExceptions = aggregateException.Flatten().InnerExceptions; switch (innerExceptions.Count) { case 0: return aggregateException; case 1: return innerExceptions[0]; default: { StringBuilder stringBuilder = new StringBuilder(); foreach (string item in (from x in innerExceptions select x.Message).Distinct()) { stringBuilder.AppendLine(item); } return new EnumerationException(stringBuilder.ToString(), aggregateException); } } } private void EnumerateNode(BlockingCollection<INode> producedNodes, ConcurrentQueue<INode> nodesToEnumerate, CancellationToken token) { if (!nodesToEnumerate.TryDequeue(out INode result)) _logger.LogWarning("Failed to acquire node to enumerate. Nodes left to enumerate = {NodesToEnumerate}", nodesToEnumerate.Count); else { foreach (INode item in _nodeEnumerableFactory.Create(result, token)) { if (item is IFile) PushResult(producedNodes, token, item); else if (((IExtendedDirectory)item).IsEmpty) { PushResult(producedNodes, token, item); } else { PushToEnumerate(nodesToEnumerate, item); } } } } private static void PushToEnumerate(ConcurrentQueue<INode> nodesToEnumerate, INode enumeratedNode) { nodesToEnumerate.Enqueue(enumeratedNode as IDirectory); } private static void PushResult(BlockingCollection<INode> producedNodes, CancellationToken token, INode enumeratedNode) { producedNodes.Add(enumeratedNode, token); } private static void Enqueue(IEnumerable<INode> nodes, ConcurrentQueue<INode> nodesToEnumerate) { foreach (INode node in nodes) { nodesToEnumerate.Enqueue(node); } } private IEnumerable<INode> AssertAndLog(IEnumerable<INode> nodes) { Assertions.RequireNonNull(nodes, "nodes"); string text = string.Join(", ", from x in nodes select x.AbsolutePath); _logger.LogInformation("Enumeration of {NodesCount} elements requested: {", nodes.Count(), text, LogRedaction.OnPositions(1)); return nodes; } } }