Skip to content

Instantly share code, notes, and snippets.

@seanhagen
Created February 21, 2019 01:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seanhagen/d76edfe92aedc3a0b551bfbdc26f2385 to your computer and use it in GitHub Desktop.
Save seanhagen/d76edfe92aedc3a0b551bfbdc26f2385 to your computer and use it in GitHub Desktop.
Example non-working pipeline
package main
import (
"context"
"flag"
"log"
"reflect"
"cloud.google.com/go/bigquery"
ps "cloud.google.com/go/pubsub"
"github.com/apache/beam/sdks/go/pkg/beam"
_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
xlog "github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/util/pubsubx"
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/go/pkg/beam/x/debug"
"github.com/bramp/morebeam"
"github.com/davecgh/go-spew/spew"
"github.com/gofrs/uuid"
"google.golang.org/api/iterator"
pubsub "google.golang.org/genproto/googleapis/pubsub/v1"
)
func init() {
beam.RegisterType(reflect.TypeOf(Event{}))
beam.RegisterType(reflect.TypeOf((*pubsubToEventFn)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*assignTimeFn)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*assignDateFn)(nil)).Elem())
}
// Deployment placeholders: replace with the real Pub/Sub topic and the Google
// Cloud project that owns it (the project is also used for BigQuery clients).
const (
	topic   = "<pubsub-topic-here>"
	project = "<google-cloud-project-here>"
)
// main wires the pipeline: Pub/Sub ingestion, two parallel BigQuery
// enrichments (time and date) over the same keyed events, a per-key
// recombination of their results, and a debug print of the output.
func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	p := beam.NewPipeline()
	root := p.Root()

	keyed := PubsubToEvent(ctx, root, project, topic)
	// Arguments are evaluated left to right, so AssignTime is added to the
	// graph before AssignDate, exactly as separate statements would.
	merged := Combiner(ctx, root,
		AssignTime(ctx, root, keyed),
		AssignDate(ctx, root, keyed),
	)
	debug.Print(root, merged)

	err := beamx.Run(ctx, p)
	if err == nil {
		return
	}
	spew.Dump(err)
	log.Fatalf("failed to execute job: %v", err)
}
// Event is the element type carried through the pipeline: a parsed Pub/Sub
// payload plus the dimension IDs filled in by the enrichment stages.
type Event struct {
	// ID is the Pub/Sub message ID, set by pubsubToEventFn.
	ID string
	// Event is the parsed payload (project-specific type).
	Event your.Event
	// TimeID is set by assignTimeFn, DateID by assignDateFn. TimezoneID is not
	// populated anywhere in this file.
	TimeID, DateID, TimezoneID int
}
// pubsubToEventFn is a DoFn that converts a raw Pub/Sub message into an Event.
type pubsubToEventFn struct{}

