// File: src\libraries\System.Private.CoreLib\src\System\Threading\PortableThreadPool.WaitThread.cs
// Project: src\src\coreclr\System.Private.CoreLib\System.Private.CoreLib.csproj (System.Private.CoreLib)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Diagnostics;
using System.Diagnostics.Tracing;
using Microsoft.Win32.SafeHandles;
 
namespace System.Threading
{
    /// <summary>
    /// A thread pool work item that carries the outcome of a completed wait on a specific
    /// <see cref="RegisteredWaitHandle"/>: the registration itself and whether the wait timed out.
    /// </summary>
    internal sealed partial class CompleteWaitThreadPoolWorkItem : IThreadPoolWorkItem
    {
        private readonly RegisteredWaitHandle _registeredWaitHandle;
        private readonly bool _timedOut;

        /// <summary>
        /// Captures the registered wait and its timeout state for later use by the work item.
        /// </summary>
        /// <param name="registeredWaitHandle">The registered wait that completed.</param>
        /// <param name="timedOut">Whether the wait timed out.</param>
        public CompleteWaitThreadPoolWorkItem(RegisteredWaitHandle registeredWaitHandle, bool timedOut) =>
            (_registeredWaitHandle, _timedOut) = (registeredWaitHandle, timedOut);
    }
 
    internal sealed partial class PortableThreadPool
    {
        /// <summary>
        /// A linked list of <see cref="WaitThread"/>s.
        /// </summary>
        /// <remarks>
        /// This field holds the first node of the list. It is <c>null</c> until
        /// <see cref="RegisterWaitHandle(RegisteredWaitHandle)"/> lazily creates the first wait thread.
        /// </remarks>
        private WaitThreadNode? _waitThreadsHead;
 
        /// <summary>
        /// The lock that is acquired around accesses to the wait thread list and to the registration state of
        /// each <see cref="WaitThread"/> (registering, unregistering and processing removals).
        /// </summary>
        private readonly LowLevelLock _waitThreadLock = new LowLevelLock();
 
        /// <summary>
        /// Registers a wait handle on the first <see cref="WaitThread"/> that has room for it, creating a new
        /// wait thread when none exists yet or when every existing one is at capacity.
        /// </summary>
        /// <param name="handle">A description of the requested registration.</param>
        internal void RegisterWaitHandle(RegisteredWaitHandle handle)
        {
            if (NativeRuntimeEventSource.Log.IsEnabled())
            {
                NativeRuntimeEventSource.Log.ThreadPoolIOEnqueue(handle);
            }

            _waitThreadLock.Acquire();
            try
            {
                // Lazily create the first wait thread.
                if (_waitThreadsHead == null)
                {
                    _waitThreadsHead = new WaitThreadNode(new WaitThread());
                }

                // Walk the list and stop at the first wait thread that accepts the registration, keeping track
                // of the last node visited so that a new node can be appended after it.
                WaitThreadNode tail = _waitThreadsHead;
                for (WaitThreadNode? node = _waitThreadsHead; node != null; node = node.Next)
                {
                    if (node.Thread.RegisterWaitHandle(handle))
                    {
                        return;
                    }
                    tail = node;
                }

                // Every wait thread is full, so append a new one and register the handle there.
                WaitThreadNode appended = new WaitThreadNode(new WaitThread());
                tail.Next = appended;
                appended.Thread.RegisterWaitHandle(handle);
            }
            finally
            {
                _waitThreadLock.Release();
            }
        }
 
        /// <summary>
        /// Logs the thread pool IO dequeue event for the registered wait when the native runtime event source is
        /// enabled, then performs the callback of the registered wait.
        /// </summary>
        /// <param name="handle">The registered wait whose callback is performed.</param>
        /// <param name="timedOut">The timed-out state that is passed on to the callback.</param>
        internal static void CompleteWait(RegisteredWaitHandle handle, bool timedOut)
        {
            NativeRuntimeEventSource log = NativeRuntimeEventSource.Log;
            if (log.IsEnabled())
            {
                log.ThreadPoolIODequeue(handle);
            }

            handle.PerformCallback(timedOut);
        }
 
