// TransferEngine
using Relativity.Transfer.Job;
using Relativity.Transfer.Paths;
using Relativity.Transfer.Resources;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Relativity.Transfer
{
public class TransferEngine : IDisposable
{
// Lock object used by the property accessors and signal helpers below. It is static,
// so every TransferEngine instance synchronizes on the same monitor.
private static readonly object SyncRoot = new object();
// Set by SignalStartJob and waited on by WaitStartJob.
private readonly AutoResetEvent startJobWaitHandle = new AutoResetEvent(false);
// Set by SignalStopJob and waited on by WaitStopJob.
private readonly AutoResetEvent stopJobWaitHandle = new AutoResetEvent(false);
// Supplies MaxJobParallelism for the parallel loop and is passed to the path status checker.
private readonly ClientConfiguration configuration;
// Command invoked before, for each path, and after the parallel transfer loop; also the source of Issues.
private readonly ITransferPathCommand command;
// Request passed along with every log call.
private readonly ITransferRequest request;
private readonly ITransferLog log;
// Provides the job statistics, per-path job records and the final status calculation.
private readonly ITransferJobService jobService;
// Backing field for the lazily created BlockingCollection property; null until first use or after disposal.
private BlockingCollection<TransferPath> blockingCollection;
// Backing field for IsRunning.
private bool isRunning;
// Backing field for Status.
private TransferStatus status;
// Set once Dispose(bool) has run; checked by the signal helpers before touching the wait handles.
private bool disposed;
// Backing field for ErrorMessage.
private string errorMessage;
// Backing field for StartJobStatus: set after WaitStartJob has finished waiting, reset by Clear.
private bool startJobStatus;
// Backing field for StopJobStatus: set after WaitStopJob has finished waiting, reset by Clear.
private bool stopJobStatus;
// Created in the constructor as a RetryablePathStatusChecker; consulted in UpdateJobTransferPath.
private readonly IPathStatusChecker pathStatusChecker;
/// <summary>
/// Gets or sets the queue of paths awaiting transfer. The getter creates the queue on
/// first access (or on first access after it has been reset to <c>null</c>).
/// </summary>
/// <remarks>
/// Both accessors take <c>SyncRoot</c> so that two threads racing through the lazy
/// initialization cannot each create a different collection, which would silently lose
/// whatever was added to the one that is overwritten. Monitor locks are reentrant, so
/// callers that already hold <c>SyncRoot</c> are unaffected.
/// </remarks>
private BlockingCollection<TransferPath> BlockingCollection {
    get {
        lock (SyncRoot) {
            if (blockingCollection == null) {
                blockingCollection = new BlockingCollection<TransferPath>();
            }
            return blockingCollection;
        }
    }
    set {
        lock (SyncRoot) {
            blockingCollection = value;
        }
    }
}
/// <summary>
/// Gets the number of paths currently held in the transfer queue, read under <c>SyncRoot</c>.
/// </summary>
public int Count {
    get {
        int queued;
        lock (SyncRoot) {
            queued = BlockingCollection.Count;
        }
        return queued;
    }
}
/// <summary>
/// Gets the engine's error message. Reads and writes are serialized through <c>SyncRoot</c>.
/// </summary>
public string ErrorMessage {
    get {
        string current;
        lock (SyncRoot) {
            current = errorMessage;
        }
        return current;
    }
    private set {
        lock (SyncRoot) {
            errorMessage = value;
        }
    }
}
/// <summary>
/// Gets a value indicating whether the engine is running. Reads and writes are
/// serialized through <c>SyncRoot</c>.
/// </summary>
public bool IsRunning {
    get {
        bool running;
        lock (SyncRoot) {
            running = isRunning;
        }
        return running;
    }
    private set {
        lock (SyncRoot) {
            isRunning = value;
        }
    }
}
/// <summary>
/// Gets the issues exposed by the transfer path command supplied to the constructor.
/// </summary>
public IReadOnlyCollection<ITransferIssue> Issues {
    get { return command.Issues; }
}
/// <summary>
/// Gets the current transfer status. Reads and writes are serialized through <c>SyncRoot</c>.
/// </summary>
public TransferStatus Status {
    get {
        TransferStatus current;
        lock (SyncRoot) {
            current = status;
        }
        return current;
    }
    private set {
        lock (SyncRoot) {
            status = value;
        }
    }
}
/// <summary>
/// Gets or sets the flag recording that the wait for the start signal has already
/// finished. Access is serialized through <c>SyncRoot</c>.
/// </summary>
private bool StartJobStatus {
    get {
        bool waited;
        lock (SyncRoot) {
            waited = startJobStatus;
        }
        return waited;
    }
    set {
        lock (SyncRoot) {
            startJobStatus = value;
        }
    }
}
/// <summary>
/// Gets or sets the flag recording that the wait for the stop signal has already
/// finished. Access is serialized through <c>SyncRoot</c>.
/// </summary>
private bool StopJobStatus {
    get {
        bool waited;
        lock (SyncRoot) {
            waited = stopJobStatus;
        }
        return waited;
    }
    set {
        lock (SyncRoot) {
            stopJobStatus = value;
        }
    }
}
/// <summary>
/// Initializes a new <see cref="TransferEngine"/> in the not-running state.
/// </summary>
/// <param name="request">The transfer request; passed to every log call.</param>
/// <param name="log">The transfer log.</param>
/// <param name="configuration">The client configuration; also used to build the retryable path status checker.</param>
/// <param name="command">The command executed for each transfer path.</param>
/// <param name="service">The job service holding statistics and per-path job records.</param>
/// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
public TransferEngine(ITransferRequest request, ITransferLog log, ClientConfiguration configuration, ITransferPathCommand command, ITransferJobService service)
{
    if (request == null) {
        throw new ArgumentNullException(nameof(request));
    }
    if (log == null) {
        throw new ArgumentNullException(nameof(log));
    }
    if (configuration == null) {
        throw new ArgumentNullException(nameof(configuration));
    }
    if (command == null) {
        throw new ArgumentNullException(nameof(command));
    }
    if (service == null) {
        throw new ArgumentNullException(nameof(service));
    }

    // Collaborators.
    this.request = request;
    this.log = log;
    this.configuration = configuration;
    this.command = command;
    jobService = service;

    // Initial engine state.
    disposed = false;
    IsRunning = false;
    ErrorMessage = null;
    pathStatusChecker = new RetryablePathStatusChecker(configuration);
}
/// <summary>
/// Validates a transfer path, queues it for transfer and bumps the total request file count.
/// </summary>
/// <param name="path">The path to queue; its source and target paths must be non-empty.</param>
/// <param name="token">Cancellation token passed to the queue's add operation.</param>
/// <exception cref="ArgumentNullException"><paramref name="path"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">The source or target path is null or empty.</exception>
public void Add(TransferPath path, CancellationToken token)
{
    if (path == (TransferPath)null) {
        throw new ArgumentNullException(nameof(path));
    }

    bool missingSource = string.IsNullOrEmpty(path.SourcePath);
    if (missingSource) {
        throw new ArgumentException(CoreStrings.SourcePathArgumentExceptionMessage, nameof(path));
    }

    bool missingTarget = string.IsNullOrEmpty(path.TargetPath);
    if (missingTarget) {
        throw new ArgumentException(CoreStrings.TargetPathArgumentExceptionMessage, nameof(path));
    }

    BlockingCollection.Add(path, token);

    // NOTE(review): this increment is not synchronized here; confirm whether Add can be
    // called concurrently and whether the statistics object guards the counter itself.
    jobService.Statistics.TotalRequestFiles++;
}
/// <summary>
/// Resets the engine to the not-started state. On the first attempt (retry attempt zero)
/// the queue is disposed and the statistics are cleared as well; on a retry only the end
/// time and fatal error count are reset.
/// </summary>
public void Clear()
{
    bool isFirstAttempt = jobService.Statistics.RetryAttempt == 0;
    if (isFirstAttempt) {
        DisposeBlockingCollection();
        jobService.Statistics.Clear();
    }

    // Reset regardless of the retry attempt.
    jobService.Statistics.EndTime = null;
    jobService.Statistics.TotalFatalErrors = 0;

    // Engine state.
    ErrorMessage = null;
    IsRunning = false;
    Status = TransferStatus.NotStarted;

    // Start/stop handshake: un-signal both handles and forget any previous wait.
    startJobWaitHandle.Reset();
    stopJobWaitHandle.Reset();
    StartJobStatus = false;
    StopJobStatus = false;
}
/// <summary>
/// Starts the engine on a background task and blocks until that task signals that it has
/// started (or terminated), the token is canceled, or the timeout elapses.
/// </summary>
/// <param name="token">Cancellation token passed to the pre/post-execute commands, the parallel loop and the start wait.</param>
/// <param name="timeout">Maximum time to wait for the start signal.</param>
/// <exception cref="InvalidOperationException">The engine is already running.</exception>
/// <remarks>
/// The current queue is disposed before the background task starts and a fresh one is
/// created lazily, so paths queued before this call are not processed by it. Failures
/// inside the background task are recorded in <see cref="Status"/> and the job statistics
/// rather than surfaced to the caller of this method.
/// </remarks>
public void Start(CancellationToken token, TimeSpan timeout)
{
    if (IsRunning)
        throw new InvalidOperationException(CoreStrings.TransferFileEngineAlreadyStartedExceptionMessage);
    DisposeBlockingCollection();
    IsRunning = false;
    // Captured by the delegates below; written by the parallel loop bodies and read once the loop has finished.
    DateTime? jobCompleteTimestamp;
    Task.Run(delegate {
        try {
            IsRunning = true;
            jobService.Statistics.StartTime = DateTime.Now;
            log.LogTransferInformation(request, "Transfer Engine - the engine has started.", Array.Empty<object>());
            jobCompleteTimestamp = null;
            // PreExecute only runs on the first attempt, not on retries.
            if (jobService.Statistics.RetryAttempt == 0)
                command.PreExecute(token);
            BlockingCollectionPartitioner<TransferPath> source = new BlockingCollectionPartitioner<TransferPath>(BlockingCollection, token);
            // Release the caller blocked in WaitStartJob before entering the (blocking) parallel loop.
            SignalStartJob();
            ParallelLoopResult parallelLoopResult = Parallel.ForEach(source, new ParallelOptions {
                CancellationToken = token,
                MaxDegreeOfParallelism = configuration.MaxJobParallelism
            }, delegate(TransferPath path, ParallelLoopState state) {
                if (!state.ShouldExitCurrentIteration) {
                    if (token.IsCancellationRequested)
                        state.Stop();
                    TransferPathResult transferPathResult = new TransferPathResult {
                        Path = path,
                        Status = TransferPathStatus.Started,
                        StartTime = new DateTime?(DateTime.Now)
                    };
                    // Reuse the existing job record for this path if there is one; otherwise create and persist it.
                    JobTransferPath jobTransferPath = jobService.GetJobTransferPath(transferPathResult.Path);
                    if (jobTransferPath == null) {
                        jobTransferPath = new JobTransferPath {
                            Path = path,
                            Status = TransferPathStatus.Started
                        };
                        jobService.Save(jobTransferPath);
                    }
                    try {
                        transferPathResult = command.Execute(path, token);
                        UpdateJobTransferPath(jobTransferPath, transferPathResult);
                        jobCompleteTimestamp = transferPathResult.EndTime;
                    } catch (OperationCanceledException) {
                        transferPathResult.Status = TransferPathStatus.Canceled;
                        transferPathResult.EndTime = DateTime.Now;
                        UpdateJobTransferPath(jobTransferPath, transferPathResult);
                        throw;
                    } catch (AggregateException ex8) {
                        transferPathResult.Status = TransferPathStatus.Failed;
                        transferPathResult.EndTime = DateTime.Now;
                        UpdateJobTransferPath(jobTransferPath, transferPathResult);
                        log.LogTransferError(ex8, request, "Failed to transfer the '{SourcePath}' file.", path.SourcePath, LogRedaction.OnPositions(default(int)));
                        throw ex8.Flatten();
                    } catch (TransferException ex9) {
                        jobCompleteTimestamp = DateTime.Now;
                        transferPathResult.Status = ((!ex9.Fatal) ? TransferPathStatus.Failed : TransferPathStatus.Fatal);
                        transferPathResult.EndTime = jobCompleteTimestamp;
                        UpdateJobTransferPath(jobTransferPath, transferPathResult);
                        log.LogTransferError(ex9, request, "Failed to transfer the '{SourcePath}' file.", path.SourcePath, LogRedaction.OnPositions(default(int)));
                        throw;
                    } catch (Exception exception) {
                        jobCompleteTimestamp = DateTime.Now;
                        transferPathResult.Status = TransferPathStatus.Failed;
                        transferPathResult.EndTime = jobCompleteTimestamp;
                        UpdateJobTransferPath(jobTransferPath, transferPathResult);
                        log.LogTransferError(exception, request, "Failed to transfer the '{SourcePath}' file.", path.SourcePath, LogRedaction.OnPositions(default(int)));
                        throw;
                    }
                }
            });
            log.LogTransferInformation(request, "Transfer Engine - ParallelResult - IsCompleted={IsCompleted}.", parallelLoopResult.IsCompleted);
            // No path reported an end time (for example an empty queue): fall back to now.
            if (!jobCompleteTimestamp.HasValue)
                jobCompleteTimestamp = DateTime.Now;
            jobService.Statistics.EndTime = jobCompleteTimestamp;
            command.PostExecute(token);
            SetStatus();
            log.LogTransferInformation(request, "Transfer Engine - Successfully executed all commands.", Array.Empty<object>());
        } catch (OperationCanceledException) {
            jobService.Statistics.EndTime = DateTime.Now;
            Status = TransferStatus.Canceled;
            log.LogTransferInformation(request, "The job has been canceled.", Array.Empty<object>());
        } catch (AggregateException ex2) {
            jobService.Statistics.EndTime = DateTime.Now;
            AggregateException ex3 = ex2.Flatten();
            // Use a type test rather than an exact GetType() comparison so that exceptions
            // derived from TransferException also have their Fatal flag honored, matching
            // the behavior of the catch (TransferException) clauses.
            TransferException ex4 = ex3.InnerException as TransferException;
            if (ex4 != null)
                Status = ((!ex4.Fatal) ? TransferStatus.Failed : TransferStatus.Fatal);
            else
                Status = TransferStatus.Failed;
            jobService.Statistics.JobError = true;
            jobService.Statistics.JobErrorMessage = (ex3.InnerException?.Message ?? ex3.Message);
            log.LogTransferError(ex2, request, "Transfer Engine - Failed to execute all commands.", Array.Empty<object>());
        } catch (TransferException ex5) {
            jobService.Statistics.EndTime = DateTime.Now;
            Status = ((!ex5.Fatal) ? TransferStatus.Failed : TransferStatus.Fatal);
            jobService.Statistics.JobError = true;
            jobService.Statistics.JobErrorMessage = ex5.Message;
            log.LogTransferError(ex5, request, "Transfer Engine - Failed to execute all commands.", Array.Empty<object>());
        } catch (Exception ex6) {
            jobService.Statistics.EndTime = DateTime.Now;
            Status = TransferStatus.Failed;
            jobService.Statistics.JobError = true;
            jobService.Statistics.JobErrorMessage = ex6.Message;
            log.LogTransferError(ex6, request, "Transfer Engine - Failed to execute all commands.", Array.Empty<object>());
            if (ExceptionHelper.IsFatalException(ex6))
                throw;
        } finally {
            IsRunning = false;
            log.LogTransferInformation(request, "Transfer Engine - The engine has terminated.", Array.Empty<object>());
            // Signal start as well, so a caller still blocked in WaitStartJob is released
            // when the task failed before reaching the normal start signal.
            SignalStartJob();
            // NOTE(review): the stop signal is raised twice. With an AutoResetEvent the second
            // Set is a no-op unless a waiter consumed the first - confirm whether this is intentional.
            SignalStopJob();
            SignalStopJob();
        }
    });
    WaitStartJob(token, timeout);
}
/// <summary>
/// Marks the queue as complete for adding and waits for the engine's stop signal, the
/// token, or the timeout. The engine is always marked as not running afterwards.
/// </summary>
/// <param name="token">Cancellation token observed by the wait; if it is canceled on exit the status becomes Canceled.</param>
/// <param name="timeout">Maximum time to wait for the stop signal.</param>
public void Stop(CancellationToken token, TimeSpan timeout)
{
    try {
        BlockingCollection<TransferPath> queue = BlockingCollection;
        queue.CompleteAdding();
        WaitStopJob(token, timeout);
    }
    finally {
        IsRunning = false;
        bool canceled = token.IsCancellationRequested;
        if (canceled) {
            Status = TransferStatus.Canceled;
        }
    }
}
/// <summary>
/// Releases the queue and wait handles owned by this engine and suppresses finalization.
/// </summary>
public void Dispose()
{
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Disposes the current queue, if one exists, and clears the reference so that the next
/// access through the <c>BlockingCollection</c> property creates a fresh one.
/// </summary>
private void DisposeBlockingCollection()
{
    lock (SyncRoot) {
        // Use the backing field rather than the property: the property getter lazily
        // allocates a collection, which would create one here only to dispose it.
        if (blockingCollection != null) {
            blockingCollection.Dispose();
            blockingCollection = null;
        }
    }
}
/// <summary>
/// Disposes the queue and both wait handles once; later calls are no-ops.
/// </summary>
/// <param name="disposing"><c>true</c> to release the managed resources owned by the engine.</param>
/// <remarks>
/// The whole operation runs under <c>SyncRoot</c>. The signal helpers check
/// <c>disposed</c> under that same lock before calling <c>Set()</c> on a wait handle, so
/// holding the lock here keeps a handle from being disposed between their check and their
/// <c>Set()</c> call, and makes the flag update visible to them.
/// </remarks>
private void Dispose(bool disposing)
{
    lock (SyncRoot) {
        if (disposed) {
            return;
        }
        if (disposing) {
            DisposeBlockingCollection();
            startJobWaitHandle.Dispose();
            stopJobWaitHandle.Dispose();
        }
        disposed = true;
    }
}
/// <summary>
/// Sets <see cref="Status"/> to the transfer status calculated by the job service.
/// </summary>
private void SetStatus() => Status = jobService.CalculateTransferStatus();
/// <summary>
/// Copies the outcome of a path transfer onto its job record, updates the job statistics
/// for that outcome, bumps the retry count when the status is retryable, and saves the record.
/// </summary>
/// <param name="jobTransferPath">The job record to update and save.</param>
/// <param name="result">The result whose values are copied onto the record.</param>
private void UpdateJobTransferPath(JobTransferPath jobTransferPath, TransferPathResult result)
{
    // Mirror the result onto the job record.
    jobTransferPath.BytesTransferred = result.BytesTransferred;
    jobTransferPath.Checksum = result.Checksum;
    jobTransferPath.EndTime = result.EndTime;
    jobTransferPath.StartTime = result.StartTime;
    jobTransferPath.Status = result.Status;

    // Fold the outcome into the job statistics.
    new TransferStatisticsCalculator().Update(jobService.Statistics, result.Status);

    bool retryable = pathStatusChecker.IsRetrayable(result.Status);
    if (retryable) {
        jobTransferPath.RetryCount++;
    }

    jobService.Save(jobTransferPath);
}
/// <summary>
/// Sets the start wait handle and logs that it was set, unless the engine has been disposed.
/// </summary>
private void SignalStartJob()
{
    lock (SyncRoot) {
        if (disposed) {
            // The handle may already be disposed; nothing to signal.
            return;
        }

        startJobWaitHandle.Set();
        log.LogTransferInformation(request, "Set the transfer engine start wait handle.", Array.Empty<object>());
    }
}
/// <summary>
/// Sets the stop wait handle and logs that it was set, unless the engine has been disposed.
/// </summary>
private void SignalStopJob()
{
    lock (SyncRoot) {
        if (disposed) {
            // The handle may already be disposed; nothing to signal.
            return;
        }

        stopJobWaitHandle.Set();
        log.LogTransferInformation(request, "Set the transfer engine stop wait handle.", Array.Empty<object>());
    }
}
/// <summary>
/// Blocks until the start wait handle is set, the token is canceled, or the timeout
/// elapses, then records that the wait has happened so later calls return immediately.
/// </summary>
/// <param name="token">Cancellation token whose wait handle ends the wait early.</param>
/// <param name="timeout">Maximum time to wait.</param>
/// <remarks>
/// The outcome of the wait is logged accurately: a timeout or cancellation is no longer
/// reported as having received the start signal. The start flag is still set in every
/// case, and no exception is thrown on timeout or cancellation.
/// </remarks>
private void WaitStartJob(CancellationToken token, TimeSpan timeout)
{
    if (StartJobStatus) {
        return;
    }

    log.LogTransferInformation(request, "Waiting for the transfer engine start job wait handle.", Array.Empty<object>());

    // Index 0 is the cancellation handle, index 1 the start handle; WaitAny returns the
    // index of the handle that ended the wait, or WaitHandle.WaitTimeout on timeout.
    int signaledIndex = WaitHandle.WaitAny(new WaitHandle[2] {
        token.WaitHandle,
        startJobWaitHandle
    }, timeout);
    StartJobStatus = true;

    if (signaledIndex == WaitHandle.WaitTimeout) {
        log.LogTransferInformation(request, "Timed out waiting for the transfer engine start job wait handle.", Array.Empty<object>());
    } else if (signaledIndex == 0) {
        log.LogTransferInformation(request, "Canceled while waiting for the transfer engine start job wait handle.", Array.Empty<object>());
    } else {
        log.LogTransferInformation(request, "Received the transfer engine start job wait handle.", Array.Empty<object>());
    }
}
/// <summary>
/// Blocks until the stop wait handle is set, the token is canceled, or the timeout
/// elapses, then records that the wait has happened so later calls return immediately.
/// </summary>
/// <param name="token">Cancellation token whose wait handle ends the wait early.</param>
/// <param name="timeout">Maximum time to wait.</param>
/// <remarks>
/// The outcome of the wait is logged accurately: a timeout or cancellation is no longer
/// reported as having received the stop signal. The stop flag is still set in every
/// case, and no exception is thrown on timeout or cancellation.
/// </remarks>
private void WaitStopJob(CancellationToken token, TimeSpan timeout)
{
    if (StopJobStatus) {
        return;
    }

    log.LogTransferInformation(request, "Waiting for the transfer engine stop job wait handle.", Array.Empty<object>());

    // Index 0 is the cancellation handle, index 1 the stop handle; WaitAny returns the
    // index of the handle that ended the wait, or WaitHandle.WaitTimeout on timeout.
    int signaledIndex = WaitHandle.WaitAny(new WaitHandle[2] {
        token.WaitHandle,
        stopJobWaitHandle
    }, timeout);
    StopJobStatus = true;

    if (signaledIndex == WaitHandle.WaitTimeout) {
        log.LogTransferInformation(request, "Timed out waiting for the transfer engine stop job wait handle.", Array.Empty<object>());
    } else if (signaledIndex == 0) {
        log.LogTransferInformation(request, "Canceled while waiting for the transfer engine stop job wait handle.", Array.Empty<object>());
    } else {
        log.LogTransferInformation(request, "Received the transfer engine stop job wait handle.", Array.Empty<object>());
    }
}
}
}