// TransferJobBase
using Polly;
using Relativity.Transfer.Dto;
using Relativity.Transfer.Issues;
using Relativity.Transfer.Resources;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Relativity.Transfer
{
public abstract class TransferJobBase : ITransferJob, IDisposable
{
// Lock object guarding the mutable backing fields below.
// NOTE(review): the lock is static, so it is shared by every job instance in the process - confirm this is intended.
private static readonly object SyncRoot = new object();
// Timeout applied by the StartAsync overloads that do not take an explicit timeout.
private static readonly TimeSpan DefaultJobStartTime = TimeSpan.FromMinutes(5);
// Copied from ClientConfiguration.MaxJobRetryAttempts in the constructor.
private readonly int maxJobRetryAttempts;
// Copied from ClientConfiguration.ValidateSourcePaths in the constructor; gates AssignPathAttributes in ValidatePath.
private readonly bool validateSourcePaths;
// Backing fields for the Request, Status and Configuration properties.
private ITransferRequest currentRequest;
private TransferJobStatus currentStatus;
private ClientConfiguration configuration;
// Backing fields for ChangedMinDataRateMbps and ChangedTargetDataRateMbps.
private int lastChangedMinDataRate;
private int lastChangedTargetDataRate;
// AddPathsHelper appends one entry per call: the byte total and file count of the paths it accepted.
private readonly List<long> totalRequestBytesPerAttempt = new List<long>();
private readonly List<long> totalRequestFilesPerAttempt = new List<long>();
// Set by Dispose(bool) and read by CheckDisposed.
private bool disposed;
/// <summary>
/// Gets the minimum data rate that was most recently recorded for this job.
/// The value is read and written under the shared lock.
/// </summary>
public int ChangedMinDataRateMbps
{
    get {
        int rate;
        lock (SyncRoot) {
            rate = lastChangedMinDataRate;
        }
        return rate;
    }

    private set {
        lock (SyncRoot) {
            lastChangedMinDataRate = value;
        }
    }
}
/// <summary>
/// Gets the target data rate that was most recently recorded for this job.
/// The value is read and written under the shared lock.
/// </summary>
public int ChangedTargetDataRateMbps
{
    get {
        int rate;
        lock (SyncRoot) {
            rate = lastChangedTargetDataRate;
        }
        return rate;
    }

    private set {
        lock (SyncRoot) {
            lastChangedTargetDataRate = value;
        }
    }
}
/// <summary>
/// Gets the client configuration supplied to the constructor.
/// The reference is read and written under the shared lock.
/// </summary>
public ClientConfiguration Configuration
{
    get {
        ClientConfiguration current;
        lock (SyncRoot) {
            current = configuration;
        }
        return current;
    }

    private set {
        lock (SyncRoot) {
            configuration = value;
        }
    }
}
/// <summary>
/// Gets a value indicating whether the job supports changing the data rate.
/// This base implementation always returns <c>false</c>; derived classes may override it.
/// </summary>
public virtual bool IsDataRateChangeSupported => false;
/// <summary>
/// Gets or sets the job service used for statistics, retry bookkeeping and path storage.
/// Dispose(bool) clears it and sets it to <c>null</c>.
/// </summary>
public ITransferJobService JobService { get; set; }
/// <summary>
/// Gets or sets the transfer request this job operates on.
/// The reference is read and written under the shared lock; Dispose(bool) sets it to <c>null</c>.
/// </summary>
public ITransferRequest Request
{
    get {
        ITransferRequest current;
        lock (SyncRoot) {
            current = currentRequest;
        }
        return current;
    }

    set {
        lock (SyncRoot) {
            currentRequest = value;
        }
    }
}
/// <summary>
/// Gets the wait period computed for the current retry attempt.
/// Uses the request's retry strategy, or an exponential backoff strategy when the request has none,
/// and feeds it the retry attempt number reported by the job service statistics.
/// </summary>
public TimeSpan RetryWaitPeriod
{
    get {
        IRetryStrategy strategy = Request.RetryStrategy ?? RetryStrategies.CreateExpBackoffStrategy();
        return strategy.Calculation(JobService.Statistics.RetryAttempt);
    }
}
/// <summary>
/// Gets the current job status. Derived classes may set it.
/// The value is read and written under the shared lock.
/// </summary>
public TransferJobStatus Status
{
    get {
        TransferJobStatus current;
        lock (SyncRoot) {
            current = currentStatus;
        }
        return current;
    }

    protected set {
        lock (SyncRoot) {
            currentStatus = value;
        }
    }
}
// Set by OnJobCanceled to the value of its isDisposing argument.
internal bool IsCancelationFromDisposal { get; set; }
// When true, CheckCanceled always reports "not canceled" without inspecting the token or the status.
internal bool BypassCancelCheck { get; set; }
// Logger supplied to the constructor.
protected ITransferLog Log { get; }
// Service factory supplied to the constructor; also used to create the APM metrics service.
protected IRelativityServiceFactory ServiceFactory { get; }
// Created from the service factory in the constructor; validates data rate settings.
internal ITargetRateValidator TargetRateValidator { get; }
// Created from the service factory in the constructor; caps the target rate before validation.
internal ITargetRateThrottler TargetRateThrottler { get; }
// File system abstraction supplied to (or located by) the constructor.
protected IFileSystemService FileSystemService { get; }
// Resolvers assigned by InitializePathResolvers; either may be null, in which case ResolvePath skips it.
private IRemotePathResolver SourcePathResolver { get; set; }
private IRemotePathResolver TargetPathResolver { get; set; }
// Taken from the service factory in the constructor; used by ValidatePath.
private IPathValidationProvider PathValidationProvider { get; }
/// <summary>
/// Initializes a new job, obtaining the file system service from <c>ServiceObjectLocator</c>.
/// </summary>
/// <param name="log">The transfer log.</param>
/// <param name="request">The transfer request.</param>
/// <param name="jobService">The job service.</param>
/// <param name="configuration">The client configuration.</param>
/// <param name="serviceFactory">The service factory.</param>
protected TransferJobBase(ITransferLog log, ITransferRequest request, ITransferJobService jobService, ClientConfiguration configuration, IRelativityServiceFactory serviceFactory)
    : this(ServiceObjectLocator.GetService<IFileSystemService>(), log, request, jobService, configuration, serviceFactory)
{
}

/// <summary>
/// Initializes a new job with an explicit file system service.
/// The job starts in the <c>NotStarted</c> status with both recorded data rates at zero.
/// </summary>
/// <param name="fileSystemService">The file system service.</param>
/// <param name="log">The transfer log.</param>
/// <param name="request">The transfer request.</param>
/// <param name="jobService">The job service.</param>
/// <param name="configuration">The client configuration.</param>
/// <param name="serviceFactory">The service factory.</param>
/// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
protected TransferJobBase(IFileSystemService fileSystemService, ITransferLog log, ITransferRequest request, ITransferJobService jobService, ClientConfiguration configuration, IRelativityServiceFactory serviceFactory)
{
    // Argument checks run in this exact order before any state is assigned.
    if (fileSystemService == null) {
        throw new ArgumentNullException(nameof(fileSystemService));
    }
    if (log == null) {
        throw new ArgumentNullException(nameof(log));
    }
    if (request == null) {
        throw new ArgumentNullException(nameof(request));
    }
    if (configuration == null) {
        throw new ArgumentNullException(nameof(configuration));
    }
    if (jobService == null) {
        throw new ArgumentNullException(nameof(jobService));
    }
    if (serviceFactory == null) {
        throw new ArgumentNullException(nameof(serviceFactory));
    }

    FileSystemService = fileSystemService;
    Configuration = configuration;

    // Snapshot the configuration values the job consults later.
    maxJobRetryAttempts = configuration.MaxJobRetryAttempts;
    validateSourcePaths = configuration.ValidateSourcePaths;
    lastChangedMinDataRate = 0;
    lastChangedTargetDataRate = 0;

    Log = log;
    Request = request;
    Status = TransferJobStatus.NotStarted;
    JobService = jobService;

    // Collaborators derived from the service factory.
    ServiceFactory = serviceFactory;
    TargetRateValidator = serviceFactory.CreateTargetRateValidator();
    TargetRateThrottler = serviceFactory.CreateTargetRateThrottler();
    PathValidationProvider = serviceFactory.PathValidationProvider;
}
/// <summary>
/// Adds a single path to the job without a cancellation token.
/// </summary>
/// <param name="path">The path to add.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="path"/> is <c>null</c>.</exception>
/// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
public void AddPath(TransferPath path)
{
    if (path == (TransferPath)null) {
        throw new ArgumentNullException(nameof(path));
    }

    CheckDisposed(true);
    AddPath(path, CancellationToken.None);
}
/// <summary>
/// Adds a single path to the job by wrapping it in a one-element array and delegating to
/// <c>AddPaths</c>.
/// </summary>
/// <param name="path">The path to add.</param>
/// <param name="token">The cancellation token.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="path"/> is <c>null</c>.</exception>
/// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
public void AddPath(TransferPath path, CancellationToken token)
{
    if (path == (TransferPath)null) {
        throw new ArgumentNullException(nameof(path));
    }

    CheckDisposed(true);
    TransferPath[] singlePath = new[] { path };
    AddPaths(singlePath, token);
}
/// <summary>
/// Adds the supplied paths to the job without a cancellation token.
/// </summary>
/// <param name="paths">The paths to add.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="paths"/> is <c>null</c>.</exception>
/// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
public void AddPaths(IEnumerable<TransferPath> paths)
{
    if (paths == null) {
        throw new ArgumentNullException(nameof(paths));
    }

    CheckDisposed(true);
    AddPaths(paths, CancellationToken.None);
}
/// <summary>
/// Adds the supplied paths to the job synchronously, retrying up to the configured maximum
/// number of job retry attempts. Does nothing when cancellation is detected.
/// </summary>
/// <param name="paths">The paths to add.</param>
/// <param name="token">The cancellation token.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="paths"/> is <c>null</c>.</exception>
/// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
public void AddPaths(IEnumerable<TransferPath> paths, CancellationToken token)
{
    if (paths == null) {
        throw new ArgumentNullException(nameof(paths));
    }

    CheckDisposed(true);
    if (CheckCanceled(token)) {
        return;
    }

    Log.LogTransferInformation(Request, "Adding paths to the job.", Array.Empty<object>());
    AddPathsWaitAndRetry(paths, token, true, maxJobRetryAttempts);
}
/// <summary>
/// Adds the paths contained in a serialized batch without a cancellation token.
/// </summary>
/// <param name="batch">The serialized batch.</param>
/// <returns>The task returned by the token-accepting overload.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="batch"/> is <c>null</c>.</exception>
/// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
public Task AddPathsAsync(SerializedBatch batch)
{
    if (batch == null) {
        throw new ArgumentNullException(nameof(batch));
    }

    CheckDisposed(true);
    return AddPathsAsync(batch, CancellationToken.None);
}
/// <summary>
/// Deserializes the paths stored in the batch file and adds them to the job.
/// When the request is a <c>TransferRequest</c>, its batch number and total batch count are
/// updated from the batch first.
/// </summary>
/// <param name="batch">The serialized batch.</param>
/// <param name="token">The cancellation token.</param>
/// <returns>The task returned by the path-collection overload.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="batch"/> is <c>null</c>.</exception>
/// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
public Task AddPathsAsync(SerializedBatch batch, CancellationToken token)
{
    if (batch == null) {
        throw new ArgumentNullException(nameof(batch));
    }

    Log.LogTransferInformation(Request, "Adding {TotalFileCount} paths to the job from a batch number {BatchNumber}.", batch.TotalFileCount, batch.BatchNumber);
    CheckDisposed(true);

    if (Request is TransferRequest batchAwareRequest) {
        batchAwareRequest.BatchNumber = batch.BatchNumber;
        batchAwareRequest.TotalBatchCount = batch.TotalBatchCount;
    }

    var batchDto = JsonFileSerializer.Deserialize<SerializedPathsBatchDto>(batch.File);
    IEnumerable<TransferPath> paths = batchDto.Paths.Select(TransferPathDto.ConvertToPath);
    return AddPathsAsync(paths, token);
}
/// <summary>
/// Adds the supplied paths to the job without a cancellation token.
/// </summary>
/// <param name="paths">The paths to add.</param>
/// <returns>The task returned by the token-accepting overload.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="paths"/> is <c>null</c>.</exception>
/// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
public Task AddPathsAsync(IEnumerable<TransferPath> paths)
{
    if (paths == null) {
        throw new ArgumentNullException(nameof(paths));
    }

    CheckDisposed(true);
    return AddPathsAsync(paths, CancellationToken.None);
}
/// <summary>
/// Adds the supplied paths to the job on a thread-pool thread, using the retry wrapper.
/// </summary>
/// <remarks>
/// The task produced by the retry wrapper is returned to <c>Task.Run</c> so that the task handed
/// back to the caller completes only after all retry attempts have finished, and so that
/// failures and cancellation are observable by the caller. Previously the inner task was
/// discarded, which let the outer task complete early and silently dropped every exception.
/// </remarks>
/// <param name="paths">The paths to add.</param>
/// <param name="token">The cancellation token.</param>
/// <returns>A task that completes when the paths have been added or the operation has failed.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="paths"/> is <c>null</c>.</exception>
/// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
public Task AddPathsAsync(IEnumerable<TransferPath> paths, CancellationToken token)
{
    if (paths == null) {
        throw new ArgumentNullException(nameof(paths));
    }

    Log.LogTransferInformation(Request, "Adding paths to the job.", Array.Empty<object>());
    CheckDisposed(true);

    // Returning the inner task makes Task.Run unwrap it, matching StartAsync and CompleteAsync.
    return Task.Run(() => WaitAndRetryWrapperAsync("AddPathsAsync", token, () => CoreStrings.TransferJobBaseAddPathsFailureMessage, delegate {
        AddPathsHelper(paths, token);
    }, true, false), token);
}
/// <summary>
/// Changes the data rate without a cancellation token.
/// </summary>
/// <param name="minRateMbps">The requested minimum rate.</param>
/// <param name="targetRateMbps">The requested target rate.</param>
/// <returns>The value returned by the token-accepting overload.</returns>
public int ChangeDataRate(int minRateMbps, int targetRateMbps)
    => ChangeDataRate(minRateMbps, targetRateMbps, CancellationToken.None);

/// <summary>
/// Changes the data rate; the retry attempt counter is reset once the change succeeds.
/// </summary>
/// <param name="minRateMbps">The requested minimum rate.</param>
/// <param name="targetRateMbps">The requested target rate.</param>
/// <param name="token">The cancellation token.</param>
/// <returns>The value returned by the private implementation.</returns>
public int ChangeDataRate(int minRateMbps, int targetRateMbps, CancellationToken token)
    => ChangeDataRate(minRateMbps, targetRateMbps, token, true);
/// <summary>
/// Throttles and validates the requested rates, then applies them through
/// <c>OnChangeDataRate</c> inside the synchronous retry wrapper.
/// </summary>
/// <param name="minRateMbps">The requested minimum rate.</param>
/// <param name="targetRateMbps">The requested target rate; it may be lowered by the throttler.</param>
/// <param name="token">The cancellation token.</param>
/// <param name="resetRetryAttempt">Whether the retry attempt counter is reset after a successful change.</param>
/// <returns>
/// The recorded target rate after the change. When the job is disposed the previously recorded
/// target rate is returned; when cancellation is detected the throttled requested target rate
/// is returned without applying it.
/// </returns>
private int ChangeDataRate(int minRateMbps, int targetRateMbps, CancellationToken token, bool resetRetryAttempt)
{
    ValidateDataRateParameters(minRateMbps, ref targetRateMbps);

    bool isDisposed = CheckDisposed(false);
    if (isDisposed) {
        Log.LogWarning(CoreStrings.CantChangeDataRateOnDisposedError, Array.Empty<object>());
        return ChangedTargetDataRateMbps;
    }

    if (CheckCanceled(token)) {
        return targetRateMbps;
    }

    // The recorded rates are updated only after the derived class accepted the change.
    Action applyRates = () => {
        OnChangeDataRate(minRateMbps, targetRateMbps, token);
        ChangedMinDataRateMbps = minRateMbps;
        ChangedTargetDataRateMbps = targetRateMbps;
    };
    WaitAndRetryWrapper("ChangeDataRate", token, () => CoreStrings.TransferJobChangeDataRateFailureLogMessage, applyRates, resetRetryAttempt);
    return ChangedTargetDataRateMbps;
}
/// <summary>
/// Changes the data rate asynchronously without a cancellation token.
/// </summary>
/// <param name="minRateMbps">The requested minimum rate.</param>
/// <param name="targetRateMbps">The requested target rate.</param>
/// <returns>The task returned by the token-accepting overload.</returns>
public Task<int> ChangeDataRateAsync(int minRateMbps, int targetRateMbps)
    => ChangeDataRateAsync(minRateMbps, targetRateMbps, CancellationToken.None);
/// <summary>
/// Throttles and validates the requested rates, then applies them on a thread-pool thread
/// through <c>OnChangeDataRate</c> inside the asynchronous retry wrapper.
/// </summary>
/// <remarks>
/// The retry wrapper's task is awaited before the recorded target rate is read. Previously the
/// task was discarded, so the result could be read while retries were still pending and any
/// failure of the change was silently lost instead of faulting the returned task.
/// </remarks>
/// <param name="minRateMbps">The requested minimum rate.</param>
/// <param name="targetRateMbps">The requested target rate; it may be lowered by the throttler.</param>
/// <param name="token">The cancellation token.</param>
/// <returns>
/// A task producing the recorded target rate once the change has completed, or the previously
/// recorded target rate when the job has already been disposed.
/// </returns>
public Task<int> ChangeDataRateAsync(int minRateMbps, int targetRateMbps, CancellationToken token)
{
    // Validation runs synchronously so invalid arguments surface to the caller immediately.
    ValidateDataRateParameters(minRateMbps, ref targetRateMbps);
    return Task.Run(async () => {
        if (CheckDisposed(false)) {
            Log.LogWarning(CoreStrings.CantChangeDataRateOnDisposedError, Array.Empty<object>());
        } else {
            // Wait for every retry attempt to finish so the value below reflects the outcome.
            await WaitAndRetryWrapperAsync("ChangeDataRateAsync", token, () => CoreStrings.TransferJobChangeDataRateFailureLogMessage, delegate {
                OnChangeDataRate(minRateMbps, targetRateMbps, token);
                ChangedMinDataRateMbps = minRateMbps;
                ChangedTargetDataRateMbps = targetRateMbps;
            }, true, false).ConfigureAwait(false);
        }
        return ChangedTargetDataRateMbps;
    }, token);
}
/// <summary>
/// Waits for the job to complete using the default wait timeout and no cancellation token.
/// </summary>
/// <returns>A task producing the transfer result.</returns>
public Task<ITransferResult> CompleteAsync()
    => CompleteAsync(TransferConstants.DefaultWaitTimeout, CancellationToken.None);

/// <summary>
/// Waits for the job to complete using the default wait timeout.
/// </summary>
/// <param name="token">The cancellation token.</param>
/// <returns>A task producing the transfer result.</returns>
public Task<ITransferResult> CompleteAsync(CancellationToken token)
    => CompleteAsync(TransferConstants.DefaultWaitTimeout, token);

/// <summary>
/// Waits for the job to complete using the supplied timeout and no cancellation token.
/// </summary>
/// <param name="timeout">The timeout passed to the completion wait.</param>
/// <returns>A task producing the transfer result.</returns>
public Task<ITransferResult> CompleteAsync(TimeSpan timeout)
    => CompleteAsync(timeout, CancellationToken.None);
/// <summary>
/// Waits for the job to complete, retrying through the asynchronous retry wrapper.
/// </summary>
/// <param name="timeout">The timeout passed to <c>OnWaitJobCompletion</c>.</param>
/// <param name="token">The cancellation token.</param>
/// <returns>
/// A task producing the transfer result. A result with the <c>Canceled</c> status is produced
/// when cancellation is detected up front or when the completion wait throws
/// <see cref="OperationCanceledException"/>.
/// </returns>
/// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
/// <exception cref="InvalidOperationException">Thrown by <c>ValidateWait</c> when the job status does not allow waiting.</exception>
public Task<ITransferResult> CompleteAsync(TimeSpan timeout, CancellationToken token)
{
    // Builds the result reported whenever the job ends because of cancellation.
    TransferResult CreateCanceledResult()
    {
        return new TransferResult {
            Configuration = Configuration,
            MinDataRateMbps = ChangedMinDataRateMbps,
            Request = Request,
            Status = TransferStatus.Canceled,
            TargetDataRateMbps = ChangedTargetDataRateMbps
        };
    }

    CheckDisposed(true);
    if (CheckCanceled(token)) {
        return Task.FromResult<ITransferResult>(CreateCanceledResult());
    }

    ValidateWait();
    Log.LogTransferInformation(Request, "Preparing to start the transfer job...", Request.ClientRequestId);

    return Task.Run(() => WaitAndRetryWrapperAsync("CompleteAsync", token, () => CoreStrings.TransferJobBaseCompleteAsyncFailureMessage, attempt => {
        try {
            ITransferResult outcome = OnWaitJobCompletion(timeout, token);
            if (outcome is TransferResult concreteOutcome) {
                UpdateReadOnlyProperties(concreteOutcome);
            }

            Log.LogStatistics(JobService.CalculateStatistics());

            // May throw InvalidOperationException to make the retry policy run another attempt.
            HandleTransferStatus(token, outcome, attempt);
            return outcome;
        } catch (OperationCanceledException canceledException) {
            TransferResult canceledResult = CreateCanceledResult();
            SetTransferResult(canceledResult, new BindingList<ITransferIssue>(), token);
            Log.LogTransferError(canceledException, Request, CoreStrings.TransferJobCancelMessage, Array.Empty<object>());
            LogWhenTransferFinished(canceledResult, token);
            return canceledResult;
        }
    }, false, false));
}
/// <summary>
/// Maps the transfer result status onto the job status, publishes the matching request status
/// and writes the end-of-transfer logs. For any status other than canceled, successful or
/// fatal, the job is retried while retryable paths remain and the retry budget is not spent.
/// </summary>
/// <param name="token">The cancellation token forwarded to the end-of-transfer logging.</param>
/// <param name="result">The transfer result to evaluate.</param>
/// <param name="localRetryCount">The number of retries already performed by the retry wrapper.</param>
/// <exception cref="InvalidOperationException">
/// Thrown to request another attempt when retryable paths exist and
/// <paramref name="localRetryCount"/> differs from the maximum number of job retry attempts.
/// </exception>
private void HandleTransferStatus(CancellationToken token, ITransferResult result, int localRetryCount)
{
    // Shared tail of every terminal outcome: status, log line, request event, final logs.
    void Finish(TransferJobStatus jobStatus, TransferRequestStatus requestStatus, string message, bool logAsError)
    {
        Status = jobStatus;
        if (logAsError) {
            Log.LogTransferError(Request, message, Array.Empty<object>());
        } else {
            Log.LogTransferInformation(Request, message, Array.Empty<object>());
        }
        Request.Context?.PublishTransferRequest(Request, requestStatus);
        LogWhenTransferFinished(result, token);
    }

    TransferStatus outcome = result.Status;
    if (outcome == TransferStatus.Canceled) {
        Finish(TransferJobStatus.Canceled, TransferRequestStatus.Canceled, CoreStrings.TransferJobCancelMessage, false);
        return;
    }
    if (outcome == TransferStatus.Successful) {
        Finish(TransferJobStatus.Successful, TransferRequestStatus.Ended, CoreStrings.TransferJobSuccessMessage, false);
        return;
    }
    if (outcome == TransferStatus.Fatal) {
        Finish(TransferJobStatus.Fatal, TransferRequestStatus.Ended, CoreStrings.TransferJobFatalErrorErrorMessage, true);
        return;
    }

    IReadOnlyCollection<TransferPath> retryablePaths = JobService.GetRetryableRequestTransferPaths();
    if (retryablePaths.Count <= 0) {
        Finish(TransferJobStatus.Failed, TransferRequestStatus.Ended, CoreStrings.TransferJobFailedWaitingErrorMessage, true);
        return;
    }

    if (localRetryCount != maxJobRetryAttempts) {
        // The retry policy handles InvalidOperationException, so throwing triggers another attempt.
        Log.LogTransferWarning(Request, "Job is about to be retried ({retryablePathsCount} paths). Transfer status: {JobStatus}.", retryablePaths.Count, result.Status);
        throw new InvalidOperationException(CoreStrings.TransferJobCompleteFailedExceptionMessage);
    }

    Finish(TransferJobStatus.Failed, TransferRequestStatus.EndedMaxRetry, CoreStrings.TransferJobFailedRetryErrorMessage, true);
}
/// <summary>
/// Fills in the result's configuration and request when they are missing and always overwrites
/// its data rates with the values recorded by this job.
/// </summary>
/// <param name="transferResult">The result to update.</param>
private void UpdateReadOnlyProperties(TransferResult transferResult)
{
    if (transferResult.Configuration == null) {
        transferResult.Configuration = Configuration;
    }
    if (transferResult.Request == null) {
        transferResult.Request = Request;
    }

    transferResult.MinDataRateMbps = ChangedMinDataRateMbps;
    transferResult.TargetDataRateMbps = ChangedTargetDataRateMbps;
}
/// <summary>
/// Disposes the job by calling <c>Dispose(true)</c> and suppresses finalization of this instance.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Starts the job with the default start timeout and no cancellation token.
/// </summary>
/// <returns>A task representing the start operation.</returns>
public Task StartAsync()
    => StartAsync(CancellationToken.None);

/// <summary>
/// Starts the job with the default start timeout.
/// </summary>
/// <param name="token">The cancellation token.</param>
/// <returns>A task representing the start operation.</returns>
public Task StartAsync(CancellationToken token)
    => StartAsync(DefaultJobStartTime, token);

/// <summary>
/// Starts the job with the supplied timeout and no cancellation token.
/// </summary>
/// <param name="timeout">The timeout passed to the start hook.</param>
/// <returns>A task representing the start operation.</returns>
public Task StartAsync(TimeSpan timeout)
    => StartAsync(timeout, CancellationToken.None);
/// <summary>
/// Validates the configuration and job state, publishes the "started" request event, assigns
/// missing client request and job identifiers, and then runs <c>OnJobStarted</c> through the
/// asynchronous retry wrapper.
/// </summary>
/// <param name="timeout">The timeout passed to <c>OnJobStarted</c>.</param>
/// <param name="token">The cancellation token passed to <c>OnJobStarted</c>.</param>
/// <returns>A completed task when cancellation is detected up front; otherwise the task for the start operation.</returns>
/// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
/// <exception cref="InvalidOperationException">Thrown by <c>ValidateStart</c> when the job status does not allow starting.</exception>
public Task StartAsync(TimeSpan timeout, CancellationToken token)
{
    CheckDisposed(true);
    if (CheckCanceled(token)) {
        return Task.CompletedTask;
    }

    TargetRateThrottler.ThrottleToMaxAllowedValue(Configuration);
    TargetRateValidator.Validate(Configuration);
    ValidateStart();
    InitializePathResolvers();

    Status = TransferJobStatus.Created;
    Log.LogTransferInformation(Request, "Transfer job is starting...", Array.Empty<object>());
    Request.Context?.Clear(false);
    Request.Context?.PublishTransferRequest(Request, TransferRequestStatus.Started);

    // Make sure the request carries identifiers before the job is started.
    if (!Request.ClientRequestId.HasValue) {
        Request.ClientRequestId = Guid.NewGuid();
    }
    if (!Request.JobId.HasValue) {
        Request.JobId = Guid.NewGuid();
    }

    // The wrapper itself runs with CancellationToken.None; only the start hook sees the caller's token.
    return Task.Run(() => WaitAndRetryWrapperAsync("StartAsync", CancellationToken.None, () => CoreStrings.TransferJobBaseAddPathsFailureMessage, () => {
        OnJobStarted(timeout, token);
        if (token.IsCancellationRequested) {
            Status = TransferJobStatus.Canceled;
        } else {
            Status = TransferJobStatus.Running;
        }
        Log.LogTransferInformation(Request, "Successfully started the transfer job.", Array.Empty<object>());
    }, true, true));
}
/// <summary>
/// Validates a path with the path validation provider. A valid upload path additionally gets
/// its path attributes assigned when source path validation is enabled; an invalid path is
/// logged and published as a transfer path issue.
/// </summary>
/// <param name="path">The path to validate.</param>
/// <returns>The validation result produced by the provider.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="path"/> is <c>null</c>.</exception>
/// <exception cref="ObjectDisposedException">Thrown when the path is valid and the job has been disposed.</exception>
public PathValidationResult ValidatePath(TransferPath path)
{
    if (path == (TransferPath)null) {
        throw new ArgumentNullException(nameof(path));
    }

    PathValidationResult validation = PathValidationProvider.Validate(path);
    if (!validation.IsOk) {
        Log.LogError(validation.ErrorMessage, Array.Empty<object>());
        Request.Context?.PublishTransferPathIssue(Request, validation.CreateTransferIssue());
        return validation;
    }

    CheckDisposed(true);
    if (path.Direction == TransferDirection.Upload && validateSourcePaths) {
        AssignPathAttributes(path);
    }
    return validation;
}
/// <summary>
/// Re-adds the retryable paths reported by the job service: they are removed from the service
/// and added again with a single retry and without resetting the retry attempt counter.
/// A warning is logged when any of them has no target path.
/// </summary>
/// <param name="token">The cancellation token.</param>
protected virtual void RetryAddPaths(CancellationToken token)
{
    IReadOnlyCollection<TransferPath> pathsToRetry = JobService.GetRetryableRequestTransferPaths();

    bool anyWithoutTarget = pathsToRetry.Any(candidate => string.IsNullOrEmpty(candidate.TargetPath));
    if (anyWithoutTarget) {
        Log.LogTransferWarning(Request, "Adding transfer paths on retry scenario with no target specified.", Array.Empty<object>());
    }

    JobService.RemoveRetryableTransferPaths(pathsToRetry);
    AddPathsWaitAndRetry(pathsToRetry, token, false, 1);
}
/// <summary>
/// Creates a path resolver by delegating to the overridable <c>OnCreatePathResolver</c>.
/// </summary>
/// <param name="source"><c>true</c> to create the source resolver; <c>false</c> for the target resolver.</param>
/// <returns>The resolver created by the hook; may be <c>null</c>.</returns>
protected IRemotePathResolver CreatePathResolver(bool source)
    => OnCreatePathResolver(source);
/// <summary>
/// Releases the job. When called with <paramref name="disposing"/> set, the job is cancelled on
/// a best-effort basis, the job service is cleared and released, the request reference is
/// dropped and a garbage collection is requested.
/// </summary>
/// <param name="disposing"><c>true</c> when called from <c>Dispose()</c>.</param>
protected virtual void Dispose(bool disposing)
{
    if (disposed) {
        return;
    }

    if (disposing) {
        try {
            TryCancelJob(true);
        } catch (Exception cancelFailure) {
            // A failed cancellation must not prevent the cleanup below.
            Log.LogWarning(cancelFailure, "Failed to cancel the job within a Dispose block.", Array.Empty<object>());
        } finally {
            JobService?.Clear();
            JobService = null;
            Request = null;
            GC.Collect();
        }
    }

    disposed = true;
}
/// <summary>
/// Hook implemented by derived classes to apply a data rate change. It is invoked inside the
/// retry wrappers by <c>ChangeDataRate</c> and <c>ChangeDataRateAsync</c>.
/// </summary>
/// <param name="minRateMbps">The requested minimum rate.</param>
/// <param name="targetRateMbps">The requested target rate, after throttling.</param>
/// <param name="token">The cancellation token.</param>
protected abstract void OnChangeDataRate(int minRateMbps, int targetRateMbps, CancellationToken token);
/// <summary>
/// Hook for derived classes to supply a path resolver. This base implementation supplies none.
/// </summary>
/// <param name="source"><c>true</c> for the source resolver; <c>false</c> for the target resolver.</param>
/// <returns>Always <c>null</c> in this base implementation.</returns>
protected virtual IRemotePathResolver OnCreatePathResolver(bool source) => null;
/// <summary>
/// Records whether the cancellation originates from disposal. For a regular cancellation the
/// job status becomes <c>Canceled</c> and the canceled request event is published.
/// </summary>
/// <param name="isDisposing"><c>true</c> when the cancellation is part of disposing the job.</param>
protected virtual void OnJobCanceled(bool isDisposing)
{
    IsCancelationFromDisposal = isDisposing;
    if (isDisposing) {
        return;
    }

    Status = TransferJobStatus.Canceled;
    Request.Context?.PublishTransferRequest(Request, TransferRequestStatus.Canceled);
}
/// <summary>
/// Hook invoked by <c>HandlePolicyJobRetry</c> before a retried attempt executes.
/// </summary>
protected abstract void OnJobRetry(TimeSpan timeout, CancellationToken token);
/// <summary>
/// Hook invoked by <c>StartAsync</c> to start the job.
/// </summary>
protected abstract void OnJobStarted(TimeSpan timeout, CancellationToken token);
/// <summary>
/// Hook invoked by <c>AddPathsHelper</c> after a path has been prepared, validated, resolved and saved.
/// </summary>
protected abstract void OnPathAdded(TransferPath path, CancellationToken token);
/// <summary>
/// Hook invoked by <c>CompleteAsync</c> to wait for the job and produce its result.
/// </summary>
protected abstract ITransferResult OnWaitJobCompletion(TimeSpan timeout, CancellationToken token);
/// <summary>
/// Populates a transfer result from freshly calculated job statistics, the per-attempt request
/// totals and the supplied issues. The status is <c>Canceled</c> when the token is cancelled;
/// otherwise it is calculated by the job service.
/// </summary>
/// <param name="result">The result to populate.</param>
/// <param name="issues">The issues to register on the result.</param>
/// <param name="token">The cancellation token inspected to decide the status.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="result"/> or <paramref name="issues"/> is <c>null</c>.</exception>
protected void SetTransferResult(TransferResult result, IReadOnlyCollection<ITransferIssue> issues, CancellationToken token)
{
    if (result == null) {
        throw new ArgumentNullException(nameof(result));
    }
    if (issues == null) {
        throw new ArgumentNullException(nameof(issues));
    }

    ITransferStatistics stats = JobService.CalculateStatistics();

    // Timing and retry information.
    result.EndTime = stats.EndTime;
    result.RetryCount = stats.RetryAttempt;
    result.StartTime = stats.StartTime;

    // Per-attempt request totals collected while adding paths.
    result.TotalRequestedFilesPerAttempt = totalRequestFilesPerAttempt;
    result.TotalRequestedBytesPerAttempt = totalRequestBytesPerAttempt;

    // Counters copied from the statistics.
    result.TotalFailedFiles = stats.TotalFailedFiles;
    result.TotalFatalErrors = stats.TotalFatalErrors;
    result.TotalFilesNotFound = stats.TotalFilesNotFound;
    result.TotalFilePermissionErrors = stats.TotalFilePermissionsErrors;
    result.TotalBadPathErrors = stats.TotalBadPathErrors;
    result.TotalSkippedFiles = stats.TotalSkippedFiles;
    result.TotalTransferredBytes = stats.TotalTransferredBytes;
    result.TotalTransferredFiles = stats.TotalTransferredFiles;

    if (token.IsCancellationRequested) {
        result.Status = TransferStatus.Canceled;
    } else {
        result.Status = JobService.CalculateTransferStatus();
    }

    result.RegisterIssues(issues);
    result.RegisterStatistics(stats);
}
/// <summary>
/// Verifies that the current job status allows the job to be started.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown when the job has already finished (successful, failed or fatal) or is in an unexpected state.
/// </exception>
protected virtual void ValidateStart()
{
    switch (Status) {
        case TransferJobStatus.Successful:
        case TransferJobStatus.Failed:
        case TransferJobStatus.Fatal:
            throw new InvalidOperationException(CoreStrings.StartAlreadyCompletedMessage);

        case TransferJobStatus.NotStarted:
        case TransferJobStatus.Created:
        case TransferJobStatus.Running:
        case TransferJobStatus.RetryPending:
        case TransferJobStatus.Retrying:
        case TransferJobStatus.Canceled:
            return;

        default:
            throw new InvalidOperationException(CoreStrings.InitializeUnexpectedStateMessage);
    }
}
/// <summary>
/// Verifies that the current job status allows waiting for completion.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown when the job has not been started, has already finished, or is in any other status
/// that is not running, retry pending, retrying or canceled.
/// </exception>
protected virtual void ValidateWait()
{
    switch (Status) {
        case TransferJobStatus.NotStarted:
            throw new InvalidOperationException(CoreStrings.WaitNotStartedMessage);

        case TransferJobStatus.Successful:
            throw new InvalidOperationException(CoreStrings.WaitAlreadyCompletedMessage);

        case TransferJobStatus.Failed:
        case TransferJobStatus.Fatal:
            throw new InvalidOperationException(CoreStrings.WaitAlreadyFailedMessage);

        case TransferJobStatus.Running:
        case TransferJobStatus.RetryPending:
        case TransferJobStatus.Retrying:
        case TransferJobStatus.Canceled:
            return;

        default:
            throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, CoreStrings.WaitUnexpectedStateMessage, Status));
    }
}
/// <summary>
/// Builds the policy describing which exceptions are retried: non-fatal
/// <c>TransferException</c> instances and <see cref="InvalidOperationException"/>.
/// </summary>
/// <returns>The policy builder.</returns>
private static PolicyBuilder CreateRetryPolicy()
{
    PolicyBuilder builder = Policy.Handle<TransferException>(transferError => !transferError.Fatal);
    return builder.Or<InvalidOperationException>();
}
/// <summary>
/// Caps the target rate with the throttler and then validates the minimum/target pair.
/// </summary>
/// <param name="minRateMbps">The requested minimum rate.</param>
/// <param name="targetRateMbps">The requested target rate; replaced by the throttled value.</param>
private void ValidateDataRateParameters(int minRateMbps, ref int targetRateMbps)
{
    int throttledTarget = TargetRateThrottler.ThrottleToMaxAllowedValue(targetRateMbps);
    targetRateMbps = throttledTarget;
    TargetRateValidator.Validate(minRateMbps, targetRateMbps);
}
/// <summary>
/// Selects the source and target path resolvers: the ones supplied on the request win,
/// otherwise the resolvers created by this job are used.
/// </summary>
private void InitializePathResolvers()
{
    IRemotePathResolver sourceResolver = Request.SourcePathResolver ?? CreatePathResolver(true);
    SourcePathResolver = sourceResolver;

    IRemotePathResolver targetResolver = Request.TargetPathResolver ?? CreatePathResolver(false);
    TargetPathResolver = targetResolver;
}
/// <summary>
/// Reports whether the job has been disposed, optionally throwing instead of returning <c>true</c>.
/// </summary>
/// <param name="shouldThrow">Whether a disposed job causes an exception.</param>
/// <returns><c>true</c> when the job is disposed and <paramref name="shouldThrow"/> is <c>false</c>; otherwise <c>false</c>.</returns>
/// <exception cref="ObjectDisposedException">Thrown when the job is disposed and <paramref name="shouldThrow"/> is <c>true</c>.</exception>
private bool CheckDisposed(bool shouldThrow = true)
{
    if (disposed) {
        if (shouldThrow) {
            throw new ObjectDisposedException(CoreStrings.ObjectDisposedExceptionMessage);
        }
        return true;
    }
    return false;
}
/// <summary>
/// Detects cancellation, either through the token or through a job status of <c>Canceled</c>,
/// and triggers the job cancellation handling when detected. The check is skipped entirely
/// when <c>BypassCancelCheck</c> is set.
/// </summary>
/// <param name="token">The cancellation token.</param>
/// <returns><c>true</c> when cancellation was detected; otherwise <c>false</c>.</returns>
private bool CheckCanceled(CancellationToken token)
{
    if (BypassCancelCheck) {
        return false;
    }
    if (!token.IsCancellationRequested && Status != TransferJobStatus.Canceled) {
        return false;
    }

    TryCancelJob(false);
    return true;
}
/// <summary>
/// Derives the path attributes from the local file system: an existing file is marked as a
/// file; an existing directory that is not already flagged as a directory is marked as one and
/// additionally as empty when it has no content. The attributes are validated afterwards.
/// </summary>
/// <param name="path">The path whose source is inspected.</param>
/// <exception cref="ArgumentException">Thrown when the source path is neither an existing file nor an existing directory.</exception>
private void AssignPathAttributes(TransferPath path)
{
    if (FileSystemService.FileExists(path.SourcePath)) {
        path.PathAttributes = TransferPathAttributes.File;
    } else if (!FileSystemService.DirectoryExists(path.SourcePath)) {
        Log.LogError(CoreStrings.SourcePathNotFoundArgumentExceptionMessage, path.SourcePath, LogRedaction.OnPositions(default(int)));
        throw new ArgumentException("Source path not found - check logs for more details.", nameof(path));
    } else if (!path.PathAttributes.HasFlag(TransferPathAttributes.Directory)) {
        path.PathAttributes = TransferPathAttributes.Directory;
        if (FileSystemService.IsDirectoryEmpty(path.SourcePath)) {
            path.PathAttributes |= TransferPathAttributes.Empty;
        }
    }

    path.ValidateAttributes();
}
/// <summary>
/// Prepares the job for a retried attempt. Nothing happens on the first attempt or while the
/// job is already retrying or running. Otherwise the recorded data rates are reset, the retry
/// hook is invoked and any previously recorded data rate is applied again; the status ends up
/// as <c>Running</c> whether or not the retry hook succeeds.
/// </summary>
/// <param name="localRetryCount">The number of retries already performed by the retry wrapper.</param>
/// <param name="token">The cancellation token.</param>
private void HandlePolicyJobRetry(int localRetryCount, CancellationToken token)
{
    lock (SyncRoot) {
        if (localRetryCount <= 0 || Status == TransferJobStatus.Retrying || Status == TransferJobStatus.Running) {
            return;
        }

        Status = TransferJobStatus.Retrying;
        try {
            // Remember the rates so they can be restored after the retry hook ran.
            int previousMinRate = ChangedMinDataRateMbps;
            int previousTargetRate = ChangedTargetDataRateMbps;
            ChangedMinDataRateMbps = 0;
            ChangedTargetDataRateMbps = 0;

            OnJobRetry(TransferConstants.DefaultWaitTimeout, token);

            bool hadRecordedRate = previousMinRate > 0 || previousTargetRate > 0;
            if (hadRecordedRate) {
                ChangeDataRate(previousMinRate, previousTargetRate, token, false);
            }
        } finally {
            Status = TransferJobStatus.Running;
        }
    }
}
/// <summary>
/// Reacts to a retried failure: the status becomes <c>RetryPending</c>, and unless the job is
/// already being retried the statistics are saved, the retry attempt is incremented and the
/// retry event is published. The request context is cleared in every case.
/// </summary>
/// <param name="maxAttempts">The maximum number of attempts reported with the retry event.</param>
/// <param name="retryingJob">Whether the job is already being retried.</param>
private void HandlePolicyJobException(int maxAttempts, bool retryingJob)
{
    lock (SyncRoot) {
        Status = TransferJobStatus.RetryPending;

        bool recordNewAttempt = !retryingJob;
        if (recordNewAttempt) {
            JobService.SaveStatistics();
            JobService.IncrementRetryAttempt();
            Request.Context?.PublishTransferJobRetry(Request, JobService.Statistics.RetryAttempt, maxAttempts);
        }

        Request.Context?.Clear(true);
    }
}
/// <summary>
/// Normalizes the path for its direction: the local side is expanded to a full path when it is
/// not rooted, and the remote side is passed through the matching resolver when one exists.
/// Paths with any other direction are left untouched.
/// </summary>
/// <param name="path">The path to resolve in place.</param>
private void ResolvePath(TransferPath path)
{
    switch (path.Direction) {
        case TransferDirection.Download:
            // Local side is the target; remote side is the source.
            if (!FileSystemService.IsPathRooted(path.TargetPath)) {
                path.TargetPath = FileSystemService.GetFullPath(path.TargetPath);
            }
            if (SourcePathResolver != null) {
                path.SourcePath = SourcePathResolver.ResolvePath(path.SourcePath);
            }
            break;

        case TransferDirection.Upload:
            // Local side is the source; remote side is the target.
            if (!FileSystemService.IsPathRooted(path.SourcePath)) {
                path.SourcePath = FileSystemService.GetFullPath(path.SourcePath);
            }
            if (TargetPathResolver != null) {
                path.TargetPath = TargetPathResolver.ResolvePath(path.TargetPath);
            }
            break;
    }
}
/// <summary>
/// Fills in defaults from the request: a path without a direction inherits the request's
/// direction and a path without a target inherits the request's target path.
/// </summary>
/// <param name="path">The path to prepare in place.</param>
/// <exception cref="ArgumentException">Thrown when the source path is null or empty.</exception>
/// <exception cref="TransferException">Thrown when neither the path nor the request provides a target path.</exception>
private void PreparePath(TransferPath path)
{
    if (path.Direction == TransferDirection.None) {
        path.Direction = Request.Direction;
    }

    if (string.IsNullOrEmpty(path.SourcePath)) {
        Log.LogTransferError(Request, "Malformed transfer request: {request}", Request);
        throw new ArgumentException(CoreStrings.SourcePathArgumentExceptionMessage, nameof(path));
    }

    if (!string.IsNullOrEmpty(path.TargetPath)) {
        return;
    }

    // Fall back to the request-level target path.
    path.TargetPath = Request.TargetPath;
    if (string.IsNullOrEmpty(path.TargetPath)) {
        Log.LogTransferError(Request, "Malformed transfer request: {request}", Request);
        throw new TransferException(CoreStrings.TargetPathArgumentExceptionMessage);
    }
}
/// <summary>
/// Prepares, validates, resolves, saves and announces each path, honouring cancellation before
/// every path. Paths that fail validation are skipped. The number of accepted paths and their
/// byte total are appended to the per-attempt totals once the enumeration finishes.
/// </summary>
/// <param name="paths">The paths to add.</param>
/// <param name="token">The cancellation token.</param>
/// <exception cref="OperationCanceledException">Thrown when cancellation is requested between paths.</exception>
private void AddPathsHelper(IEnumerable<TransferPath> paths, CancellationToken token)
{
    long acceptedFiles = 0;
    long acceptedBytes = 0;

    foreach (TransferPath candidate in paths) {
        token.ThrowIfCancellationRequested();
        PreparePath(candidate);

        if (!ValidatePath(candidate).IsOk) {
            continue;
        }

        ResolvePath(candidate);
        JobService.Save(candidate);
        OnPathAdded(candidate, token);
        acceptedFiles++;
        acceptedBytes += candidate.Bytes;
    }

    totalRequestFilesPerAttempt.Add(acceptedFiles);
    totalRequestBytesPerAttempt.Add(acceptedBytes);
}
/// <summary>
/// Runs <c>AddPathsHelper</c> inside the synchronous retry wrapper.
/// </summary>
/// <param name="paths">The paths to add.</param>
/// <param name="token">The cancellation token.</param>
/// <param name="resetRetryAttempt">Whether the retry attempt counter is reset after success.</param>
/// <param name="maxRetries">The maximum number of retries.</param>
private void AddPathsWaitAndRetry(IEnumerable<TransferPath> paths, CancellationToken token, bool resetRetryAttempt, int maxRetries)
{
    Action addPaths = () => AddPathsHelper(paths, token);
    WaitAndRetryWrapper("AddPathsWaitAndRetry", token, () => CoreStrings.TransferJobBaseAddPathsFailureMessage, addPaths, resetRetryAttempt, maxRetries);
}
/// <summary>
/// Invokes the cancellation hook, logging any failure. Only exceptions classified as fatal by
/// <c>ExceptionHelper</c> are rethrown.
/// </summary>
/// <param name="isDisposing">Whether the cancellation is part of disposing the job.</param>
private void TryCancelJob(bool isDisposing)
{
    try {
        OnJobCanceled(isDisposing);
    } catch (Exception cancelFailure) {
        Log.LogTransferWarning(cancelFailure, Request, "An attempt to cancel the job failed.", Array.Empty<object>());
        bool mustPropagate = ExceptionHelper.IsFatalException(cancelFailure);
        if (mustPropagate) {
            throw;
        }
    }
}
/// <summary>
/// Runs an action inside the synchronous retry wrapper using the configured maximum number of
/// job retry attempts.
/// </summary>
private void WaitAndRetryWrapper(string caller, CancellationToken token, Func<string> onException, Action onExecute, bool resetRetryAttempt)
    => WaitAndRetryWrapper(caller, token, onException, onExecute, resetRetryAttempt, maxJobRetryAttempts);

