File: DataView\Transposer.cs
Web Access
Project: src\src\Microsoft.ML.Data\Microsoft.ML.Data.csproj (Microsoft.ML.Data)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using Microsoft.ML.Data.IO;
using Microsoft.ML.Internal.Utilities;
using Microsoft.ML.Runtime;
 
namespace Microsoft.ML.Data
{
    /// <summary>
    /// This provides a scalable method of getting a "transposed" view of a subset of columns from an
    /// <see cref="IDataView"/>. Instances of <see cref="Transposer"/> act like a wrapped version of
    /// the input dataview, except that an indicated set of columns will be transposable, even if they
    /// were not transposable before. Note that transposition is a somewhat slow and resource intensive
    /// operation.
    /// </summary>
    [BestFriend]
    internal sealed class Transposer : ITransposeDataView, IDisposable
    {
        private static readonly FuncInstanceMethodInfo1<Transposer, int, SlotCursor> _getSlotCursorCoreMethodInfo
            = FuncInstanceMethodInfo1<Transposer, int, SlotCursor>.Create(target => target.GetSlotCursorCore<int>);
 
        private readonly IHost _host;
        // The input view.
        private readonly IDataView _view;
        // Note that the transposer will still present things as transposed, if the input was a transpose
        // dataview and that thing was listed as transposed.
        private readonly ITransposeDataView _tview;
        private readonly Dictionary<string, int> _nameToICol;
        // The following may be null, if no columns needed to be split.
        private readonly BinaryLoader _splitView;
        public readonly int RowCount;
        // -1 for input columns that were not transposed, a non-negative index into _cols for those that were.
        private readonly int[] _inputToTransposed;
        private readonly DataViewSchema.Column[] _cols;
        private readonly int[] _splitLim;
        private bool _disposed;
 
        /// <summary>
        /// Creates an instance given a list of column names.
        /// </summary>
        /// <param name="env">The host environment</param>
        /// <param name="view">The view whose columns we want to transpose</param>
        /// <param name="forceSave">Whether the internal transposer should always unconditionally
        /// save the column we are transposing. Can be useful if the original dataview is possibly
        /// slow to iterate over that column.</param>
        /// <param name="columns">The non-empty list of columns to transpose</param>
        public static Transposer Create(IHostEnvironment env, IDataView view, bool forceSave,
            params string[] columns)
        {
            Contracts.CheckValue(env, nameof(env));
            var h = env.Register("Transposer");
            h.CheckValue(view, nameof(view));
 
            var indices = CheckNamesAndGetIndices(h, view, columns);
            return new Transposer(h, view, forceSave, indices);
        }
 
        /// <summary>
        /// Creates an instance given a list of column indices.
        /// </summary>
        /// <param name="env">The host environment</param>
        /// <param name="view">The view whose columns we want to transpose</param>
        /// <param name="forceSave">Whether the internal transposer should always unconditionally
        /// save the column we are transposing. Can be useful if the original dataview is possibly
        /// slow to iterate over that column.</param>
        /// <param name="columns">The non-empty list of columns to transpose</param>
        public static Transposer Create(IHostEnvironment env, IDataView view, bool forceSave,
            params int[] columns)
        {
            Contracts.CheckValue(env, nameof(env));
            var host = env.Register("Transposer");
            host.CheckValue(view, nameof(view));
 
            var indices = CheckIndices(host, view, columns);
            return new Transposer(host, view, forceSave, indices);
        }
 
        private Transposer(IHost host, IDataView view, bool forceSave, int[] columns)
        {
            Contracts.AssertValue(host);
            _host = host;
            _host.AssertValue(view);
            _host.CheckParam(Utils.Size(columns) > 0, nameof(columns), "Cannot be empty");
 
            // REVIEW: Might be a good idea to not have the view as is, but to
            // instead apply choose columns to it first. This could simplify some of
            // the operations.
            _view = view;
            _tview = _view as ITransposeDataView;
            // Remove duplicates and ensure it is sorted.
            IEnumerable<int> columnSet = columns.Distinct().OrderBy(c => c);
            if (_tview != null)
            {
                // Keep only those columns for which we do not have a slot view already.
                columnSet = columnSet.Where(c => _tview.GetSlotType(c) == null);
            }
            columns = columnSet.ToArray();
            _cols = new DataViewSchema.Column[columns.Length];
            var schema = _view.Schema;
            _nameToICol = new Dictionary<string, int>();
            // Let i be a column index in _view's Schema. _inputToTransposed[i] is -1 if the i-th column can
            // be accessed column-wisely. Otherwise, the i-th input will become the _inputToTransposed[i]-th
            // transposed column in the output.
            _inputToTransposed = Utils.CreateArray(schema.Count, -1);
            for (int c = 0; c < columns.Length; ++c)
            {
                _nameToICol[(_cols[c] = schema[columns[c]]).Name] = c;
                _inputToTransposed[columns[c]] = c;
            }
 
            using (var ch = _host.Start("Init"))
            {
                var args = new BinarySaver.Arguments();
                // Run deflate at a slightly degraded level, since we anticipate that this is
                // a read-once situation, as opposed to general IDVs which we expect to be run
                // multiple times.
                args.Compression = CompressionKind.Default;
                // Our access into the file will be more or less
                // unstructured and random consistently so keep
                // the block size pretty safe.
                args.MaxBytesPerBlock = 1 << 28;
                args.Silent = true;
                var saver = new BinarySaver(_host, args);
 
                for (int c = 0; c < _cols.Length; ++c)
                {
                    // REVIEW: Despite not *necessarily* relying on the serialization
                    // for the transposition, I'm still going to insist on serialization,
                    // since it would be strange if the same type failed or not in the
                    // transposer depending on the size. At least as a user, that would
                    // surprise me. Also I expect this to never happen...
                    var type = schema[_cols[c].Index].Type;
                    if (!saver.IsColumnSavable(type))
                        throw ch.ExceptParam(nameof(view), "Column named '{0}' is not serializable by the transposer", _cols[c].Name);
                    if (type is VectorDataViewType vectorType && !vectorType.IsKnownSize)
                        throw ch.ExceptParam(nameof(view), "Column named '{0}' is vector, but not of known size, and so cannot be transposed", _cols[c].Name);
                }
 
                var slicer = new DataViewSlicer(_host, view, columns);
                var slicerSchema = slicer.Schema;
                ch.Assert(Enumerable.Range(0, slicerSchema.Count).All(c => saver.IsColumnSavable(slicerSchema[c].Type)));
                _splitLim = new int[_cols.Length];
                List<int> toSave = new List<int>();
                int offset = 0;
                int slicedCount = 0;
                for (int c = 0; c < _cols.Length; ++c)
                {
                    int min;
                    int lim;
                    slicer.InColToOutRange(c, out min, out lim);
                    // It must be a passthrough. We're not going to write it, and will just rely
                    // on the original view to provide the column.
                    ch.Assert(min < lim);
                    int count = lim - min;
                    if (forceSave || count > 1)
                    {
                        toSave.AddRange(Enumerable.Range(min, count));
                        slicedCount++;
                        offset += count;
                    }
                    _splitLim[c] = offset;
                }
 
                long rowCount;
                ch.Trace("{0} of {1} input columns sliced into {2} columns", slicedCount, _cols.Length, toSave.Count);
                if (toSave.Count > 0)
                {
                    // Only bother to create _splitView if we have to.
                    var stream = new HybridMemoryStream();
                    saver.SaveData(stream, slicer, toSave.ToArray());
                    stream.Seek(0, SeekOrigin.Begin);
                    ch.Trace("Sliced data saved to {0} bytes", stream.Length);
                    var loaderArgs = new BinaryLoader.Arguments();
                    _splitView = new BinaryLoader(_host, loaderArgs, stream, leaveOpen: false);
                    rowCount = DataViewUtils.ComputeRowCount(_splitView);
                }
                else
                    rowCount = DataViewUtils.ComputeRowCount(_view);
                ch.Assert(rowCount >= 0);
                if (rowCount > Utils.ArrayMaxSize)
                    throw _host.ExceptParam(nameof(view), "View has {0} rows, we cannot transpose with more than {1}", rowCount, Utils.ArrayMaxSize);
                RowCount = (int)rowCount;
            }
        }
 
