// AzureBlobTransferCommand
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.DataMovement;
using Relativity.Transfer.AzureBlob.Resources;
using System;
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using System.Threading;
namespace Relativity.Transfer.AzureBlob
{
public class AzureBlobTransferCommand : TransferPathCommandBase
{
// Lock object used by OnExecute around the lazy creation of the blob container and the
// transfer context. It is static, so the same lock is shared by every instance of this class.
private static readonly object SyncRoot = new object();
// Cache of blob directory references that GetCloudBlobDirectory has already resolved,
// keyed by the upper-cased directory path.
private readonly ConcurrentDictionary<string, CloudBlobDirectory> blobDirectories = new ConcurrentDictionary<string, CloudBlobDirectory>();
// Client configuration supplied to the constructor (connection string, container name,
// SAS URI and data-movement settings are read from it).
private readonly AzureBlobClientConfiguration configuration;
// Transfer context created by SetupAzureTransferContext and reset to null in OnPostExecute.
private SingleTransferContext azureTransferContext;
// Container resolved by GetContainerReference; reset to null in OnPostExecute.
private CloudBlobContainer blobContainer;
// Storage account parsed from the connection string in ValidateRequredConfiguration;
// it stays null when no connection string is configured.
private CloudStorageAccount storageAccount;
/// <summary>
/// Initializes a new instance of the <see cref="AzureBlobTransferCommand"/> class.
/// </summary>
/// <param name="log">The transfer log; must not be <c>null</c>.</param>
/// <param name="request">The transfer request; must not be <c>null</c>.</param>
/// <param name="jobService">The job service, forwarded to the base class.</param>
/// <param name="fileSystemService">The file system service, forwarded to the base class.</param>
/// <param name="configuration">The Azure blob client configuration; must not be <c>null</c>.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="log"/>, <paramref name="request"/> or
/// <paramref name="configuration"/> is <c>null</c>. These checks run after the base constructor.
/// </exception>
public AzureBlobTransferCommand(ITransferLog log, ITransferRequest request, ITransferJobService jobService, IFileSystemService fileSystemService, AzureBlobClientConfiguration configuration)
    : base(log, request, jobService, fileSystemService, configuration)
{
    if (log == null)
    {
        throw new ArgumentNullException(nameof(log));
    }

    if (request == null)
    {
        throw new ArgumentNullException(nameof(request));
    }

    this.configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
}
/// <summary>
/// Runs once before any path is transferred: validates the configuration, applies the
/// configured block size and parallelism to the data-movement library and resolves the
/// blob container.
/// </summary>
/// <param name="token">The cancellation token, forwarded to the base implementation.</param>
/// <exception cref="TransferException">
/// Thrown (as fatal) when the configuration is incomplete or invalid, or when no container
/// reference could be obtained.
/// </exception>
protected override void OnPreExecute(CancellationToken token)
{
    base.OnPreExecute(token);
    ValidateRequredConfiguration();

    // Configurations is a static member of TransferManager, so these values are not scoped
    // to this command instance. Property syntax is required here: C# does not allow calling
    // the get_/set_ accessors explicitly.
    TransferManager.Configurations.BlockSize = configuration.DmlBlockSize;
    TransferManager.Configurations.ParallelOperations = configuration.DmlParallelOperations;

    blobContainer = GetContainerReference();
    CheckBlobContainerNotNull(true);
}
/// <summary>
/// Transfers a single path. When <c>path.Direction</c> is <c>Upload</c> the local source file
/// is uploaded to a block blob; otherwise the blob is downloaded into the local target directory.
/// </summary>
/// <param name="path">The path to transfer; must not be <c>null</c>.</param>
/// <param name="token">The token used to cancel the transfer.</param>
/// <returns>
/// The result for <paramref name="path"/>. Failures other than <see cref="TransferException"/>
/// are logged, registered as transfer issues and reported through the result status rather
/// than thrown.
/// </returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="path"/> is <c>null</c>.</exception>
/// <exception cref="TransferException">Rethrown unchanged when raised during the transfer.</exception>
[SuppressMessage("Microsoft.Maintainability", "CA1502:AvoidExcessiveComplexity", Justification = "There are numerous exceptions and exception types to check for improved error handling.")]
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Reviewed OK.")]
[SuppressMessage("Microsoft.Maintainability", "CA1506:AvoidExcessiveClassCoupling", Justification = "This is an early prototype for the CollectR preview and will address later.")]
protected override TransferPathResult OnExecute(TransferPath path, CancellationToken token)
{
    if (path == (TransferPath)null)
    {
        throw new ArgumentNullException("path");
    }

    // The container and the transfer context are created on first use; the second null
    // check inside each lock keeps a concurrent caller from creating them again.
    if (blobContainer == null)
    {
        lock (SyncRoot)
        {
            if (blobContainer == null)
            {
                SetupAzureBlobContainer();
            }
        }
    }

    if (azureTransferContext == null)
    {
        lock (SyncRoot)
        {
            if (azureTransferContext == null)
            {
                SetupAzureTransferContext();
            }
        }
    }

    // NOTE(review): the handler is stored on the shared transfer context and captures this
    // call's path, so each call replaces the handler installed by the previous one.
    azureTransferContext.ProgressHandler = new Progress<TransferStatus>(status =>
    {
        double progress = TransferHelper.CalculateByteLevelProgress(path.Bytes, status.BytesTransferred);
        PublishLargeFileProgress(path, status.BytesTransferred, path.Bytes, progress);
    });

    TransferPathResult result = new TransferPathResult
    {
        Path = path
    };

    // The blob name (and, for downloads, the local file name) is the explicit target file
    // name when one is given, otherwise the file name of the source path.
    string blobName = !string.IsNullOrEmpty(path.TargetFileName)
        ? path.TargetFileName
        : FileSystemService.GetFileName(path.SourcePath);

    try
    {
        return path.Direction != TransferDirection.Upload
            ? DownloadBlob(path, blobName, result, token)
            : UploadBlob(path, blobName, result, token);
    }
    catch (TransferException)
    {
        throw;
    }
    catch (FileNotFoundException e)
    {
        Log.LogError(e, "Could not find the {SourceFile} source file.", path.SourcePath);
        RegisterAzureTransferIssue(path, e, GetWarningErrorAttribute() | IssueAttributes.FileNotFound);
        return CompleteWithStatus(result, TransferPathStatus.FileNotFound);
    }
    catch (InvalidOperationException e)
    {
        Log.LogError(e, "A serious error occurred transferring the {SourceFile} source file.", path.SourcePath);

        // A storage error carrying HTTP 404 is reported as "file not found" instead of a
        // generic failure.
        bool notFound = e.InnerException is StorageException storageException
            && storageException.RequestInformation?.HttpStatusCode == 404;

        IssueAttributes attributes = GetWarningErrorAttribute();
        if (notFound)
        {
            attributes |= IssueAttributes.FileNotFound;
        }

        RegisterAzureTransferIssue(path, e, attributes);
        return CompleteWithStatus(result, notFound ? TransferPathStatus.FileNotFound : TransferPathStatus.Failed);
    }
    catch (UnauthorizedAccessException e)
    {
        return HandleUnauthorizedAccessException(path, e, result, configuration.MaxJobRetryAttempts);
    }
    catch (AggregateException e)
    {
        // The flattened inner exception goes to the log; the registered issue keeps the
        // aggregate's own code and message.
        Log.LogError(e.Flatten().InnerException, "A serious error occurred transferring the {SourceFile} source file.", path.SourcePath);
        RegisterAzureTransferIssue(path, e, GetWarningErrorAttribute());
        return CompleteWithStatus(result, TransferPathStatus.Failed);
    }
    catch (OperationCanceledException)
    {
        HandleCancel(result);
        return result;
    }
    catch (Exception e)
    {
        Log.LogError(e, "A serious error occurred transferring the {SourceFile} source file.", path.SourcePath);
        RegisterAzureTransferIssue(path, e, GetWarningErrorAttribute());
        return CompleteWithStatus(result, TransferPathStatus.Failed);
    }
}

// Downloads the blob addressed by path.SourcePath into path.TargetPath and fills in the result.
private TransferPathResult DownloadBlob(TransferPath path, string blobName, TransferPathResult result, CancellationToken token)
{
    if (!FileSystemService.DirectoryExists(path.TargetPath))
    {
        FileSystemService.CreateDirectory(path.TargetPath);
    }

    string targetFile = FileSystemService.Combine(path.TargetPath, blobName);
    string sourceDirectory = PathHelper.GetRelativePathDirectory(path.SourcePath);
    CloudBlockBlob blob = ResolveBlockBlob(sourceDirectory, blobName, path.SourcePath);

    try
    {
        using (FileStream targetStream = FileSystemService.CreateNewFileStreamWithReadWriteAccess(targetFile))
        {
            result.StartTime = DateTime.Now;
            TransferManager.DownloadAsync(blob, (Stream)targetStream, null, azureTransferContext, token).GetAwaiter().GetResult();
            result.EndTime = DateTime.Now;
            result.Status = token.IsCancellationRequested ? TransferPathStatus.Canceled : TransferPathStatus.Successful;
            result.BytesTransferred += targetStream.Length;
            result.Checksum = string.Empty;
            PublishTransferStatistics(false);
            return result;
        }
    }
    catch (Exception)
    {
        // Remove whatever was written to the target before the error is handled by the caller.
        if (FileSystemService.FileExists(targetFile))
        {
            FileSystemService.DeleteFile(targetFile);
        }

        throw;
    }
}

// Uploads the local file at path.SourcePath to a block blob under path.TargetPath and fills in the result.
private TransferPathResult UploadBlob(TransferPath path, string blobName, TransferPathResult result, CancellationToken token)
{
    CloudBlockBlob blob = ResolveBlockBlob(path.TargetPath, blobName, path.TargetPath);

    using (FileStream sourceStream = FileSystemService.CreateOpenFileStreamWithReadOnlyAccess(path.SourcePath))
    {
        result.StartTime = DateTime.Now;

        // The length is read before the upload starts and is what gets reported as transferred.
        long length = sourceStream.Length;
        TransferManager.UploadAsync((Stream)sourceStream, blob, null, azureTransferContext, token).Wait(token);
        result.EndTime = DateTime.Now;
        result.Status = token.IsCancellationRequested ? TransferPathStatus.Canceled : TransferPathStatus.Successful;
        result.BytesTransferred += length;
        PublishTransferStatistics(false);
        return result;
    }
}

// Returns the block blob reference for blobName, taken from the container itself when
// directoryPath is empty or "/", otherwise from the matching blob directory.
private CloudBlockBlob ResolveBlockBlob(string directoryPath, string blobName, string pathForMessage)
{
    if (string.IsNullOrEmpty(directoryPath) || string.Equals(directoryPath, "/", StringComparison.OrdinalIgnoreCase))
    {
        return blobContainer.GetBlockBlobReference(blobName);
    }

    CloudBlobDirectory directory = GetCloudBlobDirectory(directoryPath);
    if (directory == null)
    {
        throw new TransferException(string.Format(CultureInfo.CurrentCulture, AzureBlobStrings.AzureBlobTargetPathDirectoryConversionExceptionMessage, pathForMessage), true);
    }

    return directory.GetBlockBlobReference(blobName);
}

// Registers a transfer issue for path, taking the code and message from the exception.
private void RegisterAzureTransferIssue(TransferPath path, Exception exception, IssueAttributes attributes)
{
    RegisterIssue(new TransferIssue
    {
        Attributes = attributes,
        Code = exception.HResult,
        MaxRetryAttempts = MaxJobRetryAttempts,
        Message = exception.Message,
        Path = path,
        RetryAttempt = JobService.Statistics.RetryAttempt,
        Timestamp = DateTime.Now
    });
}

// Stamps the result with the given status and the current time as its end time, then returns it.
private static TransferPathResult CompleteWithStatus(TransferPathResult result, TransferPathStatus status)
{
    result.Status = status;
    result.EndTime = DateTime.Now;
    return result;
}
/// <summary>
/// Runs after the transfer completes and releases the Azure state held by this command:
/// the transfer context, the container reference and the cached blob directories.
/// </summary>
/// <param name="token">The cancellation token, forwarded to the base implementation.</param>
protected override void OnPostExecute(CancellationToken token)
{
    base.OnPostExecute(token);

    this.azureTransferContext = null;
    this.blobContainer = null;
    this.blobDirectories.Clear();
}
/// <summary>
/// Resolves the blob container and stores it in the <c>blobContainer</c> field, raising a
/// non-fatal <see cref="TransferException"/> when no container reference was obtained.
/// </summary>
private void SetupAzureBlobContainer()
{
    this.blobContainer = this.GetContainerReference();
    this.CheckBlobContainerNotNull(fatal: false);
}
// Creates the SingleTransferContext stored in azureTransferContext: tags it with the request's
// client request id, sets its log level and overwrite callback, and attaches handlers that log
// transferred and failed files.
//
// NOTE(review): the accessor-style calls (set_*, add_*) and the delegate construction below
// look like decompiler output rather than hand-written C#. The method that the overwrite
// callback points at is not recoverable from this file - restore it from the original source.
private unsafe void SetupAzureTransferContext()
{
SingleTransferContext val = new SingleTransferContext();
val.set_ClientRequestId(base.Request.ClientRequestId.ToString());
// NOTE(review): 3 is presumably the storage LogLevel "Informational" member - confirm.
val.set_LogLevel(3);
// NOTE(review): the function-pointer operand is empty; the callback target is unknown here.
val.set_ShouldOverwriteCallbackAsync(new ShouldOverwriteCallbackAsync((object)this, (IntPtr)(void*)));
azureTransferContext = val;
// Logs an error only when the event arguments carry an exception.
azureTransferContext.add_FileTransferred((EventHandler<TransferEventArgs>)delegate(object sender, TransferEventArgs args) {
if (args.get_Exception() != null)
base.Log.LogError(args.get_Exception(), "The Azure file transfer for source '{Source}' is not successful.", args.get_Source());
});
// Always logs an error for a failed file, including the exception when one is available.
azureTransferContext.add_FileFailed((EventHandler<TransferEventArgs>)delegate(object sender, TransferEventArgs args) {
if (args.get_Exception() != null)
base.Log.LogError(args.get_Exception(), "The Azure file transfer for source '{Source}' failed.", args.get_Source());
else
base.Log.LogError("The Azure file transfer for source '{Source}' failed.", args.get_Source());
});
}
/// <summary>
/// Returns the blob directory reference for a '/'-separated directory path, resolving it
/// segment by segment from the container on first use and caching the result.
/// </summary>
/// <param name="path">The directory path; must not be <c>null</c>.</param>
/// <returns>
/// The directory reference, or <c>null</c> when <paramref name="path"/> contains no
/// non-empty segment.
/// </returns>
private CloudBlobDirectory GetCloudBlobDirectory(string path)
{
    // Use one normalized key for both the lookup and the insert. Looking up with the raw
    // upper-cased path while storing the slash-trimmed form made every path ending in '/'
    // miss the cache. Leading and trailing slashes do not change the resolved directory
    // because empty segments are dropped below.
    // NOTE(review): Azure blob names are case-sensitive, so folding case here lets two paths
    // that differ only by case share one cached reference - confirm this is intended.
    string key = path.Trim('/').ToUpperInvariant();
    if (blobDirectories.TryGetValue(key, out CloudBlobDirectory directory))
    {
        return directory;
    }

    string[] segments = path.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);
    foreach (string segment in segments)
    {
        directory = directory == null
            ? blobContainer.GetDirectoryReference(segment)
            : directory.GetDirectoryReference(segment);
    }

