// File: PartitionedPathParser.cs
// Web Access
// Project: src\src\Microsoft.ML.Parquet\Microsoft.ML.Parquet.csproj (Microsoft.ML.Parquet)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Web;
using Microsoft.ML;
using Microsoft.ML.CommandLine;
using Microsoft.ML.Data;
using Microsoft.ML.Data.Utilities;
using Microsoft.ML.EntryPoints;
using Microsoft.ML.Runtime;
 
// Component registrations keyed by the PartitionedPathParser delegate type.
// The simple parser supplies its Arguments type; the Parquet parser passes null in that position.
[assembly: LoadableClass(SimplePartitionedPathParser.Summary, typeof(SimplePartitionedPathParser), typeof(SimplePartitionedPathParser.Arguments), typeof(PartitionedPathParser),
    SimplePartitionedPathParser.UserName, SimplePartitionedPathParser.LoadName, SimplePartitionedPathParser.ShortName)]
[assembly: LoadableClass(ParquetPartitionedPathParser.Summary, typeof(ParquetPartitionedPathParser), null, typeof(PartitionedPathParser),
    ParquetPartitionedPathParser.UserName, ParquetPartitionedPathParser.LoadName, ParquetPartitionedPathParser.ShortName)]

// Registrations keyed by SignatureLoadModel. These are for deserialization
// (presumably resolved to each parser's static Create(IHostEnvironment, ModelLoadContext) - confirm).
[assembly: LoadableClass(SimplePartitionedPathParser.Summary, typeof(SimplePartitionedPathParser), null, typeof(SignatureLoadModel),
    SimplePartitionedPathParser.UserName, SimplePartitionedPathParser.LoadName, SimplePartitionedPathParser.ShortName)]
[assembly: LoadableClass(ParquetPartitionedPathParser.Summary, typeof(ParquetPartitionedPathParser), null, typeof(SignatureLoadModel),
    ParquetPartitionedPathParser.UserName, ParquetPartitionedPathParser.LoadName, ParquetPartitionedPathParser.ShortName)]

// Entry point modules for the two IPartitionedPathParserFactory implementations declared in this file.
[assembly: EntryPointModule(typeof(SimplePartitionedPathParser.Arguments))]
[assembly: EntryPointModule(typeof(ParquetPartitionedPathParserFactory))]
 
namespace Microsoft.ML.Data
{
    /// <summary>
    /// Delegate signature for a partitioned path parser.
    /// </summary>
    /// <remarks>
    /// Nothing in this file invokes the delegate; it is only referenced through <c>typeof</c>
    /// in the assembly-level <c>LoadableClass</c> registrations of the parsers.
    /// </remarks>
    [BestFriend]
    internal delegate void PartitionedPathParser();
 
    /// <summary>
    /// Supports extracting column names and values from a path string.
    /// </summary>
    /// <remarks>
    /// Both implementations in this file split the path into directory segments and ignore
    /// the last segment, which they treat as the file name.
    /// </remarks>
    [BestFriend]
    internal interface IPartitionedPathParser
    {
        /// <summary>
        /// Extract the column definitions from a file path.
        /// </summary>
        /// <param name="path">The file path.</param>
        /// <returns>The resulting column definitions.</returns>
        /// <exception cref="InvalidOperationException">Thrown when parsing fails.</exception>
        IEnumerable<PartitionedFileLoader.Column> ParseColumns(string path);

        /// <summary>
        /// Extract the column values from a file path.
        /// </summary>
        /// <param name="path">The file path.</param>
        /// <returns>The resulting column values, one per directory segment of the path.</returns>
        /// <exception cref="InvalidOperationException">Thrown when parsing fails.</exception>
        IEnumerable<string> ParseValues(string path);
    }
 
    /// <summary>
    /// Component factory that creates <see cref="IPartitionedPathParser"/> instances.
    /// </summary>
    [TlcModule.ComponentKind("PartitionedPathParser")]
    [BestFriend]
    internal interface IPartitionedPathParserFactory : IComponentFactory<IPartitionedPathParser>
    {
        /// <summary>
        /// Creates a path parser.
        /// </summary>
        /// <param name="env">The host environment made available to the created parser.</param>
        /// <returns>The created parser.</returns>
        // Declared with 'new': this hides any same-signature member inherited from
        // IComponentFactory<IPartitionedPathParser>.
        new IPartitionedPathParser CreateComponent(IHostEnvironment env);
    }
 
