|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
#nullable disable
namespace Microsoft.Build.Graph
{
/// <summary>
/// Provides deduping of expensive work by a key, or modeling of a set of deduped work that
/// can be awaited as a unit. Completed results are kept in the collection for reuse.
/// </summary>
internal class ParallelWorkSet<TKey, TResult>
{
private readonly CancellationToken _cancellationToken;
/// <summary>
/// Number of workers to process work items.
/// </summary>
private readonly int _degreeOfParallelism;
private readonly ConcurrentDictionary<TKey, Lazy<TResult>> _inProgressOrCompletedWork;
private bool _isSchedulingCompleted;
private long _pendingCount;
private readonly ConcurrentQueue<Lazy<TResult>> _queue =
new ConcurrentQueue<Lazy<TResult>>();
private readonly SemaphoreSlim _semaphore;
private readonly List<Task> _tasks;
private readonly List<Exception> _exceptions = new List<Exception>(0);
/// <summary>
/// Retrieves all completed work items.
/// </summary>
internal Dictionary<TKey, TResult> CompletedWork
{
get
{
var completedWork = new Dictionary<TKey, TResult>(_inProgressOrCompletedWork.Count);
foreach (KeyValuePair<TKey, Lazy<TResult>> kvp in _inProgressOrCompletedWork)
{
Lazy<TResult> workItem = kvp.Value;
if (workItem.IsValueCreated)
{
completedWork[kvp.Key] = workItem.Value;
}
}
if (_exceptions.Count > 0)
{
throw new AggregateException(_exceptions);
}
return completedWork;
}
}
/// <summary>
/// Checks if the work set has been marked as completed.
/// </summary>
internal bool IsCompleted
{
get => Volatile.Read(ref _isSchedulingCompleted);
private set => Volatile.Write(ref _isSchedulingCompleted, value);
}
internal ParallelWorkSet(int degreeOfParallelism, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
{
if (degreeOfParallelism < 0)
{
throw new ArgumentException("Degree of parallelism must be a positive integer.");
}
_cancellationToken = cancellationToken;
_degreeOfParallelism = degreeOfParallelism;
_inProgressOrCompletedWork = new ConcurrentDictionary<TKey, Lazy<TResult>>(comparer);
// Semaphore count is 0 to ensure that all the tasks are blocked unless new data is scheduled.
_semaphore = new SemaphoreSlim(0, int.MaxValue);
_tasks = new List<Task>(degreeOfParallelism);
for (int i = 0; i < degreeOfParallelism; i++)
{
_tasks.Add(CreateProcessorItemTask());
}
}
/// <summary>
/// Enqueues a work item to the work set.
/// </summary>
/// <param name="key"></param>
/// <param name="workFunc"></param>
internal void AddWork(TKey key, Func<TResult> workFunc)
{
if (IsCompleted)
{
throw new InvalidOperationException("Cannot add new work after work set is marked as completed.");
}
var workItem = new Lazy<TResult>(workFunc);
if (!_inProgressOrCompletedWork.TryAdd(key, workItem))
{
return;
}
Interlocked.Increment(ref _pendingCount);
// NOTE: Enqueue MUST happen before releasing the semaphore
// to ensure WaitAsync below never returns when there is not
// a corresponding item in the queue to be dequeued. The only
// exception is on completion of all items.
_queue.Enqueue(workItem);
_semaphore.Release();
}
/// <summary>
/// Assists processing items until all the items added to the queue are processed, completes the work set, and
/// propagates any exceptions thrown by workers.
/// </summary>
internal void WaitForAllWorkAndComplete()
{
if (IsCompleted)
{
return;
}
while (!_cancellationToken.IsCancellationRequested && Interlocked.Read(ref _pendingCount) > 0)
{
ExecuteWorkItem();
}
IsCompleted = true;
// Release one thread that will release all the threads when all the elements are processed.
_semaphore.Release();
Task.WaitAll(_tasks.ToArray());
if (_exceptions.Count > 0)
{
throw new AggregateException(_exceptions);
}
}
private Task CreateProcessorItemTask()
{
return Task.Run(
async () =>
{
bool shouldStopAllWorkers = false;
while (!shouldStopAllWorkers)
{
await _semaphore.WaitAsync(_cancellationToken);
try
{
ExecuteWorkItem();
}
finally
{
shouldStopAllWorkers = Interlocked.Read(ref _pendingCount) == 0 && IsCompleted;
if (shouldStopAllWorkers)
{
// Ensure all tasks are unblocked and can gracefully
// finish since there are at most degreeOfParallelism - 1 tasks
// waiting at this point
_semaphore.Release(_degreeOfParallelism);
}
}
}
},
_cancellationToken);
}
private void ExecuteWorkItem()
{
if (_queue.TryDequeue(out Lazy<TResult> workItem))
{
try
{
_ = workItem.Value;
}
catch (Exception ex)
{
lock (_exceptions)
{
_exceptions.Add(ex);
}
}
finally
{
Interlocked.Decrement(ref _pendingCount);
}
}
}
}
}
|