|
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client.Parallel;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Client;
namespace Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client;
/// <summary>
/// Manages work that is done on multiple managers (testhosts) in parallel such as parallel discovery or parallel run.
/// </summary>
internal sealed class ParallelOperationManager<TManager, TEventHandler, TWorkload> : IDisposable
{
private const int PreStart = 0;
private readonly static int VSTEST_HOSTPRESTART_COUNT =
int.TryParse(
Environment.GetEnvironmentVariable(nameof(VSTEST_HOSTPRESTART_COUNT)),
out int num)
? num
: PreStart;
private readonly Func<TestRuntimeProviderInfo, TWorkload, TManager> _createNewManager;
/// <summary>
/// Default number of Processes
/// </summary>
private TEventHandler? _eventHandler;
private Func<TEventHandler, TManager, TEventHandler>? _getEventHandler;
private Func<TManager, TEventHandler, TWorkload, Task>? _initializeWorkload;
private Action<TManager, TEventHandler, TWorkload, bool, Task?>? _runWorkload;
private bool _acceptMoreWork;
private readonly List<ProviderSpecificWorkload<TWorkload>> _workloads = new();
private readonly List<Slot> _managerSlots = new();
private readonly object _lock = new();
public int MaxParallelLevel { get; }
public int OccupiedSlotCount { get; private set; }
public int AvailableSlotCount { get; private set; }
public int PreStartCount { get; private set; }
/// <summary>
/// Creates new instance of ParallelOperationManager.
/// </summary>
/// <param name="createNewManager">Creates a new manager that is responsible for running a single part of the overall workload.
/// A manager is typically a testhost, and the part of workload is discovering or running a single test dll.</param>
/// <param name="parallelLevel">Determines the maximum amount of parallel managers that can be active at the same time.</param>
public ParallelOperationManager(Func<TestRuntimeProviderInfo, TWorkload, TManager> createNewManager, int parallelLevel)
{
_createNewManager = createNewManager;
MaxParallelLevel = parallelLevel;
// pre-start only when we don't run in parallel, if we do run in parallel,
// then pre-starting has no additional value because while one host is starting,
// another is running tests.
PreStartCount = MaxParallelLevel == 1 ? VSTEST_HOSTPRESTART_COUNT : 0;
ClearSlots(acceptMoreWork: true);
}
private void ClearSlots(bool acceptMoreWork)
{
EqtTrace.Verbose($"ParallelOperationManager.ClearSlots: Clearing all slots. Slots should accept more work: {acceptMoreWork}");
lock (_lock)
{
_acceptMoreWork = acceptMoreWork;
_managerSlots.Clear();
_managerSlots.AddRange(Enumerable.Range(0, MaxParallelLevel + PreStartCount).Select(i => new Slot { Index = i }));
SetOccupiedSlotCount();
}
}
private void SetOccupiedSlotCount()
{
AvailableSlotCount = _managerSlots.Count(s => !s.HasWork);
OccupiedSlotCount = _managerSlots.Count - AvailableSlotCount;
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.SetOccupiedSlotCount: Setting slot counts AvailableSlotCount = {AvailableSlotCount}, OccupiedSlotCount = {OccupiedSlotCount}.");
EqtTrace.Verbose($"Occupied slots:\n{(string.Join("\n", _managerSlots.Where(s => s.HasWork).Select((slot) => $"{slot.Index}: {GetSourcesForSlotExpensive(slot)}").ToArray()))}");
}
}
public void StartWork(
List<ProviderSpecificWorkload<TWorkload>> workloads,
TEventHandler eventHandler,
Func<TEventHandler, TManager, TEventHandler> getEventHandler,
Func<TManager, TEventHandler, TWorkload, Task> initializeWorkload,
Action<TManager, TEventHandler, TWorkload, bool, Task?> runWorkload)
{
_ = workloads ?? throw new ArgumentNullException(nameof(workloads));
_eventHandler = eventHandler ?? throw new ArgumentNullException(nameof(eventHandler));
_getEventHandler = getEventHandler ?? throw new ArgumentNullException(nameof(getEventHandler));
_initializeWorkload = initializeWorkload ?? throw new ArgumentNullException(nameof(initializeWorkload));
_runWorkload = runWorkload ?? throw new ArgumentNullException(nameof(runWorkload));
EqtTrace.Verbose($"ParallelOperationManager.StartWork: Starting adding {workloads.Count} workloads.");
_workloads.AddRange(workloads);
ClearSlots(acceptMoreWork: true);
RunWorkInParallel();
}
// This does not do anything in parallel, all the workloads we schedule are offloaded to separate Task in the _runWorkload callback.
// I did not want to change that, yet but this is the correct place to do that offloading. Not each manager.
private bool RunWorkInParallel()
{
// TODO: Right now we don't re-use shared hosts, but if we did, this is the place
// where we should find a workload that fits the manager if any of them is shared.
// Or tear it down, and start a new one.
if (_eventHandler == null)
throw new InvalidOperationException($"{nameof(_eventHandler)} was not provided.");
if (_getEventHandler == null)
throw new InvalidOperationException($"{nameof(_getEventHandler)} was not provided.");
if (_runWorkload == null)
throw new InvalidOperationException($"{nameof(_runWorkload)} was not provided.");
// Reserve slots and assign them work under the lock so we keep the slots consistent.
Slot[] slots;
lock (_lock)
{
// When HandlePartialDiscovery or HandlePartialRun are in progress, and we call StopAllManagers,
// it is possible that we will clear all slots, and have RunWorkInParallel waiting on the lock,
// so when it is allowed to enter it will try to add more work, but we already cancelled,
// so we should not start more work.
if (!_acceptMoreWork)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We don't accept more work, returning false.");
return false;
}
// We grab all empty slots.
var availableSlots = _managerSlots.Where(slot => !slot.HasWork).ToImmutableArray();
var occupiedSlots = MaxParallelLevel - (availableSlots.Length - PreStartCount);
// We grab all available workloads.
var availableWorkloads = _workloads.Where(workload => workload != null).ToImmutableArray();
// We take the amount of workloads to fill all the slots, or just as many workloads
// as there are if there are less workloads than slots.
var amount = Math.Min(availableSlots.Length, availableWorkloads.Length);
var workloadsToAdd = availableWorkloads.Take(amount).ToImmutableArray();
// We associate each workload to a slot, if we reached the max parallel
// level, then we will run only initialize step of the given workload.
for (int i = 0; i < amount; i++)
{
var slot = availableSlots[i];
var workload = workloadsToAdd[i];
slot.ShouldPreStart = occupiedSlots + i + 1 > MaxParallelLevel;
var manager = _createNewManager(workload.Provider, workload.Work);
var eventHandler = _getEventHandler(_eventHandler, manager);
slot.EventHandler = eventHandler;
slot.Manager = manager;
slot.ManagerInfo = workload.Provider;
slot.Work = workload.Work;
_workloads.Remove(workload);
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Adding 1 workload to slot, remaining workloads {_workloads.Count}.");
// This must be set last, every loop below looks at this property,
// and they can do so from a different thread. So if we mark it as HasWork before actually assigning the work
// we can pick up the slot, but it has no associated work yet.
slot.HasWork = true;
}
slots = _managerSlots.ToArray();
SetOccupiedSlotCount();
}
// Kick of the work in parallel outside of the lock so if we have more requests to run
// that come in at the same time we only block them from reserving the same slot at the same time
// but not from starting their assigned work at the same time.
// Kick of all pre-started hosts from the ones that had the longest time to initialize.
//
// This code should be safe even outside the lock since HasWork is only changed when work is
// complete and only for the slot that completed work. It is not possible to complete work before
// starting it (which is what we are trying to do here).
var startedWork = 0;
foreach (var slot in slots.Where(s => s.HasWork && !s.IsRunning && s.IsPreStarted).OrderBy(s => s.PreStartTime))
{
startedWork++;
slot.IsRunning = true;
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Running on pre-started host for work (source) {GetSourcesForSlotExpensive(slot)}: {(DateTime.Now.TimeOfDay - slot.PreStartTime).TotalMilliseconds}ms {slot.InitTask?.Status}");
}
_runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask);
// We already started as many as we were allowed, jump out;
if (startedWork == MaxParallelLevel)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {startedWork} work items, which is the max parallel level. Won't start more work.");
break;
}
}
// We already started as many pre-started testhosts as we are allowed by the max parallel level
// skip running more work.
if (startedWork < MaxParallelLevel)
{
foreach (var slot in slots)
{
if (slot.HasWork && !slot.IsRunning)
{
if (!slot.ShouldPreStart)
{
startedWork++;
slot.IsRunning = true;
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Started host in slot number {slot.Index} for work (source): {GetSourcesForSlotExpensive(slot)}.");
}
_runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask);
}
}
// We already started as many as we were allowed, jump out;
if (startedWork == MaxParallelLevel)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {startedWork} work items, which is the max parallel level. Won't start more work.");
break;
}
}
}
var preStartedWork = 0;
foreach (var slot in slots)
{
if (slot.HasWork && slot.ShouldPreStart && !slot.IsPreStarted)
{
preStartedWork++;
slot.PreStartTime = DateTime.Now.TimeOfDay;
slot.IsPreStarted = true;
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Pre-starting a host for work (source): {GetSourcesForSlotExpensive(slot)}.");
}
slot.InitTask = _initializeWorkload!(slot.Manager!, slot.EventHandler!, slot.Work!);
}
}
// Return true when we started more work. Or false, when there was nothing more to do.
// This will propagate to handling of partial discovery or partial run.
var weAddedMoreWork = preStartedWork + startedWork > 0;
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {preStartedWork + startedWork} work items in here, returning {weAddedMoreWork}.");
return weAddedMoreWork;
}
public bool RunNextWork(TManager completedManager)
{
ValidateArg.NotNull(completedManager, nameof(completedManager));
ClearCompletedSlot(completedManager);
return RunWorkInParallel();
}
private void ClearCompletedSlot(TManager completedManager)
{
lock (_lock)
{
var completedSlot = _managerSlots.Where(s => ReferenceEquals(completedManager, s.Manager)).ToImmutableArray();
// When HandlePartialDiscovery or HandlePartialRun are in progress, and we call StopAllManagers,
// it is possible that we will clear all slots, while ClearCompletedSlot is waiting on the lock,
// so when it is allowed to enter it will fail to find the respective slot and fail. In this case it is
// okay that the slot is not found, and we do nothing, because we already stopped all work and cleared the slots.
if (completedSlot.Length == 0)
{
if (_acceptMoreWork)
{
throw new InvalidOperationException("The provided manager was not found in any slot.");
}
else
{
return;
}
}
if (completedSlot.Length > 1)
{
throw new InvalidOperationException("The provided manager was found in multiple slots.");
}
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.ClearCompletedSlot: Clearing slot number {completedSlot[0].Index} with work (source): {GetSourcesForSlotExpensive(completedSlot[0])}.");
}
var slot = completedSlot[0];
slot.PreStartTime = TimeSpan.Zero;
slot.Work = default(TWorkload);
slot.HasWork = false;
slot.ShouldPreStart = false;
slot.IsPreStarted = false;
slot.InitTask = null;
slot.IsRunning = false;
slot.Manager = default(TManager);
slot.EventHandler = default(TEventHandler);
SetOccupiedSlotCount();
}
}
private static string GetSourcesForSlotExpensive(ParallelOperationManager<TManager, TEventHandler, TWorkload>.Slot slot)
{
return string.Join(", ", (slot.Work as DiscoveryCriteria)?.Sources ?? (slot.Work as TestRunCriteria)?.Sources ?? []);
}
public void DoActionOnAllManagers(Action<TManager> action, bool doActionsInParallel = false)
{
EqtTrace.Verbose($"ParallelOperationManager.DoActionOnAllManagers: Calling an action on all managers.");
// We don't need to lock here, we just grab the current list of
// slots that are occupied (have managers) and run action on each one of them.
var managers = _managerSlots.Where(slot => slot.HasWork).Select(slot => slot.Manager).ToImmutableArray();
int i = 0;
var actionTasks = new Task[managers.Length];
foreach (var manager in managers)
{
if (manager == null)
continue;
// Read the array before firing the task - beware of closures
if (doActionsInParallel)
{
actionTasks[i] = Task.Run(() => action(manager));
i++;
}
else
{
DoManagerAction(() => action(manager));
}
}
if (doActionsInParallel)
{
DoManagerAction(() => Task.WaitAll(actionTasks));
}
}
private static void DoManagerAction(Action action)
{
try
{
action();
}
catch (Exception ex)
{
// Exception can occur if we are trying to cancel a test run on an executor where test run is not even fired
// we can safely ignore that as user is just canceling the test run and we don't care about additional parallel executors
// as we will be disposing them off soon anyway
EqtTrace.Warning("ParallelOperationManager.DoManagerAction: Exception while invoking an action on Proxy Manager instance: {0}", ex);
}
}
internal void StopAllManagers()
{
EqtTrace.Verbose($"ParallelOperationManager.StopAllManagers: Stopping all managers.");
ClearSlots(acceptMoreWork: false);
}
public void Dispose()
{
EqtTrace.Verbose($"ParallelOperationManager.Dispose: Disposing all managers.");
ClearSlots(acceptMoreWork: false);
}
private class Slot
{
public int Index { get; set; }
public bool HasWork { get; set; }
public bool ShouldPreStart { get; set; }
public Task? InitTask { get; set; }
public bool IsRunning { get; set; }
public TManager? Manager { get; set; }
public TestRuntimeProviderInfo? ManagerInfo { get; set; }
public TEventHandler? EventHandler { get; set; }
public TWorkload? Work { get; set; }
public bool IsPreStarted { get; internal set; }
public TimeSpan PreStartTime { get; internal set; }
public override string ToString()
{
return $"{Index}: HasWork: {HasWork}, ShouldPreStart: {ShouldPreStart}, IsPreStarted: {IsPreStarted}, IsRunning: {IsRunning}";
}
}
}
|