        /// <summary>
        /// Removes the given wait thread from the list, unless it still has user-provided waits registered on it.
        /// The check and the removal both happen while the wait thread lock is held.
        /// </summary>
        /// <param name="thread">The thread to remove.</param>
        /// <returns><c>true</c> if the thread was successfully removed; otherwise, <c>false</c></returns>
        private bool TryRemoveWaitThread(WaitThread thread)
        {
            _waitThreadLock.Acquire();
            try
            {
                bool canRemove = !thread.AnyUserWaits;
                if (canRemove)
                {
                    RemoveWaitThread(thread);
                }
                return canRemove;
            }
            finally
            {
                _waitThreadLock.Release();
            }
        }
 
        /// <summary>
        /// Unlinks the node that holds the given wait thread from the linked list of wait threads.
        /// </summary>
        /// <param name="thread">The wait thread to remove from the list.</param>
        private void RemoveWaitThread(WaitThread thread)
        {
            WaitThreadNode head = _waitThreadsHead!;
            if (head.Thread == thread)
            {
                // The head holds the thread, so the list now starts at its successor.
                _waitThreadsHead = head.Next;
                return;
            }

            // Search the remainder of the list, remembering the node that precedes the one being examined.
            WaitThreadNode predecessor = head;
            WaitThreadNode? candidate = head.Next;
            while (candidate != null && candidate.Thread != thread)
            {
                predecessor = candidate;
                candidate = candidate.Next;
            }

            Debug.Assert(candidate != null, "The wait thread to remove was not found in the list of thread pool wait threads.");

            if (candidate != null)
            {
                predecessor.Next = candidate.Next;
            }
        }
 
        /// <summary>
        /// A node of the singly linked list of wait threads: one <see cref="WaitThread"/> plus a link to the next node.
        /// </summary>
        private sealed class WaitThreadNode
        {
            public WaitThreadNode(WaitThread thread)
            {
                Thread = thread;
            }

            /// <summary>The wait thread held by this node.</summary>
            public WaitThread Thread { get; }

            /// <summary>The node that follows this one, or <c>null</c> when this is the last node.</summary>
            public WaitThreadNode? Next { get; set; }
        }
 
        /// <summary>
        /// A thread pool wait thread.
        /// </summary>
        internal sealed class WaitThread
        {
            /// <summary>
            /// The wait handles registered on this wait thread.
            /// </summary>
            /// <remarks>
            /// The registration stored at index <c>i</c> corresponds to the raw handle stored at index <c>i + 1</c> of
            /// <see cref="_waitHandles"/>. The first <see cref="_numUserWaits"/> elements are the ones in use.
            /// </remarks>
            private readonly RegisteredWaitHandle[] _registeredWaits = new RegisteredWaitHandle[WaitHandle.MaxWaitHandles - 1];
            /// <summary>
            /// The raw wait handles to wait on.
            /// </summary>
            /// <remarks>
            /// The zeroth element of this array is always <see cref="_changeHandlesEvent"/>.
            /// </remarks>
            private readonly SafeWaitHandle[] _waitHandles = new SafeWaitHandle[WaitHandle.MaxWaitHandles];
            /// <summary>
            /// The number of user-registered waits on this wait thread.
            /// </summary>
            private int _numUserWaits;
 
            /// <summary>
            /// A list of removals of wait handles that are waiting for the wait thread to process.
            /// </summary>
            /// <remarks>
            /// Entries are appended when a wait is unregistered and are cleared again by <see cref="ProcessRemovals"/>.
            /// </remarks>
            private readonly RegisteredWaitHandle?[] _pendingRemoves = new RegisteredWaitHandle[WaitHandle.MaxWaitHandles - 1];
            /// <summary>
            /// The number of pending removals.
            /// </summary>
            private int _numPendingRemoves;
 
            /// <summary>
            /// An event to notify the wait thread that there are pending adds or removals of wait handles so it needs to wake up.
            /// </summary>
            private readonly AutoResetEvent _changeHandlesEvent = new AutoResetEvent(false);
 
            /// <summary>
            /// Whether at least one user-registered wait is currently counted on this wait thread.
            /// </summary>
            internal bool AnyUserWaits => _numUserWaits != 0;
 
            /// <summary>
            /// Reserves the first wait slot for the change-notification event and starts the background thread
            /// that runs <see cref="WaitThreadStart"/>.
            /// </summary>
            public WaitThread()
            {
                _waitHandles[0] = _changeHandlesEvent.SafeWaitHandle;

                Thread waitThread = new Thread(WaitThreadStart, SmallStackSizeBytes);
                waitThread.IsThreadPoolThread = true;
                waitThread.IsBackground = true;
                waitThread.Name = ".NET TP Wait";

                // Thread pool threads must start in the default execution context without transferring the context,
                // which is why UnsafeStart() is used here instead of Start().
                waitThread.UnsafeStart();
            }
 
