-
-
Save Jofre/bd12e524a76f338d08b4af66c6dd5c99 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"cloud.google.com/go/bigquery" | |
"context" | |
"fmt" | |
"math/rand" | |
"reflect" | |
"time" | |
) | |
// Identifiers of the BigQuery table this program recreates and streams into.
// They are placeholders: replace each one with your own value before running.
const (
	project = "<YOUR_PROJECT_ID>" // passed to bigquery.NewClient and used in the probe query
	dataset = "<YOUR_DATASET_ID>" // dataset that holds the table
	table   = "<YOUR_TABLE_ID>"   // table that is deleted, recreated and inserted into
)
// row is the record type streamed into the table. The table schema is
// inferred from this struct with bigquery.InferSchema, so adding or changing
// fields here changes the schema of the table the program creates.
type row struct {
	ByteField []byte // payload of the row
}
func generateRows(rows chan<- *row) { | |
for { | |
randBytes := make([]byte, 100) | |
_, _ = rand.Read(randBytes) | |
rows <- &row{randBytes} | |
time.Sleep(time.Millisecond * 500) // use whatever frequency you need to insert rows at | |
} | |
} | |
func unreadyTable(rows chan *row) bool { | |
client, err := bigquery.NewClient(context.Background(), project) | |
if err != nil { | |
return true | |
} | |
r := <-rows // get a row to try to insert | |
uploader := client.Dataset(dataset).Table(table).Uploader() | |
if err := uploader.Put(context.Background(), r); err != nil { | |
rows <- r | |
return true | |
} | |
i, err := client.Query(fmt.Sprintf("select * from `%s.%s.%s`", project, dataset, table)).Read(context.Background()) | |
if err != nil { | |
rows <- r | |
return true | |
} | |
var testRow []bigquery.Value | |
if err := i.Next(&testRow); err != nil { | |
rows <- r | |
return true | |
} | |
if reflect.DeepEqual(&row{testRow[0].([]byte)}, r) { | |
return false | |
} // there's probably a better way to check if it's equal | |
rows <- r | |
return true | |
} | |
func main() { | |
// initialize a channel where the rows will be sent | |
rows := make(chan *row, 1000) // make it big enough to hold several minutes of rows | |
// start generating rows to be inserted | |
go generateRows(rows) | |
// create the BigQuery client | |
client, err := bigquery.NewClient(context.Background(), project) | |
if err != nil { /* handle error */ | |
} | |
// delete the previous table | |
if err := client.Dataset(dataset).Table(table).Delete(context.Background()); err != nil { /* handle error */ | |
} | |
// create the new table | |
schema, err := bigquery.InferSchema(row{}) | |
if err != nil { /* handle error */ | |
} | |
if err := client.Dataset(dataset).Table(table).Create(context.Background(), &bigquery.TableMetadata{Schema: schema}); err != nil { /* handle error */ | |
} | |
// wait for the table to be ready | |
for ; unreadyTable(rows); time.Sleep(time.Second) { | |
} | |
// once it's ready, upload indefinitely | |
for { | |
if len(rows) > 0 { // if there are uninserted rows, create a batch and insert them | |
uploader := client.Dataset(dataset).Table(table).Uploader() | |
insert := make([]*row, min(500, len(rows))) // create a batch of all the rows on buffer, up to 500 | |
for i := range insert { | |
insert[i] = <-rows | |
} | |
go func(insert []*row) { // do the actual insert async | |
if err := uploader.Put(context.Background(), insert); err != nil { /* handle error */ | |
} | |
}(insert) | |
} else { // if there are no rows waiting to be inserted, wait and check again | |
time.Sleep(time.Second) | |
} | |
} | |
} | |
// min returns the smaller of the two ints a and b.
//
// It is declared at package level, so on Go 1.21 and later it takes
// precedence over the built-in min within this package.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment