<PackageReference Include="NUnit" Version="4.3.0" />

ParallelWorkItemDispatcher

ParallelWorkItemDispatcher handles execution of work items by queuing them for worker threads to process.
#nullable enable

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;

namespace NUnit.Framework.Internal.Execution
{
    /// <summary>
    /// Dispatches work items either by executing them directly on the calling
    /// thread or by placing them on one of four queues (parallel / non-parallel,
    /// each in an MTA and an STA flavor) that are serviced by
    /// <see cref="TestWorker"/> instances grouped into three work shifts.
    /// </summary>
    public class ParallelWorkItemDispatcher : IWorkItemDispatcher
    {
        private static readonly Logger Log = InternalTrace.GetLogger("Dispatcher");

        /// <summary>
        /// Maximum time, in milliseconds, that a forced <see cref="CancelRun"/> waits
        /// for the top-level work item to reach the Complete state.
        /// </summary>
        private const int WaitForForcedTermination = 5000;

        // The item whose completion ends the run (or ends the current isolation
        // level). Null until Start has been called.
        private WorkItem? _topLevelWorkItem;

        // Top-level items replaced by IsolateQueues; popped again by TryRestoreQueues.
        private readonly Stack<WorkItem> _savedWorkItems = new();

        // Composite items that were dispatched and have not yet raised Completed.
        // The list doubles as its own lock object.
        private readonly List<CompositeWorkItem> _activeWorkItems = new();

        // Guards queue save/restore together with _savedWorkItems,
        // _topLevelWorkItem and _isolationLevel updates.
        private readonly object _queueLock = new();

        // Number of IsolateQueues calls not yet undone by TryRestoreQueues.
        private int _isolationLevel;

        /// <summary>
        /// Raised with the shift that is about to be started, immediately before
        /// its <c>Start</c> method is called.
        /// </summary>
        public event ShiftChangeEventHandler? ShiftStarting;

        /// <summary>
        /// Raised with the ending shift as the first step of handling that
        /// shift's <c>EndOfShift</c> event.
        /// </summary>
        public event ShiftChangeEventHandler? ShiftFinished;

        /// <summary>
        /// Creates the dispatcher and wires up its shifts, queues and workers.
        /// </summary>
        /// <param name="levelOfParallelism">
        /// Number of workers to create for the parallel (MTA) queue.
        /// </param>
        public ParallelWorkItemDispatcher(int levelOfParallelism)
        {
            Log.Info("Initializing with {0} workers", levelOfParallelism);

            LevelOfParallelism = levelOfParallelism;

            InitializeShifts();
        }

        /// <summary>
        /// Gets the number of parallel workers requested at construction.
        /// </summary>
        public int LevelOfParallelism { get; }

        /// <summary>
        /// Enumerates the three shifts in the order in which they are
        /// considered when selecting the next shift to run.
        /// </summary>
        public IEnumerable<WorkShift> Shifts
        {
            get
            {
                yield return ParallelShift;
                yield return NonParallelShift;
                yield return NonParallelSTAShift;
            }
        }

        /// <summary>
        /// Enumerates all four work item queues owned by this dispatcher.
        /// </summary>
        public IEnumerable<WorkItemQueue> Queues
        {
            get
            {
                yield return ParallelQueue;
                yield return ParallelSTAQueue;
                yield return NonParallelQueue;
                yield return NonParallelSTAQueue;
            }
        }

        private WorkShift ParallelShift { get; } = new WorkShift("Parallel");

        private WorkShift NonParallelShift { get; } = new WorkShift("NonParallel");

        private WorkShift NonParallelSTAShift { get; } = new WorkShift("NonParallelSTA");

        private WorkItemQueue ParallelQueue { get; } = new WorkItemQueue("ParallelQueue", true, ApartmentState.MTA);

        private WorkItemQueue ParallelSTAQueue { get; } = new WorkItemQueue("ParallelSTAQueue", true, ApartmentState.STA);

        private WorkItemQueue NonParallelQueue { get; } = new WorkItemQueue("NonParallelQueue", false, ApartmentState.MTA);

        private WorkItemQueue NonParallelSTAQueue { get; } = new WorkItemQueue("NonParallelSTAQueue", false, ApartmentState.STA);

