Created
February 11, 2019 20:08
-
-
Save seanhagen/80bb0c16c49b1cf3cc40be94b7a16808 to your computer and use it in GitHub Desktop.
Cloud Dataflow -- trying to get GCS writes working
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"context" | |
"flag" | |
"log" | |
"reflect" | |
ps "cloud.google.com/go/pubsub" | |
"github.com/Z2hMedia/protobufs/go/events/games" | |
"github.com/apache/beam/sdks/go/pkg/beam" | |
"github.com/apache/beam/sdks/go/pkg/beam/io/filesystem" | |
_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs" | |
"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio" | |
logx "github.com/apache/beam/sdks/go/pkg/beam/log" | |
"github.com/apache/beam/sdks/go/pkg/beam/util/pubsubx" | |
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx" | |
"github.com/davecgh/go-spew/spew" | |
pubsub "google.golang.org/genproto/googleapis/pubsub/v1" | |
) | |
func init() { | |
beam.RegisterType(reflect.TypeOf((*filerWriterThingFn)(nil)).Elem()) | |
} | |
// Fixed configuration for this pipeline.
const (
	// topic is the Pub/Sub topic name handed to EnsureSubscription and
	// pubsubio.Read in main.
	topic = "games-dev"
	// project is the Google Cloud project ID used for the Pub/Sub client
	// and for pubsubio.Read.
	project = "biba-backbot"
	// output is the destination path passed to LocalFnWrite.
	output = "gs://events-output/things.txt"
)
func main() { | |
flag.Parse() | |
beam.Init() | |
ctx := context.Background() | |
psc, err := ps.NewClient(ctx, project) | |
if err != nil { | |
log.Fatal(ctx, err) | |
} | |
sub, err := pubsubx.EnsureSubscription(ctx, psc, topic, "dataflow-runner-games") | |
if err != nil { | |
log.Fatal(ctx, err) | |
} | |
log.Printf("Running streaming wordcap with subscription: %v", sub.ID()) | |
p := beam.NewPipeline() | |
s := p.Root() | |
opts := &pubsubio.ReadOptions{ | |
Subscription: sub.ID(), | |
WithAttributes: true, | |
} | |
log.Println("creating pipeline") | |
// pubsub input | |
col := pubsubio.Read(s, project, topic, opts) | |
// debug.Print(s, col) | |
// col = beam.WindowInto(s, window.NewGlobalWindows(), col) | |
// debug.Print(s, col) | |
// try to do thing | |
out := beam.ParDo(s, processPubSubMsgFn, col) | |
// debug.Print(s, out) | |
LocalFnWrite(s, output, out) | |
pre := beam.AddFixedKey(s, out) | |
post := beam.GroupByKey(s, pre) | |
beam.ParDo0(s, writePubSubMsg, post) | |
// example pubsub read thing | |
// col2 := beam.ParDo(s, pubsubToBytesFn, col) | |
// str := beam.ParDo(s, stringx.FromBytes, col2) | |
// debug.Print(s, str) | |
if err := beamx.Run(context.Background(), p); err != nil { | |
spew.Dump(err) | |
log.Fatalf("failed to execute job: %v", err) | |
} | |
} | |
func pubsubToBytesFn(in *pubsub.PubsubMessage) []byte { | |
return in.Data | |
} | |
func processPubSubMsgFn(ctx context.Context, in *pubsub.PubsubMessage) string { | |
// log.Print("process pubsub msg fn called") | |
msg := &ps.Message{ | |
Data: in.Data, | |
Attributes: in.Attributes, | |
} | |
// ev, err := games.Parse(msg) | |
// if err != nil { | |
// log.Printf("unable to parse pubsub message: %v", err) | |
// } else { | |
// log.Print(spew.Sdump(ev.Attributes)) | |
// log.Print(spew.Sdump(ev.Data)) | |
// } | |
ev, _ := games.Parse(msg) | |
out := ev.Attributes | |
switch ev.Data.(type) { | |
case *games.Event_MiniGame: | |
log.Printf("got a minigame event") | |
} | |
return spew.Sdump(ev) | |
} | |
// writePubSubMsg is a debugging sink for a grouped collection. It pulls at
// most one value from the lines iterator, logs the key, whether a value was
// available, and that value, and then returns nil. Remaining values are
// not consumed.
func writePubSubMsg(ctx context.Context, l int, lines func(*string) bool) error {
	// Disabled sketch of writing the group straight to the output file:
	//
	// fs, err := filesystem.New(ctx, output)
	// if err != nil {
	// 	return err
	// }
	// defer fs.Close()
	// fd, err := fs.OpenWrite(ctx, output)
	// if err != nil {
	// 	return err
	// }
	// buf := bufio.NewWriterSize(fd, 1<<20)

	var first string
	more := lines(&first)
	log.Printf("l: %v -- o: %v -- line: %v", l, more, first)
	return nil
}
// TODO(herohde) 7/12/2017: extend Write to write to a series of files | |
// as well as allow sharding. | |
// Write writes a PCollection<string> to a file as separate lines. The | |
// writer add a newline after each element. | |
func LocalFnWrite(s beam.Scope, filename string, col beam.PCollection) { | |
s = s.Scope("biba.Write") | |
filesystem.ValidateScheme(filename) | |
// NOTE(BEAM-3579): We may never call Teardown for non-local runners and | |
// FinishBundle doesn't have the right granularity. We therefore | |
// perform a GBK with a fixed key to get all values in a single invocation. | |
// TODO(BEAM-3860) 3/15/2018: use side input instead of GBK. | |
pre := beam.AddFixedKey(s, col) | |
post := beam.GroupByKey(s, pre) | |
beam.ParDo0(s, &filerWriterThingFn{Filename: filename}, post) | |
} | |
// filerWriterThingFn is the DoFn that LocalFnWrite installs to write a
// grouped collection of strings to a single file.
type filerWriterThingFn struct {
	// Filename is the destination path; ProcessElement passes it to
	// filesystem.New and OpenWrite. The json tag is presumably what Beam
	// uses when serializing the DoFn -- TODO confirm.
	Filename string `json:"filename"`
}
func (w *filerWriterThingFn) ProcessElement(ctx context.Context, _ int, lines func(*string) bool) error { | |
log.Print("===========> filerWriterThingFn:ProcessElement -- start") | |
logx.Infof(ctx, "===========> filerWriterThingFn:ProcessElement -- start") | |
defer func() { | |
logx.Infof(ctx, "===========> filerWriterThingFn:ProcessElement -- done!") | |
}() | |
logx.Infof(ctx, "===========> filesystem.New(ctx, %v)", w.Filename) | |
fs, err := filesystem.New(ctx, w.Filename) | |
if err != nil { | |
log.Printf("===========> error getting filesystem: %v", err) | |
logx.Infof(ctx, "===========> error getting filesystem: %v", err) | |
return err | |
} | |
defer fs.Close() | |
logx.Infof(ctx, "===========> open write: %v", w.Filename) | |
fd, err := fs.OpenWrite(ctx, w.Filename) | |
if err != nil { | |
log.Printf("===========> unable to open for write: %v", err) | |
logx.Infof(ctx, "===========> unable to open for write: %v", err) | |
return err | |
} | |
buf := bufio.NewWriterSize(fd, 1<<20) // use 1MB buffer | |
logx.Infof(ctx, "===========> Writing to %v", w.Filename) | |
var line string | |
for lines(&line) { | |
if _, err := buf.WriteString(line); err != nil { | |
log.Printf("===========> unable to write string: %v", err) | |
logx.Infof(ctx, "===========> unable to write string: %v", err) | |
return err | |
} | |
if _, err := buf.Write([]byte{'\n'}); err != nil { | |
log.Printf("===========> unable to write newline: %v", err) | |
logx.Infof(ctx, "===========> unable to write newline: %v", err) | |
return err | |
} | |
} | |
if err := buf.Flush(); err != nil { | |
log.Printf("===========> unable to flush buffer: %v", err) | |
logx.Infof(ctx, "===========> unable to flush buffer: %v", err) | |
return err | |
} | |
err = fd.Close() | |
if err != nil { | |
log.Printf("===========> unable to close file descriptor: %v", err) | |
logx.Infof(ctx, "===========> unable to close file descriptor: %v", err) | |
} | |
return err | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment