<PackageReference Include="Relativity.Transfer.Client" Version="6.2.1" />

AsperaTransferJob

using Aspera.Transfer;
using Relativity.Transfer.Aspera.Resources;
using System;
using System.Linq;
using System.Threading;

namespace Relativity.Transfer.Aspera
{
    /// <summary>
    /// Transfer job that creates, monitors, and cancels an Aspera session through
    /// <see cref="AsperaFaspService"/> and <see cref="AsperaFaspProxy"/>. The active
    /// session is tracked by a nullable identifier that is read and written under a lock.
    /// </summary>
    internal class AsperaTransferJob : TransferJobBase
    {
        // Guards sessionId. Instance-scoped because the guarded state is per job;
        // separate jobs have no reason to contend on a shared lock.
        private readonly object sessionIdLock = new object();

        private readonly AsperaClientConfiguration asperaConfiguration;

        private readonly FileStorageBase targetFileShare;

        private readonly IFileSystemService fileSystemService;

        private AsperaFaspService faspService;

        private AsperaFaspProxy faspProxy;

        private AsperaFileTransferListener listener;

        private TransferResult transferResult;

        private Guid? sessionId;

        /// <summary>
        /// Gets a value indicating whether the data rate can be changed; always <c>true</c> for this job.
        /// </summary>
        public override bool IsDataRateChangeSupported => true;