    /// <summary>
    /// A path parser that uses the directory names of a path as column values.
    /// The column definitions (name, type and source index) are supplied through
    /// <see cref="Arguments"/> or restored from a saved model.
    /// </summary>
    internal sealed class SimplePartitionedPathParser : IPartitionedPathParser, ICanSaveModel
    {
        internal const string Summary = "A simple parser that extracts directory names as column values. Column names are defined as arguments.";
        internal const string UserName = "Simple Partitioned Path Parser";
        public const string LoadName = "SimplePathParser";
        public const string ShortName = "SmplPP";

        /// <summary>
        /// Arguments for <see cref="SimplePartitionedPathParser"/>; also acts as its component factory.
        /// </summary>
        [TlcModule.Component(Name = SimplePartitionedPathParser.LoadName, FriendlyName = SimplePartitionedPathParser.UserName,
            Desc = SimplePartitionedPathParser.Summary, Alias = SimplePartitionedPathParser.ShortName)]
        public class Arguments : IPartitionedPathParserFactory
        {
            [Argument(ArgumentType.Multiple, HelpText = "Column definitions used to override the Partitioned Path Parser. Expected with the format name:type:numeric-source, for example, col=MyFeature:R4:1",
                ShortName = "col", SortOrder = 1)]
            public PartitionedFileLoader.Column[] Columns;

            // Applied by the parser's constructor to every column that does not specify its own type.
            [Argument(ArgumentType.AtMostOnce, HelpText = "Data type of each column.")]
            public InternalDataKind Type = InternalDataKind.Text;

            /// <summary>
            /// Creates a <see cref="SimplePartitionedPathParser"/> configured with these arguments.
            /// </summary>
            /// <param name="env">The host environment the parser registers with.</param>
            /// <returns>The created parser.</returns>
            public IPartitionedPathParser CreateComponent(IHostEnvironment env) => new SimplePartitionedPathParser(env, this);
        }

        private static VersionInfo GetVersionInfo()
        {
            return new VersionInfo(
                modelSignature: "SMPLPARS",
                verWrittenCur: 0x00010001, // Initial
                verReadableCur: 0x00010001,
                verWeCanReadBack: 0x00010001,
                loaderSignature: LoadName,
                loaderAssemblyName: typeof(SimplePartitionedPathParser).Assembly.FullName);
        }

        private readonly IHost _host;
        private readonly PartitionedFileLoader.Column[] _columns;

        /// <summary>
        /// Creates a parser from user supplied arguments.
        /// </summary>
        /// <param name="env">The host environment to register with.</param>
        /// <param name="args">The arguments holding the column definitions and the default column type.</param>
        public SimplePartitionedPathParser(IHostEnvironment env, Arguments args)
        {
            // Validate up front so that a missing argument is reported by name
            // instead of surfacing as a NullReferenceException further down.
            Contracts.CheckValue(env, nameof(env));
            Contracts.CheckValue(args, nameof(args));
            Contracts.CheckValue(args.Columns, nameof(args.Columns));

            _host = env.Register(LoadName);

            // _columns aliases args.Columns, so the default type assigned below is written
            // into the caller's column objects (assuming Column is a reference type - confirm).
            _columns = args.Columns;
            foreach (var col in _columns)
            {
                if (!col.Type.HasValue)
                {
                    col.Type = args.Type;
                }
            }
        }

        /// <summary>
        /// Restores a parser from the binary model format written by the Save method.
        /// </summary>
        private SimplePartitionedPathParser(IHost host, ModelLoadContext ctx)
        {
            Contracts.AssertValue(host);
            _host = host;
            _host.AssertValue(ctx);

            // ** Binary format **
            // int: number of columns
            // foreach column:
            //   string: column representation

            int numColumns = ctx.Reader.ReadInt32();
            _host.CheckDecode(numColumns >= 0);

            _columns = new PartitionedFileLoader.Column[numColumns];
            for (int i = 0; i < numColumns; i++)
            {
                var column = PartitionedFileLoader.Column.Parse(ctx.LoadString());
                _host.CheckDecode(column != null);
                _columns[i] = column;
            }
        }

