File: DataLoadSave\Binary\Codecs.cs
Web Access
Project: src\src\Microsoft.ML.Data\Microsoft.ML.Data.csproj (Microsoft.ML.Data)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;
using Microsoft.ML.Internal.Internallearn;
using Microsoft.ML.Internal.Utilities;
using Microsoft.ML.Runtime;
 
namespace Microsoft.ML.Data.IO
{
    internal sealed partial class CodecFactory
    {
        /// <summary>
        /// Common scaffolding for value writers. Keeps the owning factory, the destination
        /// stream, and a <see cref="BinaryWriter"/> that the factory opens over that stream.
        /// </summary>
        private abstract class ValueWriterBase<T> : IValueWriter<T>, IDisposable
        {
            protected readonly CodecFactory Factory;
            protected Stream Stream;
            protected BinaryWriter Writer;

            // The writer reference doubles as the disposal flag; Dispose nulls it out.
            protected bool Disposed
            {
                get { return Writer == null; }
            }

            public ValueWriterBase(CodecFactory factory, Stream stream)
            {
                Contracts.AssertValue(stream);
                Contracts.AssertValue(factory);
                Stream = stream;
                Factory = factory;
                Writer = factory.OpenBinaryWriter(stream);
            }

            /// <summary>
            /// Disposes the binary writer and drops the stream reference. Calling this
            /// again after the first call does nothing.
            /// </summary>
            public virtual void Dispose()
            {
                if (Disposed)
                    return;

                Writer.Dispose();
                Writer = null;
                Stream = null;
            }

            public abstract void Write(in T value);

            /// <summary>
            /// Writes every element of <paramref name="values"/> in order by delegating to the
            /// single-value overload. This is the plain reference path; subclasses may override it.
            /// </summary>
            public virtual void Write(ReadOnlySpan<T> values)
            {
                int position = 0;
                while (position < values.Length)
                {
                    Write(in values[position]);
                    position++;
                }
            }

            public abstract void Commit();

            public abstract long GetCommitLengthEstimate();
        }
 
        /// <summary>
        /// Common scaffolding for value readers. Keeps the owning factory, the source
        /// stream, and a <see cref="BinaryReader"/> that the factory opens over that stream.
        /// </summary>
        private abstract class ValueReaderBase<T> : IValueReader<T>, IDisposable
        {
            protected readonly CodecFactory Factory;
            protected Stream Stream;
            protected BinaryReader Reader;

            // The reader reference doubles as the disposal flag; Dispose nulls it out.
            protected bool Disposed
            {
                get { return Reader == null; }
            }

            public ValueReaderBase(CodecFactory factory, Stream stream)
            {
                Contracts.AssertValue(stream);
                Contracts.AssertValue(factory);
                Stream = stream;
                Factory = factory;
                Reader = factory.OpenBinaryReader(stream);
            }

            /// <summary>
            /// Disposes the binary reader and drops the stream reference. Calling this
            /// again after the first call does nothing.
            /// </summary>
            public virtual void Dispose()
            {
                if (Disposed)
                    return;

                Reader.Dispose();
                Reader = null;
                Stream = null;
            }

            public abstract void MoveNext();

            public abstract void Get(ref T value);

            /// <summary>
            /// Fills <paramref name="count"/> slots of <paramref name="values"/>, starting at
            /// <paramref name="index"/>, by alternating <see cref="MoveNext"/> and <see cref="Get"/>.
            /// This is the plain reference path; subclasses may override it.
            /// </summary>
            public virtual void Read(T[] values, int index, int count)
            {
                Contracts.Assert(0 <= index && index <= Utils.Size(values));
                Contracts.Assert(0 <= count && count <= Utils.Size(values) - index);

                int filled = 0;
                while (filled < count)
                {
                    MoveNext();
                    Get(ref values[index + filled]);
                    filled++;
                }
            }
        }
 
        /// <summary>
        /// Base class for codecs over types that need no parameterization: the codec writes
        /// no definition data and resolves to itself when asked for a codec.
        /// </summary>
        private abstract class SimpleCodec<T> : IValueCodec<T>
        {
            protected readonly CodecFactory Factory;

            public DataViewType Type { get; }

            // By default the CLR type name serves as the load name; subclasses may override.
            public virtual string LoadName
            {
                get { return typeof(T).Name; }
            }

            public SimpleCodec(CodecFactory factory, DataViewType type)
            {
                Contracts.AssertValue(factory);
                Contracts.AssertValue(type);
                Contracts.Assert(type.RawType == typeof(T));
                Type = type;
                Factory = factory;
            }

            /// <summary>
            /// Always succeeds and returns this same instance; the definition stream is not read.
            /// </summary>
            public bool GetCodec(Stream definitionStream, out IValueCodec codec)
            {
                codec = this;
                return true;
            }

            /// <summary>
            /// Simple codecs have no dimensionality or subtypes to describe, so nothing is
            /// written to <paramref name="stream"/> and the reported byte count is zero.
            /// </summary>
            public int WriteParameterization(Stream stream) => 0;

            public abstract IValueWriter<T> OpenWriter(Stream stream);

            public abstract IValueReader<T> OpenReader(Stream stream, int items);
        }
 
        /// <summary>
        /// This codec is for use with types that have <c>UnsafeTypeOps</c> operations defined.
        /// Generally, this corresponds to numeric types that can be safely blitted.
        /// </summary>
        private sealed class UnsafeTypeCodec<T> : SimpleCodec<T> where T : struct
        {
            // *** Binary block format ***
            // Packed bytes of little-endian values.

            private readonly UnsafeTypeOps<T> _ops;

            // Gatekeeper that maps T to the DataViewType this codec serves. TimeSpan and
            // DataViewRowId map to their dedicated singleton types; every other type is handed
            // to ColumnTypeExtensions.NumberTypeFromType.
            // NOTE(review): NumberTypeFromType is presumably what rejects unsupported types - confirm.
            private static DataViewType UnsafeColumnType(Type type)
            {
                return type == typeof(TimeSpan) ? TimeSpanDataViewType.Instance :
                    type == typeof(DataViewRowId) ? (DataViewType)RowIdDataViewType.Instance :
                    ColumnTypeExtensions.NumberTypeFromType(type);
            }

            public UnsafeTypeCodec(CodecFactory factory)
                : base(factory, UnsafeColumnType(typeof(T)))
            {
                _ops = UnsafeTypeOpsFactory.Get<T>();
            }

            public override IValueWriter<T> OpenWriter(Stream stream)
            {
                return new Writer(this, stream);
            }

            public override IValueReader<T> OpenReader(Stream stream, int items)
            {
                return new Reader(this, stream, items);
            }

            /// <summary>
            /// Writes values either one at a time through the type ops, or in bulk by copying
            /// the raw bytes of a span through a fixed-size staging buffer.
            /// </summary>
            private sealed class Writer : ValueWriterBase<T>
            {
                // Staging buffer for the bulk path (32 KiB).
                private readonly byte[] _buffer;
                private readonly UnsafeTypeOps<T> _ops;
                private long _numWritten;

                public Writer(UnsafeTypeCodec<T> codec, Stream stream)
                    : base(codec.Factory, stream)
                {
                    _buffer = new byte[1 << 15];
                    _ops = codec._ops;
                }

