File: TestConnectionMultiplexer.cs
Web Access
Project: src\src\SignalR\server\StackExchangeRedis\test\Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests.csproj (Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Concurrent;
using System.Net;
using System.Reflection;
using StackExchange.Redis;
using StackExchange.Redis.Maintenance;
using StackExchange.Redis.Profiling;
 
namespace Microsoft.AspNetCore.SignalR.Tests;
 
public class TestConnectionMultiplexer : IConnectionMultiplexer
{
    public string ClientName => throw new NotImplementedException();
 
    public string Configuration => throw new NotImplementedException();
 
    public int TimeoutMilliseconds => throw new NotImplementedException();
 
    public long OperationCount => throw new NotImplementedException();
 
    public bool PreserveAsyncOrder { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
 
    public bool IsConnected => true;
 
    public bool IncludeDetailInExceptions { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
    public int StormLogThreshold { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
 
    public bool IsConnecting => throw new NotImplementedException();
 
    public event EventHandler<RedisErrorEventArgs> ErrorMessage
    {
        add { }
        remove { }
    }
 
    public event EventHandler<ConnectionFailedEventArgs> ConnectionFailed
    {
        add { }
        remove { }
    }
 
    public event EventHandler<InternalErrorEventArgs> InternalError
    {
        add { }
        remove { }
    }
 
    public event EventHandler<ConnectionFailedEventArgs> ConnectionRestored
    {
        add { }
        remove { }
    }
 
    public event EventHandler<EndPointEventArgs> ConfigurationChanged
    {
        add { }
        remove { }
    }
 
    public event EventHandler<EndPointEventArgs> ConfigurationChangedBroadcast
    {
        add { }
        remove { }
    }
 
    public event EventHandler<HashSlotMovedEventArgs> HashSlotMoved
    {
        add { }
        remove { }
    }
 
    private readonly TestRedisServer _server;
 
    public event EventHandler<ServerMaintenanceEvent> ServerMaintenanceEvent
    {
        add { }
        remove { }
    }
 
    public TestConnectionMultiplexer(TestRedisServer server)
    {
        _server = server;
    }
 
    public void BeginProfiling(object forContext)
    {
        throw new NotImplementedException();
    }
 
    public void Close(bool allowCommandsToComplete = true)
    {
        throw new NotImplementedException();
    }
 
    public Task CloseAsync(bool allowCommandsToComplete = true)
    {
        throw new NotImplementedException();
    }
 
    public bool Configure(TextWriter log = null)
    {
        throw new NotImplementedException();
    }
 
    public Task<bool> ConfigureAsync(TextWriter log = null)
    {
        throw new NotImplementedException();
    }
 
    public void Dispose()
    {
        throw new NotImplementedException();
    }
 
    public ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true)
    {
        throw new NotImplementedException();
    }
 
    public ServerCounters GetCounters()
    {
        throw new NotImplementedException();
    }
 
    public IDatabase GetDatabase(int db = -1, object asyncState = null)
    {
        throw new NotImplementedException();
    }
 
    public EndPoint[] GetEndPoints(bool configuredOnly = false)
    {
        throw new NotImplementedException();
    }
 
    public IServer GetServer(string host, int port, object asyncState = null)
    {
        throw new NotImplementedException();
    }
 
    public IServer GetServer(string hostAndPort, object asyncState = null)
    {
        throw new NotImplementedException();
    }
 
    public IServer GetServer(IPAddress host, int port)
    {
        throw new NotImplementedException();
    }
 
    public IServer GetServer(EndPoint endpoint, object asyncState = null)
    {
        throw new NotImplementedException();
    }
 
    public string GetStatus()
    {
        throw new NotImplementedException();
    }
 
    public void GetStatus(TextWriter log)
    {
        throw new NotImplementedException();
    }
 
    public string GetStormLog()
    {
        throw new NotImplementedException();
    }
 
    public ISubscriber GetSubscriber(object asyncState = null)
    {
        return new TestSubscriber(_server);
    }
 
    public int HashSlot(RedisKey key)
    {
        throw new NotImplementedException();
    }
 
    public long PublishReconfigure(CommandFlags flags = CommandFlags.None)
    {
        throw new NotImplementedException();
    }
 
    public Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None)
    {
        throw new NotImplementedException();
    }
 
    public void ResetStormLog()
    {
        throw new NotImplementedException();
    }
 
    public void Wait(Task task)
    {
        throw new NotImplementedException();
    }
 
    public T Wait<T>(Task<T> task)
    {
        throw new NotImplementedException();
    }
 
    public void WaitAll(params Task[] tasks)
    {
        throw new NotImplementedException();
    }
 
    public void RegisterProfiler(Func<ProfilingSession> profilingSessionProvider)
    {
        throw new NotImplementedException();
    }
 
    public int GetHashSlot(RedisKey key)
    {
        throw new NotImplementedException();
    }
 
    public void ExportConfiguration(Stream destination, ExportOptions options = (ExportOptions)(-1))
    {
        throw new NotImplementedException();
    }
 
    public IServer[] GetServers()
    {
        throw new NotImplementedException();
    }
 
    public ValueTask DisposeAsync() => default;
 
    public void AddLibraryNameSuffix(string suffix) { } // don't need to implement
}
 
public class TestRedisServer
{
    private readonly ConcurrentDictionary<RedisChannel, List<(int, Action<RedisChannel, RedisValue>)>> _subscriptions =
        new ConcurrentDictionary<RedisChannel, List<(int, Action<RedisChannel, RedisValue>)>>();
 
    public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
    {
        AssertRedisChannel(channel);
 
        if (_subscriptions.TryGetValue(channel, out var handlers))
        {
            lock (handlers)
            {
                foreach (var (_, handler) in handlers)
                {
                    handler(channel, message);
                }
            }
        }
 
        return handlers != null ? handlers.Count : 0;
    }
 
    public void Subscribe(ChannelMessageQueue messageQueue, int subscriberId, CommandFlags flags = CommandFlags.None)
    {
        AssertRedisChannel(messageQueue.Channel);
 
        Action<RedisChannel, RedisValue> handler = (channel, value) =>
        {
            // Workaround for https://github.com/StackExchange/StackExchange.Redis/issues/969
            // ChannelMessageQueue isn't mockable currently, this works around that by using private reflection
            typeof(ChannelMessageQueue).GetMethod("Write", BindingFlags.NonPublic | BindingFlags.Instance)
            .Invoke(messageQueue, new object[] { channel, value });
        };
 
        _subscriptions.AddOrUpdate(messageQueue.Channel, _ => new List<(int, Action<RedisChannel, RedisValue>)> { (subscriberId, handler) }, (_, list) =>
        {
            lock (list)
            {
                list.Add((subscriberId, handler));
            }
            return list;
        });
    }
 
    public void Unsubscribe(RedisChannel channel, int subscriberId, CommandFlags flags = CommandFlags.None)
    {
        AssertRedisChannel(channel);
 
        if (_subscriptions.TryGetValue(channel, out var list))
        {
            lock (list)
            {
                list.RemoveAll((item) => item.Item1 == subscriberId);
            }
        }
    }
 
    internal static void AssertRedisChannel(RedisChannel channel)
    {
        Assert.False(channel.IsPattern);
    }
}
 
public class TestSubscriber : ISubscriber
{
    private static int StaticId;
 
    private readonly int _id;
    private readonly TestRedisServer _server;
    public ConnectionMultiplexer Multiplexer => throw new NotImplementedException();
 
    IConnectionMultiplexer IRedisAsync.Multiplexer => throw new NotImplementedException();
 
    public TestSubscriber(TestRedisServer server)
    {
        _server = server;
        _id = Interlocked.Increment(ref StaticId);
    }
 
    public EndPoint IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None)
    {
        throw new NotImplementedException();
    }
 
    public Task<EndPoint> IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None)
    {
        throw new NotImplementedException();
    }
 
    public bool IsConnected(RedisChannel channel = default)
    {
        throw new NotImplementedException();
    }
 
    public TimeSpan Ping(CommandFlags flags = CommandFlags.None)
    {
        throw new NotImplementedException();
    }
 
    public Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None)
    {
        throw new NotImplementedException();
    }
 
    public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
    {
        TestRedisServer.AssertRedisChannel(channel);
 
        return _server.Publish(channel, message, flags);
    }
 
    public async Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
    {
        TestRedisServer.AssertRedisChannel(channel);
 
        await Task.Yield();
        return Publish(channel, message, flags);
    }
 
    public void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
    {
        throw new NotImplementedException();
    }
 
    public Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
    {
        TestRedisServer.AssertRedisChannel(channel);
 
        Subscribe(channel, handler, flags);
        return Task.CompletedTask;
    }
 
    public EndPoint SubscribedEndpoint(RedisChannel channel)
    {
        throw new NotImplementedException();
    }
 
    public bool TryWait(Task task)
    {
        throw new NotImplementedException();
    }
 
    public void Unsubscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
    {
        TestRedisServer.AssertRedisChannel(channel);
 
        _server.Unsubscribe(channel, _id, flags);
    }
 
    public void UnsubscribeAll(CommandFlags flags = CommandFlags.None)
    {
        throw new NotImplementedException();
    }
 
    public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None)
    {
        throw new NotImplementedException();
    }
 
    public Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
    {
        TestRedisServer.AssertRedisChannel(channel);
 
        Unsubscribe(channel, handler, flags);
        return Task.CompletedTask;
    }
 
    public void Wait(Task task)
    {
        throw new NotImplementedException();
    }
 
    public T Wait<T>(Task<T> task)
    {
        throw new NotImplementedException();
    }
 
    public void WaitAll(params Task[] tasks)
    {
        throw new NotImplementedException();
    }
 
    public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags = CommandFlags.None)
    {
        TestRedisServer.AssertRedisChannel(channel);
 
        // Workaround for https://github.com/StackExchange/StackExchange.Redis/issues/969
        var redisSubscriberType = typeof(RedisChannel).Assembly.GetType("StackExchange.Redis.RedisSubscriber");
        var ctor = typeof(ChannelMessageQueue).GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic,
            binder: null,
            new Type[] { typeof(RedisChannel).MakeByRefType(), redisSubscriberType }, modifiers: null);
 
        var queue = (ChannelMessageQueue)ctor.Invoke(new object[] { channel, null });
        _server.Subscribe(queue, _id);
        return queue;
    }
 
    public Task<ChannelMessageQueue> SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None)
    {
        TestRedisServer.AssertRedisChannel(channel);
 
        var t = Subscribe(channel, flags);
        return Task.FromResult(t);
    }
}