File: System\IO\Packaging\InterleavedZipPackagePartStream.cs
Web Access
Project: src\src\libraries\System.IO.Packaging\src\System.IO.Packaging.csproj (System.IO.Packaging)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
 
namespace System.IO.Packaging
{
    /// <summary>
    /// The class InterleavedZipPackagePartStream is used to wrap one or more Zip package part streams
    /// for an interleaved part. It hides the interleaving from its callers by offering
    /// the abstraction of a continuous stream across pieces.
    /// </summary>
    /// <remarks>
    /// This class is defined for the benefit of ZipPackage, ZipPackagePart and
    /// InternalRelationshipCollection.
    /// Although it is quite specialized, it would hardly make sense to nest its definition in any
    /// of these clases.
    /// </remarks>
    internal sealed partial class InterleavedZipPackagePartStream : Stream
    {
        // High-level object to access the collection of pieces by offset and pieceNumber.
        private readonly PieceDirectory _dir;
 
        // Cached value for the current piece number.
        // (Lazily sync'ed to _currentOffset when GetCurrentPieceNumber() is invoked.)
        private int _currentPieceNumber;
 
        // Control value to decide whether to use _currentPieceNumber without updating it.
        private long? _offsetForCurrentPieceNumber;
 
        // This variable continuously tracks the current stream position.
        private long _currentOffset;
 
        // Closed status.
        private bool _closed;
 
        /// <summary>
        /// Build a System.IO.Stream on a part that possibly consists of multiple files
        /// An InterleavedZipPackagePartStream gets created by ZipPackagePart.GetStreamCore when the part
        /// is interleaved. It wraps one or more Zip streams (one per piece).
        /// (pieces).
        /// </summary>
        /// <param name="access">Access (read, write, etc.) with which piece streams should be opened</param>
        /// <param name="owningPart">
        /// The part to build a stream on. It contains all ZipFileInfo descriptors for the part's pieces
        /// (see ZipPackage.GetPartsCore).
        /// </param>
        /// <param name="zipStreamManager"></param>
        internal InterleavedZipPackagePartStream(ZipPackagePart owningPart, ZipStreamManager zipStreamManager, FileAccess access)
            : this(zipStreamManager, owningPart.PieceDescriptors, access)
        {
        }
 
        /// <summary>
        /// This constructor is provided to be able to interleave other files than just parts,
        /// notably the contents type file.
        /// </summary>
        internal InterleavedZipPackagePartStream(ZipStreamManager zipStreamManager, List<ZipPackagePartPiece> sortedPieceInfoList, FileAccess access)
        {
            // The PieceDirectory mediates access to pieces.
            // It maps offsets to piece numbers and piece numbers to streams and start offsets.
            // Mode and access are entirely managed by the underlying streams.
            _dir = new PieceDirectory(sortedPieceInfoList, zipStreamManager, access);
 
            // GetCurrentPieceNumber is operational from the beginning.
            Debug.Assert(_dir.GetStartOffset(GetCurrentPieceNumber()) == 0);
        }
 
        /// <inheritdoc/>
        public override int Read(byte[] buffer, int offset, int count)
            => ReadCore(new Span<byte>(buffer, offset, count));
 
#if NET
        /// <inheritdoc/>
        public override int Read(Span<byte> buffer)
            => ReadCore(buffer);
#endif
 
