File: Dashboard\ResourcePublisherTests.cs
Web Access
Project: src\tests\Aspire.Hosting.Tests\Aspire.Hosting.Tests.csproj (Aspire.Hosting.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using Aspire.Dashboard.Model;
using Aspire.Hosting.Dashboard;
using Microsoft.AspNetCore.InternalTesting;
using Xunit;
 
namespace Aspire.Hosting.Tests.Dashboard;
 
public class ResourcePublisherTests
{
    [Fact]
    public async Task ProducesExpectedSnapshotAndUpdates()
    {
        CancellationTokenSource cts = new();
        ResourcePublisher publisher = new(cts.Token);
 
        var a = CreateResourceSnapshot("A");
        var b = CreateResourceSnapshot("B");
        var c = CreateResourceSnapshot("C");
 
        await publisher.IntegrateAsync(new TestResource("A"), a, ResourceSnapshotChangeType.Upsert).DefaultTimeout();
        await publisher.IntegrateAsync(new TestResource("B"), b, ResourceSnapshotChangeType.Upsert).DefaultTimeout();
 
        Assert.Equal(0, publisher.OutgoingSubscriberCount);
 
        var (snapshot, subscription) = publisher.Subscribe();
 
        Assert.Equal(1, publisher.OutgoingSubscriberCount);
 
        Assert.Equal(2, snapshot.Length);
        Assert.Single(snapshot.Where(s => s.Name == "A"));
        Assert.Single(snapshot.Where(s => s.Name == "B"));
 
        var tcs = new TaskCompletionSource<IReadOnlyList<ResourceSnapshotChange>>(TaskCreationOptions.RunContinuationsAsynchronously);
 
        var task = Task.Run(async () =>
        {
            await foreach (var change in subscription)
            {
                tcs.TrySetResult(change);
            }
        });
 
        await publisher.IntegrateAsync(new TestResource("C"), c, ResourceSnapshotChangeType.Upsert).DefaultTimeout();
 
        var change = Assert.Single(await tcs.Task.DefaultTimeout());
        Assert.Equal(ResourceSnapshotChangeType.Upsert, change.ChangeType);
        Assert.Equal("C", change.Resource.Name);
 
        await cts.CancelAsync().DefaultTimeout();
 
        try
        {
            await task.DefaultTimeout();
        }
        catch (OperationCanceledException)
        {
            // Ignore possible cancellation error.
        }
 
        Assert.Equal(0, publisher.OutgoingSubscriberCount);
    }
 
    [Fact]
    public async Task SupportsMultipleSubscribers()
    {
        CancellationTokenSource cts = new();
        ResourcePublisher publisher = new(cts.Token);
 
        var a = CreateResourceSnapshot("A");
        var b = CreateResourceSnapshot("B");
        var c = CreateResourceSnapshot("C");
 
        await publisher.IntegrateAsync(new TestResource("A"), a, ResourceSnapshotChangeType.Upsert).DefaultTimeout();
        await publisher.IntegrateAsync(new TestResource("B"), b, ResourceSnapshotChangeType.Upsert).DefaultTimeout();
 
        Assert.Equal(0, publisher.OutgoingSubscriberCount);
 
        var (snapshot1, subscription1) = publisher.Subscribe();
        var (snapshot2, subscription2) = publisher.Subscribe();
 
        Assert.Equal(2, publisher.OutgoingSubscriberCount);
 
        Assert.Equal(2, snapshot1.Length);
        Assert.Equal(2, snapshot2.Length);
 
        await publisher.IntegrateAsync(new TestResource("C"), c, ResourceSnapshotChangeType.Upsert).DefaultTimeout();
 
        var enumerator1 = subscription1.GetAsyncEnumerator(cts.Token);
        var enumerator2 = subscription2.GetAsyncEnumerator(cts.Token);
 
        await enumerator1.MoveNextAsync().DefaultTimeout();
        await enumerator2.MoveNextAsync().DefaultTimeout();
 
        var v1 = Assert.Single(enumerator1.Current);
        var v2 = Assert.Single(enumerator2.Current);
 
        Assert.Equal(ResourceSnapshotChangeType.Upsert, v1.ChangeType);
        Assert.Equal(ResourceSnapshotChangeType.Upsert, v2.ChangeType);
        Assert.Equal("C", v1.Resource.Name);
        Assert.Equal("C", v2.Resource.Name);
 
        await cts.CancelAsync().DefaultTimeout();
 
        Assert.False(await enumerator1.MoveNextAsync().DefaultTimeout());
        Assert.False(await enumerator2.MoveNextAsync().DefaultTimeout());
 
        Assert.Equal(0, publisher.OutgoingSubscriberCount);
    }
 
    [Fact]
    public async Task MergesResourcesInSnapshot()
    {
        CancellationTokenSource cts = new();
        ResourcePublisher publisher = new(cts.Token);
 
        var a1 = CreateResourceSnapshot("A");
        var a2 = CreateResourceSnapshot("A");
        var a3 = CreateResourceSnapshot("A");
 
        await publisher.IntegrateAsync(new TestResource("A"), a1, ResourceSnapshotChangeType.Upsert).DefaultTimeout();
        await publisher.IntegrateAsync(new TestResource("A"), a2, ResourceSnapshotChangeType.Upsert).DefaultTimeout();
        await publisher.IntegrateAsync(new TestResource("A"), a3, ResourceSnapshotChangeType.Upsert).DefaultTimeout();
 
        var (snapshot, _) = publisher.Subscribe();
 
        Assert.Equal("A", Assert.Single(snapshot).Name);
 
        await cts.CancelAsync().DefaultTimeout();
    }
 
    [Fact]
    public async Task DeletesRemoveFromSnapshot()
    {
        CancellationTokenSource cts = new();
        ResourcePublisher publisher = new(cts.Token);
 
        var a = CreateResourceSnapshot("A");
        var b = CreateResourceSnapshot("B");
 
        await publisher.IntegrateAsync(new TestResource("A"), a, ResourceSnapshotChangeType.Upsert).DefaultTimeout();
        await publisher.IntegrateAsync(new TestResource("B"), b, ResourceSnapshotChangeType.Upsert).DefaultTimeout();
        await publisher.IntegrateAsync(new TestResource("A"), a, ResourceSnapshotChangeType.Delete).DefaultTimeout();
 
        var (snapshot, _) = publisher.Subscribe();
 
        Assert.Equal("B", Assert.Single(snapshot).Name);
 
        await cts.CancelAsync().DefaultTimeout();
    }
 
    [Fact]
    public async Task CancelledSubscriptionIsCleanedUp()
    {
        ResourcePublisher publisher = new(CancellationToken.None);
        CancellationTokenSource cts = new();
        var called = false;
 
        var (_, subscription) = publisher.Subscribe();
 
        var task = Task.Run(async () =>
        {
            await foreach (var item in subscription.WithCancellation(cts.Token).ConfigureAwait(false))
            {
                // We should only loop one time.
                Assert.False(called);
                called = true;
 
                // Now we've received something, cancel.
                await cts.CancelAsync().DefaultTimeout();
            }
        });
 
        // Push through an update.
        await publisher.IntegrateAsync(new TestResource("A"), CreateResourceSnapshot("A"), ResourceSnapshotChangeType.Upsert).DefaultTimeout();
 
        // Let the subscriber exit.
        await task.DefaultTimeout();
    }
 
    private static GenericResourceSnapshot CreateResourceSnapshot(string name)
    {
        return new GenericResourceSnapshot(new()
        {
            Properties = [],
            ResourceType = KnownResourceTypes.Container
        })
        {
            Name = name,
            Uid = "",
            State = null,
            StateStyle = null,
            ExitCode = null,
            CreationTimeStamp = null,
            StartTimeStamp = null,
            StopTimeStamp = null,
            DisplayName = "",
            Urls = [],
            Volumes = [],
            Environment = [],
            HealthReports = [],
            Commands = [],
            Relationships = []
        };
    }
 
    private sealed class TestResource(string name) : Resource(name)
    {
    }
}