        public void Dispose()
        {
            if (!_disposed)
            {
                _disposed = true;
                if (_splitView != null)
                    _splitView.Dispose();
            }
        }
 
        private static int[] CheckNamesAndGetIndices(IHost host, IDataView view, string[] columns)
        {
            Contracts.AssertValue(host, "host");
            host.AssertValue(view, "view");
            host.CheckParam(Utils.Size(columns) > 0, nameof(columns), "Cannot be empty");
 
            var schema = view.Schema;
            int[] indices = new int[columns.Length];
            for (int c = 0; c < columns.Length; ++c)
            {
                if (!schema.TryGetColumnIndex(columns[c], out indices[c]))
                    throw host.ExceptParam(nameof(columns), "Column named '{0}' not found", columns[c]);
            }
            return indices;
        }
 
        private static int[] CheckIndices(IHost host, IDataView view, int[] columns)
        {
            Contracts.AssertValue(host);
            host.AssertValue(view);
 
            var schema = view.Schema;
            for (int c = 0; c < columns.Length; ++c)
            {
                if (!(0 <= columns[c] && columns[c] < schema.Count))
                    throw host.ExceptParam(nameof(columns), "Column index {0} illegal for data with {1} column", columns[c], schema.Count);
            }
            return columns;
        }
 
        public SlotCursor GetSlotCursor(int col)
        {
            _host.CheckParam(0 <= col && col < _view.Schema.Count, nameof(col));
            if (_inputToTransposed[col] == -1)
            {
                // Check if the parent view has this slot transposed. If it doesn't, fail.
                if (_tview?.GetSlotType(col) != null)
                    return _tview.GetSlotCursor(col);
                // Note that i-th transposed column is actually all the values at the i-th original column.
                throw _host.ExceptParam(nameof(col), "Bad call to GetSlotCursor on untransposable column '{0}'", _tview.Schema[col].Name);
            }
            var type = ((ITransposeDataView)this).GetSlotType(col).ItemType.RawType;
 
            var tcol = _inputToTransposed[col];
            _host.Assert(0 <= tcol && tcol < _cols.Length);
            _host.Assert(_cols[tcol].Index == col);
 
            return Utils.MarshalInvoke(_getSlotCursorCoreMethodInfo, this, type, col);
        }
 
        private SlotCursor GetSlotCursorCore<T>(int col)
        {
            if (_view.Schema[col].Type is VectorDataViewType)
                return new SlotCursorVec<T>(this, col);
            return new SlotCursorOne<T>(this, col);
        }
 
        VectorDataViewType ITransposeDataView.GetSlotType(int col)
        {
            // We don't need the col-th column to be transposed by this transform, so
            // its type is inherited from input data.
            if (_inputToTransposed[col] == -1)
                return _tview?.GetSlotType(col);
 
            var transposedColumn = _view.Schema[col];
            PrimitiveDataViewType elementType = null;
            if (transposedColumn.Type is PrimitiveDataViewType primitiveDataViewType)
                elementType = primitiveDataViewType;
            else if (transposedColumn.Type is VectorDataViewType vectorDataViewType)
                elementType = vectorDataViewType.ItemType;
            _host.Assert(elementType != null);
 
            return new VectorDataViewType(elementType, RowCount);
        }
 
        #region IDataView implementation stuff, passthrough on to view.
        // It is helpful to have transposed data views actually implement dataview, since
        // we are still and will likely forever remain in a state where only a few specialized
        // operations make use of the transpose dataview, with many operations instead being
        // handled in the standard row-wise fashion.
        public DataViewSchema Schema => _view.Schema;
 
        public bool CanShuffle { get { return _view.CanShuffle; } }
 
        public DataViewRowCursor GetRowCursor(IEnumerable<DataViewSchema.Column> columnsNeeded, Random rand = null)
            => _view.GetRowCursor(columnsNeeded, rand);
 
        public DataViewRowCursor[] GetRowCursorSet(IEnumerable<DataViewSchema.Column> columnsNeeded, int n, Random rand = null)
            => _view.GetRowCursorSet(columnsNeeded, n, rand);
 
        public long? GetRowCount()
        {
            // Not a passthrough.
            return RowCount;
        }
        #endregion
 
        private abstract class SlotCursor<T> : SlotCursor.RootSlotCursor
        {
            private readonly Transposer _parent;
            private readonly int _col;
            private ValueGetter<VBuffer<T>> _getter;
 
            protected SlotCursor(Transposer parent, int col)
                : base(parent._host)
            {
                Ch.Assert(0 <= col && col < parent.Schema.Count);
                _parent = parent;
                _col = col;
            }
 
            public override ValueGetter<VBuffer<TValue>> GetGetter<TValue>()
            {
                if (_getter == null)
                    _getter = GetGetterCore();
                ValueGetter<VBuffer<TValue>> getter = _getter as ValueGetter<VBuffer<TValue>>;
                if (getter == null)
                    throw Ch.Except($"Invalid TValue: '{typeof(TValue)}', " +
                        $"expected type: '{_getter.GetType().GetGenericArguments().First().GetGenericArguments().First()}'.");
                return getter;
            }
 
            public override VectorDataViewType GetSlotType()
            {
                Ch.Assert(0 <= _col && _col < _parent.Schema.Count);
                return ((ITransposeDataView)_parent).GetSlotType(_col);
            }
 
            protected abstract ValueGetter<VBuffer<T>> GetGetterCore();
        }
 
        private sealed class SlotCursorOne<T> : SlotCursor<T>
        {
            private readonly IDataView _view;
            private readonly int _col;
            private readonly int _len;
            private bool _moved;
 
            public SlotCursorOne(Transposer parent, int col)
                : base(parent, col)
            {
                Ch.Assert(0 <= col && col < parent.Schema.Count);
                int iinfo = parent._inputToTransposed[col];
                Ch.Assert(iinfo >= 0);
                int smin = iinfo == 0 ? 0 : parent._splitLim[iinfo - 1];
                if (parent._splitLim[iinfo] == smin)
                {
                    // This is a passthrough column.
                    _view = parent._view;
                    _col = parent._cols[iinfo].Index;
                }
                else
                {
                    _view = parent._splitView;
                    _col = smin;
                    Ch.Assert(parent._splitLim[iinfo] - _col == 1);
                }
                Ch.AssertValue(_view);
                Ch.Assert(_view.Schema[_col].Type is PrimitiveDataViewType);
                Ch.Assert(_view.Schema[_col].Type.RawType == typeof(T));
                _len = parent.RowCount;
            }
 
            protected override bool MoveNextCore()
            {
                // We only can move next on one slot, since this is a scalar column.
                return _moved = !_moved;
            }
 
            protected override ValueGetter<VBuffer<T>> GetGetterCore()
            {
                var isDefault = Conversion.Conversions.DefaultInstance.GetIsDefaultPredicate<T>(_view.Schema[_col].Type);
                bool valid = false;
                VBuffer<T> cached = default(VBuffer<T>);
                return
                    (ref VBuffer<T> dst) =>
                    {
                        Ch.Check(IsGood, RowCursorUtils.FetchValueStateError);
                        if (!valid)
                        {
                            var currentColumn = _view.Schema[_col];
                            using (var cursor = _view.GetRowCursor(currentColumn))
                            {
                                int[] indices = null;
                                T[] values = null;
                                int len = -1;
                                int count = 0;
                                T value = default(T);
                                ValueGetter<T> getter = cursor.GetGetter<T>(currentColumn);
                                while (cursor.MoveNext())
                                {
                                    len++;
                                    Ch.Assert(len <= _len);
                                    getter(ref value);
                                    if (isDefault(in value))
                                        continue;
                                    Utils.EnsureSize(ref indices, ++count);
                                    indices[count - 1] = len;
                                    Utils.EnsureSize(ref values, count);
                                    values[count - 1] = value;
                                }
                                len++;
                                Ch.Assert(len == _len);
                                if (count < len / 2 || count == len)
                                    cached = new VBuffer<T>(len, count, values, count == len ? null : indices);
                                else
                                    (new VBuffer<T>(len, count, values, indices)).CopyToDense(ref cached);
                            }
                            valid = true;
                        }
                        cached.CopyTo(ref dst);
                    };
            }
        }
 
        private sealed class SlotCursorVec<T> : SlotCursor<T>
        {
            // The source data view. Note that this might be either the original input dataview
            // if the column was not sufficiently large to justify "slicing," or the slicer dataview
            // if it was large enough to justify splitting. (So this source data view will not
            // necessarily be the same as the Transposer _view, but it might be.)
            private readonly IDataView _view;
            // In the case when we've sliced a dataview, the slot cursor will need to iterative over
            // multiple cursors to get all slots from the original dataview that transposer is transposing.
            // These fields define this range.
            private readonly int _colMin;
            private readonly int _colLim;
            // The length of the resulting vectors. This is the same as the row count from the original dataview.
            private readonly int _len;
 
            // Temporary working/storage buffers.
            private readonly VBuffer<T>[] _rbuff; // Working intermediate row-wise buffer.
            private readonly int[] _rbuffIndices; // Working intermediate row-wise indices.
            private int[][] _indices;    // Working intermediate index buffers.
            private T[][] _values;       // Working intermediate value buffers.
            private int[] _counts;       // Working intermediate count buffers.
 
            private struct ColumnBufferStorage
            {
                // The transposed contents of _colStored.
                public VBuffer<T> Buffer;
 
                // These two arrays are the "cached" arrays inside of the Buffer
                // to be swapped between the _cbuff and _values/_indices.
                public readonly T[] Values;
                public readonly int[] Indices;
 
                public ColumnBufferStorage(VBuffer<T> buffer, T[] values, int[] indices)
                {
                    Buffer = buffer;
                    Values = values;
                    Indices = indices;
                }
            }
 
            private ColumnBufferStorage[] _cbuff; // Working intermediate column-wise buffer.
 
            // Variables to track current cursor position.
            private int _colStored;      // The current column of the source data view actually stored in the intermediate buffers.
            private int _colCurr;        // The current column of the split view that our cursor has on its position.
            private int _slotCurr;       // The current slot that our cursor has on its position.
            private int _slotLim;        // The limit of the slot index for the current column, so we know when to move to next columns.
 
            /// <summary>
            /// Constructs a slot cursor.
            /// </summary>
            /// <param name="parent">The transposer.</param>
            /// <param name="col">The index of the transposed column.</param>
            public SlotCursorVec(Transposer parent, int col)
                : base(parent, col)
            {
                int iinfo = parent._inputToTransposed[col];
                Ch.Assert(iinfo >= 0);
                int smin = iinfo == 0 ? 0 : parent._splitLim[iinfo - 1];
                if (parent._splitLim[iinfo] == smin)
                {
                    // This is a passthrough column.
                    _view = parent._view;
                    _colMin = parent._cols[iinfo].Index;
                    _colLim = _colMin + 1;
                }
                else
                {
                    _view = parent._splitView;
                    _colMin = smin;
                    _colLim = parent._splitLim[iinfo];
                }
 
                Ch.AssertValue(_view);
                // Make the current state just "before" the first column so
                // we can move cleanly onto the first slot of the first column.
                _colStored = _colCurr = _colMin - 1;
                _slotLim = 0;
                _slotCurr = -1;
                // The transposer will store this many rows from the source data view (either the
                // slicer, or the original dataview if the column was not sufficiently large) column
                // before copying into the _indices/_values/_counts working buffers, during a phase
                // of EnsureValid.
                _rbuff = new VBuffer<T>[16];
                _rbuffIndices = new int[_rbuff.Length];
                _len = parent.RowCount;
            }
 
            /// <summary>
            /// Ensures that the column from the source data view stored in our intermediate buffers is the
            /// current column requested.
            /// </summary>
            private void EnsureValid()
            {
                Ch.Check(IsGood, RowCursorUtils.FetchValueStateError);
                Ch.Assert(_slotCurr >= 0);
                if (_colStored == _colCurr)
                    return;
 
                var type = _view.Schema[_colCurr].Type;
                DataViewType itemType = type.GetItemType();
                Ch.Assert(itemType.RawType == typeof(T));
                int vecLen = type.GetValueCount();
                Ch.Assert(vecLen > 0);
                InPredicate<T> isDefault = Conversion.Conversions.DefaultInstance.GetIsDefaultPredicate<T>(itemType);
                int maxPossibleSize = _rbuff.Length * vecLen;
                const int sparseThresholdRatio = 5;
                int sparseThreshold = (maxPossibleSize + sparseThresholdRatio - 1) / sparseThresholdRatio;
                Array.Clear(_rbuffIndices, 0, _rbuffIndices.Length);
                int offset = 0;
 
                // REVIEW: An obvious enhancement to make to this system is to take everything in the
                // below "using" and make it part of some sort of external task, which this method waits on
                // instead of actually doing the computation itself. The benefit there is that the next column
                // is having its values loaded into _indices/_values/_counts while the current column is being
                // served up to the consumer through _cbuff.
 
                var currentColumn = _view.Schema[_colCurr];
                using (var cursor = _view.GetRowCursor(currentColumn))
                {
                    // Make sure that the buffers (and subbuffers) are all of appropriate size.
                    Utils.EnsureSize(ref _indices, vecLen);
                    for (int i = 0; i < vecLen; ++i)
                        _indices[i] = _indices[i] ?? new int[_len];
                    Utils.EnsureSize(ref _values, vecLen);
                    for (int i = 0; i < vecLen; ++i)
                        _values[i] = _values[i] ?? new T[_len];
                    Utils.EnsureSize(ref _counts, vecLen, keepOld: false);
                    if (vecLen > 0)
                        Array.Clear(_counts, 0, vecLen);
 
                    var getter = cursor.GetGetter<VBuffer<T>>(currentColumn);
                    int irbuff = 0; // Next index into _rbuff. During the copy phase this doubles as the lim.
                    int countSum = 0;
                    // In the key value pair, the first is the slot index, then second is the row index in _rbuff.
                    var heap = new Heap<KeyValuePair<int, int>>((p1, p2) => p1.Key > p2.Key || (p1.Key == p2.Key && p1.Value > p2.Value), _rbuff.Length);
 
                    Action copyPhase =
                        () =>
                        {
                            if (countSum >= sparseThreshold)
                            {
                                // Slot by slot insertion, involving an exhaustive check over the tile.
                                for (int s = 0; s < vecLen; ++s)
                                {
                                    int[] indices = _indices[s];
                                    T[] values = _values[s];
 
                                    for (int r = 0; r < irbuff; ++r)
                                    {
                                        int rowNum = offset + r;
                                        var rbuff = _rbuff[r];
 
                                        var rbuffValues = rbuff.GetValues();
                                        if (rbuff.IsDense)
                                        {
                                            // Store it as sparse. We will densify later, if we must.
                                            if (!isDefault(in rbuffValues[s]))
                                            {
                                                indices[_counts[s]] = rowNum;
                                                values[_counts[s]++] = rbuffValues[s];
                                            }
                                        }
                                        else
                                        {
                                            var rbuffIndices = rbuff.GetIndices();
                                            int ii = _rbuffIndices[r];
                                            if (ii < rbuffIndices.Length && rbuffIndices[ii] == s)
                                            {
                                                if (!isDefault(in rbuffValues[ii]))
                                                {
                                                    indices[_counts[s]] = rowNum;
                                                    values[_counts[s]++] = rbuffValues[ii];
                                                }
                                                _rbuffIndices[r]++;
                                            }
                                        }
                                    }
                                }
                            }
                            else
                            {
                                // Slot by slot insertion, involving a structure to determine the row to insert next.
                                Ch.Assert(heap.Count == 0);
                                int s = -1;
                                int[] indices = null;
                                T[] values = null;
                                // Construct the initial heap.
                                for (int r = 0; r < irbuff; ++r)
                                {
                                    var rbuff = _rbuff[r];
                                    if (rbuff.GetValues().Length > 0)
                                        heap.Add(new KeyValuePair<int, int>(rbuff.IsDense ? 0 : rbuff.GetIndices()[0], r));
                                }
                                while (heap.Count > 0)
                                {
                                    var pair = heap.Pop(); // Key is the slot, pair is the row index.
                                    if (pair.Key != s)
                                    {
                                        Ch.Assert(pair.Key > s);
                                        s = pair.Key;
                                        indices = _indices[s];
                                        values = _values[s];
                                    }
                                    var rbuff = _rbuff[pair.Value];
                                    var rbuffValues = rbuff.GetValues();
                                    var rbuffIndices = rbuff.GetIndices();
                                    int ii = rbuff.IsDense ? s : _rbuffIndices[pair.Value]++;
                                    Ch.Assert(rbuff.IsDense || rbuffIndices[ii] == s);
                                    indices[_counts[s]] = pair.Value + offset;
                                    values[_counts[s]++] = rbuffValues[ii];
                                    if (++ii < rbuffValues.Length) // Still more stuff. Add another followup item to the heap.
                                        heap.Add(new KeyValuePair<int, int>(rbuff.IsDense ? s + 1 : rbuffIndices[ii], pair.Value));
                                }
                            }
                            Array.Clear(_rbuffIndices, 0, irbuff);
                            offset += irbuff;
                            countSum = irbuff = 0;
                        };
 
                    while (cursor.MoveNext())
                    {
                        int idx = checked((int)cursor.Position);
                        Ch.Assert(0 <= idx && idx < _len);
                        getter(ref _rbuff[irbuff]);
                        countSum += _rbuff[irbuff].GetValues().Length;
                        if (++irbuff == _rbuff.Length)
                            copyPhase();
                    }
                    if (irbuff > 0)
                        copyPhase();
                    Ch.Assert(offset == _len);
                }
 
                // REVIEW: Everything *above* could be factored into async code, but the below absolutely must
                // occur as an exclusive section.
 
                // Finalize the contents of _cbuff based on _counts/_values/_indices.
                Utils.EnsureSize(ref _cbuff, vecLen);
                for (int s = 0; s < vecLen; ++s)
                {
                    int count = _counts[s];
                    T[] values = _values[s];
                    int[] indices = _indices[s];
                    var temp = new VBuffer<T>(_len, count, values, indices);
                    if (count < _len / 2)
                    {
                        // Already sparse enough, I guess. Swap out the arrays.
                        ColumnBufferStorage existingBuffer = _cbuff[s];
                        _cbuff[s] = new ColumnBufferStorage(temp, values, indices);
                        _indices[s] = existingBuffer.Indices ?? new int[_len];
                        _values[s] = existingBuffer.Values ?? new T[_len];
                        Ch.Assert(_indices[s].Length == _len);
                        Ch.Assert(_values[s].Length == _len);
                    }
                    else
                    {
                        // Not dense enough. Densify temp into _cbuff[s]. Don't swap the arrays.
                        temp.CopyToDense(ref _cbuff[s].Buffer);
                    }
                }
                _colStored = _colCurr;
            }
 