        private int ReadCore(Span<byte> buffer)
        {
            CheckClosed();
 
            // Check arguments.
            if (!CanRead)
                throw new NotSupportedException(SR.ReadNotSupported);
 
            // Leave capability and FileAccess checks up to the underlying stream(s).
 
            // Reading 0 bytes is a no-op.
            if (buffer.Length == 0)
                return 0;
 
            int pieceNumber = GetCurrentPieceNumber();
            int totalBytesRead = 0;
 
            Stream pieceStream = _dir.GetStream(pieceNumber);
            long pieceStreamRelativeOffset = _currentOffset - _dir.GetStartOffset(pieceNumber);
 
            // .NET Standard 2.0 doesn't support the Read(Span<byte>) method. Instead, we rent a temporary
            // buffer of the same length, read into that and perform a copy into the span.
#if !NET
            byte[] tempInputBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
#endif
 
            checked
            {
                // Find the current position of the first stream. Most of the time, this will be zero.
                // If the current position is ahead of the required relative position and the stream doesn't
                // support seeking, close and reopen the stream to force the position back to zero. Then,
                // read the remaining bytes to force the stream forwards
                if (pieceStream.CanSeek)
                {
                    if (pieceStream.Position != pieceStreamRelativeOffset)
                    {
                        pieceStream.Seek(pieceStreamRelativeOffset, SeekOrigin.Begin);
                    }
                }
                else
                {
                    pieceStream = _dir.ResetStream(pieceNumber);
                    SeekUnderlyingPieceStream(pieceStream, pieceStreamRelativeOffset);
                }
 
                try
                {
                    while (totalBytesRead < buffer.Length)
                    {
#if NET
                        int numBytesRead = pieceStream.Read(buffer.Slice(totalBytesRead));
#else
                        int numBytesRead = pieceStream.Read(
                            tempInputBuffer,
                            totalBytesRead,
                            buffer.Length - totalBytesRead);
 
                        tempInputBuffer.AsSpan(totalBytesRead, numBytesRead).CopyTo(buffer.Slice(totalBytesRead, numBytesRead));
#endif
 
                        // End of the current stream: try to move to the next stream.
                        if (numBytesRead == 0)
                        {
                            if (_dir.IsLastPiece(pieceNumber))
                                break;
 
                            ++pieceNumber;
                            Debug.Assert(_dir.GetStartOffset(pieceNumber) == _currentOffset + totalBytesRead);
 
                            pieceStream = _dir.GetStream(pieceNumber);
 
                            //Seek inorder to set the correct pointer for the next piece stream
                            if (pieceStream.CanSeek)
                            {
                                if (pieceStream.Position != 0)
                                {
                                    pieceStream.Seek(0, SeekOrigin.Begin);
                                }
                                else
                                {
                                    pieceStream = _dir.ResetStream(pieceNumber);
                                }
                            }
                        }
 
                        totalBytesRead += numBytesRead;
                    }
                }
                finally
                {
#if !NET
                    ArrayPool<byte>.Shared.Return(tempInputBuffer);
#endif
                }
 
                // Advance current position now we know the operation completed successfully.
                _currentOffset += totalBytesRead;
            }
 
            return totalBytesRead;
        }
 
        /// <inheritdoc/>
        public override long Seek(long offset, SeekOrigin origin)
        {
            CheckClosed();
 
            // Check stream capabilities. (Normally, CanSeek will be false only
            // when the stream is closed.)
            if (!CanSeek)
            {
                throw new NotSupportedException(SR.SeekNotSupported);
            }
 
            // Convert offset to a start-based offset.
            switch (origin)
            {
                case SeekOrigin.Begin:
                    break;
 
                case SeekOrigin.Current:
                    checked { offset += _currentOffset; }
                    break;
 
                case SeekOrigin.End:
                    checked { offset += Length; }
                    break;
 
                default:
                    throw new ArgumentOutOfRangeException(nameof(origin));
            }
 
            // Check offset validity.
            if (offset < 0)
            {
                throw new ArgumentException(SR.SeekNegative);
            }
 
            // OK if _currentOffset points beyond end of stream.
 
            // Update position field and return.
            _currentOffset = offset;
 
            return _currentOffset;
        }
 
        /// <inheritdoc/>
        public override void SetLength(long newLength)
        {
            CheckClosed();
 
            // Check argument and stream capabilities.
#if NET
            ArgumentOutOfRangeException.ThrowIfNegative(newLength);
#else
            if (newLength < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(newLength));
            }
#endif
            if (!CanWrite)
            {
                throw new NotSupportedException(SR.StreamDoesNotSupportWrite);
            }
            if (!CanSeek)
            {
                throw new NotSupportedException(SR.SeekNotSupported);
            }
 
            // If some pieces are to be deleted, this is reflected only in memory at present.
            int lastPieceNumber;
            if (newLength == 0)
            {
                // This is special-cased because there is no last offset to speak of, and
                // so the piece directory cannot return any piece by offset.
                lastPieceNumber = 0;
            }
            else
            {
                lastPieceNumber = _dir.GetPieceNumberFromOffset(newLength - 1); // No need to use checked{] since newLength != 0
            }
            _dir.SetLogicalLastPiece(lastPieceNumber);
 
            // Adjust last active stream to new size.
            Stream lastPieceStream = _dir.GetStream(lastPieceNumber);
 
            Debug.Assert(newLength - _dir.GetStartOffset(lastPieceNumber) >= 0);
            long lastPieceStreamSize = newLength - _dir.GetStartOffset(lastPieceNumber);
            lastPieceStream.SetLength(lastPieceStreamSize);
 
            if (_currentOffset > newLength)
            {
                _currentOffset = newLength;
            }
        }
 
        /// <inheritdoc/>
        /// <remarks>
        /// Zip streams can be assumed seekable so the length will be available for chaining
        /// pieces.
        /// </remarks>
        public override void Write(byte[] buffer, int offset, int count)
            => WriteCore(new ReadOnlySpan<byte>(buffer, offset, count));
 
#if NET
        /// <inheritdoc/>
        /// <remarks>
        /// Zip streams can be assumed seekable so the length will be available for chaining
        /// pieces.
        /// </remarks>
        public override void Write(ReadOnlySpan<byte> buffer)
            => WriteCore(buffer);
#endif
 
