File: Dashboard\ResourcePublisher.cs
Web Access
Project: src\src\Aspire.Hosting\Aspire.Hosting.csproj (Aspire.Hosting)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Aspire.Hosting.ApplicationModel;
 
namespace Aspire.Hosting.Dashboard;
 
/// <summary>
/// Builds a collection of resources by integrating incoming resource changes,
/// and allowing multiple subscribers to receive the current resource collection
/// snapshot and future updates.
/// </summary>
internal sealed class ResourcePublisher(CancellationToken cancellationToken)
{
    private sealed record SourceAndResourceSnapshot(IResource Source, ResourceSnapshot Snapshot);
 
    private readonly object _syncLock = new();
    private readonly Dictionary<string, SourceAndResourceSnapshot> _snapshot = [];
    private ImmutableHashSet<Channel<ResourceSnapshotChange>> _outgoingChannels = [];
 
    // For testing purposes
    internal int OutgoingSubscriberCount => _outgoingChannels.Count;
 
    internal bool TryGetResource(string resourceName, [NotNullWhen(returnValue: true)] out ResourceSnapshot? snapshot, [NotNullWhen(returnValue: true)] out IResource? resource)
    {
        lock (_syncLock)
        {
            if (_snapshot.TryGetValue(resourceName, out var r))
            {
                snapshot = r.Snapshot;
                resource = r.Source;
                return true;
            }
 
            snapshot = null;
            resource = null;
            return false;
        }
    }
 
    public ResourceSnapshotSubscription Subscribe()
    {
        lock (_syncLock)
        {
            var channel = Channel.CreateUnbounded<ResourceSnapshotChange>(
                new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = true });
 
            ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Add(channel), channel);
 
            return new ResourceSnapshotSubscription(
                InitialState: _snapshot.Select(r => r.Value.Snapshot).ToImmutableArray(),
                Subscription: StreamUpdates());
 
            async IAsyncEnumerable<IReadOnlyList<ResourceSnapshotChange>> StreamUpdates([EnumeratorCancellation] CancellationToken enumeratorCancellationToken = default)
            {
                using var linked = CancellationTokenSource.CreateLinkedTokenSource(enumeratorCancellationToken, cancellationToken);
 
                try
                {
                    await foreach (var batch in channel.GetBatchesAsync(cancellationToken: linked.Token).ConfigureAwait(false))
                    {
                        yield return batch;
                    }
                }
                finally
                {
                    ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Remove(channel), channel);
                }
            }
        }
    }
 
    /// <summary>
    /// Integrates a changed resource within the cache, and broadcasts the update to any subscribers.
    /// </summary>
    /// <param name="source">The source resource.</param>
    /// <param name="snapshot">The resource snapshot that was modified.</param>
    /// <param name="changeType">The change type (Added, Modified, Deleted).</param>
    /// <returns>A task that completes when the cache has been updated and all subscribers notified.</returns>
    internal async ValueTask IntegrateAsync(IResource source, ResourceSnapshot snapshot, ResourceSnapshotChangeType changeType)
    {
        ImmutableHashSet<Channel<ResourceSnapshotChange>> channels;
 
        lock (_syncLock)
        {
            switch (changeType)
            {
                case ResourceSnapshotChangeType.Upsert:
                    _snapshot[snapshot.Name] = new SourceAndResourceSnapshot(source, snapshot);
                    break;
 
                case ResourceSnapshotChangeType.Delete:
                    _snapshot.Remove(snapshot.Name);
                    break;
            }
 
            channels = _outgoingChannels;
        }
 
        foreach (var channel in channels)
        {
            await channel.Writer.WriteAsync(new(changeType, snapshot), cancellationToken).ConfigureAwait(false);
        }
    }
}
 
internal sealed record ResourceSnapshotSubscription(
    ImmutableArray<ResourceSnapshot> InitialState,
    IAsyncEnumerable<IReadOnlyList<ResourceSnapshotChange>> Subscription);
 
internal sealed record ResourceSnapshotChange(
    ResourceSnapshotChangeType ChangeType,
    ResourceSnapshot Resource);
 
internal enum ResourceSnapshotChangeType
{
    /// <summary>
    /// The object was added if new, or updated if not.
    /// </summary>
    Upsert,
 
    /// <summary>
    /// The object was deleted.
    /// </summary>
    Delete
}