Last active
November 25, 2019 14:33
-
-
Save ilbambino/efe50bd92b37ebdc1689e2cde09b443f to your computer and use it in GitHub Desktop.
Pub/Sub to BigQuery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"context"
	"fmt"
	"os"
	"regexp"
	"sync"
	"time"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/pubsub"
	"google.golang.org/api/googleapi"
)
func consumeSubscription(subName string) (map[string][]Event, error) { | |
ctx := context.Background() | |
ctx, cancel := context.WithTimeout(ctx, 10*time.Second) | |
defer cancel() | |
client, err := pubsub.NewClient(ctx, os.Getenv("PROJECT_ID")) | |
if err != nil { | |
return nil, err | |
} | |
eventQueues := make(map[string][]Event) | |
err = client.Subscription(subName).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) { | |
eventType := string(msg.Data) | |
fmt.Printf("Got message: %q\n", eventType) | |
event, err := ParseEventFromAttributes(eventType, msg.Attributes) | |
// values, err := attributesToValueSaver(eventType, msg.Attributes) | |
if err == nil { | |
eventQueues[event.Type()] = append(eventQueues[event.Type()], event) | |
} else { | |
fmt.Println("Error parsing", err) | |
} | |
msg.Ack() | |
}) | |
if err != nil { | |
fmt.Println(err) | |
return nil, err | |
} | |
return eventQueues, nil | |
} | |
const datasetID = "test_insert" | |
func storeEvents(tableID string, events []Event) error { | |
ctx := context.Background() | |
client, err := bigquery.NewClient(ctx, os.Getenv("PROJECT_ID")) | |
if err != nil { | |
return err | |
} | |
u := client.Dataset(datasetID).Table(tableID).Inserter() | |
savers := make([]*bigquery.StructSaver, len(events)) | |
for i := range events { | |
savers[i] = events[i].AsStructSaver() | |
} | |
if err := u.Put(ctx, savers); err != nil { | |
return err | |
} | |
return nil | |
} | |
var tableNameRegexp = regexp.MustCompile("[^a-zA-Z0-9_]+") //BQ doesn't accept other chars in the table name | |
func getTableName(eventType string) string { | |
return tableNameRegexp.ReplaceAllString(eventType, "") | |
} | |
func createBQTables() error { | |
ctx := context.Background() | |
client, err := bigquery.NewClient(ctx, os.Getenv("PROJECT_ID")) | |
if err != nil { | |
return err | |
} | |
for eventType, varType := range MapEventType { | |
tableName := getTableName(eventType) | |
table := client.Dataset(datasetID).Table(tableName) | |
_, err := table.Metadata(ctx) | |
if err == nil { | |
continue //already created table | |
} else { | |
if e, ok := err.(*googleapi.Error); ok { | |
if e.Code == 404 { // table not found, so needs to be created | |
schema, err := bigquery.InferSchema(varType) | |
if err != nil { | |
return err | |
} | |
if err := table.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { | |
return err | |
} | |
} else { | |
return err | |
} | |
} | |
} | |
} | |
return nil | |
} | |
// to test locally :) | |
func main() { | |
subName := "metrics-dev-subs" | |
err := createBQTables() | |
if err != nil { | |
panic(err) | |
} | |
queues, err := consumeSubscription(subName) | |
if err != nil { | |
fmt.Println(err) | |
} | |
for eventType, queue := range queues { | |
fmt.Println("Storing for: " + eventType) | |
err := storeEvents(getTableName(eventType), queue) | |
if err != nil { | |
fmt.Println("error storing events of " + eventType + err.Error()) | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"strconv" | |
"time" | |
"cloud.google.com/go/bigquery" | |
) | |
// CreateUser represents an event User Creation (its Type method returns
// "Created user").
//
// Parse fills each field from the attrs key that equals the field's bigquery
// tag. The tags are the BigQuery column names. Field order is kept as-is
// because a schema inferred from this struct follows it.
type CreateUser struct {
	CreationTime time.Time `bigquery:"creation_time"`
	EventOrigin  string    `bigquery:"event_origin"`
	// NOTE(review): currency/unit of MrrPlan and Revenue not visible here — TODO confirm.
	MrrPlan      int       `bigquery:"mrr_plan"`
	OnTrial      bool      `bigquery:"on_trial"`
	Organization string    `bigquery:"organization"`
	Plan         string    `bigquery:"plan"`
	// NOTE(review): unit of PlanPeriod (months? years?) not visible here — TODO confirm.
	PlanPeriod   int       `bigquery:"plan_period"`
	Revenue      int       `bigquery:"revenue"`
	TrialEnds    time.Time `bigquery:"trial_ends_at"`
	TrialStarts  time.Time `bigquery:"trial_starts_at"`
	// NOTE(review): unit of UserActiveDuration not visible here (sample value
	// is 0.0013, possibly days) — TODO confirm.
	UserActiveDuration float64   `bigquery:"user_active_for"`
	UserCreatedTime    time.Time `bigquery:"user_created_at"`
	UserID             string    `bigquery:"user_id"`
}
// Type returns the type of events accepted | |
func (i *CreateUser) Type() string { | |
return "Created user" | |
} | |
// AsStructSaver converts this event to the type expected to be used by BQ | |
func (i *CreateUser) AsStructSaver() *bigquery.StructSaver { | |
return &bigquery.StructSaver{Struct: *i} | |
} | |
// Parse fills the event with the info from the attributes | |
func (i *CreateUser) Parse(attrs map[string]string) error { | |
i.Plan = attrs["plan"] | |
i.Organization = attrs["organization"] | |
i.EventOrigin = attrs["event_origin"] | |
i.UserID = attrs["user_id"] | |
i.MrrPlan = getInt("mrr_plan", attrs) | |
i.PlanPeriod = getInt("plan_period", attrs) | |
i.Revenue = getInt("revenue", attrs) | |
i.UserActiveDuration = getFloat("user_active_for", attrs) | |
i.UserActiveDuration = getFloat("user_active_for", attrs) | |
i.CreationTime = getTime("creation_time", attrs) | |
i.TrialEnds = getTime("trial_ends_at", attrs) | |
i.TrialStarts = getTime("trial_starts_at", attrs) | |
i.UserCreatedTime = getTime("user_created_at", attrs) | |
i.OnTrial = getBool("on_trial", attrs) | |
return nil //TODO handle errors?? Right now it defaults to ZERO vals | |
} | |
// getInt reads attrs[key] as a base-10 int. A missing key, a malformed value
// or an out-of-range value all yield 0.
func getInt(key string, attrs map[string]string) int {
	raw, present := attrs[key]
	if !present {
		return 0
	}
	n, convErr := strconv.Atoi(raw)
	if convErr != nil {
		return 0
	}
	return n
}
// getFloat reads attrs[key] as a float64. A missing key, a malformed value or
// an out-of-range value all yield 0.
func getFloat(key string, attrs map[string]string) float64 {
	if raw, present := attrs[key]; present {
		if f, parseErr := strconv.ParseFloat(raw, 64); parseErr == nil {
			return f
		}
	}
	return 0
}
// getTime reads attrs[key] as an RFC 3339 timestamp. Fractional seconds such
// as "2019-11-21T13:10:58.505Z" are accepted.
//
// A missing key yields the zero time.Time. A value that fails to parse is
// reported on stdout and also yields the zero time. Only failures are
// printed, not every value.
func getTime(key string, attrs map[string]string) time.Time {
	valStr, ok := attrs[key]
	if !ok {
		return time.Time{}
	}
	val, err := time.Parse(time.RFC3339, valStr)
	if err != nil {
		fmt.Printf("invalid time for %q: %v\n", key, err)
		return time.Time{}
	}
	return val
}
// getBool reads attrs[key] as a boolean in strconv.ParseBool syntax ("1", "t",
// "T", "TRUE", "true", "True" and their false counterparts). A missing key or
// an unparseable value yields false.
func getBool(key string, attrs map[string]string) bool {
	raw, present := attrs[key]
	if !present {
		return false
	}
	parsed, parseErr := strconv.ParseBool(raw)
	return parseErr == nil && parsed
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"github.com/stretchr/testify/assert" | |
"testing" | |
"time" | |
) | |
func TestCreateUser_Parse(t *testing.T) { | |
testData := map[string]string{ | |
"creation_time": "2019-11-21T13:10:58.505Z", | |
"event_origin": "Central", | |
"mrr_plan": "10", | |
"on_trial": "true", | |
"organization": "someOrg", | |
"plan": "CARTO for students - Annual", | |
"plan_period": "1", | |
"revenue": "0", | |
"trial_ends_at": "2019-11-22T13:10:47.000Z", | |
"trial_starts_at": "2019-11-21T13:10:48.000Z", | |
"user_active_for": "0.001344737799189815", | |
"user_created_at": "2019-11-21T13:10:46.887Z", | |
"user_id": "ee5d41c7-f599-4876-9646-8ba023031287", | |
} | |
user := CreateUser{} | |
user.Parse(testData) | |
assert.Equal(t, 10, user.MrrPlan) | |
assert.Equal(t, 1, user.PlanPeriod) | |
assert.Equal(t, 0, user.Revenue) | |
assert.Equal(t, 0.001344737799189815, user.UserActiveDuration) | |
assert.Equal(t, testData["event_origin"], user.EventOrigin) | |
assert.Equal(t, testData["user_id"], user.UserID) | |
assert.Equal(t, "2019-11-21T13:10:58Z", user.CreationTime.Format(time.RFC3339)) | |
assert.Equal(t, true, user.OnTrial) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module pubsubtobq | |
go 1.11 | |
require ( | |
cloud.google.com/go/bigquery v1.3.0 | |
cloud.google.com/go/pubsub v1.1.0 | |
github.com/stretchr/testify v1.4.0 | |
google.golang.org/api v0.14.0 | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment