<PackageReference Include="Relativity.Transfer.Client" Version="7.2.26" />

TransferEngine

public class TransferEngine : IDisposable
using Relativity.Transfer.Job;
using Relativity.Transfer.Paths;
using Relativity.Transfer.Resources;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Relativity.Transfer
{
    /// <summary>
    /// Queues <see cref="TransferPath"/> items in a blocking collection and executes them
    /// through an <see cref="ITransferPathCommand"/> on a background task started by
    /// <see cref="Start"/>. Job-level results are recorded on the supplied
    /// <see cref="ITransferJobService"/> and exposed through <see cref="Status"/>.
    /// </summary>
    public class TransferEngine : IDisposable
    {
        // Per-instance lock: it only guards this engine's own state, so there is no
        // reason for unrelated engine instances to contend on a shared static lock.
        private readonly object syncRoot = new object();

        private readonly AutoResetEvent startJobWaitHandle = new AutoResetEvent(false);
        private readonly AutoResetEvent stopJobWaitHandle = new AutoResetEvent(false);
        private readonly ClientConfiguration configuration;
        private readonly ITransferPathCommand command;
        private readonly ITransferRequest request;
        private readonly ITransferLog log;
        private readonly ITransferJobService jobService;
        private readonly IPathStatusChecker pathStatusChecker;

        private BlockingCollection<TransferPath> blockingCollection;
        private bool isRunning;
        private TransferStatus status;
        private bool disposed;
        private string errorMessage;
        private bool startJobStatus;
        private bool stopJobStatus;

        /// <summary>
        /// Initializes a new instance of the <see cref="TransferEngine"/> class.
        /// </summary>
        /// <param name="request">The transfer request passed to every log call.</param>
        /// <param name="log">The log that receives engine information and error entries.</param>
        /// <param name="configuration">Supplies <c>MaxJobParallelism</c> and configures the retryable path status checker.</param>
        /// <param name="command">The command invoked before, for each, and after the queued paths.</param>
        /// <param name="service">The job service that stores per-path state and job statistics.</param>
        /// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
        public TransferEngine(
            ITransferRequest request,
            ITransferLog log,
            ClientConfiguration configuration,
            ITransferPathCommand command,
            ITransferJobService service)
        {
            if (request == null)
            {
                throw new ArgumentNullException(nameof(request));
            }

            if (log == null)
            {
                throw new ArgumentNullException(nameof(log));
            }

            if (configuration == null)
            {
                throw new ArgumentNullException(nameof(configuration));
            }

            if (command == null)
            {
                throw new ArgumentNullException(nameof(command));
            }

            if (service == null)
            {
                throw new ArgumentNullException(nameof(service));
            }

            this.request = request;
            this.log = log;
            this.configuration = configuration;
            this.command = command;
            jobService = service;
            IsRunning = false;
            ErrorMessage = null;
            pathStatusChecker = new RetryablePathStatusChecker(configuration);
        }

        /// <summary>Gets the number of paths currently held in the queue.</summary>
        public int Count
        {
            get
            {
                lock (syncRoot)
                {
                    return BlockingCollection.Count;
                }
            }
        }

        /// <summary>Gets the engine error message; reset to <c>null</c> by the constructor and by <see cref="Clear"/>.</summary>
        public string ErrorMessage
        {
            get
            {
                lock (syncRoot)
                {
                    return errorMessage;
                }
            }

            private set
            {
                lock (syncRoot)
                {
                    errorMessage = value;
                }
            }
        }

        /// <summary>Gets a value indicating whether the background job is currently executing.</summary>
        public bool IsRunning
        {
            get
            {
                lock (syncRoot)
                {
                    return isRunning;
                }
            }

            private set
            {
                lock (syncRoot)
                {
                    isRunning = value;
                }
            }
        }

        /// <summary>Gets the issues reported by the underlying command.</summary>
        public IReadOnlyCollection<ITransferIssue> Issues => command.Issues;

        /// <summary>Gets the current job-level transfer status.</summary>
        public TransferStatus Status
        {
            get
            {
                lock (syncRoot)
                {
                    return status;
                }
            }

            private set
            {
                lock (syncRoot)
                {
                    status = value;
                }
            }
        }

        // Lazily creates the queue on first access so that it can be disposed and
        // transparently re-created between runs.
        private BlockingCollection<TransferPath> BlockingCollection
        {
            get { return blockingCollection ?? (blockingCollection = new BlockingCollection<TransferPath>()); }
            set { blockingCollection = value; }
        }

        // True once WaitStartJob has completed a wait; reset by Clear.
        private bool StartJobStatus
        {
            get
            {
                lock (syncRoot)
                {
                    return startJobStatus;
                }
            }

            set
            {
                lock (syncRoot)
                {
                    startJobStatus = value;
                }
            }
        }

        // True once WaitStopJob has completed a wait; reset by Clear.
        private bool StopJobStatus
        {
            get
            {
                lock (syncRoot)
                {
                    return stopJobStatus;
                }
            }

            set
            {
                lock (syncRoot)
                {
                    stopJobStatus = value;
                }
            }
        }

        /// <summary>
        /// Validates the path, adds it to the queue, and increments the total request file count.
        /// </summary>
        /// <param name="path">The path to queue; its source and target paths must be non-empty.</param>
        /// <param name="token">The cancellation token passed to the queue's add operation.</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="path"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentException">Thrown when the source or target path is null or empty.</exception>
        public void Add(TransferPath path, CancellationToken token)
        {
            // NOTE(review): the explicit cast is kept from the original; presumably it
            // selects a specific overloaded equality operator on TransferPath - confirm.
            if (path == (TransferPath)null)
            {
                throw new ArgumentNullException(nameof(path));
            }

            if (string.IsNullOrEmpty(path.SourcePath))
            {
                throw new ArgumentException(CoreStrings.SourcePathArgumentExceptionMessage, nameof(path));
            }

            if (string.IsNullOrEmpty(path.TargetPath))
            {
                throw new ArgumentException(CoreStrings.TargetPathArgumentExceptionMessage, nameof(path));
            }

            BlockingCollection.Add(path, token);
            jobService.Statistics.TotalRequestFiles++;
        }

        /// <summary>
        /// Resets the engine state (status, error message, wait handles and wait flags).
        /// The queue and the statistics are only discarded when the current retry attempt is zero.
        /// </summary>
        public void Clear()
        {
            if (jobService.Statistics.RetryAttempt == 0)
            {
                DisposeBlockingCollection();
                jobService.Statistics.Clear();
            }

            jobService.Statistics.EndTime = null;
            jobService.Statistics.TotalFatalErrors = 0;
            ErrorMessage = null;
            IsRunning = false;
            Status = TransferStatus.NotStarted;
            startJobWaitHandle.Reset();
            stopJobWaitHandle.Reset();
            StartJobStatus = false;
            StopJobStatus = false;
        }

        /// <summary>
        /// Discards the current queue, launches the job on a background task, and waits until
        /// the job signals that it has started, the token is canceled, or the timeout elapses.
        /// </summary>
        /// <param name="token">The cancellation token observed by the job and by the wait.</param>
        /// <param name="timeout">The maximum time to wait for the start signal.</param>
        /// <exception cref="InvalidOperationException">Thrown when the engine is already running.</exception>
        public void Start(CancellationToken token, TimeSpan timeout)
        {
            if (IsRunning)
            {
                throw new InvalidOperationException(CoreStrings.TransferFileEngineAlreadyStartedExceptionMessage);
            }

            DisposeBlockingCollection();
            IsRunning = false;
            Task.Run(() => RunJob(token));
            WaitStartJob(token, timeout);
        }

        /// <summary>
        /// Marks the queue as complete for adding and waits until the job signals that it has
        /// stopped, the token is canceled, or the timeout elapses.
        /// </summary>
        /// <param name="token">The cancellation token observed by the wait.</param>
        /// <param name="timeout">The maximum time to wait for the stop signal.</param>
        public void Stop(CancellationToken token, TimeSpan timeout)
        {
            try
            {
                BlockingCollection.CompleteAdding();
                WaitStopJob(token, timeout);
            }
            finally
            {
                IsRunning = false;
                if (token.IsCancellationRequested)
                {
                    Status = TransferStatus.Canceled;
                }
            }
        }

        /// <summary>Releases the queue and both wait handles.</summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        // Body of the background task: runs the pre/post commands around the parallel
        // path loop, translates failures into a job status, and always signals waiters.
        private void RunJob(CancellationToken token)
        {
            try
            {
                IsRunning = true;
                jobService.Statistics.StartTime = DateTime.Now;
                log.LogTransferInformation(request, "Transfer Engine - the engine has started.", Array.Empty<object>());

                // The completion timestamp is written by concurrent loop iterations, so
                // every write goes through a lock (a DateTime? write is not atomic).
                DateTime? jobCompleteTimestamp = null;
                object timestampGate = new object();
                Action<DateTime?> recordCompletion = delegate(DateTime? value)
                {
                    lock (timestampGate)
                    {
                        jobCompleteTimestamp = value;
                    }
                };

                if (jobService.Statistics.RetryAttempt == 0)
                {
                    command.PreExecute(token);
                }

                BlockingCollectionPartitioner<TransferPath> source =
                    new BlockingCollectionPartitioner<TransferPath>(BlockingCollection, token);
                SignalStartJob();

                ParallelOptions options = new ParallelOptions
                {
                    CancellationToken = token,
                    MaxDegreeOfParallelism = configuration.MaxJobParallelism
                };
                ParallelLoopResult parallelLoopResult = Parallel.ForEach(
                    source,
                    options,
                    delegate(TransferPath path, ParallelLoopState state)
                    {
                        ExecutePath(path, state, token, recordCompletion);
                    });

                log.LogTransferInformation(request, "Transfer Engine - ParallelResult - IsCompleted={IsCompleted}.", parallelLoopResult.IsCompleted);

                // The loop has returned, so no iteration is still writing the timestamp.
                jobService.Statistics.EndTime = jobCompleteTimestamp ?? DateTime.Now;
                command.PostExecute(token);
                SetStatus();
                log.LogTransferInformation(request, "Transfer Engine - Successfully executed all commands.", Array.Empty<object>());
            }
            catch (OperationCanceledException)
            {
                jobService.Statistics.EndTime = DateTime.Now;
                Status = TransferStatus.Canceled;
                log.LogTransferInformation(request, "The job has been canceled.", Array.Empty<object>());
            }
            catch (AggregateException aggregate)
            {
                jobService.Statistics.EndTime = DateTime.Now;
                AggregateException flattened = aggregate.Flatten();
                Exception inner = flattened.InnerException;

                // Exact type match: only TransferException itself (not a derived type)
                // has its Fatal flag consulted here.
                if (inner != null && inner.GetType() == typeof(TransferException))
                {
                    TransferException transferException = (TransferException)inner;
                    Status = transferException.Fatal ? TransferStatus.Fatal : TransferStatus.Failed;
                }
                else
                {
                    Status = TransferStatus.Failed;
                }

                jobService.Statistics.JobError = true;
                jobService.Statistics.JobErrorMessage = inner?.Message ?? flattened.Message;
                log.LogTransferError(aggregate, request, "Transfer Engine - Failed to execute all commands.", Array.Empty<object>());
            }
            catch (TransferException transferException)
            {
                jobService.Statistics.EndTime = DateTime.Now;
                Status = transferException.Fatal ? TransferStatus.Fatal : TransferStatus.Failed;
                jobService.Statistics.JobError = true;
                jobService.Statistics.JobErrorMessage = transferException.Message;
                log.LogTransferError(transferException, request, "Transfer Engine - Failed to execute all commands.", Array.Empty<object>());
            }
            catch (Exception exception)
            {
                jobService.Statistics.EndTime = DateTime.Now;
                Status = TransferStatus.Failed;
                jobService.Statistics.JobError = true;
                jobService.Statistics.JobErrorMessage = exception.Message;
                log.LogTransferError(exception, request, "Transfer Engine - Failed to execute all commands.", Array.Empty<object>());
                if (ExceptionHelper.IsFatalException(exception))
                {
                    throw;
                }
            }
            finally
            {
                IsRunning = false;
                log.LogTransferInformation(request, "Transfer Engine - The engine has terminated.", Array.Empty<object>());

                // The start handle is signaled here as well so that a caller blocked in
                // Start is released when the job fails before reaching the parallel loop.
                SignalStartJob();
                SignalStopJob();
            }
        }

        // Executes a single queued path, persists its outcome on the job service, and
        // rethrows any failure so the parallel loop can surface it.
        private void ExecutePath(
            TransferPath path,
            ParallelLoopState state,
            CancellationToken token,
            Action<DateTime?> recordCompletion)
        {
            if (state.ShouldExitCurrentIteration)
            {
                return;
            }

            if (token.IsCancellationRequested)
            {
                state.Stop();
            }

            TransferPathResult transferPathResult = new TransferPathResult
            {
                Path = path,
                Status = TransferPathStatus.Started,
                StartTime = DateTime.Now
            };

            JobTransferPath jobTransferPath = jobService.GetJobTransferPath(transferPathResult.Path);
            if (jobTransferPath == null)
            {
                jobTransferPath = new JobTransferPath { Path = path, Status = TransferPathStatus.Started };
                jobService.Save(jobTransferPath);
            }

            try
            {
                transferPathResult = command.Execute(path, token);
                UpdateJobTransferPath(jobTransferPath, transferPathResult);
                recordCompletion(transferPathResult.EndTime);
            }
            catch (OperationCanceledException)
            {
                transferPathResult.Status = TransferPathStatus.Canceled;
                transferPathResult.EndTime = DateTime.Now;
                UpdateJobTransferPath(jobTransferPath, transferPathResult);
                throw;
            }
            catch (AggregateException aggregate)
            {
                transferPathResult.Status = TransferPathStatus.Failed;
                transferPathResult.EndTime = DateTime.Now;
                UpdateJobTransferPath(jobTransferPath, transferPathResult);
                log.LogTransferError(aggregate, request, "Failed to transfer the '{SourcePath}' file.", path.SourcePath, LogRedaction.OnPositions(default(int)));
                throw aggregate.Flatten();
            }
            catch (TransferException transferException)
            {
                // Use a local timestamp for this path's EndTime so that it cannot pick up
                // a value written concurrently by another iteration.
                DateTime failedAt = DateTime.Now;
                recordCompletion(failedAt);
                transferPathResult.Status = transferException.Fatal ? TransferPathStatus.Fatal : TransferPathStatus.Failed;
                transferPathResult.EndTime = failedAt;
                UpdateJobTransferPath(jobTransferPath, transferPathResult);
                log.LogTransferError(transferException, request, "Failed to transfer the '{SourcePath}' file.", path.SourcePath, LogRedaction.OnPositions(default(int)));
                throw;
            }
            catch (Exception exception)
            {
                DateTime failedAt = DateTime.Now;
                recordCompletion(failedAt);
                transferPathResult.Status = TransferPathStatus.Failed;
                transferPathResult.EndTime = failedAt;
                UpdateJobTransferPath(jobTransferPath, transferPathResult);
                log.LogTransferError(exception, request, "Failed to transfer the '{SourcePath}' file.", path.SourcePath, LogRedaction.OnPositions(default(int)));
                throw;
            }
        }

        // Disposes the current queue, if one was ever created, and clears the reference
        // so the next access creates a fresh one.
        private void DisposeBlockingCollection()
        {
            lock (syncRoot)
            {
                blockingCollection?.Dispose();
                blockingCollection = null;
            }
        }

        // Disposal happens under the same lock that SignalStartJob/SignalStopJob take to
        // check the disposed flag, so the background task can never call Set() on a
        // wait handle that is concurrently being disposed.
        private void Dispose(bool disposing)
        {
            lock (syncRoot)
            {
                if (disposed)
                {
                    return;
                }

                if (disposing)
                {
                    DisposeBlockingCollection();
                    startJobWaitHandle.Dispose();
                    stopJobWaitHandle.Dispose();
                }

                disposed = true;
            }
        }

        // Asks the job service for the overall status and publishes it.
        private void SetStatus()
        {
            Status = jobService.CalculateTransferStatus();
        }

        // Copies the result onto the persisted job path, updates the job statistics,
        // bumps the retry count for retryable statuses, and saves the job path.
        private void UpdateJobTransferPath(JobTransferPath jobTransferPath, TransferPathResult result)
        {
            jobTransferPath.BytesTransferred = result.BytesTransferred;
            jobTransferPath.Checksum = result.Checksum;
            jobTransferPath.EndTime = result.EndTime;
            jobTransferPath.StartTime = result.StartTime;
            jobTransferPath.Status = result.Status;

            new TransferStatisticsCalculator().Update(jobService.Statistics, result.Status);

            if (pathStatusChecker.IsRetrayable(result.Status))
            {
                jobTransferPath.RetryCount++;
            }

            jobService.Save(jobTransferPath);
        }

        // Signals the start handle unless the engine has been disposed.
        private void SignalStartJob()
        {
            lock (syncRoot)
            {
                if (disposed)
                {
                    return;
                }

                startJobWaitHandle.Set();
                log.LogTransferInformation(request, "Set the transfer engine start wait handle.", Array.Empty<object>());
            }
        }

        // Signals the stop handle unless the engine has been disposed.
        private void SignalStopJob()
        {
            lock (syncRoot)
            {
                if (disposed)
                {
                    return;
                }

                stopJobWaitHandle.Set();
                log.LogTransferInformation(request, "Set the transfer engine stop wait handle.", Array.Empty<object>());
            }
        }

        // Blocks until the start handle is signaled, the token is canceled, or the
        // timeout elapses. The WaitAny result is not inspected, so all three outcomes
        // proceed identically. Skipped when a previous wait already completed.
        private void WaitStartJob(CancellationToken token, TimeSpan timeout)
        {
            if (StartJobStatus)
            {
                return;
            }

            log.LogTransferInformation(request, "Waiting for the transfer engine start job wait handle.", Array.Empty<object>());
            WaitHandle.WaitAny(new WaitHandle[] { token.WaitHandle, startJobWaitHandle }, timeout);
            StartJobStatus = true;
            log.LogTransferInformation(request, "Received the transfer engine start job wait handle.", Array.Empty<object>());
        }

        // Blocks until the stop handle is signaled, the token is canceled, or the
        // timeout elapses. The WaitAny result is not inspected, so all three outcomes
        // proceed identically. Skipped when a previous wait already completed.
        private void WaitStopJob(CancellationToken token, TimeSpan timeout)
        {
            if (StopJobStatus)
            {
                return;
            }

            log.LogTransferInformation(request, "Waiting for the transfer engine stop job wait handle.", Array.Empty<object>());
            WaitHandle.WaitAny(new WaitHandle[] { token.WaitHandle, stopJobWaitHandle }, timeout);
            StopJobStatus = true;
            log.LogTransferInformation(request, "Received the transfer engine stop job wait handle.", Array.Empty<object>());
        }
    }
}