<PackageReference Include="Relativity.Transfer.Client" Version="6.3.7" />

TransferJobService

using Relativity.Transfer.Job; using Relativity.Transfer.Paths; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Threading; namespace Relativity.Transfer { public class TransferJobService : ITransferJobService { private static readonly object SyncRoot = new object(); private readonly ConcurrentBag<ITransferIssue> issues = new ConcurrentBag<ITransferIssue>(); private readonly RemainingTimeCalculator remainingTimeCalculator; private readonly IStatisticsCalculator statisticsCalculator; private readonly ITransferRequest request; private readonly ITransferLog log; private readonly ClientConfiguration configuration; private readonly ITransferPathRepository transferPathRepository; private readonly IJobTransferPathRepository jobTransferPathRepository; private bool? fileNotFoundErrorsDisabled; private DateTime? logStatisticsTimestamp; private DateTime? transferStatisticsTimestamp; private CancellationToken token; public IPathStatusChecker RetryablePathChecker { get; set; } public IReadOnlyCollection<ITransferIssue> Issues => issues; public long JobTransferPathCount => jobTransferPathRepository.Count; public long RequestTransferPathCount => transferPathRepository.Count; public TransferStatistics Statistics { get; } private bool FileNotFoundErrorsDisabled { get { if (!fileNotFoundErrorsDisabled.HasValue) fileNotFoundErrorsDisabled = configuration.FileNotFoundErrorsDisabled; return fileNotFoundErrorsDisabled.Value; } } public TransferJobService(ITransferRequest request, ClientConfiguration configuration, ITransferLog log, CancellationToken token) : this(request, configuration, new PrimaryKeyGenerator(), null, log, token) { } public TransferJobService(ITransferRequest request, ClientConfiguration configuration, ITransferPathKeyGenerator primaryKeyGenerator, ITransferPathKeyGenerator alternateKeyGenerator, ITransferLog log, CancellationToken token) { if (request == null) throw new ArgumentNullException("request"); if (configuration == null) throw new ArgumentNullException("configuration"); if (log == null) throw new ArgumentNullException("log"); this.configuration = configuration; this.request = request; remainingTimeCalculator = new RemainingTimeCalculator(GlobalSettings.Instance); if (this.configuration.SavingMemoryMode) { MemorySavingJobTransferPathRepository memorySavingJobTransferPathRepository = new MemorySavingJobTransferPathRepository(new JobTransferPathRepository(primaryKeyGenerator, alternateKeyGenerator)); statisticsCalculator = new StatisticsCalculator(new MemorySavingStatisticsTotalsCalculator(memorySavingJobTransferPathRepository)); jobTransferPathRepository = memorySavingJobTransferPathRepository; transferPathRepository = new MemorySavingTransferPathRepository(new TransferPathRepository()); } else { jobTransferPathRepository = new JobTransferPathRepository(primaryKeyGenerator, alternateKeyGenerator); statisticsCalculator = new StatisticsCalculator(new StatisticsTotalsCalculator(jobTransferPathRepository)); transferPathRepository = new TransferPathRepository(); } Statistics = new TransferStatistics(request); RetryablePathChecker = new RetryablePathStatusChecker(this.configuration); this.log = log; this.token = token; } [SuppressMessage("Microsoft.Maintainability", "CA1502:AvoidExcessiveComplexity", Justification = "The complexity is acceptable.")] public virtual ITransferStatistics CalculateStatistics() { TransferStatistics transferStatistics = statisticsCalculator.Calculate(Statistics); transferStatistics.RemainingTime = remainingTimeCalculator.Calculate(transferStatistics); Statistics.AverageTransferRateMbps = transferStatistics.AverageTransferRateMbps; Statistics.TransferRateMbps = transferStatistics.TransferRateMbps; Statistics.RemainingTime = transferStatistics.RemainingTime; Statistics.Progress = transferStatistics.Progress; Statistics.TotalFilesNotFound = transferStatistics.TotalFilesNotFound; Statistics.TotalFilePermissionsErrors = transferStatistics.TotalFilePermissionsErrors; Statistics.TotalBadPathErrors = transferStatistics.TotalBadPathErrors; Statistics.TotalFailedFiles = transferStatistics.TotalFailedFiles; Statistics.TotalFatalErrors = transferStatistics.TotalFatalErrors; Statistics.TotalSkippedFiles = transferStatistics.TotalSkippedFiles; return transferStatistics; } public virtual TransferStatus CalculateTransferStatus() { if (token.IsCancellationRequested) return TransferStatus.Canceled; ITransferStatistics transferStatistics = CalculateStatistics(); if (transferStatistics.TotalFatalErrors > 0 || (!configuration.PermissionErrorsRetry && transferStatistics.TotalFilePermissionsErrors > 0) || (!configuration.BadPathErrorsRetry && transferStatistics.TotalBadPathErrors > 0)) return TransferStatus.Fatal; if (transferStatistics.JobError || (!FileNotFoundErrorsDisabled && transferStatistics.TotalFilesNotFound > 0) || transferStatistics.TotalFailedFiles > 0 || (!configuration.TransferEmptyDirectories && transferStatistics.TotalTransferredFiles == 0) || (configuration.PermissionErrorsRetry && transferStatistics.TotalFilePermissionsErrors > 0) || (configuration.BadPathErrorsRetry && transferStatistics.TotalBadPathErrors > 0)) return TransferStatus.Failed; return TransferStatus.Successful; } public void Clear() { jobTransferPathRepository.Clear(); transferPathRepository.Clear(); statisticsCalculator.Clear(); remainingTimeCalculator.Clear(); logStatisticsTimestamp = null; transferStatisticsTimestamp = null; } public void ClearTransferIssueList() { lock (SyncRoot) { while (!issues.IsEmpty) { issues.TryTake(out ITransferIssue _); } } } public virtual JobTransferPath GetJobTransferPath(TransferPath path) { return jobTransferPathRepository.SelectByKey(path); } public virtual JobTransferPath GetJobTransferPath(string path) { return jobTransferPathRepository.SelectByKey(path); } public virtual IReadOnlyCollection<JobTransferPath> GetJobTransferPaths() { return (IReadOnlyCollection<JobTransferPath>)jobTransferPathRepository.SelectAll(); } public virtual IReadOnlyCollection<TransferPath> GetRetryableRequestTransferPaths() { if (token.IsCancellationRequested) return new ReadOnlyCollection<TransferPath>(new List<TransferPath>()); List<TransferPath> list = (from path in transferPathRepository.SelectAll() let jobTransferPath = jobTransferPathRepository.SelectByKey(path) where RetryablePathChecker.IsRetrayable(jobTransferPath.Status) select path).ToList(); ITransferStatistics transferStatistics = statisticsCalculator.Calculate(Statistics); log.LogTransferInformation(request, $"""{transferStatistics.TotalRequestFiles}""{transferStatistics.TotalRequestBytes}", Array.Empty<object>()); log.LogTransferInformation(request, $"""{transferStatistics.TotalTransferredFiles}""{transferStatistics.TotalTransferredBytes}", Array.Empty<object>()); long[] source = (from x in list select x.Bytes).ToArray(); log.LogTransferInformation(request, $"""{source.Count()}""{source.Sum()}", Array.Empty<object>()); return list; } public virtual TransferPath GetRequestTransferPath(string path) { return transferPathRepository.SelectByKey(path); } public virtual IReadOnlyCollection<TransferPath> GetRequestTransferPaths() { if (configuration.SavingMemoryMode) throw new InvalidOperationException("Requested transfer paths are not available in saving memory mode."); return (IReadOnlyCollection<TransferPath>)transferPathRepository.SelectAll(); } public void IncrementRetryAttempt() { Statistics.RetryAttempt++; } public bool IsCompleted(TransferPathStatus status) { return RetryablePathChecker.IsCompleted(status); } public bool IsRetrayable(TransferPathStatus status) { return RetryablePathChecker.IsRetrayable(status); } public virtual void PublishStatistics(bool force) { if (!token.IsCancellationRequested) { lock (SyncRoot) { DateTime now = DateTime.Now; bool flag = ShouldPublishLogStatistics(force, now); bool flag2 = ShouldPublishTransferStatistics(force, now); if (flag || flag2) { ITransferStatistics statistics = CalculateStatistics(); DateTime now2 = DateTime.Now; if (flag2) { request.Context?.PublishTransferStatistics(request, statistics); transferStatisticsTimestamp = now2; } if (flag) { log.LogTransferStatistics(request, statistics); logStatisticsTimestamp = now2; } } } } } public void RegisterIssue(ITransferIssue issue) { if (issue == null) throw new ArgumentNullException("issue"); issues.Add(issue); } public virtual void RemoveRetryableTransferPaths(IEnumerable<TransferPath> paths) { if (!token.IsCancellationRequested) { if (paths == null) throw new ArgumentNullException("paths"); foreach (TransferPath path in paths) { if (token.IsCancellationRequested) break; transferPathRepository.Delete(path); } } } public void ResetRetryAttempt() { Statistics.RetryAttempt = 0; } public virtual void Save(JobTransferPath path) { if (!token.IsCancellationRequested) { if (configuration.SavingMemoryMode) transferPathRepository.Delete(path); jobTransferPathRepository.Upsert(path); } } public virtual JobTransferPath Save(TransferPath path) { JobTransferPath jobTransferPath = new JobTransferPath { Path = path, RetryCount = 0, Status = TransferPathStatus.None, Index = 0 }; if (token.IsCancellationRequested) return jobTransferPath; transferPathRepository.Upsert(path); jobTransferPathRepository.Upsert(jobTransferPath); return jobTransferPath; } public virtual JobTransferPath Save(string path) { int order = Convert.ToInt32(jobTransferPathRepository.Count + 1); TransferPath path2 = new TransferPath { Direction = request.Direction, Order = order, SourcePath = path, TargetFileName = null, TargetPath = request.TargetPath }; transferPathRepository.Upsert(path2); JobTransferPath jobTransferPath = new JobTransferPath { Path = path2, RetryCount = 0, Status = TransferPathStatus.Started, Index = 0 }; if (token.IsCancellationRequested) return jobTransferPath; jobTransferPathRepository.Upsert(jobTransferPath); return jobTransferPath; } public virtual void SaveStatistics() { if (!token.IsCancellationRequested) { TransferStatistics transferStatistics = new TransferStatistics(Statistics); statisticsCalculator.Save(transferStatistics); Statistics.Clear(); Statistics.Id = Guid.NewGuid(); Statistics.Order = statisticsCalculator.Count; Statistics.RetryAttempt = transferStatistics.RetryAttempt; logStatisticsTimestamp = null; transferStatisticsTimestamp = null; remainingTimeCalculator.Clear(); } } private bool ShouldPublishLogStatistics(bool force, DateTime now) { if (!GlobalSettings.Instance.StatisticsLogEnabled) return false; if (force || !logStatisticsTimestamp.HasValue) return true; return (now - logStatisticsTimestamp.Value).TotalSeconds >= GlobalSettings.Instance.StatisticsLogIntervalSeconds; } private bool ShouldPublishTransferStatistics(bool force, DateTime now) { if (request.Context == null || !request.Context.StatisticsEnabled) return false; if (force || !transferStatisticsTimestamp.HasValue) return true; double totalSeconds = (now - transferStatisticsTimestamp.Value).TotalSeconds; if (request.Context != null) return totalSeconds >= request.Context.StatisticsRateSeconds; return false; } } }