BatchAggregator
using Relativity.DataTransfer.Nodes;
using Relativity.Transfer.Enumeration.Interfaces;
using System;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Security;
namespace Relativity.Transfer.Enumeration.Batches
{
internal class BatchAggregator : IBatchAggregator
{
private readonly IEnumerationHandler<SerializedBatch> _batchCreatedHandler;
private readonly ITransferLog _log;
private readonly BatchSerializer _batchSerializer;
private readonly IBatchLimits _limits;
private int _serializedBatchesNumber;
private BatchNodesBuffer _buffer;
private long _totalFilesNumber;
public BatchAggregator(BatchingParameters parameters, IEnumerationHandler<SerializedBatch> batchCreatedHandler, ITransferLog log)
: this(parameters, batchCreatedHandler, log, new JsonFileSerializer(), GlobalSettings.Instance.MaxFilesPerBatch, GlobalSettings.Instance.MaxBytesPerBatch)
{
}
public BatchAggregator(BatchingParameters parameters, IEnumerationHandler<SerializedBatch> batchCreatedHandler, ITransferLog log, IFileSerializer fileSerializer)
: this(parameters, batchCreatedHandler, log, fileSerializer, GlobalSettings.Instance.MaxFilesPerBatch, GlobalSettings.Instance.MaxBytesPerBatch)
{
}
public BatchAggregator(BatchingParameters parameters, IEnumerationHandler<SerializedBatch> batchCreatedHandler, ITransferLog log, IFileSerializer fileSerializer, int maxPathsNumber, long maxBytesSize)
{
_batchSerializer = new BatchSerializer(parameters, fileSerializer);
_batchCreatedHandler = batchCreatedHandler;
_log = log;
_limits = new BatchLimits(maxPathsNumber, maxBytesSize);
_buffer = new BatchNodesBuffer();
}
public void AddToBatch(INode node)
{
if (!_limits.FitsInBatch(node, _buffer))
CreateBatch();
_buffer.Add(node);
}
public void CreateBatch()
{
if (_buffer.Count() != 0) {
int num = _serializedBatchesNumber + 1;
try {
_log.LogInformation("Serializing a batch #{BatchNumber}: {FilesNumber} files, {DirectoriesNumber}, {ByteSize} bytes", num, _buffer.FilesInCurrentBatch, _buffer.DirectoriesInCurrentBatch, _buffer.CurrentBatchBytesSize);
if (_buffer.CurrentBatchBytesSize < 0)
LogNegativeSizeBatch(num);
SerializedBatch serializedBatch = _batchSerializer.SerializeBatch(_buffer, num, _totalFilesNumber);
_log.LogInformation("Batch serialized: {@BatchInfo}", serializedBatch);
_batchCreatedHandler.Report(serializedBatch);
_totalFilesNumber += _buffer.Count();
_buffer = new BatchNodesBuffer();
_serializedBatchesNumber++;
} catch (Exception ex) {
if (ex is IOException || ex is UnauthorizedAccessException)
_log.LogError(ex, "Error during creation of batch #{BatchNumber}: Exception: {ExceptionType} Message: {ExceptionMessage}", num, ex.GetType(), ex.Message, LogRedaction.OnPositions(2));
else
_log.LogError(ex, "Error during creation of batch #{BatchNumber}: {ExceptionMessage}", num, ex.Message);
throw;
}
}
}
private void LogNegativeSizeBatch(int nextBatchNumber)
{
_log.LogWarning("Serializing a batch with an invalid files size! #{BatchNumber}: {FilesNumber} files, {DirectoriesNumber}, {ByteSize} bytes", nextBatchNumber, _buffer.FilesInCurrentBatch, _buffer.DirectoriesInCurrentBatch, _buffer.CurrentBatchBytesSize);
INode firstNode = _buffer.GetNodes().First();
string rootDrive = string.Empty;
string accessRights = string.Empty;
long? size = null;
string extension = string.Empty;
RetrieveFilePathInfo(firstNode, ref rootDrive, ref extension, ref accessRights, ref size);
_log.LogWarning("The first file from a batch with a negative byte size: Root [{RootDrive}], Extension [{FileExtension}], AccessControl [{FileAccessControl}], Size [{FileSize}B]", rootDrive, extension, accessRights, size);
}
private void RetrieveFilePathInfo(INode firstNode, ref string rootDrive, ref string extension, ref string accessRights, ref long? size)
{
if (firstNode.IsFile()) {
string absolutePath = firstNode.AbsolutePath;
rootDrive = Path.GetPathRoot(absolutePath);
extension = Path.GetExtension(absolutePath);
try {
accessRights = File.GetAccessControl(absolutePath).AccessRightType.ToString();
size = new FileInfo(absolutePath).Length;
} catch (Exception ex) when (ex is UnauthorizedAccessException || ex is NotSupportedException || ex is IOException || ex is SecurityException || ex is SEHException || ex is SystemException) {
_log.LogError(ex, "Error when trying to retrieve the access rights or length of the file: {ErrorMessage}", ex.Message);
}
}
}
}
}