                public override void Write(in T value)
                {
                    _ops.Write(value, Writer);
                    _numWritten++;
                }

                /// <summary>
                /// Bulk write: copies the bytes behind <paramref name="values"/> into the staging
                /// buffer chunk by chunk and writes each chunk to the stream.
                /// </summary>
                public override void Write(ReadOnlySpan<T> values)
                {
                    int count = values.Length;
                    _ops.Apply(values, ptr =>
                    {
                        // REVIEW: In some future work we will want to avoid needless copies by
                        // seeing if this is a stream that can work over IntPtr writes or reads.

                        // Widen before multiplying: count * Size can exceed int.MaxValue for large
                        // spans, and a wrapped (negative) length would silently skip the copy loop.
                        long byteLength = (long)count * _ops.Size;
                        while (byteLength > 0)
                        {
                            int sublen = (int)Math.Min(byteLength, (long)_buffer.Length);
                            Marshal.Copy(ptr, _buffer, 0, sublen);
                            Stream.Write(_buffer, 0, sublen);
                            ptr += sublen;
                            byteLength -= sublen;
                        }
                    });
                    _numWritten += count;
                }

                public override void Commit()
                {
                    // No state or structure to flush. This does nothing.
                }

                /// <summary>
                /// Returns the per-value size times the number of values written so far.
                /// </summary>
                public override long GetCommitLengthEstimate()
                {
                    return _ops.Size * _numWritten;
                }
            }

            /// <summary>
            /// Reads values either one at a time through the type ops, or in bulk by copying
            /// raw bytes from the stream into the destination array through a staging buffer.
            /// </summary>
            private sealed class Reader : ValueReaderBase<T>
            {
                // Staging buffer for the bulk path (32 KiB).
                private readonly byte[] _buffer;
                private readonly UnsafeTypeOps<T> _ops;
                private int _remaining;
                private T _value;

                public Reader(UnsafeTypeCodec<T> codec, Stream stream, int items)
                    : base(codec.Factory, stream)
                {
                    _buffer = new byte[1 << 15];
                    _ops = codec._ops;
                    _remaining = items;
                }

                public override void MoveNext()
                {
                    Contracts.Assert(_remaining > 0);
                    _value = _ops.Read(Reader);
                    _remaining--;
                }

                public override void Get(ref T value)
                {
                    value = _value;
                }

                /// <summary>
                /// Bulk read: fills <paramref name="count"/> items of <paramref name="values"/>
                /// starting at <paramref name="index"/> directly from the stream bytes.
                /// </summary>
                public override void Read(T[] values, int index, int count)
                {
                    Contracts.Assert(0 <= index && index <= Utils.Size(values));
                    Contracts.Assert(0 <= count && count <= Utils.Size(values) - index);
                    Contracts.Assert(_remaining >= count);
                    _ops.Apply(values, ptr =>
                    {
                        // Byte offsets and lengths are computed in 64 bits so that large arrays
                        // cannot wrap the 32-bit multiplication.
                        long byteOffset = (long)index * _ops.Size;
                        long byteLength = (long)count * _ops.Size;
                        ptr = new IntPtr(ptr.ToInt64() + byteOffset);
                        while (byteLength > 0)
                        {
                            int sublen = (int)Math.Min(byteLength, (long)_buffer.Length);
                            Stream.ReadBlock(_buffer, 0, sublen);
                            Marshal.Copy(_buffer, 0, ptr, sublen);
                            ptr += sublen;
                            byteLength -= sublen;
                        }
                    });
                    _remaining -= count;
                }
            }
        }
 
        /// <summary>
        /// Codec for text values. A block stores all values concatenated into one string plus
        /// an array of end boundaries that delimit the individual values.
        /// </summary>
        private sealed class TextCodec : SimpleCodec<ReadOnlyMemory<char>>
        {
            // Masks off the high bit of a boundary, leaving the character index.
            private const int LengthMask = unchecked((int)0x7FFFFFFF);

            public override string LoadName
            {
                get { return "TextSpan"; }
            }

            // *** Binary block format ***
            // int: Number of entries.
            // int[entries]: The non-decreasing end-boundary character index array, with high bit set for "missing" values.
            // string: The UTF-8 encoded string, with the standard LEB128 byte-length preceding it.

            public TextCodec(CodecFactory factory)
                : base(factory, TextDataViewType.Instance)
            {
            }

            public override IValueWriter<ReadOnlyMemory<char>> OpenWriter(Stream stream)
            {
                return new Writer(this, stream);
            }

            public override IValueReader<ReadOnlyMemory<char>> OpenReader(Stream stream, int items)
            {
                return new Reader(this, stream, items);
            }

            /// <summary>
            /// Accumulates values in memory and emits the whole block on <see cref="Commit"/>.
            /// After commit the writer rejects further use.
            /// </summary>
            private sealed class Writer : ValueWriterBase<ReadOnlyMemory<char>>
            {
                // Holds the concatenated text; set to null once committed.
                private StringBuilder _builder;
                // End boundary (in characters) of each written value.
                private readonly List<int> _boundaries;

                public Writer(TextCodec codec, Stream stream)
                    : base(codec.Factory, stream)
                {
                    _builder = new StringBuilder();
                    _boundaries = new List<int>();
                }

                public override void Write(in ReadOnlyMemory<char> value)
                {
                    Contracts.Check(_builder != null, "writer was already committed");
                    _builder.AppendMemory(value);
                    _boundaries.Add(_builder.Length);
                }

                public override void Commit()
                {
                    Contracts.Check(_builder != null, "writer already committed");

                    Writer.Write(_boundaries.Count); // Write the number of entries.
                    Writer.WriteIntStream(_boundaries); // Write the entries end boundaries, in character counts.
                    Writer.Write(_builder.ToString());
                    _builder.Clear();
                    _builder = null;
                }

                public override long GetCommitLengthEstimate()
                {
                    // Guard like Write and Commit do: after Commit the builder is null, and
                    // dereferencing it would surface as an unhelpful NullReferenceException.
                    Contracts.Check(_builder != null, "writer already committed");

                    // This is an estimate, only exact if the number of input
                    // characters takes one byte in the UTF-8 encoding.
                    return sizeof(int) * (1 + (long)_boundaries.Count) + _builder.Length;
                }
            }

            /// <summary>
            /// Reads the whole block eagerly in the constructor, then serves slices of the
            /// decoded text as the cursor advances.
            /// </summary>
            private sealed class Reader : ValueReaderBase<ReadOnlyMemory<char>>
            {
                private readonly int _entries;
                // _boundaries[i + 1] is the end boundary of entry i; _boundaries[0] stays 0.
                private readonly int[] _boundaries;
                // Index of the current entry, or -1 before the first MoveNext.
                private int _index;
                private readonly string _text;

                public Reader(TextCodec codec, Stream stream, int items)
                    : base(codec.Factory, stream)
                {
                    _entries = Reader.ReadInt32();
                    Contracts.CheckDecode(_entries == items);
                    _index = -1;
                    _boundaries = new int[_entries + 1];
                    int bPrev = 0;
                    for (int i = 1; i < _boundaries.Length; ++i)
                    {
                        int b = _boundaries[i] = Reader.ReadInt32();
                        // Either the boundary does not move backwards, or (for a high-bit
                        // flagged entry) its masked position equals the previous one.
                        Contracts.CheckDecode(b >= (bPrev & LengthMask) || (b & LengthMask) == (bPrev & LengthMask));
                        bPrev = b;
                    }
                    _text = Reader.ReadString();
                    Contracts.CheckDecode(_text.Length == (_boundaries[_entries] & LengthMask));
                }