// ProcessElement parses the message with yourPubSubMessageToEventParser and
// wraps the result in an Event whose ID is the Pub/Sub message ID.
//
// A parse failure is logged and returned as an error, which fails the bundle.
// NOTE(review): on a streaming runner a permanently malformed message may be
// retried indefinitely; consider dropping or dead-lettering such messages.
func (fn *pubsubToEventFn) ProcessElement(ctx context.Context, in *pubsub.PubsubMessage) (Event, error) {
	// The parser must be given the incoming message `in`; the previous code
	// referenced an undefined `msg`.
	tmp, err := yourPubSubMessageToEventParser(in)
	if err != nil {
		xlog.Warnf(ctx, "Processing json event failed: %v", err)
		return Event{}, err
	}
	return Event{
		ID:    in.MessageId,
		Event: *tmp,
	}, nil
}
// PubsubToEvent builds the ingestion stage. It reads messages from topic
// through a subscription and converts each one to an Event with
// pubsubToEventFn. It then keys every Event by its ID, so downstream stages
// receive (key string, Event) pairs.
//
// The subscription is ensured at pipeline-construction time. Any client or
// subscription error aborts via xlog.Fatal.
func PubsubToEvent(ctx context.Context, s beam.Scope, project, topic string) beam.PCollection {
	s = s.Scope("Process PubSub")

	psc, err := ps.NewClient(ctx, project)
	if err != nil {
		xlog.Fatal(ctx, err)
	}
	// The client is only needed to ensure the subscription exists; release it
	// once construction is done instead of leaking its connections.
	defer func() {
		if cerr := psc.Close(); cerr != nil {
			xlog.Warnf(ctx, "closing pubsub client: %v", cerr)
		}
	}()

	sub, err := pubsubx.EnsureSubscription(ctx, psc, topic, "<your-subscription-name-here>")
	if err != nil {
		xlog.Fatal(ctx, err)
	}

	// WithAttributes makes pubsubio emit *pubsub.PubsubMessage, which is the
	// input type pubsubToEventFn expects.
	opts := &pubsubio.ReadOptions{
		Subscription:       sub.ID(),
		WithAttributes:     true,
		TimestampAttribute: "timestamp",
		IDAttribute:        "id",
	}
	col := pubsubio.Read(s, project, topic, opts)
	events := beam.ParDo(s, &pubsubToEventFn{}, col)
	debug.Print(s, events)

	// Key by message ID, falling back to a random UUID when it is empty.
	keyFn := func(ev Event) string {
		if ev.ID != "" {
			return ev.ID
		}
		// uuid.Must panics instead of letting a random-source failure yield
		// the all-zero UUID, which would merge unrelated events under one key.
		return uuid.Must(uuid.NewV4()).String()
	}
	return morebeam.AddKey(s, keyFn, events)
}
// Combiner recombines the per-stage enrichments of each Event into a single
// value per key. cols are keyed Event collections (in main: the outputs of
// AssignTime and AssignDate). They are flattened and reduced per key with
// mergeEvents.
//
// NOTE(review): the input originates from an unbounded Pub/Sub source. A
// per-key combine in the global window with the default trigger is not
// expected to emit for unbounded input, so a windowing step (e.g. fixed
// windows) is likely required before this combine. Confirm with the runner.
func Combiner(ctx context.Context, s beam.Scope, cols ...beam.PCollection) beam.PCollection {
	s = s.Scope("Re-Combine")
	fl := beam.Flatten(s, cols...)
	// CombinePerKey needs a binary merge func(A, A) A; the key is not passed
	// to it. A named top-level function is also resolvable by the runner,
	// unlike an inline closure.
	return beam.CombinePerKey(s, mergeEvents, fl)
}

// mergeEvents merges two partially-enriched copies of the same Event. Fields
// from b win, and any zero-valued ID field in b is filled from a. Each
// enrichment stage sets a different field, so the merge is associative and
// commutative, as a combine function must be. This keeps both TimeID and
// DateID instead of discarding one side.
func mergeEvents(a, b Event) Event {
	out := b
	if out.ID == "" {
		out.ID = a.ID
	}
	if out.TimeID == 0 {
		out.TimeID = a.TimeID
	}
	if out.DateID == 0 {
		out.DateID = a.DateID
	}
	if out.TimezoneID == 0 {
		out.TimezoneID = a.TimezoneID
	}
	return out
}
// AssignTime applies assignTimeFn to the keyed events inside an "Assign Time"
// sub-scope. It returns the same keys with Event.TimeID populated.
func AssignTime(ctx context.Context, s beam.Scope, events beam.PCollection) beam.PCollection {
	scoped := s.Scope("Assign Time")
	timed := beam.ParDo(scoped, &assignTimeFn{}, events)
	return timed
}
// assignTimeFn is a DoFn that looks up the time-of-day dimension ID for each
// Event in BigQuery and stores it in Event.TimeID.
type assignTimeFn struct {
	// bq is created in Setup and closed in Teardown; it is unexported, so it
	// is not part of the serialized DoFn.
	bq *bigquery.Client
}

// Setup creates the BigQuery client used by ProcessElement.
func (fn *assignTimeFn) Setup(ctx context.Context) error {
	bq, err := bigquery.NewClient(ctx, project)
	if err != nil {
		return err
	}
	fn.bq = bq
	return nil
}

