|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Aspire.Hosting.ConsoleLogs;
using Microsoft.Extensions.Logging;
namespace Aspire.Hosting.ApplicationModel;
/// <summary>
/// A service that provides loggers for resources to write to.
/// </summary>
public class ResourceLoggerService
{
// Time source handed to every ResourceLoggerState created by this service. Internal for testing.
internal TimeProvider TimeProvider { get; set; } = TimeProvider.System;
// Logger state for each resource, keyed by resource name.
private readonly ConcurrentDictionary<string, ResourceLoggerState> _loggers = new();
// Backing delegate for LoggerAdded; invoked each time a new ResourceLoggerState is created.
private Action<(string, ResourceLoggerState)>? _loggerAdded;

/// <summary>
/// Raised with the resource name and state when a logger state is created.
/// A handler that is attached is also invoked once for every logger that already exists.
/// </summary>
private event Action<(string, ResourceLoggerState)> LoggerAdded
{
    add
    {
        _loggerAdded += value;

        // Replay the loggers that were created before this handler was attached.
        foreach (var (resourceName, loggerState) in _loggers)
        {
            value((resourceName, loggerState));
        }
    }
    remove => _loggerAdded -= value;
}
/// <summary>
/// Gets the logger that a resource writes its logs to.
/// </summary>
/// <param name="resource">The resource to get the logger for.</param>
/// <returns>
/// An <see cref="ILogger"/> which represents the resource. When the resource resolves to more than
/// one name, the returned logger forwards every call to the logger of each resolved name.
/// </returns>
public ILogger GetLogger(IResource resource)
{
    ArgumentNullException.ThrowIfNull(resource);

    var names = resource.GetResolvedResourceNames();
    if (names.Length <= 1)
    {
        return GetResourceLoggerState(names[0]).Logger;
    }

    // The resource has multiple replicas: build a composite logger that writes to all of them.
    var replicaLoggers = new List<ILogger>(names.Length);
    for (var i = 0; i < names.Length; i++)
    {
        replicaLoggers.Add(GetResourceLoggerState(names[i]).Logger);
    }

    return new CompositeLogger(replicaLoggers);
}
/// <summary>
/// An <see cref="ILogger"/> that forwards scopes and log writes to a list of inner loggers.
/// </summary>
private sealed class CompositeLogger(List<ILogger> innerLoggers) : ILogger
{
    private readonly List<ILogger> _innerLoggers = innerLoggers;

    /// <summary>
    /// Begins the scope on every inner logger and returns a single handle for the scopes that were created.
    /// </summary>
    /// <returns>
    /// <see langword="null"/> when no inner logger returned a scope, that scope when exactly one did,
    /// otherwise a disposable that disposes all of them.
    /// </returns>
    public IDisposable? BeginScope<TState>(TState state) where TState : notnull
    {
        var activeScopes = new List<IDisposable>();
        for (var i = 0; i < _innerLoggers.Count; i++)
        {
            var scope = _innerLoggers[i].BeginScope(state);
            if (scope is not null)
            {
                activeScopes.Add(scope);
            }
        }

        return activeScopes.Count switch
        {
            0 => null,
            1 => activeScopes[0],
            _ => new CompositeDisposable(activeScopes),
        };
    }

    /// <summary>
    /// Disposes a list of disposables in list order.
    /// </summary>
    private sealed class CompositeDisposable(List<IDisposable> disposables) : IDisposable
    {
        private readonly List<IDisposable> _disposables = disposables;

        public void Dispose()
        {
            for (var i = 0; i < _disposables.Count; i++)
            {
                _disposables[i].Dispose();
            }
        }
    }

    // All loggers have the same log level, so only the first one is consulted.
    public bool IsEnabled(LogLevel logLevel) => _innerLoggers[0].IsEnabled(logLevel);

