File: Internal\DefaultHybridCache.L2.cs
Web Access
Project: src\src\Libraries\Microsoft.Extensions.Caching.Hybrid\Microsoft.Extensions.Caching.Hybrid.csproj (Microsoft.Extensions.Caching.Hybrid)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.Caching.Memory;
 
namespace Microsoft.Extensions.Caching.Hybrid.Internal;
 
internal partial class DefaultHybridCache
{
    private const int MaxCacheDays = 1000;
    private const string TagKeyPrefix = "__MSFT_HCT__";
    private static readonly DistributedCacheEntryOptions _tagInvalidationEntryOptions = new() { AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(MaxCacheDays) };
 
    [SuppressMessage("Performance", "CA1849:Call async methods when in an async method", Justification = "Manual sync check")]
    [SuppressMessage("Usage", "VSTHRD003:Avoid awaiting foreign Tasks", Justification = "Manual sync check")]
    [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Explicit async exception handling")]
    [SuppressMessage("Reliability", "CA2000:Dispose objects before losing scope", Justification = "Deliberate recycle only on success")]
    internal ValueTask<BufferChunk> GetFromL2DirectAsync(string key, CancellationToken token)
    {
        switch (GetFeatures(CacheFeatures.BackendCache | CacheFeatures.BackendBuffers))
        {
            case CacheFeatures.BackendCache: // legacy byte[]-based
 
                var pendingLegacy = _backendCache!.GetAsync(key, token);
 
#if NETCOREAPP2_0_OR_GREATER || NETSTANDARD2_1_OR_GREATER
                if (!pendingLegacy.IsCompletedSuccessfully)
#else
                if (pendingLegacy.Status != TaskStatus.RanToCompletion)
#endif
                {
                    return new(AwaitedLegacyAsync(pendingLegacy, this));
                }
 
                return new(GetValidPayloadSegment(pendingLegacy.Result)); // already complete
 
            case CacheFeatures.BackendCache | CacheFeatures.BackendBuffers: // IBufferWriter<byte>-based
                RecyclableArrayBufferWriter<byte> writer = RecyclableArrayBufferWriter<byte>.Create(MaximumPayloadBytes);
                var cache = Unsafe.As<IBufferDistributedCache>(_backendCache!); // type-checked already
 
                var pendingBuffers = cache.TryGetAsync(key, writer, token);
                if (!pendingBuffers.IsCompletedSuccessfully)
                {
                    return new(AwaitedBuffersAsync(pendingBuffers, writer));
                }
 
                BufferChunk result = pendingBuffers.GetAwaiter().GetResult()
                    ? new(writer.DetachCommitted(out var length), 0, length, returnToPool: true)
                    : default;
                writer.Dispose(); // it is not accidental that this isn't "using"; avoid recycling if not 100% sure what happened
                return new(result);
        }
 
        return default; // treat as a "miss"
 
        static async Task<BufferChunk> AwaitedLegacyAsync(Task<byte[]?> pending, DefaultHybridCache @this)
        {
            var bytes = await pending.ConfigureAwait(false);
            return @this.GetValidPayloadSegment(bytes);
        }
 
        static async Task<BufferChunk> AwaitedBuffersAsync(ValueTask<bool> pending, RecyclableArrayBufferWriter<byte> writer)
        {
            BufferChunk result = await pending.ConfigureAwait(false)
                    ? new(writer.DetachCommitted(out var length), 0, length, returnToPool: true)
                    : default;
            writer.Dispose(); // it is not accidental that this isn't "using"; avoid recycling if not 100% sure what happened
            return result;
        }
    }
 
    internal ValueTask SetL2Async(string key, CacheItem cacheItem, in BufferChunk buffer, HybridCacheEntryOptions? options, CancellationToken token)
        => HasBackendCache ? WritePayloadAsync(key, cacheItem, buffer, options, token) : default;
 
    internal ValueTask SetDirectL2Async(string key, in BufferChunk buffer, DistributedCacheEntryOptions options, CancellationToken token)
    {
        Debug.Assert(buffer.OversizedArray is not null, "array should be non-null");
        switch (GetFeatures(CacheFeatures.BackendCache | CacheFeatures.BackendBuffers))
        {
            case CacheFeatures.BackendCache: // legacy byte[]-based
                var arr = buffer.OversizedArray!;
                if (buffer.Offset != 0 || arr.Length != buffer.Length)
                {
                    // we'll need a right-sized snapshot
                    arr = buffer.ToArray();
                }
 
                return new(_backendCache!.SetAsync(key, arr, options, token));
            case CacheFeatures.BackendCache | CacheFeatures.BackendBuffers: // ReadOnlySequence<byte>-based
                var cache = Unsafe.As<IBufferDistributedCache>(_backendCache!); // type-checked already
                return cache.SetAsync(key, buffer.AsSequence(), options, token);
        }
 
        return default;
    }
 
    [SuppressMessage("Performance", "CA1849:Call async methods when in an async method", Justification = "Manual async core implementation")]
    internal ValueTask InvalidateL2TagAsync(string tag, long timestamp, CancellationToken token)
    {
        if (!HasBackendCache)
        {
            return default; // no L2
        }
 
        byte[] oversized = ArrayPool<byte>.Shared.Rent(sizeof(long));
        BinaryPrimitives.WriteInt64LittleEndian(oversized, timestamp);
        var pending = SetDirectL2Async(TagKeyPrefix + tag, new BufferChunk(oversized, 0, sizeof(long), false), _tagInvalidationEntryOptions, token);
 
        if (pending.IsCompletedSuccessfully)
        {
            pending.GetAwaiter().GetResult(); // ensure observed (IVTS etc)
            ArrayPool<byte>.Shared.Return(oversized);
            return default;
        }
        else
        {
            return AwaitedAsync(pending, oversized);
        }
 
        static async ValueTask AwaitedAsync(ValueTask pending, byte[] oversized)
        {
            await pending.ConfigureAwait(false);
            ArrayPool<byte>.Shared.Return(oversized);
        }
    }
 
    [SuppressMessage("Resilience", "EA0014:The async method doesn't support cancellation", Justification = "Cancellation handled internally")]
    [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "All failure is critical")]
    internal async Task<long> SafeReadTagInvalidationAsync(string tag)
    {
        Debug.Assert(HasBackendCache, "shouldn't be here without L2");
 
        const int READ_TIMEOUT = 4000;
 
        try
        {
            using var cts = new CancellationTokenSource(millisecondsDelay: READ_TIMEOUT);
            var buffer = await GetFromL2DirectAsync(TagKeyPrefix + tag, cts.Token).ConfigureAwait(false);
 
            long timestamp;
            if (buffer.OversizedArray is not null)
            {
                if (buffer.Length == sizeof(long))
                {
                    timestamp = BinaryPrimitives.ReadInt64LittleEndian(buffer.AsSpan());
                }
                else
                {
                    // not what we expected! assume invalid
                    timestamp = CurrentTimestamp();
                }
 
                buffer.RecycleIfAppropriate();
            }
            else
            {
                timestamp = 0; // never invalidated
            }
 
            buffer.RecycleIfAppropriate();
            return timestamp;
        }
        catch (Exception ex)
        {
            // ^^^ this catch is the "Safe" in "SafeReadTagInvalidationAsync"
            Debug.WriteLine(ex.Message);
 
            // if anything goes wrong reading tag invalidations; we have to assume the tag is invalid
            return CurrentTimestamp();
        }
    }
 