                public override void MoveNext()
                {
                    Contracts.Check(++_index < _entries, "reader already read all values");
                }

                public override void Get(ref ReadOnlyMemory<char> value)
                {
                    // Both bounds matter: _boundaries[_index] below requires a prior MoveNext.
                    Contracts.Assert(0 <= _index && _index < _entries);
                    int b = _boundaries[_index + 1];
                    int start = _boundaries[_index] & LengthMask;
                    if (b >= 0)
                        value = _text.AsMemory().Slice(start, (b & LengthMask) - start);
                    else
                    {
                        //For backward compatibility when NA values existed, treat them
                        //as empty string.
                        value = ReadOnlyMemory<char>.Empty;
                    }
                }
            }
        }
 
        /// <summary>
        /// This is a boolean codec that reads from and writes to a form that serializes
        /// 1 bit per value. The old encoding (implemented by a different codec)
        /// uses 2 bits per value so NA values can be supported.
        /// </summary>
        private sealed class BoolCodec : SimpleCodec<bool>
        {
            // *** Binary block format ***
            // Packed bits.

            public BoolCodec(CodecFactory factory)
                : base(factory, BooleanDataViewType.Instance)
            {
            }

            public override string LoadName
            {
                get { return typeof(bool).Name; }
            }

            public override IValueWriter<bool> OpenWriter(Stream stream)
            {
                return new Writer(this, stream);
            }

            /// <summary>
            /// Packs values into bytes, lowest bit first, emitting a byte each time eight
            /// values have accumulated. <see cref="Commit"/> flushes a trailing partial byte.
            /// </summary>
            private sealed class Writer : ValueWriterBase<bool>
            {
                // Pack 8 values into 8 bits.
                private byte _currentBits;
                private long _numWritten;
                // Bit position (0..7) that the next value will occupy in _currentBits.
                private byte _currentIndex;

                public Writer(BoolCodec codec, Stream stream)
                    : base(codec.Factory, stream)
                {
                }

                public override void Write(in bool value)
                {
                    Contracts.Assert(0 <= _currentIndex && _currentIndex < 8);

                    _numWritten++;
                    if (value)
                        _currentBits |= (byte)(1 << _currentIndex);

                    _currentIndex++;
                    if (_currentIndex == 8)
                    {
                        Writer.Write(_currentBits);
                        _currentBits = 0;
                        _currentIndex = 0;
                    }
                }

                // REVIEW: More efficient array writers are certainly possible.

                /// <summary>
                /// Number of bytes the block occupies once committed: one byte per eight
                /// values, rounded up (zero when nothing has been written).
                /// </summary>
                public override long GetCommitLengthEstimate()
                {
                    // The previous formula, 4 * (((n - 1) >> 4) + 1), described the old
                    // 16-values-per-32-bits layout and overstated this 1-bit-per-value format.
                    return (_numWritten + 7) >> 3;
                }

                public override void Commit()
                {
                    // Flush the partially filled byte, if any.
                    if (_currentIndex > 0)
                    {
                        Writer.Write(_currentBits);
                        _currentBits = 0;
                        _currentIndex = 0;
                    }
                }
            }

            public override IValueReader<bool> OpenReader(Stream stream, int items)
            {
                return new Reader(this, stream, items);
            }

            /// <summary>
            /// Unpacks values from bytes, lowest bit first, fetching a new byte from the
            /// stream every eighth value.
            /// </summary>
            private sealed class Reader : ValueReaderBase<bool>
            {
                // The current value always sits in the lowest bit.
                private byte _currentBits;
                // Position within the current byte (0..7), or -1 before the first MoveNext.
                private int _currentIndex;
                private int _remaining;

                public Reader(BoolCodec codec, Stream stream, int items)
                    : base(codec.Factory, stream)
                {
                    _remaining = items;
                    _currentIndex = -1;
                }

                public override void MoveNext()
                {
                    Contracts.Assert(0 < _remaining, "already consumed all values");
                    --_remaining;
                    // Wrapping to position 0 means the previous byte is exhausted (or this is
                    // the first move): read a fresh byte. Otherwise shift the next bit down.
                    if ((_currentIndex = (_currentIndex + 1) & 7) == 0)
                        _currentBits = Reader.ReadByte();
                    else
                        _currentBits >>= 1;
                }

                public override void Get(ref bool value)
                {
                    Contracts.Assert(0 <= _currentIndex, "have not moved in");
                    Contracts.Assert(_currentIndex < 8);
                    value = (_currentBits & 1) != 0;
                }
            }
        }
 
        /// <summary>
        /// Read-only codec for the legacy boolean encoding, which spends two bits per value.
        /// Opening a writer is not supported.
        /// </summary>
        private sealed class OldBoolCodec : SimpleCodec<bool>
        {
            // *** Binary block format ***
            // Pack 16 values into 32 bits, with 00 for false, 01 for true and 10 for NA.

            public OldBoolCodec(CodecFactory factory)
                : base(factory, BooleanDataViewType.Instance)
            {
            }

            // NOTE(review): the exception text says "single bit" although this legacy layout
            // is the two-bit one; the message is left untouched here - confirm intended wording.
            public override IValueWriter<bool> OpenWriter(Stream stream)
            {
                Contracts.Assert(false, "This older form only supports reading");
                throw Contracts.ExceptNotSupp("Writing single bit booleans no longer supported");
            }

            public override IValueReader<bool> OpenReader(Stream stream, int items) => new Reader(this, stream, items);

            /// <summary>
            /// Unpacks two-bit slots from 32-bit words, lowest slot first, fetching a new
            /// word from the stream every sixteenth value.
            /// </summary>
            private sealed class Reader : ValueReaderBase<bool>
            {
                // The current slot always sits in the two lowest bits.
                private int _currentBits;
                // Slot within the current word (0..15), or -1 before the first MoveNext.
                private int _currentSlot;
                private int _remaining;

                public Reader(OldBoolCodec codec, Stream stream, int items)
                    : base(codec.Factory, stream)
                {
                    _currentSlot = -1;
                    _remaining = items;
                }

                public override void MoveNext()
                {
                    Contracts.Assert(0 < _remaining, "already consumed all values");
                    --_remaining;

                    _currentSlot = (_currentSlot + 1) & 0x0F;
                    if (_currentSlot != 0)
                    {
                        // Still inside the current word: logical shift to expose the next slot.
                        _currentBits = (int)((uint)_currentBits >> 2);
                    }
                    else
                    {
                        // Wrapped around (or first move): pull the next 32-bit word.
                        _currentBits = Reader.ReadInt32();
                    }
                }

                public override void Get(ref bool value)
                {
                    Contracts.Assert(0 <= _currentSlot, "have not moved in");
                    Contracts.Assert(_currentSlot < 16);

                    int pattern = _currentBits & 0x3;
                    if (pattern == 0x3)
                        throw Contracts.ExceptDecode("Invalid bit pattern in BoolCodec");

                    // 01 is true; 00 (false) and 10 (the former NA) both surface as false.
                    value = pattern == 0x1;
                }
            }
        }
 
