<PackageReference Include="Relativity.Server.Transfer.SDK" Version="7.7.0" />

BatchAggregator

using Relativity.DataTransfer.Nodes;
using Relativity.Transfer.Enumeration.Interfaces;
using System;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Security;

namespace Relativity.Transfer.Enumeration.Batches
{
    /// <summary>
    /// Collects nodes in an in-memory buffer and, whenever the next node does not fit
    /// within the configured batch limits, serializes the buffered nodes as a batch and
    /// reports the result to the supplied handler.
    /// </summary>
    internal class BatchAggregator : IBatchAggregator
    {
        private readonly IEnumerationHandler<SerializedBatch> _batchCreatedHandler;
        private readonly ITransferLog _log;
        private readonly BatchSerializer _batchSerializer;
        private readonly IBatchLimits _limits;

        // Number of batches that have been serialized and reported successfully so far.
        private int _serializedBatchesNumber;

        // Nodes collected for the batch currently being built; replaced with a fresh
        // buffer after every successfully reported batch.
        private BatchNodesBuffer _buffer;

        // Running sum of _buffer.Count() over all successfully reported batches.
        private long _totalFilesNumber;

        /// <summary>
        /// Creates an aggregator that uses a <see cref="JsonFileSerializer"/> and the
        /// batch limits taken from <see cref="GlobalSettings"/>.
        /// </summary>
        /// <param name="parameters">Batching parameters handed to the batch serializer.</param>
        /// <param name="batchCreatedHandler">Handler that receives every serialized batch.</param>
        /// <param name="log">Log used for progress and error messages.</param>
        public BatchAggregator(
            BatchingParameters parameters,
            IEnumerationHandler<SerializedBatch> batchCreatedHandler,
            ITransferLog log)
            : this(parameters, batchCreatedHandler, log, new JsonFileSerializer())
        {
        }

        /// <summary>
        /// Creates an aggregator that uses the given file serializer and the batch
        /// limits taken from <see cref="GlobalSettings"/>.
        /// </summary>
        /// <param name="parameters">Batching parameters handed to the batch serializer.</param>
        /// <param name="batchCreatedHandler">Handler that receives every serialized batch.</param>
        /// <param name="log">Log used for progress and error messages.</param>
        /// <param name="fileSerializer">File serializer handed to the batch serializer.</param>
        public BatchAggregator(
            BatchingParameters parameters,
            IEnumerationHandler<SerializedBatch> batchCreatedHandler,
            ITransferLog log,
            IFileSerializer fileSerializer)
            : this(
                parameters,
                batchCreatedHandler,
                log,
                fileSerializer,
                GlobalSettings.Instance.MaxFilesPerBatch,
                GlobalSettings.Instance.MaxBytesPerBatch)
        {
        }

        /// <summary>
        /// Creates an aggregator with explicit batch limits.
        /// </summary>
        /// <param name="parameters">Batching parameters handed to the batch serializer.</param>
        /// <param name="batchCreatedHandler">Handler that receives every serialized batch.</param>
        /// <param name="log">Log used for progress and error messages.</param>
        /// <param name="fileSerializer">File serializer handed to the batch serializer.</param>
        /// <param name="maxPathsNumber">First limit passed to <see cref="BatchLimits"/>.</param>
        /// <param name="maxBytesSize">Second limit passed to <see cref="BatchLimits"/>.</param>
        public BatchAggregator(
            BatchingParameters parameters,
            IEnumerationHandler<SerializedBatch> batchCreatedHandler,
            ITransferLog log,
            IFileSerializer fileSerializer,
            int maxPathsNumber,
            long maxBytesSize)
        {
            _batchSerializer = new BatchSerializer(parameters, fileSerializer);
            _batchCreatedHandler = batchCreatedHandler;
            _log = log;
            _limits = new BatchLimits(maxPathsNumber, maxBytesSize);
            _buffer = new BatchNodesBuffer();
        }

        /// <summary>
        /// Adds a node to the current batch. If the limits report that the node does not
        /// fit into the current buffer, the buffered nodes are flushed via
        /// <see cref="CreateBatch"/> first.
        /// </summary>
        /// <param name="node">Node to add.</param>
        public void AddToBatch(INode node)
        {
            bool fitsInCurrentBatch = _limits.FitsInBatch(node, _buffer);
            if (!fitsInCurrentBatch)
            {
                CreateBatch();
            }

            _buffer.Add(node);
        }

        /// <summary>
        /// Serializes the buffered nodes as the next batch, reports it to the handler and
        /// starts a new empty buffer. Does nothing when the buffer is empty.
        /// </summary>
        /// <remarks>
        /// Any exception is logged and rethrown. In that case the buffer and the counters
        /// are left untouched.
        /// </remarks>
        public void CreateBatch()
        {
            if (_buffer.Count() == 0)
            {
                return;
            }

            // Batch numbers are 1-based: the first batch is reported as #1.
            int batchNumber = _serializedBatchesNumber + 1;
            try
            {
                _log.LogInformation(
                    "Serializing a batch #{BatchNumber}: {FilesNumber} files, {DirectoriesNumber}, {ByteSize} bytes",
                    batchNumber,
                    _buffer.FilesInCurrentBatch,
                    _buffer.DirectoriesInCurrentBatch,
                    _buffer.CurrentBatchBytesSize);

                if (_buffer.CurrentBatchBytesSize < 0)
                {
                    LogNegativeSizeBatch(batchNumber);
                }

                SerializedBatch batch = _batchSerializer.SerializeBatch(_buffer, batchNumber, _totalFilesNumber);
                _log.LogInformation("Batch serialized: {@BatchInfo}", batch);
                _batchCreatedHandler.Report(batch);

                // State is advanced only after the handler accepted the batch.
                _totalFilesNumber += _buffer.Count();
                _buffer = new BatchNodesBuffer();
                _serializedBatchesNumber++;
            }
            catch (Exception ex)
            {
                LogBatchCreationFailure(ex, batchNumber);
                throw;
            }
        }

        /// <summary>
        /// Logs a batch creation failure. I/O and access failures use a template that
        /// also records the exception type.
        /// </summary>
        /// <param name="ex">Exception raised while creating the batch.</param>
        /// <param name="batchNumber">Number of the batch that failed.</param>
        private void LogBatchCreationFailure(Exception ex, int batchNumber)
        {
            bool isFileSystemFailure = ex is IOException || ex is UnauthorizedAccessException;
            if (isFileSystemFailure)
            {
                // NOTE(review): LogRedaction.OnPositions(2) presumably marks the exception
                // message argument for redaction - confirm against LogRedaction.
                _log.LogError(
                    ex,
                    "Error during creation of batch #{BatchNumber}: Exception: {ExceptionType} Message: {ExceptionMessage}",
                    batchNumber,
                    ex.GetType(),
                    ex.Message,
                    LogRedaction.OnPositions(2));
            }
            else
            {
                _log.LogError(
                    ex,
                    "Error during creation of batch #{BatchNumber}: {ExceptionMessage}",
                    batchNumber,
                    ex.Message);
            }
        }

        /// <summary>
        /// Logs a warning about a batch whose byte size is negative, followed by
        /// diagnostic details of the first node in the buffer.
        /// </summary>
        /// <param name="nextBatchNumber">Number of the batch about to be serialized.</param>
        private void LogNegativeSizeBatch(int nextBatchNumber)
        {
            _log.LogWarning(
                "Serializing a batch with an invalid files size! #{BatchNumber}: {FilesNumber} files, {DirectoriesNumber}, {ByteSize} bytes",
                nextBatchNumber,
                _buffer.FilesInCurrentBatch,
                _buffer.DirectoriesInCurrentBatch,
                _buffer.CurrentBatchBytesSize);

            // Defaults are logged unchanged when the first node is not a file or when
            // the file details cannot be read.
            string driveRoot = string.Empty;
            string fileExtension = string.Empty;
            string fileAccessRights = string.Empty;
            long? fileSize = null;

            INode headNode = _buffer.GetNodes().First();
            RetrieveFilePathInfo(headNode, ref driveRoot, ref fileExtension, ref fileAccessRights, ref fileSize);

            _log.LogWarning(
                "The first file from a batch with a negative byte size: Root [{RootDrive}], Extension [{FileExtension}], AccessControl [{FileAccessControl}], Size [{FileSize}B]",
                driveRoot,
                fileExtension,
                fileAccessRights,
                fileSize);
        }

        /// <summary>
        /// Fills in path root, extension, access-right type and length for a file node.
        /// Leaves all arguments untouched when the node is not a file.
        /// </summary>
        /// <param name="firstNode">Node to inspect.</param>
        /// <param name="rootDrive">Receives the path root of the file.</param>
        /// <param name="extension">Receives the extension of the file.</param>
        /// <param name="accessRights">Receives the access-right type name, if it can be read.</param>
        /// <param name="size">Receives the file length, if it can be read.</param>
        private void RetrieveFilePathInfo(
            INode firstNode,
            ref string rootDrive,
            ref string extension,
            ref string accessRights,
            ref long? size)
        {
            if (!firstNode.IsFile())
            {
                return;
            }

            string filePath = firstNode.AbsolutePath;
            rootDrive = Path.GetPathRoot(filePath);
            extension = Path.GetExtension(filePath);

            try
            {
                accessRights = File.GetAccessControl(filePath).AccessRightType.ToString();
                size = new FileInfo(filePath).Length;
            }
            // Best effort only: failures here are logged and not propagated.
            // Every type listed before SystemException derives from it, so the filter
            // effectively matches any SystemException.
            catch (Exception ex) when (
                ex is UnauthorizedAccessException
                || ex is NotSupportedException
                || ex is IOException
                || ex is SecurityException
                || ex is SEHException
                || ex is SystemException)
            {
                _log.LogError(
                    ex,
                    "Error when trying to retrieve the access rights or length of the file: {ErrorMessage}",
                    ex.Message);
            }
        }
    }
}