File: AddKafkaTests.cs
Web Access
Project: src\tests\Aspire.Hosting.Kafka.Tests\Aspire.Hosting.Kafka.Tests.csproj (Aspire.Hosting.Kafka.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Net.Sockets;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Utils;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
 
namespace Aspire.Hosting.Kafka.Tests;
 
public class AddKafkaTests
{
    [Fact]
    public void AddKafkaContainerWithDefaultsAddsAnnotationMetadata()
    {
        var appBuilder = DistributedApplication.CreateBuilder();
 
        appBuilder.AddKafka("kafka");
 
        using var app = appBuilder.Build();
 
        var appModel = app.Services.GetRequiredService<DistributedApplicationModel>();
 
        var containerResource = Assert.Single(appModel.Resources.OfType<KafkaServerResource>());
        Assert.Equal("kafka", containerResource.Name);
 
        var endpoints = containerResource.Annotations.OfType<EndpointAnnotation>();
        Assert.Equal(2, endpoints.Count());
 
        var primaryEndpoint = Assert.Single(endpoints, e => e.Name == "tcp");
        Assert.Equal(9092, primaryEndpoint.TargetPort);
        Assert.False(primaryEndpoint.IsExternal);
        Assert.Equal("tcp", primaryEndpoint.Name);
        Assert.Null(primaryEndpoint.Port);
        Assert.Equal(ProtocolType.Tcp, primaryEndpoint.Protocol);
        Assert.Equal("tcp", primaryEndpoint.Transport);
        Assert.Equal("tcp", primaryEndpoint.UriScheme);
 
        var internalEndpoint = Assert.Single(endpoints, e => e.Name == "internal");
        Assert.Equal(9093, internalEndpoint.TargetPort);
        Assert.False(internalEndpoint.IsExternal);
        Assert.Equal("internal", internalEndpoint.Name);
        Assert.Null(internalEndpoint.Port);
        Assert.Equal(ProtocolType.Tcp, internalEndpoint.Protocol);
        Assert.Equal("tcp", internalEndpoint.Transport);
        Assert.Equal("tcp", internalEndpoint.UriScheme);
 
        var containerAnnotation = Assert.Single(containerResource.Annotations.OfType<ContainerImageAnnotation>());
        Assert.Equal(KafkaContainerImageTags.Tag, containerAnnotation.Tag);
        Assert.Equal(KafkaContainerImageTags.Image, containerAnnotation.Image);
        Assert.Equal(KafkaContainerImageTags.Registry, containerAnnotation.Registry);
    }
 
    [Fact]
    public async Task KafkaCreatesConnectionString()
    {
        var appBuilder = DistributedApplication.CreateBuilder();
        appBuilder
            .AddKafka("kafka")
            .WithEndpoint("tcp", e => e.AllocatedEndpoint = new AllocatedEndpoint(e, "localhost", 27017));
 
        using var app = appBuilder.Build();
 
        var appModel = app.Services.GetRequiredService<DistributedApplicationModel>();
 
        var connectionStringResource = Assert.Single(appModel.Resources.OfType<KafkaServerResource>()) as IResourceWithConnectionString;
        var connectionString = await connectionStringResource.GetConnectionStringAsync();
 
        Assert.Equal("localhost:27017", connectionString);
        Assert.Equal("{kafka.bindings.tcp.host}:{kafka.bindings.tcp.port}", connectionStringResource.ConnectionStringExpression.ValueExpression);
    }
 
    [Fact]
    public async Task VerifyManifest()
    {
        using var appBuilder = TestDistributedApplicationBuilder.Create();
 
        var kafka = appBuilder.AddKafka("kafka");
 
        var manifest = await ManifestUtils.GetManifest(kafka.Resource);
 
        var expectedManifest = $$"""
            {
              "type": "container.v0",
              "connectionString": "{kafka.bindings.tcp.host}:{kafka.bindings.tcp.port}",
              "image": "{{KafkaContainerImageTags.Registry}}/{{KafkaContainerImageTags.Image}}:{{KafkaContainerImageTags.Tag}}",
              "env": {
                "KAFKA_LISTENERS": "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093",
                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT",
                "KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://{kafka.bindings.tcp.host}:29092,PLAINTEXT_HOST://{kafka.bindings.tcp.host}:{kafka.bindings.tcp.port},PLAINTEXT_INTERNAL://{kafka.bindings.internal.host}:{kafka.bindings.internal.port}"
              },
              "bindings": {
                "tcp": {
                  "scheme": "tcp",
                  "protocol": "tcp",
                  "transport": "tcp",
                  "targetPort": 9092
                },
                "internal": {
                  "scheme": "tcp",
                  "protocol": "tcp",
                  "transport": "tcp",
                  "targetPort": 9093
                }
              }
            }
            """;
        Assert.Equal(expectedManifest, manifest.ToString());
    }
 
    [Fact]
    public async Task WithDataVolumeConfigureCorrectEnvironment()
    {
        using var appBuilder = TestDistributedApplicationBuilder.Create();
 
        var kafka = appBuilder.AddKafka("kafka")
            .WithEndpoint("tcp", e => e.AllocatedEndpoint = new AllocatedEndpoint(e, "localhost", 27017))
            .WithDataVolume("kafka-data");
 
        var config = await kafka.Resource.GetEnvironmentVariableValuesAsync();
 
        var volumeAnnotation = kafka.Resource.Annotations.OfType<ContainerMountAnnotation>().Single();
 
        Assert.Equal("kafka-data", volumeAnnotation.Source);
        Assert.Equal("/var/lib/kafka/data", volumeAnnotation.Target);
        Assert.Contains(config, kvp => kvp.Key == "KAFKA_LOG_DIRS" && kvp.Value == "/var/lib/kafka/data");
    }
 
    [Fact]
    public async Task WithDataBindConfigureCorrectEnvironment()
    {
        using var appBuilder = TestDistributedApplicationBuilder.Create();
 
        var kafka = appBuilder.AddKafka("kafka")
            .WithEndpoint("tcp", e => e.AllocatedEndpoint = new AllocatedEndpoint(e, "localhost", 27017))
            .WithDataBindMount("kafka-data");
 
        var config = await kafka.Resource.GetEnvironmentVariableValuesAsync();
 
        var volumeAnnotation = kafka.Resource.Annotations.OfType<ContainerMountAnnotation>().Single();
 
        Assert.Equal(Path.Combine(appBuilder.AppHostDirectory, "kafka-data"), volumeAnnotation.Source);
        Assert.Equal("/var/lib/kafka/data", volumeAnnotation.Target);
        Assert.Contains(config, kvp => kvp.Key == "KAFKA_LOG_DIRS" && kvp.Value == "/var/lib/kafka/data");
    }
 
    public static TheoryData<string?, string, int?> WithKafkaUIAddsAnUniqueContainerSetsItsNameAndInvokesConfigurationCallbackTestVariations()
    {
        return new()
        {
            { "kafka-ui", "kafka-ui", 8081 },
            { null, "kafka1-kafka-ui", 8081 },
            { "kafka-ui", "kafka-ui", null },
            { null, "kafka1-kafka-ui", null },
        };
    }
 
    [Theory]
    [MemberData(nameof(WithKafkaUIAddsAnUniqueContainerSetsItsNameAndInvokesConfigurationCallbackTestVariations))]
    public void WithKafkaUIAddsAnUniqueContainerSetsItsNameAndInvokesConfigurationCallback(string? containerName, string expectedContainerName, int? port)
    {
        using var builder = TestDistributedApplicationBuilder.Create();
        var configureContainerInvocations = 0;
        Action<IResourceBuilder<KafkaUIContainerResource>> kafkaUIConfigurationCallback = kafkaUi =>
        {
            kafkaUi.WithHostPort(port);
            configureContainerInvocations++;
        };
        builder.AddKafka("kafka1").WithKafkaUI(configureContainer: kafkaUIConfigurationCallback, containerName: containerName);
        builder.AddKafka("kafka2").WithKafkaUI();
 
        Assert.Single(builder.Resources.OfType<KafkaUIContainerResource>());
        var kafkaUiResource = Assert.Single(builder.Resources, r => r.Name == expectedContainerName);
        Assert.Equal(1, configureContainerInvocations);
        var kafkaUiEndpoint = kafkaUiResource.Annotations.OfType<EndpointAnnotation>().Single();
        Assert.Equal(8080, kafkaUiEndpoint.TargetPort);
        Assert.Equal(port, kafkaUiEndpoint.Port);
    }
}