    internal void SetL1<T>(string key, CacheItem<T> value, HybridCacheEntryOptions? options)
    {
        // incr ref-count for the the cache itself; this *may* be released via the NeedsEvictionCallback path
        if (value.TryReserve())
        {
            // based on CacheExtensions.Set<TItem>, but with post-eviction recycling
 
            // intentionally use manual Dispose rather than "using"; confusingly, it is Dispose()
            // that actually commits the add - so: if we fault, we don't want to try
            // committing a partially configured cache entry
            ICacheEntry cacheEntry = _localCache.CreateEntry(key);
            cacheEntry.AbsoluteExpirationRelativeToNow = GetL1AbsoluteExpirationRelativeToNow(options);
            cacheEntry.Value = value;
 
            if (value.TryGetSize(out var size))
            {
                cacheEntry = cacheEntry.SetSize(size);
            }
 
            if (value.NeedsEvictionCallback)
            {
                cacheEntry = cacheEntry.RegisterPostEvictionCallback(CacheItem.SharedOnEviction);
            }
 
            // commit
            cacheEntry.Dispose();
 
            if (HybridCacheEventSource.Log.IsEnabled())
            {
                HybridCacheEventSource.Log.LocalCacheWrite();
            }
        }
    }
 
    private async ValueTask WritePayloadAsync(string key, CacheItem cacheItem, BufferChunk payload, HybridCacheEntryOptions? options, CancellationToken token)
    {
        // bundle a serialized payload inside the wrapper used at the DC layer
        var maxLength = HybridCachePayload.GetMaxBytes(key, cacheItem.Tags, payload.Length);
        var oversized = ArrayPool<byte>.Shared.Rent(maxLength);
 
        var length = HybridCachePayload.Write(oversized, key, cacheItem.CreationTimestamp, GetL2AbsoluteExpirationRelativeToNow(options),
            HybridCachePayload.PayloadFlags.None, cacheItem.Tags, payload.AsSequence());
 
        await SetDirectL2Async(key, new(oversized, 0, length, true), GetL2DistributedCacheOptions(options), token).ConfigureAwait(false);
 
        ArrayPool<byte>.Shared.Return(oversized);
    }
 