    /// <summary>
    /// Writes the log entry to every inner logger.
    /// </summary>
    public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
    {
        for (var i = 0; i < _innerLoggers.Count; i++)
        {
            _innerLoggers[i].Log(logLevel, eventId, state, exception, formatter);
        }
    }
}
/// <summary>
/// Gets the logger that the named resource writes its logs to.
/// </summary>
/// <param name="resourceName">The name of the resource from the Aspire application model.</param>
/// <returns>An <see cref="ILogger"/> which represents the named resource.</returns>
/// <exception cref="ArgumentNullException"><paramref name="resourceName"/> is <see langword="null"/>.</exception>
public ILogger GetLogger(string resourceName)
{
    ArgumentNullException.ThrowIfNull(resourceName);

    var loggerState = GetResourceLoggerState(resourceName);
    return loggerState.Logger;
}
/// <summary>
/// Gets a callback that adds already-built log entries to the resource's log.
/// It is used when adding logs from a resource's stream logs, so that the date parsed
/// from the text can be used as the log line date.
/// </summary>
/// <param name="resourceName">The name of the resource to add log entries to.</param>
/// <returns>A delegate that adds a <see cref="LogEntry"/> to the resource's log.</returns>
internal Action<LogEntry> GetInternalLogger(string resourceName)
{
    ArgumentNullException.ThrowIfNull(resourceName);

    var loggerState = GetResourceLoggerState(resourceName);
    return WriteEntry;

    // Entries added through this callback are not recorded as in-memory logs.
    void WriteEntry(LogEntry entry) => loggerState.AddLog(entry, inMemorySource: false);
}
/// <summary>
/// Watch for changes to the log stream for a resource.
/// </summary>
/// <param name="resource">The resource to watch for logs.</param>
/// <returns>
/// An async enumerable that returns the logs as they are written. When the resource resolves to
/// more than one name, the log batches of every resolved name are merged into a single stream.
/// </returns>
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(IResource resource)
{
    ArgumentNullException.ThrowIfNull(resource);

    var resourceNames = resource.GetResolvedResourceNames();
    if (resourceNames.Length > 1)
    {
        return WatchMultipleAsync(resourceNames, WatchAsync);
    }
    else
    {
        return WatchAsync(resourceNames[0]);
    }

    // Merges the log streams of several resource names into one stream through a channel.
    static async IAsyncEnumerable<IReadOnlyList<LogLine>> WatchMultipleAsync(
        string[] resourceNames,
        Func<string, IAsyncEnumerable<IReadOnlyList<LogLine>>> watch,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateUnbounded<IReadOnlyList<LogLine>>();

        // Lets this method stop the per-name watchers when the consumer stops enumerating early.
        // The token is copied to a local so the readers never touch the source after it is disposed.
        using var stopReaders = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var readerToken = stopReaders.Token;

        var readTasks = resourceNames.Select(async (name) =>
        {
            await foreach (var logLines in watch(name).WithCancellation(readerToken).ConfigureAwait(false))
            {
                channel.Writer.TryWrite(logLines);
            }
        });

        var completionTask = Task.Run(async () =>
        {
            try
            {
                await Task.WhenAll(readTasks).ConfigureAwait(false);
            }
            finally
            {
                // Always complete the channel so the merged stream ends when the readers end or fail.
                channel.Writer.Complete();
            }
        });

        var drained = false;
        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
            {
                yield return item;
            }

            drained = true;
        }
        finally
        {
            if (!drained)
            {
                // The consumer broke out of the loop, disposed the enumerator, or cancelled.
                // Without this the readers would keep running and keep writing to a channel nobody reads.
                stopReaders.Cancel();

                // Wait for the readers to stop, without surfacing their cancellation from disposal.
                await completionTask.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
            }
        }

        // All readers finished on their own: surface any failure from them.
        await completionTask.ConfigureAwait(false);
    }
}
/// <summary>
/// Watch for changes to the log stream for the named resource.
/// </summary>
/// <param name="resourceName">The resource name.</param>
/// <returns>An async enumerable that returns the logs as they are written.</returns>
/// <exception cref="ArgumentNullException"><paramref name="resourceName"/> is <see langword="null"/>.</exception>
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(string resourceName)
{
    ArgumentNullException.ThrowIfNull(resourceName);

    var loggerState = GetResourceLoggerState(resourceName);
    return loggerState.WatchAsync();
}
/// <summary>
/// Watch for subscribers to the log stream for a resource.
/// </summary>
/// <param name="cancellationToken">A token that stops the watch.</param>
/// <returns>
/// An async enumerable that returns when the first subscriber is added to a log,
/// or when the last subscriber is removed.
/// </returns>
public async IAsyncEnumerable<LogSubscriber> WatchAnySubscribersAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var channel = Channel.CreateUnbounded<LogSubscriber>();

    // Handlers attached to each logger state. They are tracked so that they can be detached when
    // the watch ends, and so that a state reported more than once is only subscribed to once.
    var subscriptions = new ConcurrentDictionary<ResourceLoggerState, Action<bool>>();

    void OnLoggerAdded((string Name, ResourceLoggerState State) loggerItem)
    {
        var (name, state) = loggerItem;

        Action<bool> onSubscribersChanged = (hasSubscribers) => channel.Writer.TryWrite(new(name, hasSubscribers));

        if (subscriptions.TryAdd(state, onSubscribersChanged))
        {
            state.OnSubscribersChanged += onSubscribersChanged;
        }
    }

    // Subscribing also replays every logger that already exists.
    LoggerAdded += OnLoggerAdded;

    try
    {
        await foreach (var entry in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
        {
            yield return entry;
        }
    }
    finally
    {
        LoggerAdded -= OnLoggerAdded;

        // Detach from every logger state; otherwise each finished watch would leave a handler behind
        // that keeps writing into a channel nobody reads.
        foreach (var (trackedState, trackedHandler) in subscriptions)
        {
            trackedState.OnSubscribersChanged -= trackedHandler;
        }

        // Any handler still running after this point has its write rejected instead of buffered.
        channel.Writer.TryComplete();
    }
}
/// <summary>
/// Completes the log stream for every resolved name of the resource.
/// </summary>
/// <param name="resource">The <see cref="IResource"/>.</param>
public void Complete(IResource resource)
{
    ArgumentNullException.ThrowIfNull(resource);

    foreach (var name in resource.GetResolvedResourceNames())
    {
        // Names that have no logger yet are skipped; completing never creates a logger.
        if (!_loggers.TryGetValue(name, out var loggerState))
        {
            continue;
        }

        loggerState.Complete();
    }
}
/// <summary>
/// Completes the log stream for the named resource. Does nothing when the resource has no logger.
/// </summary>
/// <param name="name">The name of the resource.</param>
public void Complete(string name)
{
    ArgumentNullException.ThrowIfNull(name);

    if (!_loggers.TryGetValue(name, out var loggerState))
    {
        return;
    }

    loggerState.Complete();
}
/// <summary>
/// Clears the log stream's backlog for the resource. Does nothing when the resource has no logger.
/// </summary>
/// <param name="resourceName">The name of the resource.</param>
public void ClearBacklog(string resourceName)
{
    ArgumentNullException.ThrowIfNull(resourceName);

    if (!_loggers.TryGetValue(resourceName, out var loggerState))
    {
        return;
    }

    loggerState.ClearBacklog();
}
// Internal for testing.
// Returns the logger state for the resource name, creating it on first use.
// The LoggerAdded handlers are invoked with each state that gets created.
internal ResourceLoggerState GetResourceLoggerState(string resourceName)
{
    return _loggers.GetOrAdd(
        resourceName,
        static (name, service) =>
        {
            var created = new ResourceLoggerState(service.TimeProvider);
            service._loggerAdded?.Invoke((name, created));
            return created;
        },
        this);
}
// Copy of the current loggers, keyed by resource name. A new dictionary is built on every read.
internal Dictionary<string, ResourceLoggerState> Loggers => _loggers.ToDictionary();
/// <summary>
/// A logger for the resource to write to.
/// </summary>
internal sealed class ResourceLoggerState
{
// Capacity passed to both the in-memory entry buffer and the backlog.
private const int MaxLogCount = 10_000;
// The ILogger exposed through the Logger property.
private readonly ResourceLogger _logger;
// Cancelled by Complete() to end the log stream.
private readonly CancellationTokenSource _logStreamCts = new();
// Guards the backlog, the in-memory entries and changes to the set of log watchers.
private readonly object _lock = new();
// Entries added with inMemorySource: true. They are replayed into the backlog when the first watcher subscribes.
private readonly CircularBuffer<LogEntry> _inMemoryEntries = new(MaxLogCount);
// Entries collected while there is at least one watcher. Cleared when the last watcher unsubscribes.
private readonly LogEntries _backlog = new(MaxLogCount) { BaseLineNumber = 0 };
// Supplies the timestamp for entries written through the ILogger.
private readonly TimeProvider _timeProvider;
/// <summary>
/// Creates a new <see cref="ResourceLoggerState"/>.
/// </summary>
/// <param name="timeProvider">The time source used to timestamp entries written through <see cref="Logger"/>.</param>
public ResourceLoggerState(TimeProvider timeProvider)
{
    _timeProvider = timeProvider;
    _logger = new ResourceLogger(this);
}
// Backing delegate for OnSubscribersChanged.
private Action<bool>? _onSubscribersChanged;

/// <summary>
/// Raised with <see langword="true"/> when the first log watcher subscribes and with
/// <see langword="false"/> when the last one unsubscribes. A handler that is attached while
/// there already are watchers is immediately invoked with <see langword="true"/>.
/// </summary>
public event Action<bool> OnSubscribersChanged
{
    add
    {
        _onSubscribersChanged += value;

        bool alreadyWatched;
        lock (_lock)
        {
            // A non-null delegate means at least one watcher is subscribed.
            alreadyWatched = _onNewLog is not null;
        }

        // The handler is invoked after the lock has been released.
        if (alreadyWatched)
        {
            value(true);
        }
    }
    remove => _onSubscribersChanged -= value;
}
/// <summary>
/// Watch for changes to the log stream for a resource.
/// The current backlog is returned first (when it is not empty), followed by batches of new entries.
/// </summary>
/// <param name="cancellationToken">Token passed to the batch reader that waits for new log entries.</param>
/// <returns>The log stream for the resource.</returns>
public async IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// Line number always restarts from 1 when watching logs.
// Note that this will need to be improved if the log source (DCP) is changed to return a maximum number of lines.
var lineNumber = 1;
// Entries logged after this watcher subscribes are queued here until they are returned.
var channel = Channel.CreateUnbounded<LogEntry>();
// Completing the resource's log stream (see Complete) completes this watcher's channel.
using var _ = _logStreamCts.Token.Register(() => channel.Writer.TryComplete());
// No need to lock in the log method because TryWrite/TryComplete are already thread safe.
void Log(LogEntry log) => channel.Writer.TryWrite(log);
LogEntry[] backlogSnapshot;
// The snapshot and the subscription happen under the same lock as AddLog's backlog update.
lock (_lock)
{
// If there are no subscribers then the backlog must be empty. Populate it with any in-memory logs.
if (!HasSubscribers)
{
Debug.Assert(_backlog.EntriesCount == 0, "The backlog should be empty if there are no subscribers.");
// Populate backlog with in-memory log messages on first subscription.
foreach (var logEntry in _inMemoryEntries)
{
_backlog.InsertSorted(logEntry);
}
}
backlogSnapshot = GetBacklogSnapshot();
OnNewLog += Log;
}
try
{
// Return the backlog as a single batch before any new entries.
if (backlogSnapshot.Length > 0)
{
yield return CreateLogLines(ref lineNumber, backlogSnapshot);
}
await foreach (var entry in channel.GetBatchesAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
{
yield return CreateLogLines(ref lineNumber, entry);
}
}
finally
{
// Runs when the stream ends or the enumerator is disposed: unsubscribe and complete the channel.
lock (_lock)
{
OnNewLog -= Log;
channel.Writer.TryComplete();
}
}
// Converts entries to log lines, numbering them consecutively from the current line number.
static LogLine[] CreateLogLines(ref int lineNumber, IReadOnlyList<LogEntry> entries)
{
var logs = new LogLine[entries.Count];
for (var i = 0; i < entries.Count; i++)
{
var entry = entries[i];
var content = entry.Content ?? string.Empty;
// Prefix the content with the formatted timestamp when the entry has one.
if (entry.Timestamp != null)
{
content = entry.Timestamp.Value.ToString(KnownFormats.ConsoleLogsTimestampFormat, CultureInfo.InvariantCulture) + " " + content;
}
logs[i] = new LogLine(lineNumber, content, entry.Type == LogEntryType.Error);
lineNumber++;
}
return logs;
}
}
// True when at least one log watcher is subscribed. Must be read while holding _lock.
private bool HasSubscribers
{
    get
    {
        Debug.Assert(Monitor.IsEntered(_lock));

        return _onNewLog is not null;
    }
}
// This provides the fan out to multiple subscribers.
private Action<LogEntry>? _onNewLog;

// Subscription point for log watchers. Both accessors must be called while holding _lock.
private event Action<LogEntry> OnNewLog
{
    add
    {
        Debug.Assert(Monitor.IsEntered(_lock));

        var hadNoSubscribers = _onNewLog is null;
        _onNewLog += value;

        if (!hadNoSubscribers)
        {
            return;
        }

        // This is the first subscriber: raise the event so WatchAnySubscribersAsync publishes an update.
        _onSubscribersChanged?.Invoke(true);
    }
    remove
    {
        Debug.Assert(Monitor.IsEntered(_lock));

        _onNewLog -= value;

        if (_onNewLog is not null)
        {
            return;
        }

        // That was the last subscriber. Clear the backlog immediately rather than waiting for the
        // subscription-changed notification to do it: otherwise someone else could start watching
        // in between and get the backlog followed by a complete replay of all logs.
        ClearBacklog();

        // Raise the event so WatchAnySubscribersAsync publishes an update.
        _onSubscribersChanged?.Invoke(false);
    }
}
/// <summary>
/// The logger for the resource to write to. This will write updates to the live log stream for this resource.
/// Entries written through it are also recorded as in-memory logs.
/// </summary>
public ILogger Logger => _logger;
/// <summary>
/// Close the log stream for the resource by cancelling the stream's cancellation token.
/// Watchers registered on that token have their channel completed, and writes made through
/// <see cref="Logger"/> after this call are ignored.
/// </summary>
public void Complete()
{
// REVIEW: Do we clean up the backlog?
_logStreamCts.Cancel();
}
/// <summary>
/// Removes every entry from the backlog and resets its base line number to 0.
/// The in-memory entries are not touched.
/// </summary>
public void ClearBacklog()
{
lock (_lock)
{
_backlog.Clear();
_backlog.BaseLineNumber = 0;
}
}
/// <summary>
/// Copies the current backlog entries into a new array while holding the lock.
/// </summary>
/// <returns>An array with the backlog entries at the time of the call.</returns>
internal LogEntry[] GetBacklogSnapshot()
{
lock (_lock)
{
return [.. _backlog.GetEntries()];
}
}
/// <summary>
/// Records a log entry and forwards it to the log watchers that are currently subscribed.
/// </summary>
/// <param name="logEntry">The entry to add.</param>
/// <param name="inMemorySource">
/// <see langword="true"/> when the entry is an in-memory log (i.e. not loaded from DCP); such entries
/// are kept so they can be replayed into the backlog when a log watch starts.
/// </param>
public void AddLog(LogEntry logEntry, bool inMemorySource)
{
    Action<LogEntry>? subscribers;

    lock (_lock)
    {
        // Only add logs into the backlog if there are subscribers. If there aren't subscribers then
        // logs are replayed into this collection from various sources (DCP, in-memory).
        if (HasSubscribers)
        {
            _backlog.InsertSorted(logEntry);
        }

        // Keep in-memory logs (i.e. logs not loaded from DCP) in their own collection.
        // These logs are replayed into the backlog when a log watch starts.
        if (inMemorySource)
        {
            _inMemoryEntries.Add(logEntry);
        }

        // Capture the subscribers while still holding the lock. A watcher that subscribes after the
        // lock is released already gets this entry through the replayed backlog, so reading the
        // delegate later would deliver the same entry to it a second time.
        subscribers = _onNewLog;
    }

    // Invoke outside the lock so subscriber callbacks never run while the lock is held.
    subscribers?.Invoke(logEntry);
}
/// <summary>
/// The <see cref="ILogger"/> handed out for a resource. Every message is turned into a
/// <see cref="LogEntry"/> and added to the owning state as an in-memory log.
/// </summary>
private sealed class ResourceLogger(ResourceLoggerState loggerState) : ILogger
{
    // Scopes are not supported.
    IDisposable? ILogger.BeginScope<TState>(TState state) => null;

    // Every log level is enabled.
    bool ILogger.IsEnabled(LogLevel logLevel) => true;

    /// <summary>
    /// Formats the message, appends the exception on a new line when there is one, and adds the
    /// result to the log. Messages at <see cref="LogLevel.Error"/> or above are flagged as errors.
    /// </summary>
    public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
    {
        // Noop if logging after completing the stream
        if (loggerState._logStreamCts.IsCancellationRequested)
        {
            return;
        }

        var timestamp = loggerState._timeProvider.GetUtcNow().UtcDateTime;
        var formatted = formatter(state, exception);
        var message = string.Concat(formatted, exception is null ? "" : $"\n{exception}");

        var entry = LogEntry.Create(timestamp, message, logLevel >= LogLevel.Error);
        loggerState.AddLog(entry, inMemorySource: true);
    }
}
}
}
/// <summary>
/// Represents a log subscriber for a resource.
/// </summary>
/// <param name="Name">The resource name.</param>
/// <param name="AnySubscribers">Determines if there are any subscribers.</param>
public readonly record struct LogSubscriber(string Name, bool AnySubscribers);
|