            protected override bool MoveNextCore()
            {
                if (++_slotCurr < _slotLim)
                    return true;
                Ch.Assert(_slotCurr == _slotLim);
                _slotCurr = 0;
                if (++_colCurr == _colLim)
                    return false;
                _slotLim = _view.Schema[_colCurr].Type.GetValueCount();
                Ch.Assert(_slotLim > 0);
                return true;
            }
 
            private void Getter(ref VBuffer<T> dst)
            {
                Ch.Check(IsGood, RowCursorUtils.FetchValueStateError);
                EnsureValid();
                Ch.Assert(0 <= _slotCurr && _slotCurr < Utils.Size(_cbuff) && _cbuff[_slotCurr].Buffer.Length == _len);
                _cbuff[_slotCurr].Buffer.CopyTo(ref dst);
            }
 
            protected override ValueGetter<VBuffer<T>> GetGetterCore()
            {
                return Getter;
            }
        }
 
        /// <summary>
        /// This takes an input data view, and presents a dataset with "sliced" up columns
        /// that are partitionings of the original columns. Scalar columns and sufficiently
        /// small vector columns are just served up as themselves. The idea is that each of
        /// those slices should be small enough that storing an entire column in memory.
        /// </summary>
        private sealed class DataViewSlicer : IDataView
        {
            // REVIEW: Could this be useful in its own right as a transform? We will
            // have to have something that selects out a subset of columns, somehow, someday.
 
            private readonly IDataView _input;
            // For each input column, the structure handling the mapping of that column
            // into multiple split output columns.
            private readonly Splitter[] _splitters;
            // For each input column, indicate what the limit of the output columns is.
            private readonly int[] _incolToLim;
            // Each of our output columns maps to a splitter. Multiple columns can
            // map to the same splitter, that being kind of the point of a splitter.
            private readonly int[] _colToSplitIndex;
            // For each output column, indicates what output column it's surfacing
            // from the splitter.
            private readonly int[] _colToSplitCol;
 
            private readonly IHost _host;
 
            public bool CanShuffle { get { return _input.CanShuffle; } }
 
            public DataViewSchema Schema { get; }
 
            public DataViewSlicer(IHost host, IDataView input, int[] toSlice)
            {
                Contracts.AssertValue(host, "host");
                _host = host;
 
                _host.AssertValue(input);
                _host.AssertValue(toSlice);
 
                _input = input;
                _splitters = new Splitter[toSlice.Length];
                _incolToLim = new int[toSlice.Length];
                int outputColumnCount = 0;
                // Also build our schema's name to index here. The slicers just surface the original
                // input name as the name to all of our input columns.
                var nameToCol = new Dictionary<string, int>();
                for (int c = 0; c < toSlice.Length; ++c)
                {
                    var splitter = _splitters[c] = Splitter.Create(_input, toSlice[c]);
                    _host.Assert(splitter.ColumnCount >= 1);
                    // One splitter can produce multiple columns because it splits a input column into multiple output columns.
                    // _incolToLim[c] stores (the last output column index of the c-th splitter) + 1.
                    _incolToLim[c] = outputColumnCount += splitter.ColumnCount;
                    // toSlice[c] stores the input column index processed by the c-th splitter. In the output schema, we map a
                    // output column name to the last column produced by the associated splitter. For example, if input column
                    // "Features" (column index 5) gets splitted into three output columns "Features" (column index 0), "Features"
                    // (column index 1), "Features" (column index 2), nameToCol["Features"] should return 2. Note that output column
                    // names are identical to their source column name.
                    nameToCol[_input.Schema[toSlice[c]].Name] = outputColumnCount - 1;
                }
                // Here outputColumnCount denotes the total number of columns produced by all splitters.
                _colToSplitIndex = new int[outputColumnCount];
                _colToSplitCol = new int[outputColumnCount];
                // Below outputColumnCount becomes index of output columns. When outputColumnCount = 0, we process the first column
                // in the output data.
                outputColumnCount = 0;
                // Iterate through all splitters. For each splitter, multiple output columns can be produced.
                for (int c = 0; c < _splitters.Length; ++c)
                {
                    int outCount = _splitters[c].ColumnCount;
                    // Iterate through all columns produced by the c-th splitter.
                    for (int i = 0; i < outCount; ++i)
                    {
                        // Output column indexed by outputColumnCount is produce by _splitters[c].
                        _colToSplitIndex[outputColumnCount] = c;
                        // Output column indexed by outputColumnCount is the i-th column in _splitters[c]'s output columns.
                        _colToSplitCol[outputColumnCount++] = i;
                    }
                }
                _host.Assert(outputColumnCount == _colToSplitIndex.Length);
 
                // Sequentially concatenate output columns from all splitters to form output schema.
                var schemaBuilder = new DataViewSchema.Builder();
                for (int c = 0; c < _splitters.Length; ++c)
                    schemaBuilder.AddColumns(_splitters[c].OutputSchema);
                Schema = schemaBuilder.ToSchema();
            }
 
            public long? GetRowCount()
            {
                // Splitting columns into smaller pieces doesn't affect number of rows, so the row number
                // in output data is the same to that of input data.
                return _input.GetRowCount();
            }
 