        /// <summary>
        /// Codec for <see cref="DateTime"/> values, stored as one 64-bit tick count per value.
        /// </summary>
        private sealed class DateTimeCodec : SimpleCodec<DateTime>
        {
            public DateTimeCodec(CodecFactory factory)
                : base(factory, DateTimeDataViewType.Instance)
            {
            }

            public override IValueWriter<DateTime> OpenWriter(Stream stream) => new Writer(this, stream);

            public override IValueReader<DateTime> OpenReader(Stream stream, int items) => new Reader(this, stream, items);

            /// <summary>
            /// Writes each value's tick count straight to the stream as it arrives.
            /// </summary>
            private sealed class Writer : ValueWriterBase<DateTime>
            {
                private long _numWritten;

                public Writer(DateTimeCodec codec, Stream stream)
                    : base(codec.Factory, stream)
                {
                }

                public override void Write(in DateTime value)
                {
                    long ticks = value.Ticks;
                    Writer.Write(ticks);
                    ++_numWritten;
                }

                public override void Commit()
                {
                    // Values are written immediately, so there is nothing pending to flush.
                }

                // Eight bytes per value written so far.
                public override long GetCommitLengthEstimate() => sizeof(long) * _numWritten;
            }

            /// <summary>
            /// Reads one tick count per <see cref="MoveNext"/> and exposes it as a <see cref="DateTime"/>.
            /// </summary>
            private sealed class Reader : ValueReaderBase<DateTime>
            {
                private int _remaining;
                private DateTime _value;

                public Reader(DateTimeCodec codec, Stream stream, int items)
                    : base(codec.Factory, stream)
                {
                    _remaining = items;
                }

                public override void MoveNext()
                {
                    Contracts.Assert(_remaining > 0, "already consumed all values");

                    long rawTicks = Reader.ReadInt64();
                    // A stored long.MinValue is mapped to zero ticks, i.e. default(DateTime).
                    if (rawTicks == long.MinValue)
                        rawTicks = 0;

                    _value = new DateTime(rawTicks);
                    _remaining--;
                }

                public override void Get(ref DateTime value)
                {
                    value = _value;
                }
            }
        }
 
        /// <summary>
        /// Codec for <see cref="DateTimeOffset"/> values. A block stores all offsets
        /// (in minutes, as shorts) followed by all tick counts (as longs).
        /// </summary>
        private sealed class DateTimeOffsetCodec : SimpleCodec<DateTimeOffset>
        {
            // Pools of scratch arrays shared by the readers this codec opens.
            private readonly MadeObjectPool<long[]> _longBufferPool;
            private readonly MadeObjectPool<short[]> _shortBufferPool;

            public DateTimeOffsetCodec(CodecFactory factory)
                : base(factory, DateTimeOffsetDataViewType.Instance)
            {
                _longBufferPool = new MadeObjectPool<long[]>(() => null);
                _shortBufferPool = new MadeObjectPool<short[]>(() => null);
            }

            public override IValueWriter<DateTimeOffset> OpenWriter(Stream stream) => new Writer(this, stream);

            public override IValueReader<DateTimeOffset> OpenReader(Stream stream, int items) => new Reader(this, stream, items);

            /// <summary>
            /// Buffers offsets and ticks in memory and emits both sequences on <see cref="Commit"/>.
            /// </summary>
            private sealed class Writer : ValueWriterBase<DateTimeOffset>
            {
                // Both lists are nulled out by Commit.
                private List<short> _offsets;
                private List<long> _ticks;

                public Writer(DateTimeOffsetCodec codec, Stream stream)
                    : base(codec.Factory, stream)
                {
                    _offsets = new List<short>();
                    _ticks = new List<long>();
                }

                public override void Write(in DateTimeOffset value)
                {
                    Contracts.Assert(_offsets != null, "writer was already committed");

                    long ticks = value.DateTime.Ticks;
                    _ticks.Add(ticks);

                    //DateTimeOffset exposes its offset as a TimeSpan, but internally it uses short and in minutes.
                    //https://github.com/dotnet/coreclr/blob/9499b08eefd895158c3f3c7834e185a73619128d/src/System.Private.CoreLib/shared/System/DateTimeOffset.cs#L51-L53
                    //https://github.com/dotnet/coreclr/blob/9499b08eefd895158c3f3c7834e185a73619128d/src/System.Private.CoreLib/shared/System/DateTimeOffset.cs#L286-L292
                    //From everything online(ISO8601, RFC3339, SQL Server doc, the offset supports the range -14 to 14 hours, and only supports minute precision.
                    short offsetMinutes = (short)(value.Offset.TotalMinutes);
                    _offsets.Add(offsetMinutes);
                }

                public override void Commit()
                {
                    Contracts.Assert(_offsets != null, "writer was already committed");
                    Contracts.Assert(Utils.Size(_offsets) == Utils.Size(_ticks));

                    // Offsets first, then ticks, matching the order the reader consumes them.
                    Writer.WriteShortStream(_offsets);
                    Writer.WriteLongStream(_ticks);

                    _offsets = null;
                    _ticks = null;
                }

                public override long GetCommitLengthEstimate()
                {
                    // Each entry contributes one long (ticks) and one short (offset).
                    const int bytesPerEntry = sizeof(long) + sizeof(short);
                    return (long)_offsets.Count * bytesPerEntry;
                }
            }

            /// <summary>
            /// Reads the whole block eagerly in the constructor into pooled arrays, serves
            /// values by index, and returns the arrays to the pools on dispose.
            /// </summary>
            private sealed class Reader : ValueReaderBase<DateTimeOffset>
            {
                private readonly DateTimeOffsetCodec _codec;

                private readonly int _entries;
                private short[] _offsets;
                private long[] _ticks;
                // Index of the current entry, or -1 before the first MoveNext.
                private int _index;
                private bool _disposed;

                public Reader(DateTimeOffsetCodec codec, Stream stream, int items)
                    : base(codec.Factory, stream)
                {
                    _codec = codec;
                    _entries = items;
                    _index = -1;

                    // Offsets come first in the block.
                    _offsets = _codec._shortBufferPool.Get();
                    Utils.EnsureSize(ref _offsets, _entries, false);
                    for (int entry = 0; entry < _entries; ++entry)
                        _offsets[entry] = Reader.ReadInt16();

                    // Then the tick counts.
                    _ticks = _codec._longBufferPool.Get();
                    Utils.EnsureSize(ref _ticks, _entries, false);
                    for (int entry = 0; entry < _entries; ++entry)
                        _ticks[entry] = Reader.ReadInt64();
                }

                public override void MoveNext()
                {
                    Contracts.Assert(!_disposed);
                    Contracts.Check(++_index < _entries, "reader already read all values");
                }

                public override void Get(ref DateTimeOffset value)
                {
                    Contracts.Assert(!_disposed);

                    long ticks = _ticks[_index];
                    short minutes = _offsets[_index];

                    // Stored MinValue sentinels are mapped to zero for each component.
                    if (ticks == long.MinValue)
                        ticks = 0;
                    if (minutes == short.MinValue)
                        minutes = 0;

                    value = new DateTimeOffset(new DateTime(ticks), new TimeSpan(0, minutes, 0));
                }

                public override void Dispose()
                {
                    if (!_disposed)
                    {
                        // Hand the scratch arrays back to the codec's pools exactly once.
                        _codec._shortBufferPool.Return(_offsets);
                        _codec._longBufferPool.Return(_ticks);
                        _offsets = null;
                        _ticks = null;
                        _disposed = true;
                    }

                    base.Dispose();
                }
            }
        }
 
        private sealed class VBufferCodec<T> : IValueCodec<VBuffer<T>>
        {
            // *** Binary block format ***
            // int: Number of vectors.
            // int: If positive the length of all encoded vectors, or 0 if they are of variable size.
            // int[numVectors] (optional): If vectors of variable size, this array contains the lengths.
            // int[numVectors]: Counts of active elements per vector, or -1 if the array is dense.
            // int: Total number of indices. This must equal the sum of all non-negative elements of the counts array.
            // int[numIndices]: The packed array of indices for all the vbuffers.
            // <Values>: The packed sequence of values for all the vbuffers, written using the inner value codec's scheme.
 
            private readonly CodecFactory _factory;
            private readonly VectorDataViewType _type;
            // The codec for the internal elements.
            private readonly IValueCodec<T> _innerCodec;
            private readonly MadeObjectPool<T[]> _bufferPool;
            private readonly MadeObjectPool<int[]> _intBufferPool;
 
            // Fixed load name for vector codecs.
            public string LoadName => "VBuffer";

            // The vector type this codec was constructed for.
            public DataViewType Type => _type;
 
            /// <summary>
            /// Creates a vector codec for <paramref name="type"/> whose elements are handled
            /// by <paramref name="innerCodec"/>.
            /// </summary>
            public VBufferCodec(CodecFactory factory, VectorDataViewType type, IValueCodec<T> innerCodec)
            {
                Contracts.AssertValue(factory);
                Contracts.AssertValue(type);
                Contracts.AssertValue(innerCodec);
                Contracts.Assert(type.RawType == typeof(VBuffer<T>));
                Contracts.Assert(innerCodec.Type == type.ItemType);

                _factory = factory;
                _type = type;
                _innerCodec = innerCodec;

                _bufferPool = new MadeObjectPool<T[]>(() => null);
                // When T is int the value pool already is an int[] pool, so reuse it;
                // otherwise a separate int[] pool is created.
                _intBufferPool = typeof(T) == typeof(int)
                    ? _bufferPool as MadeObjectPool<int[]>
                    : new MadeObjectPool<int[]>(() => null);
            }
 
            /// <summary>
            /// Writes the inner codec's definition followed by the dimension count and each
            /// dimension of the vector type.
            /// </summary>
            /// <returns>
            /// The value returned by the factory's <c>WriteCodec</c> plus the size of the
            /// ints written here (one for the count and one per dimension).
            /// </returns>
            public int WriteParameterization(Stream stream)
            {
                int innerBytes = _factory.WriteCodec(stream, _innerCodec);

                var dims = _type.Dimensions;
                int dimCount = dims.Length;
                using (BinaryWriter writer = _factory.OpenBinaryWriter(stream))
                {
                    writer.Write(dimCount);
                    for (int d = 0; d < dimCount; ++d)
                        writer.Write(dims[d]);
                }

                return innerBytes + sizeof(int) * (1 + dimCount);
            }
 
            // Opens a vector writer over the given stream.
            public IValueWriter<VBuffer<T>> OpenWriter(Stream stream) => new Writer(this, stream);
 
            // Opens a vector reader over the given stream for the stated number of items.
            public IValueReader<VBuffer<T>> OpenReader(Stream stream, int items) => new Reader(this, stream, items);
 
            /// <summary>
            /// Writer for blocks of vectors. Structural information (lengths, counts, indices) is
            /// accumulated in lists and the values are written through the inner codec into a pooled
            /// memory stream; everything is emitted to the output stream on <c>Commit</c>.
            /// </summary>
            private sealed class Writer : ValueWriterBase<VBuffer<T>>
            {
                // The size taken from the codec's vector type. A positive value means every vector
                // written must have exactly this length.
                private readonly int _size;
                // The length of every vector written so far. Null when the vectors are of fixed length.
                private readonly List<int> _lengths;
                // One entry per vector written: -1 for a dense vector, otherwise the number of
                // explicitly stored values of the sparse vector.
                private readonly List<int> _counts;
                // The indices of all sparse vectors written so far, back to back.
                private readonly List<int> _indices;

                // Both of these are released in Commit; a null _valuesStream marks a committed writer.
                private MemoryStream _valuesStream;
                private IValueWriter<T> _valueWriter;

                private bool FixedLength => _size > 0;

                public Writer(VBufferCodec<T> codec, Stream stream)
                    : base(codec._factory, stream)
                {
                    _size = codec._type.Size;
                    _counts = new List<int>();
                    _indices = new List<int>();
                    // Individual lengths only need tracking when the type does not fix them.
                    _lengths = FixedLength ? null : new List<int>();

                    _valuesStream = Factory._memPool.Get();
                    _valueWriter = codec._innerCodec.OpenWriter(_valuesStream);
                }

                /// <summary>
                /// Emits the block: vector count, length information, counts, index count, indices,
                /// and finally the bytes the inner writer produced for the values.
                /// </summary>
                public override void Commit()
                {
                    Contracts.Check(_valuesStream != null, "writer already committed");

                    // Finish the inner writer first so the memory stream holds all value bytes.
                    _valueWriter.Commit();
                    _valueWriter.Dispose();
                    _valueWriter = null;

                    // The number of vectors.
                    Writer.Write(_counts.Count);

                    // The common length of the vectors, or 0 followed by the individual lengths.
                    if (FixedLength)
                        Writer.Write(_size);
                    else
                    {
                        int sharedLength = GetSharedLength();
                        Writer.Write(sharedLength);
                        if (sharedLength == 0)
                            Writer.WriteIntStream(_lengths);
                    }

                    // The counts, then the number of indices, then the indices themselves.
                    Writer.WriteIntStream(_counts);
                    Writer.Write(_indices.Count);
                    Writer.WriteIntStream(_indices);

                    // The values. The way we created the memory stream, TryGetBuffer should not fail.
                    bool gotBuffer = _valuesStream.TryGetBuffer(out ArraySegment<byte> segment);
                    Contracts.Assert(gotBuffer, "TryGetBuffer failed in VBufferCodec!");
                    Stream.Write(segment.Array, segment.Offset, segment.Count);

                    Factory._memPool.Return(ref _valuesStream);
                }

                /// <summary>
                /// For the non-fixed-length case: returns the length shared by all vectors written so
                /// far, or 0 if no vectors were written or their lengths differ.
                /// </summary>
                private int GetSharedLength()
                {
                    if (_lengths.Count == 0)
                        return 0;
                    int first = _lengths[0];
                    for (int i = 1; i < _lengths.Count; ++i)
                    {
                        if (_lengths[i] != first)
                            return 0;
                    }
                    return first;
                }

                /// <summary>
                /// Estimates the size of the block: four bytes for each structural integer tracked so
                /// far, plus whatever the inner value writer estimates.
                /// </summary>
                public override long GetCommitLengthEstimate()
                {
                    long structureInts = 2 + (long)Utils.Size(_lengths) + _counts.Count + 1 + _indices.Count;
                    return sizeof(int) * structureInts + _valueWriter.GetCommitLengthEstimate();
                }

