File: Client\Parallel\ParallelProxyDiscoveryManager.cs
Web Access
Project: src\src\vstest\src\Microsoft.TestPlatform.CrossPlatEngine\Microsoft.TestPlatform.CrossPlatEngine.csproj (Microsoft.TestPlatform.CrossPlatEngine)
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Client;
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Engine;
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Logging;

namespace Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client.Parallel;

/// <summary>
/// ParallelProxyDiscoveryManager that manages parallel discovery.
/// </summary>
/// <remarks>
/// The requested sources are split into workloads (one workload per source, or a single workload per shared
/// testhost provider when the run is not parallel) and the runnable workloads are handed to a
/// <c>ParallelOperationManager</c>, which drives them on concurrent <see cref="IProxyDiscoveryManager"/> instances.
/// </remarks>
internal sealed class ParallelProxyDiscoveryManager : IParallelProxyDiscoveryManager, IDisposable
{
    private readonly IDataSerializer _dataSerializer;
    private readonly DiscoveryDataAggregator _dataAggregator;

    // True when more than one concurrent manager may be used (parallelLevel > 1).
    private readonly bool _isParallel;
    private readonly ParallelOperationManager<IProxyDiscoveryManager, ITestDiscoveryEventsHandler2, DiscoveryCriteria> _parallelOperationManager;

    // Maps every source listed in a provider's SourceDetails to that provider.
    private readonly Dictionary<string, TestRuntimeProviderInfo> _sourceToTestHostProviderMap;
    private readonly IRequestData _requestData;

    // Number of concurrent managers that reported completion; updated under _discoveryStatusLockObject.
    private int _discoveryCompletedClients;

    // Total number of sources across all workloads (runnable and non-runnable).
    private int _availableTestSources;

    // Number of workloads that have a provider and are dispatched to concurrent managers.
    private int _availableWorkloads;
    private bool _skipDefaultAdapters;
    private bool _isDisposed;

    public bool IsAbortRequested { get; private set; }

    /// <summary>
    /// LockObject to update discovery status in parallel
    /// </summary>
    private readonly object _discoveryStatusLockObject = new();

    /// <summary>
    /// Creates a parallel discovery manager that serializes messages with <see cref="JsonDataSerializer"/>.
    /// </summary>
    /// <param name="requestData">Request data shared with the per-manager event handlers.</param>
    /// <param name="actualProxyManagerCreator">Factory that creates a concrete proxy manager for a provider and criteria.</param>
    /// <param name="dataAggregator">Aggregator that tracks discovery status of each source.</param>
    /// <param name="parallelLevel">Maximum number of concurrent managers.</param>
    /// <param name="testHostProviders">Providers, each listing the sources it can handle.</param>
    public ParallelProxyDiscoveryManager(
        IRequestData requestData,
        Func<TestRuntimeProviderInfo, DiscoveryCriteria, IProxyDiscoveryManager> actualProxyManagerCreator,
        DiscoveryDataAggregator dataAggregator,
        int parallelLevel,
        List<TestRuntimeProviderInfo> testHostProviders)
        : this(requestData, actualProxyManagerCreator, dataAggregator, JsonDataSerializer.Instance, parallelLevel, testHostProviders)
    {
    }

    /// <summary>
    /// Creates a parallel discovery manager with an explicit serializer.
    /// </summary>
    /// <param name="requestData">Request data shared with the per-manager event handlers.</param>
    /// <param name="actualProxyManagerCreator">Factory that creates a concrete proxy manager for a provider and criteria.</param>
    /// <param name="dataAggregator">Aggregator that tracks discovery status of each source.</param>
    /// <param name="dataSerializer">Serializer used to build raw error messages in <see cref="HandleError"/>.</param>
    /// <param name="parallelLevel">Maximum number of concurrent managers.</param>
    /// <param name="testHostProviders">Providers, each listing the sources it can handle.</param>
    internal ParallelProxyDiscoveryManager(
        IRequestData requestData,
        Func<TestRuntimeProviderInfo, DiscoveryCriteria, IProxyDiscoveryManager> actualProxyManagerCreator,
        DiscoveryDataAggregator dataAggregator,
        IDataSerializer dataSerializer,
        int parallelLevel,
        List<TestRuntimeProviderInfo> testHostProviders)
    {
        _requestData = requestData;
        _dataSerializer = dataSerializer;
        _dataAggregator = dataAggregator;
        _isParallel = parallelLevel > 1;
        _parallelOperationManager = new(actualProxyManagerCreator, parallelLevel);

        // NOTE(review): ToDictionary throws if the same source is listed by more than one provider.
        _sourceToTestHostProviderMap = testHostProviders
            .SelectMany(provider => provider.SourceDetails.Select(s => new KeyValuePair<string, TestRuntimeProviderInfo>(s.Source!, provider)))
            .ToDictionary(pair => pair.Key, pair => pair.Value);
    }