            /// <summary>
            /// Given the index of a column we were told to split, get the corresponding range out output
            /// ranges.
            /// </summary>
            /// <param name="incol">The index into the array of column indices.</param>
            /// <param name="outMin">The minimum output column index corresponding to that split column</param>
            /// <param name="outLim">The exclusive limit of the output column index corresponding to that
            /// split column</param>
            public void InColToOutRange(int incol, out int outMin, out int outLim)
            {
                _host.Assert(0 <= incol && incol < _incolToLim.Length);
                outMin = incol == 0 ? 0 : _incolToLim[incol - 1];
                outLim = _incolToLim[incol];
            }
 
            /// <summary>
            /// Given an output column index, find which spliter produces it and which spliter column is its source.
            /// </summary>
            /// <param name="col">An output column index</param>
            /// <param name="splitInd"><see cref="_splitters"/>[splitInd] produces the specified output column.</param>
            /// <param name="splitCol">The specified output column is the splitCol-th column among columns produced by <see cref="_splitters"/>[splitInd].</param>
            private void OutputColumnToSplitterIndices(int col, out int splitInd, out int splitCol)
            {
                _host.Assert(0 <= col && col < _colToSplitIndex.Length);
                splitInd = _colToSplitIndex[col];
                splitCol = _colToSplitCol[col];
            }
 
            public DataViewRowCursor GetRowCursor(IEnumerable<DataViewSchema.Column> columnsNeeded, Random rand = null)
            {
                var predicate = RowCursorUtils.FromColumnsToPredicate(columnsNeeded, Schema);
 
                bool[] activeSplitters;
                var srcPred = CreateInputPredicate(predicate, out activeSplitters);
 
                var inputCols = _input.Schema.Where(x => srcPred(x.Index));
                return new Cursor(_host, this, _input.GetRowCursor(inputCols, rand), predicate, activeSplitters);
            }
 
            public DataViewRowCursor[] GetRowCursorSet(IEnumerable<DataViewSchema.Column> columnsNeeded, int n, Random rand = null)
            {
                _host.CheckValueOrNull(rand);
 
                var predicate = RowCursorUtils.FromColumnsToPredicate(columnsNeeded, Schema);
 
                bool[] activeSplitters;
                var srcPred = CreateInputPredicate(predicate, out activeSplitters);
 
                var srcCols = columnsNeeded.Where(x => srcPred(x.Index));
                var result = _input.GetRowCursorSet(srcCols, n, rand);
                for (int i = 0; i < result.Length; ++i)
                    result[i] = new Cursor(_host, this, result[i], predicate, activeSplitters);
                return result;
            }
 
            /// <summary>
            /// Given a possibly null predicate for this data view, produce the dependency predicate for the sources,
            /// as well as a list of all the splitters for which we should produce rowsets.
            /// </summary>
            /// <param name="pred">The predicate input into the <see cref="GetRowCursor(IEnumerable{DataViewSchema.Column}, Random)"/> method.</param>
            /// <param name="activeSplitters">A boolean indicator array of length equal to the number of splitters,
            /// indicating whether that splitter has any active columns in its outputs or not</param>
            /// <returns>The predicate to use when constructing the row cursor from the source</returns>
            private Func<int, bool> CreateInputPredicate(Func<int, bool> pred, out bool[] activeSplitters)
            {
                _host.AssertValueOrNull(pred);
 
                activeSplitters = new bool[_splitters.Length];
                var activeSrcSet = new HashSet<int>();
                int offset = 0;
                for (int i = 0; i < activeSplitters.Length; ++i)
                {
                    var splitter = _splitters[i];
                    // Don't activate input source columns if none of the resulting columns were selected.
                    bool isActive = pred == null || Enumerable.Range(offset, splitter.OutputSchema.Count).Any(c => pred(c));
                    if (isActive)
                    {
                        activeSplitters[i] = isActive;
                        activeSrcSet.Add(splitter.SrcCol);
                    }
                    offset += splitter.ColumnCount;
                }
                return activeSrcSet.Contains;
            }
 
            /// <summary>
            /// There is one instance of these per column, implementing the possible splitting
            /// of one column from a <see cref="IDataView"/> into multiple columns. The instance
            /// describes the resulting split columns through <see cref="Splitter.OutputSchema"/>,
            /// and then can be bound to an <see cref="DataViewRow"/> to provide that splitting functionality.
            /// </summary>
            private abstract class Splitter
            {
                private static readonly FuncStaticMethodInfo1<IDataView, int, Splitter> _createCoreMethodInfo
                    = new FuncStaticMethodInfo1<IDataView, int, Splitter>(CreateCore<int>);
 
                private readonly IDataView _view;
                private readonly int _col;
                public abstract int ColumnCount { get; }
 
                public int SrcCol { get { return _col; } }
 
                /// <summary>
                /// Output schema of a splitter. A splitter takes a column from input data and then divide it into multiple columns
                /// to form its output data.
                /// </summary>
                public abstract DataViewSchema OutputSchema { get; }
 
                protected Splitter(IDataView view, int col)
                {
                    Contracts.AssertValue(view);
                    Contracts.Assert(0 <= col && col < view.Schema.Count);
                    _view = view;
                    _col = col;
                }
 
                /// <summary>
                /// Creates a splitter for a given row.
                /// </summary>
                public static Splitter Create(IDataView view, int col)
                {
                    var type = view.Schema[col].Type;
                    int vectorSize = type.GetVectorSize();
                    Contracts.Assert(type is PrimitiveDataViewType || vectorSize > 0);
                    const int defaultSplitThreshold = 16;
                    if (vectorSize <= defaultSplitThreshold)
                        return Utils.MarshalInvoke(_createCoreMethodInfo, type.RawType, view, col);
                    else
                    {
                        // There are serious practical problems with trying to save many thousands of columns.
                        // We balance this by setting a hard limit on the number of splits per column we will
                        // generate.
                        const int maxSplitInto = 256;
                        int splitInto = (vectorSize - 1) / defaultSplitThreshold + 1;
                        int[] ends;
                        if (splitInto <= maxSplitInto)
                        {
                            ends = new int[splitInto];
                            for (int i = 0; i < ends.Length; ++i)
                                ends[i] = (i + 1) * defaultSplitThreshold;
                        }
                        else
                        {
                            ends = new int[maxSplitInto];
                            for (int i = 0; i < ends.Length; ++i)
                                ends[i] = (int)((long)(i + 1) * vectorSize / maxSplitInto);
                        }
                        ends[ends.Length - 1] = vectorSize;
                        // We have a min of 1 here, because if the first min was 0 then
                        // the first split would cover no slots.
                        Contracts.Assert(Utils.IsIncreasing(1, ends, vectorSize + 1));
                        return Utils.MarshalInvoke(CreateCore<int>, type.GetItemType().RawType, view, col, ends);
                    }
                }
 
                /// <summary>
                /// Given an input <see cref="DataViewRow"/>, create the <see cref="DataViewRow"/> containing the split
                /// version of the columns.
                /// </summary>
                public abstract DataViewRow Bind(DataViewRow row, Func<int, bool> pred);
 
                private static Splitter CreateCore<T>(IDataView view, int col)
                {
                    return new NoSplitter<T>(view, col);
                }
 
                private static Splitter CreateCore<T>(IDataView view, int col, int[] ends)
                {
                    return new ColumnSplitter<T>(view, col, ends);
                }
 
                private abstract class RowBase<TSplitter> : WrappingRow
                    where TSplitter : Splitter
                {
                    protected readonly TSplitter Parent;
 
                    public sealed override DataViewSchema Schema => Parent.OutputSchema;
 
                    public RowBase(TSplitter parent, DataViewRow input)
                        : base(input)
                    {
                        Contracts.AssertValue(parent);
                        Contracts.AssertValue(input);
                        Contracts.Assert(input.IsColumnActive(input.Schema[parent.SrcCol]));
                        Parent = parent;
                    }
                }
 
                /// <summary>
                /// A splitter that doesn't split, just passes through the column contents.
                /// Useful for when we've been told to "split" a column that we don't need
                /// to split.
                /// </summary>
                private sealed class NoSplitter<T> : Splitter
                {
                    public override int ColumnCount => 1;
 
