File: PrimitiveDataFrameColumn.cs
Web Access
Project: src\src\Microsoft.Data.Analysis\Microsoft.Data.Analysis.csproj (Microsoft.Data.Analysis)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Apache.Arrow;
using Apache.Arrow.Types;
using Microsoft.ML;
using Microsoft.ML.Data;
 
namespace Microsoft.Data.Analysis
{
    /// <summary>
    /// A column to hold primitive types such as int, float etc.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public partial class PrimitiveDataFrameColumn<T> : DataFrameColumn, IEnumerable<T?>
        where T : unmanaged
    {
        private readonly PrimitiveColumnContainer<T> _columnContainer;
 
        internal PrimitiveColumnContainer<T> ColumnContainer => _columnContainer;
 
        internal PrimitiveDataFrameColumn(string name, PrimitiveColumnContainer<T> columnContainer) : base(name, columnContainer.Length, typeof(T))
        {
            _columnContainer = columnContainer;
        }
 
        public PrimitiveDataFrameColumn(string name, IEnumerable<T?> values) : base(name, 0, typeof(T))
        {
            _columnContainer = new PrimitiveColumnContainer<T>(values);
            Length = _columnContainer.Length;
        }
 
        public PrimitiveDataFrameColumn(string name, IEnumerable<T> values) : base(name, 0, typeof(T))
        {
            _columnContainer = new PrimitiveColumnContainer<T>(values);
            Length = _columnContainer.Length;
        }
 
        public PrimitiveDataFrameColumn(string name, long length = 0) : base(name, length, typeof(T))
        {
            _columnContainer = new PrimitiveColumnContainer<T>(length);
        }
 
        public PrimitiveDataFrameColumn(string name, ReadOnlyMemory<byte> buffer, ReadOnlyMemory<byte> nullBitMap, int length = 0, int nullCount = 0) : base(name, length, typeof(T))
        {
            _columnContainer = new PrimitiveColumnContainer<T>(buffer, nullBitMap, length, nullCount);
        }
 
        /// <summary>
        /// Returns an enumerable of immutable memory buffers representing the underlying values
        /// </summary>
        /// <remarks><see langword="null" /> values are encoded in the buffers returned by GetReadOnlyNullBitmapBuffers in the Apache Arrow format</remarks>
        /// <returns>IEnumerable<see cref="ReadOnlyMemory{T}"/></returns>
        public IEnumerable<ReadOnlyMemory<T>> GetReadOnlyDataBuffers()
        {
            for (int i = 0; i < _columnContainer.Buffers.Count; i++)
            {
                yield return _columnContainer.Buffers[i].ReadOnlyMemory;
            }
        }
 
        /// <summary>
        /// Returns an enumerable of immutable <see cref="ReadOnlyMemory{Byte}"/> buffers representing <see langword="null" /> values in the Apache Arrow format
        /// </summary>
        /// <remarks>Each <see cref="ReadOnlyMemory{Byte}"/> encodes the <see langword="null" /> values for its corresponding Data buffer</remarks>
        /// <returns>IEnumerable<see cref="ReadOnlyMemory{Byte}"/></returns>
        public IEnumerable<ReadOnlyMemory<byte>> GetReadOnlyNullBitMapBuffers()
        {
            for (int i = 0; i < _columnContainer.NullBitMapBuffers.Count; i++)
            {
                ReadOnlyDataFrameBuffer<byte> buffer = _columnContainer.NullBitMapBuffers[i];
                yield return buffer.RawReadOnlyMemory;
            }
        }
 
        private IArrowType GetArrowType()
        {
            if (typeof(T) == typeof(bool))
                return BooleanType.Default;
            else if (typeof(T) == typeof(double))
                return DoubleType.Default;
            else if (typeof(T) == typeof(float))
                return FloatType.Default;
            else if (typeof(T) == typeof(sbyte))
                return Int8Type.Default;
            else if (typeof(T) == typeof(int))
                return Int32Type.Default;
            else if (typeof(T) == typeof(long))
                return Int64Type.Default;
            else if (typeof(T) == typeof(short))
                return Int16Type.Default;
            else if (typeof(T) == typeof(byte))
                return UInt8Type.Default;
            else if (typeof(T) == typeof(uint))
                return UInt32Type.Default;
            else if (typeof(T) == typeof(ulong))
                return UInt64Type.Default;
            else if (typeof(T) == typeof(ushort))
                return UInt16Type.Default;
            else if (typeof(T) == typeof(DateTime))
                return Date64Type.Default;
            else
                throw new NotImplementedException(nameof(T));
        }
 
        protected internal override Field GetArrowField() => new Field(Name, GetArrowType(), NullCount != 0);
 
        protected internal override int GetMaxRecordBatchLength(long startIndex) => _columnContainer.MaxRecordBatchLength(startIndex);
 
        private int GetNullCount(long startIndex, int numberOfRows)
        {
            int nullCount = 0;
            for (long i = startIndex; i < numberOfRows; i++)
            {
                if (!IsValid(i))
                    nullCount++;
            }
            return nullCount;
        }
 
        protected internal override Apache.Arrow.Array ToArrowArray(long startIndex, int numberOfRows)
        {
            int bufferIndex = numberOfRows == 0 ? 0 : _columnContainer.GetIndexOfBufferContainingRowIndex(startIndex);
            int offset = (int)(startIndex - bufferIndex * ReadOnlyDataFrameBuffer<T>.MaxCapacity);
 
            if (numberOfRows != 0 && numberOfRows > _columnContainer.Buffers[bufferIndex].Length - offset)
            {
                throw new ArgumentException(Strings.SpansMultipleBuffers, nameof(numberOfRows));
            }
 
            int nullCount = GetNullCount(startIndex, numberOfRows);
 
            //DateTime requires convertion
            if (this.DataType == typeof(DateTime))
            {
                if (numberOfRows == 0)
                    return new Date64Array(ArrowBuffer.Empty, ArrowBuffer.Empty, numberOfRows, nullCount, offset);
 
                ReadOnlyDataFrameBuffer<T> valueBuffer = (numberOfRows == 0) ? null : _columnContainer.Buffers[bufferIndex];
                ReadOnlyDataFrameBuffer<byte> nullBuffer = (numberOfRows == 0) ? null : _columnContainer.NullBitMapBuffers[bufferIndex];
 
                ReadOnlySpan<DateTime> valueSpan = MemoryMarshal.Cast<T, DateTime>(valueBuffer.ReadOnlySpan);
                Date64Array.Builder builder = new Date64Array.Builder().Reserve(valueBuffer.Length);
 
                for (int i = 0; i < valueBuffer.Length; i++)
                {
                    if (BitUtility.GetBit(nullBuffer.ReadOnlySpan, i))
                        builder.Append(valueSpan[i]);
                    else
                        builder.AppendNull();
                }
 
                return builder.Build();
            }
 
            //No convertion
            ArrowBuffer arrowValueBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.Buffers[bufferIndex].ReadOnlyBuffer);
            ArrowBuffer arrowNullBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.NullBitMapBuffers[bufferIndex].ReadOnlyBuffer);
 
            Type type = this.DataType;
            if (type == typeof(bool))
                return new BooleanArray(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
            else if (type == typeof(double))
                return new DoubleArray(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
            else if (type == typeof(float))
                return new FloatArray(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
            else if (type == typeof(int))
                return new Int32Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
            else if (type == typeof(long))
                return new Int64Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
            else if (type == typeof(sbyte))
                return new Int8Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
            else if (type == typeof(short))
                return new Int16Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
            else if (type == typeof(uint))
                return new UInt32Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
            else if (type == typeof(ulong))
                return new UInt64Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
            else if (type == typeof(ushort))
                return new UInt16Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
            else if (type == typeof(byte))
                return new UInt8Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
            else
                throw new NotImplementedException(type.ToString());
        }
 
        public new IReadOnlyList<T?> this[long startIndex, int length]
        {
            get
            {
                if (startIndex >= Length)
                {
                    throw new ArgumentOutOfRangeException(nameof(startIndex));
                }
                return _columnContainer[startIndex, length];
            }
        }
 
        protected override IReadOnlyList<object> GetValues(long startIndex, int length)
        {
            if (startIndex >= Length)
            {
                throw new ArgumentOutOfRangeException(nameof(startIndex));
            }
 
            var ret = new List<object>(length);
            long endIndex = Math.Min(Length, startIndex + length);
            for (long i = startIndex; i < endIndex; i++)
            {
                ret.Add(this[i]);
            }
            return ret;
        }
 
        internal virtual PrimitiveDataFrameColumn<T> CreateNewColumn(string name, PrimitiveColumnContainer<T> container)
        {
            return new PrimitiveDataFrameColumn<T>(name, container);
        }
 
        protected virtual PrimitiveDataFrameColumn<T> CreateNewColumn(string name, long length = 0)
        {
            return new PrimitiveDataFrameColumn<T>(name, length);
        }
 
        protected T? GetTypedValue(long rowIndex) => _columnContainer[rowIndex];
 
        protected override object GetValue(long rowIndex) => GetTypedValue(rowIndex);
 
        protected override void SetValue(long rowIndex, object value)
        {
            if (value == null || value.GetType() == typeof(T))
            {
                _columnContainer[rowIndex] = (T?)value;
            }
            else
            {
                throw new ArgumentException(string.Format(Strings.MismatchedValueType, DataType), nameof(value));
            }
        }
 
        public new T? this[long rowIndex]
        {
            get => GetTypedValue(rowIndex);
            set => _columnContainer[rowIndex] = value;
        }
 
        /// <inheritdoc/>
        public override double Median()
        {
            // Not the most efficient implementation. Using a selection algorithm here would be O(n) instead of O(nLogn)
            var notNullValuesCount = Length - NullCount;
            if (notNullValuesCount == 0)
                return 0;
 
            PrimitiveDataFrameColumn<long> sortIndices = GetSortIndices();
            long middle = notNullValuesCount / 2;
            double middleValue = (double)Convert.ChangeType(this[sortIndices[middle].Value].Value, typeof(double));
            if (notNullValuesCount % 2 == 0)
            {
                double otherMiddleValue = (double)Convert.ChangeType(this[sortIndices[middle - 1].Value].Value, typeof(double));
                return (middleValue + otherMiddleValue) / 2;
            }
 
            return middleValue;
        }
 
        /// <inheritdoc/>
        public override double Mean()
        {
            if (Length == 0)
                return 0;
            return (double)Convert.ChangeType((T)Sum(), typeof(double)) / (Length - NullCount);
        }
 
        protected internal override void Resize(long length)
        {
            _columnContainer.Resize(length);
            Length = _columnContainer.Length;
        }
 
        public void Append(T? value)
        {
            _columnContainer.Append(value);
            Length++;
        }
 
        public void AppendMany(T? value, long count)
        {
            _columnContainer.AppendMany(value, count);
            Length += count;
        }
 
        /// <inheritdoc/>
        public override long NullCount
        {
            get
            {
                Debug.Assert(_columnContainer.NullCount >= 0);
                return _columnContainer.NullCount;
            }
        }
 
        /// <inheritdoc/>
        public override bool IsValid(long index) => _columnContainer.IsValid(index);
 
        public IEnumerator<T?> GetEnumerator() => _columnContainer.GetEnumerator();
 
        protected override IEnumerator GetEnumeratorCore() => GetEnumerator();
 
        /// <inheritdoc/>
        public override bool IsNumericColumn()
        {
            var type = typeof(T);
 
            return type == typeof(byte)
                || type == typeof(sbyte)
                || type == typeof(ushort)
                || type == typeof(short)
                || type == typeof(uint)
                || type == typeof(int)
                || type == typeof(ulong)
                || type == typeof(long)
                || type == typeof(float)
                || type == typeof(double)
                || type == typeof(decimal);
        }
 
        /// <summary>
        /// Returns a new column with <see langword="null" /> elements replaced by <paramref name="value"/>.
        /// </summary>
        /// <param name="value"></param>
        /// <param name="inPlace">Indicates if the operation should be performed in place.</param>
        public PrimitiveDataFrameColumn<T> FillNulls(T value, bool inPlace = false)
        {
            PrimitiveDataFrameColumn<T> column = inPlace ? this : Clone();
            column.ColumnContainer.FillNulls(value);
            return column;
        }
 
        protected override DataFrameColumn FillNullsImplementation(object value, bool inPlace)
        {
            T convertedValue = (T)Convert.ChangeType(value, typeof(T));
            return FillNulls(convertedValue, inPlace);
        }
 
        /// <inheritdoc/>
        public new PrimitiveDataFrameColumn<T> DropNulls()
        {
            return (PrimitiveDataFrameColumn<T>)DropNullsImplementation();
        }
 
        protected override DataFrameColumn DropNullsImplementation()
        {
            var ret = CreateNewColumn(Name, Length - NullCount);
 
            long j = 0;
            for (int b = 0; b < ColumnContainer.NullBitMapBuffers.Count; b++)
            {
                var span = ColumnContainer.Buffers[b].ReadOnlySpan;
                var validitySpan = ColumnContainer.NullBitMapBuffers[b].ReadOnlySpan;
 
                for (int i = 0; i < span.Length; i++)
                {
                    if (BitUtility.IsValid(validitySpan, i))
                        ret[j++] = span[i];
                }
            }
 
            return ret;
        }
 
        /// <inheritdoc/>
        public override DataFrame ValueCounts()
        {
            Dictionary<T, ICollection<long>> groupedValues = GroupColumnValues<T>(out HashSet<long> _);
            PrimitiveDataFrameColumn<T> keys = new PrimitiveDataFrameColumn<T>("Values");
            PrimitiveDataFrameColumn<long> counts = new PrimitiveDataFrameColumn<long>("Counts");
            foreach (KeyValuePair<T, ICollection<long>> keyValuePair in groupedValues)
            {
                keys.Append(keyValuePair.Key);
                counts.Append(keyValuePair.Value.Count);
            }
            return new DataFrame(new List<DataFrameColumn> { keys, counts });
        }
 
        /// <inheritdoc/>
        public override bool HasDescription()
        {
            return this.IsNumericColumn() || typeof(T) == typeof(DateTime);
        }
 
        /// <summary>
        /// Returns a clone of this column.
        /// </summary>
        /// <param name="numberOfNullsToAppend"></param>
        /// <returns></returns>
        public new PrimitiveDataFrameColumn<T> Clone(long numberOfNullsToAppend = 0)
        {
            return (PrimitiveDataFrameColumn<T>)CloneImplementation(numberOfNullsToAppend);
        }
 
        /// <summary>
        /// Returns a clone of this column.
        /// </summary>
        /// <param name="mapIndices">A column who values are used as indices </param>
        /// <param name="invertMapIndices"></param>
        /// <param name="numberOfNullsToAppend"></param>
        /// <returns></returns>
        public new PrimitiveDataFrameColumn<T> Clone(DataFrameColumn mapIndices, bool invertMapIndices = false, long numberOfNullsToAppend = 0)
        {
            return (PrimitiveDataFrameColumn<T>)CloneImplementation(mapIndices, invertMapIndices, numberOfNullsToAppend);
        }
 
        /// <inheritdoc/>
        protected override DataFrameColumn CloneImplementation(DataFrameColumn mapIndices, bool invertMapIndices, long numberOfNullsToAppend)
        {
            PrimitiveDataFrameColumn<T> clone;
            if (!(mapIndices is null))
            {
                Type dataType = mapIndices.DataType;
                if (dataType != typeof(long) && dataType != typeof(int) && dataType != typeof(bool))
                    throw new ArgumentException(String.Format(Strings.MultipleMismatchedValueType, typeof(long), typeof(int), typeof(bool)), nameof(mapIndices));
                if (dataType == typeof(long))
                    clone = Clone(mapIndices as PrimitiveDataFrameColumn<long>, invertMapIndices);
                else if (dataType == typeof(int))
                    clone = Clone(mapIndices as PrimitiveDataFrameColumn<int>, invertMapIndices);
                else
                    clone = CloneImplementation(mapIndices as PrimitiveDataFrameColumn<bool>);
 
                if (numberOfNullsToAppend != 0)
                    clone.AppendMany(null, numberOfNullsToAppend);
            }
            else
            {
                clone = Clone();
            }
 
            return clone;
        }
 
        protected override DataFrameColumn CloneImplementation(long numberOfNullsToAppend)
        {
            var newColumnContainer = _columnContainer.Clone();
            var clone = CreateNewColumn(Name, newColumnContainer);
 
            if (numberOfNullsToAppend != 0)
                clone.AppendMany(null, numberOfNullsToAppend);
 
            return clone;
        }
 
        private PrimitiveDataFrameColumn<T> CloneImplementation(PrimitiveDataFrameColumn<bool> boolColumn)
        {
            if (boolColumn.Length > Length)
                throw new ArgumentException(Strings.MapIndicesExceedsColumnLength, nameof(boolColumn));
            PrimitiveDataFrameColumn<T> ret = CreateNewColumn(Name);
 
            for (long i = 0; i < boolColumn.Length; i++)
            {
                bool? value = boolColumn[i];
                if (value.HasValue && value.Value == true)
                    ret.Append(this[i]);
            }
            return ret;
        }
 
        private PrimitiveDataFrameColumn<T> CloneImplementation<U>(PrimitiveDataFrameColumn<U> mapIndices, bool invertMapIndices = false)
            where U : unmanaged
        {
            if (!mapIndices.IsNumericColumn())
                throw new ArgumentException(String.Format(Strings.MismatchedValueType, Strings.NumericColumnType), nameof(mapIndices));
 
            PrimitiveColumnContainer<T> retContainer;
            if (mapIndices.DataType == typeof(long))
            {
                retContainer = _columnContainer.Clone(mapIndices._columnContainer, typeof(long), invertMapIndices);
            }
            else if (mapIndices.DataType == typeof(int))
            {
                retContainer = _columnContainer.Clone(mapIndices._columnContainer, typeof(int), invertMapIndices);
            }
            else
                throw new NotImplementedException();
 
            PrimitiveDataFrameColumn<T> ret = CreateNewColumn(Name, retContainer);
            return ret;
        }
 
        public PrimitiveDataFrameColumn<T> Clone(PrimitiveDataFrameColumn<long> mapIndices, bool invertMapIndices = false)
        {
            if (mapIndices is null)
                return Clone();
 
            return CloneImplementation(mapIndices, invertMapIndices);
        }
 
        public PrimitiveDataFrameColumn<T> Clone(PrimitiveDataFrameColumn<int> mapIndices, bool invertMapIndices = false)
        {
            if (mapIndices is null)
                return Clone();
 
            return CloneImplementation(mapIndices, invertMapIndices);
        }
 
        public PrimitiveDataFrameColumn<T> Clone(IEnumerable<long> mapIndices)
        {
            IEnumerator<long> rows = mapIndices.GetEnumerator();
            PrimitiveDataFrameColumn<T> ret = CreateNewColumn(Name);
            long numberOfRows = 0;
            while (rows.MoveNext() && numberOfRows < Length)
            {
                numberOfRows++;
                var curRow = rows.Current;
                var value = _columnContainer[curRow];
                ret.Append(value);
            }
            return ret;
        }
 
        public PrimitiveDataFrameColumn<T> Clone(IEnumerable<int> mapIndices)
        {
            return Clone(mapIndices.Select(x => (long)x));
        }
 
        internal BooleanDataFrameColumn CloneAsBooleanColumn()
        {
            PrimitiveColumnContainer<bool> newColumnContainer = _columnContainer.CloneAsBoolContainer();
            return new BooleanDataFrameColumn(Name, newColumnContainer);
        }
 
        internal ByteDataFrameColumn CloneAsByteColumn()
        {
            PrimitiveColumnContainer<byte> newColumnContainer = _columnContainer.CloneAsByteContainer();
            return new ByteDataFrameColumn(Name, newColumnContainer);
        }
 
        internal SByteDataFrameColumn CloneAsSByteColumn()
        {
            PrimitiveColumnContainer<sbyte> newColumnContainer = _columnContainer.CloneAsSByteContainer();
            return new SByteDataFrameColumn(Name, newColumnContainer);
        }
 
        internal DoubleDataFrameColumn CloneAsDoubleColumn()
        {
            PrimitiveColumnContainer<double> newColumnContainer = _columnContainer.CloneAsDoubleContainer();
            return new DoubleDataFrameColumn(Name, newColumnContainer);
        }
 
        internal DecimalDataFrameColumn CloneAsDecimalColumn()
        {
            PrimitiveColumnContainer<decimal> newColumnContainer = _columnContainer.CloneAsDecimalContainer();
            return new DecimalDataFrameColumn(Name, newColumnContainer);
        }
 
        internal Int16DataFrameColumn CloneAsInt16Column()
        {
            PrimitiveColumnContainer<short> newColumnContainer = _columnContainer.CloneAsShortContainer();
            return new Int16DataFrameColumn(Name, newColumnContainer);
        }
 
 
        internal UInt16DataFrameColumn CloneAsUInt16Column()
        {
            PrimitiveColumnContainer<ushort> newColumnContainer = _columnContainer.CloneAsUShortContainer();
            return new UInt16DataFrameColumn(Name, newColumnContainer);
        }
 
        internal Int32DataFrameColumn CloneAsInt32Column()
        {
            PrimitiveColumnContainer<int> newColumnContainer = _columnContainer.CloneAsIntContainer();
            return new Int32DataFrameColumn(Name, newColumnContainer);
        }
 
        internal UInt32DataFrameColumn CloneAsUInt32Column()
        {
            PrimitiveColumnContainer<uint> newColumnContainer = _columnContainer.CloneAsUIntContainer();
            return new UInt32DataFrameColumn(Name, newColumnContainer);
        }
 
        internal Int64DataFrameColumn CloneAsInt64Column()
        {
            PrimitiveColumnContainer<long> newColumnContainer = _columnContainer.CloneAsLongContainer();
            return new Int64DataFrameColumn(Name, newColumnContainer);
        }
 
        internal UInt64DataFrameColumn CloneAsUInt64Column()
        {
            PrimitiveColumnContainer<ulong> newColumnContainer = _columnContainer.CloneAsULongContainer();
            return new UInt64DataFrameColumn(Name, newColumnContainer);
        }
 
        internal SingleDataFrameColumn CloneAsSingleColumn()
        {
            PrimitiveColumnContainer<float> newColumnContainer = _columnContainer.CloneAsFloatContainer();
            return new SingleDataFrameColumn(Name, newColumnContainer);
        }
 
        /// <inheritdoc/>
        public override GroupBy GroupBy(int columnIndex, DataFrame parent)
        {
            Dictionary<T, ICollection<long>> dictionary = GroupColumnValues<T>(out HashSet<long> _);
            return new GroupBy<T>(parent, columnIndex, dictionary);
        }
 
        public override Dictionary<TKey, ICollection<long>> GroupColumnValues<TKey>(out HashSet<long> nullIndices)
        {
            if (typeof(TKey) == typeof(T))
            {
                Dictionary<T, ICollection<long>> multimap = new Dictionary<T, ICollection<long>>(EqualityComparer<T>.Default);
                nullIndices = new HashSet<long>();
                for (int b = 0; b < _columnContainer.Buffers.Count; b++)
                {
                    ReadOnlyDataFrameBuffer<T> buffer = _columnContainer.Buffers[b];
                    ReadOnlySpan<T> readOnlySpan = buffer.ReadOnlySpan;
                    ReadOnlySpan<byte> nullBitMapSpan = _columnContainer.NullBitMapBuffers[b].ReadOnlySpan;
                    long previousLength = b * ReadOnlyDataFrameBuffer<T>.MaxCapacity;
                    for (int i = 0; i < readOnlySpan.Length; i++)
                    {
                        long currentLength = i + previousLength;
                        if (BitUtility.IsValid(nullBitMapSpan, i))
                        {
                            bool containsKey = multimap.TryGetValue(readOnlySpan[i], out ICollection<long> values);
                            if (containsKey)
                            {
                                values.Add(currentLength);
                            }
                            else
                            {
                                multimap.Add(readOnlySpan[i], new List<long>() { currentLength });
                            }
                        }
                        else
                        {
                            nullIndices.Add(currentLength);
                        }
                    }
                }
                return multimap as Dictionary<TKey, ICollection<long>>;
            }
            else
            {
                throw new NotImplementedException(nameof(TKey));
            }
        }
 
        /// <summary>
        /// Applies a function to all column values in place.
        /// </summary>
        /// <param name="func">The function to apply</param>
        [Obsolete("Method is obsolete, use Apply(Func<T, T> func, bool inPlace = false) instead")]
        public void ApplyElementwise(Func<T?, long, T?> func) => _columnContainer.ApplyElementwise(func);
 
        /// <summary>
        /// Applies a function to all values in the column, that are not null.
        /// </summary>
        /// <param name="func">The function to apply.</param>
        /// /// <param name="inPlace">A boolean flag to indicate if the operation should be in place.</param>
        /// <returns>A new <see cref="PrimitiveDataFrameColumn{T}"/> if <paramref name="inPlace"/> is not set. Returns this column otherwise.</returns>
        public PrimitiveDataFrameColumn<T> Apply(Func<T, T> func, bool inPlace = false)
        {
            var column = inPlace ? this : this.Clone();
            column.ColumnContainer.Apply(func);
            return column;
        }
 
        /// <summary>
        /// Applies a function to all column values.
        /// </summary>
        /// <typeparam name="TResult">The new column's type</typeparam>
        /// <param name="func">The function to apply</param>
        /// <returns>A new PrimitiveDataFrameColumn containing the new values</returns>
        [Obsolete("Method is obsolete, use Apply(Func<T, T> func, bool inPlace = false) instead")]
        public PrimitiveDataFrameColumn<TResult> Apply<TResult>(Func<T?, TResult?> func) where TResult : unmanaged
        {
            var resultColumn = new PrimitiveDataFrameColumn<TResult>("Result", Length);
            _columnContainer.Apply(func, resultColumn._columnContainer);
            return resultColumn;
        }
 
        /// <summary>
        /// Clamps values beyond the specified thresholds
        /// </summary>
        /// <param name="min">Minimum value. All values below this threshold will be set to it</param>
        /// <param name="max">Maximum value. All values above this threshold will be set to it</param>
        /// <param name="inPlace">Indicates if the operation should be performed in place</param>
        public PrimitiveDataFrameColumn<T> Clamp(T min, T max, bool inPlace = false)
        {
            PrimitiveDataFrameColumn<T> ret = inPlace ? this : Clone();
 
            Comparer<T> comparer = Comparer<T>.Default;
            for (long i = 0; i < ret.Length; i++)
            {
                T? value = ret[i];
                if (value == null)
                    continue;
 
                if (comparer.Compare(value.Value, min) < 0)
                    ret[i] = min;
 
                if (comparer.Compare(value.Value, max) > 0)
                    ret[i] = max;
            }
            return ret;
        }
 
        protected override DataFrameColumn ClampImplementation<U>(U min, U max, bool inPlace)
        {
            object convertedLower = Convert.ChangeType(min, typeof(T));
            if (typeof(T) == typeof(U) || convertedLower != null)
                return Clamp((T)convertedLower, (T)Convert.ChangeType(max, typeof(T)), inPlace);
            else
                throw new ArgumentException(string.Format(Strings.MismatchedValueType, typeof(T)), nameof(U));
        }
 
        /// <summary>
        /// Returns a new column filtered by the lower and upper bounds
        /// </summary>
        /// <param name="min">The minimum value in the resulting column</param>
        /// <param name="max">The maximum value in the resulting column</param>
        public PrimitiveDataFrameColumn<T> Filter(T min, T max)
        {
            PrimitiveDataFrameColumn<T> ret = new PrimitiveDataFrameColumn<T>(Name);
            Comparer<T> comparer = Comparer<T>.Default;
            for (long i = 0; i < Length; i++)
            {
                T? value = this[i];
                if (value == null)
                    continue;
 
                if (comparer.Compare(value.Value, min) >= 0 && comparer.Compare(value.Value, max) <= 0)
                    ret.Append(value);
            }
            return ret;
        }
 
        protected override DataFrameColumn FilterImplementation<U>(U min, U max)
        {
            object convertedLower = Convert.ChangeType(min, typeof(T));
            if (typeof(T) == typeof(U) || convertedLower != null)
                return Filter((T)convertedLower, (T)Convert.ChangeType(max, typeof(T)));
            else
                throw new ArgumentException(string.Format(Strings.MismatchedValueType, typeof(T)), nameof(U));
        }
 
        public override DataFrameColumn Description()
        {
            float? max;
            float? min;
            float? mean;
            try
            {
                max = (float)Convert.ChangeType(Max(), typeof(float));
            }
            catch (Exception)
            {
                max = null;
            }
            try
            {
                min = (float)Convert.ChangeType(Min(), typeof(float));
            }
            catch (Exception)
            {
                min = null;
            }
            try
            {
                mean = (float)Convert.ChangeType(Sum(), typeof(float)) / Length;
            }
            catch (Exception)
            {
                mean = null;
            }
            PrimitiveDataFrameColumn<float> column = new PrimitiveDataFrameColumn<float>(Name);
            column.Append(Length - NullCount);
            column.Append(max);
            column.Append(min);
            column.Append(mean);
            return column;
        }
 
        protected internal override void AddDataViewColumn(DataViewSchema.Builder builder)
        {
            builder.AddColumn(Name, GetDataViewType());
        }
 
        private static DataViewType GetDataViewType()
        {
            if (typeof(T) == typeof(bool))
            {
                return BooleanDataViewType.Instance;
            }
            else if (typeof(T) == typeof(byte))
            {
                return NumberDataViewType.Byte;
            }
            else if (typeof(T) == typeof(double))
            {
                return NumberDataViewType.Double;
            }
            else if (typeof(T) == typeof(DateTime))
            {
                return DateTimeDataViewType.Instance;
            }
            else if (typeof(T) == typeof(float))
            {
                return NumberDataViewType.Single;
            }
            else if (typeof(T) == typeof(int))
            {
                return NumberDataViewType.Int32;
            }
            else if (typeof(T) == typeof(long))
            {
                return NumberDataViewType.Int64;
            }
            else if (typeof(T) == typeof(sbyte))
            {
                return NumberDataViewType.SByte;
            }
            else if (typeof(T) == typeof(short))
            {
                return NumberDataViewType.Int16;
            }
            else if (typeof(T) == typeof(uint))
            {
                return NumberDataViewType.UInt32;
            }
            else if (typeof(T) == typeof(ulong))
            {
                return NumberDataViewType.UInt64;
            }
            else if (typeof(T) == typeof(ushort))
            {
                return NumberDataViewType.UInt16;
            }
            // The following 2 implementations are not ideal, but IDataView doesn't support
            // these types
            else if (typeof(T) == typeof(char))
            {
                return NumberDataViewType.UInt16;
            }
            else if (typeof(T) == typeof(decimal))
            {
                return NumberDataViewType.Double;
            }
 
            throw new NotSupportedException("Type is " + typeof(T).Name);
        }
 
        protected internal override Delegate GetDataViewGetter(DataViewRowCursor cursor)
        {
            // special cases for types that have NA values
            if (typeof(T) == typeof(float))
            {
                return CreateSingleValueGetterDelegate(cursor, (PrimitiveDataFrameColumn<float>)(object)this);
            }
            else if (typeof(T) == typeof(double))
            {
                return CreateDoubleValueGetterDelegate(cursor, (PrimitiveDataFrameColumn<double>)(object)this);
            }
            // special cases for types not supported
            else if (typeof(T) == typeof(char))
            {
                return CreateCharValueGetterDelegate(cursor, (PrimitiveDataFrameColumn<char>)(object)this);
            }
            else if (typeof(T) == typeof(decimal))
            {
                return CreateDecimalValueGetterDelegate(cursor, (PrimitiveDataFrameColumn<decimal>)(object)this);
            }
            return CreateValueGetterDelegate(cursor);
        }
 
        private ValueGetter<T> CreateValueGetterDelegate(DataViewRowCursor cursor) =>
            (ref T value) => value = this[cursor.Position].GetValueOrDefault();
 
        private static ValueGetter<float> CreateSingleValueGetterDelegate(DataViewRowCursor cursor, PrimitiveDataFrameColumn<float> column) =>
            (ref float value) => value = column[cursor.Position] ?? float.NaN;
 
        private static ValueGetter<double> CreateDoubleValueGetterDelegate(DataViewRowCursor cursor, PrimitiveDataFrameColumn<double> column) =>
            (ref double value) => value = column[cursor.Position] ?? double.NaN;
 
        private static ValueGetter<ushort> CreateCharValueGetterDelegate(DataViewRowCursor cursor, PrimitiveDataFrameColumn<char> column) =>
            (ref ushort value) => value = column[cursor.Position].GetValueOrDefault();
 
        private static ValueGetter<double> CreateDecimalValueGetterDelegate(DataViewRowCursor cursor, PrimitiveDataFrameColumn<decimal> column) =>
            (ref double value) => value = (double?)column[cursor.Position] ?? double.NaN;
 
        protected internal override void AddValueUsingCursor(DataViewRowCursor cursor, Delegate getter)
        {
            long row = cursor.Position;
            T value = default;
            Debug.Assert(getter != null, "Excepted getter to be valid");
            (getter as ValueGetter<T>)(ref value);
 
            if (Length > row)
            {
                this[row] = value;
            }
            else if (Length == row)
            {
                Append(value);
            }
            else
            {
                throw new IndexOutOfRangeException(nameof(row));
            }
        }
 
        protected internal override Delegate GetValueGetterUsingCursor(DataViewRowCursor cursor, DataViewSchema.Column schemaColumn)
        {
            return cursor.GetGetter<T>(schemaColumn);
        }
 
        public override Dictionary<long, ICollection<long>> GetGroupedOccurrences(DataFrameColumn other, out HashSet<long> otherColumnNullIndices)
        {
            return GetGroupedOccurrences<T>(other, out otherColumnNullIndices);
        }
 
        public override PrimitiveDataFrameColumn<bool> ElementwiseIsNull()
        {
            var ret = new BooleanDataFrameColumn(Name, Length);
 
            for (long i = 0; i < Length; i++)
            {
                ret[i] = !IsValid(i);
            }
 
            return ret;
        }
 
        public override PrimitiveDataFrameColumn<bool> ElementwiseIsNotNull()
        {
            var ret = new BooleanDataFrameColumn(Name, Length);
 
            for (long i = 0; i < Length; i++)
            {
                ret[i] = IsValid(i);
            }
 
            return ret;
        }
 
        internal DataFrameColumn HandleOperationImplementation<U>(BinaryOperation operation, PrimitiveDataFrameColumn<U> column, bool inPlace)
            where U : unmanaged
        {
            if (column.Length != Length)
            {
                throw new ArgumentException(Strings.MismatchedColumnLengths, nameof(column));
            }
            switch (typeof(T))
            {
                case Type boolType when boolType == typeof(bool):
                    if (typeof(U) == typeof(bool))
                    {
                        PrimitiveDataFrameColumn<U> primitiveColumn = this as PrimitiveDataFrameColumn<U>;
                        var newColumn = inPlace ? primitiveColumn : primitiveColumn.Clone();
                        newColumn._columnContainer.HandleOperation(operation, column._columnContainer);
                        return newColumn;
                    }
                    throw new NotSupportedException();
                case Type decimalType when decimalType == typeof(decimal):
                    if (typeof(U) == typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    if (typeof(U) == typeof(T))
                    {
                        // No conversions
                        PrimitiveDataFrameColumn<U> primitiveColumn = this as PrimitiveDataFrameColumn<U>;
                        PrimitiveDataFrameColumn<U> newColumn = inPlace ? primitiveColumn : primitiveColumn.Clone();
                        newColumn._columnContainer.HandleOperation(operation, column._columnContainer);
                        return newColumn;
                    }
                    else
                    {
                        if (inPlace)
                        {
                            throw new ArgumentException(string.Format(Strings.MismatchedColumnValueType, typeof(T)), nameof(column));
                        }
                        PrimitiveDataFrameColumn<decimal> decimalColumn = CloneAsDecimalColumn();
                        decimalColumn._columnContainer.HandleOperation(operation, column.CloneAsDecimalColumn()._columnContainer);
                        return decimalColumn;
                    }
                case Type DateTimeType when DateTimeType == typeof(DateTime):
                    throw new NotSupportedException();
                case Type byteType when byteType == typeof(byte):
                case Type charType when charType == typeof(char):
                case Type doubleType when doubleType == typeof(double):
                case Type floatType when floatType == typeof(float):
                case Type intType when intType == typeof(int):
                case Type longType when longType == typeof(long):
                case Type sbyteType when sbyteType == typeof(sbyte):
                case Type shortType when shortType == typeof(short):
                case Type uintType when uintType == typeof(uint):
                case Type ulongType when ulongType == typeof(ulong):
                case Type ushortType when ushortType == typeof(ushort):
                    if (typeof(U) == typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    if (typeof(U) == typeof(T))
                    {
                        // No conversions
                        PrimitiveDataFrameColumn<U> primitiveColumn = this as PrimitiveDataFrameColumn<U>;
                        PrimitiveDataFrameColumn<U> newColumn = inPlace ? primitiveColumn : primitiveColumn.Clone();
                        newColumn._columnContainer.HandleOperation(operation, column._columnContainer);
                        return newColumn;
                    }
                    else
                    {
                        if (inPlace)
                        {
                            throw new ArgumentException(string.Format(Strings.MismatchedColumnValueType, typeof(T)), nameof(column));
                        }
                        if (typeof(U) == typeof(decimal))
                        {
                            PrimitiveDataFrameColumn<decimal> decimalColumn = CloneAsDecimalColumn();
                            decimalColumn._columnContainer.HandleOperation(operation, (column as PrimitiveDataFrameColumn<decimal>)._columnContainer);
                            return decimalColumn;
                        }
                        else
                        {
                            PrimitiveDataFrameColumn<double> doubleColumn = CloneAsDoubleColumn();
                            doubleColumn._columnContainer.HandleOperation(operation, column.CloneAsDoubleColumn()._columnContainer);
                            return doubleColumn;
                        }
                    }
                default:
                    throw new NotSupportedException();
            }
        }
 
        internal DataFrameColumn HandleOperationImplementation<U>(BinaryOperation operation, U value, bool inPlace)
        {
            switch (typeof(T))
            {
                case Type boolType when boolType == typeof(bool):
                    throw new NotSupportedException();
                case Type decimalType when decimalType == typeof(decimal):
                    if (typeof(U) == typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    if (typeof(U) == typeof(T))
                    {
                        // No conversions
                        PrimitiveDataFrameColumn<T> primitiveColumn = this;
                        PrimitiveDataFrameColumn<T> newColumn = inPlace ? primitiveColumn : primitiveColumn.Clone();
                        newColumn._columnContainer.HandleOperation(operation, Unsafe.As<U, T>(ref value));
                        return newColumn;
                    }
                    else
                    {
                        if (inPlace)
                        {
                            throw new ArgumentException(string.Format(Strings.MismatchedValueType, typeof(T)), nameof(value));
                        }
                        PrimitiveDataFrameColumn<decimal> decimalColumn = CloneAsDecimalColumn();
                        decimalColumn._columnContainer.HandleOperation(operation, DecimalConverter<U>.Instance.GetDecimal(value));
                        return decimalColumn;
                    }
                case Type DateTimeType when DateTimeType == typeof(DateTime):
                    throw new NotSupportedException();
                case Type byteType when byteType == typeof(byte):
                case Type charType when charType == typeof(char):
                case Type doubleType when doubleType == typeof(double):
                case Type floatType when floatType == typeof(float):
                case Type intType when intType == typeof(int):
                case Type longType when longType == typeof(long):
                case Type sbyteType when sbyteType == typeof(sbyte):
                case Type shortType when shortType == typeof(short):
                case Type uintType when uintType == typeof(uint):
                case Type ulongType when ulongType == typeof(ulong):
                case Type ushortType when ushortType == typeof(ushort):
                    if (typeof(U) == typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    if (typeof(U) == typeof(T))
                    {
                        // No conversions
                        PrimitiveDataFrameColumn<T> primitiveColumn = this;
                        PrimitiveDataFrameColumn<T> newColumn = inPlace ? primitiveColumn : primitiveColumn.Clone();
                        newColumn._columnContainer.HandleOperation(operation, Unsafe.As<U, T>(ref value));
                        return newColumn;
                    }
                    else
                    {
                        if (inPlace)
                        {
                            throw new ArgumentException(string.Format(Strings.MismatchedValueType, typeof(T)), nameof(value));
                        }
                        if (typeof(U) == typeof(decimal))
                        {
                            PrimitiveDataFrameColumn<decimal> decimalColumn = CloneAsDecimalColumn();
                            decimalColumn._columnContainer.HandleOperation(operation, DecimalConverter<U>.Instance.GetDecimal(value));
                            return decimalColumn;
                        }
                        else
                        {
                            PrimitiveDataFrameColumn<double> doubleColumn = CloneAsDoubleColumn();
                            doubleColumn._columnContainer.HandleOperation(operation, DoubleConverter<U>.Instance.GetDouble(value));
                            return doubleColumn;
                        }
                    }
                default:
                    throw new NotSupportedException();
            }
        }
 
        internal DataFrameColumn HandleReverseOperationImplementation<U>(BinaryOperation operation, U value, bool inPlace = false)
        {
            switch (typeof(T))
            {
                case Type boolType when boolType == typeof(bool):
                    throw new NotSupportedException();
                case Type decimalType when decimalType == typeof(decimal):
                    if (typeof(U) == typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    if (typeof(U) == typeof(T))
                    {
                        // No conversions
                        PrimitiveDataFrameColumn<T> newColumn = inPlace ? this : Clone();
                        newColumn._columnContainer.HandleReverseOperation(operation, Unsafe.As<U, T>(ref value));
                        return newColumn;
                    }
                    else
                    {
                        if (inPlace)
                        {
                            throw new ArgumentException(string.Format(Strings.MismatchedValueType, typeof(T)), nameof(value));
                        }
                        PrimitiveDataFrameColumn<decimal> clonedDecimalColumn = CloneAsDecimalColumn();
                        clonedDecimalColumn._columnContainer.HandleReverseOperation(operation, DecimalConverter<U>.Instance.GetDecimal(value));
                        return clonedDecimalColumn;
                    }
                case Type DateTimeType when DateTimeType == typeof(DateTime):
                    throw new NotSupportedException();
                case Type byteType when byteType == typeof(byte):
                case Type charType when charType == typeof(char):
                case Type doubleType when doubleType == typeof(double):
                case Type floatType when floatType == typeof(float):
                case Type intType when intType == typeof(int):
                case Type longType when longType == typeof(long):
                case Type sbyteType when sbyteType == typeof(sbyte):
                case Type shortType when shortType == typeof(short):
                case Type uintType when uintType == typeof(uint):
                case Type ulongType when ulongType == typeof(ulong):
                case Type ushortType when ushortType == typeof(ushort):
                    if (typeof(U) == typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    if (typeof(U) == typeof(T))
                    {
                        // No conversions
                        PrimitiveDataFrameColumn<T> newColumn = inPlace ? this : Clone();
                        newColumn._columnContainer.HandleReverseOperation(operation, Unsafe.As<U, T>(ref value));
                        return newColumn;
                    }
                    else
                    {
                        if (inPlace)
                        {
                            throw new ArgumentException(string.Format(Strings.MismatchedValueType, typeof(T)), nameof(value));
                        }
                        if (typeof(U) == typeof(decimal))
                        {
                            PrimitiveDataFrameColumn<decimal> decimalColumn = CloneAsDecimalColumn();
                            decimalColumn._columnContainer.HandleReverseOperation(operation, DecimalConverter<U>.Instance.GetDecimal(value));
                            return decimalColumn;
                        }
                        else
                        {
                            PrimitiveDataFrameColumn<double> clonedDoubleColumn = CloneAsDoubleColumn();
                            clonedDoubleColumn._columnContainer.HandleReverseOperation(operation, DoubleConverter<U>.Instance.GetDouble(value));
                            return clonedDoubleColumn;
                        }
                    }
                default:
                    throw new NotSupportedException();
            }
        }
 
        internal PrimitiveDataFrameColumn<bool> HandleBitwiseOperationImplementation<U>(BinaryOperation operation, U value, bool inPlace)
        {
            switch (typeof(T))
            {
                case Type boolType when boolType == typeof(bool):
                    if (typeof(U) != typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    PrimitiveDataFrameColumn<bool> typedColumn = this as PrimitiveDataFrameColumn<bool>;
                    PrimitiveDataFrameColumn<bool> retColumn = inPlace ? typedColumn : typedColumn.Clone();
                    retColumn._columnContainer.HandleOperation(operation, Unsafe.As<U, bool>(ref value));
                    return retColumn;
 
                default:
                    throw new NotSupportedException();
            }
        }
 
        internal DataFrameColumn HandleOperationImplementation(BinaryIntOperation operation, int value, bool inPlace)
        {
            switch (typeof(T))
            {
                case Type boolType when boolType == typeof(bool):
                    throw new NotSupportedException();
                case Type byteType when byteType == typeof(byte):
                    PrimitiveDataFrameColumn<byte> byteColumn = this as PrimitiveDataFrameColumn<byte>;
                    PrimitiveDataFrameColumn<byte> newbyteColumn = inPlace ? byteColumn : byteColumn.Clone();
                    newbyteColumn._columnContainer.HandleOperation(operation, value);
                    return newbyteColumn;
                case Type charType when charType == typeof(char):
                    PrimitiveDataFrameColumn<char> charColumn = this as PrimitiveDataFrameColumn<char>;
                    PrimitiveDataFrameColumn<char> newcharColumn = inPlace ? charColumn : charColumn.Clone();
                    newcharColumn._columnContainer.HandleOperation(operation, value);
                    return newcharColumn;
                case Type decimalType when decimalType == typeof(decimal):
                    throw new NotSupportedException();
                case Type doubleType when doubleType == typeof(double):
                    throw new NotSupportedException();
                case Type floatType when floatType == typeof(float):
                    throw new NotSupportedException();
                case Type intType when intType == typeof(int):
                    PrimitiveDataFrameColumn<int> intColumn = this as PrimitiveDataFrameColumn<int>;
                    PrimitiveDataFrameColumn<int> newintColumn = inPlace ? intColumn : intColumn.Clone();
                    newintColumn._columnContainer.HandleOperation(operation, value);
                    return newintColumn;
                case Type longType when longType == typeof(long):
                    PrimitiveDataFrameColumn<long> longColumn = this as PrimitiveDataFrameColumn<long>;
                    PrimitiveDataFrameColumn<long> newlongColumn = inPlace ? longColumn : longColumn.Clone();
                    newlongColumn._columnContainer.HandleOperation(operation, value);
                    return newlongColumn;
                case Type sbyteType when sbyteType == typeof(sbyte):
                    PrimitiveDataFrameColumn<sbyte> sbyteColumn = this as PrimitiveDataFrameColumn<sbyte>;
                    PrimitiveDataFrameColumn<sbyte> newsbyteColumn = inPlace ? sbyteColumn : sbyteColumn.Clone();
                    newsbyteColumn._columnContainer.HandleOperation(operation, value);
                    return newsbyteColumn;
                case Type shortType when shortType == typeof(short):
                    PrimitiveDataFrameColumn<short> shortColumn = this as PrimitiveDataFrameColumn<short>;
                    PrimitiveDataFrameColumn<short> newshortColumn = inPlace ? shortColumn : shortColumn.Clone();
                    newshortColumn._columnContainer.HandleOperation(operation, value);
                    return newshortColumn;
                case Type uintType when uintType == typeof(uint):
                    PrimitiveDataFrameColumn<uint> uintColumn = this as PrimitiveDataFrameColumn<uint>;
                    PrimitiveDataFrameColumn<uint> newuintColumn = inPlace ? uintColumn : uintColumn.Clone();
                    newuintColumn._columnContainer.HandleOperation(operation, value);
                    return newuintColumn;
                case Type ulongType when ulongType == typeof(ulong):
                    PrimitiveDataFrameColumn<ulong> ulongColumn = this as PrimitiveDataFrameColumn<ulong>;
                    PrimitiveDataFrameColumn<ulong> newulongColumn = inPlace ? ulongColumn : ulongColumn.Clone();
                    newulongColumn._columnContainer.HandleOperation(operation, value);
                    return newulongColumn;
                case Type ushortType when ushortType == typeof(ushort):
                    PrimitiveDataFrameColumn<ushort> ushortColumn = this as PrimitiveDataFrameColumn<ushort>;
                    PrimitiveDataFrameColumn<ushort> newushortColumn = inPlace ? ushortColumn : ushortColumn.Clone();
                    newushortColumn._columnContainer.HandleOperation(operation, value);
                    return newushortColumn;
                case Type DateTimeType when DateTimeType == typeof(DateTime):
                    throw new NotSupportedException();
                default:
                    throw new NotSupportedException();
            }
        }
 
        internal PrimitiveDataFrameColumn<bool> HandleOperationImplementation<U>(ComparisonOperation operation, PrimitiveDataFrameColumn<U> column)
            where U : unmanaged
        {
            if (column.Length != Length)
            {
                throw new ArgumentException(Strings.MismatchedColumnLengths, nameof(column));
            }
            switch (typeof(T))
            {
                case Type boolType when boolType == typeof(bool):
                    if (typeof(U) != typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    return new BooleanDataFrameColumn(Name, (this as PrimitiveDataFrameColumn<U>)._columnContainer.HandleOperation(operation, column._columnContainer));
                case Type decimalType when decimalType == typeof(decimal):
                    if (typeof(U) == typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    if (typeof(U) == typeof(T))
                    {
                        // No conversions
                        PrimitiveDataFrameColumn<U> primitiveColumn = this as PrimitiveDataFrameColumn<U>;
                        return new BooleanDataFrameColumn(Name, primitiveColumn._columnContainer.HandleOperation(operation, column._columnContainer));
                    }
                    else
                    {
                        PrimitiveDataFrameColumn<decimal> decimalColumn = CloneAsDecimalColumn();
                        return new BooleanDataFrameColumn(Name, decimalColumn._columnContainer.HandleOperation(operation, column.CloneAsDecimalColumn()._columnContainer));
                    }
                case Type DateTimeType when DateTimeType == typeof(DateTime):
                    if (typeof(U) != typeof(DateTime))
                    {
                        throw new NotSupportedException();
                    }
                    return new BooleanDataFrameColumn(Name, (this as PrimitiveDataFrameColumn<U>)._columnContainer.HandleOperation(operation, column._columnContainer));
                case Type byteType when byteType == typeof(byte):
                case Type charType when charType == typeof(char):
                case Type doubleType when doubleType == typeof(double):
                case Type floatType when floatType == typeof(float):
                case Type intType when intType == typeof(int):
                case Type longType when longType == typeof(long):
                case Type sbyteType when sbyteType == typeof(sbyte):
                case Type shortType when shortType == typeof(short):
                case Type uintType when uintType == typeof(uint):
                case Type ulongType when ulongType == typeof(ulong):
                case Type ushortType when ushortType == typeof(ushort):
                    if (typeof(U) == typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    if (typeof(U) == typeof(T))
                    {
                        // No conversions
                        PrimitiveDataFrameColumn<U> primitiveColumn = this as PrimitiveDataFrameColumn<U>;
                        return new BooleanDataFrameColumn(Name, primitiveColumn._columnContainer.HandleOperation(operation, column._columnContainer));
                    }
                    else
                    {
                        if (typeof(U) == typeof(decimal))
                        {
                            PrimitiveDataFrameColumn<decimal> decimalColumn = CloneAsDecimalColumn();
                            return new BooleanDataFrameColumn(Name, decimalColumn._columnContainer.HandleOperation(operation, (column as PrimitiveDataFrameColumn<decimal>)._columnContainer));
                        }
                        else
                        {
                            PrimitiveDataFrameColumn<double> doubleColumn = CloneAsDoubleColumn();
                            return new BooleanDataFrameColumn(Name, doubleColumn._columnContainer.HandleOperation(operation, column.CloneAsDoubleColumn()._columnContainer));
                        }
                    }
                default:
                    throw new NotSupportedException();
            }
        }
 
        internal PrimitiveDataFrameColumn<bool> HandleOperationImplementation<U>(ComparisonOperation operation, U value)
        {
            switch (typeof(T))
            {
                case Type boolType when boolType == typeof(bool):
                    if (typeof(U) != typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    return new BooleanDataFrameColumn(Name, (this as PrimitiveDataFrameColumn<bool>)._columnContainer.HandleOperation(operation, Unsafe.As<U, bool>(ref value)));
                case Type decimalType when decimalType == typeof(decimal):
                    if (typeof(U) == typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    if (typeof(U) == typeof(T))
                    {
                        // No conversions
                        PrimitiveDataFrameColumn<T> primitiveColumn = this;
                        return new BooleanDataFrameColumn(Name, primitiveColumn._columnContainer.HandleOperation(operation, Unsafe.As<U, T>(ref value)));
                    }
                    else
                    {
                        PrimitiveDataFrameColumn<decimal> decimalColumn = CloneAsDecimalColumn();
                        return new BooleanDataFrameColumn(Name, decimalColumn._columnContainer.HandleOperation(operation, DecimalConverter<U>.Instance.GetDecimal(value)));
                    }
                case Type DateTimeType when DateTimeType == typeof(DateTime):
                    if (typeof(U) != typeof(DateTime))
                    {
                        throw new NotSupportedException();
                    }
                    return new BooleanDataFrameColumn(Name, (this as PrimitiveDataFrameColumn<DateTime>)._columnContainer.HandleOperation(operation, Unsafe.As<U, DateTime>(ref value)));
                case Type byteType when byteType == typeof(byte):
                case Type charType when charType == typeof(char):
                case Type doubleType when doubleType == typeof(double):
                case Type floatType when floatType == typeof(float):
                case Type intType when intType == typeof(int):
                case Type longType when longType == typeof(long):
                case Type sbyteType when sbyteType == typeof(sbyte):
                case Type shortType when shortType == typeof(short):
                case Type uintType when uintType == typeof(uint):
                case Type ulongType when ulongType == typeof(ulong):
                case Type ushortType when ushortType == typeof(ushort):
                    if (typeof(U) == typeof(bool))
                    {
                        throw new NotSupportedException();
                    }
                    if (typeof(U) == typeof(T))
                    {
                        // No conversions
                        PrimitiveDataFrameColumn<T> primitiveColumn = this;
                        return new BooleanDataFrameColumn(Name, primitiveColumn._columnContainer.HandleOperation(operation, Unsafe.As<U, T>(ref value)));
                    }
                    else
                    {
                        if (typeof(U) == typeof(decimal))
                        {
                            PrimitiveDataFrameColumn<decimal> decimalColumn = CloneAsDecimalColumn();
                            return new BooleanDataFrameColumn(Name, decimalColumn._columnContainer.HandleOperation(operation, DecimalConverter<U>.Instance.GetDecimal(value)));
                        }
                        else
                        {
                            PrimitiveDataFrameColumn<double> doubleColumn = CloneAsDoubleColumn();
                            return new BooleanDataFrameColumn(Name, doubleColumn._columnContainer.HandleOperation(operation, DoubleConverter<U>.Instance.GetDouble(value)));
                        }
                    }
                default:
                    throw new NotSupportedException();
            }
        }
    }
}