// TransferEngineJob
using Relativity.Transfer.Resources;
using System;
using System.Threading;
namespace Relativity.Transfer
{
/// <summary>
/// A transfer job that delegates its work to a <see cref="TransferEngine"/>.
/// Paths added to the job are forwarded to the engine, and waiting for job
/// completion stops the engine and folds the engine's issues into the result.
/// </summary>
public class TransferEngineJob : TransferJobBase
{
    /// <summary>
    /// Timeout used to stop a still-running engine when the job is canceled.
    /// </summary>
    private static readonly TimeSpan CancelStopTimeout = TimeSpan.FromSeconds(30);

    private readonly ITransferPathCommand command;
    private readonly IRemotePathResolver remoteSourcePathResolver;
    private readonly IRemotePathResolver remoteTargetPathResolver;

    // Created in OnJobStarted; null until the job has been started.
    private TransferEngine engine;

    // Created in OnJobStarted, or lazily by EnsureTransferResult.
    private TransferResult transferResult;

    /// <summary>
    /// Initializes a new instance of the <see cref="TransferEngineJob"/> class
    /// without remote path resolvers (both resolvers are <c>null</c>).
    /// </summary>
    /// <param name="log">The transfer log, passed to the base class.</param>
    /// <param name="request">The transfer request, passed to the base class.</param>
    /// <param name="service">The job service, passed to the base class.</param>
    /// <param name="configuration">The client configuration, passed to the base class.</param>
    /// <param name="serviceFactory">The Relativity service factory, passed to the base class.</param>
    /// <param name="command">The path command handed to the engine when it is created.</param>
    public TransferEngineJob(ITransferLog log, ITransferRequest request, ITransferJobService service, ClientConfiguration configuration, IRelativityServiceFactory serviceFactory, ITransferPathCommand command)
        : this(log, request, service, configuration, serviceFactory, command, null, null)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="TransferEngineJob"/> class.
    /// </summary>
    /// <param name="log">The transfer log, passed to the base class.</param>
    /// <param name="request">The transfer request, passed to the base class.</param>
    /// <param name="service">The job service, passed to the base class.</param>
    /// <param name="configuration">The client configuration, passed to the base class.</param>
    /// <param name="serviceFactory">The Relativity service factory, passed to the base class.</param>
    /// <param name="command">The path command handed to the engine when it is created.</param>
    /// <param name="remoteSourcePathResolver">Resolver returned for source paths; may be <c>null</c>.</param>
    /// <param name="remoteTargetPathResolver">Resolver returned for target paths; may be <c>null</c>.</param>
    public TransferEngineJob(ITransferLog log, ITransferRequest request, ITransferJobService service, ClientConfiguration configuration, IRelativityServiceFactory serviceFactory, ITransferPathCommand command, IRemotePathResolver remoteSourcePathResolver, IRemotePathResolver remoteTargetPathResolver)
        : base(log, request, service, configuration, serviceFactory)
    {
        this.command = command;
        this.remoteSourcePathResolver = remoteSourcePathResolver;
        this.remoteTargetPathResolver = remoteTargetPathResolver;
    }

    /// <summary>
    /// Changing the data rate is not supported by this job type.
    /// </summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    protected override void OnChangeDataRate(int minRateMbps, int targetRateMbps, CancellationToken token)
    {
        throw new NotSupportedException(CoreStrings.ChangeTransferRateNotSupported);
    }

    /// <summary>
    /// Returns the resolver supplied at construction time for the requested side.
    /// </summary>
    /// <param name="source"><c>true</c> for the source resolver; <c>false</c> for the target resolver.</param>
    /// <returns>The matching resolver, or <c>null</c> when none was supplied.</returns>
    protected override IRemotePathResolver OnCreatePathResolver(bool source)
    {
        return source ? remoteSourcePathResolver : remoteTargetPathResolver;
    }

    /// <summary>
    /// Validates the job state and forwards the added path to the engine.
    /// </summary>
    /// <param name="path">The path that was added to the job.</param>
    /// <param name="token">The cancellation token passed to the engine.</param>
    /// <exception cref="ArgumentNullException"><paramref name="path"/> is <c>null</c>.</exception>
    /// <exception cref="TransferException">The job has failed or the engine is not running.</exception>
    protected override void OnPathAdded(TransferPath path, CancellationToken token)
    {
        // The explicit cast is kept from the original null comparison.
        if (path == (TransferPath)null)
        {
            throw new ArgumentNullException(nameof(path));
        }

        // ValidateJob guarantees the engine exists and is running past this point.
        ValidateJob();
        engine.Add(path, token);
    }

    /// <summary>
    /// Restarts the engine, validates the job and re-adds the paths for a retry.
    /// </summary>
    /// <exception cref="TransferException">
    /// The engine was never created, the job has failed, or the engine is not running after the restart.
    /// </exception>
    protected override void OnJobRetry(TimeSpan timeout, CancellationToken token)
    {
        StopEngine(token, timeout);
        StartEngine(token, timeout);
        ValidateJob();
        RetryAddPaths(token);
    }

