<PackageReference Include="System.Reactive" Version="6.0.0-preview.1" />
    
	
	
		
		
		
	 
	
	
        
                
                RedoSerializedObserver<X>
                
                
                
                
using System.
Collections.
Concurrent;
using System.
Runtime.
CompilerServices;
using System.
Threading;
namespace System.Reactive.Linq.ObservableImpl
{
    #nullable enable

    /// <summary>
    /// Observer wrapper that queues incoming values and relays them, together with
    /// at most one terminal notification, to a downstream observer.
    /// </summary>
    /// <remarks>
    /// Values are buffered in a <see cref="ConcurrentQueue{T}"/>. A work-in-progress
    /// counter (<c>_wip</c>) gates the drain loop: only the caller that moves the
    /// counter from 0 to 1 runs the loop; every other caller just bumps the counter
    /// and returns, which makes the running loop perform another pass.
    /// </remarks>
    /// <typeparam name="X">Type of the values being relayed.</typeparam>
    internal sealed class RedoSerializedObserver<X> : IObserver<X>
    {
        // Stored in _terminalException once the terminal notification has been
        // forwarded downstream, so that it is never forwarded a second time.
        private static readonly Exception SignaledIndicator = new Exception();

        private readonly IObserver<X> _downstream;
        private readonly ConcurrentQueue<X> _queue;

        // Number of drain requests that have not been accounted for yet.
        private int _wip;

        // null                         -> no terminal notification received yet
        // ExceptionHelper.Terminated   -> OnCompleted received, not yet forwarded
        // SignaledIndicator            -> terminal notification already forwarded
        // any other exception          -> OnError received, not yet forwarded
        private Exception? _terminalException;

        /// <summary>
        /// Creates a wrapper around <paramref name="downstream"/> with an empty queue.
        /// </summary>
        /// <param name="downstream">Observer that receives the relayed notifications.</param>
        internal RedoSerializedObserver(IObserver<X> downstream)
        {
            _downstream = downstream;
            _queue = new ConcurrentQueue<X>();
        }

        /// <summary>
        /// Records completion if no terminal notification was recorded before,
        /// then requests a drain. Later terminal notifications are ignored.
        /// </summary>
        public void OnCompleted()
        {
            if (Interlocked.CompareExchange(ref _terminalException, ExceptionHelper.Terminated, null) == null)
            {
                Drain();
            }
        }

        /// <summary>
        /// Records <paramref name="error"/> if no terminal notification was recorded
        /// before, then requests a drain. Later terminal notifications are ignored.
        /// </summary>
        /// <param name="error">The error to forward downstream.</param>
        public void OnError(Exception error)
        {
            // NOTE(review): a null error would leave the terminal state unset and
            // only trigger a drain; callers are presumably expected to pass non-null.
            if (Interlocked.CompareExchange(ref _terminalException, error, null) == null)
            {
                Drain();
            }
        }

        /// <summary>
        /// Enqueues <paramref name="value"/> and requests a drain.
        /// </summary>
        /// <param name="value">The value to relay downstream.</param>
        public void OnNext(X value)
        {
            _queue.Enqueue(value);
            Drain();
        }

        /// <summary>
        /// Discards every value currently in the queue.
        /// </summary>
        private void Clear()
        {
            while (_queue.TryDequeue(out _))
            {
            }
        }

        /// <summary>
        /// Runs the drain loop if no other caller is currently running it.
        /// </summary>
        /// <remarks>
        /// Each pass either forwards all queued values (no terminal state recorded)
        /// or forwards the terminal notification once and discards whatever is
        /// queued. The loop repeats until every request counted in <c>_wip</c>
        /// has been accounted for.
        /// </remarks>
        private void Drain()
        {
            // Only the caller that takes the counter from 0 to 1 enters the loop.
            if (Interlocked.Increment(ref _wip) != 1)
            {
                return;
            }

            int missed = 1;
            do
            {
                Exception? terminal = Volatile.Read(ref _terminalException);
                if (terminal == null)
                {
                    while (_queue.TryDequeue(out var item))
                    {
                        _downstream.OnNext(item);
                    }
                }
                else
                {
                    if (terminal != SignaledIndicator)
                    {
                        // Mark as signaled before calling downstream so that later
                        // passes skip the notification.
                        Interlocked.Exchange(ref _terminalException, SignaledIndicator);
                        if (terminal != ExceptionHelper.Terminated)
                        {
                            _downstream.OnError(terminal);
                        }
                        else
                        {
                            _downstream.OnCompleted();
                        }
                    }

                    // Once a terminal state is recorded, queued values are dropped.
                    Clear();
                }

                // Subtract the requests handled by this pass; a non-zero remainder
                // means more requests arrived meanwhile and another pass is needed.
                missed = Interlocked.Add(ref _wip, -missed);
            }
            while (missed != 0);
        }
    }

    #nullable restore
}