File: Internal\AckHandler.cs
Web Access
Project: src\src\SignalR\server\StackExchangeRedis\src\Microsoft.AspNetCore.SignalR.StackExchangeRedis.csproj (Microsoft.AspNetCore.SignalR.StackExchangeRedis)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Concurrent;
using Microsoft.Extensions.Internal;
 
namespace Microsoft.AspNetCore.SignalR.StackExchangeRedis.Internal;
 
internal sealed class AckHandler : IDisposable
{
    private readonly ConcurrentDictionary<int, AckInfo> _acks = new ConcurrentDictionary<int, AckInfo>();
    private readonly Timer _timer;
    private readonly long _ackThreshold = (long)TimeSpan.FromSeconds(30).TotalMilliseconds;
    private readonly TimeSpan _ackInterval = TimeSpan.FromSeconds(5);
    private readonly object _lock = new object();
    private bool _disposed;
 
    public AckHandler()
    {
        _timer = NonCapturingTimer.Create(state => ((AckHandler)state!).CheckAcks(), state: this, dueTime: _ackInterval, period: _ackInterval);
    }
 
    public Task CreateAck(int id)
    {
        lock (_lock)
        {
            if (_disposed)
            {
                return Task.CompletedTask;
            }
 
            return _acks.GetOrAdd(id, _ => new AckInfo()).Tcs.Task;
        }
    }
 
    public void TriggerAck(int id)
    {
        if (_acks.TryRemove(id, out var ack))
        {
            ack.Tcs.TrySetResult();
        }
    }
 
    private void CheckAcks()
    {
        if (_disposed)
        {
            return;
        }
 
        var currentTick = Environment.TickCount64;
 
        foreach (var pair in _acks)
        {
            var elapsed = currentTick - pair.Value.CreatedTick;
            if (elapsed > _ackThreshold)
            {
                if (_acks.TryRemove(pair.Key, out var ack))
                {
                    ack.Tcs.TrySetCanceled();
                }
            }
        }
    }
 
    public void Dispose()
    {
        lock (_lock)
        {
            _disposed = true;
 
            _timer.Dispose();
 
            foreach (var pair in _acks)
            {
                if (_acks.TryRemove(pair.Key, out var ack))
                {
                    ack.Tcs.TrySetCanceled();
                }
            }
        }
    }
 
    private sealed class AckInfo
    {
        public TaskCompletionSource Tcs { get; private set; }
        public long CreatedTick { get; private set; }
 
        public AckInfo()
        {
            CreatedTick = Environment.TickCount64;
            Tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        }
    }
}