<PackageReference Include="Relativity.Server.Transfer.SDK" Version="7.7.0" />

TransferJobBase

public abstract class TransferJobBase : ITransferJob, IDisposable
using Polly;
using Relativity.Transfer.Dto;
using Relativity.Transfer.Issues;
using Relativity.Transfer.Resources;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Relativity.Transfer
{
    /// <summary>
    /// Base class for transfer jobs. It owns the job status, wraps the start / add-paths /
    /// change-data-rate / complete operations in a Polly wait-and-retry policy, and delegates the
    /// transport-specific work to the abstract <c>On*</c> methods implemented by derived classes.
    /// </summary>
    public abstract class TransferJobBase : ITransferJob, IDisposable
    {
        // NOTE(review): this lock object is static, so it is shared by every job instance even though
        // it only guards instance state. Left unchanged; confirm whether cross-job serialization is intended.
        private static readonly object SyncRoot = new object();

        // Timeout used by the StartAsync overloads that do not take an explicit timeout.
        private static readonly TimeSpan DefaultJobStartTime = TimeSpan.FromMinutes(5);

        private readonly int maxJobRetryAttempts;
        private readonly bool validateSourcePaths;
        private ITransferRequest currentRequest;
        private TransferJobStatus currentStatus;
        private ClientConfiguration configuration;
        private int lastChangedMinDataRate;
        private int lastChangedTargetDataRate;

        // One entry is appended per completed AddPathsHelper pass.
        private readonly List<long> totalRequestBytesPerAttempt = new List<long>();
        private readonly List<long> totalRequestFilesPerAttempt = new List<long>();
        private bool disposed;

        /// <summary>
        /// Gets the minimum data rate last applied through a data rate change; zero until one succeeds.
        /// </summary>
        public int ChangedMinDataRateMbps
        {
            get
            {
                lock (SyncRoot)
                {
                    return lastChangedMinDataRate;
                }
            }

            private set
            {
                lock (SyncRoot)
                {
                    lastChangedMinDataRate = value;
                }
            }
        }

        /// <summary>
        /// Gets the target data rate last applied through a data rate change; zero until one succeeds.
        /// </summary>
        public int ChangedTargetDataRateMbps
        {
            get
            {
                lock (SyncRoot)
                {
                    return lastChangedTargetDataRate;
                }
            }

            private set
            {
                lock (SyncRoot)
                {
                    lastChangedTargetDataRate = value;
                }
            }
        }

        /// <summary>Gets the client configuration supplied at construction.</summary>
        public ClientConfiguration Configuration
        {
            get
            {
                lock (SyncRoot)
                {
                    return configuration;
                }
            }

            private set
            {
                lock (SyncRoot)
                {
                    configuration = value;
                }
            }
        }

        /// <summary>Gets a value indicating whether the job supports data rate changes; <c>false</c> unless overridden.</summary>
        public virtual bool IsDataRateChangeSupported => false;

        /// <summary>Gets or sets the job service. It is cleared and set to <c>null</c> when the job is disposed.</summary>
        public ITransferJobService JobService { get; set; }

        /// <summary>Gets or sets the transfer request. It is set to <c>null</c> when the job is disposed.</summary>
        public ITransferRequest Request
        {
            get
            {
                lock (SyncRoot)
                {
                    return currentRequest;
                }
            }

            set
            {
                lock (SyncRoot)
                {
                    currentRequest = value;
                }
            }
        }

        /// <summary>
        /// Gets the wait period for the current retry attempt, calculated by the request retry strategy
        /// or by an exponential backoff strategy when the request does not define one.
        /// </summary>
        public TimeSpan RetryWaitPeriod =>
            (Request.RetryStrategy ?? RetryStrategies.CreateExpBackoffStrategy()).Calculation(JobService.Statistics.RetryAttempt);

        /// <summary>Gets the current job status.</summary>
        public TransferJobStatus Status
        {
            get
            {
                lock (SyncRoot)
                {
                    return currentStatus;
                }
            }

            protected set
            {
                lock (SyncRoot)
                {
                    currentStatus = value;
                }
            }
        }

        // Set by OnJobCanceled to record whether the cancellation came from Dispose.
        internal bool IsCancelationFromDisposal { get; set; }

        // When true, CheckCanceled always reports "not canceled".
        internal bool BypassCancelCheck { get; set; }

        /// <summary>Gets the transfer log.</summary>
        protected ITransferLog Log { get; }

        /// <summary>Gets the Relativity service factory.</summary>
        protected IRelativityServiceFactory ServiceFactory { get; }

        internal ITargetRateValidator TargetRateValidator { get; }

        internal ITargetRateThrottler TargetRateThrottler { get; }

        /// <summary>Gets the file system service.</summary>
        protected IFileSystemService FileSystemService { get; }

        private IRemotePathResolver SourcePathResolver { get; set; }

        private IRemotePathResolver TargetPathResolver { get; set; }

        private IPathValidationProvider PathValidationProvider { get; }

        /// <summary>
        /// Initializes a new instance using the <see cref="IFileSystemService"/> obtained from the service object locator.
        /// </summary>
        /// <param name="log">The transfer log.</param>
        /// <param name="request">The transfer request.</param>
        /// <param name="jobService">The job service.</param>
        /// <param name="configuration">The client configuration.</param>
        /// <param name="serviceFactory">The Relativity service factory.</param>
        /// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
        protected TransferJobBase(
            ITransferLog log,
            ITransferRequest request,
            ITransferJobService jobService,
            ClientConfiguration configuration,
            IRelativityServiceFactory serviceFactory)
            : this(ServiceObjectLocator.GetService<IFileSystemService>(), log, request, jobService, configuration, serviceFactory)
        {
        }

        /// <summary>
        /// Initializes a new instance with an explicit file system service.
        /// </summary>
        /// <param name="fileSystemService">The file system service.</param>
        /// <param name="log">The transfer log.</param>
        /// <param name="request">The transfer request.</param>
        /// <param name="jobService">The job service.</param>
        /// <param name="configuration">The client configuration.</param>
        /// <param name="serviceFactory">The Relativity service factory.</param>
        /// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
        protected TransferJobBase(
            IFileSystemService fileSystemService,
            ITransferLog log,
            ITransferRequest request,
            ITransferJobService jobService,
            ClientConfiguration configuration,
            IRelativityServiceFactory serviceFactory)
        {
            if (fileSystemService == null)
            {
                throw new ArgumentNullException(nameof(fileSystemService));
            }

            if (log == null)
            {
                throw new ArgumentNullException(nameof(log));
            }

            if (request == null)
            {
                throw new ArgumentNullException(nameof(request));
            }

            if (configuration == null)
            {
                throw new ArgumentNullException(nameof(configuration));
            }

            if (jobService == null)
            {
                throw new ArgumentNullException(nameof(jobService));
            }

            if (serviceFactory == null)
            {
                throw new ArgumentNullException(nameof(serviceFactory));
            }

            FileSystemService = fileSystemService;
            Configuration = configuration;
            maxJobRetryAttempts = configuration.MaxJobRetryAttempts;
            validateSourcePaths = configuration.ValidateSourcePaths;
            lastChangedMinDataRate = 0;
            lastChangedTargetDataRate = 0;
            Log = log;
            Request = request;
            Status = TransferJobStatus.NotStarted;
            JobService = jobService;
            ServiceFactory = serviceFactory;
            TargetRateValidator = serviceFactory.CreateTargetRateValidator();
            TargetRateThrottler = serviceFactory.CreateTargetRateThrottler();
            PathValidationProvider = serviceFactory.PathValidationProvider;
        }

        /// <summary>Adds a single path to the job.</summary>
        /// <param name="path">The path to add.</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="path"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
        public void AddPath(TransferPath path)
        {
            if (path is null)
            {
                throw new ArgumentNullException(nameof(path));
            }

            CheckDisposed(true);
            AddPath(path, CancellationToken.None);
        }

        /// <summary>Adds a single path to the job.</summary>
        /// <param name="path">The path to add.</param>
        /// <param name="token">The cancellation token.</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="path"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
        public void AddPath(TransferPath path, CancellationToken token)
        {
            if (path is null)
            {
                throw new ArgumentNullException(nameof(path));
            }

            CheckDisposed(true);
            AddPaths(new[] { path }, token);
        }

        /// <summary>Adds the paths to the job.</summary>
        /// <param name="paths">The paths to add.</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="paths"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
        public void AddPaths(IEnumerable<TransferPath> paths)
        {
            if (paths == null)
            {
                throw new ArgumentNullException(nameof(paths));
            }

            CheckDisposed(true);
            AddPaths(paths, CancellationToken.None);
        }

        /// <summary>
        /// Adds the paths to the job using the wait-and-retry policy. Does nothing when the job is already canceled.
        /// </summary>
        /// <param name="paths">The paths to add.</param>
        /// <param name="token">The cancellation token.</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="paths"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
        public void AddPaths(IEnumerable<TransferPath> paths, CancellationToken token)
        {
            if (paths == null)
            {
                throw new ArgumentNullException(nameof(paths));
            }

            CheckDisposed(true);
            if (CheckCanceled(token))
            {
                return;
            }

            Log.LogTransferInformation(Request, "Adding paths to the job.", Array.Empty<object>());
            AddPathsWaitAndRetry(paths, token, true, maxJobRetryAttempts);
        }

        /// <summary>Asynchronously adds the paths stored in a serialized batch.</summary>
        /// <param name="batch">The serialized batch.</param>
        /// <returns>A task that completes once the paths have been added.</returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="batch"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
        public Task AddPathsAsync(SerializedBatch batch)
        {
            if (batch == null)
            {
                throw new ArgumentNullException(nameof(batch));
            }

            CheckDisposed(true);
            return AddPathsAsync(batch, CancellationToken.None);
        }

        /// <summary>
        /// Asynchronously adds the paths stored in a serialized batch. When the request is a
        /// <see cref="TransferRequest"/>, its batch number and total batch count are updated from the batch.
        /// </summary>
        /// <param name="batch">The serialized batch.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>A task that completes once the paths have been added.</returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="batch"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
        public Task AddPathsAsync(SerializedBatch batch, CancellationToken token)
        {
            if (batch == null)
            {
                throw new ArgumentNullException(nameof(batch));
            }

            Log.LogTransferInformation(
                Request,
                "Adding {TotalFileCount} paths to the job from a batch number {BatchNumber}.",
                batch.TotalFileCount,
                batch.BatchNumber);
            CheckDisposed(true);

            if (Request is TransferRequest transferRequest)
            {
                transferRequest.BatchNumber = batch.BatchNumber;
                transferRequest.TotalBatchCount = batch.TotalBatchCount;
            }

            IEnumerable<TransferPath> paths = JsonFileSerializer
                .Deserialize<SerializedPathsBatchDto>(batch.File)
                .Paths
                .Select(TransferPathDto.ConvertToPath);
            return AddPathsAsync(paths, token);
        }

        /// <summary>Asynchronously adds the paths to the job.</summary>
        /// <param name="paths">The paths to add.</param>
        /// <returns>A task that completes once the paths have been added.</returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="paths"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
        public Task AddPathsAsync(IEnumerable<TransferPath> paths)
        {
            if (paths == null)
            {
                throw new ArgumentNullException(nameof(paths));
            }

            CheckDisposed(true);
            return AddPathsAsync(paths, CancellationToken.None);
        }

        /// <summary>Asynchronously adds the paths to the job using the wait-and-retry policy.</summary>
        /// <param name="paths">The paths to add.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>
        /// A task that completes once the paths have been added, including any retries, and that
        /// faults when the operation ultimately fails.
        /// </returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="paths"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
        public Task AddPathsAsync(IEnumerable<TransferPath> paths, CancellationToken token)
        {
            if (paths == null)
            {
                throw new ArgumentNullException(nameof(paths));
            }

            Log.LogTransferInformation(Request, "Adding paths to the job.", Array.Empty<object>());
            CheckDisposed(true);

            // Return the retry task so Task.Run unwraps it. The previous code discarded it, which let
            // the public task complete before retries finished and hid any failure from the caller.
            return Task.Run(
                () => WaitAndRetryWrapperAsync(
                    "AddPathsAsync",
                    token,
                    () => CoreStrings.TransferJobBaseAddPathsFailureMessage,
                    () => AddPathsHelper(paths, token),
                    true,
                    false),
                token);
        }

        /// <summary>Changes the data rate of the job.</summary>
        /// <param name="minRateMbps">The minimum data rate in Mbps.</param>
        /// <param name="targetRateMbps">The target data rate in Mbps.</param>
        /// <returns>The target data rate in effect after the call.</returns>
        public int ChangeDataRate(int minRateMbps, int targetRateMbps)
        {
            return ChangeDataRate(minRateMbps, targetRateMbps, CancellationToken.None);
        }

        /// <summary>Changes the data rate of the job.</summary>
        /// <param name="minRateMbps">The minimum data rate in Mbps.</param>
        /// <param name="targetRateMbps">The target data rate in Mbps.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>The target data rate in effect after the call.</returns>
        public int ChangeDataRate(int minRateMbps, int targetRateMbps, CancellationToken token)
        {
            return ChangeDataRate(minRateMbps, targetRateMbps, token, true);
        }

        // Throttles and validates the rates, then applies them through the retry policy.
        // On a disposed job only a warning is logged and the last applied target rate is returned;
        // on a canceled job the throttled target rate is returned without applying it.
        private int ChangeDataRate(int minRateMbps, int targetRateMbps, CancellationToken token, bool resetRetryAttempt)
        {
            ValidateDataRateParameters(minRateMbps, ref targetRateMbps);
            if (CheckDisposed(false))
            {
                Log.LogWarning(CoreStrings.CantChangeDataRateOnDisposedError, Array.Empty<object>());
                return ChangedTargetDataRateMbps;
            }

            if (CheckCanceled(token))
            {
                return targetRateMbps;
            }

            WaitAndRetryWrapper(
                "ChangeDataRate",
                token,
                () => CoreStrings.TransferJobChangeDataRateFailureLogMessage,
                () =>
                {
                    OnChangeDataRate(minRateMbps, targetRateMbps, token);
                    ChangedMinDataRateMbps = minRateMbps;
                    ChangedTargetDataRateMbps = targetRateMbps;
                },
                resetRetryAttempt);
            return ChangedTargetDataRateMbps;
        }

        /// <summary>Asynchronously changes the data rate of the job.</summary>
        /// <param name="minRateMbps">The minimum data rate in Mbps.</param>
        /// <param name="targetRateMbps">The target data rate in Mbps.</param>
        /// <returns>A task whose result is the target data rate in effect after the change.</returns>
        public Task<int> ChangeDataRateAsync(int minRateMbps, int targetRateMbps)
        {
            return ChangeDataRateAsync(minRateMbps, targetRateMbps, CancellationToken.None);
        }

        /// <summary>
        /// Asynchronously changes the data rate of the job using the wait-and-retry policy.
        /// On a disposed job only a warning is logged and the last applied target rate is returned.
        /// </summary>
        /// <param name="minRateMbps">The minimum data rate in Mbps.</param>
        /// <param name="targetRateMbps">The target data rate in Mbps.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>A task whose result is the target data rate in effect after the change.</returns>
        public Task<int> ChangeDataRateAsync(int minRateMbps, int targetRateMbps, CancellationToken token)
        {
            ValidateDataRateParameters(minRateMbps, ref targetRateMbps);
            return Task.Run(
                async () =>
                {
                    if (CheckDisposed(false))
                    {
                        Log.LogWarning(CoreStrings.CantChangeDataRateOnDisposedError, Array.Empty<object>());
                    }
                    else
                    {
                        // Await the retry task so the rate read below reflects the completed change
                        // and a failure faults the returned task instead of being dropped.
                        await WaitAndRetryWrapperAsync(
                            "ChangeDataRateAsync",
                            token,
                            () => CoreStrings.TransferJobChangeDataRateFailureLogMessage,
                            () =>
                            {
                                OnChangeDataRate(minRateMbps, targetRateMbps, token);
                                ChangedMinDataRateMbps = minRateMbps;
                                ChangedTargetDataRateMbps = targetRateMbps;
                            },
                            true,
                            false).ConfigureAwait(false);
                    }

                    return ChangedTargetDataRateMbps;
                },
                token);
        }

        /// <summary>Waits for the job to complete using the default wait timeout.</summary>
        /// <returns>A task whose result is the transfer result.</returns>
        public Task<ITransferResult> CompleteAsync()
        {
            return CompleteAsync(TransferConstants.DefaultWaitTimeout, CancellationToken.None);
        }

        /// <summary>Waits for the job to complete using the default wait timeout.</summary>
        /// <param name="token">The cancellation token.</param>
        /// <returns>A task whose result is the transfer result.</returns>
        public Task<ITransferResult> CompleteAsync(CancellationToken token)
        {
            return CompleteAsync(TransferConstants.DefaultWaitTimeout, token);
        }

        /// <summary>Waits for the job to complete.</summary>
        /// <param name="timeout">The timeout passed to the job completion wait.</param>
        /// <returns>A task whose result is the transfer result.</returns>
        public Task<ITransferResult> CompleteAsync(TimeSpan timeout)
        {
            return CompleteAsync(timeout, CancellationToken.None);
        }

        /// <summary>
        /// Waits for the job to complete, retrying through the wait-and-retry policy. A job that is
        /// already canceled yields a result with <see cref="TransferStatus.Canceled"/> immediately.
        /// </summary>
        /// <param name="timeout">The timeout passed to the job completion wait.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>A task whose result is the transfer result.</returns>
        /// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
        /// <exception cref="InvalidOperationException">Thrown when the job status does not allow waiting.</exception>
        public Task<ITransferResult> CompleteAsync(TimeSpan timeout, CancellationToken token)
        {
            CheckDisposed(true);
            if (CheckCanceled(token))
            {
                return Task.FromResult<ITransferResult>(CreateCanceledResult());
            }

            ValidateWait();
            Log.LogTransferInformation(Request, "Preparing to start the transfer job...", Request.ClientRequestId);
            return Task.Run(
                () => WaitAndRetryWrapperAsync<ITransferResult>(
                    "CompleteAsync",
                    token,
                    () => CoreStrings.TransferJobBaseCompleteAsyncFailureMessage,
                    retryAttempt => WaitForJobCompletion(timeout, token, retryAttempt),
                    false,
                    false));
        }

        // Builds a result carrying the Canceled status and the current configuration, request and data rates.
        private TransferResult CreateCanceledResult()
        {
            return new TransferResult
            {
                Configuration = Configuration,
                MinDataRateMbps = ChangedMinDataRateMbps,
                Request = Request,
                Status = TransferStatus.Canceled,
                TargetDataRateMbps = ChangedTargetDataRateMbps
            };
        }

        // Single completion attempt: waits for the job, logs statistics and maps the transfer status.
        // HandleTransferStatus may throw InvalidOperationException to make the policy retry.
        // Cancellation is converted into a Canceled result rather than propagated.
        private ITransferResult WaitForJobCompletion(TimeSpan timeout, CancellationToken token, int localRetryCount)
        {
            try
            {
                ITransferResult transferResult = OnWaitJobCompletion(timeout, token);
                if (transferResult is TransferResult concreteResult)
                {
                    UpdateReadOnlyProperties(concreteResult);
                }

                ITransferStatistics statistics = JobService.CalculateStatistics();
                Log.LogStatistics(statistics);
                HandleTransferStatus(token, transferResult, localRetryCount);
                return transferResult;
            }
            catch (OperationCanceledException exception)
            {
                TransferResult canceledResult = CreateCanceledResult();
                SetTransferResult(canceledResult, new BindingList<ITransferIssue>(), token);
                Log.LogTransferError(exception, Request, CoreStrings.TransferJobCancelMessage, Array.Empty<object>());
                LogWhenTransferFinished(canceledResult, token);
                return canceledResult;
            }
        }

        // Maps the transfer status to the job status, publishes the matching request status and logs
        // the final report. For any other status with retryable paths left and retries remaining,
        // throws InvalidOperationException so the retry policy runs another attempt.
        private void HandleTransferStatus(CancellationToken token, ITransferResult result, int localRetryCount)
        {
            switch (result.Status)
            {
                case TransferStatus.Canceled:
                    Status = TransferJobStatus.Canceled;
                    Log.LogTransferInformation(Request, CoreStrings.TransferJobCancelMessage, Array.Empty<object>());
                    Request.Context?.PublishTransferRequest(Request, TransferRequestStatus.Canceled);
                    LogWhenTransferFinished(result, token);
                    break;

                case TransferStatus.Successful:
                    Status = TransferJobStatus.Successful;
                    Log.LogTransferInformation(Request, CoreStrings.TransferJobSuccessMessage, Array.Empty<object>());
                    Request.Context?.PublishTransferRequest(Request, TransferRequestStatus.Ended);
                    LogWhenTransferFinished(result, token);
                    break;

                case TransferStatus.Fatal:
                    Status = TransferJobStatus.Fatal;
                    Log.LogTransferError(Request, CoreStrings.TransferJobFatalErrorErrorMessage, Array.Empty<object>());
                    Request.Context?.PublishTransferRequest(Request, TransferRequestStatus.Ended);
                    LogWhenTransferFinished(result, token);
                    break;

                default:
                {
                    IReadOnlyCollection<TransferPath> retryablePaths = JobService.GetRetryableRequestTransferPaths();
                    if (retryablePaths.Count > 0)
                    {
                        if (localRetryCount != maxJobRetryAttempts)
                        {
                            Log.LogTransferWarning(
                                Request,
                                "Job is about to be retried ({retryablePathsCount} paths). Transfer status: {JobStatus}.",
                                retryablePaths.Count,
                                result.Status);
                            throw new InvalidOperationException(CoreStrings.TransferJobCompleteFailedExceptionMessage);
                        }

                        Status = TransferJobStatus.Failed;
                        Log.LogTransferError(Request, CoreStrings.TransferJobFailedRetryErrorMessage, Array.Empty<object>());
                        Request.Context?.PublishTransferRequest(Request, TransferRequestStatus.EndedMaxRetry);
                        LogWhenTransferFinished(result, token);
                    }
                    else
                    {
                        Status = TransferJobStatus.Failed;
                        Log.LogTransferError(Request, CoreStrings.TransferJobFailedWaitingErrorMessage, Array.Empty<object>());
                        Request.Context?.PublishTransferRequest(Request, TransferRequestStatus.Ended);
                        LogWhenTransferFinished(result, token);
                    }

                    break;
                }
            }
        }

        // Copies the current data rates onto the result and fills in configuration/request when missing.
        private void UpdateReadOnlyProperties(TransferResult transferResult)
        {
            transferResult.MinDataRateMbps = ChangedMinDataRateMbps;
            transferResult.TargetDataRateMbps = ChangedTargetDataRateMbps;
            if (transferResult.Configuration == null)
            {
                transferResult.Configuration = Configuration;
            }

            if (transferResult.Request == null)
            {
                transferResult.Request = Request;
            }
        }

        /// <summary>Disposes the job.</summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>Starts the job using the default start timeout.</summary>
        /// <returns>A task that completes once the job has started.</returns>
        public Task StartAsync()
        {
            return StartAsync(CancellationToken.None);
        }

        /// <summary>Starts the job using the default start timeout.</summary>
        /// <param name="token">The cancellation token.</param>
        /// <returns>A task that completes once the job has started.</returns>
        public Task StartAsync(CancellationToken token)
        {
            return StartAsync(DefaultJobStartTime, token);
        }

        /// <summary>Starts the job.</summary>
        /// <param name="timeout">The timeout passed to the job start.</param>
        /// <returns>A task that completes once the job has started.</returns>
        public Task StartAsync(TimeSpan timeout)
        {
            return StartAsync(timeout, CancellationToken.None);
        }

        /// <summary>
        /// Starts the job: throttles and validates the configured target rate, validates the job status,
        /// initializes the path resolvers, assigns missing client request / job identifiers and then
        /// starts the job through the wait-and-retry policy. Returns a completed task when the job is
        /// already canceled.
        /// </summary>
        /// <param name="timeout">The timeout passed to the job start.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>A task that completes once the job has started.</returns>
        /// <exception cref="ObjectDisposedException">Thrown when the job has been disposed.</exception>
        /// <exception cref="InvalidOperationException">Thrown when the job status does not allow starting.</exception>
        public Task StartAsync(TimeSpan timeout, CancellationToken token)
        {
            CheckDisposed(true);
            if (CheckCanceled(token))
            {
                return Task.CompletedTask;
            }

            TargetRateThrottler.ThrottleToMaxAllowedValue(Configuration);
            TargetRateValidator.Validate(Configuration);
            ValidateStart();
            InitializePathResolvers();
            Status = TransferJobStatus.Created;
            Log.LogTransferInformation(Request, "Transfer job is starting...", Array.Empty<object>());
            Request.Context?.Clear(false);
            Request.Context?.PublishTransferRequest(Request, TransferRequestStatus.Started);

            if (!Request.ClientRequestId.HasValue)
            {
                Request.ClientRequestId = Guid.NewGuid();
            }

            if (!Request.JobId.HasValue)
            {
                Request.JobId = Guid.NewGuid();
            }

            // NOTE(review): the wrapper is given CancellationToken.None and the add-paths failure
            // message; both are kept as found - confirm whether a start-specific message exists.
            return Task.Run(
                () => WaitAndRetryWrapperAsync(
                    "StartAsync",
                    CancellationToken.None,
                    () => CoreStrings.TransferJobBaseAddPathsFailureMessage,
                    () =>
                    {
                        OnJobStarted(timeout, token);
                        Status = token.IsCancellationRequested ? TransferJobStatus.Canceled : TransferJobStatus.Running;
                        Log.LogTransferInformation(Request, "Successfully started the transfer job.", Array.Empty<object>());
                    },
                    true,
                    true));
        }

        /// <summary>
        /// Validates the path with the path validation provider. A valid upload path additionally gets
        /// its path attributes assigned when source path validation is enabled; an invalid path is
        /// logged and published as a transfer path issue.
        /// </summary>
        /// <param name="path">The path to validate.</param>
        /// <returns>The validation result.</returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="path"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">Thrown when the path is valid and the job has been disposed.</exception>
        public PathValidationResult ValidatePath(TransferPath path)
        {
            if (path is null)
            {
                throw new ArgumentNullException(nameof(path));
            }

            PathValidationResult validation = PathValidationProvider.Validate(path);
            if (validation.IsOk)
            {
                CheckDisposed(true);
                if (path.Direction == TransferDirection.Upload && validateSourcePaths)
                {
                    AssignPathAttributes(path);
                }
            }
            else
            {
                Log.LogError(validation.ErrorMessage, Array.Empty<object>());
                Request.Context?.PublishTransferPathIssue(Request, validation.CreateTransferIssue());
            }

            return validation;
        }

        /// <summary>
        /// Removes the retryable paths from the job service and adds them again with a single retry attempt.
        /// </summary>
        /// <param name="token">The cancellation token.</param>
        protected virtual void RetryAddPaths(CancellationToken token)
        {
            IReadOnlyCollection<TransferPath> retryablePaths = JobService.GetRetryableRequestTransferPaths();
            if (retryablePaths.Any(x => string.IsNullOrEmpty(x.TargetPath)))
            {
                Log.LogTransferWarning(
                    Request,
                    "Adding transfer paths on retry scenario with no target specified.",
                    Array.Empty<object>());
            }

            JobService.RemoveRetryableTransferPaths(retryablePaths);
            AddPathsWaitAndRetry(retryablePaths, token, false, 1);
        }

        /// <summary>Creates a path resolver by calling <see cref="OnCreatePathResolver"/>.</summary>
        /// <param name="source"><c>true</c> for the source resolver; <c>false</c> for the target resolver.</param>
        /// <returns>The resolver, or <c>null</c> when none is provided.</returns>
        protected IRemotePathResolver CreatePathResolver(bool source)
        {
            return OnCreatePathResolver(source);
        }

        /// <summary>
        /// Releases the job. When <paramref name="disposing"/> is <c>true</c>, attempts to cancel the job
        /// and then clears the job service and drops the job service and request references.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposed)
            {
                return;
            }

            if (disposing)
            {
                try
                {
                    TryCancelJob(true);
                }
                catch (Exception exception)
                {
                    Log.LogWarning(exception, "Failed to cancel the job within a Dispose block.", Array.Empty<object>());
                }
                finally
                {
                    JobService?.Clear();
                    JobService = null;
                    Request = null;

                    // NOTE(review): a forced collection is generally discouraged; kept to preserve existing behavior.
                    GC.Collect();
                }
            }

            disposed = true;
        }

        /// <summary>Applies a data rate change on the underlying transport.</summary>
        /// <param name="minRateMbps">The minimum data rate in Mbps.</param>
        /// <param name="targetRateMbps">The target data rate in Mbps.</param>
        /// <param name="token">The cancellation token.</param>
        protected abstract void OnChangeDataRate(int minRateMbps, int targetRateMbps, CancellationToken token);

        /// <summary>Creates a path resolver; the base implementation returns <c>null</c>.</summary>
        /// <param name="source"><c>true</c> for the source resolver; <c>false</c> for the target resolver.</param>
        /// <returns>The resolver, or <c>null</c>.</returns>
        protected virtual IRemotePathResolver OnCreatePathResolver(bool source)
        {
            return null;
        }

        /// <summary>
        /// Handles job cancellation. Records whether the cancellation originates from disposal; otherwise
        /// sets the status to canceled and publishes the canceled request status.
        /// </summary>
        /// <param name="isDisposing"><c>true</c> when the cancellation is triggered by disposal.</param>
        protected virtual void OnJobCanceled(bool isDisposing)
        {
            IsCancelationFromDisposal = isDisposing;
            if (isDisposing)
            {
                return;
            }

            Status = TransferJobStatus.Canceled;
            Request.Context?.PublishTransferRequest(Request, TransferRequestStatus.Canceled);
        }

        /// <summary>Called when the job is about to be retried.</summary>
        /// <param name="timeout">The timeout for the operation.</param>
        /// <param name="token">The cancellation token.</param>
        protected abstract void OnJobRetry(TimeSpan timeout, CancellationToken token);

        /// <summary>Called to start the job.</summary>
        /// <param name="timeout">The timeout for the operation.</param>
        /// <param name="token">The cancellation token.</param>
        protected abstract void OnJobStarted(TimeSpan timeout, CancellationToken token);

        /// <summary>Called after a path has been prepared, validated, resolved and saved to the job service.</summary>
        /// <param name="path">The added path.</param>
        /// <param name="token">The cancellation token.</param>
        protected abstract void OnPathAdded(TransferPath path, CancellationToken token);

        /// <summary>Called to wait for the job to complete.</summary>
        /// <param name="timeout">The timeout for the operation.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>The transfer result.</returns>
        protected abstract ITransferResult OnWaitJobCompletion(TimeSpan timeout, CancellationToken token);

        /// <summary>
        /// Populates the result from the statistics calculated by the job service, sets its status
        /// (canceled when <paramref name="token"/> is canceled, otherwise the status calculated by the
        /// job service) and registers the issues and statistics on it.
        /// </summary>
        /// <param name="result">The result to populate.</param>
        /// <param name="issues">The issues to register.</param>
        /// <param name="token">The cancellation token.</param>
        /// <exception cref="ArgumentNullException">
        /// Thrown when <paramref name="result"/> or <paramref name="issues"/> is <c>null</c>.
        /// </exception>
        protected void SetTransferResult(TransferResult result, IReadOnlyCollection<ITransferIssue> issues, CancellationToken token)
        {
            if (result == null)
            {
                throw new ArgumentNullException(nameof(result));
            }

            if (issues == null)
            {
                throw new ArgumentNullException(nameof(issues));
            }

            ITransferStatistics statistics = JobService.CalculateStatistics();
            result.EndTime = statistics.EndTime;
            result.RetryCount = statistics.RetryAttempt;
            result.StartTime = statistics.StartTime;
            result.TotalRequestedFilesPerAttempt = totalRequestFilesPerAttempt;
            result.TotalRequestedBytesPerAttempt = totalRequestBytesPerAttempt;
            result.TotalFailedFiles = statistics.TotalFailedFiles;
            result.TotalFatalErrors = statistics.TotalFatalErrors;
            result.TotalFilesNotFound = statistics.TotalFilesNotFound;
            result.TotalFilePermissionErrors = statistics.TotalFilePermissionsErrors;
            result.TotalBadPathErrors = statistics.TotalBadPathErrors;
            result.TotalSkippedFiles = statistics.TotalSkippedFiles;
            result.TotalTransferredBytes = statistics.TotalTransferredBytes;
            result.TotalTransferredFiles = statistics.TotalTransferredFiles;
            result.Status = token.IsCancellationRequested
                ? TransferStatus.Canceled
                : JobService.CalculateTransferStatus();
            result.RegisterIssues(issues);
            result.RegisterStatistics(statistics);
        }

        /// <summary>Verifies that the current status allows the job to be started.</summary>
        /// <exception cref="InvalidOperationException">
        /// Thrown when the job has already finished (successful, failed or fatal) or is in an unexpected state.
        /// </exception>
        protected virtual void ValidateStart()
        {
            switch (Status)
            {
                case TransferJobStatus.NotStarted:
                case TransferJobStatus.Created:
                case TransferJobStatus.Running:
                case TransferJobStatus.RetryPending:
                case TransferJobStatus.Retrying:
                case TransferJobStatus.Canceled:
                    break;

                case TransferJobStatus.Successful:
                case TransferJobStatus.Failed:
                case TransferJobStatus.Fatal:
                    throw new InvalidOperationException(CoreStrings.StartAlreadyCompletedMessage);

                default:
                    throw new InvalidOperationException(CoreStrings.InitializeUnexpectedStateMessage);
            }
        }

        /// <summary>Verifies that the current status allows waiting for job completion.</summary>
        /// <exception cref="InvalidOperationException">
        /// Thrown when the job has not been started, has already finished or is in an unexpected state.
        /// </exception>
        protected virtual void ValidateWait()
        {
            switch (Status)
            {
                case TransferJobStatus.Running:
                case TransferJobStatus.RetryPending:
                case TransferJobStatus.Retrying:
                case TransferJobStatus.Canceled:
                    break;

                case TransferJobStatus.NotStarted:
                    throw new InvalidOperationException(CoreStrings.WaitNotStartedMessage);

                case TransferJobStatus.Successful:
                    throw new InvalidOperationException(CoreStrings.WaitAlreadyCompletedMessage);

                case TransferJobStatus.Failed:
                case TransferJobStatus.Fatal:
                    throw new InvalidOperationException(CoreStrings.WaitAlreadyFailedMessage);

                default:
                    throw new InvalidOperationException(
                        string.Format(CultureInfo.CurrentCulture, CoreStrings.WaitUnexpectedStateMessage, Status));
            }
        }

        // The policy retries non-fatal TransferException and InvalidOperationException.
        private static PolicyBuilder CreateRetryPolicy()
        {
            return Policy
                .Handle<TransferException>(e => !e.Fatal)
                .Or<InvalidOperationException>();
        }

        // Throttles the target rate in place, then validates the min/target pair.
        private void ValidateDataRateParameters(int minRateMbps, ref int targetRateMbps)
        {
            targetRateMbps = TargetRateThrottler.ThrottleToMaxAllowedValue(targetRateMbps);
            TargetRateValidator.Validate(minRateMbps, targetRateMbps);
        }

        // Resolvers supplied on the request take precedence over the ones created by the job.
        private void InitializePathResolvers()
        {
            SourcePathResolver = Request.SourcePathResolver ?? CreatePathResolver(true);
            TargetPathResolver = Request.TargetPathResolver ?? CreatePathResolver(false);
        }

        // Returns whether the job is disposed; throws ObjectDisposedException instead when shouldThrow is set.
        private bool CheckDisposed(bool shouldThrow = true)
        {
            if (!disposed)
            {
                return false;
            }

            if (shouldThrow)
            {
                throw new ObjectDisposedException(CoreStrings.ObjectDisposedExceptionMessage);
            }

            return true;
        }

        // Returns true, after attempting to cancel the job, when the token is canceled or the status is
        // already Canceled. Always returns false while BypassCancelCheck is set.
        private bool CheckCanceled(CancellationToken token)
        {
            bool cancelRequested = token.IsCancellationRequested || Status == TransferJobStatus.Canceled;
            if (BypassCancelCheck || !cancelRequested)
            {
                return false;
            }

            TryCancelJob(false);
            return true;
        }

        // Marks the path as a file or a (possibly empty) directory based on the local file system and
        // throws ArgumentException when the source path exists as neither.
        private void AssignPathAttributes(TransferPath path)
        {
            if (FileSystemService.FileExists(path.SourcePath))
            {
                path.PathAttributes = TransferPathAttributes.File;
            }
            else
            {
                if (!FileSystemService.DirectoryExists(path.SourcePath))
                {
                    Log.LogError(
                        CoreStrings.SourcePathNotFoundArgumentExceptionMessage,
                        path.SourcePath,
                        LogRedaction.OnPositions(0));
                    throw new ArgumentException("Source path not found - check logs for more details.", nameof(path));
                }

                // A path already flagged as a directory keeps its existing attributes.
                if (!path.PathAttributes.HasFlag(TransferPathAttributes.Directory))
                {
                    path.PathAttributes = TransferPathAttributes.Directory;
                    if (FileSystemService.IsDirectoryEmpty(path.SourcePath))
                    {
                        path.PathAttributes |= TransferPathAttributes.Empty;
                    }
                }
            }

            path.ValidateAttributes();
        }

        // Runs at the start of every policy execution. From the second attempt on, and only when the
        // status is neither Retrying nor Running, it retries the job and re-applies any data rate
        // change that was in effect before the retry.
        private void HandlePolicyJobRetry(int localRetryCount, CancellationToken token)
        {
            lock (SyncRoot)
            {
                bool retryRequired = localRetryCount > 0
                    && Status != TransferJobStatus.Retrying
                    && Status != TransferJobStatus.Running;
                if (!retryRequired)
                {
                    return;
                }

                Status = TransferJobStatus.Retrying;
                try
                {
                    int previousMinRate = ChangedMinDataRateMbps;
                    int previousTargetRate = ChangedTargetDataRateMbps;
                    ChangedMinDataRateMbps = 0;
                    ChangedTargetDataRateMbps = 0;
                    OnJobRetry(TransferConstants.DefaultWaitTimeout, token);
                    if (previousMinRate > 0 || previousTargetRate > 0)
                    {
                        ChangeDataRate(previousMinRate, previousTargetRate, token, false);
                    }
                }
                finally
                {
                    Status = TransferJobStatus.Running;
                }
            }
        }

        // Runs from the policy retry callback: marks the job RetryPending, records the retry attempt
        // (unless the job itself is being retried) and clears the request context.
        private void HandlePolicyJobException(int maxAttempts, bool retryingJob)
        {
            lock (SyncRoot)
            {
                Status = TransferJobStatus.RetryPending;
                if (!retryingJob)
                {
                    JobService.SaveStatistics();
                    JobService.IncrementRetryAttempt();
                    Request.Context?.PublishTransferJobRetry(Request, JobService.Statistics.RetryAttempt, maxAttempts);
                }

                Request.Context?.Clear(true);
            }
        }

        // Makes the local side of the path absolute and runs the remote side through the matching resolver.
        private void ResolvePath(TransferPath path)
        {
            if (path.Direction == TransferDirection.Download)
            {
                if (!FileSystemService.IsPathRooted(path.TargetPath))
                {
                    path.TargetPath = FileSystemService.GetFullPath(path.TargetPath);
                }

                if (SourcePathResolver != null)
                {
                    path.SourcePath = SourcePathResolver.ResolvePath(path.SourcePath);
                }
            }

            if (path.Direction == TransferDirection.Upload)
            {
                if (!FileSystemService.IsPathRooted(path.SourcePath))
                {
                    path.SourcePath = FileSystemService.GetFullPath(path.SourcePath);
                }

                if (TargetPathResolver != null)
                {
                    path.TargetPath = TargetPathResolver.ResolvePath(path.TargetPath);
                }
            }
        }

        // Fills in the direction and target path from the request when the path does not define them;
        // throws when the source path, or the resulting target path, is empty.
        private void PreparePath(TransferPath path)
        {
            if (path.Direction == TransferDirection.None)
            {
                path.Direction = Request.Direction;
            }

            if (string.IsNullOrEmpty(path.SourcePath))
            {
                Log.LogTransferError(Request, "Malformed transfer request: {request}", Request);
                throw new ArgumentException(CoreStrings.SourcePathArgumentExceptionMessage, nameof(path));
            }

            if (!string.IsNullOrEmpty(path.TargetPath))
            {
                return;
            }

            path.TargetPath = Request.TargetPath;
            if (string.IsNullOrEmpty(path.TargetPath))
            {
                Log.LogTransferError(Request, "Malformed transfer request: {request}", Request);
                throw new TransferException(CoreStrings.TargetPathArgumentExceptionMessage);
            }
        }

        // Prepares, validates, resolves and saves each path, then records the file and byte totals of
        // this pass. Paths that fail validation are skipped; cancellation is checked before each path.
        private void AddPathsHelper(IEnumerable<TransferPath> paths, CancellationToken token)
        {
            long addedFiles = 0;
            long addedBytes = 0;
            foreach (TransferPath path in paths)
            {
                token.ThrowIfCancellationRequested();
                PreparePath(path);
                if (!ValidatePath(path).IsOk)
                {
                    continue;
                }

                ResolvePath(path);
                JobService.Save(path);
                OnPathAdded(path, token);
                addedFiles++;
                addedBytes += path.Bytes;
            }

            totalRequestFilesPerAttempt.Add(addedFiles);
            totalRequestBytesPerAttempt.Add(addedBytes);
        }

        // Adds the paths through the synchronous wait-and-retry policy.
        private void AddPathsWaitAndRetry(IEnumerable<TransferPath> paths, CancellationToken token, bool resetRetryAttempt, int maxRetries)
        {
            WaitAndRetryWrapper(
                "AddPathsWaitAndRetry",
                token,
                () => CoreStrings.TransferJobBaseAddPathsFailureMessage,
                () => AddPathsHelper(paths, token),
                resetRetryAttempt,
                maxRetries);
        }

        // Invokes OnJobCanceled; failures are logged and only rethrown when considered fatal.
        private void TryCancelJob(bool isDisposing)
        {
            try
            {
                OnJobCanceled(isDisposing);
            }
            catch (Exception exception)
            {
                Log.LogTransferWarning(exception, Request, "An attempt to cancel the job failed.", Array.Empty<object>());
                if (ExceptionHelper.IsFatalException(exception))
                {
                    throw;
                }
            }
        }

        // Synchronous retry wrapper using the configured maximum number of job retry attempts.
        private void WaitAndRetryWrapper(string caller, CancellationToken token, Func<string> onException, Action onExecute, bool resetRetryAttempt)
        {
            WaitAndRetryWrapper(caller, token, onException, onExecute, resetRetryAttempt, maxJobRetryAttempts);
        }

        // Synchronous retry wrapper for actions; adapts the action to the generic overload.
        private void WaitAndRetryWrapper(string caller, CancellationToken token, Func<string> onException, Action onExecute, bool resetRetryAttempt, int maxRetries)
        {
            WaitAndRetryWrapper<int>(
                caller,
                token,
                onException,
                () =>
                {
                    onExecute();
                    return 0;
                },
                resetRetryAttempt,
                maxRetries);
        }

        // Executes onExecute under the wait-and-retry policy. Each retry is logged and recorded via
        // HandlePolicyJobException; each attempt first checks cancellation and runs HandlePolicyJobRetry.
        // On exit the job is canceled when the token has been canceled. The value returned by onExecute
        // is not used.
        private void WaitAndRetryWrapper<T>(string caller, CancellationToken token, Func<string> onException, Func<T> onExecute, bool resetRetryAttempt, int maxRetryAttempts)
        {
            int localRetryCount = 0;
            IRetryStrategy retryStrategy = Request.RetryStrategy ?? RetryStrategies.CreateExpBackoffStrategy();
            PolicyBuilder policyBuilder = CreateRetryPolicy();
            try
            {
                RetrySyntax.WaitAndRetry(
                    policyBuilder,
                    maxRetryAttempts,
                    retryStrategy.Calculation,
                    (Action<Exception, TimeSpan, Context>)((exception, timespan, context) =>
                    {
                        localRetryCount++;
                        Log.LogTransferError(
                            exception,
                            Request,
                            "Retry - " + caller + " - {Timespan} - " + onException() + ".",
                            timespan);
                        HandlePolicyJobException(maxRetryAttempts, false);
                    })).Execute(
                    (Action<CancellationToken>)(policyToken =>
                    {
                        token.ThrowIfCancellationRequested();
                        HandlePolicyJobRetry(localRetryCount, token);
                        onExecute();
                        if (resetRetryAttempt)
                        {
                            JobService.ResetRetryAttempt();
                        }
                    }),
                    token);
            }
            catch (OperationCanceledException canceled)
            {
                Log.LogInformation(canceled, "The transfer job has been canceled.", Array.Empty<object>());
                throw;
            }
            catch (TransferException failure)
            {
                if (failure.Fatal)
                {
                    Log.LogFatal(failure, "The transfer job has failed.", Array.Empty<object>());
                }
                else
                {
                    Log.LogError(failure, "The transfer job has failed.", Array.Empty<object>());
                }

                throw;
            }
            finally
            {
                if (token.IsCancellationRequested)
                {
                    TryCancelJob(false);
                }
            }
        }

        // Asynchronous retry wrapper for actions; adapts the action to the generic overload.
        private Task WaitAndRetryWrapperAsync(string caller, CancellationToken token, Func<string> onException, Action onExecute, bool resetRetryAttempt, bool bypassHandlingCheck)
        {
            return WaitAndRetryWrapperAsync<int>(
                caller,
                token,
                onException,
                retryAttempt =>
                {
                    onExecute();
                    return 0;
                },
                resetRetryAttempt,
                bypassHandlingCheck);
        }

        // Executes onExecute (given the number of retries so far) under the asynchronous wait-and-retry
        // policy. The policy execution is awaited so the catch/finally blocks below also cover failures
        // and cancellations that surface after a retry delay. When bypassHandlingCheck is set, the
        // HandlePolicyJobException / HandlePolicyJobRetry bookkeeping is skipped.
        private async Task<T> WaitAndRetryWrapperAsync<T>(string caller, CancellationToken token, Func<string> onException, Func<int, T> onExecute, bool resetRetryAttempt, bool bypassHandlingCheck)
        {
            int localRetryCount = 0;
            IRetryStrategy retryStrategy = Request.RetryStrategy ?? RetryStrategies.CreateExpBackoffStrategy();
            PolicyBuilder policyBuilder = CreateRetryPolicy();
            try
            {
                return await RetrySyntaxAsync.WaitAndRetryAsync(
                    policyBuilder,
                    maxJobRetryAttempts,
                    retryStrategy.Calculation,
                    (Action<Exception, TimeSpan, int, Context>)((exception, timespan, retryCount, context) =>
                    {
                        localRetryCount++;

                        // Same message shape as the synchronous wrapper, plus the retry count.
                        Log.LogTransferError(
                            exception,
                            Request,
                            "Retry - " + caller + " - {RetryCount} - {Timespan} - " + onException() + ".",
                            localRetryCount,
                            timespan);
                        if (!bypassHandlingCheck)
                        {
                            HandlePolicyJobException(maxJobRetryAttempts, false);
                        }
                    })).ExecuteAsync<T>(
                    (Func<CancellationToken, Task<T>>)(policyToken =>
                    {
                        token.ThrowIfCancellationRequested();
                        if (!bypassHandlingCheck)
                        {
                            HandlePolicyJobRetry(localRetryCount, token);
                        }

                        T result = onExecute(localRetryCount);
                        if (resetRetryAttempt)
                        {
                            JobService.ResetRetryAttempt();
                        }

                        return Task.FromResult<T>(result);
                    }),
                    token).ConfigureAwait(false);
            }
            catch (OperationCanceledException canceled)
            {
                Log.LogInformation(canceled, "The transfer job has been canceled.", Array.Empty<object>());
                throw;
            }
            catch (TransferException failure)
            {
                if (failure.Fatal)
                {
                    Log.LogFatal(failure, "The transfer job has failed.", Array.Empty<object>());
                }
                else
                {
                    Log.LogError(failure, "The transfer job has failed.", Array.Empty<object>());
                }

                throw;
            }
            finally
            {
                if (token.IsCancellationRequested)
                {
                    TryCancelJob(false);
                }
            }
        }

        // Final logging and metrics submission once a transfer has finished.
        private void LogWhenTransferFinished(ITransferResult result, CancellationToken token)
        {
            LogLocalDriveInformation();
            LogIfLegacyWebApiIsUsed(result);
            LogTransferReportMessage(result);
            SubmitTransferResultMetrics(result, token);
        }

        // Warns when the result configuration has the legacy WebAPI mode enabled.
        private void LogIfLegacyWebApiIsUsed(ITransferResult transferResult)
        {
            if (transferResult.Configuration.UseLegacyWebApi)
            {
                Log.LogTransferWarning(
                    transferResult.Request,
                    "Transfer performed with TAPI configured to use legacy WebAPI communication mode with Relativity REST services. ",
                    Array.Empty<object>());
            }
        }

        // Logs one entry per drive returned by the drive statistics repository.
        private void LogLocalDriveInformation()
        {
            foreach (DriveStatistics drive in DriveStatisticsRepository.GetAllDrives())
            {
                Log.LogTransferInformation(
                    Request,
                    CoreStrings.LocalDriveInformation,
                    drive.Name,
                    drive.DriveType,
                    drive.TotalFreePercent,
                    drive.TotalFreeSpace,
                    drive.ErrorMessage);
            }
        }

        // Logs the transfer report as an error for failed/fatal results and as information otherwise.
        private void LogTransferReportMessage(ITransferResult result)
        {
            bool failed = result.Status == TransferStatus.Failed || result.Status == TransferStatus.Fatal;
            if (failed)
            {
                Log.LogTransferError(result.Request, TransferReportBuilder.GetTransferReportMessage(result), Array.Empty<object>());
            }
            else
            {
                Log.LogTransferInformation(result.Request, TransferReportBuilder.GetTransferReportMessage(result), Array.Empty<object>());
            }
        }

        // Submits APM metrics when enabled on the request, blocking until submission completes.
        // Task cancellation and fatal exceptions propagate; any other failure is logged as a warning.
        private void SubmitTransferResultMetrics(ITransferResult result, CancellationToken token)
        {
            if (!Request.SubmitApmMetrics)
            {
                return;
            }

            try
            {
                ServiceFactory.CreateApmMetricsService()
                    .SubmitTransferResultMetricsAsync(result, token)
                    .ConfigureAwait(false)
                    .GetAwaiter()
                    .GetResult();
            }
            catch (TaskCanceledException)
            {
                throw;
            }
            catch (Exception exception)
            {
                if (ExceptionHelper.IsFatalException(exception))
                {
                    throw;
                }

                Log.LogWarning(exception, "Failed waiting for the job APM metrics to complete.", Array.Empty<object>());
            }
        }
    }
}