// File: ArrowIntegrationTests.cs
// Project: src\test\Microsoft.Data.Analysis.Tests\Microsoft.Data.Analysis.Tests.csproj (Microsoft.Data.Analysis.Tests)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Ipc;
using Apache.Arrow.Types;
using Microsoft.ML.TestFramework;
using Xunit;
using Xunit.Abstractions;
 
namespace Microsoft.Data.Analysis.Tests
{
    /// <summary>
    /// Tests conversion between Apache Arrow <see cref="RecordBatch"/> instances and <see cref="DataFrame"/>,
    /// including round trips, struct columns, empty frames, mutation and malformed null bitmaps.
    /// </summary>
    public class ArrowIntegrationTests : BaseTestClass
    {
        public ArrowIntegrationTests(ITestOutputHelper output) : base(output, true)
        {
        }

        /// <summary>
        /// Builds a record batch with one column per supported Arrow type, converts it to a
        /// <see cref="DataFrame"/>, exports it back to record batches and checks every exported
        /// batch matches the original.
        /// </summary>
        [Fact]
        public void TestArrowIntegration()
        {
            // A fixed date keeps the test input reproducible across runs (DateTime.Now would not).
            DateTime fixedDate = new DateTime(2020, 1, 1);

            // Disposed at the end of the method even if an assertion below fails.
            using RecordBatch originalBatch = new RecordBatch.Builder()
                .Append("Column1", false, col => col.Int32(array => array.AppendRange(Enumerable.Range(0, 10))))
                // Validity bitmap 0xfd = 0b11111101: only row 1 is null (nullCount 1).
                .Append("Column2", true, new Int32Array(
                    valueBuffer: new ArrowBuffer.Builder<int>().AppendRange(Enumerable.Range(0, 10)).Build(),
                    nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0xfd).Append(0xff).Build(),
                    length: 10,
                    nullCount: 1,
                    offset: 0))
                // All-zero validity bitmap: every row is null (nullCount 10).
                .Append("Column3", true, new Int32Array(
                    valueBuffer: new ArrowBuffer.Builder<int>().AppendRange(Enumerable.Range(0, 10)).Build(),
                    nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0x00).Append(0x00).Build(),
                    length: 10,
                    nullCount: 10,
                    offset: 0))
                // Validity bitmap 0xed = 0b11101101: rows 1 and 4 are null (nullCount 2).
                .Append("NullableBooleanColumn", true, new BooleanArray(
                    valueBuffer: new ArrowBuffer.Builder<byte>().Append(0xfd).Append(0xff).Build(),
                    nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0xed).Append(0xff).Build(),
                    length: 10,
                    nullCount: 2,
                    offset: 0))
                .Append("StringDataFrameColumn", false, new StringArray.Builder().AppendRange(Enumerable.Range(0, 10).Select(x => x.ToString())).Build())
                .Append("DoubleColumn", false, new DoubleArray.Builder().AppendRange(Enumerable.Repeat(1.0, 10)).Build())
                .Append("FloatColumn", false, new FloatArray.Builder().AppendRange(Enumerable.Repeat(1.0f, 10)).Build())
                .Append("ShortColumn", false, new Int16Array.Builder().AppendRange(Enumerable.Repeat((short)1, 10)).Build())
                .Append("LongColumn", false, new Int64Array.Builder().AppendRange(Enumerable.Repeat((long)1, 10)).Build())
                .Append("UIntColumn", false, new UInt32Array.Builder().AppendRange(Enumerable.Repeat((uint)1, 10)).Build())
                .Append("UShortColumn", false, new UInt16Array.Builder().AppendRange(Enumerable.Repeat((ushort)1, 10)).Build())
                .Append("ULongColumn", false, new UInt64Array.Builder().AppendRange(Enumerable.Repeat((ulong)1, 10)).Build())
                .Append("ByteColumn", false, new Int8Array.Builder().AppendRange(Enumerable.Repeat((sbyte)1, 10)).Build())
                .Append("UByteColumn", false, new UInt8Array.Builder().AppendRange(Enumerable.Repeat((byte)1, 10)).Build())
                .Append("Date64Column", false, new Date64Array.Builder().AppendRange(Enumerable.Repeat(fixedDate, 10)).Build())
                .Build();

            DataFrame df = DataFrame.FromArrowRecordBatch(originalBatch);
            DataFrameIOTests.VerifyColumnTypes(df, testArrowStringColumn: true);

            IEnumerable<RecordBatch> recordBatches = df.ToArrowRecordBatches();

            foreach (RecordBatch batch in recordBatches)
            {
                // Release each exported batch even when the comparison throws.
                using (batch)
                {
                    RecordBatchComparer.CompareBatches(originalBatch, batch);
                }
            }
        }

        /// <summary>
        /// Wraps all columns of a record batch in a single non-nullable struct column named "Struct",
        /// loads it into a <see cref="DataFrame"/> and checks that exporting produces flattened
        /// columns named "Struct_&lt;field&gt;".
        /// </summary>
        [Fact]
        public void TestRecordBatchWithStructArrays()
        {
            // Builds the same set of columns, optionally prefixing every column name.
            RecordBatch CreateRecordBatch(string prependColumnNamesWith = "")
            {
                RecordBatch ret = new RecordBatch.Builder()
                    .Append(prependColumnNamesWith + "Column1", false, col => col.Int32(array => array.AppendRange(Enumerable.Range(0, 10))))
                    .Append(prependColumnNamesWith + "Column2", true, new Int32Array(
                        valueBuffer: new ArrowBuffer.Builder<int>().AppendRange(Enumerable.Range(0, 10)).Build(),
                        nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0xfd).Append(0xff).Build(),
                        length: 10,
                        nullCount: 1,
                        offset: 0))
                    .Append(prependColumnNamesWith + "Column3", true, new Int32Array(
                        valueBuffer: new ArrowBuffer.Builder<int>().AppendRange(Enumerable.Range(0, 10)).Build(),
                        nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0x00).Append(0x00).Build(),
                        length: 10,
                        nullCount: 10,
                        offset: 0))
                    .Append(prependColumnNamesWith + "NullableBooleanColumn", true, new BooleanArray(
                        valueBuffer: new ArrowBuffer.Builder<byte>().Append(0xfd).Append(0xff).Build(),
                        nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0xed).Append(0xff).Build(),
                        length: 10,
                        nullCount: 2,
                        offset: 0))
                    .Append(prependColumnNamesWith + "StringDataFrameColumn", false, new StringArray.Builder().AppendRange(Enumerable.Range(0, 10).Select(x => x.ToString())).Build())
                    .Append(prependColumnNamesWith + "DoubleColumn", false, new DoubleArray.Builder().AppendRange(Enumerable.Repeat(1.0, 10)).Build())
                    .Append(prependColumnNamesWith + "FloatColumn", false, new FloatArray.Builder().AppendRange(Enumerable.Repeat(1.0f, 10)).Build())
                    .Append(prependColumnNamesWith + "ShortColumn", false, new Int16Array.Builder().AppendRange(Enumerable.Repeat((short)1, 10)).Build())
                    .Append(prependColumnNamesWith + "LongColumn", false, new Int64Array.Builder().AppendRange(Enumerable.Repeat((long)1, 10)).Build())
                    .Append(prependColumnNamesWith + "UIntColumn", false, new UInt32Array.Builder().AppendRange(Enumerable.Repeat((uint)1, 10)).Build())
                    .Append(prependColumnNamesWith + "UShortColumn", false, new UInt16Array.Builder().AppendRange(Enumerable.Repeat((ushort)1, 10)).Build())
                    .Append(prependColumnNamesWith + "ULongColumn", false, new UInt64Array.Builder().AppendRange(Enumerable.Repeat((ulong)1, 10)).Build())
                    .Append(prependColumnNamesWith + "ByteColumn", false, new Int8Array.Builder().AppendRange(Enumerable.Repeat((sbyte)1, 10)).Build())
                    .Append(prependColumnNamesWith + "UByteColumn", false, new UInt8Array.Builder().AppendRange(Enumerable.Repeat((byte)1, 10)).Build())
                    .Build();

                return ret;
            }

            using RecordBatch originalBatch = CreateRecordBatch();

            // Every struct row is valid; nullability lives only in the child arrays.
            ArrowBuffer.BitmapBuilder validityBitmapBuilder = new ArrowBuffer.BitmapBuilder();
            for (int i = 0; i < originalBatch.Length; i++)
            {
                validityBitmapBuilder.Append(true);
            }
            ArrowBuffer validityBitmap = validityBitmapBuilder.Build();

            StructType structType = new StructType(originalBatch.Schema.FieldsList);
            StructArray structArray = new StructArray(structType, originalBatch.Length, originalBatch.Arrays.Cast<Apache.Arrow.Array>(), validityBitmap);
            Schema schema = new Schema.Builder().Field(new Field("Struct", structType, false)).Build();
            using RecordBatch recordBatch = new RecordBatch(schema, new[] { structArray }, originalBatch.Length);

            DataFrame df = DataFrame.FromArrowRecordBatch(recordBatch);
            DataFrameIOTests.VerifyColumnTypes(df, testArrowStringColumn: true);

            IEnumerable<RecordBatch> recordBatches = df.ToArrowRecordBatches();

            // Struct fields come back as top-level columns prefixed with "<structName>_".
            using RecordBatch expected = CreateRecordBatch("Struct_");
            foreach (RecordBatch batch in recordBatches)
            {
                // Release each exported batch even when the comparison throws.
                using (batch)
                {
                    RecordBatchComparer.CompareBatches(expected, batch);
                }
            }
        }

        /// <summary>
        /// Checks that a <see cref="DataFrame"/> with only empty columns still exports at least one
        /// record batch, and that the batch survives an Arrow IPC stream write/read round trip.
        /// </summary>
        [Fact]
        public async Task TestEmptyDataFrameRecordBatch()
        {
            PrimitiveDataFrameColumn<int> ageColumn = new PrimitiveDataFrameColumn<int>("Age");
            PrimitiveDataFrameColumn<int> lengthColumn = new PrimitiveDataFrameColumn<int>("CharCount");
            ArrowStringDataFrameColumn stringColumn = new ArrowStringDataFrameColumn("Empty");
            DataFrame df = new DataFrame(new List<DataFrameColumn>() { ageColumn, lengthColumn, stringColumn });

            IEnumerable<RecordBatch> recordBatches = df.ToArrowRecordBatches();
            bool foundARecordBatch = false;
            foreach (RecordBatch recordBatch in recordBatches)
            {
                // Release the exported batch even when the round-trip comparison throws.
                using (recordBatch)
                {
                    foundARecordBatch = true;
                    using MemoryStream stream = new MemoryStream();
                    using ArrowStreamWriter writer = new ArrowStreamWriter(stream, recordBatch.Schema);
                    await writer.WriteRecordBatchAsync(recordBatch);

                    // Rewind so the reader starts at the schema message the writer emitted.
                    stream.Position = 0;
                    using ArrowStreamReader reader = new ArrowStreamReader(stream);
                    RecordBatch readRecordBatch;
                    while ((readRecordBatch = reader.ReadNextRecordBatch()) != null)
                    {
                        using (readRecordBatch)
                        {
                            RecordBatchComparer.CompareBatches(recordBatch, readRecordBatch);
                        }
                    }
                }
            }
            Assert.True(foundARecordBatch);
        }

        /// <summary>
        /// Checks that a column loaded from Arrow can be mutated in place and that the null count
        /// is unaffected by writing a non-null value.
        /// </summary>
        [Fact]
        public void TestMutationOnArrowColumn()
        {
            using RecordBatch originalBatch = new RecordBatch.Builder()
                .Append("Column1", false, col => col.Int32(array => array.AppendRange(Enumerable.Range(0, 10)))).Build();
            DataFrame df = DataFrame.FromArrowRecordBatch(originalBatch);
            Assert.Equal(1, df.Columns["Column1"][1]);
            df.Columns["Column1"][1] = 100;
            Assert.Equal(100, df.Columns["Column1"][1]);
            Assert.Equal(0, df.Columns["Column1"].NullCount);
        }

        /// <summary>
        /// Checks that loading and mutating columns with empty value and/or empty null-bitmap buffers
        /// neither throws nor leaves the column's NullCount inconsistent.
        /// </summary>
        [Fact]
        public void TestEmptyArrowColumns()
        {
            // Tests to ensure that we don't crash and the internal NullCounts stay consistent on encountering:
            // 1. Data + Empty null bitmaps
            // 2. Empty Data + Null bitmaps
            // 3. Empty Data + Empty null bitmaps
            using RecordBatch originalBatch = new RecordBatch.Builder()
                .Append("EmptyNullBitMapColumn", false, col => col.Int32(array => array.AppendRange(Enumerable.Range(0, 10))))
                .Append("EmptyDataColumn", true, new Int32Array(
                    valueBuffer: ArrowBuffer.Empty,
                    nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0x00).Append(0x00).Build(),
                    length: 10,
                    nullCount: 10,
                    offset: 0)).Build();
            DataFrame df = DataFrame.FromArrowRecordBatch(originalBatch);

            // Case 1: values present, no null bitmap. Setting a null must bump NullCount.
            Assert.Equal(0, df.Columns["EmptyNullBitMapColumn"].NullCount);
            Assert.Equal(10, df.Columns["EmptyNullBitMapColumn"].Length);
            df.Columns["EmptyNullBitMapColumn"][9] = null;
            Assert.Equal(1, df.Columns["EmptyNullBitMapColumn"].NullCount);

            // Case 2: no values, all-null bitmap. Setting a value must drop NullCount.
            Assert.Equal(10, df.Columns["EmptyDataColumn"].NullCount);
            Assert.Equal(10, df.Columns["EmptyDataColumn"].Length);
            df.Columns["EmptyDataColumn"][9] = 9;
            Assert.Equal(9, df.Columns["EmptyDataColumn"].NullCount);
            Assert.Equal(10, df.Columns["EmptyDataColumn"].Length);
            for (int i = 0; i < 9; i++)
            {
                Assert.Equal(i, (int)df.Columns["EmptyNullBitMapColumn"][i]);
                Assert.Null(df.Columns["EmptyDataColumn"][i]);
            }

            // Case 3: a zero-length column with neither values nor a null bitmap.
            using RecordBatch batch1 = new RecordBatch.Builder()
                .Append("EmptyDataAndNullColumns", false, col => col.Int32(array => array.Clear())).Build();
            DataFrame emptyDataFrame = DataFrame.FromArrowRecordBatch(batch1);
            Assert.Equal(0, emptyDataFrame.Rows.Count);
            Assert.Equal(0, emptyDataFrame.Columns["EmptyDataAndNullColumns"].Length);
            Assert.Equal(0, emptyDataFrame.Columns["EmptyDataAndNullColumns"].NullCount);
        }

        /// <summary>
        /// Checks that loading a column whose null bitmap is too short for its declared length
        /// throws an <see cref="ArgumentException"/> (or a subclass).
        /// </summary>
        [Fact]
        public void TestInconsistentNullBitMapLength()
        {
            // Arrow allocates buffers of 64 bytes by default, i.e. 64 * 8 = 512 bits in the null bitmap buffer.
            // A length below 512 would still fit in that padded buffer and not trigger a throw, so 520 values are used.
            Int32Array int32 = new Int32Array.Builder().AppendRange(Enumerable.Range(0, 520)).Build();
            using RecordBatch originalBatch = new RecordBatch.Builder()
                .Append("EmptyDataColumn", true, new Int32Array(
                    valueBuffer: int32.ValueBuffer,
                    nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0x00).Build(),
                    length: 520,
                    nullCount: 520,
                    offset: 0)).Build();

            Assert.ThrowsAny<ArgumentException>(() => DataFrame.FromArrowRecordBatch(originalBatch));
        }
    }
}