// AsperaTransferJob
using Aspera.Transfer;
using Relativity.Transfer.Aspera.Resources;
using System;
using System.Linq;
using System.Threading;
namespace Relativity.Transfer.Aspera
{
internal class AsperaTransferJob : TransferJobBase
{
// Lock taken by the SessionId property around every read and write of sessionId.
// NOTE(review): the lock object is static, so it is shared by all AsperaTransferJob
// instances even though sessionId is per-instance state - confirm this is intended.
private static readonly object SyncRoot = new object();

// Aspera-specific configuration received by the constructor.
private readonly AsperaClientConfiguration asperaConfiguration;

// Target file share received by the constructor; its Url feeds the UNC path resolver.
// The identifier was missing from this declaration although the constructor assigns
// this.targetFileShare and OnCreatePathResolver reads targetFileShare.Url.
private readonly FileStorageBase targetFileShare;

// File system abstraction handed to the FASP proxy and FASP service.
private readonly IFileSystemService fileSystemService;

// Created on every OnJobStarted call; cleared in Dispose.
private AsperaFaspService faspService;

// Created once in the constructor; disposed in Dispose.
private AsperaFaspProxy faspProxy;

// Listener passed to the FASP service for the current job; recreated in OnJobStarted.
private AsperaFileTransferListener listener;

// Result object returned from OnWaitJobCompletion.
private TransferResult transferResult;

// Backing field for SessionId; access it through the property so the lock is taken.
private Guid? sessionId;
/// <summary>
/// Gets a value indicating whether the data rate can be changed; this job always reports <c>true</c>.
/// </summary>
public override bool IsDataRateChangeSupported {
    get { return true; }
}
// Identifier of the current Aspera session, or null when there is none.
// Both accessors take SyncRoot so the nullable Guid is read and written under the lock.
private Guid? SessionId {
    get {
        Guid? current;
        lock (SyncRoot) {
            current = sessionId;
        }

        return current;
    }
    set {
        lock (SyncRoot) {
            sessionId = value;
        }
    }
}
/// <summary>
/// Initializes a new instance of the <see cref="AsperaTransferJob"/> class.
/// Stores the supplied collaborators, creates an empty transfer result and creates the FASP proxy.
/// </summary>
/// <param name="log">Transfer log; forwarded to the base class and to the FASP proxy.</param>
/// <param name="request">Transfer request; forwarded to the base class.</param>
/// <param name="targetFileShare">Target file share; must not be null.</param>
/// <param name="service">Job service; forwarded to the base class.</param>
/// <param name="configuration">Aspera client configuration; forwarded to the base class and kept locally.</param>
/// <param name="serviceFactory">Service factory; forwarded to the base class.</param>
/// <param name="fileSystemService">File system service; kept locally and handed to the FASP proxy.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="targetFileShare"/> is null.</exception>
public AsperaTransferJob(ITransferLog log, ITransferRequest request, FileStorageBase targetFileShare, ITransferJobService service, AsperaClientConfiguration configuration, IRelativityServiceFactory serviceFactory, IFileSystemService fileSystemService)
    : base(log, request, service, configuration, serviceFactory)
{
    if (targetFileShare == null) {
        throw new ArgumentNullException("targetFileShare");
    }

    this.transferResult = new TransferResult();
    this.asperaConfiguration = configuration;
    this.targetFileShare = targetFileShare;
    this.fileSystemService = fileSystemService;

    // The proxy is created last, after all plain field assignments.
    this.faspProxy = new AsperaFaspProxy(log, configuration, fileSystemService);
}
/// <summary>
/// Validates the job and then asks the FASP proxy to apply new rates to the current session.
/// </summary>
/// <param name="minRateMbps">Minimum rate; multiplied by 1000 before it is passed to the proxy.</param>
/// <param name="targetRateMbps">Target rate; multiplied by 1000 before it is passed to the proxy.</param>
/// <param name="token">Cancellation token; not used by this method.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when a <c>FaspManagerException</c> or <c>TransferException</c> occurs; the original exception is the inner exception.
/// </exception>
protected override void OnChangeDataRate(int minRateMbps, int targetRateMbps, CancellationToken token)
{
    try {
        ValidateJob();

        // NOTE(review): the x1000 scaling presumably converts Mbps to Kbps - confirm against AsperaFaspProxy.SetRate.
        Guid session = SessionId.Value;
        int scaledTargetRate = targetRateMbps * 1000;
        int scaledMinRate = minRateMbps * 1000;

        // Falls back to Policy.FAIR as the default passed to ParseEnum.
        var policy = TypeHelper.ParseEnum(asperaConfiguration.Policy, Policy.FAIR, base.Log);
        faspProxy.SetRate(session, scaledTargetRate, scaledMinRate, policy);
    } catch (FaspManagerException faspError) {
        base.Log.LogError(faspError, "An Aspera FASP error occurred when attempting to change the data rate.", Array.Empty<object>());
        throw new InvalidOperationException(AsperaStrings.AsperaChangeDataRateExceptionMessage, faspError);
    } catch (TransferException transferError) {
        base.Log.LogError(transferError, "A transfer error occurred when attempting to change the data rate.", Array.Empty<object>());
        throw new InvalidOperationException(AsperaStrings.AsperaChangeDataRateExceptionMessage, transferError);
    }
}
/// <summary>
/// Creates the remote path resolver for this job.
/// </summary>
/// <param name="source">Not used by this implementation.</param>
/// <returns>
/// An <c>AsperaUncPathResolver</c> built from the target file share URL and the configured
/// doc root levels when the request uses UNC remote paths; otherwise <c>null</c>.
/// </returns>
protected override IRemotePathResolver OnCreatePathResolver(bool source)
{
    if (base.Request.RemotePathsInUncFormat) {
        return new AsperaUncPathResolver(targetFileShare.Url, asperaConfiguration.DocRootLevels);
    }

    return null;
}
/// <summary>
/// Runs the base cancellation handling and then cancels the current Aspera session, if any.
/// </summary>
/// <param name="isDisposing">Forwarded to the base class and used as the <c>forced</c> flag of <c>CancelJob</c>.</param>
protected override void OnJobCanceled(bool isDisposing)
{
    base.OnJobCanceled(isDisposing);

    // When disposing, CancelJob leaves the transfer result status untouched.
    bool forced = isDisposing;
    CancelJob(forced);
}
/// <summary>
/// Prepares the FASP runtime, resets the transfer result, creates a fresh FASP service and
/// listener, and creates the Aspera job.
/// </summary>
/// <param name="timeout">Timeout forwarded to <c>CreateJob</c>.</param>
/// <param name="token">Cancellation token forwarded to <c>CreateJob</c>.</param>
protected override void OnJobStarted(TimeSpan timeout, CancellationToken token)
{
    faspProxy.SetupRuntime();
    transferResult = new TransferResult {
        Configuration = base.Configuration,
        Request = base.Request,
        Status = TransferStatus.NotStarted
    };
    faspService = new AsperaFaspService(base.Log, asperaConfiguration, faspProxy, fileSystemService);

    if (listener != null) {
        // A session left over from a previous start is canceled before a new one is created.
        // The id is read once so the HasValue check and the value used cannot disagree.
        Guid? staleSession = SessionId;
        if (staleSession.HasValue) {
            faspService.CancelJob(staleSession.Value, true, listener);
        }

        // Dispose the previous listener even when no session is active; it is replaced
        // just below and would otherwise be dropped without being disposed.
        listener.Dispose();
        listener = null;
    }

    listener = new AsperaFileTransferListener(base.Request, base.JobService, base.Log, asperaConfiguration);
    CreateJob(timeout, token);
}
/// <summary>
/// Retries the job: cancels the current session, creates a new job and re-adds the paths.
/// </summary>
/// <param name="timeout">Timeout forwarded to <c>CreateJob</c>.</param>
/// <param name="token">Cancellation token forwarded to <c>CreateJob</c> and <c>RetryAddPaths</c>.</param>
protected override void OnJobRetry(TimeSpan timeout, CancellationToken token)
{
    // forced: true keeps CancelJob from marking the transfer result as canceled.
    CancelJob(forced: true);
    CreateJob(timeout, token);
    RetryAddPaths(token);
}
/// <summary>
/// Validates the job and adds the given path to the current Aspera session.
/// </summary>
/// <param name="path">The transfer path to add; must not be null.</param>
/// <param name="token">Cancellation token; not used by this method.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="path"/> is null.</exception>
protected override void OnPathAdded(TransferPath path, CancellationToken token)
{
    if (path == (TransferPath)null) {
        throw new ArgumentNullException("path");
    }

    ValidateJob();
    Guid session = SessionId.Value;
    faspService.AddFile(session, path);
}
/// <summary>
/// Validates the job, waits for the listener to report that the job stopped, and returns the transfer result.
/// </summary>
/// <param name="timeout">Timeout passed to the listener's wait.</param>
/// <param name="token">Cancellation token; when cancellation is requested the wait and the job removal are skipped.</param>
/// <returns>The transfer result held by this job.</returns>
protected override ITransferResult OnWaitJobCompletion(TimeSpan timeout, CancellationToken token)
{
    try {
        ValidateJob();

        // Nothing to wait for when canceled or when there is no session.
        if (token.IsCancellationRequested || !SessionId.HasValue) {
            return transferResult;
        }

        faspService.LockSession(SessionId.Value);
        listener.WaitStopJob(token, timeout);
        SetTransferResult(transferResult, base.JobService.Issues, token);
        return transferResult;
    } finally {
        // Runs on success and on failure: remove the job unless cancellation was requested.
        bool removeJob = SessionId.HasValue && !token.IsCancellationRequested;
        if (removeJob) {
            faspService.RemoveJob(SessionId.Value, listener);
            SessionId = null;
        }
    }
}
/// <summary>
/// Disposes the base class first and then, when <paramref name="disposing"/> is true,
/// disposes the listener and the FASP proxy and clears the FASP service reference.
/// </summary>
/// <param name="disposing">True to release the managed objects owned by this job.</param>
protected override void Dispose(bool disposing)
{
    base.Dispose(disposing);
    if (!disposing) {
        return;
    }

    listener?.Dispose();
    listener = null;

    faspProxy?.Dispose();
    faspProxy = null;

    // The service is only dereferenced here; this method does not dispose it.
    faspService = null;
}
/// <summary>
/// Cancels the current Aspera session, if there is one, and clears the session id.
/// A <c>FaspManagerException</c> raised by the cancel call is logged as a warning and not rethrown;
/// in that case the session id is left unchanged.
/// </summary>
/// <param name="forced">
/// When false, the transfer result status is set to <c>TransferStatus.Canceled</c> whether or not
/// the cancel call succeeded; when true, the status is left unchanged.
/// </param>
private void CancelJob(bool forced)
{
    try {
        // Read the session id once: checking HasValue and then reading Value through the
        // property takes the lock twice, and a concurrent reset in between would make
        // Value throw an InvalidOperationException that this method does not handle.
        Guid? activeSession = SessionId;
        if (activeSession.HasValue) {
            faspService.CancelJob(activeSession.Value, true, listener);
            SessionId = null;
        }
    } catch (FaspManagerException exception) {
        base.Log.LogWarning2(exception, base.Request, "The FASP manager failed to cancel the job. This can happen when an existing job has been canceled internally.", Array.Empty<object>());
    } finally {
        if (!forced) {
            transferResult.Status = TransferStatus.Canceled;
        }
    }
}
/// <summary>
/// Clears the listener, creates a new Aspera job for the current request, waits for it to start
/// and validates it. The session id is reset to null when validation fails.
/// </summary>
/// <param name="timeout">Timeout passed to the listener's start wait.</param>
/// <param name="token">Cancellation token passed to the listener's start wait.</param>
private void CreateJob(TimeSpan timeout, CancellationToken token)
{
    listener.Clear();

    // Reset first so no old id remains if job creation throws.
    SessionId = null;
    var createdSession = faspService.CreateAsperaJob(base.Request, listener);
    SessionId = createdSession;

    listener.WaitStartJob(token, timeout);
    try {
        ValidateJob();
    } catch (Exception) {
        // Validation failed: drop the session id and let the original exception propagate.
        SessionId = null;
        throw;
    }
}
/// <summary>
/// Throws a <c>TransferException</c> when the job cannot be used: a blocking issue was reported,
/// the job statistics flag an error, the listener status is fatal, or no listener/session exists.
/// </summary>
private void ValidateJob()
{
    ValidateJobIssues();

    bool jobError = base.JobService.Statistics.JobError;
    bool listenerFatal = listener != null && listener.Status == TransferStatus.Fatal;
    if (jobError || listenerFatal) {
        // The exception is marked fatal only when the listener itself reports a fatal status.
        throw new TransferException(GetJobErrorMessage(AsperaStrings.AsperaSessionFailedExceptionMessage), listenerFatal);
    }

    bool initialized = listener != null && SessionId.HasValue;
    if (!initialized) {
        throw new TransferException(GetJobErrorMessage(AsperaStrings.AsperaSessionNotInitializedExceptionMessage), false);
    }
}
/// <summary>
/// Inspects the issues reported by the job service and throws a fatal <c>TransferException</c>
/// when an issue with code 16 or code 19 is present.
/// </summary>
private void ValidateJobIssues()
{
    // NOTE(review): the meaning of codes 16 and 19 is not defined in this file; the names
    // below only reflect which message is thrown for each - confirm against the issue code table.
    const int sessionFailedIssueCode = 16;
    const int authenticationIssueCode = 19;

    // Issues without a code are ignored.
    ILookup<int, ITransferIssue> issuesByCode = base.JobService.Issues
        .Where(issue => issue.Code.HasValue)
        .ToLookup((ITransferIssue issue) => issue.Code.Value);

    if (issuesByCode[sessionFailedIssueCode].Any()) {
        throw new TransferException(GetJobErrorMessage(AsperaStrings.AsperaSessionFailedExceptionMessage), true);
    }

    if (issuesByCode[authenticationIssueCode].Any()) {
        throw new TransferException(AsperaStrings.AuthenticationError, true);
    }
}
/// <summary>
/// Returns the job error message from the job statistics, or the supplied default when that message is null or empty.
/// </summary>
/// <param name="defaultMessage">Message to use when the statistics carry no job error message.</param>
/// <returns>The job error message, or <paramref name="defaultMessage"/>.</returns>
private string GetJobErrorMessage(string defaultMessage)
{
    string jobErrorMessage = base.JobService.Statistics.JobErrorMessage;
    return string.IsNullOrEmpty(jobErrorMessage) ? defaultMessage : jobErrorMessage;
}
}
}