        /// <summary>
        /// Gets or sets the identifier of the current session, or <c>null</c> when no session is recorded.
        /// Callers that need both the "has value" test and the value must read this property once
        /// into a local, otherwise another thread can clear it between the two reads.
        /// </summary>
        private Guid? SessionId
        {
            get
            {
                lock (sessionIdLock)
                {
                    return sessionId;
                }
            }

            set
            {
                lock (sessionIdLock)
                {
                    sessionId = value;
                }
            }
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="AsperaTransferJob"/> class.
        /// </summary>
        /// <param name="log">The transfer log, forwarded to the base class and to the FASP proxy.</param>
        /// <param name="request">The transfer request, forwarded to the base class.</param>
        /// <param name="targetFileShare">The target file share; its URL is used when resolving UNC paths.</param>
        /// <param name="service">The transfer job service, forwarded to the base class.</param>
        /// <param name="configuration">The Aspera client configuration.</param>
        /// <param name="serviceFactory">The Relativity service factory, forwarded to the base class.</param>
        /// <param name="fileSystemService">The file system service handed to the FASP proxy and FASP service.</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="targetFileShare"/> is <c>null</c>.</exception>
        public AsperaTransferJob(
            ITransferLog log,
            ITransferRequest request,
            FileStorageBase targetFileShare,
            ITransferJobService service,
            AsperaClientConfiguration configuration,
            IRelativityServiceFactory serviceFactory,
            IFileSystemService fileSystemService)
            : base(log, request, service, configuration, serviceFactory)
        {
            if (targetFileShare == null)
            {
                throw new ArgumentNullException(nameof(targetFileShare));
            }

            transferResult = new TransferResult();
            asperaConfiguration = configuration;
            this.targetFileShare = targetFileShare;
            this.fileSystemService = fileSystemService;
            faspProxy = new AsperaFaspProxy(log, configuration, fileSystemService);
        }

        /// <summary>
        /// Validates the job and then applies a new rate and the configured policy to the active session.
        /// </summary>
        /// <param name="minRateMbps">The minimum rate; multiplied by 1000 before being passed to the proxy.</param>
        /// <param name="targetRateMbps">The target rate; multiplied by 1000 before being passed to the proxy.</param>
        /// <param name="token">The cancellation token (not used by this implementation).</param>
        /// <exception cref="InvalidOperationException">
        /// Thrown when a <see cref="FaspManagerException"/> or <see cref="TransferException"/> occurs;
        /// the original exception is logged and attached as the inner exception.
        /// </exception>
        protected override void OnChangeDataRate(int minRateMbps, int targetRateMbps, CancellationToken token)
        {
            try
            {
                // ValidateJob hands back the id it validated, so the value cannot be cleared
                // by another thread between the check and the use.
                Guid activeSessionId = ValidateJob();

                // NOTE(review): the x1000 factor presumably converts Mbps to Kbps - confirm against AsperaFaspProxy.SetRate.
                faspProxy.SetRate(
                    activeSessionId,
                    targetRateMbps * 1000,
                    minRateMbps * 1000,
                    TypeHelper.ParseEnum(asperaConfiguration.Policy, Policy.FAIR, base.Log));
            }
            catch (FaspManagerException ex)
            {
                base.Log.LogError(ex, "An Aspera FASP error occurred when attempting to change the data rate.", Array.Empty<object>());
                throw new InvalidOperationException(AsperaStrings.AsperaChangeDataRateExceptionMessage, ex);
            }
            catch (TransferException ex2)
            {
                base.Log.LogError(ex2, "A transfer error occurred when attempting to change the data rate.", Array.Empty<object>());
                throw new InvalidOperationException(AsperaStrings.AsperaChangeDataRateExceptionMessage, ex2);
            }
        }

        /// <summary>
        /// Creates the remote path resolver for this job.
        /// </summary>
        /// <param name="source">Not used by this implementation.</param>
        /// <returns>
        /// An <see cref="AsperaUncPathResolver"/> built from the target file share URL and the configured
        /// doc root levels when the request declares remote paths in UNC format; otherwise <c>null</c>.
        /// </returns>
        protected override IRemotePathResolver OnCreatePathResolver(bool source)
        {
            if (!base.Request.RemotePathsInUncFormat)
            {
                return null;
            }

            return new AsperaUncPathResolver(targetFileShare.Url, asperaConfiguration.DocRootLevels);
        }

        /// <summary>
        /// Runs the base cancellation logic and then cancels the active session, if any.
        /// </summary>
        /// <param name="isDisposing">
        /// Forwarded to the base class and to <see cref="CancelJob"/>; when <c>false</c> the transfer
        /// result status is set to <see cref="TransferStatus.Canceled"/>.
        /// </param>
        protected override void OnJobCanceled(bool isDisposing)
        {
            base.OnJobCanceled(isDisposing);
            CancelJob(isDisposing);
        }

        /// <summary>
        /// Sets up the FASP runtime, resets the transfer result, creates the FASP service and a fresh
        /// listener, and then creates the job.
        /// </summary>
        /// <param name="timeout">The time allowed for the job to start.</param>
        /// <param name="token">The cancellation token.</param>
        protected override void OnJobStarted(TimeSpan timeout, CancellationToken token)
        {
            faspProxy.SetupRuntime();
            transferResult = new TransferResult
            {
                Configuration = base.Configuration,
                Request = base.Request,
                Status = TransferStatus.NotStarted
            };
            faspService = new AsperaFaspService(base.Log, asperaConfiguration, faspProxy, fileSystemService);

            // Release whatever listener a previous start left behind. It is always disposed before
            // being replaced; its session is cancelled first only when an id is still recorded
            // (the id may already have been cleared once the previous job completed).
            if (listener != null)
            {
                Guid? staleSessionId = SessionId;
                if (staleSessionId.HasValue)
                {
                    faspService.CancelJob(staleSessionId.Value, true, listener);
                }

                listener.Dispose();
                listener = null;
            }

            listener = new AsperaFileTransferListener(base.Request, base.JobService, base.Log, asperaConfiguration);
            CreateJob(timeout, token);
        }

        /// <summary>
        /// Cancels the current session without marking the result as canceled, creates a new job,
        /// and re-adds the paths through <c>RetryAddPaths</c>.
        /// </summary>
        /// <param name="timeout">The time allowed for the new job to start.</param>
        /// <param name="token">The cancellation token.</param>
        protected override void OnJobRetry(TimeSpan timeout, CancellationToken token)
        {
            CancelJob(true);
            CreateJob(timeout, token);
            RetryAddPaths(token);
        }

        /// <summary>
        /// Validates the job and adds the path to the active session.
        /// </summary>
        /// <param name="path">The transfer path to add.</param>
        /// <param name="token">The cancellation token (not used by this implementation).</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="path"/> is <c>null</c>.</exception>
        /// <exception cref="TransferException">Thrown when job validation fails.</exception>
        protected override void OnPathAdded(TransferPath path, CancellationToken token)
        {
            if (path == (TransferPath)null)
            {
                throw new ArgumentNullException(nameof(path));
            }

            Guid activeSessionId = ValidateJob();
            faspService.AddFile(activeSessionId, path);
        }

        /// <summary>
        /// Validates the job and, unless cancellation was requested, locks the session, waits for the
        /// listener to report that the job stopped, and populates the transfer result.
        /// </summary>
        /// <param name="timeout">The time allowed for the job to stop.</param>
        /// <param name="token">The cancellation token.</param>
        /// <returns>The transfer result held by this job.</returns>
        /// <exception cref="TransferException">Thrown when job validation fails.</exception>
        protected override ITransferResult OnWaitJobCompletion(TimeSpan timeout, CancellationToken token)
        {
            try
            {
                ValidateJob();

                // Single read: a concurrent cancel may clear the id at any moment.
                Guid? activeSessionId = SessionId;
                if (!token.IsCancellationRequested && activeSessionId.HasValue)
                {
                    faspService.LockSession(activeSessionId.Value);
                    listener.WaitStopJob(token, timeout);
                    SetTransferResult(transferResult, base.JobService.Issues, token);
                }

                return transferResult;
            }
            finally
            {
                // The job is removed here only when cancellation was not requested.
                Guid? remainingSessionId = SessionId;
                if (remainingSessionId.HasValue && !token.IsCancellationRequested)
                {
                    faspService.RemoveJob(remainingSessionId.Value, listener);
                    SessionId = null;
                }
            }
        }

        /// <summary>
        /// Disposes the base class and, when <paramref name="disposing"/> is <c>true</c>, the listener
        /// and the FASP proxy, then drops the reference to the FASP service.
        /// </summary>
        /// <param name="disposing"><c>true</c> to release the managed resources held by this job.</param>
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
            if (disposing)
            {
                if (listener != null)
                {
                    listener.Dispose();
                    listener = null;
                }

                if (faspProxy != null)
                {
                    faspProxy.Dispose();
                    faspProxy = null;
                }

                faspService = null;
            }
        }

        /// <summary>
        /// Cancels the active session, if one is recorded, and clears the session identifier.
        /// A <see cref="FaspManagerException"/> raised by the cancel call is logged as a warning
        /// and not rethrown.
        /// </summary>
        /// <param name="forced">
        /// When <c>false</c>, the transfer result status is set to <see cref="TransferStatus.Canceled"/>
        /// regardless of whether the cancel call succeeded.
        /// </param>
        private void CancelJob(bool forced)
        {
            try
            {
                Guid? activeSessionId = SessionId;
                if (activeSessionId.HasValue)
                {
                    faspService.CancelJob(activeSessionId.Value, true, listener);
                    SessionId = null;
                }
            }
            catch (FaspManagerException exception)
            {
                base.Log.LogWarning2(exception, base.Request, "The FASP manager failed to cancel the job. This can happen when when an existing job has been canceled internally.", Array.Empty<object>());
            }
            finally
            {
                if (!forced)
                {
                    transferResult.Status = TransferStatus.Canceled;
                }
            }
        }

        /// <summary>
        /// Clears the listener, creates a new Aspera job for the request, waits for the listener to
        /// report that the job started, and validates the result.
        /// </summary>
        /// <param name="timeout">The time allowed for the job to start.</param>
        /// <param name="token">The cancellation token.</param>
        private void CreateJob(TimeSpan timeout, CancellationToken token)
        {
            listener.Clear();

            // Cleared up front so no stale id remains if job creation throws.
            SessionId = null;
            SessionId = faspService.CreateAsperaJob(base.Request, listener);
            listener.WaitStartJob(token, timeout);
            try
            {
                ValidateJob();
            }
            catch (Exception)
            {
                // A job that fails validation must not leave its id recorded.
                SessionId = null;
                throw;
            }
        }

        /// <summary>
        /// Verifies that the job has no blocking issues, has not failed, and has both a listener and
        /// a session identifier.
        /// </summary>
        /// <returns>The session identifier that was validated.</returns>
        /// <exception cref="TransferException">
        /// Thrown when a blocking issue is registered, when the job statistics report an error, when
        /// the listener status is <see cref="TransferStatus.Fatal"/>, or when the listener or session
        /// identifier is missing.
        /// </exception>
        private Guid ValidateJob()
        {
            ValidateJobIssues();

            bool jobError = base.JobService.Statistics.JobError;
            bool fatal = listener != null && listener.Status == TransferStatus.Fatal;
            if (jobError || fatal)
            {
                throw new TransferException(GetJobErrorMessage(AsperaStrings.AsperaSessionFailedExceptionMessage), fatal);
            }

            // Read once so the value returned is exactly the value that was checked.
            Guid? currentSessionId = SessionId;
            if (listener == null || !currentSessionId.HasValue)
            {
                throw new TransferException(GetJobErrorMessage(AsperaStrings.AsperaSessionNotInitializedExceptionMessage), false);
            }

            return currentSessionId.Value;
        }

        /// <summary>
        /// Throws a fatal <see cref="TransferException"/> when the job service holds an issue whose
        /// code is 16 or 19.
        /// </summary>
        /// <exception cref="TransferException">Thrown when an issue with code 16 or 19 is present.</exception>
        private void ValidateJobIssues()
        {
            ILookup<int, ITransferIssue> issuesByCode = base.JobService.Issues
                .Where(item => item.Code.HasValue)
                .ToLookup(item => item.Code.Value);

            // NOTE(review): 16 and 19 are issue codes defined outside this file; judging by the
            // messages raised here they denote a failed session and an authentication failure - confirm.
            if (issuesByCode[16].Any())
            {
                throw new TransferException(GetJobErrorMessage(AsperaStrings.AsperaSessionFailedExceptionMessage), true);
            }

            if (issuesByCode[19].Any())
            {
                throw new TransferException(AsperaStrings.AuthenticationError, true);
            }
        }

        /// <summary>
        /// Returns the job error message from the job statistics, or the supplied default when the
        /// statistics carry no message.
        /// </summary>
        /// <param name="defaultMessage">The message to use when no job error message is available.</param>
        /// <returns>The job error message, or <paramref name="defaultMessage"/>.</returns>
        private string GetJobErrorMessage(string defaultMessage)
        {
            if (string.IsNullOrEmpty(base.JobService.Statistics.JobErrorMessage))
            {
                return defaultMessage;
            }

            return base.JobService.Statistics.JobErrorMessage;
        }
    }
}