    #region IProxyDiscoveryManager

    /// <inheritdoc/>
    public void Initialize(bool skipDefaultAdapters)
    {
        // Only remembered here; the value is forwarded to each concurrent manager when it is initialized.
        _skipDefaultAdapters = skipDefaultAdapters;
    }

    /// <inheritdoc/>
    public void DiscoverTests(DiscoveryCriteria discoveryCriteria, ITestDiscoveryEventsHandler2 eventHandler)
    {
        ValidateArg.NotNull(discoveryCriteria, nameof(discoveryCriteria));
        ValidateArg.NotNull(eventHandler, nameof(eventHandler));

        var workloads = SplitToWorkloads(discoveryCriteria, _sourceToTestHostProviderMap);
        _availableTestSources = workloads.SelectMany(w => w.Work.Sources).Count();
        var runnableWorkloads = workloads.Where(workload => workload.HasProvider).ToList();
        _availableWorkloads = runnableWorkloads.Count;
        var nonRunnableWorkloads = workloads.Where(workload => !workload.HasProvider).ToList();

        EqtTrace.Verbose("ParallelProxyDiscoveryManager.DiscoverTests: Start discovery. Total sources: " + _availableTestSources);

        // Mark all sources as NotDiscovered here because if we get an early cancellation it's
        // possible that we didn't yet start all the proxy managers and so we didn't mark all sources
        // as NotDiscovered.
        // For example, let's assume we have 10 sources, a batch size of 10 but only 8 cores, we
        // will then spawn 8 instances of this and if we now cancel, we will have 2 sources not
        // marked as NotDiscovered.
        _dataAggregator.MarkSourcesWithStatus(discoveryCriteria.Sources, DiscoveryStatus.NotDiscovered);

        if (nonRunnableWorkloads.Count > 0)
        {
            // We found some sources that don't associate to any runtime provider and so they cannot run.
            // Mark the sources as skipped.
            _dataAggregator.MarkSourcesWithStatus(nonRunnableWorkloads.SelectMany(w => w.Work.Sources), DiscoveryStatus.SkippedDiscovery);
            // TODO: in strict mode keep them as non-discovered, and mark the run as aborted.
            // _dataAggregator.MarkAsAborted();
        }

        _parallelOperationManager.StartWork(runnableWorkloads, eventHandler, GetParallelEventHandler, InitializeDiscoverTestsOnConcurrentManager, DiscoverTestsOnConcurrentManager);
    }

    /// <summary>
    /// Wraps the caller's event handler in a <see cref="ParallelDiscoveryEventsHandler"/> bound to one concurrent manager.
    /// </summary>
    private ITestDiscoveryEventsHandler2 GetParallelEventHandler(ITestDiscoveryEventsHandler2 eventHandler, IProxyDiscoveryManager concurrentManager)
        => new ParallelDiscoveryEventsHandler(
            _requestData,
            concurrentManager,
            eventHandler,
            this,
            _dataAggregator);

    /// <inheritdoc/>
    public void Abort()
    {
        IsAbortRequested = true;
        _parallelOperationManager.DoActionOnAllManagers((proxyManager) => proxyManager.Abort(), doActionsInParallel: true);
    }

    /// <inheritdoc/>
    public void Abort(ITestDiscoveryEventsHandler2 eventHandler)
    {
        IsAbortRequested = true;
        _parallelOperationManager.DoActionOnAllManagers((proxyManager) => proxyManager.Abort(eventHandler), doActionsInParallel: true);
    }

    /// <inheritdoc/>
    public void Close()
    {
        _parallelOperationManager.DoActionOnAllManagers(proxyManager => proxyManager.Close(), doActionsInParallel: true);
    }

    #endregion

    #region IParallelProxyDiscoveryManager methods

