File: ProducerConsumer.cs
Web Access
Project: src\playground\Stress\Stress.ApiService\Stress.ApiService.csproj (Stress.ApiService)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Diagnostics;
using System.Threading.Channels;
 
namespace Stress.ApiService;
 
/// <summary>
/// Item handed from the producer loop to the consumer loop of <see cref="ProducerConsumer"/>
/// through its channel.
/// </summary>
public class Data
{
    /// <summary>Identifier of the item; the producer numbers items starting at 1.</summary>
    public required int Id { get; init; }
    /// <summary>
    /// The activity that was started while this item was produced, or <see langword="null"/> when
    /// no activity was started. The producer disposes it before the item is written to the channel;
    /// the consumer only reads its trace id and span id to link its own activity back to it.
    /// </summary>
    public required Activity? Producer { get; init; }
    /// <summary>Text payload of the item; the producer sets it to <c>"Message {Id}"</c>.</summary>
    public required string Message { get; init; }
}
 
/// <summary>
/// Demo workload that pushes <see cref="Data"/> items through an in-memory channel and traces
/// both the producing and the consuming side with activities from a shared
/// <see cref="ActivitySource"/>.
/// </summary>
public class ProducerConsumer
{
    /// <summary>Name of the <see cref="ActivitySource"/> used for every activity started by this class.</summary>
    public const string ActivitySourceName = "ProducerConsumer";

    private static readonly ActivitySource s_activitySource = new(ActivitySourceName);

    /// <summary>
    /// Produces <paramref name="count"/> items (ids 1..count), writes them to a channel and
    /// consumes them concurrently on a background task. Each produced item gets a
    /// "Produce {id}" activity and each consumed item a "Consume {id}" activity that links
    /// back to the producing one.
    /// </summary>
    /// <param name="count">Number of items to produce; zero or a negative value produces nothing.</param>
    /// <returns>A task that completes once every item has been produced and consumed.</returns>
    public async Task ProduceAndConsumeAsync(int count)
    {
        // One channel per call. The writer is completed at the end of every run and a completed
        // channel rejects further writes, so a channel shared across calls would make every call
        // after the first one fail (and would let concurrent calls steal each other's items).
        var channel = Channel.CreateUnbounded<Data>();

        // Started before the "ProducerApp" activity below so the consumer does not run with
        // "ProducerApp" as its ambient activity.
        var consumerTask = Task.Run(() => ConsumeAsync(channel.Reader));

        // Stays open until the consumer has finished as well (disposed when the method exits).
        using var producerAppActivity = s_activitySource.StartActivity("ProducerApp", ActivityKind.Internal);

        try
        {
            for (var i = 0; i < count; i++)
            {
                var id = i + 1;

                Data data;
                using (var produceActivity = s_activitySource.StartActivity($"Produce {id}", ActivityKind.Producer))
                {
                    // Simulated work so the produce activity has a visible duration.
                    await Task.Delay(Random.Shared.Next(10, 50));

                    data = new Data
                    {
                        Id = id,
                        Producer = produceActivity,
                        Message = $"Message {id}"
                    };
                }

                // Written after the produce activity has ended; the consumer only needs its ids.
                await channel.Writer.WriteAsync(data);
            }
        }
        finally
        {
            // Always signal completion, otherwise a failure while producing would leave the
            // consumer waiting on the channel forever.
            channel.Writer.TryComplete();
        }

        await consumerTask;
    }

    // Reads items until the channel is completed, wrapping the whole loop in a "ConsumerApp"
    // activity and each item in its own "Consume {id}" activity.
    private static async Task ConsumeAsync(ChannelReader<Data> reader)
    {
        using var consumerAppActivity = s_activitySource.StartActivity("ConsumerApp", ActivityKind.Internal);

        await foreach (var item in reader.ReadAllAsync())
        {
            var links = new List<ActivityLink>();
            if (item.Producer is { } producer)
            {
                // Link (rather than parent) the consume activity to the activity that produced the item.
                links.Add(new ActivityLink(new ActivityContext(producer.TraceId, producer.SpanId, ActivityTraceFlags.None)));
            }

            // The named parentId argument picks the StartActivity overload that accepts links.
            using var consumeActivity = s_activitySource.StartActivity($"Consume {item.Id}", ActivityKind.Consumer, parentId: null, links: links);

            // Simulated work so the consume activity has a visible duration.
            await Task.Delay(Random.Shared.Next(10, 50));
        }
    }
}