AsperaTransferJob
using Aspera.Transfer;
using Relativity.Transfer.Aspera.Issues;
using Relativity.Transfer.Aspera.Resources;
using System;
using System.Linq;
using System.Threading;
namespace Relativity.Transfer.Aspera
{
internal class AsperaTransferJob : TransferJobBase
{
private static readonly object SyncRoot = new object();
private readonly AsperaClientConfiguration asperaConfiguration;
private readonly FileStorageBase ;
private readonly IFileSystemService fileSystemService;
private AsperaFaspService faspService;
private AsperaFaspProxy faspProxy;
private AsperaEventListener eventListener;
private AsperaEventHandler eventHandler;
private TransferResult transferResult;
private Guid? sessionId;
public override bool IsDataRateChangeSupported => true;
private Guid? SessionId {
get {
lock (SyncRoot) {
return sessionId;
}
}
set {
lock (SyncRoot) {
sessionId = value;
}
}
}
public AsperaTransferJob(ITransferLog log, ITransferRequest request, FileStorageBase targetFileShare, ITransferJobService service, AsperaClientConfiguration configuration, IRelativityServiceFactory serviceFactory, IFileSystemService fileSystemService)
: base(log, request, service, configuration, serviceFactory)
{
if (targetFileShare == null)
throw new ArgumentNullException("targetFileShare");
transferResult = new TransferResult();
asperaConfiguration = configuration;
this.targetFileShare = targetFileShare;
this.fileSystemService = fileSystemService;
faspProxy = new AsperaFaspProxy(log, configuration, fileSystemService);
}
protected override void OnChangeDataRate(int minRateMbps, int targetRateMbps, CancellationToken token)
{
try {
ValidateJob();
faspProxy.SetRate(SessionId.Value, targetRateMbps * 1000, minRateMbps * 1000, TypeHelper.ParseEnum(asperaConfiguration.Policy, Policy.FAIR, base.Log));
} catch (FaspManagerException exception) {
base.Log.LogTransferWarning(exception, base.Request, AsperaStrings.TranferNotReadyForDataRateChangeError, Array.Empty<object>());
} catch (TransferException ex) {
base.Log.LogError(ex, AsperaStrings.ChangeDataRateError, Array.Empty<object>());
throw new InvalidOperationException(AsperaStrings.AsperaChangeDataRateExceptionMessage, ex);
}
}
protected override IRemotePathResolver OnCreatePathResolver(bool source)
{
if (!base.Request.RemotePathsInUncFormat)
return null;
return new AsperaUncPathResolver(targetFileShare.Url, asperaConfiguration.DocRootLevels, base.Log);
}
protected override void OnJobCanceled(bool isDisposing)
{
base.OnJobCanceled(isDisposing);
CancelJob(isDisposing);
}
protected override void OnJobStarted(TimeSpan timeout, CancellationToken token)
{
faspProxy.SetupRuntime();
transferResult = new TransferResult {
Configuration = base.Configuration,
Request = base.Request,
Status = TransferStatus.NotStarted
};
faspService = new AsperaFaspService(base.Log, asperaConfiguration, faspProxy, fileSystemService);
if (SessionId.HasValue && eventListener != null) {
faspService.CancelJob(SessionId.Value, true, eventListener);
eventListener?.Dispose();
eventListener = null;
}
CreateJob(timeout, token);
}
protected override void OnJobRetry(TimeSpan timeout, CancellationToken token)
{
base.Log.LogTransferInformation(base.Request, "Retrying job. Timeout: {timeout}, request: {request}", timeout, base.Request);
CancelJob(true);
CreateJob(timeout, token);
RetryAddPaths(token);
}
protected override void OnPathAdded(TransferPath path, CancellationToken token)
{
if (path == (TransferPath)null)
throw new ArgumentNullException("path");
ValidateJob();
faspService.AddFile(SessionId.Value, path);
}
protected override ITransferResult OnWaitJobCompletion(TimeSpan timeout, CancellationToken token)
{
try {
ValidateJob();
if (!token.IsCancellationRequested && SessionId.HasValue) {
faspService.LockSession(SessionId.Value);
eventHandler.WaitStopJob(token, timeout);
eventListener.StopListening(timeout);
SetTransferResult(transferResult, base.JobService.Issues, token);
}
return transferResult;
} finally {
if (SessionId.HasValue && !token.IsCancellationRequested) {
faspService.RemoveJob(SessionId.Value, eventListener);
SessionId = null;
}
}
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing) {
if (eventHandler != null) {
eventHandler.Dispose();
eventHandler = null;
}
if (eventListener != null) {
eventListener.Dispose();
eventListener = null;
}
if (faspProxy != null) {
faspProxy.Dispose();
faspProxy = null;
}
faspService = null;
}
}
private void CancelJob(bool forced)
{
try {
if (SessionId.HasValue) {
faspService.CancelJob(SessionId.Value, true, eventListener);
SessionId = null;
}
} catch (FaspManagerException exception) {
base.Log.LogTransferWarning(exception, base.Request, "The FASP manager failed to cancel the job. This can happen when when an existing job has been canceled internally.", Array.Empty<object>());
} finally {
if (!forced)
transferResult.Status = TransferStatus.Canceled;
}
}
private void CreateJob(TimeSpan timeout, CancellationToken token)
{
base.JobService.ClearTransferIssueList();
eventHandler = new AsperaEventHandler(base.Request, base.JobService, base.Log, asperaConfiguration);
eventListener = new AsperaEventListener(token, eventHandler);
SessionId = null;
SessionId = faspService.CreateAsperaJob(base.Request, eventListener);
eventHandler.WaitStartJob(token, timeout);
try {
ValidateJob();
} catch (Exception) {
SessionId = null;
throw;
}
}
private void ValidateJob()
{
ValidateJobIssues();
bool flag = eventListener != null && eventHandler.GetTransferStatus() == TransferStatus.Fatal;
if (base.JobService.Statistics.JobError | flag) {
TransferException ex = new TransferException(GetJobErrorMessage(AsperaStrings.AsperaSessionFailedExceptionMessage), flag);
if (flag)
base.Log.LogTransferFatal(ex, base.Request, "Aspera job failed due to unknown fatal error. \n{Statistics}", base.JobService.Statistics);
throw ex;
}
if (eventListener == null || !SessionId.HasValue)
throw new TransferException(GetJobErrorMessage(AsperaStrings.AsperaSessionNotInitializedExceptionMessage), false);
}
private void ValidateJobIssues()
{
ILookup<int, ITransferIssue> lookup = (from item in base.JobService.Issues
where item.Code.HasValue
select item).ToLookup((ITransferIssue item) => item.Code.Value);
if (lookup[16].Any()) {
TransferException ex = new TransferException(GetJobErrorMessage(AsperaStrings.AsperaSessionFailedExceptionMessage), true);
base.Log.LogTransferFatal(ex, base.Request, "Aspera job failed due to connection lost error. \n{Statistics} \n {Issues}", base.JobService.Statistics, IssuesDumper.Dump(base.JobService.Issues));
throw ex;
}
if (lookup[19].Any())
throw new TransferException(AsperaStrings.AuthenticationError, true);
}
private string GetJobErrorMessage(string defaultMessage)
{
if (string.IsNullOrEmpty(base.JobService.Statistics.JobErrorMessage))
return defaultMessage;
return base.JobService.Statistics.JobErrorMessage;
}
}
}