<PackageReference Include="NUnit" Version="3.0.0-alpha-4" />

ParallelWorkItemDispatcher

ParallelWorkItemDispatcher handles execution of work items by queuing them for worker threads to process.
using System; using System.Collections.Generic; using System.Threading; namespace NUnit.Framework.Internal.Execution { public class ParallelWorkItemDispatcher : IWorkItemDispatcher { private static Logger log = InternalTrace.GetLogger("WorkItemDispatcher"); private int _levelOfParallelism; private int _itemsDispatched; private WorkShift _parallelShift = new WorkShift("Parallel"); private WorkShift _nonParallelShift = new WorkShift("NonParallel"); private WorkShift _nonParallelSTAShift = new WorkShift("NonParallelSTA"); private WorkItemQueue _parallelQueue; private WorkItemQueue _parallelSTAQueue; private WorkItemQueue _nonParallelQueue; private WorkItemQueue _nonParallelSTAQueue; private WorkItem _topLevelWorkItem; public IEnumerable<WorkShift> Shifts { get; set; } private WorkItemQueue ParallelQueue { get { if (_parallelQueue == null) { _parallelQueue = new WorkItemQueue("ParallelQueue"); _parallelShift.AddQueue(_parallelQueue); for (int i = 1; i <= _levelOfParallelism; i++) { string name = string.Format("Worker#" + i.ToString()); _parallelShift.Assign(new TestWorker(_parallelQueue, name, ApartmentState.MTA)); } } return _parallelQueue; } } private WorkItemQueue ParallelSTAQueue { get { if (_parallelSTAQueue == null) { _parallelSTAQueue = new WorkItemQueue("ParallelSTAQueue"); _parallelShift.AddQueue(_parallelSTAQueue); _parallelShift.Assign(new TestWorker(_parallelSTAQueue, "Worker#STA", ApartmentState.STA)); } return _parallelSTAQueue; } } private WorkItemQueue NonParallelQueue { get { if (_nonParallelQueue == null) { _nonParallelQueue = new WorkItemQueue("NonParallelQueue"); _nonParallelShift.AddQueue(_nonParallelQueue); _nonParallelShift.Assign(new TestWorker(_nonParallelQueue, "Worker#STA_NP", ApartmentState.MTA)); } return _nonParallelQueue; } } private WorkItemQueue NonParallelSTAQueue { get { if (_nonParallelSTAQueue == null) { _nonParallelSTAQueue = new WorkItemQueue("NonParallelSTAQueue"); _nonParallelSTAShift.AddQueue(_nonParallelSTAQueue); _nonParallelSTAShift.Assign(new TestWorker(_nonParallelSTAQueue, "Worker#NP_STA", ApartmentState.STA)); } return _nonParallelSTAQueue; } } public ParallelWorkItemDispatcher(int levelOfParallelism) { _levelOfParallelism = levelOfParallelism; Shifts = new WorkShift[3] { _parallelShift, _nonParallelShift, _nonParallelSTAShift }; foreach (WorkShift shift in Shifts) { shift.EndOfShift += OnEndOfShift; } } public void Dispatch(WorkItem work) { if (_topLevelWorkItem == null) { _topLevelWorkItem = work; Enqueue(work); StartNextShift(); } else if (work is SimpleWorkItem || work.Test.FixtureType == (Type)null) { Execute(work); } else { Enqueue(work); } _itemsDispatched++; } private void Execute(WorkItem work) { log.Debug("Directly executing {0}", work.Test.Name); work.Execute(); } private void Enqueue(WorkItem work) { log.Debug("Enqueuing {0}", work.Test.Name); if (work.IsParallelizable) { if (work.TargetApartment == ApartmentState.STA) ParallelSTAQueue.Enqueue(work); else ParallelQueue.Enqueue(work); } else if (work.TargetApartment == ApartmentState.STA) { NonParallelSTAQueue.Enqueue(work); } else { NonParallelQueue.Enqueue(work); } } public void CancelRun() { foreach (WorkShift shift in Shifts) { shift.Cancel(); } } private void OnEndOfShift(object sender, EventArgs ea) { if (!StartNextShift()) { foreach (WorkShift shift in Shifts) { shift.ShutDown(); } } } private bool StartNextShift() { foreach (WorkShift shift in Shifts) { if (shift.HasWork) { shift.Start(); return true; } } return false; } } }