Skip to content

Instantly share code, notes, and snippets.

@seanhagen
Created February 11, 2019 20:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seanhagen/80bb0c16c49b1cf3cc40be94b7a16808 to your computer and use it in GitHub Desktop.
Save seanhagen/80bb0c16c49b1cf3cc40be94b7a16808 to your computer and use it in GitHub Desktop.
Cloud Dataflow -- trying to get GCS writes working
package main
import (
"bufio"
"context"
"flag"
"log"
"reflect"
ps "cloud.google.com/go/pubsub"
"github.com/Z2hMedia/protobufs/go/events/games"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/filesystem"
_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
logx "github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/util/pubsubx"
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
"github.com/davecgh/go-spew/spew"
pubsub "google.golang.org/genproto/googleapis/pubsub/v1"
)
func init() {
beam.RegisterType(reflect.TypeOf((*filerWriterThingFn)(nil)).Elem())
}
// Hard-coded pipeline configuration.
const (
	// topic is the Pub/Sub topic the pipeline reads from.
	topic = "games-dev"
	// project is the Google Cloud project ID used for the Pub/Sub client
	// and the Pub/Sub read step.
	project = "biba-backbot"
	// output is the destination object handed to LocalFnWrite.
	output = "gs://events-output/things.txt"
)
// main builds and runs the pipeline: it makes sure a Pub/Sub subscription
// exists for topic, reads messages from it, converts each message to a
// string with processPubSubMsgFn, and hands the result both to LocalFnWrite
// (file output) and to the writePubSubMsg debugging sink.
//
// Any setup or execution failure terminates the process via log.Fatalf.
func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()

	// The standard library logger takes no context argument; passing ctx to
	// log.Fatal would only print the context value in front of the error.
	psc, err := ps.NewClient(ctx, project)
	if err != nil {
		log.Fatalf("unable to create pubsub client for project %q: %v", project, err)
	}
	sub, err := pubsubx.EnsureSubscription(ctx, psc, topic, "dataflow-runner-games")
	if err != nil {
		log.Fatalf("unable to ensure subscription on topic %q: %v", topic, err)
	}
	log.Printf("Running streaming wordcap with subscription: %v", sub.ID())

	p := beam.NewPipeline()
	s := p.Root()
	opts := &pubsubio.ReadOptions{
		Subscription:   sub.ID(),
		WithAttributes: true,
	}
	log.Println("creating pipeline")

	// pubsub input
	col := pubsubio.Read(s, project, topic, opts)
	// debug.Print(s, col)
	// col = beam.WindowInto(s, window.NewGlobalWindows(), col)
	// debug.Print(s, col)

	// NOTE(review): col comes from Pub/Sub and is presumably unbounded, while
	// the GroupByKey steps below run without an explicit windowing step (the
	// WindowInto call above is disabled) — confirm the grouped ParDos
	// actually fire on the target runner.

	// Convert every message into its string dump.
	out := beam.ParDo(s, processPubSubMsgFn, col)
	// debug.Print(s, out)

	LocalFnWrite(s, output, out)

	// Second, debugging-only sink: group everything under one fixed key and
	// log what arrives.
	pre := beam.AddFixedKey(s, out)
	post := beam.GroupByKey(s, pre)
	beam.ParDo0(s, writePubSubMsg, post)

	// example pubsub read thing
	// col2 := beam.ParDo(s, pubsubToBytesFn, col)
	// str := beam.ParDo(s, stringx.FromBytes, col2)
	// debug.Print(s, str)

	if err := beamx.Run(ctx, p); err != nil {
		// Dump the full error structure before exiting; the one-line message
		// alone may hide nested detail.
		spew.Dump(err)
		log.Fatalf("failed to execute job: %v", err)
	}
}
// pubsubToBytesFn returns the raw payload of a Pub/Sub message.
//
// It goes through the generated getter rather than the field so that a nil
// message yields a nil slice instead of a nil-pointer panic.
func pubsubToBytesFn(in *pubsub.PubsubMessage) []byte {
	return in.GetData()
}
// processPubSubMsgFn converts a raw Pub/Sub message into a string dump of
// the game event it carries.
//
// The payload and attributes are copied into a cloud.google.com/go/pubsub
// Message and handed to games.Parse. On success the parsed event is rendered
// with spew.Sdump and returned; mini-game events are additionally noted in
// the log.
//
// If parsing fails, the error is logged and an empty string is returned so
// that a single malformed message does not abort the bundle.
// NOTE(review): the empty string still flows downstream as an element —
// confirm that is acceptable for the file output.
func processPubSubMsgFn(ctx context.Context, in *pubsub.PubsubMessage) string {
	msg := &ps.Message{
		Data:       in.Data,
		Attributes: in.Attributes,
	}

	ev, err := games.Parse(msg)
	if err != nil {
		// Do not touch ev here: its contents are not meaningful when Parse
		// reports an error.
		log.Printf("unable to parse pubsub message: %v", err)
		return ""
	}

	if _, ok := ev.Data.(*games.Event_MiniGame); ok {
		log.Printf("got a minigame event")
	}
	return spew.Sdump(ev)
}
// writePubSubMsg is a debugging sink for a grouped collection of strings.
//
// It pulls at most one element from the lines iterator and logs the key l,
// whether an element was available, and that element's value. Nothing is
// written anywhere else; an earlier attempt to open the output file and wrap
// it in a buffered writer here has been disabled.
//
// It always returns nil.
func writePubSubMsg(ctx context.Context, l int, lines func(*string) bool) error {
	var first string
	ok := lines(&first)
	log.Printf("l: %v -- o: %v -- line: %v", l, ok, first)
	return nil
}
// LocalFnWrite adds steps to the pipeline that write a PCollection<string>
// to filename, one element per line (the writer appends a newline after
// each element). The steps are placed under the "biba.Write" sub-scope.
//
// The scheme of filename is checked with filesystem.ValidateScheme while the
// pipeline is being constructed.
//
// TODO(herohde) 7/12/2017: extend Write to write to a series of files
// as well as allow sharding.
func LocalFnWrite(s beam.Scope, filename string, col beam.PCollection) {
	s = s.Scope("biba.Write")

	filesystem.ValidateScheme(filename)

	writer := &filerWriterThingFn{Filename: filename}

	// NOTE(BEAM-3579): We may never call Teardown for non-local runners and
	// FinishBundle doesn't have the right granularity. We therefore
	// perform a GBK with a fixed key to get all values in a single invocation.
	// TODO(BEAM-3860) 3/15/2018: use side input instead of GBK.
	grouped := beam.GroupByKey(s, beam.AddFixedKey(s, col))
	beam.ParDo0(s, writer, grouped)
}
// filerWriterThingFn is the DoFn that LocalFnWrite installs to write a
// grouped collection of strings to a single file.
type filerWriterThingFn struct {
	// Filename is the destination path; it is passed to filesystem.New and
	// to OpenWrite when an element is processed.
	Filename string `json:"filename"`
}
// ProcessElement writes every string yielded by lines to w.Filename, each
// followed by a newline, through a 1MB buffered writer.
//
// The fixed key (second parameter) is ignored; it exists only because the
// input has been grouped under a single key so that all values arrive in one
// invocation.
//
// It returns the first error hit while resolving the filesystem, opening the
// file, writing, flushing, or closing. Each failure is also logged through
// both the standard logger and the Beam logger. The opened writer is closed
// on every path, including failed writes, so it is never leaked.
// NOTE(review): closing after a failed write may finalize a partially
// written object on some filesystems — confirm this is acceptable for GCS.
func (w *filerWriterThingFn) ProcessElement(ctx context.Context, _ int, lines func(*string) bool) error {
	log.Print("===========> filerWriterThingFn:ProcessElement -- start")
	logx.Infof(ctx, "===========> filerWriterThingFn:ProcessElement -- start")
	defer logx.Infof(ctx, "===========> filerWriterThingFn:ProcessElement -- done!")

	logx.Infof(ctx, "===========> filesystem.New(ctx, %v)", w.Filename)
	fs, err := filesystem.New(ctx, w.Filename)
	if err != nil {
		log.Printf("===========> error getting filesystem: %v", err)
		logx.Errorf(ctx, "===========> error getting filesystem: %v", err)
		return err
	}
	defer fs.Close()

	logx.Infof(ctx, "===========> open write: %v", w.Filename)
	fd, err := fs.OpenWrite(ctx, w.Filename)
	if err != nil {
		log.Printf("===========> unable to open for write: %v", err)
		logx.Errorf(ctx, "===========> unable to open for write: %v", err)
		return err
	}

	// Release the writer on early returns. The success path closes it
	// explicitly below so that the Close error can be returned to the caller.
	closed := false
	defer func() {
		if closed {
			return
		}
		if cerr := fd.Close(); cerr != nil {
			log.Printf("===========> unable to close file descriptor after failure: %v", cerr)
			logx.Errorf(ctx, "===========> unable to close file descriptor after failure: %v", cerr)
		}
	}()

	buf := bufio.NewWriterSize(fd, 1<<20) // use 1MB buffer
	logx.Infof(ctx, "===========> Writing to %v", w.Filename)

	var line string
	for lines(&line) {
		if _, err := buf.WriteString(line); err != nil {
			log.Printf("===========> unable to write string: %v", err)
			logx.Errorf(ctx, "===========> unable to write string: %v", err)
			return err
		}
		if err := buf.WriteByte('\n'); err != nil {
			log.Printf("===========> unable to write newline: %v", err)
			logx.Errorf(ctx, "===========> unable to write newline: %v", err)
			return err
		}
	}

	if err := buf.Flush(); err != nil {
		log.Printf("===========> unable to flush buffer: %v", err)
		logx.Errorf(ctx, "===========> unable to flush buffer: %v", err)
		return err
	}

	closed = true
	if err := fd.Close(); err != nil {
		log.Printf("===========> unable to close file descriptor: %v", err)
		logx.Errorf(ctx, "===========> unable to close file descriptor: %v", err)
		return err
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment