<PackageReference Include="NUnit" Version="3.11.0" />

ParallelWorkItemDispatcher

ParallelWorkItemDispatcher handles execution of work items by queuing them for worker threads to process.
using System; using System.Collections.Generic; namespace NUnit.Framework.Internal.Execution { public class ParallelWorkItemDispatcher : IWorkItemDispatcher { private static readonly Logger log = InternalTrace.GetLogger("Dispatcher"); private WorkItem _topLevelWorkItem; private readonly Stack<WorkItem> _savedWorkItems = new Stack<WorkItem>(); private readonly object _queueLock = new object(); private int _isolationLevel; public int LevelOfParallelism { get; } public IEnumerable<WorkShift> Shifts { get { yield return ParallelShift; yield return NonParallelShift; } } public IEnumerable<WorkItemQueue> Queues { get { yield return ParallelQueue; yield return NonParallelQueue; } } private WorkShift ParallelShift { get; } = new WorkShift("Parallel"); private WorkShift NonParallelShift { get; } = new WorkShift("NonParallel"); private WorkItemQueue ParallelQueue { get; } = new WorkItemQueue("ParallelQueue", true); private WorkItemQueue NonParallelQueue { get; } = new WorkItemQueue("NonParallelQueue", false); public event ShiftChangeEventHandler ShiftStarting; public event ShiftChangeEventHandler ShiftFinished; public ParallelWorkItemDispatcher(int levelOfParallelism) { log.Info("Initializing with {0} workers", levelOfParallelism); LevelOfParallelism = levelOfParallelism; InitializeShifts(); } private void InitializeShifts() { foreach (WorkShift shift in Shifts) { shift.EndOfShift += OnEndOfShift; } ParallelShift.AddQueue(ParallelQueue); NonParallelShift.AddQueue(NonParallelQueue); for (int i = 1; i <= LevelOfParallelism; i++) { string name = string.Format("ParallelWorker#" + i.ToString(), Array.Empty<object>()); ParallelShift.Assign(new TestWorker(ParallelQueue, name)); } TestWorker testWorker = new TestWorker(NonParallelQueue, "NonParallelWorker"); testWorker.Busy += OnStartNonParallelWorkItem; NonParallelShift.Assign(testWorker); } private void OnStartNonParallelWorkItem(TestWorker worker, WorkItem work) { if (work.IsolateChildTests) IsolateQueues(work); } public void Start(WorkItem topLevelWorkItem) { _topLevelWorkItem = topLevelWorkItem; Dispatch(topLevelWorkItem, InitialExecutionStrategy(topLevelWorkItem)); WorkShift workShift = SelectNextShift(); this.ShiftStarting?.Invoke(workShift); workShift.Start(); } private static ParallelExecutionStrategy InitialExecutionStrategy(WorkItem workItem) { if (workItem.ParallelScope != 0 && workItem.ParallelScope != ParallelScope.None) return ParallelExecutionStrategy.Parallel; return ParallelExecutionStrategy.NonParallel; } public void Dispatch(WorkItem work) { Dispatch(work, work.ExecutionStrategy); } private void Dispatch(WorkItem work, ParallelExecutionStrategy strategy) { log.Debug("Using {0} strategy for {1}", strategy, work.Name); switch (strategy) { default: work.Execute(); break; case ParallelExecutionStrategy.Parallel: ParallelQueue.Enqueue(work); break; case ParallelExecutionStrategy.NonParallel: NonParallelQueue.Enqueue(work); break; } } public void CancelRun(bool force) { foreach (WorkShift shift in Shifts) { shift.Cancel(force); } } internal void IsolateQueues(WorkItem work) { log.Info("Saving Queue State for {0}", work.Name); lock (_queueLock) { foreach (WorkItemQueue queue in Queues) { queue.Save(); } _savedWorkItems.Push(_topLevelWorkItem); _topLevelWorkItem = work; _isolationLevel++; } } private void RestoreQueues() { Guard.OperationValid(_isolationLevel > 0, "Called RestoreQueues with no saved queues."); lock (_queueLock) { log.Info("Restoring Queue State"); foreach (WorkItemQueue queue in Queues) { queue.Restore(); } _topLevelWorkItem = _savedWorkItems.Pop(); _isolationLevel--; } } private void OnEndOfShift(WorkShift endingShift) { this.ShiftFinished?.Invoke(endingShift); WorkShift workShift = null; while (true) { if (_topLevelWorkItem.State != WorkItemState.Complete) { workShift = SelectNextShift(); if (workShift != null) { this.ShiftStarting?.Invoke(workShift); workShift.Start(); return; } } else { if (_isolationLevel <= 0) break; RestoreQueues(); } } foreach (WorkShift shift in Shifts) { shift.ShutDown(); } } private WorkShift SelectNextShift() { foreach (WorkShift shift in Shifts) { if (shift.HasWork) return shift; } return null; } } }