// Teardown closes the BigQuery client. It is a no-op when Setup failed and no
// client was created, instead of dereferencing a nil client.
func (fn *assignTimeFn) Teardown() error {
	if fn.bq == nil {
		return nil
	}
	return fn.bq.Close()
}

// ProcessElement formats the event time as HH:MM:SS. It looks up the matching
// id in the times table and returns the element with TimeID set.
//
// If no row matches, TimeID is set to 0. If several rows match, the last one
// read wins. Query errors are returned unchanged.
//
// NOTE(review): this issues one BigQuery query per element, which is slow and
// quota-heavy. Loading the (small) times table once in Setup into a map would
// avoid it. The time is formatted in EventTime's own location; confirm that
// matches the table's convention.
func (fn *assignTimeFn) ProcessElement(ctx context.Context, key string, ev Event) (string, Event, error) {
	hms := ev.Event.EventTime().Format("15:04:05")
	log.Printf("key: %v -- event time: %v", key, hms)

	q := fn.bq.Query(`select id from ` + "`biba-backbot.warehouse.times`" + ` where hms = @hms`)
	q.Parameters = []bigquery.QueryParameter{{Name: "hms", Value: hms}}
	it, err := q.Read(ctx)
	if err != nil {
		return key, ev, err
	}

	// Explicit column mapping rather than relying on name matching.
	var row struct {
		ID int `bigquery:"id"`
	}
	for {
		// Reuse the outer err instead of shadowing it.
		err = it.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return key, ev, err
		}
	}
	ev.TimeID = row.ID
	return key, ev, nil
}
// AssignDate applies assignDateFn to the keyed events inside an "Assign Date"
// sub-scope. It returns the same keys with Event.DateID populated.
func AssignDate(ctx context.Context, s beam.Scope, events beam.PCollection) beam.PCollection {
	scoped := s.Scope("Assign Date")
	dated := beam.ParDo(scoped, &assignDateFn{}, events)
	return dated
}
// assignDateFn is a DoFn that looks up the calendar-date dimension ID for each
// Event in BigQuery and stores it in Event.DateID.
type assignDateFn struct {
	// bq is created in Setup and closed in Teardown; it is unexported, so it
	// is not part of the serialized DoFn.
	bq *bigquery.Client
}

// Setup creates the BigQuery client used by ProcessElement.
func (fn *assignDateFn) Setup(ctx context.Context) error {
	bq, err := bigquery.NewClient(ctx, project)
	if err != nil {
		return err
	}
	fn.bq = bq
	return nil
}

// Teardown closes the BigQuery client. It is a no-op when Setup failed and no
// client was created, instead of dereferencing a nil client.
func (fn *assignDateFn) Teardown() error {
	if fn.bq == nil {
		return nil
	}
	return fn.bq.Close()
}

// ProcessElement formats the event time as YYYY-MM-DD. It looks up the
// matching id in the dates table and returns the element with DateID set.
//
// If no row matches, DateID is set to 0. If several rows match, the last one
// read wins. Query errors are returned unchanged.
//
// NOTE(review): this issues one BigQuery query per element. Caching the dates
// table (or a recent range of it) in Setup would avoid that. The date is taken
// in EventTime's own location; confirm that matches the table's convention.
func (fn *assignDateFn) ProcessElement(ctx context.Context, key string, ev Event) (string, Event, error) {
	ymd := ev.Event.EventTime().Format("2006-01-02")
	log.Printf("key: %v -- event date: %v", key, ymd)

	q := fn.bq.Query(`select id from ` + "`biba-backbot.warehouse.dates`" + ` where ymd = @ymd`)
	q.Parameters = []bigquery.QueryParameter{{Name: "ymd", Value: ymd}}
	it, err := q.Read(ctx)
	if err != nil {
		return key, ev, err
	}

	// Explicit column mapping rather than relying on name matching.
	var row struct {
		ID int `bigquery:"id"`
	}
	for {
		// Reuse the outer err instead of shadowing it.
		err = it.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return key, ev, err
		}
	}
	ev.DateID = row.ID
	return key, ev, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment