Skip to content

Instantly share code, notes, and snippets.

@CurtHagenlocher
Created February 27, 2024 03:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save CurtHagenlocher/306865d4b4202906470f4f18fd410c4e to your computer and use it in GitHub Desktop.
Load SqlDataReader into Arrow RecordBatch
using Apache.Arrow;
using Apache.Arrow.Types;
using Microsoft.Data.SqlClient;
using System.Data.Common;
using System.Data.SqlTypes;
namespace Loader
{
internal class Program
{
    // Number of extra rows reserved in every column builder each time the previous reservation is used up.
    private const int PageSize = 2048;

    /// <summary>
    /// Runs <c>select * from customer</c> against the database named in the connection string
    /// and loads the whole result set into a single Arrow <see cref="RecordBatch"/>.
    /// </summary>
    /// <param name="args">Unused.</param>
    static void Main(string[] args)
    {
        using var connection = new SqlConnection("server=.;database=test;trusted_connection=true;encrypt=false");
        connection.Open();
        using var command = connection.CreateCommand();
        command.CommandText = "select * from customer";
        using var reader = command.ExecuteReader();

        // RecordBatch is IDisposable and owns the arrays built in LoadCustomers; dispose it to release them.
        using RecordBatch batch = LoadCustomers(reader);
        System.Console.WriteLine("done");
    }

    /// <summary>
    /// Reads every remaining row of <paramref name="reader"/> into one <see cref="RecordBatch"/>.
    /// </summary>
    /// <param name="reader">
    /// Open reader over the customer query. Columns are read by ordinal (0-7), so the query must
    /// return them in the same order as the schema fields declared below.
    /// </param>
    /// <returns>A batch with one entry per row read. The caller is responsible for disposing it.</returns>
    private static RecordBatch LoadCustomers(SqlDataReader reader)
    {
        // A single instance shared by the ACCTBAL field and its builder so precision/scale cannot drift apart.
        var acctBalType = new Decimal128Type(20, 6);

        var schema = new Schema(new[]
        {
            new Field("CUSTKEY", Int64Type.Default, false),
            new Field("NAME", StringType.Default, false),
            new Field("ADDRESS", StringType.Default, true),
            new Field("NATIONKEY", Int64Type.Default, true),
            new Field("PHONE", StringType.Default, true),
            new Field("ACCTBAL", acctBalType, true),
            new Field("MKTSEGMENT", StringType.Default, true),
            new Field("COMMENT", StringType.Default, true),
        }, null);

        var custKey = new Int64Array.Builder();
        var name = new StringArray.Builder();
        var address = new StringArray.Builder();
        var nationKey = new Int64Array.Builder();
        var phone = new StringArray.Builder();
        var acctBal = new Decimal128Array.Builder(acctBalType);
        var mktSegment = new StringArray.Builder();
        var comment = new StringArray.Builder();

        int rowCount = 0;
        while (reader.Read())
        {
            if (rowCount % PageSize == 0)
            {
                // Arrow builders' Reserve() takes capacity in addition to what is already appended,
                // so request exactly one more page rather than the running total.
                custKey.Reserve(PageSize);
                name.Reserve(PageSize);
                address.Reserve(PageSize);
                nationKey.Reserve(PageSize);
                phone.Reserve(PageSize);
                acctBal.Reserve(PageSize);
                mktSegment.Reserve(PageSize);
                comment.Reserve(PageSize);
            }

            custKey.Append(reader.Int64(0));
            name.Append(reader.String(1));
            address.Append(reader.String(2));
            nationKey.Append(reader.Int64(3));
            phone.Append(reader.String(4));
            SqlDecimal? balance = reader.Decimal(5);
            if (balance.HasValue) { acctBal.Append(balance.Value); } else { acctBal.AppendNull(); }
            mktSegment.Append(reader.String(6));
            comment.Append(reader.String(7));
            rowCount++;
        }

        return new RecordBatch(schema, new IArrowArray[]
        {
            custKey.Build(),
            name.Build(),
            address.Build(),
            nationKey.Build(),
            phone.Build(),
            acctBal.Build(),
            mktSegment.Build(),
            comment.Build(),
        }, rowCount);
    }
}
/// <summary>
/// Null-aware typed column accessors for data readers. Each accessor yields <c>null</c>
/// when <c>IsDBNull</c> reports the column as null, and the typed column value otherwise.
/// </summary>
static class Extensions
{
    /// <summary>Reads column <paramref name="position"/> as a 64-bit integer, or <c>null</c> for a database NULL.</summary>
    public static long? Int64(this DbDataReader reader, int position) =>
        reader.IsDBNull(position) ? default(long?) : reader.GetInt64(position);

    /// <summary>Reads column <paramref name="position"/> as a string, or <c>null</c> for a database NULL.</summary>
    public static string String(this DbDataReader reader, int position) =>
        reader.IsDBNull(position) ? null : reader.GetString(position);

    /// <summary>Reads column <paramref name="position"/> as a <see cref="SqlDecimal"/>, or <c>null</c> for a database NULL.</summary>
    public static SqlDecimal? Decimal(this SqlDataReader reader, int position) =>
        reader.IsDBNull(position) ? default(SqlDecimal?) : reader.GetSqlDecimal(position);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment