File: System\Transactions\VolatileEnlistmentMultiplexing.cs
Web Access
Project: src\src\libraries\System.Transactions.Local\src\System.Transactions.Local.csproj (System.Transactions.Local)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Diagnostics;
using System.Threading;
 
namespace System.Transactions
{
    // The volatile Demultiplexer is a fanout point for promoted volatile enlistments.
    // When a transaction is promoted a single volatile enlistment is created in the new
    // transaction for all volitile enlistments on the transaction.  When the VolatileDemux
    // receives a preprepare it will fan that notification out to all of the enlistments
    // on the transaction.  When it has gathered all of the responses it will send a
    // single vote back to the DistributedTransactionManager.
    internal abstract class VolatileDemultiplexer : IEnlistmentNotificationInternal
    {
        // Reference the transactions so that we have access to it's enlistments
        protected InternalTransaction _transaction;
 
        // Store the IVolatileEnlistment interface to call back to the Distributed TM
        internal IPromotedEnlistment? _promotedEnlistment;
        internal IPromotedEnlistment? _preparingEnlistment;
 
        public VolatileDemultiplexer(InternalTransaction transaction)
        {
            _transaction = transaction;
        }
 
        internal static void BroadcastCommitted(ref VolatileEnlistmentSet volatiles)
        {
            // Broadcast preprepare to the volatile subordinates
            for (int i = 0; i < volatiles._volatileEnlistmentCount; i++)
            {
                volatiles._volatileEnlistments[i]._twoPhaseState!.InternalCommitted(
                    volatiles._volatileEnlistments[i]);
            }
        }
 
        // This broadcast is used by the state machines and therefore must be internal.
        internal static void BroadcastRollback(ref VolatileEnlistmentSet volatiles)
        {
            // Broadcast preprepare to the volatile subordinates
            for (int i = 0; i < volatiles._volatileEnlistmentCount; i++)
            {
                volatiles._volatileEnlistments[i]._twoPhaseState!.InternalAborted(
                    volatiles._volatileEnlistments[i]);
            }
        }
 
        internal static void BroadcastInDoubt(ref VolatileEnlistmentSet volatiles)
        {
            // Broadcast preprepare to the volatile subordinates
            for (int i = 0; i < volatiles._volatileEnlistmentCount; i++)
            {
                volatiles._volatileEnlistments[i]._twoPhaseState!.InternalIndoubt(
                    volatiles._volatileEnlistments[i]);
            }
        }
 
        // Object for synchronizing access to the entire class( avoiding lock( typeof( ... )) )
        private static object? s_classSyncObject;
        internal static object ClassSyncObject
        {
            get
            {
                if (s_classSyncObject == null)
                {
                    object o = new object();
                    Interlocked.CompareExchange(ref s_classSyncObject, o, null);
                }
                return s_classSyncObject;
            }
        }
 
        private static WaitCallback? s_prepareCallback;
        private static WaitCallback PrepareCallback => LazyInitializer.EnsureInitialized(ref s_prepareCallback, ref s_classSyncObject, () => new WaitCallback(PoolablePrepare!));
 
        protected static void PoolablePrepare(object state)
        {
            VolatileDemultiplexer demux = (VolatileDemultiplexer)state;
 
            // Don't block an enlistment thread (or a thread pool thread).  So
            // try to get the transaction lock but if unsuccessful give up and
            // queue this operation to try again later.
            bool tookLock = false;
            try
            {
                Monitor.TryEnter(demux._transaction, 250, ref tookLock);
                if (tookLock)
                {
                    demux.InternalPrepare();
                }
                else
                {
                    if (!ThreadPool.QueueUserWorkItem(PrepareCallback, demux))
                    {
                        throw TransactionException.CreateInvalidOperationException(
                            TraceSourceType.TraceSourceLtm,
                            SR.UnexpectedFailureOfThreadPool,
                            null
                            );
                    }
                }
            }
            finally
            {
                if (tookLock)
                {
                    Monitor.Exit(demux._transaction);
                }
            }
        }
 
        private static WaitCallback? s_commitCallback;
        private static WaitCallback CommitCallback => LazyInitializer.EnsureInitialized(ref s_commitCallback, ref s_classSyncObject, () => new WaitCallback(PoolableCommit!));
 