                    public override DataViewSchema OutputSchema { get; }
 
                    /// <summary>
                    /// This is NoSplitter. Thus, the column, indexed by col, which supposes to be splitted will just be copied to an output
                    /// column without splitting.
                    /// </summary>
                    /// <param name="view">Input data whose columns can be splitted.</param>
                    /// <param name="col">The selected column's index.</param>
                    public NoSplitter(IDataView view, int col)
                        : base(view, col)
                    {
                        Contracts.Assert(_view.Schema[col].Type.RawType == typeof(T));
 
                        // The column selected for splitting.
                        var selectedColumn = _view.Schema[col];
 
                        var schemaBuilder = new DataViewSchema.Builder();
                        // Just copy the selected column to output since no splitting happens.
                        schemaBuilder.AddColumn(selectedColumn.Name, selectedColumn.Type, selectedColumn.Annotations);
                        OutputSchema = schemaBuilder.ToSchema();
                    }
 
                    public override DataViewRow Bind(DataViewRow row, Func<int, bool> pred)
                    {
                        Contracts.AssertValue(row);
                        Contracts.Assert(row.Schema == _view.Schema);
                        Contracts.AssertValue(pred);
                        Contracts.Assert(row.IsColumnActive(row.Schema[SrcCol]));
                        return new RowImpl(this, row, pred(0));
                    }
 
                    private sealed class RowImpl : RowBase<NoSplitter<T>>
                    {
                        private readonly bool _isActive;
 
                        public RowImpl(NoSplitter<T> parent, DataViewRow input, bool isActive)
                            : base(parent, input)
                        {
                            Contracts.Assert(Parent.ColumnCount == 1);
                            _isActive = isActive;
                        }
 
                        /// <summary>
                        /// Returns whether the given column is active in this row.
                        /// </summary>
                        public override bool IsColumnActive(DataViewSchema.Column column)
                        {
                            Contracts.CheckParam(column.Index < Parent.ColumnCount, nameof(column));
                            return _isActive;
                        }
 
                        /// <summary>
                        /// Returns a value getter delegate to fetch the value of column with the given columnIndex, from the row.
                        /// This throws if the column is not active in this row, or if the type
                        /// <typeparamref name="TValue"/> differs from this column's type.
                        /// </summary>
                        /// <typeparam name="TValue"> is the column's content type.</typeparam>
                        /// <param name="column"> is the output column whose getter should be returned.</param>
                        public override ValueGetter<TValue> GetGetter<TValue>(DataViewSchema.Column column)
                        {
                            Contracts.Check(IsColumnActive(column));
                            return Input.GetGetter<TValue>(Input.Schema[Parent.SrcCol]);
                        }
                    }
                }
 
                /// <summary>
                /// This splitter enables the partition of a single column into two or more
                /// columns.
                /// </summary>
                private sealed class ColumnSplitter<T> : Splitter
                {
                    private readonly int[] _lims;
                    // Cache of the types of each slice.
                    private readonly VectorDataViewType[] _types;
 
                    public override DataViewSchema OutputSchema { get; }
 
                    public override int ColumnCount { get { return _lims.Length; } }
 
                    /// <summary>
                    /// Provide a column partitioner that partitions a vector column into multiple
                    /// vector columns.
                    /// </summary>
                    /// <param name="view">The view where we are slicing one column</param>
                    /// <param name="col">The column we are slicing</param>
                    /// <param name="lims">Equal in length to the number of slices, this is
                    /// the limit of the indices of each slice, where the successive slice
                    /// starts with that limit as its minimum index. So slice i comes from
                    /// source slot indices from <c><paramref name="lims"/>[i-1]</c> inclusive to
                    /// <c><paramref name="lims"/>[i]</c> exclusive, with slice 0 starting at 0.</param>
                    public ColumnSplitter(IDataView view, int col, int[] lims)
                        : base(view, col)
                    {
                        var type = _view.Schema[SrcCol].Type as VectorDataViewType;
                        // Only valid use is for two or more slices.
                        Contracts.Assert(Utils.Size(lims) >= 2);
                        Contracts.AssertValue(type);
                        Contracts.Assert(type.Size > 0);
                        Contracts.Assert(type.ItemType.RawType == typeof(T));
                        Contracts.Assert(Utils.IsIncreasing(0, lims, type.Size + 1));
                        Contracts.Assert(lims[lims.Length - 1] == type.Size);
 
                        _lims = lims;
                        _types = new VectorDataViewType[_lims.Length];
                        _types[0] = new VectorDataViewType(type.ItemType, _lims[0]);
                        for (int c = 1; c < _lims.Length; ++c)
                            _types[c] = new VectorDataViewType(type.ItemType, _lims[c] - _lims[c - 1]);
 
                        var selectedColumn = _view.Schema[col];
                        var schemaBuilder = new DataViewSchema.Builder();
                        for (int c = 0; c < _lims.Length; ++c)
                            schemaBuilder.AddColumn(selectedColumn.Name, _types[c]);
                        OutputSchema = schemaBuilder.ToSchema();
                    }
 
                    public override DataViewRow Bind(DataViewRow row, Func<int, bool> pred)
                    {
                        Contracts.AssertValue(row);
                        Contracts.Assert(row.Schema == _view.Schema);
                        Contracts.AssertValue(pred);
                        Contracts.Assert(row.IsColumnActive(row.Schema[SrcCol]));
                        return new RowImpl(this, row, pred);
                    }
 
                    private sealed class RowImpl : RowBase<ColumnSplitter<T>>
                    {
                        // Counter of the last valid input, updated by EnsureValid.
                        private long _lastValid;
                        // The last valid input value.
                        private VBuffer<T> _inputValue;
                        // The delegate to get the input value.
                        private readonly ValueGetter<VBuffer<T>> _inputGetter;
                        // The limit of _inputValue.Indices
                        private readonly int[] _srcIndicesLims;
                        // Convenient accessor since we use this all over the place.
                        private int[] Lims { get { return Parent._lims; } }
                        // Getters.
                        private readonly ValueGetter<VBuffer<T>>[] _getters;
 
                        public RowImpl(ColumnSplitter<T> parent, DataViewRow input, Func<int, bool> pred)
                            : base(parent, input)
                        {
                            _inputGetter = input.GetGetter<VBuffer<T>>(input.Schema[Parent.SrcCol]);
                            _srcIndicesLims = new int[Lims.Length];
                            _lastValid = -1;
                            _getters = new ValueGetter<VBuffer<T>>[Lims.Length];
                            for (int c = 0; c < _getters.Length; ++c)
                                _getters[c] = pred(c) ? CreateGetter(c) : null;
                        }
 
                        /// <summary>
                        /// Returns whether the given column is active in this row.
                        /// </summary>
                        public override bool IsColumnActive(DataViewSchema.Column column)
                        {
                            Contracts.CheckParam(column.Index < Parent.ColumnCount, nameof(column));
                            return _getters[column.Index] != null;
                        }
 
                        /// <summary>
                        /// Returns a value getter delegate to fetch the value of column with the given columnIndex, from the row.
                        /// This throws if the column is not active in this row, or if the type
                        /// <typeparamref name="TValue"/> differs from this column's type.
                        /// </summary>
                        /// <typeparam name="TValue"> is the column's content type.</typeparam>
                        /// <param name="column"> is the output column whose getter should be returned.</param>
                        public override ValueGetter<TValue> GetGetter<TValue>(DataViewSchema.Column column)
                        {
                            Contracts.Check(IsColumnActive(column) && column.Index < _getters.Length);
                            Contracts.AssertValue(_getters[column.Index]);
                            var originFn = _getters[column.Index];
                            var fn = originFn as ValueGetter<TValue>;
                            if (fn == null)
                                throw Contracts.Except($"Invalid TValue in GetGetter: '{typeof(TValue)}', " +
                                    $"expected type: '{originFn.GetType().GetGenericArguments().First()}'.");
                            return fn;
                        }
 