        /// <summary>
        /// Loads a parser from a model context.
        /// </summary>
        /// <param name="env">The host environment to register with.</param>
        /// <param name="ctx">The model context, checked against this parser's version info.</param>
        /// <returns>The loaded parser.</returns>
        public static SimplePartitionedPathParser Create(IHostEnvironment env, ModelLoadContext ctx)
        {
            Contracts.CheckValue(ctx, nameof(ctx));
            // env is dereferenced on the next line; check it explicitly rather than crash.
            Contracts.CheckValue(env, nameof(env));
            IHost host = env.Register(LoadName);
            ctx.CheckAtModel(GetVersionInfo());

            return host.Apply("Loading Parser",
                ch => new SimplePartitionedPathParser(host, ctx));
        }

        /// <summary>
        /// Writes the column definitions to the model context.
        /// </summary>
        /// <param name="ctx">The model context to write to.</param>
        void ICanSaveModel.Save(ModelSaveContext ctx)
        {
            Contracts.CheckValue(ctx, nameof(ctx));
            ctx.SetVersionInfo(GetVersionInfo());

            // ** Binary format **
            // int: number of columns
            // foreach column:
            //   string: column representation

            ctx.Writer.Write(_columns.Length);
            StringBuilder sb = new StringBuilder();
            foreach (var col in _columns)
            {
                sb.Clear();
                _host.Check(col.TryUnparse(sb));
                ctx.SaveString(sb.ToString());
            }
        }

        /// <summary>
        /// Returns the configured column definitions after verifying that every column's
        /// source index refers to a directory segment of <paramref name="path"/>.
        /// </summary>
        /// <param name="path">The file path.</param>
        /// <returns>The configured column definitions.</returns>
        /// <exception cref="InvalidOperationException">
        /// Thrown when a column's source index is negative or not smaller than the number of values in the path.
        /// </exception>
        public IEnumerable<PartitionedFileLoader.Column> ParseColumns(string path)
        {
            Contracts.AssertNonEmpty(path);

            // Verify that path matches the columns expectations.
            // ParseValues returns a lazily evaluated sequence, so count it once
            // instead of re-enumerating it for every column.
            int valueCount = ParseValues(path).Count();
            foreach (var col in _columns)
            {
                if (col.Source < 0 || col.Source >= valueCount)
                {
                    throw Contracts.Except($"Column definition {col} is outside the bounds of path {path}.");
                }
            }

            return _columns;
        }

        /// <summary>
        /// Returns the directory segments of <paramref name="path"/>, excluding the last one.
        /// </summary>
        /// <param name="path">The file path.</param>
        /// <returns>The directory names, in the order produced by the path splitter.</returns>
        public IEnumerable<string> ParseValues(string path)
        {
            Contracts.AssertNonEmpty(path);

            var dirs = PartitionedPathUtils.SplitDirectories(path);
            return dirs.Take(dirs.Count() - 1); // Ignore last directory which is the file name.
        }
    }
 
    /// <summary>
    /// Component factory that produces <see cref="ParquetPartitionedPathParser"/> instances.
    /// </summary>
    [TlcModule.Component(
        Name = ParquetPartitionedPathParser.LoadName,
        FriendlyName = ParquetPartitionedPathParser.UserName,
        Desc = ParquetPartitionedPathParser.Summary,
        Alias = ParquetPartitionedPathParser.ShortName)]
    internal class ParquetPartitionedPathParserFactory : IPartitionedPathParserFactory
    {
        /// <summary>
        /// Creates a new Parquet path parser.
        /// </summary>
        /// <param name="env">The host environment; not used by this factory.</param>
        /// <returns>A freshly constructed <see cref="ParquetPartitionedPathParser"/>.</returns>
        public IPartitionedPathParser CreateComponent(IHostEnvironment env)
        {
            // The parser is built with its parameterless constructor, so env is not consulted.
            IPartitionedPathParser parser = new ParquetPartitionedPathParser();
            return parser;
        }
    }
 
    /// <summary>
    /// A path parser for Parquet style partitioned paths, where every directory segment
    /// has the form <c>Name=Value</c> (for example <c>Year=2018/Month=12/data1.parquet</c>).
    /// </summary>
    /// <remarks>
    /// The parser is stateful: <see cref="ParseColumns"/> replaces the stored column definitions,
    /// and <see cref="ParseValues"/> validates its result against the stored column count.
    /// </remarks>
    [BestFriend]
    internal sealed class ParquetPartitionedPathParser : IPartitionedPathParser, ICanSaveModel
    {
        internal const string Summary = "Extract name/value pairs from Parquet formatted directory names. Example path: Year=2018/Month=12/data1.parquet";
        internal const string UserName = "Parquet Partitioned Path Parser";
        public const string LoadName = "ParquetPathParser";
        public const string ShortName = "ParqPP";

