File: Internal\RedisSubscriptionManager.cs
Web Access
Project: src\src\SignalR\server\StackExchangeRedis\src\Microsoft.AspNetCore.SignalR.StackExchangeRedis.csproj (Microsoft.AspNetCore.SignalR.StackExchangeRedis)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Concurrent;
 
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal;
 
internal sealed class RedisSubscriptionManager
{
    private readonly ConcurrentDictionary<string, HubConnectionStore> _subscriptions = new ConcurrentDictionary<string, HubConnectionStore>(StringComparer.Ordinal);
    private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
 
    public async Task AddSubscriptionAsync(string id, HubConnectionContext connection, Func<string, HubConnectionStore, Task> subscribeMethod)
    {
        await _lock.WaitAsync();
 
        try
        {
            // Avoid adding subscription if connection is closing/closed
            // We're in a lock and ConnectionAborted is triggered before OnDisconnectedAsync is called so this is guaranteed to be safe when adding while connection is closing and removing items
            if (connection.ConnectionAborted.IsCancellationRequested)
            {
                return;
            }
 
            var subscription = _subscriptions.GetOrAdd(id, _ => new HubConnectionStore());
 
            subscription.Add(connection);
 
            // Subscribe once
            if (subscription.Count == 1)
            {
                await subscribeMethod(id, subscription);
            }
        }
        finally
        {
            _lock.Release();
        }
    }
 
    public async Task RemoveSubscriptionAsync(string id, HubConnectionContext connection, object state, Func<object, string, Task> unsubscribeMethod)
    {
        await _lock.WaitAsync();
 
        try
        {
            if (!_subscriptions.TryGetValue(id, out var subscription))
            {
                return;
            }
 
            subscription.Remove(connection);
 
            if (subscription.Count == 0)
            {
                _subscriptions.TryRemove(id, out _);
                await unsubscribeMethod(state, id);
            }
        }
        finally
        {
            _lock.Release();
        }
    }
}