Skip to content

Instantly share code, notes, and snippets.

@wardviaene
Created June 9, 2020 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wardviaene/317e7106f38876372e6f80a5b2f15367 to your computer and use it in GitHub Desktop.
Save wardviaene/317e7106f38876372e6f80a5b2f15367 to your computer and use it in GitHub Desktop.
Use MapReduce to repartition parquet files in S3
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/in4it/gomap/pkg/context"
"github.com/in4it/gomap/pkg/types"
"github.com/in4it/gomap/pkg/utils"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/writer"
)
// DataRecord is one row of the input parquet data as read by
// repartitionParquet. Every column is declared OPTIONAL in its parquet tag,
// so each field is a pointer and is nil when the value is absent.
type DataRecord struct {
	// Id is the INT32 "id" column.
	Id *int32 `parquet:"name=id, type=INT32, repetitiontype=OPTIONAL"`
	// Firstname is the UTF8 "firstname" column.
	Firstname *string `parquet:"name=firstname, type=UTF8, repetitiontype=OPTIONAL"`
	// Lastname is the UTF8 "lastname" column.
	Lastname *string `parquet:"name=lastname, type=UTF8, repetitiontype=OPTIONAL"`
	// Street is the UTF8 "street" column.
	Street *string `parquet:"name=street, type=UTF8, repetitiontype=OPTIONAL"`
	// Zipcode is the UTF8 "zipcode" column; repartitionParquet groups
	// records by this value.
	Zipcode *string `parquet:"name=zipcode, type=UTF8, repetitiontype=OPTIONAL"`
}
// DataRecordPartitioned is the row layout of the parquet files written by
// repartitionParquet. It carries the same columns as DataRecord except for
// the zipcode, which is the value the records are grouped by.
type DataRecordPartitioned struct {
	// Id is the INT32 "id" column.
	Id *int32 `parquet:"name=id, type=INT32, repetitiontype=OPTIONAL"`
	// Firstname is the UTF8 "firstname" column.
	Firstname *string `parquet:"name=firstname, type=UTF8, repetitiontype=OPTIONAL"`
	// Lastname is the UTF8 "lastname" column.
	Lastname *string `parquet:"name=lastname, type=UTF8, repetitiontype=OPTIONAL"`
	// Street is the UTF8 "street" column.
	Street *string `parquet:"name=street, type=UTF8, repetitiontype=OPTIONAL"`
}
func repartitionParquet(bucket string) {
c := context.New()
var t DataRecordPartitioned
c.ReadParquet(bucket, new(DataRecord)).MapToKV(func(input types.RawInput) (types.RawOutput, types.RawOutput) {
var line DataRecord
err := utils.RawDecode(input, &line)
if err != nil {
panic(err)
}
return utils.StringToRawOutput(*line.Zipcode), utils.RawEncode([]DataRecord{line})
}).ReduceByKey(func(a, b types.RawInput) types.RawOutput {
var line1 []DataRecord
var line2 []DataRecord
err := utils.RawDecode(a, &line1)
if err != nil {
panic(err)
}
err = utils.RawDecode(b, &line2)
if err != nil {
panic(err)
}
return utils.RawEncode(append(line1, line2...))
}).Run().Foreach(func(key, value types.RawOutput) {
var lines []DataRecord
x := time.Now()
filename := "data-partitioned-" + fmt.Sprintf("%v", (x.Format("20060102150405"))) + ".parquet"
err := utils.RawDecode(value, &lines)
if err != nil {
panic(err)
}
fmt.Println("key: " + string(key))
fw, err := local.NewLocalFileWriter("/tmp/" + filename)
if err != nil {
log.Println("Can't create local file", err)
return
}
pw, err := writer.NewParquetWriter(fw, new(DataRecordPartitioned), 4)
if err != nil {
log.Println("Can't create parquet writer", err)
return
}
for _, l := range lines {
t = DataRecordPartitioned{
Id: l.Id,
Firstname: l.Firstname,
Lastname: l.Lastname,
Street: l.Street,
}
if err = pw.Write(t); err != nil {
log.Println("Write error", err)
}
}
if err = pw.WriteStop(); err != nil {
log.Println("WriteStop error", err)
return
}
log.Println("Write Finished")
fw.Close()
os.Remove(filename)
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment