<PackageReference Include="Relativity.Transfer.Client" Version="5.0.7" />

AzureBlobTransferCommand

using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Blob; using Microsoft.WindowsAzure.Storage.DataMovement; using Relativity.Transfer.AzureBlob.Resources; using System; using System.Collections.Concurrent; using System.Diagnostics.CodeAnalysis; using System.Globalization; using System.IO; using System.Threading; namespace Relativity.Transfer.AzureBlob { public class AzureBlobTransferCommand : TransferPathCommandBase { private static readonly object SyncRoot = new object(); private readonly ConcurrentDictionary<string, CloudBlobDirectory> blobDirectories = new ConcurrentDictionary<string, CloudBlobDirectory>(); private readonly AzureBlobClientConfiguration configuration; private SingleTransferContext azureTransferContext; private CloudBlobContainer blobContainer; private CloudStorageAccount storageAccount; public AzureBlobTransferCommand(ITransferLog log, ITransferRequest request, ITransferJobService jobService, IFileSystemService fileSystemService, AzureBlobClientConfiguration configuration) : base(log, request, jobService, fileSystemService, configuration) { if (log == null) throw new ArgumentNullException("log"); if (request == null) throw new ArgumentNullException("request"); if (configuration == null) throw new ArgumentNullException("configuration"); this.configuration = configuration; } protected override void OnPreExecute(CancellationToken token) { base.OnPreExecute(token); ValidateRequredConfiguration(); TransferManager.get_Configurations().set_BlockSize(configuration.DmlBlockSize); TransferManager.get_Configurations().set_ParallelOperations(configuration.DmlParallelOperations); blobContainer = GetContainerReference(); CheckBlobContainerNotNull(true); } [SuppressMessage("Microsoft.Maintainability", "CA1502:AvoidExcessiveComplexity", Justification = "There are numerous exceptions and exception types to check for improved error handling.")] [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Reviewed OK.")] [SuppressMessage("Microsoft.Maintainability", "CA1506:AvoidExcessiveClassCoupling", Justification = "This is an early prototype for the CollectR preview and will address later.")] protected override TransferPathResult OnExecute(TransferPath path, CancellationToken token) { if (!(path == (TransferPath)null)) { if (blobContainer == null) { lock (SyncRoot) { if (blobContainer == null) SetupAzureBlobContainer(); } } if (azureTransferContext == null) { lock (SyncRoot) { if (azureTransferContext == null) SetupAzureTransferContext(); } } azureTransferContext.set_ProgressHandler((IProgress<TransferStatus>)new Progress<TransferStatus>(delegate(TransferStatus status) { double progress = TransferHelper.CalculateByteLevelProgress(path.Bytes, status.get_BytesTransferred()); PublishLargeFileProgress(path, status.get_BytesTransferred(), path.Bytes, progress); })); TransferPathResult transferPathResult = new TransferPathResult { Path = path }; string text = (!string.IsNullOrEmpty(path.TargetFileName)) ? path.TargetFileName : base.FileSystemService.GetFileName(path.SourcePath); try { if (path.Direction != TransferDirection.Upload) { if (!base.FileSystemService.DirectoryExists(path.TargetPath)) base.FileSystemService.CreateDirectory(path.TargetPath); string file = (!string.IsNullOrEmpty(path.TargetFileName)) ? base.FileSystemService.Combine(path.TargetPath, path.TargetFileName) : base.FileSystemService.Combine(path.TargetPath, base.FileSystemService.GetFileName(path.SourcePath)); string relativePathDirectory = PathHelper.GetRelativePathDirectory(path.SourcePath); CloudBlockBlob val; if (string.IsNullOrEmpty(relativePathDirectory) || string.Compare(relativePathDirectory, "/", StringComparison.OrdinalIgnoreCase) == 0) val = blobContainer.GetBlockBlobReference(text); else { CloudBlobDirectory cloudBlobDirectory = GetCloudBlobDirectory(relativePathDirectory); if (cloudBlobDirectory == null) throw new TransferException(string.Format(CultureInfo.CurrentCulture, AzureBlobStrings.AzureBlobTargetPathDirectoryConversionExceptionMessage, path.SourcePath), true); val = cloudBlobDirectory.GetBlockBlobReference(text); } try { using (FileStream fileStream = base.FileSystemService.CreateNewFileStreamWithReadWriteAccess(file)) { transferPathResult.StartTime = DateTime.Now; TransferManager.DownloadAsync(val, (Stream)fileStream, null, azureTransferContext, token).GetAwaiter().GetResult(); transferPathResult.EndTime = DateTime.Now; transferPathResult.Status = (token.IsCancellationRequested ? TransferPathStatus.Canceled : TransferPathStatus.Successful); transferPathResult.BytesTransferred += fileStream.Length; transferPathResult.Checksum = string.Empty; PublishTransferStatistics(false); return transferPathResult; } } catch (Exception) { if (base.FileSystemService.FileExists(file)) base.FileSystemService.DeleteFile(file); throw; } } CloudBlockBlob val2; if (string.IsNullOrEmpty(path.TargetPath) || string.Compare(path.TargetPath, "/", StringComparison.OrdinalIgnoreCase) == 0) val2 = blobContainer.GetBlockBlobReference(text); else { CloudBlobDirectory cloudBlobDirectory2 = GetCloudBlobDirectory(path.TargetPath); if (cloudBlobDirectory2 == null) throw new TransferException(string.Format(CultureInfo.CurrentCulture, AzureBlobStrings.AzureBlobTargetPathDirectoryConversionExceptionMessage, path.TargetPath), true); val2 = cloudBlobDirectory2.GetBlockBlobReference(text); } using (FileStream fileStream2 = base.FileSystemService.CreateOpenFileStreamWithReadOnlyAccess(path.SourcePath)) { transferPathResult.StartTime = DateTime.Now; long length = fileStream2.Length; TransferManager.UploadAsync((Stream)fileStream2, val2, null, azureTransferContext, token).Wait(token); transferPathResult.EndTime = DateTime.Now; transferPathResult.Status = (token.IsCancellationRequested ? TransferPathStatus.Canceled : TransferPathStatus.Successful); transferPathResult.BytesTransferred += length; PublishTransferStatistics(false); return transferPathResult; } } catch (TransferException) { throw; } catch (FileNotFoundException ex2) { base.Log.LogError(ex2, "Could not find the {SourceFile} source file.", path.SourcePath); RegisterIssue(new TransferIssue { Attributes = (GetWarningErrorAttribute() | IssueAttributes.FileNotFound), Code = new int?(ex2.HResult), MaxRetryAttempts = base.MaxJobRetryAttempts, Message = ex2.Message, Path = path, RetryAttempt = base.JobService.Statistics.RetryAttempt, Timestamp = DateTime.Now }); transferPathResult.Status = TransferPathStatus.FileNotFound; transferPathResult.EndTime = DateTime.Now; return transferPathResult; } catch (InvalidOperationException ex3) { base.Log.LogError(ex3, "A serious error occurred transferring the {SourceFile} source file.", path.SourcePath); TransferPathStatus status2 = TransferPathStatus.Failed; IssueAttributes issueAttributes = GetWarningErrorAttribute(); StorageException val4 = ex3.InnerException as StorageException; if (val4 != null && val4.get_RequestInformation().get_HttpStatusCode() == 404) { status2 = TransferPathStatus.FileNotFound; issueAttributes |= IssueAttributes.FileNotFound; } RegisterIssue(new TransferIssue { Attributes = issueAttributes, Code = new int?(ex3.HResult), MaxRetryAttempts = base.MaxJobRetryAttempts, Message = ex3.Message, Path = path, RetryAttempt = base.JobService.Statistics.RetryAttempt, Timestamp = DateTime.Now }); transferPathResult.Status = status2; transferPathResult.EndTime = DateTime.Now; return transferPathResult; } catch (UnauthorizedAccessException exception) { return HandleUnauthorizedAccessException(path, exception, transferPathResult, configuration.MaxJobRetryAttempts); } catch (AggregateException ex4) { AggregateException ex5 = ex4.Flatten(); base.Log.LogError(ex5.InnerException, "A serious error occurred transferring the {SourceFile} source file.", path.SourcePath); RegisterIssue(new TransferIssue { Attributes = GetWarningErrorAttribute(), Code = new int?(ex4.HResult), MaxRetryAttempts = base.MaxJobRetryAttempts, Message = ex4.Message, Path = path, RetryAttempt = base.JobService.Statistics.RetryAttempt, Timestamp = DateTime.Now }); transferPathResult.Status = TransferPathStatus.Failed; transferPathResult.EndTime = DateTime.Now; return transferPathResult; } catch (OperationCanceledException) { HandleCancel(transferPathResult); return transferPathResult; } catch (Exception ex7) { base.Log.LogError(ex7, "A serious error occurred transferring the {SourceFile} source file.", path.SourcePath); RegisterIssue(new TransferIssue { Attributes = GetWarningErrorAttribute(), Code = new int?(ex7.HResult), MaxRetryAttempts = base.MaxJobRetryAttempts, Message = ex7.Message, Path = path, RetryAttempt = base.JobService.Statistics.RetryAttempt, Timestamp = DateTime.Now }); transferPathResult.Status = TransferPathStatus.Failed; transferPathResult.EndTime = DateTime.Now; return transferPathResult; } } throw new ArgumentNullException("path"); } protected override void OnPostExecute(CancellationToken token) { base.OnPostExecute(token); azureTransferContext = null; blobContainer = null; blobDirectories.Clear(); } private void SetupAzureBlobContainer() { blobContainer = GetContainerReference(); CheckBlobContainerNotNull(false); } private unsafe void SetupAzureTransferContext() { SingleTransferContext val = new SingleTransferContext(); val.set_ClientRequestId(base.Request.ClientRequestId.ToString()); val.set_LogLevel(3); val.set_ShouldOverwriteCallbackAsync(new ShouldOverwriteCallbackAsync((object)this, (IntPtr)(void*))); azureTransferContext = val; azureTransferContext.add_FileTransferred((EventHandler<TransferEventArgs>)delegate(object sender, TransferEventArgs args) { if (args.get_Exception() != null) base.Log.LogError(args.get_Exception(), "The Azure file transfer for source '{Source}' is not successful.", args.get_Source()); }); azureTransferContext.add_FileFailed((EventHandler<TransferEventArgs>)delegate(object sender, TransferEventArgs args) { if (args.get_Exception() != null) base.Log.LogError(args.get_Exception(), "The Azure file transfer for source '{Source}' failed.", args.get_Source()); else base.Log.LogError("The Azure file transfer for source '{Source}' failed.", args.get_Source()); }); } private CloudBlobDirectory GetCloudBlobDirectory(string path) { if (!blobDirectories.TryGetValue(path.ToUpperInvariant(), out CloudBlobDirectory value)) { string[] array = path.Split(new char[1] { '/' }, StringSplitOptions.RemoveEmptyEntries); foreach (string text in array) { value = ((value != null) ? value.GetDirectoryReference(text) : blobContainer.GetDirectoryReference(text)); } string key = path.ToUpperInvariant().TrimEnd(new char[1] { '/' }); blobDirectories.TryAdd(key, value); } return value; } private CloudBlobContainer GetContainerReference() { CloudBlobContainer val = null; if (!string.IsNullOrEmpty(configuration.BlobContainerSasUri)) { if (!Uri.TryCreate(configuration.BlobContainerSasUri, UriKind.Absolute, out Uri result)) throw new TransferException(AzureBlobStrings.SasUriIsNotWellFormedErrorMessage, true); val = new CloudBlobContainer(result); } if (val == null) try { return storageAccount.CreateCloudBlobClient().GetContainerReference(configuration.BlobContainerName); } catch (Exception innerException) { throw new TransferException(string.Format(CultureInfo.CurrentCulture, AzureBlobStrings.RetrieveBlobContainerErrorMessage, configuration.BlobContainerName), innerException, true); } return val; } private void ValidateRequredConfiguration() { if ((string.IsNullOrEmpty(configuration.StorageAccountConnectionString) || string.IsNullOrEmpty(configuration.BlobContainerName)) && string.IsNullOrEmpty(configuration.BlobContainerSasUri)) { if (string.IsNullOrEmpty(configuration.StorageAccountConnectionString)) throw new TransferException(AzureBlobStrings.AzureStorageAccountInfoNotProvidedErrorMessage, true); throw new TransferException(AzureBlobStrings.BlobConnectionInfoNotProvidedErrorMessage, true); } if (!string.IsNullOrEmpty(configuration.StorageAccountConnectionString)) try { storageAccount = CloudStorageAccount.Parse(configuration.StorageAccountConnectionString); if (storageAccount == null) throw new TransferException(AzureBlobStrings.InvalidStorageAccountErrorMessage, true); } catch (Exception innerException) { throw new TransferException(AzureBlobStrings.InvalidStorageAccountErrorMessage, innerException, true); } } private void CheckBlobContainerNotNull(bool fatal) { if (blobContainer != null) return; throw new TransferException(string.Format(CultureInfo.CurrentCulture, AzureBlobStrings.BlobContainerDoesNotExistErrorMessage, configuration.BlobContainerName), fatal); } } }