// File: Engine\TaskWorkerThread.cs
// Project: ..\..\..\src\Deprecated\Engine\Microsoft.Build.Engine.csproj (Microsoft.Build.Engine)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
// THE ASSEMBLY BUILT FROM THIS SOURCE FILE HAS BEEN DEPRECATED FOR YEARS. IT IS BUILT ONLY TO PROVIDE
// BACKWARD COMPATIBILITY FOR API USERS WHO HAVE NOT YET MOVED TO UPDATED APIS. PLEASE DO NOT SEND PULL
// REQUESTS THAT CHANGE THIS FILE WITHOUT FIRST CHECKING WITH THE MAINTAINERS THAT THE FIX IS REQUIRED.
 
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Build.BuildEngine.Shared;
 
namespace Microsoft.Build.BuildEngine
{
    /// <summary>
    /// This class is a wrapper around a worker thread that executes a task
    /// </summary>
    internal class TaskWorkerThread
    {
        /// <summary>
        /// The role a worker thread plays while it is inside NodeActionLoop. The mode selects which
        /// events the thread waits on (see GetHandlesArray).
        /// </summary>
        private enum NodeLoopExecutionMode
        {
            /// <summary>
            /// This is the mode of a thread that is not executing a task but is responsible for picking up
            /// work items as they arrive in the queue
            /// </summary>
            BaseActiveThread = 0,
            /// <summary>
            /// This is the mode of a thread that was in BaseActiveThread mode and is processing a callback via
            /// the IBuildEngine interface. If a work item arrives while the thread is in this mode it will pass
            /// the ownership of the queue to another thread
            /// </summary>
            WaitingActiveThread = 1,
            /// <summary>
            /// This is the mode of a thread that is not watching the work item queue and is waiting for
            /// results in order to return back to task execution.
            /// </summary>
            WaitingPassiveThread = 2
        }
 
        #region Constructors
        /// <summary>
        /// Creates the first worker thread of a task execution module (TEM). Besides its own
        /// per-instance state, this constructor allocates all the state that is later shared with
        /// every additional worker thread spawned for the same TEM: the exit event and its cached
        /// flag, the idle-thread queue, the handle-to-thread table, the work item queue with its
        /// insertion event, and the table of waiting tasks.
        /// </summary>
        /// <param name="parentModule">The TEM that owns this worker thread</param>
        /// <param name="profileExecution">True if time spent waiting for results should be measured</param>
        internal TaskWorkerThread(TaskExecutionModule parentModule, bool profileExecution)
        {
            this.parentModule = parentModule;
            this.profileExecution = profileExecution;

            // Shutdown signaling shared by all worker threads of this TEM
            exitTaskThreads = new ManualResetEvent(false);
            exitTaskThreadsCache = new ExitTaskCache(false);

            // Work item queue and the event that announces new entries
            workItemQueue = new Queue<TaskExecutionState>();
            workItemInsertionEvent = new ManualResetEvent(false);

            // Bookkeeping of idle threads, executing tasks and tasks blocked in a callback
            workerThreadQueue = new Queue<TaskWorkerThread>();
            handleIdToWorkerThread = new Hashtable();
            waitingTasks = new Hashtable();

            // Must come last: this starts the thread, which reads the fields assigned above
            InitializePerInstanceData();
        }
 
        /// <summary>
        /// Creates an additional worker thread that shares the TEM-wide state of an existing one.
        /// Used internally (see NodeActionLoop) when the thread that owns the work item queue is
        /// blocked in a callback and no idle worker thread is available to take over.
        /// </summary>
        private TaskWorkerThread
        (
            TaskExecutionModule parentModule,
            ManualResetEvent exitTaskThreads,
            ExitTaskCache exitTaskThreadsCache,
            Queue<TaskWorkerThread> workerThreadQueue,
            Hashtable handleIdToWorkerThread,
            Queue<TaskExecutionState> workItemQueue,
            ManualResetEvent workItemInsertionEvent,
            Hashtable waitingTasks,
            bool profileExecution
        )
        {
            this.parentModule = parentModule;
            this.profileExecution = profileExecution;

            // Adopt the shared shutdown signaling
            this.exitTaskThreads = exitTaskThreads;
            this.exitTaskThreadsCache = exitTaskThreadsCache;

            // Adopt the shared work item queue and its insertion event
            this.workItemQueue = workItemQueue;
            this.workItemInsertionEvent = workItemInsertionEvent;

            // Adopt the shared bookkeeping tables
            this.workerThreadQueue = workerThreadQueue;
            this.handleIdToWorkerThread = handleIdToWorkerThread;
            this.waitingTasks = waitingTasks;

            // Must come last: this starts the thread, which reads the fields assigned above
            InitializePerInstanceData();
        }
 
        /// <summary>
        /// Sets up the state that belongs to this instance only and starts the thread that runs
        /// MainThreadLoop. The thread is named "MSBuild Task Worker" and is placed in a
        /// single-threaded apartment before it is started.
        /// </summary>
        private void InitializePerInstanceData()
        {
            // This instance starts out idle, with no task assigned and no results pending
            threadActive = false;
            currentWorkitem = null;
            postedBuildResults = new LinkedList<BuildResult>();

            // Events private to this thread
            activationEvent = new ManualResetEvent(false);
            localDoneNoticeEvent = new ManualResetEvent(false);

            // The wait handle arrays are built on demand by GetHandlesArray
            BaseActiveThreadWaitHandles = null;
            WaitingActiveThreadWaitHandles = null;
            WaitingPassiveThreadWaitHandles = null;

            // Launch the thread that will process the events
            Thread workerThread = new Thread(new ThreadStart(MainThreadLoop));
            workerThread.Name = "MSBuild Task Worker";
            workerThread.SetApartmentState(ApartmentState.STA);
            workerThread.Start();
        }
        #endregion
 
        #region Properties
        /// <summary>
        /// The event private to this worker thread that is set when a build result has been
        /// posted for it (see PostBuildResult) and reset once the results have been consumed.
        /// </summary>
        internal ManualResetEvent LocalDoneNoticeEvent
        {
            get { return localDoneNoticeEvent; }
        }
 
        /// <summary>
        /// The number of work items currently waiting in the shared work item queue.
        /// </summary>
        /// <remarks>
        /// The queue is a plain Queue&lt;T&gt;, which is not safe for concurrent access, and every
        /// other access in this class takes a lock on the queue object. The count is read under the
        /// same lock so that it is consistent with those accesses. The value is only a snapshot;
        /// it may be out of date as soon as the lock is released.
        /// </remarks>
        internal int WorkItemCount
        {
            get
            {
                lock (workItemQueue)
                {
                    return workItemQueue.Count;
                }
            }
        }
 
        #endregion
 
        #region Methods
 
        /// <summary>
        /// Entry point of the worker thread. The thread sleeps until it is either told to exit or
        /// is activated. Each activation runs NodeActionLoop in BaseActiveThread mode; when that
        /// returns the thread puts itself back on the queue of idle worker threads and goes back to
        /// sleep. The overall thread count of the parent module is incremented for the lifetime of
        /// the loop and decremented even if the loop ends with an exception.
        /// </summary>
        private void MainThreadLoop()
        {
            // Index 0 is the exit request, index 1 is the activation request
            WaitHandle[] wakeUpHandles = new WaitHandle[] { exitTaskThreads, activationEvent };

            parentModule.IncrementOverallThreadCount();

            try
            {
                // A wake-up caused by handle 0 (exit) terminates the thread
                while (WaitHandle.WaitAny(wakeUpHandles) != 0)
                {
                    // Consume the activation request before doing the work
                    activationEvent.Reset();

                    // Own the work item queue until this thread stops being the active one
                    NodeActionLoop(NodeLoopExecutionMode.BaseActiveThread, EngineCallback.invalidEngineHandle, null);

                    // Make this thread available for reuse
                    lock (workerThreadQueue)
                    {
                        workerThreadQueue.Enqueue(this);
                    }
                }
            }
            finally
            {
                parentModule.DecrementOverallThreadCount();
            }
        }
 
        /// <summary>
        /// Checks, without waiting on any kernel object, whether there is already something for
        /// the thread to react to. The checks mirror the handle order produced by GetHandlesArray
        /// for the same mode: the exit flag first, then the work item queue (active modes only),
        /// then the posted build results (waiting modes only).
        /// </summary>
        /// <param name="executionMode">Current execution mode of the thread</param>
        /// <param name="index">
        /// Index of the event that would have been signaled. Only meaningful when the method returns
        /// true; it is 0 otherwise.
        /// </param>
        /// <returns>True if work was found and the kernel wait can be skipped</returns>
        private bool WaitAnyFast(NodeLoopExecutionMode executionMode, out int index)
        {
            // Index 0 is the exit event in every mode
            index = 0;
            if (exitTaskThreadsCache.exitTaskThreads)
            {
                return true;
            }

            switch (executionMode)
            {
                case NodeLoopExecutionMode.BaseActiveThread:
                    lock (workItemQueue)
                    {
                        if (workItemQueue.Count > 0)
                        {
                            index = 1;
                            return true;
                        }
                    }
                    break;

                case NodeLoopExecutionMode.WaitingActiveThread:
                    // Work items take precedence over results, as they come first in the handle array
                    lock (workItemQueue)
                    {
                        if (workItemQueue.Count > 0)
                        {
                            index = 1;
                            return true;
                        }
                    }
                    lock (postedBuildResults)
                    {
                        if (postedBuildResults.Count > 0)
                        {
                            index = 2;
                            return true;
                        }
                    }
                    break;

                case NodeLoopExecutionMode.WaitingPassiveThread:
                    lock (postedBuildResults)
                    {
                        if (postedBuildResults.Count > 0)
                        {
                            index = 1;
                            return true;
                        }
                    }
                    break;

                default:
                    ErrorUtilities.VerifyThrow(false, "Unexpected NodeLoopExecutionMode");
                    break;
            }

            return false;
        }
 
        /// <summary>
        /// Returns the events a thread has to wait on in the given execution mode. The array for
        /// each mode is created on first use and cached in a field afterwards. The exit event is
        /// always at index 0.
        /// </summary>
        /// <param name="executionMode">Current execution mode</param>
        /// <returns>Array of handles to wait on</returns>
        private WaitHandle[] GetHandlesArray(NodeLoopExecutionMode executionMode)
        {
            switch (executionMode)
            {
                case NodeLoopExecutionMode.BaseActiveThread:
                    if (BaseActiveThreadWaitHandles == null)
                    {
                        // exit, new work item
                        BaseActiveThreadWaitHandles = new WaitHandle[] { exitTaskThreads, workItemInsertionEvent };
                    }
                    return BaseActiveThreadWaitHandles;

                case NodeLoopExecutionMode.WaitingActiveThread:
                    if (WaitingActiveThreadWaitHandles == null)
                    {
                        // exit, new work item, results posted
                        WaitingActiveThreadWaitHandles =
                            new WaitHandle[] { exitTaskThreads, workItemInsertionEvent, localDoneNoticeEvent };
                    }
                    return WaitingActiveThreadWaitHandles;

                case NodeLoopExecutionMode.WaitingPassiveThread:
                    if (WaitingPassiveThreadWaitHandles == null)
                    {
                        // exit, results posted
                        WaitingPassiveThreadWaitHandles = new WaitHandle[] { exitTaskThreads, localDoneNoticeEvent };
                    }
                    return WaitingPassiveThreadWaitHandles;

                default:
                    ErrorUtilities.VerifyThrow(false, "Unexpected NodeLoopExecutionMode");
                    return null;
            }
        }
 
        /// <summary>
        /// This is the loop for all active threads. Depending on the current execution mode the thread
        /// will listen to different events. There is only one thread at a time that owns the work item
        /// queue and listens for tasks to be executed. There is also only one thread at a time that is
        /// executing a task. That thread owns the current directory and the environment block.
        /// </summary>
        /// <remarks>
        /// The meaning of the index of the signaled event depends on the mode (see GetHandlesArray):
        /// index 0 is always the exit event; index 1 is the work item event in the two active modes and
        /// the done notice event in WaitingPassiveThread mode; index 2 is the done notice event in
        /// WaitingActiveThread mode.
        /// </remarks>
        /// <param name="executionMode">
        /// The mode the thread enters the loop in. A thread in WaitingActiveThread mode switches to
        /// WaitingPassiveThread mode inside the loop when a work item arrives.
        /// </param>
        /// <param name="handleId">Handle id used in the debug output</param>
        /// <param name="buildResults">
        /// Array that receives the posted results, indexed by request id. It is only accessed in the
        /// two waiting modes; the base loop passes null.
        /// </param>
        private void NodeActionLoop
        (
            NodeLoopExecutionMode executionMode,
            int handleId,
            BuildResult[] buildResults
        )
        {
            // Get the array of events this thread responds to in its current mode
            WaitHandle[] waitHandles = GetHandlesArray(executionMode);

            int resultCount = 0;
            long entryTime = 0;

            // A thread that is waiting for a done notification is no longer
            // actively executing a task so the active count needs to be decreased
            if (executionMode != NodeLoopExecutionMode.BaseActiveThread)
            {
                parentModule.DecrementActiveThreadCount();
                // If requested measure the time spent waiting for the results
                if (profileExecution)
                {
                    entryTime = DateTime.Now.Ticks;
                }
            }

            bool continueExecution = true;
            while (continueExecution)
            {
                int eventType;

                // Try and avoid the wait on kernel objects if possible.
                if (!WaitAnyFast(executionMode, out eventType))
                {
                    eventType = WaitHandle.WaitAny(waitHandles);
                }

                if (Engine.debugMode)
                {
                    Console.WriteLine("TaskWorkerThread: HandleId " + handleId + " waking up due to event type " + eventType);
                }

                // Node exit event - all threads need to exit
                if (eventType == 0)
                {
                    continueExecution = false;
                }
                // New work item has appeared in the queue
                else if (eventType == 1 && executionMode != NodeLoopExecutionMode.WaitingPassiveThread)
                {
                    ErrorUtilities.VerifyThrow(
                                    executionMode == NodeLoopExecutionMode.WaitingActiveThread ||
                                    executionMode == NodeLoopExecutionMode.BaseActiveThread,
                                    "Only active threads should receive work item events");

                    if (executionMode == NodeLoopExecutionMode.BaseActiveThread)
                    {
                        // Wait until there are no other active threads, we
                        // always transition from 0 to 1 atomically before executing the task
                        // NOTE(review): this relies on WaitForZeroActiveThreadCount also claiming the
                        // active slot; that method is not defined in this file - confirm
                        parentModule.WaitForZeroActiveThreadCount();

                        TaskExecutionState taskToExecute = null;
                        lock (workItemQueue)
                        {
                            taskToExecute = workItemQueue.Dequeue();
                            // We may get a single event for multiple messages so only reset the event
                            // if the queue is empty
                            if (workItemQueue.Count == 0)
                            {
                                workItemInsertionEvent.Reset();
                            }
                        }

                        // Execute the task either directly or on a child thread
                        ErrorUtilities.VerifyThrow(taskToExecute != null, "Expected a workitem");

                        // Register this thread for the task's handle id so that WaitForResults and
                        // PostBuildResult can find it while the task is running
                        handleIdToWorkerThread[taskToExecute.HandleId] = this;
                        currentWorkitem = taskToExecute;

                        // Actually execute the task (never throws - all exceptions are captured)
                        taskToExecute.ExecuteTask();

                        currentWorkitem = null;
                        handleIdToWorkerThread.Remove(taskToExecute.HandleId);

                        // Indicate that this thread is no longer active
                        parentModule.DecrementActiveThreadCount();
                    }
                    else
                    {
                        // Change the thread execution mode since it will no longer be
                        // listening to the work item queue
                        executionMode = NodeLoopExecutionMode.WaitingPassiveThread;
                        threadActive = false;
                        waitHandles = GetHandlesArray(executionMode);

                        // Hand the ownership of the work item queue to another worker thread,
                        // reusing an idle one if there is any
                        TaskWorkerThread workerThread = null;
                        lock (workerThreadQueue)
                        {
                            if (workerThreadQueue.Count != 0)
                            {
                                //Console.WriteLine("REUSING a thread");
                                workerThread = workerThreadQueue.Dequeue();
                            }
                        }
                        if (workerThread == null)
                        {
                            //Console.WriteLine("CREATING a thread");
                            workerThread = new TaskWorkerThread(parentModule, exitTaskThreads, exitTaskThreadsCache, workerThreadQueue, handleIdToWorkerThread,
                                                                workItemQueue, workItemInsertionEvent, waitingTasks, profileExecution);
                        }

                        workerThread.ActivateThread();
                    }
                }
                // Results have been posted for this thread (done notice event)
                else if ((eventType == 1 && executionMode == NodeLoopExecutionMode.WaitingPassiveThread) ||
                         (eventType == 2 && executionMode == NodeLoopExecutionMode.WaitingActiveThread))
                {
                    // There may be multiple results in the list so we need to loop over it
                    // and store the results
                    int originalResultCount = resultCount;
                    lock (postedBuildResults)
                    {
                        LinkedListNode<BuildResult> currentNode = postedBuildResults.First;
                        while (currentNode != null)
                        {
                            BuildResult buildResult = currentNode.Value;
                            ErrorUtilities.VerifyThrow(
                                            buildResult.RequestId < buildResults.Length,
                                            "The request ids should be inside the array");
                            buildResults[buildResult.RequestId] = buildResult;
                            // Increment the result count to indicate that we got another result
                            resultCount++;
                            // Go to the next entry in the list (most of the time there will be just one entry)
                            currentNode = currentNode.Next;
                        }
                        postedBuildResults.Clear();
                        // Reset the handle now that we are done with the events. The done notice event
                        // sits at index 1 in passive mode and at index 2 in active mode.
                        int handleIndex = executionMode == NodeLoopExecutionMode.WaitingPassiveThread ? 1 : 2;
                        ((ManualResetEvent)waitHandles[handleIndex]).Reset();
                    }
                    ErrorUtilities.VerifyThrow(originalResultCount < resultCount, "We should have found at least 1 result");
                    // If we received results for all the requests we need to exit
                    if (resultCount == buildResults.Length)
                    {
                        continueExecution = false;
                    }
                }
                // Check if we need to update the state: a base thread that is no longer marked
                // active (it handed the queue over while inside a callback) leaves the loop
                if (executionMode == NodeLoopExecutionMode.BaseActiveThread && !threadActive)
                {
                    continueExecution = false;
                }
            }

            ErrorUtilities.VerifyThrow
                (resultCount == 0 || executionMode != NodeLoopExecutionMode.BaseActiveThread,
                 "The base thread should never return a value");

            // If a thread exits this loop it is back to actively executing the task,
            // so the active thread count has to be increased
            if (executionMode != NodeLoopExecutionMode.BaseActiveThread)
            {
                parentModule.WaitForZeroActiveThreadCount();
                // Send the time spent waiting for results to the ExecutionState so that the task execution time can be measured correctly
                if (profileExecution)
                {
                    this.currentWorkitem.NotifyOfWait(entryTime);
                }
            }
        }
 
        /// <summary>
        /// Looks up the worker thread that is registered for the given handle id.
        /// </summary>
        /// <param name="handleId">Handle id of the task</param>
        /// <returns>The registered worker thread, or null if the table has no entry for the id</returns>
        private TaskWorkerThread GetWorkerThreadForHandleId(int handleId)
        {
            object registeredThread = handleIdToWorkerThread[handleId];
            return (TaskWorkerThread)registeredThread;
        }
 
        /// <summary>
        /// This method is called to cause a thread to become active and take ownership of the work item
        /// queue. It marks the thread as active and then signals the activation event that
        /// MainThreadLoop is waiting on.
        /// </summary>
        internal void ActivateThread()
        {
            // Set the flag before signaling: once woken up, the thread enters NodeActionLoop in
            // BaseActiveThread mode, which leaves the loop as soon as it sees threadActive == false
            threadActive = true;
            activationEvent.Set();
        }
 
        /// <summary>
        /// This function is called when the task executes a callback via the IBuildEngine interface. A thread
        /// that currently owns the work item queue will continue to own it, unless a work item comes in while
        /// it is inside the callback. A thread that enters the callback no longer owns the current directory and
        /// environment block, but it will always regain them before returning to the task.
        /// </summary>
        /// <remarks>
        /// While the call is blocked, the task is listed in the table of waiting tasks so that
        /// GetWaitingTasksData can report its outstanding requests. The entry is removed in a finally
        /// block so that a failure inside the wait loop cannot leave a stale entry behind.
        /// </remarks>
        /// <param name="handleId">Handle id of the task that is waiting</param>
        /// <param name="buildResults">Array that is filled in with the results as they are posted</param>
        /// <param name="buildRequests">The requests the task is waiting on</param>
        internal void WaitForResults
        (
            int handleId,
            BuildResult[] buildResults,
            BuildRequest[] buildRequests
        )
        {
            TaskWorkerThread workerThread = GetWorkerThreadForHandleId(handleId);
            ErrorUtilities.VerifyThrow(workerThread != null, "Worker thread should be in the table");
            WaitingTaskData taskData = new WaitingTaskData(buildRequests, buildResults);

            // Registered outside the try block: if Add itself fails there is nothing to undo
            lock (waitingTasks)
            {
                waitingTasks.Add(handleId, taskData);
            }

            try
            {
                // A thread that still owns the work item queue keeps listening to it while it waits
                workerThread.NodeActionLoop(workerThread.threadActive ? NodeLoopExecutionMode.WaitingActiveThread :
                                            NodeLoopExecutionMode.WaitingPassiveThread,
                                            handleId, buildResults);
            }
            finally
            {
                lock (waitingTasks)
                {
                    waitingTasks.Remove(handleId);
                }
            }
        }
 
 
        /// <summary>
        /// Takes a snapshot of the tasks that are currently blocked in WaitForResults.
        /// </summary>
        /// <param name="outstandingRequests">
        /// List that receives one array per waiting task, in the same order as the returned ids,
        /// holding the requests of that task which have no result yet.
        /// </param>
        /// <returns>The handle ids of the waiting tasks</returns>
        internal int[] GetWaitingTasksData(List<BuildRequest[]> outstandingRequests)
        {
            lock (waitingTasks)
            {
                int[] waitingHandleIds = new int[waitingTasks.Count];
                int nextSlot = 0;

                foreach (DictionaryEntry waitingTask in waitingTasks)
                {
                    // The key of the table is the handle id of the task
                    waitingHandleIds[nextSlot] = (int)waitingTask.Key;

                    // A request is still outstanding while its slot in the results array is empty
                    WaitingTaskData data = (WaitingTaskData)waitingTask.Value;
                    List<BuildRequest> pendingRequests = new List<BuildRequest>();
                    int requestIndex = 0;
                    foreach (BuildRequest request in data.buildRequests)
                    {
                        if (data.buildResults[requestIndex] == null)
                        {
                            pendingRequests.Add(request);
                        }
                        requestIndex++;
                    }
                    outstandingRequests.Add(pendingRequests.ToArray());

                    nextSlot++;
                }

                return waitingHandleIds;
            }
        }
 
