File: ConsumerConfigurationTests.cs
Web Access
Project: src\tests\Aspire.Confluent.Kafka.Tests\Aspire.Confluent.Kafka.Tests.csproj (Aspire.Confluent.Kafka.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Text;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Xunit;
 
namespace Aspire.Confluent.Kafka.Tests;
 
public class ConsumerConfigurationTests
{
    [Theory]
    [InlineData(true)]
    [InlineData(false)]
    public void ReadsFromConnectionStringsCorrectly(bool useKeyed)
    {
        var builder = Host.CreateEmptyApplicationBuilder(null);
 
        var key = useKeyed ? "messaging" : null;
        builder.Configuration.AddInMemoryCollection([
            new KeyValuePair<string, string?>("ConnectionStrings:messaging", CommonHelpers.TestingEndpoint),
            new KeyValuePair<string, string?>(ProducerConformanceTests.CreateConfigKey("Aspire:Confluent:Kafka:Consumer", key, "Config:GroupId"), "unused")
        ]);
 
        if (useKeyed)
        {
            builder.AddKeyedKafkaConsumer<string, string>("messaging");
        }
        else
        {
            builder.AddKafkaConsumer<string, string>("messaging");
        }
 
        using var host = builder.Build();
        var connectionFactory = useKeyed
            ? host.Services.GetRequiredKeyedService(ReflectionHelpers.ConsumerConnectionFactoryStringKeyStringValueType.Value, "messaging")
            : host.Services.GetRequiredService(ReflectionHelpers.ConsumerConnectionFactoryStringKeyStringValueType.Value);
 
        var config = GetConsumerConfig(connectionFactory)!;
 
        Assert.Equal(CommonHelpers.TestingEndpoint, config.BootstrapServers);
    }
 
    [Theory]
    [InlineData(true)]
    [InlineData(false)]
    public void ConnectionStringCanBeSetInCode(bool useKeyed)
    {
        var builder = Host.CreateEmptyApplicationBuilder(null);
 
        var key = useKeyed ? "messaging" : null;
        builder.Configuration.AddInMemoryCollection([
            new KeyValuePair<string, string?>("ConnectionStrings:messaging", "unused"),
            new KeyValuePair<string, string?>(ProducerConformanceTests.CreateConfigKey("Aspire:Confluent:Kafka:Consumer", key, "Config:GroupId"), "unused")
        ]);
 
        static void SetConnectionString(KafkaConsumerSettings settings) => settings.ConnectionString = CommonHelpers.TestingEndpoint;
        if (useKeyed)
        {
            builder.AddKeyedKafkaConsumer<string, string>("messaging", configureSettings: SetConnectionString);
        }
        else
        {
            builder.AddKafkaConsumer<string, string>("messaging", configureSettings: SetConnectionString);
        }
 
        using var host = builder.Build();
        var connectionFactory = useKeyed
            ? host.Services.GetRequiredKeyedService(ReflectionHelpers.ConsumerConnectionFactoryStringKeyStringValueType.Value, "messaging")
            : host.Services.GetRequiredService(ReflectionHelpers.ConsumerConnectionFactoryStringKeyStringValueType.Value);
 
        var config = GetConsumerConfig(connectionFactory)!;
 
        Assert.Equal(CommonHelpers.TestingEndpoint, config.BootstrapServers);
    }
 
    [Theory]
    [InlineData(true)]
    [InlineData(false)]
    public void ConnectionNameWinsOverConfigSection(bool useKeyed)
    {
        var builder = Host.CreateEmptyApplicationBuilder(null);
 
        var key = useKeyed ? "messaging" : null;
        builder.Configuration.AddInMemoryCollection([
            new KeyValuePair<string, string?>(ProducerConformanceTests.CreateConfigKey("Aspire:Confluent:Kafka:Consumer", key, "ConnectionString"), "unused"),
            new KeyValuePair<string, string?>("ConnectionStrings:messaging", CommonHelpers.TestingEndpoint),
            new KeyValuePair<string, string?>(ProducerConformanceTests.CreateConfigKey("Aspire:Confluent:Kafka:Consumer", key, "Config:GroupId"), "unused")
        ]);
 
        if (useKeyed)
        {
            builder.AddKeyedKafkaConsumer<string, string>("messaging");
        }
        else
        {
            builder.AddKafkaConsumer<string, string>("messaging");
        }
 
        using var host = builder.Build();
        var connectionFactory = useKeyed
            ? host.Services.GetRequiredKeyedService(ReflectionHelpers.ConsumerConnectionFactoryStringKeyStringValueType.Value, "messaging")
            : host.Services.GetRequiredService(ReflectionHelpers.ConsumerConnectionFactoryStringKeyStringValueType.Value);
 
        var config = GetConsumerConfig(connectionFactory)!;
 
        Assert.Equal(CommonHelpers.TestingEndpoint, config.BootstrapServers);
    }
 
    [Theory]
    [InlineData(true, true, false, false)]
    [InlineData(true, true, true, false)]
    [InlineData(true, true, false, true)]
    [InlineData(true, false, false, false)]
    [InlineData(true, false, true, false)]
    [InlineData(true, false, false, true)]
    [InlineData(false, true, false, false)]
    [InlineData(false, true, true, false)]
    [InlineData(false, true, false, true)]
    [InlineData(false, false, false, false)]
    [InlineData(false, false, true, false)]
    [InlineData(false, false, false, true)]
 
    public void ConfigureConsumerBuilder(bool useKeyed, bool useConfigureSettings, bool useConfigureBuilder, bool useConfigureBuilderWithServiceProvider)
    {
        var builder = Host.CreateEmptyApplicationBuilder(null);
 
        var key = useKeyed ? "messaging" : null;
        builder.Configuration.AddInMemoryCollection([
            new KeyValuePair<string, string?>("ConnectionStrings:messaging", CommonHelpers.TestingEndpoint),
            new KeyValuePair<string, string?>(ProducerConformanceTests.CreateConfigKey("Aspire:Confluent:Kafka:Consumer", key, "Config:GroupId"), "unused")
        ]);
 
        bool configureBuilderIsCalled = false, configureSettingsIsCalled = false;
 
        Action act =
            (useKeyed, useConfigureSettings, useConfigureBuilder, useConfigureBuilderWithServiceProvider) switch
            {
                (true, false, false, false) => () => builder.AddKeyedKafkaConsumer<string, string>("messaging"),
                (false, false, false, false) => () => builder.AddKafkaConsumer<string, string>("messaging"),
 
                // only configureSettings
                (true, true, false, false) => () => builder.AddKeyedKafkaConsumer<string, string>("messaging",
                    configureSettings: ConfigureSettings),
                (false, true, false, false) => () => builder.AddKafkaConsumer<string, string>("messaging",
                    configureSettings: ConfigureSettings),
 
                // only configureBuilder
                (true, false, true, false) => () => builder.AddKeyedKafkaConsumer<string, string>("messaging",
                    configureBuilder: ConfigureBuilder),
                (false, false, true, false) => () => builder.AddKafkaConsumer<string, string>("messaging",
                    configureBuilder: ConfigureBuilder),
 
                (true, false, false, true) => () => builder.AddKeyedKafkaConsumer<string, string>("messaging",
                    configureBuilder: ConfigureBuilderWithServiceProvider),
                (false, false, false, true) => () => builder.AddKafkaConsumer<string, string>("messaging",
                    configureBuilder: ConfigureBuilderWithServiceProvider),
 
                // both configureSettings, and configureBuilder
                (true, true, true, false) => () => builder.AddKeyedKafkaConsumer<string, string>("messaging",
                    configureSettings: ConfigureSettings,
                    configureBuilder: ConfigureBuilder),
                (false, true, true, false) => () => builder.AddKafkaConsumer<string, string>("messaging",
                    configureSettings: ConfigureSettings,
                    configureBuilder: ConfigureBuilder),
 
                (true, true, false, true) => () => builder.AddKeyedKafkaConsumer<string, string>("messaging",
                    configureSettings: ConfigureSettings,
                    configureBuilder: ConfigureBuilderWithServiceProvider),
                (false, true, false, true) => () => builder.AddKafkaConsumer<string, string>("messaging",
                    configureSettings: ConfigureSettings,
                    configureBuilder: ConfigureBuilderWithServiceProvider),
 
                _ => throw new InvalidOperationException()
            };
 
        act();
 
        using var host = builder.Build();
        var connectionFactory = useKeyed
            ? host.Services.GetRequiredKeyedService(ReflectionHelpers.ConsumerConnectionFactoryStringKeyStringValueType.Value, "messaging")
            : host.Services.GetRequiredService(ReflectionHelpers.ConsumerConnectionFactoryStringKeyStringValueType.Value);
 
        var config = GetConsumerConfig(connectionFactory)!;
 
        if (useConfigureBuilder || useConfigureBuilderWithServiceProvider)
        {
            Assert.True(configureBuilderIsCalled);
        }
 
        if (useConfigureSettings)
        {
            Assert.True(configureSettingsIsCalled);
        }
 
        Assert.Equal(CommonHelpers.TestingEndpoint, config.BootstrapServers);
        return;
 
        void ConfigureBuilder(ConsumerBuilder<string, string> _)
        {
            configureBuilderIsCalled = true;
        }
 
        void ConfigureBuilderWithServiceProvider(IServiceProvider provider, ConsumerBuilder<string, string> _)
        {
            var __ = provider.GetRequiredService<IConfiguration>();
            configureBuilderIsCalled = true;
        }
 
        void ConfigureSettings(KafkaConsumerSettings _)
        {
            configureSettingsIsCalled = true;
        }
    }
 
    [Fact]
    public void ConsumerConfigOptionsFromConfig()
    {
        static Stream CreateStreamFromString(string data) => new MemoryStream(Encoding.UTF8.GetBytes(data));
 
        using var jsonStream = CreateStreamFromString("""
        {
          "Aspire": {
            "Confluent": {
              "Kafka": {
                "Consumer": {
                  "Config": {
                    "BootstrapServers": "localhost:9092",
                    "AutoOffsetReset": "Earliest",
                    "GroupId": "consumer-group",
                    "SaslUsername": "user",
                    "SaslPassword": "password",
                    "SaslMechanism": "Plain",
                    "SecurityProtocol": "Plaintext"
                  }
                }
              }
            }
          }
        }
        """{
          "Aspire": {
            "Confluent": {
              "Kafka": {
                "Consumer": {
                  "Config": {
                    "BootstrapServers": "localhost:9092",
                    "AutoOffsetReset": "Earliest",
                    "GroupId": "consumer-group",
                    "SaslUsername": "user",
                    "SaslPassword": "password",
                    "SaslMechanism": "Plain",
                    "SecurityProtocol": "Plaintext"
                  }
                }
              }
            }
          }
        }
        """);
 
        var builder = Host.CreateEmptyApplicationBuilder(null);
 
        builder.Configuration.AddJsonStream(jsonStream);
 
        builder.AddKafkaConsumer<string, string>("messaging");
 
        using var host = builder.Build();
        var connectionFactory = host.Services.GetRequiredService(ReflectionHelpers.ConsumerConnectionFactoryStringKeyStringValueType.Value);
 
        var config = GetConsumerConfig(connectionFactory)!;
 
        Assert.Equal(AutoOffsetReset.Earliest, config.AutoOffsetReset);
        Assert.Equal("consumer-group", config.GroupId);
        Assert.Equal("user", config.SaslUsername);
        Assert.Equal("password", config.SaslPassword);
        Assert.Equal(SaslMechanism.Plain, config.SaslMechanism);
        Assert.Equal(SecurityProtocol.Plaintext, config.SecurityProtocol);
    }
 
    private static ConsumerConfig? GetConsumerConfig(object o) => ReflectionHelpers.ConsumerConnectionFactoryStringKeyStringValueType.Value.GetProperty("Config")!.GetValue(o) as ConsumerConfig;
}