WorkItemQueue
A WorkItemQueue holds work items that are ready to
be run, either initially or after some dependency
has been satisfied.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace NUnit.Framework.Internal.Execution
{
public class WorkItemQueue
{
private class SavedState
{
public ConcurrentQueue<WorkItem>[] InnerQueues;
public int AddId;
public int RemoveId;
public SavedState(WorkItemQueue queue)
{
InnerQueues = queue._innerQueues;
AddId = queue._addId;
RemoveId = queue._removeId;
}
}
private const int SPIN_COUNT = 5;
private const int HIGH_PRIORITY = 0;
private const int NORMAL_PRIORITY = 1;
private const int PRIORITY_LEVELS = 2;
private Logger log = InternalTrace.GetLogger("WorkItemQueue");
private ConcurrentQueue<WorkItem>[] _innerQueues;
private Stack<SavedState> _savedState = new Stack<SavedState>();
private readonly ManualResetEventSlim _mreAdd = new ManualResetEventSlim(false);
private int _addId = -2147483648;
private int _removeId = -2147483648;
private int _itemsProcessed;
private int _state;
public string Name { get; set; }
public bool IsParallelQueue { get; set; }
public ApartmentState TargetApartment { get; set; }
public int ItemsProcessed {
get {
return _itemsProcessed;
}
private set {
_itemsProcessed = value;
}
}
public WorkItemQueueState State {
get {
return (WorkItemQueueState)_state;
}
private set {
_state = (int)value;
}
}
public bool IsEmpty {
get {
ConcurrentQueue<WorkItem>[] innerQueues = _innerQueues;
for (int i = 0; i < innerQueues.Length; i++) {
if (!innerQueues[i].IsEmpty)
return false;
}
return true;
}
}
public WorkItemQueue(string name, bool isParallel, ApartmentState apartment)
{
Name = name;
IsParallelQueue = isParallel;
TargetApartment = apartment;
State = WorkItemQueueState.Paused;
ItemsProcessed = 0;
InitializeQueues();
}
private void InitializeQueues()
{
ConcurrentQueue<WorkItem>[] array = new ConcurrentQueue<WorkItem>[2];
for (int i = 0; i < 2; i++) {
array[i] = new ConcurrentQueue<WorkItem>();
}
_innerQueues = array;
_addId = (_removeId = 0);
}
public void Enqueue(WorkItem work)
{
Enqueue(work, (!(work is CompositeWorkItem.OneTimeTearDownWorkItem)) ? 1 : 0);
}
internal void Enqueue(WorkItem work, int priority)
{
Guard.ArgumentInRange(priority >= 0 && priority < 2, "Invalid priority specified", "priority");
int addId;
do {
addId = _addId;
} while (Interlocked.CompareExchange(ref _addId, addId + 1, addId) != addId);
_innerQueues[priority].Enqueue(work);
_mreAdd.Set();
}
public WorkItem Dequeue()
{
SpinWait spinWait = default(SpinWait);
while (true) {
WorkItemQueueState state = State;
if (state == WorkItemQueueState.Stopped)
return null;
int removeId = _removeId;
int addId = _addId;
if (removeId == addId || state == WorkItemQueueState.Paused) {
if (spinWait.Count <= 5)
spinWait.SpinOnce();
else {
_mreAdd.Reset();
if ((removeId != _removeId || addId != _addId) && state != 0)
_mreAdd.Set();
else
_mreAdd.Wait(500);
}
} else if (Interlocked.CompareExchange(ref _removeId, removeId + 1, removeId) == removeId) {
break;
}
}
WorkItem result = null;
while (result == null) {
ConcurrentQueue<WorkItem>[] innerQueues = _innerQueues;
for (int i = 0; i < innerQueues.Length && !innerQueues[i].TryDequeue(out result); i++) {
}
}
Interlocked.Increment(ref _itemsProcessed);
return result;
}
public void Start()
{
log.Info("{0}.{1} starting", Name, _savedState.Count);
if (Interlocked.CompareExchange(ref _state, 1, 0) == 0)
_mreAdd.Set();
}
public void Stop()
{
log.Info("{0}.{1} stopping - {2} WorkItems processed", Name, _savedState.Count, ItemsProcessed);
if (Interlocked.Exchange(ref _state, 2) != 2)
_mreAdd.Set();
}
public void Pause()
{
log.Debug("{0}.{1} pausing", Name, _savedState.Count);
Interlocked.CompareExchange(ref _state, 0, 1);
}
internal void Save()
{
bool num = State == WorkItemQueueState.Running;
if (num)
Pause();
_savedState.Push(new SavedState(this));
InitializeQueues();
if (num)
Start();
}
internal void Restore()
{
SavedState savedState = _savedState.Pop();
for (int i = 0; i < 2; i++) {
WorkItem result;
while (_innerQueues[i].TryDequeue(out result)) {
savedState.InnerQueues[i].Enqueue(result);
}
}
_innerQueues = savedState.InnerQueues;
_addId += savedState.AddId;
_removeId += savedState.RemoveId;
}
internal string DumpContents()
{
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.AppendLine($"""{Name}""{_savedState.Count}");
if (IsEmpty)
stringBuilder.AppendLine(" <empty>");
else {
for (int i = 0; i < 2; i++) {
foreach (WorkItem item in _innerQueues[i]) {
stringBuilder.AppendLine($"""{i}""{item.Name}");
}
}
}
int num = 0;
foreach (SavedState item2 in _savedState) {
stringBuilder.AppendLine($"""{num++}");
bool flag = true;
for (int j = 0; j < 2; j++) {
foreach (WorkItem item3 in item2.InnerQueues[j]) {
stringBuilder.AppendLine($"""{j}""{item3.Name}");
flag = false;
}
}
if (flag)
stringBuilder.AppendLine(" <empty>");
}
return stringBuilder.ToString();
}
}
}