                /// <summary>
                /// Buffers a single vector. Throws if the writer was already committed, or if the
                /// vectors are of fixed length and <paramref name="value"/> has a different length.
                /// </summary>
                public override void Write(in VBuffer<T> value)
                {
                    Contracts.Check(_valuesStream != null, "writer already committed");
                    if (!FixedLength)
                        _lengths.Add(value.Length);
                    else if (value.Length != _size)
                        throw Contracts.Except("Length mismatch: expected {0} slots but got {1}", _size, value.Length);

                    // REVIEW: In the non-fixed length case we can still check that the
                    // length is a multiple of the product of the non-zero tail sizes of the type.
                    var storedValues = value.GetValues();
                    if (value.IsDense)
                    {
                        // Dense vectors are flagged with -1 and contribute no indices.
                        _counts.Add(-1);
                        _valueWriter.Write(storedValues);
                        return;
                    }

                    _counts.Add(storedValues.Length);
                    if (storedValues.Length == 0)
                        return;

                    foreach (int index in value.GetIndices())
                        _indices.Add(index);
                    _valueWriter.Write(storedValues);
                }
            }
 
            /// <summary>
            /// Reader for a block of vectors. The constructor decodes and validates the whole block
            /// (lengths, counts, indices, values) into buffers taken from the codec's pools;
            /// <c>MoveNext</c> and <c>Get</c> then walk those buffers one vector at a time, and
            /// <c>Dispose</c> hands the buffers back to the pools.
            /// </summary>
            private sealed class Reader : ValueReaderBase<VBuffer<T>>
            {
                private readonly VBufferCodec<T> _codec;

                // The fixed size of the vectors returned, with the same semantics as ColumnType.VectorSize,
                // that is, if zero, the vectors are *not* of fixed size. Even if the ColumnType happens
                // to be of non-fixed size, this may have a positive value if the vectors just so happen
                // to have equal length.
                private readonly int _size;
                // The number of vectors.
                private readonly int _numVectors;
                // This will be non-null only if the vectors are not of fixed size. In such a case, it is of
                // length equal to the number of vectors, with each element holding the length of the vectors.
                private readonly int[] _lengths;
                // This is of length equal to the number of vectors, and holds the count of values for the
                // sparse vbuffer (in which case it is non-negative), or -1 if the vector is dense.
                private readonly int[] _counts;
                // The packed array of indices. This may be null, if there are no indices, that is, all vectors
                // are dense.
                private readonly int[] _indices;
                // The packed array of values. This will be equal to the count of values in all arrays.
                private readonly T[] _values;

                private bool _disposed;
                // The current index of the vector. This will index _counts, and if applicable _lengths. This
                // will be incremented by one on movenext.
                private int _vectorIndex;
                // The current offset into the _indices array, for the current vector index. This will be
                // incremented on movenext by the current (soon to be previous) vector's number of specified
                // indices (0 if the vector is dense, or sparse with no values).
                private int _indicesOffset;
                // The current offset into the _values array, for the current vector index. This will be
                // incremented on movenext by the current (soon to be previous) vector's "count" of number
                // of specified values.
                private int _valuesOffset;

                private bool FixedLength { get { return _size > 0; } }

                /// <summary>
                /// Decodes the block from <paramref name="stream"/>. Any inconsistency in the encoded
                /// structure is reported through <c>Contracts.CheckDecode</c>.
                /// </summary>
                /// <param name="codec">The owning codec, supplying the type, inner codec and buffer pools.</param>
                /// <param name="stream">The stream positioned at the start of the block.</param>
                /// <param name="items">The number of vectors the block is expected to hold.</param>
                public Reader(VBufferCodec<T> codec, Stream stream, int items)
                    : base(codec._factory, stream)
                {
                    _codec = codec;

                    // The number of vectors.
                    _numVectors = Reader.ReadInt32();
                    Contracts.CheckDecode(_numVectors == items);

                    // The length of all those vectors.
                    _size = Reader.ReadInt32();
                    if (codec._type.IsKnownSize)
                        Contracts.CheckDecode(codec._type.Size == _size);
                    else
                        Contracts.CheckDecode(_size >= 0);
                    if (!FixedLength)
                        _lengths = ReadIntArray(_numVectors);

                    // The counts of all such vectors.
                    _counts = ReadIntArray(_numVectors);
                    int numIndices = Reader.ReadInt32();
                    Contracts.CheckDecode(numIndices >= 0);
                    _indices = ReadIntArray(numIndices);

                    // Validate the counts against the lengths and the declared number of indices,
                    // and total up the number of values to read.
                    int remainingIndices = numIndices;
                    int totalItems = 0;
                    for (int i = 0, ii = 0; i < _numVectors; ++i)
                    {
                        int count = _counts[i];
                        int len = FixedLength ? _size : _lengths[i];
                        Contracts.CheckDecode(len >= 0);
                        if (count < 0) // dense
                        {
                            Contracts.CheckDecode(count == -1);
                            totalItems += len;
                        }
                        else // sparse
                        {
                            Contracts.CheckDecode(count < len);
                            // The sparse counts must not claim more indices than the block declared.
                            // Without this, a corrupt block would make the scan below (and later
                            // Get calls) use entries of _indices that were never read from the stream.
                            remainingIndices -= count;
                            Contracts.CheckDecode(remainingIndices >= 0);
                            totalItems += count;

                            // Check the correctness of the indices: strictly increasing, non-negative,
                            // and less than the vector's length.
                            int prev = -1;
                            int end = ii + count;
                            for (int j = ii; j < end; j++)
                            {
                                Contracts.CheckDecode(prev < _indices[j]);
                                prev = _indices[j];
                            }
                            ii = end;
                            Contracts.CheckDecode(prev < len);
                        }
                    }

                    // Get a buffer.
                    var values = codec._bufferPool.Get();
                    Utils.EnsureSize(ref values, totalItems, false);
                    using (var reader = codec._innerCodec.OpenReader(stream, totalItems))
                        reader.Read(values, 0, totalItems);
                    _values = values;
                    // Positioned before the first vector; MoveNext must be called before Get.
                    _vectorIndex = -1;
                }

                /// <summary>
                /// Returns the decoded buffers to the codec's pools (once) and disposes the base reader.
                /// </summary>
                public override void Dispose()
                {
                    if (!_disposed)
                    {
                        _codec._bufferPool.Return(_values);
                        _codec._intBufferPool.Return(_counts);
                        _codec._intBufferPool.Return(_indices);
                        if (_lengths != null)
                            _codec._intBufferPool.Return(_lengths);
                        _disposed = true;
                    }
                    base.Dispose();
                }

                /// <summary>
                /// Reads <paramref name="count"/> 32-bit integers from the stream into a buffer taken
                /// from the int pool. Only the first <paramref name="count"/> entries of the returned
                /// array are filled by this call.
                /// </summary>
                private int[] ReadIntArray(int count)
                {
                    int[] values = _codec._intBufferPool.Get();
                    Utils.EnsureSize(ref values, count, false);
                    for (int i = 0; i < count; ++i)
                        values[i] = Reader.ReadInt32();
                    return values;
                }

                /// <summary>
                /// Advances to the next vector, moving the value and index offsets past the vector
                /// currently positioned on (if any).
                /// </summary>
                public override void MoveNext()
                {
                    Contracts.Assert(_vectorIndex < _numVectors - 1, "already consumed all vectors");
                    if (_vectorIndex >= 0)
                    {
                        // We are on a vector. Skip to the next vector.
                        int count = _counts[_vectorIndex];
                        if (count < 0)
                        {
                            // Dense: all slots were stored as values, and there are no indices.
                            _valuesOffset += FixedLength ? _size : _lengths[_vectorIndex];
                        }
                        else
                        {
                            _indicesOffset += count;
                            _valuesOffset += count;
                        }
                    }
                    _vectorIndex++;
                }

                /// <summary>
                /// Copies the current vector into <paramref name="value"/>, as a dense vector when its
                /// count is negative and as a sparse vector otherwise.
                /// </summary>
                public override void Get(ref VBuffer<T> value)
                {
                    Contracts.Assert(_vectorIndex >= 0, "have not moved in");
                    int length = FixedLength ? _size : _lengths[_vectorIndex];
                    int count = _counts[_vectorIndex];

                    if (count < 0)
                    {
                        // dense
                        var editor = VBufferEditor.Create(ref value, length);
                        if (length > 0)
                        {
                            _values.AsSpan(_valuesOffset, length)
                                .CopyTo(editor.Values);
                        }
                        value = editor.Commit();
                    }
                    else
                    {
                        // sparse
                        var editor = VBufferEditor.Create(ref value, length, count);
                        if (count > 0)
                        {
                            _values.AsSpan(_valuesOffset, count)
                                .CopyTo(editor.Values);
                            _indices.AsSpan(_indicesOffset, count)
                                .CopyTo(editor.Indices);
                        }
                        value = editor.Commit();
                    }
                }
            }
        }
 
        /// <summary>
        /// Reconstructs a vector codec from its serialized definition: the inner (item) codec,
        /// followed by the array of dimensions of the vector type.
        /// </summary>
        /// <param name="definitionStream">The stream holding the codec definition.</param>
        /// <param name="codec">The resulting codec, or the default value if the inner codec
        /// could not be read.</param>
        /// <returns>Whether a codec was produced.</returns>
        private bool GetVBufferCodec(Stream definitionStream, out IValueCodec codec)
        {
            // The definition starts with the codec of the items.
            if (!TryReadCodec(definitionStream, out IValueCodec elementCodec))
            {
                codec = default(IValueCodec);
                return false;
            }

            // Vectors can only hold primitive items.
            var itemType = elementCodec.Type as PrimitiveDataViewType;
            Contracts.CheckDecode(itemType != null);

            // The dimensions follow the item codec.
            VectorDataViewType vectorType;
            using (BinaryReader reader = OpenBinaryReader(definitionStream))
            {
                var dims = reader.ReadIntArray();
                if (Utils.Size(dims) > 0)
                {
                    foreach (int dim in dims)
                        Contracts.CheckDecode(dim >= 0);
                    vectorType = new VectorDataViewType(itemType, dims);
                }
                else
                {
                    // In prior times, in the case where the VectorType was of single rank, *and* of unknown length,
                    // then the vector type would be considered to have a dimension count of 0, for some reason.
                    // This can no longer occur, but in the case where we read an older file we have to account for
                    // the fact that nothing may have been written.
                    vectorType = new VectorDataViewType(itemType);
                }
            }

            // Instantiate VBufferCodec<> over the raw item type.
            Type closedCodecType = typeof(VBufferCodec<>).MakeGenericType(itemType.RawType);
            codec = (IValueCodec)Activator.CreateInstance(closedCodecType, this, vectorType, elementCodec);
            return true;
        }
 
        /// <summary>
        /// Builds a vector codec for <paramref name="type"/>, provided a codec is available for
        /// its item type.
        /// </summary>
        /// <param name="type">The vector type to build a codec for.</param>
        /// <param name="codec">The resulting codec, or the default value if no codec exists for
        /// the item type.</param>
        /// <returns>Whether a codec was produced.</returns>
        private bool GetVBufferCodec(VectorDataViewType type, out IValueCodec codec)
        {
            DataViewType elementType = type.ItemType;

            // The vector codec delegates the values to the codec of the items.
            if (!TryGetCodec(elementType, out IValueCodec elementCodec))
            {
                codec = default(IValueCodec);
                return false;
            }

            // Instantiate VBufferCodec<> over the raw item type.
            Type closedCodecType = typeof(VBufferCodec<>).MakeGenericType(elementType.RawType);
            codec = (IValueCodec)Activator.CreateInstance(closedCodecType, this, type, elementCodec);
            return true;
        }
 
        /// <summary>
        /// Codec for key types registered under the load name "Key". Values are read and written
        /// entirely by the wrapped inner codec.
        /// </summary>
        private sealed class KeyCodecOld<T> : IValueCodec<T>
        {
            // *** Binary block format ***
            // Identical to UnsafeTypeCodec, packed bytes of little-endian values.

            private readonly CodecFactory _factory;
            private readonly KeyDataViewType _type;
            // The more basic value codec that does the actual saving and loading.
            private readonly IValueCodec<T> _innerCodec;

            public string LoadName => "Key";

            public DataViewType Type => _type;

            public KeyCodecOld(CodecFactory factory, KeyDataViewType type, IValueCodec<T> innerCodec)
            {
                Contracts.AssertValue(factory);
                Contracts.AssertValue(type);
                Contracts.AssertValue(innerCodec);
                Contracts.Assert(type.RawType == typeof(T));
                Contracts.Assert(innerCodec.Type.RawType == type.RawType);
                _factory = factory;
                _type = type;
                _innerCodec = innerCodec;
            }

            /// <summary>
            /// Writes the inner codec followed by the key type's count, and returns the number of
            /// bytes accounted for.
            /// </summary>
            // NOTE(review): GetKeyCodecOld reads a bool, a ulong and an int after the inner codec,
            // while only the count is written here. Presumably this codec is never used to write
            // new definitions - confirm before relying on a round trip.
            public int WriteParameterization(Stream stream)
            {
                int written = _factory.WriteCodec(stream, _innerCodec);
                using (BinaryWriter writer = _factory.OpenBinaryWriter(stream))
                    writer.Write(_type.Count);
                return written + sizeof(ulong);
            }

            // REVIEW: There is something a little bit troubling here. If someone, say,
            // produces a column on KeyType(I4, 4) and then returns 10 as a value in
            // that column, that's obviously a violation of the type, and lots of things
            // downstream may complain, but it is a "valid" cursor in that it produces values
            // and does not throw. So from that perspective of the codecs and their users being
            // common and indifferent carriers, it's not clear that these codecs should take on the
            // responsibility for validating the input. On the *other* hand, if we know that we
            // wrote valid data, when reading it back from a stream should we not take advantage
            // of this, to validate the correctness of the decoding? On the other other hand, is
            // validating the correctness for the decoding of things like streams any less urgent?

            public IValueWriter<T> OpenWriter(Stream stream)
                => _innerCodec.OpenWriter(stream);

            public IValueReader<T> OpenReader(Stream stream, int items)
                => _innerCodec.OpenReader(stream, items);
        }
 
