// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ML.Internal.Utilities;
using Microsoft.ML.Runtime;
namespace Microsoft.ML.Data
public sealed partial class TextLoader
private sealed class Cursor : RootCursorBase
// Lines are divided into batches and processed a batch at a time. This enables
// parallel parsing.
private const int BatchSize = 64;
// Column bindings copied from the parent loader; their OutputSchema is what Schema returns.
private readonly Bindings _bindings;
// Parser copied from the parent loader; it creates the row set and parses each line into it.
private readonly Parser _parser;
private readonly bool[] _active; // Which columns are active. IsColumnActive treats null as "all active".
private readonly int _srcNeeded; // Largest source index that is needed by this cursor.
// Supplies the batches of lines that this cursor parses.
private readonly LineReader _reader;
// Enumerates row indices into _rows; MoveNextCore copies Current into _rows.Index.
private readonly IEnumerator<int> _ator;
// One slot per entry of _bindings.Infos, filled from the matching pipe of _rows.
private readonly Delegate[] _getters;
private readonly ParseStats _stats;
private readonly RowSet _rows;
// This holds the overall line from the line reader currently served up in the cursor.
private long _total;
// Value reported by the Batch property.
private long _batch;
// Set by Dispose(bool) so that a repeated call can be detected.
private bool _disposed;
/// <summary>
/// Gets the batch number currently recorded by this cursor.
/// </summary>
public override long Batch => _batch;
/// <summary>
/// Computes the values shared by every cursor created over <paramref name="parent"/>:
/// the largest source index that has to be parsed and the number of threads to use.
/// </summary>
/// <param name="parent">The loader whose bindings and settings are consulted.</param>
/// <param name="active">Per-column activity mask, or null when every column is active.</param>
/// <param name="n">Requested thread count, forwarded to DataViewUtils.GetThreadCount.</param>
/// <param name="srcNeeded">Receives the largest needed source index; always >= 0.</param>
/// <param name="cthd">Receives the thread count, capped by the number of batches allowed by the row limit.</param>
private static void SetupCursor(TextLoader parent, bool[] active, int n,
    out int srcNeeded, out int cthd)
{
    Contracts.Assert(active == null || active.Length == parent._bindings.OutputSchema.Count);

    var bindings = parent._bindings;

    // Starting at 1 guarantees that srcNeeded ends up >= 0 even if no segment is visited.
    int maxLim = 1;
    for (int col = 0; col < bindings.Infos.Length; col++)
    {
        // Only active columns contribute; a null mask means all of them do.
        if (active == null || active[col])
        {
            foreach (var segment in bindings.Infos[col].Segments)
            {
                if (segment.Lim > maxLim)
                    maxLim = segment.Lim;
            }
        }
    }

    // A positive input size caps how far into the line we ever need to look.
    var inputSize = parent._inputSize;
    if (inputSize > 0 && maxLim > inputSize)
        maxLim = inputSize;

    srcNeeded = maxLim - 1;
    Contracts.Assert(srcNeeded >= 0);

    // Determine the number of threads to use: never more than there are batches to hand out,
    // and never fewer than one.
    int threads = DataViewUtils.GetThreadCount(n, !parent._useThreads);
    long maxBlocks = parent._maxRows / BatchSize;
    if (threads > maxBlocks)
        threads = Math.Max(1, (int)maxBlocks);
    cthd = threads;
}
// Note that we don't filter out rows with parsing issues since it's not acceptable to
// produce a different set of rows when subsetting columns. Any parsing errors need to be
// translated to NaN, not result in skipping the row. We should produce some diagnostics
// to alert the user to the issues.
//
// Builds a cursor that pulls lines from 'reader' and parses them with the parent's parser
// and bindings. 'active' may be null; when it is not, its length must match the output schema.
// 'srcNeeded' must be >= 0 and 'cthd' must be > 0. A ParallelState is created when cthd > 1.
// NOTE(review): the else / try / finally structure around the statements below is not visible
// in this view. Presumably the single-row CreateRowSet call and ParseSequential form the
// cthd <= 1 path, and the trailing 'state != null' check is cleanup - confirm against the full file.
private Cursor(TextLoader parent, ParseStats stats, bool[] active, LineReader reader, int srcNeeded, int cthd)
: base(parent._host)
Ch.Assert(active == null || active.Length == parent._bindings.OutputSchema.Count);
Ch.Assert(srcNeeded >= 0);
Ch.Assert(cthd > 0);
// Both counters start at -1.
_total = -1;
_batch = -1;
_bindings = parent._bindings;
_parser = parent._parser;
_active = active;
_reader = reader;
_stats = stats;
_srcNeeded = srcNeeded;
ParallelState state = null;
if (cthd > 1)
// The parallel state creates the row set itself and hands it back through the out parameter.
state = new ParallelState(this, out _rows, cthd);
_rows = _parser.CreateRowSet(_stats, 1, _active);
// Fetch one getter per column from the pipes of the row set.
_getters = new Delegate[_bindings.Infos.Length];
for (int i = 0; i < _getters.Length; i++)
if (_active != null && !_active[i])
ColumnPipe v = _rows.Pipes[i];
Ch.Assert(v != null);
_getters[i] = v.GetGetter();
Ch.Assert(_getters[i] != null);
if (state != null)
_ator = ParseParallel(state).GetEnumerator();
// ParseParallel wraps the state in a using statement, so the local reference is cleared here.
state = null;
_ator = ParseSequential().GetEnumerator();
if (state != null)
/// <summary>
/// Creates one cursor over <paramref name="files"/>, backed by its own line reader and parse statistics.
/// </summary>
/// <param name="parent">The loader that supplies bindings, parser and reading options.</param>
/// <param name="files">The streams to read; an empty set is allowed.</param>
/// <param name="active">Per-column activity mask, or null when every column is active.</param>
/// <returns>The newly created cursor.</returns>
public static DataViewRowCursor Create(TextLoader parent, IMultiStreamSource files, bool[] active)
{
    Contracts.Assert(active == null || active.Length == parent._bindings.OutputSchema.Count);

    SetupCursor(parent, active, 0, out int srcNeeded, out int cthd);
    Contracts.Assert(cthd > 0);

    // The reader is owned by this single cursor, hence a reference count of 1.
    var reader = new LineReader(files, BatchSize, 100, parent.HasHeader, parent.ReadMultilines, parent._separators, parent._escapeChar, parent._maxRows, 1);
    var stats = new ParseStats(parent._host, 1);

    return new Cursor(parent, stats, active, reader, srcNeeded, cthd);
}
/// <summary>
/// Creates a set of cursors over <paramref name="files"/> that all share one line reader and
/// one parse statistics object. The thread count computed by SetupCursor from
/// <paramref name="n"/> decides how many cursors are created; each cursor is itself
/// constructed with a thread count of 1.
/// </summary>
/// <param name="parent">The loader that supplies bindings, parser and reading options.</param>
/// <param name="files">The streams to read.</param>
/// <param name="active">Per-column activity mask, or null.</param>
/// <param name="n">Requested number of cursors, forwarded to SetupCursor.</param>
public static DataViewRowCursor[] CreateSet(TextLoader parent, IMultiStreamSource files, bool[] active, int n)
// Note that files is allowed to be empty.
Contracts.Assert(active == null || active.Length == parent._bindings.OutputSchema.Count);
int srcNeeded;
int cthd;
SetupCursor(parent, active, n, out srcNeeded, out cthd);
Contracts.Assert(cthd > 0);
// The reader's reference count and the stats object are both sized by cthd.
var reader = new LineReader(files, BatchSize, 100, parent.HasHeader, parent.ReadMultilines, parent._separators, parent._escapeChar, parent._maxRows, cthd);
var stats = new ParseStats(parent._host, cthd);
if (cthd <= 1)
return new DataViewRowCursor[1] { new Cursor(parent, stats, active, reader, srcNeeded, 1) };
var cursors = new DataViewRowCursor[cthd];
for (int i = 0; i < cursors.Length; i++)
cursors[i] = new Cursor(parent, stats, active, reader, srcNeeded, 1);
// The array is moved to 'result' and the local cleared before returning.
var result = cursors;
cursors = null;
return result;
// NOTE(review): the checks below presumably belong to a cleanup path that runs when
// 'cursors' was not cleared; its statements are not visible in this view - confirm.
if (cursors != null)
foreach (var curs in cursors)
if (curs != null)
/// <summary>
/// Returns a getter that produces the row id of the current row, built from the cursor's
/// running line total. The getter checks that the cursor is in a good state before each fetch.
/// </summary>
public override ValueGetter<DataViewRowId> GetIdGetter()
{
    void FetchId(ref DataViewRowId id)
    {
        Ch.Check(IsGood, RowCursorUtils.FetchValueStateError);
        id = new DataViewRowId((ulong)_total, 0);
    }

    return FetchId;
}
/// <summary>
/// Reads the first batch of lines from <paramref name="source"/> and appends their text to
/// <paramref name="lines"/>.
/// </summary>
/// <param name="source">The streams to read from.</param>
/// <param name="count">Number of lines wanted; must be positive. Values below 2 are raised to 2.</param>
/// <param name="readMultilines">Whether quoted fields may span several physical lines.</param>
/// <param name="separators">Field separators, forwarded to the line reader.</param>
/// <param name="escapeChar">Escape character, forwarded to the line reader.</param>
/// <param name="lines">Receives the lines; left untouched when the source yields none.</param>
public static void GetSomeLines(IMultiStreamSource source, int count, bool readMultilines, char[] separators, char escapeChar, ref List<ReadOnlyMemory<char>> lines)
{
    Contracts.Assert(count > 0);

    // LineReader asserts a batch size of at least 2.
    if (count < 2)
        count = 2;

    LineBatch batch;
    var reader = new LineReader(source, count, 1, false, readMultilines, separators, escapeChar, count, 1);
    try
    {
        batch = reader.GetBatch();
        Contracts.Assert(batch.Exception == null);
    }
    finally
    {
        // The reader was created with a single reference. Dropping it here, even when
        // GetBatch throws, lets the reader signal its background reading task to stop.
        reader.Release();
    }

    // GetBatch hands back a default batch (null Infos) when nothing could be taken.
    if (Utils.Size(batch.Infos) == 0)
        return;

    for (int i = 0; i < batch.Infos.Length; i++)
        Utils.Add(ref lines, batch.Infos[i].Text.AsMemory());
}
/// <summary>
/// Look in the first file for args embedded as comments. This gathers the comment lines
/// starting with #@ that come before any data line.
/// </summary>
/// <param name="files">The source whose first stream (index 0) is scanned.</param>
/// <returns>Null when <paramref name="files"/> has no streams; otherwise the text accumulated in the string builder.</returns>
public static string GetEmbeddedArgs(IMultiStreamSource files)
if (files.Count == 0)
return null;
StringBuilder sb = new StringBuilder();
using (var rdr = files.OpenTextReader(0))
// Starts empty and becomes a single space below; presumably the separator placed
// between gathered pieces - the append itself is not visible in this view.
string pre = "";
for (; ; )
string text = rdr.ReadLine();
// NOTE(review): the action taken for each of the following conditions is not visible here.
// Presumably end of stream and the first non-'#' line stop the scan, while blank lines,
// "//" comments and '#' lines without '@' are skipped - confirm against the full file.
if (text == null)
if (text.Length == 0)
if (text.StartsWith("//"))
if (text[0] != '#')
// The length test runs first, so text[1] is only read when it exists.
if (text.Length <= 2 || text[1] != '@')
pre = " ";
return sb.ToString();
/// <summary>
/// Gets the output schema of the bindings this cursor was created with.
/// </summary>
public override DataViewSchema Schema
{
    get { return _bindings.OutputSchema; }
}
/// <summary>
/// Disposes the cursor. The _disposed flag is tested first and set at the end.
/// NOTE(review): the statements guarded by the two conditions are not visible in this view;
/// presumably an already-disposed cursor returns early and the 'disposing' branch releases
/// the enumerator, reader and stats - confirm against the full file.
/// </summary>
protected override void Dispose(bool disposing)
if (_disposed)
if (disposing)
_disposed = true;
/// <summary>
/// Advances the row enumerator and points the row set at the row it yields.
/// When the enumerator is exhausted the row index is reset to -1.
/// </summary>
/// <returns>True when a new row is available; false at the end of the data.</returns>
protected override bool MoveNextCore()
{
    bool advanced = _ator.MoveNext();
    if (!advanced)
    {
        _rows.Index = -1;
        return false;
    }

    _rows.Index = _ator.Current;
    return true;
}
/// <summary>
/// Tells whether <paramref name="column"/> is active in this cursor. With no activity
/// mask, every column counts as active.
/// </summary>
public override bool IsColumnActive(DataViewSchema.Column column)
{
    Ch.Check(column.Index < _bindings.Infos.Length);

    if (_active == null)
        return true;
    return _active[column.Index];
}
/// <summary>
/// Returns a value getter delegate to fetch the value of the given column from the row.
/// This throws if the column index is out of range, if no getter exists for the column
/// (it is not active in this row), or if the type <typeparamref name="TValue"/> differs
/// from this column's type.
/// </summary>
/// <typeparam name="TValue"> is the column's content type.</typeparam>
/// <param name="column"> is the output column whose getter should be returned.</param>
/// <returns>The getter stored for <paramref name="column"/>, typed as <typeparamref name="TValue"/>.</returns>
public override ValueGetter<TValue> GetGetter<TValue>(DataViewSchema.Column column)
{
    Ch.CheckParam(column.Index < _getters.Length, nameof(column), "requested column not valid.");

    var originFn = _getters[column.Index];

    // Without this check a missing getter would surface as a NullReferenceException
    // from originFn.GetType() in the type-mismatch message below.
    Ch.Check(originFn != null, "requested column is not active.");

    if (originFn is ValueGetter<TValue> fn)
        return fn;

    throw Ch.Except($"Invalid TValue in GetGetter: '{typeof(TValue)}', " +
        $"expected type: '{originFn.GetType().GetGenericArguments().First()}'.");
}
/// <summary>
/// Single-threaded parsing path. Pulls batches from the line reader until one arrives
/// without lines, parses every line into row 0 of the single-row row set, and yields 0
/// (the row index) once per parsed line.
/// NOTE(review): no increment of _total is visible in this view, and the exact position of
/// the final _batch assignment relative to the loops cannot be told from here - confirm.
/// </summary>
private IEnumerable<int> ParseSequential()
Ch.Assert(_rows.Count == 1);
LineBatch batch;
var helper = _parser.CreateHelper(_rows.Stats, _srcNeeded);
// A batch with null Infos marks the end of the input.
while ((batch = _reader.GetBatch()).Infos != null)
Ch.Assert(batch.Exception == null);
_total = batch.Total;
foreach (var info in batch.Infos)
Ch.Assert(info.Line > 0);
_parser.ParseRow(_rows, 0, helper, _active, batch.Path, info.Line, info.Text);
// Expose the batch number of the line being served before handing the row out.
_batch = batch.Batch;
yield return 0;
_batch = long.MaxValue;
// Time-out in milliseconds for waiting on an event. This allows checking for
// abort situations. It is the time-out passed to the queues' TryAdd calls.
private const int TimeOut = 100;
/// <summary>
/// A batch of lines produced by the line reader, or the exception that stopped reading.
/// A batch built from an exception carries no path, no lines and zeroed counters.
/// </summary>
private readonly struct LineBatch
{
    public readonly string Path;
    // Total lines, up to the first line of this batch.
    public readonly long Total;
    public readonly long Batch;
    public readonly LineInfo[] Infos;
    public readonly Exception Exception;

    /// <summary>Creates a regular batch of lines; Exception is left null.</summary>
    public LineBatch(string path, long total, long batch, LineInfo[] infos)
    {
        Exception = null;
        Infos = infos;
        Batch = batch;
        Total = total;
        Path = path;
    }

    /// <summary>Creates a batch that only transports <paramref name="ex"/>.</summary>
    public LineBatch(Exception ex)
        : this(null, 0, 0, null)
    {
        Exception = ex;
    }
}
/// <summary>
/// One line of text together with its line number, which must be positive.
/// </summary>
private readonly struct LineInfo
{
    public readonly long Line;
    public readonly string Text;

    public LineInfo(long line, string text)
    {
        Contracts.Assert(line > 0);

        Text = text;
        Line = line;
    }
}
// This reads batches of lines on a separate thread. It filters out a header, comments, and blank lines.
private sealed class LineReader
// Row limit; ThreadProc compares its running total of lines against it.
private readonly long _limit;
private readonly bool _hasHeader;
// When set, ThreadProc reads through the MultiLineReader.
private readonly bool _readMultilines;
private readonly char[] _separators;
private readonly char _escapeChar;
// Length of the LineInfo arrays that ThreadProc allocates.
private readonly int _batchSize;
private readonly IMultiStreamSource _files;
// The line reader can be referenced by multiple workers. This is the reference count.
private int _cref;
// Batches posted by ThreadProc and taken by GetBatch.
private BlockingQueue<LineBatch> _queue;
// The background task running ThreadProc.
private Task _thdRead;
// Set by Release and polled by ThreadProc and PostPartial.
private volatile bool _abort;
/// <summary>
/// Creates the reader and immediately starts the background task that reads the files.
/// </summary>
/// <param name="files">The streams to read; an empty set is allowed.</param>
/// <param name="batchSize">Number of lines per batch; at least 2.</param>
/// <param name="bufSize">Size passed to the batch queue; must be positive.</param>
/// <param name="hasHeader">Whether the files start with a header line.</param>
/// <param name="readMultilines">Whether quoted fields may span several physical lines.</param>
/// <param name="separators">Field separators used by the multi-line reader.</param>
/// <param name="escapeChar">Escape character used by the multi-line reader.</param>
/// <param name="limit">Maximum number of lines to read; must not be negative.</param>
/// <param name="cref">Initial reference count; must be positive.</param>
public LineReader(IMultiStreamSource files, int batchSize, int bufSize, bool hasHeader, bool readMultilines, char[] separators, char escapeChar, long limit, int cref)
{
    Contracts.Assert(files.Count >= 0);
    Contracts.Assert(batchSize >= 2);
    Contracts.Assert(bufSize > 0);
    Contracts.Assert(limit >= 0);
    Contracts.Assert(cref > 0);

    _files = files;
    _batchSize = batchSize;
    _hasHeader = hasHeader;
    _readMultilines = readMultilines;
    _separators = separators;
    _escapeChar = escapeChar;
    _limit = limit;
    _cref = cref;

    // The queue is created before the reading task is started, and starting the task is the last step.
    _queue = new BlockingQueue<LineBatch>(bufSize);
    _thdRead = Utils.RunOnBackgroundThreadAsync(ThreadProc);
}
/// <summary>
/// Drops one reference to this reader by atomically decrementing the reference count.
/// NOTE(review): the statements guarded by the conditions below are only partly visible.
/// Presumably the method returns while references remain, and the last release signals
/// the reading task to abort and tears down the task and the queue - confirm.
/// </summary>
public void Release()
int n = Interlocked.Decrement(ref _cref);
Contracts.Assert(n >= 0);
if (n != 0)
if (_thdRead != null)
// ThreadProc polls this flag.
_abort = true;
_thdRead = null;
if (_queue != null)
_queue = null;
/// <summary>
/// Takes the next batch from the queue, waiting without a time-out.
/// </summary>
/// <returns>
/// The next batch, or a default batch (null Infos) when nothing could be taken.
/// </returns>
/// <remarks>
/// When the batch transports an exception from the reading task, that exception is rethrown
/// wrapped in a decode exception.
/// </remarks>
public LineBatch GetBatch()
{
    LineBatch next;
    bool taken = _queue.TryTake(out next, millisecondsTimeout: -1);
    if (!taken)
        return default;

    if (next.Exception != null)
        throw Contracts.ExceptDecode(next.Exception, "Stream reading encountered exception");

    return next;
}
private class MultiLineReader
// First separator, kept apart so IsSep can test it before scanning the rest.
private readonly char _sep0;
private readonly char[] _separators;
// Result of IsSep(' '), computed once in the constructor.
private readonly bool _sepsContainsSpace;
private readonly char _escapeChar;
// True when the escape character is the double quote itself.
private readonly bool _escapeCharIsDoubleQuote;
private readonly StringBuilder _sb;
// The reader the physical lines are pulled from.
private readonly TextReader _rdr;
/// <summary>
/// Creates a reader that assembles logical rows from the physical lines of <paramref name="rdr"/>.
/// </summary>
/// <param name="rdr">The reader the physical lines are pulled from.</param>
/// <param name="separators">Field separators; must contain at least one entry.</param>
/// <param name="escapeChar">Character that escapes a quote inside a quoted field.</param>
public MultiLineReader(TextReader rdr, char[] separators, char escapeChar)
{
    _rdr = rdr;
    _sb = new StringBuilder();

    _escapeChar = escapeChar;
    _escapeCharIsDoubleQuote = escapeChar == '"';

    _sep0 = separators[0];
    _separators = separators;
    // IsSep reads _sep0 and _separators, so this has to come after both are assigned.
    _sepsContainsSpace = IsSep(' ');
}
// When reading lines that contain quoted fields, the quoted fields can contain
// '\n', so we'll need to read multiple lines (multilines) to get all the fields
// of a given row.
/// <summary>
/// Reads one logical row, joining physical lines while the last field of the line read
/// so far is a quoted field that is still open.
/// </summary>
/// <param name="lineNum">Line number reported in the error message for an unclosed quote.</param>
/// <param name="ignoreHashLine">When true, a line starting with '#' is returned as is.</param>
/// <returns>The row text; an empty string for an empty line; null at the end of the stream.</returns>
/// <exception cref="EndOfStreamException">A quoted field is still open when the stream ends.</exception>
public string ReadMultiLine(long lineNum, bool ignoreHashLine)
{
    string line = _rdr.ReadLine();

    // if it was an empty line or if we've reached the end of file (i.e. line = null)
    if (string.IsNullOrEmpty(line))
        return line;

    // In ML.NET we filter out lines beginning with // and # at the beginning of the file
    // Or lines beginning with // elsewhere in the file.
    // Thus, we don't care to check if there's a quoted multiline when the line begins with
    // these chars.
    // The length test keeps a one-character line such as "/" from being indexed past its end.
    if (line.Length >= 2 && line[0] == '/' && line[1] == '/')
        return line;
    if (ignoreHashLine && line[0] == '#')
        return line;

    // Get more lines until the last field of the line doesn't contain its newline
    // inside a quoted field
    if (!LastFieldIncludesNewLine(line, false))
        return line;

    // The row continues on the following line(s): collect every piece in the builder.
    _sb.Clear();
    _sb.Append(line);

    bool lastFieldIncludesNewLine = true;
    while (lastFieldIncludesNewLine)
    {
        line = _rdr.ReadLine();
        if (line == null)
            throw new EndOfStreamException($"A quoted field opened on line {lineNum} was never closed, and we've read to the last line in the file without finding the closing quote");

        // ReadLine drops the line terminator; put back the newline that belongs to the quoted field.
        _sb.Append('\n').Append(line);
        lastFieldIncludesNewLine = LastFieldIncludesNewLine(line, true);
    }

    return _sb.ToString();
}
// Tells whether the last field of 'line' is a quoted field that is still open at the end
// of the line, by walking the line field by field with FieldIncludesNewLine.
// An empty line simply hands back 'startsInsideQuoted'; a quoting error yields false.
//
// The startsInsideQuoted parameter indicates if the last field of the previous line
// ended in a quoted field which included the newline character,
// if it is true, then the beginning of this line is considered to be part
// of the last field of the previous line.
public bool LastFieldIncludesNewLine(string line, bool startsInsideQuoted = false)
if (line.Length == 0)
return startsInsideQuoted;
int ichCur = 0;
int ichLim = line.Length;
bool quotingError = false;
// Only the first field can start inside a quoted field carried over from the previous line.
bool ret = FieldIncludesNewLine(ref line, ref ichCur, ichLim, ref quotingError, startsInsideQuoted);
while (ichCur < ichLim)
ret = FieldIncludesNewLine(ref line, ref ichCur, ichLim, ref quotingError, false);
if (quotingError)
return false;
// Skip empty fields
// NOTE(review): the body of this loop is not visible in this view; presumably it advances ichCur.
while (ichCur < ichLim && IsSep(line[ichCur]))
return ret;
// Scans one field of 'line' starting at ichCur (which is advanced through the ref parameter)
// and tells whether the field is a quoted field that is still open when the line ends.
// Returns true in that case; false for a closed quoted field, an unquoted field, or a
// quoting error (in which case 'quotingError' is also set).
// NOTE(review): several loop bodies and branch exits are not visible in this view
// (e.g. the increments inside the space-skipping loops); the comments below describe
// only what the visible conditions and return statements establish.
private bool FieldIncludesNewLine(ref string line, ref int ichCur, int ichLim,
ref bool quotingError, bool startsInsideQuoted)
if (!startsInsideQuoted && !_sepsContainsSpace)
// Ignore leading spaces
while (ichCur < ichLim && line[ichCur] == ' ')
if (ichCur >= ichLim) // if there were only leading spaces on the line
return startsInsideQuoted;
if (startsInsideQuoted || line[ichCur] == '"')
// Quoted Field Case
if (!startsInsideQuoted)
// Two scanning loops follow: the first is used when the escape character is the
// double quote itself, the second compares against _escapeChar.
if (_escapeCharIsDoubleQuote)
for (; ; ichCur++)
if (ichCur >= ichLim)
// We've reached the end of the line without finding the closing quote,
// so next line will start on this quoted field
return true;
if (line[ichCur] == '"')
if (++ichCur >= ichLim)
// Last character in line was the closing quote of the field
return false;
if (line[ichCur] == '"')
// 2 Double quotes means escaped quote
// If it wasn't an escaped quote, then this is supposed to be
// the closing quote of the field
for (; ; ichCur++)
if (ichCur >= ichLim)
// We've reached the end of the line without finding the closing quote,
// so next line will start on this quoted field
return true;
if (line[ichCur] == _escapeChar)
if (++ichCur >= ichLim)
// Last character in line was escapeChar
return true;
// Whatever char comes after an escapeChar is ignored
else if (line[ichCur] == '"')
// Since this wasn't an escaped quote, then this is supposed to be
// the closing quote of the field
// After finding the closing quote of the field...
// There should only be empty spaces until the next separator
if (!_sepsContainsSpace)
// Ignore leading spaces
while (ichCur < ichLim && line[ichCur] == ' ')
// If there's anything else than spaces or the next separator,
// this will actually be a QuotingError on the parser, so we decide that this
// line contains a quoting error, and so it's not going to be considered a valid field
// and the rest of the line should be ignored.
if (ichCur >= ichLim || IsSep(line[ichCur]))
return false;
quotingError = true;
return false;
// Unquoted field case.
// An unquoted field shouldn't contain new lines
while (ichCur < ichLim && !IsSep(line[ichCur]))
return false;
/// <summary>
/// Tells whether <paramref name="ch"/> is one of the configured separators.
/// </summary>
private bool IsSep(char ch)
{
    // The first separator is cached in its own field and tested first.
    if (_sep0 == ch)
        return true;

    // Index 0 is already covered by _sep0, so the scan starts at 1.
    int i = 1;
    while (i < _separators.Length)
    {
        if (_separators[i] == ch)
            return true;
        i++;
    }

    return false;
}
// Body of the background reading task. Walks the files in order, reads them line by line
// (through the MultiLineReader when _readMultilines is set), collects the lines into
// LineInfo arrays of _batchSize entries and posts them to _queue as LineBatch values.
// A partially filled array is posted through PostPartial at the end of a file and when
// the row limit is reached. An exception is posted to the queue as a LineBatch as well.
// NOTE(review): braces, try, loop exits, and the line and batch counter increments are not
// visible in this view; the comments below describe only the visible statements.
private void ThreadProc()
Contracts.Assert(_batchSize >= 2);
if (_limit <= 0)
long total = 0;
long batch = -1;
for (int ifile = 0; ifile < _files.Count; ifile++)
string path = _files.GetPathOrNull(ifile);
using (var rdr = _files.OpenTextReader(ifile))
var multilineReader = new MultiLineReader(rdr, _separators, _escapeChar);
string text;
long line = 0;
// First loop: look for the first line that is not blank, not a '#' line and not a "//" comment.
for (; ; )
// REVIEW: Avoid allocating a string for every line. This would probably require
// introducing a CharSpan type (similar to ReadOnlyMemory but based on char[] or StringBuilder)
// and implementing all the necessary conversion functionality on it. See task 3871.
if (_readMultilines)
text = multilineReader.ReadMultiLine(line, true);
text = rdr.ReadLine();
if (text == null)
goto LNext;
// NOTE(review): this uses string.StartsWith("//"), which the comment in the second loop
// advises against; the character comparison used there would be consistent - confirm.
if (text.Length > 0 && text[0] != '#' && !text.StartsWith("//"))
// REVIEW: Use a pool of batches?
int index = 0;
var infos = new LineInfo[_batchSize];
if (!_hasHeader)
// Not a header or comment, so first line is a real line.
infos[index++] = new LineInfo(line, text);
if (++total >= _limit)
PostPartial(path, total - index, ref batch, index, infos);
// Second loop: read the remaining lines of the file.
for (; ; )
if (_abort)
if (_readMultilines)
text = multilineReader.ReadMultiLine(line, false);
text = rdr.ReadLine();
if (text == null)
// We're done with this file. Queue the last partial batch.
PostPartial(path, total - index, ref batch, index, infos);
goto LNext;
// Filter out comments and empty strings.
if (text.Length >= 2)
// Don't use string.StartsWith("//") - it is too slow.
if (text[0] == '/' && text[1] == '/')
else if (text.Length == 0)
infos[index] = new LineInfo(line, text);
if (++index >= infos.Length)
// The array is full: post it, retrying on time-out so that _abort can be observed.
var lines = new LineBatch(path, total - index + 1, batch, infos);
while (!_queue.TryAdd(lines, TimeOut))
if (_abort)
infos = new LineInfo[_batchSize];
index = 0;
if (++total >= _limit)
PostPartial(path, total - index, ref batch, index, infos);
catch (Exception ex)
// Hand the exception to the consumer through the queue; GetBatch rethrows it.
while (!_queue.TryAdd(new LineBatch(ex), TimeOut))
if (_abort)
// Posts a partially filled array of lines to the queue. 'index' is the number of valid
// entries in 'infos' and must be smaller than its length; the array is shrunk to that
// size before posting. Posting retries on time-out so that _abort can be observed.
// NOTE(review): 'batch' is passed by reference but no update to it is visible in this
// view, nor is the action taken when index <= 0 (presumably an early return) - confirm.
private void PostPartial(string path, long total, ref long batch, int index, LineInfo[] infos)
Contracts.Assert(0 <= total);
Contracts.Assert(0 <= index && index < Utils.Size(infos));
// Queue the last partial batch.
if (index <= 0)
Array.Resize(ref infos, index);
while (!_queue.TryAdd(new LineBatch(path, total, batch, infos), TimeOut))
if (_abort)
/// <summary>
/// Multi-threaded parsing path. Consumes the row batches published by
/// <paramref name="state"/> and yields the index of every row in each batch, from
/// IrowMin up to (but not including) IrowLim. The state is disposed when enumeration
/// ends. A parsing exception recorded by the state is rethrown as a decode exception.
/// NOTE(review): no per-row update of _total is visible in this view, yet the assertion
/// expects _total + 1 to equal the next batch's Total - presumably an increment
/// accompanies each yielded row; confirm against the full file.
/// </summary>
private IEnumerable<int> ParseParallel(ParallelState state)
using (state)
foreach (var batch in state.GetBatches())
// If the collation of rows happened correctly, this should have a precise value.
Contracts.Assert(batch.Total == _total + 1, $"batch.Total:{batch.Total} while _total + 1:{_total + 1}.");
_total = batch.Total - 1;
for (int irow = batch.IrowMin; irow < batch.IrowLim; irow++)
yield return irow;
if (state.ParsingException != null)
throw Ch.ExceptDecode(state.ParsingException,
"Parsing failed with an exception: {0}", state.ParsingException.Message);
/// <summary>
/// A non-empty range of row indices [IrowMin, IrowLim) in the row set, together with the
/// line total reported by the batch of lines the rows were parsed from.
/// </summary>
private struct RowBatch
{
    public int IrowMin;
    public int IrowLim;
    public long Total;