        /// <summary>
        /// Adds a work item to the shared work item queue and signals the work item insertion event.
        /// </summary>
        /// <param name="workItem">The task execution state to queue</param>
        internal void PostWorkItem(TaskExecutionState workItem)
        {
            // Enqueue and signal under the queue lock, the same lock under which the consumer
            // dequeues and resets the event when the queue becomes empty
            lock (workItemQueue)
            {
                workItemQueue.Enqueue(workItem);
                workItemInsertionEvent.Set();
            }
        }
 
        /// <summary>
        /// Delivers a build result to the worker thread registered for the result's handle id and
        /// signals that thread's done notice event. Nothing happens if no worker thread is
        /// registered for the handle id.
        /// </summary>
        /// <param name="buildResult">The result to deliver</param>
        internal void PostBuildResult(BuildResult buildResult)
        {
            TaskWorkerThread recipient = GetWorkerThreadForHandleId(buildResult.HandleId);
            if (recipient == null)
            {
                return;
            }

            // Append and signal under the list lock, the same lock under which the recipient
            // drains the list and resets the event
            lock (recipient.postedBuildResults)
            {
                recipient.postedBuildResults.AddLast(buildResult);
                recipient.localDoneNoticeEvent.Set();
            }
        }
 
        /// <summary>
        /// Tells all worker threads that share this instance's exit event to exit.
        /// </summary>
        internal void Shutdown()
        {
            // Signal the kernel event that every wait handle array has at index 0 ...
            exitTaskThreads.Set();
            // ... and set the cached flag that WaitAnyFast checks before falling back to a kernel wait
            exitTaskThreadsCache.exitTaskThreads = true;
        }
 
        #endregion
 
        #region Data
        // Per instance data
        private ManualResetEvent activationEvent;          // Set by ActivateThread to wake up MainThreadLoop
        private ManualResetEvent localDoneNoticeEvent;     // Set when a build result is posted for this thread
        private bool threadActive;                         // True while this thread owns the work item queue
        private LinkedList<BuildResult> postedBuildResults; // Results posted for this thread, not yet consumed
        private TaskExecutionState currentWorkitem;        // The work item this thread is executing, if any
        private bool profileExecution;                     // True if the time spent waiting for results is measured

        // Private cache arrays of handles, created on demand by GetHandlesArray
        private WaitHandle[] BaseActiveThreadWaitHandles;
        private WaitHandle[] WaitingActiveThreadWaitHandles;
        private WaitHandle[] WaitingPassiveThreadWaitHandles;

        // Data shared between worker threads for one TEM, this data is initialized by the first
        // thread
        private ManualResetEvent exitTaskThreads;          // Used to signal all threads to exit
        private ExitTaskCache exitTaskThreadsCache;        // cached value to avoid waiting on the kernel event
        private Queue<TaskWorkerThread> workerThreadQueue; // Queue of idle worker threads ready to be activated
        private Hashtable handleIdToWorkerThread;           // Table mapping in progress Ids to worker threads
        private Queue<TaskExecutionState> workItemQueue;   // Queue of workitems that need to be executed
        private ManualResetEvent workItemInsertionEvent;   // Used to signal a new work item
        private Hashtable waitingTasks;                    // Hashtable containing information about in progress
                                                           // tasks, used for determining if all threads are blocked
        private TaskExecutionModule parentModule;          // A pointer to the parent TEM
 
        #endregion
 
        #region Private struct
        /// <summary>
        /// Pairs the requests a blocked task is waiting on with the array its results are
        /// delivered into. Stored in the table of waiting tasks, keyed by handle id.
        /// </summary>
        private class WaitingTaskData
        {
            // The requests issued by the task
            internal BuildRequest[] buildRequests;

            // The results received so far; a null entry means the matching request is outstanding
            internal BuildResult[] buildResults;

            internal WaitingTaskData(BuildRequest[] buildRequests, BuildResult[] buildResults)
            {
                this.buildResults = buildResults;
                this.buildRequests = buildRequests;
            }
        }
 
        /// <summary>
        /// Holder for the exit flag. Being a reference type, a single instance can be handed to
        /// every worker thread of a TEM so that they all observe the same flag.
        /// </summary>
        private class ExitTaskCache
        {
            // True once the worker threads have been asked to exit
            internal bool exitTaskThreads;

            internal ExitTaskCache(bool value)
            {
                exitTaskThreads = value;
            }
        }
        #endregion
    }
}