File: Utils\CallbackThrottler.cs
Web Access
Project: src\src\Aspire.Dashboard\Aspire.Dashboard.csproj (Aspire.Dashboard)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Diagnostics;
 
namespace Aspire.Dashboard.Utils;
 
[DebuggerDisplay("Name = {Name}")]
public sealed class CallbackThrottler
{
    private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
    private DateTime? _lastExecute;
 
    public CallbackThrottler(string name, ILogger logger, TimeSpan minExecuteInterval, Func<Task> callback, ExecutionContext? executionContext)
    {
        Name = name;
        _logger = logger;
        _minExecuteInterval = minExecuteInterval;
        _callback = callback;
        _executionContext = executionContext;
        _cts = new CancellationTokenSource();
        _cancellationToken = _cts.Token;
    }
 
    public string Name { get; }
 
    private readonly ILogger _logger;
    private readonly TimeSpan _minExecuteInterval;
    private readonly Func<Task> _callback;
    private readonly ExecutionContext? _executionContext;
    private readonly CancellationTokenSource _cts;
    private readonly CancellationToken _cancellationToken;
 
    private async Task<bool> TryQueueAsync(CancellationToken cancellationToken)
    {
        var success = _lock.Wait(0, cancellationToken);
        if (!success)
        {
            _logger.LogTrace("Callback '{Name}' update already queued.", Name);
            return false;
        }
 
        try
        {
            var lastExecute = _lastExecute;
            if (lastExecute != null)
            {
                var minExecuteInterval = _minExecuteInterval;
                var s = lastExecute.Value.Add(minExecuteInterval) - DateTime.UtcNow;
                if (s > TimeSpan.Zero)
                {
                    _logger.LogTrace("Callback '{Name}' minimum execute interval of {MinExecuteInterval} hit. Waiting {DelayInterval}.", Name, minExecuteInterval, s);
                    await Task.Delay(s, cancellationToken).ConfigureAwait(false);
                }
            }
 
            _lastExecute = DateTime.UtcNow;
            return true;
        }
        finally
        {
            _lock.Release();
        }
    }
 
    private async Task ExecuteAsync()
    {
        // Try to queue the subscription callback.
        // If another caller is already in the queue then exit without calling the callback.
        if (!await TryQueueAsync(_cancellationToken).ConfigureAwait(false))
        {
            return;
        }
 
        try
        {
            // Set the execution context to the one captured when the subscription was created.
            // This ensures that the callback runs in the same context as the subscription was created.
            // For example, the request culture is used to format content in the callback.
            //
            // No need to restore back to the original context because the callback is running on
            // a background task. The task finishes immediately after the callback.
            if (_executionContext != null)
            {
                ExecutionContext.Restore(_executionContext);
            }
 
            _logger.LogTrace("Callback '{Name}' executing.", Name);
            await _callback().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in callback.");
        }
    }
 
    public void Execute()
    {
        // Execute on a background thread.
        // The caller doesn't want to wait while the execution is running or receive exceptions.
        _ = Task.Run(ExecuteAsync);
    }
 
    public void Dispose()
    {
        _cts.Cancel();
        _cts.Dispose();
        _lock.Dispose();
    }
}