        protected static void PoolableCommit(object state)
        {
            VolatileDemultiplexer demux = (VolatileDemultiplexer)state;
 
            // Don't block an enlistment thread (or a thread pool thread).  So
            // try to get the transaction lock but if unsuccessful give up and
            // queue this operation to try again later.
            bool tookLock = false;
            try
            {
                Monitor.TryEnter(demux._transaction, 250, ref tookLock);
                if (tookLock)
                {
                    demux.InternalCommit();
                }
                else
                {
                    if (!ThreadPool.QueueUserWorkItem(CommitCallback, demux))
                    {
                        throw TransactionException.CreateInvalidOperationException(
                            TraceSourceType.TraceSourceLtm,
                            SR.UnexpectedFailureOfThreadPool,
                            null
                            );
                    }
                }
            }
            finally
            {
                if (tookLock)
                {
                    Monitor.Exit(demux._transaction);
                }
            }
        }
 
        private static WaitCallback? s_rollbackCallback;
        private static WaitCallback RollbackCallback => LazyInitializer.EnsureInitialized(ref s_rollbackCallback, ref s_classSyncObject, () => new WaitCallback(PoolableRollback!));
 
        protected static void PoolableRollback(object state)
        {
            VolatileDemultiplexer demux = (VolatileDemultiplexer)state;
 
            // Don't block an enlistment thread (or a thread pool thread).  So
            // try to get the transaction lock but if unsuccessful give up and
            // queue this operation to try again later.
            bool tookLock = false;
            try
            {
                Monitor.TryEnter(demux._transaction, 250, ref tookLock);
                if (tookLock)
                {
                    demux.InternalRollback();
                }
                else
                {
                    if (!ThreadPool.QueueUserWorkItem(RollbackCallback, demux))
                    {
                        throw TransactionException.CreateInvalidOperationException(
                            TraceSourceType.TraceSourceLtm,
                            SR.UnexpectedFailureOfThreadPool,
                            null
                            );
                    }
                }
            }
            finally
            {
                if (tookLock)
                {
                    Monitor.Exit(demux._transaction);
                }
            }
        }
 
        private static WaitCallback? s_inDoubtCallback;
        private static WaitCallback InDoubtCallback => LazyInitializer.EnsureInitialized(ref s_inDoubtCallback, ref s_classSyncObject, () => new WaitCallback(PoolableInDoubt!));
 
        protected static void PoolableInDoubt(object state)
        {
            VolatileDemultiplexer demux = (VolatileDemultiplexer)state;
 
            // Don't block an enlistment thread (or a thread pool thread).  So
            // try to get the transaction lock but if unsuccessful give up and
            // queue this operation to try again later.
            bool tookLock = false;
            try
            {
                Monitor.TryEnter(demux._transaction, 250, ref tookLock);
                if (tookLock)
                {
                    demux.InternalInDoubt();
                }
                else
                {
                    if (!ThreadPool.QueueUserWorkItem(InDoubtCallback, demux))
                    {
                        throw TransactionException.CreateInvalidOperationException(
                            TraceSourceType.TraceSourceLtm,
                            SR.UnexpectedFailureOfThreadPool,
                            null
                            );
                    }
                }
            }
            finally
            {
                if (tookLock)
                {
                    Monitor.Exit(demux._transaction);
                }
            }
        }
 
        protected abstract void InternalPrepare();
        protected abstract void InternalCommit();
        protected abstract void InternalRollback();
        protected abstract void InternalInDoubt();
 
        #region IEnlistmentNotification Members
 
        // Fanout Preprepare notifications
        public abstract void Prepare(IPromotedEnlistment en);
 
        public abstract void Commit(IPromotedEnlistment en);
 
        public abstract void Rollback(IPromotedEnlistment en);
 
        public abstract void InDoubt(IPromotedEnlistment en);
 
        #endregion
    }
 
 
    // This class implements the phase 0 version of a volatile demux.
    internal sealed class Phase0VolatileDemultiplexer : VolatileDemultiplexer
    {
        public Phase0VolatileDemultiplexer(InternalTransaction transaction) : base(transaction) { }
 
        protected override void InternalPrepare()
        {
            Debug.Assert(_promotedEnlistment != null && _transaction.State != null);
            try
            {
                _transaction.State.ChangeStatePromotedPhase0(_transaction);
            }
            catch (TransactionAbortedException e)
            {
                _promotedEnlistment.ForceRollback(e);
                TransactionsEtwProvider etwLog = TransactionsEtwProvider.Log;
                if (etwLog.IsEnabled())
                {
                    etwLog.ExceptionConsumed(e);
                }
            }
            catch (TransactionInDoubtException e)
            {
                _promotedEnlistment.EnlistmentDone();
                TransactionsEtwProvider etwLog = TransactionsEtwProvider.Log;
                if (etwLog.IsEnabled())
                {
                    etwLog.ExceptionConsumed(e);
                }
            }
        }
 