            /// <summary>
            /// The main routine for the wait thread.
            /// </summary>
            /// <remarks>
            /// Each iteration processes pending removals, computes the shortest remaining timeout among the registered
            /// waits, waits on the change event plus all registered handles, and then either loops again (handles
            /// changed), queues a completion for the signaled handle, or queues timed-out completions for every wait
            /// whose timeout has elapsed. The routine returns once the thread has no user waits, its wait timed out, and it
            /// was successfully removed from the thread pool's list of wait threads.
            /// </remarks>
            private void WaitThreadStart()
            {
                while (true)
                {
                    // This value is taken inside the lock after processing removals. In this iteration these are the number of
                    // user waits that will be waited upon. Any new waits will wake the wait and the next iteration would
                    // consider them.
                    int numUserWaits = ProcessRemovals();

                    int currentTimeMs = Environment.TickCount;

                    // Recalculate Timeout
                    int timeoutDurationMs = Timeout.Infinite;
                    if (numUserWaits == 0)
                    {
                        // With nothing registered, wait only for a bounded time so that the thread gets a chance to
                        // remove itself from the pool further below.
                        timeoutDurationMs = ThreadPoolThreadTimeoutMs;
                    }
                    else
                    {
                        // Use the smallest remaining timeout among the waits that have a finite timeout. If every wait
                        // is infinite, the timeout stays infinite.
                        for (int i = 0; i < numUserWaits; i++)
                        {
                            RegisteredWaitHandle registeredWait = _registeredWaits[i];
                            Debug.Assert(registeredWait != null);
                            if (registeredWait.IsInfiniteTimeout)
                            {
                                continue;
                            }

                            // Already-expired waits are clamped to zero rather than producing a negative duration
                            int handleTimeoutDurationMs = Math.Max(0, registeredWait.TimeoutTimeMs - currentTimeMs);

                            if (timeoutDurationMs == Timeout.Infinite)
                            {
                                timeoutDurationMs = handleTimeoutDurationMs;
                            }
                            else
                            {
                                timeoutDurationMs = Math.Min(handleTimeoutDurationMs, timeoutDurationMs);
                            }

                            if (timeoutDurationMs == 0)
                            {
                                // Cannot get any smaller, stop looking
                                break;
                            }
                        }
                    }

                    // Wait on the change event (index 0) followed by the handles of the user waits (indexes 1..numUserWaits)
                    int signaledHandleIndex = WaitHandle.WaitAny(new ReadOnlySpan<SafeWaitHandle>(_waitHandles, 0, numUserWaits + 1), timeoutDurationMs);

                    if (signaledHandleIndex >= WaitHandle.WaitAbandoned &&
                        signaledHandleIndex < WaitHandle.WaitAbandoned + 1 + numUserWaits)
                    {
                        // For compatibility, treat an abandoned mutex wait result as a success and ignore the abandonment
                        Debug.Assert(signaledHandleIndex != WaitHandle.WaitAbandoned); // the first wait handle is an event
                        signaledHandleIndex += WaitHandle.WaitSuccess - WaitHandle.WaitAbandoned;
                    }

                    if (signaledHandleIndex == 0) // If we were woken up for a change in our handles, continue.
                    {
                        continue;
                    }

                    if (signaledHandleIndex != WaitHandle.WaitTimeout)
                    {
                        // A user wait was signaled. Its registration lives one index lower than its raw handle because
                        // index 0 of the wait handles array is taken by the change event.
                        RegisteredWaitHandle signaledHandle = _registeredWaits[signaledHandleIndex - 1];
                        Debug.Assert(signaledHandle != null);
                        QueueWaitCompletion(signaledHandle, false);
                        continue;
                    }

                    // The wait timed out. If there is nothing registered, try to retire this wait thread; the attempt
                    // fails when a wait has been registered in the meantime, in which case the loop keeps going.
                    if (numUserWaits == 0 && ThreadPoolInstance.TryRemoveWaitThread(this))
                    {
                        return;
                    }

                    // Queue a timed-out completion for every wait with a finite timeout whose timeout time has been reached.
                    // NOTE(review): the comparison is written as a subtraction, presumably so that it keeps working when
                    // Environment.TickCount wraps around - confirm against how TimeoutTimeMs is computed.
                    currentTimeMs = Environment.TickCount;
                    for (int i = 0; i < numUserWaits; i++)
                    {
                        RegisteredWaitHandle registeredHandle = _registeredWaits[i];
                        Debug.Assert(registeredHandle != null);
                        if (!registeredHandle.IsInfiniteTimeout && currentTimeMs - registeredHandle.TimeoutTimeMs >= 0)
                        {
                            QueueWaitCompletion(registeredHandle, true);
                        }
                    }
                }
            }
 