    /// <summary>
    /// Disposes any previous engine, creates a fresh engine and result, and starts the engine.
    /// </summary>
    protected override void OnJobStarted(TimeSpan timeout, CancellationToken token)
    {
        engine?.Dispose();
        engine = new TransferEngine(base.Request, base.Log, base.Configuration, command, base.JobService);

        // Create the result before starting the engine so that a failed start
        // never leaves a null or stale result from an earlier run behind.
        transferResult = CreateTransferResult();
        StartEngine(token, timeout);
    }

    /// <summary>
    /// Stops the engine and produces the transfer result.
    /// </summary>
    /// <returns>
    /// The job's result. Its status is <see cref="TransferStatus.Canceled"/> when cancellation was
    /// requested, <see cref="TransferStatus.Failed"/> or <see cref="TransferStatus.Fatal"/> when a
    /// <see cref="TransferException"/> occurred, and <see cref="TransferStatus.Failed"/> for any other
    /// non-fatal exception; otherwise it is filled in by <c>SetTransferResult</c>.
    /// </returns>
    /// <exception cref="OperationCanceledException">Rethrown unchanged.</exception>
    protected override ITransferResult OnWaitJobCompletion(TimeSpan timeout, CancellationToken token)
    {
        // The result may not exist yet if the job was never started; the error
        // paths below must still be able to report a status.
        TransferResult result = EnsureTransferResult();
        try
        {
            ValidateJob();
            if (token.IsCancellationRequested)
            {
                result.Status = TransferStatus.Canceled;
                return result;
            }

            StopEngine(token, timeout);

            // Cancellation may have been requested while the engine was stopping.
            if (token.IsCancellationRequested)
            {
                result.Status = TransferStatus.Canceled;
            }
            else
            {
                SetTransferResult(result, engine.Issues, token);
            }

            return result;
        }
        catch (OperationCanceledException)
        {
            throw;
        }
        catch (TransferException transferException)
        {
            // StopEngine is null-safe: ValidateJob throws this exception type
            // precisely when the engine has not been created.
            StopEngine(token, timeout);
            result.Status = transferException.Fatal ? TransferStatus.Fatal : TransferStatus.Failed;
            return result;
        }
        catch (Exception exception)
        {
            StopEngine(token, timeout);
            result.Status = TransferStatus.Failed;
            if (ExceptionHelper.IsFatalException(exception))
            {
                throw;
            }

            return result;
        }
    }

    /// <summary>
    /// Runs the base cancellation logic and stops the engine if it is still running.
    /// </summary>
    protected override void OnJobCanceled(bool isDisposing)
    {
        base.OnJobCanceled(isDisposing);
        if (engine != null && engine.IsRunning)
        {
            StopEngine(CancellationToken.None, CancelStopTimeout);
        }
    }

    /// <summary>
    /// Disposes the base class first and then, when disposing, the engine.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        if (disposing)
        {
            engine?.Dispose();
        }
    }

    /// <summary>
    /// Clears the engine and starts it.
    /// </summary>
    /// <exception cref="TransferException">The engine has not been created yet.</exception>
    private void StartEngine(CancellationToken token, TimeSpan timeout)
    {
        if (engine == null)
        {
            throw new TransferException(CoreStrings.TransferFileEngineNotInitializedExceptionMessage, false);
        }

        engine.Clear();
        engine.Start(token, timeout);
    }

    /// <summary>
    /// Stops the engine; does nothing when the engine has not been created.
    /// </summary>
    private void StopEngine(CancellationToken token, TimeSpan timeout)
    {
        engine?.Stop(token, timeout);
    }

    /// <summary>
    /// Throws when the job has recorded an error, the engine is in the fatal state,
    /// or the engine is missing or not running.
    /// </summary>
    /// <exception cref="TransferException">
    /// The job is not in a usable state; marked fatal only when the engine status is fatal.
    /// </exception>
    private void ValidateJob()
    {
        bool jobError = base.JobService.Statistics.JobError;
        bool engineFatal = engine != null && engine.Status == TransferStatus.Fatal;
        if (jobError || engineFatal)
        {
            throw new TransferException(GetEngineErrorMessage(CoreStrings.EngineJobFailedExceptionMessage), engineFatal);
        }

        if (engine == null || !engine.IsRunning)
        {
            throw new TransferException(GetEngineErrorMessage(CoreStrings.TransferFileEngineNotInitializedExceptionMessage), false);
        }
    }

    /// <summary>
    /// Returns the engine's error message, or <paramref name="defaultMessage"/> when there is
    /// no engine or its message is null or empty.
    /// </summary>
    private string GetEngineErrorMessage(string defaultMessage)
    {
        string engineMessage = engine?.ErrorMessage;
        return string.IsNullOrEmpty(engineMessage) ? defaultMessage : engineMessage;
    }

    /// <summary>
    /// Returns the current result, creating it first when the job was never started.
    /// </summary>
    private TransferResult EnsureTransferResult()
    {
        if (transferResult == null)
        {
            transferResult = CreateTransferResult();
        }

        return transferResult;
    }

    /// <summary>
    /// Creates a result bound to this job's configuration and request.
    /// </summary>
    private TransferResult CreateTransferResult()
    {
        return new TransferResult
        {
            Configuration = base.Configuration,
            Request = base.Request
        };
    }
}
}