Skip to content

Instantly share code, notes, and snippets.

@marceloneppel
Created December 24, 2019 05:04
Show Gist options
  • Save marceloneppel/2e2c3f937e1a3f4cba6f6cc2d814a77f to your computer and use it in GitHub Desktop.
Save marceloneppel/2e2c3f937e1a3f4cba6f6cc2d814a77f to your computer and use it in GitHub Desktop.
Apache Beam Golang UDF - Pipeline
package main
import (
"context"
"flag"
"reflect"
"github.com/apache/beam/sdks/go/pkg/beam"
_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/go/pkg/beam/x/debug"
udf "github.com/marceloneppel/apache-beam-golang-udf"
"github.com/marceloneppel/apache-beam-golang-udf/examples/common"
)
// parse is a Beam DoFn that passes each input line through a user-defined
// function loaded at runtime with udf.GetFunction: the "Parse" function of
// package "csv" in ./parse.go.
type parse struct {
	// Location is populated from common.Location by main.
	// NOTE(review): it is not read anywhere in this DoFn; confirm whether it
	// was meant to locate parse.go instead of the hard-coded "./parse.go".
	Location string

	// fn caches the loaded UDF so it is resolved once per DoFn instance
	// instead of once per element. It stays nil until a load succeeds.
	// Being unexported, it is presumably not serialized with the DoFn and is
	// rebuilt lazily on each worker; confirm against Beam's DoFn encoding.
	fn func(string) string
}

// ProcessElement applies the UDF to line and emits its result.
//
// On the first element (and on every element until a load succeeds) the UDF
// is loaded via udf.GetFunction. If loading fails, or the loaded value is not
// a func(string) string, the problem is logged at error level and the element
// is dropped without emitting anything.
func (c *parse) ProcessElement(ctx context.Context, line string, emit func(string)) {
	if c.fn == nil {
		function, err := udf.GetFunction(ctx, "./parse.go", "csv", "Parse")
		if err != nil {
			// The error goes in as an argument, never as the format string,
			// so '%' characters in its text are not treated as verbs.
			log.Errorf(ctx, "loading UDF csv.Parse from ./parse.go: %v", err)
			return
		}
		// A comma-ok assertion avoids panicking the worker when the UDF has
		// an unexpected signature.
		fn, ok := function.(func(string) string)
		if !ok {
			log.Errorf(ctx, "UDF csv.Parse has type %T, want func(string) string", function)
			return
		}
		c.fn = fn
	}
	emit(c.fn(line))
}
func init() {
beam.RegisterType(reflect.TypeOf((*parse)(nil)).Elem())
}
// main builds a pipeline that reads the lines of ./file.csv, transforms each
// one with the parse DoFn, and prints the results with debug.Print. It then
// runs the pipeline and exits non-zero if execution fails.
func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()

	p := beam.NewPipeline()
	s := p.Root()

	lines := textio.Read(s, "./file.csv")
	processedLines := beam.ParDo(s, &parse{
		Location: *common.Location,
	}, lines)
	debug.Print(s, processedLines)

	// beamx.Run reports pipeline failures through its returned error; if it
	// were ignored, a failed job would still let main return with status 0.
	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment