// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ML.Data.Conversion;
using Microsoft.ML.Internal.Utilities;
using Microsoft.ML.Runtime;
namespace Microsoft.ML.Data
{
[BestFriend]
internal static class DataViewUtils
{
/// <summary>
/// Generate a unique temporary column name for the given schema.
/// Use tag to independently create multiple temporary, unique column
/// names for a single transform.
/// </summary>
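/// <example>
/// A hypothetical usage sketch (the schema comes from the caller and "Score" is only an illustrative tag):
/// <code>
/// string tmp = schema.GetTempColumnName("Score"); // "Score" if free, otherwise "temp_Score_000", "temp_Score_001", ...
/// </code>
/// </example>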
public static string GetTempColumnName(this DataViewSchema schema, string tag = null)
{
Contracts.CheckValue(schema, nameof(schema));
int col;
if (!string.IsNullOrWhiteSpace(tag) && !schema.TryGetColumnIndex(tag, out col))
return tag;
for (int i = 0; ; i++)
{
string name = string.IsNullOrWhiteSpace(tag) ?
string.Format("temp_{0:000}", i) :
string.Format("temp_{0}_{1:000}", tag, i);
if (!schema.TryGetColumnIndex(name, out col))
return name;
}
}
/// <summary>
/// Generate n unique temporary column names for the given schema.
/// Use tag to independently create multiple temporary, unique column
/// names for a single transform.
/// </summary>
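/// <example>
/// For instance, a transform that needs two scratch columns might do the following
/// (illustrative only; the actual names depend on what already exists in the schema):
/// <code>
/// string[] tmps = schema.GetTempColumnNames(2, "Features"); // e.g. "temp_Features_000", "temp_Features_001"
/// </code>
/// </example>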
public static string[] GetTempColumnNames(this DataViewSchema schema, int n, string tag = null)
{
Contracts.CheckValue(schema, nameof(schema));
Contracts.Check(n > 0, "n");
var res = new string[n];
int j = 0;
for (int i = 0; i < n; i++)
{
for (; ; )
{
string name = string.IsNullOrWhiteSpace(tag) ?
string.Format("temp_{0:000}", j) :
string.Format("temp_{0}_{1:000}", tag, j);
j++;
int col;
if (!schema.TryGetColumnIndex(name, out col))
{
res[i] = name;
break;
}
}
}
return res;
}
/// <summary>
/// Get the row count from the input view by any means necessary, even explicit enumeration
/// and counting if <see cref="IDataView.GetRowCount"/> insists on returning <c>null</c>.
/// </summary>
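/// <example>
/// A minimal usage sketch, assuming <c>view</c> is any <see cref="IDataView"/> supplied by the caller:
/// <code>
/// long rows = DataViewUtils.ComputeRowCount(view); // cursors through the data if GetRowCount() returns null
/// </code>
/// </example>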
public static long ComputeRowCount(IDataView view)
{
long? countNullable = view.GetRowCount();
if (countNullable != null)
return countNullable.Value;
long count = 0;
using (var cursor = view.GetRowCursor())
{
while (cursor.MoveNext())
count++;
}
return count;
}
/// <summary>
/// Get the target number of threads to use, given an indicator of thread count.
/// </summary>
public static int GetThreadCount(int num = 0, bool preferOne = false)
{
int conc = Math.Max(2, Environment.ProcessorCount - 1);
if (num > 0)
return Math.Min(num, 2 * conc);
if (preferOne)
return 1;
return conc;
}
/// <summary>
/// Try to create a cursor set from upstream and consolidate it here. The host determines
/// the target cardinality of the cursor set.
/// </summary>
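/// <example>
/// An illustrative calling pattern, assuming the view, needed columns, host, and random source
/// come from the caller; the fallback to a plain cursor is only a sketch of typical usage:
/// <code>
/// DataViewRowCursor cursor;
/// if (!DataViewUtils.TryCreateConsolidatingCursor(out cursor, view, columnsNeeded, host, rand))
///     cursor = view.GetRowCursor(columnsNeeded, rand);
/// </code>
/// </example>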
public static bool TryCreateConsolidatingCursor(out DataViewRowCursor curs,
IDataView view, IEnumerable<DataViewSchema.Column> columnsNeeded, IHost host, Random rand)
{
Contracts.CheckValue(host, nameof(host));
host.CheckValue(view, nameof(view));
int cthd = GetThreadCount();
host.Assert(cthd > 0);
if (cthd == 1 || !AllCacheable(columnsNeeded))
{
curs = null;
return false;
}
var inputs = view.GetRowCursorSet(columnsNeeded, cthd, rand);
host.Check(Utils.Size(inputs) > 0);
if (inputs.Length == 1)
curs = inputs[0];
else
{
// We use a somewhat arbitrary batch size of 64 for buffering results from the
// intermediate cursors. Empirically, for most datasets, this strikes a nice balance:
// large enough to benefit from parallelism, yet small enough that keeping the batches
// in memory is not too onerous.
const int batchSize = 64;
curs = DataViewUtils.ConsolidateGeneric(host, inputs, batchSize);
}
return true;
}
/// <summary>
/// From the given input cursor, split it into a cursor set with the given
/// cardinality. If not all the active columns are cacheable, this will simply
/// return the given input cursor.
/// </summary>
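/// <example>
/// An illustrative round trip, assuming the provider and input cursor come from the caller:
/// the cursor is split four ways, and the pieces can either be consumed independently on
/// separate threads, or merged back into a single cursor with <see cref="ConsolidateGeneric"/>.
/// <code>
/// var parts = DataViewUtils.CreateSplitCursors(provider, input, 4);
/// var merged = DataViewUtils.ConsolidateGeneric(provider, parts, batchSize: 64);
/// </code>
/// </example>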
public static DataViewRowCursor[] CreateSplitCursors(IChannelProvider provider, DataViewRowCursor input, int num)
{
Contracts.CheckValue(provider, nameof(provider));
provider.CheckValue(input, nameof(input));
if (num <= 1)
return new DataViewRowCursor[1] { input };
// If any active columns are not cacheable, we can't split.
if (!AllCacheable(input.Schema, input.IsColumnActive))
return new DataViewRowCursor[1] { input };
// REVIEW: Should we limit the cardinality to some reasonable size?
// REVIEW: Ideally a splitter should be owned by a data view
// we might split, so that we can share the cache pools among multiple
// cursors.
// REVIEW: Keep the utility method here, move this splitter stuff
// to some other file.
return Splitter.Split(provider, input.Schema, input, num);
}
/// <summary>
/// Return whether all the active columns, as determined by the predicate, are
/// cacheable, that is, either primitive types or vector types.
/// </summary>
public static bool AllCacheable(DataViewSchema schema, Func<DataViewSchema.Column, bool> predicate)
{
Contracts.CheckValue(schema, nameof(schema));
Contracts.CheckValue(predicate, nameof(predicate));
for (int col = 0; col < schema.Count; col++)
{
if (!predicate(schema[col]))
continue;
var type = schema[col].Type;
if (!IsCacheable(type))
return false;
}
return true;
}
/// <summary>
/// Return whether all the active columns, as determined by the predicate, are
/// cacheable, that is, either primitive types or vector types.
/// </summary>
public static bool AllCacheable(IEnumerable<DataViewSchema.Column> columnsNeeded)
{
Contracts.CheckValue(columnsNeeded, nameof(columnsNeeded));
if (columnsNeeded == null)
return false;
foreach (var col in columnsNeeded)
if (!IsCacheable(col.Type))
return false;
return true;
}
/// <summary>
/// Determine whether the given type is cacheable, that is, either a primitive type or a vector type.
/// </summary>
public static bool IsCacheable(this DataViewType type)
=> type != null && (type is PrimitiveDataViewType || type is VectorDataViewType);
/// <summary>
/// Tests whether the cursors are mutually compatible for consolidation,
/// that is, they are all non-null, share the same schema, and have the same
/// set of active columns.
/// </summary>
public static bool SameSchemaAndActivity(DataViewRowCursor[] cursors)
{
// There must be something to actually consolidate.
if (Utils.Size(cursors) == 0)
return true;
var firstCursor = cursors[0];
if (firstCursor == null)
return false;
if (cursors.Length == 1)
return true;
var schema = firstCursor.Schema;
// All cursors must have the same schema.
for (int i = 1; i < cursors.Length; ++i)
{
if (cursors[i] == null || cursors[i].Schema != schema)
return false;
}
// All cursors must have the same columns active.
for (int c = 0; c < schema.Count; ++c)
{
bool active = firstCursor.IsColumnActive(schema[c]);
for (int i = 1; i < cursors.Length; ++i)
{
if (cursors[i].IsColumnActive(schema[c]) != active)
return false;
}
}
return true;
}
/// <summary>
/// Given a parallel cursor set, this consolidates them into a single cursor. The batchSize
/// is a hint used for efficiency.
/// </summary>
public static DataViewRowCursor ConsolidateGeneric(IChannelProvider provider, DataViewRowCursor[] inputs, int batchSize)
{
Contracts.CheckValue(provider, nameof(provider));
provider.CheckNonEmpty(inputs, nameof(inputs));
provider.CheckParam(batchSize >= 0, nameof(batchSize));
if (inputs.Length == 1)
return inputs[0];
object[] pools = null;
return Splitter.Consolidate(provider, inputs, batchSize, ref pools);
}
/// <summary>
/// A convenience class to facilitate the creation of a split, as well as a convenient
/// place to store shared resources that can be reused among multiple splits of a cursor
/// with the same schema. Since splitting also returns a consolidator, this also contains
/// the consolidating logic.
///
/// In a very rough sense, both the splitters and consolidators are written in the same way:
/// For all input cursors, and all active columns, an "in pipe" is created. A worker thread
/// per input cursor busily retrieves values from the cursors and stores them in the "in
/// pipe." At appropriate times, "batch" objects are synthesized from the inputs consumed
/// thus far, and inserted into a blocking collection. The output cursor or cursors likewise
/// have a set of "out pipe" instances, one per each of the active columns, through which
/// successive batches are presented for consumption by the user of the output cursors. Of
/// course, split and consolidate differ in many details; for example, the
/// consolidator must accept batches as they come and reconcile them among multiple inputs,
/// while the splitter has more freedom.
///
/// Ideally, a data view that can be split retains one of these objects itself,
/// so that multiple splittings can share buffers from one cursoring to the next,
/// but this is not required.
/// </summary>
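/// <example>
/// At its core, the hand-off between workers and output cursors is an ordinary bounded
/// producer/consumer exchange over a <see cref="BlockingCollection{T}"/>. The sketch below is
/// illustrative only: the row source, batch size, and consumer are hypothetical stand-ins for
/// the pipe machinery implemented by this class.
/// <code>
/// var toConsume = new BlockingCollection&lt;int[]&gt;(boundedCapacity: 4);
/// var producer = Task.Run(() =>
/// {
///     var buffer = new List&lt;int&gt;();
///     foreach (int row in ReadRows()) // hypothetical stand-in for the input cursor
///     {
///         buffer.Add(row);
///         if (buffer.Count == 128) { toConsume.Add(buffer.ToArray()); buffer.Clear(); }
///     }
///     if (buffer.Count > 0)
///         toConsume.Add(buffer.ToArray());
///     toConsume.CompleteAdding();
/// });
/// foreach (int[] batch in toConsume.GetConsumingEnumerable())
///     ConsumeBatch(batch); // hypothetical stand-in for the out pipes
/// </code>
/// </example>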
private sealed class Splitter
{
private static readonly FuncStaticMethodInfo1<object[], int, object> _getPoolCoreMethodInfo
= new FuncStaticMethodInfo1<object[], int, object>(GetPoolCore<int>);
private static readonly FuncInstanceMethodInfo1<Splitter, DataViewRowCursor, int, InPipe> _createInPipeMethodInfo
= FuncInstanceMethodInfo1<Splitter, DataViewRowCursor, int, InPipe>.Create(target => target.CreateInPipe<int>);
private readonly DataViewSchema _schema;
private readonly object[] _cachePools;
/// <summary>
/// Pipes, in addition to column values, will also communicate extra information
/// enumerated here. This enum provides readable indices for that "extra"
/// information in the pipes.
/// </summary>
private enum ExtraIndex
{
Id,
#pragma warning disable MSML_GeneralName // Allow for this private enum.
_Lim
#pragma warning restore MSML_GeneralName
}
private Splitter(DataViewSchema schema)
{
Contracts.AssertValue(schema);
_schema = schema;
_cachePools = new object[_schema.Count + (int)ExtraIndex._Lim];
}
public static DataViewRowCursor Consolidate(IChannelProvider provider, DataViewRowCursor[] inputs, int batchSize, ref object[] ourPools)
{
Contracts.AssertValue(provider);
using (var ch = provider.Start("Consolidate"))
{
return ConsolidateCore(provider, inputs, ref ourPools, ch);
}
}
private static DataViewRowCursor ConsolidateCore(IChannelProvider provider, DataViewRowCursor[] inputs, ref object[] ourPools, IChannel ch)
{
ch.CheckNonEmpty(inputs, nameof(inputs));
if (inputs.Length == 1)
return inputs[0];
ch.CheckParam(SameSchemaAndActivity(inputs), nameof(inputs), "Inputs not compatible for consolidation");
DataViewRowCursor cursor = inputs[0];
var schema = cursor.Schema;
ch.CheckParam(AllCacheable(schema, cursor.IsColumnActive), nameof(inputs), "Inputs had some uncacheable input columns");
int[] activeToCol;
int[] colToActive;
Utils.BuildSubsetMaps(schema, cursor.IsColumnActive, out activeToCol, out colToActive);
// Because the schema of the consolidator is not necessarily fixed, we are merely
// opportunistic about buffer sharing, from cursoring to cursoring. If we can do
// it easily, great, if not, no big deal.
if (Utils.Size(ourPools) != schema.Count + (int)ExtraIndex._Lim)
ourPools = new object[schema.Count + (int)ExtraIndex._Lim];
// Create the out pipes.
OutPipe[] outPipes = new OutPipe[activeToCol.Length + (int)ExtraIndex._Lim];
for (int i = 0; i < activeToCol.Length; ++i)
{
int c = activeToCol[i];
DataViewType type = schema[c].Type;
var pool = GetPool(type, ourPools, c);
outPipes[i] = OutPipe.Create(type, pool);
}
int idIdx = activeToCol.Length + (int)ExtraIndex.Id;
outPipes[idIdx] = OutPipe.Create(RowIdDataViewType.Instance, GetPool(RowIdDataViewType.Instance, ourPools, idIdx));
// Create the structures to synchronize between the workers and the consumer.
const int toConsumeBound = 4;
var toConsume = new BlockingCollection<Batch>(toConsumeBound);
var batchColumnPool = new MadeObjectPool<BatchColumn[]>(() => new BatchColumn[outPipes.Length]);
Task[] workers = new Task[inputs.Length];
MinWaiter waiter = new MinWaiter(workers.Length);
bool done = false;
for (int t = 0; t < workers.Length; ++t)
{
var localCursor = inputs[t];
ch.Assert(localCursor.Position < 0);
// Note that these all take ownership of their respective cursors,
// so each worker handles its cursor's disposal internally on its own thread.
workers[t] = Utils.RunOnBackgroundThreadAsync(() =>
{
// This will be the last batch sent in the finally. If iteration proceeds without
// error, it will remain null, and be sent as a sentinel. If iteration results in
// an exception that we catch, the exception catching block will set this to an
// exception-bearing batch, and that will be passed along as the last batch instead.
Batch lastBatch = null;
try
{
using (localCursor)
{
InPipe[] inPipes = new InPipe[outPipes.Length];
for (int i = 0; i < activeToCol.Length; ++i)
inPipes[i] = outPipes[i].CreateInPipe(RowCursorUtils.GetGetterAsDelegate(localCursor, activeToCol[i]));
inPipes[idIdx] = outPipes[idIdx].CreateInPipe(localCursor.GetIdGetter());
long oldBatch = 0;
int count = 0;
// This event is used to synchronize ourselves using a MinWaiter
// so that we add batches to the consumer queue at the appropriate time.
ManualResetEventSlim waiterEvent = null;
Action pushBatch = () =>
{
if (count > 0)
{
var batchColumns = batchColumnPool.Get();
for (int i = 0; i < inPipes.Length; ++i)
batchColumns[i] = inPipes[i].GetBatchColumnAndReset();
// REVIEW: Is it worth not allocating new Batch object for each batch?
var batch = new Batch(batchColumnPool, batchColumns, count, oldBatch);
count = 0;
// The waiter event should never be null since this is only
// called after a point where waiter.Register has been called.
ch.AssertValue(waiterEvent);
waiterEvent.Wait();
waiterEvent = null;
toConsume.Add(batch);
}
};
// Handle the first one separately, then go into the main loop.
if (localCursor.MoveNext() && !done)
{
oldBatch = localCursor.Batch;
foreach (var pipe in inPipes)
pipe.Fill();
count++;
// Register with the min waiter that we want to wait on this batch number.
waiterEvent = waiter.Register(oldBatch);
while (localCursor.MoveNext() && !done)
{
if (oldBatch != localCursor.Batch)
{
ch.Assert(count == 0 || localCursor.Batch > oldBatch);
pushBatch();
oldBatch = localCursor.Batch;
waiterEvent = waiter.Register(oldBatch);
}
foreach (var pipe in inPipes)
pipe.Fill();
count++;
}
pushBatch();
}
}
}
catch (Exception ex)
{
// Whoops, we won't be sending null as the sentinel now.
lastBatch = new Batch(ex);
toConsume.Add(lastBatch);
}
finally
{
if (waiter.Retire() == 0)
{
if (lastBatch == null)
{
// If it wasn't null, this already sent along an exception bearing batch, in which
// case sending the sentinel is unnecessary and unhelpful.
toConsume.Add(null);
}
toConsume.CompleteAdding();
}
}
});
}
Action quitAction = () =>
{
done = true;
var myOutPipes = outPipes;
foreach (var batch in toConsume.GetConsumingEnumerable())
{
if (batch == null)
continue;
batch.SetAll(myOutPipes);
foreach (var outPipe in myOutPipes)
outPipe.Unset();
}
Task.WaitAll(workers);
};
return new Cursor(provider, schema, activeToCol, colToActive, outPipes, toConsume, quitAction);
}
private static object GetPool(DataViewType type, object[] pools, int poolIdx)
{
return Utils.MarshalInvoke(_getPoolCoreMethodInfo, type.RawType, pools, poolIdx);
}
private static MadeObjectPool<T[]> GetPoolCore<T>(object[] pools, int poolIdx)
{
var pool = pools[poolIdx] as MadeObjectPool<T[]>;
if (pool == null)
pools[poolIdx] = pool = new MadeObjectPool<T[]>(() => null);
return pool;
}
public static DataViewRowCursor[] Split(IChannelProvider provider, DataViewSchema schema, DataViewRowCursor input, int cthd)
{
Contracts.AssertValue(provider, "provider");
var splitter = new Splitter(schema);
using (var ch = provider.Start("CursorSplitter"))
{
var result = splitter.SplitCore(provider, input, cthd);
return result;
}
}
private DataViewRowCursor[] SplitCore(IChannelProvider ch, DataViewRowCursor input, int cthd)
{
Contracts.AssertValue(ch);
ch.AssertValue(input);
ch.Assert(input.Schema == _schema);
ch.Assert(cthd >= 2);
ch.Assert(AllCacheable(_schema, input.IsColumnActive));
// REVIEW: Should the following be configurable?
// How would we even expose these sorts of parameters to a user?
const int maxBatchCount = 128;
const int toConsumeBound = 4;
// Create the mappings between active column index, and column index.
int[] activeToCol;
int[] colToActive;
Utils.BuildSubsetMaps(_schema, input.IsColumnActive, out activeToCol, out colToActive);
// Only one set of in-pipes, one per column, as well as for extra side information.
InPipe[] inPipes = new InPipe[activeToCol.Length + (int)ExtraIndex._Lim];
// There are as many sets of out pipes as there are output cursors.
OutPipe[][] outPipes = new OutPipe[cthd][];
for (int i = 0; i < cthd; ++i)
outPipes[i] = new OutPipe[inPipes.Length];
// For each column, create the InPipe, and all OutPipes per output cursor.
for (int c = 0; c < activeToCol.Length; ++c)
{
ch.Assert(0 <= activeToCol[c] && activeToCol[c] < _schema.Count);
ch.Assert(c == 0 || activeToCol[c - 1] < activeToCol[c]);
var column = input.Schema[activeToCol[c]];
ch.Assert(input.IsColumnActive(column));
ch.Assert(column.Type.IsCacheable());
var inPipe = inPipes[c] =
Utils.MarshalInvoke(_createInPipeMethodInfo, this, column.Type.RawType, input, activeToCol[c]);
for (int i = 0; i < cthd; ++i)
outPipes[i][c] = inPipe.CreateOutPipe(column.Type);
}
// Beyond the InPipes corresponding to column values, we have extra side info pipes.
int idIdx = activeToCol.Length + (int)ExtraIndex.Id;
inPipes[idIdx] = CreateIdInPipe(input);
for (int i = 0; i < cthd; ++i)
outPipes[i][idIdx] = inPipes[idIdx].CreateOutPipe(RowIdDataViewType.Instance);
var toConsume = new BlockingCollection<Batch>(toConsumeBound);
var batchColumnPool = new MadeObjectPool<BatchColumn[]>(() => new BatchColumn[inPipes.Length]);
bool done = false;
int outputsRunning = cthd;
// Set up and start the thread that consumes the input, and utilizes the InPipe
// instances to compose the Batch objects. The thread takes ownership of the
// cursor, and so handles its disposal.
Task thread = Utils.RunOnBackgroundThreadAsync(
() =>
{
Batch lastBatch = null;
try
{
using (input)
{
long batchId = 0;
int count = 0;
Action pushBatch = () =>
{
var batchColumns = batchColumnPool.Get();
for (int c = 0; c < inPipes.Length; ++c)
batchColumns[c] = inPipes[c].GetBatchColumnAndReset();
// REVIEW: Is it worth not allocating new Batch object for each batch?
var batch = new Batch(batchColumnPool, batchColumns, count, batchId++);
count = 0;
toConsume.Add(batch);
};
while (input.MoveNext() && !done)
{
foreach (var pipe in inPipes)
pipe.Fill();
if (++count >= maxBatchCount)
pushBatch();
}
if (count > 0)
pushBatch();
}
}
catch (Exception ex)
{
lastBatch = new Batch(ex);
}
finally
{
// The last batch might be an exception, as in the above case. We pass along the exception
// bearing batch as the first of the last batches, so that the first worker to encounter this
// will know to throw. The remaining get the regular "stop working" null sentinel.
toConsume.Add(lastBatch);
for (int i = 1; i < cthd; ++i)
toConsume.Add(null);
toConsume.CompleteAdding();
}
});
Action quitAction = () =>
{
int remaining = Interlocked.Decrement(ref outputsRunning);
ch.Assert(remaining >= 0);
if (remaining == 0)
{
done = true;
// A relatively quick and convenient way to dispose of batches is to use
// set/unset on some output pipes repeatedly. Since all cursors have been disposed,
// we may as well use the first set of output pipes.
var myOutPipes = outPipes[0];
foreach (var batch in toConsume.GetConsumingEnumerable())
{
if (batch == null)
continue;
batch.SetAll(myOutPipes);
foreach (var outPipe in myOutPipes)
outPipe.Unset();
}
thread.Wait();
}
};
var cursors = new Cursor[cthd];
for (int i = 0; i < cthd; ++i)
cursors[i] = new Cursor(ch, _schema, activeToCol, colToActive, outPipes[i], toConsume, quitAction);
return cursors;
}
/// <summary>
/// An in pipe creator intended to be used from the splitter only.
/// </summary>
private InPipe CreateInPipe<T>(DataViewRow input, int col)
{
Contracts.AssertValue(input);
Contracts.Assert(0 <= col && col < _schema.Count);
return CreateInPipeCore(col, input.GetGetter<T>(_schema[col]));
}
/// <summary>
/// An in pipe creator intended to be used from the splitter only.
/// </summary>
private InPipe CreateIdInPipe(DataViewRow input)
{
Contracts.AssertValue(input);
return CreateInPipeCore(_schema.Count + (int)ExtraIndex.Id, input.GetIdGetter());
}
private InPipe CreateInPipeCore<T>(int poolIdx, ValueGetter<T> getter)
{
Contracts.Assert(0 <= poolIdx && poolIdx < _cachePools.Length);
Contracts.AssertValue(getter);
var pool = (MadeObjectPool<T[]>)_cachePools[poolIdx];
if (pool == null)
{
// REVIEW: If we changed our InPipe behavior so that it only worked over a maximum size
// in all scenarios, both during splitting and consolidating, it would be possible to set this
// to be of fixed size so we don't have to do reallocation.
Interlocked.CompareExchange(ref _cachePools[poolIdx], new MadeObjectPool<T[]>(() => null), null);
pool = (MadeObjectPool<T[]>)_cachePools[poolIdx];
}
return InPipe.Create(pool, getter);
}
/// <summary>
/// There is one of these created per input cursor, per "channel" of information
/// (necessary channels include values from active columns, as well as additional
/// side information), in both splitting and consolidating. This is a running buffer
/// of the input cursor's values. It is used to create <see cref="BatchColumn"/> objects.
/// </summary>
private abstract class InPipe
{
public int Count { get; protected set; }
private InPipe()
{
}
public abstract void Fill();
public abstract BatchColumn GetBatchColumnAndReset();
public static InPipe Create<T>(MadeObjectPool<T[]> pool, ValueGetter<T> getter)
{
return new Impl<T>(pool, getter);
}
/// <summary>
/// Creates an out pipe corresponding to the in pipe. This is useful for the splitter,
/// when we are creating an in pipe.
/// </summary>
public abstract OutPipe CreateOutPipe(DataViewType type);
private sealed class Impl<T> : InPipe
{
private readonly MadeObjectPool<T[]> _pool;
private readonly ValueGetter<T> _getter;
private T[] _values;
public Impl(MadeObjectPool<T[]> pool, ValueGetter<T> getter)
{
Contracts.AssertValue(pool);
Contracts.AssertValue(getter);
_pool = pool;
_getter = getter;
_values = _pool.Get();
Contracts.AssertValueOrNull(_values);
}
public override void Fill()
{
Utils.EnsureSize(ref _values, Count + 1, keepOld: true);
_getter(ref _values[Count++]);
}
public override BatchColumn GetBatchColumnAndReset()
{
// REVIEW: Is it worth avoiding an allocation of these new BatchColumn objects?
var retval = new BatchColumn.Impl<T>(_values, Count);
_values = _pool.Get();
Count = 0;
return retval;
}
public override OutPipe CreateOutPipe(DataViewType type)
{
Contracts.AssertValue(type);
Contracts.Assert(typeof(T) == type.RawType);
return OutPipe.Create(type, _pool);
}
}
}
/// <summary>
/// These are objects continuously created by the <see cref="InPipe"/> to spin off the
/// values they have collected. They are collected into a <see cref="Batch"/>
/// object, and eventually one is consumed by an <see cref="OutPipe"/> instance.
/// </summary>
private abstract class BatchColumn
{
public readonly int Count;
private BatchColumn(int count)
{
Contracts.Assert(count > 0);
Count = count;
}
public sealed class Impl<T> : BatchColumn
{
public readonly T[] Values;
public Impl(T[] values, int count)
: base(count)
{
Contracts.Assert(Utils.Size(values) >= count);
Values = values;
}
}
}
/// <summary>
/// This holds a collection of <see cref="BatchColumn"/> objects, which together hold all
/// the values from a set of rows from the input cursor. These are produced as needed
/// by the input cursor reader, and consumed by each of the output cursors.
///
/// This class also serves a secondary role in marshalling exceptions thrown in the workers
/// producing batches, into the threads consuming these batches.
/// <see cref="HasException"/> lets us know if this is one of these "special" batches.
/// If it is, then the <see cref="SetAll"/> method will throw whenever it is called by the
/// consumer of the batches.
/// </summary>
private sealed class Batch
{
private readonly MadeObjectPool<BatchColumn[]> _pool;
private readonly BatchColumn[] _batchColumns;
public readonly int Count;
public readonly long BatchId;
private readonly Exception _ex;
public bool HasException { get { return _ex != null; } }
/// <summary>
/// Construct a batch object to communicate the <see cref="BatchColumn"/> objects to consumers.
/// </summary>
public Batch(MadeObjectPool<BatchColumn[]> pool, BatchColumn[] batchColumns, int count, long batchId)
{
Contracts.AssertValue(pool);
Contracts.AssertValue(batchColumns);
Contracts.Assert(count > 0);
Contracts.Assert(batchId >= 0);
Contracts.Assert(batchColumns.All(bc => bc.Count == count));
_pool = pool;
_batchColumns = batchColumns;
Count = count;
BatchId = batchId;
Contracts.Assert(!HasException);
}
/// <summary>
/// Construct a batch object to communicate that something went wrong. In this case all other fields
/// will have default values.
/// </summary>
public Batch(Exception ex)
{
Contracts.AssertValue(ex);
_ex = ex;
Contracts.Assert(HasException);
}
/// <summary>
/// Gives all of the batch columns to the output pipes. This should be called only once
/// per batch object, because the batch columns will be returned to the pool.
///
/// If this was an exception-bearing batch, that exception will be propagated and thrown
/// here.
/// </summary>
public void SetAll(OutPipe[] pipes)
{
if (_ex != null)
throw Contracts.Except(_ex, "Splitter/consolidator worker encountered exception while consuming source data");
Contracts.Assert(Utils.Size(pipes) == _batchColumns.Length);
for (int p = 0; p < _batchColumns.Length; ++p)
{
pipes[p].Set(_batchColumns[p]);
_batchColumns[p] = null;
}
_pool.Return(_batchColumns);
}
}
/// <summary>
/// This helps a cursor present the results of a <see cref="BatchColumn"/>. Practically its role
/// really is to just provide a stable delegate for the <see cref="DataViewRow.GetGetter{T}(DataViewSchema.Column)"/>.
/// There is one of these created per column, per output cursor, i.e., in splitting
/// there are <c>n</c> of these created per column, and when consolidating only one of these
/// is created per column.
/// </summary>
private abstract class OutPipe
{
private int _count;
private int _index;
public int Remaining => _count - _index;
private OutPipe()
{
}
public static OutPipe Create(DataViewType type, object pool)
{
Contracts.AssertValue(type);
Contracts.AssertValue(pool);
Type pipeType;
if (type is VectorDataViewType vectorType)
pipeType = typeof(ImplVec<>).MakeGenericType(vectorType.ItemType.RawType);
else
{
Contracts.Assert(type is PrimitiveDataViewType);
pipeType = typeof(ImplOne<>).MakeGenericType(type.RawType);
}
var constructor = pipeType.GetConstructor(new Type[] { typeof(object) });
return (OutPipe)constructor.Invoke(new object[] { pool });
}
/// <summary>
/// Creates an in pipe corresponding to this out pipe. Useful for the consolidator,
/// when we are creating many in pipes from a single out pipe.
/// </summary>
public abstract InPipe CreateInPipe(Delegate getter);
/// <summary>
/// Sets this <see cref="OutPipe"/> to start presenting the output of a batch column.
/// Note that this positions the output on the first item, not before the first item,
/// so it is not necessary to call <see cref="MoveNext"/> to get the first value.
/// </summary>
/// <param name="batchCol">The batch column whose values we should start presenting.</param>
public abstract void Set(BatchColumn batchCol);
public abstract void Unset();
public abstract Delegate GetGetter();
/// <summary>
/// Moves to the next value. Note that this should be called only when we are certain that
/// we have a next value to move to, that is, when <see cref="Remaining"/> is non-zero.
/// </summary>
public void MoveNext()
{
Contracts.Assert(_index < _count);
++_index;
}
private abstract class Impl<T> : OutPipe
{
private readonly MadeObjectPool<T[]> _pool;
protected T[] Values;
public Impl(object pool)
{
Contracts.Assert(pool is MadeObjectPool<T[]>);
_pool = (MadeObjectPool<T[]>)pool;
}
public override InPipe CreateInPipe(Delegate getter)
{
Contracts.AssertValue(getter);
Contracts.Assert(getter is ValueGetter<T>);
return InPipe.Create<T>(_pool, (ValueGetter<T>)getter);
}
public override void Set(BatchColumn batchCol)
{
Contracts.AssertValue(batchCol);
Contracts.Assert(batchCol is BatchColumn.Impl<T>);
// In all possible scenarios, there is never cause for an output pipe
// to end early while the cursor itself has more rows, I believe, except
// if we at some point decide to optimize move many.
Contracts.Assert(_count == 0 || (_index == _count - 1));
// REVIEW: This sort of loose typing makes me angry. Roar!
var batchColTyped = (BatchColumn.Impl<T>)batchCol;
if (Values != null)
_pool.Return(Values);
Values = batchColTyped.Values;
_count = batchColTyped.Count;
_index = 0;
Contracts.Assert(_count <= Utils.Size(Values));
}
public override void Unset()
{
Contracts.Assert(_index <= _count);
// Remove all the objects from the pool
// to free up references to those objects
while (_pool.Count > 0)
_pool.Get();
Values = null;
_count = 0;
_index = 0;
}
public override Delegate GetGetter()
{
ValueGetter<T> getter = Getter;
return getter;
}
protected abstract void Getter(ref T value);
}
private sealed class ImplVec<T> : Impl<VBuffer<T>>
{
public ImplVec(object pool)
: base(pool)
{
}
protected override void Getter(ref VBuffer<T> value)
{
Contracts.Check(_index < _count, "Cannot get value as the cursor is not in a good state");
Values[_index].CopyTo(ref value);
}
}
private sealed class ImplOne<T> : Impl<T>
{
public ImplOne(object pool)
: base(pool)
{
}
protected override void Getter(ref T value)
{
Contracts.Check(_index < _count, "Cannot get value as the cursor is not in a good state");
value = Values[_index];
}
}
}
/// <summary>
/// A cursor used by both the splitter and consolidator, that iteratively consumes
/// <see cref="Batch"/> objects from the input blocking collection, and yields the
/// values stored therein through the help of <see cref="OutPipe"/> objects.
/// </summary>
private sealed class Cursor : RootCursorBase
{
private readonly DataViewSchema _schema;
private readonly int[] _activeToCol;
private readonly int[] _colToActive;
private readonly OutPipe[] _pipes;
private readonly Delegate[] _getters;
private readonly ValueGetter<DataViewRowId> _idGetter;
private readonly BlockingCollection<Batch> _batchInputs;
private readonly Action _quitAction;
private int _remaining;
private long _batch;
private bool _disposed;
public override DataViewSchema Schema => _schema;
public override long Batch => _batch;
/// <summary>
/// Constructs one of the split cursors.
/// </summary>
/// <param name="provider">The channel provider.</param>
/// <param name="schema">The schema.</param>
/// <param name="activeToCol">The mapping from active indices, to input column indices.</param>
/// <param name="colToActive">The reverse mapping from input column indices to active indices,
/// where -1 is present if this column is not active.</param>
/// <param name="pipes">The output pipes, one per channel of information</param>
/// <param name="batchInputs"></param>
/// <param name="quitAction"></param>
public Cursor(IChannelProvider provider, DataViewSchema schema, int[] activeToCol, int[] colToActive,
OutPipe[] pipes, BlockingCollection<Batch> batchInputs, Action quitAction)
: base(provider)
{
Ch.AssertValue(schema);
Ch.AssertValue(activeToCol);
Ch.AssertValue(colToActive);
Ch.AssertValue(pipes);
Ch.AssertValue(batchInputs);
Ch.AssertValueOrNull(quitAction);
Ch.Assert(colToActive.Length == schema.Count);
Ch.Assert(activeToCol.Length + (int)ExtraIndex._Lim == pipes.Length);
Ch.Assert(pipes.All(p => p != null));
// Could also confirm that activeToCol/colToActive are inverses of each other, but that seems like a bit much.
_schema = schema;
_activeToCol = activeToCol;
_colToActive = colToActive;
_pipes = pipes;
_getters = new Delegate[pipes.Length];
for (int i = 0; i < activeToCol.Length; ++i)
_getters[i] = _pipes[i].GetGetter();
_idGetter = (ValueGetter<DataViewRowId>)_pipes[activeToCol.Length + (int)ExtraIndex.Id].GetGetter();
_batchInputs = batchInputs;
_batch = -1;
_quitAction = quitAction;
}
protected override void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing)
{
foreach (var pipe in _pipes)
pipe.Unset();
_quitAction?.Invoke();
}
_disposed = true;
base.Dispose(disposing);
}
public override ValueGetter<DataViewRowId> GetIdGetter() => _idGetter;
protected override bool MoveNextCore()
{
Ch.Assert(!_disposed);
if (--_remaining > 0)
{
// We are still consuming the current output pipes.
foreach (var pipe in _pipes)
{
pipe.MoveNext();
Ch.Assert(pipe.Remaining == _remaining);
}
}
else
{
// We are done with the current output pipe or we are just getting started.
// REVIEW: Before testing I had a solution based on consuming enumerables, but the
// consuming enumerable did not do the "right thing," consistent with how it is documented,
// or indeed consistent with how I've ever seen it work in the past. All but one of the
// consuming enumerables would exit immediately, despite the underlying collection not
// being add completed? The below "Take" based mechanism with a sentinel does work, but I
// find the fact that the first solution did not work very troubling.
var nextBatch = _batchInputs.Take();
if (nextBatch == null)
return false;
Ch.Assert(nextBatch.HasException || nextBatch.BatchId > Batch);
Ch.Assert(nextBatch.HasException || nextBatch.Count > 0);
_batch = nextBatch.BatchId;
_remaining = nextBatch.Count;
// Note that setting an out pipe sets it on the first item, not before it, so it is not
// necessary to move the pipe.
nextBatch.SetAll(_pipes);
Ch.Assert(!nextBatch.HasException);
}
return true;
}
/// <summary>
/// Returns whether the given column is active in this row.
/// </summary>
public override bool IsColumnActive(DataViewSchema.Column column)
{
Ch.CheckParam(column.Index < _colToActive.Length, nameof(column));
return _colToActive[column.Index] >= 0;
}
/// <summary>
/// Returns a value getter delegate to fetch the value of the given column from this row.
/// This throws if the column is not active in this row, or if the type
/// <typeparamref name="TValue"/> differs from this column's type.
/// </summary>
/// <typeparam name="TValue"> is the column's content type.</typeparam>
/// <param name="column"> is the output column whose getter should be returned.</param>
public override ValueGetter<TValue> GetGetter<TValue>(DataViewSchema.Column column)
{
Ch.CheckParam(IsColumnActive(column), nameof(column), "requested column not active.");
Ch.CheckParam(column.Index < _colToActive.Length, nameof(column), "requested column is not active or valid for the Schema.");
var originGetter = _getters[_colToActive[column.Index]];
var getter = originGetter as ValueGetter<TValue>;
if (getter == null)
throw Ch.Except($"Invalid TValue: '{typeof(TValue)}', " +
$"expected type: '{originGetter.GetType().GetGenericArguments().First()}'.");
return getter;
}
}
}
/// <summary>
/// This is a consolidating cursor that is usable even with cursors that are uncacheable,
/// at the cost of being totally synchronous, that is, there is no parallel benefit from
/// having split the input cursors.
/// </summary>
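/// <example>
/// An illustrative sketch, assuming the caller already has a channel provider and a cursor set
/// obtained from <see cref="IDataView.GetRowCursorSet"/> (the variable names are placeholders):
/// <code>
/// var cursors = view.GetRowCursorSet(columnsNeeded, 4, rand);
/// DataViewRowCursor merged = cursors.Length == 1
///     ? cursors[0]
///     : new SynchronousConsolidatingCursor(provider, cursors);
/// </code>
/// </example>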
internal sealed class SynchronousConsolidatingCursor : RootCursorBase
{
private static readonly FuncInstanceMethodInfo1<SynchronousConsolidatingCursor, int, Delegate> _createGetterMethodInfo
= FuncInstanceMethodInfo1<SynchronousConsolidatingCursor, int, Delegate>.Create(target => target.CreateGetter<int>);
private readonly DataViewRowCursor[] _cursors;
private readonly Delegate[] _getters;
private readonly DataViewSchema _schema;
private readonly Heap<CursorStats> _mins;
private readonly int[] _activeToCol;
private readonly int[] _colToActive;
// The batch number of the current input cursor, or -1 if this cursor is not in Good state.
private long _batch;
// Index into _cursors array pointing to the current cursor, or -1 if this cursor is not in Good state.
private int _icursor;
// If this cursor is in Good state then this should equal _cursors[_icursor], else null.
private DataViewRowCursor _currentCursor;
private bool _disposed;
private readonly struct CursorStats
{
public readonly long Batch;
public readonly int CursorIdx;
public CursorStats(long batch, int idx)
{
Batch = batch;
CursorIdx = idx;
}
}
// REVIEW: It is not *really* necessary that we actually pass along the
// input batch as our own batch. Should we suppress it?
public override long Batch { get { return _batch; } }
public override DataViewSchema Schema => _schema;
public SynchronousConsolidatingCursor(IChannelProvider provider, DataViewRowCursor[] cursors)
: base(provider)
{
Ch.CheckNonEmpty(cursors, nameof(cursors));
_cursors = cursors;
_schema = _cursors[0].Schema;
Utils.BuildSubsetMaps(_schema, _cursors[0].IsColumnActive, out _activeToCol, out _colToActive);
_getters = new Delegate[_activeToCol.Length];
for (int i = 0; i < _activeToCol.Length; ++i)
_getters[i] = CreateGetter(_activeToCol[i]);
_icursor = -1;
_batch = -1;
_mins = new Heap<CursorStats>((s1, s2) => s1.Batch > s2.Batch);
InitHeap();
}
private void InitHeap()
{
for (int i = 0; i < _cursors.Length; ++i)
{
DataViewRowCursor cursor = _cursors[i];
Ch.Assert(cursor.Position < 0);
if (cursor.MoveNext())
_mins.Add(new CursorStats(cursor.Batch, i));
}
}
protected override void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing)
{
_batch = -1;
_icursor = -1;
_currentCursor = null;
foreach (var cursor in _cursors)
cursor.Dispose();
}
_disposed = true;
base.Dispose(disposing);
}
public override ValueGetter<DataViewRowId> GetIdGetter()
{
ValueGetter<DataViewRowId>[] idGetters = new ValueGetter<DataViewRowId>[_cursors.Length];
for (int i = 0; i < _cursors.Length; ++i)
idGetters[i] = _cursors[i].GetIdGetter();
return
(ref DataViewRowId val) =>
{
Ch.Check(_icursor >= 0, RowCursorUtils.FetchValueStateError);
idGetters[_icursor](ref val);
};
}
private Delegate CreateGetter(int col)
{
return Utils.MarshalInvoke(_createGetterMethodInfo, this, Schema[col].Type.RawType, col);
}
private Delegate CreateGetter<T>(int col)
{
ValueGetter<T>[] getters = new ValueGetter<T>[_cursors.Length];
var type = Schema[col].Type;
for (int i = 0; i < _cursors.Length; ++i)
{
var cursor = _cursors[i];
Ch.AssertValue(cursor);
Ch.Assert(col < cursor.Schema.Count);
Ch.Assert(cursor.IsColumnActive(Schema[col]));
Ch.Assert(type.Equals(cursor.Schema[col].Type));
getters[i] = _cursors[i].GetGetter<T>(cursor.Schema[col]);
}
ValueGetter<T> mine =
(ref T value) =>
{
Ch.Check(_icursor >= 0, RowCursorUtils.FetchValueStateError);
getters[_icursor](ref value);
};
return mine;
}
protected override bool MoveNextCore()
{
Ch.Assert(!_disposed);
if (Position >= 0 && _currentCursor.MoveNext())
{
// If we're still in this batch, no need to do anything, yet.
if (_currentCursor.Batch == _batch)
return true;
// We've run past the end of our batch, but not past the end of our
// cursor. Put this cursor back into the heap, and prepare to select
// a new minimum batch cursor.
Ch.Assert(_currentCursor.Batch > _batch);
_mins.Add(new CursorStats(_currentCursor.Batch, _icursor));
}
// This will happen if none of the cursors have any output rows left.
if (_mins.Count == 0)
return false;
// This is either the first call, or a time when we've run past the end of
// some batch while other cursors still have rows. Every cursor enters the heap
// only after a successful MoveNext (since we only know its batch ID once it is
// positioned on a row), so the cursor we pop is already on a row and does not
// need to be moved here.
var stats = _mins.Pop();
Ch.Assert(Position < 0 || stats.Batch > _batch);
_icursor = stats.CursorIdx;
_currentCursor = _cursors[stats.CursorIdx];
_batch = _currentCursor.Batch;
Ch.Assert(stats.Batch == _currentCursor.Batch);
return true;
}
/// <summary>
/// Returns whether the given column is active in this row.
/// </summary>
public override bool IsColumnActive(DataViewSchema.Column column)
{
Ch.CheckParam(column.Index < _colToActive.Length, nameof(column));
return _colToActive[column.Index] >= 0;
}
/// <summary>
/// Returns a value getter delegate to fetch the value of the given column from this row.
/// This throws if the column is not active in this row, or if the type
/// <typeparamref name="TValue"/> differs from this column's type.
/// </summary>
/// <typeparam name="TValue"> is the column's content type.</typeparam>
/// <param name="column"> is the output column whose getter should be returned.</param>
public override ValueGetter<TValue> GetGetter<TValue>(DataViewSchema.Column column)
{
Ch.CheckParam(IsColumnActive(column), nameof(column), "requested column not active");
Ch.CheckParam(column.Index < _colToActive.Length, nameof(column), "requested column is not valid for the schema.");
var originGetter = _getters[_colToActive[column.Index]];
var getter = originGetter as ValueGetter<TValue>;
if (getter == null)
throw Ch.Except($"Invalid TValue: '{typeof(TValue)}', " +
$"expected type: '{originGetter.GetType().GetGenericArguments().First()}'.");
return getter;
}
}
public static ValueGetter<ReadOnlyMemory<char>>[] PopulateGetterArray(DataViewRowCursor cursor, List<int> colIndices)
{
var n = colIndices.Count;
var getters = new ValueGetter<ReadOnlyMemory<char>>[n];
for (int i = 0; i < n; i++)
{
ValueGetter<ReadOnlyMemory<char>> getter;
var srcColIndex = colIndices[i];
var colType = cursor.Schema[srcColIndex].Type;
if (colType is VectorDataViewType vectorType)
{
getter = Utils.MarshalInvoke(GetVectorFlatteningGetter<int>, vectorType.ItemType.RawType,
cursor, srcColIndex, vectorType.ItemType);
}
else
{
getter = Utils.MarshalInvoke(GetSingleValueGetter<int>, colType.RawType,
cursor, srcColIndex, colType);
}
getters[i] = getter;
}
return getters;
}
public static ValueGetter<ReadOnlyMemory<char>> GetSingleValueGetter<T>(DataViewRow cursor, int i, DataViewType colType)
{
var floatGetter = cursor.GetGetter<T>(cursor.Schema[i]);
T v = default(T);
ValueMapper<T, StringBuilder> conversion;
if (!Conversions.DefaultInstance.TryGetStringConversion<T>(colType, out conversion))
{
var error = $"Cannot display {colType}";
conversion = (in T src, ref StringBuilder builder) =>
{
if (builder == null)
builder = new StringBuilder();
else
builder.Clear();
builder.Append(error);
};
}
StringBuilder dst = null;
ValueGetter<ReadOnlyMemory<char>> getter =
(ref ReadOnlyMemory<char> value) =>
{
floatGetter(ref v);
conversion(in v, ref dst);
string text = dst.ToString();
value = text.AsMemory();
};
return getter;
}
public static ValueGetter<ReadOnlyMemory<char>> GetVectorFlatteningGetter<T>(DataViewRow cursor, int colIndex, DataViewType colType)
{
var vecGetter = cursor.GetGetter<VBuffer<T>>(cursor.Schema[colIndex]);
var vbuf = default(VBuffer<T>);
const int previewValues = 100;
ValueMapper<T, StringBuilder> conversion;
Conversions.DefaultInstance.TryGetStringConversion<T>(colType, out conversion);
StringBuilder dst = null;
ValueGetter<ReadOnlyMemory<char>> getter =
(ref ReadOnlyMemory<char> value) =>
{
vecGetter(ref vbuf);
bool isLong = vbuf.Length > previewValues;
var suffix = isLong ? string.Format(",...(+{0})", vbuf.Length - previewValues) : "";
var stringRep = string.Join(",", vbuf.Items(true).Take(previewValues)
.Select(
x =>
{
var v = x.Value;
conversion(in v, ref dst);
return dst.ToString();
}));
value = string.Format("<{0}{1}>", stringRep, suffix).AsMemory();
};
return getter;
}
}
}