// PathSearchStorage
using Polly;
using Relativity.Transfer.Dto;
using Relativity.Transfer.Resources;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Relativity.Transfer
{
internal class PathSearchStorage : IPathSearchStorage
{
// Composite format string for batch file names ({0} is the batch number).
// GetBatchFileName currently formats the same literal text directly.
private const string FileNameTemplate = "transfer-batch-{0}.json";
// Lock guarding the batch directory, the start time, and the counter/batch updates below.
// NOTE(review): this lock is static, so every PathSearchStorage instance shares it — confirm that is intentional.
private static readonly object SyncRoot = new object();
// Token handed to the parallel re-serialization loop and its retry execution.
private readonly CancellationToken cancellationToken;
// Log used for batch serialization messages and retry failures.
private readonly ITransferLog transferLog;
// Paths collected for the batch currently being built; emptied after each successful Save.
private readonly ConcurrentBag<TransferPath> paths = new ConcurrentBag<TransferPath>();
// One entry per batch file written by Save.
private readonly ConcurrentBag<SerializedBatch> serializedBatches = new ConcurrentBag<SerializedBatch>();
// Enumeration context: supplies SyncBatchTotals and receives each serialized batch via PublishSerializedPaths.
private readonly PathEnumeratorContext context;
// Batch limits captured from GlobalSettings.Instance when the storage is constructed.
private readonly long maxBytesPerBatch;
private readonly long maxFilesPerBatch;
// Counters for the batch in progress; Save resets them to zero after writing a batch.
private long totalBatchedBytes;
private int totalBatchedDirectories;
private long totalBatchedFiles;
// Running totals; only ever incremented in this class.
private long totalBytes;
private long totalFiles;
private int totalDirectories;
// Backing field for BatchDirectory.
private string batchDirectory;
// True while batchDirectory is a non-empty string; maintained by the BatchDirectory setter.
private bool batchEnabled;
// Backing field for StartTime.
private DateTime startTime;
// Reads and writes the batch files.
private readonly IFileSerializer serializer;
/// <summary>
/// Gets or sets the directory in which batch files are written.
/// Assigning a null or empty value turns batching off; any other value turns it on.
/// </summary>
public string BatchDirectory
{
    get
    {
        string current;
        lock (SyncRoot)
        {
            current = batchDirectory;
        }

        return current;
    }

    set
    {
        lock (SyncRoot)
        {
            // Both fields change under the same lock so readers never see them out of step.
            batchEnabled = !string.IsNullOrEmpty(value);
            batchDirectory = value;
        }
    }
}
/// <summary>
/// Gets or sets the flag copied into the <c>LocalPaths</c> value of every batch written by <c>Save</c>.
/// The constructor initializes it to <c>true</c>.
/// </summary>
public bool LocalPaths { get; set; }
/// <summary>
/// Gets the error paths recorded when a path is added with a validation status other than <c>Ok</c>.
/// </summary>
public ConcurrentBag<ErrorPath> PathErrors { get; } = new ConcurrentBag<ErrorPath>();
/// <summary>
/// Gets the paths collected for the batch currently being built (the live bag, not a copy).
/// </summary>
public ConcurrentBag<TransferPath> Paths => paths;
/// <summary>
/// Gets the descriptors of the batch files written so far (the live bag, not a copy).
/// </summary>
public ConcurrentBag<SerializedBatch> SerializedPaths => serializedBatches;
/// <summary>
/// Gets or sets the start time associated with this storage. Access is synchronized on the shared lock.
/// </summary>
public DateTime StartTime
{
    get
    {
        DateTime current;
        lock (SyncRoot)
        {
            current = startTime;
        }

        return current;
    }

    set
    {
        lock (SyncRoot)
        {
            startTime = value;
        }
    }
}
/// <summary>Gets the file bytes accumulated in the batch currently being built.</summary>
public long TotalBatchedBytes
{
    get { return Interlocked.Read(ref totalBatchedBytes); }
}

/// <summary>Gets the number of directories counted in the batch currently being built.</summary>
public int TotalBatchedDirectories
{
    // A compare-exchange of 0 with 0 never changes the value; it is used purely as an atomic read.
    get { return Interlocked.CompareExchange(ref totalBatchedDirectories, 0, 0); }
}

/// <summary>Gets the number of files counted in the batch currently being built.</summary>
public long TotalBatchedFiles
{
    get { return Interlocked.Read(ref totalBatchedFiles); }
}

/// <summary>Gets the running total of file bytes added to this storage.</summary>
public long TotalBytes
{
    get { return Interlocked.Read(ref totalBytes); }
}

/// <summary>Gets the running total of directories counted by this storage.</summary>
public int TotalDirectories
{
    // A compare-exchange of 0 with 0 never changes the value; it is used purely as an atomic read.
    get { return Interlocked.CompareExchange(ref totalDirectories, 0, 0); }
}

/// <summary>Gets the running total of files added to this storage.</summary>
public long TotalFiles
{
    get { return Interlocked.Read(ref totalFiles); }
}
/// <summary>
/// Initializes a new instance of the <see cref="PathSearchStorage"/> class that is not cancellable;
/// delegates to the four-argument constructor with <see cref="CancellationToken.None"/>.
/// </summary>
/// <param name="context">The path enumeration context.</param>
/// <param name="serializer">The serializer used to read and write batch files.</param>
/// <param name="log">The transfer log.</param>
public PathSearchStorage(PathEnumeratorContext context, IFileSerializer serializer, ITransferLog log)
: this(context, serializer, log, CancellationToken.None)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="PathSearchStorage"/> class.
/// Batching starts disabled (no batch directory), <see cref="LocalPaths"/> starts as <c>true</c>,
/// all counters start at zero, and the per-batch limits are read from <c>GlobalSettings.Instance</c>.
/// </summary>
/// <param name="context">The path enumeration context.</param>
/// <param name="serializer">The serializer used to read and write batch files.</param>
/// <param name="log">The transfer log.</param>
/// <param name="token">The token used to cancel batch re-serialization.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="context"/>, <paramref name="serializer"/>, or <paramref name="log"/> is <c>null</c>.
/// </exception>
public PathSearchStorage(PathEnumeratorContext context, IFileSerializer serializer, ITransferLog log, CancellationToken token)
{
    if (context == null)
    {
        throw new ArgumentNullException(nameof(context));
    }

    if (serializer == null)
    {
        // Report the parameter that is actually null (this check used to name "context").
        throw new ArgumentNullException(nameof(serializer));
    }

    if (log == null)
    {
        throw new ArgumentNullException(nameof(log));
    }

    this.context = context;
    this.serializer = serializer;
    transferLog = log;
    cancellationToken = token;

    // A null batch directory leaves batching switched off until a caller assigns one.
    BatchDirectory = null;
    LocalPaths = true;

    totalBatchedBytes = 0;
    totalBatchedDirectories = 0;
    totalBatchedFiles = 0;
    totalBytes = 0;
    totalDirectories = 0;
    totalFiles = 0;

    maxBytesPerBatch = GlobalSettings.Instance.MaxBytesPerBatch;
    maxFilesPerBatch = GlobalSettings.Instance.MaxFilesPerBatch;
}
/// <summary>
/// Builds the file name (without directory) for the batch with the given number.
/// </summary>
/// <param name="batchNumber">The batch number to embed in the file name.</param>
/// <returns>The name in the form <c>transfer-batch-N.json</c>, formatted with the invariant culture.</returns>
public static string GetBatchFileName(int batchNumber)
{
    // Invariant culture keeps the number free of culture-specific digits or signs.
    IFormatProvider invariant = CultureInfo.InvariantCulture;
    string fileName = string.Format(invariant, "transfer-batch-{0}.json", batchNumber);
    return fileName;
}
/// <summary>
/// Adds a path to the batch in progress and updates the file/byte or directory counters.
/// When batching is enabled and the batch in progress has reached a limit, it is saved first.
/// </summary>
/// <param name="path">The path to add; its <c>Order</c> is assigned here.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="path"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Thrown when the source path is null or empty.</exception>
public void Add(TransferPath path)
{
    if (path == (TransferPath)null)
    {
        throw new ArgumentNullException(nameof(path));
    }

    if (string.IsNullOrEmpty(path.SourcePath))
    {
        throw new ArgumentException(CoreStrings.SourcePathArgumentExceptionMessage, nameof(path));
    }

    lock (SyncRoot)
    {
        // Flush the batch in progress before it grows past its limits.
        if (ShouldDumpPathsToFile())
        {
            Save(false);
        }

        // Order is derived from the running file total at the moment of insertion.
        path.Order = Convert.ToInt32(TotalFiles) + 1;
        paths.Add(path);

        var attributes = path.PathAttributes;
        if (attributes.HasFlag(TransferPathAttributes.File))
        {
            var size = path.Bytes;
            totalBatchedFiles++;
            totalBatchedBytes += size;
            totalFiles++;
            totalBytes += size;
        }
        else if (attributes.HasFlag(TransferPathAttributes.Directory))
        {
            totalBatchedDirectories++;
            totalDirectories++;
        }
    }
}
/// <summary>
/// Adds the path when validation succeeded; otherwise records an error path built from the validation result.
/// </summary>
/// <param name="path">The path to add when <paramref name="validationResult"/> has status <c>Ok</c>.</param>
/// <param name="validationResult">The outcome of validating <paramref name="path"/>.</param>
public void Add(TransferPath path, PathValidationResult validationResult)
{
    if (validationResult.Status != PathValidationStatus.Ok)
    {
        PathErrors.Add(validationResult.CreatesErrorPath());
        return;
    }

    Add(path);
}
/// <summary>
/// Counts one more directory in both the running total and the batch in progress,
/// without adding a path entry.
/// </summary>
public void IncrementDirectoryCount()
{
    lock (SyncRoot)
    {
        totalDirectories += 1;
        totalBatchedDirectories += 1;
    }
}
/// <summary>
/// Writes the batch in progress to a new batch file in <see cref="BatchDirectory"/>, records it,
/// optionally rewrites the total batch count in all batch files, resets the per-batch state,
/// and publishes a copy of the batch descriptor through the context.
/// Nothing is written when the batch in progress is empty and at least one batch already exists.
/// </summary>
/// <param name="completed">
/// <c>true</c> to force the total batch count to be synchronized across all batch files;
/// otherwise this only happens when the context's <c>SyncBatchTotals</c> is set.
/// </param>
/// <remarks>
/// <see cref="BatchDirectory"/> must be set before calling; <c>Path.Combine</c> throws
/// <see cref="ArgumentNullException"/> for a null directory.
/// </remarks>
public void Save(bool completed)
{
    lock (SyncRoot)
    {
        int existingBatchCount = serializedBatches.Count;
        bool hasPendingPaths = TotalBatchedFiles != 0 || TotalBatchedDirectories != 0;

        // An empty batch is only written when no batch exists yet, so at least one file is always produced.
        if (!hasPendingPaths && existingBatchCount > 0)
        {
            return;
        }

        List<TransferPathDto> list = paths
            .OrderBy(x => x.Order)
            .Select(TransferPathDto.ConvertToDto)
            .ToList();

        int num = existingBatchCount + 1;
        string batchFileName = GetBatchFileName(num);
        string file = Path.Combine(BatchDirectory, batchFileName);

        SerializedPathsBatchDto serializedPathsBatchDto = new SerializedPathsBatchDto
        {
            BatchNumber = num,
            LocalPaths = LocalPaths,
            MinSourcePathId = ((list.Count > 0) ? list.Min((TransferPathDto x) => x.SourcePathId.GetValueOrDefault()) : 0),
            MaxSourcePathId = ((list.Count > 0) ? list.Max((TransferPathDto x) => x.SourcePathId.GetValueOrDefault()) : 0),
            TotalBatchCount = num,
            TotalByteCount = TotalBatchedBytes,
            TotalDirectoryCount = TotalBatchedDirectories,
            TotalFileCount = TotalBatchedFiles
        };
        serializedPathsBatchDto.Paths.AddRange(list);

        SerializedBatch serializedBatch = new SerializedBatch
        {
            BatchNumber = serializedPathsBatchDto.BatchNumber,
            File = file,
            LocalPaths = serializedPathsBatchDto.LocalPaths,
            MinSourcePathId = serializedPathsBatchDto.MinSourcePathId,
            MaxSourcePathId = serializedPathsBatchDto.MaxSourcePathId,
            TotalBatchCount = num,
            TotalByteCount = serializedPathsBatchDto.TotalByteCount,
            TotalDirectoryCount = serializedPathsBatchDto.TotalDirectoryCount,
            TotalFileCount = serializedPathsBatchDto.TotalFileCount
        };

        // Write the file first and register the batch only once that succeeded. If the write throws,
        // no descriptor for a missing file is left behind: the next attempt reuses the same batch
        // number and a later total-count synchronization never tries to read a file that was not written.
        serializer.Serialize(serializedPathsBatchDto, file);
        serializedBatches.Add(serializedBatch);

        transferLog.LogInformation("Serialized the '{BatchFile}' batch file. Batch number: {BatchNumber}, Total batch count: {TotalBatchCount}, Total file count: {TotalFileCount:n0}, Total byte count: {TotalByteCount:n0}, Total directory count: {TotalDirectoryCount:n0}, Min source path id: {MinSourcePathId}, Max source path id: {MaxSourcePathId}", batchFileName, serializedPathsBatchDto.BatchNumber, serializedPathsBatchDto.TotalBatchCount, serializedPathsBatchDto.TotalFileCount, serializedPathsBatchDto.TotalByteCount, serializedPathsBatchDto.TotalDirectoryCount, serializedPathsBatchDto.MinSourcePathId, serializedPathsBatchDto.MaxSourcePathId);

        if (completed || context.SyncBatchTotals)
        {
            SynchBatchTotals(num);
        }

        // Publish a copy so later updates to the stored descriptor do not leak to subscribers.
        SerializedBatch batch = serializedBatch.DeepCopy();

        // Empty the in-progress batch and reset its counters for the next one.
        while (paths.TryTake(out TransferPath _))
        {
        }

        Interlocked.Exchange(ref totalBatchedBytes, 0);
        Interlocked.Exchange(ref totalBatchedDirectories, 0);
        Interlocked.Exchange(ref totalBatchedFiles, 0);

        context.PublishSerializedPaths(batch);
    }
}
/// <summary>
/// Updates the total batch count on every recorded batch, both on the in-memory descriptor and
/// in its batch file (deserialize, update, serialize again), running the batches in parallel.
/// Each file rewrite is wrapped in a wait-and-retry policy that logs every failed attempt.
/// Does nothing when <paramref name="nextBatchCount"/> is 1.
/// </summary>
/// <param name="nextBatchCount">The total batch count to store in every batch.</param>
private void SynchBatchTotals(int nextBatchCount)
{
    if (nextBatchCount == 1)
    {
        return;
    }

    Stopwatch timer = Stopwatch.StartNew();
    IRetryStrategy retryStrategy = RetryStrategies.CreateFixedTimeStrategy(3);
    ParallelOptions parallelOptions = new ParallelOptions
    {
        CancellationToken = cancellationToken,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };

    Parallel.ForEach(
        serializedBatches,
        parallelOptions,
        () => 0,
        (SerializedBatch existingBatch, ParallelLoopState loopState, int localCount) =>
        {
            // Ask the loop to stop scheduling further batches; this iteration still runs on,
            // and the cancellable Execute call below is handed the same token.
            if (cancellationToken.IsCancellationRequested)
            {
                loopState.Stop();
            }

            existingBatch.TotalBatchCount = nextBatchCount;

            Action<Exception, TimeSpan> logFailedAttempt = (Exception exception, TimeSpan timespan) =>
            {
                transferLog.LogError(exception, "Failed to update and re-serialize the '{File}' batch file.", existingBatch.File);
            };

            Action<CancellationToken> rewriteBatchFile = (CancellationToken ct) =>
            {
                SerializedPathsBatchDto storedBatch = serializer.Deserialize<SerializedPathsBatchDto>(existingBatch.File);
                storedBatch.TotalBatchCount = nextBatchCount;
                serializer.Serialize(storedBatch, existingBatch.File);
            };

            RetrySyntax
                .WaitAndRetry(Policy.Handle<Exception>(), 3, retryStrategy.Calculation, logFailedAttempt)
                .Execute(rewriteBatchFile, cancellationToken);

            return 0;
        },
        (int unusedLocal) =>
        {
        });

    timer.Stop();
    transferLog.LogInformation("Successfully re-serialized all batch files. Elapsed: " + timer.Elapsed.ToString(), Array.Empty<object>());
}
/// <summary>
/// Determines whether the batch in progress should be written out before another path is added.
/// </summary>
/// <returns>
/// <c>true</c> when batching is enabled and either the byte limit or the path-count limit has been reached;
/// otherwise <c>false</c>.
/// </returns>
private bool ShouldDumpPathsToFile()
{
    return batchEnabled && (IsMaxBytesPerBatchExceeded() || IsTotalPathsPerBatchExceeded());
}
/// <summary>
/// Checks the path-count limit for the batch in progress.
/// </summary>
/// <returns><c>true</c> when batched files plus batched directories have reached <c>maxFilesPerBatch</c>.</returns>
private bool IsTotalPathsPerBatchExceeded()
{
    long batchedPathCount = totalBatchedFiles + totalBatchedDirectories;
    return batchedPathCount >= maxFilesPerBatch;
}
/// <summary>
/// Checks the byte limit for the batch in progress.
/// </summary>
/// <returns><c>true</c> when the batched byte count has reached <c>maxBytesPerBatch</c>.</returns>
private bool IsMaxBytesPerBatchExceeded()
{
    return maxBytesPerBatch <= totalBatchedBytes;
}
}
}