    /// <inheritdoc/>
    public bool HandlePartialDiscoveryComplete(IProxyDiscoveryManager proxyDiscoveryManager, long totalTests, IEnumerable<TestCase>? lastChunk, bool isAborted)
    {
#if DEBUG
        // Ensures that the total count of sources remains the same between each discovery
        // completion of the same initial discovery request.
        var notDiscoveredCount = _dataAggregator.GetSourcesWithStatus(DiscoveryStatus.NotDiscovered).Count;
        var partiallyDiscoveredCount = _dataAggregator.GetSourcesWithStatus(DiscoveryStatus.PartiallyDiscovered).Count;
        var fullyDiscoveredCount = _dataAggregator.GetSourcesWithStatus(DiscoveryStatus.FullyDiscovered).Count;
        var skippedCount = _dataAggregator.GetSourcesWithStatus(DiscoveryStatus.SkippedDiscovery).Count;
        var expectedCount = _availableTestSources;
        // When this fails, look at the _dataAggregator and look at the sources that it holds. It is possible that adapter incorrectly reports
        // the source on the testcase object. Each distinct source that will appear on TestCase will be considered a file.
        TPDebug.Assert(notDiscoveredCount + partiallyDiscoveredCount + fullyDiscoveredCount + skippedCount == expectedCount,
            $"Total count of sources ({expectedCount}) should match the count of sources with status not discovered ({notDiscoveredCount}), partially discovered ({partiallyDiscoveredCount}), fully discovered ({fullyDiscoveredCount}) and skipped ({skippedCount}).");
#endif

        bool allDiscoverersCompleted;
        // TODO: Interlocked.Increment the count, and the condition below probably does not need to be in a lock?
        lock (_discoveryStatusLockObject)
        {
            // Each concurrent manager calls this method once it finishes a workload,
            // so count the completions to know when every runnable workload is done.
            _discoveryCompletedClients++;

            allDiscoverersCompleted = _discoveryCompletedClients == _availableWorkloads;

            EqtTrace.Verbose("ParallelProxyDiscoveryManager.HandlePartialDiscoveryComplete: Total completed clients = {0}, Discovery complete = {1}, Aborted = {2}, Abort requested: {3}.", _discoveryCompletedClients, allDiscoverersCompleted, isAborted, IsAbortRequested);
        }

        // If discovery is complete, or aborting was requested by the test platform (user), stop all
        // ongoing discoveries. Only a user-requested abort is checked here (not `isAborted`), because we
        // want to distinguish a testhost that crashed by itself from an abort the user asked for
        // (e.g. through TW). Schedule the clean up for managers and handlers.
        if (allDiscoverersCompleted || IsAbortRequested)
        {
            _parallelOperationManager.StopAllManagers();

            if (allDiscoverersCompleted)
            {
                EqtTrace.Verbose("ParallelProxyDiscoveryManager.HandlePartialDiscoveryComplete: All sources were discovered.");
            }
            else
            {
                EqtTrace.Verbose("ParallelProxyDiscoveryManager.HandlePartialDiscoveryComplete: Abort was requested.");
            }

            return true;
        }

        // More work remains: let this manager pick up the next workload.
        _parallelOperationManager.RunNextWork(proxyDiscoveryManager);

        return false;
    }

    #endregion