    blobDirectories.TryAdd(key, directory);
    return directory;
}
/// <summary>
/// Builds the blob container reference. A configured container SAS URI takes precedence;
/// otherwise the container is looked up by name through the parsed storage account.
/// </summary>
/// <returns>The container reference.</returns>
/// <exception cref="TransferException">
/// Thrown (as fatal) when the SAS URI is not a well-formed absolute URI, or when the lookup
/// through the storage account fails.
/// </exception>
private CloudBlobContainer GetContainerReference()
{
    string sasUri = configuration.BlobContainerSasUri;
    if (!string.IsNullOrEmpty(sasUri))
    {
        Uri containerUri;
        if (Uri.TryCreate(sasUri, UriKind.Absolute, out containerUri))
        {
            return new CloudBlobContainer(containerUri);
        }

        throw new TransferException(AzureBlobStrings.SasUriIsNotWellFormedErrorMessage, true);
    }

    try
    {
        CloudBlobClient client = storageAccount.CreateCloudBlobClient();
        return client.GetContainerReference(configuration.BlobContainerName);
    }
    catch (Exception cause)
    {
        string message = string.Format(CultureInfo.CurrentCulture, AzureBlobStrings.RetrieveBlobContainerErrorMessage, configuration.BlobContainerName);
        throw new TransferException(message, cause, true);
    }
}
/// <summary>
/// Checks that the configuration identifies a container - either a container SAS URI, or a
/// storage account connection string together with a container name - and parses the
/// connection string into <c>storageAccount</c> when one is configured.
/// </summary>
/// <exception cref="TransferException">
/// Thrown (as fatal) when the required settings are missing or the connection string
/// cannot be parsed.
/// </exception>
private void ValidateRequredConfiguration()
{
    bool hasConnectionString = !string.IsNullOrEmpty(configuration.StorageAccountConnectionString);
    bool hasContainerName = !string.IsNullOrEmpty(configuration.BlobContainerName);
    bool hasSasUri = !string.IsNullOrEmpty(configuration.BlobContainerSasUri);

    if (!hasSasUri)
    {
        if (!hasConnectionString)
        {
            throw new TransferException(AzureBlobStrings.AzureStorageAccountInfoNotProvidedErrorMessage, true);
        }

        if (!hasContainerName)
        {
            throw new TransferException(AzureBlobStrings.BlobConnectionInfoNotProvidedErrorMessage, true);
        }
    }

    if (!hasConnectionString)
    {
        return;
    }

    try
    {
        storageAccount = CloudStorageAccount.Parse(configuration.StorageAccountConnectionString);
    }
    catch (Exception cause)
    {
        throw new TransferException(AzureBlobStrings.InvalidStorageAccountErrorMessage, cause, true);
    }

    // Checked outside the try block so this exception is not caught and wrapped in a
    // second TransferException carrying the same message.
    if (storageAccount == null)
    {
        throw new TransferException(AzureBlobStrings.InvalidStorageAccountErrorMessage, true);
    }
}
/// <summary>
/// Throws when the <c>blobContainer</c> field has not been set.
/// </summary>
/// <param name="fatal">Passed to the <see cref="TransferException"/> constructor as its fatal flag.</param>
/// <exception cref="TransferException">Thrown when <c>blobContainer</c> is <c>null</c>.</exception>
private void CheckBlobContainerNotNull(bool fatal)
{
    if (blobContainer == null)
    {
        string message = string.Format(CultureInfo.CurrentCulture, AzureBlobStrings.BlobContainerDoesNotExistErrorMessage, configuration.BlobContainerName);
        throw new TransferException(message, fatal);
    }
}
}
}