ParallelWorkItemDispatcher
ParallelWorkItemDispatcher handles execution of work items by
queuing them for worker threads to process.
using System;
using System.Collections.Generic;
using System.Threading;
namespace NUnit.Framework.Internal.Execution
{
public class ParallelWorkItemDispatcher : IWorkItemDispatcher
{
private static Logger log = InternalTrace.GetLogger("WorkItemDispatcher");
private int _levelOfParallelism;
private int _itemsDispatched;
private WorkShift _parallelShift = new WorkShift("Parallel");
private WorkShift _nonParallelShift = new WorkShift("NonParallel");
private WorkShift _nonParallelSTAShift = new WorkShift("NonParallelSTA");
private WorkItemQueue _parallelQueue;
private WorkItemQueue _parallelSTAQueue;
private WorkItemQueue _nonParallelQueue;
private WorkItemQueue _nonParallelSTAQueue;
private WorkItem _topLevelWorkItem;
public IEnumerable<WorkShift> Shifts { get; set; }
private WorkItemQueue ParallelQueue {
get {
if (_parallelQueue == null) {
_parallelQueue = new WorkItemQueue("ParallelQueue");
_parallelShift.AddQueue(_parallelQueue);
for (int i = 1; i <= _levelOfParallelism; i++) {
string name = string.Format("Worker#" + i.ToString());
_parallelShift.Assign(new TestWorker(_parallelQueue, name, ApartmentState.MTA));
}
}
return _parallelQueue;
}
}
private WorkItemQueue ParallelSTAQueue {
get {
if (_parallelSTAQueue == null) {
_parallelSTAQueue = new WorkItemQueue("ParallelSTAQueue");
_parallelShift.AddQueue(_parallelSTAQueue);
_parallelShift.Assign(new TestWorker(_parallelSTAQueue, "Worker#STA", ApartmentState.STA));
}
return _parallelSTAQueue;
}
}
private WorkItemQueue NonParallelQueue {
get {
if (_nonParallelQueue == null) {
_nonParallelQueue = new WorkItemQueue("NonParallelQueue");
_nonParallelShift.AddQueue(_nonParallelQueue);
_nonParallelShift.Assign(new TestWorker(_nonParallelQueue, "Worker#STA_NP", ApartmentState.MTA));
}
return _nonParallelQueue;
}
}
private WorkItemQueue NonParallelSTAQueue {
get {
if (_nonParallelSTAQueue == null) {
_nonParallelSTAQueue = new WorkItemQueue("NonParallelSTAQueue");
_nonParallelSTAShift.AddQueue(_nonParallelSTAQueue);
_nonParallelSTAShift.Assign(new TestWorker(_nonParallelSTAQueue, "Worker#NP_STA", ApartmentState.STA));
}
return _nonParallelSTAQueue;
}
}
public ParallelWorkItemDispatcher(int levelOfParallelism)
{
_levelOfParallelism = levelOfParallelism;
Shifts = new WorkShift[3] {
_parallelShift,
_nonParallelShift,
_nonParallelSTAShift
};
foreach (WorkShift shift in Shifts) {
shift.EndOfShift += OnEndOfShift;
}
}
public void Dispatch(WorkItem work)
{
if (_topLevelWorkItem == null) {
_topLevelWorkItem = work;
Enqueue(work);
StartNextShift();
} else if (work is SimpleWorkItem || work.Test.FixtureType == (Type)null) {
Execute(work);
} else {
Enqueue(work);
}
_itemsDispatched++;
}
private void Execute(WorkItem work)
{
log.Debug("Directly executing {0}", work.Test.Name);
work.Execute();
}
private void Enqueue(WorkItem work)
{
log.Debug("Enqueuing {0}", work.Test.Name);
if (work.IsParallelizable) {
if (work.TargetApartment == ApartmentState.STA)
ParallelSTAQueue.Enqueue(work);
else
ParallelQueue.Enqueue(work);
} else if (work.TargetApartment == ApartmentState.STA) {
NonParallelSTAQueue.Enqueue(work);
} else {
NonParallelQueue.Enqueue(work);
}
}
public void CancelRun()
{
foreach (WorkShift shift in Shifts) {
shift.Cancel();
}
}
private void OnEndOfShift(object sender, EventArgs ea)
{
if (!StartNextShift()) {
foreach (WorkShift shift in Shifts) {
shift.ShutDown();
}
}
}
private bool StartNextShift()
{
foreach (WorkShift shift in Shifts) {
if (shift.HasWork) {
shift.Start();
return true;
}
}
return false;
}
}
}