TransferJobService
using Relativity.Transfer.Job;
using Relativity.Transfer.Paths;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
namespace Relativity.Transfer
{
public class TransferJobService : ITransferJobService
{
private static readonly object SyncRoot = new object();
private readonly ConcurrentBag<ITransferIssue> issues = new ConcurrentBag<ITransferIssue>();
private readonly RemainingTimeCalculator remainingTimeCalculator;
private readonly IStatisticsCalculator statisticsCalculator;
private readonly ITransferRequest request;
private readonly ITransferLog log;
private readonly ClientConfiguration configuration;
private readonly ITransferPathRepository transferPathRepository;
private readonly IJobTransferPathRepository jobTransferPathRepository;
private bool? fileNotFoundErrorsDisabled;
private DateTime? logStatisticsTimestamp;
private DateTime? transferStatisticsTimestamp;
private CancellationToken token;
public IPathStatusChecker RetryablePathChecker { get; set; }
public IReadOnlyCollection<ITransferIssue> Issues => issues;
public long JobTransferPathCount => jobTransferPathRepository.Count;
public long RequestTransferPathCount => transferPathRepository.Count;
public TransferStatistics Statistics { get; }
private bool FileNotFoundErrorsDisabled {
get {
if (!fileNotFoundErrorsDisabled.HasValue)
fileNotFoundErrorsDisabled = configuration.FileNotFoundErrorsDisabled;
return fileNotFoundErrorsDisabled.Value;
}
}
public TransferJobService(ITransferRequest request, ClientConfiguration configuration, ITransferLog log, CancellationToken token)
: this(request, configuration, new PrimaryKeyGenerator(), null, log, token)
{
}
public TransferJobService(ITransferRequest request, ClientConfiguration configuration, ITransferPathKeyGenerator primaryKeyGenerator, ITransferPathKeyGenerator alternateKeyGenerator, ITransferLog log, CancellationToken token)
{
if (request == null)
throw new ArgumentNullException("request");
if (configuration == null)
throw new ArgumentNullException("configuration");
if (log == null)
throw new ArgumentNullException("log");
this.configuration = configuration;
this.request = request;
remainingTimeCalculator = new RemainingTimeCalculator(GlobalSettings.Instance);
if (this.configuration.SavingMemoryMode) {
MemorySavingJobTransferPathRepository memorySavingJobTransferPathRepository = new MemorySavingJobTransferPathRepository(new JobTransferPathRepository(primaryKeyGenerator, alternateKeyGenerator));
statisticsCalculator = new StatisticsCalculator(new MemorySavingStatisticsTotalsCalculator(memorySavingJobTransferPathRepository));
jobTransferPathRepository = memorySavingJobTransferPathRepository;
transferPathRepository = new MemorySavingTransferPathRepository(new TransferPathRepository());
} else {
jobTransferPathRepository = new JobTransferPathRepository(primaryKeyGenerator, alternateKeyGenerator);
statisticsCalculator = new StatisticsCalculator(new StatisticsTotalsCalculator(jobTransferPathRepository));
transferPathRepository = new TransferPathRepository();
}
Statistics = new TransferStatistics(request);
RetryablePathChecker = new RetryablePathStatusChecker(this.configuration);
this.log = log;
this.token = token;
}
[SuppressMessage("Microsoft.Maintainability", "CA1502:AvoidExcessiveComplexity", Justification = "The complexity is acceptable.")]
public virtual ITransferStatistics CalculateStatistics()
{
TransferStatistics transferStatistics = statisticsCalculator.Calculate(Statistics);
transferStatistics.RemainingTime = remainingTimeCalculator.Calculate(transferStatistics);
Statistics.AverageTransferRateMbps = transferStatistics.AverageTransferRateMbps;
Statistics.TransferRateMbps = transferStatistics.TransferRateMbps;
Statistics.RemainingTime = transferStatistics.RemainingTime;
Statistics.Progress = transferStatistics.Progress;
Statistics.TotalFilesNotFound = transferStatistics.TotalFilesNotFound;
Statistics.TotalFilePermissionsErrors = transferStatistics.TotalFilePermissionsErrors;
Statistics.TotalBadPathErrors = transferStatistics.TotalBadPathErrors;
Statistics.TotalFailedFiles = transferStatistics.TotalFailedFiles;
Statistics.TotalFatalErrors = transferStatistics.TotalFatalErrors;
Statistics.TotalSkippedFiles = transferStatistics.TotalSkippedFiles;
return transferStatistics;
}
public virtual TransferStatus CalculateTransferStatus()
{
if (token.IsCancellationRequested)
return TransferStatus.Canceled;
ITransferStatistics transferStatistics = CalculateStatistics();
if (transferStatistics.TotalFatalErrors > 0 || (!configuration.PermissionErrorsRetry && transferStatistics.TotalFilePermissionsErrors > 0) || (!configuration.BadPathErrorsRetry && transferStatistics.TotalBadPathErrors > 0))
return TransferStatus.Fatal;
if (transferStatistics.JobError || (!FileNotFoundErrorsDisabled && transferStatistics.TotalFilesNotFound > 0) || transferStatistics.TotalFailedFiles > 0 || (!configuration.TransferEmptyDirectories && transferStatistics.TotalTransferredFiles == 0) || (configuration.PermissionErrorsRetry && transferStatistics.TotalFilePermissionsErrors > 0) || (configuration.BadPathErrorsRetry && transferStatistics.TotalBadPathErrors > 0))
return TransferStatus.Failed;
return TransferStatus.Successful;
}
public void Clear()
{
jobTransferPathRepository.Clear();
transferPathRepository.Clear();
statisticsCalculator.Clear();
remainingTimeCalculator.Clear();
logStatisticsTimestamp = null;
transferStatisticsTimestamp = null;
}
public void ClearTransferIssueList()
{
lock (SyncRoot) {
while (!issues.IsEmpty) {
issues.TryTake(out ITransferIssue _);
}
}
}
public virtual JobTransferPath GetJobTransferPath(TransferPath path)
{
return jobTransferPathRepository.SelectByKey(path);
}
public virtual JobTransferPath GetJobTransferPath(string path)
{
return jobTransferPathRepository.SelectByKey(path);
}
public virtual IReadOnlyCollection<JobTransferPath> GetJobTransferPaths()
{
return (IReadOnlyCollection<JobTransferPath>)jobTransferPathRepository.SelectAll();
}
public virtual IReadOnlyCollection<TransferPath> GetRetryableRequestTransferPaths()
{
if (token.IsCancellationRequested)
return new ReadOnlyCollection<TransferPath>(new List<TransferPath>());
List<TransferPath> list = (from path in transferPathRepository.SelectAll()
let jobTransferPath = jobTransferPathRepository.SelectByKey(path)
where RetryablePathChecker.IsRetrayable(jobTransferPath.Status)
select path).ToList();
ITransferStatistics transferStatistics = statisticsCalculator.Calculate(Statistics);
log.LogTransferInformation(request, $"""{transferStatistics.TotalRequestFiles}""{transferStatistics.TotalRequestBytes}", Array.Empty<object>());
log.LogTransferInformation(request, $"""{transferStatistics.TotalTransferredFiles}""{transferStatistics.TotalTransferredBytes}", Array.Empty<object>());
long[] source = (from x in list
select x.Bytes).ToArray();
log.LogTransferInformation(request, $"""{source.Count()}""{source.Sum()}", Array.Empty<object>());
return list;
}
public virtual TransferPath GetRequestTransferPath(string path)
{
return transferPathRepository.SelectByKey(path);
}
public virtual IReadOnlyCollection<TransferPath> GetRequestTransferPaths()
{
if (configuration.SavingMemoryMode)
throw new InvalidOperationException("Requested transfer paths are not available in saving memory mode.");
return (IReadOnlyCollection<TransferPath>)transferPathRepository.SelectAll();
}
public void IncrementRetryAttempt()
{
Statistics.RetryAttempt++;
}
public bool IsCompleted(TransferPathStatus status)
{
return RetryablePathChecker.IsCompleted(status);
}
public bool IsRetrayable(TransferPathStatus status)
{
return RetryablePathChecker.IsRetrayable(status);
}
public virtual void PublishStatistics(bool force)
{
if (!token.IsCancellationRequested) {
lock (SyncRoot) {
DateTime now = DateTime.Now;
bool flag = ShouldPublishLogStatistics(force, now);
bool flag2 = ShouldPublishTransferStatistics(force, now);
if (flag || flag2) {
ITransferStatistics statistics = CalculateStatistics();
DateTime now2 = DateTime.Now;
if (flag2) {
request.Context?.PublishTransferStatistics(request, statistics);
transferStatisticsTimestamp = now2;
}
if (flag) {
log.LogTransferStatistics(request, statistics);
logStatisticsTimestamp = now2;
}
}
}
}
}
public void RegisterIssue(ITransferIssue issue)
{
if (issue == null)
throw new ArgumentNullException("issue");
issues.Add(issue);
}
public virtual void RemoveRetryableTransferPaths(IEnumerable<TransferPath> paths)
{
if (!token.IsCancellationRequested) {
if (paths == null)
throw new ArgumentNullException("paths");
foreach (TransferPath path in paths) {
if (token.IsCancellationRequested)
break;
transferPathRepository.Delete(path);
}
}
}
public void ResetRetryAttempt()
{
Statistics.RetryAttempt = 0;
}
public virtual void Save(JobTransferPath path)
{
if (!token.IsCancellationRequested) {
if (configuration.SavingMemoryMode)
transferPathRepository.Delete(path);
jobTransferPathRepository.Upsert(path);
}
}
public virtual JobTransferPath Save(TransferPath path)
{
JobTransferPath jobTransferPath = new JobTransferPath {
Path = path,
RetryCount = 0,
Status = TransferPathStatus.None,
Index = 0
};
if (token.IsCancellationRequested)
return jobTransferPath;
transferPathRepository.Upsert(path);
jobTransferPathRepository.Upsert(jobTransferPath);
return jobTransferPath;
}
public virtual JobTransferPath Save(string path)
{
int order = Convert.ToInt32(jobTransferPathRepository.Count + 1);
TransferPath path2 = new TransferPath {
Direction = request.Direction,
Order = order,
SourcePath = path,
TargetFileName = null,
TargetPath = request.TargetPath
};
transferPathRepository.Upsert(path2);
JobTransferPath jobTransferPath = new JobTransferPath {
Path = path2,
RetryCount = 0,
Status = TransferPathStatus.Started,
Index = 0
};
if (token.IsCancellationRequested)
return jobTransferPath;
jobTransferPathRepository.Upsert(jobTransferPath);
return jobTransferPath;
}
public virtual void SaveStatistics()
{
if (!token.IsCancellationRequested) {
TransferStatistics transferStatistics = new TransferStatistics(Statistics);
statisticsCalculator.Save(transferStatistics);
Statistics.Clear();
Statistics.Id = Guid.NewGuid();
Statistics.Order = statisticsCalculator.Count;
Statistics.RetryAttempt = transferStatistics.RetryAttempt;
logStatisticsTimestamp = null;
transferStatisticsTimestamp = null;
remainingTimeCalculator.Clear();
}
}
private bool ShouldPublishLogStatistics(bool force, DateTime now)
{
if (!GlobalSettings.Instance.StatisticsLogEnabled)
return false;
if (force || !logStatisticsTimestamp.HasValue)
return true;
return (now - logStatisticsTimestamp.Value).TotalSeconds >= GlobalSettings.Instance.StatisticsLogIntervalSeconds;
}
private bool ShouldPublishTransferStatistics(bool force, DateTime now)
{
if (request.Context == null || !request.Context.StatisticsEnabled)
return false;
if (force || !transferStatisticsTimestamp.HasValue)
return true;
double totalSeconds = (now - transferStatisticsTimestamp.Value).TotalSeconds;
if (request.Context != null)
return totalSeconds >= request.Context.StatisticsRateSeconds;
return false;
}
}
}