Skip to content

Instantly share code, notes, and snippets.

@wolfeidau
Last active October 8, 2022 23:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wolfeidau/5a78ad5514b56ca53df55eb75d528bd8 to your computer and use it in GitHub Desktop.
Apache Arrow Write Example: converting a JSON customer list into a Snappy-compressed Parquet file in Go
{
"customers": [
{
"customer_id": "c7281d12-9ea0-4574-b41a-3a6de5aba119",
"name": "Party Foods",
"created_date": "2022-08-23T22:18:31.445Z"
}
]
}
module github.com/wolfeidau/arrow-cookbook-golang
go 1.18
require (
github.com/alecthomas/kong v0.6.1
github.com/apache/arrow/go/v10 v10.0.0-20220805182631-d26489c3c842
github.com/rs/zerolog v1.27.0
)
require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/goccy/go-json v0.9.10 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.6+incompatible // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/sys v0.0.0-20220804214406-8e32c043e418 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.12 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/grpc v1.48.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"time"
"github.com/alecthomas/kong"
"github.com/apache/arrow/go/v10/arrow"
"github.com/apache/arrow/go/v10/arrow/array"
"github.com/apache/arrow/go/v10/arrow/memory"
"github.com/apache/arrow/go/v10/parquet"
"github.com/apache/arrow/go/v10/parquet/compress"
"github.com/apache/arrow/go/v10/parquet/pqarrow"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// version is handed to kong as the "version" variable (kong.Vars in main),
// which is what the kong.VersionFlag below reports. It defaults to "unknown".
var version = "unknown"

// cfg receives the command-line arguments parsed by kong in main.
var cfg struct {
	// Version enables the --version flag.
	Version kong.VersionFlag
	// InputFile is the path of the JSON file to import (required, positional).
	InputFile string `kong:"required,arg"`
	// OutputFile is the path of the Parquet file to create (required, positional).
	OutputFile string `kong:"required,arg"`
}
// CustomerEntry is one customer record as it appears in the input JSON.
type CustomerEntry struct {
	// CustomerID is the customer's identifier.
	CustomerID string `json:"customer_id,omitempty"`
	// Name is the customer's name.
	Name string `json:"name,omitempty"`
	// CreatedDate is the date the customer was created in the system. It is
	// kept as the raw input string (the sample input uses an RFC 3339
	// timestamp such as "2022-08-23T22:18:31.445Z").
	CreatedDate string `json:"created_date,omitempty"`
}
// ImportFile is the top-level document of the input JSON file: an object
// whose "customers" key holds the list of customers to import.
type ImportFile struct {
	// Customers carries an explicit tag, matching the snake_case tags used by
	// CustomerEntry, rather than relying on encoding/json's case-insensitive
	// field-name matching (which would also encode the key as "Customers").
	Customers []CustomerEntry `json:"customers"`
}
// main reads the customers from the JSON file named by the first positional
// argument and writes them to a Snappy-compressed Parquet file named by the
// second positional argument. Any failure is logged and terminates the
// process via log.Fatal.
func main() {
	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.Kitchen}).With().Stack().Caller().Logger()
	// Logger.Level returns a modified copy rather than mutating the receiver,
	// so the result must be assigned back for the level to take effect.
	log.Logger = log.Logger.Level(zerolog.DebugLevel)

	kong.Parse(&cfg,
		kong.Vars{"version": version},
	)

	data, err := ioutil.ReadFile(cfg.InputFile)
	if err != nil {
		log.Fatal().Err(err).Msg("failed to read input json file")
	}

	importFile := new(ImportFile)
	err = json.Unmarshal(data, importFile)
	if err != nil {
		log.Fatal().Err(err).Msg("failed to unmarshal json file")
	}

	outFile, err := os.Create(cfg.OutputFile)
	if err != nil {
		log.Fatal().Err(err).Msg("failed to open output file")
	}

	schema := arrow.NewSchema(
		[]arrow.Field{
			{Name: "customer_id", Type: arrow.BinaryTypes.String},
			{Name: "name", Type: arrow.BinaryTypes.String},
			{Name: "created_date", Type: arrow.BinaryTypes.String},
			// arrow.Field.Nullable defaults to false, but writeRecord appends
			// null to this column for every row, so it must be declared
			// nullable for those nulls to be representable in the output.
			{Name: "imported_date", Type: arrow.BinaryTypes.String, Nullable: true},
		},
		nil,
	)

	props := parquet.NewWriterProperties(
		parquet.WithCompression(compress.Codecs.Snappy),
		parquet.WithRootName("spark_schema"),
		parquet.WithRootRepetition(parquet.Repetitions.Required),
	)

	pqWriter, err := pqarrow.NewFileWriter(schema, outFile, props, pqarrow.DefaultWriterProps())
	if err != nil {
		log.Fatal().Err(err).Msg("failed to create output writer")
	}

	pool := memory.NewGoAllocator()
	b := array.NewRecordBuilder(pool, schema)
	defer b.Release()

	for _, customer := range importFile.Customers {
		err = writeRecord(b, pqWriter, customer)
		if err != nil {
			log.Fatal().Err(err).Msg("failed to output to writer")
		}
	}

	// Close finalizes the Parquet file, so its error must be checked rather
	// than discarded in a defer; an ignored failure here could leave an
	// unreadable file behind while the program reports success.
	if err = pqWriter.Close(); err != nil {
		log.Fatal().Err(err).Msg("failed to close output writer")
	}
}
// writeRecord appends customer as a single row to the record builder b, then
// turns the builder's contents into a one-row record and writes it to
// pqWriter with WriteBuffered. The first three schema columns (customer_id,
// name, created_date) receive the customer's string fields, and the fourth
// column (imported_date) is always null. The record is released after the
// write. A write failure is returned wrapped with context.
func writeRecord(b *array.RecordBuilder, pqWriter *pqarrow.FileWriter, customer CustomerEntry) error {
	values := []string{customer.CustomerID, customer.Name, customer.CreatedDate}
	for col, v := range values {
		b.Field(col).(*array.StringBuilder).AppendString(v)
	}
	b.Field(3).AppendNull()

	row := b.NewRecord()
	defer row.Release()

	if err := pqWriter.WriteBuffered(row); err != nil {
		return fmt.Errorf("failed to write to parquet file: %w", err)
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment