Skip to content

Instantly share code, notes, and snippets.

@ilbambino
Last active November 25, 2019 14:33
Show Gist options
  • Save ilbambino/efe50bd92b37ebdc1689e2cde09b443f to your computer and use it in GitHub Desktop.
Save ilbambino/efe50bd92b37ebdc1689e2cde09b443f to your computer and use it in GitHub Desktop.
pubsub to bigquery
package main
import (
	"context"
	"fmt"
	"os"
	"regexp"
	"sync"
	"time"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/pubsub"
	"google.golang.org/api/googleapi"
)
// consumeSubscription pulls messages from the Pub/Sub subscription subName
// for up to 10 seconds and groups the successfully parsed events by type.
//
// The message payload is used as the event type name and the message
// attributes are handed to ParseEventFromAttributes. Every message is
// acknowledged, including those that fail to parse (these are only logged),
// so malformed messages are not redelivered.
//
// It returns the parsed events keyed by Event.Type(), or an error when the
// Pub/Sub client cannot be created or Receive fails.
func consumeSubscription(subName string) (map[string][]Event, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	client, err := pubsub.NewClient(ctx, os.Getenv("PROJECT_ID"))
	if err != nil {
		return nil, err
	}
	// Release the client's connections when done. The Close error is ignored
	// on purpose: the received events are already in memory at that point.
	defer client.Close()

	// Receive runs the callback on several goroutines concurrently, so every
	// access to eventQueues must be serialized.
	var mu sync.Mutex
	eventQueues := make(map[string][]Event)

	err = client.Subscription(subName).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		eventType := string(msg.Data)
		fmt.Printf("Got message: %q\n", eventType)

		event, err := ParseEventFromAttributes(eventType, msg.Attributes)
		if err != nil {
			fmt.Println("Error parsing", err)
			msg.Ack()
			return
		}

		key := event.Type()
		mu.Lock()
		eventQueues[key] = append(eventQueues[key], event)
		mu.Unlock()

		msg.Ack()
	})
	if err != nil {
		// The caller reports the error; logging it here as well would
		// duplicate the message.
		return nil, err
	}
	return eventQueues, nil
}
// datasetID is the BigQuery dataset in which the event tables are created
// and into which events are inserted.
const datasetID = "test_insert"
// storeEvents inserts events into the BigQuery table tableID of the dataset
// datasetID, using the project named by the PROJECT_ID environment variable.
//
// Each event is converted with AsStructSaver and all rows are sent in a
// single Put call. An empty events slice is a no-op. It returns any error
// from creating the client or from the insert itself.
func storeEvents(tableID string, events []Event) error {
	if len(events) == 0 {
		// Nothing to insert: skip creating a client and sending a request
		// that carries no rows.
		return nil
	}

	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, os.Getenv("PROJECT_ID"))
	if err != nil {
		return err
	}
	// Release the client's resources; the Close error is ignored because the
	// outcome of the insert is what the caller cares about.
	defer client.Close()

	savers := make([]*bigquery.StructSaver, len(events))
	for i, ev := range events {
		savers[i] = ev.AsStructSaver()
	}

	return client.Dataset(datasetID).Table(tableID).Inserter().Put(ctx, savers)
}
// tableNameRegexp matches every run of characters other than ASCII letters,
// digits and underscore; the table-name helper strips those runs because
// BigQuery does not accept them in a table name.
var tableNameRegexp = regexp.MustCompile("[^a-zA-Z0-9_]+")

// getTableName derives a table name from eventType by deleting every
// character that tableNameRegexp matches.
func getTableName(eventType string) string {
	sanitized := tableNameRegexp.ReplaceAllLiteralString(eventType, "")
	return sanitized
}
// createBQTables makes sure a BigQuery table exists in dataset datasetID for
// every entry of MapEventType.
//
// For each event type the table name is derived with getTableName. If the
// table's metadata can be fetched the table is left untouched. If the API
// answers 404 the table is created with a schema inferred from the entry's
// value. Any other failure is returned to the caller, including errors that
// are not API errors, such as transport failures.
func createBQTables() error {
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, os.Getenv("PROJECT_ID"))
	if err != nil {
		return err
	}
	// Release the client's resources; the Close error is ignored because
	// table creation has already succeeded or failed by then.
	defer client.Close()

	for eventType, varType := range MapEventType {
		table := client.Dataset(datasetID).Table(getTableName(eventType))

		_, err := table.Metadata(ctx)
		if err == nil {
			continue // table already exists
		}

		// Only an API 404 means "table missing". Everything else must not be
		// mistaken for an existing table, so it is reported.
		apiErr, ok := err.(*googleapi.Error)
		if !ok || apiErr.Code != 404 {
			return err
		}

		schema, err := bigquery.InferSchema(varType)
		if err != nil {
			return err
		}
		if err := table.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
			return err
		}
	}
	return nil
}
// main is a harness for testing locally: it makes sure the BigQuery tables
// exist, drains the "metrics-dev-subs" subscription once and stores the
// received events, one table per event type.
//
// A failure to create the tables panics. A failure to consume the
// subscription is printed and ends the run. A failure to store one event
// type is printed and the remaining types are still processed.
func main() {
	const subName = "metrics-dev-subs"

	if err := createBQTables(); err != nil {
		panic(err)
	}

	queues, err := consumeSubscription(subName)
	if err != nil {
		fmt.Println(err)
		return // no events are returned alongside an error
	}

	for eventType, queue := range queues {
		fmt.Println("Storing for: " + eventType)
		if err := storeEvents(getTableName(eventType), queue); err != nil {
			// Separate the event type from the error text so the message
			// stays readable.
			fmt.Println("error storing events of " + eventType + ": " + err.Error())
		}
	}
}
package main
import (
"fmt"
"strconv"
"time"
"cloud.google.com/go/bigquery"
)
// CreateUser represents a user-creation event.
//
// Each field carries a `bigquery` struct tag naming the column it is stored
// under. Parse reads the message attribute with that same name to fill the
// field. The order of the fields is left as is because the table schema is
// inferred from this struct.
type CreateUser struct {
CreationTime time.Time `bigquery:"creation_time"`
EventOrigin string `bigquery:"event_origin"`
// NOTE(review): presumably the monthly recurring revenue of the plan; the
// unit/currency is not visible in this file, so confirm with the producer.
MrrPlan int `bigquery:"mrr_plan"`
OnTrial bool `bigquery:"on_trial"`
Organization string `bigquery:"organization"`
Plan string `bigquery:"plan"`
PlanPeriod int `bigquery:"plan_period"`
Revenue int `bigquery:"revenue"`
TrialEnds time.Time `bigquery:"trial_ends_at"`
TrialStarts time.Time `bigquery:"trial_starts_at"`
// NOTE(review): the unit of this duration is not visible in this file
// (it is transported as a decimal number), so confirm before relying on it.
UserActiveDuration float64 `bigquery:"user_active_for"`
UserCreatedTime time.Time `bigquery:"user_created_at"`
UserID string `bigquery:"user_id"`
}
// Type reports the name of the event type that CreateUser represents.
func (i *CreateUser) Type() string {
	const eventName = "Created user"
	return eventName
}
// AsStructSaver wraps a copy of this event in the bigquery.StructSaver that
// the BigQuery inserter consumes.
//
// The saver's Schema is inferred from CreateUser. A StructSaver only uploads
// the struct fields named in its Schema, so leaving it unset would insert
// rows with no values at all.
//
// It panics if the schema cannot be inferred. That can only happen when the
// CreateUser definition itself is changed to use a field type BigQuery does
// not support, which is a programming error rather than a runtime condition.
func (i *CreateUser) AsStructSaver() *bigquery.StructSaver {
	schema, err := bigquery.InferSchema(*i)
	if err != nil {
		panic(fmt.Sprintf("inferring BigQuery schema for CreateUser: %v", err))
	}
	return &bigquery.StructSaver{
		Struct: *i,
		Schema: schema,
	}
}
// Parse fills the event from the message attributes attrs.
//
// String fields are copied verbatim (a missing key yields ""). Integer,
// float, boolean and RFC 3339 time fields are converted with getInt,
// getFloat, getBool and getTime, which fall back to the zero value when the
// attribute is missing or malformed.
//
// Parse currently always returns nil: conversion problems are not reported.
func (i *CreateUser) Parse(attrs map[string]string) error {
	// Plain strings.
	i.Plan = attrs["plan"]
	i.Organization = attrs["organization"]
	i.EventOrigin = attrs["event_origin"]
	i.UserID = attrs["user_id"]

	// Numbers and flags.
	i.MrrPlan = getInt("mrr_plan", attrs)
	i.PlanPeriod = getInt("plan_period", attrs)
	i.Revenue = getInt("revenue", attrs)
	i.UserActiveDuration = getFloat("user_active_for", attrs)
	i.OnTrial = getBool("on_trial", attrs)

	// Timestamps.
	i.CreationTime = getTime("creation_time", attrs)
	i.TrialEnds = getTime("trial_ends_at", attrs)
	i.TrialStarts = getTime("trial_starts_at", attrs)
	i.UserCreatedTime = getTime("user_created_at", attrs)

	return nil //TODO handle errors?? Right now it defaults to ZERO vals
}
// getInt returns the attribute stored under key converted to an int with
// strconv.Atoi. It yields 0 when the key is absent or the value does not
// convert without error.
func getInt(key string, attrs map[string]string) int {
	raw, present := attrs[key]
	if !present {
		return 0
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0
	}
	return n
}
// getFloat returns the attribute stored under key converted to a float64
// with strconv.ParseFloat. It yields 0 when the key is absent or the value
// does not convert without error.
func getFloat(key string, attrs map[string]string) float64 {
	raw, present := attrs[key]
	if !present {
		return 0
	}
	f, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return 0
	}
	return f
}
// getTime returns the attribute stored under key parsed as an RFC 3339
// timestamp. It yields the zero time.Time when the key is absent or the
// value does not parse. Whenever the key is present, the raw value is also
// printed to standard output prefixed with "DATE".
func getTime(key string, attrs map[string]string) time.Time {
	var zero time.Time

	raw, present := attrs[key]
	if !present {
		return zero
	}
	fmt.Println("DATE", raw)

	parsed, err := time.Parse(time.RFC3339, raw)
	if err != nil {
		return zero
	}
	return parsed
}
// getBool returns the attribute stored under key converted to a bool with
// strconv.ParseBool. It yields false when the key is absent or the value
// does not convert without error.
func getBool(key string, attrs map[string]string) bool {
	raw, present := attrs[key]
	if !present {
		return false
	}
	flag, err := strconv.ParseBool(raw)
	return err == nil && flag
}
package main
import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)
// TestCreateUser_Parse checks that Parse maps every attribute of a
// representative "Created user" message onto the matching CreateUser field.
func TestCreateUser_Parse(t *testing.T) {
	testData := map[string]string{
		"creation_time":   "2019-11-21T13:10:58.505Z",
		"event_origin":    "Central",
		"mrr_plan":        "10",
		"on_trial":        "true",
		"organization":    "someOrg",
		"plan":            "CARTO for students - Annual",
		"plan_period":     "1",
		"revenue":         "0",
		"trial_ends_at":   "2019-11-22T13:10:47.000Z",
		"trial_starts_at": "2019-11-21T13:10:48.000Z",
		"user_active_for": "0.001344737799189815",
		"user_created_at": "2019-11-21T13:10:46.887Z",
		"user_id":         "ee5d41c7-f599-4876-9646-8ba023031287",
	}

	user := CreateUser{}
	assert.NoError(t, user.Parse(testData))

	// Numeric fields.
	assert.Equal(t, 10, user.MrrPlan)
	assert.Equal(t, 1, user.PlanPeriod)
	assert.Equal(t, 0, user.Revenue)
	assert.Equal(t, 0.001344737799189815, user.UserActiveDuration)

	// String fields are copied verbatim.
	assert.Equal(t, testData["event_origin"], user.EventOrigin)
	assert.Equal(t, testData["user_id"], user.UserID)
	assert.Equal(t, testData["organization"], user.Organization)
	assert.Equal(t, testData["plan"], user.Plan)

	// Time fields; formatting with RFC3339 drops the fractional seconds.
	assert.Equal(t, "2019-11-21T13:10:58Z", user.CreationTime.Format(time.RFC3339))
	assert.Equal(t, "2019-11-22T13:10:47Z", user.TrialEnds.Format(time.RFC3339))
	assert.Equal(t, "2019-11-21T13:10:48Z", user.TrialStarts.Format(time.RFC3339))
	assert.Equal(t, "2019-11-21T13:10:46Z", user.UserCreatedTime.Format(time.RFC3339))

	assert.Equal(t, true, user.OnTrial)
}
module pubsubtobq
go 1.11
require (
cloud.google.com/go/bigquery v1.3.0
cloud.google.com/go/pubsub v1.1.0
github.com/stretchr/testify v1.4.0
google.golang.org/api v0.14.0
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment