<PackageReference Include="Relativity.Server.Transfer.SDK" Version="7.7.0" />

PathSearchStorage

using Polly;
using Relativity.Transfer.Dto;
using Relativity.Transfer.Resources;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Relativity.Transfer
{
    /// <summary>
    /// Accumulates transfer paths found during a path search. When batching is enabled (a non-empty
    /// <see cref="BatchDirectory"/>), the accumulated paths are flushed to numbered JSON batch files
    /// once the per-batch byte or path limits from <c>GlobalSettings</c> are reached.
    /// </summary>
    /// <remarks>
    /// <see cref="Add(TransferPath)"/>, <see cref="IncrementDirectoryCount"/>, <see cref="Save"/> and the
    /// <see cref="BatchDirectory"/>/<see cref="StartTime"/> accessors synchronize on <c>SyncRoot</c>.
    /// NOTE(review): <c>SyncRoot</c> is static, so every instance in the process contends on the same
    /// monitor. Confirm this is intended.
    /// </remarks>
    internal class PathSearchStorage : IPathSearchStorage
    {
        /// <summary>Composite format for batch file names; <c>{0}</c> is the 1-based batch number.</summary>
        private const string FileNameTemplate = "transfer-batch-{0}.json";

        private static readonly object SyncRoot = new object();

        private readonly CancellationToken cancellationToken;
        private readonly ITransferLog transferLog;
        private readonly ConcurrentBag<TransferPath> paths = new ConcurrentBag<TransferPath>();
        private readonly ConcurrentBag<SerializedBatch> serializedBatches = new ConcurrentBag<SerializedBatch>();
        private readonly PathEnumeratorContext context;
        private readonly long maxBytesPerBatch;
        private readonly long maxFilesPerBatch;
        private readonly IFileSerializer serializer;

        // Counters for the batch currently being accumulated; reset to zero after each Save.
        private long totalBatchedBytes;
        private int totalBatchedDirectories;
        private long totalBatchedFiles;

        // Running totals across every batch; never reset.
        private long totalBytes;
        private long totalFiles;
        private int totalDirectories;

        private string batchDirectory;
        private bool batchEnabled;
        private DateTime startTime;

        /// <summary>
        /// Directory that batch files are written to. Setting a null or empty value disables automatic
        /// flushing in <see cref="Add(TransferPath)"/>.
        /// </summary>
        public string BatchDirectory
        {
            get
            {
                lock (SyncRoot)
                {
                    return batchDirectory;
                }
            }

            set
            {
                lock (SyncRoot)
                {
                    batchDirectory = value;
                    batchEnabled = !string.IsNullOrEmpty(value);
                }
            }
        }

        /// <summary>Value copied into each serialized batch's <c>LocalPaths</c> field. Defaults to <c>true</c>.</summary>
        public bool LocalPaths { get; set; }

        /// <summary>Error paths recorded for paths that failed validation.</summary>
        public ConcurrentBag<ErrorPath> PathErrors { get; } = new ConcurrentBag<ErrorPath>();

        /// <summary>Paths accumulated for the batch that has not been saved yet.</summary>
        public ConcurrentBag<TransferPath> Paths => paths;

        /// <summary>Metadata for every batch file written so far.</summary>
        public ConcurrentBag<SerializedBatch> SerializedPaths => serializedBatches;

        public DateTime StartTime
        {
            get
            {
                lock (SyncRoot)
                {
                    return startTime;
                }
            }

            set
            {
                lock (SyncRoot)
                {
                    startTime = value;
                }
            }
        }

        // CompareExchange(ref x, 0, 0) is used as an atomic read (including 64-bit reads on 32-bit hosts).
        public long TotalBatchedBytes => Interlocked.CompareExchange(ref totalBatchedBytes, 0, 0);

        public int TotalBatchedDirectories => Interlocked.CompareExchange(ref totalBatchedDirectories, 0, 0);

        public long TotalBatchedFiles => Interlocked.CompareExchange(ref totalBatchedFiles, 0, 0);

        public long TotalBytes => Interlocked.CompareExchange(ref totalBytes, 0, 0);

        public int TotalDirectories => Interlocked.CompareExchange(ref totalDirectories, 0, 0);

        public long TotalFiles => Interlocked.CompareExchange(ref totalFiles, 0, 0);

        /// <summary>Creates a storage instance that cannot be cancelled.</summary>
        public PathSearchStorage(PathEnumeratorContext context, IFileSerializer serializer, ITransferLog log)
            : this(context, serializer, log, CancellationToken.None)
        {
        }

        /// <summary>Creates a storage instance.</summary>
        /// <param name="context">Receives published batches and supplies the <c>SyncBatchTotals</c> flag.</param>
        /// <param name="serializer">Reads and writes batch files.</param>
        /// <param name="log">Transfer log.</param>
        /// <param name="token">Cancels re-serialization of existing batch files.</param>
        /// <exception cref="ArgumentNullException"><paramref name="context"/>, <paramref name="serializer"/> or <paramref name="log"/> is null.</exception>
        public PathSearchStorage(PathEnumeratorContext context, IFileSerializer serializer, ITransferLog log, CancellationToken token)
        {
            if (context == null)
            {
                throw new ArgumentNullException(nameof(context));
            }

            // The original reported "context" here, which pointed callers at the wrong argument.
            if (serializer == null)
            {
                throw new ArgumentNullException(nameof(serializer));
            }

            if (log == null)
            {
                throw new ArgumentNullException(nameof(log));
            }

            this.context = context;
            this.serializer = serializer;
            transferLog = log;
            cancellationToken = token;
            BatchDirectory = null;
            LocalPaths = true;
            totalBatchedBytes = 0;
            totalBatchedDirectories = 0;
            totalBatchedFiles = 0;
            totalBytes = 0;
            totalDirectories = 0;
            totalFiles = 0;
            maxBytesPerBatch = GlobalSettings.Instance.MaxBytesPerBatch;
            maxFilesPerBatch = GlobalSettings.Instance.MaxFilesPerBatch;
        }

        /// <summary>Returns the file name for the given 1-based batch number, e.g. <c>transfer-batch-3.json</c>.</summary>
        public static string GetBatchFileName(int batchNumber)
        {
            return string.Format(CultureInfo.InvariantCulture, FileNameTemplate, batchNumber);
        }

        /// <summary>
        /// Adds a path to the current batch and updates the file/byte or directory counters. If batching
        /// is enabled and a per-batch limit has already been reached, the pending batch is saved first.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="path"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="path"/> has an empty source path.</exception>
        public void Add(TransferPath path)
        {
            if (path == (TransferPath)null)
            {
                throw new ArgumentNullException(nameof(path));
            }

            if (string.IsNullOrEmpty(path.SourcePath))
            {
                throw new ArgumentException(CoreStrings.SourcePathArgumentExceptionMessage, nameof(path));
            }

            lock (SyncRoot)
            {
                // Flush before adding, so this path becomes the first entry of the next batch.
                if (ShouldDumpPathsToFile())
                {
                    Save(false);
                }

                path.Order = Convert.ToInt32(TotalFiles) + 1;
                paths.Add(path);
                if (path.PathAttributes.HasFlag(TransferPathAttributes.File))
                {
                    totalBatchedBytes += path.Bytes;
                    totalBatchedFiles++;
                    totalBytes += path.Bytes;
                    totalFiles++;
                }
                else if (path.PathAttributes.HasFlag(TransferPathAttributes.Directory))
                {
                    totalBatchedDirectories++;
                    totalDirectories++;
                }
            }
        }

        /// <summary>Adds <paramref name="path"/> when validation succeeded; otherwise records an error path.</summary>
        public void Add(TransferPath path, PathValidationResult validationResult)
        {
            if (validationResult.Status == PathValidationStatus.Ok)
            {
                Add(path);
            }
            else
            {
                PathErrors.Add(validationResult.CreatesErrorPath());
            }
        }

        /// <summary>Counts a directory in both the current-batch and overall totals without adding a path.</summary>
        public void IncrementDirectoryCount()
        {
            lock (SyncRoot)
            {
                totalBatchedDirectories++;
                totalDirectories++;
            }
        }

        /// <summary>
        /// Writes the pending paths to the next batch file in <see cref="BatchDirectory"/>, then publishes a
        /// copy of the batch metadata to the context and resets the current-batch counters. Nothing happens
        /// when no files or directories are pending and at least one batch has already been written.
        /// </summary>
        /// <param name="completed">
        /// When true (or when the context requests it), previously written batch files are rewritten with
        /// the new total batch count.
        /// </param>
        public void Save(bool completed)
        {
            lock (SyncRoot)
            {
                int existingBatchCount = serializedBatches.Count;

                // The first batch is always written, even if empty.
                if (TotalBatchedFiles == 0 && TotalBatchedDirectories == 0 && existingBatchCount > 0)
                {
                    return;
                }

                List<TransferPathDto> list = paths
                    .OrderBy(x => x.Order)
                    .Select(TransferPathDto.ConvertToDto)
                    .ToList();
                int batchNumber = existingBatchCount + 1;
                string batchFileName = GetBatchFileName(batchNumber);
                string file = Path.Combine(BatchDirectory, batchFileName);

                SerializedPathsBatchDto serializedPathsBatchDto = new SerializedPathsBatchDto
                {
                    BatchNumber = batchNumber,
                    LocalPaths = LocalPaths,
                    MinSourcePathId = (list.Count > 0) ? list.Min((TransferPathDto x) => x.SourcePathId.GetValueOrDefault()) : 0,
                    MaxSourcePathId = (list.Count > 0) ? list.Max((TransferPathDto x) => x.SourcePathId.GetValueOrDefault()) : 0,
                    TotalBatchCount = batchNumber,
                    TotalByteCount = TotalBatchedBytes,
                    TotalDirectoryCount = TotalBatchedDirectories,
                    TotalFileCount = TotalBatchedFiles,
                };
                serializedPathsBatchDto.Paths.AddRange(list);

                SerializedBatch serializedBatch = new SerializedBatch
                {
                    BatchNumber = serializedPathsBatchDto.BatchNumber,
                    File = file,
                    LocalPaths = serializedPathsBatchDto.LocalPaths,
                    MinSourcePathId = serializedPathsBatchDto.MinSourcePathId,
                    MaxSourcePathId = serializedPathsBatchDto.MaxSourcePathId,
                    TotalBatchCount = batchNumber,
                    TotalByteCount = serializedPathsBatchDto.TotalByteCount,
                    TotalDirectoryCount = serializedPathsBatchDto.TotalDirectoryCount,
                    TotalFileCount = serializedPathsBatchDto.TotalFileCount,
                };

                // Write the file before registering the batch. If serialization throws, no batch is recorded
                // whose file does not exist (which SynchBatchTotals would later fail to read). The pending
                // paths and counters are also left intact, so a later Save reuses the same batch number.
                serializer.Serialize(serializedPathsBatchDto, file);
                serializedBatches.Add(serializedBatch);

                transferLog.LogInformation(
                    "Serialized the '{BatchFile}' batch file. Batch number: {BatchNumber}, Total batch count: {TotalBatchCount}, Total file count: {TotalFileCount:n0}, Total byte count: {TotalByteCount:n0}, Total directory count: {TotalDirectoryCount:n0}, Min source path id: {MinSourcePathId}, Max source path id: {MaxSourcePathId}",
                    batchFileName,
                    serializedPathsBatchDto.BatchNumber,
                    serializedPathsBatchDto.TotalBatchCount,
                    serializedPathsBatchDto.TotalFileCount,
                    serializedPathsBatchDto.TotalByteCount,
                    serializedPathsBatchDto.TotalDirectoryCount,
                    serializedPathsBatchDto.MinSourcePathId,
                    serializedPathsBatchDto.MaxSourcePathId);

                if (completed || context.SyncBatchTotals)
                {
                    SynchBatchTotals(batchNumber);
                }

                // Publish a copy so consumers cannot mutate the instance held in serializedBatches.
                SerializedBatch batch = serializedBatch.DeepCopy();
                while (!paths.IsEmpty)
                {
                    paths.TryTake(out TransferPath _);
                }

                Interlocked.Exchange(ref totalBatchedBytes, 0);
                Interlocked.Exchange(ref totalBatchedDirectories, 0);
                Interlocked.Exchange(ref totalBatchedFiles, 0);
                context.PublishSerializedPaths(batch);
            }
        }

        /// <summary>
        /// Sets <c>TotalBatchCount</c> to <paramref name="nextBatchCount"/> on every recorded batch, both in
        /// memory and in its file. Files are rewritten in parallel, with up to three retries per file.
        /// </summary>
        private void SynchBatchTotals(int nextBatchCount)
        {
            // With a single batch there is no earlier file to update.
            if (nextBatchCount == 1)
            {
                return;
            }

            Stopwatch stopwatch = Stopwatch.StartNew();
            IRetryStrategy retryStrategy = RetryStrategies.CreateFixedTimeStrategy(3);
            ParallelOptions options = new ParallelOptions
            {
                CancellationToken = cancellationToken,
                MaxDegreeOfParallelism = Environment.ProcessorCount,
            };

            Parallel.ForEach(serializedBatches, options, (existingBatch, loopState) =>
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    // Stop() only prevents new iterations; return so this one does no further work.
                    loopState.Stop();
                    return;
                }

                existingBatch.TotalBatchCount = nextBatchCount;
                RetrySyntax.WaitAndRetry(
                        Policy.Handle<Exception>(),
                        3,
                        retryStrategy.Calculation,
                        (Action<Exception, TimeSpan>)((exception, timespan) =>
                            transferLog.LogError(exception, "Failed to update and re-serialize the '{File}' batch file.", existingBatch.File)))
                    .Execute(
                        (Action<CancellationToken>)(_ =>
                        {
                            SerializedPathsBatchDto dto = serializer.Deserialize<SerializedPathsBatchDto>(existingBatch.File);
                            dto.TotalBatchCount = nextBatchCount;
                            serializer.Serialize(dto, existingBatch.File);
                        }),
                        cancellationToken);
            });

            stopwatch.Stop();
            transferLog.LogInformation("Successfully re-serialized all batch files. Elapsed: " + stopwatch.Elapsed.ToString(), Array.Empty<object>());
        }

        /// <summary>True when batching is enabled and the current batch has reached its byte or path limit.</summary>
        private bool ShouldDumpPathsToFile()
        {
            return batchEnabled && (IsMaxBytesPerBatchExceeded() || IsTotalPathsPerBatchExceeded());
        }

        /// <summary>Files and directories together count toward the per-batch path limit.</summary>
        private bool IsTotalPathsPerBatchExceeded()
        {
            return totalBatchedFiles + totalBatchedDirectories >= maxFilesPerBatch;
        }

        private bool IsMaxBytesPerBatchExceeded()
        {
            return totalBatchedBytes >= maxBytesPerBatch;
        }
    }
}