Skip to content

Instantly share code, notes, and snippets.

@Jofre
Last active June 25, 2018 16:44
Show Gist options
  • Save Jofre/bd12e524a76f338d08b4af66c6dd5c99 to your computer and use it in GitHub Desktop.
package main
import (
"cloud.google.com/go/bigquery"
"context"
"fmt"
"math/rand"
"reflect"
"time"
)
// Target BigQuery identifiers; replace the placeholders with your own
// project, dataset and table before running.
const (
	project = "<YOUR_PROJECT_ID>"
	dataset = "<YOUR_DATASET_ID>"
	table   = "<YOUR_TABLE_ID>"
)
// row is the table schema: a single BYTES column holding the randomly
// generated payload produced by generateRows.
type row struct {
	ByteField []byte
}
// generateRows produces a row with a random 100-byte payload every
// 500ms and sends it on the given channel. It never returns.
func generateRows(rows chan<- *row) {
	const (
		payloadSize = 100
		interval    = 500 * time.Millisecond // use whatever frequency you need to insert rows at
	)
	for {
		payload := make([]byte, payloadSize)
		rand.Read(payload) // math/rand's Read is documented to never fail
		rows <- &row{payload}
		time.Sleep(interval)
	}
}
// unreadyTable reports whether the target table is not yet ready to
// receive and serve data. It takes one row from the channel, tries to
// stream-insert it, then queries the table back and compares the first
// result against the inserted row. On any failure the row is pushed
// back on the channel and true is returned so the caller can retry.
func unreadyTable(rows chan *row) bool {
	client, err := bigquery.NewClient(context.Background(), project)
	if err != nil {
		return true
	}
	// fix: the client was previously never closed, leaking its underlying
	// connections on every probe.
	defer client.Close()

	r := <-rows // get a row to try to insert
	uploader := client.Dataset(dataset).Table(table).Uploader()
	if err := uploader.Put(context.Background(), r); err != nil {
		rows <- r
		return true
	}
	it, err := client.Query(fmt.Sprintf("select * from `%s.%s.%s`", project, dataset, table)).Read(context.Background())
	if err != nil {
		rows <- r
		return true
	}
	var testRow []bigquery.Value
	if err := it.Next(&testRow); err != nil {
		rows <- r
		return true
	}
	// fix: the unchecked testRow[0].([]byte) assertion panicked on an
	// empty result row or an unexpected column type; guard both.
	if len(testRow) > 0 {
		if b, ok := testRow[0].([]byte); ok && reflect.DeepEqual(&row{b}, r) {
			return false // the table is serving the row we just inserted
		}
	}
	rows <- r
	return true
}
// main recreates the target table, waits until it is ready, then
// streams generated rows into it indefinitely in batches of up to 500.
func main() {
	// Buffered channel holding rows waiting to be inserted; sized to
	// absorb several minutes of generated rows while the table warms up.
	rows := make(chan *row, 1000)
	// start generating rows to be inserted
	go generateRows(rows)
	// create the BigQuery client
	client, err := bigquery.NewClient(context.Background(), project)
	if err != nil { /* handle error */
	}
	// delete the previous table (an error here is expected on the first
	// run, when the table does not exist yet)
	if err := client.Dataset(dataset).Table(table).Delete(context.Background()); err != nil { /* handle error */
	}
	// create the new table with a schema inferred from the row struct
	schema, err := bigquery.InferSchema(row{})
	if err != nil { /* handle error */
	}
	if err := client.Dataset(dataset).Table(table).Create(context.Background(), &bigquery.TableMetadata{Schema: schema}); err != nil { /* handle error */
	}
	// wait for the table to be ready
	for ; unreadyTable(rows); time.Sleep(time.Second) {
	}
	// fix: the uploader was rebuilt on every batch iteration; it is
	// reusable, so create it once before entering the upload loop.
	uploader := client.Dataset(dataset).Table(table).Uploader()
	// once it's ready, upload indefinitely
	for {
		if len(rows) > 0 { // if there are uninserted rows, create a batch and insert them
			insert := make([]*row, min(500, len(rows))) // batch of all buffered rows, up to 500
			for i := range insert {
				insert[i] = <-rows
			}
			go func(insert []*row) { // do the actual insert async
				if err := uploader.Put(context.Background(), insert); err != nil { /* handle error */
				}
			}(insert)
		} else { // no rows waiting to be inserted: wait and check again
			time.Sleep(time.Second)
		}
	}
}
// min returns the smaller of its two int arguments.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment