// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ML;
using Microsoft.ML.Command;
using Microsoft.ML.CommandLine;
using Microsoft.ML.Data;
using Microsoft.ML.Data.IO;
using Microsoft.ML.Internal.Utilities;
using Microsoft.ML.Runtime;
using Microsoft.ML.Transforms;
[assembly: LoadableClass(BinaryLoader.Summary, typeof(BinaryLoader), typeof(BinaryLoader.Arguments), typeof(SignatureDataLoader),
"Binary Loader",
BinaryLoader.LoadName,
"Binary",
"Bin")]
[assembly: LoadableClass(BinaryLoader.Summary, typeof(BinaryLoader), null, typeof(SignatureLoadDataLoader),
"Binary Data View Loader", BinaryLoader.LoaderSignature)]
[assembly: LoadableClass(typeof(BinaryLoader.InfoCommand), typeof(BinaryLoader.InfoCommand.Arguments), typeof(SignatureCommand),
"", BinaryLoader.InfoCommand.LoadName, "idv")]
namespace Microsoft.ML.Data.IO
{
[BestFriend]
internal sealed class BinaryLoader : ILegacyDataLoader, IDisposable
{
public sealed class Arguments
{
[Argument(ArgumentType.LastOccurrenceWins, HelpText = "The number of worker decompressor threads to use", ShortName = "t")]
public int? Threads;
// REVIEW: Is this the right knob? The other thing we could do is have a bound on number
// of MB, based on an analysis of average block size.
[Argument(ArgumentType.LastOccurrenceWins, HelpText = "When shuffling, the number of blocks worth of data to keep in the shuffle pool. " +
"Larger values will make the shuffling more random, but use more memory. Set to 0 to use only block shuffling.", ShortName = "pb")]
public Double PoolBlocks = _defaultShuffleBlocks;
}
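// A minimal usage sketch, assuming an IHostEnvironment named env is at hand
// (BinaryLoader is internal, so this is only reachable from within the ML.NET
// assemblies; "data.idv" is a hypothetical file name):
//
//   var loader = new BinaryLoader(env, new BinaryLoader.Arguments(), "data.idv");
//   using (var cursor = loader.GetRowCursor(loader.Schema))
//   {
//       // Acquire getters from the cursor, then iterate with MoveNext().
//   }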
/// <summary>
/// Each column corresponds to a table of contents entry, describing information about the column
/// and how values may be extracted. For columns represented physically within the stream this will
/// include its location within the stream and a codec to decode the bytestreams, and for generated
/// columns the procedures to create them. This structure is used both for those columns that
/// we know how to access (called alive columns) and those columns we do not know how to access,
/// because either the value codec or the compression scheme is unrecognized (called dead columns).
/// </summary>
private sealed class TableOfContentsEntry
{
/// <summary>
/// The name of the column.
/// </summary>
public readonly string Name;
/// <summary>
/// The codec we will use to read the values from the stream. This will be null if
/// and only if this is a dead or generated column.
/// </summary>
public readonly IValueCodec Codec;
/// <summary>
/// The column type of the column. This will be null if and only if this is a dead
/// column.
/// </summary>
public readonly DataViewType Type;
/// <summary>
/// The compression scheme used on this column's blocks.
/// </summary>
public readonly CompressionKind Compression;
/// <summary>
/// The number of rows in each block (except for the last one).
/// </summary>
public readonly int RowsPerBlock;
/// <summary>
/// The offset into the stream where the lookup table for this column is stored.
/// </summary>
public readonly long LookupOffset;
/// <summary>
/// The offset into the stream where the metadata TOC entries for this column are
/// stored. This will be 0 if there is no metadata for this column.
/// </summary>
public readonly long MetadataTocOffset;
/// <summary>
/// The index of the column. Note that if there are dead columns, this value may
/// differ from the corresponding column index as reported by the dataview.
/// </summary>
public readonly int ColumnIndex;
// Non-null only for generated columns.
private readonly Delegate _generatorDelegate;
private readonly BinaryLoader _parent;
private readonly IExceptionContext _ectx;
// Initially null, but lazily constructed by GetLookup.
private volatile BlockLookup[] _lookup;
// Both initially -1, but lazily constructed by GetLookup.
private volatile int _maxCompLen;
private volatile int _maxDecompLen;
// Initially null, but lazily constructed by GetMetadataTOC.
private volatile MetadataTableOfContentsEntry[] _metadataToc;
// Initially null, but lazily constructed by GetMetadataTOC. This contains
// the descriptions of uninterpretable metadata, akin to the _deadColumns
// array in the loader.
private volatile MetadataTableOfContentsEntry[] _deadMetadataToc;
// Initially null, but lazily constructed by GetMetadataTOCEntryOrNull.
private volatile Dictionary<string, MetadataTableOfContentsEntry> _metadataMap;
private long _metadataTocEnd;
/// <summary>
/// Whether this is a generated column, that is, something dependent on no actual block data
/// in the file.
/// </summary>
public bool IsGenerated { get { return ColumnIndex == -1; } }
public TableOfContentsEntry(BinaryLoader parent, int index, string name, IValueCodec codec,
CompressionKind compression, int rowsPerBlock, long lookupOffset, long metadataTocOffset)
{
Contracts.AssertValue(parent, "parent");
Contracts.AssertValue(parent._host, "parent");
_parent = parent;
_ectx = _parent._host;
_ectx.Assert(0 <= index && index < parent._header.ColumnCount);
_ectx.AssertValue(name);
_ectx.AssertValueOrNull(codec);
_ectx.Assert(metadataTocOffset == 0 || Header.HeaderSize <= metadataTocOffset);
// REVIEW: Should we allow lookup offset to be 0, if the binary file has no rows?
_ectx.Assert(Header.HeaderSize <= lookupOffset);
ColumnIndex = index;
Name = name;
Codec = codec;
Type = Codec?.Type;
Compression = compression;
RowsPerBlock = rowsPerBlock;
LookupOffset = lookupOffset;
MetadataTocOffset = metadataTocOffset;
_maxCompLen = -1;
_maxDecompLen = -1;
_ectx.Assert(!IsGenerated);
}
/// <summary>
/// Constructor for a generated column, which corresponds to no column in the original file,
/// and has no stored blocks associated with it. The input <paramref name="valueMapper"/> must
/// be a <c>ValueMapper</c> mapping a zero-based <c>long</c> row index to some value with the
/// same type as the raw type in <paramref name="type"/>.
/// </summary>
public TableOfContentsEntry(BinaryLoader parent, string name, DataViewType type, Delegate valueMapper)
{
Contracts.AssertValue(parent, "parent");
Contracts.AssertValue(parent._host, "parent");
_parent = parent;
_ectx = _parent._host;
_ectx.AssertValue(name);
_ectx.AssertValue(type);
_ectx.AssertValue(valueMapper);
ColumnIndex = -1;
Name = name;
Type = type;
_generatorDelegate = valueMapper;
_maxCompLen = 0;
_maxDecompLen = 0;
_ectx.Assert(IsGenerated);
#if DEBUG
Action del = AssertGeneratorValid<int>;
var meth = del.GetMethodInfo().GetGenericMethodDefinition().MakeGenericMethod(Type.RawType);
meth.Invoke(this, null);
#endif
}
#if DEBUG
private void AssertGeneratorValid<T>()
{
_ectx.Assert(IsGenerated);
_ectx.AssertValue(_generatorDelegate);
_ectx.AssertValue(Type);
ValueMapper<long, T> del = _generatorDelegate as ValueMapper<long, T>;
_ectx.AssertValue(del);
}
#endif
/// <summary>
/// Returns the value mapper for a generated column. This is a valid call only if
/// <typeparamref name="T"/> is the same type as <see cref="DataViewType.RawType"/>.
/// </summary>
public ValueMapper<long, T> GetValueMapper<T>()
{
_ectx.Assert(IsGenerated);
_ectx.Assert(typeof(T) == Type.RawType);
return (ValueMapper<long, T>)_generatorDelegate;
}
/// <summary>
/// Gets an array with one entry per block of this column, each entry describing that
/// block's location within the file. This will return null if and only if this is a generated column.
/// </summary>
public BlockLookup[] GetLookup()
{
_ectx.Assert(!IsGenerated == (LookupOffset > 0));
if (LookupOffset > 0 && _maxCompLen == -1)
{
Stream stream = _parent._stream;
lock (stream)
{
if (_maxCompLen == -1)
{
long rc = _parent._header.RowCount;
if (rc == 0)
{
// No blocks at all: report zero for both maximum block sizes,
// per the contract of GetMaxBlockSizes.
_maxDecompLen = 0;
_maxCompLen = 0;
return _lookup = new BlockLookup[0];
}
long numBlocks = (rc - 1) / RowsPerBlock + 1;
// By the format it's perfectly legal, but we don't yet support reading so many blocks.
if (numBlocks > int.MaxValue)
throw _ectx.ExceptNotSupp("This version of the software does not support {0} blocks", numBlocks);
var lookup = new BlockLookup[numBlocks];
stream.Seek(LookupOffset, SeekOrigin.Begin);
var reader = _parent._reader;
int maxCompLen = 0;
int maxDecompLen = 0;
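// Each lookup entry on disk is 16 bytes: an 8-byte block offset, a 4-byte
// compressed length, and a 4-byte decompressed length, read in that order
// below. (InitToc's range check of 16 * lookupLen relies on this layout.)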
for (int b = 0; b < numBlocks; ++b)
{
long offset = reader.ReadInt64();
int compLen = reader.ReadInt32();
int decompLen = reader.ReadInt32();
_ectx.CheckDecode(0 <= compLen, "negative compressed block length detected");
_ectx.CheckDecode(0 <= decompLen, "negative decompressed block length detected");
if (maxCompLen < compLen)
maxCompLen = compLen;
if (maxDecompLen < decompLen)
maxDecompLen = decompLen;
_ectx.CheckDecode(Header.HeaderSize <= offset && offset <= _parent._header.TailOffset - compLen, "block offset out of range");
lookup[b] = new BlockLookup(offset, compLen, decompLen);
}
_lookup = lookup;
// Assign in this order since tests of validity are on comp len.
_maxDecompLen = maxDecompLen;
_maxCompLen = maxCompLen;
}
}
_ectx.AssertValue(_lookup);
}
_ectx.Assert(_maxDecompLen >= 0);
_ectx.Assert(_maxCompLen >= 0);
return _lookup;
}
/// <summary>
/// Fetches the maximum compressed and decompressed block sizes for this column.
/// If there are no blocks associated with this column, for whatever reason (for
/// example, a data view with no rows, or a generated column), this will return
/// 0 in both values.
/// </summary>
/// <param name="compressed">The maximum value of the compressed block size
/// (that is, the actual size of the block in stream) among all blocks for this
/// column</param>
/// <param name="decompressed">The maximum value of the block size when
/// decompressed among all blocks for this column</param>
public void GetMaxBlockSizes(out int compressed, out int decompressed)
{
if (_maxCompLen == -1)
GetLookup();
_ectx.Assert(0 <= _maxCompLen);
_ectx.Assert(0 <= _maxDecompLen);
compressed = _maxCompLen;
decompressed = _maxDecompLen;
}
private void EnsureMetadataStructuresInitialized()
{
if (MetadataTocOffset <= 0 || _metadataToc != null)
return;
Stream stream = _parent._stream;
lock (stream)
{
if (_metadataToc != null)
return;
using (var ch = _parent._host.Start("Metadata TOC Read"))
{
ReadTocMetadata(ch, stream);
}
}
}
private void ReadTocMetadata(IChannel ch, Stream stream)
{
_ectx.AssertValue(ch);
ch.AssertValue(stream);
stream.Seek(MetadataTocOffset, SeekOrigin.Begin);
var reader = _parent._reader;
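// ReadLeb128Int decodes a LEB128-style varint: seven payload bits per byte,
// with the high bit of each byte acting as a continuation flag.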
ulong mtocCount = reader.ReadLeb128Int();
ch.CheckDecode(0 < mtocCount && mtocCount < int.MaxValue,
"Bad number of metadata TOC entries read");
var mtocEntries = new List<MetadataTableOfContentsEntry>();
var deadMtocEntries = new List<MetadataTableOfContentsEntry>();
var map = new Dictionary<string, MetadataTableOfContentsEntry>();
// This may have more entries than map if some metadata blocks are uninterpretable.
var kinds = new HashSet<string>();
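// Each metadata TOC entry is laid out as: a string kind, a codec definition,
// a one-byte compression kind, an 8-byte block offset, and a LEB128-encoded
// block size, read in that order below.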
for (int i = 0; i < (int)mtocCount; ++i)
{
string kind = reader.ReadString();
ch.CheckDecode(!string.IsNullOrEmpty(kind), "Metadata kind must be non-empty string");
ch.CheckDecode(kinds.Add(kind), "Duplicate metadata kind read from file");
IValueCodec codec;
bool gotCodec = _parent._factory.TryReadCodec(stream, out codec);
// Even in the case where we did not succeed in loading the codec, we still
// want to skip to the next table of contents entry, so keep reading.
CompressionKind compression = (CompressionKind)reader.ReadByte();
bool knowCompression = Enum.IsDefined(typeof(CompressionKind), compression);
long blockOffset = reader.ReadInt64();
ch.CheckDecode(Header.HeaderSize <= blockOffset && blockOffset <= _parent._header.TailOffset,
"Metadata block offset out of range");
ulong ublockSize = reader.ReadLeb128Int();
ch.CheckDecode(ublockSize <= long.MaxValue, "Metadata block size out of range");
long blockSize = (long)ublockSize;
ch.CheckDecode(0 < blockSize && blockSize <= _parent._header.TailOffset - blockOffset,
"Metadata block size out of range");
if (gotCodec && knowCompression)
{
var entry = MetadataTableOfContentsEntry.Create(_parent, kind, codec, compression, blockOffset, blockSize);
mtocEntries.Add(entry);
map[kind] = entry;
}
else
{
ch.Warning("Cannot interpret metadata of kind '{0}' because {1} unrecognized",
gotCodec ? "compression" : (knowCompression ? "codec" : "codec and compression"));
var entry = MetadataTableOfContentsEntry.CreateDead(_parent, kind, codec, compression, blockOffset, blockSize);
deadMtocEntries.Add(entry);
}
}
// It is possible for this to be empty but non-null if the associated codecs
// or compression schemes for all pieces of metadata are unknown. We want to
// keep it empty but non-null so we can distinguish between the two cases of
// "couldn't read anything" vs. "didn't read anything," lest we attempt to
// re-read the metadata TOC on every request.
_metadataToc = mtocEntries.ToArray();
_deadMetadataToc = deadMtocEntries.ToArray();
_metadataMap = map;
_metadataTocEnd = stream.Position;
}
/// <summary>
/// Gets an array containing the metadata TOC entries. This will return null if there
/// are no entries stored at all, and empty if there is metadata but none of it was
/// readable. (To inspect attributes of the unreadable metadata, if any, see
/// <see cref="GetDeadMetadataTocArray"/>.) All entries will point to metadata with
/// known codecs and compression schemes.
/// </summary>
public MetadataTableOfContentsEntry[] GetMetadataTocArray()
{
EnsureMetadataStructuresInitialized();
_ectx.Assert((MetadataTocOffset == 0) == (_metadataToc == null));
_ectx.Assert((MetadataTocOffset == 0) == (_deadMetadataToc == null));
return _metadataToc;
}
/// <summary>
/// Gets an array containing the metadata TOC entries for all "dead" pieces of metadata. This
/// will return null if no metadata is stored at all, whether readable or unreadable, and
/// empty if there are no unreadable pieces of metadata. A piece of metadata is considered "dead"
/// if either its codec or compression kind is unknown. This is primarily for diagnostic purposes.
/// </summary>
public MetadataTableOfContentsEntry[] GetDeadMetadataTocArray()
{
EnsureMetadataStructuresInitialized();
_ectx.Assert((MetadataTocOffset == 0) == (_metadataToc == null));
_ectx.Assert((MetadataTocOffset == 0) == (_deadMetadataToc == null));
return _deadMetadataToc;
}
/// <summary>
/// Returns the entry for a valid "live" piece of metadata given a kind.
/// </summary>
public MetadataTableOfContentsEntry GetMetadataTocEntryOrNull(string kind)
{
_ectx.AssertNonEmpty(kind);
EnsureMetadataStructuresInitialized();
if (_metadataMap == null)
{
_ectx.Assert(MetadataTocOffset == 0);
return null;
}
MetadataTableOfContentsEntry retval;
_metadataMap.TryGetValue(kind, out retval);
return retval;
}
/// <summary>
/// Returns the location in the stream just past the end of the metadata table of contents.
/// If this column has no metadata table of contents defined, this will return 0. This is
/// primarily for diagnostic purposes.
/// </summary>
public long GetMetadataTocEndOffset()
{
EnsureMetadataStructuresInitialized();
_ectx.Assert((_metadataTocEnd == 0) == (MetadataTocOffset == 0));
return _metadataTocEnd;
}
}
/// <summary>
/// A column can be associated with metadata, in which case it will have one or more metadata
/// table of contents entries, each represented by an instance of this class.
/// </summary>
private abstract class MetadataTableOfContentsEntry
{
/// <summary>
/// The kind of the metadata, an identifying name.
/// </summary>
public readonly string Kind;
/// <summary>
/// The codec we will use to read the metadata value. If this is <c>null</c>,
/// the metadata is considered "dead," that is, uninterpretable.
/// </summary>
public abstract IValueCodec Codec { get; }
/// <summary>
/// The compression scheme used on the metadata block. If this is an unknown
/// type, the metadata is considered "dead," that is, uninterpretable.
/// </summary>
public readonly CompressionKind Compression;
/// <summary>
/// The offset into the stream where the metadata block begins.
/// </summary>
public readonly long BlockOffset;
/// <summary>
/// The number of bytes used to store the metadata block.
/// </summary>
public readonly long BlockSize;
protected readonly BinaryLoader Parent;
/// <summary>
/// Returns a <see cref="ValueGetter{TValue}"/> over the stored entry value, as a <see cref="Delegate"/>. An example of a stored value is
/// <see cref="MetadataTableOfContentsEntry{T}.Value"/>. For implementations of <see cref="GetGetter"/>, see <see cref="ImplDead"/>,
/// <see cref="ImplOne{T}"/>, and <see cref="ImplVec{T}"/>.
/// </summary>
public abstract Delegate GetGetter();
protected MetadataTableOfContentsEntry(BinaryLoader parent, string kind,
CompressionKind compression, long blockOffset, long blockSize)
{
Contracts.AssertValue(parent, "parent");
Contracts.AssertValue(parent._host, "parent");
Contracts.AssertNonEmpty(kind);
Contracts.Assert(Header.HeaderSize <= blockOffset);
Contracts.Assert(0 <= blockSize);
Parent = parent;
Kind = kind;
Compression = compression;
BlockOffset = blockOffset;
BlockSize = blockSize;
}
public static MetadataTableOfContentsEntry Create(BinaryLoader parent, string kind, IValueCodec codec,
CompressionKind compression, long blockOffset, long blockSize)
{
Contracts.AssertValue(parent, "parent");
Contracts.AssertValue(parent._host, "parent");
IExceptionContext ectx = parent._host;
ectx.AssertValue(codec);
ectx.Assert(Enum.IsDefined(typeof(CompressionKind), compression));
var type = codec.Type;
Type entryType;
if (type is VectorDataViewType)
{
Type valueType = type.RawType;
ectx.Assert(valueType.IsGenericEx(typeof(VBuffer<>)));
Type[] args = valueType.GetGenericArguments();
ectx.Assert(args.Length == 1);
entryType = typeof(MetadataTableOfContentsEntry.ImplVec<>).MakeGenericType(args);
}
else
{
entryType = typeof(MetadataTableOfContentsEntry.ImplOne<>).MakeGenericType(type.RawType);
}
var result = (MetadataTableOfContentsEntry)Activator.CreateInstance(entryType,
parent, kind, codec, compression, blockOffset, blockSize);
ectx.AssertValue(result.Codec);
return result;
}
public static MetadataTableOfContentsEntry CreateDead(BinaryLoader parent, string kind, IValueCodec codec,
CompressionKind compression, long blockOffset, long blockSize)
{
// We should be creating "dead" metadata entries only when we could not
// interpret either the codec or the compression kind.
Contracts.Assert((codec == null) || !Enum.IsDefined(typeof(CompressionKind), compression));
return new ImplDead(parent, kind, codec, compression, blockOffset, blockSize);
}
/// <summary>
/// Information on a piece of metadata that could not be interpreted for some reason.
/// </summary>
private sealed class ImplDead : MetadataTableOfContentsEntry
{
private readonly IValueCodec _codec;
public override IValueCodec Codec { get { return _codec; } }
public ImplDead(BinaryLoader parent, string kind, IValueCodec codec,
CompressionKind compression, long blockOffset, long blockSize)
: base(parent, kind, compression, blockOffset, blockSize)
{
_codec = codec;
}
public override Delegate GetGetter() => null;
}
private sealed class ImplOne<T> : MetadataTableOfContentsEntry<T>
{
public ImplOne(BinaryLoader parent, string kind, IValueCodec<T> codec,
CompressionKind compression, long blockOffset, long blockSize)
: base(parent, kind, codec, compression, blockOffset, blockSize)
{
}
public override Delegate GetGetter()
{
EnsureValue();
ValueGetter<T> getter = (ref T value) => value = Value;
return getter;
}
}
private sealed class ImplVec<T> : MetadataTableOfContentsEntry<VBuffer<T>>
{
public ImplVec(BinaryLoader parent, string kind, IValueCodec<VBuffer<T>> codec,
CompressionKind compression, long blockOffset, long blockSize)
: base(parent, kind, codec, compression, blockOffset, blockSize)
{
}
public override Delegate GetGetter()
{
EnsureValue();
ValueGetter<VBuffer<T>> getter = (ref VBuffer<T> value) => Value.CopyTo(ref value);
return getter;
}
}
}
private abstract class MetadataTableOfContentsEntry<T> : MetadataTableOfContentsEntry
{
private bool _fetched;
private readonly IValueCodec<T> _codec;
protected T Value;
public override IValueCodec Codec { get { return _codec; } }
protected MetadataTableOfContentsEntry(BinaryLoader parent, string kind, IValueCodec<T> codec,
CompressionKind compression, long blockOffset, long blockSize)
: base(parent, kind, compression, blockOffset, blockSize)
{
// REVIEW: Do we want to have the capability to track "dead" pieces of
// metadata, that is, metadata for which we have an unrecognized codec?
Contracts.AssertValue(codec);
_codec = codec;
}
/// <summary>
/// Calling <see cref="EnsureValue"/> guarantees that the content of <see cref="Value"/> has been loaded.
/// Before that call, <see cref="Value"/> may still be the default value of its type.
/// </summary>
protected void EnsureValue()
{
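// Double-checked locking on the shared stream: the unsynchronized _fetched
// test skips the lock once the value is loaded, and the test is repeated
// under the lock before actually seeking and decoding.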
if (!_fetched)
{
Stream stream = Parent._stream;
lock (stream)
{
if (!_fetched)
{
stream.Seek(BlockOffset, SeekOrigin.Begin);
using (var subset = new SubsetStream(stream, BlockSize))
using (var decompressed = Compression.DecompressStream(subset))
using (var bufferedStream = new BufferedStream(decompressed))
using (var valueReader = _codec.OpenReader(bufferedStream, 1))
{
valueReader.MoveNext();
valueReader.Get(ref Value);
}
_fetched = true;
}
}
}
}
}
/// <summary>
/// This function returns the output schema, <see cref="Schema"/>, of <see cref="BinaryLoader"/> by translating <see cref="_aliveColumns"/> into
/// <see cref="DataViewSchema.Column"/>s. If a <see cref="BinaryLoader"/> loads a text column from the input file, its <see cref="Schema"/>
/// should contain a <see cref="DataViewSchema.Column"/> with <see cref="TextDataViewType.Instance"/> as its <see cref="DataViewType"/>.
/// </summary>
/// <returns><see cref="Schema"/> of the loaded file.</returns>
private DataViewSchema ComputeOutputSchema()
{
var schemaBuilder = new DataViewSchema.Builder();
for (int i = 0; i < _aliveColumns.Length; ++i)
{
// Information about a column loaded from the binary file.
var loadedColumn = _aliveColumns[i];
// Metadata fields of the loaded column.
var metadataArray = loadedColumn.GetMetadataTocArray();
if (Utils.Size(metadataArray) > 0)
{
// We got some metadata fields here.
var metadataBuilder = new DataViewSchema.Annotations.Builder();
foreach (var loadedMetadataColumn in metadataArray)
{
var metadataGetter = loadedMetadataColumn.GetGetter();
if (metadataGetter == null)
throw AnnotationUtils.ExceptGetAnnotation();
metadataBuilder.Add(loadedMetadataColumn.Kind, loadedMetadataColumn.Codec.Type, metadataGetter);
}
schemaBuilder.AddColumn(loadedColumn.Name, loadedColumn.Type, metadataBuilder.ToAnnotations());
}
else
// This column has no metadata.
schemaBuilder.AddColumn(loadedColumn.Name, loadedColumn.Type);
}
return schemaBuilder.ToSchema();
}
private readonly Stream _stream;
private readonly BinaryReader _reader;
private readonly CodecFactory _factory;
private readonly Header _header;
private readonly DataViewSchema _outputSchema;
private readonly bool _autodeterminedThreads;
private readonly int _threads;
private bool _disposed;
private readonly TableOfContentsEntry[] _aliveColumns;
// We still want to be able to access information about the columns we could not read, like their
// name, where they are, how much space they're taking, etc. Conceivably for some operations (for example,
// column filtering) whether or not we can interpret the values in the column is totally irrelevant.
private readonly TableOfContentsEntry[] _deadColumns;
// The number of rows per block. The format supports having different rows per block in each column,
// but in software we do not yet support this.
private readonly int _rowsPerBlock;
// The stream offset at the end of the table of contents. This is useful for diagnostic purposes.
private readonly long _tocEndLim;
private readonly MemoryStreamCollection _bufferCollection;
private readonly IHost _host;
// The number of blocks worth of data to keep in the shuffle pool.
private readonly Double _shuffleBlocks;
// The actual, more convenient number of rows to use in the pool, calculated from the
// shuffle block count. This is not serialized to the data model, since it depends on
// the block size, which can change from one input data file to another, as the same
// data model is applied to different data files.
private readonly int _randomShufflePoolRows;
private const Double _defaultShuffleBlocks = 4;
/// <summary>
/// Upper inclusive bound of versions this reader can read.
/// </summary>
private const ulong ReaderVersion = StandardDataTypesVersion;
/// <summary>
/// The first version that removes DvTypes and uses .NET standard
/// data types.
/// </summary>
private const ulong StandardDataTypesVersion = 0x0001000100010006;
/// <summary>
/// The first version of the format that accommodated DvText.NA.
/// </summary>
private const ulong MissingTextVersion = 0x0001000100010005;
/// <summary>
/// The first version of the format that accommodated arbitrary metadata.
/// </summary>
private const ulong MetadataVersion = 0x0001000100010004;
/// <summary>
/// The first version of the format that accommodated slot names.
/// </summary>
private const ulong SlotNamesVersion = 0x0001000100010003;
/// <summary>
/// Low inclusive bound of versions this reader can read.
/// </summary>
private const ulong ReaderFirstVersion = 0x0001000100010002;
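// The version constants pack four 16-bit fields into a ulong; for example,
// 0x0001000100010006 would render as 1.1.1.6 (assuming Header.VersionToString
// uses the conventional dotted form).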
public DataViewSchema Schema => _outputSchema;
private long RowCount => _header.RowCount;
public long? GetRowCount() => RowCount;
public bool CanShuffle => true;
internal const string Summary = "Loads a native binary IDV data file.";
internal const string LoadName = "BinaryLoader";
internal const string LoaderSignature = "BinaryLoader";
private static VersionInfo GetVersionInfo()
{
return new VersionInfo(
modelSignature: "BINLOADR",
//verWrittenCur: 0x00010001, // Initial
//verWrittenCur: 0x00010002, // Generated row index column
//verWrittenCur: 0x00010003, // Number of blocks to put in the shuffle pool
verWrittenCur: 0x00010004, // Row index column no longer being generated
verReadableCur: 0x00010004,
verWeCanReadBack: 0x00010001,
loaderSignature: LoaderSignature,
loaderAssemblyName: typeof(BinaryLoader).Assembly.FullName);
}
private BinaryLoader(Arguments args, IHost host, Stream stream, bool leaveOpen)
{
Contracts.AssertValue(host, "host");
_host = host;
_host.CheckValue(args, nameof(args));
_host.CheckValue(stream, nameof(stream));
_host.CheckParam(stream.CanRead, nameof(stream), "input stream must be readable");
_host.CheckParam(stream.CanSeek, nameof(stream), "input stream must be seekable");
_host.CheckParam(stream.Position == 0, nameof(stream), "input stream must be at head");
_host.CheckUserArg(0 <= args.PoolBlocks, nameof(args.PoolBlocks), "must be non-negative");
using (var ch = _host.Start("Initializing"))
{
_stream = stream;
_reader = new BinaryReader(_stream, Encoding.UTF8, leaveOpen);
_factory = new CodecFactory(_host);
_header = InitHeader();
_autodeterminedThreads = args.Threads == null;
_threads = Math.Max(1, args.Threads ?? (Environment.ProcessorCount / 2));
InitToc(ch, out _aliveColumns, out _deadColumns, out _rowsPerBlock, out _tocEndLim);
_outputSchema = ComputeOutputSchema();
_host.Assert(_outputSchema.Count == Utils.Size(_aliveColumns));
_bufferCollection = new MemoryStreamCollection();
if (Utils.Size(_deadColumns) > 0)
ch.Warning("BinaryLoader does not know how to interpret {0} columns", Utils.Size(_deadColumns));
_shuffleBlocks = args.PoolBlocks;
CalculateShufflePoolRows(ch, out _randomShufflePoolRows);
}
}
/// <summary>
/// Constructs a new data view loader.
/// </summary>
/// <param name="stream">A seekable, readable stream. Note that the data view loader assumes
/// that it is the exclusive owner of this stream.</param>
/// <param name="args">Arguments</param>
/// <param name="env">Host environment</param>
/// <param name="leaveOpen">Whether to leave the input stream open</param>
internal BinaryLoader(IHostEnvironment env, Arguments args, Stream stream, bool leaveOpen = true)
: this(args, env.Register(LoadName), stream, leaveOpen)
{
}
public BinaryLoader(IHostEnvironment env, Arguments args, string filename)
: this(env, args, OpenStream(filename), leaveOpen: false)
{
}
public BinaryLoader(IHostEnvironment env, Arguments args, IMultiStreamSource file)
: this(env, args, OpenStream(file), leaveOpen: false)
{
}
/// <summary>
/// Creates a binary loader from a <see cref="ModelLoadContext"/>. Since the loader code
/// opens the file, this will always take ownership of the stream, that is, this is always
/// akin to <c>leaveOpen</c> in the other constructor being false.
/// </summary>
private BinaryLoader(IHost host, ModelLoadContext ctx, Stream stream)
{
Contracts.AssertValue(host, "host");
_host = host;
_host.AssertValue(ctx);
_host.CheckValue(stream, nameof(stream));
_host.CheckParam(stream.CanRead, nameof(stream), "input stream must be readable");
_host.CheckParam(stream.CanSeek, nameof(stream), "input stream must be seekable");
_host.CheckParam(stream.Position == 0, nameof(stream), "input stream must be at head");
// *** Binary format ***
// int: Number of threads if explicitly defined, or 0 if the
// number of threads was automatically determined
// Double: The randomness coefficient.
using (var ch = _host.Start("Initializing"))
{
_stream = stream;
if (ctx.Header.ModelVerWritten >= 0x00010002)
{
_threads = ctx.Reader.ReadInt32();
ch.CheckDecode(_threads >= 0);
if (_threads == 0)
{
_autodeterminedThreads = true;
_threads = Math.Max(1, Environment.ProcessorCount / 2);
}
if (ctx.Header.ModelVerWritten == 0x00010002 || ctx.Header.ModelVerWritten == 0x00010003)
ctx.LoadStringOrNull(); // for _generatedRowIndexName in previous model versions
}
else
{
_threads = Math.Max(1, Environment.ProcessorCount / 2);
}
if (ctx.Header.ModelVerWritten >= 0x00010003)
{
_shuffleBlocks = ctx.Reader.ReadDouble();
ch.CheckDecode(0 <= _shuffleBlocks);
}
else
_shuffleBlocks = _defaultShuffleBlocks;
_reader = new BinaryReader(_stream, Encoding.UTF8, leaveOpen: false);
_factory = new CodecFactory(_host);
_header = InitHeader();
InitToc(ch, out _aliveColumns, out _deadColumns, out _rowsPerBlock, out _tocEndLim);
_outputSchema = ComputeOutputSchema();
ch.Assert(_outputSchema.Count == Utils.Size(_aliveColumns));
_bufferCollection = new MemoryStreamCollection();
if (Utils.Size(_deadColumns) > 0)
ch.Warning("BinaryLoader does not know how to interpret {0} columns", Utils.Size(_deadColumns));
CalculateShufflePoolRows(ch, out _randomShufflePoolRows);
}
}
private static BinaryLoader Create(IHostEnvironment env, ModelLoadContext ctx, IMultiStreamSource files)
{
Contracts.CheckValue(env, nameof(env));
IHost h = env.Register(LoadName);
h.CheckValue(ctx, nameof(ctx));
ctx.CheckAtModel(GetVersionInfo());
h.CheckValue(files, nameof(files));
return h.Apply("Loading Model",
ch =>
{
if (files.Count == 0)
{
BinaryLoader retVal = null;
// In the case where we have no input streams, but we have an input schema from
// the model repository, we still want to surface ourselves as being a binary loader
// with the existing schema. The loader "owns" this stream.
if (ctx.TryLoadBinaryStream("Schema.idv",
r => retVal = new BinaryLoader(h, ctx, HybridMemoryStream.CreateCache(r.BaseStream))))
{
h.AssertValue(retVal);
h.CheckDecode(retVal.RowCount == 0);
// REVIEW: Do we want to be a bit more restrictive around uninterpretable columns?
return retVal;
}
h.Assert(retVal == null);
// Fall through, allow the failure to be on OpenStream.
}
return new BinaryLoader(h, ctx, OpenStream(files));
});
}
/// <summary>
/// Creates a binary loader from a stream that is not owned by the loader.
/// This creates its own independent copy of the input stream for the binary loader.
/// </summary>
private static BinaryLoader Create(IHostEnvironment env, ModelLoadContext ctx, Stream stream)
{
Contracts.CheckValue(env, nameof(env));
IHost h = env.Register(LoadName);
return new BinaryLoader(h, ctx, HybridMemoryStream.CreateCache(stream));
}
private static Stream OpenStream(IMultiStreamSource files)
{
Contracts.CheckValue(files, nameof(files));
Contracts.CheckParam(files.Count == 1, nameof(files), "binary loader must be created with one file");
return files.Open(0);
}
private static Stream OpenStream(string filename)
{
Contracts.CheckNonEmpty(filename, nameof(filename));
var files = new MultiFileSource(filename);
return OpenStream(files);
}
void ICanSaveModel.Save(ModelSaveContext ctx)
{
_host.CheckValue(ctx, nameof(ctx));
ctx.CheckAtModel();
ctx.SetVersionInfo(GetVersionInfo());
_host.Assert(_threads >= 1);
SaveParameters(ctx, _autodeterminedThreads ? 0 : _threads, _shuffleBlocks);
int[] unsavable;
SaveSchema(_host, ctx, Schema, out unsavable);
_host.Assert(Utils.Size(unsavable) == 0);
}
/// <summary>
/// Write the parameters of a loader to the save context. Can be called by <see cref="SaveInstance"/>, where there's no actual
/// loader, only default parameters.
/// </summary>
private static void SaveParameters(ModelSaveContext ctx, int threads, Double shuffleBlocks)
{
// *** Binary format ***
// int: Number of threads if explicitly defined, or 0 if the
// number of threads was automatically determined
// Double: The randomness coefficient.
Contracts.Assert(threads >= 0);
ctx.Writer.Write(threads);
Contracts.Assert(0 <= shuffleBlocks);
ctx.Writer.Write(shuffleBlocks);
}
/// <summary>
/// Save a zero-row dataview that will be used to infer schema information, used in the case
/// where the binary loader is instantiated with no input streams.
/// </summary>
private static void SaveSchema(IHostEnvironment env, ModelSaveContext ctx, DataViewSchema schema, out int[] unsavableColIndices)
{
Contracts.AssertValue(env, "env");
var h = env.Register(LoadName);
h.AssertValue(ctx);
h.AssertValue(schema);
var noRows = new EmptyDataView(h, schema);
h.Assert(noRows.GetRowCount() == 0);
var saverArgs = new BinarySaver.Arguments();
saverArgs.Silent = true;
var saver = new BinarySaver(env, saverArgs);
var cols = Enumerable.Range(0, schema.Count)
.Select(x => new { col = x, isSavable = saver.IsColumnSavable(schema[x].Type) });
int[] toSave = cols.Where(x => x.isSavable).Select(x => x.col).ToArray();
unsavableColIndices = cols.Where(x => !x.isSavable).Select(x => x.col).ToArray();
ctx.SaveBinaryStream("Schema.idv", w => saver.SaveData(w.BaseStream, noRows, toSave));
}
/// <summary>
/// Given the schema and a model context, save an imaginary instance of a binary loader with the
/// specified schema. Deserialization from this context should produce a real binary loader that
/// has the specified schema.
///
/// This is used in an API scenario, when the data originates from something other than a loader.
/// Since our model file requires a loader at the beginning, we have to construct a bogus 'binary' loader
/// to begin the pipe with, with the assumption that the user will bypass the loader at deserialization
/// time by providing a starting data view.
/// </summary>
public static void SaveInstance(IHostEnvironment env, ModelSaveContext ctx, DataViewSchema schema)
{
Contracts.CheckValue(env, nameof(env));
var h = env.Register(LoadName);
h.CheckValue(ctx, nameof(ctx));
h.CheckValue(schema, nameof(schema));
ctx.CheckAtModel();
ctx.SetVersionInfo(GetVersionInfo());
SaveParameters(ctx, 0, _defaultShuffleBlocks);
int[] unsavable;
SaveSchema(env, ctx, schema, out unsavable);
// REVIEW: we silently ignore unsavable columns.
// This method is invoked only in an API scenario, where we need to save a loader but we only have a schema.
// In this case, the API user is likely not subscribed to our environment's channels. Also, in this case, the presence of
// unsavable columns is not necessarily a bad thing: the user typically provides his own data when loading the transforms,
// thus bypassing the bogus loader.
}
private unsafe Header InitHeader()
{
byte[] headerBytes = new byte[Header.HeaderSize];
int cb = _reader.Read(headerBytes, 0, Header.HeaderSize);
if (cb != Header.HeaderSize)
{
throw _host.ExceptDecode("Read only {0} bytes in file, expected header size of {1}",
cb, Header.HeaderSize);
}
Header header;
unsafe
{
Marshal.Copy(headerBytes, 0, (IntPtr)(&header), Header.HeaderSize);
}
// Validate the header before returning. CheckDecode is used for incorrect
// formatting.
_host.CheckDecode(header.Signature == Header.SignatureValue,
"This does not appear to be a binary dataview file");
// Obviously the compatibility version can't exceed the true version of the file.
if (header.CompatibleVersion > header.Version)
{
throw _host.ExceptDecode("Compatibility version {0} cannot be greater than file version {1}",
Header.VersionToString(header.CompatibleVersion), Header.VersionToString(header.Version));
}
if (header.Version < ReaderFirstVersion)
{
throw _host.ExceptDecode("Unexpected version {0} encountered, earliest expected here was {1}",
Header.VersionToString(header.Version), Header.VersionToString(ReaderFirstVersion));
}
// Check the versions.
if (header.CompatibleVersion < MetadataVersion)
{
// This is distinct from the earlier message semantically in that the check
// against ReaderFirstVersion is an indication of format impurity, whereas this
// is simply a matter of software support.
throw _host.Except("Cannot read version {0} data, earliest that can be handled is {1}",
Header.VersionToString(header.CompatibleVersion), Header.VersionToString(MetadataVersion));
}
if (header.CompatibleVersion > ReaderVersion)
{
throw _host.Except("Cannot read version {0} data, latest that can be handled is {1}",
Header.VersionToString(header.CompatibleVersion), Header.VersionToString(ReaderVersion));
}
_host.CheckDecode(header.RowCount >= 0, "Row count cannot be negative");
_host.CheckDecode(header.ColumnCount >= 0, "Column count cannot be negative");
// Check the table of contents offset, though we do not at this time have the contents themselves.
if (header.ColumnCount != 0 && header.TableOfContentsOffset < Header.HeaderSize)
throw _host.ExceptDecode("Table of contents offset {0} less than header size, impossible", header.TableOfContentsOffset);
// Check the tail signature.
if (header.TailOffset < Header.HeaderSize)
throw _host.ExceptDecode("Tail offset {0} less than header size, impossible", header.TailOffset);
_stream.Seek(header.TailOffset, SeekOrigin.Begin);
ulong tailSig = _reader.ReadUInt64();
_host.CheckDecode(tailSig == Header.TailSignatureValue, "Incorrect tail signature");
return header;
}
private void InitToc(IChannel ch, out TableOfContentsEntry[] aliveColumns, out TableOfContentsEntry[] deadColumns, out int allRowsPerBlock, out long tocEndOffset)
{
if (_header.ColumnCount > 0)
_stream.Seek(_header.TableOfContentsOffset, SeekOrigin.Begin);
// Failure to recognize a codec is not by itself an error condition. It only
// means we cannot read the associated columns.
List<TableOfContentsEntry> aliveList = new List<TableOfContentsEntry>();
List<TableOfContentsEntry> deadList = new List<TableOfContentsEntry>();
allRowsPerBlock = 0;
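// Each column's TOC entry is laid out as: a string name, a codec definition,
// a one-byte compression kind, a LEB128-encoded rows-per-block count, an
// 8-byte lookup table offset, and an 8-byte metadata TOC offset, read in
// that order below.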
for (int c = 0; c < _header.ColumnCount; ++c)
{
string name = _reader.ReadString();
IValueCodec codec;
bool gotCodec = _factory.TryReadCodec(_stream, out codec);
// Even in the case where we did not succeed in loading the codec, we still
// want to skip to the next table of contents entry, so keep reading.
CompressionKind compression = (CompressionKind)_reader.ReadByte();
bool knowCompression = Enum.IsDefined(typeof(CompressionKind), compression);
int rowsPerBlock = (int)_reader.ReadLeb128Int();
// 0 is only a valid blocksize if there are no rows.
if (!(0 < rowsPerBlock || (rowsPerBlock == 0 && _header.RowCount == 0)))
throw ch.ExceptDecode("Bad number of rows per block {0} read", rowsPerBlock);
// Even though the format allows it, we do not (yet?) support different block sizes across columns.
if (c == 0)
allRowsPerBlock = rowsPerBlock;
else if (allRowsPerBlock != rowsPerBlock)
{
throw ch.ExceptNotSupp("Different rows per block per column not supported yet, encountered {0} and {1}",
allRowsPerBlock, rowsPerBlock);
}
long lookupOffset = _reader.ReadInt64();
if (_header.RowCount > 0)
{
// What is the number of elements in the lookup table?
long lookupLen = (_header.RowCount - 1) / rowsPerBlock + 1;
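// For example, 10 rows at 4 rows per block yields (10 - 1) / 4 + 1 = 3
// lookup entries, occupying 16 * 3 = 48 bytes of the stream.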
ch.CheckDecode(Header.HeaderSize <= lookupOffset && lookupOffset <= _header.TailOffset - 16 * lookupLen,
"Lookup table offset out of range");
}
long metadataTocOffset = _reader.ReadInt64();
ch.CheckDecode(metadataTocOffset == 0 || Header.HeaderSize <= metadataTocOffset && metadataTocOffset <= _header.TailOffset,
"Metadata TOC offset out of range");
var entry = new TableOfContentsEntry(this, c, name, codec,
compression, rowsPerBlock, lookupOffset, metadataTocOffset);
if (gotCodec && knowCompression)
aliveList.Add(entry);
else
{
ch.Warning("Cannot interpret column '{0}' at index {1} because {2} unrecognized",
name, c,
gotCodec ? "compression" : (knowCompression ? "codec" : "codec and compression"));
deadList.Add(entry);
}
}
tocEndOffset = _stream.Position;
aliveColumns = aliveList.ToArray();
deadColumns = deadList.ToArray();
}
public void Dispose()
{
if (!_disposed)
{
_disposed = true;
_reader.Dispose();
}
}
private void CalculateShufflePoolRows(IChannel ch, out int poolRows)
{
if (!RowShufflingTransformer.CanShuffleAll(Schema))
{
// This will only happen if we expand the set of types we can serialize,
// without expanding the set of types we can cache. That is entirely
// possible, but is not true at the current time.
ch.Warning("Not adding implicit shuffle, as we did not know how to copy some types of values");
poolRows = 0;
return;
}
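// For example, with the default of 4 shuffle blocks and 1,000 rows per block,
// the pool would hold ceil(4 * 1000) = 4000 rows, subject to the caps below.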
var poolSize = Math.Ceiling(_shuffleBlocks * _rowsPerBlock);
ch.Assert(poolSize >= 0);
// A pool size of 0 or 1 is like having no pool at all.
if (poolSize < 2)
{
ch.Trace("Not adding implicit shuffle, as it is unnecessary");
poolRows = 0;
return;
}
const int maxPoolSize = 1 << 28;
if (poolSize > maxPoolSize)
poolSize = maxPoolSize;
if (poolSize > _header.RowCount)
poolSize = _header.RowCount;
poolRows = checked((int)poolSize);
ch.Trace("Implicit shuffle will have pool size {0}", poolRows);
}
private DataViewRowCursor GetRowCursorCore(IEnumerable<DataViewSchema.Column> columnsNeeded, Random rand = null)
{
if (rand != null && _randomShufflePoolRows > 0)
{
// Don't bother with block shuffling if the shuffle cursor is just going to hold
// the entire dataset in memory anyway.
var ourRand = _randomShufflePoolRows == _header.RowCount ? null : rand;
var cursor = new Cursor(this, columnsNeeded, ourRand);
return RowShufflingTransformer.GetShuffledCursor(_host, _randomShufflePoolRows, cursor, rand);
}
return new Cursor(this, columnsNeeded, rand);
}
public DataViewRowCursor GetRowCursor(IEnumerable<DataViewSchema.Column> columnsNeeded, Random rand = null)
{
_host.CheckValueOrNull(rand);
return GetRowCursorCore(columnsNeeded, rand);
}
public DataViewRowCursor[] GetRowCursorSet(IEnumerable<DataViewSchema.Column> columnsNeeded, int n, Random rand = null)
{
_host.CheckValueOrNull(rand);
return new DataViewRowCursor[] { GetRowCursorCore(columnsNeeded, rand) };
}
private sealed class Cursor : RootCursorBase
{
private static readonly FuncInstanceMethodInfo1<Cursor, Delegate> _noRowGetterMethodInfo
= FuncInstanceMethodInfo1<Cursor, Delegate>.Create(target => target.NoRowGetter<int>);
private readonly BinaryLoader _parent;
private readonly int[] _colToActivesIndex;
private readonly TableOfContentsEntry[] _actives;
private readonly int _numBlocks;
private readonly int _rowsPerBlock;
private readonly int _rowsInLastBlock;
private readonly ReadPipe[] _pipes;
private readonly Delegate[] _pipeGetters;
private readonly long _lastValidCounter;
// This may be null, in the event that we are not shuffling.
private readonly int[] _blockShuffleOrder;
private readonly Task _readerThread;
private readonly Task _pipeTask;
private readonly ExceptionMarshaller _exMarshaller;
private volatile bool _disposed;
private volatile bool _done;
public override DataViewSchema Schema => _parent.Schema;
public override long Batch
{
// REVIEW: Implement cursor set support.
get { return 0; }
}
public Cursor(BinaryLoader parent, IEnumerable<DataViewSchema.Column> columnsNeeded, Random rand)
: base(parent._host)
{
_parent = parent;
Ch.AssertValueOrNull(rand);
_exMarshaller = new ExceptionMarshaller();
TableOfContentsEntry[] toc = _parent._aliveColumns;
int[] activeIndices;
Utils.BuildSubsetMaps(toc.Length, columnsNeeded, out activeIndices, out _colToActivesIndex);
_actives = new TableOfContentsEntry[activeIndices.Length];
for (int i = 0; i < activeIndices.Length; ++i)
_actives[i] = toc[activeIndices[i]];
_lastValidCounter = _parent._header.RowCount - 1;
// Set up those evil pipes.
_pipes = new ReadPipe[parent.RowCount > 0 ? _actives.Length : 0];
_pipeGetters = new Delegate[_actives.Length];
// The uniformity of the block size has already been checked during the TOC read.
// However, if we have only generated columns, then we will have no defined
// block size. The case of only generated columns is perhaps not terribly
// likely or useful, but it is *possible*.
_rowsPerBlock = _parent._rowsPerBlock;
if (_rowsPerBlock == 0)
{
// This should happen if and only if all columns are generated.
// Just pick some value.
_rowsPerBlock = int.MaxValue;
}
_rowsInLastBlock = _parent.RowCount == 0 ? 0 : (int)(_parent.RowCount % _rowsPerBlock);
if (_rowsInLastBlock == 0)
_rowsInLastBlock = _rowsPerBlock;
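// For example, 10 rows at 4 rows per block gives blocks of 4, 4, and 2 rows,
// so the remainder 10 % 4 = 2 is the last block's size; with 8 rows the
// remainder is 0 and the last block is a full block of 4.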
_numBlocks = checked((int)((_parent.RowCount - 1) / _rowsPerBlock + 1));
_blockShuffleOrder = rand == null || _numBlocks == 0 ? null : Utils.GetRandomPermutation(rand, _numBlocks);
if (_pipes.Length == 0)
{
// Even in the case where we have no rows, and pipes have not
// been created, we still need getter delegates. These won't do
// anything but complain about the cursor being in a bad state,
// but they still need to exist.
for (int c = 0; c < _pipeGetters.Length; ++c)
_pipeGetters[c] = GetNoRowGetter(_actives[c].Type);
return;
}
// The following initialized fields should be used only by code that
// assumes we have some active columns and more than zero rows.
// How many buffers per pipe? Figure something based on ceildiv(threads / pipes), plus
// some thread wiggle room. (Double it, mayhap?)
int pipeBuffers = 2 * ((_parent._threads + _pipes.Length - 1) / _pipes.Length);
for (int c = 0; c < _pipes.Length; ++c)
{
_pipes[c] = ReadPipe.Create(this, c, pipeBuffers);
_pipeGetters[c] = _pipes[c].GetGetter();
}
// The data structures are initialized. Now set up the workers.
_readerThread = Utils.RunOnBackgroundThreadAsync(ReaderWorker);
_pipeTask = DecompressAsync();
}
protected override void Dispose(bool disposing)
{
if (_disposed)
return;
if (_done)
{
base.Dispose(disposing);
return;
}
if (disposing)
{
if (_readerThread != null)
{
// We should reach this block only in the event of a dispose
// before all rows have been iterated upon.
// First set the flag on the cursor. The stream-reader and the
// pipe-decompressor workers will detect this, stop their work,
// and do whatever "cleanup" is natural for them to perform.
_disposed = true;
// In the disk read -> decompress -> codec read pipeline, we
// clean up in reverse order.
// 1. First we clear out any pending codec readers, for each pipe.
// 2. Then we join the pipe worker threads, which in turn should
// have cleared out all of the pending blocks to decompress.
// 3. Then finally we join against the reader thread.
// This code is analogous to the stuff in MoveNextCore, except
// nothing is actually done with the resulting blocks.
try
{
for (; ; )
{
// This cross-block-index access pattern is deliberate, as
// by having a consistent access pattern everywhere we can
// have much greater confidence this will never deadlock.
bool anyTrue = false;
for (int c = 0; c < _pipes.Length; ++c)
anyTrue |= _pipes[c].MoveNextCleanup();
if (!anyTrue)
break;
}
}
catch (OperationCanceledException ex)
{
// REVIEW: Encountering this here means that we did not encounter
// the exception during normal cursoring, but at some later point. I feel
// we should not be tolerant of this, and should throw, though it might be
// an ambiguous point.
Contracts.Assert(ex.CancellationToken == _exMarshaller.Token);
_exMarshaller.ThrowIfSet(Ch);
Contracts.Assert(false);
}
finally
{
_pipeTask.Wait();
_readerThread.Wait();
}
}
}
_disposed = true;
base.Dispose(disposing);
}
private Task DecompressAsync()
{
Task[] pipeWorkers = new Task[_parent._threads];
long decompressSequence = -1;
long decompressSequenceLim = (long)_numBlocks * _actives.Length;
for (int w = 0; w < pipeWorkers.Length; ++w)
{
pipeWorkers[w] = Utils.RunOnBackgroundThreadAsync(() =>
{
try
{
for (; ; )
{
long seq = Interlocked.Increment(ref decompressSequence);
int pipeIndex = (int)(seq % _pipes.Length);
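// Work is claimed round-robin across pipes: with 3 pipes, sequence
// numbers 0, 1, 2, 3, 4 map to pipes 0, 1, 2, 0, 1, and so on.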
// If we ever return false, then we know we are past the block sequence
// with all the sentinel blocks. Since we are kicking off all blocks in
// order, then we know that all the sentinel block handling has been
// handled or is in the process of being handled by some worker, so we
// may safely exit.
if (!_pipes[pipeIndex].DecompressOne())
return;
}
}
catch (Exception ex)
{
_exMarshaller.Set("decompressing", ex);
}
});
}
return Task.WhenAll(pipeWorkers);
}
private void ReaderWorker()
{
try
{
int blockSteps = checked((int)((_parent.RowCount - 1) / _rowsPerBlock + 1));
Stream stream = _parent._stream;
int b;
for (b = 0; b < blockSteps && !_disposed; ++b)
{
int bi = _blockShuffleOrder == null ? b : _blockShuffleOrder[b];
int rows = bi == blockSteps - 1 ? _rowsInLastBlock : _rowsPerBlock;
for (int c = 0; c < _pipes.Length; ++c)
_pipes[c].PrepAndSendCompressedBlock(bi, b, rows);
}
// Add the end sentinel blocks, for all pipes, guaranteeing the useful simplifying
// invariant that all sentinel blocks have the same block sequence index.
for (int c = 0; c < _pipes.Length; ++c)
_pipes[c].SendSentinelBlock(b);
}
catch (Exception ex)
{
_exMarshaller.Set("reading", ex);
}
}
private abstract class ReadPipe
{
protected readonly int ColumnIndex;
protected readonly Cursor Parent;
protected ExceptionMarshaller ExMarshaller { get { return Parent._exMarshaller; } }
protected IExceptionContext Ectx { get { return Parent.Ch; } }
public static ReadPipe Create(Cursor parent, int columnIndex, int bufferSize)
{
Contracts.AssertValue(parent);
var entry = parent._actives[columnIndex];
Contracts.AssertValue(entry);
Type genType = entry.IsGenerated ? typeof(ReadPipeGenerated<>) : typeof(ReadPipe<>);
genType = genType.MakeGenericType(entry.Type.RawType);
return (ReadPipe)Activator.CreateInstance(
genType, parent, columnIndex, bufferSize);
}
protected ReadPipe(Cursor parent, int columnIndex)
{
Contracts.AssertValue(parent);
Parent = parent;
Ectx.Assert(0 <= columnIndex && columnIndex < Utils.Size(parent._actives));
ColumnIndex = columnIndex;
}
public abstract void PrepAndSendCompressedBlock(long blockIndex, long blockSequence, int rowCount);
public abstract void SendSentinelBlock(long blockSequence);
/// <summary>
/// This will attempt to extract a compressed block from the
/// <see cref="ReadPipe{T}._toDecompress"/> queue. This returns true if and only if it
/// succeeded in extracting an item from the queue (even a sentinel block);
/// that is, if it returns false, then there are no more items to extract
/// (though, continuing to call this method is entirely possible, and legal,
/// if convenient).
/// </summary>
public abstract bool DecompressOne();
public abstract bool MoveNext();
/// <summary>
/// Necessary to be called in the event of a premature exiting. This executes
/// the same recycle-fetch block cycle as <see cref="MoveNext"/>, except that
/// nothing is actually done with the resulting block. This should be called
/// in a similar fashion as the cursor calls <see cref="MoveNext"/>.
/// </summary>
public abstract bool MoveNextCleanup();
public abstract Delegate GetGetter();
}
private sealed class ReadPipeGenerated<T> : ReadPipe
{
// REVIEW: I cleave to the invariants and behavior of the stream-based readpipe, even
// though in this generated case managing the memory buffers is not an issue. What is more
// important than efficiency in this generated case, however, is maintaining the invariants
// of thread and blocking behavior upon which we rely for our understanding of the lack of
// deadlock. Nevertheless, at some point it may be valuable to see if there are some harmless
// divergences from the stream-based buffer and block model. However, this should be undertaken
// with great care. It would also be nice to see if, so long as we're enforcing consistency,
// there's some way we can share more code.
private const int _bufferSize = 4;
private readonly BlockingCollection<Block> _toDecompress;
private readonly IEnumerator<Block> _toDecompressEnumerator;
private readonly BlockingCollection<Block> _toRead;
private readonly IEnumerator<Block> _toReadEnumerator;
// The waiter on insertions to toRead. Any "add" or "complete adding" must depend on this waiter.
private readonly OrderedWaiter _waiter;
private readonly ValueMapper<long, T> _mapper;
private Block _curr;
private int _remaining;
private sealed class Block
{
public readonly long BlockSequence;
public readonly long RowIndexMin;
public readonly long RowIndexLim;
/// <summary>
/// This indicates that this block does not contain any actual information, or
/// correspond to an actual block, but it will still contain the
/// <see cref="BlockSequence"/> index. Sentinel blocks are used to indicate that
/// there will be no more blocks to be decompressed along a particular pipe,
/// allowing the pipe worker to perform necessary cleanup.
/// </summary>
public bool IsSentinel { get { return RowIndexMin == -1; } }
public int Rows { get { return (int)(RowIndexLim - RowIndexMin); } }
public Block(long blockSequence, long min, long lim)
{
Contracts.Assert(blockSequence >= 0);
Contracts.Assert(0 <= min && min <= lim);
Contracts.Assert(lim - min <= int.MaxValue);
BlockSequence = blockSequence;
RowIndexMin = min;
RowIndexLim = lim;
Contracts.Assert(!IsSentinel);
}
/// <summary>
/// Constructor for a sentinel compressed block. (For example,
/// the pipe's last block, which contains no valid data.)
/// </summary>
public Block(long blockSequence)
{
Contracts.Assert(blockSequence >= 0);
BlockSequence = blockSequence;
RowIndexMin = RowIndexLim = -1;
Contracts.Assert(IsSentinel);
}
}
public ReadPipeGenerated(Cursor parent, int columnIndex, int bufferSize)
: base(parent, columnIndex)
{
Contracts.AssertValue(parent);
Contracts.AssertValue(parent.Ch);
TableOfContentsEntry entry = parent._actives[ColumnIndex];
Ectx.AssertValue(entry);
Ectx.Assert(entry.IsGenerated);
_toDecompress = new BlockingCollection<Block>(_bufferSize);
Ectx.Assert(bufferSize > 0);
_toDecompressEnumerator = _toDecompress.GetConsumingEnumerable(ExMarshaller.Token).GetEnumerator();
_toRead = new BlockingCollection<Block>(bufferSize);
_toReadEnumerator = _toRead.GetConsumingEnumerable(ExMarshaller.Token).GetEnumerator();
_waiter = new OrderedWaiter();
_mapper = entry.GetValueMapper<T>();
}
public override void PrepAndSendCompressedBlock(long blockIndex, long blockSequence, int rowCount)
{
long rowLim = blockIndex * Parent._rowsPerBlock;
var block = new Block(blockSequence, rowLim, rowLim + rowCount);
_toDecompress.Add(block, ExMarshaller.Token);
}
public override void SendSentinelBlock(long blockSequence)
{
Block sentBlock = new Block(blockSequence);
_toDecompress.Add(sentBlock, ExMarshaller.Token);
_toDecompress.CompleteAdding();
}
public override bool DecompressOne()
{
Block block;
lock (_toDecompressEnumerator)
{
if (!_toDecompressEnumerator.MoveNext())
return false;
block = _toDecompressEnumerator.Current;
}
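// The ordered waiter serializes insertions into _toRead by block sequence,
// so rows leave the pipe in the order the reader worker kicked off the
// blocks, even though workers may claim blocks out of order.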
Ectx.Assert(!_toRead.IsAddingCompleted);
if (block.IsSentinel)
{
_waiter.Wait(block.BlockSequence, ExMarshaller.Token);
_toRead.CompleteAdding();
_waiter.Increment();
return true;
}
if (Parent._disposed)
{
_waiter.Wait(block.BlockSequence, ExMarshaller.Token);
_waiter.Increment();
return true;
}
_waiter.Wait(block.BlockSequence, ExMarshaller.Token);
_toRead.Add(block, ExMarshaller.Token);
_waiter.Increment();
return true;
// This code mirrors that within the stream-based read pipe, except it has nothing to dispose.
}
public override bool MoveNext()
{
Ectx.Assert(_remaining >= 0);
Ectx.Assert(_remaining == 0 || _curr != null);
if (_remaining == 0)
{
if (_curr != null)
_curr = null;
if (!_toReadEnumerator.MoveNext())
return false;
_curr = _toReadEnumerator.Current;
Ectx.AssertValue(_curr);
_remaining = _curr.Rows;
}
Ectx.Assert(_remaining > 0);
_remaining--;
return true;
}
public override bool MoveNextCleanup()
{
// This is analogous to the _remaining == 0 part of
// MoveNext, except we don't actually do anything with
// the block we fetch.
if (_curr != null)
_curr = null;
if (!_toReadEnumerator.MoveNext())
return false;
_curr = _toReadEnumerator.Current;
return true;
}
private void Get(ref T value)
{
Ectx.Check(_curr != null, RowCursorUtils.FetchValueStateError);
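// _remaining has already been decremented for the current row, so the source
// row index counts up from RowIndexMin; e.g. (an illustrative block) with
// min = 8 and lim = 12, successive calls see src = 8, 9, 10, 11.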
long src = _curr.RowIndexLim - _remaining - 1;
_mapper(in src, ref value);
}
public override Delegate GetGetter()
{
ValueGetter<T> getter = Get;
return getter;
}
}
private sealed class ReadPipe<T> : ReadPipe
{
private const int _bufferSize = 4;
private readonly BlockLookup[] _lookup;
private readonly Stream _stream;
private readonly MemoryStreamPool _compPool;
private readonly MemoryStreamPool _decompPool;
/// <summary>
/// Calls from the stream reader worker into <see cref="PrepAndSendCompressedBlock"/> will feed
/// into this collection, and calls from the decompress worker into <see cref="DecompressOne"/>
/// will consume this collection.
/// </summary>
private readonly BlockingCollection<CompressedBlock> _toDecompress;
private readonly IEnumerator<CompressedBlock> _toDecompressEnumerator;
private readonly BlockingCollection<ReaderContainer> _toRead;
private readonly IEnumerator<ReaderContainer> _toReadEnumerator;
private readonly IValueCodec<T> _codec;
private readonly CompressionKind _compression;
// The waiter on insertions into _toRead. Any "add" or "complete adding" must depend on this waiter.
private readonly OrderedWaiter _waiter;
private ReaderContainer _curr;
private int _remaining;
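// The pipeline through this class, summarized: the single reader thread calls
// PrepAndSendCompressedBlock, which enqueues raw bytes into _toDecompress; the
// decompress workers call DecompressOne, which turns those into ReaderContainer
// entries in _toRead; the pipe's MoveNext, driven by the cursor, then consumes
// _toRead in block-sequence order.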
private sealed class CompressedBlock
{
public readonly MemoryStream Buffer;
public readonly int DecompressedLength;
public readonly long BlockIndex;
public readonly long BlockSequence;
public readonly int Rows;
/// <summary>
/// This indicates that this block does not contain any actual information, nor
/// does it correspond to an actual block, but it will still carry the
/// <see cref="BlockSequence"/> index. Sentinel blocks are used to indicate that
/// there will be no more blocks to be decompressed along a particular pipe,
/// allowing the pipe worker to perform necessary cleanup.
/// </summary>
public bool IsSentinel { get { return BlockIndex == -1; } }
public CompressedBlock(MemoryStream buffer, int decompressedLength,
long blockIndex, long blockSequence, int rows)
{
Contracts.AssertValueOrNull(buffer);
Contracts.Assert(decompressedLength > 0);
Contracts.Assert(blockIndex >= 0);
Contracts.Assert(blockSequence >= 0);
Contracts.Assert(rows >= 0);
Buffer = buffer;
DecompressedLength = decompressedLength;
BlockIndex = blockIndex;
BlockSequence = blockSequence;
Rows = rows;
Contracts.Assert(!IsSentinel);
}
/// <summary>
/// Constructor for a sentinel compressed block. (For example,
/// the pipe's last block, which contains no valid data.)
/// </summary>
public CompressedBlock(long blockSequence)
{
Contracts.Assert(blockSequence >= 0);
BlockIndex = -1;
BlockSequence = blockSequence;
Contracts.Assert(IsSentinel);
}
}
private sealed class ReaderContainer
{
public readonly IValueReader<T> Reader;
public readonly MemoryStream Stream;
public readonly int Rows;
public readonly long BlockSequence;
public ReaderContainer(IValueReader<T> reader, MemoryStream stream, int rows, long blockSequence)
{
Contracts.AssertValue(reader);
Contracts.AssertValue(stream);
Contracts.Assert(rows > 0);
Reader = reader;
Stream = stream;
Rows = rows;
BlockSequence = blockSequence;
}
}
/// <summary>
/// This is called through reflection so it will appear to have no references.
/// </summary>
public ReadPipe(Cursor parent, int columnIndex, int bufferSize)
: base(parent, columnIndex)
{
Ectx.Assert(bufferSize > 0);
TableOfContentsEntry entry = Parent._actives[ColumnIndex];
Ectx.AssertValue(entry);
Ectx.Assert(!entry.IsGenerated);
Ectx.AssertValue(entry.Codec);
Ectx.Assert(entry.Codec is IValueCodec<T>);
Ectx.Assert(Enum.IsDefined(typeof(CompressionKind), entry.Compression));
_codec = (IValueCodec<T>)entry.Codec;
_compression = entry.Compression;
int maxComp;
int maxDecomp;
entry.GetMaxBlockSizes(out maxComp, out maxDecomp);
_compPool = parent._parent._bufferCollection.Get(maxComp);
_decompPool = parent._parent._bufferCollection.Get(maxDecomp);
_lookup = entry.GetLookup();
_stream = parent._parent._stream;
Ectx.AssertValue(_compPool);
Ectx.AssertValue(_decompPool);
Ectx.AssertValue(_lookup);
Ectx.AssertValue(_stream);
_toDecompress = new BlockingCollection<CompressedBlock>(_bufferSize);
_toDecompressEnumerator = _toDecompress.GetConsumingEnumerable(ExMarshaller.Token).GetEnumerator();
_toRead = new BlockingCollection<ReaderContainer>(bufferSize);
_toReadEnumerator = _toRead.GetConsumingEnumerable(ExMarshaller.Token).GetEnumerator();
_waiter = new OrderedWaiter();
}
public override void PrepAndSendCompressedBlock(long blockIndex, long blockSequence, int rowCount)
{
BlockLookup lookup = _lookup[(int)blockIndex];
var mem = _compPool.Get();
// Read the compressed buffer, then pass it off to the decompress worker threads.
EnsureCapacity(mem, lookup.BlockLength);
mem.SetLength(lookup.BlockLength);
ArraySegment<byte> buffer;
bool tmp = mem.TryGetBuffer(out buffer);
Contracts.Assert(tmp);
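// All pipes share the loader's single stream, so the seek and the read must
// be performed atomically under the stream lock.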
lock (_stream)
{
_stream.Seek(lookup.BlockOffset, SeekOrigin.Begin);
_stream.ReadBlock(buffer.Array, buffer.Offset, buffer.Count);
Contracts.Assert(lookup.BlockOffset + lookup.BlockLength == _stream.Position);
}
var block = new CompressedBlock(mem, lookup.DecompressedBlockLength, blockIndex, blockSequence, rowCount);
_toDecompress.Add(block, ExMarshaller.Token);
}
public override void SendSentinelBlock(long blockSequence)
{
CompressedBlock sentBlock = new CompressedBlock(blockSequence);
_toDecompress.Add(sentBlock, ExMarshaller.Token);
_toDecompress.CompleteAdding();
}
private static void EnsureCapacity(MemoryStream stream, int value)
{
// More or less copied and pasted from MemoryStream's internal EnsureCapacity
// code... that sure would be useful to just have.
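// Growth policy: at least 256 bytes, at least double the current capacity,
// and clamped against the maximum array length so doubling cannot overflow.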
int cap = stream.Capacity;
if (cap >= value)
return;
const int arrayMaxLen = 0x7FFFFFC7;
int newCapacity = value;
if (newCapacity < 256)
newCapacity = 256;
if (newCapacity < cap * 2)
newCapacity = cap * 2;
if ((uint)(cap * 2) > arrayMaxLen)
newCapacity = value > arrayMaxLen ? value : arrayMaxLen;
stream.Capacity = newCapacity;
}
public override bool DecompressOne()
{
CompressedBlock block;
// It is necessary to lock the decompress enumerator: the
// collection itself is thread safe, the enumerator is not.
// The MoveNext and Current must be an atomic operation.
lock (_toDecompressEnumerator)
{
if (!_toDecompressEnumerator.MoveNext())
return false;
block = _toDecompressEnumerator.Current;
}
Contracts.Assert(!_toRead.IsAddingCompleted);
if (block.IsSentinel)
{
// We can only complete adding after we are certain that all prior workers
// that may be working, have had the opportunity to interact with _toRead.
_waiter.Wait(block.BlockSequence, ExMarshaller.Token);
_toRead.CompleteAdding();
_waiter.Increment();
return true;
}
MemoryStream buffer = block.Buffer;
buffer.Position = 0;
if (Parent._disposed)
{
// We have disposed. Skip the decompression steps, while returning resources.
_compPool.Return(ref buffer);
// In this case we still need to increment to the next block sequence number,
// so that future workers along this pipe will have an opportunity to return.
_waiter.Wait(block.BlockSequence, ExMarshaller.Token);
_waiter.Increment();
return true;
}
MemoryStream decomp = _decompPool.Get();
EnsureCapacity(decomp, block.DecompressedLength);
decomp.SetLength(block.DecompressedLength);
using (Stream stream = _compression.DecompressStream(buffer))
{
ArraySegment<byte> buf;
bool tmp = decomp.TryGetBuffer(out buf);
Contracts.Assert(tmp);
stream.ReadBlock(buf.Array, buf.Offset, buf.Count);
}
_compPool.Return(ref buffer);
decomp.Seek(0, SeekOrigin.Begin);
IValueReader<T> reader = _codec.OpenReader(decomp, block.Rows);
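// The OrderedWaiter admits workers strictly in block-sequence order: each
// worker waits on its sequence number, performs its insertion into _toRead,
// then increments the waiter so the next sequence can proceed. Decompression
// itself runs in parallel, but insertions stay in file order.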
_waiter.Wait(block.BlockSequence, ExMarshaller.Token);
// Enter exclusive section for this pipe.
_toRead.Add(new ReaderContainer(reader, decomp, block.Rows, block.BlockSequence), ExMarshaller.Token);
_waiter.Increment();
// Exit exclusive section for this pipe.
return true;
}
public override bool MoveNext()
{
Contracts.Assert(_remaining >= 0);
Contracts.Assert(_remaining == 0 || _curr != null);
if (_remaining == 0)
{
if (_curr != null)
{
_curr.Reader.Dispose();
MemoryStream mem = _curr.Stream;
_curr = null;
_decompPool.Return(ref mem);
}
if (!_toReadEnumerator.MoveNext())
return false;
_curr = _toReadEnumerator.Current;
Contracts.AssertValue(_curr);
_remaining = _curr.Rows;
}
Contracts.Assert(_remaining > 0);
_curr.Reader.MoveNext();
_remaining--;
return true;
}
public override bool MoveNextCleanup()
{
// This is analogous to the _remaining == 0 part of
// MoveNext, except we don't actually do anything with
// the block we fetch.
if (_curr != null)
{
_curr.Reader.Dispose();
MemoryStream mem = _curr.Stream;
_curr = null;
_decompPool.Return(ref mem);
}
if (!_toReadEnumerator.MoveNext())
return false;
_curr = _toReadEnumerator.Current;
return true;
}
private void Get(ref T value)
{
Contracts.Check(_curr != null, RowCursorUtils.FetchValueStateError);
_curr.Reader.Get(ref value);
}
public override Delegate GetGetter()
{
ValueGetter<T> getter = Get;
return getter;
}
}
/// <summary>
/// Returns whether the given column is active in this row.
/// </summary>
public override bool IsColumnActive(DataViewSchema.Column column)
{
Ch.CheckParam(column.Index < _colToActivesIndex.Length, nameof(column));
return _colToActivesIndex[column.Index] >= 0;
}
protected override bool MoveNextCore()
{
Ch.Assert(!_disposed);
bool more = Position != _lastValidCounter;
for (int c = 0; c < _pipes.Length; ++c)
{
bool pipeMoved;
try
{
pipeMoved = _pipes[c].MoveNext();
}
catch (OperationCanceledException ex)
{
// Suppress the premature-exit handling. We can be assured that all
// threads will exit if all potentially blocking operations are
// waiting on the same cancellation token that we catch here.
Contracts.Assert(ex.CancellationToken == _exMarshaller.Token);
_done = true;
// Unlike the early non-error dispose case, we do not make any
// effort to recycle buffers since it would be exceptionally difficult
// to do so. All threads are already unblocked, one of them with the
// source exception that kicked off this process, the remaining with
// other later exceptions or the operation canceled exception. So we
// are free to join. Still, given the exceptional nature, we won't
// wait forever to do it.
const int timeOut = 100;
_pipeTask.Wait(timeOut);
_readerThread.Wait(timeOut);
// Throw. Considering we are here, this must throw.
_exMarshaller.ThrowIfSet(Ch);
// This point should be unreachable: the cancellation token we caught is
// private to the exception marshaller, and it is canceled only when an
// exception has been set, so ThrowIfSet above must have thrown.
Contracts.Assert(false);
throw;
}
// They should all stop at the same time.
Ch.Assert(more == pipeMoved);
}
if (!more && _pipes.Length > 0)
{
// Set the _done flag, so that when the Dispose
// method is called it does not trigger the "premature
// exit" handling.
_done = true;
// If we got to this point these threads must have already
// completed their work, but for the sake of hygiene join
// against them anyway.
_pipeTask.Wait();
_readerThread.Wait();
}
return more;
}
/// <summary>
/// Returns a value getter delegate to fetch the value of the given column from the row.
/// This throws if the column is not active in this row, or if the type
/// <typeparamref name="TValue"/> differs from this column's type.
/// </summary>
/// <typeparam name="TValue">The column's content type.</typeparam>
/// <param name="column">The output column whose getter should be returned.</param>
public override ValueGetter<TValue> GetGetter<TValue>(DataViewSchema.Column column)
{
Ch.CheckParam(column.Index < _colToActivesIndex.Length, nameof(column));
Ch.CheckParam(_colToActivesIndex[column.Index] >= 0, nameof(column), "requested column not active.");
var originGetter = _pipeGetters[_colToActivesIndex[column.Index]];
var getter = originGetter as ValueGetter<TValue>;
if (getter == null)
throw Ch.Except($"Invalid TValue: '{typeof(TValue)}', " +
$"expected type: '{originGetter.GetType().GetGenericArguments().First()}'.");
return getter;
}
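// A sketch of typical consumption of the getter returned above (illustrative
// only; 'cursor' and 'col' are assumed to come from the enclosing data view):
//   var getter = cursor.GetGetter<float>(col);
//   float v = default;
//   while (cursor.MoveNext())
//       getter(ref v);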
/// <summary>
/// Even in the case with no rows, there still must be valid delegates. This will return
/// a delegate that simply always throws.
/// </summary>
private Delegate GetNoRowGetter(DataViewType type)
=> Utils.MarshalInvoke(_noRowGetterMethodInfo, this, type.RawType);
private Delegate NoRowGetter<T>()
{
ValueGetter<T> del = (ref T value) => throw Ch.Except(RowCursorUtils.FetchValueStateError);
return del;
}
public override ValueGetter<DataViewRowId> GetIdGetter()
{
if (_blockShuffleOrder == null)
{
return
(ref DataViewRowId val) =>
{
Ch.Check(IsGood, RowCursorUtils.FetchValueStateError);
val = new DataViewRowId((ulong)Position, 0);
};
}
// Find the index of the last block. The last block may hold fewer rows than the
// others, yet under shuffling it can occur anywhere in the order, so calculating
// the "true" position of a row has to account for this unevenly sized block.
int lastBlockIdx = 0;
for (int i = 1; i < _blockShuffleOrder.Length; ++i)
{
if (_blockShuffleOrder[i] > _blockShuffleOrder[lastBlockIdx])
lastBlockIdx = i;
}
int correction = _rowsPerBlock - _rowsInLastBlock;
long firstPositionToCorrect = ((long)lastBlockIdx * _rowsPerBlock) + _rowsInLastBlock;
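// Worked example (hypothetical values): with _rowsPerBlock = 100 and
// _rowsInLastBlock = 40, if the 40-row block was shuffled into slot 2 then
// firstPositionToCorrect = 2 * 100 + 40 = 240 and correction = 60, so a
// cursor position of 240 maps to corrected position 300, whose block index
// 300 / 100 = 3 correctly indexes the next slot in the shuffle order.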
return
(ref DataViewRowId val) =>
{
Ch.Check(IsGood, RowCursorUtils.FetchValueStateError);
long pos = Position;
if (pos >= firstPositionToCorrect)
pos += correction;
Ch.Assert(pos / _rowsPerBlock < _blockShuffleOrder.Length);
long blockPos = (long)_rowsPerBlock * _blockShuffleOrder[(int)(pos / _rowsPerBlock)];
blockPos += (pos % _rowsPerBlock);
Ch.Assert(0 <= blockPos && blockPos < _parent.RowCount);
val = new DataViewRowId((ulong)blockPos, 0);
};
}
}
internal sealed class InfoCommand : ICommand
{
public const string LoadName = "IdvInfo";
public sealed class Arguments
{
[DefaultArgument(ArgumentType.AtMostOnce, IsInputFileName = true, HelpText = "The data file", SortOrder = 0)]
public string DataFile;
[Argument(ArgumentType.AtMostOnce, HelpText = "Verbose?", ShortName = "v", Hide = true)]
public bool? Verbose;
}
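// A sketch of how this command might be invoked from the command-line driver
// (the executable name is an assumption; "IdvInfo" and the default data-file
// argument come from the declarations above):
//   maml.exe IdvInfo mydata.idv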
private readonly IHostEnvironment _env;
private readonly string _dataFile;
public InfoCommand(IHostEnvironment env, Arguments args)
{
Contracts.CheckValue(env, nameof(env));
env.CheckValue(args, nameof(args));
env.CheckNonWhiteSpace(args.DataFile, nameof(args.DataFile), "Data file must be specified");
_dataFile = args.DataFile;
_env = env.Register(LoadName, verbose: args.Verbose);
}
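// For example, VersionToString(0x0001000200030004) renders as "1.2.3.4": each
// 16-bit field, from most to least significant, becomes one version part.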
private string VersionToString(ulong ver)
{
return string.Format("{0}.{1}.{2}.{3}", ver >> 48,
(ver >> 32) & 0xffff, (ver >> 16) & 0xffff, ver & 0xffff);
}
public void Run()
{
var host = _env.Register(LoadName);
var data = new MultiFileSource(_dataFile);
// We will not be iterating through the data, so the defaults are fine.
var args = new BinaryLoader.Arguments();
using (var loader = new BinaryLoader(host, args, data))
using (var ch = host.Start("Inspection"))
{
RunCore(ch, loader);
}
}
private void RunCore(IChannel ch, BinaryLoader loader)
{
Contracts.AssertValue(ch);
ch.AssertValue(loader);
// Report on basic info from the header.
Header header = loader._header;
long idvSize = header.TailOffset + sizeof(ulong);
if (loader._stream.Length != idvSize)
{
ch.Warning("Stream is {0} bytes, IDV is {1} bytes. This is legal but unusual.",
loader._stream.Length, idvSize);
}
ch.Info("IDV {0} (compat {1}), {2} col, {3} row, {4} bytes", VersionToString(header.Version),
VersionToString(header.CompatibleVersion), header.ColumnCount, header.RowCount, idvSize);
// Get all of the columns from the loader that are not generated.
// We will want to report them in the order they appear in the file,
// so order by column index.
var cols = loader._aliveColumns.Select(t => new KeyValuePair<bool, TableOfContentsEntry>(true, t))
.Concat(loader._deadColumns.Select(t => new KeyValuePair<bool, TableOfContentsEntry>(false, t)))
.Where(t => !t.Value.IsGenerated).OrderBy(t => t.Value.ColumnIndex);
long totalBlockSize = 0;
long totalMetadataSize = 0;
long totalMetadataTocSize = 0;
long blockCount = 0;
int colCount = 0;
foreach (var isLiveAndCol in cols)
{
var col = isLiveAndCol.Value;
ch.Assert(col.ColumnIndex == colCount); // *Every* column in the file should be here, even if dead.
// REVIEW: The format currently allows the same block of data to be pointed to twice,
// even across columns. (Allowed in the sense that nothing prevents it from happening,
// though no writer currently takes advantage of this to, say, deduplicate repeated
// blocks.) For our purposes here we will assume that all blocks are disjoint.
string typeDesc = col.Type == null ? "<?>" : col.Type.ToString();
long uncompressedSize = 0;
long compressedSize = 0;
var blockLookups = col.GetLookup();
ch.AssertValue(blockLookups); // Should be null iff this is generated, and we dropped those.
foreach (var lookup in blockLookups)
{
compressedSize += lookup.BlockLength;
uncompressedSize += lookup.DecompressedBlockLength;
blockCount++;
}
totalBlockSize += compressedSize;
ch.Info(MessageSensitivity.Schema, "Column {0} '{1}'{2} of {3} in {4} blocks of {5}", col.ColumnIndex, col.Name,
isLiveAndCol.Key ? "" : " (DEAD!)", typeDesc, blockLookups.Length, col.RowsPerBlock);
ch.Info(" {0} compressed from {1} with {2} ({3:0.00%})",
compressedSize, uncompressedSize, col.Compression, (compressedSize + 0.0) / uncompressedSize);
// Report on the metadata.
var mtoc = col.GetMetadataTocArray();
var deadMtoc = col.GetDeadMetadataTocArray();
if (mtoc == null)
ch.Assert(deadMtoc == null && col.MetadataTocOffset == 0);
else
{
long metadataSize = mtoc.Sum(t => t.BlockSize) + deadMtoc.Sum(t => t.BlockSize);
long metadataTocSize = col.GetMetadataTocEndOffset() - col.MetadataTocOffset;
string deadDisc = deadMtoc.Length > 0 ? string.Format(" ({0} dead)", deadMtoc.Length) : "";
ch.Info(" {0} pieces of metadata{1} has {2} byte table, {3} byte content",
mtoc.Length, deadDisc, metadataTocSize, metadataSize);
// REVIEW: Maybe with a switch, we could report on the individual pieces here as we do columns? Less important though.
totalMetadataSize += metadataSize;
totalMetadataTocSize += metadataTocSize;
}
colCount++;
}
ch.Assert(colCount == header.ColumnCount);
// Report on the size breakdown.
long lookupTablesSize = blockCount * (sizeof(long) + sizeof(int) + sizeof(int));
long totalTocSize = loader._tocEndLim - header.TableOfContentsOffset;
const long headTailSize = Header.HeaderSize + sizeof(ulong);
long accountedSize = 0;
ch.Info(" "); // REVIEW: Ugh. Better way? Otherwise it's all smushed up.
Action<string, long> report =
(desc, size) =>
{
ch.Info("{0,8:0.000%} for {1} ({2} bytes)", (size + 0.0) / idvSize, desc, size);
accountedSize += size;
};
report("data blocks", totalBlockSize);
report("block lookup", lookupTablesSize);
report("table of contents", totalTocSize);
report("metadata contents", totalMetadataSize);
report("metadata table of contents", totalMetadataTocSize);
report("header/tail", headTailSize);
// The two could be unequal if there is an in-place modification done to the file.
if (idvSize != accountedSize)
report("unknown", idvSize - accountedSize);
}
}
}
}
|