File: TestConnectionMultiplexer.cs
Web Access
Project: src\src\SignalR\server\StackExchangeRedis\test\Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests.csproj (Microsoft.AspNetCore.SignalR.StackExchangeRedis.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Concurrent;
using System.Net;
using System.Reflection;
using StackExchange.Redis;
using StackExchange.Redis.Maintenance;
using StackExchange.Redis.Profiling;
namespace Microsoft.AspNetCore.SignalR.Tests;
public class TestConnectionMultiplexer : IConnectionMultiplexer
    public string ClientName => throw new NotImplementedException();
    public string Configuration => throw new NotImplementedException();
    public int TimeoutMilliseconds => throw new NotImplementedException();
    public long OperationCount => throw new NotImplementedException();
    public bool PreserveAsyncOrder { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
    public bool IsConnected => true;
    public bool IncludeDetailInExceptions { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
    public int StormLogThreshold { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
    public bool IsConnecting => throw new NotImplementedException();
    public event EventHandler<RedisErrorEventArgs> ErrorMessage
        add { }
        remove { }
    public event EventHandler<ConnectionFailedEventArgs> ConnectionFailed
        add { }
        remove { }
    public event EventHandler<InternalErrorEventArgs> InternalError
        add { }
        remove { }
    public event EventHandler<ConnectionFailedEventArgs> ConnectionRestored
        add { }
        remove { }
    public event EventHandler<EndPointEventArgs> ConfigurationChanged
        add { }
        remove { }
    public event EventHandler<EndPointEventArgs> ConfigurationChangedBroadcast
        add { }
        remove { }
    public event EventHandler<HashSlotMovedEventArgs> HashSlotMoved
        add { }
        remove { }
    private readonly TestRedisServer _server;
    public event EventHandler<ServerMaintenanceEvent> ServerMaintenanceEvent
        add { }
        remove { }
    public TestConnectionMultiplexer(TestRedisServer server)
        _server = server;
    public void BeginProfiling(object forContext)
        throw new NotImplementedException();
    public void Close(bool allowCommandsToComplete = true)
        throw new NotImplementedException();
    public Task CloseAsync(bool allowCommandsToComplete = true)
        throw new NotImplementedException();
    public bool Configure(TextWriter log = null)
        throw new NotImplementedException();
    public Task<bool> ConfigureAsync(TextWriter log = null)
        throw new NotImplementedException();
    public void Dispose()
        throw new NotImplementedException();
    public ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true)
        throw new NotImplementedException();
    public ServerCounters GetCounters()
        throw new NotImplementedException();
    public IDatabase GetDatabase(int db = -1, object asyncState = null)
        throw new NotImplementedException();
    public EndPoint[] GetEndPoints(bool configuredOnly = false)
        throw new NotImplementedException();
    public IServer GetServer(string host, int port, object asyncState = null)
        throw new NotImplementedException();
    public IServer GetServer(string hostAndPort, object asyncState = null)
        throw new NotImplementedException();
    public IServer GetServer(IPAddress host, int port)
        throw new NotImplementedException();
    public IServer GetServer(EndPoint endpoint, object asyncState = null)
        throw new NotImplementedException();
    public string GetStatus()
        throw new NotImplementedException();
    public void GetStatus(TextWriter log)
        throw new NotImplementedException();
    public string GetStormLog()
        throw new NotImplementedException();
    public ISubscriber GetSubscriber(object asyncState = null)
        return new TestSubscriber(_server);
    public int HashSlot(RedisKey key)
        throw new NotImplementedException();
    public long PublishReconfigure(CommandFlags flags = CommandFlags.None)
        throw new NotImplementedException();
    public Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None)
        throw new NotImplementedException();
    public void ResetStormLog()
        throw new NotImplementedException();
    public void Wait(Task task)
        throw new NotImplementedException();
    public T Wait<T>(Task<T> task)
        throw new NotImplementedException();
    public void WaitAll(params Task[] tasks)
        throw new NotImplementedException();
    public void RegisterProfiler(Func<ProfilingSession> profilingSessionProvider)
        throw new NotImplementedException();
    public int GetHashSlot(RedisKey key)
        throw new NotImplementedException();
    public void ExportConfiguration(Stream destination, ExportOptions options = (ExportOptions)(-1))
        throw new NotImplementedException();
    public IServer[] GetServers()
        throw new NotImplementedException();
    public ValueTask DisposeAsync() => default;
    public void AddLibraryNameSuffix(string suffix) { } // don't need to implement
public class TestRedisServer
    private readonly ConcurrentDictionary<RedisChannel, List<(int, Action<RedisChannel, RedisValue>)>> _subscriptions =
        new ConcurrentDictionary<RedisChannel, List<(int, Action<RedisChannel, RedisValue>)>>();
    public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
        if (_subscriptions.TryGetValue(channel, out var handlers))
            lock (handlers)
                foreach (var (_, handler) in handlers)
                    handler(channel, message);
        return handlers != null ? handlers.Count : 0;
    public void Subscribe(ChannelMessageQueue messageQueue, int subscriberId, CommandFlags flags = CommandFlags.None)
        Action<RedisChannel, RedisValue> handler = (channel, value) =>
            // Workaround for
            // ChannelMessageQueue isn't mockable currently, this works around that by using private reflection
            typeof(ChannelMessageQueue).GetMethod("Write", BindingFlags.NonPublic | BindingFlags.Instance)
            .Invoke(messageQueue, new object[] { channel, value });
        _subscriptions.AddOrUpdate(messageQueue.Channel, _ => new List<(int, Action<RedisChannel, RedisValue>)> { (subscriberId, handler) }, (_, list) =>
            lock (list)
                list.Add((subscriberId, handler));
            return list;
    public void Unsubscribe(RedisChannel channel, int subscriberId, CommandFlags flags = CommandFlags.None)
        if (_subscriptions.TryGetValue(channel, out var list))
            lock (list)
                list.RemoveAll((item) => item.Item1 == subscriberId);
    internal static void AssertRedisChannel(RedisChannel channel)
public class TestSubscriber : ISubscriber
    private static int StaticId;
    private readonly int _id;
    private readonly TestRedisServer _server;
    public ConnectionMultiplexer Multiplexer => throw new NotImplementedException();
    IConnectionMultiplexer IRedisAsync.Multiplexer => throw new NotImplementedException();
    public TestSubscriber(TestRedisServer server)
        _server = server;
        _id = Interlocked.Increment(ref StaticId);
    public EndPoint IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None)
        throw new NotImplementedException();
    public Task<EndPoint> IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None)
        throw new NotImplementedException();
    public bool IsConnected(RedisChannel channel = default)
        throw new NotImplementedException();
    public TimeSpan Ping(CommandFlags flags = CommandFlags.None)
        throw new NotImplementedException();
    public Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None)
        throw new NotImplementedException();
    public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
        return _server.Publish(channel, message, flags);
    public async Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
        await Task.Yield();
        return Publish(channel, message, flags);
    public void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
        throw new NotImplementedException();
    public Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
        Subscribe(channel, handler, flags);
        return Task.CompletedTask;
    public EndPoint SubscribedEndpoint(RedisChannel channel)
        throw new NotImplementedException();
    public bool TryWait(Task task)
        throw new NotImplementedException();
    public void Unsubscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
        _server.Unsubscribe(channel, _id, flags);
    public void UnsubscribeAll(CommandFlags flags = CommandFlags.None)
        throw new NotImplementedException();
    public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None)
        throw new NotImplementedException();
    public Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
        Unsubscribe(channel, handler, flags);
        return Task.CompletedTask;
    public void Wait(Task task)
        throw new NotImplementedException();
    public T Wait<T>(Task<T> task)
        throw new NotImplementedException();
    public void WaitAll(params Task[] tasks)
        throw new NotImplementedException();
    public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags = CommandFlags.None)
        // Workaround for
        var redisSubscriberType = typeof(RedisChannel).Assembly.GetType("StackExchange.Redis.RedisSubscriber");
        var ctor = typeof(ChannelMessageQueue).GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic,
            binder: null,
            new Type[] { typeof(RedisChannel).MakeByRefType(), redisSubscriberType }, modifiers: null);
        var queue = (ChannelMessageQueue)ctor.Invoke(new object[] { channel, null });
        _server.Subscribe(queue, _id);
        return queue;
    public Task<ChannelMessageQueue> SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None)
        var t = Subscribe(channel, flags);
        return Task.FromResult(t);