ParallelWorkItemDispatcher
ParallelWorkItemDispatcher handles execution of work items by
queuing them for worker threads to process.
using System;
using System.Collections.Generic;
namespace NUnit.Framework.Internal.Execution
{
public class ParallelWorkItemDispatcher : IWorkItemDispatcher
{
private static readonly Logger log = InternalTrace.GetLogger("Dispatcher");
private WorkItem _topLevelWorkItem;
private readonly Stack<WorkItem> _savedWorkItems = new Stack<WorkItem>();
private readonly object _queueLock = new object();
private int _isolationLevel;
public int LevelOfParallelism { get; }
public IEnumerable<WorkShift> Shifts {
get {
yield return ParallelShift;
yield return NonParallelShift;
}
}
public IEnumerable<WorkItemQueue> Queues {
get {
yield return ParallelQueue;
yield return NonParallelQueue;
}
}
private WorkShift ParallelShift { get; } = new WorkShift("Parallel");
private WorkShift NonParallelShift { get; } = new WorkShift("NonParallel");
private WorkItemQueue ParallelQueue { get; } = new WorkItemQueue("ParallelQueue", true);
private WorkItemQueue NonParallelQueue { get; } = new WorkItemQueue("NonParallelQueue", false);
public event ShiftChangeEventHandler ShiftStarting;
public event ShiftChangeEventHandler ShiftFinished;
public ParallelWorkItemDispatcher(int levelOfParallelism)
{
log.Info("Initializing with {0} workers", levelOfParallelism);
LevelOfParallelism = levelOfParallelism;
InitializeShifts();
}
private void InitializeShifts()
{
foreach (WorkShift shift in Shifts) {
shift.EndOfShift += OnEndOfShift;
}
ParallelShift.AddQueue(ParallelQueue);
NonParallelShift.AddQueue(NonParallelQueue);
for (int i = 1; i <= LevelOfParallelism; i++) {
string name = string.Format("ParallelWorker#" + i.ToString(), Array.Empty<object>());
ParallelShift.Assign(new TestWorker(ParallelQueue, name));
}
TestWorker testWorker = new TestWorker(NonParallelQueue, "NonParallelWorker");
testWorker.Busy += OnStartNonParallelWorkItem;
NonParallelShift.Assign(testWorker);
}
private void OnStartNonParallelWorkItem(TestWorker worker, WorkItem work)
{
if (work.IsolateChildTests)
IsolateQueues(work);
}
public void Start(WorkItem topLevelWorkItem)
{
_topLevelWorkItem = topLevelWorkItem;
Dispatch(topLevelWorkItem, InitialExecutionStrategy(topLevelWorkItem));
WorkShift workShift = SelectNextShift();
this.ShiftStarting?.Invoke(workShift);
workShift.Start();
}
private static ParallelExecutionStrategy InitialExecutionStrategy(WorkItem workItem)
{
if (workItem.ParallelScope != 0 && workItem.ParallelScope != ParallelScope.None)
return ParallelExecutionStrategy.Parallel;
return ParallelExecutionStrategy.NonParallel;
}
public void Dispatch(WorkItem work)
{
Dispatch(work, work.ExecutionStrategy);
}
private void Dispatch(WorkItem work, ParallelExecutionStrategy strategy)
{
log.Debug("Using {0} strategy for {1}", strategy, work.Name);
switch (strategy) {
default:
work.Execute();
break;
case ParallelExecutionStrategy.Parallel:
ParallelQueue.Enqueue(work);
break;
case ParallelExecutionStrategy.NonParallel:
NonParallelQueue.Enqueue(work);
break;
}
}
public void CancelRun(bool force)
{
foreach (WorkShift shift in Shifts) {
shift.Cancel(force);
}
}
internal void IsolateQueues(WorkItem work)
{
log.Info("Saving Queue State for {0}", work.Name);
lock (_queueLock) {
foreach (WorkItemQueue queue in Queues) {
queue.Save();
}
_savedWorkItems.Push(_topLevelWorkItem);
_topLevelWorkItem = work;
_isolationLevel++;
}
}
private void RestoreQueues()
{
Guard.OperationValid(_isolationLevel > 0, "Called RestoreQueues with no saved queues.");
lock (_queueLock) {
log.Info("Restoring Queue State");
foreach (WorkItemQueue queue in Queues) {
queue.Restore();
}
_topLevelWorkItem = _savedWorkItems.Pop();
_isolationLevel--;
}
}
private void OnEndOfShift(WorkShift endingShift)
{
this.ShiftFinished?.Invoke(endingShift);
WorkShift workShift = null;
while (true) {
if (_topLevelWorkItem.State != WorkItemState.Complete) {
workShift = SelectNextShift();
if (workShift != null) {
this.ShiftStarting?.Invoke(workShift);
workShift.Start();
return;
}
} else {
if (_isolationLevel <= 0)
break;
RestoreQueues();
}
}
foreach (WorkShift shift in Shifts) {
shift.ShutDown();
}
}
private WorkShift SelectNextShift()
{
foreach (WorkShift shift in Shifts) {
if (shift.HasWork)
return shift;
}
return null;
}
}
}