        // NOTE(review): _host is only assigned by the model-loading constructor. The parameterless
        // constructor leaves it null while Save calls _host.Check - presumably an extension method
        // that tolerates a null host; confirm.
        private readonly IHost _host;
        private PartitionedFileLoader.Column[] _columns;

        private static VersionInfo GetVersionInfo()
        {
            return new VersionInfo(
                modelSignature: "PARQPARS",
                verWrittenCur: 0x00010001, // Initial
                verReadableCur: 0x00010001,
                verWeCanReadBack: 0x00010001,
                loaderSignature: LoadName,
                loaderAssemblyName: typeof(ParquetPartitionedPathParser).Assembly.FullName);
        }

        /// <summary>
        /// Creates a parser with no column definitions; they are populated by <see cref="ParseColumns"/>.
        /// </summary>
        public ParquetPartitionedPathParser()
        {
            _columns = Array.Empty<PartitionedFileLoader.Column>();
        }

        /// <summary>
        /// Restores a parser from the binary model format written by the Save method.
        /// </summary>
        private ParquetPartitionedPathParser(IHost host, ModelLoadContext ctx)
        {
            Contracts.AssertValue(host);
            _host = host;
            _host.AssertValue(ctx);

            // ** Binary format **
            // int: number of columns
            // foreach column:
            //   string: column representation

            int numColumns = ctx.Reader.ReadInt32();
            _host.CheckDecode(numColumns >= 0);

            _columns = new PartitionedFileLoader.Column[numColumns];
            for (int i = 0; i < numColumns; i++)
            {
                var column = PartitionedFileLoader.Column.Parse(ctx.LoadString());
                _host.CheckDecode(column != null);
                _columns[i] = column;
            }
        }

        /// <summary>
        /// Loads a parser from a model context.
        /// </summary>
        /// <param name="env">The host environment to register with.</param>
        /// <param name="ctx">The model context, checked against this parser's version info.</param>
        /// <returns>The loaded parser.</returns>
        public static ParquetPartitionedPathParser Create(IHostEnvironment env, ModelLoadContext ctx)
        {
            Contracts.CheckValue(ctx, nameof(ctx));
            IHost host = env.Register(LoadName);
            ctx.CheckAtModel(GetVersionInfo());

            return host.Apply("Loading Parser",
                ch => new ParquetPartitionedPathParser(host, ctx));
        }

        /// <summary>
        /// Writes the current column definitions to the model context.
        /// </summary>
        /// <param name="ctx">The model context to write to.</param>
        void ICanSaveModel.Save(ModelSaveContext ctx)
        {
            Contracts.CheckValue(ctx, nameof(ctx));
            ctx.SetVersionInfo(GetVersionInfo());

            // ** Binary format **
            // int: number of columns
            // foreach column:
            //   string: column representation

            ctx.Writer.Write(_columns.Length);
            StringBuilder sb = new StringBuilder();
            foreach (var col in _columns)
            {
                sb.Clear();
                _host.Check(col.TryUnparse(sb));
                ctx.SaveString(sb.ToString());
            }
        }

        /// <summary>
        /// Builds one text column per <c>Name=Value</c> directory segment of <paramref name="path"/>
        /// and stores the result as this parser's column definitions.
        /// </summary>
        /// <param name="path">The file path.</param>
        /// <returns>The column definitions; each column's source is the index of its directory segment.</returns>
        /// <exception cref="InvalidOperationException">Thrown when a directory segment is not of the form <c>Name=Value</c>.</exception>
        public IEnumerable<PartitionedFileLoader.Column> ParseColumns(string path)
        {
            if (!TryParseNames(path, out List<string> names))
            {
                throw Contracts.Except($"Failed to parse names from path {path}. Expected directory names with the format 'Name=Value'.");
            }

            _columns = new PartitionedFileLoader.Column[names.Count];
            for (int i = 0; i < names.Count; i++)
            {
                _columns[i] = new PartitionedFileLoader.Column()
                {
                    Name = names[i],
                    Source = i,
                    Type = InternalDataKind.Text
                };
            }

            return _columns;
        }

