File: Utility\DedicatedAsynchronousProcessingThread.cs
Web Access
Project: src\src\nuget-client\src\NuGet.Core\NuGet.Protocol\NuGet.Protocol.csproj (NuGet.Protocol)
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace NuGet.Protocol
{
    /// <summary>
    /// This class represents a dedicated asynchronous task processing thread.
    /// Uses a queue to execute all the tasks added through the invocation of <see cref="Enqueue(Func{Task})"/>.
    /// The tasks queued here cannot be awaited (think Task.Run, rather than Task.WhenAny/WhenAll).
    /// This implementation is internal on purpose as this is specifically tailed to the plugin V2 use-case.
    /// </summary>
    internal sealed class DedicatedAsynchronousProcessingThread : IDisposable
    {
        private Task? _processingThread;
        private bool _isDisposed;
        private bool _isClosed;
        private TimeSpan _pollingDelay;
        private readonly ConcurrentQueue<Func<Task>> _taskQueue = new ConcurrentQueue<Func<Task>>();

        /// <summary>
        /// DedicatedAsync processing thread.
        /// </summary>
        /// <param name="pollingDelay">The await delay when there are no tasks in the queue.</param>
        public DedicatedAsynchronousProcessingThread(TimeSpan pollingDelay)
        {
            _pollingDelay = pollingDelay;
        }

        internal void Start()
        {
            ThrowIfDisposed();
            ThrowIfAlreadyStarted();

            _processingThread = Task.Factory.StartNew(
                ProcessAsync,
                CancellationToken.None,
                TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach,
                TaskScheduler.Default);
        }

        /// <summary>
        /// Queueus a task for execution.
        /// </summary>
        /// <param name="task">Task to be executed.</param>
        internal void Enqueue(Func<Task> task)
        {
            ThrowIfDisposed();
            ThrowIfNotAlreadyStarted();
            _taskQueue.Enqueue(task);
        }

        private async Task ProcessAsync()
        {
            while (!_isClosed)
            {
                try
                {
                    if (_taskQueue.TryDequeue(out var result))
                    {
                        await result();
                    }
                    else
                    {
                        await Task.Delay(_pollingDelay.Milliseconds);
                    }
                }
                catch
                {
                }
            }
        }

        public void Dispose()
        {
            if (_isDisposed)
            {
                return;
            }

            _isClosed = true;

            GC.SuppressFinalize(this);

            _isDisposed = true;
        }

        private void ThrowIfAlreadyStarted()
        {
            if (_processingThread != null)
            {
                throw new InvalidOperationException("The processing thread is already started.");
            }
        }

        private void ThrowIfNotAlreadyStarted()
        {
            if (_processingThread == null)
            {
                throw new InvalidOperationException("The processing thread is not started yet.");
            }
        }
        private void ThrowIfDisposed()
        {
            if (_isDisposed)
            {
                throw new ObjectDisposedException(GetType().Name);
            }
        }
    }
}