    private BufferChunk GetValidPayloadSegment(byte[]? payload)
    {
        if (payload is not null)
        {
            if (payload.Length > MaximumPayloadBytes)
            {
                ThrowPayloadLengthExceeded(payload.Length);
            }
 
            return new(payload);
        }
 
        return default;
    }
 
    [DoesNotReturn]
    [MethodImpl(MethodImplOptions.NoInlining)]
    private void ThrowPayloadLengthExceeded(int size) // splitting the exception bits out to a different method
    {
        // also add via logger when possible
        throw new InvalidOperationException($"Maximum cache length ({MaximumPayloadBytes} bytes) exceeded");
    }
 
#if NET8_0_OR_GREATER
    [SuppressMessage("Maintainability", "CA1508:Avoid dead conditional code", Justification = "False positive from unsafe accessor")]
#endif
    private DistributedCacheEntryOptions GetL2DistributedCacheOptions(HybridCacheEntryOptions? options)
    {
        DistributedCacheEntryOptions? result = null;
        if (options is not null)
        {
            var expiration = GetL2AbsoluteExpirationRelativeToNow(options);
            if (expiration != _defaultExpiration)
            {
                // ^^^ avoid creating unnecessary DC options objects if the expiration still matches the default
                result = ToDistributedCacheEntryOptions(options);
            }
        }
 
        return result ?? _defaultDistributedCacheExpiration;
 
#if NET8_0_OR_GREATER
        // internal method memoizes this allocation; since it is "init", it is immutable (outside reflection)
        [UnsafeAccessor(UnsafeAccessorKind.Method, Name = nameof(ToDistributedCacheEntryOptions))]
        extern static DistributedCacheEntryOptions? ToDistributedCacheEntryOptions(HybridCacheEntryOptions options);
#else
        // without that helper method, we'll just eat the alloc (down-level TFMs)
        static DistributedCacheEntryOptions ToDistributedCacheEntryOptions(HybridCacheEntryOptions options)
            => new() { AbsoluteExpirationRelativeToNow = options.Expiration };
#endif
    }
}