        /// <summary>
        /// Reconstructs a key codec from the older definition layout: the inner codec, then a
        /// "contiguous" flag, a minimum value, and a 32-bit count.
        /// </summary>
        /// <param name="definitionStream">The stream holding the codec definition.</param>
        /// <param name="codec">The resulting codec, or the default value if the inner codec
        /// could not be read.</param>
        /// <returns>Whether a codec was produced.</returns>
        private bool GetKeyCodecOld(Stream definitionStream, out IValueCodec codec)
        {
            // The definition starts with the codec of the underlying values.
            if (!TryReadCodec(definitionStream, out IValueCodec underlyingCodec))
            {
                codec = default;
                return false;
            }

            // The underlying type must be a primitive type that is valid for keys.
            var itemType = underlyingCodec.Type as PrimitiveDataViewType;
            Contracts.CheckDecode(itemType != null);
            Contracts.CheckDecode(KeyDataViewType.IsValidDataType(itemType.RawType));

            KeyDataViewType keyType;
            using (BinaryReader reader = OpenBinaryReader(definitionStream))
            {
                bool contiguous = reader.ReadBoolByte();
                ulong min = reader.ReadUInt64();
                int count = reader.ReadInt32();

                // Since we no longer support the notion of min != 0 or non contiguous values we throw in that case.
                Contracts.CheckDecode(min == 0);
                Contracts.CheckDecode(0 <= count);
                Contracts.CheckDecode((ulong)count <= itemType.GetRawKind().ToMaxInt());
                Contracts.CheckDecode(contiguous);

                // Since we removed the notion of unknown cardinality (count == 0), we map to the maximum value.
                if (count != 0)
                    keyType = new KeyDataViewType(itemType.RawType, count);
                else
                    keyType = new KeyDataViewType(itemType.RawType, itemType.RawType.ToMaxInt());
            }

            // Instantiate KeyCodecOld<> over the raw type of the key.
            Type closedCodecType = typeof(KeyCodecOld<>).MakeGenericType(itemType.RawType);
            codec = (IValueCodec)Activator.CreateInstance(closedCodecType, this, keyType, underlyingCodec);
            return true;
        }
 
        /// <summary>
        /// Codec for key types registered under the load name "Key2". Values are read and written
        /// entirely by the wrapped inner codec; the parameterization carries the key count.
        /// </summary>
        private sealed class KeyCodec<T> : IValueCodec<T>
        {
            // *** Binary block format ***
            // Identical to UnsafeTypeCodec, packed bytes of little-endian values.

            private readonly CodecFactory _factory;
            private readonly KeyDataViewType _type;
            // The more basic value codec that does the actual saving and loading.
            private readonly IValueCodec<T> _innerCodec;

            public string LoadName => "Key2";

            public DataViewType Type => _type;

            public KeyCodec(CodecFactory factory, KeyDataViewType type, IValueCodec<T> innerCodec)
            {
                Contracts.AssertValue(factory);
                Contracts.AssertValue(type);
                Contracts.AssertValue(innerCodec);
                Contracts.Assert(type.RawType == typeof(T));
                Contracts.Assert(innerCodec.Type.RawType == type.RawType);
                _factory = factory;
                _type = type;
                _innerCodec = innerCodec;
            }

            /// <summary>
            /// Writes the inner codec followed by the key type's count, and returns the number of
            /// bytes accounted for.
            /// </summary>
            public int WriteParameterization(Stream stream)
            {
                int written = _factory.WriteCodec(stream, _innerCodec);
                using (BinaryWriter writer = _factory.OpenBinaryWriter(stream))
                    writer.Write(_type.Count);
                return written + sizeof(ulong);
            }

            // REVIEW: There is something a little bit troubling here. If someone, say,
            // produces a column on KeyType(I4, 4) and then returns 10 as a value in
            // that column, that's obviously a violation of the type, and lots of things
            // downstream may complain, but it is a "valid" cursor in that it produces values
            // and does not throw. So from that perspective of the codecs and their users being
            // common and indifferent carriers, it's not clear that these codecs should take on the
            // responsibility for validating the input. On the *other* hand, if we know that we
            // wrote valid data, when reading it back from a stream should we not take advantage
            // of this, to validate the correctness of the decoding? On the other other hand, is
            // validating the correctness for the decoding of things like streams any less urgent?

            public IValueWriter<T> OpenWriter(Stream stream)
                => _innerCodec.OpenWriter(stream);

            public IValueReader<T> OpenReader(Stream stream, int items)
                => _innerCodec.OpenReader(stream, items);
        }
 
        /// <summary>
        /// Reconstructs a key codec from its serialized definition: the inner codec followed by
        /// the key count as an unsigned 64-bit integer.
        /// </summary>
        /// <param name="definitionStream">The stream holding the codec definition.</param>
        /// <param name="codec">The resulting codec, or the default value if the inner codec
        /// could not be read.</param>
        /// <returns>Whether a codec was produced.</returns>
        private bool GetKeyCodec(Stream definitionStream, out IValueCodec codec)
        {
            // The definition starts with the codec of the underlying values.
            if (!TryReadCodec(definitionStream, out IValueCodec underlyingCodec))
            {
                codec = default;
                return false;
            }

            // The underlying type must be a primitive type that is valid for keys.
            var itemType = underlyingCodec.Type as PrimitiveDataViewType;
            Contracts.CheckDecode(itemType != null);
            Contracts.CheckDecode(KeyDataViewType.IsValidDataType(itemType.RawType));

            KeyDataViewType keyType;
            using (BinaryReader reader = OpenBinaryReader(definitionStream))
            {
                // The count must be positive and representable in the raw type.
                ulong keyCount = reader.ReadUInt64();
                Contracts.CheckDecode(0 < keyCount);
                Contracts.CheckDecode(keyCount <= itemType.RawType.ToMaxInt());

                keyType = new KeyDataViewType(itemType.RawType, keyCount);
            }

            // Instantiate KeyCodec<> over the raw type of the key.
            Type closedCodecType = typeof(KeyCodec<>).MakeGenericType(itemType.RawType);
            codec = (IValueCodec)Activator.CreateInstance(closedCodecType, this, keyType, underlyingCodec);
            return true;
        }
 
        /// <summary>
        /// Builds a key codec for <paramref name="type"/>, provided a codec is available for the
        /// number type corresponding to its raw type.
        /// </summary>
        /// <param name="type">The type to build a codec for; must be a key type.</param>
        /// <param name="codec">The resulting codec, or the default value if no codec exists for
        /// the underlying number type.</param>
        /// <returns>Whether a codec was produced.</returns>
        private bool GetKeyCodec(DataViewType type, out IValueCodec codec)
        {
            var keyType = type as KeyDataViewType;
            if (keyType == null)
                throw Contracts.ExceptParam(nameof(type), "type must be a key type");

            // The key codec delegates the actual reading/writing to the codec of the number type.
            DataViewType numberType = ColumnTypeExtensions.NumberTypeFromType(type.RawType);
            if (!TryGetCodec(numberType, out IValueCodec underlyingCodec))
            {
                codec = default;
                return false;
            }

            // Instantiate KeyCodec<> over the raw type of the key.
            Type closedCodecType = typeof(KeyCodec<>).MakeGenericType(type.RawType);
            codec = (IValueCodec)Activator.CreateInstance(closedCodecType, this, type, underlyingCodec);
            return true;
        }
    }
}