        protected override void InternalCommit()
        {
            Debug.Assert(_promotedEnlistment != null && _transaction.State != null);
 
            // Respond immediately to the TM
            _promotedEnlistment.EnlistmentDone();
 
            _transaction.State.ChangeStatePromotedCommitted(_transaction);
        }
 
        protected override void InternalRollback()
        {
            Debug.Assert(_promotedEnlistment != null && _transaction.State != null);
 
            // Respond immediately to the TM
            _promotedEnlistment.EnlistmentDone();
 
            _transaction.State.ChangeStatePromotedAborted(_transaction);
        }
 
        protected override void InternalInDoubt()
        {
            Debug.Assert(_transaction.State != null);
            _transaction.State.InDoubtFromDtc(_transaction);
        }
 
        #region IEnlistmentNotification Members
 
        // Fanout Preprepare notifications
        public override void Prepare(IPromotedEnlistment en)
        {
            _preparingEnlistment = en;
            PoolablePrepare(this);
        }
 
 
        public override void Commit(IPromotedEnlistment en)
        {
            _promotedEnlistment = en;
            PoolableCommit(this);
        }
 
 
        public override void Rollback(IPromotedEnlistment en)
        {
            _promotedEnlistment = en;
            PoolableRollback(this);
        }
 
 
        public override void InDoubt(IPromotedEnlistment en)
        {
            _promotedEnlistment = en;
            PoolableInDoubt(this);
        }
 
        #endregion
    }
 
    // This class implements the phase 1 version of a volatile demux.
    internal sealed class Phase1VolatileDemultiplexer : VolatileDemultiplexer
    {
        public Phase1VolatileDemultiplexer(InternalTransaction transaction) : base(transaction) { }
 
        protected override void InternalPrepare()
        {
            Debug.Assert(_promotedEnlistment != null && _transaction.State != null);
            try
            {
                _transaction.State.ChangeStatePromotedPhase1(_transaction);
            }
            catch (TransactionAbortedException e)
            {
                _promotedEnlistment.ForceRollback(e);
                TransactionsEtwProvider etwLog = TransactionsEtwProvider.Log;
                if (etwLog.IsEnabled())
                {
                    etwLog.ExceptionConsumed(e);
                }
            }
            catch (TransactionInDoubtException e)
            {
                _promotedEnlistment.EnlistmentDone();
                TransactionsEtwProvider etwLog = TransactionsEtwProvider.Log;
                if (etwLog.IsEnabled())
                {
                    etwLog.ExceptionConsumed(e);
                }
            }
        }
 
 
        protected override void InternalCommit()
        {
            Debug.Assert(_promotedEnlistment != null && _transaction.State != null);
 
            // Respond immediately to the TM
            _promotedEnlistment.EnlistmentDone();
 
            _transaction.State.ChangeStatePromotedCommitted(_transaction);
        }
 
 
        protected override void InternalRollback()
        {
            Debug.Assert(_promotedEnlistment != null && _transaction.State != null);
 
            // Respond immediately to the TM
            _promotedEnlistment.EnlistmentDone();
 
            _transaction.State.ChangeStatePromotedAborted(_transaction);
        }
 
 
        protected override void InternalInDoubt()
        {
            Debug.Assert(_transaction.State != null);
            _transaction.State.InDoubtFromDtc(_transaction);
        }
 
 
        // Fanout Preprepare notifications
        public override void Prepare(IPromotedEnlistment en)
        {
            _preparingEnlistment = en;
            PoolablePrepare(this);
        }
 
 
        public override void Commit(IPromotedEnlistment en)
        {
            _promotedEnlistment = en;
            PoolableCommit(this);
        }
 
 
        public override void Rollback(IPromotedEnlistment en)
        {
            _promotedEnlistment = en;
            PoolableRollback(this);
        }
 
 
        public override void InDoubt(IPromotedEnlistment en)
        {
            _promotedEnlistment = en;
            PoolableInDoubt(this);
        }
    }
 
 
 
    internal struct VolatileEnlistmentSet
    {
        internal InternalEnlistment[] _volatileEnlistments;
        internal int _volatileEnlistmentCount;
        internal int _volatileEnlistmentSize;
        internal int _dependentClones;
 
        // Track the number of volatile enlistments that have prepared.
        internal int _preparedVolatileEnlistments;
 
        // This is a single pinpoint enlistment to represent all volatile enlistments that
        // may exist on a promoted transaction.  This member should only be initialized if
        // a transaction is promoted.
        private VolatileDemultiplexer _volatileDemux;
        internal VolatileDemultiplexer VolatileDemux
        {
            get { return _volatileDemux; }
            set
            {
                Debug.Assert(_volatileDemux == null, "volatileDemux can only be set once.");
                _volatileDemux = value;
            }
        }
    }
}