        private void WriteCore(ReadOnlySpan<byte> buffer)
        {
            CheckClosed();
 
            if (!CanWrite)
            {
                throw new NotSupportedException(SR.WriteNotSupported);
            }
 
            // No check for FileAccess and stream capability (CanWrite). This is the responsibility
            // of the underlying stream(s).
 
            // A no-op if zero bytes to write.
            if (buffer.Length == 0)
            {
                return;
            }
 
            // Write into piece streams, preserving all lengths in non-terminal pieces.
            int totalBytesWritten = 0;
            int pieceNumber = GetCurrentPieceNumber();
            Stream pieceStream = _dir.GetStream(pieceNumber);
 
            // .NET Standard 2.0 doesn't support the Write(ReadOnlySpan<byte>) method. Instead, rent a temporary
            // buffer of a specific length, write into that and write that to the underlying stream.
            // To slightly reduce memory usage, this buffer is reallocated with every new piece.
#if !NET
            byte[] tempInputBuffer = null;
#endif
 
            checked
            {
                //Seek to the correct location in the underlying stream for the current piece
                pieceStream.Seek(_currentOffset - _dir.GetStartOffset(pieceNumber), SeekOrigin.Begin);
 
                while (totalBytesWritten < buffer.Length)
                {
                    // Compute the number of bytes to write into pieceStream.
                    int numBytesToWriteInCurrentPiece = buffer.Length - totalBytesWritten;
                    if (!_dir.IsLastPiece(pieceNumber))
                    {
                        // The write should not change the length of an intermediate piece.
                        long currentPosition = _currentOffset + totalBytesWritten;
                        long maxPosition = _dir.GetStartOffset(pieceNumber + 1) - 1;
                        if (numBytesToWriteInCurrentPiece > (maxPosition - currentPosition + 1))
                        {
                            // Cast from long to cast is safe in so far as *count*, which is the
                            // absolute max for all byte counts, is a positive int.
                            numBytesToWriteInCurrentPiece = checked((int)(maxPosition - currentPosition + 1));
                        }
                    }
 
#if !NET
                    // Allocate memory to tempInputBuffer, copy the correct segment from the ReadOnlySpan, and
                    // do the write to pieceStream.
                    tempInputBuffer ??= ArrayPool<byte>.Shared.Rent(numBytesToWriteInCurrentPiece);
 
                    buffer.Slice(totalBytesWritten, numBytesToWriteInCurrentPiece).CopyTo(tempInputBuffer);
                    pieceStream.Write(tempInputBuffer, 0, numBytesToWriteInCurrentPiece);
#else
                    // Do the write.
                    pieceStream.Write(buffer.Slice(totalBytesWritten, numBytesToWriteInCurrentPiece));
#endif
 
                    // Update the tally.
                    totalBytesWritten += numBytesToWriteInCurrentPiece;
 
                    // If there is more data to write, get the next piece stream
                    if (!_dir.IsLastPiece(pieceNumber) && totalBytesWritten < buffer.Length)
                    {
                        // The next write, should involve the next piece.
                        ++pieceNumber;
 
                        pieceStream = _dir.GetStream(pieceNumber);
 
                        //Seek inorder to set the correct pointer for the next piece stream
                        pieceStream.Seek(0, SeekOrigin.Begin);
 
#if !NET
                        // Return and unset tempInputBuffer, forcing it to be reallocated with the size of
                        // the next piece.
                        ArrayPool<byte>.Shared.Return(tempInputBuffer);
                        tempInputBuffer = null;
#endif
                    }
                }
 
                // Now we know the operation has completed, the current position can be updated.
                Debug.Assert(totalBytesWritten == buffer.Length);
                _currentOffset += totalBytesWritten;
            }
        }
 
        /// <summary>
        /// Flush all dirty streams and commit pending piece deletions.
        /// </summary>
        /// <remarks>
        /// Flush gets called on all underlying streams ever accessed. If it turned out
        /// this is too inefficient, the PieceDirectory could be made to expose a SetDirty
        /// method that takes a piece number.
        /// </remarks>
        public override void Flush()
        {
            CheckClosed();
 
            // The underlying streams know whether they are dirty or not;
            // so _dir will indiscriminately flush all the streams that have been accessed.
            // It will also carry out necessary renamings and deletions to reflect calls to
            // SetLogicalLastPiece.
            _dir.Flush();
        }
 
