// File: Internals\System\Runtime\IOThreadScheduler.cs
// Web Access
// Project: src\src\System.ServiceModel.Primitives\src\System.ServiceModel.Primitives.csproj (System.ServiceModel.Primitives)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Runtime.InteropServices;
using System.Runtime.Versioning;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.Runtime
{
    internal class IOThreadScheduler
    {
        // Upper bound on the slot-array length.  ScheduleCallbackHelper clamps the doubled capacity to this value
        // when it creates a replacement scheduler.
        //
        // Do not increase the maximum capacity above 32k!  It must be a power of two, 0x8000 or less, in order to
        // work with the strategy for 'headTail' (head and tail are each packed into 16 bits of a single int).
        private const int MaximumCapacity = 0x8000;
 
        // Bit-twiddling helpers shared by the packed 'headTail' value (head in the high word, tail in the low word)
        // and by the per-slot 'gate' value (dequeue side in the high word, enqueue side in the low word).
        private static class Bits
        {
            // Number of bits in each half of the packed 32-bit value.
            public const int HiShift = 32 / 2;

            public const int HiOne = 1 << HiShift;              // 0x00010000 - adds one to the high word
            public const int LoHiBit = HiOne >> 1;              // 0x00008000 - top bit of the low word
            public const int HiHiBit = LoHiBit << HiShift;      // 0x80000000 - top bit of the high word
            public const int LoCountMask = LoHiBit - 1;         // 0x00007fff - low word without its top bit
            public const int HiCountMask = LoCountMask << HiShift; // 0x7fff0000 - high word without its top bit
            public const int LoMask = LoCountMask | LoHiBit;    // 0x0000ffff - entire low word
            public const int HiMask = HiCountMask | HiHiBit;    // 0xffff0000 - entire high word
            public const int HiBits = LoHiBit | HiHiBit;        // 0x80008000 - both flag bits

            // Interprets 'slot' as a headTail value and returns (head - tail + 1) reduced to 16 bits, shifted so
            // that the "tail is two ahead of head" state comes out as -1 (idle) and "tail is one ahead" as 0 (empty).
            public static int Count(int slot)
            {
                int head = slot >> HiShift;
                int biased = (head - slot + 2) & LoMask;
                return biased - 1;
            }

            // Same distance as Count but without the idle adjustment: the result is always in [0, 0xffff].
            public static int CountNoIdle(int slot)
            {
                int head = slot >> HiShift;
                return (head - slot + 1) & LoMask;
            }

            // Returns 'slot' with its low word incremented (wrapping within 16 bits) and its high word untouched.
            public static int IncrementLo(int slot)
            {
                int bumpedLow = (slot + 1) & LoMask;
                int untouchedHigh = slot & HiMask;
                return bumpedLow | untouchedHigh;
            }

            // This method is only valid if you already know that (gate & HiBits) != 0.
            // True when the high word of 'gate' equals its low word (count and flag bit alike).
            public static bool IsComplete(int gate)
            {
                int highWordInPlace = gate & HiMask;
                int lowWordMovedUp = gate << HiShift;
                return highWordInPlace == lowWordMovedUp;
            }
        }
 
        // The scheduler instance that ScheduleCallbackNoFlow enqueues into.  Starts with 32 slots and is swapped
        // (via CompareExchange in ScheduleCallbackHelper) for a larger instance when the slot array wraps.
        private static IOThreadScheduler s_current = new IOThreadScheduler(32);
        // Synchronization context whose Post forwards to ScheduleCallbackNoFlow; used to build IOTaskScheduler.
        private static readonly SynchronizationContext s_syncContext = new IOThreadSchedulerSynchronizationContext();
#pragma warning disable IDE1006 // Naming Styles
        // Lazily created by the IOTaskScheduler property.
        private static TaskScheduler s_IOTaskScheduler;
#pragma warning restore IDE1006 // Naming Styles
        // The single overlapped used to kick off (Post) the runner for this scheduler instance.
        private readonly ScheduledOverlapped _overlapped;
        // Circular buffer of work-item slots; its length is asserted to be a power of two in the constructor.
        private readonly Slot[] _slots;
        // Per-thread flag read by IsRunningOnIOThread.  It is only written by ScheduledOverlapped's
        // [Conditional("DEBUG")] helpers.
        private static readonly ThreadLocal<bool> s_isIoThread = new();
 
        // This field holds both the head (HiWord) and tail (LoWord) indices into the slot array.  This limits each
        // value to 64k.  In order to be able to distinguish wrapping the slot array (allowed) from wrapping the
        // indices relative to each other (not allowed), the size of the slot array is limited by an additional bit
        // to 32k.
        //
        // The HiWord (head) holds the index of the last slot to have been scheduled into.  The LoWord (tail) holds
        // the index of the next slot to be dispatched from.  When the queue is empty, the LoWord will be exactly
        // one slot ahead of the HiWord.  When the two are equal, the queue holds one item.
        //
        // When the tail is *two* slots ahead of the head (equivalent to a count of -1), that means the IOTS is
        // idle.  Hence, we start out headTail with a -2 (equivalent) in the head and zero in the tail.
        private int _headTail = -2 << Bits.HiShift;
 
        // Creates a scheduler with a slot ring of the given capacity and its own ScheduledOverlapped.
        // The asserts document the expected range: a positive power of two no larger than MaximumCapacity.
        private IOThreadScheduler(int capacity)
        {
            Contract.Assert(0 < capacity, "Capacity must be positive.");
            Contract.Assert(capacity <= MaximumCapacity, "Capacity cannot exceed 32k.");

            _slots = new Slot[capacity];

            // A power of two has no bits in common with (itself - 1).
            Contract.Assert((_slots.Length & (_slots.Length - 1)) == 0, "Capacity must be a power of two.");

            _overlapped = new ScheduledOverlapped();
        }
 
        // Gets a TaskScheduler that dispatches work through the IOThreadScheduler's synchronization context.
        //
        // The scheduler is created on first use by temporarily installing s_syncContext as the current
        // SynchronizationContext and calling TaskScheduler.FromCurrentSynchronizationContext().  The caller's
        // original context is restored in a finally block so that a failure while creating the scheduler cannot
        // leave the calling thread running under the IOThreadScheduler context.
        //
        // The lazy initialization is not synchronized; concurrent first callers may each create an instance and
        // the last write wins.
        public static TaskScheduler IOTaskScheduler
        {
            get
            {
                if (s_IOTaskScheduler == null)
                {
                    SynchronizationContext savedCtx = SynchronizationContext.Current;
                    try
                    {
                        SynchronizationContext.SetSynchronizationContext(s_syncContext);
                        s_IOTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
                    }
                    finally
                    {
                        // Always put the caller's context back, even if scheduler creation threw.
                        SynchronizationContext.SetSynchronizationContext(savedCtx);
                    }
                }

                return s_IOTaskScheduler;
            }
        }
 
        // Queues 'callback' (with 'state') onto the current IOThreadScheduler, retrying until an enqueue attempt
        // succeeds.  s_current is re-read on every attempt, so a retry may target a replacement scheduler.
        // Throws the Fx ArgumentNull exception when 'callback' is null.
        public static void ScheduleCallbackNoFlow(Action<object> callback, object state)
        {
            if (callback is null)
            {
                throw Fx.Exception.ArgumentNull(nameof(callback));
            }

            bool enqueued;
            do
            {
                try
                {
                }
                finally
                {
                    // Called in a finally because it needs to run uninterrupted in order to maintain consistency.
                    enqueued = s_current.ScheduleCallbackHelper(callback, state);
                }
            }
            while (!enqueued);
        }
 
        // Runs 'callback' (with 'state') as a task on TaskScheduler.Default rather than through the slot queue.
        // The returned task is intentionally not observed.  Throws the Fx ArgumentNull exception when
        // 'callback' is null.
        public static void ScheduleCallbackLowPriNoFlow(Action<object> callback, object state)
        {
            if (callback is null)
            {
                throw Fx.Exception.ArgumentNull(nameof(callback));
            }

            _ = Task.Factory.StartNew(
                callback,
                state,
                CancellationToken.None,
                TaskCreationOptions.DenyChildAttach,
                TaskScheduler.Default);
        }
 
        // Attempts to place one work item into this scheduler instance.
        //
        // Steps, in order:
        //   1. Reserve a slot by advancing the head (high word of _headTail).
        //   2. If that advance moved the scheduler from 'idle' to 'empty', advance once more and remember that this
        //      caller must Post the overlapped to start the runner.
        //   3. Try to fill the reserved slot.  The slot reports 'wrapped' when it was entered by more than one
        //      enqueuer, in which case a larger scheduler is offered as the new s_current.
        //   4. Post the overlapped if step 2 said so.
        //
        // Returns true if successfully scheduled, false otherwise.  The caller (ScheduleCallbackNoFlow) retries on
        // false.  Throws a fatal exception if the head/tail pair is detected to have wrapped around to 'idle'.
        private bool ScheduleCallbackHelper(Action<object> callback, object state)
        {
            // See if there's a free slot.  Fortunately the overflow bit is simply lost.
            int slot = Interlocked.Add(ref _headTail, Bits.HiOne);
 
            // If this brings us to 'empty', then the IOTS used to be 'idle'.  Remember that, and increment
            // again.  This doesn't need to be in a loop, because until we call Post(), we can't go back to idle.
            bool wasIdle = Bits.Count(slot) == 0;
            if (wasIdle)
            {
                slot = Interlocked.Add(ref _headTail, Bits.HiOne);
                Contract.Assert(Bits.Count(slot) != 0, "IOTS went idle when it shouldn't have.");
            }
 
            // Check if we wrapped *around* to idle.
            if (Bits.Count(slot) == -1)
            {
                // Since the capacity is limited to 32k, this means we wrapped the array at least twice.  That's bad
                // because headTail no longer knows how many work items we have - it looks like zero.  This can
                // only happen if 32k threads come through here while one is swapped out.
                throw Fx.AssertAndThrowFatal("Head/Tail overflow!");
            }
 
            // The head index (high word) selects the slot; SlotMask folds it into the circular array.
            bool queued = _slots[slot >> Bits.HiShift & SlotMask].TryEnqueueWorkItem(callback, state, out bool wrapped);
 
            if (wrapped)
            {
                // Wrapped around the circular buffer.  Create a new, bigger IOThreadScheduler.
                // The swap only takes effect if this instance is still the current one; otherwise 'next' is
                // simply dropped.
                IOThreadScheduler next =
                    new IOThreadScheduler(Math.Min(_slots.Length * 2, MaximumCapacity));
                Interlocked.CompareExchange(ref s_current, next, this);
            }
 
            if (wasIdle)
            {
                // It's our responsibility to kick off the runner thread.
                _overlapped.Post(this);
            }
 
            return queued;
        }
 
        // Called by the runner (ScheduledOverlapped.CallbackCore) when the posted overlapped fires.
        //
        // Atomically advances the tail (low word of _headTail).  If the queue held at least one item at that
        // moment, the overlapped is re-posted first and then the item at the old tail index is dequeued into
        // 'callback'/'state' (which may still come back null if the slot lost a race).  If the queue was empty,
        // the tail advance is what moves the scheduler to 'idle', and both outputs are null.
        private void CompletionCallback(out Action<object> callback, out object state)
        {
            int expected = _headTail;
            while (true)
            {
                Contract.Assert(Bits.Count(expected) != -1, "CompletionCallback called on idle IOTS!");

                bool queueWasEmpty = Bits.Count(expected) == 0;

                int actual = Interlocked.CompareExchange(ref _headTail, Bits.IncrementLo(expected), expected);
                if (actual != expected)
                {
                    // Lost the race; retry against the value that is actually there.
                    expected = actual;
                    continue;
                }

                if (queueWasEmpty)
                {
                    callback = null;
                    state = null;
                    return;
                }

                _overlapped.Post(this);
                _slots[expected & SlotMask].DequeueWorkItem(out callback, out state);
                return;
            }
        }
 
        // Tries to pull one more work item for the thread that is already running callbacks.
        //
        // While the queue reports a positive count, attempts to advance the tail with a compare-exchange; on
        // success the item at the old tail index is dequeued and true is returned.  Returns false (with null
        // outputs) once the count is observed to be zero or less, leaving _headTail untouched in that case.
        private bool TryCoalesce(out Action<object> callback, out object state)
        {
            int expected = _headTail;
            while (Bits.Count(expected) > 0)
            {
                int actual = Interlocked.CompareExchange(ref _headTail, Bits.IncrementLo(expected), expected);
                if (actual == expected)
                {
                    _slots[expected & SlotMask].DequeueWorkItem(out callback, out state);
                    return true;
                }

                // Another thread changed headTail first; re-evaluate with the fresh value.
                expected = actual;
            }

            callback = null;
            state = null;
            return false;
        }
 
        // Mask that folds a head/tail index into the slot array (length - 1; the length is asserted to be a
        // power of two in the constructor).
        private int SlotMask => _slots.Length - 1;
 
        // True when the calling thread has its s_isIoThread flag created and set.
        // NOTE(review): within this file the flag is only written by ScheduledOverlapped's
        // [Conditional("DEBUG")] helpers, so builds without DEBUG would always report false - confirm that callers
        // only rely on this in debug checks.
        public static bool IsRunningOnIOThread
        {
            get
            {
                return s_isIoThread.IsValueCreated && s_isIoThread.Value;
            }
        }
 
        // Finalizer: releases the overlapped via Cleanup unless shutdown has already started.  In DEBUG builds
        // it first verifies that the scheduler is idle and all slots are empty.
        ~IOThreadScheduler()
        {
            // If the AppDomain is shutting down, we may still have pending ops.  The AppDomain shutdown will clean
            // everything up.
            if (Environment.HasShutdownStarted)
            {
                return;
            }

#if DEBUG
            DebugVerifyHeadTail();
#endif
            Cleanup();
        }
 
        // Forwards to the overlapped's Cleanup when one exists.
        private void Cleanup()
        {
            _overlapped?.Cleanup();
        }
 
#if DEBUG
 
        // DEBUG-only consistency check used by the finalizer: asserts that headTail reads as idle and that every
        // slot is empty.
        private void DebugVerifyHeadTail()
        {
            if (_slots == null)
            {
                // The headTail value could technically be zero if the constructor was aborted early.  The
                // constructor wasn't aborted early if the slot array got created.
                return;
            }

            Contract.Assert(Bits.Count(_headTail) == -1, "IOTS finalized while not idle.");

            int index = 0;
            while (index < _slots.Length)
            {
                _slots[index].DebugVerifyEmpty();
                index++;
            }
        }
 
#endif 
 
        // TryEnqueueWorkItem and DequeueWorkItem use the slot's 'gate' field for synchronization.  Because the
        // slot array is circular and there are no locks, we must assume that multiple threads can be entering each
        // method simultaneously.  If the first DequeueWorkItem occurs before the first TryEnqueueWorkItem, the
        // sequencing (and the enqueue) fails.
        //
        // The gate is a 32-bit int divided into four fields.  The bottom 15 bits (0x00007fff) are the count of
        // threads that have entered TryEnqueueWorkItem.  The first thread to enter is the one responsible for
        // filling the slot with work.  The 16th bit (0x00008000) is a flag indicating that the slot has been
        // successfully filled.  Only the first thread to enter TryEnqueueWorkItem can set this flag.  The
        // high-word (0x7fff0000) is the count of threads entering DequeueWorkItem.  The first thread to enter
        // is the one responsible for accepting (and eventually dispatching) the work in the slot.  The
        // high-bit (0x80000000) is a flag indicating that the slot has been successfully emptied.
        //
        // When the low-word and high-word counters are equal, and both bit flags have been set, the gate is considered
        // 'complete' and can be reset back to zero.  Any operation on the gate might bring it to this state.
        // It's the responsibility of the thread that brings the gate to a completed state to reset it to zero.
        // (It's possible that the gate will fall out of the completed state before it can be reset - that's ok,
        // the next time it becomes completed it can be reset.)
        //
        // It's unlikely either count will ever go higher than 2 or 3.
        //
        // The value of 'callback' has these properties:
        //   -  When the gate is zero, callback is null.
        //   -  When the low-word count is non-zero, but the 0x8000 bit is unset, callback is writable by the thread
        //      that incremented the low word to 1.  Its value is undefined for other threads.  The thread that
        //      sets callback is responsible for setting the 0x8000 bit when it's done.
        //   -  When the 0x8000 bit is set and the high-word count is zero, callback is valid.  (It may be null.)
        //   -  When the 0x8000 bit is set, the high-word count is non-zero, and the high bit is unset, callback is
        //      writable by the thread that incremented the high word to 1 *or* the thread that set the 0x8000 bit,
        //      whichever happened last.  That thread can read the value and set callback to null.  Its value is
        //      undefined for other threads.  The thread that clears the callback is responsible for setting the
        //      high bit.
        //   -  When the high bit is set, callback is null.
        //   -  It's illegal for the gate to be in a state that would satisfy more than one of these conditions.
        //   -  The state field follows the same rules as callback.
 
        // One entry of the circular work-item buffer: a gate word used for lock-free hand-off plus the callback and
        // state being handed off.
        //
        // Slot is configured to have an explicit size of 64 bytes. This is to prevent two different slots being accessed
        // by two different CPU cores from having cache line contention when doing interlocked operations on Slot.gate.
        [StructLayout(LayoutKind.Sequential, Size = 64)]
        private struct Slot
        {
            // Low word: enqueuer count (0x7fff) + 'filled' flag (0x8000).
            // High word: dequeuer count (0x7fff0000) + 'emptied' flag (0x80000000).
            private int _gate;
            private Action<object> _callback;
            private object _state;
 
            // Tries to store a work item in this slot.
            //
            // Returns true only when this caller was the first enqueuer to enter the slot AND no dequeuer had
            // arrived by the time the 'filled' flag was set.  Returns false otherwise; the work item is then NOT
            // stored here and the caller is expected to try again elsewhere.
            //
            // 'wrapped' is set to true when this caller was not the first enqueuer to enter the slot (the low-word
            // count after the increment is not 1).
            public bool TryEnqueueWorkItem(Action<object> callback, object state, out bool wrapped)
            {
                // Register our arrival and check the state of this slot.  If the slot was already full, we wrapped.
                int gateSnapshot = Interlocked.Increment(ref _gate);
                wrapped = (gateSnapshot & Bits.LoCountMask) != 1;
                if (wrapped)
                {
                    // Our increment may have been the one that balanced the counters; if so, try to reset the gate.
                    if ((gateSnapshot & Bits.LoHiBit) != 0 && Bits.IsComplete(gateSnapshot))
                    {
                        Interlocked.CompareExchange(ref _gate, 0, gateSnapshot);
                    }
                    return false;
                }
 
                Contract.Assert(_callback == null, "Slot already has a work item.");
                Contract.Assert((gateSnapshot & Bits.HiBits) == 0, "Slot already marked.");
 
                _state = state;
                _callback = callback;
 
                // Set the special bit to show that the slot is filled.
                gateSnapshot = Interlocked.Add(ref _gate, Bits.LoHiBit);
                Contract.Assert((gateSnapshot & Bits.HiBits) == Bits.LoHiBit, "Slot already empty.");
 
                if ((gateSnapshot & Bits.HiCountMask) == 0)
                {
                    // Good - no one has shown up looking for this work yet.
                    return true;
                }
 
                // Oops - someone already came looking for this work.  We have to abort and reschedule.
                _state = null;
                _callback = null;
 
                // Indicate that the slot is clear.  We might be able to bypass setting the high bit.
                // (Bypass = the counters already match and the gate can be swapped straight to zero.)
                if (gateSnapshot >> Bits.HiShift != (gateSnapshot & Bits.LoCountMask) ||
                    Interlocked.CompareExchange(ref _gate, 0, gateSnapshot) != gateSnapshot)
                {
                    gateSnapshot = Interlocked.Add(ref _gate, Bits.HiHiBit);
                    if (Bits.IsComplete(gateSnapshot))
                    {
                        Interlocked.CompareExchange(ref _gate, 0, gateSnapshot);
                    }
                }
 
                return false;
            }
 
            // Claims whatever is in this slot.
            //
            // Outputs the stored callback/state when this caller is the first dequeuer to arrive after the slot was
            // filled; outputs null/null when the slot has not been filled yet or when another dequeuer got there
            // first.  Callers treat a null callback as a no-op work item.
            public void DequeueWorkItem(out Action<object> callback, out object state)
            {
                // Stake our claim on the item.
                int gateSnapshot = Interlocked.Add(ref _gate, Bits.HiOne);
 
                if ((gateSnapshot & Bits.LoHiBit) == 0)
                {
                    // Whoops, a race.  The work item hasn't made it in yet.  In this context, returning a null callback
                    // is treated like a degenerate work item (rather than an empty queue).  The enqueuing thread will
                    // notice this race and reschedule the real work in a new slot.  Do not reset the slot to zero,
                    // since it's still going to get enqueued into.  (The enqueueing thread will reset it.)
                    callback = null;
                    state = null;
                    return;
                }
 
                // If we're the first, we get to do the work.
                if ((gateSnapshot & Bits.HiCountMask) == Bits.HiOne)
                {
                    callback = _callback;
                    state = _state;
                    _state = null;
                    _callback = null;
 
                    // Indicate that the slot is clear.
                    // We should be able to bypass setting the high-bit in the common case.
                    // (Common case = exactly one enqueuer, so the gate can be swapped straight to zero.)
                    if ((gateSnapshot & Bits.LoCountMask) != 1 ||
                        Interlocked.CompareExchange(ref _gate, 0, gateSnapshot) != gateSnapshot)
                    {
                        gateSnapshot = Interlocked.Add(ref _gate, Bits.HiHiBit);
                        if (Bits.IsComplete(gateSnapshot))
                        {
                            Interlocked.CompareExchange(ref _gate, 0, gateSnapshot);
                        }
                    }
                }
                else
                {
                    callback = null;
                    state = null;
 
                    // If we're the last, we get to reset the slot.
                    if (Bits.IsComplete(gateSnapshot))
                    {
                        Interlocked.CompareExchange(ref _gate, 0, gateSnapshot);
                    }
                }
            }
 
#if DEBUG
            // DEBUG-only: asserts that the slot is fully reset (gate zero, no callback, no state).
            public void DebugVerifyEmpty()
            {
                Contract.Assert(_gate == 0, "Finalized with unfinished slot.");
                Contract.Assert(_callback == null, "Finalized with leaked callback.");
                Contract.Assert(_state == null, "Finalized with leaked state.");
            }
#endif
        }
 
        // A note about the IOThreadScheduler and the ScheduledOverlapped references:
        // Although for each scheduler we have a single instance of overlapped, we cannot point to the scheduler from the
        // overlapped, through the entire lifetime of the overlapped. This is because the ScheduledOverlapped is pinned
        // and if it has a reference to the IOTS, it would be rooted and the finalizer will never get called.
        // Therefore, we are passing the reference, when we post a pending callback and reset it, once the callback was
        // invoked; during that time the scheduler is rooted but in that time we don't want that it would be collected
        // by the GC anyway.
        // Owns the mechanism that wakes a runner thread for one IOThreadScheduler: a packed NativeOverlapped queued
        // through the thread pool on Windows, or a freshly started thread elsewhere.  The scheduler reference is
        // held only between Post and the start of the resulting callback.
        private unsafe class ScheduledOverlapped
        {
            private readonly NativeOverlapped* _nativeOverlapped;
            private IOThreadScheduler _scheduler;
            private readonly Action _postDelegate;

            // Chooses the posting strategy once, based on the OS.
            public ScheduledOverlapped()
            {
                if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
                {
                    IOCompletionCallback completion = Fx.ThunkCallback(new IOCompletionCallback(IOCallback));
                    _nativeOverlapped = new Overlapped().UnsafePack(completion, null);
                    _postDelegate = PostIOCP;
                }
                else
                {
                    _postDelegate = PostNewThread;
                }
            }

            // Completion entry point for the packed overlapped; the arguments are not used.
            private void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* nativeOverlapped) => Callback();

            // Common entry point for both posting strategies.
            private void Callback()
            {
                try
                {
                    InitThreadDebugData();
                    CallbackCore();
                }
                finally
                {
                    ClearThreadDebugData();
                }
            }

            // DEBUG-only: marks the current thread as an IO thread and names it.
            [Conditional("DEBUG")]
            private static void InitThreadDebugData()
            {
                s_isIoThread.Value = true;
                Thread.CurrentThread.Name = "IOThreadScheduler.IOCallback";
            }

            // DEBUG-only: clears the IO-thread mark set by InitThreadDebugData.
            [Conditional("DEBUG")]
            private static void ClearThreadDebugData()
            {
                s_isIoThread.Value = false;
            }

            // Detaches the scheduler, takes the first work item via CompletionCallback, then keeps invoking and
            // coalescing further items until TryCoalesce reports there are none left.
            private void CallbackCore()
            {
                // Unhook the IOThreadScheduler ASAP to prevent it from leaking.
                IOThreadScheduler owner = _scheduler;
                _scheduler = null;
                Fx.Assert(owner != null, "Overlapped completed without a scheduler.");

                Action<object> callback;
                object state;
                try
                {
                }
                finally
                {
                    // Called in a finally because it needs to run uninterrupted in order to maintain consistency.
                    owner.CompletionCallback(out callback, out state);
                }

                bool more;
                do
                {
                    // The callback can be null if synchronization misses result in unusable slots.  Keep going onto
                    // the next slot in such cases until there are no more slots.
                    if (callback != null)
                    {
                        callback(state);
                    }

                    try
                    {
                    }
                    finally
                    {
                        // Called in a finally because it needs to run uninterrupted in order to maintain consistency.
                        more = owner.TryCoalesce(out callback, out state);
                    }
                }
                while (more);
            }

            // Records the scheduler to run and triggers the platform-specific wake-up.
            public void Post(IOThreadScheduler iots)
            {
                Fx.Assert(_scheduler == null, "Post called on an overlapped that is already posted.");
                Fx.Assert(iots != null, "Post called with a null scheduler.");

                _scheduler = iots;
                _postDelegate();
            }

            // Windows strategy: queue the packed overlapped to the thread pool.
            [SupportedOSPlatform("windows")]
            private void PostIOCP()
            {
                ThreadPool.UnsafeQueueNativeOverlapped(_nativeOverlapped);
            }

            // Non-Windows strategy: run Callback on a newly started thread.
            private void PostNewThread()
            {
                new Thread(new ThreadStart(Callback)).Start();
            }

            // Frees the packed overlapped (Windows only).  Fails fatally if a post is still outstanding.
            public void Cleanup()
            {
                if (_scheduler is not null)
                {
                    throw Fx.AssertAndThrowFatal("Cleanup called on an overlapped that is in-flight.");
                }

                if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
                {
                    Overlapped.Free(_nativeOverlapped);
                }
            }
        }
 
        // SynchronizationContext whose Post routes the callback through ScheduleCallbackNoFlow.  Only Post is
        // overridden here.
        private class IOThreadSchedulerSynchronizationContext : SynchronizationContext
        {
            // Wraps 'd' in an Action<object> and schedules it with the supplied state.
            public override void Post(SendOrPostCallback d, object state)
            {
                Action<object> adapter = arg =>
                {
                    d(arg);
                };

                ScheduleCallbackNoFlow(adapter, state);
            }
        }
    }
}