                        private ValueGetter<VBuffer<T>> CreateGetter(int col)
                        {
                            int min = col == 0 ? 0 : Lims[col - 1];
                            int len = Lims[col] - min;
                            return
                                (ref VBuffer<T> value) =>
                                {
                                    EnsureValid();
                                    VBufferEditor<T> editor;
                                    if (_inputValue.IsDense)
                                    {
                                        editor = VBufferEditor.Create(ref value, len);
                                        _inputValue.GetValues().Slice(min, len).CopyTo(editor.Values);
                                        value = editor.Commit();
                                        return;
                                    }
                                    // In the sparse case we have ranges on Indices/Values to consider.
                                    int smin = col == 0 ? 0 : _srcIndicesLims[col - 1];
                                    int slim = _srcIndicesLims[col];
                                    int scount = slim - smin;
                                    if (scount == 0)
                                    {
                                        VBufferUtils.Resize(ref value, len, 0);
                                        return;
                                    }
 
                                    editor = VBufferEditor.Create(ref value, len, scount);
                                    bool isDense = len == scount;
                                    if (!isDense)
                                    {
                                        _inputValue.GetIndices().Slice(smin, scount).CopyTo(editor.Indices);
 
                                        if (min != 0)
                                        {
                                            for (int i = 0; i < scount; ++i)
                                                editor.Indices[i] -= min;
                                        }
                                    }
                                    _inputValue.GetValues().Slice(smin, scount).CopyTo(editor.Values);
                                    value = editor.Commit();
                                };
                        }
 
                        private void EnsureValid()
                        {
                            if (_lastValid == Input.Position)
                                return;
                            _inputGetter(ref _inputValue);
                            Contracts.Assert(_inputValue.Length == Parent._lims[Parent._lims.Length - 1]);
                            // If it's dense, there's no need to determine the beginnings
                            // and end of each slice.
                            if (_inputValue.IsDense)
                                return;
                            var indices = _inputValue.GetIndices();
                            if (indices.Length == 0)
                            {
                                // Handle this separately, since _inputValue.Indices might be null
                                // in this case, and then we may as well short circuit it anyway.
                                Array.Clear(_srcIndicesLims, 0, _srcIndicesLims.Length);
                                return;
                            }
 
                            int ii = 0;
                            for (int i = 0; i < Lims.Length; ++i)
                            {
                                int lim = Lims[i];
                                // REVIEW: Would some form of bisection search be better
                                // than this scan? Possibly if the search were to happen across
                                // all lims at the same time, somehow.
                                while (ii < indices.Length && indices[ii] < lim)
                                    ii++;
                                _srcIndicesLims[i] = ii;
                            }
                            _lastValid = Input.Position;
                        }
                    }
                }
            }
 
            /// <summary>
            /// The cursor implementation creates the <see cref="DataViewRow"/>s using <see cref="Splitter.Bind"/>,
            /// then collates the results from those rows as effectively one big row.
            /// </summary>
            private sealed class Cursor : SynchronizedCursorBase
            {
                private readonly DataViewSlicer _slicer;
                private readonly DataViewRow[] _sliceRows;
 
                public override DataViewSchema Schema => _slicer.Schema;
 
                public Cursor(IChannelProvider provider, DataViewSlicer slicer, DataViewRowCursor input, Func<int, bool> pred, bool[] activeSplitters)
                    : base(provider, input)
                {
                    Ch.AssertValue(slicer);
                    Ch.AssertValueOrNull(pred);
                    Ch.Assert(Utils.Size(activeSplitters) == slicer._splitters.Length);
 
                    _slicer = slicer;
                    _sliceRows = new DataViewRow[_slicer._splitters.Length];
                    var activeSrc = new bool[slicer._splitters.Length];
                    var activeSrcSet = new HashSet<int>();
                    int offset = 0;
                    Func<int, bool> defaultPred = null;
                    if (pred == null)
                        defaultPred = col => true;
 
                    for (int i = 0; i < activeSplitters.Length; ++i)
                    {
                        var splitter = _slicer._splitters[i];
                        var localOffset = offset;
                        if (activeSplitters[i])
                            _sliceRows[i] = splitter.Bind(input, pred == null ? defaultPred : col => pred(col + localOffset));
                        offset += splitter.ColumnCount;
                    }
                }
 
                /// <summary>
                /// Returns whether the given column is active in this row.
                /// </summary>
                public override bool IsColumnActive(DataViewSchema.Column column)
                {
                    Ch.Check(column.Index < Schema.Count, nameof(column));
                    int splitInd;
                    int splitCol;
                    _slicer.OutputColumnToSplitterIndices(column.Index, out splitInd, out splitCol);
                    var row = _sliceRows[splitInd];
                    return row != null && row.IsColumnActive(row.Schema[splitCol]);
                }
 
                /// <summary>
                /// Returns a value getter delegate to fetch the value of column with the given columnIndex, from the row.
                /// This throws if the column is not active in this row, or if the type
                /// <typeparamref name="TValue"/> differs from this column's type.
                /// </summary>
                /// <typeparam name="TValue"> is the column's content type.</typeparam>
                /// <param name="column"> is the output column whose getter should be returned.</param>
                public override ValueGetter<TValue> GetGetter<TValue>(DataViewSchema.Column column)
                {
                    Ch.Check(IsColumnActive(column));
                    int splitInd;
                    int splitCol;
                    _slicer.OutputColumnToSplitterIndices(column.Index, out splitInd, out splitCol);
                    var splitIndRow = _sliceRows[splitInd];
                    return splitIndRow.GetGetter<TValue>(splitIndRow.Schema[splitCol]);
                }
            }
        }
    }
 
    internal static class TransposerUtils
    {
        private static readonly FuncStaticMethodInfo1<IChannelProvider, SlotCursor, DataViewRowCursor> _getRowCursorShimCoreMethodInfo
            = new FuncStaticMethodInfo1<IChannelProvider, SlotCursor, DataViewRowCursor>(GetRowCursorShimCore<int>);
 
        private static readonly FuncInstanceMethodInfo1<SlotCursor, Delegate> _slotCursorGetGetterMethodInfo
            = FuncInstanceMethodInfo1<SlotCursor, Delegate>.Create(target => target.GetGetter<int>);
 
        /// <summary>
        /// This is a convenience method that extracts a single slot value's vector,
        /// while simultaneously verifying that there is exactly one value.
        /// </summary>
        public static void GetSingleSlotValue<T>(this ITransposeDataView view, int col, ref VBuffer<T> dst)
        {
            Contracts.CheckValue(view, nameof(view));
            Contracts.CheckParam(0 <= col && col < view.Schema.Count, nameof(col));
            using (var cursor = view.GetSlotCursor(col))
            {
                var getter = cursor.GetGetter<T>();
                if (!cursor.MoveNext())
                    throw Contracts.Except("Could not get single value on column '{0}' because there are no slots", view.Schema[col].Name);
                getter(ref dst);
                if (cursor.MoveNext())
                    throw Contracts.Except("Could not get single value on column '{0}' because there is more than one slot", view.Schema[col].Name);
            }
        }
 
        /// <summary>
        /// The <see cref="SlotCursor.GetGetter{TValue}"/> is parameterized by a type that becomes the
        /// type parameter for a <see cref="VBuffer{T}"/>, and this is generally preferable and more
        /// sensible but for various reasons it's often a lot simpler to have a get-getter be over
        /// the actual type returned by the getter, that is, parameterize this by the actual
        /// <see cref="VBuffer{T}"/> type.
        /// </summary>
        /// <typeparam name="TValue">The type, must be a <see cref="VBuffer{T}"/> generic type,
        /// though enforcement of this has to be done only at runtime for practical reasons</typeparam>
        /// <param name="cursor">The cursor to get the getter for</param>
        /// <param name="ctx">The exception contxt</param>
        /// <returns>The value getter</returns>
        public static ValueGetter<TValue> GetGetterWithVectorType<TValue>(this SlotCursor cursor, IExceptionContext ctx = null)
        {
            Contracts.CheckValueOrNull(ctx);
            ctx.CheckValue(cursor, nameof(cursor));
            var type = typeof(TValue);
            if (!type.IsGenericEx(typeof(VBuffer<>)))
                throw ctx.Except("Invalid TValue: '{0}'", typeof(TValue));
            var genTypeArgs = type.GetGenericArguments();
            ctx.Assert(genTypeArgs.Length == 1);
 
            var getter = Utils.MarshalInvoke(_slotCursorGetGetterMethodInfo, cursor, genTypeArgs[0]) as ValueGetter<TValue>;
            if (getter == null)
                throw ctx.Except("Invalid TValue: '{0}'", typeof(TValue));
            return getter;
        }
 
