|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.CodeAnalysis;
/// <summary>
/// Helpers to create temporary streams backed by pooled memory
/// </summary>
internal static class SerializableBytes
{
private const int ChunkSize = SharedPools.ByteBufferSize;
internal static PooledStream CreateReadableStream(byte[] bytes)
=> CreateReadableStream(bytes, bytes.Length);
internal static PooledStream CreateReadableStream(byte[] bytes, int length)
{
var stream = CreateWritableStream();
stream.Write(bytes, 0, length);
stream.Position = 0;
return stream;
}
internal static async Task<PooledStream> CreateReadableStreamAsync(Stream stream, CancellationToken cancellationToken)
{
var length = stream.Length;
var chunkCount = (length + ChunkSize - 1) / ChunkSize;
var chunks = new byte[chunkCount][];
try
{
for (long i = 0, c = 0; i < length; i += ChunkSize, c++)
{
var count = (int)Math.Min(ChunkSize, length - i);
var chunk = SharedPools.ByteArray.Allocate();
var chunkOffset = 0;
while (count > 0)
{
var bytesRead = await stream.ReadAsync(chunk, chunkOffset, count, cancellationToken).ConfigureAwait(false);
if (bytesRead > 0)
{
count -= bytesRead;
chunkOffset += bytesRead;
}
else
{
break;
}
}
chunks[c] = chunk;
}
var result = new ReadStream(length, chunks);
chunks = null;
return result;
}
finally
{
BlowChunks(chunks);
}
}
// free any chunks remaining
private static void BlowChunks(byte[][]? chunks)
{
if (chunks != null)
{
for (long c = 0; c < chunks.Length; c++)
{
if (chunks[c] != null)
{
SharedPools.ByteArray.Free(chunks[c]);
chunks[c] = null!;
}
}
}
}
internal static ReadWriteStream CreateWritableStream()
=> new();
public abstract class PooledStream : Stream
{
protected List<byte[]> chunks;
protected long position;
protected long length;
protected PooledStream(long length, List<byte[]> chunks)
{
this.position = 0;
this.length = length;
this.chunks = chunks;
}
public override long Length => this.length;
public override bool CanRead => true;
public override bool CanSeek => true;
public override bool CanWrite => false;
public override void Flush()
{
// nothing to do, this is a read-only stream
}
public override long Position
{
get => this.position;
set
{
if (value < 0 || value >= length)
{
throw new ArgumentOutOfRangeException(nameof(value));
}
this.position = value;
}
}
public override long Seek(long offset, SeekOrigin origin)
{
long target;
try
{
target = origin switch
{
SeekOrigin.Begin => offset,
SeekOrigin.Current => checked(offset + position),
SeekOrigin.End => checked(offset + length),
_ => throw new ArgumentOutOfRangeException(nameof(origin)),
};
}
catch (OverflowException)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}
if (target < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}
position = target;
return target;
}
public override int ReadByte()
{
if (position >= length)
return -1;
var result = chunks[CurrentChunkIndex][CurrentChunkOffset];
this.position++;
return result;
}
public override int Read(byte[] buffer, int index, int count)
{
if (count <= 0 || position >= length)
{
return 0;
}
var totalCopyCount = Read(this.chunks, this.position, this.length, buffer, index, count);
this.position += totalCopyCount;
return totalCopyCount;
}
private static int Read(List<byte[]> chunks, long position, long length, byte[] buffer, int index, int count)
{
var oldPosition = position;
while (count > 0 && position < length)
{
var chunk = chunks[GetChunkIndex(position)];
var currentOffset = GetChunkOffset(position);
var copyCount = Math.Min(Math.Min(ChunkSize - currentOffset, count), (int)(length - position));
Array.Copy(chunk, currentOffset, buffer, index, copyCount);
position += copyCount;
index += copyCount;
count -= copyCount;
}
return (int)(position - oldPosition);
}
public byte[] ToArray()
{
if (this.Length == 0)
{
return [];
}
var array = new byte[this.Length];
// read entire array
Read(this.chunks, 0, this.length, array, 0, array.Length);
return array;
}
public ImmutableArray<byte> ToImmutableArray()
{
// ImmutableArray only supports int-sized arrays
var count = checked((int)Length);
var builder = ImmutableArray.CreateBuilder<byte>(count);
var chunkIndex = 0;
while (count > 0)
{
var chunk = chunks[chunkIndex];
var copyCount = Math.Min(chunk.Length, count);
builder.AddRange(chunk, copyCount);
count -= copyCount;
chunkIndex++;
}
Debug.Assert(count == 0);
return builder.MoveToImmutable();
}
protected int CurrentChunkIndex { get { return GetChunkIndex(this.position); } }
protected int CurrentChunkOffset { get { return GetChunkOffset(this.position); } }
protected static int GetChunkIndex(long value)
=> (int)(value / ChunkSize);
protected static int GetChunkOffset(long value)
=> (int)(value % ChunkSize);
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (chunks != null)
{
foreach (var chunk in chunks)
{
SharedPools.ByteArray.Free(chunk);
}
chunks = null!;
}
}
public override void SetLength(long value)
=> throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
}
private sealed class ReadStream(long length, byte[][] chunks) : PooledStream(length, [.. chunks])
{
}
public sealed class ReadWriteStream : PooledStream
{
public ReadWriteStream()
: base(length: 0, chunks: SharedPools.BigDefault<List<byte[]>>().AllocateAndClear())
{
// growing list on EnsureSize shown as perf bottleneck. reuse shared list so that
// we don't re-allocate as much.
}
public override bool CanWrite => true;
public override long Position
{
get
{
return base.Position;
}
set
{
if (value < 0)
{
throw new ArgumentOutOfRangeException(nameof(value));
}
this.position = value;
}
}
private void EnsureCapacity(long value)
{
var nextIndex = GetChunkIndex(value);
for (var i = this.chunks.Count; i <= nextIndex; i++)
{
// allocate memory and initialize it to zero
var chunk = SharedPools.ByteArray.Allocate();
Array.Clear(chunk, 0, chunk.Length);
chunks.Add(chunk);
}
}
public override void SetLength(long value)
=> SetLength(value, truncate: true);
/// <summary>
/// Sets the length of this stream (see <see cref="SetLength(long)"/>. If <paramref name="truncate"/> is <see
/// langword="false"/>, the internal buffers will be left as is, and the data in them will be left as garbage.
/// If it is <see langword="true"/> then any fully unused chunks will be discarded. If there is a final chunk
/// the stream is partway through, the remainder of that chunk will be zeroed out.
/// </summary>
public void SetLength(long value, bool truncate)
{
EnsureCapacity(value);
if (value < length && truncate)
{
var chunkIndex = GetChunkIndex(value);
var chunkOffset = GetChunkOffset(value);
Array.Clear(chunks[chunkIndex], chunkOffset, chunks[chunkIndex].Length - chunkOffset);
var trimIndex = chunkIndex + 1;
for (var i = trimIndex; i < chunks.Count; i++)
SharedPools.ByteArray.Free(chunks[i]);
chunks.RemoveRange(trimIndex, chunks.Count - trimIndex);
}
length = value;
if (position > value)
{
position = value;
}
}
public override void WriteByte(byte value)
{
EnsureCapacity(this.position + 1);
var currentIndex = CurrentChunkIndex;
var currentOffset = CurrentChunkOffset;
chunks[currentIndex][currentOffset] = value;
this.position++;
if (this.position >= length)
{
this.length = this.position;
}
}
public override void Write(byte[] buffer, int index, int count)
{
EnsureCapacity(this.position + count);
var currentIndex = index;
var countLeft = count;
while (countLeft > 0)
{
var chunk = chunks[CurrentChunkIndex];
var currentOffset = CurrentChunkOffset;
var writeCount = Math.Min(ChunkSize - currentOffset, countLeft);
Array.Copy(buffer, currentIndex, chunk, currentOffset, writeCount);
this.position += writeCount;
currentIndex += writeCount;
countLeft -= writeCount;
}
if (this.position >= length)
{
this.length = this.position;
}
}
protected override void Dispose(bool disposing)
{
var temp = this.chunks;
base.Dispose(disposing);
SharedPools.BigDefault<List<byte[]>>().ClearAndFree(temp);
}
}
}
|