File: Utilities\ThreadUtils.cs
Web Access
Project: src\src\Microsoft.ML.Core\Microsoft.ML.Core.csproj (Microsoft.ML.Core)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ML.Runtime;
 
namespace Microsoft.ML.Internal.Utilities
{
    internal static partial class Utils
    {
        public static Task RunOnBackgroundThreadAsync(Action start) =>
            ImmediateBackgroundThreadPool.QueueAsync(start);
 
        public static Task RunOnBackgroundThreadAsync(Action<object> start, object obj) =>
            ImmediateBackgroundThreadPool.QueueAsync(start, obj);
 
        public static Thread RunOnForegroundThread(ParameterizedThreadStart start) =>
            new Thread(start) { IsBackground = false };
 
        /// <summary>
        /// Naive thread pool focused on reducing the latency to execution of chunky work items as much as possible.
        /// If a thread is ready to process a work item the moment a work item is queued, it's used, otherwise
        /// a new thread is created. This is meant as a stop-gap measure for workloads that would otherwise be
        /// creating a new thread for every work item.
        /// </summary>
        private static class ImmediateBackgroundThreadPool
        {
            /// <summary>How long should threads wait around for additional work items before retiring themselves.</summary>
            private const int IdleMilliseconds = 1_000;
            /// <summary>The queue of work items. Also used as a lock to protect all relevant state.</summary>
            private static readonly Queue<(Delegate, object, TaskCompletionSource<bool>)> _queue = new Queue<(Delegate, object, TaskCompletionSource<bool>)>();
            /// <summary>The number of threads currently waiting for work to arrive.</summary>
            private static int _availableThreads = 0;
 
            /// <summary>
            /// Queues an <see cref="Action"/> delegate to be executed immediately on another thread,
            /// and returns a <see cref="Task"/> that represents its eventual completion. The task will
            /// always end in the <see cref="TaskStatus.RanToCompletion"/> state; if the delegate throws
            /// an exception, it'll be allowed to propagate on the thread, crashing the process.
            /// </summary>
            public static Task QueueAsync(Action threadStart) => QueueAsync((Delegate)threadStart, null);
 
            /// <summary>
            /// Queues an <see cref="Action{Object}"/> delegate and associated state to be executed immediately on another thread,
            /// and returns a <see cref="Task"/> that represents its eventual completion. The task will
            /// always end in the <see cref="TaskStatus.RanToCompletion"/> state; if the delegate throws
            /// an exception, it'll be allowed to propagate on the thread, crashing the process.
            /// </summary>
            public static Task QueueAsync(Action<object> threadStart, object state) => QueueAsync((Delegate)threadStart, state);
 
            private static Task QueueAsync(Delegate threadStart, object state)
            {
                // Create the TaskCompletionSource used to represent this work.
                // Call sites only care about completion, not about the distinction between
                // success and failure and do not expect exceptions to be propagated in this manner,
                // so only SetResult is used.
                var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
 
                // Queue the work for a thread to pick up. If no thread is immediately available, it will create one.
                Enqueue((threadStart, state, tcs));
 
                // Return the task.
                return tcs.Task;
 
                static void CreateThread()
                {
                    // Create a new background thread to run the work.
                    var t = new Thread(() =>
                    {
                        // Repeatedly get the next item and invoke it, setting its TCS when we're done.
                        // This will wait for up to the idle time before giving up and exiting.
                        while (TryDequeue(out (Delegate action, object state, TaskCompletionSource<bool> tcs) item))
                        {
                            try
                            {
                                if (item.action is Action<object> pts)
                                {
                                    pts(item.state);
                                }
                                else
                                {
                                    ((Action)item.action)();
                                }
                            }
                            finally
                            {
                                item.tcs.SetResult(true);
                            }
                        }
                    });
                    t.IsBackground = true;
                    t.Start();
                }
 
                static void Enqueue((Delegate, object, TaskCompletionSource<bool>) item)
                {
                    // Enqueue the work. If there are currently fewer threads waiting
                    // for work than there are work items in the queue, create another
                    // thread. This is a heuristic, in that we might end up creating
                    // more threads than are truly needed, but this whole type is being
                    // used to replace a previous solution where every work item created
                    // its own thread, so this is an improvement regardless of any
                    // such inefficiencies.
                    lock (_queue)
                    {
                        _queue.Enqueue(item);
 
                        if (_queue.Count <= _availableThreads)
                        {
                            Monitor.Pulse(_queue);
                            return;
                        }
                    }
 
                    // No thread was currently available.  Create one.
                    CreateThread();
                }
 
                static bool TryDequeue(out (Delegate action, object state, TaskCompletionSource<bool> tcs) item)
                {
                    // Dequeues the next item if one is available. Before checking,
                    // the available thread count is increased, so that enqueuers can
                    // see how many threads are currently waiting, with the count
                    // decreased after. Each time it waits, it'll wait for at most
                    // the idle timeout before giving up.
                    lock (_queue)
                    {
                        _availableThreads++;
                        try
                        {
                            while (_queue.Count == 0)
                            {
                                if (!Monitor.Wait(_queue, IdleMilliseconds))
                                {
                                    if (_queue.Count > 0)
                                    {
                                        break;
                                    }
 
                                    item = default;
                                    return false;
                                }
                            }
                        }
                        finally
                        {
                            _availableThreads--;
                        }
 
                        item = _queue.Dequeue();
                        return true;
                    }
                }
            }
        }
    }
 
    /// <summary>
    /// An object that serves as a source of a cancellation token, as well as having the ability
    /// for anything to push an exception into the message, to trigger the cancellation. The general
    /// intended usage is that, prior to creating a bunch of collaborating thread workers, this
    /// object is created and made accessible to them, somehow. Each thread worker will work as they
    /// would be engineered without this class, except they will wrap their contents in a try-catch
    /// block to push any exceptions (hopefully none) into this marshaller, using <see cref="Set"/>.
    /// Further, any potentially blocking operation of the thread workers must be changed to use
    /// <see cref="Token"/> as the cancellation token (this token is canceled iff <see cref="Set"/>
    /// is ever called). The controlling thread, whatever that may be, once it is either sure
    /// <see cref="Set"/> has been called (possibly by receiving the cancellation) or is sure somehow
    /// that the workers have finished by its own means, will call <see cref="ThrowIfSet"/> to throw
    /// the set exception as an inner exception, in the wrapping thread.
    /// </summary>
    [BestFriend]
    internal sealed class ExceptionMarshaller : IDisposable
    {
        private readonly CancellationTokenSource _ctSource;
        private readonly object _lock;
 
        // The stored exception
        private string _component;
        private Exception _ex;
 
        /// <summary>
        /// A cancellation token, whose source will be canceled if <see cref="Set"/> is ever called.
        /// Any thread blocking operation of a family of thread workers using this structure
        /// must use this cancellation token, or else there is a strong possibility for threads
        /// to stop responding if an exception is thrown at any point.
        /// </summary>
        public CancellationToken Token => _ctSource.Token;
 
        public ExceptionMarshaller()
        {
            _ctSource = new CancellationTokenSource();
            _lock = new object();
        }
 
        public void Dispose()
        {
            // We don't just put the ThrowIfSet here since we shouldn't throw in dispose.
            _ctSource.Dispose();
        }
 
        /// <summary>
        /// Store an exception and set the cancellation token. If this was already
        /// called, this newly passed in exception is ignored. (Important, since a
        /// common source of exceptions would be the exceptions relating to the cancellation.)
        /// </summary>
        /// <param name="component">The type of worker that threw the exception, used
        /// in the description of the wrapping exception</param>
        /// <param name="ex">The exception that will become the inner exception</param>
        public void Set(string component, Exception ex)
        {
            Contracts.AssertNonEmpty(component);
            Contracts.AssertValue(ex);
 
            lock (_lock)
            {
                if (_ex == null)
                {
                    _component = component;
                    _ex = ex;
                }
                _ctSource.Cancel();
            }
        }
 
        /// <summary>
        /// If an exception was ever set through <see cref="Set"/>, raise it as an appropriate
        /// inner exception. This should only be called just prior to dispose, when the workers
        /// have already finished. If there is no stored exception, this will do nothing. Note
        /// that this does not "expire" the exception, that is, if you were to call this again,
        /// it would throw the same exception.
        /// </summary>
        public void ThrowIfSet(IExceptionContext ectx)
        {
            if (_ex != null)
                throw ectx.Except(_ex, "Exception thrown in {0}", _component);
        }
    }
}