    /// <summary>
    /// Groups the requested sources by their testhost provider and turns them into discovery workloads.
    /// </summary>
    /// <param name="discoveryCriteria">The original criteria; its settings are copied into every workload.</param>
    /// <param name="sourceToTestHostProviderMap">Source-to-provider lookup.</param>
    /// <returns>
    /// One workload containing all of a provider's sources when the run is not parallel and the provider is
    /// shared; otherwise one workload per source.
    /// </returns>
    private List<ProviderSpecificWorkload<DiscoveryCriteria>> SplitToWorkloads(DiscoveryCriteria discoveryCriteria, Dictionary<string, TestRuntimeProviderInfo> sourceToTestHostProviderMap)
    {
        var sources = discoveryCriteria.Sources;
        // Each source is grouped with its respective provider.
        // NOTE(review): the indexer throws KeyNotFoundException for a source that no provider listed.
        var providerGroups = sources
            .Select(source => new ProviderSpecificWorkload<string>(source, sourceToTestHostProviderMap[source]))
            .GroupBy(psw => psw.Provider);

        List<ProviderSpecificWorkload<DiscoveryCriteria>> workloads = new();
        foreach (var group in providerGroups)
        {
            var testhostProviderInfo = group.Key;
            // If the run is not parallel and the host is shared, put all testcases on single testhost.
            // For parallel we prefer to run each source on its own host because we don't know how big the source is
            // (how many tests there are to discover), and running 10 sources on 10 parallel testhosts is faster
            // in almost all cases than running 10 sources on 1 testhost.
            List<string[]> sourceBatches;
            if (!_isParallel && testhostProviderInfo.Shared)
            {
                // Create one big source batch that will be single workload for single testhost.
                sourceBatches = [group.Select(w => w.Work).ToArray()];
            }
            else
            {
                // Create multiple source batches, each having one source, so each testhost will end up running one source.
                sourceBatches = group.Select(w => new[] { w.Work }).ToList();
            }

            foreach (var sourcesToDiscover in sourceBatches)
            {
                var updatedCriteria = NewDiscoveryCriteriaFromSourceAndSettings(sourcesToDiscover, discoveryCriteria, testhostProviderInfo.RunSettings);
                var workload = new ProviderSpecificWorkload<DiscoveryCriteria>(updatedCriteria, testhostProviderInfo);
                workloads.Add(workload);
            }
        }

        return workloads;

        // Copies the discovery settings of the original criteria onto a new criteria for the given sources
        // and provider-specific run settings.
        static DiscoveryCriteria NewDiscoveryCriteriaFromSourceAndSettings(IEnumerable<string> sources, DiscoveryCriteria discoveryCriteria, string? runsettingsXml)
        {
            var criteria = new DiscoveryCriteria(
                sources,
                discoveryCriteria.FrequencyOfDiscoveredTestsEvent,
                discoveryCriteria.DiscoveredTestEventTimeout,
                runsettingsXml,
                discoveryCriteria.TestSessionInfo
            );

            criteria.TestCaseFilter = discoveryCriteria.TestCaseFilter;

            return criteria;
        }
    }

    /// <summary>
    /// Pre-initializes a concurrent discovery manager for the given criteria on a background task,
    /// without starting the discovery itself.
    /// </summary>
    /// <param name="proxyDiscoveryManager">Proxy discovery manager instance to initialize.</param>
    /// <param name="eventHandler">Discovery events handler.</param>
    /// <param name="discoveryCriteria">Discovery criteria to initialize the manager with.</param>
    /// <returns>The task performing the initialization.</returns>
    private Task InitializeDiscoverTestsOnConcurrentManager(IProxyDiscoveryManager proxyDiscoveryManager, ITestDiscoveryEventsHandler2 eventHandler, DiscoveryCriteria discoveryCriteria)
    {
        // Run the initialization on a thread-pool task and hand the task back to the caller.
        return Task.Run(() =>
        {
            EqtTrace.Verbose("ProxyParallelDiscoveryManager.InitializeDiscoverTestsOnConcurrentManager: Discovery preparation started.");

            proxyDiscoveryManager.Initialize(_skipDefaultAdapters);
            proxyDiscoveryManager.InitializeDiscovery(discoveryCriteria, eventHandler, _skipDefaultAdapters);

            EqtTrace.Verbose($"ProxyParallelDiscoveryManager.InitializeDiscoverTestsOnConcurrentManager: Init only: {string.Join(", ", discoveryCriteria.Sources)}");
        });
    }

