|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Aspire.Dashboard.Model.Otlp;
using Aspire.Dashboard.Otlp.Model;
namespace Aspire.Dashboard.Otlp.Storage;
/// <summary>
/// Partial class containing push-based streaming (watcher) functionality.
/// </summary>
public sealed partial class TelemetryRepository
{
// Watcher fields are defined in the main file:
// private readonly object _watchersLock;
// private List<SpanWatcher>? _spanWatchers;
// private List<LogWatcher>? _logWatchers;
/// <summary>
/// Maximum number of items to fetch in the initial snapshot for streaming.
/// Prevents unbounded memory usage on large repositories.
/// </summary>
private const int MaxWatcherSnapshotCount = 10000;
/// <summary>
/// Streams spans as they arrive using push-based delivery.
/// Yields existing spans first, then new ones as they're added.
/// O(1) per new span instead of O(n) re-query.
/// </summary>
/// <param name="resourceKey">Optional filter by resource.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>An async enumerable of spans.</returns>
public async IAsyncEnumerable<OtlpSpan> WatchSpansAsync(
ResourceKey? resourceKey,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
// Create a bounded channel to receive pushed spans
var channel = Channel.CreateBounded<OtlpSpan>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.DropOldest
});
var watcher = new SpanWatcher(resourceKey, channel);
// Register watcher FIRST to avoid race condition where spans could be
// added between getting the snapshot and registering.
lock (_watchersLock)
{
_spanWatchers ??= new List<SpanWatcher>();
_spanWatchers.Add(watcher);
}
try
{
// Get existing spans from traces (capped to prevent OOM)
var existingTraces = GetTraces(new GetTracesRequest
{
ResourceKey = resourceKey,
StartIndex = 0,
Count = MaxWatcherSnapshotCount,
Filters = [],
FilterText = string.Empty
});
// Track seen span IDs to deduplicate spans that arrive during the snapshot read.
// Race condition: watcher is registered BEFORE GetTraces, so spans arriving during
// the snapshot read are pushed to the channel AND included in the snapshot.
// Unlike logs (which have monotonically increasing InternalId), span IDs are random
// hex strings, so we need a HashSet rather than a simple counter.
// The HashSet is cleared after draining to prevent unbounded memory growth.
var seenSpanIds = new HashSet<string>();
// Yield existing spans
foreach (var trace in existingTraces.PagedResult.Items)
{
foreach (var span in trace.Spans)
{
// Filter by resource if specified
if (resourceKey is not null && !span.Source.ResourceKey.Equals(resourceKey))
{
continue;
}
seenSpanIds.Add(span.SpanId);
yield return span;
}
}
// Drain any spans that arrived during the snapshot to ensure we don't miss them
while (channel.Reader.TryRead(out var pendingSpan))
{
if (seenSpanIds.Add(pendingSpan.SpanId))
{
yield return pendingSpan;
}
}
// Clear the HashSet - spans arriving after the drain are guaranteed to be new
// (they weren't in the snapshot taken earlier). This prevents unbounded memory growth.
seenSpanIds.Clear();
// Stream new spans as they're pushed
await foreach (var span in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
yield return span;
}
}
finally
{
// Clean up watcher
lock (_watchersLock)
{
_spanWatchers?.Remove(watcher);
}
channel.Writer.TryComplete();
}
}
/// <summary>
/// Streams logs as they arrive using push-based delivery.
/// Yields existing logs first, then new ones as they're added.
/// O(1) per new log instead of O(n) re-query.
/// </summary>
/// <param name="resourceKey">Optional filter by resource.</param>
/// <param name="filters">Optional filters for logs.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>An async enumerable of log entries.</returns>
public async IAsyncEnumerable<OtlpLogEntry> WatchLogsAsync(
ResourceKey? resourceKey,
IEnumerable<TelemetryFilter>? filters,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
// Create a bounded channel to receive pushed logs
var channel = Channel.CreateBounded<OtlpLogEntry>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.DropOldest
});
var filterList = filters?.ToList() ?? [];
var watcher = new LogWatcher(resourceKey, filterList, channel);
// Register watcher FIRST to avoid race condition where logs could be
// added between getting the snapshot and registering.
lock (_watchersLock)
{
_logWatchers ??= new List<LogWatcher>();
_logWatchers.Add(watcher);
}
try
{
// Get existing logs snapshot (capped to prevent OOM)
var existingLogs = GetLogs(new GetLogsContext
{
ResourceKey = resourceKey,
StartIndex = 0,
Count = MaxWatcherSnapshotCount,
Filters = filterList
});
// Track the highest log ID we've yielded to deduplicate
long maxYieldedLogId = 0;
// Yield existing logs
foreach (var log in existingLogs.Items)
{
if (log.InternalId > maxYieldedLogId)
{
maxYieldedLogId = log.InternalId;
}
yield return log;
}
// Drain any logs that arrived during the snapshot
// Filters are already applied when pushing to channel
while (channel.Reader.TryRead(out var pendingLog))
{
if (pendingLog.InternalId > maxYieldedLogId)
{
maxYieldedLogId = pendingLog.InternalId;
yield return pendingLog;
}
}
// Stream new logs as they're pushed, deduplicating by ID
// Filters are already applied when pushing to channel
await foreach (var log in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
// Skip if we already yielded this log in the initial batch
if (log.InternalId <= maxYieldedLogId)
{
continue;
}
maxYieldedLogId = log.InternalId;
yield return log;
}
}
finally
{
// Clean up watcher
lock (_watchersLock)
{
_logWatchers?.Remove(watcher);
}
channel.Writer.TryComplete();
}
}
private void PushSpansToWatchers(List<OtlpSpan> spans, ResourceKey resourceKey)
{
// Take a snapshot of watchers to avoid holding the lock while writing
SpanWatcher[]? watchers;
lock (_watchersLock)
{
if (_spanWatchers is null || _spanWatchers.Count == 0)
{
return;
}
watchers = _spanWatchers.ToArray();
}
foreach (var span in spans)
{
foreach (var watcher in watchers)
{
// Check if watcher is filtering by resource
if (watcher.ResourceKey is { } key && !key.Equals(resourceKey))
{
continue;
}
// TryWrite is non-blocking - if channel is full, oldest item is dropped
if (!watcher.Channel.Writer.TryWrite(span))
{
_logger.LogWarning("Span watcher channel is full, dropping span {SpanId}. Consumer may be slow.", span.SpanId);
}
}
}
}
private void PushLogsToWatchers(List<OtlpLogEntry> logs, ResourceKey resourceKey)
{
if (logs.Count == 0)
{
return;
}
// Take a snapshot of watchers to avoid holding the lock while writing
LogWatcher[]? watchers;
lock (_watchersLock)
{
if (_logWatchers is null || _logWatchers.Count == 0)
{
return;
}
watchers = _logWatchers.ToArray();
}
foreach (var log in logs)
{
foreach (var watcher in watchers)
{
// Check if watcher is filtering by resource
if (watcher.ResourceKey is { } key && !key.Equals(resourceKey))
{
continue;
}
// Apply watcher filters before pushing to channel
if (watcher.Filters.Count > 0 && !MatchesFilters(log, watcher.Filters))
{
continue;
}
// TryWrite is non-blocking - if channel is full, oldest item is dropped
if (!watcher.Channel.Writer.TryWrite(log))
{
_logger.LogWarning("Log watcher channel is full, dropping log {LogId}. Consumer may be slow.", log.InternalId);
}
}
}
}
private static bool MatchesFilters(OtlpLogEntry log, List<TelemetryFilter> filters)
{
// Check if log passes all enabled filters
// Apply filters returns items that match, so we use a single-item enumerable
IEnumerable<OtlpLogEntry> result = [log];
foreach (var filter in filters)
{
if (!filter.Enabled)
{
continue;
}
result = filter.Apply(result);
}
return result.Any();
}
private void DisposeWatchers()
{
// Complete all watcher channels to signal consumers to stop
lock (_watchersLock)
{
if (_spanWatchers is not null)
{
foreach (var watcher in _spanWatchers)
{
watcher.Channel.Writer.TryComplete();
}
_spanWatchers.Clear();
}
if (_logWatchers is not null)
{
foreach (var watcher in _logWatchers)
{
watcher.Channel.Writer.TryComplete();
}
_logWatchers.Clear();
}
}
}
/// <summary>
/// Represents a span watcher for push-based streaming.
/// </summary>
private sealed class SpanWatcher(ResourceKey? resourceKey, Channel<OtlpSpan> channel)
{
public ResourceKey? ResourceKey => resourceKey;
public Channel<OtlpSpan> Channel => channel;
}
/// <summary>
/// Represents a log watcher for push-based streaming.
/// Includes filters to apply when pushing logs to the channel.
/// </summary>
private sealed class LogWatcher(ResourceKey? resourceKey, List<TelemetryFilter> filters, Channel<OtlpLogEntry> channel)
{
public ResourceKey? ResourceKey => resourceKey;
public List<TelemetryFilter> Filters => filters;
public Channel<OtlpLogEntry> Channel => channel;
}
}
|