/// <summary>
/// Runs an action inside the synchronous retry wrapper with an explicit retry limit by adapting
/// it to the value-returning overload.
/// </summary>
private void WaitAndRetryWrapper(string caller, CancellationToken token, Func<string> onException, Action onExecute, bool resetRetryAttempt, int maxRetries)
{
    // The generic overload needs a result; the value itself is never used.
    Func<int> adapted = () => {
        onExecute();
        return 0;
    };
    WaitAndRetryWrapper(caller, token, onException, adapted, resetRetryAttempt, maxRetries);
}
/// <summary>
/// Executes <paramref name="onExecute"/> synchronously under a wait-and-retry policy that
/// handles non-fatal <c>TransferException</c> and <see cref="InvalidOperationException"/>.
/// The value returned by <paramref name="onExecute"/> is discarded.
/// </summary>
/// <param name="caller">Name of the calling operation, used in the retry log message.</param>
/// <param name="token">The cancellation token checked before every attempt.</param>
/// <param name="onException">Supplies the failure text appended to the retry log message.</param>
/// <param name="onExecute">The work to execute.</param>
/// <param name="resetRetryAttempt">Whether the job service retry attempt is reset after a successful execution.</param>
/// <param name="maxRetryAttempts">The maximum number of retries.</param>
private void WaitAndRetryWrapper<T>(string caller, CancellationToken token, Func<string> onException, Func<T> onExecute, bool resetRetryAttempt, int maxRetryAttempts)
{
// Counts the retries performed so far; shared between the retry callback and the executed delegate.
int localRetryCount = 0;
// The request's strategy decides the wait between attempts; exponential backoff is the fallback.
IRetryStrategy retryStrategy = Request.RetryStrategy ?? RetryStrategies.CreateExpBackoffStrategy();
PolicyBuilder val = CreateRetryPolicy();
try {
RetrySyntax.WaitAndRetry(val, maxRetryAttempts, retryStrategy.Calculation, (Action<Exception, TimeSpan, Context>)delegate(Exception exception, TimeSpan timespan, Context context) {
// Retry callback: log the failure and move the job into the retry-pending state.
localRetryCount++;
Log.LogTransferError(exception, Request, "Retry - " + caller + " - {Timespan} - " + onException() + ".", timespan);
HandlePolicyJobException(maxRetryAttempts, false);
}).Execute((Action<CancellationToken>)delegate {
token.ThrowIfCancellationRequested();
// No-op on the first attempt; prepares the job again on retried attempts.
HandlePolicyJobRetry(localRetryCount, token);
onExecute();
if (resetRetryAttempt)
JobService.ResetRetryAttempt();
}, token);
} catch (OperationCanceledException exception2) {
Log.LogInformation(exception2, "The transfer job has been canceled.", Array.Empty<object>());
throw;
} catch (TransferException ex) {
// Fatal and non-fatal transfer failures differ only in log severity; both are rethrown.
if (ex.Fatal)
Log.LogFatal(ex, "The transfer job has failed.", Array.Empty<object>());
else
Log.LogError(ex, "The transfer job has failed.", Array.Empty<object>());
throw;
} finally {
// Whatever the outcome, a cancelled token triggers the job cancellation handling.
if (token.IsCancellationRequested)
TryCancelJob(false);
}
}
/// <summary>
/// Runs an action inside the asynchronous retry wrapper by adapting it to the value-returning
/// overload; the retry count passed to the adapted delegate is ignored.
/// </summary>
/// <returns>The task produced by the value-returning overload.</returns>
private Task WaitAndRetryWrapperAsync(string caller, CancellationToken token, Func<string> onException, Action onExecute, bool resetRetryAttempt, bool bypassHandlingCheck)
{
    // The generic overload needs a result; the value itself is never used.
    Func<int, int> adapted = _ => {
        onExecute();
        return 0;
    };
    return WaitAndRetryWrapperAsync(caller, token, onException, adapted, resetRetryAttempt, bypassHandlingCheck);
}
/// <summary>
/// Executes <paramref name="onExecute"/> under an asynchronous wait-and-retry policy that
/// handles non-fatal <c>TransferException</c> and <see cref="InvalidOperationException"/>,
/// using the configured maximum number of job retry attempts.
/// </summary>
/// <remarks>
/// The policy execution is awaited inside the try block. Previously the task was returned
/// without being awaited, so the catch blocks never saw failures surfaced through the task and
/// the finally block ran before pending retries had completed. Awaiting makes the failure
/// logging and the cancel-on-token cleanup apply to the whole operation.
/// </remarks>
/// <param name="caller">Name of the calling operation, used in the retry log message.</param>
/// <param name="token">The cancellation token checked before every attempt.</param>
/// <param name="onException">Supplies the failure text included in the retry log message.</param>
/// <param name="onExecute">The work to execute; receives the number of retries performed so far.</param>
/// <param name="resetRetryAttempt">Whether the job service retry attempt is reset after a successful execution.</param>
/// <param name="bypassHandlingCheck">When <c>true</c>, the job-level retry handling (status changes, retry hook) is skipped.</param>
/// <returns>A task producing the value returned by <paramref name="onExecute"/>.</returns>
private async Task<T> WaitAndRetryWrapperAsync<T>(string caller, CancellationToken token, Func<string> onException, Func<int, T> onExecute, bool resetRetryAttempt, bool bypassHandlingCheck)
{
    // Counts the retries performed so far; shared between the retry callback and the executed delegate.
    int localRetryCount = 0;
    IRetryStrategy retryStrategy = Request.RetryStrategy ?? RetryStrategies.CreateExpBackoffStrategy();
    PolicyBuilder val = CreateRetryPolicy();
    try {
        return await RetrySyntaxAsync.WaitAndRetryAsync(val, maxJobRetryAttempts, retryStrategy.Calculation, (Action<Exception, TimeSpan, int, Context>)delegate(Exception exception, TimeSpan timespan, int retryCount, Context context) {
            localRetryCount++;
            Log.LogTransferError(exception, Request, $"""{caller}""{localRetryCount}""{timespan}""{onException()}""", Array.Empty<object>());
            if (!bypassHandlingCheck)
                HandlePolicyJobException(maxJobRetryAttempts, false);
        }).ExecuteAsync<T>((Func<CancellationToken, Task<T>>)delegate {
            token.ThrowIfCancellationRequested();
            if (!bypassHandlingCheck)
                HandlePolicyJobRetry(localRetryCount, token);
            T result = onExecute(localRetryCount);
            if (resetRetryAttempt)
                JobService.ResetRetryAttempt();
            return Task.FromResult<T>(result);
        }, token).ConfigureAwait(false);
    } catch (OperationCanceledException exception2) {
        Log.LogInformation(exception2, "The transfer job has been canceled.", Array.Empty<object>());
        throw;
    } catch (TransferException ex) {
        // Fatal and non-fatal transfer failures differ only in log severity; both are rethrown.
        if (ex.Fatal)
            Log.LogFatal(ex, "The transfer job has failed.", Array.Empty<object>());
        else
            Log.LogError(ex, "The transfer job has failed.", Array.Empty<object>());
        throw;
    } finally {
        // Runs once the whole operation has finished, so a token cancelled during a retry is honoured.
        if (token.IsCancellationRequested)
            TryCancelJob(false);
    }
}
/// <summary>
/// Writes the end-of-transfer diagnostics: local drive information, the legacy WebAPI warning,
/// the transfer report message, and finally the transfer result metrics submission.
/// </summary>
/// <param name="result">The transfer result being reported.</param>
/// <param name="token">The cancellation token forwarded to the metrics submission.</param>
private void LogWhenTransferFinished(ITransferResult result, CancellationToken token)
{
LogLocalDriveInformation();
LogIfLegacyWebApiIsUsed(result);
LogTransferReportMessage(result);
SubmitTransferResultMetrics(result, token);
}
/// <summary>
/// Logs a warning when the result's configuration has the legacy WebAPI mode enabled.
/// </summary>
/// <param name="transferResult">The transfer result whose configuration is inspected.</param>
private void LogIfLegacyWebApiIsUsed(ITransferResult transferResult)
{
    if (!transferResult.Configuration.UseLegacyWebApi) {
        return;
    }

    Log.LogTransferWarning(transferResult.Request, "Transfer performed with TAPI configured to use legacy WebAPI communication mode with Relativity REST services. ", Array.Empty<object>());
}
/// <summary>
/// Logs name, type, free percentage, free space and error message for every drive reported by
/// the drive statistics repository.
/// </summary>
private void LogLocalDriveInformation()
{
    foreach (DriveStatistics drive in DriveStatisticsRepository.GetAllDrives()) {
        Log.LogTransferInformation(
            Request,
            CoreStrings.LocalDriveInformation,
            drive.Name,
            drive.DriveType,
            drive.TotalFreePercent,
            drive.TotalFreeSpace,
            drive.ErrorMessage);
    }
}
/// <summary>
/// Logs the transfer report message: as an error for failed or fatal results, otherwise as
/// information.
/// </summary>
/// <param name="result">The transfer result to report.</param>
private void LogTransferReportMessage(ITransferResult result)
{
    bool endedInFailure = result.Status == TransferStatus.Failed || result.Status == TransferStatus.Fatal;
    if (!endedInFailure) {
        Log.LogTransferInformation(result.Request, TransferReportBuilder.GetTransferReportMessage(result), Array.Empty<object>());
        return;
    }

    Log.LogTransferError(result.Request, TransferReportBuilder.GetTransferReportMessage(result), Array.Empty<object>());
}
/// <summary>
/// Submits the transfer result metrics when the request asks for APM metrics, blocking until
/// the submission finishes. Task cancellation and fatal exceptions propagate; any other failure
/// is logged as a warning and ignored.
/// </summary>
/// <param name="result">The transfer result to submit.</param>
/// <param name="token">The cancellation token forwarded to the metrics service.</param>
private void SubmitTransferResultMetrics(ITransferResult result, CancellationToken token)
{
    if (!Request.SubmitApmMetrics) {
        return;
    }

    try {
        ServiceFactory.CreateApmMetricsService()
            .SubmitTransferResultMetricsAsync(result, token)
            .ConfigureAwait(false)
            .GetAwaiter()
            .GetResult();
    } catch (TaskCanceledException) {
        throw;
    } catch (Exception submitFailure) {
        if (ExceptionHelper.IsFatalException(submitFailure)) {
            throw;
        }
        Log.LogWarning(submitFailure, "Failed waiting for the job APM metrics to complete.", Array.Empty<object>());
    }
}
}
}