// WorkItemQueue
// A WorkItemQueue holds work items that are ready to
// be run, either initially or after some dependency
// has been satisfied.
using System;
using System.Collections.Concurrent;
using System.Threading;
namespace NUnit.Framework.Internal.Execution
{
/// <summary>
/// A WorkItemQueue holds work items that are ready to
/// be run, either initially or after some dependency
/// has been satisfied.
/// </summary>
/// <remarks>
/// Items are kept in a <see cref="ConcurrentQueue{T}"/>. Two counters,
/// <c>_addId</c> and <c>_removeId</c>, are advanced with interlocked
/// operations; the queue is treated as empty when they are equal. A consumer
/// claims an item by advancing <c>_removeId</c> and then takes it from the
/// inner queue.
/// </remarks>
public class WorkItemQueue
{
    // Number of SpinWait iterations Dequeue performs before it falls back to
    // waiting on the event.
    private const int spinCount = 5;

    // Upper bound, in milliseconds, on a single wait for the add event, so a
    // waiting consumer re-examines the queue state periodically.
    private const int WaitTimeoutMilliseconds = 500;

    // The state is stored as an int so that Interlocked can operate on it;
    // these are the WorkItemQueueState members in that representation.
    private const int PausedState = (int)WorkItemQueueState.Paused;
    private const int RunningState = (int)WorkItemQueueState.Running;
    private const int StoppedState = (int)WorkItemQueueState.Stopped;

    private readonly Logger log = InternalTrace.GetLogger("WorkItemQueue");
    private readonly ConcurrentQueue<WorkItem> _innerQueue = new ConcurrentQueue<WorkItem>();
    private readonly ManualResetEventSlim _mreAdd = new ManualResetEventSlim(false);

    // Count of enqueue / dequeue claims so far; equal values mean "empty".
    private int _addId = int.MinValue;
    private int _removeId = int.MinValue;

    private int _itemsProcessed;
    private int _maxCount;
    private int _state;

    /// <summary>
    /// Gets or sets the name of the queue, used in log messages.
    /// </summary>
    public string Name { get; set; }

    /// <summary>
    /// Gets the number of work items that have been dequeued so far.
    /// </summary>
    public int ItemsProcessed
    {
        get { return _itemsProcessed; }
        private set { _itemsProcessed = value; }
    }

    /// <summary>
    /// Gets the largest inner-queue count observed by <see cref="Enqueue"/>.
    /// </summary>
    public int MaxCount
    {
        get { return _maxCount; }
        private set { _maxCount = value; }
    }

    /// <summary>
    /// Gets the current state of the queue.
    /// </summary>
    public WorkItemQueueState State
    {
        get { return (WorkItemQueueState)_state; }
        private set { _state = (int)value; }
    }

    /// <summary>
    /// Gets a value indicating whether the inner queue currently holds no items.
    /// </summary>
    public bool IsEmpty => _innerQueue.IsEmpty;

    /// <summary>
    /// Initializes a new <see cref="WorkItemQueue"/> in the
    /// <see cref="WorkItemQueueState.Paused"/> state with zeroed counters.
    /// </summary>
    /// <param name="name">The name of the queue.</param>
    public WorkItemQueue(string name)
    {
        Name = name;
        State = WorkItemQueueState.Paused;
        MaxCount = 0;
        ItemsProcessed = 0;
    }

    /// <summary>
    /// Adds a work item to the queue and signals waiting consumers.
    /// </summary>
    /// <param name="work">The work item to add.</param>
    public void Enqueue(WorkItem work)
    {
        // The add id is published before the item is actually stored, so a
        // consumer may claim a slot slightly before the item becomes visible;
        // Dequeue retries TryDequeue to cover that window.
        Interlocked.Increment(ref _addId);
        _innerQueue.Enqueue(work);

        // Raise the high-water mark; retry if another thread changed it
        // between our read and our compare-exchange.
        int observed = _maxCount;
        int expected;
        do
        {
            expected = observed;
            int candidate = Math.Max(expected, _innerQueue.Count);
            observed = Interlocked.CompareExchange(ref _maxCount, candidate, expected);
        }
        while (observed != expected);

        _mreAdd.Set();
    }

    /// <summary>
    /// Removes and returns the next work item, waiting while the queue is
    /// empty or paused.
    /// </summary>
    /// <returns>
    /// The dequeued work item, or <c>null</c> if the queue is observed in the
    /// <see cref="WorkItemQueueState.Stopped"/> state.
    /// </returns>
    public WorkItem Dequeue()
    {
        SpinWait idleSpinner = new SpinWait();

        while (true)
        {
            WorkItemQueueState cachedState = State;
            if (cachedState == WorkItemQueueState.Stopped)
            {
                return null;
            }

            int cachedRemoveId = _removeId;
            int cachedAddId = _addId;

            bool nothingToTake = cachedRemoveId == cachedAddId
                || cachedState == WorkItemQueueState.Paused;

            if (!nothingToTake)
            {
                // Try to claim the next slot; on failure another consumer won
                // the race and we start over with fresh values.
                if (Interlocked.CompareExchange(ref _removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
                {
                    break;
                }

                continue;
            }

            // Spin a few times first in case something changes quickly.
            if (idleSpinner.Count <= spinCount)
            {
                idleSpinner.SpinOnce();
                continue;
            }

            // Reset before re-checking so an Enqueue that lands in between is
            // not lost: either the re-check sees it or its Set() wakes us.
            _mreAdd.Reset();

            bool countersMoved = cachedRemoveId != _removeId || cachedAddId != _addId;
            if (countersMoved && cachedState != WorkItemQueueState.Paused)
            {
                // Something changed while we were resetting; restore the
                // signal and loop again without waiting.
                _mreAdd.Set();
            }
            else
            {
                _mreAdd.Wait(WaitTimeoutMilliseconds);
            }
        }

        // The slot is ours, but the producer may not have stored the item yet
        // (see Enqueue). Back off between attempts instead of hot-spinning.
        WorkItem work;
        SpinWait claimSpinner = new SpinWait();
        while (!_innerQueue.TryDequeue(out work))
        {
            claimSpinner.SpinOnce();
        }

        Interlocked.Increment(ref _itemsProcessed);
        return work;
    }

    /// <summary>
    /// Moves the queue from Paused to Running and wakes waiting consumers.
    /// Has no effect on the state if the queue is not currently Paused.
    /// </summary>
    public void Start()
    {
        log.Info("{0} starting", Name);

        if (Interlocked.CompareExchange(ref _state, RunningState, PausedState) == PausedState)
        {
            _mreAdd.Set();
        }
    }

    /// <summary>
    /// Moves the queue to Stopped and, if it was not already Stopped, wakes
    /// waiting consumers so that they can observe the new state.
    /// </summary>
    public void Stop()
    {
        log.Info("{0} stopping - {1} WorkItems processed, max size {2}", Name, ItemsProcessed, MaxCount);

        if (Interlocked.Exchange(ref _state, StoppedState) != StoppedState)
        {
            _mreAdd.Set();
        }
    }

    /// <summary>
    /// Moves the queue from Running to Paused. Has no effect on the state if
    /// the queue is not currently Running.
    /// </summary>
    public void Pause()
    {
        log.Info("{0} pausing", Name);

        Interlocked.CompareExchange(ref _state, PausedState, RunningState);
    }
}
}