File: NatsFunctionalTests.cs
Web Access
Project: src\tests\Aspire.Hosting.Nats.Tests\Aspire.Hosting.Nats.Tests.csproj (Aspire.Hosting.Nats.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using Aspire.Components.Common.Tests;
using Aspire.Hosting.Utils;
using Xunit;
using Xunit.Abstractions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using Aspire.Hosting.ApplicationModel;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Aspire.Hosting.Tests.Utils;
 
namespace Aspire.Hosting.Nats.Tests;
 
public class NatsFunctionalTests(ITestOutputHelper testOutputHelper)
{
    private const string StreamName = "test-stream";
    private const string SubjectName = "test-subject";
 
    [Fact]
    [RequiresDocker]
    public async Task VerifyNatsResource()
    {
        using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);
 
        var nats = builder.AddNats("nats")
            .WithJetStream();
 
        using var app = builder.Build();
 
        await app.StartAsync();
 
        await app.WaitForTextAsync("Listening for client connections", nats.Resource.Name);
 
        var hb = Host.CreateApplicationBuilder();
 
        hb.Configuration[$"ConnectionStrings:{nats.Resource.Name}"] = await nats.Resource.ConnectionStringExpression.GetValueAsync(default);
 
        hb.AddNatsClient("nats", configureOptions: opts =>
        {
            var jsonRegistry = new NatsJsonContextSerializerRegistry(AppJsonContext.Default);
            return opts with { SerializerRegistry = jsonRegistry };
        });
 
        hb.AddNatsJetStream();
 
        using var host = hb.Build();
 
        await host.StartAsync();
 
        var jetStream = host.Services.GetRequiredService<INatsJSContext>();
 
        await CreateTestData(jetStream, default);
        await ConsumeTestData(jetStream, default);
    }
 
    [Theory]
    [RequiresDocker]
    [InlineData(null, null)]
    [InlineData("nats", null)]
    [InlineData(null, "password")]
    [InlineData("nats", "password")]
    public async Task AuthenticationShouldWork(string? user, string? password)
    {
        var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
      
        using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);
 
        var usernameParameter = user is null ? null : builder.AddParameter("user", user);
        var passwordParameter = password is null ? null : builder.AddParameter("pass", password);
 
        var nats = builder.AddNats("nats", userName: usernameParameter, password: passwordParameter);
 
        using var app = builder.Build();
 
        await app.StartAsync();
 
        await app.WaitForTextAsync("Listening for client connections", nats.Resource.Name);
 
        var hb = Host.CreateApplicationBuilder();
 
        var connectionString = await nats.Resource.ConnectionStringExpression.GetValueAsync(default);
        hb.Configuration[$"ConnectionStrings:{nats.Resource.Name}"] = connectionString;
 
        hb.AddNatsClient("nats", configureOptions: opts =>
        {
            var jsonRegistry = new NatsJsonContextSerializerRegistry(AppJsonContext.Default);
            return opts with { SerializerRegistry = jsonRegistry };
        });
 
        using var host = hb.Build();
 
        await host.StartAsync();
 
        var natsConnection = host.Services.GetRequiredService<INatsConnection>();
        await natsConnection.ConnectAsync();
        Assert.Equal(NatsConnectionState.Open, natsConnection.ConnectionState);
    }
 
    [Theory]
    [RequiresDocker]
    [InlineData("user", "wrong-password")]
    [InlineData("wrong-user", "password")]
    [InlineData(null, null)]
    public async Task AuthenticationShouldFailOnWrongOrMissingCredentials(string? user, string? password)
    {
        var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
 
        using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);
        builder.Configuration["Parameters:user"] = "user";
        builder.Configuration["Parameters:pass"] = "password";
 
        var usernameParameter = builder.AddParameter("user");
        var passwordParameter = builder.AddParameter("pass");
 
        var nats = builder.AddNats("nats", userName: usernameParameter, password: passwordParameter);
 
        using var app = builder.Build();
 
        await app.StartAsync();
 
        await app.WaitForTextAsync("Listening for client connections", nats.Resource.Name);
 
        var hb = Host.CreateApplicationBuilder();
 
        var connectionString = await nats.Resource.ConnectionStringExpression.GetValueAsync(default);
        var modifiedConnectionString = user is null
            ? connectionString!.Replace(new Uri(connectionString).UserInfo, null)
            : connectionString!.Replace("user", user).Replace("password", password);
 
        hb.Configuration[$"ConnectionStrings:{nats.Resource.Name}"] = modifiedConnectionString;
 
        hb.AddNatsClient("nats", configureOptions: opts =>
        {
            var jsonRegistry = new NatsJsonContextSerializerRegistry(AppJsonContext.Default);
            return opts with { SerializerRegistry = jsonRegistry };
        });
 
        using var host = hb.Build();
 
        await host.StartAsync();
 
        var natsConnection = host.Services.GetRequiredService<INatsConnection>();
 
        var exception = await Assert.ThrowsAsync<NatsException>(async () => await natsConnection.ConnectAsync());
        Assert.IsType<NatsServerException>(exception.InnerException);
    }
 
    [Theory]
    [InlineData(true)]
    [InlineData(false)]
    [RequiresDocker]
    public async Task WithDataShouldPersistStateBetweenUsages(bool useVolume)
    {
        string? volumeName = null;
        string? bindMountPath = null;
 
        try
        {
            using var builder1 = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);
            var nats1 = builder1.AddNats("nats")
                .WithJetStream();
 
            if (useVolume)
            {
                // Use a deterministic volume name to prevent them from exhausting the machines if deletion fails
                volumeName = VolumeNameGenerator.Generate(nats1, nameof(WithDataShouldPersistStateBetweenUsages));
 
                // if the volume already exists (because of a crashing previous run), delete it
                DockerUtils.AttemptDeleteDockerVolume(volumeName, throwOnFailure: true);
                nats1.WithDataVolume(volumeName);
            }
            else
            {
                bindMountPath = Directory.CreateTempSubdirectory().FullName;
                nats1.WithDataBindMount(bindMountPath);
            }
 
            using (var app = builder1.Build())
            {
                await app.StartAsync();
 
                await app.WaitForTextAsync("Listening for client connections", nats1.Resource.Name);
                try
                {
                    var hb = Host.CreateApplicationBuilder();
 
                    hb.Configuration[$"ConnectionStrings:{nats1.Resource.Name}"] = await nats1.Resource.ConnectionStringExpression.GetValueAsync(default);
 
                    hb.AddNatsClient("nats", configureOptions: opts =>
                    {
                        var jsonRegistry = new NatsJsonContextSerializerRegistry(AppJsonContext.Default);
                        return opts with { SerializerRegistry = jsonRegistry };
                    });
 
                    hb.AddNatsJetStream();
 
                    using (var host = hb.Build())
                    {
                        await host.StartAsync();
 
                        var jetStream = host.Services.GetRequiredService<INatsJSContext>();
                        await CreateTestData(jetStream, default);
                        await ConsumeTestData(jetStream, default);
                    }
                }
                finally
                {
                    // Stops the container, or the Volume/mount would still be in use
                    await app.StopAsync();
                }
            }
 
            using var builder2 = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);
            var nats2 = builder2.AddNats("nats")
                .WithJetStream();
 
            if (useVolume)
            {
                nats2.WithDataVolume(volumeName);
            }
            else
            {
                nats2.WithDataBindMount(bindMountPath!);
            }
 
            using (var app = builder2.Build())
            {
                await app.StartAsync();
 
                await app.WaitForTextAsync("Listening for client connections", nats2.Resource.Name);
                try
                {
                    var hb = Host.CreateApplicationBuilder();
 
                    hb.Configuration[$"ConnectionStrings:{nats2.Resource.Name}"] = await nats2.Resource.ConnectionStringExpression.GetValueAsync(default);
                    hb.AddNatsClient("nats", configureOptions: opts =>
                    {
                        var jsonRegistry = new NatsJsonContextSerializerRegistry(AppJsonContext.Default);
                        return opts with { SerializerRegistry = jsonRegistry };
                    });
 
                    hb.AddNatsJetStream();
 
                    using (var host = hb.Build())
                    {
                        await host.StartAsync();
 
                        var jetStream = host.Services.GetRequiredService<INatsJSContext>();
                        await ConsumeTestData(jetStream, default);
                    }
                }
                finally
                {
                    // Stops the container, or the Volume/mount would still be in use
                    await app.StopAsync();
                }
            }
        }
        finally
        {
            if (volumeName is not null)
            {
                DockerUtils.AttemptDeleteDockerVolume(volumeName);
            }
 
            if (bindMountPath is not null)
            {
                try
                {
                    Directory.Delete(bindMountPath, recursive: true);
                }
                catch
                {
                    // Don't fail test if we can't clean the temporary folder
                }
            }
        }
    }
 
    private static async Task ConsumeTestData(INatsJSContext jetStream, CancellationToken token)
    {
        var stream = await jetStream.GetStreamAsync(StreamName, cancellationToken: token);
        var consumer = await stream.CreateOrderedConsumerAsync(cancellationToken: token);
 
        var events = new List<AppEvent>();
        await foreach (var msg in consumer.ConsumeAsync<AppEvent>(cancellationToken: token))
        {
            events.Add(msg.Data!);
            await msg.AckAsync(cancellationToken: token);
            if (msg.Metadata?.NumPending == 0)
            {
                break;
            }
        }
 
        for (var i = 0; i < 10; i++)
        {
            var @event = events[i];
            Assert.Equal($"test-event-{i}", @event.Name);
            Assert.Equal($"test-event-description-{i}", @event.Description);
        }
    }
 
    private static async Task CreateTestData(INatsJSContext jetStream, CancellationToken token)
    {
        var stream = await jetStream.CreateStreamAsync(new StreamConfig(StreamName, [SubjectName]), cancellationToken: token);
        Assert.Equal(StreamName, stream.Info.Config.Name);
 
        for (var i = 0; i < 10; i++)
        {
            var appEvent = new AppEvent(SubjectName, $"test-event-{i}", $"test-event-description-{i}", i);
            var ack = await jetStream.PublishAsync(appEvent.Subject, appEvent, cancellationToken: token);
            ack.EnsureSuccess();
        }
    }
 
    [Fact]
    [RequiresDocker]
    public async Task VerifyWaitForOnNatsBlocksDependentResources()
    {
        var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
        using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);
 
        var healthCheckTcs = new TaskCompletionSource<HealthCheckResult>();
        builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () =>
        {
            return healthCheckTcs.Task;
        });
 
        var resource = builder.AddNats("resource")
                              .WithHealthCheck("blocking_check");
 
        var dependentResource = builder.AddNats("dependentresource")
                                       .WaitFor(resource);
 
        using var app = builder.Build();
 
        var pendingStart = app.StartAsync(cts.Token);
 
        var rns = app.Services.GetRequiredService<ResourceNotificationService>();
 
        await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token);
 
        await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);
 
        healthCheckTcs.SetResult(HealthCheckResult.Healthy());
 
        await rns.WaitForResourceHealthyAsync(resource.Resource.Name, cts.Token);
 
        await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);
 
        await pendingStart;
 
        await app.StopAsync();
    }
}