// File: ConsumerConformanceTests.cs
// Project: src\tests\Aspire.Confluent.Kafka.Tests\Aspire.Confluent.Kafka.Tests.csproj (Aspire.Confluent.Kafka.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using Aspire.Components.ConformanceTests;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Xunit;
 
namespace Aspire.Confluent.Kafka.Tests;
/// <summary>
/// Conformance tests for the Kafka consumer component, specialised for
/// <see cref="IConsumer{TKey, TValue}"/> with string keys and values and
/// configured through <see cref="KafkaConsumerSettings"/>.
/// </summary>
public class ConsumerConformanceTests : ConformanceTests<IConsumer<string, string>, KafkaConsumerSettings>
{
    /// <summary>The consumer is expected to be registered as a singleton.</summary>
    protected override ServiceLifetime ServiceLifetime => ServiceLifetime.Singleton;

    /// <summary>
    /// Not implemented by this test class: reading this property throws
    /// <see cref="NotImplementedException"/> (as does <see cref="SetTracing"/>).
    /// </summary>
    protected override string ActivitySourceName => throw new NotImplementedException();

    /// <summary>Log categories the component is required to produce.</summary>
    protected override string[] RequiredLogCategories => [
        "Aspire.Confluent.Kafka"
        ];

    /// <summary>Configuration section that holds the consumer settings.</summary>
    protected override string? ConfigurationSectionName => "Aspire:Confluent:Kafka:Consumer";

    /// <summary>
    /// Adds an in-memory connection string and consumer group id to
    /// <paramref name="configuration"/>.
    /// </summary>
    /// <param name="configuration">The configuration to populate.</param>
    /// <param name="key">Optional service key, forwarded to <c>CreateConfigKey</c> when building the config keys.</param>
    protected override void PopulateConfiguration(ConfigurationManager configuration, string? key = null)
    {
        configuration.AddInMemoryCollection(new KeyValuePair<string, string?>[]
        {
            new KeyValuePair<string, string?>(CreateConfigKey("Aspire:Confluent:Kafka:Consumer", key, "ConnectionString"),
                           "localhost:9092"),
            new KeyValuePair<string, string?>(CreateConfigKey("Aspire:Confluent:Kafka:Consumer", key, "Config:GroupId"),
                           "test")
        });
    }

    /// <summary>
    /// Registers the Kafka consumer on <paramref name="builder"/>: non-keyed (using the
    /// connection name "messaging") when <paramref name="key"/> is <c>null</c>, keyed otherwise.
    /// </summary>
    /// <param name="builder">The host application builder to register the component on.</param>
    /// <param name="configure">Optional callback to adjust the consumer settings.</param>
    /// <param name="key">Optional service key; when supplied it is also passed as the name of the keyed registration.</param>
    protected override void RegisterComponent(HostApplicationBuilder builder, Action<KafkaConsumerSettings>? configure = null, string? key = null)
    {
        if (key is null)
        {
            builder.AddKafkaConsumer<string, string>("messaging", configure);
        }
        else
        {
            builder.AddKeyedKafkaConsumer<string, string>(key, configure);
        }
    }

    /// <summary>Enables or disables health checks; the setting is expressed as a "disable" flag, hence the negation.</summary>
    protected override void SetHealthCheck(KafkaConsumerSettings options, bool enabled) => options.DisableHealthChecks = !enabled;

    /// <summary>Enables or disables metrics; the setting is expressed as a "disable" flag, hence the negation.</summary>
    protected override void SetMetrics(KafkaConsumerSettings options, bool enabled) => options.DisableMetrics = !enabled;

    /// <summary>
    /// Not implemented by this test class: always throws <see cref="NotImplementedException"/>.
    /// </summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    protected override void SetTracing(KafkaConsumerSettings options, bool enabled)
    {
        throw new NotImplementedException();
    }

    /// <summary>Exercises the consumer by subscribing it to the "test" topic.</summary>
    /// <param name="service">The consumer instance under test.</param>
    protected override void TriggerActivity(IConsumer<string, string> service)
    {
        service.Subscribe("test");
    }

    /// <summary>Keyed registrations are supported (see <see cref="RegisterComponent"/>).</summary>
    protected override bool SupportsKeyedRegistrations => true;

    /// <summary>
    /// A JSON configuration document with well-typed values for the consumer section.
    /// </summary>
    // A single raw string literal: the closing delimiter's indentation is stripped
    // from every content line, so the JSON itself starts at column 0.
    protected override string ValidJsonConfig => """
                {
                    "Aspire": {
                        "Confluent": {
                            "Kafka": {
                                "Consumer": {
                                    "ConnectionString": "localhost:9092",
                                    "DisableHealthChecks": false,
                                    "DisableMetrics": false,
                                    "Config": {
                                        "GroupId": "test"
                                    }
                                }
                            }
                        }
                    }
                }
                """;

    /// <summary>
    /// Pairs of invalid JSON documents and the error message each is expected to produce.
    /// Both cases supply the integer <c>0</c> for a setting whose expected type is boolean.
    /// </summary>
    protected override (string json, string error)[] InvalidJsonToErrorMessage => new[]
        {
            ("""{"Aspire": { "Confluent":{ "Kafka": { "Consumer": { "DisableMetrics": 0}}}}}""", "Value is \"integer\" but should be \"boolean\""),
            ("""{"Aspire": { "Confluent":{ "Kafka": { "Consumer": { "DisableHealthChecks": 0}}}}}""", "Value is \"integer\" but should be \"boolean\"")
        };

    /// <summary>
    /// Registers one non-keyed and two keyed consumers on the same host and verifies
    /// that the three resolved instances are distinct objects.
    /// </summary>
    [Fact]
    public void CanAddMultipleKeyedServices()
    {
        var builder = Host.CreateEmptyApplicationBuilder(null);
        builder.Configuration.AddInMemoryCollection([
            // Non-keyed consumer: group id lives directly under the Consumer section.
            new KeyValuePair<string, string?>("ConnectionStrings:messaging1", "localhost:9091"),
            new KeyValuePair<string, string?>("Aspire:Confluent:Kafka:Consumer:Config:GroupId", "messaging1"),

            // Keyed consumers: group id lives under a sub-section named after the key.
            new KeyValuePair<string, string?>("ConnectionStrings:messaging2", "localhost:9092"),
            new KeyValuePair<string, string?>("Aspire:Confluent:Kafka:Consumer:messaging2:Config:GroupId", "messaging2"),

            new KeyValuePair<string, string?>("ConnectionStrings:messaging3", "localhost:9093"),
            new KeyValuePair<string, string?>("Aspire:Confluent:Kafka:Consumer:messaging3:Config:GroupId", "messaging3"),
        ]);

        builder.AddKafkaConsumer<string, string>("messaging1");
        builder.AddKeyedKafkaConsumer<string, string>("messaging2");
        builder.AddKeyedKafkaConsumer<string, string>("messaging3");

        using var host = builder.Build();

        var client1 = host.Services.GetRequiredService<IConsumer<string, string>>();
        var client2 = host.Services.GetRequiredKeyedService<IConsumer<string, string>>("messaging2");
        var client3 = host.Services.GetRequiredKeyedService<IConsumer<string, string>>("messaging3");

        Assert.NotSame(client1, client2);
        Assert.NotSame(client1, client3);
        Assert.NotSame(client2, client3);
    }
}