// ParallelWorkItemDispatcher
// ParallelWorkItemDispatcher handles execution of work items by
// queuing them for worker threads to process.
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
namespace NUnit.Framework.Internal.Execution
{
[NullableContext(1)]
[Nullable(0)]
public class ParallelWorkItemDispatcher : IWorkItemDispatcher
{
// Logger for this class, obtained from InternalTrace under the name "Dispatcher".
private static readonly Logger Log = InternalTrace.GetLogger("Dispatcher");

// Time limit (5000) matching the wait that CancelRun applies after a forced cancel.
private const int WaitForForcedTermination = 5000;

// The work item whose completion is watched by OnEndOfShift and CancelRun.
// Set by Start, swapped by IsolateQueues and swapped back by TryRestoreQueues.
[Nullable(2)]
private WorkItem _topLevelWorkItem;

// Top-level items pushed aside by IsolateQueues; popped again in TryRestoreQueues.
private readonly Stack<WorkItem> _savedWorkItems = new();

// Composite items that have been dispatched and have not yet raised Completed.
// Also used as the lock object guarding its own mutations.
private readonly List<CompositeWorkItem> _activeWorkItems = new();

// Guards queue save/restore together with _topLevelWorkItem and _isolationLevel updates.
private readonly object _queueLock = new();

// Number of IsolateQueues calls not yet undone by TryRestoreQueues.
private int _isolationLevel;
/// <summary>
/// Gets the level of parallelism supplied to the constructor.
/// InitializeShifts creates this many workers for the parallel queue.
/// </summary>
public int LevelOfParallelism { get; }
/// <summary>
/// Enumerates the work shifts in a fixed order: Parallel, NonParallel,
/// NonParallelSTA. The sequence is produced lazily on every access.
/// </summary>
public IEnumerable<WorkShift> Shifts {
get {
yield return ParallelShift;
yield return NonParallelShift;
yield return NonParallelSTAShift;
}
}
/// <summary>
/// Enumerates the work item queues in a fixed order: Parallel, ParallelSTA,
/// NonParallel, NonParallelSTA. The sequence is produced lazily on every access.
/// </summary>
public IEnumerable<WorkItemQueue> Queues {
get {
yield return ParallelQueue;
yield return ParallelSTAQueue;
yield return NonParallelQueue;
yield return NonParallelSTAQueue;
}
}
// The three shifts, each created once with a fixed name.
private WorkShift ParallelShift { get; } = new("Parallel");
private WorkShift NonParallelShift { get; } = new("NonParallel");
private WorkShift NonParallelSTAShift { get; } = new("NonParallelSTA");

// The four queues, each created once. The arguments are (name, flag, apartment).
// NOTE(review): the boolean presumably marks the queue as parallel - confirm
// against the WorkItemQueue constructor.
private WorkItemQueue ParallelQueue { get; } = new("ParallelQueue", true, ApartmentState.MTA);
private WorkItemQueue ParallelSTAQueue { get; } = new("ParallelSTAQueue", true, ApartmentState.STA);
private WorkItemQueue NonParallelQueue { get; } = new("NonParallelQueue", false, ApartmentState.MTA);
private WorkItemQueue NonParallelSTAQueue { get; } = new("NonParallelSTAQueue", false, ApartmentState.STA);
/// <summary>
/// Raised with the shift that is about to be started, immediately before
/// its Start method is called (from Start and from OnEndOfShift).
/// </summary>
[Nullable(2)]
[method: NullableContext(2)]
[field: Nullable(2)]
public event ShiftChangeEventHandler ShiftStarting;
/// <summary>
/// Raised with the ending shift as the first action of OnEndOfShift.
/// </summary>
[Nullable(2)]
[method: NullableContext(2)]
[field: Nullable(2)]
public event ShiftChangeEventHandler ShiftFinished;
/// <summary>
/// Creates a dispatcher and wires up its shifts, queues and workers.
/// </summary>
/// <param name="levelOfParallelism">
/// Number of workers to create for the parallel queue; exposed afterwards
/// through <see cref="LevelOfParallelism"/>.
/// </param>
public ParallelWorkItemDispatcher(int levelOfParallelism)
{
    LevelOfParallelism = levelOfParallelism;
    Log.Info("Initializing with {0} workers", levelOfParallelism);

    // Must run after LevelOfParallelism is set: the worker count is read from it.
    InitializeShifts();
}
/// <summary>
/// Subscribes to every shift's EndOfShift event, attaches each queue to its
/// shift, and assigns the workers: <see cref="LevelOfParallelism"/> numbered
/// workers plus one STA worker on the parallel shift, and one worker on each
/// of the two non-parallel shifts.
/// </summary>
private void InitializeShifts()
{
    foreach (WorkShift shift in Shifts) {
        shift.EndOfShift += OnEndOfShift;
    }

    ParallelShift.AddQueue(ParallelQueue);
    ParallelShift.AddQueue(ParallelSTAQueue);
    NonParallelShift.AddQueue(NonParallelQueue);
    NonParallelSTAShift.AddQueue(NonParallelSTAQueue);

    // Worker names are numbered from 1. A plain interpolated string is used here;
    // DefaultInterpolatedStringHandler is compiler plumbing and should not be
    // driven by hand.
    for (int i = 1; i <= LevelOfParallelism; i++) {
        string name = $"NUnit.Fw.ParallelWorker#{i}";
        ParallelShift.Assign(new TestWorker(ParallelQueue, name));
    }

    ParallelShift.Assign(new TestWorker(ParallelSTAQueue, "NUnit.Fw.ParallelSTAWorker"));

    // Both non-parallel workers report when they pick up an item so that
    // queue isolation can be applied if the item asks for it.
    TestWorker nonParallelWorker = new TestWorker(NonParallelQueue, "NUnit.Fw.NonParallelWorker");
    nonParallelWorker.Busy += OnStartNonParallelWorkItem;
    NonParallelShift.Assign(nonParallelWorker);

    TestWorker nonParallelStaWorker = new TestWorker(NonParallelSTAQueue, "NUnit.Fw.NonParallelSTAWorker");
    nonParallelStaWorker.Busy += OnStartNonParallelWorkItem;
    NonParallelSTAShift.Assign(nonParallelStaWorker);
}
/// <summary>
/// Busy handler for the non-parallel workers: isolates the queues when the
/// item being started has IsolateChildTests set.
/// </summary>
/// <param name="worker">The worker raising the event (not used).</param>
/// <param name="work">The work item the worker is about to run.</param>
private void OnStartNonParallelWorkItem(TestWorker worker, WorkItem work)
{
    if (!work.IsolateChildTests) {
        return;
    }

    IsolateQueues(work);
}
/// <summary>
/// Begins a run: records <paramref name="topLevelWorkItem"/> as the current
/// top-level item, dispatches it with the strategy chosen by
/// InitialExecutionStrategy, then starts the first shift that reports work.
/// Nothing is started when no shift has work.
/// </summary>
/// <param name="topLevelWorkItem">The root work item of the run.</param>
public void Start(WorkItem topLevelWorkItem)
{
    _topLevelWorkItem = topLevelWorkItem;

    ParallelExecutionStrategy strategy = InitialExecutionStrategy(topLevelWorkItem);
    Dispatch(topLevelWorkItem, strategy);

    WorkShift firstShift = SelectNextShift();
    if (firstShift == null) {
        return;
    }

    this.ShiftStarting?.Invoke(firstShift);
    firstShift.Start();
}
/// <summary>
/// Chooses the strategy for the top-level item: NonParallel when its
/// ParallelScope is the zero value or <c>ParallelScope.None</c>, otherwise Parallel.
/// </summary>
/// <param name="workItem">The work item whose ParallelScope is inspected.</param>
/// <returns>The strategy to dispatch the item with.</returns>
private static ParallelExecutionStrategy InitialExecutionStrategy(WorkItem workItem)
{
    ParallelScope scope = workItem.ParallelScope;

    // NOTE(review): 0 is presumably the enum's "default / unspecified" member - confirm.
    bool runInParallel = scope != 0 && scope != ParallelScope.None;

    return runInParallel
        ? ParallelExecutionStrategy.Parallel
        : ParallelExecutionStrategy.NonParallel;
}
/// <summary>
/// Dispatches a work item using the execution strategy the item itself reports.
/// </summary>
/// <param name="work">The work item to dispatch.</param>
public void Dispatch(WorkItem work) => Dispatch(work, work.ExecutionStrategy);
/// <summary>
/// Dispatches a work item with an explicit strategy. Composite items are
/// first recorded in the active list and watched for completion. The item is
/// then run on the calling thread when its context is single-threaded or the
/// strategy is neither Parallel nor NonParallel; otherwise it is enqueued on
/// the queue matching the strategy and the item's target apartment.
/// </summary>
/// <param name="work">The work item to dispatch.</param>
/// <param name="strategy">The execution strategy to apply.</param>
private void Dispatch(WorkItem work, ParallelExecutionStrategy strategy)
{
    Log.Debug("Using {0} strategy for {1}", strategy, work.Name);

    if (work is CompositeWorkItem suite) {
        lock (_activeWorkItems) {
            _activeWorkItems.Add(suite);
            suite.Completed += OnWorkItemCompletion;
        }
    }

    // A single-threaded context always runs on the calling thread.
    if (work.Context.IsSingleThreaded) {
        work.Execute();
        return;
    }

    if (strategy == ParallelExecutionStrategy.Parallel) {
        WorkItemQueue target = work.TargetApartment == ApartmentState.STA
            ? ParallelSTAQueue
            : ParallelQueue;
        target.Enqueue(work);
    } else if (strategy == ParallelExecutionStrategy.NonParallel) {
        WorkItemQueue target = work.TargetApartment == ApartmentState.STA
            ? NonParallelSTAQueue
            : NonParallelQueue;
        target.Enqueue(work);
    } else {
        // Any other strategy: run on the calling thread.
        work.Execute();
    }
}
/// <summary>
/// Cancels the run by cancelling every shift. When <paramref name="force"/>
/// is true, it additionally waits up to <c>WaitForForcedTermination</c>
/// milliseconds for the top-level item to complete and then signals
/// cancellation for every composite item still in the Running state.
/// </summary>
/// <param name="force">True to force termination; passed through to each shift.</param>
/// <exception cref="InvalidOperationException">Start has not been called.</exception>
public void CancelRun(bool force)
{
    if (_topLevelWorkItem == null)
        throw new InvalidOperationException("Called Cancel without Start");

    foreach (WorkShift shift in Shifts) {
        shift.Cancel(force);
    }

    if (!force) {
        return;
    }

    // Use the named constant rather than repeating the literal 5000.
    SpinWait.SpinUntil(
        () => _topLevelWorkItem.State == WorkItemState.Complete,
        WaitForForcedTermination);

    lock (_activeWorkItems) {
        // Iterate from the end. NOTE(review): presumably because signalling
        // cancellation can complete the item, and the completion handler removes
        // it from this list - confirm against WorkItemCancelled.
        for (int index = _activeWorkItems.Count - 1; index >= 0; index--) {
            CompositeWorkItem active = _activeWorkItems[index];
            if (active.State == WorkItemState.Running) {
                new CompositeWorkItem.OneTimeTearDownWorkItem(active).WorkItemCancelled();
            }
        }
    }
}
/// <summary>
/// Saves the state of every queue and makes <paramref name="work"/> the
/// current top-level item, remembering the previous one so that
/// TryRestoreQueues can put it back. Increments the isolation level.
/// </summary>
/// <param name="work">The work item that becomes the temporary top-level item.</param>
/// <exception cref="InvalidOperationException">Start has not been called.</exception>
internal void IsolateQueues(WorkItem work)
{
    if (_topLevelWorkItem == null) {
        throw new InvalidOperationException("Called IsolateQueues without Start");
    }

    Log.Info("Saving Queue State for {0}", work.Name);

    lock (_queueLock) {
        foreach (WorkItemQueue q in Queues) {
            q.Save();
        }

        _savedWorkItems.Push(_topLevelWorkItem);
        _topLevelWorkItem = work;
        _isolationLevel++;
    }
}
/// <summary>
/// Undoes one level of queue isolation: restores every queue, reinstates the
/// previously saved top-level item and decrements the isolation level.
/// Only logs a debug message when no isolation is in effect.
/// </summary>
private void TryRestoreQueues()
{
    lock (_queueLock) {
        if (_isolationLevel <= 0) {
            Log.Debug("Ignoring call to restore Queue State");
            return;
        }

        Log.Info("Restoring Queue State");

        foreach (WorkItemQueue q in Queues) {
            q.Restore();
        }

        _topLevelWorkItem = _savedWorkItems.Pop();
        _isolationLevel--;
    }
}
/// <summary>
/// Completed handler for composite work items: removes the sender from the
/// active list and unsubscribes from its Completed event. Senders that are
/// not composite work items are ignored.
/// </summary>
/// <param name="sender">The work item that completed.</param>
/// <param name="args">Event data (not used).</param>
private void OnWorkItemCompletion([Nullable(2)] object sender, EventArgs args)
{
    if (!(sender is CompositeWorkItem finished)) {
        return;
    }

    lock (_activeWorkItems) {
        _activeWorkItems.Remove(finished);
        finished.Completed -= OnWorkItemCompletion;
    }
}
/// <summary>
/// EndOfShift handler. Raises ShiftFinished, then either starts the next
/// shift that has work or, once the current top-level item is complete and no
/// queue isolation remains, shuts down all shifts.
/// </summary>
/// <param name="endingShift">The shift that has just ended.</param>
private void OnEndOfShift(WorkShift endingShift)
{
    this.ShiftFinished?.Invoke(endingShift);

    for (;;) {
        // While the top-level item is unfinished, keep looking for a shift with
        // work; this re-polls until one appears or the item completes.
        while (_topLevelWorkItem.State != WorkItemState.Complete) {
            WorkShift upcoming = SelectNextShift();
            if (upcoming == null) {
                continue;
            }

            this.ShiftStarting?.Invoke(upcoming);
            upcoming.Start();
            return;
        }

        // The current top-level item is complete. If it was an isolated item,
        // restore the saved queues and top-level item and go round again.
        if (_isolationLevel <= 0) {
            break;
        }

        TryRestoreQueues();
    }

    // Nothing left to run.
    foreach (WorkShift shift in Shifts) {
        shift.ShutDown();
    }
}
/// <summary>
/// Finds the first shift, in the order given by <see cref="Shifts"/>, that
/// reports having work.
/// </summary>
/// <returns>The first shift with work, or null when none has any.</returns>
[NullableContext(2)]
private WorkShift SelectNextShift()
{
    WorkShift selected = null;

    foreach (WorkShift candidate in Shifts) {
        if (candidate.HasWork) {
            selected = candidate;
            break;
        }
    }

    return selected;
}
}
}