    public RowBatch(int irowMin, int irowLim, long total)
    {
        // The range must start at a non-negative index and contain at least one row.
        Contracts.Assert(0 <= irowMin && irowMin < irowLim);
        Contracts.Assert(total >= 0);

        Total = total;
        IrowLim = irowLim;
        IrowMin = irowMin;
    }
}
private sealed class ParallelState : IDisposable
// The cursor this state parses for; its parser, stats, active mask and reader are used.
private readonly Cursor _curs;
private readonly LineReader _reader;
private readonly RowSet _rows;
// Number of blocks in this RowSet.
private readonly int _blockCount;
// Size of blocks in this RowSet.
private const int BlockSize = BatchSize;
// Ordered waiters on block numbers.
// _waiterReading orders reading from _reader.
// _waiterPublish orders publishing to _queue.
private readonly OrderedWaiter _waiterReading;
private readonly OrderedWaiter _waiterPublish;
// A small capacity blocking collection that the main cursor thread consumes.
private readonly BlockingQueue<RowBatch> _queue;
// One background task per worker, each running ThreadProc with its index.
private readonly Task[] _threads;
// Number of threads still running.
private int _threadsRunning;
// Signals threads to shut down.
private volatile bool _done;
// Exception during parsing.
public volatile Exception ParsingException;
/// <summary>
/// Creates the shared state for parallel parsing and starts <paramref name="cthd"/> worker tasks.
/// </summary>
/// <param name="curs">The cursor the rows are parsed for.</param>
/// <param name="rows">Receives the row set created here, which the workers fill.</param>
/// <param name="cthd">Number of worker tasks; must be positive.</param>
public ParallelState(Cursor curs, out RowSet rows, int cthd)
{
    Contracts.Assert(cthd > 0);

    _curs = curs;
    _reader = curs._reader;

    // Why cthd + 3? We need two blocks for the blocking collection, and one
    // more for the block currently being dished out by the cursor.
    _blockCount = cthd + 3;

    // The row set holds BlockSize rows for every block.
    int rowCapacity = checked(_blockCount * BlockSize);
    _rows = _curs._parser.CreateRowSet(_curs._stats, rowCapacity, _curs._active);
    rows = _rows;

    _waiterReading = new OrderedWaiter(firstCleared: false);
    _waiterPublish = new OrderedWaiter(firstCleared: false);

    // The size limit here ensures that worker threads are never writing to
    // a range that is being served up by the cursor.
    _queue = new BlockingQueue<RowBatch>(2);

    _threadsRunning = cthd;
    _threads = new Task[cthd];

    // The workers are started last, once everything they use is in place.
    for (int tid = 0; tid < _threads.Length; tid++)
    {
        _threads[tid] = Utils.RunOnBackgroundThreadAsync(ThreadProc, tid);
    }
}
// Disposes the parallel state.
// NOTE(review): the statements of this method are not visible in this view; only the
// comment describing the intent remains.
public void Dispose()
// Signal all the threads to shut down and wait for them.
// Sets the _done flag that the worker loop in Parse polls.
// NOTE(review): how the threads are woken up is not visible in this view.
private void Quit()
// Signal that we're done and wake up all the threads.
_done = true;
/// <summary>
/// Returns the consuming enumerable of the queue the workers publish row batches to.
/// </summary>
public IEnumerable<RowBatch> GetBatches() => _queue.GetConsumingEnumerable();
// Entry point of one worker task. 'obj' carries the worker index as a boxed int.
// An exception is recorded in ParsingException, and the count of running threads is
// decremented atomically when the worker ends.
// NOTE(review): the try/finally structure, the call that does the actual work (presumably
// Parse(tid)) and the actions following the catch and the last-thread check are not
// visible in this view - confirm against the full file.
private void ThreadProc(object obj)
// The object is the thread index, or "id".
int tid = (int)obj;
Contracts.Assert(0 <= tid && tid < _threads.Length);
catch (Exception ex)
// Record the exception and tell everyone to shut down.
ParsingException = ex;
// If this is the last thread to shut down, close the queue.
if (Interlocked.Decrement(ref _threadsRunning) <= 0)
// Worker loop for thread 'tid'. Each round takes a batch of lines from the reader, parses
// the lines into the block of rows numbered 'iblk' (rows iblk * BlockSize onwards) and
// publishes the resulting RowBatch to the queue. After each round the block number
// advances by the number of threads, and the block index wraps around at _blockCount.
// The _done flag is polled between the steps.
// NOTE(review): the calls on the ordered waiters, the row index increment and the actions
// taken when _done is observed are not visible in this view - confirm against the full file.
private void Parse(int tid)
long blk = tid;
int iblk = tid;
// The block count is the thread count plus 3, so every thread starts on its own block.
Contracts.Assert(iblk < _blockCount - 3);
var helper = _curs._parser.CreateHelper(_rows.Stats, _curs._srcNeeded);
while (!_done)
// Algorithm:
// * When it is our turn, grab a block of lines.
// * Parse rows.
// * When it is our turn, enqueue the batch.
// When it is our turn, read the lines and signal the next worker that it is ok to read.
LineBatch lines;
if (_done)
lines = _reader.GetBatch();
Contracts.Assert(lines.Exception == null);
// Null Infos marks the end of the input.
if (lines.Infos == null || _done)
// Parse the lines into rows.
Contracts.Assert(lines.Infos.Length <= BlockSize);
var batch = new RowBatch(iblk * BlockSize, iblk * BlockSize + lines.Infos.Length, lines.Total);
int irow = batch.IrowMin;
foreach (var info in lines.Infos)
Contracts.Assert(info.Line > 0);
if (_done)
_curs._parser.ParseRow(_rows, irow, helper, _curs._active, lines.Path, info.Line, info.Text);
Contracts.Assert(irow == batch.IrowLim);
if (_done)
// When it is our turn, publish the rows and signal the next worker that it is ok to publish.
if (_done)
// Retry on time-out so that _done can be observed while the queue is full.
while (!_queue.TryAdd(batch, TimeOut))
if (_done)
blk += _threads.Length;
iblk += _threads.Length;
if (iblk >= _blockCount)
iblk -= _blockCount;
Contracts.Assert(0 <= iblk && iblk < _blockCount);