CircuitStateController<T>
Thread-safe controller that holds and manages the circuit breaker state transitions.
using Polly.Telemetry;
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Polly.CircuitBreaker
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
internal sealed class CircuitStateController<[System.Runtime.CompilerServices.Nullable(2)] T> : IDisposable
{
private readonly object _lock = new object();
private readonly ScheduledTaskExecutor _executor = new ScheduledTaskExecutor();
[System.Runtime.CompilerServices.Nullable(new byte[] {
2,
0,
1
})]
private readonly Func<OnCircuitOpenedArguments<T>, ValueTask> _onOpened;
[System.Runtime.CompilerServices.Nullable(new byte[] {
2,
0,
1
})]
private readonly Func<OnCircuitClosedArguments<T>, ValueTask> _onClosed;
[System.Runtime.CompilerServices.Nullable(2)]
private readonly Func<OnCircuitHalfOpenedArguments, ValueTask> _onHalfOpen;
private readonly TimeProvider _timeProvider;
private readonly ResilienceStrategyTelemetry _telemetry;
private readonly CircuitBehavior _behavior;
private readonly TimeSpan _breakDuration;
[System.Runtime.CompilerServices.Nullable(new byte[] {
2,
0
})]
private readonly Func<BreakDurationGeneratorArguments, ValueTask<TimeSpan>> _breakDurationGenerator;
private DateTimeOffset _blockedUntil;
private CircuitState _circuitState;
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
private Outcome<T>? _lastOutcome;
[System.Runtime.CompilerServices.Nullable(2)]
private Exception _breakingException;
private bool _disposed;
private int _halfOpenAttempts;
public CircuitState CircuitState {
get {
EnsureNotDisposed();
lock (_lock) {
return _circuitState;
}
}
}
[System.Runtime.CompilerServices.Nullable(2)]
public Exception LastException {
[System.Runtime.CompilerServices.NullableContext(2)]
get {
EnsureNotDisposed();
lock (_lock) {
ref Outcome<T>? lastOutcome = ref _lastOutcome;
return lastOutcome.HasValue ? lastOutcome.GetValueOrDefault().Exception : null;
}
}
}
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
public Outcome<T>? LastHandledOutcome {
[return: System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
get {
EnsureNotDisposed();
lock (_lock) {
return _lastOutcome;
}
}
}
public CircuitStateController(TimeSpan breakDuration, [System.Runtime.CompilerServices.Nullable(new byte[] {
2,
0,
1
})] Func<OnCircuitOpenedArguments<T>, ValueTask> onOpened, [System.Runtime.CompilerServices.Nullable(new byte[] {
2,
0,
1
})] Func<OnCircuitClosedArguments<T>, ValueTask> onClosed, [System.Runtime.CompilerServices.Nullable(2)] Func<OnCircuitHalfOpenedArguments, ValueTask> onHalfOpen, CircuitBehavior behavior, TimeProvider timeProvider, ResilienceStrategyTelemetry telemetry, [System.Runtime.CompilerServices.Nullable(new byte[] {
2,
0
})] Func<BreakDurationGeneratorArguments, ValueTask<TimeSpan>> breakDurationGenerator)
{
_breakDuration = breakDuration;
_onOpened = onOpened;
_onClosed = onClosed;
_onHalfOpen = onHalfOpen;
_behavior = behavior;
_timeProvider = timeProvider;
_telemetry = telemetry;
_breakDurationGenerator = breakDurationGenerator;
}
public Task IsolateCircuitAsync(ResilienceContext context)
{
EnsureNotDisposed();
context.Initialize<T>(false);
Task task = default(Task);
lock (_lock) {
IsolatedCircuitException ex = new IsolatedCircuitException();
_telemetry.SetTelemetrySource(ex);
SetLastHandledOutcome_NeedsLock(Outcome.FromException<T>((Exception)ex));
task = OpenCircuitFor_NeedsLock(Outcome.FromResult<T>(default(T)), TimeSpan.MaxValue, true, context);
_circuitState = CircuitState.Isolated;
}
return ExecuteScheduledTaskAsync(task, context);
}
public Task CloseCircuitAsync(ResilienceContext context)
{
EnsureNotDisposed();
context.Initialize<T>(false);
Task task = default(Task);
lock (_lock) {
task = CloseCircuit_NeedsLock(Outcome.FromResult<T>(default(T)), true, context);
}
return ExecuteScheduledTaskAsync(task, context);
}
[return: System.Runtime.CompilerServices.Nullable(new byte[] {
0,
0,
1
})]
public ValueTask<Outcome<T>?> OnActionPreExecuteAsync(ResilienceContext context)
{
EnsureNotDisposed();
BrokenCircuitException ex = null;
bool flag = false;
Task task = Task.CompletedTask;
lock (_lock) {
if (_circuitState == CircuitState.Open && PermitHalfOpenCircuitTest_NeedsLock()) {
_halfOpenAttempts++;
_circuitState = CircuitState.HalfOpen;
_telemetry.Report<OnCircuitHalfOpenedArguments>(new ResilienceEvent(ResilienceEventSeverity.Warning, "OnCircuitHalfOpened"), context, new OnCircuitHalfOpenedArguments(context));
flag = true;
}
BrokenCircuitException ex2;
switch (_circuitState) {
case CircuitState.Open:
ex2 = CreateBrokenCircuitException();
break;
case CircuitState.HalfOpen:
if (!flag) {
ex2 = CreateBrokenCircuitException();
break;
}
goto default;
case CircuitState.Isolated:
ex2 = new IsolatedCircuitException();
break;
default:
ex2 = null;
break;
}
ex = ex2;
if (flag && _onHalfOpen != null)
task = ScheduleHalfOpenTask(context);
}
if (ex != null) {
_telemetry.SetTelemetrySource(ex);
return new ValueTask<Outcome<T>?>(new Outcome<T>(ex));
}
task = ExecuteScheduledTaskAsync(task, context);
if (!task.IsCompleted)
return WaitHalfOpenTask(task, context.ContinueOnCapturedContext);
task.GetAwaiter().GetResult();
return default(ValueTask<Outcome<T>?>);
}
[AsyncStateMachine(typeof(CircuitStateController<>.<WaitHalfOpenTask>d__26))]
[return: System.Runtime.CompilerServices.Nullable(new byte[] {
0,
0,
1
})]
private static ValueTask<Outcome<T>?> WaitHalfOpenTask(Task task, bool continueOnCapturedContext)
{
<WaitHalfOpenTask>d__26 stateMachine = default(<WaitHalfOpenTask>d__26);
stateMachine.<>t__builder = AsyncValueTaskMethodBuilder<Outcome<T>?>.Create();
stateMachine.task = task;
stateMachine.continueOnCapturedContext = continueOnCapturedContext;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
public Task OnUnhandledOutcomeAsync([System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})] Outcome<T> outcome, ResilienceContext context)
{
EnsureNotDisposed();
Task task = Task.CompletedTask;
lock (_lock) {
_behavior.OnActionSuccess(_circuitState);
if (_circuitState == CircuitState.HalfOpen)
task = CloseCircuit_NeedsLock(outcome, false, context);
}
return ExecuteScheduledTaskAsync(task, context);
}
public Task OnHandledOutcomeAsync([System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})] Outcome<T> outcome, ResilienceContext context)
{
EnsureNotDisposed();
Task task = Task.CompletedTask;
lock (_lock) {
SetLastHandledOutcome_NeedsLock(outcome);
_behavior.OnActionFailure(_circuitState, out bool shouldBreak);
if (_circuitState == CircuitState.HalfOpen || ((_circuitState == CircuitState.Closed) & shouldBreak))
task = OpenCircuitFor_NeedsLock(outcome, _breakDuration, false, context);
}
return ExecuteScheduledTaskAsync(task, context);
}
public void Dispose()
{
_executor.Dispose();
_disposed = true;
}
internal static Task ExecuteScheduledTaskAsync(Task task, ResilienceContext context)
{
if (context.IsSynchronous && !task.IsCompleted)
task.GetAwaiter().GetResult();
return task;
}
private static bool IsDateTimeOverflow(DateTimeOffset utcNow, TimeSpan breakDuration)
{
TimeSpan t = DateTimeOffset.MaxValue - utcNow;
return breakDuration > t;
}
private void EnsureNotDisposed()
{
if (_disposed)
throw new ObjectDisposedException("CircuitStateController");
}
private Task CloseCircuit_NeedsLock([System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})] Outcome<T> outcome, bool manual, ResilienceContext context)
{
_blockedUntil = DateTimeOffset.MinValue;
_lastOutcome = null;
_halfOpenAttempts = 0;
CircuitState circuitState = _circuitState;
_circuitState = CircuitState.Closed;
_behavior.OnCircuitClosed();
if (circuitState != 0) {
OnCircuitClosedArguments<T> args = new OnCircuitClosedArguments<T>(context, outcome, manual);
_telemetry.Report<OnCircuitClosedArguments<T>, T>(new ResilienceEvent(ResilienceEventSeverity.Information, "OnCircuitClosed"), args);
if (_onClosed != null)
return _executor.ScheduleTask(() => _onClosed(args).AsTask());
}
return Task.CompletedTask;
}
private bool PermitHalfOpenCircuitTest_NeedsLock()
{
DateTimeOffset utcNow = _timeProvider.GetUtcNow();
if (utcNow >= _blockedUntil) {
_blockedUntil = utcNow + _breakDuration;
return true;
}
return false;
}
private void SetLastHandledOutcome_NeedsLock([System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})] Outcome<T> outcome)
{
_lastOutcome = outcome;
_breakingException = outcome.Exception;
}
private BrokenCircuitException CreateBrokenCircuitException()
{
TimeSpan retryAfter = _blockedUntil - _timeProvider.GetUtcNow();
Exception breakingException = _breakingException;
BrokenCircuitException ex = (breakingException == null) ? new BrokenCircuitException("The circuit is now open and is not allowing calls.", retryAfter) : new BrokenCircuitException("The circuit is now open and is not allowing calls.", retryAfter, breakingException);
BrokenCircuitException ex2 = ex;
_telemetry.SetTelemetrySource(ex2);
return ex2;
}
private Task OpenCircuitFor_NeedsLock([System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})] Outcome<T> outcome, TimeSpan breakDuration, bool manual, ResilienceContext context)
{
DateTimeOffset utcNow = _timeProvider.GetUtcNow();
if (_breakDurationGenerator != null)
breakDuration = _breakDurationGenerator(new BreakDurationGeneratorArguments(_behavior.FailureRate, _behavior.FailureCount, context, _halfOpenAttempts)).GetAwaiter().GetResult();
_blockedUntil = (IsDateTimeOverflow(utcNow, breakDuration) ? DateTimeOffset.MaxValue : (utcNow + breakDuration));
_circuitState = CircuitState.Open;
OnCircuitOpenedArguments<T> args = new OnCircuitOpenedArguments<T>(context, outcome, breakDuration, manual);
_telemetry.Report<OnCircuitOpenedArguments<T>, T>(new ResilienceEvent(ResilienceEventSeverity.Error, "OnCircuitOpened"), args);
if (_onOpened != null)
return _executor.ScheduleTask(() => _onOpened(args).AsTask());
return Task.CompletedTask;
}
private Task ScheduleHalfOpenTask(ResilienceContext context)
{
return _executor.ScheduleTask(() => _onHalfOpen(new OnCircuitHalfOpenedArguments(context)).AsTask());
}
}
}