|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Diagnostics;
namespace Aspire.Dashboard.Otlp.Storage;
[DebuggerDisplay("Name = {Name}, ApplicationKey = {ApplicationKey}, SubscriptionId = {SubscriptionId}")]
public sealed class Subscription : IDisposable
{
private static int s_subscriptionId;
private readonly Func<Task> _callback;
private readonly ExecutionContext? _executionContext;
private readonly TelemetryRepository _telemetryRepository;
private readonly CancellationTokenSource _cts;
private readonly CancellationToken _cancellationToken;
private readonly Action _unsubscribe;
private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
private ILogger Logger => _telemetryRepository._otlpContext.Logger;
private readonly int _subscriptionId = Interlocked.Increment(ref s_subscriptionId);
private DateTime? _lastExecute;
public int SubscriptionId => _subscriptionId;
public ApplicationKey? ApplicationKey { get; }
public SubscriptionType SubscriptionType { get; }
public string Name { get; }
public Subscription(string name, ApplicationKey? applicationKey, SubscriptionType subscriptionType, Func<Task> callback, Action unsubscribe, ExecutionContext? executionContext, TelemetryRepository telemetryRepository)
{
Name = name;
ApplicationKey = applicationKey;
SubscriptionType = subscriptionType;
_callback = callback;
_unsubscribe = unsubscribe;
_executionContext = executionContext;
_telemetryRepository = telemetryRepository;
_cts = new CancellationTokenSource();
_cancellationToken = _cts.Token;
}
private async Task<bool> TryQueueAsync(CancellationToken cancellationToken)
{
var success = _lock.Wait(0, cancellationToken);
if (!success)
{
Logger.LogDebug("Subscription '{Name}' update already queued.", Name);
return false;
}
try
{
var lastExecute = _lastExecute;
if (lastExecute != null)
{
var minExecuteInterval = _telemetryRepository._subscriptionMinExecuteInterval;
var s = lastExecute.Value.Add(minExecuteInterval) - DateTime.UtcNow;
if (s > TimeSpan.Zero)
{
Logger.LogTrace("Subscription '{Name}' minimum execute interval of {MinExecuteInterval} hit. Waiting {DelayInterval}.", Name, minExecuteInterval, s);
await Task.Delay(s, cancellationToken).ConfigureAwait(false);
}
}
_lastExecute = DateTime.UtcNow;
return true;
}
finally
{
_lock.Release();
}
}
public void Execute()
{
// Execute the subscription callback on a background thread.
// The caller doesn't want to wait while the subscription is running or receive exceptions.
_ = Task.Run(async () =>
{
// Try to queue the subscription callback.
// If another caller is already in the queue then exit without calling the callback.
if (!await TryQueueAsync(_cancellationToken).ConfigureAwait(false))
{
return;
}
try
{
// Set the execution context to the one captured when the subscription was created.
// This ensures that the callback runs in the same context as the subscription was created.
// For example, the request culture is used to format content in the callback.
//
// No need to restore back to the original context because the callback is running on
// a background task. The task finishes immediately after the callback.
if (_executionContext != null)
{
ExecutionContext.Restore(_executionContext);
}
Logger.LogTrace("Subscription '{Name}' executing.", Name);
await _callback().ConfigureAwait(false);
}
catch (Exception ex)
{
Logger.LogError(ex, "Error in subscription callback");
}
});
}
public void Dispose()
{
_unsubscribe();
_cts.Cancel();
_cts.Dispose();
_lock.Dispose();
}
}
|