            /// <summary>
            /// Go through the <see cref="_pendingRemoves"/> array and remove those registered wait handles from the <see cref="_registeredWaits"/>
            /// and <see cref="_waitHandles"/> arrays, filling the holes along the way.
            /// </summary>
            /// <returns>
            /// The number of user waits that remain registered on this wait thread, read while the wait thread lock is held.
            /// </returns>
            private int ProcessRemovals()
            {
                PortableThreadPool threadPoolInstance = ThreadPoolInstance;
                threadPoolInstance._waitThreadLock.Acquire();
                try
                {
                    Debug.Assert(_numPendingRemoves >= 0);
                    Debug.Assert(_numPendingRemoves <= _pendingRemoves.Length);
                    Debug.Assert(_numUserWaits >= 0);
                    Debug.Assert(_numUserWaits <= _registeredWaits.Length);
                    Debug.Assert(_numPendingRemoves <= _numUserWaits, $"Num removals {_numPendingRemoves} should be less than or equal to num user waits {_numUserWaits}");

                    // Nothing to remove
                    if (_numPendingRemoves == 0 || _numUserWaits == 0)
                    {
                        return _numUserWaits; // return the value taken inside the lock for the caller
                    }
                    // Remembered only for the consistency assertion at the end
                    int originalNumUserWaits = _numUserWaits;
                    int originalNumPendingRemoves = _numPendingRemoves;

                    // This is O(N^2), but max(N) = 63 and N will usually be very low
                    for (int i = 0; i < _numPendingRemoves; i++)
                    {
                        RegisteredWaitHandle waitHandleToRemove = _pendingRemoves[i]!;
                        int numUserWaits = _numUserWaits;
                        // Find the index of the registration to remove
                        int j = 0;
                        for (; j < numUserWaits && waitHandleToRemove != _registeredWaits[j]; j++)
                        {
                        }
                        Debug.Assert(j < numUserWaits);

                        waitHandleToRemove.OnRemoveWait();

                        if (j + 1 < numUserWaits)
                        {
                            // Not removing the last element. Due to the possibility of there being duplicate system wait
                            // objects in the wait array, perhaps even with different handle values due to the use of
                            // DuplicateHandle(), don't reorder handles for fairness. When there are duplicate system wait
                            // objects in the wait array and the wait object gets signaled, the system may release the wait
                            // in deterministic order based on the order in the wait array. Instead, shift the array.

                            int removeAt = j;
                            int count = numUserWaits;
                            Array.Copy(_registeredWaits, removeAt + 1, _registeredWaits, removeAt, count - (removeAt + 1));
                            _registeredWaits[count - 1] = null!;

                            // Corresponding elements in the wait handles array are shifted up by one
                            removeAt++;
                            count++;
                            Array.Copy(_waitHandles, removeAt + 1, _waitHandles, removeAt, count - (removeAt + 1));
                            _waitHandles[count - 1] = null!;
                        }
                        else
                        {
                            // Removing the last element
                            _registeredWaits[j] = null!;
                            _waitHandles[j + 1] = null!;
                        }

                        _numUserWaits = numUserWaits - 1;
                        _pendingRemoves[i] = null;

                        // Balances the DangerousAddRef() that was done when the wait was registered on this wait thread
                        waitHandleToRemove.Handle.DangerousRelease();
                    }
                    _numPendingRemoves = 0;

                    Debug.Assert(originalNumUserWaits - originalNumPendingRemoves == _numUserWaits,
                        $"{originalNumUserWaits} - {originalNumPendingRemoves} == {_numUserWaits}");
                    return _numUserWaits; // return the value taken inside the lock for the caller
                }
                finally
                {
                    threadPoolInstance._waitThreadLock.Release();
                }
            }
 
