File: Shared\Utilities\AsyncBatchingWorkQueue`2.cs
Project: src\Workspaces\Core\Portable\Microsoft.CodeAnalysis.Workspaces.csproj (Microsoft.CodeAnalysis.Workspaces)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.Collections;
using Microsoft.CodeAnalysis.ErrorReporting;
using Microsoft.CodeAnalysis.PooledObjects;
using Microsoft.CodeAnalysis.Shared.TestHooks;
 
namespace Roslyn.Utilities;
 
/// <summary>
/// A queue where items can be added to be processed in batches after some delay has passed. When processing
/// happens, all the items added since the last processing point will be passed along to be worked on.  Rounds of
/// processing happen serially, only starting up after a previous round has completed.
/// <para>
/// Failure to complete a particular batch (either due to cancellation or some faulting error) will not prevent
/// further batches from executing. The only thing that will permanently stop this queue from processing items is if
/// the <see cref="CancellationToken"/> passed to the constructor switches to <see
/// cref="CancellationToken.IsCancellationRequested"/>.
/// </para>
/// </summary>
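/// <example>
/// A minimal usage sketch.  The delay, <c>asyncListener</c>, and <c>cancellationToken</c> values shown here are
/// illustrative placeholders:
/// <code>
/// var queue = new AsyncBatchingWorkQueue&lt;string, int&gt;(
///     TimeSpan.FromMilliseconds(500),
///     (items, cancellationToken) => new ValueTask&lt;int&gt;(items.Count),
///     equalityComparer: null,
///     asyncListener,
///     cancellationToken);
///
/// // Items added in quick succession coalesce into a single batch handed to the callback.
/// queue.AddWork("a");
/// queue.AddWork("b");
/// </code>
/// </example>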
internal class AsyncBatchingWorkQueue<TItem, TResult>
{
    /// <summary>
    /// Delay we wait between finishing the processing of one batch and starting up the next.
    /// </summary>
    private readonly TimeSpan _delay;
 
    /// <summary>
    /// Equality comparer used to dedupe items, if present.
    /// </summary>
    private readonly IEqualityComparer<TItem>? _equalityComparer;
 
    /// <summary>
    /// Callback to actually perform the processing of the next batch of work.
    /// </summary>
    private readonly Func<ImmutableSegmentedList<TItem>, CancellationToken, ValueTask<TResult>> _processBatchAsync;
    private readonly IAsynchronousOperationListener _asyncListener;
 
    /// <summary>
    /// Cancellation token controlling the entire queue.  Once this is triggered, we don't want to do any more work
    /// at all.
    /// </summary>
    private readonly CancellationToken _entireQueueCancellationToken;
 
    /// <summary>
    /// Cancellation series we use so we can cancel individual batches of work if requested.  The client of the
    /// queue can cancel existing work by either calling <see cref="CancelExistingWork"/> directly, or passing <see
    /// langword="true"/> to <see cref="AddWork(TItem, bool)"/>.  Work in the queue that has not started will be
    /// immediately discarded. The cancellation token passed to <see cref="_processBatchAsync"/> will be triggered,
    /// allowing the client callback to cooperatively cancel the current batch of work it is performing.
    /// </summary>
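    /// <example>
    /// For instance, a client callback might observe that token cooperatively (here <c>ProcessAsync</c> is an
    /// illustrative placeholder, not a member of this type):
    /// <code>
    /// async (items, cancellationToken) =>
    /// {
    ///     foreach (var item in items)
    ///     {
    ///         cancellationToken.ThrowIfCancellationRequested();
    ///         await ProcessAsync(item, cancellationToken);
    ///     }
    ///
    ///     return items.Count;
    /// }
    /// </code>
    /// </example>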
    private readonly CancellationSeries _cancellationSeries;
 
    #region protected by lock
 
    /// <summary>
    /// Lock we will use to ensure the remainder of these fields can be accessed in a thread-safe
    /// manner.  When work is added we'll place the data into <see cref="_nextBatch"/>.
    /// We'll then kick off a task to process this in the future if we don't already have an
    /// existing task in flight for that.
    /// </summary>
    private readonly object _gate = new();
 
    /// <summary>
    /// Data added that we want to process in our next update task.
    /// </summary>
    private readonly ImmutableSegmentedList<TItem>.Builder _nextBatch = ImmutableSegmentedList.CreateBuilder<TItem>();
 
    /// <summary>
    /// CancellationToken controlling the next batch of items to execute.
    /// </summary>
    private CancellationToken _nextBatchCancellationToken;
 
    /// <summary>
    /// Used if <see cref="_equalityComparer"/> is present to ensure only unique items are added to <see
    /// cref="_nextBatch"/>.
    /// </summary>
    private readonly SegmentedHashSet<TItem> _uniqueItems;
 
    /// <summary>
    /// Task kicked off to do the next batch of processing of <see cref="_nextBatch"/>. These
    /// tasks form a chain so that the next task only processes when the previous one completes.
    /// </summary>
    private Task<(bool ranToCompletion, TResult? result)> _updateTask = Task.FromResult((ranToCompletion: true, default(TResult?)));
 
    /// <summary>
    /// Whether or not there is an existing task in flight that will process the current batch
    /// of <see cref="_nextBatch"/>.  If there is an existing in flight task, we don't need to
    /// kick off a new one if we receive more work before it runs.
    /// </summary>
    private bool _taskInFlight = false;
 
    #endregion
 
    /// <param name="processBatchAsync">Callback to process queued work items.  The list of items passed in is
    /// guaranteed to always be non-empty.</param>
    public AsyncBatchingWorkQueue(
        TimeSpan delay,
        Func<ImmutableSegmentedList<TItem>, CancellationToken, ValueTask<TResult>> processBatchAsync,
        IEqualityComparer<TItem>? equalityComparer,
        IAsynchronousOperationListener asyncListener,
        CancellationToken cancellationToken)
    {
        _delay = delay;
        _processBatchAsync = processBatchAsync;
        _equalityComparer = equalityComparer;
        _asyncListener = asyncListener;
        _entireQueueCancellationToken = cancellationToken;
 
        _uniqueItems = new SegmentedHashSet<TItem>(equalityComparer);
 
        // Combine with the queue cancellation token so that any batch is controlled by that token as well.
        _cancellationSeries = new CancellationSeries(_entireQueueCancellationToken);
        CancelExistingWork();
    }
 
    /// <summary>
    /// Cancels any outstanding work in this queue.  Work that has not yet started will never run. Work that is in
    /// progress will have cancellation requested in the standard best-effort fashion.
    /// </summary>
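    /// <example>
    /// A sketch of the two ways to cancel outstanding work (<c>queue</c> and <c>newItem</c> are placeholders):
    /// <code>
    /// // Explicitly cancel whatever is queued or in flight:
    /// queue.CancelExistingWork();
    ///
    /// // Or cancel implicitly while enqueueing superseding work:
    /// queue.AddWork(newItem, cancelExistingWork: true);
    /// </code>
    /// </example>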
    public void CancelExistingWork()
    {
        lock (_gate)
        {
            // Cancel the currently executing batch, and create a new token for the next batch.
            _nextBatchCancellationToken = _cancellationSeries.CreateNext();
 
            // Clear out the existing items that haven't run yet.  There is no point keeping them around now.
            _nextBatch.Clear();
            _uniqueItems.Clear();
        }
    }
 
    public void AddWork(TItem item, bool cancelExistingWork = false)
    {
        using var _ = ArrayBuilder<TItem>.GetInstance(out var items);
        items.Add(item);
 
        AddWork(items, cancelExistingWork);
    }
 
    public void AddWork(IEnumerable<TItem> items, bool cancelExistingWork = false)
    {
        // Don't do any more work if we've been asked to shutdown.
        if (_entireQueueCancellationToken.IsCancellationRequested)
            return;
 
        lock (_gate)
        {
            // if we were asked to cancel the prior set of items, do so now.
            if (cancelExistingWork)
                CancelExistingWork();
 
            // add our work to the set we'll process in the next batch.
            AddItemsToBatch(items);
 
            if (!_taskInFlight)
            {
                // No in-flight task.  Kick one off to process these items once the delay has elapsed.
                // We always attach the task to the previous one so that notifications to the UI
                // follow the same order as the notifications the OOP server sent to us.
                _updateTask = ContinueAfterDelayAsync(_updateTask);
                _taskInFlight = true;
            }
        }
 
        return;
 
        void AddItemsToBatch(IEnumerable<TItem> items)
        {
            // No equality comparer.  We want to process all items.
            if (_equalityComparer == null)
            {
                _nextBatch.AddRange(items);
                return;
            }
 
            // We're deduping items.  Only add the item if it's the first time we've seen it.
            foreach (var item in items)
            {
                if (_uniqueItems.Add(item))
                    _nextBatch.Add(item);
            }
        }
 
        async Task<(bool ranToCompletion, TResult? result)> ContinueAfterDelayAsync(Task lastTask)
        {
            using var _ = _asyncListener.BeginAsyncOperation(nameof(AddWork));
 
            // Await the previous item in the task chain in a non-throwing fashion.  Regardless of whether that last
            // task completed successfully or not, we want to move onto the next batch.
            await lastTask.NoThrowAwaitableInternal(captureContext: false);
 
            // If we were asked to shutdown, immediately transition to the canceled state without doing any more work.
            if (_entireQueueCancellationToken.IsCancellationRequested)
                return (ranToCompletion: false, default(TResult?));
 
            // Ensure that we always yield the current thread.  This is necessary for correctness as we are called
            // inside a lock that sets _taskInFlight to true.  The work to process the next batch must happen on
            // another thread that runs afterwards, can only grab the lock once we release it, and will then reset
            // that bool back to false.
            await Task.Yield().ConfigureAwait(false);
            await _asyncListener.Delay(_delay, _entireQueueCancellationToken).NoThrowAwaitableInternal(false);
 
            // If we were asked to shutdown, immediately transition to the canceled state without doing any more work.
            if (_entireQueueCancellationToken.IsCancellationRequested)
                return (ranToCompletion: false, default(TResult?));
 
            return (ranToCompletion: true, await ProcessNextBatchAsync().ConfigureAwait(false));
        }
    }
 
    /// <summary>
    /// Waits until the current batch of work completes and returns the last value successfully computed from <see
    /// cref="_processBatchAsync"/>.  If the last call to <see cref="_processBatchAsync"/> was canceled or failed,
    /// then a corresponding canceled or faulted task will be returned that propagates that outwards.
    /// </summary>
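    /// <example>
    /// For example, to flush the queue deterministically (say, from a test) and observe the last computed result
    /// (<c>queue</c> and <c>item</c> are placeholders):
    /// <code>
    /// queue.AddWork(item);
    /// var result = await queue.WaitUntilCurrentBatchCompletesAsync();
    /// </code>
    /// </example>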
    public async Task<TResult?> WaitUntilCurrentBatchCompletesAsync()
    {
        Task<(bool ranToCompletion, TResult? result)> updateTask;
        lock (_gate)
        {
            updateTask = _updateTask;
        }
 
        var (ranToCompletion, result) = await updateTask.ConfigureAwait(false);
        if (!ranToCompletion)
        {
            Debug.Assert(_entireQueueCancellationToken.IsCancellationRequested);
            _entireQueueCancellationToken.ThrowIfCancellationRequested();
        }
 
        return result;
    }
 
    private async ValueTask<TResult?> ProcessNextBatchAsync()
    {
        _entireQueueCancellationToken.ThrowIfCancellationRequested();
        try
        {
            var (nextBatch, batchCancellationToken) = GetNextBatchAndResetQueue();
 
            // We may have no items if the entire batch was canceled (and no new work was added).
            if (nextBatch.IsEmpty)
                return default;
 
            var batchResultTask = _processBatchAsync(nextBatch, batchCancellationToken).Preserve();
            await batchResultTask.NoThrowAwaitableInternal(false);
            if (batchResultTask.IsCompletedSuccessfully)
            {
                return batchResultTask.Result;
            }
            else if (batchResultTask.IsCanceled && !_entireQueueCancellationToken.IsCancellationRequested)
            {
                // Don't bubble up the nested batch cancellation to the queue.  The decision to cancel this batch
                // should not stop further batches from being processed.
                return default;
            }
            else
            {
                // Realize the completed result to force the exception to be thrown.
                batchResultTask.VerifyCompleted();
                throw ExceptionUtilities.Unreachable();
            }
        }
        catch (Exception ex) when (FatalError.ReportAndPropagateUnlessCanceled(ex, ErrorSeverity.Critical))
        {
            // Report an exception if the batch fails for a non-cancellation reason.
            //
            // Note: even though we propagate this exception outwards, we will still continue processing further
            // batches due to the `await NoThrowAwaitableInternal()` above.  The sentiment being that generally
            // failures are recoverable here, and we will have reported the error so we can see in telemetry if we
            // have a problem that needs addressing.
            //
            // Note: this code is unreachable because ReportAndPropagateUnlessCanceled returns false along all codepaths.
            throw ExceptionUtilities.Unreachable();
        }
    }
 
    private (ImmutableSegmentedList<TItem> items, CancellationToken batchCancellationToken) GetNextBatchAndResetQueue()
    {
        lock (_gate)
        {
            var nextBatch = _nextBatch.ToImmutable();
 
            // Mark that there is no existing update task so that the next OOP notification will
            // kick one off.
            _nextBatch.Clear();
            _uniqueItems.Clear();
            _taskInFlight = false;
 
            return (nextBatch, _nextBatchCancellationToken);
        }
    }
}