        /// <summary>
        /// Given a slot cursor, construct a single-column equivalent row cursor, with the single column
        /// active and having the same type. This is useful to exploit the many utility methods that exist
        /// to handle <see cref="DataViewRowCursor"/> and <see cref="DataViewRow"/> but that know nothing about
        /// <see cref="SlotCursor"/>, without having to rewrite all of them. This is, however, rather
        /// something of a hack; whenever possible or reasonable the slot cursor should be used directly.
        /// The name of this column is always "Waffles".
        /// </summary>
        /// <param name="provider">The channel provider used in creating the wrapping row cursor</param>
        /// <param name="cursor">The slot cursor to wrap</param>
        /// <returns>A row cursor with a single active column with the same type as the slot type</returns>
        public static DataViewRowCursor GetRowCursorShim(IChannelProvider provider, SlotCursor cursor)
        {
            Contracts.CheckValue(provider, nameof(provider));
            provider.CheckValue(cursor, nameof(cursor));
 
            return Utils.MarshalInvoke(_getRowCursorShimCoreMethodInfo, cursor.GetSlotType().ItemType.RawType, provider, cursor);
        }
 
        private static DataViewRowCursor GetRowCursorShimCore<T>(IChannelProvider provider, SlotCursor cursor)
        {
            return new SlotRowCursorShim<T>(provider, cursor);
        }
 
        /// <summary>
        /// Presents a single transposed column as a single-column dataview.
        /// </summary>
        public sealed class SlotDataView : IDataView
        {
            private static readonly FuncInstanceMethodInfo1<SlotDataView, bool, DataViewRowCursor> _getRowCursorMethodInfo
                = FuncInstanceMethodInfo1<SlotDataView, bool, DataViewRowCursor>.Create(target => target.GetRowCursor<int>);
 
            private readonly IHost _host;
            private readonly ITransposeDataView _data;
            private readonly int _col;
            private readonly DataViewType _type;
 
            public DataViewSchema Schema { get; }
 
            public bool CanShuffle => false;
 
            public SlotDataView(IHostEnvironment env, ITransposeDataView data, int col)
            {
                Contracts.CheckValue(env, nameof(env));
                _host = env.Register("SlotDataView");
                _host.CheckValue(data, nameof(data));
                _host.CheckParam(0 <= col && col < data.Schema.Count, nameof(col));
                _type = data.GetSlotType(col);
                _host.AssertValue(_type);
 
                _data = data;
                _col = col;
 
                var builder = new DataViewSchema.Builder();
                builder.AddColumn(_data.Schema[_col].Name, _type);
                Schema = builder.ToSchema();
            }
 
            public long? GetRowCount()
            {
                var type = _data.Schema[_col].Type;
                int valueCount = type.GetValueCount();
                _host.Assert(valueCount > 0);
                return valueCount;
            }
 
            public DataViewRowCursor GetRowCursor(IEnumerable<DataViewSchema.Column> columnsNeeded, Random rand = null)
            {
                bool hasZero = columnsNeeded != null && columnsNeeded.Any(x => x.Index == 0);
                return Utils.MarshalInvoke(_getRowCursorMethodInfo, this, _type.GetItemType().RawType, hasZero);
            }
 
            private DataViewRowCursor GetRowCursor<T>(bool active)
            {
                return new Cursor<T>(this, active);
            }
 
            public DataViewRowCursor[] GetRowCursorSet(IEnumerable<DataViewSchema.Column> columnsNeeded, int n, Random rand = null)
            {
                return new DataViewRowCursor[] { GetRowCursor(columnsNeeded, rand) };
            }
 
            private sealed class Cursor<T> : RootCursorBase
            {
                private readonly SlotDataView _parent;
                private readonly SlotCursor _slotCursor;
                private readonly Delegate _getter;
 
                public override DataViewSchema Schema => _parent.Schema;
 
                public override long Batch => 0;
 
                public Cursor(SlotDataView parent, bool active)
                    : base(parent._host)
                {
                    _parent = parent;
                    _slotCursor = _parent._data.GetSlotCursor(parent._col);
                    if (active)
                        _getter = _slotCursor.GetGetter<T>();
                }
 
                /// <summary>
                /// Returns whether the given column is active in this row.
                /// </summary>
                public override bool IsColumnActive(DataViewSchema.Column column)
                {
                    Ch.CheckParam(column.Index == 0, nameof(column));
                    return _getter != null;
                }
 
                /// <summary>
                /// Returns a value getter delegate to fetch the value of column with the given columnIndex, from the row.
                /// This throws if the column is not active in this row, or if the type
                /// <typeparamref name="TValue"/> differs from this column's type.
                /// </summary>
                /// <typeparam name="TValue"> is the column's content type.</typeparam>
                /// <param name="column"> is the output column whose getter should be returned.</param>
                public override ValueGetter<TValue> GetGetter<TValue>(DataViewSchema.Column column)
                {
                    Ch.CheckParam(column.Index == 0, nameof(column));
                    Ch.CheckParam(_getter != null, nameof(column), "requested column not active");
 
                    var getter = _getter as ValueGetter<TValue>;
                    if (getter == null)
                        throw Ch.Except($"Invalid TValue: '{typeof(TValue)}', " +
                            $"expected type: '{_getter.GetType().GetGenericArguments().First()}'.");
                    return getter;
                }
 
                public override ValueGetter<DataViewRowId> GetIdGetter() => GetId;
 
                private void GetId(ref DataViewRowId id)
                {
                    Ch.Check(_slotCursor.SlotIndex >= 0, RowCursorUtils.FetchValueStateError);
                    id = new DataViewRowId((ulong)_slotCursor.SlotIndex, 0);
                }
 
                protected override bool MoveNextCore() => _slotCursor.MoveNext();
            }
        }
 
        // REVIEW: This shim class is very similar to the above shim class, except at the
        // cursor level, not the cursorable level. Is there some non-horrifying way to unify both, somehow?
        private sealed class SlotRowCursorShim<T> : RootCursorBase
        {
            private readonly SlotCursor _slotCursor;
 
            public override DataViewSchema Schema { get; }
 
            public override long Batch => 0;
 
            public SlotRowCursorShim(IChannelProvider provider, SlotCursor cursor)
                : base(provider)
            {
                Contracts.AssertValue(cursor);
 
                _slotCursor = cursor;
                var builder = new DataViewSchema.Builder();
                builder.AddColumn("Waffles", cursor.GetSlotType());
                Schema = builder.ToSchema();
            }
 
            /// <summary>
            /// Returns whether the given column is active in this row.
            /// </summary>
            public override bool IsColumnActive(DataViewSchema.Column column)
            {
                Ch.CheckParam(column.Index == 0, nameof(column));
                return true;
            }
 
            /// <summary>
            /// Returns a value getter delegate to fetch the value of column with the given columnIndex, from the row.
            /// This throws if the column is not active in this row, or if the type
            /// <typeparamref name="TValue"/> differs from this column's type.
            /// </summary>
            /// <typeparam name="TValue"> is the column's content type.</typeparam>
            /// <param name="column"> is the output column whose getter should be returned.</param>
            public override ValueGetter<TValue> GetGetter<TValue>(DataViewSchema.Column column)
            {
                Ch.CheckParam(column.Index == 0, nameof(column));
                return _slotCursor.GetGetterWithVectorType<TValue>(Ch);
            }
 
            public override ValueGetter<DataViewRowId> GetIdGetter() => GetId;
 
            private void GetId(ref DataViewRowId id)
            {
                Ch.Check(_slotCursor.SlotIndex >= 0, RowCursorUtils.FetchValueStateError);
                id = new DataViewRowId((ulong)_slotCursor.SlotIndex, 0);
            }
 
            protected override bool MoveNextCore() => _slotCursor.MoveNext();
        }
    }
}