// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
namespace Aspire.Dashboard.Otlp.Storage;
public sealed class Subscription : IDisposable
private readonly Func<Task> _callback;
private readonly ExecutionContext? _executionContext;
private readonly TelemetryRepository _telemetryRepository;
private readonly CancellationTokenSource _cts;
private readonly CancellationToken _cancellationToken;
private readonly Action _unsubscribe;
private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
private ILogger Logger => _telemetryRepository._otlpContext.Logger;
private DateTime? _lastExecute;
public ApplicationKey? ApplicationKey { get; }
public SubscriptionType SubscriptionType { get; }
public string Name { get; }
public Subscription(string name, ApplicationKey? applicationKey, SubscriptionType subscriptionType, Func<Task> callback, Action unsubscribe, ExecutionContext? executionContext, TelemetryRepository telemetryRepository)
Name = name;
ApplicationKey = applicationKey;
SubscriptionType = subscriptionType;
_callback = callback;
_unsubscribe = unsubscribe;
_executionContext = executionContext;
_telemetryRepository = telemetryRepository;
_cts = new CancellationTokenSource();
_cancellationToken = _cts.Token;
private async Task<bool> TryQueueAsync(CancellationToken cancellationToken)
var success = _lock.Wait(0, cancellationToken);
if (!success)
Logger.LogDebug("Subscription '{Name}' update already queued.", Name);
return false;
var lastExecute = _lastExecute;
if (lastExecute != null)
var minExecuteInterval = _telemetryRepository._subscriptionMinExecuteInterval;
var s = lastExecute.Value.Add(minExecuteInterval) - DateTime.UtcNow;
if (s > TimeSpan.Zero)
Logger.LogTrace("Subscription '{Name}' minimum execute interval of {MinExecuteInterval} hit. Waiting {DelayInterval}.", Name, minExecuteInterval, s);
await Task.Delay(s, cancellationToken).ConfigureAwait(false);
_lastExecute = DateTime.UtcNow;
return true;
public void Execute()
// Execute the subscription callback on a background thread.
// The caller doesn't want to wait while the subscription is running or receive exceptions.
_ = Task.Run(async () =>
// Try to queue the subscription callback.
// If another caller is already in the queue then exit without calling the callback.
if (!await TryQueueAsync(_cancellationToken).ConfigureAwait(false))
// Set the execution context to the one captured when the subscription was created.
// This ensures that the callback runs in the same context as the subscription was created.
// For example, the request culture is used to format content in the callback.
// No need to restore back to the original context because the callback is running on
// a background task. The task finishes immediately after the callback.
if (_executionContext != null)
Logger.LogTrace("Subscription '{Name}' executing.", Name);
await _callback().ConfigureAwait(false);
catch (Exception ex)
Logger.LogError(ex, "Error in subscription callback");
public void Dispose()