File: Eventing\DistributedApplicationEventing.cs
Web Access
Project: src\src\Aspire.Hosting\Aspire.Hosting.csproj (Aspire.Hosting)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Concurrent;
using Aspire.Hosting.ApplicationModel;
 
namespace Aspire.Hosting.Eventing;
 
/// <inheritdoc cref="IDistributedApplicationEventing" />
public class DistributedApplicationEventing : IDistributedApplicationEventing
{
    private readonly ConcurrentDictionary<Type, List<DistributedApplicationEventSubscription>> _eventSubscriptionListLookup = new();
    private readonly ConcurrentDictionary<DistributedApplicationEventSubscription, Type> _subscriptionEventTypeLookup = new();
 
    /// <inheritdoc cref="IDistributedApplicationEventing.PublishAsync{T}(T, CancellationToken)" />
    [System.Diagnostics.CodeAnalysis.SuppressMessage("ApiDesign", "RS0026:Do not add multiple public overloads with optional parameters", Justification = "Cancellation token")]
    public Task PublishAsync<T>(T @event, CancellationToken cancellationToken = default) where T : IDistributedApplicationEvent
    {
        return PublishAsync(@event, EventDispatchBehavior.BlockingSequential, cancellationToken);
    }
 
    /// <inheritdoc cref="IDistributedApplicationEventing.PublishAsync{T}(T, CancellationToken)" />
    [System.Diagnostics.CodeAnalysis.SuppressMessage("ApiDesign", "RS0026:Do not add multiple public overloads with optional parameters", Justification = "Cancellation token")]
    public async Task PublishAsync<T>(T @event, EventDispatchBehavior dispatchBehavior, CancellationToken cancellationToken = default) where T : IDistributedApplicationEvent
    {
        if (_eventSubscriptionListLookup.TryGetValue(typeof(T), out var subscriptions))
        {
            if (dispatchBehavior == EventDispatchBehavior.BlockingConcurrent || dispatchBehavior == EventDispatchBehavior.NonBlockingConcurrent)
            {
                var pendingSubscriptionCallbacks = new List<Task>(subscriptions.Count);
                foreach (var subscription in subscriptions.ToArray())
                {
                    var pendingSubscriptionCallback = subscription.Callback(@event, cancellationToken);
                    pendingSubscriptionCallbacks.Add(pendingSubscriptionCallback);
                }
 
                if (dispatchBehavior == EventDispatchBehavior.NonBlockingConcurrent)
                {
                    // Non-blocking concurrent.
                    _ = Task.Run(async () =>
                    {
                        await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false);
                    }, default);
                }
                else
                {
                    // Blocking concurrent.
                    await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false);
                }
            }
            else
            {
                if (dispatchBehavior == EventDispatchBehavior.NonBlockingSequential)
                {
                    // Non-blocking sequential.
                    _ = Task.Run(async () =>
                    {
                        foreach (var subscription in subscriptions.ToArray())
                        {
                            await subscription.Callback(@event, cancellationToken).ConfigureAwait(false);
                        }
                    }, default);
                }
                else
                {
                    // Blocking sequential.
                    foreach (var subscription in subscriptions.ToArray())
                    {
                        await subscription.Callback(@event, cancellationToken).ConfigureAwait(false);
                    }
                }
            }
        }
    }
 
    /// <inheritdoc cref="IDistributedApplicationEventing.Subscribe{T}(Func{T, CancellationToken, Task})" />
    public DistributedApplicationEventSubscription Subscribe<T>(Func<T, CancellationToken, Task> callback) where T : IDistributedApplicationEvent
    {
        var subscription = new DistributedApplicationEventSubscription(async (@event, ct) =>
        {
            var typedEvent = (T)@event;
            await callback(typedEvent, ct).ConfigureAwait(false);
        });
 
        if (_eventSubscriptionListLookup.TryGetValue(typeof(T), out var subscriptions))
        {
            subscriptions.Add(subscription);
        }
        else
        {
            if (!_eventSubscriptionListLookup.TryAdd(typeof(T), new List<DistributedApplicationEventSubscription> { subscription }))
            {
                // This code only executes if we try get the subscription list and it fails, and then it is subsequently
                // added by another thread. In this case we just add our subscription. We don't invert this logic because
                // we don't want to allocate a list each time someone wants to subscribe to an event.
                _eventSubscriptionListLookup[typeof(T)].Add(subscription);
            }
        }
 
        _subscriptionEventTypeLookup[subscription] = typeof(T);
 
        return subscription;
    }
 
    /// <inheritdoc cref="IDistributedApplicationEventing.Subscribe{T}(Func{T, CancellationToken, Task})" />
    public DistributedApplicationEventSubscription Subscribe<T>(IResource resource, Func<T, CancellationToken, Task> callback) where T : IDistributedApplicationResourceEvent
    {
        var resourceFilteredCallback = async (T @event, CancellationToken cancellationToken) =>
        {
            if (@event.Resource == resource)
            {
                await callback(@event, cancellationToken).ConfigureAwait(false);
            }
        };
 
        return Subscribe(resourceFilteredCallback);
    }
 
    /// <inheritdoc cref="IDistributedApplicationEventing.Unsubscribe(DistributedApplicationEventSubscription)" />
    public void Unsubscribe(DistributedApplicationEventSubscription subscription)
    {
        if (_subscriptionEventTypeLookup.TryGetValue(subscription, out var eventType))
        {
            if (_eventSubscriptionListLookup.TryGetValue(eventType, out var subscriptions))
            {
                subscriptions.Remove(subscription);
                _subscriptionEventTypeLookup.Remove(subscription, out _);
            }
        }
    }
}