File: KafkaFunctionalTests.cs
Project: src\tests\Aspire.Hosting.Kafka.Tests\Aspire.Hosting.Kafka.Tests.csproj (Aspire.Hosting.Kafka.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using Aspire.Components.Common.Tests;
using Aspire.Hosting.Tests.Utils;
using Aspire.Hosting.Utils;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Polly;
using Xunit;
using Xunit.Abstractions;
namespace Aspire.Hosting.Kafka.Tests;
public class KafkaFunctionalTests(ITestOutputHelper testOutputHelper)
    public async Task VerifyKafkaResource()
        var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
        using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);
        var kafka = builder.AddKafka("kafka");
        using var app = builder.Build();
        await app.StartAsync();
        var hb = Host.CreateApplicationBuilder();
        hb.Configuration[$"ConnectionStrings:{kafka.Resource.Name}"] = await kafka.Resource.ConnectionStringExpression.GetValueAsync(default);
        hb.AddKafkaProducer<string, string>("kafka");
        hb.AddKafkaConsumer<string, string>("kafka", consumerBuilder =>
            consumerBuilder.Config.GroupId = "aspire-consumer-group";
            consumerBuilder.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
        using var host = hb.Build();
        await host.StartAsync();
        var topic = "test-topic";
        var pipeline = new ResiliencePipelineBuilder()
            .AddRetry(new() { MaxRetryAttempts = 10, Delay = TimeSpan.FromSeconds(1), ShouldHandle = new PredicateBuilder().Handle<ProduceException<string, string>>() })
        await pipeline.ExecuteAsync(async token =>
            var producer = host.Services.GetRequiredService<IProducer<string, string>>();
            for (var i = 0; i < 10; i++)
                await producer.ProduceAsync(topic, new Message<string, string> { Key = "test-key", Value = $"test-value{i}" }, token);
        }, cts.Token);
        var consumer = host.Services.GetRequiredService<IConsumer<string, string>>();
        for (var i = 0; i < 10; i++)
            var result = consumer.Consume(cts.Token);
            Assert.Equal($"test-key", result.Message.Key);
            Assert.Equal($"test-value{i}", result.Message.Value);
    public async Task WithDataShouldPersistStateBetweenUsages(bool useVolume)
        var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
        var topic = "test-topic";
        string? volumeName = null;
        string? bindMountPath = null;
            using var builder1 = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);
            var kafka1 = builder1.AddKafka("kafka");
            if (useVolume)
                // Use a deterministic volume name to prevent them from exhausting the machines if deletion fails
                volumeName = VolumeNameGenerator.CreateVolumeName(kafka1, nameof(WithDataShouldPersistStateBetweenUsages));
                // if the volume already exists (because of a crashing previous run), delete it
                DockerUtils.AttemptDeleteDockerVolume(volumeName, throwOnFailure: true);
                bindMountPath = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
                if (!OperatingSystem.IsWindows())
                    // the docker container runs as a non-root user, so we need to grant other user's read/write permission
                    // to the bind mount directory.
                    // Note that we need to do this after creating the directory, because the umask is applied at the time of creation.
                    const UnixFileMode BindMountPermissions =
                        UnixFileMode.UserRead | UnixFileMode.UserWrite | UnixFileMode.UserExecute |
                        UnixFileMode.GroupRead | UnixFileMode.GroupWrite | UnixFileMode.GroupExecute |
                        UnixFileMode.OtherRead | UnixFileMode.OtherWrite | UnixFileMode.OtherExecute;
                    File.SetUnixFileMode(bindMountPath, BindMountPermissions);
            using (var app = builder1.Build())
                await app.StartAsync();
                await app.WaitForTextAsync("Server started, listening for requests...", kafka1.Resource.Name);
                    var hb = Host.CreateApplicationBuilder();
                    hb.Configuration[$"ConnectionStrings:{kafka1.Resource.Name}"] = await kafka1.Resource.ConnectionStringExpression.GetValueAsync(default);
                    hb.AddKafkaProducer<string, string>("kafka");
                    using (var host = hb.Build())
                        await host.StartAsync();
                        var pipeline = new ResiliencePipelineBuilder()
                            .AddRetry(new() { MaxRetryAttempts = 10, Delay = TimeSpan.FromSeconds(1), ShouldHandle = new PredicateBuilder().Handle<ProduceException<string, string>>() })
                        await pipeline.ExecuteAsync(async token =>
                            var producer = host.Services.GetRequiredService<IProducer<string, string>>();
                            for (var i = 0; i < 10; i++)
                                await producer.ProduceAsync(topic, new Message<string, string> { Key = "test-key", Value = $"test-value{i}" }, token);
                        }, cts.Token);
                    // Stops the container, or the Volume/mount would still be in use
                    await app.StopAsync();
                    //kafka shutdown has delay,so without delay to running instance using same data and second instance failed to start.
                    await Task.Delay(TimeSpan.FromMinutes(1));
            using var builder2 = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);
            var kafka2 = builder2.AddKafka("kafka");
            if (useVolume)
            using (var app = builder2.Build())
                await app.StartAsync();
                await app.WaitForTextAsync("Server started, listening for requests...", kafka1.Resource.Name);
                    var hb = Host.CreateApplicationBuilder();
                    hb.Configuration[$"ConnectionStrings:{kafka2.Resource.Name}"] = await kafka2.Resource.ConnectionStringExpression.GetValueAsync(default);
                    hb.AddKafkaConsumer<string, string>("kafka", consumerBuilder =>
                        consumerBuilder.Config.GroupId = "aspire-consumer-group";
                        consumerBuilder.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
                    using (var host = hb.Build())
                        await host.StartAsync();
                        var pipeline = new ResiliencePipelineBuilder()
                            .AddRetry(new() { MaxRetryAttempts = 10, Delay = TimeSpan.FromSeconds(1), ShouldHandle = new PredicateBuilder().Handle<ConsumeException>() })
                        pipeline.Execute(() =>
                            var consumer = host.Services.GetRequiredService<IConsumer<string, string>>();
                            for (var i = 0; i < 10; i++)
                                var result = consumer.Consume(cts.Token);
                                Assert.Equal($"test-key", result.Message.Key);
                                Assert.Equal($"test-value{i}", result.Message.Value);
                    // Stops the container, or the Volume/mount would still be in use
                    await app.StopAsync();
            if (volumeName is not null)
            if (bindMountPath is not null)
                    Directory.Delete(bindMountPath, recursive: true);
                    // Don't fail test if we can't clean the temporary folder