    /// <summary>
    /// Triggers the discovery for the next workload on the concurrent discoverer.
    /// Each concurrent discoverer calls this method once it has completed working on the previous workload.
    /// </summary>
    /// <param name="proxyDiscoveryManager">Proxy discovery manager instance that runs the workload.</param>
    /// <param name="eventHandler">Discovery events handler.</param>
    /// <param name="discoveryCriteria">Discovery criteria of the workload to run.</param>
    /// <param name="initialized">
    /// Whether the manager was already pre-initialized; when false it is initialized right before discovery.
    /// </param>
    /// <param name="task">
    /// The pre-initialization task to wait for when <paramref name="initialized"/> is true
    /// (presumably the one returned by <see cref="InitializeDiscoverTestsOnConcurrentManager"/>).
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="discoveryCriteria"/> is null.</exception>
    private void DiscoverTestsOnConcurrentManager(
        IProxyDiscoveryManager proxyDiscoveryManager,
        ITestDiscoveryEventsHandler2 eventHandler,
        DiscoveryCriteria discoveryCriteria,
        bool initialized,
        Task? task)
    {
        // If we do the scheduling incorrectly this will get null. It should not happen, but it has happened before.
        if (discoveryCriteria is null)
        {
            throw new ArgumentNullException(nameof(discoveryCriteria));
        }

        // Kick off another discovery task for the next source. Any exception (including a faulted
        // pre-initialization task surfacing through Wait) is routed to HandleError so that discovery
        // completion is still reported.
        Task.Run(() =>
            {
                EqtTrace.Verbose("ParallelProxyDiscoveryManager: Discovery started.");
                if (!initialized)
                {
                    EqtTrace.Verbose($"ProxyParallelDiscoveryManager.DiscoverTestsOnConcurrentManager: Initialize right before run: {string.Join(", ", discoveryCriteria.Sources)}");
                    proxyDiscoveryManager.Initialize(_skipDefaultAdapters);
                    proxyDiscoveryManager.InitializeDiscovery(discoveryCriteria, eventHandler, _skipDefaultAdapters);
                }
                else
                {
                    task?.Wait();
                }

                EqtTrace.Verbose($"ProxyParallelDiscoveryManager.DiscoverTestsOnConcurrentManager: Run: {string.Join(", ", discoveryCriteria.Sources)}");
                proxyDiscoveryManager.DiscoverTests(discoveryCriteria, eventHandler);
            }).ContinueWith(t => HandleError(eventHandler, t), TaskContinuationOptions.OnlyOnFaulted);

        // The previous message here ("No sources available for discovery.") was logged unconditionally
        // right after scheduling work, which was misleading.
        EqtTrace.Verbose("ProxyParallelDiscoveryManager.DiscoverTestsOnConcurrentManager: Discovery task scheduled.");
    }

    /// <summary>
    /// Reports a failed discovery task to the event handler and signals completion for that manager.
    /// </summary>
    /// <param name="eventHandler">Handler that receives the error message and the completion event.</param>
    /// <param name="t">The faulted task.</param>
    private void HandleError(ITestDiscoveryEventsHandler2 eventHandler, Task t)
    {
        // Just in case, the actual discovery couldn't start for an instance. Ensure that
        // we call discovery complete since we have already fetched a source. Otherwise
        // discovery will not terminate
        EqtTrace.Error("ParallelProxyDiscoveryManager: Failed to trigger discovery. Exception: " + t.Exception);

        var exceptionString = t.Exception?.ToString();
        var testMessagePayload = new TestMessagePayload { MessageLevel = TestMessageLevel.Error, Message = exceptionString };
        eventHandler.HandleRawMessage(_dataSerializer.SerializePayload(MessageType.TestMessage, testMessagePayload));
        eventHandler.HandleLogMessage(TestMessageLevel.Error, exceptionString);

        // Send discovery complete. Similar logic is also used in ProxyDiscoveryManager.DiscoverTests.
        // Differences:
        // - Total tests is sent as -1. NOTE(review): an earlier version of this comment said it must be zero
        //   because the parallel discovery events handler adds the count; confirm how -1 is aggregated.
        // - `lastChunk` is null since we don't want a message back to the IDE (discovery didn't even begin).
        // - `isAborted` is true since we want this instance of discovery manager to be replaced.
        var discoveryCompleteEventsArgs = new DiscoveryCompleteEventArgs(-1, true);
        eventHandler.HandleDiscoveryComplete(discoveryCompleteEventsArgs, null);
    }

    public void InitializeDiscovery(DiscoveryCriteria discoveryCriteria, ITestDiscoveryEventsHandler2 eventHandler, bool skipDefaultAdapters)
    {
        // Leaving this empty as it is not really relevant to the parallel proxy managers.
        // The idea of pre-initializing the test run makes sense only for single proxies like
        // ProxyExecutionManager or ProxyDiscoveryManager.
    }

    /// <summary>
    /// Disposes the underlying parallel operation manager once; subsequent calls do nothing.
    /// </summary>
    public void Dispose()
    {
        if (!_isDisposed)
        {
            _parallelOperationManager.Dispose();
            _isDisposed = true;
        }
    }
}