<PackageReference Include="Relativity.Server.Import.SDK" Version="2.9.2" />

WriterRetryable

public class WriterRetryable : IDisposable
using kCura.WinEDDS; using kCura.WinEDDS.Exceptions; using kCura.WinEDDS.Exporters; using Polly; using Relativity.DataExchange.Export.VolumeManagerV2.Metadata.Paths; using Relativity.DataExchange.Export.VolumeManagerV2.Statistics; using Relativity.Logging; using System; using System.Collections.Generic; using System.IO; using System.Threading; namespace Relativity.DataExchange.Export.VolumeManagerV2.Metadata.Writers { public class WriterRetryable : IDisposable { private StreamWriter _streamWriter; private long _streamWriterLastPosition; private bool _isBroken; private bool _initialCreation; private long _lastBatchSavedState; private readonly Policy _retryPolicy; private readonly IStreamFactory _streamFactory; private readonly ILog _logger; private readonly IStatus _status; private readonly IDestinationPath _destinationPath; private readonly IProcessingStatistics _processingStatistics; public WriterRetryable(IWritersRetryPolicy writersRetryPolicy, IStreamFactory streamFactory, ILog logger, IStatus status, IDestinationPath destinationPath, IProcessingStatistics processingStatistics) { _streamFactory = streamFactory; _logger = logger; _status = status; _destinationPath = destinationPath; _processingStatistics = processingStatistics; _retryPolicy = writersRetryPolicy.CreateRetryPolicy(OnRetry); _streamWriterLastPosition = 0; _lastBatchSavedState = 0; _isBroken = false; _initialCreation = true; } protected void Execute(Action<IEnumerator<ObjectExportInfo>, StreamWriter> write, ObjectExportInfo[] artifacts, CancellationToken cancellationToken) { Context val = new Context(Guid.NewGuid().ToString()); IEnumerator<ObjectExportInfo> enumerator = new ArtifactEnumerator(artifacts, val); try { _retryPolicy.Execute((Action<Context, CancellationToken>)delegate { CreateStreamIfNeeded(); try { write(enumerator, _streamWriter); } catch (IOException ex2) { _logger.LogError((Exception)ex2, "Error occurred during writing to file {type}.", new object[1] { _destinationPath.DestinationFileType }); throw new FileWriteException(_destinationPath.DestinationFileType, ex2); } SaveStreamPosition(); UpdateStatistics(); }, val, cancellationToken); } catch (OperationCanceledException ex) { if (!cancellationToken.IsCancellationRequested) { _logger.LogError((Exception)ex, "Operation canceled, but cancellation has NOT been requested.", Array.Empty<object>()); throw; } _logger.LogWarning((Exception)ex, "Operation canceled when retrying writing to load file.", Array.Empty<object>()); } } private void OnRetry(Exception exception, TimeSpan timeBetweenRetries, int retryCount, Context context) { _isBroken = true; _logger.LogWarning("Retrying writing to file. Error occurred {exception}.", new object[1] { exception.Message }); int lastArtifactId = GetLastArtifactId(context); _status.WriteWarning($"""{lastArtifactId}""{_destinationPath.Path}"""); _status.WriteWarning($"""{exception}"); if (retryCount > 1) { _status.WriteWarning($"""{timeBetweenRetries.Seconds}"""); _logger.LogVerbose("Waiting {time} before next retry.", new object[1] { timeBetweenRetries.Seconds }); } else _status.WriteWarning("Retrying now"); } private void CreateStreamIfNeeded() { if (_isBroken || _initialCreation) { _logger.LogVerbose("Stream broken or hasn't been initialized. Creating.", Array.Empty<object>()); bool append = !_initialCreation; _streamWriter = _streamFactory.Create(_streamWriter, _streamWriterLastPosition, _destinationPath.Path, _destinationPath.Encoding, append); _isBroken = false; _initialCreation = false; } } private void SaveStreamPosition() { if (_streamWriter != null) { FlushStream(); _streamWriterLastPosition = _streamWriter.BaseStream.Position; _logger.LogVerbose("Stream position {position} saved.", new object[1] { _streamWriterLastPosition }); } } private void UpdateStatistics() { _processingStatistics.UpdateStatisticsForFile(_destinationPath.Path); } private void FlushStream() { try { _streamWriter?.Flush(); } catch (Exception ex) { _logger.LogError(ex, "Failed to flush {type} file stream.", new object[1] { _destinationPath.DestinationFileType }); throw new FileWriteException(_destinationPath.DestinationFileType, ex); } } private int GetLastArtifactId(Context context) { int result = -1; if (context.ContainsKey("LastArtifactId")) result = (int)context.get_Item("LastArtifactId"); else _logger.LogWarning("Failed to retrieve artifactId from retry context. Continuing with -1.", Array.Empty<object>()); return result; } public void Dispose() { _streamWriter?.Dispose(); } public void SaveState() { _lastBatchSavedState = _streamWriterLastPosition; } public void RestoreLastState() { if (_initialCreation) _logger.LogVerbose("StreamWriter hasn't been initialized. Nothing to restore.", Array.Empty<object>()); else { _streamWriter = _streamFactory.Create(_streamWriter, _lastBatchSavedState, _destinationPath.Path, _destinationPath.Encoding, true); _streamWriterLastPosition = _lastBatchSavedState; } } } }