        /// <summary>
        /// Extracts the value of every <c>Name=Value</c> directory segment of <paramref name="path"/>.
        /// </summary>
        /// <param name="path">The file path.</param>
        /// <returns>The URL-decoded values, in directory order.</returns>
        /// <exception cref="InvalidOperationException">
        /// Thrown when a directory segment is not of the form <c>Name=Value</c>, or when the number of
        /// values differs from the number of stored column definitions.
        /// </exception>
        public IEnumerable<string> ParseValues(string path)
        {
            if (!TryParseValues(path, out List<string> values))
            {
                // This path reports a value parsing failure, so the message says "values".
                throw Contracts.Except($"Failed to parse values from path {path}. Expected directory names with the format 'Name=Value'.");
            }

            if (values.Count != _columns.Length)
            {
                throw Contracts.Except($"The extracted value count of {values.Count} does not match the expected Column count of {_columns.Length} for path {path}");
            }

            return values;
        }

        /// <summary>
        /// Tries to extract the names of the <c>Name=Value</c> directory segments of <paramref name="path"/>.
        /// </summary>
        /// <param name="path">The file path.</param>
        /// <param name="names">The extracted names, or null when parsing fails.</param>
        /// <returns>true if every directory segment could be parsed.</returns>
        public bool TryParseNames(string path, out List<string> names)
        {
            return TryParseNamesAndValues(path, out names, out _);
        }

        /// <summary>
        /// Tries to extract the values of the <c>Name=Value</c> directory segments of <paramref name="path"/>.
        /// </summary>
        /// <param name="path">The file path.</param>
        /// <param name="values">The extracted values, or null when parsing fails.</param>
        /// <returns>true if every directory segment could be parsed.</returns>
        public bool TryParseValues(string path, out List<string> values)
        {
            return TryParseNamesAndValues(path, out _, out values);
        }

        /// <summary>
        /// Tries to split every directory segment of <paramref name="path"/> into a name and a value.
        /// The last segment is treated as the file name and is skipped.
        /// </summary>
        /// <param name="path">The file path.</param>
        /// <param name="names">The extracted names, or null when parsing fails.</param>
        /// <param name="values">The extracted values, or null when parsing fails.</param>
        /// <returns>
        /// true if the path is non-empty and every directory segment has the form <c>Name=Value</c>;
        /// a path without directory segments succeeds with empty lists.
        /// </returns>
        public bool TryParseNamesAndValues(string path, out List<string> names, out List<string> values)
        {
            names = null;
            values = null;

            if (string.IsNullOrEmpty(path))
            {
                return false;
            }

            // Materialize the segments once so that counting and iterating do not
            // re-enumerate the splitter's result.
            List<string> segments = PartitionedPathUtils.SplitDirectories(path).ToList();
            int dirCount = Math.Max(segments.Count - 1, 0); // Ignore last directory which is the file name.

            // Collect into locals so that a failure part-way through leaves the
            // out parameters null rather than half-filled.
            var parsedNames = new List<string>(dirCount);
            var parsedValues = new List<string>(dirCount);

            for (int i = 0; i < dirCount; i++)
            {
                if (!TryParseNameValueFromDir(segments[i], out string name, out string value))
                {
                    return false;
                }

                parsedNames.Add(name);
                parsedValues.Add(value);
            }

            names = parsedNames;
            values = parsedValues;
            return true;
        }

        /// <summary>
        /// Parse the name/value pair from a partitioned directory name.
        /// </summary>
        /// <param name="dir">The directory name.</param>
        /// <param name="name">The resulting name, taken verbatim from the text before the separator.</param>
        /// <param name="value">The resulting value, URL-decoded from the text after the separator.</param>
        /// <returns>true if the parsing was successful.</returns>
        private static bool TryParseNameValueFromDir(string dir, out string name, out string value)
        {
            const char nameValueSeparator = '=';

            name = null;
            value = null;

            if (string.IsNullOrEmpty(dir))
            {
                return false;
            }

            // Exactly one separator is required: "a" and "a=b=c" are both rejected.
            var nameValue = dir.Split(nameValueSeparator);
            if (nameValue.Length != 2)
            {
                return false;
            }

            name = nameValue[0];
            value = HttpUtility.UrlDecode(nameValue[1]);

            return true;
        }
    }
}