        /// <summary>
        /// Subscribes to every shift's <c>EndOfShift</c> event, attaches the queues
        /// to their shifts and assigns the workers that service each queue.
        /// </summary>
        private void InitializeShifts()
        {
            foreach (WorkShift shift in Shifts)
            {
                shift.EndOfShift += OnEndOfShift;
            }

            // The parallel shift owns both parallel queues.
            ParallelShift.AddQueue(ParallelQueue);
            ParallelShift.AddQueue(ParallelSTAQueue);
            NonParallelShift.AddQueue(NonParallelQueue);
            NonParallelSTAShift.AddQueue(NonParallelSTAQueue);

            // One worker per requested level of parallelism, numbered from 1.
            for (int i = 1; i <= LevelOfParallelism; i++)
            {
                string name = $"NUnit.Fw.ParallelWorker#{i}";
                ParallelShift.Assign(new TestWorker(ParallelQueue, name));
            }

            // The parallel STA queue always gets exactly one worker.
            ParallelShift.Assign(new TestWorker(ParallelSTAQueue, "NUnit.Fw.ParallelSTAWorker"));

            // Non-parallel workers additionally report each item they pick up so
            // that items requesting child isolation can isolate the queues.
            TestWorker nonParallelWorker = new TestWorker(NonParallelQueue, "NUnit.Fw.NonParallelWorker");
            nonParallelWorker.Busy += OnStartNonParallelWorkItem;
            NonParallelShift.Assign(nonParallelWorker);

            TestWorker nonParallelStaWorker = new TestWorker(NonParallelSTAQueue, "NUnit.Fw.NonParallelSTAWorker");
            nonParallelStaWorker.Busy += OnStartNonParallelWorkItem;
            NonParallelSTAShift.Assign(nonParallelStaWorker);
        }

        /// <summary>
        /// <c>Busy</c> handler for the non-parallel workers: isolates the queues
        /// when the item being started has <c>IsolateChildTests</c> set.
        /// </summary>
        private void OnStartNonParallelWorkItem(TestWorker worker, WorkItem work)
        {
            if (work.IsolateChildTests)
            {
                IsolateQueues(work);
            }
        }

        /// <summary>
        /// Records <paramref name="topLevelWorkItem"/> as the top-level item,
        /// dispatches it and starts the first shift that has work, if any.
        /// </summary>
        /// <param name="topLevelWorkItem">The root work item of the run.</param>
        public void Start(WorkItem topLevelWorkItem)
        {
            _topLevelWorkItem = topLevelWorkItem;

            Dispatch(topLevelWorkItem, InitialExecutionStrategy(topLevelWorkItem));

            TryStartNextShift();
        }

        /// <summary>
        /// Chooses the strategy for the top-level item from its parallel scope:
        /// non-parallel when the scope is the enum's zero value or
        /// <c>ParallelScope.None</c>, parallel otherwise.
        /// </summary>
        private static ParallelExecutionStrategy InitialExecutionStrategy(WorkItem workItem)
        {
            ParallelScope scope = workItem.ParallelScope;
            bool requestsParallel = scope != 0 && scope != ParallelScope.None;

            return requestsParallel
                ? ParallelExecutionStrategy.Parallel
                : ParallelExecutionStrategy.NonParallel;
        }

        /// <summary>
        /// Dispatches <paramref name="work"/> using the item's own
        /// <c>ExecutionStrategy</c>.
        /// </summary>
        /// <param name="work">The work item to execute or enqueue.</param>
        public void Dispatch(WorkItem work)
        {
            Dispatch(work, work.ExecutionStrategy);
        }

        /// <summary>
        /// Executes <paramref name="work"/> directly or enqueues it on the queue
        /// matching <paramref name="strategy"/> and the item's target apartment.
        /// Composite items are tracked as active until they raise <c>Completed</c>.
        /// </summary>
        private void Dispatch(WorkItem work, ParallelExecutionStrategy strategy)
        {
            Log.Debug("Using {0} strategy for {1}", strategy, work.Name);

            if (work is CompositeWorkItem composite)
            {
                lock (_activeWorkItems)
                {
                    _activeWorkItems.Add(composite);
                    composite.Completed += OnWorkItemCompletion;
                }
            }

            // A single-threaded context never goes through the queues.
            if (work.Context.IsSingleThreaded)
            {
                work.Execute();
                return;
            }

            bool needsSta = work.TargetApartment == ApartmentState.STA;

            switch (strategy)
            {
                case ParallelExecutionStrategy.Parallel:
                    (needsSta ? ParallelSTAQueue : ParallelQueue).Enqueue(work);
                    break;

                case ParallelExecutionStrategy.NonParallel:
                    (needsSta ? NonParallelSTAQueue : NonParallelQueue).Enqueue(work);
                    break;

                default:
                    // Any other strategy runs the item on the calling thread.
                    work.Execute();
                    break;
            }
        }

        /// <summary>
        /// Cancels every shift. When <paramref name="force"/> is true, additionally
        /// waits up to <see cref="WaitForForcedTermination"/> milliseconds for the
        /// top-level item to complete and then notifies a one-time-teardown item
        /// of cancellation for each composite item that is still running.
        /// </summary>
        /// <param name="force">Passed to each shift's <c>Cancel</c>; also enables the forced clean-up described above.</param>
        /// <exception cref="InvalidOperationException">
        /// <see cref="Start"/> has not been called.
        /// </exception>
        public void CancelRun(bool force)
        {
            if (_topLevelWorkItem is null)
            {
                throw new InvalidOperationException("Called Cancel without Start");
            }

            foreach (WorkShift shift in Shifts)
            {
                shift.Cancel(force);
            }

            if (!force)
            {
                return;
            }

            // The field is re-read on every poll because queue isolation/restoration
            // may swap the top-level item; it is non-null once Start has run.
            SpinWait.SpinUntil(
                () => _topLevelWorkItem!.State == WorkItemState.Complete,
                WaitForForcedTermination);

            lock (_activeWorkItems)
            {
                // Walk by index from the end rather than enumerating.
                // NOTE(review): presumably because cancellation can complete an item
                // and re-enter OnWorkItemCompletion, which removes it from this list
                // while we iterate — confirm against CompositeWorkItem.
                int index = _activeWorkItems.Count;
                while (index > 0)
                {
                    CompositeWorkItem active = _activeWorkItems[--index];
                    if (active.State == WorkItemState.Running)
                    {
                        new CompositeWorkItem.OneTimeTearDownWorkItem(active).WorkItemCancelled();
                    }
                }
            }
        }

        /// <summary>
        /// Saves the state of every queue and makes <paramref name="work"/> the
        /// current top-level item, pushing the previous one so that
        /// <see cref="TryRestoreQueues"/> can reinstate it.
        /// </summary>
        /// <param name="work">The item that becomes the new top-level item.</param>
        /// <exception cref="InvalidOperationException">
        /// <see cref="Start"/> has not been called.
        /// </exception>
        internal void IsolateQueues(WorkItem work)
        {
            if (_topLevelWorkItem is null)
            {
                throw new InvalidOperationException("Called IsolateQueues without Start");
            }

            Log.Info("Saving Queue State for {0}", work.Name);

            lock (_queueLock)
            {
                foreach (WorkItemQueue queue in Queues)
                {
                    queue.Save();
                }

                _savedWorkItems.Push(_topLevelWorkItem);
                _topLevelWorkItem = work;
                _isolationLevel++;
            }
        }

        /// <summary>
        /// Undoes the most recent <see cref="IsolateQueues"/> call: restores every
        /// queue and reinstates the previous top-level item. Does nothing (apart
        /// from logging) when no isolation is in effect.
        /// </summary>
        private void TryRestoreQueues()
        {
            lock (_queueLock)
            {
                if (_isolationLevel <= 0)
                {
                    Log.Debug("Ignoring call to restore Queue State");
                    return;
                }

                Log.Info("Restoring Queue State");

                foreach (WorkItemQueue queue in Queues)
                {
                    queue.Restore();
                }

                _topLevelWorkItem = _savedWorkItems.Pop();
                _isolationLevel--;
            }
        }

        /// <summary>
        /// <c>Completed</c> handler for tracked composite items: stops tracking the
        /// sender and unsubscribes from it.
        /// </summary>
        private void OnWorkItemCompletion(object? sender, EventArgs args)
        {
            if (sender is CompositeWorkItem composite)
            {
                lock (_activeWorkItems)
                {
                    _activeWorkItems.Remove(composite);
                    composite.Completed -= OnWorkItemCompletion;
                }
            }
        }

        /// <summary>
        /// <c>EndOfShift</c> handler. Raises <see cref="ShiftFinished"/>, then either
        /// starts the next shift that has work, unwinds one level of queue
        /// isolation, or, once the top-level item is complete with no isolation
        /// left, shuts down every shift.
        /// </summary>
        private void OnEndOfShift(WorkShift endingShift)
        {
            ShiftFinished?.Invoke(endingShift);

            while (true)
            {
                // Shifts are only started from Start, so the top-level item is set
                // by the time any shift can end.
                if (_topLevelWorkItem!.State != WorkItemState.Complete)
                {
                    if (TryStartNextShift())
                    {
                        return;
                    }

                    // The shift ended but the top-level item is not complete and no
                    // shift reports work yet: keep polling.
                    continue;
                }

                if (_isolationLevel <= 0)
                {
                    break;
                }

                // The isolated top-level item finished; restore the saved queues
                // and re-evaluate against the previous top-level item.
                TryRestoreQueues();
            }

            foreach (WorkShift shift in Shifts)
            {
                shift.ShutDown();
            }
        }

        /// <summary>
        /// Starts the first shift that has work, raising <see cref="ShiftStarting"/>
        /// beforehand.
        /// </summary>
        /// <returns>True if a shift was started; false if no shift has work.</returns>
        private bool TryStartNextShift()
        {
            WorkShift? nextShift = SelectNextShift();
            if (nextShift is null)
            {
                return false;
            }

            ShiftStarting?.Invoke(nextShift);
            nextShift.Start();
            return true;
        }

        /// <summary>
        /// Returns the first shift, in <see cref="Shifts"/> order, whose
        /// <c>HasWork</c> is true, or null when none has work.
        /// </summary>
        private WorkShift? SelectNextShift()
        {
            foreach (WorkShift shift in Shifts)
            {
                if (shift.HasWork)
                {
                    return shift;
                }
            }

            return null;
        }
    }
}