            /// <summary>
            /// Requests the callback for a completed wait, prepares or retires the registration depending on whether
            /// it repeats, and queues the completion to the thread pool as a high-priority work item.
            /// </summary>
            /// <param name="registeredHandle">The handle that completed.</param>
            /// <param name="timedOut">Whether or not the wait timed out.</param>
            private void QueueWaitCompletion(RegisteredWaitHandle registeredHandle, bool timedOut)
            {
                registeredHandle.RequestCallback();

                if (!registeredHandle.Repeating)
                {
                    // A non-repeating wait is removed from the wait thread. The wait thread itself must not block
                    // on the unregistration.
                    UnregisterWait(registeredHandle, blocking: false);
                }
                else if (!registeredHandle.IsInfiniteTimeout)
                {
                    // A repeating wait with a finite timeout has its timeout restarted for the next call.
                    registeredHandle.RestartTimeout();
                }

                CompleteWaitThreadPoolWorkItem completion = new CompleteWaitThreadPoolWorkItem(registeredHandle, timedOut);
                ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(completion);
            }
 
            /// <summary>
            /// Register a wait handle on this <see cref="WaitThread"/>. The thread pool's wait thread lock is
            /// expected to be held by the caller.
            /// </summary>
            /// <param name="handle">The handle to register.</param>
            /// <returns>
            /// <c>true</c> if the handle was registered on this wait thread; <c>false</c> if this wait thread is
            /// already at capacity.
            /// </returns>
            public bool RegisterWaitHandle(RegisteredWaitHandle handle)
            {
                ThreadPoolInstance._waitThreadLock.VerifyIsLocked();

                int slot = _numUserWaits;
                if (slot == WaitHandle.MaxWaitHandles - 1)
                {
                    return false;
                }

                // Take a reference on the handle for as long as it is registered on this wait thread.
                bool success = false;
                handle.Handle.DangerousAddRef(ref success);
                Debug.Assert(success);

                // The raw handle goes one index higher than the registration because index 0 of the wait handles
                // array belongs to the change event.
                _registeredWaits[slot] = handle;
                _waitHandles[slot + 1] = handle.Handle;
                _numUserWaits = slot + 1;

                handle.WaitThread = this;

                // Wake the wait thread so that it starts waiting on the new handle.
                _changeHandlesEvent.Set();
                return true;
            }
 
            /// <summary>
            /// Unregisters a wait handle, allowing the call to block.
            /// </summary>
            /// <param name="handle">The handle to unregister.</param>
            /// <remarks>
            /// As per CoreCLR's behavior, if the user passes in an invalid <see cref="WaitHandle"/>
            /// into <see cref="RegisteredWaitHandle.Unregister(WaitHandle)"/>, then the unregistration of the wait handle is blocking.
            /// Otherwise, the unregistration of the wait handle is queued on the wait thread.
            /// </remarks>
            public void UnregisterWait(RegisteredWaitHandle handle) => UnregisterWait(handle, blocking: true);
 
            /// <summary>
            /// Queues the removal of a wait handle for the wait thread to process and, if requested, blocks until
            /// the handle's callbacks or its removal have completed.
            /// </summary>
            /// <param name="handle">The wait handle to unregister.</param>
            /// <param name="blocking">Should the unregistration block at all.</param>
            private void UnregisterWait(RegisteredWaitHandle handle, bool blocking)
            {
                bool pendingRemoval;
                // TODO: Optimization: Try to unregister wait directly if it isn't being waited on.
                PortableThreadPool threadPoolInstance = ThreadPoolInstance;
                threadPoolInstance._waitThreadLock.Acquire();
                try
                {
                    // A removal is pending when the handle is still registered here. It is only added to the
                    // pending list if it is not on that list already.
                    pendingRemoval = Array.IndexOf(_registeredWaits, handle) >= 0;
                    if (pendingRemoval && Array.IndexOf(_pendingRemoves, handle) < 0)
                    {
                        _pendingRemoves[_numPendingRemoves++] = handle;
                        _changeHandlesEvent.Set(); // Tell the wait thread that there are changes pending.
                    }
                }
                finally
                {
                    threadPoolInstance._waitThreadLock.Release();
                }

                if (!blocking)
                {
                    return;
                }

                if (handle.IsBlocking)
                {
                    handle.WaitForCallbacks();
                }
                else if (pendingRemoval)
                {
                    handle.WaitForRemoval();
                }
            }
        }
    }
}