        /// <inheritdoc/>
        /// <remarks>
        /// <para>
        /// Here, the assumption, as in all capability tests, is that the status of
        /// the first piece reflects the status of all pieces for the part.
        /// This is justified by the fact that (i) all piece streams are opened with the same
        /// parameters against the same archive and (ii) the current piece stream cannot get
        /// closed unless the whole part stream is closed.
        /// </para>
        /// <para>
        /// A further assumption is that, as soon as interleaved zip part stream is initialized, there
        /// is a descriptor for the 1st piece.
        /// </para>
        /// </remarks>
        public override bool CanRead => _closed ? false : _dir.GetStream(0).CanRead;
 
        /// <inheritdoc/>
        /// <remarks>
        /// <para>
        /// Here, the assumption, as in all capability tests, is that the status of
        /// the first piece reflects the status of all pieces for the part.
        /// This is justified by the fact that (i) all piece streams are opened with the same
        /// parameters against the same archive and (ii) the current piece stream cannot get
        /// closed unless the whole part stream is closed.
        /// </para>
        /// <para>
        /// A further assumption is that, as soon as interleaved zip part stream is initialized, there
        /// is a descriptor for the 1st piece.
        /// </para>
        /// </remarks>
        public override bool CanSeek => _closed ? false : _dir.GetStream(0).CanSeek;
 
        /// <inheritdoc/>
        /// <remarks>
        /// <para>
        /// Here, the assumption, as in all capability tests, is that the status of
        /// the first piece reflects the status of all pieces for the part.
        /// This is justified by the fact that (i) all piece streams are opened with the same
        /// parameters against the same archive and (ii) the current piece stream cannot get
        /// closed unless the whole part stream is closed.
        /// </para>
        /// <para>
        /// A further assumption is that, as soon as interleaved zip part stream is initialized, there
        /// is a descriptor for the 1st piece.
        /// </para>
        /// </remarks>
        //
        public override bool CanWrite => _closed ? false : _dir.GetStream(0).CanWrite;
 
        /// <inheritdoc/>
        public override long Position
        {
            get
            {
                CheckClosed();
 
                // Current offset is systematically updated to reflect the current position.
                return _currentOffset;
            }
            set
            {
                CheckClosed();
                Seek(value, SeekOrigin.Begin);
            }
        }
 
        /// <inheritdoc/>
        public override long Length
        {
            get
            {
                CheckClosed();
                Debug.Assert(CanSeek);
 
                long length = 0;
                for (int pieceNumber = 0; pieceNumber < _dir.GetNumberOfPieces(); ++pieceNumber)
                {
                    checked { length += _dir.GetStream(pieceNumber).Length; }
                }
                return length;
            }
        }
 
        /// <inheritdoc/>
        protected override void Dispose(bool disposing)
        {
            try
            {
                if (disposing)
                {
                    if (!_closed)
                    {
                        _dir.Close();
                    }
                }
            }
            finally
            {
                _closed = true;
                base.Dispose(disposing);
            }
        }
 
        private void CheckClosed()
        {
            if (_closed)
            {
                throw new ObjectDisposedException(null, SR.StreamObjectDisposed);
            }
        }
 
        /// <summary>
        /// Infer the current piece number from _currentOffset.
        /// </summary>
        /// <remarks>
        /// Storing the current piece number in a field and computing the current offset from it
        /// would also have been possible, but less efficient.
        /// </remarks>
        private int GetCurrentPieceNumber()
        {
            // Since this property is likely to be read more often than _currentOffset
            // gets updated, its value is cached in _currentPieceNumber.
            // The validity of the cached value is monitored using _offsetForCurrentPieceNumber.
            if (_offsetForCurrentPieceNumber != _currentOffset)
            {
                // Cached value is stale. Refresh.
                _currentPieceNumber = _dir.GetPieceNumberFromOffset(_currentOffset);
                _offsetForCurrentPieceNumber = _currentOffset;
            }
            return _currentPieceNumber;
        }
 
        /// <summary>
        /// Moves a piece stream at position zero to its new absolute position by reading from it.
        /// </summary>
        private static void SeekUnderlyingPieceStream(Stream pieceStream, long byteCount)
        {
            const int BufferSize = 4096;
            long remainingBytes = byteCount;
            byte[] readBuffer = ArrayPool<byte>.Shared.Rent(BufferSize);
 
            try
            {
                int bytesRead;
 
                do
                {
                    int bytesToRead = remainingBytes < readBuffer.Length ? (int)remainingBytes : readBuffer.Length;
 
                    bytesRead = pieceStream.Read(readBuffer, 0, bytesToRead);
                    remainingBytes -= bytesRead;
                } while (remainingBytes > 0 && bytesRead > 0);
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(readBuffer);
            }
 
            if (remainingBytes != 0)
            {
                throw new EndOfStreamException();
            }
        }
    }
}